如何从Apache Kafka中的远程数据库中提取数据?

How to pull the data from remote database in Apache Kafka?(如何从Apache Kafka中的远程数据库中提取数据?)
本文介绍了如何从Apache Kafka中的远程数据库中提取数据?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想在 Apache Kafka 中制作实时数据管道.我有位于远程位置的数据库,并且该数据库不断更新.我应该使用哪个 Kafka 连接 API 来从数据库中提取数据并实时摄取到 Kafka 代理中?稍后我将使用 kafka 流和 KSQL 运行临时查询来执行指标.

任何帮助将不胜感激!

解决方案

如果您想创建实时数据管道,您需要使用能够从 MySQL 流式传输更改的变更数据捕获 (CDC) 工具.我建议使用 Debezium,这是一个用于更改数据捕获的开源分布式平台.

捕获插入

当一个新记录被添加到一个表中时,会产生一个类似于下面的 JSON:

<代码>{有效载荷":{之前":空,后":{"id":1005,"first_name":"Giorgos","last_name":"无数","电子邮件":"giorgos@abc.com"},来源":{"name":"dbserver1",server_id":223344,ts_sec":1500369632,gtid":空,"file":"mysql-bin.000003",位置":364,行":0,快照":空,线程":13,"db":"库存",表":客户"},"op":"c",ts_ms":1500369632095}}

before 对象为空,after 对象包含新插入的值.请注意,op 属性是 c,表明这是一个 CREATE 事件.

捕获更新

假设 email 属性已更新,将生成类似于以下的 JSON:

