Timeseries Guide

Learn how to manage timeseries analytics with Logisland Records.

This guide covers:

  • timeseries API

  • anomaly detection

1. Prerequisites

To complete this guide, you need:

  • less than 15 minutes

  • an IDE

  • JDK 1.8+ installed with JAVA_HOME configured appropriately

  • to be familiar with Logisland (otherwise, please refer to the other guides first)

2. Solution

Clone the Git repository: git clone https://github.com/hurence/logisland-quickstarts.git, or download an archive.

The logisland job is located in the conf/timeseries/parse-timeseries-structured.yml file.

3. Environment setup

Start all the services with the following command:

sudo docker-compose -f docker-compose.yml -f docker-compose.historian.yml up -d
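
You can then check that all the containers are up, for instance with:

sudo docker-compose -f docker-compose.yml -f docker-compose.historian.yml ps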

4. Setup Logisland Job

Run the solution directly with the following command from the root folder of the https://github.com/hurence/logisland-quickstarts.git repository.

sudo docker exec -ti logisland-quickstarts_logisland_1 ./bin/logisland.sh --conf conf/timeseries/timeseries-parsing-structured.yml
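
This runs the job attached to your terminal. If you prefer to run it in the background, docker exec also accepts a -d (detached) flag; a possible variant:

sudo docker exec -d logisland-quickstarts_logisland_1 ./bin/logisland.sh --conf conf/timeseries/timeseries-parsing-structured.yml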

5. The timeseries app

Here is the Logisland job for timeseries parsing, chunking and indexing:

version: 1.1.2
documentation: LogIsland future factory job

engine:
  component: com.hurence.logisland.engine.spark.KafkaStreamProcessingEngine
  configuration:
    spark.app.name: TimeseriesParsing
    spark.master: local[2]
    spark.streaming.batchDuration: 200
    spark.streaming.kafka.maxRatePerPartition: 10000
    spark.streaming.timeout: -1

  controllerServiceConfigurations:

    - controllerService: file_service
      component: com.hurence.logisland.stream.spark.structured.provider.LocalFileStructuredStreamProviderService
      configuration:
        local.input.path: /opt/logisland/data/timeseries

    - controllerService: console_service
      component: com.hurence.logisland.stream.spark.structured.provider.ConsoleStructuredStreamProviderService

    - controllerService: kafka_service
      component: com.hurence.logisland.stream.spark.structured.provider.KafkaStructuredStreamProviderService
      configuration:
        kafka.input.topics: logisland_raw
        kafka.output.topics: logisland_measures
        kafka.error.topics: logisland_errors
        kafka.input.topics.serializer: com.hurence.logisland.serializer.JsonSerializer
        kafka.output.topics.serializer: com.hurence.logisland.serializer.JsonSerializer
        kafka.error.topics.serializer: com.hurence.logisland.serializer.JsonSerializer
        kafka.metadata.broker.list: kafka:9092
        kafka.zookeeper.quorum: zookeeper:2181
        kafka.topic.autoCreate: true
        kafka.topic.default.partitions: 4
        kafka.topic.default.replicationFactor: 1

    - controllerService: kafka_service_out
      component: com.hurence.logisland.stream.spark.structured.provider.KafkaStructuredStreamProviderService
      configuration:
        kafka.input.topics: logisland_measures
        kafka.output.topics: logisland_metrics
        kafka.error.topics: logisland_errors
        kafka.input.topics.serializer: com.hurence.logisland.serializer.JsonSerializer
        kafka.output.topics.serializer: com.hurence.logisland.serializer.JsonSerializer
        kafka.error.topics.serializer: com.hurence.logisland.serializer.JsonSerializer
        kafka.metadata.broker.list: kafka:9092
        kafka.zookeeper.quorum: zookeeper:2181
        kafka.topic.autoCreate: true
        kafka.topic.default.partitions: 4
        kafka.topic.default.replicationFactor: 1

    - controllerService: datastore_service
      component: com.hurence.logisland.service.solr.Solr8ClientService
      configuration:
        solr.cloud: false
        solr.connection.string: http://solr1:8983/solr
        solr.collection: historian
        solr.concurrent.requests: 4
        flush.interval: 1000
        batch.size: 200

  streamConfigurations:

    # This stream takes all raw events as lines coming from the logisland_raw Kafka topic.
    # These lines are split into logisland records and sent to the logisland_measures topic.
    - stream: parsing_stream
      component: com.hurence.logisland.stream.spark.structured.StructuredStream
      configuration:
        read.topics.serializer: none
        read.stream.service.provider: kafka_service
        write.topics.serializer: com.hurence.logisland.serializer.JsonSerializer
        write.stream.service.provider: kafka_service
      processorConfigurations:

        - processor: historian_parser
          component: com.hurence.logisland.processor.SplitText
          configuration:
            record.type: historian_serie
            value.regex: (\S+\s+\S+);(\S+);(\S+);(\S+)
            value.fields: record_time,tagname,record_value,quality

        - processor: create_aliases
          component: com.hurence.logisland.processor.NormalizeFields
          configuration:
            conflict.resolution.policy: keep_both_fields
            record_name: tagname

        - processor: fields_types_converter
          component: com.hurence.logisland.processor.ConvertFieldsType
          configuration:
            record_value: double
            quality: float

    # This stream will perform a stateful groupBy operation on tagname
    - stream: compaction_stream
      component: com.hurence.logisland.stream.spark.structured.StructuredStream
      configuration:
        read.topics.key.serializer: com.hurence.logisland.serializer.StringSerializer
        read.topics.serializer: com.hurence.logisland.serializer.JsonSerializer
        read.stream.service.provider: kafka_service_out
        write.topics.serializer: com.hurence.logisland.serializer.JsonSerializer
        write.stream.service.provider: kafka_service_out
        groupby: tagname
        chunk.size: 50
        state.timeout.ms: 30000

      processorConfigurations:

        # Make one Chronix chunk from each group of records
        - processor: timeseries_converter
          component: com.hurence.logisland.processor.ConvertToTimeseries
          configuration:
            groupby: tagname
            metric: max;min;avg;count;trend;sax:7,0.01,10

        # all the resulting chunks are indexed into Solr in bulk
        - processor: solr_publisher
          component: com.hurence.logisland.processor.datastore.BulkPut
          configuration:
            datastore.client.service: datastore_service
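
To make the SplitText configuration above more concrete, here is a rough shell sketch of what value.regex does to one raw line (this only illustrates the regex with GNU sed, it is not how Logisland executes it):

echo "07/02/2019 15:43:01;068_PI01;61.5777410770347;100" \
  | sed -E 's/^(\S+\s+\S+);(\S+);(\S+);(\S+)$/record_time=\1 | tagname=\2 | record_value=\3 | quality=\4/'

The four capture groups are mapped, in order, to the fields declared in value.fields.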

6. Some data to process

The loggen container sends raw values into the logisland_raw topic.

If you want to see what is flowing into the logisland_raw topic, just run the following:

sudo docker exec -ti logisland-quickstarts_kafka_1 /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic logisland_raw

You should see some random metrics:

07/02/2019 15:43:01;068_PI01;61.5777410770347;100
07/02/2019 15:43:01;068_PI02;35.2193234276959;100
07/02/2019 15:43:01;068_SI01;1788.41399658776;100
07/02/2019 15:43:01;068_TI01;29.8433602008229;100
07/02/2019 15:43:01;1C08.P1_TC01;17.2173590138412;100
07/02/2019 15:43:01;1C08.P1_TC02;17.1910309082389;100
07/02/2019 15:43:01;1C11.TE03;20.1282638510868;100
07/02/2019 15:43:02;1C11.TE03;80.8519077634337;100
07/02/2019 15:43:03;1C11.TE03;21.5853514802455;100
07/02/2019 15:43:04;1C11.TE03;21.3110893745982;100
07/02/2019 15:43:05;1C11.TE03;22.2834145029094;100
07/02/2019 15:43:06;1C11.TE03;22.3418653962952;100
07/02/2019 15:43:07;1C11.TE03;22.3587993997439;100
07/02/2019 15:43:08;1C11.TE03;184.15295340911;100
07/02/2019 15:43:09;1C11.TE03;385.684004054356;100
07/02/2019 15:43:10;1C11.TE03;398.042336907894;100
07/02/2019 15:43:11;1C11.TE03;549.025282963574;100
07/02/2019 15:43:12;1C11.TE03;558.109812532283;100
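
If you prefer to inject a test value by hand, the console producer shipped with Kafka can be used in the same way (the --broker-list flag is assumed here; recent Kafka versions use --bootstrap-server instead):

echo "07/02/2019 15:43:01;068_PI01;61.5777410770347;100" \
  | sudo docker exec -i logisland-quickstarts_kafka_1 /opt/kafka/bin/kafka-console-producer.sh --broker-list kafka:9092 --topic logisland_raw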

7. Points

If you want to see what is flowing into the logisland_measures topic, just run the following:

sudo docker exec -ti logisland-quickstarts_kafka_1 /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic logisland_measures

Each point is first parsed into a record like this:

{
  "id" : "3752fb69-0d37-41a6-9d8f-9d19ad341eee",
  "type" : "historian_serie",
  "creationDate" : 1569509387301,
  "fields" : {
    "quality" : 100.0,
    "record_id" : "3752fb69-0d37-41a6-9d8f-9d19ad341eee",
    "record_name" : "pressure",
    "record_time" : 1569509387301,
    "record_type" : "historian_serie",
    "record_value" : 50.59789543992153,
    "tagname" : "pressure"
  }
}
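
If jq is installed on the host, you can keep only the fields you care about from that stream; a small sketch (note the -i flag instead of -ti so the output can be piped):

sudo docker exec -i logisland-quickstarts_kafka_1 /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic logisland_measures \
  | jq '{tag: .fields.tagname, value: .fields.record_value, time: .fields.record_time}'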

8. Timeseries chunks with analytics

To see the resulting chunks flowing into the logisland_metrics topic, run the following:

sudo docker exec -ti logisland-quickstarts_kafka_1 /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic logisland_metrics

You should see chunk records like the following:

{
  "id" : "55967ac0-a683-42d4-b3f6-ff8a3ed1be0f",
  "type" : "historian_serie",
  "creationDate" : 1569509486549,
  "fields" : {
    "chunk_avg" : 50.090690268806696,
    "chunk_count" : 50.0,
    "chunk_end" : 1569509486201,
    "chunk_max" : 97.8005221466734,
    "chunk_min" : -1.2300222783725445,
    "chunk_sax" : "eadcfdcdee",
    "chunk_size" : 50,
    "chunk_size_bytes" : 531,
    "chunk_start" : 1569509485108,
    "chunk_trend" : true,
    "chunk_window_ms" : 1093,
    "quality" : "100.0",
    "record_id" : "55967ac0-a683-42d4-b3f6-ff8a3ed1be0f",
    "record_name" : "temp_a",
    "record_time" : 1569509486549,
    "record_type" : "historian_serie",
    "record_value" : "[B@6dc57670",
    "tagname" : "temp_a"
  }
}

9. Timeseries with Solr

Since each chunk is indexed into the historian collection, you can query Solr directly, for example to get all chunks for the temp_a tag, or all chunks whose SAX string contains becd (see the sketch below).
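
A minimal sketch of these two queries, assuming the Solr port 8983 is published on the host and the chunk fields are indexed under the names shown above (tagname, chunk_sax):

# all chunks for the temp_a tag
curl "http://localhost:8983/solr/historian/select?q=tagname:temp_a"

# all chunks whose SAX string contains becd
curl "http://localhost:8983/solr/historian/select?q=chunk_sax:*becd*"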

10. Cleanup

To remove all the containers, run the following:

docker-compose -f docker-compose.yml -f docker-compose.historian.yml down
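
If you also want to drop the data kept in Docker volumes (assuming the compose files declare some), docker-compose down accepts a -v flag:

docker-compose -f docker-compose.yml -f docker-compose.historian.yml down -v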

The following items remain to be done:

  • add DDC threshold param to record converter

  • add processor test

  • cleanup structured stream serializer

  • groupby.keys: tagname and group.by.field: tagname to rename

  • spark.sqlContext.setConf("spark.sql.shuffle.partitions", "4") hardcoded

  • add watermarking

  • test spark.streaming.timeout in StructuredTest