A common request from clients working with LeanXcale is to run an instance in parallel with their existing databases without changing their current systems.
One way to achieve this is change data capture (CDC) with Debezium. In this post, we explain how to integrate a MySQL database with LeanXcale, although you can use other source databases, such as Postgres or MongoDB.
Topology
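Following the Debezium documentation, the simplified topology for our example is as follows: changes flow from MySQL through the Debezium MySQL connector (running in Kafka Connect) into Kafka, and from there through the LeanXcale sink connector into LeanXcale.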
Components
The components are created from docker images that, following the Debezium architecture, include the following:
- Apache Zookeeper.
- Apache Kafka.
- Kafka Connect / Debezium image, modified by placing the Kafka connector for LeanXcale into the /kafka/connect directory.
- An empty MySQL database image into which we perform some create statements.
- An empty LeanXcale database into which all changes from MySQL are replicated.
Configuration
We use the docker-compose utility to configure all the images. The configuration is defined in the file docker-compose.yaml:
version: '2'
services:
  zookeeper:
    image: debezium/zookeeper:${DEBEZIUM_VERSION}
    ports:
      - 2181:2181
      - 2888:2888
      - 3888:3888
  kafka:
    image: debezium/kafka:${DEBEZIUM_VERSION}
    ports:
      - 9092:9092
    links:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
  mysql:
    image: debezium/example-mysql:${DEBEZIUM_VERSION}
    ports:
      - 3306:3306
    environment:
      - MYSQL_ROOT_PASSWORD=debezium
      - MYSQL_USER=mysqluser
      - MYSQL_PASSWORD=mysqlpw
  connect:
    image: debezium/connect:${DEBEZIUM_VERSION}
    ports:
      - 8083:8083
    links:
      - kafka
      - mysql
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
  lx:
    image: local:lx
    links:
      - kafka
      - connect
The following commands start the components:
export DEBEZIUM_VERSION=1.1
docker-compose -f docker-compose.yaml up
The next step copies the LeanXcale Kafka connector into the debezium/connect container. First, download it from the drivers page, then unpack it and copy it into the container:
tar -xzvf kafka-lx-connector-1.7.tar.gz
docker cp kafka-lx-connector-1.7/ json_connect_1:/kafka/connect
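Note: In our setup, the name of the debezium/connect container is json_connect_1. docker-compose derives it from the project name (by default, the directory name) and the service name, so yours may differ.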
Then, a restart is necessary:
docker restart json_connect_1
Note: Instead of copying into the container, we can create a new image with the LeanXcale connector and avoid the copy and container restart procedure.
The following steps register the MySQL and LeanXcale connectors. The MySQL connector is the producer, and the following is the register-mysql.json file:
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.whitelist": "inventory",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.inventory",
    "transforms": "route,unwrap",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$3",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.delete.handling.mode": "none"
  }
}
The command to register the connector is:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mysql.json
In the MySQL connector, we include two transformations through the setting "transforms": "route,unwrap". With the first transformation, route, the connector publishes each message to a topic named after the source table instead of the full server.database.table name. With the second transformation, unwrap, the original Debezium message is flattened so that it is compatible with JDBC-style sink connectors, such as the LeanXcale connector.
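To make the effect of the two transformations concrete, here is a minimal Python sketch that mimics them outside Kafka Connect. It is only an illustration, not the connector code: the function names route_topic and unwrap, the topic name dbserver1.inventory.t1, and the sample event are assumptions made for this example, and the event shows only a few of the fields a real Debezium envelope carries.

```python
import re

# Same pattern and replacement as transforms.route.* in register-mysql.json.
ROUTE_REGEX = re.compile(r"([^.]+)\.([^.]+)\.([^.]+)")
ROUTE_REPLACEMENT = r"\3"  # Java's "$3" written in Python syntax


def route_topic(topic):
    """Rename a topic when the whole name matches the routing pattern."""
    match = ROUTE_REGEX.fullmatch(topic)
    # Topics that do not match the pattern keep their original name.
    return match.expand(ROUTE_REPLACEMENT) if match else topic


def unwrap(envelope):
    """Keep only the row state after the change (the "after" field)."""
    if envelope is None:  # tombstone: nothing to unwrap
        return None
    return envelope.get("after")


# Hypothetical, simplified change event for an illustrative table t1.
event = {
    "before": None,
    "after": {"field1": 1, "field2": "hello"},
    "op": "c",
}

print(route_topic("dbserver1.inventory.t1"))  # t1
print(unwrap(event))  # {'field1': 1, 'field2': 'hello'}
```

The routed topic name is what the sink connector later subscribes to, which is why the LeanXcale connector can refer to a table simply by its name.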
The next step is to register the file register-lx.json with the details about the LeanXcale connector:
{
  "name": "lx-connector",
  "config": {
    "connector.class": "com.leanxcale.connector.kafka.LXSinkConnector",
    "tasks.max": "1",
    "topics": "t1",
    "connection.properties": "lx://lx:9876/db@APP",
    "auto.create": "true",
    "delete.enabled": "true",
    "insert.mode": "upsert",
    "batch.size": "500",
    "connection.check.timeout": "20",
    "sink.connection.mode": "kivi",
    "sink.transactional": "false",
    "table.name.format": "t1",
    "pk.mode": "record_key",
    "fields.whitelist": "field1,field2"
  }
}
In this example, the connector handles only one table (t1), which is auto-created at the first insert. Setting "delete.enabled": "true" allows the connector to delete rows in LeanXcale when they are deleted in MySQL.
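The combined effect of "insert.mode": "upsert", "pk.mode": "record_key", "delete.enabled" and "fields.whitelist" can be pictured with a toy in-memory model. This sketch assumes the LeanXcale connector follows the usual conventions of JDBC-style sink connectors for these option names (a record with a null value is treated as a delete, and the record key identifies the row); the function apply_record and the sample rows are hypothetical and are not part of the connector.

```python
def apply_record(table, key, value, fields_whitelist=None, delete_enabled=True):
    """Apply one sink record to an in-memory table keyed by the record key."""
    if value is None:
        # A null value (tombstone) is treated as a delete when enabled.
        if delete_enabled:
            table.pop(key, None)
        return
    if fields_whitelist is not None:
        # Keep only the whitelisted fields of the record value.
        value = {k: v for k, v in value.items() if k in fields_whitelist}
    # Upsert: insert a new row or overwrite the existing one.
    table[key] = value


fields = ["field1", "field2"]
t1 = {}
apply_record(t1, 1, {"field1": 1, "field2": "a", "extra": "x"}, fields)  # insert
apply_record(t1, 1, {"field1": 1, "field2": "b"}, fields)  # update same key
apply_record(t1, 2, {"field1": 2, "field2": "c"}, fields)  # insert
apply_record(t1, 2, None)  # delete
print(t1)  # {1: {'field1': 1, 'field2': 'b'}}
```

Keying rows by the record key is what makes replaying the same change harmless in this model: applying an upsert twice leaves the table in the same state.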
A similar command registers this file:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-lx.json
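Once both connectors are registered, it can be useful to confirm that they are actually running before testing replication. The sketch below reads the status endpoint of the Kafka Connect REST API (GET /connectors/<name>/status). The helper names is_healthy and fetch_status, the sample status document, and the worker_id values are assumptions made for this example; the URL assumes port 8083 is published on localhost, as in the docker-compose.yaml above.

```python
import json
from urllib.request import urlopen

CONNECT_URL = "http://localhost:8083"  # port published by the connect service


def is_healthy(status):
    """True when the connector and all of its tasks report RUNNING."""
    states = [status["connector"]["state"]]
    states += [task["state"] for task in status.get("tasks", [])]
    return all(state == "RUNNING" for state in states)


def fetch_status(name):
    """GET /connectors/<name>/status from the Kafka Connect REST API."""
    with urlopen(f"{CONNECT_URL}/connectors/{name}/status") as response:
        return json.load(response)


# Hypothetical status document, shaped like a Kafka Connect status response.
sample = {
    "name": "lx-connector",
    "connector": {"state": "RUNNING", "worker_id": "connect:8083"},
    "tasks": [{"id": 0, "state": "FAILED", "worker_id": "connect:8083"}],
}
print(is_healthy(sample))  # False
```

With the stack up, calling is_healthy(fetch_status("inventory-connector")) and is_healthy(fetch_status("lx-connector")) gives a quick check of both connectors. A failed task usually includes a trace field in the same response that explains the error.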
Let’s play!
When all components are up, and the connectors are configured, we execute commands in the MySQL database to verify that data is replicated into the LeanXcale database as expected.
Summary
In this post, we configured a typical Debezium topology, with a MySQL database as the source and LeanXcale as the sink. With the configuration described, all data initially inserted in MySQL was replicated in LeanXcale.
Next Step
In the next post, we will describe the same configuration using Avro instead of JSON messages, and we will also check the time it takes to start up a system that contains millions of records.
WRITTEN BY
Jacob Roldán
Software engineer at LeanXcale