PYSpark显示最大值(S)和多重排序

Pyspark display max value(S) and multiple sorting(PYSpark显示最大值(S)和多重排序)
本文介绍了PYSpark显示最大值(S)和多重排序的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

感谢您在这里提供的帮助。使用Pyspark(请不能使用SQL)。因此,我有一个存储为RDD对的元组列表:

[((‘City1’,‘2020-03-27’,‘X1’),44),

(‘City1’,‘2020-03-28’,‘X1’),44),

(‘City3’,‘2020-03-28’,‘X3’),15),

((‘City4’,‘2020-03-27’,‘X4’),5),

((‘City4’,‘2020-03-26’,‘X4’),4),

(‘City2’,‘2020-03-26’,‘X2’),14),

((‘City2’,‘2020-03-25’,‘X2’),4),

((‘City4’,‘2020-03-25’,‘X4’),1),

((‘City1’,‘2020-03-29’,‘X1’),1),

((‘City5’,‘2020-03-25’,‘X5’),15)]

例如(‘City5’,‘2020-03-25’,‘X5’)作为关键字,15作为最后一对的值。

我希望获得以下结果:

City1,X1,2020-03-27,44

City1,X1,2020-03-28,44

城市5、X3、2020-03-25、15

City3,X3,2020-03-28,15

City2,X2,2020-03-26,14

城市4、X4、2020-03-27、5

请注意,结果显示为:

  • 每个城市的最大值关键字(这是最难的部分,如果在不同日期有相似的最大值,要显示同一城市两次,我假设不能使用ReduceByKey(),因为关键字不唯一,可能是GroupBy()或Filter()?

  • 按以下顺序排序:

  1. 最大值递减
  2. 升序日期
  3. 降序城市名称(例如:City1)

所以我尝试了以下代码:

res = rdd2.map(lambda x: ((x[0][0],x[0][2]), (x[0][1], x[1])))
rdd3 = res.reduceByKey(lambda x1, x2: max(x1, x2, key=lambda x: x[1]))
rdd4 = rdd3.sortBy(lambda a: a[1][1], ascending=False)
rdd5 = rdd4.sortBy(lambda a: a[1][0])

虽然它给了我最大值的城市,但如果两个城市在两个不同的日期有相似的最大值,它不会两次返回同一个城市(因为减去了键:City)。

我希望它足够清楚,任何精确度请询问! 非常感谢!

推荐答案

要使所有城市的值都等于最大值,您仍然可以使用reduceByKey,但使用数组而不是Over Value:

  • 将行转换为键/值,值是元组数组而不是元组
  • 按键减少,如果数组包含相同的值,则合并数组,否则保留具有最大值的数组,reduceByKey
  • 通过flatMap
  • 平面化Value数组,将键与它们合并
  • 最后执行排序

完整代码如下:

def merge(array1, array2):
    if array1[0][2] > array2[0][2]:
        return array1
    elif array1[0][2] == array2[0][2]:
        return array1 + array2
    else:
        return array2


res = rdd2.map(lambda x: (x[0][0], [(x[0][1], x[0][2], x[1])]))
rdd3 = res.reduceByKey(lambda x1, x2: merge(x1, x2))
rdd4 = rdd3.flatMap(lambda x: map(lambda y: (x[0], y[1], y[0], y[2]), x[1]))
rdd5 = rdd4.sortBy(lambda a: (-a[3], a[2], a[0]))

然后您可以打印您的RDD:

[print(', '.join([row[0], row[1], row[2], str(row[3])])) for row in rdd5.collect()]

这与您的输入一起提供了以下输出:

City1, X1, 2020-03-27, 44
City1, X1, 2020-03-28, 44
City5, X5, 2020-03-25, 15
City3, X3, 2020-03-28, 15
City2, X2, 2020-03-26, 14
City4, X4, 2020-03-27, 5

这篇关于PYSpark显示最大值(S)和多重排序的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!

本站部分内容来源互联网,如果有图片或者内容侵犯您的权益请联系我们删除!

相关文档推荐

Leetcode 234: Palindrome LinkedList(Leetcode 234:回文链接列表)
How do I read an Excel file directly from Dropbox#39;s API using pandas.read_excel()?(如何使用PANDAS.READ_EXCEL()直接从Dropbox的API读取Excel文件?)
subprocess.Popen tries to write to nonexistent pipe(子进程。打开尝试写入不存在的管道)
I want to realize Popen-code from Windows to Linux:(我想实现从Windows到Linux的POpen-code:)
Reading stdout from a subprocess in real time(实时读取子进程中的标准输出)
How to call type safely on a random file in Python?(如何在Python中安全地调用随机文件上的类型?)