Kafka Connect JMS Source

The JMS Source connector subscribes to messages on JMS queues and topics and writes them to Kafka.

The Source supports:

  1. Pluggable converters for JMS payloads. If no converter is specified, an Avro message is created to represent the JMS Message, with the payload stored as a byte array in the payload field of the Avro record.
  2. Out-of-the-box converters for JSON, Avro and binary payloads.
  3. KCQL routing queries - JMS destination to Kafka topic mapping.

Prerequisites

  • Confluent 3.3
  • Java 1.8
  • Scala 2.11
  • A JMS framework (ActiveMQ for example)

Setup

Before we can do anything, including the QuickStart, we need to install the Confluent Platform. For ActiveMQ, follow http://activemq.apache.org/getting-started.html for instructions on setting it up.

Confluent Setup

Follow the instructions here.

Source Connector QuickStart

When you start the Confluent Platform (confluent start), Kafka Connect is started in distributed mode. In this mode a REST endpoint on port 8083 is exposed to accept connector configurations. We developed a Command Line Interface to make interacting with the Connect REST API easier. The CLI can be found in the Stream Reactor download under the bin folder. Alternatively the JAR can be pulled from our GitHub releases page.

Starting the Connector (Distributed)

Download, unpack and install the Stream Reactor and Confluent. Follow the instructions here if you haven’t already done so. All paths in the QuickStart are based on the location where you installed the Stream Reactor.

Once Connect has started we can use the kafka-connect-tools CLI to post in our distributed properties file for JMS. If you are using the Docker images you will have to set the following environment variable for the CLI to connect to the REST API of Kafka Connect in your container.

export KAFKA_CONNECT_REST="http://myserver:myport"
➜  bin/connect-cli create jms-source < conf/jms-source.properties

The jms-source.properties file defines:

name=jms-source
connector.class=com.datamountaineer.streamreactor.connect.jms.source.JMSSourceConnector
tasks.max=1
connect.jms.kcql=INSERT INTO topic SELECT * FROM jms-queue
connect.jms.queues=jms-queue
connect.jms.initial.context.factory=org.apache.activemq.jndi.ActiveMQInitialContextFactory
connect.jms.url=tcp://localhost:61616
connect.jms.connection.factory=ConnectionFactory
  1. The source connector name.
  2. The JMS Source Connector class name.
  3. The number of tasks to start.
  4. The KCQL routing query.
  5. A comma-separated list of queue destinations on the target JMS broker; must match the FROM element in the KCQL statement.
  6. The JMS initial context factory.
  7. The URL of the JMS broker.
  8. The JMS connection factory.
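
If you prefer not to use the CLI, the same configuration can be posted directly to the Connect REST API with curl. A sketch, assuming Connect is listening on localhost:8083; the JSON mirrors the properties file above.

curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
  "name": "jms-source",
  "config": {
    "connector.class": "com.datamountaineer.streamreactor.connect.jms.source.JMSSourceConnector",
    "tasks.max": "1",
    "connect.jms.kcql": "INSERT INTO topic SELECT * FROM jms-queue",
    "connect.jms.queues": "jms-queue",
    "connect.jms.initial.context.factory": "org.apache.activemq.jndi.ActiveMQInitialContextFactory",
    "connect.jms.url": "tcp://localhost:61616",
    "connect.jms.connection.factory": "ConnectionFactory"
  }
}'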

Use the Confluent CLI to view Connect's logs.

# Get the logs from Connect
confluent log connect

# Follow logs from Connect
confluent log connect -f

We can use the CLI to check if the connector is up, but you should also be able to see this in the logs.

#check for running connectors with the CLI
➜ bin/connect-cli ps
jms-source

#Connector `jms-source`:
name=jms-source
connect.jms.kcql=INSERT INTO topic SELECT * FROM jms-queue
tasks.max=1
connector.class=com.datamountaineer.streamreactor.connect.jms.source.JMSSourceConnector
connect.jms.queues=jms-queue
connect.jms.initial.context.factory=org.apache.activemq.jndi.ActiveMQInitialContextFactory
connect.jms.url=tcp://localhost:61616
connect.jms.connection.factory=ConnectionFactory
#task ids: 0
INFO Kafka version : 0.10.2.0-cp1 (org.apache.kafka.common.utils.AppInfoParser:83)
INFO Kafka commitId : 64c9b42f3319cdc9 (org.apache.kafka.common.utils.AppInfoParser:84)
INFO     ____        __        __  ___                  __        _
        / __ \____ _/ /_____ _/  |/  /___  __  ______  / /_____ _(_)___  ___  ___  _____
       / / / / __ `/ __/ __ `/ /|_/ / __ \/ / / / __ \/ __/ __ `/ / __ \/ _ \/ _ \/ ___/
      / /_/ / /_/ / /_/ /_/ / /  / / /_/ / /_/ / / / / /_/ /_/ / / / / /  __/  __/ /
     /_____/\__,_/\__/\__,_/_/  /_/\____/\__,_/_/ /_/\__/\__,_/_/_/ /_/\___/\___/_/
             ____  _____________
            / /  |/  / ___/ ___/____  __  _______________
       __  / / /|_/ /\__ \\__ \/ __ \/ / / / ___/ ___/ _ \  By Andrew Stevenson
      / /_/ / /  / /___/ /__/ / /_/ / /_/ / /  / /__/  __/
      \____/_/  /_//____/____/\____/\__,_/_/   \___/\___/
 (com.datamountaineer.streamreactor.connect.jms.source.JMSSourceTask:22)
INFO JMSConfig values:
    connect.jms.batch.size = 100
    connect.jms.connection.factory = ConnectionFactory
    connect.jms.converter.throw.on.error = false
    connect.jms.destination.selector = CDI
    connect.jms.error.policy = THROW
    connect.jms.initial.context.extra.params = []
    connect.jms.initial.context.factory = org.apache.activemq.jndi.ActiveMQInitialContextFactory
    connect.jms.kcql = INSERT INTO topic SELECT * FROM jms-queue
    connect.jms.max.retries = 20
    connect.jms.password = null
    connect.jms.queues = [jms-queue]
    connect.jms.retry.interval = 60000
    connect.jms.default.converters =
    connect.jms.topics = []
    connect.jms.url = tcp://localhost:61616
    connect.jms.username = null
 (com.datamountaineer.streamreactor.connect.jms.config.JMSConfig:180)
INFO Instantiated connector jms-source with version null of type class com.datamountaineer.streamreactor.connect.jms.source.JMSSourceConnector (org.apache.kafka.connect.runtime.Worker:181)
INFO Finished creating connector jms-source (org.apache.kafka.connect.runtime.Worker:194)
INFO SourceConnectorConfig values:
    connector.class = com.datamountaineer.streamreactor.connect.jms.source.JMSSourceConnector
    key.converter = null
    name = jms-source
    tasks.max = 1
    transforms = null
    value.converter = null
 (org.apache.kafka.connect.runtime.SourceConnectorConfig:180)

Test Records

Now we need to send some records to the ActiveMQ broker for the Source Connector to pick up. We can do this with the ActiveMQ command line producer. From the bin folder of the ActiveMQ installation, run the following to insert 1000 messages into a queue called jms-queue.

activemq producer --destination queue://jms-queue --message "hello DataMountaineer"
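
The producer sends 1000 copies of the message by default. To set the count explicitly you can pass --messageCount (an assumption based on the standard options of the ActiveMQ producer tool; check activemq producer --help for your version):

activemq producer --destination queue://jms-queue --message "hello DataMountaineer" --messageCount 1000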

We should immediately see the records coming through the source and into our Kafka topic:

${CONFLUENT_HOME}/bin/kafka-avro-console-consumer \
--zookeeper localhost:2181 \
--topic topic \
--from-beginning
{"message_timestamp":{"long":1490799748984},"correlation_id":null,"redelivered":{"boolean":false},"reply_to":null,"destination":{"string":"queue://jms-queue"},"message_id":{"string":"ID:Andrews-MacBook-Pro.local-49870-1490799747943-1:1:1:1:997"},"mode":{"int":2},"type":null,"priority":{"int":4},"bytes_payload":{"bytes":"hello"},"properties":null}
{"message_timestamp":{"long":1490799748985},"correlation_id":null,"redelivered":{"boolean":false},"reply_to":null,"destination":{"string":"queue://jms-queue"},"message_id":{"string":"ID:Andrews-MacBook-Pro.local-49870-1490799747943-1:1:1:1:998"},"mode":{"int":2},"type":null,"priority":{"int":4},"bytes_payload":{"bytes":"hello"},"properties":null}
{"message_timestamp":{"long":1490799748986},"correlation_id":null,"redelivered":{"boolean":false},"reply_to":null,"destination":{"string":"queue://jms-queue"},"message_id":{"string":"ID:Andrews-MacBook-Pro.local-49870-1490799747943-1:1:1:1:999"},"mode":{"int":2},"type":null,"priority":{"int":4},"bytes_payload":{"bytes":"hello"},"properties":null}
{"message_timestamp":{"long":1490799748987},"correlation_id":null,"redelivered":{"boolean":false},"reply_to":null,"destination":{"string":"queue://jms-queue"},"message_id":{"string":"ID:Andrews-MacBook-Pro.local-49870-1490799747943-1:1:1:1:1000"},"mode":{"int":2},"type":null,"priority":{"int":4},"bytes_payload":{"bytes":"hello"},"properties":null}

Features

The Source supports:

  1. KCQL routing of JMS destination messages to Kafka topics.
  2. Pluggable converters.
  3. Default conversion of JMS Messages to Avro with the payload as a Byte array.
  4. Extra connection properties for specialized connections such as SOLACE_VPN.

Converters

We provide four converters out of the box, but you can also plug in your own. See an example here. The converter is set in the connect.jms.kcql statement.
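
For example, to apply the JsonSimpleConverter to messages read from a queue, reference its class in the WITHCONVERTER clause of the KCQL statement (a sketch; the topic and queue names are placeholders):

connect.jms.kcql=INSERT INTO kafka_topic SELECT * FROM jms-queue WITHTYPE QUEUE WITHCONVERTER=com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter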

AvroConverter

com.datamountaineer.streamreactor.connect.converters.source.AvroConverter

The payload of the JMS message is an Avro message. In this case you need to provide a path to the Avro schema file so the payload can be decoded.
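
A sketch of the two settings involved, using the $JMS_TOPIC=$PATH_TO_AVRO_SCHEMA_FILE format described under connect.converter.avro.schemas below (the queue name and path are placeholders):

connect.jms.kcql=INSERT INTO kafka_topic SELECT * FROM jms-queue WITHTYPE QUEUE WITHCONVERTER=com.datamountaineer.streamreactor.connect.converters.source.AvroConverter
connect.converter.avro.schemas=jms-queue=/path/to/payload-schema.avsc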

JsonSimpleConverter

com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter

The payload of the JMS message is a JSON message. This converter will parse the JSON and create an Avro record for it, which will be sent to Kafka.

JsonConverterWithSchemaEvolution

com.datamountaineer.streamreactor.connect.converters.source.JsonConverterWithSchemaEvolution

An experimental converter for converting JSON messages to Avro. The resulting Avro schema remains fully compatible as new fields are added while the JMS JSON payload evolves.

BytesConverter

com.datamountaineer.streamreactor.connect.converters.source.BytesConverter

This is the default implementation. The JMS payload is taken as is, an array of bytes, and sent to Kafka as an Avro record with Schema.BYTES. You don’t have to provide a mapping in the source to get this converter.

Kafka Connect Query Language

Kafka Connect Query Language (KCQL), found in the GitHub repo, allows for routing and mapping using a SQL-like syntax, consolidating typical features into one configuration option.

The JMS Source supports the following:

INSERT INTO <kafka target> SELECT * FROM <jms destination> WITHTYPE <TOPIC|QUEUE> [WITHCONVERTER=myclass]

Example:

#select from a JMS queue and write to a kafka topic
INSERT INTO topicA SELECT * FROM jms_queue WITHTYPE QUEUE

#select from a JMS topic and write to a kafka topic
INSERT INTO topicA SELECT * FROM jms_topic WITHTYPE TOPIC

Configurations

connect.jms.url

Provides the JMS broker URL.

  • Data Type: string
  • Importance: high
  • Optional: no

connect.jms.username

Provides the username for the JMS connection.

  • Data Type: string
  • Importance: high
  • Optional: yes

connect.jms.password

Provides the password for the JMS connection.

  • Data Type: string
  • Importance: high
  • Optional: yes

connect.jms.initial.context.factory

  • Data Type: string
  • Importance: high
  • Optional: no

Initial Context Factory, e.g: org.apache.activemq.jndi.ActiveMQInitialContextFactory.

connect.jms.connection.factory

The ConnectionFactory implementation to use.

  • Data Type: string
  • Importance: high
  • Optional: no

connect.jms.destination.selector

  • Data Type: String
  • Importance: high
  • Optional: yes
  • Default: CDI

Selector to use for destination lookup. Either CDI or JNDI.

connect.jms.initial.context.extra.params

  • Data Type: String
  • Importance: high
  • Optional: yes

List (comma separated) of extra properties as key/value pairs with a colon delimiter to supply to the initial context, e.g. SOLACE_JMS_VPN:my_solace_vpn.
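
For example, to pass a Solace VPN name to the initial context:

connect.jms.initial.context.extra.params=SOLACE_JMS_VPN:my_solace_vpn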

connect.jms.kcql

KCQL expression describing field selection and routes. The KCQL expression also handles setting the JMS destination type, i.e. TOPIC or QUEUE, via the WITHTYPE keyword, and the converter via the WITHCONVERTER keyword. If no converter is specified the source defaults to the BytesConverter, which sends an Avro message over Kafka using Schema.BYTES.

  • Data Type: string
  • Importance: high
  • Optional: no

connect.converter.avro.schemas

If the AvroConverter is used you need to provide an Avro schema to be able to read and translate the raw bytes to an Avro record. The format is $JMS_TOPIC=$PATH_TO_AVRO_SCHEMA_FILE.

  • Data type: string
  • Importance: medium
  • Optional: yes
  • Default: null

connect.jms.batch.size

  • Type: int
  • Importance: medium
  • Optional: yes
  • Default: 100

The number of messages to take from the JMS destination on each poll of Kafka Connect.

connect.progress.enabled

Enables logging of how many records have been processed.

  • Type: boolean
  • Importance: medium
  • Optional: yes
  • Default: false

Provide your own Converter

You can always provide your own logic for converting the JMS message to an Avro record. For example, if your messages arrive in Protobuf format you can deserialize them based on the schema and create the Avro record. All you have to do is create a new project and add our dependency:

Gradle:

compile "com.datamountaineer:kafka-connect-common:0.7.1"

Maven:

<dependency>
    <groupId>com.datamountaineer</groupId>
    <artifactId>kafka-connect-common</artifactId>
    <version>0.7.1</version>
</dependency>

Then all you have to do is implement com.datamountaineer.streamreactor.connect.converters.source.Converter.

Here is our BytesConverter class code:

import java.util.Collections

import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.source.SourceRecord

// Converter and MsgKey are provided by the kafka-connect-common dependency added above.
class BytesConverter extends Converter {
  override def convert(kafkaTopic: String, sourceTopic: String, messageId: String, bytes: Array[Byte]): SourceRecord = {
    // The source partition records the JMS destination the payload came from;
    // the key carries the source topic and JMS message id, the value is the raw payload.
    new SourceRecord(Collections.singletonMap(Converter.TopicKey, sourceTopic),
      null,
      kafkaTopic,
      MsgKey.schema,
      MsgKey.getStruct(sourceTopic, messageId),
      Schema.BYTES_SCHEMA,
      bytes)
  }
}
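
As a further sketch, the hypothetical converter below decodes the payload as UTF-8 text and publishes it with a STRING schema instead of BYTES, reusing the same key layout as the BytesConverter above (illustrative only, not part of the connector):

import java.nio.charset.StandardCharsets
import java.util.Collections

import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.source.SourceRecord

class StringPayloadConverter extends Converter {
  override def convert(kafkaTopic: String, sourceTopic: String, messageId: String, bytes: Array[Byte]): SourceRecord = {
    // Same source partition and message key as the BytesConverter,
    // but the value is the JMS payload decoded as a UTF-8 string.
    new SourceRecord(Collections.singletonMap(Converter.TopicKey, sourceTopic),
      null,
      kafkaTopic,
      MsgKey.schema,
      MsgKey.getStruct(sourceTopic, messageId),
      Schema.STRING_SCHEMA,
      new String(bytes, StandardCharsets.UTF_8))
  }
}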

Schema Evolution

Not applicable.

Deployment Guidelines

Distributed Mode

In production, Connect should be run in distributed mode.

  1. Install the Confluent Platform on each server that will form your Connect Cluster.
  2. Create a folder on the server called plugins/streamreactor/libs.
  3. Copy the required connector jars from the Stream Reactor download into the folder created in step 2.
  4. Edit connect-avro-distributed.properties in the etc/schema-registry folder where you installed Confluent and uncomment the plugin.path option. Set it to the path where you deployed the Stream Reactor connector jars in step 2.
  5. Start Connect, bin/connect-distributed etc/schema-registry/connect-avro-distributed.properties

Connect Workers are long-running processes, so set up an init.d or systemd service accordingly.
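
A minimal systemd unit sketch, assuming Confluent is installed under /opt/confluent (adjust paths and user to your layout):

[Unit]
Description=Kafka Connect (distributed)
After=network.target

[Service]
ExecStart=/opt/confluent/bin/connect-distributed /opt/confluent/etc/schema-registry/connect-avro-distributed.properties
Restart=on-failure

[Install]
WantedBy=multi-user.target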

Connector configurations can then be pushed to any of the workers in the cluster via the CLI or curl. If using the CLI, remember to set the location of the Connect worker you are pushing to, as it defaults to localhost.

export KAFKA_CONNECT_REST="http://myserver:myport"

Kubernetes

Helm Charts are provided in our repo; add the repo to your Helm instance and install. We recommend using the Landscaper to manage Helm values, since typically each connector instance has its own deployment.

Add the Helm charts to your Helm instance:

helm repo add datamountaineer https://datamountaineer.github.io/helm-charts/
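
You can then search the repo and install the JMS source chart (the chart name below is an assumption; check the search output for the exact name):

helm search datamountaineer
helm install datamountaineer/kafka-connect-jms-source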

Troubleshooting

Please review the FAQs and join our Slack channel.