NIFI - QueryDatabaseTable 处理器.如何查询被修改的行

NIFI - QueryDatabaseTable processor. How to query rows which is modified?(NIFI - QueryDatabaseTable 处理器.如何查询被修改的行?)
本文介绍了NIFI - QueryDatabaseTable 处理器.如何查询被修改的行?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在研究 NIFI 数据流,其中我的用例是获取 mysql 表数据并将其放入 hdfs/本地文件系统.

我构建了一个数据流管道,其中使用了 querydatabaseTable 处理器 ------ ConvertRecord --- putFile 处理器.

我的表架构 ---> id,name,city,Created_date

即使我在表中插入新记录,我也能在目的地接收文件

但是,但是……

当我更新现有行时,处理器没有获取这些记录,看起来它有一些限制.

我的问题是,如何处理这种情况?由任何其他处理器或需要更新某些属性.

请人帮忙@Bryan Bende

解决方案

QueryDatabaseTable Processor 需要被告知它可以使用哪些列来识别新数据.

串行 idcreated 时间戳是不够的.

不要忘记将 Maximum-value Columns 设置为这些列.

所以我基本上要说的是:

<块引用>

如果你自己不能判断这是sql中的新记录,nifi也不能.

I am working on NIFI Data Flow where my usecase is fetch mysql table data and put into hdfs/local file system.

I have built a data flow pipeline where i used querydatabaseTable processor ------ ConvertRecord --- putFile processor.

My Table Schema ---> id,name,city,Created_date

I am able to receive files in destination even when i am inserting new records in table

But, but ....

When i am updating exsiting rows then processor is not fetching those records looks like it has some limitation.

My Question is ,How to handle this scenario? either by any other processor or need to update some property.

PLease someone help @Bryan Bende

解决方案

QueryDatabaseTable Processor needs to be informed which columns it can use to identify new data.

A serial id or created timestamp is not sufficient.

From the documentation:

Maximum-value Columns:

A comma-separated list of column names. The processor will keep track of the maximum value for each column that has been returned since the processor started running. Using multiple columns implies an order to the column list, and each column's values are expected to increase more slowly than the previous columns' values. Thus, using multiple columns implies a hierarchical structure of columns, which is usually used for partitioning tables. This processor can be used to retrieve only those rows that have been added/updated since the last retrieval. Note that some JDBC types such as bit/boolean are not conducive to maintaining maximum value, so columns of these types should not be listed in this property, and will result in error(s) during processing. If no columns are provided, all rows from the table will be considered, which could have a performance impact. NOTE: It is important to use consistent max-value column names for a given table for incremental fetch to work properly.

Judging be the table scheme, there is no sql-way of telling whether data was updated.

There are many ways to solve this. In your case, the easiest thing to do might be to rename column created to modified and set to now() on updates or to work with a second timestamp column.

So for instance

| stamp_updated | timestamp | CURRENT_TIMESTAMP   | on update CURRENT_TIMESTAMP |

is the new column added. In the processor you use the stamp_updated column to identify new data

Don't forget to set Maximum-value Columns to those columns.

So what I am basically saying is:

If you cannot tell that it is a new record in sql yourself, nifi cannot either.

这篇关于NIFI - QueryDatabaseTable 处理器.如何查询被修改的行?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!

本站部分内容来源互联网,如果有图片或者内容侵犯您的权益请联系我们删除!

相关文档推荐

Hibernate reactive No Vert.x context active in aws rds(AWS RDS中的休眠反应性非Vert.x上下文处于活动状态)
Bulk insert with mysql2 and NodeJs throws 500(使用mysql2和NodeJS的大容量插入抛出500)
Flask + PyMySQL giving error no attribute #39;settimeout#39;(FlASK+PyMySQL给出错误,没有属性#39;setTimeout#39;)
auto_increment column for a group of rows?(一组行的AUTO_INCREMENT列?)
Sort by ID DESC(按ID代码排序)
SQL/MySQL: split a quantity value into multiple rows by date(SQL/MySQL:按日期将数量值拆分为多行)