如何使用 Debezium 从 MS SQL 将 250 个表摄取到 Kafk

How to ingest 250 tables into Kafka from MS SQL with Debezium(如何使用 Debezium 从 MS SQL 将 250 个表摄取到 Kafka)
本文介绍了如何使用 Debezium 从 MS SQL 将 250 个表摄取到 Kafka的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我尝试在 PostgreSQL 之间构建 Kafka 连接管道作为源到 SQL Server 作为目标.我使用了 3 个 Kafka broker,需要消费 252 个主题(一个主题与一张 PostgreSQL 表相同).运行一个多小时后,252张表中只能拉出218张.我发现的错误是 SQL Server 中存在死锁机制,可以将事务保存到 SQL Server 并尝试重试,Debezium 复制槽也已存在.

Hi i have try to build Kafka connect pipeline between PostgreSQL as source to SQL Server as the destination. I used 3 Kafka brokers, and need to consume 252 topics (one topics same as one PostgreSQL table). After run for more than an hour, it only can pull 218 out of 252 tables. The error that i found is there's deadlock mechanism in SQL Server which can hold transaction to SQL Server and try to retry it, also Debezium replication slot has been there.

我在接收器上使用最多 3 个工人的分布式连接器,但也许这似乎还不够.还可以尝试使用更高的 offset.time_out.ms 到 60000 和更高的偏移分区 (100).恐怕这不是我想要的生产水平.任何人都可以就此案提出建议吗?是否有任何计算可以确定我需要的最佳工人数量?

I use distributed connectors with 3 max worker on sink, but maybe it seems not enough. Also try with higher offset.time_out.ms to 60000 and higher offset partition (100). I'm afraid that this is not an production level that i want. Anyone can give suggestion about this case? Is there any calculation to decide best number of workers that i need?

更新

这里出现了一些错误.我看到一些连接器被杀死了.有人告诉我 死锁发生在 SQL SERVER 中 :

here some error i get. I see some connectors are killed. One tell me that deadlock happen in SQL SERVER :

[2020-03-26 15:06:28,494] ERROR WorkerSinkTask{id=sql_server_sink_XXA-0} RetriableException from SinkTask: (org.apache.kafka.connect.runtime.WorkerSinkTask:552)
org.apache.kafka.connect.errors.RetriableException: java.sql.SQLException: com.microsoft.sqlserver.jdbc.SQLServerException: Transaction (Process ID 62) was deadlocked on lock resources with another process and has been chosen as the deadlock victim. Rerun the transaction.

    at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:93)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.sql.SQLException: com.microsoft.sqlserver.jdbc.SQLServerException: Transaction (Process ID 62) was deadlocked on lock resources with another process and has been chosen as the deadlock victim. Rerun the transaction.

2020 年 4 月 14 日更新

我仍然有这个问题,我忘了告诉我如何部署连接器.现在我使用 2 个工人,一个用于源,一个用于接收器.我在 csv 中列出我的所有表和 pk 并循环遍历行以创建连接器而无需睡眠或等待每分钟.我还为每个主题使用单个主题分区和 3 个副本.但是我仍然有sql server连接死锁

I still have problem with this, i forgot to tell about how i deploy the connectors. Now i use 2 workers, one for source and one for sink. I list all of my tables and pk in an csv and loop through rows to create the connectors without sleep or wait for every minutes. I also use single topics partition and 3 replica for each topics. But i still have sql server connection deadlock

推荐答案

问题可能是同时访问多个任务的同一个 SQL 表,并导致同步问题,如您提到的死锁.
由于您已经拥有大量主题,并且您的连接器可以并行访问它们,我建议您将每个主题的分区数减少到 1(减少分区数在Kafka,因此您应该删除并使用新的分区数重新创建每个主题).
这样,每个主题只有一个分区;每个分区只能在单个线程(/task/consumer)中访问,因此没有机会对同一个表进行并行 SQL 事务.

The problem may be accessing the same SQL table with multiple tasks in the same time and causing synchronization problems like deadlocks as you mentioned.
Since you already have a large number of topics, and your connector can access them in parallel, I would suggest you to reduce the number partitions for every topic to just 1 (reduce number of partitions is not supported in Kafka so you should delete and recreate every topic with the new number of partitions).
This way, every topic have only one partition; every partition can be accessed only in a single thread(/task/consumer) so there is no chance for parallel SQL transactions to the same table.

或者,更好的方法是创建一个包含 3 个分区的主题(与您拥有的任务/消费者数量相同),并让 生产者使用 SQL 表名作为消息键.
Kafka 保证具有相同键的消息总是转到同一个分区,因此具有相同表的所有消息将驻留在单个分区上(单线程消耗).

Alternatively, a better approach is to create a single topic with 3 partitions (same as the number of tasks/consumers you have) and make the producer use the SQL table name as the message key.
Kafka guarantees messages with the same key to always go to the same partition, so all the messages with the same table will reside on a single partition (single thread consuming).

如果你觉得有用,我可以附上更多关于如何创建 Kafka Producer 和发送密钥消息的信息.

If you find it useful, I can attach more information about how to create Kafka Producer and send keyed messages.

这篇关于如何使用 Debezium 从 MS SQL 将 250 个表摄取到 Kafka的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!

本站部分内容来源互联网,如果有图片或者内容侵犯您的权益请联系我们删除!

相关文档推荐

Execute complex raw SQL query in EF6(在EF6中执行复杂的原始SQL查询)
SSIS: Model design issue causing duplications - can two fact tables be connected?(SSIS:模型设计问题导致重复-两个事实表可以连接吗?)
SQL Server Graph Database - shortest path using multiple edge types(SQL Server图形数据库-使用多种边类型的最短路径)
Invalid column name when using EF Core filtered includes(使用EF核心过滤包括时无效的列名)
How should make faster SQL Server filtering procedure with many parameters(如何让多参数的SQL Server过滤程序更快)
How can I generate an entity–relationship (ER) diagram of a database using Microsoft SQL Server Management Studio?(如何使用Microsoft SQL Server Management Studio生成数据库的实体关系(ER)图?)