ScyllaDB University LIVE, FREE Virtual Training Event | March 21
Register for Free
ScyllaDB Documentation Logo Documentation
  • Server
  • Cloud
  • Tools
    • ScyllaDB Manager
    • ScyllaDB Monitoring Stack
    • ScyllaDB Operator
  • Drivers
    • CQL Drivers
    • DynamoDB Drivers
  • Resources
    • ScyllaDB University
    • Community Forum
    • Tutorials
Download
ScyllaDB Docs ScyllaDB Enterprise Getting Started Scylla Integrations and Connectors Integrate Scylla with Kafka Shard-Aware Kafka Connector for Scylla Kafka Sink Connector Quickstart

Caution

You're viewing documentation for a previous version. Switch to the latest stable version.

Kafka Sink Connector Quickstart¶

Topic: Kafka Connector

Learn: how to setup the ScyllaDB Sink Connector against a Dockerized ScyllaDB

Audience: Application Developer

Synopsis¶

This quickstart will show how to setup the ScyllaDB Sink Connector against a Dockerized ScyllaDB.

Preliminary setup¶

  1. Using Docker, follow the instructions to launch Scylla.

  2. Start the Docker container, replacing the --name and --host name parameters with your own information. For example:

    docker run --name some-scylla --hostname some-scylla -d scylladb/scylla
    
  3. Run docker ps to show the exposed ports. The output should be similar to this example:

    docker ps
    CONTAINER ID        IMAGE                     COMMAND                  CREATED             STATUS              PORTS                                              NAMES
    26cc6d47efe3        replace-with-image-name   "/docker-entrypoint.…"   4 hours ago         Up 23 seconds       0.0.0.0:32777->1883/tcp, 0.0.0.0:32776->9001/tcp   anonymous_my_1
    
  4. Continue with either Confluent or Manual Installation.

Install using Confluent Platform¶

If you are new to Confluent, download Confluent Platform.

  1. In the Self managed software box, click DOWNLOAD FREE

  2. Fill in your email address.

  3. Open the Select Deployment Type drop-down and select ZIP.

  4. Accept the Terms & Conditions and click DOWNLOAD FREE.

  5. You will receive an email with instructions. Download / move the file to the desired location.

  6. Continue with the setup following this document.

Install Kafka Connector manually¶

  1. Navigate to the Kafka Connect Scylladb Sink github page and clone the repository.

  2. Using a terminal, open the source code (src) folder.

  3. Run the command mvn clean install.

  4. Run the Integration Tests in an IDE. If tests fail run mvn clean install -DskipTests.

    Note

    To run Integration Tests, there is no need to run Confluent. Use docker-compose.yml file in the github repository and run the following command (it contains images to run Kafka and other services):

    docker-compose -f docker-compose.yml up
    

    After completion of the above steps, a folder named components will be created in the target folder of the source code folder. The Connector jar files are present in {source-code-folder}/target/components/packages/[jar-files] Create a folder by the name of ScyllaDB-Sink-Connector and copy the jar files into it. Navigate to your Confluent Platform installation directory and place this folder in {confluent-directory}/share/java.

Add Sink Connector plugin¶

The Scylla sink connector is used to publish records from a Kafka topic into Scylla. Adding a new connector plugin requires restarting Connect. Use the Confluent CLI to restart Connect.

  1. Run the following

    confluent local stop && confluent local start
    

    The output will be similar to:

    confluent local stop && confluent local start
    Starting zookeeper
    zookeeper is [UP]
    Starting Kafka
    Kafka is [UP]
    Starting schema-registry
    schema-registry is [UP]
    Starting kafka-rest
    kafka-rest is [UP]
    Starting connect
    connect is [UP]
    
  2. Check if the kafka-connect-scylladb connector plugin has been installed correctly and picked up by the plugin loader:

    curl -sS localhost:8083/connector-plugins | jq .[].class | grep ScyllaDbSinkConnector
    

    Your output should resemble:

    io.connect.scylladb.ScyllaDbSinkConnector

