Datastore Mongo Guide

Learn how to send Logisland Records into MongoDB through the Datastore API.

This guide covers:

  • datastore API

  • MongoDB setup

1. Prerequisites

To complete this guide, you need:

  • less than 15 minutes

  • an IDE

  • JDK 1.8+ installed with JAVA_HOME configured appropriately

  • Apache Maven 3.5.3+

  • The completed application from the Getting Started Guide

2. Solution

We recommend that you follow the instructions in the next sections and create the application step by step. However, you can go right to the completed example.

Clone the Git repository: git clone https://github.com/hurence/logisland-quickstarts.git, or download an archive.

The solution is located in the conf/datastore directory.

This guide assumes you already have the completed application from the getting-started directory.

3. Setup the environment

For this guide we need a Logisland stack (Zookeeper, Kafka, Logisland) and MongoDB, all provided as a complete Docker Compose configuration.

Please note that you should not launch several docker-compose stacks simultaneously, because they expose the same local ports and would conflict with each other. Be sure to have killed all your previously launched (Logisland) containers.

Create a file named docker-compose-datastore-MongoDB.yml with the following content:

version: '3'
services:

  zookeeper:
    image: hurence/zookeeper
    hostname: zookeeper
    ports:
      - '2181:2181'
    networks:
      - logisland

  kafka:
    image: hurence/kafka:0.10.2.2-scala-2.11
    hostname: kafka
    ports:
      - '9092:9092'
    volumes:
      - kafka-home:/opt/kafka_2.11-0.10.2.2/
    environment:
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_ADVERTISED_HOST_NAME: kafka
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_JMX_PORT: 7071
    networks:
      - logisland

  # Logisland container: does nothing by itself, jobs are launched manually inside it
  logisland:
    image: hurence/logisland:1.1.2
    command: tail -f bin/logisland.sh
    ports:
      - '4050:4050'
      - '8082:8082'
      - '9999:9999'
    volumes:
      - kafka-home:/opt/kafka_2.11-0.10.2.2/ # Just so that kafka scripts are available inside container
    environment:
      KAFKA_HOME: /opt/kafka_2.11-0.10.2.2
      KAFKA_BROKERS: kafka:9092
      ZK_QUORUM: zookeeper:2181
      MONGO_URI: mongodb://mongo:27017
    networks:
      - logisland

  mongo:
    hostname: mongo
    image: 'mongo:3.6.11'
    networks:
      - logisland

volumes:
  kafka-home:

networks:
  logisland:

Launch your Docker containers with this command:

sudo docker-compose -f docker-compose-datastore-MongoDB.yml up -d

Make sure all containers are running and that there are no errors.

sudo docker-compose ps

Those containers should now be up and running, as shown below:

CONTAINER ID        IMAGE                               COMMAND                  CREATED             STATUS              PORTS                                                                    NAMES
ab15f4b5198c        mongo:3.6.11                        "docker-entrypoint.s…"   13 seconds ago      Up 7 seconds        27017/tcp                                                                conf_mongo_1
a697e45d2d1a        hurence/logisland:1.1.2             "tail -f bin/logisla…"   13 seconds ago      Up 9 seconds        0.0.0.0:4050->4050/tcp, 0.0.0.0:8082->8082/tcp, 0.0.0.0:9999->9999/tcp   conf_logisland_1
db80cdf23b45        hurence/zookeeper                   "/bin/sh -c '/usr/sb…"   13 seconds ago      Up 10 seconds       2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, 7072/tcp                     conf_zookeeper_1
7aa7a87dd16b        hurence/kafka:0.10.2.2-scala-2.11   "start-kafka.sh"         13 seconds ago      Up 5 seconds        0.0.0.0:9092->9092/tcp                                                   conf_kafka_1

4. MongoDB setup

You have to create the logisland database with the apache collection:

    # open the mongo shell inside mongo container
    sudo docker exec -ti conf_mongo_1 mongo

    > use logisland
    switched to db logisland

    > db.apache.insert({src_ip:"19.123.12.67", identd:"-", user:"-", bytes_out:12344, http_method:"POST", http_version:"2.0", http_query:"/logisland/is/so?great=true",http_status:"404" })
    WriteResult({ "nInserted" : 1 })

    > db.apache.find()
{ "_id" : ObjectId("5b4f3c4a5561b53b7e862b57"), "src_ip" : "19.123.12.67", "identd" : "-", "user" : "-", "bytes_out" : 12344, "http_method" : "POST", "http_version" : "2.0", "http_query" : "/logisland/is/so?great=true", "http_status" : "404" }

5. Logisland job setup

Now that we have a fully functional infrastructure, we can move on to the stream processing job definition.

The beginning of the job remains the same as the Getting Started one:

  • an Engine definition

  • a RecordStream to handle the processing pipeline

  • a sequence of Processors to parse the logs

What is new here is:

  • a controllerService definition to instantiate a MongoDB Datastore

  • a BulkPut processor to send Records to the DataStore

The controllerServiceConfigurations part defines all the services that can be shared by the processors of the whole job, here a MongoDB service that will be used later by the BulkPut processor.

engine:
  component: com.hurence.logisland.engine.spark.KafkaStreamProcessingEngine
  configuration:
    spark.app.name: IndexApacheLogsDemo
    spark.master: local[2]
  controllerServiceConfigurations:

    - controllerService: datastore_service
      component: com.hurence.logisland.service.mongodb.MongoDBControllerService
      configuration:
        mongo.uri: ${MONGO_URI}
        mongo.db.name: logisland
        mongo.collection.name: apache
        # possible values ACKNOWLEDGED, UNACKNOWLEDGED, FSYNCED, JOURNALED, REPLICA_ACKNOWLEDGED, MAJORITY
        mongo.write.concern: ACKNOWLEDGED
        flush.interval: 2000
        batch.size: 1000

As you can see it uses an environment variable, so make sure it is set (if you use the docker-compose file of this tutorial it is already done for you). Note that datastore_service is the unique name given to the ControllerService; it can be referenced by any Processor configuration further down in the file.

Inside this engine you will run a Kafka stream of processing, so we set up the input/output topics and the Kafka/Zookeeper hosts. Here the stream will read all the logs sent to the logisland_raw topic and push the processing output into the logisland_events topic.
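Below is a sketch of what the stream definition can look like. It is adapted from the Getting Started job, so treat the exact keys and serializers as assumptions and refer to the completed index-apache-logs-MongoDB.yml in the quickstarts repository for the authoritative values:

  streamConfigurations:

    - stream: parsing_stream
      component: com.hurence.logisland.stream.spark.KafkaRecordStreamParallelProcessing
      configuration:
        # where to read raw logs and where to write parsed events
        kafka.input.topics: logisland_raw
        kafka.output.topics: logisland_events
        kafka.error.topics: logisland_errors
        kafka.input.topics.serializer: none
        kafka.output.topics.serializer: com.hurence.logisland.serializer.KryoSerializer
        # broker and zookeeper addresses come from the docker-compose environment
        kafka.metadata.broker.list: ${KAFKA_BROKERS}
        kafka.zookeeper.quorum: ${ZK_QUORUM}
        kafka.topic.autoCreate: true
        kafka.topic.default.partitions: 4
        kafka.topic.default.replicationFactor: 1

The processors of the pipeline are then declared under the stream's processorConfigurations section: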

- processor: apache_parser
  component: com.hurence.logisland.processor.SplitText
  ...

# all the parsed records are added to MongoDB by bulk
- processor: MongoDB_publisher
  component: com.hurence.logisland.processor.datastore.BulkPut
  configuration:
    datastore.client.service: datastore_service

You can now run the job inside the logisland container:

sudo docker exec -ti conf_logisland_1 ./bin/logisland.sh --conf ./conf/index-apache-logs-MongoDB.yml
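If the job starts but nothing lands in MongoDB, it is probably waiting for input: the stream reads the logisland_raw topic. As in the Getting Started guide, you can feed it some Apache access log lines with the Kafka console producer shipped in the kafka container (access.log below is a placeholder for your own sample file):

cat access.log | sudo docker exec -i conf_kafka_1 /opt/kafka_2.11-0.10.2.2/bin/kafka-console-producer.sh --broker-list kafka:9092 --topic logisland_raw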

6. Inspect the logs

With mongo you can directly use the shell:

> db.apache.find()
{ "_id" : "507adf3e-3162-4ff0-843a-253e01a6df69", "src_ip" : "129.94.144.152", "record_id" : "507adf3e-3162-4ff0-843a-253e01a6df69", "http_method" : "GET", "record_value" : "129.94.144.152 - - [01/Jul/1995:00:00:17 -0400] \"GET /images/ksclogo-medium.gif HTTP/1.0\" 304 0", "http_query" : "/images/ksclogo-medium.gif", "bytes_out" : "0", "identd" : "-", "http_version" : "HTTP/1.0", "http_status" : "304", "record_time" : NumberLong("804571.1.200"), "user" : "-", "record_type" : "apache_log" }
{ "_id" : "c44a9d09-52b9-4ada-8126-39c70c90fdd3", "src_ip" : "ppp-mia-30.shadow.net", "record_id" : "c44a9d09-52b9-4ada-8126-39c70c90fdd3", "http_method" : "GET", "record_value" : "ppp-mia-30.shadow.net - - [01/Jul/1995:00:00:27 -0400] \"GET / HTTP/1.0\" 200 7074", "http_query" : "/", "bytes_out" : "7074", "identd" : "-", "http_version" : "HTTP/1.0", "http_status" : "200", "record_time" : NumberLong("804571227000"), "user" : "-", "record_type" : "apache_log" }

7. Stop the job

You can Ctrl+C the console where you launched the Logisland job. Then, to kill all the containers used, run:

sudo docker-compose -f docker-compose-datastore-MongoDB.yml down

Make sure all containers have disappeared.

sudo docker ps