Threshold Alerting Guide

Learn how to launch alerts based on thresholds policies This guide covers:

  • using a Redis K/V to store some aggregations

  • define alerting business rules as MVEL functions

1. Prerequisites

To complete this guide, you need:

  • less than 15 minutes

  • an IDE

  • JDK 1.8+ installed with JAVA_HOME configured appropriately

  • Apache Maven 3.5.3+

  • The completed greeter application from the Getting Started Guide

2. Architecture

In this guide, we expand on the initial log parsing stream that was created as part of the Getting Started Guide. We cover how to generate time window metrics on some http traffic (apache log records).

@TODO make a good diagram

3. Solution

We recommend that you follow the instructions in the next sections and create the application step by step. However, you can go right to the completed example.

Clone the Git repository: git clone https://github.com/hurence/logisland-quickstarts.git, or download an archive.

The solution is located in the conf/core/threshold-alerting.yml file.

This guide assumes you already have the completed application from the getting-started guide.

4. Threshold based alerting on Apache logs with Redis K/V store

In this tutorial we’ll how to use Redis K/V store as a cache storage and how to practice the use of ComputeTag, CheckThresholds and CheckAlerts processors in conjunction with this Redis Cache.

The following job is made of 2 streaming parts :

  1. A main stream which parses Apache logs and store them to a Redis cache .

  2. A timer based stream which compute some new tags values based on cached records, check some thresholds cross and send alerts if needed.

We will start by explaining each part of the config file.

4.1. Controller service part

The controllerServiceConfigurations part is here to define all services that be shared by processors within the whole job, here a Redis KV cache service that will be used later in the BulkPut processor.

# redis datastore service
- controllerService: datastore_service
    component: com.hurence.logisland.redis.service.RedisKeyValueCacheService
    configuration:
        connection.string: localhost:6379
        redis.mode: standalone
        database.index: 0
        communication.timeout: 10 seconds
        pool.max.total: 8
        pool.max.idle: 8
        pool.min.idle: 0
        pool.block.when.exhausted: true
        pool.max.wait.time: 10 seconds
        pool.min.evictable.idle.time: 60 seconds
        pool.time.between.eviction.runs: 30 seconds
        pool.num.tests.per.eviction.run: -1
        pool.test.on.create: false
        pool.test.on.borrow: false
        pool.test.on.return: false
        pool.test.while.idle: true
        record.recordSerializer: com.hurence.logisland.serializer.JsonSerializer

4.2. First stream : parse logs and compute tags

Here the stream will read all the logs sent in logisland_raw topic and push the processing output into logisland_events topic as Json serialized records.

# a processor that converts raw apache logs into structured log records
- stream: parsing_stream
    component: com.hurence.logisland.stream.spark.KafkaRecordStreamParallelProcessing
    configuration:
        kafka.input.topics: logisland_raw
        kafka.output.topics: logisland_events
        kafka.error.topics: logisland_errors
        kafka.input.topics.serializer: none
        kafka.output.topics.serializer: com.hurence.logisland.serializer.JsonSerializer
        kafka.error.topics.serializer: com.hurence.logisland.serializer.JsonSerializer
        kafka.metadata.broker.list: localhost:9092
        kafka.zookeeper.quorum: localhost:2181
        kafka.topic.autoCreate: true
        kafka.topic.default.partitions: 4
        kafka.topic.default.replicationFactor: 1

Within this stream a SplitText processor takes a log line as a String and computes a Record as a sequence of fields.

- processor: apache_parser
    component: com.hurence.logisland.processor.SplitText
    configuration:
    value.regex: (\S+)\s+(\S+)\s+(\S+)\s+\[([\w:\/]+\s[+\-]\d{4})\]\s+"(\S+)\s+(\S+)\s*(\S*)"\s+(\S+)\s+(\S+)
    value.fields: src_ip,identd,user,record_time,http_method,http_query,http_version,http_status,bytes_out

This stream will process log entries as soon as they will be queued into logisland_raw Kafka topics, each log will be parsed as an event which will be pushed back to Kafka in the logisland_events topic.

the next processing step is to assign bytes_out field as record_value

# change field name `bytes_out` to `record_value`
- processor: normalize_fields
    component: com.hurence.logisland.processor.NormalizeFields
    configuration:
        conflict.resolution.policy: overwrite_existing
        record_value: bytes_out

the we modify record_id to set its value as src_ip field.

# change current id to src_ip
- processor: modify_id
    component: com.hurence.logisland.processor.ModifyId
    configuration:
        id.generation.strategy: fromFields
        fields.to.hash: src_ip
        java.formatter.string: "%1$s"

now we’ll remove all the unwanted fields

# remove useless fields
- processor: remove_fields
    component: com.hurence.logisland.processor.RemoveFields
    configuration:
        fields.to.remove: >
            src_ip,identd,user,http_method,http_query,
            http_version,http_status,bytes_out

and then cast record_value as a double

# cast values to the correct type
- processor: cast
    component: com.hurence.logisland.processor.ConvertFieldsType
    configuration:
        record_value: double

