Logisland API Guide
1. API design
logisland is a framework that you can extend through its API: you can use it to build your own Processors, or to build data processing applications on top of it. You can extend logisland with the low-level Java API described below.
2. The primary material: Records
The basic unit of processing is the Record. A Record is a collection of Fields, while a Field has a name, a type and a value. You can instantiate a Record as in the following code snippet:
String id = "firewall_record1";
String type = "cisco";
Record record = new StandardRecord(type).setId(id);
assertTrue(record.isEmpty());
assertEquals(record.size(), 0);
A record is defined by its type and a collection of fields; there are three special fields:
// shortcut for id
assertEquals(record.getId(), id);
assertEquals(record.getField(FieldDictionary.RECORD_ID).asString(), id);
// shortcut for time
assertEquals(record.getTime().getTime(),
record.getField(FieldDictionary.RECORD_TIME).asLong());
// shortcut for type
assertEquals(record.getType(), type);
assertEquals(record.getType(), record.getField(FieldDictionary.RECORD_TYPE).asString());
assertEquals(record.getType(), record.getField(FieldDictionary.RECORD_TYPE).getRawValue());
All the other fields have generic setters, getters and removers:
record.setStringField("url_host", "origin-www.20minutes.fr")
.setField("method", FieldType.STRING, "GET")
.setField("response_size", FieldType.INT, 452)
.setField("is_outside_office_hours", FieldType.BOOLEAN, false)
.setField("tags", FieldType.ARRAY, Arrays.asList("spam", "filter", "mail"));
assertFalse(record.hasField("unkown_field"));
assertTrue(record.hasField("method"));
assertEquals(record.getField("method").asString(), "GET");
assertTrue(record.getField("response_size").asInteger() - 452 == 0);
assertTrue(record.getField("is_outside_office_hours").asBoolean());
record.removeField("is_outside_office_hours");
assertFalse(record.hasField("is_outside_office_hours"));
Fields are strongly typed, and you can validate them:
Record record = new StandardRecord();
record.setField("request_size", FieldType.INT, 1399);
assertTrue(record.isValid());
// a String cannot be stored in an INT field
record.setField("request_size", FieldType.INT, "zer");
assertFalse(record.isValid());
// a Long value does not match the declared INT type
record.setField("request_size", FieldType.INT, 45L);
assertFalse(record.isValid());
record.setField("request_size", FieldType.LONG, 45L);
assertTrue(record.isValid());
record.setField("request_size", FieldType.DOUBLE, 45.5d);
assertTrue(record.isValid());
// 45.5 is a double literal, so this is equivalent to the line above
record.setField("request_size", FieldType.DOUBLE, 45.5);
assertTrue(record.isValid());
// a Long value does not match the declared DOUBLE type
record.setField("request_size", FieldType.DOUBLE, 45L);
assertFalse(record.isValid());
record.setField("request_size", FieldType.FLOAT, 45.5f);
assertTrue(record.isValid());
// a Long value does not match the declared STRING type
record.setField("request_size", FieldType.STRING, 45L);
assertFalse(record.isValid());
// a double value does not match the declared FLOAT type
record.setField("request_size", FieldType.FLOAT, 45.5d);
assertFalse(record.isValid());
3. The tools to handle processing: Processor
logisland is designed as a component-centric framework, so there is a layer of abstraction for building configurable components. Basically, a component can be Configurable and Configured.
The most common component you'll use is the Processor. Let's walk through the code of a basic MockProcessor, which doesn't do any really useful work but is quite self-explanatory. We first need to extend the AbstractProcessor class (or to implement the Processor interface).
public class MockProcessor extends AbstractProcessor {
private static Logger logger = LoggerFactory.getLogger(MockProcessor.class);
private static String EVENT_TYPE_NAME = "mock";
public static final PropertyDescriptor FAKE_MESSAGE
= new PropertyDescriptor.Builder()
.name("fake.message")
.description("a fake message")
.required(true)
.addValidator(StandardPropertyValidators.NON_EMPTY_VALIDATOR)
.defaultValue("yoyo")
.build();
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(FAKE_MESSAGE);
return Collections.unmodifiableList(descriptors);
}
Then we have to define the list of supported PropertyDescriptors. All these properties and the validation logic are handled by the Configurable interface.
Then comes the initialization block of the component, given a ProcessContext (more on this later):
@Override
public void init(final ProcessContext context) {
logger.info("init MockProcessor");
}
And now the real business part: the process method, which handles all the work on the collection of records.
@Override
public Collection<Record> process(final ProcessContext context, final Collection<Record> collection) {
final String message = context.getPropertyValue(FAKE_MESSAGE).asString();
final List<Record> outputRecords = new ArrayList<>(collection);
outputRecords.forEach(record -> record.setStringField("message", message));
return outputRecords;
}
The Processor can then be configured through YAML config files:
- processor: mock_processor
  component: com.hurence.logisland.util.runner.MockProcessor
  type: parser
  documentation: a parser that produces events for nothing
  configuration:
    fake.message: the super message
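To see the configured processor in action, here is a minimal sketch using the TestRunner utilities described in section 7. The Record-based enqueue call is an assumption; section 7 only shows the InputStream variant.
final TestRunner testRunner = TestRunners.newTestRunner(new MockProcessor());
testRunner.setProperty(MockProcessor.FAKE_MESSAGE, "the super message");
testRunner.assertValid();
// enqueue a single record (assuming enqueue also accepts Record instances)
testRunner.enqueue(new StandardRecord("cisco").setStringField("url_host", "origin-www.20minutes.fr"));
testRunner.run();
testRunner.assertOutputRecordsCount(1);
// every output record now carries the configured fake message
testRunner.getOutputRecords().get(0).assertFieldEquals("message", "the super message");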
4. Transverse service injection: ControllerService
We often need to share access to external services across Processors, for example bulk buffers or client connections to external data sources.
Consider, for instance, a cache service that caches K/V tuples across the worker nodes. We first need to provide an interface API for this service:
public interface CacheService<K,V> extends ControllerService {
PropertyDescriptor CACHE_SIZE = new PropertyDescriptor.Builder()
.name("cache.size")
.description("The maximum number of element in the cache.")
.required(false)
.defaultValue("16384")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
public V get(K k);
public void set(K k, V v);
}
And here is an implementation of the cache contract:
public class LRUKeyValueCacheService<K,V> extends AbstractControllerService
implements CacheService<K,V> {
private volatile Cache<K,V> cache;
@Override
public V get(K k) {
return cache.get(k);
}
@Override
public void set(K k, V v) {
cache.set(k, v);
}
@Override
@OnEnabled
public void init(ControllerServiceInitializationContext context)
throws InitializationException {
try {
this.cache = createCache(context);
}catch (Exception e){
throw new InitializationException(e);
}
}
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
List<PropertyDescriptor> props = new ArrayList<>();
props.add(CACHE_SIZE);
return Collections.unmodifiableList(props);
}
protected Cache<K,V> createCache(final ControllerServiceInitializationContext context)
throws IOException, InterruptedException {
final int capacity = context.getPropertyValue(CACHE_SIZE).asInteger();
return new LRUCache<K,V>(capacity);
}
}
You can then use this service in a custom processor:
public class TestProcessor extends AbstractProcessor {
static final PropertyDescriptor CACHE_SERVICE = new PropertyDescriptor.Builder()
.name("cache.service")
.description("CacheService")
.identifiesControllerService(CacheService.class)
.required(true)
.build();
@Override
public boolean hasControllerService() {
return true;
}
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
List<PropertyDescriptor> propDescs = new ArrayList<>();
propDescs.add(CACHE_SERVICE);
return propDescs;
}
@Override
public Collection<Record> process(ProcessContext context, Collection<Record> records) {
return Collections.emptyList();
}
}
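Note that this TestProcessor never actually touches the cache; in a real processor you would retrieve the service instance from the context inside process. Here is a minimal sketch, where the asControllerService() accessor is an assumption, modeled on the asString()/asInteger() accessors used above:
@Override
public Collection<Record> process(ProcessContext context, Collection<Record> records) {
    // resolve the controller service wired in through the cache.service property
    // (asControllerService() is assumed here, by analogy with the other PropertyValue accessors)
    final CacheService<String, Record> cache =
            (CacheService<String, Record>) context.getPropertyValue(CACHE_SERVICE).asControllerService();
    // cache every record by id so that other components can look it up later
    records.forEach(record -> cache.set(record.getId(), record));
    return records;
}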
The injection is done through the YAML config file, by referencing the lru_cache service instance:
...
controllerServiceConfigurations:
  - controllerService: lru_cache
    component: com.hurence.logisland.service.elasticsearch.LRUKeyValueCacheService
    type: service
    documentation: cache service
    configuration:
      cache.size: 5000
streamConfigurations:
  - stream: parsing_stream
    component: com.hurence.logisland.stream.spark.KafkaRecordStreamParallelProcessing
    ...
    processorConfigurations:
      - processor: mock_processor
        component: com.hurence.logisland.service.elasticsearch.TestProcessor
        type: parser
        documentation: a parser that produces events for nothing
        configuration:
          cache.service: lru_cache
5. Chaining processors in a stream: RecordStream
TODO: this section remains to be written.
6. Running the processor’s flow: Engine
TODO: this section remains to be written.
7. Testing your processors: TestRunner
Once you have coded your processor, you will most likely want to cover it with unit tests. The framework provides the TestRunner tool for that. All you need is to instantiate a TestRunner with your Processor and its properties.
final String APACHE_LOG_SCHEMA = "/schemas/apache_log.avsc";
final String APACHE_LOG = "/data/localhost_access.log";
final String APACHE_LOG_FIELDS =
"src_ip,identd,user,record_time,http_method,http_query,http_version,http_status,bytes_out";
final String APACHE_LOG_REGEX =
"(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+\\[([\\w:/]+\\s[+\\-]\\d{4})\\]\\s+\"(\\S+)\\s+(\\S+)\\s+(\\S+)\"\\s+(\\S+)\\s+(\\S+)";
final TestRunner testRunner = TestRunners.newTestRunner(new SplitText());
testRunner.setProperty(SplitText.VALUE_REGEX, APACHE_LOG_REGEX);
testRunner.setProperty(SplitText.VALUE_FIELDS, APACHE_LOG_FIELDS);
// check if config is valid
testRunner.assertValid();
Now enqueue some messages as if they were sent to input Kafka topics:
testRunner.clearQueues();
testRunner.enqueue(SplitTextTest.class.getResourceAsStream(APACHE_LOG));
Now run the process method and check that every Record has been correctly processed:
testRunner.run();
testRunner.assertAllInputRecordsProcessed();
testRunner.assertOutputRecordsCount(200);
testRunner.assertOutputErrorCount(0);
You can check that all output records conform to an Avro schema:
final RecordValidator avroValidator = new AvroRecordValidator(SplitTextTest.class.getResourceAsStream(APACHE_LOG_SCHEMA));
testRunner.assertAllRecords(avroValidator);
And check that your output records look as expected:
MockRecord out = testRunner.getOutputRecords().get(0);
out.assertFieldExists("src_ip");
out.assertFieldNotExists("src_ip2");
out.assertFieldEquals("src_ip", "10.3.10.134");
out.assertRecordSizeEquals(9);
out.assertFieldEquals(FieldDictionary.RECORD_TYPE, "apache_log");
out.assertFieldEquals(FieldDictionary.RECORD_TIME, 1469342728000L);