In this post, we continue working with Debezium and LeanXcale, this time configured for a common client scenario: a large database already exists, and we want to start replicating it into a second database as quickly as possible.

For this approach, we configure Avro as the message format instead of JSON. Because Avro messages are more compact, they can offer significantly better performance.
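To give a feel for where the size difference comes from, here is a minimal sketch that hand-encodes one row using Avro's binary rules (zigzag varints for integers, length-prefixed strings, 8-byte doubles, a branch index for nullable fields) and compares it with plain JSON. The row values, the field order, the nullable fields, and the schema ID of 1 are assumptions made for illustration; the real converter derives the schema from the table and gets the ID from the schema registry.

```python
import json
import struct


def encode_long(n: int) -> bytes:
    """Avro int/long: zigzag mapping followed by a variable-length encoding."""
    z = (n << 1) ^ (n >> 63)  # zigzag keeps small magnitudes small
    out = bytearray()
    while True:
        byte = z & 0x7F
        z >>= 7
        if z:
            out.append(byte | 0x80)  # more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def encode_string(s: str) -> bytes:
    """Avro string: byte length as a long, then the UTF-8 bytes."""
    data = s.encode("utf-8")
    return encode_long(len(data)) + data


def encode_optional(value, encoder) -> bytes:
    """Avro union ["null", T]: branch index, then the value if present."""
    if value is None:
        return encode_long(0)
    return encode_long(1) + encoder(value)


def encode_row(row: dict, schema_id: int = 1) -> bytes:
    # Assumed framing: one magic byte plus a 4-byte schema ID, as used with a schema registry.
    header = b"\x00" + struct.pack(">I", schema_id)
    # Field names are not written; values follow the (assumed) schema order.
    body = (
        encode_long(row["id"])
        + encode_string(row["name"])
        + encode_optional(row["description"], encode_string)
        + encode_optional(row["weight"], lambda x: struct.pack("<d", x))
        + encode_optional(row["date1"], encode_long)  # epoch milliseconds
    )
    return header + body


# Hypothetical row for the table used later in this post.
row = {
    "id": 1,
    "name": "scooter",
    "description": "Small 2-wheel scooter",
    "weight": 3.14,
    "date1": 1588336200000,
}
avro_bytes = encode_row(row)
json_bytes = json.dumps(row).encode("utf-8")
print(len(avro_bytes), "bytes as Avro vs", len(json_bytes), "bytes as JSON")
```

The JSON version repeats every field name in every message, and more still if the schema is embedded in each message, whereas the Avro version only carries a reference to a schema stored once in the registry.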

We again use the MySQL database featured in the previous post. This time, we first load a CSV file using mysqlimport, and then measure and compare how long a LeanXcale database takes to load the same data through Kafka with JSON and with Avro.

Topology

Following the Debezium documentation and expanding on the example from the previous post, we add the schema registry, the component that Avro requires:

Configuration

The new docker-compose configuration adds the schema-registry component as a Docker image:

version: '2'
services:
  zookeeper:
    image: debezium/zookeeper:$
    ports:
     - 2181:2181
     - 2888:2888
     - 3888:3888
  kafka:
    image: debezium/kafka:$
    ports:
     - 9092:9092
    links:
     - zookeeper
    environment:
     - ZOOKEEPER_CONNECT=zookeeper:2181
  mysql:
    image: debezium/example-mysql:$
    ports:
     - 3306:3306
    environment:
     - MYSQL_ROOT_PASSWORD=debezium
     - MYSQL_USER=mysqluser
     - MYSQL_PASSWORD=mysqlpw
  schema-registry:
    image: confluentinc/cp-schema-registry
    ports:
     - 8181:8181
     - 8081:8081
    environment:
     - SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181
     - SCHEMA_REGISTRY_HOST_NAME=schema-registry
     - SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081
    links:
     - zookeeper
  connect:
    image: debezium/connect:$
    ports:
     - 8083:8083
    links:
     - kafka
     - mysql
     - schema-registry
    environment:
     - BOOTSTRAP_SERVERS=kafka:9092
     - GROUP_ID=1
     - CONFIG_STORAGE_TOPIC=my_connect_configs
     - OFFSET_STORAGE_TOPIC=my_connect_offsets
     - STATUS_STORAGE_TOPIC=my_connect_statuses
     - KEY_CONVERTER=io.confluent.connect.avro.AvroConverter
     - VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter
     - INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
     - INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
     - CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081
     - CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081

  lx:
    image: local:lx
    links:
     - kafka
     - schema-registry

For this example, we use a table with different types of fields:

CREATE TABLE example (
  id INTEGER NOT NULL PRIMARY KEY,
  name VARCHAR(255) NOT NULL,
  description VARCHAR(512),
  weight DOUBLE,
  date1 TIMESTAMP
);
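If you need test data for this table, a small generator along the following lines can produce a CSV with matching columns. This is only a sketch: the value patterns, the start date, and the function name write_example_rows are hypothetical, and the file used for the measurements in this post was not necessarily produced this way.

```python
import csv
import io
from datetime import datetime, timedelta


def write_example_rows(out, row_count, start=datetime(2020, 1, 1)):
    """Write row_count rows in column order: id, name, description, weight, date1."""
    writer = csv.writer(out, lineterminator="\n")
    for i in range(1, row_count + 1):
        writer.writerow([
            i,                                  # id INTEGER
            f"name-{i}",                        # name VARCHAR(255)
            f"description of item {i}",         # description VARCHAR(512)
            round(i * 0.5, 2),                  # weight DOUBLE
            (start + timedelta(seconds=i)).strftime("%Y-%m-%d %H:%M:%S"),  # date1 TIMESTAMP
        ])


# Small in-memory demonstration; pass an open file object to write to disk instead.
buffer = io.StringIO()
write_example_rows(buffer, 3)
print(buffer.getvalue())
```

Because the fields are separated by commas, mysqlimport would need the --fields-terminated-by=, option, since its default separator is a tab.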

Next, we configure the registers. The source MySQL register includes all the transformations as well as the Avro converter:

{
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
        "database.server.id": "184054",
        "database.server.name": "dbserver1",
        "database.whitelist": "inventory",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "schema-changes.inventory",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schemas.enable": "false",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://schema-registry:8081",
        "value.converter.schema.registry.url": "http://schema-registry:8081",
        "transforms": "route,unwrap,convert_date1",
        "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
        "transforms.route.replacement": "$3",
        "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
        "transforms.unwrap.drop.tombstones": "false",
        "transforms.unwrap.delete.handling.mode": "none",
        "transforms.convert_date1.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
        "transforms.convert_date1.target.type": "Timestamp",
        "transforms.convert_date1.field": "date1",
        "transforms.convert_date1.format": "yyyy-MM-dd'T'HH:mm:ss'Z'"
    }
}
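To make the effect of two of these transformations concrete, the following sketch mimics them in Python. It is an approximation for illustration, not the Kafka Connect implementation: the function names route and convert_date1 simply mirror the transform aliases, the sample topic name assumes the server.database.table naming that the route regex expects, and the timestamp is assumed to be in UTC.

```python
import re
from datetime import datetime, timedelta, timezone

# Same pattern as transforms.route.regex: three dot-separated parts.
TOPIC_PATTERN = re.compile(r"([^.]+)\.([^.]+)\.([^.]+)")
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def route(topic: str) -> str:
    """Keep only the third part of the topic name (replacement "$3")."""
    match = TOPIC_PATTERN.fullmatch(topic)
    # Topics that do not match the whole pattern are left unchanged.
    return match.group(3) if match else topic


def convert_date1(value: str) -> int:
    """Parse a yyyy-MM-dd'T'HH:mm:ss'Z' string into epoch milliseconds."""
    parsed = datetime.strptime(value, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)
    return (parsed - EPOCH) // timedelta(milliseconds=1)


print(route("dbserver1.inventory.example"))   # example
print(convert_date1("1970-01-01T00:00:01Z"))  # 1000
```

With the route step, records for the table land in a topic called example, which is the topic name the sink register subscribes to.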

We added a new transformation for the date1 field so that its value is sent as a Timestamp type instead of as a String. The LeanXcale sink register, on the other hand, remains simple:

{
    "name": "lx-connector",
    "config": {
        "connector.class": "com.leanxcale.connector.kafka.LXSinkConnector",
        "tasks.max": "1",
        "topics": "example",
        "connection.url": "kivi:lxis://lx:9876",
        "connection.user": "APP",
        "connection.password": "APP",
        "connection.database": "db",
        "auto.create": "true",
        "delete.enabled": "true",
        "insert.mode": "upsert",
        "batch.size": "500",
        "connection.check.timeout": "20",
        "sink.connection.mode": "kivi",
        "sink.transactional": "false",
        "table.name.format": "$",
        "pk.mode": "record_key",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schemas.enable": "false",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://schema-registry:8081",
        "value.converter.schema.registry.url": "http://schema-registry:8081"
    }
}

All configuration files are available in our GitLab repository.

Let’s play!

Now, we run the following sequence of commands to start the components and perform the first load of the CSV file:

#Start components

export DEBEZIUM_VERSION=1.1
docker-compose -f docker-compose-avro.yaml up


#Copy LeanXcale connector into container ‘connect’ and restart it

docker cp connector/kafka-lx-connector-1.4/ avro_connect_1:/kafka/connect
docker restart avro_connect_1


#Copy the CSV file into the mysql container and open a mysql session to create the ‘example’ table

docker cp /DATA_LX/FILES/EXAMPLE_100M.csv avro_mysql_1:/tmp/example.csv
docker-compose -f docker-compose-avro.yaml exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory'


#And finally, load the registers. Because the MySQL table already contains data, Debezium starts copying it from MySQL to LeanXcale right away.

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-lx-avro.json
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mysql-avro.json
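After registering both connectors, it can be useful to confirm that they are actually running before looking at timings. The sketch below queries the status endpoint of the Kafka Connect REST API on the same port used by the curl commands. The helper names fetch_status and is_healthy are hypothetical, and the check assumes the usual status payload with a connector state and a list of task states.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"  # port published by the 'connect' service


def fetch_status(name: str, base_url: str = CONNECT_URL) -> dict:
    """Call GET /connectors/<name>/status and return the parsed JSON."""
    with urllib.request.urlopen(f"{base_url}/connectors/{name}/status") as resp:
        return json.load(resp)


def is_healthy(status: dict) -> bool:
    """True when the connector and all of its tasks report RUNNING."""
    states = [status["connector"]["state"]]
    states += [task["state"] for task in status.get("tasks", [])]
    return all(state == "RUNNING" for state in states)
```

For example, is_healthy(fetch_status("lx-connector")) and is_healthy(fetch_status("inventory-connector")) should both return True once the containers are up; a FAILED task state usually comes with a trace field in the same payload that explains the error.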

The messages in Kafka are now in the Avro format, which is binary. If you want to inspect them, you can still see them in a human-readable form using the kafka-avro-console-consumer command:

docker-compose -f docker-compose-avro.yaml exec schema-registry /usr/bin/kafka-avro-console-consumer \
    --bootstrap-server kafka:9092  \
    --from-beginning  \
    --property print.key=true \
    --property schema.registry.url=http://schema-registry:8081 \
    --topic example

We test this configuration with several row counts and record the execution times to see how the two formats differ. Disclaimer: a different configuration could have given better performance. However, the purpose of this post is only to compare simple installations.

In the following table, we compare the time to import the CSV file into MySQL with the time for Debezium/Kafka to copy the data from MySQL into LeanXcale using either JSON or Avro messages.

WRITTEN BY


Jacob Roldán

Software engineer at LeanXcale

jacob@leanxcale.com

https://www.linkedin.com/in/jacob-roldan-24a7aa20/