Topic: Kafka Connector
Learn: how to set up the ScyllaDB Sink Connector against a Dockerized ScyllaDB
Audience: Application Developer
This quickstart will show how to set up the ScyllaDB Sink Connector against a Dockerized ScyllaDB.
Using Docker, follow the instructions to launch ScyllaDB.
Start the Docker container, replacing the --name and --hostname parameters with your own information. For example:
docker run --name some-scylla --hostname some-scylla -d scylladb/scylla
Run docker ps to show the exposed ports. The output should be similar to this example:
docker ps
CONTAINER ID   IMAGE                     COMMAND                   CREATED       STATUS          PORTS                                              NAMES
26cc6d47efe3   replace-with-image-name   "/docker-entrypoint.…"    4 hours ago   Up 23 seconds   0.0.0.0:32777->1883/tcp, 0.0.0.0:32776->9001/tcp   anonymous_my_1
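As an optional sanity check, you can confirm the node has finished bootstrapping before continuing. A minimal sketch, assuming the container name some-scylla used above:
docker exec -it some-scylla nodetool status
The node is ready once its status is reported as UN (Up / Normal).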
Continue with either Confluent or Manual Installation.
If you are new to Confluent, download Confluent Platform.
In the Self managed software box, click DOWNLOAD FREE.
Fill in your email address.
Open the Select Deployment Type drop-down and select ZIP.
Accept the Terms & Conditions and click DOWNLOAD FREE.
You will receive an email with instructions. Download / move the file to the desired location.
Continue with the setup following this document.
Navigate to the kafka-connect-scylladb GitHub page and clone the repository.
Using a terminal, open the source code (src) folder.
Run the command mvn clean install.
Run the Integration Tests in an IDE. If the tests fail, run mvn clean install -DskipTests.
Note
To run the Integration Tests, there is no need to run Confluent Platform. Use the docker-compose.yml file in the GitHub repository (it contains images to run Kafka and other required services) and run the following command:
docker-compose -f docker-compose.yml up
After completing the above steps, a folder named components will be created in the target folder of the source code directory. The connector jar files are present in {source-code-folder}/target/components/packages/[jar-files].
Create a folder named ScyllaDB-Sink-Connector and copy the jar files into it. Navigate to your Confluent Platform installation directory and place this folder in {confluent-directory}/share/java.
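The copy might look like the following shell sketch; the bracketed paths are placeholders for your own source and Confluent Platform locations:
# create the plugin folder and copy the connector jars into it
mkdir ScyllaDB-Sink-Connector
cp {source-code-folder}/target/components/packages/*.jar ScyllaDB-Sink-Connector/
# move the folder into the Confluent plugin path
mv ScyllaDB-Sink-Connector {confluent-directory}/share/java/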
The ScyllaDB sink connector is used to publish records from a Kafka topic into ScyllaDB. Adding a new connector plugin requires restarting Connect. Use the Confluent CLI to restart Connect.
Run the following command:
confluent local stop && confluent local start
The output will be similar to:
confluent local stop && confluent local start
Starting zookeeper
zookeeper is [UP]
Starting Kafka
Kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]
Check if the kafka-connect-scylladb connector plugin has been installed correctly and picked up by the plugin loader:
curl -sS localhost:8083/connector-plugins | jq '.[].class' | grep ScyllaDbSinkConnector
Your output should resemble:
io.connect.scylladb.ScyllaDbSinkConnector
Save the configuration settings in a file named kafka-connect-scylladb.json. Its contents should be:
{
  "name" : "scylladb-sink-connector",
  "config" : {
    "connector.class" : "io.connect.scylladb.ScyllaDbSinkConnector",
    "tasks.max" : "1",
    "topics" : "topic1,topic2,topic3",
    "scylladb.contact.points" : "scylladb-hosts",
    "scylladb.keyspace" : "test"
  }
}
Load the connector. Run the following command:
curl -s -X POST -H 'Content-Type: application/json' --data @kafka-connect-scylladb.json http://localhost:8083/connectors
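To confirm the connector was created and its task is running, you can query the standard Kafka Connect REST API (an optional check):
curl -s localhost:8083/connectors/scylladb-sink-connector/status | jq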
Update the configuration of the existing connector. The connector name in the URL must match the name field in the JSON file:
curl -s -X PUT -H 'Content-Type: application/json' --data @kafka-connect-scylladb.json http://localhost:8083/connectors/scylladb-sink-connector/config
Once the connector is up and running, use the kafka-avro-console-producer command to produce records (in Avro format) into the Kafka topic.
kafka-avro-console-producer \
  --broker-list localhost:9092 \
  --topic example \
  --property parse.key=true \
  --property key.schema='{"type":"record","name":"key_schema","fields":[{"name":"id","type":"int"}]}' \
  --property "key.separator=$" \
  --property value.schema='{"type":"record","name":"value_schema","fields":[{"name":"id","type":"int"},{"name":"firstName","type":"string"},{"name":"lastName","type":"string"}]}'
{"id":1}${"id":1,"firstName":"first","lastName":"last"}
Test ScyllaDB by running a SELECT CQL query:
cqlsh> SELECT * FROM test.example;
id | firstname | lastname
----+-----------+----------
1 | first | last
There are two modes, Standalone and Distributed.
Standalone - will use the properties-based example.
Distributed - will use the JSON / REST examples.
Use this command to load the connector and connect to a ScyllaDB instance without authentication:
curl -s -X POST -H 'Content-Type: application/json' --data @kafka-connect-scylladb.json http://localhost:8083/connectors
Select one of the following configuration methods based on how you have deployed Kafka Connect. Distributed mode will use the JSON / REST examples, and Standalone mode will use the properties-based example.
Note
Each JSON record should consist of a schema and payload.
{
"name" : "scylladb-sink-connector",
"config" : {
"connector.class" : "io.connect.scylladb.ScyllaDbSinkConnector",
"tasks.max" : "1",
"topics" : "topic1,topic2,topic3",
"scylladb.contact.points" : "scylladb-hosts",
"scylladb.keyspace" : "test",
"key.converter" : "org.apache.kafka.connect.json.JsonConverter",
"value.converter" : "org.apache.kafka.connect.json.JsonConverter"
"key.converter.schemas.enable" : "true",
"value.converter.schemas.enable" : "true",
"transforms" : "createKey",
"transforms.createKey.fields" : "[field-you-want-as-primary-key-in-scylla]",
"transforms.createKey.type" : "org.apache.kafka.connect.transforms.ValueToKey"
}
}
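In Distributed mode, load this configuration through the Kafka Connect REST API; for example, assuming the JSON above is saved as scylladb-sink-connector.json:
curl -s -X POST -H 'Content-Type: application/json' --data @scylladb-sink-connector.json http://localhost:8083/connectors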
To load the connector in Standalone mode use:
confluent local load scylladb-sink-connector -- -d scylladb-sink-connector.properties
Use the following configuration settings:
connector.class=io.connect.scylladb.ScyllaDbSinkConnector
tasks.max=1
topics=topic1,topic2,topic3
scylladb.contact.points=cassandra
scylladb.keyspace=test
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
transforms=createKey
transforms.createKey.fields=[field-you-want-as-primary-key-in-scylla]
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
For example:
kafka-console-producer --broker-list localhost:9092 --topic sample-topic
>{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"department"},"payload":{"id":10,"name":"John Doe10","department":"engineering"}}
Run a SELECT CQL query to view the data:
SELECT * FROM keyspace_name.topic_name;
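Given the record produced above, the output should resemble the following (column order may differ):
 id | department  | name
----+-------------+------------
 10 | engineering | John Doe10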
Note
To publish records in Avro Format use the following properties:
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.converter.schemas.enable=true
value.converter.schemas.enable=true
This example connects to a ScyllaDB instance with security enabled and username / password authentication.
Select one of the following configuration methods based on how you have deployed Kafka Connect. Distributed mode will use the JSON / REST examples, and Standalone mode will use the properties-based example.
{
"name" : "scylladbSinkConnector",
"config" : {
"connector.class" : "io.connect.scylladb.ScyllaDbSinkConnector",
"tasks.max" : "1",
"topics" : "topic1,topic2,topic3",
"scylladb.contact.points" : "cassandra",
"scylladb.keyspace" : "test",
"scylladb.security.enabled" : "true",
"scylladb.username" : "example",
"scylladb.password" : "password",
**add the remaining properties as in the example above**
}
}
connector.class=io.connect.scylladb.ScyllaDbSinkConnector
tasks.max=1
topics=topic1,topic2,topic3
scylladb.contact.points=cassandra
scylladb.keyspace=test
scylladb.security.enabled=true
scylladb.username=example
scylladb.password=password
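The username / password pair must already exist in ScyllaDB before the connector can authenticate. A minimal CQL sketch, assuming authentication is enabled on the cluster and you are connected as a superuser:
CREATE ROLE example WITH PASSWORD = 'password' AND LOGIN = true;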
To check logs for the Confluent Platform use:
confluent local log <service> -- [<argument>] --path <path-to-confluent>
To check logs for ScyllaDB:
docker logs some-scylla | tail