Connector configuration¶

  1. Save the configuration settings in a file named kafka-connect-scylladb.json its contents should contain:

    {
         "name" : "scylladb-sink-connector",
         "config" : {
           "connector.class" : "io.connect.scylladb.ScyllaDbSinkConnector",
           "tasks.max" : "1",
           "topics" : "topic1,topic2,topic3",
           "scylladb.contact.points" : "scylladb-hosts",
           "scylladb.keyspace" : "test"
    }
    
  2. Load the connector. Run the following command:

    curl -s -X POST -H 'Content-Type: application/json' --data @kafka-connect-scylladb.json http://localhost:8083/connectors
    
  3. Update the configuration of the existing connector.

    curl -s -X PUT -H 'Content-Type: application/json' --data @kafka-connect-scylladb.json http://localhost:8083/connectors/scylladb/config
    
  4. Once the Connector is up and running, use the command kafka-avro-console-producer to produce records(in AVRO format) into the Kafka topic.

    kafka-avro-console-producer
    --broker-list localhost:9092
    --topic example
    --property parse.key=true
    --property key.schema='{"type":"record",name":"key_schema","fields":[{"name":"id","type":"int"}]}'
    --property "key.separator=$"
    --property value.schema='{"type":"record","name":"value_schema","fields":[{"name":"id","type":"int"},
    {"name":"firstName","type":"string"},{"name":"lastName","type":"string"}]}'
    {"id":1}${"id":1,"firstName":"first","lastName":"last"}
    
  5. Test Scylla by running a select cql query:

    cqlsh>select * from demo.example;
     id | firstname | lastname
    ----+-----------+----------
      1 |     first |     last
    

Scylla modes¶

There are two modes, Standalone and Distributed.

  • Standard - will use the properties based example.

  • Distributed - will use the JSON / REST examples.

Use this command to load the connector and connect to ScyllaDB instance without authentication:

curl -s -X POST -H 'Content-Type: application/json' --data @kafka-connect-scylladb.json http://localhost:8083/connectors

Select one of the following configuration methods based on how you have deployed |kconnect-long|. Distributed Mode will the JSON / REST examples. The standalone mode will use the properties based example.

Note

Each json record should consist of a schema and payload.

Distributed Mode JSON example¶

 {
  "name" : "scylladb-sink-connector",
  "config" : {
    "connector.class" : "io.connect.scylladb.ScyllaDbSinkConnector",
    "tasks.max" : "1",
    "topics" : "topic1,topic2,topic3",
    "scylladb.contact.points" : "scylladb-hosts",
    "scylladb.keyspace" : "test",
    "key.converter" : "org.apache.kafka.connect.json.JsonConverter",
    "value.converter" : "org.apache.kafka.connect.json.JsonConverter"
    "key.converter.schemas.enable" : "true",
    "value.converter.schemas.enable" : "true",

    "transforms" : "createKey",
    "transforms.createKey.fields" : "[field-you-want-as-primary-key-in-scylla]",
    "transforms.createKey.type" : "org.apache.kafka.connect.transforms.ValueToKey"
  }
}

Standalone Mode JSON example¶

To load the connector in Standalone mode use:

confluent local load scylladb-sink-conector -- -d scylladb-sink-connector.properties

Use the following configuratopn settings:

connector.class=io.connect.scylladb.ScyllaDbSinkConnector
tasks.max=1
topics=topic1,topic2,topic3
scylladb.contact.points=cassandra
scylladb.keyspace=test

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

transforms=createKey
transforms.createKey.fields=[field-you-want-as-primary-key-in-scylla]
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey

For Example:

kafka-console-producer --broker-list localhost:9092 --topic sample-topic
>{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"department"},"payload":{"id":10,"name":"John Doe10","department":"engineering"}}

Run the select cql query to view the data:

Select * from keyspace_name.topic-name;

Note

To publish records in Avro Format use the following properties:

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.converter.schemas.enable=true
value.converter.schemas.enable=true

Authentication¶

This example connects to a Scylla instance with security enabled and username / password authentication.

Select one of the following configuration methods based on how you have deployed |kconnect-long|. Distributed Mode will the JSON / REST examples. The standalone mode will use the properties based example.

Distributed Mode example¶

{
  "name" : "scylladbSinkConnector",
  "config" : {
    "connector.class" : "io.connect.scylladb.ScyllaDbSinkConnector",
    "tasks.max" : "1",
    "topics" : "topic1,topic2,topic3",
    "scylladb.contact.points" : "cassandra",
    "scylladb.keyspace" : "test",
    "scylladb.security.enabled" : "true",
    "scylladb.username" : "example",
    "scylladb.password" : "password",
    **add other properties same as in the above example**
  }
}

Standalone Mode example¶

connector.class=io.connect.scylladb.ScyllaDbSinkConnector
tasks.max=1
topics=topic1,topic2,topic3
scylladb.contact.points=cassandra
scylladb.keyspace=test
scylladb.ssl.enabled=true
scylladb.username=example
scylladb.password=password

Logging¶

To check logs for the Confluent Platform use:

confluent local log <service> -- [<argument>] --path <path-to-confluent>

To check logs for Scylla:

docker logs some-scylla | tail

Additional information¶

  • Kafka Sink Connector Configuration

Was this page helpful?

