CDC (change data capture) is a design pattern that captures incremental changes to data in real time. Some of its use cases include data replication between databases, building search indexes, and synchronizing data between microservices.

Kafka Connect is a distributed, scalable, and fault-tolerant framework for integrating Kafka with external systems. Debezium is a distributed open-source platform for CDC built on top of Apache Kafka. It monitors changes in a database and publishes events to Kafka in real time! It has built-in connectors for databases such as MySQL, MongoDB, PostgreSQL, and others.

I will use a Docker Compose file to quickly set up all the instances: zookeeper, kafka, a kafka consumer, kafka connect, and mysql.

Our goal is to capture changes from a MySQL database in real time and publish them to a Kafka topic; we will set up a consumer on this topic to observe those events. We will use the Debezium MySQL connector to detect changes in the MySQL database.

Here’s a Docker Compose file to quickly spin up all the services. It’s a modification of the original docker-compose provided by Debezium.

version: '3.6'
services:
  zookeeper:
    image: quay.io/debezium/zookeeper:${DEBEZIUM_VERSION}
    ports:
     - 2181:2181
     - 2888:2888
     - 3888:3888
    networks:
     - kafka-net-1
  kafka:
    image: quay.io/debezium/kafka:${DEBEZIUM_VERSION}
    depends_on:
      - zookeeper
    ports:
     - 9092:9092
    networks:
     - kafka-net-1
    environment:
     - ZOOKEEPER_CONNECT=zookeeper:2181
  mysql:
    image: quay.io/debezium/example-mysql:${DEBEZIUM_VERSION}
    ports:
     - 3306:3306
    networks:
     - kafka-net-1
    environment:
     - MYSQL_ROOT_PASSWORD=debezium
     - MYSQL_USER=mysqluser
     - MYSQL_PASSWORD=mysqlpw
  connect:
    image: quay.io/debezium/connect:${DEBEZIUM_VERSION}
    depends_on:
      - kafka
    ports:
     - 8083:8083
    networks:
     - kafka-net-1
    environment:
     - BOOTSTRAP_SERVERS=kafka:9092
     - GROUP_ID=1
     - CONFIG_STORAGE_TOPIC=my_connect_configs
     - OFFSET_STORAGE_TOPIC=my_connect_offsets
     - STATUS_STORAGE_TOPIC=my_connect_statuses
  consumer:
    image: quay.io/debezium/kafka:${DEBEZIUM_VERSION}
    tty: true
    depends_on:
      - kafka
    networks:
     - kafka-net-1
    environment:
     - ZOOKEEPER_CONNECT=zookeeper:2181
     - KAFKA_BROKER=kafka:9092
    command: 
     - watch-topic 
     - -a 
     - dbserver1.inventory.customers
networks:
  kafka-net-1: {}



Start the above Docker Compose file using docker compose up. Note that it references the DEBEZIUM_VERSION environment variable, so set it first (e.g. export DEBEZIUM_VERSION=1.9, or put it in a .env file next to the compose file).

The services in the Docker Compose file include:

  • zookeeper, kafka, and connect set up Kafka and Kafka Connect; Kafka Connect reads changes from the MySQL database and publishes them to the Kafka topic dbserver1.inventory.customers.
  • consumer is a Kafka consumer that subscribes to the topic dbserver1.inventory.customers.
  • mysql is a Debezium example image for MySQL that comes preloaded with an inventory database.
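The topic name dbserver1.inventory.customers follows Debezium's serverName.databaseName.tableName convention, where the server name comes from the database.server.name setting in the connector config. A quick sketch of the convention (the helper function is hypothetical, for illustration only):

```python
def debezium_topic(server_name: str, database: str, table: str) -> str:
    """Compose a Debezium topic name: serverName.databaseName.tableName."""
    return f"{server_name}.{database}.{table}"

# Changes to the customers table in the inventory database land on this topic:
print(debezium_topic("dbserver1", "inventory", "customers"))
# → dbserver1.inventory.customers
```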
Log in to the MySQL container and explore the preloaded inventory database:

mysql> use inventory;


mysql> show tables;
+---------------------+
| Tables_in_inventory |
+---------------------+
| addresses           |
| customers           |
| geom                |
| orders              |
| products            |
| products_on_hand    |
+---------------------+

mysql> SELECT * FROM customers;
+------+------------+-----------+-----------------------+
| id   | first_name | last_name | email                 |
+------+------------+-----------+-----------------------+
| 1001 | Sally      | Thomas    | sally.thomas@acme.com |
| 1002 | George     | Bailey    | gbailey@foobar.com    |
| 1003 | Edward     | Walker    | ed@walker.com         |
| 1004 | Anne       | Kretchmar | annek@noanswer.org    |
+------+------------+-----------+-----------------------+

Next, run the following command to check whether Kafka Connect is up and running; a successful response means the Kafka Connect service is ready.

curl -H "Accept:application/json" localhost:8083/

Initially there are no connectors registered. We will register a Debezium MySQL connector for the inventory database, which allows Kafka Connect to read changes from the MySQL binlog.

curl -i -X POST -H "Accept:application/json" \
-H "Content-Type:application/json" \
localhost:8083/connectors/ -d '{
   "name": "inventory-connector",
   "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "tasks.max": "1",
      "database.hostname": "mysql",
      "database.port": "3306",
      "database.user": "root",
      "database.password": "debezium",
      "database.server.id": "184054",
      "database.server.name": "dbserver1",
      "database.include.list": "inventory",
      "database.history.kafka.bootstrap.servers": "kafka:9092",
      "database.history.kafka.topic": "dbhistory.inventory"
   }
}'
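The same registration payload can also be built and serialized from a script instead of being inlined in curl; a minimal sketch (the helper below is hypothetical, and the resulting JSON would be POSTed to localhost:8083/connectors/ with Content-Type application/json):

```python
import json

def build_mysql_connector(name: str, host: str, server_name: str, database: str) -> dict:
    """Mirror the curl payload above; credentials are the example image defaults."""
    return {
        "name": name,
        "config": {
            "connector.class": "io.debezium.connector.mysql.MySqlConnector",
            "tasks.max": "1",
            "database.hostname": host,
            "database.port": "3306",
            "database.user": "root",
            "database.password": "debezium",
            "database.server.id": "184054",
            "database.server.name": server_name,
            "database.include.list": database,
            "database.history.kafka.bootstrap.servers": "kafka:9092",
            "database.history.kafka.topic": f"dbhistory.{database}",
        },
    }

payload = json.dumps(build_mysql_connector("inventory-connector", "mysql", "dbserver1", "inventory"))
```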

Check all the registered connectors using

 curl -H "Accept:application/json" localhost:8083/connectors/

It returns the names of the registered connectors:

["inventory-connector"]

Once the connector is registered, the connect container logs will show it taking an initial snapshot of the inventory database and then streaming changes from the binlog.

Next, log in to MySQL and make some changes. The Debezium MySQL connector will detect the changes, Kafka Connect will publish them to the Kafka topic, and our consumer will read them from that topic.

docker exec -it mysql /bin/bash
mysql -umysqluser -pmysqlpw inventory

We will update the customers table.

UPDATE customers SET first_name='Anne Marie' WHERE id=1004;

Now check the kafka consumer logs. If you look closely, the change event contains both the old and new values of the changed fields.
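For reference, the value of a Debezium update event is an envelope with before, after, source, op ('u' for updates), and ts_ms fields. A minimal sketch of diffing the before/after state of the row (the event below is illustrative and trimmed to the relevant fields):

```python
# Trimmed Debezium update-event value for the change above (illustrative).
event = {
    "before": {"id": 1004, "first_name": "Anne", "last_name": "Kretchmar", "email": "annek@noanswer.org"},
    "after": {"id": 1004, "first_name": "Anne Marie", "last_name": "Kretchmar", "email": "annek@noanswer.org"},
    "op": "u",
}

def changed_fields(event: dict) -> dict:
    """Return {field: (old, new)} for fields that differ between before and after."""
    before, after = event["before"], event["after"]
    return {k: (before[k], after[k]) for k in before if before[k] != after[k]}

print(changed_fields(event))  # → {'first_name': ('Anne', 'Anne Marie')}
```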

We can also delete a record from the customers table; to satisfy the foreign key constraint, we first delete the corresponding record from the addresses table.

DELETE FROM addresses WHERE customer_id=1004;
DELETE FROM customers WHERE id=1004;

Similarly, we can also detect insert operations on the table.
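Each Debezium event carries an op code: 'c' for create (insert), 'u' for update, 'd' for delete, and 'r' for rows read during the initial snapshot; delete events have after set to null. A small sketch classifying events by op code (the payloads are illustrative):

```python
# Map Debezium op codes to human-readable operation names.
OPS = {"c": "insert", "u": "update", "d": "delete", "r": "snapshot read"}

def describe(event: dict) -> str:
    """Return the operation name for a Debezium change event."""
    return OPS.get(event.get("op"), "unknown")

delete_event = {"before": {"id": 1004}, "after": None, "op": "d"}
insert_event = {"before": None, "after": {"id": 1005}, "op": "c"}
print(describe(delete_event), describe(insert_event))  # → delete insert
```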

In case Kafka goes down for some time, on restart the Debezium MySQL connector will resume reading the binlog from the last recorded offset (as long as the binlog hasn’t been purged by the MySQL server). More information can be found in the official documentation of the Debezium MySQL connector here.