Timeseries Guide
Learn how to manage timeseries analytics with Logisland Records.
This guide covers:
- the timeseries API
- anomaly detection
1. Prerequisites
To complete this guide, you need:
- less than 15 minutes
- an IDE
- JDK 1.8+ installed with JAVA_HOME configured appropriately
- to be familiar with Logisland (if not, please refer to the other guides first)
2. Solution
Clone the Git repository: git clone https://github.com/hurence/logisland-quickstarts.git, or download an archive.
The Logisland job is located in the conf/timeseries/parse-timeseries-structured.yml file.
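If you cloned the repository, you can quickly check that the timeseries job files are in place (assuming the default logisland-quickstarts folder name):
cd logisland-quickstarts
ls conf/timeseries/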
3. Environment setup
Start all the services with the following command:
sudo docker-compose -f docker-compose.yml -f docker-compose.historian.yml up -d
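Before going further, you can check that all containers came up correctly; this is plain docker-compose using the same two compose files:
sudo docker-compose -f docker-compose.yml -f docker-compose.historian.yml ps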
4. Setup Logisland Job
Run the solution directly with the following command from the root of the cloned https://github.com/hurence/logisland-quickstarts.git repository:
sudo docker exec -ti logisland-quickstarts_logisland_1 ./bin/logisland.sh --conf conf/timeseries/timeseries-parsing-structured.yml
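The job runs inside the logisland container. If you prefer to follow it from another terminal, you can tail the container logs with plain docker (nothing Logisland-specific here):
sudo docker logs -f logisland-quickstarts_logisland_1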
5. The timeseries app
Here is the Logisland job for timeseries parsing, chunking and indexing:
version: 1.1.2
documentation: LogIsland future factory job

engine:
  component: com.hurence.logisland.engine.spark.KafkaStreamProcessingEngine
  configuration:
    spark.app.name: TimeseriesParsing
    spark.master: local[2]
    spark.streaming.batchDuration: 200
    spark.streaming.kafka.maxRatePerPartition: 10000
    spark.streaming.timeout: -1

  controllerServiceConfigurations:

    - controllerService: file_service
      component: com.hurence.logisland.stream.spark.structured.provider.LocalFileStructuredStreamProviderService
      configuration:
        local.input.path: /opt/logisland/data/timeseries

    - controllerService: console_service
      component: com.hurence.logisland.stream.spark.structured.provider.ConsoleStructuredStreamProviderService

    - controllerService: kafka_service
      component: com.hurence.logisland.stream.spark.structured.provider.KafkaStructuredStreamProviderService
      configuration:
        kafka.input.topics: logisland_raw
        kafka.output.topics: logisland_measures
        kafka.error.topics: logisland_errors
        kafka.input.topics.serializer: com.hurence.logisland.serializer.JsonSerializer
        kafka.output.topics.serializer: com.hurence.logisland.serializer.JsonSerializer
        kafka.error.topics.serializer: com.hurence.logisland.serializer.JsonSerializer
        kafka.metadata.broker.list: kafka:9092
        kafka.zookeeper.quorum: zookeeper:2181
        kafka.topic.autoCreate: true
        kafka.topic.default.partitions: 4
        kafka.topic.default.replicationFactor: 1

    - controllerService: kafka_service_out
      component: com.hurence.logisland.stream.spark.structured.provider.KafkaStructuredStreamProviderService
      configuration:
        kafka.input.topics: logisland_measures
        kafka.output.topics: logisland_metrics
        kafka.error.topics: logisland_errors
        kafka.input.topics.serializer: com.hurence.logisland.serializer.JsonSerializer
        kafka.output.topics.serializer: com.hurence.logisland.serializer.JsonSerializer
        kafka.error.topics.serializer: com.hurence.logisland.serializer.JsonSerializer
        kafka.metadata.broker.list: kafka:9092
        kafka.zookeeper.quorum: zookeeper:2181
        kafka.topic.autoCreate: true
        kafka.topic.default.partitions: 4
        kafka.topic.default.replicationFactor: 1

    - controllerService: datastore_service
      component: com.hurence.logisland.service.solr.Solr8ClientService
      configuration:
        solr.cloud: false
        solr.connection.string: http://solr1:8983/solr
        solr.collection: historian
        solr.concurrent.requests: 4
        flush.interval: 1000
        batch.size: 200

  streamConfigurations:

    # This stream takes all raw events as lines coming from local files.
    # These lines are split into logisland records and sent into a kafka topic.
    - stream: parsing_stream
      component: com.hurence.logisland.stream.spark.structured.StructuredStream
      configuration:
        read.topics.serializer: none
        read.stream.service.provider: kafka_service
        write.topics.serializer: com.hurence.logisland.serializer.JsonSerializer
        write.stream.service.provider: kafka_service
      processorConfigurations:

        - processor: historian_parser
          component: com.hurence.logisland.processor.SplitText
          configuration:
            record.type: historian_serie
            value.regex: (\S+\s+\S+);(\S+);(\S+);(\S+)
            value.fields: record_time,tagname,record_value,quality

        - processor: create_aliases
          component: com.hurence.logisland.processor.NormalizeFields
          configuration:
            conflict.resolution.policy: keep_both_fields
            record_name: tagname

        - processor: fields_types_converter
          component: com.hurence.logisland.processor.ConvertFieldsType
          configuration:
            record_value: double
            quality: float

    # This stream will perform a stateful groupBy operation on tagname
    - stream: compaction_stream
      component: com.hurence.logisland.stream.spark.structured.StructuredStream
      configuration:
        read.topics.key.serializer: com.hurence.logisland.serializer.StringSerializer
        read.topics.serializer: com.hurence.logisland.serializer.JsonSerializer
        read.stream.service.provider: kafka_service_out
        write.topics.serializer: com.hurence.logisland.serializer.JsonSerializer
        write.stream.service.provider: kafka_service_out
        groupby: tagname
        chunk.size: 50
        state.timeout.ms: 30000
      processorConfigurations:

        # Make one chronix chunk from all records
        - processor: timeseries_converter
          component: com.hurence.logisland.processor.ConvertToTimeseries
          configuration:
            groupby: tagname
            metric: max;min;avg;count;trend;sax:7,0.01,10

        # all the parsed records are added to solr by bulk
        - processor: solr_publisher
          component: com.hurence.logisland.processor.datastore.BulkPut
          configuration:
            datastore.client.service: datastore_service
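Since kafka.topic.autoCreate is true, the topics declared above (logisland_raw, logisland_measures, logisland_metrics and logisland_errors) are created on the fly. If you want to double-check once the job is running, you can list them from the Kafka container; this sketch assumes the kafka-topics.sh tool shipped in the image still accepts the --zookeeper flag, as Kafka versions of that era do:
sudo docker exec -ti logisland-quickstarts_kafka_1 /opt/kafka/bin/kafka-topics.sh --zookeeper zookeeper:2181 --list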
6. Some data to process
The loggen container sends some raw values into the logisland_raw topic.
If you want to see what is flowing into the logisland_raw topic, just run the following:
sudo docker exec -ti logisland-quickstarts_kafka_1 /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic logisland_raw
You should see some random metrics:
07/02/2019 15:43:01;068_PI01;61.5777410770347;100
07/02/2019 15:43:01;068_PI02;35.2193234276959;100
07/02/2019 15:43:01;068_SI01;1788.41399658776;100
07/02/2019 15:43:01;068_TI01;29.8433602008229;100
07/02/2019 15:43:01;1C08.P1_TC01;17.2173590138412;100
07/02/2019 15:43:01;1C08.P1_TC02;17.1910309082389;100
07/02/2019 15:43:01;1C11.TE03;20.1282638510868;100
07/02/2019 15:43:02;1C11.TE03;80.8519077634337;100
07/02/2019 15:43:03;1C11.TE03;21.5853514802455;100
07/02/2019 15:43:04;1C11.TE03;21.3110893745982;100
07/02/2019 15:43:05;1C11.TE03;22.2834145029094;100
07/02/2019 15:43:06;1C11.TE03;22.3418653962952;100
07/02/2019 15:43:07;1C11.TE03;22.3587993997439;100
07/02/2019 15:43:08;1C11.TE03;184.15295340911;100
07/02/2019 15:43:09;1C11.TE03;385.684004054356;100
07/02/2019 15:43:10;1C11.TE03;398.042336907894;100
07/02/2019 15:43:11;1C11.TE03;549.025282963574;100
07/02/2019 15:43:12;1C11.TE03;558.109812532283;100
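If you want to inject a few hand-crafted values yourself, in the same semicolon-separated format, you can pipe them into the console producer of the same Kafka container. This is only a sketch for experimenting (the loggen container already feeds the topic), and depending on the Kafka version in the image the --broker-list option may need to be replaced by --bootstrap-server:
echo "07/02/2019 16:00:00;1C11.TE03;42.0;100" | sudo docker exec -i logisland-quickstarts_kafka_1 /opt/kafka/bin/kafka-console-producer.sh --broker-list kafka:9092 --topic logisland_raw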
7. Points
If you want to see what is flowing into the logisland_measures topic, just run the following:
sudo docker exec -ti logisland-quickstarts_kafka_1 /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic logisland_measures
Each point is first parsed as a record like this:
{
"id" : "3752fb69-0d37-41a6-9d8f-9d19ad341eee",
"type" : "historian_serie",
"creationDate" : 1569509387301,
"fields" : {
"quality" : 100.0,
"record_id" : "3752fb69-0d37-41a6-9d8f-9d19ad341eee",
"record_name" : "pressure",
"record_time" : 1569509387301,
"record_type" : "historian_serie",
"record_value" : 50.59789543992153,
"tagname" : "pressure"
}
}
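To grab just a handful of measures for one tag instead of tailing the whole stream, you can combine the console consumer with its --from-beginning and --max-messages options and a plain grep on the tag name (the tag below is only an example):
sudo docker exec -i logisland-quickstarts_kafka_1 /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic logisland_measures --from-beginning --max-messages 50 | grep 1C11.TE03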
8. Timeseries chunks with analytics
To see the chunks flowing into the logisland_metrics topic, run the following:
sudo docker exec -ti logisland-quickstarts_kafka_1 /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic logisland_metrics
You should see chunked records like this:
{
"id" : "55967ac0-a683-42d4-b3f6-ff8a3ed1be0f",
"type" : "historian_serie",
"creationDate" : 1569509486549,
"fields" : {
"chunk_avg" : 50.090690268806696,
"chunk_count" : 50.0,
"chunk_end" : 1569509486201,
"chunk_max" : 97.8005221466734,
"chunk_min" : -1.2300222783725445,
"chunk_sax" : "eadcfdcdee",
"chunk_size" : 50,
"chunk_size_bytes" : 531,
"chunk_start" : 1569509485108,
"chunk_trend" : true,
"chunk_window_ms" : 1093,
"quality" : "100.0",
"record_id" : "55967ac0-a683-42d4-b3f6-ff8a3ed1be0f",
"record_name" : "temp_a",
"record_time" : 1569509486549,
"record_type" : "historian_serie",
"record_value" : "[B@6dc57670",
"tagname" : "temp_a"
}
}
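Note that record_value now carries the encoded points of the whole chunk rather than a single value (rendered above as a Java byte-array reference), while the chunk_* fields hold the computed analytics: min, max, avg, count, trend and the SAX string. To eyeball only the SAX strings of the next few chunks, a plain grep is enough (nothing Logisland-specific in this sketch):
sudo docker exec -i logisland-quickstarts_kafka_1 /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic logisland_metrics --max-messages 5 | grep -o '"chunk_sax" *: *"[a-z]*"'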
9. Timeseries with Solr
The chunks are indexed into the historian Solr collection by the solr_publisher processor, so you can query them directly, for example to get all chunks for the temp_a tag, or all chunks whose SAX string contains becd, as sketched below.
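Here is a minimal sketch of those two queries using curl and the standard Solr select API, assuming the historian collection from the job configuration and that Solr's 8983 port is reachable from your host (otherwise run the same curl from inside the solr1 container):
# all chunks for the temp_a tag
curl "http://localhost:8983/solr/historian/select?q=tagname:temp_a"
# all chunks whose SAX string contains becd
curl "http://localhost:8983/solr/historian/select?q=chunk_sax:*becd*"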
10. Cleanup
To remove all containers, run the following:
docker-compose -f docker-compose.yml -f docker-compose.historian.yml down
Remaining TODOs for this quickstart:
- add DDC threshold param to record converter
- add processor test
- cleanup structured stream serializer
- groupby.keys: tagname and group.by.field: tagname to rename
- spark.sqlContext.setConf("spark.sql.shuffle.partitions", "4") hardcoded
- add watermarking
- test spark.streaming.timeout in StructuredTest