<代码>{有效载荷":{前":{"id":1005,"first_name":"Giorgos","last_name":"无数","电子邮件":"giorgos@abc.com"},后":{"id":1005,"first_name":"Giorgos","last_name":"无数","email":"newEmail@abc.com"},来源":{"name":"dbserver1",server_id":223344,"ts_sec":1500369929,gtid":空,"file":"mysql-bin.000003",位置":673,行":0,快照":空,线程":13,"db":"库存",表":客户"},"op":"你",ts_ms":1500369929464}}

注意 op 现在是 u,表明这是一个 UPDATE 事件.before 对象显示更新前的行状态,after 对象捕获更新行的当前状态.

捕获删除

现在假设该行已被删除;

<代码>{有效载荷":{前":{"id":1005,"first_name":"Giorgos","last_name":"无数","email":"newEmail@abc.com"},之后":空,来源":{"name":"dbserver1",server_id":223344,ts_sec":1500370394,gtid":空,"file":"mysql-bin.000003",位置":1025,行":0,快照":空,线程":13,"db":"库存",表":客户"},"op":"d",ts_ms":1500370394589}}

op new 等于 d,表示 DELETE 事件.after 属性将为 null 并且 before 对象包含被删除之前的行.

您还可以查看他们网站上提供的详尽教程.

示例配置一个 MySQL 数据库

<代码>{名称":库存连接器",(1)配置":{"connector.class": "io.debezium.connector.mysql.MySqlConnector", (2)"database.hostname": "192.168.99.100", (3)"database.port": "3306", (4)"database.user": "debezium", (5)"database.password": "dbz", (6)"database.server.id": "184054", (7)"database.server.name": "fullfillment", (8)database.whitelist":库存",(9)"database.history.kafka.bootstrap.servers": "kafka:9092", (10)"database.history.kafka.topic": "dbhistory.fullfillment" (11)"include.schema.changes": "true" (12)}}

<块引用>

1 当我们向 Kafka Connect 注册时我们的连接器的名称服务.
2 此 MySQL 连接器类的名称.
3 地址MySQL服务器.
4 MySQL 服务器的端口号.
5 名称具有所需权限的 MySQL 用户.
6 密码具有所需权限的 MySQL 用户.
7 连接器的标识符在 MySQL 集群中必须是唯一的,并且类似于MySQL 的 server-id 配置属性.
8 逻辑名MySQL 服务器/集群,形成命名空间,用于所有连接器写入的 Kafka 主题的名称,Kafka连接模式名称,以及相应 Avro 的命名空间使用 Avro 连接器时的架构.
9 所有数据库的列表由此连接器将监视的此服务器托管.这是可选,还有其他属性可用于列出数据库和要包括或排除在监视之外的表.
10 卡夫卡名单此连接器将用于编写和恢复 DDL 的代理对数据库历史主题的语句.
11 数据库名称连接器将写入和恢复 DDL 的历史主题声明.本主题仅供内部使用,不应使用由消费者.
12 指定连接器应该使用名为 fullfillment 事件的模式更改主题生成消费者可以使用的 DDL 更改.

I want to make real-time data pipeline in Apache Kafka. I have database which is located at remote location and that database continuously updating. Can anybody which Kafka connect API i should use to pull the data from database and ingest into Kafka broker in real time? later on i would use kafka stream and KSQL to run ad-hoc queries to perform the metrics.

Any help would be highly appreciated!

解决方案

If you want to create a real-time data pipeline you need to use a Change Data Capture (CDC) tool which is able to stream changes from MySQL. I would suggest Debezium which is an open source distributed platform for change data capture.

Capturing Inserts

When a new record is added to a table, a JSON similar to the one below will be produced:

{  
   "payload":{  
      "before":null,
      "after":{  
         "id":1005,
         "first_name":"Giorgos",
         "last_name":"Myrianthous",
         "email":"giorgos@abc.com"
      },
      "source":{  
         "name":"dbserver1",
         "server_id":223344,
         "ts_sec":1500369632,
         "gtid":null,
         "file":"mysql-bin.000003",
         "pos":364,
         "row":0,
         "snapshot":null,
         "thread":13,
         "db":"inventory",
         "table":"customers"
      },
      "op":"c",
      "ts_ms":1500369632095
   }
}

before object is null and after object contains the newly inserted values. Note that the op attribute is c, indicating that this was a CREATE event.

Capturing Updates

Assuming that email attribute has been updated, a JSON similar to the one below will be produced:

{ 
    "payload":{  
      "before":{  
         "id":1005,
         "first_name":"Giorgos",
         "last_name":"Myrianthous",
         "email":"giorgos@abc.com"
      },
      "after":{  
         "id":1005,
         "first_name":"Giorgos",
         "last_name":"Myrianthous",
         "email":"newEmail@abc.com"
      },
      "source":{  
         "name":"dbserver1",
         "server_id":223344,
         "ts_sec":1500369929,
         "gtid":null,
         "file":"mysql-bin.000003",
         "pos":673,
         "row":0,
         "snapshot":null,
         "thread":13,
         "db":"inventory",
         "table":"customers"
      },
      "op":"u",
      "ts_ms":1500369929464
   }
}

Notice op which is now u, indicating that this was an UPDATE event. before object shows the row state before the update and after object captures the current state of the updated row.

Capturing deletes

Now assume that the row has been deleted;

{ 
    "payload":{  
      "before":{  
         "id":1005,
         "first_name":"Giorgos",
         "last_name":"Myrianthous",
         "email":"newEmail@abc.com"
      },
      "after":null,
      "source":{  
         "name":"dbserver1",
         "server_id":223344,
         "ts_sec":1500370394,
         "gtid":null,
         "file":"mysql-bin.000003",
         "pos":1025,
         "row":0,
         "snapshot":null,
         "thread":13,
         "db":"inventory",
         "table":"customers"
      },
      "op":"d",
      "ts_ms":1500370394589
   }
}

op new is equal to d, indicating a DELETE event. after attribute will be null and before object contains the row before it gets deleted.

You can also have a look at the extensive tutorial provided in their website.

EDIT: Example configuration for a MySQL database

{
  "name": "inventory-connector",  (1)
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector", (2)
    "database.hostname": "192.168.99.100", (3)
    "database.port": "3306", (4)
    "database.user": "debezium", (5)
    "database.password": "dbz", (6)
    "database.server.id": "184054", (7)
    "database.server.name": "fullfillment", (8)
    "database.whitelist": "inventory", (9)
    "database.history.kafka.bootstrap.servers": "kafka:9092", (10)
    "database.history.kafka.topic": "dbhistory.fullfillment" (11)
    "include.schema.changes": "true" (12)
  }
}

1 The name of our connector when we register it with a Kafka Connect service.
2 The name of this MySQL connector class.
3 The address of the MySQL server.
4 The port number of the MySQL server.
5 The name of the MySQL user that has the required privileges.
6 The password for the MySQL user that has the required privileges.
7 The connector’s identifier that must be unique within the MySQL cluster and similar to MySQL’s server-id configuration property.
8 The logical name of the MySQL server/cluster, which forms a namespace and is used in all the names of the Kafka topics to which the connector writes, the Kafka Connect schema names, and the namespaces of the corresponding Avro schema when the Avro Connector is used.
9 A list of all databases hosted by this server that this connector will monitor. This is optional, and there are other properties for listing the databases and tables to include or exclude from monitoring.
10 The list of Kafka brokers that this connector will use to write and recover DDL statements to the database history topic.
11 The name of the database history topic where the connector will write and recover DDL statements. This topic is for internal use only and should not be used by consumers.
12 The flag specifying that the connector should generate on the schema change topic named fullfillment events with the DDL changes that can be used by consumers.

这篇关于如何从Apache Kafka中的远程数据库中提取数据?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!

本站部分内容来源互联网,如果有图片或者内容侵犯您的权益请联系我们删除!

相关文档推荐

Hibernate reactive No Vert.x context active in aws rds(AWS RDS中的休眠反应性非Vert.x上下文处于活动状态)
Bulk insert with mysql2 and NodeJs throws 500(使用mysql2和NodeJS的大容量插入抛出500)
Flask + PyMySQL giving error no attribute #39;settimeout#39;(FlASK+PyMySQL给出错误,没有属性#39;setTimeout#39;)
auto_increment column for a group of rows?(一组行的AUTO_INCREMENT列?)
Sort by ID DESC(按ID代码排序)
SQL/MySQL: split a quantity value into multiple rows by date(SQL/MySQL:按日期将数量值拆分为多行)