Datastore Connectors Guide
This guide shows how to integrate Kafka Connect connectors into Logisland.
1. Solution
We recommend that you follow the instructions in the next sections and create the application step by step. However, you can go right to the completed example.
Clone the Git repository: git clone https://github.com/hurence/logisland-quickstarts.git, or download an archive.
The solution is located in the conf/connectors directory.
This guide assumes you already have the completed application from the getting-started directory.
1.1. Introduction
Logisland integrates the Kafka Connect ecosystem with the Spark Structured Streaming engine.
To integrate both worlds seamlessly, we simply wrapped the Kafka Connect connector interfaces (unplugging them from Kafka) and let them run in a Logisland-managed Spark container. Hence the name "Logisland Connect" :-)
This allows you to leverage the existing library of Kafka connectors to import data into a Logisland pipeline without needing any additional middleware or ETL system.
1.2. Prerequisites
You can use this functionality only with a Spark engine >= 2.1.x.
2. Getting started
In order to use a Kafka Connect source or sink, you have to package and install the required libraries into the Logisland lib folder.
Fortunately, this can easily be done with the components.sh tool.
bin/components.sh -i <plugin_artifact>
The plugin artifact should be provided in the format groupId:artifactId:version, where groupId, artifactId and version refer to the Maven artifact you’re going to install.
Some examples, with the suggested artifacts to use, are listed in the following table:
Connector | Documentation | Artifact |
---|---|---|
Simulator | | com.github.jcustenborder.kafka.connect:kafka-connect-simulator:0.1.118 |
OPC-DA / OPC-UA (IIoT) | | com.hurence.logisland:logisland-connector-opc:<logisland_version> |
FTP | | com.eneco:kafka-connect-ftp:0.1.4 |
Blockchain | https://github.com/Landoop/stream-reactor/tree/master/kafka-connect-blockchain | com.datamountaineer:kafka-connect-blockchain:1.1.2 |
JMS | https://github.com/Landoop/stream-reactor/tree/master/kafka-connect-jms | com.datamountaineer:kafka-connect-jms:1.1.2 |
JDBC | https://docs.confluent.io/current/connect/connect-jdbc/docs/index.html | io.confluent:kafka-connect-jdbc:5.0.0 |
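For example, installing the simulator connector listed above boils down to the following command (adjust the artifact coordinates to the connector you actually need):
bin/components.sh -i com.github.jcustenborder.kafka.connect:kafka-connect-simulator:0.1.118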
3. Configuring
Once you have bundled the connectors you need, you are ready to use them.
Let’s do it step by step.
First of all, we need to declare a KafkaConnectStructuredSourceProviderService or a KafkaConnectStructuredSinkProviderService that will manage our connector in Logisland. Along with this we need to provide some configuration (in general, you can always refer to the Kafka Connect documentation to better understand the underlying architecture and how to configure a connector):
Property | Description |
---|---|
kc.connector.class | The fully qualified class name of the connector |
kc.data.key.converter | The class of the converter to be used for the key. Please refer to the "Choosing the right converter" section |
kc.data.key.converter.properties | The properties to be provided to the key converter |
kc.data.value.converter | The class of the converter to be used for the value. Please refer to the "Choosing the right converter" section |
kc.data.value.converter.properties | The properties to be provided to the value converter |
kc.connector.properties | The properties to be provided to the connector, specific to the connector itself |
kc.worker.tasks.max | How many concurrent threads to spawn for the connector |
kc.connector.offset.backing.store | The offset backing store to use. Choose among memory (standalone, in memory), file (standalone, file based) and kafka (distributed, Kafka topic based) |
kc.connector.offset.backing.store.properties | Specific properties to configure the chosen backing store |
Please refer to the Kafka Connect guide for further information about offset backing stores and how to configure them.
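As an illustration only, if you chose the kafka backing store, its properties would presumably be passed in the same key=value multiline format used by the other *.properties settings. The keys below (bootstrap.servers, offset.storage.topic) are standard Kafka Connect worker settings shown here as an assumption, so check the Kafka Connect guide for the authoritative list:
    kc.connector.offset.backing.store: kafka
    kc.connector.offset.backing.store.properties: |
      bootstrap.servers=localhost:9092
      offset.storage.topic=logisland-connector-offsets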
4. Choosing the right converter
Choosing the right converter is perhaps one of the most important parts. In fact, we’re going to adapt what comes from Kafka Connect to what flows into our Logisland pipeline. This means we have to know how the source manages its data.
In order to simplify your choice, we recommend following this simple approach (the same applies to both keys and values):
Source data | Kafka Converter | Logisland Encoder |
---|---|---|
String | StringConverter | StringEncoder |
Raw Bytes | ByteArrayConverter | BytesArraySerialiser |
Structured | LogIslandRecordConverter | The serializer used by the record converter (*) |
(*) In case you deal with structured data, the LogIslandRecordConverter will embed the structured object in a Logisland record. In order to do this, you have to specify the serializer to be used to convert your data (via the record.serializer property). The KryoSerializer is generally a good choice to start with.
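For example, if your source emitted plain strings for both key and value, the converter part of the service configuration might look like the sketch below (assuming the standard Kafka Connect org.apache.kafka.connect.storage.StringConverter class; adapt it to your source):
    kc.data.key.converter: org.apache.kafka.connect.storage.StringConverter
    kc.data.value.converter: org.apache.kafka.connect.storage.StringConverter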
5. Putting it all together
In the previous two sections we explained how to configure a connector and how to choose the right serializer for it.
To recap, we can examine the following configuration example:
# Our source service
- controllerService: kc_source_service
  component: com.hurence.logisland.stream.spark.provider.KafkaConnectStructuredSourceProviderService
  documentation: >
    A kafka source connector provider reading from its own source and providing
    structured streaming to the underlying layer
  configuration:
    # We will use the logisland record converter for both key and value
    kc.data.value.converter: com.hurence.logisland.connect.converter.LogIslandRecordConverter
    # Use kryo to serialize the inner data
    kc.data.value.converter.properties: |
      record.serializer=com.hurence.logisland.serializer.KryoSerializer
    kc.data.key.converter: com.hurence.logisland.connect.converter.LogIslandRecordConverter
    # Use kryo to serialize the inner data
    kc.data.key.converter.properties: |
      record.serializer=com.hurence.logisland.serializer.KryoSerializer
    # Only one task to handle source input (unique)
    kc.worker.tasks.max: 1
    # The kafka source connector to wrap (here we're using a simulator source)
    kc.connector.class: com.github.jcustenborder.kafka.connect.simulator.SimulatorSourceConnector
    # The properties for the connector (as per connector documentation)
    kc.connector.properties: |
      key.schema.fields=email
      topic=simulator
      value.schema.fields=email,firstName,middleName,lastName,telephoneNumber,dateOfBirth
    # We are using a standalone source for testing. We can store processed offsets in memory
    kc.connector.offset.backing.store: memory
In this example, both the key and the value provided by the connector are structured objects.
For this reason we use the LogIslandRecordConverter for both. We tell the key and value converters which serializer to use by setting
record.serializer=com.hurence.logisland.serializer.KryoSerializer
among the related converter properties.
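A sink is declared in the same way, using a KafkaConnectStructuredSinkProviderService instead. The sketch below is only an illustration: the service name and the connector class are placeholders, and the connector-specific properties have to be taken from the documentation of the sink connector you actually install:
# A hypothetical sink service (the connector class below is a placeholder)
- controllerService: kc_sink_service
  component: com.hurence.logisland.stream.spark.provider.KafkaConnectStructuredSinkProviderService
  documentation: A kafka sink connector provider writing structured records to an external system
  configuration:
    kc.data.value.converter: com.hurence.logisland.connect.converter.LogIslandRecordConverter
    kc.data.value.converter.properties: |
      record.serializer=com.hurence.logisland.serializer.KryoSerializer
    kc.worker.tasks.max: 1
    # Replace with the class of the sink connector you installed
    kc.connector.class: <your.sink.connector.Class>
    # Connector-specific properties, as per the connector documentation
    kc.connector.properties: |
      # key=value pairs for the sink connector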
6. Going further
Please do not hesitate to take a look at our Kafka Connect tutorials for more details and practical use cases.