如何有效地使用窗口函数根据 N 个先前值来决定

How to use windowing functions efficiently to decide next N number of rows based on N number of previous values(如何有效地使用窗口函数根据 N 个先前值来决定接下来的 N 个行)
本文介绍了如何有效地使用窗口函数根据 N 个先前值来决定接下来的 N 个行的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有以下数据.

+----------+----+-------+-----------------------+
|      date|item|avg_val|conditions             |
+----------+----+-------+-----------------------+
|01-10-2020|   x|     10|                      0|
|02-10-2020|   x|     10|                      0|
|03-10-2020|   x|     15|                      1|
|04-10-2020|   x|     15|                      1|
|05-10-2020|   x|      5|                      0|
|06-10-2020|   x|     13|                      1|
|07-10-2020|   x|     10|                      1|
|08-10-2020|   x|     10|                      0|
|09-10-2020|   x|     15|                      1|
|01-10-2020|   y|     10|                      0|
|02-10-2020|   y|     18|                      0|
|03-10-2020|   y|      6|                      1|
|04-10-2020|   y|     10|                      0|
|05-10-2020|   y|     20|                      0|
+----------+----+-------+-----------------------+

我正在尝试基于

  1. 如果标志值为 0,则新列值将为 0.
  2. 如果标志为 1,则新列将为 1,接下来的四个 N 行数将为零,即无需检查下一个 N 值.此过程将应用于每个项目,即按项目分区将起作用.

我在这里使用了 N = 4,

I have used here N = 4,

我已经使用了下面的代码,但没有有效的窗口函数是否有任何优化的方法.

I have used the below code but not effienntly windowing function is there any optimized way.

DROP TEMPORARY TABLE t2;
CREATE TEMPORARY TABLE t2
SELECT *,
MAX(conditions) OVER (PARTITION BY item ORDER BY item,`date` ROWS 4 PRECEDING ) AS new_row
FROM record
ORDER BY item,`date`;

 

 DROP TEMPORARY TABLE t3;
CREATE TEMPORARY TABLE t3
SELECT *,ROW_NUMBER() OVER (PARTITION BY item,new_row ORDER BY item,`date`) AS e FROM t2;

 


SELECT *,CASE WHEN new_row=1 AND e%5>1 THEN 0 
WHEN new_row=1 AND e%5=1 THEN 1 ELSE 0 END AS flag FROM t3;

输出类似于

+----------+----+-------+-----------------------+-----+
|      date|item|avg_val|conditions             |flag |
+----------+----+-------+-----------------------+-----+
|01-10-2020|   x|     10|                      0|    0|
|02-10-2020|   x|     10|                      0|    0|
|03-10-2020|   x|     15|                      1|    1|
|04-10-2020|   x|     15|                      1|    0|
|05-10-2020|   x|      5|                      0|    0|
|06-10-2020|   x|     13|                      1|    0|
|07-10-2020|   x|     10|                      1|    0|
|08-10-2020|   x|     10|                      0|    0|
|09-10-2020|   x|     15|                      1|    1|
|01-10-2020|   y|     10|                      0|    0|
|02-10-2020|   y|     18|                      0|    0|
|03-10-2020|   y|      6|                      1|    1|
|04-10-2020|   y|     10|                      0|    0|
|05-10-2020|   y|     20|                      0|    0|
+----------+----+-------+-----------------------+-----+

但是我无法获得输出,我尝试了更多.

But i am unable to get the ouput , i have tried more.

推荐答案

正如评论中所建议的(@nbk 和 @Akina),您将需要某种迭代器来实现逻辑.对于 SparkSQL 和 Spark 2.4+ 版,我们可以使用内置函数 aggregate 并设置一个结构数组和一个计数器作为累加器.下面是一个名为 record 的示例数据框和表(假设 conditions 列中的值为 01):

As suggested in the comments(by @nbk and @Akina), you will need some sort of iterator to implement the logic. With SparkSQL and Spark version 2.4+, we can use the builtin function aggregate and set an array of structs plus a counter as the accumulator. Below is an example dataframe and table named record(assume values in conditions column are either 0 or 1):