The next processing step wil compute a dynamic Tag value from a Javascript expression. Here a new record with an record_id set to computed1 and as a record_value the resulting expression of cache("logisland.hurence.com").value * 10.2

# compute tags from given formulas. each dynamic property will return
# a new record according to the formula definition
# the record name will be set to the property name
# the record time will be set to the current timestamp
- processor: compute_tag
    component: com.hurence.logisland.processor.alerting.ComputeTags
    configuration:
        datastore.client.service: datastore_service
        output.record.type: computed_tag
        max.cpu.time: 500
        max.memory: 64800000
        max.prepared.statements: 5
        allow.no.brace: false
        computed1: return cache("logisland.hurence.com").value * 10.2;

The last processor will handle all the Records of this stream to index them into datastore previously defined (Redis)

# all the parsed records are added to datastore by bulk
- processor: datastore_publisher
    component: com.hurence.logisland.processor.datastore.BulkPut
    type: processor
    documentation: "indexes processed events in datastore"
    configuration:
    datastore.client.service: datastore_service

4.3. Second stream : check threshold cross and alerting

The second stream will read all the logs sent in logisland_events topic and push the processed outputs (threshold_cross & alerts records) into logisland_alerts topic as Json serialized records.

We won’t comment the stream definition as it is really straightforward.

The first processor of this stream pipeline makes use of CheckThresholds component which will add a new record of type threshold_cross with a record_id set to threshold1 if the JS expression cache("computed1").value > 2000.0 is evaluated to true.

- processor: compute_thresholds
    component: com.hurence.logisland.processor.alerting.CheckThresholds
    type: processor
    documentation: |
        compute threshold cross from given formulas.
        each dynamic property will return a new record according to the formula definition
        the record name will be set to the property name
        the record time will be set to the current timestamp

        a threshold_cross has the following properties : count, time, duration, value
    configuration:
        datastore.client.service: datastore_service
        output.record.type: threshold_cross
        max.cpu.time: 100
        max.memory: 12800000
        max.prepared.statements: 5
        record.ttl: 300000
        threshold1: cache("computed1").value > 2000.0
        - processor: compute_alerts1
          component: com.hurence.logisland.processor.alerting.CheckAlerts
          type: processor
          documentation: |
            compute threshold cross from given formulas.
            each dynamic property will return a new record according to the formula definition
            the record name will be set to the property name
            the record time will be set to the current timestamp
          configuration:
            datastore.client.service: datastore_service
            output.record.type: medium_alert
            alert.criticity: 1
            max.cpu.time: 100
            max.memory: 12800000
            max.prepared.statements: 5
            profile.activation.condition: cache("threshold1").value > 3000.0
            alert1: cache("threshold1").duration > 50.0

The last processor will handle all the Records of this stream to index them into datastore previously defined (Redis)

        - processor: datastore_publisher
          component: com.hurence.logisland.processor.datastore.BulkPut
          type: processor
          documentation: "indexes processed events in datastore"
          configuration:
            datastore.client.service: datastore_service

5. Run the job

Connect a shell to your logisland container to launch the following streaming jobs.

docker exec -i -t logisland-quickstarts_logisland_1 vim conf/core/threshold-alerting.yml

6. Inspect the logs and alerts

For this part of the tutorial we will use redis-py a Python client for Redis. You can install it by following instructions given on redis-py on github.

To install redis-py, simply:

$ sudo pip install redis

Getting Started, check if you can connect with Redis

>>> import redis
>>> r = redis.StrictRedis(host='localhost', port=6379, db=0)
>>> r.set('foo', 'bar')
>>> r.get('foo')

Then we want to grab some logs that have been collected to Redis. We first find some keys with a pattern and get the json content of one

>>> r.keys('1234*')
['123493eb-93df-4e57-a1c1-4a8e844fa92c', '123457d5-8ccc-4f0f-b4ba-d70967aa48eb', '12345e06-6d72-4ce8-8254-a7cc4bab5e31']

>>> r.get('123493eb-93df-4e57-a1c1-4a8e844fa92c')
'{\n  "id" : "123493eb-93df-4e57-a1c1-4a8e844fa92c",\n  "type" : "apache_log",\n  "creationDate" : 804574829000,\n  "fields" : {\n    "src_ip" : "204.191.209.4",\n    "record_id" : "123493eb-93df-4e57-a1c1-4a8e844fa92c",\n    "http_method" : "GET",\n    "http_query" : "/images/WORLD-logosmall.gif",\n    "bytes_out" : "669",\n    "identd" : "-",\n    "http_version" : "HTTP/1.0",\n    "record_raw_value" : "204.191.209.4 - - [01/Jul/1995:01:00:29 -0400] \\"GET /images/WORLD-logosmall.gif HTTP/1.0\\" 200 669",\n    "http_status" : "200",\n    "record_time" : 804574829000,\n    "user" : "-",\n    "record_type" : "apache_log"\n  }\n}'

>>> import json
>>> record = json.loads(r.get('123493eb-93df-4e57-a1c1-4a8e844fa92c'))
>>> record['fields']['bytes_out']