Threshold Alerting Guide
Learn how to trigger alerts based on threshold policies. This guide covers:
- using a Redis K/V store to hold some aggregations
- defining alerting business rules as MVEL functions
1. Prerequisites
To complete this guide, you need:
- less than 15 minutes
- an IDE
- JDK 1.8+ installed with JAVA_HOME configured appropriately
- Apache Maven 3.5.3+
- the completed application from the Getting Started Guide
2. Architecture
In this guide, we expand on the initial log parsing stream created as part of the Getting Started Guide. We cover how to generate time-window metrics on HTTP traffic (Apache log records).
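A rough sketch of the data flow covered in this guide:

logisland_raw (Kafka)
   --> stream 1: parse logs, compute tags, cache records in Redis
   --> logisland_events (Kafka)
   --> stream 2 (timer based): check thresholds, compute alerts, cache records in Redis
   --> logisland_alerts (Kafka)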
3. Solution
We recommend that you follow the instructions in the next sections and create the application step by step. However, you can go right to the completed example.
Clone the Git repository: git clone https://github.com/hurence/logisland-quickstarts.git, or download an archive.
The solution is located in the conf/core/threshold-alerting.yml file.
This guide assumes you already have the completed application from the Getting Started Guide.
4. Threshold based alerting on Apache logs with Redis K/V store
In this tutorial we'll see how to use the Redis K/V store as cache storage, and how to use the ComputeTags, CheckThresholds and CheckAlerts processors in conjunction with this Redis cache.
The following job is made of two streaming parts:
- a main stream which parses Apache logs and stores them in a Redis cache;
- a timer-based stream which computes new tag values from the cached records, checks for threshold crossings, and sends alerts if needed.
We will start by explaining each part of the config file.
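For orientation, here is a trimmed sketch of how such a job file is organized (the full version lives in conf/core/threshold-alerting.yml; the `...` elide settings that are shown piece by piece below, and the second stream's name is omitted here):

engine:
  component: com.hurence.logisland.engine.spark.KafkaStreamProcessingEngine
  configuration:
    ...
  controllerServiceConfigurations:
    - controllerService: datastore_service
      ...
  streamConfigurations:
    - stream: parsing_stream
      ...
      processorConfigurations:
        - processor: apache_parser
          ...
    - stream: ...        # the timer-based stream
      processorConfigurations:
        - processor: compute_thresholds
          ...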
4.1. Controller service part
The controllerServiceConfigurations
part is here to define all services that be shared by processors within the whole job, here a Redis KV cache service that will be used later in the BulkPut
processor.
# redis datastore service
- controllerService: datastore_service
  component: com.hurence.logisland.redis.service.RedisKeyValueCacheService
  configuration:
    connection.string: localhost:6379
    redis.mode: standalone
    database.index: 0
    communication.timeout: 10 seconds
    pool.max.total: 8
    pool.max.idle: 8
    pool.min.idle: 0
    pool.block.when.exhausted: true
    pool.max.wait.time: 10 seconds
    pool.min.evictable.idle.time: 60 seconds
    pool.time.between.eviction.runs: 30 seconds
    pool.num.tests.per.eviction.run: -1
    pool.test.on.create: false
    pool.test.on.borrow: false
    pool.test.on.return: false
    pool.test.while.idle: true
    record.recordSerializer: com.hurence.logisland.serializer.JsonSerializer
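Before going further, you can sanity-check that the Redis endpoint this service points at is actually reachable; assuming redis-cli is installed on your host, a ping should answer PONG:

$ redis-cli -h localhost -p 6379 ping
PONG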
4.2. First stream: parse logs and compute tags
Here the stream reads all the logs sent to the logisland_raw topic and pushes the processing output into the logisland_events topic as JSON-serialized records.
# a stream that converts raw apache logs into structured log records
- stream: parsing_stream
  component: com.hurence.logisland.stream.spark.KafkaRecordStreamParallelProcessing
  configuration:
    kafka.input.topics: logisland_raw
    kafka.output.topics: logisland_events
    kafka.error.topics: logisland_errors
    kafka.input.topics.serializer: none
    kafka.output.topics.serializer: com.hurence.logisland.serializer.JsonSerializer
    kafka.error.topics.serializer: com.hurence.logisland.serializer.JsonSerializer
    kafka.metadata.broker.list: localhost:9092
    kafka.zookeeper.quorum: localhost:2181
    kafka.topic.autoCreate: true
    kafka.topic.default.partitions: 4
    kafka.topic.default.replicationFactor: 1
Within this stream, a SplitText processor takes a log line as a String and computes a Record as a sequence of fields.
- processor: apache_parser
  component: com.hurence.logisland.processor.SplitText
  configuration:
    value.regex: (\S+)\s+(\S+)\s+(\S+)\s+\[([\w:\/]+\s[+\-]\d{4})\]\s+"(\S+)\s+(\S+)\s*(\S*)"\s+(\S+)\s+(\S+)
    value.fields: src_ip,identd,user,record_time,http_method,http_query,http_version,http_status,bytes_out
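To get a feel for what SplitText does, here is the same regex applied to a sample log line (taken from the dataset used later in this guide) in plain Python; this only illustrates the split, it is not how the processor is implemented:

import re

# same regex and field list as the SplitText configuration above
VALUE_REGEX = r'(\S+)\s+(\S+)\s+(\S+)\s+\[([\w:\/]+\s[+\-]\d{4})\]\s+"(\S+)\s+(\S+)\s*(\S*)"\s+(\S+)\s+(\S+)'
FIELDS = ('src_ip,identd,user,record_time,http_method,http_query,'
          'http_version,http_status,bytes_out').split(',')

line = ('204.191.209.4 - - [01/Jul/1995:01:00:29 -0400] '
        '"GET /images/WORLD-logosmall.gif HTTP/1.0" 200 669')

fields = dict(zip(FIELDS, re.match(VALUE_REGEX, line).groups()))
print(fields['src_ip'], fields['http_query'], fields['bytes_out'])
# 204.191.209.4 /images/WORLD-logosmall.gif 669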
This stream will process log entries as soon as they are queued into the logisland_raw Kafka topic; each log will be parsed into an event which is pushed back to Kafka in the logisland_events topic.
The next processing step renames the bytes_out field to record_value (a record that carried bytes_out: "669" will carry record_value: "669" instead):
# change field name `bytes_out` to `record_value`
- processor: normalize_fields
  component: com.hurence.logisland.processor.NormalizeFields
  configuration:
    conflict.resolution.policy: overwrite_existing
    record_value: bytes_out
Then we modify record_id so that its value is taken from the src_ip field; with the %1$s format string, the id simply becomes the raw src_ip value (e.g. 204.191.209.4):
# change current id to src_ip
- processor: modify_id
  component: com.hurence.logisland.processor.ModifyId
  configuration:
    id.generation.strategy: fromFields
    fields.to.hash: src_ip
    java.formatter.string: "%1$s"
Now we'll remove all the unwanted fields:
# remove useless fields
- processor: remove_fields
  component: com.hurence.logisland.processor.RemoveFields
  configuration:
    fields.to.remove: >
      src_ip,identd,user,http_method,http_query,
      http_version,http_status,bytes_out
And then cast record_value to a double, since the alerting formulas below do numeric arithmetic on it:
# cast values to the correct type
- processor: cast
  component: com.hurence.logisland.processor.ConvertFieldsType
  configuration:
    record_value: double
The next processing step will compute a dynamic tag value from a JavaScript expression: here, a new record with its record_id set to computed1 and its record_value set to the result of the expression cache("logisland.hurence.com").value * 10.2.
# compute tags from given formulas. each dynamic property will return
# a new record according to the formula definition:
# the record name will be set to the property name,
# the record time will be set to the current timestamp
- processor: compute_tag
  component: com.hurence.logisland.processor.alerting.ComputeTags
  configuration:
    datastore.client.service: datastore_service
    output.record.type: computed_tag
    max.cpu.time: 500
    max.memory: 64800000
    max.prepared.statements: 5
    allow.no.brace: false
    computed1: return cache("logisland.hurence.com").value * 10.2;
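Conceptually, ComputeTags evaluates each dynamic property against the K/V store and emits one new record per property. A minimal Python sketch of that logic, assuming JSON-serialized records as configured above (this is an illustration, not the engine's actual code, and the mapping of the JS `.value` accessor is an assumption):

import json
import redis

r = redis.StrictRedis(host='localhost', port=6379, db=0)

def cache(key):
    # records were stored as JSON by the BulkPut processor
    raw = r.get(key)
    return json.loads(raw) if raw else None

rec = cache('logisland.hurence.com')
if rec is not None:
    # roughly what the JS `.value` accessor reads (assumption)
    value = float(rec['fields']['record_value'])
    computed1 = value * 10.2   # becomes the record_value of the emitted computed_tag record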
The last processor takes all the Records of this stream and indexes them into the previously defined datastore (Redis):
# all the parsed records are added to datastore by bulk
- processor: datastore_publisher
  component: com.hurence.logisland.processor.datastore.BulkPut
  type: processor
  documentation: "indexes processed events in datastore"
  configuration:
    datastore.client.service: datastore_service
4.3. Second stream: check threshold crossings and alerting
The second stream reads all the events sent to the logisland_events topic and pushes the processing outputs (threshold_cross and alert records) into the logisland_alerts topic as JSON-serialized records.
We won't comment on the stream definition itself as it is really straightforward.
The first processor of this stream pipeline makes use of the CheckThresholds component, which adds a new record of type threshold_cross with a record_id set to threshold1 whenever the JS expression cache("computed1").value > 2000.0 evaluates to true.
- processor: compute_thresholds
  component: com.hurence.logisland.processor.alerting.CheckThresholds
  type: processor
  documentation: |
    compute threshold cross from given formulas.
    each dynamic property will return a new record according to the formula definition
    the record name will be set to the property name
    the record time will be set to the current timestamp
    a threshold_cross has the following properties : count, time, duration, value
  configuration:
    datastore.client.service: datastore_service
    output.record.type: threshold_cross
    max.cpu.time: 100
    max.memory: 12800000
    max.prepared.statements: 5
    record.ttl: 300000
    threshold1: cache("computed1").value > 2000.0
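As the documentation block above says, a threshold_cross record carries count, time, duration and value. For illustration only, with made-up values and a field layout assumed to mirror the other JSON records of this guide:

{
  "id" : "threshold1",
  "type" : "threshold_cross",
  "creationDate" : 804574829000,
  "fields" : {
    "count" : 3,
    "time" : 804574829000,
    "duration" : 60,
    "value" : 2345.6
  }
}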
- processor: compute_alerts1
  component: com.hurence.logisland.processor.alerting.CheckAlerts
  type: processor
  documentation: |
    compute alerts from given formulas.
    each dynamic property will return a new record according to the formula definition
    the record name will be set to the property name
    the record time will be set to the current timestamp
  configuration:
    datastore.client.service: datastore_service
    output.record.type: medium_alert
    alert.criticity: 1
    max.cpu.time: 100
    max.memory: 12800000
    max.prepared.statements: 5
    profile.activation.condition: cache("threshold1").value > 3000.0
    alert1: cache("threshold1").duration > 50.0
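Reading the two processors together: on each timer tick, threshold1 is re-evaluated against the cached computed1 tag, and an alert1 record of type medium_alert is only emitted while the activation profile holds (the cached threshold1 value exceeds 3000.0) and the threshold has been crossed for longer than 50 time units. This reading follows from the configuration above; see the CheckAlerts component documentation for the authoritative semantics.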
As in the first stream, the last processor indexes all the Records of this stream into the previously defined datastore (Redis):
- processor: datastore_publisher
  component: com.hurence.logisland.processor.datastore.BulkPut
  type: processor
  documentation: "indexes processed events in datastore"
  configuration:
    datastore.client.service: datastore_service
5. Run the job
Connect a shell to your logisland container and launch the streaming job (the container name may differ depending on your Docker Compose version):
docker exec -i -t logisland-quickstarts_logisland_1 bin/logisland.sh --conf conf/core/threshold-alerting.yml
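Once the job is running, you can feed a sample Apache log line through the logisland_raw topic. Here is one way to do it with Kafka's standard console producer; the Kafka container name and the tool being on the PATH are assumptions that depend on your setup:

echo '204.191.209.4 - - [01/Jul/1995:01:00:29 -0400] "GET /images/WORLD-logosmall.gif HTTP/1.0" 200 669' | docker exec -i logisland-quickstarts_kafka_1 kafka-console-producer.sh --broker-list localhost:9092 --topic logisland_raw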
6. Inspect the logs and alerts
For this part of the tutorial we will use redis-py, a Python client for Redis. You can install it by following the instructions on the redis-py GitHub page, or simply:
$ sudo pip install redis
To get started, check that you can connect to Redis:
>>> import redis
>>> r = redis.StrictRedis(host='localhost', port=6379, db=0)
>>> r.set('foo', 'bar')
>>> r.get('foo')
Then we want to grab some of the logs that have been collected into Redis. We first find some keys matching a pattern, then get the JSON content of one of them:
>>> r.keys('1234*')
['123493eb-93df-4e57-a1c1-4a8e844fa92c', '123457d5-8ccc-4f0f-b4ba-d70967aa48eb', '12345e06-6d72-4ce8-8254-a7cc4bab5e31']
>>> r.get('123493eb-93df-4e57-a1c1-4a8e844fa92c')
'{\n "id" : "123493eb-93df-4e57-a1c1-4a8e844fa92c",\n "type" : "apache_log",\n "creationDate" : 804574829000,\n "fields" : {\n "src_ip" : "204.191.209.4",\n "record_id" : "123493eb-93df-4e57-a1c1-4a8e844fa92c",\n "http_method" : "GET",\n "http_query" : "/images/WORLD-logosmall.gif",\n "bytes_out" : "669",\n "identd" : "-",\n "http_version" : "HTTP/1.0",\n "record_raw_value" : "204.191.209.4 - - [01/Jul/1995:01:00:29 -0400] \\"GET /images/WORLD-logosmall.gif HTTP/1.0\\" 200 669",\n "http_status" : "200",\n "record_time" : 804574829000,\n "user" : "-",\n "record_type" : "apache_log"\n }\n}'
>>> import json
>>> record = json.loads(r.get('123493eb-93df-4e57-a1c1-4a8e844fa92c'))
>>> record['fields']['bytes_out']
u'669'
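Finally, the alert records themselves land in the logisland_alerts Kafka topic; you can watch them arrive with Kafka's console consumer (again, the container name and tool path are assumptions that depend on your setup):

docker exec -i -t logisland-quickstarts_kafka_1 kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic logisland_alerts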