Question
I am trying to stream events from MySQL to PostgreSQL using Apache Kafka. Although insertions and updates work fine, I can't figure out how to delete a record from MySQL and stream this event to PostgreSQL.
Assume the following topology:
+-------------+
| |
| MySQL |
| |
+------+------+
|
|
|
+---------------v------------------+
| |
| Kafka Connect |
| (Debezium, JDBC connectors) |
| |
+---------------+------------------+
|
|
|
|
+-------v--------+
| |
| PostgreSQL |
| |
+----------------+
I am using the following Docker images:
- Apache-Zookeeper
- Apache-Kafka
- Debezium/JDBC connectors
Then:
# Start the application
export DEBEZIUM_VERSION=0.6
docker-compose up
# Start PostgreSQL connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @jdbc-sink.json
# Start MySQL connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @source.json
Here is the content of the MySQL database:
docker-compose -f docker-compose-jdbc.yaml exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory -e "select * from customers"'
+------+------------+-----------+-----------------------+
| id | first_name | last_name | email |
+------+------------+-----------+-----------------------+
| 1001 | Sally | Thomas | sally.thomas@acme.com |
| 1002 | George | Bailey | gbailey@foobar.com |
| 1003 | Edward | Walker | ed@walker.com |
| 1004 | Anne | Kretchmar | annek@noanswer.org |
+------+------------+-----------+-----------------------+
And we can verify that the content of PostgreSQL is identical:
docker-compose -f docker-compose-jdbc.yaml exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "select * from customers"'
last_name | id | first_name | email
-----------+------+------------+-----------------------
Thomas | 1001 | Sally | sally.thomas@acme.com
Bailey | 1002 | George | gbailey@foobar.com
Walker | 1003 | Edward | ed@walker.com
Kretchmar | 1004 | Anne | annek@noanswer.org
(4 rows)
Assume that I want to delete the record with id=1004 from the MySQL database:
docker-compose -f docker-compose-jdbc.yaml exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory'
mysql> delete from customers where id = 1004;
docker-compose -f docker-compose-jdbc.yaml exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory -e "select * from customers"'
+------+------------+-----------+-----------------------+
| id | first_name | last_name | email |
+------+------------+-----------+-----------------------+
| 1001 | Sally | Thomas | sally.thomas@acme.com |
| 1002 | George | Bailey | gbailey@foobar.com |
| 1003 | Edward | Walker | ed@walker.com |
+------+------------+-----------+-----------------------+
Although the record is deleted from MySQL, the entry still appears in PostgreSQL:
docker-compose -f docker-compose-jdbc.yaml exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "select * from customers"'
last_name | id | first_name | email
-----------+------+------------+-----------------------
Thomas | 1001 | Sally | sally.thomas@acme.com
Bailey | 1002 | George | gbailey@foobar.com
Walker | 1003 | Edward | ed@walker.com
Kretchmar | 1004 | Anne | annek@noanswer.org
(4 rows)
I understand that soft deletes are supported; however, is it possible to completely delete that particular entry from PostgreSQL as well (by streaming the delete event from MySQL via Apache Kafka)?
Here is the content of the source.json file:
{
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
        "database.server.id": "184054",
        "database.server.name": "dbserver1",
        "database.whitelist": "inventory",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "schema-changes.inventory",
        "transforms": "route",
        "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
        "transforms.route.replacement": "$3"
    }
}
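As a side note, the route transform above rewrites the topic name produced by Debezium (by default <server>.<database>.<table>) down to just the table name, so that it matches the topics setting of the sink connector below. Purely as an illustration, assuming the default topic name dbserver1.inventory.customers implied by the settings above, the rewrite behaves like this small standalone Java snippet:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RouteTransformDemo {
    public static void main(String[] args) {
        // Same regex and replacement as transforms.route.* in source.json above
        Pattern route = Pattern.compile("([^.]+)\\.([^.]+)\\.([^.]+)");
        // Debezium's default topic name: <server>.<database>.<table>
        String topic = "dbserver1.inventory.customers";
        Matcher m = route.matcher(topic);
        if (m.matches()) {
            // Keep only the third capture group, i.e. the table name
            System.out.println(m.replaceFirst("$3"));   // prints "customers"
        }
    }
}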
And here is the content of the jdbc-sink.json file:
{
    "name": "jdbc-sink",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "customers",
        "connection.url": "jdbc:postgresql://postgres:5432/inventory?user=postgresuser&password=postgrespw",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
        "auto.create": "true",
        "insert.mode": "upsert",
        "pk.fields": "id",
        "pk.mode": "record_value"
    }
}
I have also tried to set "pk.mode": "record_key" and "delete.enabled": "true" (a suggested bug fix), but this modification doesn't seem to work.
Answer
Deletes are currently not supported by the Confluent JDBC sink connector. There's a pending pull request (you already linked to it), but this hasn't been merged yet.
For the time being, you could either build the JDBC sink connector based on that branch yourself, or create a simple custom sink connector which just handles tombstone events by executing a corresponding DELETE statement on the target database.
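To make the second option more concrete, here is a minimal, illustrative sketch of such a custom sink task written against the Kafka Connect API. It is not a complete connector: the class name, target table, and connection settings are placeholders chosen to match the example above, and it assumes the record key is a Struct carrying the id of the deleted row, as produced by the Debezium source.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Map;

import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

// Illustrative sketch only: a sink task that turns tombstone records
// (records with a null value) into DELETE statements on the target table.
public class TombstoneAwareSinkTask extends SinkTask {

    private Connection connection;

    @Override
    public void start(Map<String, String> props) {
        try {
            // Hard-coded for illustration; a real task would read these from props
            connection = DriverManager.getConnection(
                    "jdbc:postgresql://postgres:5432/inventory", "postgresuser", "postgrespw");
        } catch (SQLException e) {
            throw new ConnectException("Could not open JDBC connection", e);
        }
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        for (SinkRecord record : records) {
            if (record.value() == null) {
                // Tombstone: the key still carries the primary key of the deleted row
                Struct key = (Struct) record.key();
                deleteRow(key.getInt32("id"));
            } else {
                // Inserts and updates would be handled here (upsert); omitted for brevity
            }
        }
    }

    private void deleteRow(int id) {
        try (PreparedStatement stmt =
                     connection.prepareStatement("DELETE FROM customers WHERE id = ?")) {
            stmt.setInt(1, id);
            stmt.executeUpdate();
        } catch (SQLException e) {
            throw new ConnectException("Delete failed for id " + id, e);
        }
    }

    @Override
    public void stop() {
        try {
            if (connection != null) {
                connection.close();
            }
        } catch (SQLException e) {
            // Ignore errors during shutdown
        }
    }

    @Override
    public String version() {
        return "0.1";
    }
}

In a real implementation you would read the connection settings from the task configuration, batch the statements, and pair the task with a SinkConnector class; note that Debezium normally follows each delete event with a tombstone record, which is what this sketch reacts to.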