Kafka Connect Blockchain¶
A Connector to hook into the live streaming providing a real time feed for new bitcoin blocks and transactions provided by www.blockhain.info The connector subscribe to notification on blocks, transactions or an address and receive JSON objects describing a transaction or block when an event occurs. This json is then pushed via kafka connect to a kafka topic and therefore can be consumed either by a Sink or have a live stream processing using for example Kafka Streams.
Since is a direct websocket connection the Source will only ever use one connector task at any point. There is no point spawning more and then have duplicate data.
One thing to remember is the subscription API from blockchain doesn’t offer an option to start from a given timestam. This means if the connect worker is down then you will miss some data.
The Sink connects to unconfirmed transaction!! Read more about the live data here
Source Connector QuickStart¶
We will start the connector in distributed mode. Each connector exposes a rest endpoint for stopping, starting and updating the configuration. We have developed
a Command Line Interface to make interacting with the Connect Rest API easier. The CLI can be found in the Stream Reactor download under
bin folder. Alternatively the Jar can be pulled from our GitHub
Starting the Connector (Distributed)¶
Download, unpack and install the Stream Reactor. Follow the instructions here if you haven’t already done so. All paths in the quickstart are based in the location you installed the Stream Reactor.
Start Kafka Connect in distributed more by running the
start-connect.sh script in the
Once the connector has started we can now use the kafka-connect-tools cli to post in our distributed properties file for BlockChain.
➜ bin/cli.sh create blockchain-source < conf/blockchain-source.properties #Connector `blockchain-source`: name=blockchain-source connector.class=com.datamountaineeer.streamreactor.connect.blockchain.source.BlockchainSourceConnector max.tasks=1 connect.blockchain.source.kafka.topic = blockchain-test max.tasks=1 #task ids:
blockchain-source.properties file defines:
- The name of the source.
- The Source class.
- The max number of tasks the connector is allowed to created (1 task only).
- The topics to write to.
If you switch back to the terminal you started the Connector in you should see the Blockchain Source being accepted and the task starting.
We can use the CLI to check if the connector is up but you should be able to see this in logs as-well.
#check for running connectors with the CLI ➜ bin/cli.sh ps blockchain-source
[2016-08-21 20:31:36,398] INFO Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder:769) [2016-08-21 20:31:36,406] INFO ____ _ __ __ _ _ | _ \ __ _| |_ __ _| \/ | ___ _ _ _ __ | |_ __ _(_)_ __ ___ ___ _ __ | | | |/ _` | __/ _` | |\/| |/ _ \| | | | '_ \| __/ _` | | '_ \ / _ \/ _ \ '__| | |_| | (_| | || (_| | | | | (_) | |_| | | | | || (_| | | | | | __/ __/ | |____/ \__,_|\__\__,_|_| |_|\___/ \__,_|_| |_|\__\__,_|_|_| |_|\___|\___|_| ____ _ _ ____ _ _ ____ by Stefan Bocutiu | __ )| | ___ ___| | __/ ___| |__ __ _(_)_ __ / ___| ___ _ _ _ __ ___ ___ | _ \| |/ _ \ / __| |/ / | | '_ \ / _` | | '_ \ \___ \ / _ \| | | | '__/ __/ _ \ | |_) | | (_) | (__| <| |___| | | | (_| | | | | | ___) | (_) | |_| | | | (_| __/ |____/|_|\___/ \___|_|\_\\____|_| |_|\__,_|_|_| |_| |____/ \___/ \__,_|_| \___\___|
Now we need to see records pushed on the topic. We can use the
kafka-avro-console-producer to do this.
$ ./bin/kafka-avro-console-consumer --topic blockchain-test \ --zookeeper localhost:2181 \ --from-beginning
Now the console is reading blockchain transaction data which would print on the terminal.