In this post, we continue working with Debezium and LeanXcale, this time addressing a common client scenario: a large existing database from which we want to bring up a replica database as quickly as possible.
For this approach, we configure Avro as the message format instead of JSON. Avro messages are far more compact: the schema is stored once in a schema registry and each message carries only a small schema identifier, instead of embedding the full schema in every record as the JSON converter does when schemas are enabled, so throughput can improve significantly.
We again use a MySQL database, as in the previous post. This time, we first load a CSV file into MySQL with mysqlimport and then measure and compare how long it takes for a LeanXcale database to load the data through Kafka with JSON and with Avro.
Topology
Following the Debezium documentation and expanding on the example from the previous post, we add the schema registry component required by Avro.
Configuration
The new docker-compose configuration adds the schema-registry component as a Docker image:
version: '2'
services:
  zookeeper:
    image: debezium/zookeeper:${DEBEZIUM_VERSION}
    ports:
      - 2181:2181
      - 2888:2888
      - 3888:3888
  kafka:
    image: debezium/kafka:${DEBEZIUM_VERSION}
    ports:
      - 9092:9092
    links:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
  mysql:
    image: debezium/example-mysql:${DEBEZIUM_VERSION}
    ports:
      - 3306:3306
    environment:
      - MYSQL_ROOT_PASSWORD=debezium
      - MYSQL_USER=mysqluser
      - MYSQL_PASSWORD=mysqlpw
  schema-registry:
    image: confluentinc/cp-schema-registry
    ports:
      - 8181:8181
      - 8081:8081
    environment:
      - SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181
      - SCHEMA_REGISTRY_HOST_NAME=schema-registry
      - SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081
    links:
      - zookeeper
  connect:
    image: debezium/connect:${DEBEZIUM_VERSION}
    ports:
      - 8083:8083
    links:
      - kafka
      - mysql
      - schema-registry
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
      - KEY_CONVERTER=io.confluent.connect.avro.AvroConverter
      - VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter
      - INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
      - INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
      - CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081
      - CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081
  lx:
    image: local:lx
    links:
      - kafka
      - schema-registry
For this example, we use a table with different types of fields:
CREATE TABLE example (
  id INTEGER NOT NULL PRIMARY KEY,
  name VARCHAR(255) NOT NULL,
  description VARCHAR(512),
  weight DOUBLE,
  date1 TIMESTAMP
);
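As a reference, a hypothetical excerpt of the CSV file (not taken from the real EXAMPLE_100M.csv used below) might look like this, with the columns in the same order as the table definition:

1,Product one,First sample row,1.25,2020-01-01 10:00:00
2,Product two,Second sample row,2.50,2020-01-02 11:30:00
3,Product three,,3.75,2020-01-03 12:45:00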
Next, we configure the connector registers. The MySQL source register includes all the transformations as well as the Avro converter (the route transform renames each topic so that it matches its table name, and unwrap extracts the row image from the Debezium change envelope).
{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "184054", "database.server.name": "dbserver1", "database.whitelist": "inventory", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "schema-changes.inventory", "key.converter": "io.confluent.connect.avro.AvroConverter", "key.converter.schemas.enable": "false", "value.converter": "io.confluent.connect.avro.AvroConverter", "key.converter.schema.registry.url": "http://schema-registry:8081", "value.converter.schema.registry.url": "http://schema-registry:8081", "transforms": "route,unwrap,convert_date1", "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter", "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)", "transforms.route.replacement": "$3", "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope", "transforms.unwrap.drop.tombstones": "false", "transforms.unwrap.delete.handling.mode": "none", "transforms.convert_date1.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value", "transforms.convert_date1.target.type": "Timestamp", "transforms.convert_date1.field": "date1", "transforms.convert_date1.format": "yyyy-MM-dd'T'HH:mm:ss'Z'" } }
We add a new transformation for the date1 field so that the data is sent as a Timestamp type rather than as a String. The LeanXcale sink register, on the other hand, remains simple:
{ "name": "lx-connector", "config": { "connector.class": "com.leanxcale.connector.kafka.LXSinkConnector", "tasks.max": "1", "topics": "example", "connection.url": "lx://lx:9876", "connection.user": "APP", "connection.password": "APP", "connection.database": "db", "auto.create": "true", "delete.enabled": "true", "insert.mode": "upsert", "batch.size": "500", "connection.check.timeout": "20", "sink.connection.mode": "kivi", "sink.transactional": "false", "table.name.format": "$", "pk.mode": "record_key", "key.converter": "io.confluent.connect.avro.AvroConverter", "key.converter.schemas.enable": "false", "value.converter": "io.confluent.connect.avro.AvroConverter", "key.converter.schema.registry.url": "http://schema-registry:8081", "value.converter.schema.registry.url": "http://schema-registry:8081" } }
All configuration files are available in our GitLab repository.
Let’s play!
Now we run the following sequence of commands to start the components and perform the first load of the CSV file:
# Start components
export DEBEZIUM_VERSION=1.1
docker-compose -f docker-compose-avro.yaml up

# Copy the LeanXcale connector into the 'connect' container and restart it
docker cp connector/kafka-lx-connector-1.4/ avro_connect_1:/kafka/connect
docker restart avro_connect_1

# Copy the CSV file into the mysql container and open a mysql session to create the 'example' table
docker cp /DATA_LX/FILES/EXAMPLE_100M.csv avro_mysql_1:/tmp/example.csv
docker-compose -f docker-compose-avro.yaml exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory'

# Finally, load the registers. With this, Debezium starts copying data from MySQL to LeanXcale,
# since there is already data in the MySQL table.
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-lx-avro.json
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mysql-avro.json
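The mysql session above is where the example table gets created; the CSV load itself is not shown. A minimal sketch of the import, assuming the file was copied to /tmp/example.csv as above and uses comma-separated fields (mysqlimport loads a file into the table whose name matches the file name, here example):

# Load the CSV into the 'example' table of the 'inventory' database
docker-compose -f docker-compose-avro.yaml exec mysql bash -c \
  'mysqlimport --fields-terminated-by="," -u $MYSQL_USER -p$MYSQL_PASSWORD inventory /tmp/example.csv'

The exact field and line terminator options depend on how your CSV file was generated.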
The messages in Kafka are now in Avro format, but you can still inspect them in a human-readable form with the kafka-avro-console-consumer command:
docker-compose -f docker-compose-avro.yaml exec schema-registry /usr/bin/kafka-avro-console-consumer \
  --bootstrap-server kafka:9092 \
  --from-beginning \
  --property print.key=true \
  --property schema.registry.url=http://schema-registry:8081 \
  --topic example
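To check that everything is wired up, you can also list the subjects registered by the Avro converters and query the connector status through the REST APIs exposed on the host (ports 8081 and 8083 in the docker-compose file):

# Subjects (schemas) stored in the schema registry
curl http://localhost:8081/subjects

# Status of the source and sink connectors
curl http://localhost:8083/connectors/inventory-connector/status
curl http://localhost:8083/connectors/lx-connector/status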
We test this configuration with several row counts and record the execution times. Disclaimer: we could have tuned the configuration to obtain better performance, but the purpose of this post is only to compare simple, out-of-the-box installations.
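For the measurements, a simple way to know when the copy has finished is to compare row counts on both sides. For example, on the MySQL side (an equivalent SELECT COUNT(*) on the LeanXcale side, with whatever SQL client you use, tells you when replication has caught up):

# Count the rows loaded into the MySQL 'example' table
docker-compose -f docker-compose-avro.yaml exec mysql bash -c \
  'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory -e "SELECT COUNT(*) FROM example"'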
In the following table, we compare the time to import the CSV file into MySQL with the time for Debezium/Kafka to copy the data from MySQL into LeanXcale using either JSON or Avro messages.
WRITTEN BY
Jacob Roldán
Software engineer at LeanXcale