A common requirement from our clients is to run a LeanXcale instance in parallel with their existing databases without changing their current systems.

One way to achieve this is change data capture (CDC) with Debezium. In this post, we explain how to integrate a MySQL database with LeanXcale, although other source databases, such as Postgres or MongoDB, can be used as well.

Topology

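Following the Debezium documentation, the simplified topology for our example is as follows: the Debezium MySQL connector, running in Kafka Connect, captures the changes made in MySQL and publishes them to Kafka, and the LeanXcale connector reads them from Kafka and writes them into LeanXcale.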

Components

The components are created from Docker images and, following the Debezium architecture, include the following:

  • Apache Zookeeper.
  • Apache Kafka.
  • Kafka Connect / Debezium image, modified by placing the LeanXcale Kafka connector into the /kafka/connect directory.
  • An empty MySQL database image in which we run some create statements.
  • An empty LeanXcale database into which all changes from MySQL are replicated.

Configuration

We use docker-compose to configure all the images. The configuration is defined in the file docker-compose.yaml:

version: '2'
services:
  zookeeper:
    image: debezium/zookeeper:${DEBEZIUM_VERSION}
    ports:
     - 2181:2181
     - 2888:2888
     - 3888:3888
  kafka:
    image: debezium/kafka:${DEBEZIUM_VERSION}
    ports:
     - 9092:9092
    links:
     - zookeeper
    environment:
     - ZOOKEEPER_CONNECT=zookeeper:2181
  mysql:
    image: debezium/example-mysql:${DEBEZIUM_VERSION}
    ports:
     - 3306:3306
    environment:
     - MYSQL_ROOT_PASSWORD=debezium
     - MYSQL_USER=mysqluser
     - MYSQL_PASSWORD=mysqlpw
  connect:
    image: debezium/connect:${DEBEZIUM_VERSION}
    ports:
     - 8083:8083
    links:
     - kafka
     - mysql
    environment:
     - BOOTSTRAP_SERVERS=kafka:9092
     - GROUP_ID=1
     - CONFIG_STORAGE_TOPIC=my_connect_configs
     - OFFSET_STORAGE_TOPIC=my_connect_offsets
     - STATUS_STORAGE_TOPIC=my_connect_statuses
  lx:
    image: local:lx
    links:
     - kafka
     - connect

The components are started with these commands:

export DEBEZIUM_VERSION=1.1
docker-compose -f docker-compose.yaml up

The next step copies the LeanXcale Kafka connector into the debezium/connect container. First, download it from the drivers page, then unpack it and copy it into the container:

tar -xzvf kafka-lx-connector-1.4.tar.gz
docker cp kafka-lx-connector-1.4/ json_connect_1:/kafka/connect

Note: The name of the debezium/connect container is json_connect_1.

Then, a restart is necessary:

docker restart json_connect_1

Note: Instead of copying into the container, we can create a new image with the LeanXcale connector and avoid the copy and container restart procedure.
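After the restart, the Kafka Connect worker needs a moment before its REST API answers on port 8083. The following is a minimal Python sketch, not part of the original setup, that polls the standard Kafka Connect endpoint GET /connector-plugins until the LeanXcale connector class appears. The helper names (fetch_plugins, plugin_installed, wait_for_plugin) and the retry values are illustrative assumptions.

```python
import json
import time
import urllib.request

CONNECT_URL = "http://localhost:8083"  # port published in docker-compose.yaml
LX_CLASS = "com.leanxcale.connector.kafka.LXSinkConnector"  # class used in register-lx.json


def fetch_plugins(base_url=CONNECT_URL):
    """Return the parsed JSON list served by GET /connector-plugins."""
    with urllib.request.urlopen(base_url + "/connector-plugins", timeout=5) as resp:
        return json.load(resp)


def plugin_installed(plugins, class_name=LX_CLASS):
    """True if any plugin entry reports the given connector class."""
    return any(p.get("class") == class_name for p in plugins)


def wait_for_plugin(fetch=fetch_plugins, attempts=30, delay=2.0):
    """Poll until the worker answers and lists the plugin, or attempts run out.

    The attempt count and delay are arbitrary illustrative defaults.
    """
    for _ in range(attempts):
        try:
            if plugin_installed(fetch()):
                return True
        except OSError:
            pass  # worker not reachable yet (URLError is a subclass of OSError)
        time.sleep(delay)
    return False
```

Calling wait_for_plugin() before registering the connectors returns True once the worker is up and has loaded the connector from /kafka/connect, and False if that does not happen within the allotted attempts.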

The next steps register the MySQL and LeanXcale connectors. The MySQL connector is the producer, and it is configured in the file register-mysql.json:

{
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
        "database.server.id": "184054",
        "database.server.name": "dbserver1",
        "database.whitelist": "inventory",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "schema-changes.inventory",
        "transforms": "route,unwrap",
        "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
        "transforms.route.replacement": "$3",
        "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
        "transforms.unwrap.drop.tombstones": "false",
        "transforms.unwrap.delete.handling.mode": "none"
    }
}

The command to register the connector is:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mysql.json

In the MySQL connector, we include two transformations with “transforms”: “route,unwrap”. The first transformation, route, makes the connector publish each message to a topic named after the table. The second transformation, unwrap, changes the original message so that it is compatible with JDBC connectors, such as the LeanXcale connector.
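To make the effect of the two transformations concrete, here is a small Python sketch that imitates them on plain values. It is a simplified model, not the connectors' actual code: the function names are hypothetical, the sample topic dbserver1.inventory.t1 assumes a table t1 in the inventory database, and the envelope shows only the before, after and op fields of a Debezium change event.

```python
import re

ROUTE_REGEX = r"([^.]+)\.([^.]+)\.([^.]+)"  # value of transforms.route.regex
ROUTE_REPLACEMENT = r"\3"                    # "$3" written in Python syntax


def route(topic):
    """Imitate the route step: rewrite the topic only if the whole name matches."""
    match = re.fullmatch(ROUTE_REGEX, topic)
    return match.expand(ROUTE_REPLACEMENT) if match else topic


def unwrap(envelope):
    """Imitate the unwrap step: keep only the row state after the change.

    A delete event has no "after" state, so the result is None.
    """
    if envelope is None:
        return None
    return envelope.get("after")


# server.database.table becomes just the table name
print(route("dbserver1.inventory.t1"))    # prints: t1
# topics that do not have three dot-separated parts are left alone
print(route("schema-changes.inventory"))  # prints: schema-changes.inventory

insert_event = {"before": None, "after": {"field1": 1, "field2": "a"}, "op": "c"}
print(unwrap(insert_event))               # prints: {'field1': 1, 'field2': 'a'}
```

In this model a delete event unwraps to None, which corresponds to the setting “delete.handling.mode”: “none” in the configuration above, where delete records are passed on rather than dropped.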

The next step is to register the file register-lx.json with the details about the LeanXcale connector:

{
    "name": "lx-connector",
    "config": {
        "connector.class": "com.leanxcale.connector.kafka.LXSinkConnector",
        "tasks.max": "1",
        "topics": "t1",
        "connection.url": "kivi:lxis://lx:9876",
        "connection.user": "APP",
        "connection.password": "APP",
        "connection.database": "db",
        "auto.create": "true",
        "delete.enabled": "true",
        "insert.mode": "upsert",
        "batch.size": "500",
        "connection.check.timeout": "20",
        "sink.connection.mode": "kivi",
        "sink.transactional": "false",
        "table.name.format": "t1",
        "pk.mode": "record_key",
        "fields.whitelist": "field1,field2"
    }
}

In this example, the connector handles only one table (t1), which is auto-created at the first insert. Setting “delete.enabled”: “true” allows row deletions to be replicated as well.
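The combination of “insert.mode”: “upsert”, “pk.mode”: “record_key”, “delete.enabled” and “fields.whitelist” can be pictured with a toy in-memory model. The Python sketch below assumes the usual JDBC-sink-style meaning of these options (a record with an empty value is a delete, and the record key is the primary key). It does not reproduce the LeanXcale connector's internals, and apply_record is a hypothetical helper.

```python
def apply_record(table, key, value, delete_enabled=True,
                 whitelist=("field1", "field2")):
    """Apply one (key, value) record to a dict that stands in for table t1.

    Assumed semantics, modelled on JDBC-style sinks:
    - the record key is the primary key (pk.mode = record_key)
    - a None value means the row was deleted upstream
    - only whitelisted value fields are written (fields.whitelist)
    """
    if value is None:
        if delete_enabled:
            table.pop(key, None)  # delete the row if it exists
        return table
    row = {name: value[name] for name in whitelist if name in value}
    table[key] = row  # upsert: insert when absent, overwrite when present
    return table


t1 = {}
apply_record(t1, 1, {"field1": "a", "field2": "b", "other": "ignored"})
apply_record(t1, 1, {"field1": "c", "field2": "b"})  # same key: row is updated
print(t1)  # prints: {1: {'field1': 'c', 'field2': 'b'}}
apply_record(t1, 1, None)                            # delete record
print(t1)  # prints: {}
```

With delete_enabled=False the last call would leave the row in place, which is the behaviour one would expect from the sink if “delete.enabled” were set to “false”.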

A similar command registers this file:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-lx.json
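Once both connectors are registered, it can be useful to confirm that they are actually running. The Kafka Connect REST API exposes GET /connectors/{name}/status for this. The following Python sketch reads that endpoint and checks the reported states; fetch_status and is_running are hypothetical helper names, and the connector names are the ones used in the two JSON files.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"  # port published in docker-compose.yaml


def fetch_status(name, base_url=CONNECT_URL):
    """Return the parsed JSON served by GET /connectors/{name}/status."""
    url = f"{base_url}/connectors/{name}/status"
    with urllib.request.urlopen(url, timeout=5) as resp:
        return json.load(resp)


def is_running(status):
    """True if the connector and at least one task exist and all are RUNNING."""
    tasks = status.get("tasks", [])
    states = [status["connector"]["state"]] + [t["state"] for t in tasks]
    return bool(tasks) and all(state == "RUNNING" for state in states)
```

For example, is_running(fetch_status("inventory-connector")) and is_running(fetch_status("lx-connector")) should both return True when the pipeline is healthy. A FAILED task state usually comes with a trace field in the same response that explains the error.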

Let’s play!

Once all components are up and the connectors are configured, we execute commands in the MySQL database and verify that the data is replicated into the LeanXcale database as expected.

Summary

In this post, we configured a typical Debezium topology, with a MySQL database as the source and LeanXcale as the sink. With the configuration described, all the data inserted into MySQL was replicated into LeanXcale.

Next Step

In the next post, we will describe the same configuration using Avro instead of JSON messages, and we will also check the time it takes to start up a system that contains millions of records.

WRITTEN BY


Jacob Roldán

Software engineer at LeanXcale

jacob@leanxcale.com

https://www.linkedin.com/in/jacob-roldan-24a7aa20/