val df = Seq(
    ("01-10-2020", "x", 10, 0), ("02-10-2020", "x", 10, 0), ("03-10-2020", "x", 15, 1),
    ("04-10-2020", "x", 15, 1), ("05-10-2020", "x", 5, 0), ("06-10-2020", "x", 13, 1),
    ("07-10-2020", "x", 10, 1), ("08-10-2020", "x", 10, 0), ("09-10-2020", "x", 15, 1),
    ("01-10-2020", "y", 10, 0), ("02-10-2020", "y", 18, 0), ("03-10-2020", "y", 6, 1),
    ("04-10-2020", "y", 10, 0), ("05-10-2020", "y", 20, 0)
).toDF("date", "item", "avg_val", "conditions")

df.createOrReplaceTempView("record")

SQL:

spark.sql("""
  SELECT t1.item, m.*
  FROM (
    SELECT item,
      sort_array(collect_list(struct(date,avg_val,int(conditions) as conditions,conditions as flag))) as dta
    FROM record
    GROUP BY item
  ) as t1 LATERAL VIEW OUTER inline(
    aggregate(
      /* expr: set up array `dta` from the 2nd element to the last 
       *       notice that indices for slice function is 1-based, dta[i] is 0-based
       */
      slice(dta,2,size(dta)),
      /* start: set up and initialize `acc` to a struct containing two fields:
       * - dta: an array of structs with a single element dta[0]
       * - counter: number of rows after flag=1, can be from `0` to `N+1`
       */
      (array(dta[0]) as dta, dta[0].conditions as counter),
      /* merge: iterate through the `expr` using x and update two fields of `acc`
       * - dta: append values from x to acc.dta array using concat + array functions
       *        update flag using `IF(acc.counter IN (0,5) and x.conditions = 1, 1, 0)`
       * - counter: increment by 1 if acc.counter is between 1 and 4
       *            , otherwise set value to x.conditions
       */
      (acc, x) -> named_struct(
          'dta', concat(acc.dta, array(named_struct(
              'date', x.date,
              'avg_val', x.avg_val,
              'conditions', x.conditions,
              'flag', IF(acc.counter IN (0,5) and x.conditions = 1, 1, 0)
            ))),
          'counter', IF(acc.counter > 0 and acc.counter < 5, acc.counter+1, x.conditions)
        ),
      /* finish: retrieve acc.dta only and discard acc.counter */
      acc -> acc.dta
    )
  ) m
""").show(50)

结果:

+----+----------+-------+----------+----+
|item|      date|avg_val|conditions|flag|
+----+----------+-------+----------+----+
|   x|01-10-2020|     10|         0|   0|
|   x|02-10-2020|     10|         0|   0|
|   x|03-10-2020|     15|         1|   1|
|   x|04-10-2020|     15|         1|   0|
|   x|05-10-2020|      5|         0|   0|
|   x|06-10-2020|     13|         1|   0|
|   x|07-10-2020|     10|         1|   0|
|   x|08-10-2020|     10|         0|   0|
|   x|09-10-2020|     15|         1|   1|
|   y|01-10-2020|     10|         0|   0|
|   y|02-10-2020|     18|         0|   0|
|   y|03-10-2020|      6|         1|   1|
|   y|04-10-2020|     10|         0|   0|
|   y|05-10-2020|     20|         0|   0|
+----+----------+-------+----------+----+

地点:

  1. 使用 groupby 将同一项目的行收集到名为 dta 列的结构数组中,该列具有 4 个字段:dateavg_valconditionsflag 并按 date
  2. 排序
  3. 使用aggregate函数遍历上述结构体数组,根据counterconditions更新flag字段strong>(详情见上面SQL代码注释)
  4. 使用 Lateral VIEW 和 inline 函数分解来自聚合函数的结果结构数组
  1. use groupby to collect rows for the same item into an array of structs named dta column with 4 fields: date, avg_val, conditions and flag and sorted by date
  2. use aggregate function to iterate through the above array of structs, update the flag field based on counter and conditions (details see the above SQL code comments)
  3. use Lateral VIEW and inline function to explode the resulting array of structs from the aggregate function

注意事项:

(1) 建议的 SQL 适用于 N=4,其中我们有 acc.counter IN (0,5)acc.counter <;5 在 SQL 中.对于任何 N,将上述调整为:acc.counter IN (0,N+1)acc.counter <;N+1,下图为N=2 相同样本数据的结果:

(1) the proposed SQL is for N=4, where we have acc.counter IN (0,5) and acc.counter < 5 in the SQL. For any N, adjust the above to: acc.counter IN (0,N+1) and acc.counter < N+1, the below shows the result for N=2 with the same sample data:

+----+----------+-------+----------+----+
|item|      date|avg_val|conditions|flag|
+----+----------+-------+----------+----+
|   x|01-10-2020|     10|         0|   0|
|   x|02-10-2020|     10|         0|   0|
|   x|03-10-2020|     15|         1|   1|
|   x|04-10-2020|     15|         1|   0|
|   x|05-10-2020|      5|         0|   0|
|   x|06-10-2020|     13|         1|   1|
|   x|07-10-2020|     10|         1|   0|
|   x|08-10-2020|     10|         0|   0|
|   x|09-10-2020|     15|         1|   1|
|   y|01-10-2020|     10|         0|   0|
|   y|02-10-2020|     18|         0|   0|
|   y|03-10-2020|      6|         1|   1|
|   y|04-10-2020|     10|         0|   0|
|   y|05-10-2020|     20|         0|   0|
+----+----------+-------+----------+----+

(2) 我们使用dta[0] 来初始化acc,它包括其字段的值和数据类型.理想情况下,我们应该确保这些字段的数据类型正确,以便正确进行所有计算.例如在计算 acc.counter 时,如果 conditions 是 StringType,acc.counter+1 将返回一个带有 DoubleType 值的 StringType

(2) we use dta[0] to initialize acc which includes both the values and datatypes of its fields. Ideally, we should make sure data types of these fields right so that all calculations are correctly conducted. for example when calculating acc.counter, if conditions is StringType, acc.counter+1 will return a StringType with a DoubleType value

spark.sql("select '2'+1").show()
+---------------------------------------+
|(CAST(2 AS DOUBLE) + CAST(1 AS DOUBLE))|
+---------------------------------------+
|                                    3.0|
+---------------------------------------+

当使用 acc.counter IN (0,5)acc.counter 将其值与整数进行比较时,可能会产生浮点错误.5.根据 OP 的反馈,这产生了错误的结果,没有任何警告/错误消息.

Which could yield floating-point errors when comparing their value with integers using acc.counter IN (0,5) or acc.counter < 5. Based on OP's feedback, this produced incorrect result without any WARNING/ERROR message.

  • 一种解决方法是在设置聚合函数的第二个参数时使用 CAST 指定确切的字段类型,以便在任何类型不匹配时报告错误,见下文:

  • One workaround is to specify exact field types using CAST when setting up the 2nd argument of aggregate function so it reports ERROR when any types mismatch, see below:

CAST((array(dta[0]), dta[0].conditions) as struct<dta:array<struct<date:string,avg_val:string,conditions:int,flag:int>>,counter:int>),

  • 另一种在创建 dta 列时强制类型的解决方案,在此示例中,请参阅以下代码中的 int(conditions) as conditions:

  • Another solution it to force types when creating dta column, in this example, see int(conditions) as conditions in below code:

    SELECT item,
      sort_array(collect_list(struct(date,avg_val,int(conditions) as conditions,conditions as flag))) as dta
    FROM record
    GROUP BY item
    

  • 我们也可以在计算中强制使用数据类型,例如,参见下面的int(acc.counter+1):

    IF(acc.counter > 0 and acc.counter < 5, int(acc.counter+1), x.conditions)      
    

  • 这篇关于如何有效地使用窗口函数根据 N 个先前值来决定接下来的 N 个行的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!

    本站部分内容来源互联网,如果有图片或者内容侵犯您的权益请联系我们删除!

    相关文档推荐

    Hibernate reactive No Vert.x context active in aws rds(AWS RDS中的休眠反应性非Vert.x上下文处于活动状态)
    Bulk insert with mysql2 and NodeJs throws 500(使用mysql2和NodeJS的大容量插入抛出500)
    Flask + PyMySQL giving error no attribute #39;settimeout#39;(FlASK+PyMySQL给出错误,没有属性#39;setTimeout#39;)
    auto_increment column for a group of rows?(一组行的AUTO_INCREMENT列?)
    Sort by ID DESC(按ID代码排序)
    SQL/MySQL: split a quantity value into multiple rows by date(SQL/MySQL:按日期将数量值拆分为多行)