PREVIOUS
Shard-Aware Kafka Connector for Scylla
NEXT
Kafka Sink Connector Configuration
  • Create an issue

On this page

  • Kafka Sink Connector Quickstart
    • Synopsis
    • Preliminary setup
      • Install using Confluent Platform
      • Install Kafka Connector manually
    • Add Sink Connector plugin
    • Connector configuration
    • Scylla modes
      • Distributed Mode JSON example
      • Standalone Mode JSON example
    • Authentication
      • Distributed Mode example
      • Standalone Mode example
    • Logging
    • Additional information
ScyllaDB Enterprise
  • 2024.1
    • 2024.2
    • 2024.1
    • 2023.1
    • 2022.2
  • Getting Started
    • Install ScyllaDB Enterprise
      • ScyllaDB Web Installer for Linux
      • Install ScyllaDB Without root Privileges
      • Air-gapped Server Installation
      • ScyllaDB Housekeeping and how to disable it
      • ScyllaDB Developer Mode
      • Launch ScyllaDB on AWS
      • Launch ScyllaDB on GCP
      • Launch ScyllaDB on Azure
    • Configure ScyllaDB
    • ScyllaDB Configuration Reference
    • ScyllaDB Requirements
      • System Requirements
      • OS Support by Linux Distributions and Version
      • Cloud Instance Recommendations
      • ScyllaDB in a Shared Environment
    • Migrate to ScyllaDB
      • Migration Process from Cassandra to Scylla
      • Scylla and Apache Cassandra Compatibility
      • Migration Tools Overview
    • Integration Solutions
      • Integrate Scylla with Spark
      • Integrate Scylla with KairosDB
      • Integrate ScyllaDB with Presto
      • Integrate Scylla with Elasticsearch
      • Integrate Scylla with Kubernetes
      • Integrate Scylla with the JanusGraph Graph Data System
      • Integrate Scylla with DataDog
      • Integrate Scylla with Kafka
      • Integrate Scylla with IOTA Chronicle
      • Integrate Scylla with Spring
      • Shard-Aware Kafka Connector for Scylla
      • Install Scylla with Ansible
      • Integrate Scylla with Databricks
      • Integrate Scylla with Jaeger Server
      • Integrate Scylla with MindsDB
    • Tutorials
  • ScyllaDB for Administrators
    • Administration Guide
    • Procedures
      • Cluster Management
      • Backup & Restore
      • Change Configuration
      • Maintenance
      • Best Practices
      • Benchmarking Scylla
      • Migrate from Cassandra to Scylla
      • Disable Housekeeping
    • Security
      • ScyllaDB Security Checklist
      • Enable Authentication
      • Enable and Disable Authentication Without Downtime
      • Creating a Custom Superuser
      • Generate a cqlshrc File
      • Reset Authenticator Password
      • Enable Authorization
      • Grant Authorization CQL Reference
      • Certificate-based Authentication
      • Role Based Access Control (RBAC)
      • ScyllaDB Auditing Guide
      • Encryption: Data in Transit Client to Node
      • Encryption: Data in Transit Node to Node
      • Generating a self-signed Certificate Chain Using openssl
      • Encryption at Rest
      • LDAP Authentication
      • LDAP Authorization (Role Management)
    • Admin Tools
      • Nodetool Reference
      • CQLSh
      • REST
      • Tracing
      • Scylla SStable
      • Scylla Types
      • SSTableLoader
      • cassandra-stress
      • SSTabledump
      • SSTable2json
      • SSTableMetadata
      • Scylla Logs
      • Seastar Perftune
      • Virtual Tables
      • SELECT * FROM MUTATION_FRAGMENTS() Statement
    • ScyllaDB Monitoring Stack
    • ScyllaDB Operator
    • ScyllaDB Manager
    • Upgrade Procedures
      • ScyllaDB Versioning
      • ScyllaDB Enterprise
      • ScyllaDB Open Source to ScyllaDB Enterprise
      • ScyllaDB Image
    • System Configuration
      • System Configuration Guide
      • scylla.yaml
      • ScyllaDB Snitches
    • Benchmarking ScyllaDB
    • ScyllaDB Diagnostic Tools
  • ScyllaDB for Developers
    • Learn To Use ScyllaDB
      • ScyllaDB University
      • Course catalog
      • ScyllaDB Essentials
      • Basic Data Modeling
      • Advanced Data Modeling
      • MMS - Learn by Example
      • Care-Pet an IoT Use Case and Example
    • Scylla Alternator
    • Scylla Features
      • Scylla Open Source Features
      • Scylla Enterprise Features
    • Scylla Drivers
      • Scylla CQL Drivers
      • Scylla DynamoDB Drivers
    • Workload Attributes
  • CQL Reference
    • CQLSh: the CQL shell
    • Appendices
    • Compaction
    • Consistency Levels
    • Consistency Level Calculator
    • Data Definition
    • Data Manipulation
      • SELECT
      • INSERT
      • UPDATE
      • DELETE
      • BATCH
    • Data Types
    • Definitions
    • Global Secondary Indexes
    • Expiring Data with Time to Live (TTL)
    • Functions
    • Wasm support for user-defined functions
    • JSON Support
    • Materialized Views
    • Non-Reserved CQL Keywords
    • Reserved CQL Keywords
    • ScyllaDB CQL Extensions
  • ScyllaDB Architecture
    • ScyllaDB Ring Architecture
    • ScyllaDB Fault Tolerance
    • Consistency Level Console Demo
    • ScyllaDB Anti-Entropy
      • Scylla Hinted Handoff
      • Scylla Read Repair
      • Scylla Repair
    • SSTable
      • ScyllaDB SSTable - 2.x
      • ScyllaDB SSTable - 3.x
    • Compaction Strategies
    • Raft Consensus Algorithm in ScyllaDB
  • Troubleshooting ScyllaDB
    • Errors and Support
      • Report a Scylla problem
      • Error Messages
      • Change Log Level
    • ScyllaDB Startup
      • Ownership Problems
      • Scylla will not Start
      • Scylla Python Script broken
    • Upgrade
      • Inaccessible configuration files after ScyllaDB upgrade
    • Cluster and Node
      • Failed Decommission Problem
      • Cluster Timeouts
      • Node Joined With No Data
      • SocketTimeoutException
      • NullPointerException
      • Failed Schema Sync
    • Data Modeling
      • Scylla Large Partitions Table
      • Scylla Large Rows and Cells Table
      • Large Partitions Hunting
    • Data Storage and SSTables
      • Space Utilization Increasing
      • Disk Space is not Reclaimed
      • SSTable Corruption Problem
      • Pointless Compactions
      • Limiting Compaction
    • CQL
      • Time Range Query Fails
      • COPY FROM Fails
      • CQL Connection Table
    • ScyllaDB Monitor and Manager
      • Manager and Monitoring integration
      • Manager lists healthy nodes as down
  • Knowledge Base
    • Upgrading from experimental CDC
    • Compaction
    • Consistency in ScyllaDB
    • Counting all rows in a table is slow
    • CQL Query Does Not Display Entire Result Set
    • When CQLSh query returns partial results with followed by “More”
    • Run Scylla and supporting services as a custom user:group
    • Customizing CPUSET
    • Decoding Stack Traces
    • Snapshots and Disk Utilization
    • DPDK mode
    • Debug your database with Flame Graphs
    • Efficient Tombstone Garbage Collection in ICS
    • How to Change gc_grace_seconds for a Table
    • Gossip in Scylla
    • Increase Permission Cache to Avoid Non-paged Queries
    • How does Scylla LWT Differ from Apache Cassandra ?
    • Map CPUs to Scylla Shards
    • Scylla Memory Usage
    • NTP Configuration for Scylla
    • Updating the Mode in perftune.yaml After a ScyllaDB Upgrade
    • POSIX networking for Scylla
    • Scylla consistency quiz for administrators
    • Recreate RAID devices
    • How to Safely Increase the Replication Factor
    • Scylla and Spark integration
    • Increase Scylla resource limits over systemd
    • Scylla Seed Nodes
    • How to Set up a Swap Space
    • Scylla Snapshots
    • Scylla payload sent duplicated static columns
    • Stopping a local repair
    • System Limits
    • How to flush old tombstones from a table
    • Time to Live (TTL) and Compaction
    • Scylla Nodes are Unresponsive
    • Update a Primary Key
    • Using the perf utility with Scylla
    • Configure Scylla Networking with Multiple NIC/IP Combinations
  • Reference
    • AWS Images
    • Configuration Parameters
    • Glossary
    • ScyllaDB Enterprise vs. Open Source Matrix
  • ScyllaDB University
  • ScyllaDB FAQ
  • Contribute to ScyllaDB
  • Alternator: DynamoDB API in Scylla
    • Getting Started With ScyllaDB Alternator
    • ScyllaDB Alternator for DynamoDB users
    • Alternator-specific APIs
Docs Tutorials University Contact Us About Us
© 2025, ScyllaDB. All rights reserved. | Terms of Service | Privacy Policy | ScyllaDB, and ScyllaDB Cloud, are registered trademarks of ScyllaDB, Inc.
Last updated on 09 Apr 2025.
Powered by Sphinx 7.4.7 & ScyllaDB Theme 1.8.6