Transforming two dataframes in Spark SQL


Problem Description

I have two dataframes in Spark Scala registered as tables. From these two tables:

Table 1:

   +-----+--------+
   |id   |values  |
   +-----+--------+
   |   0 |  v1    |
   |   0 |  v2    |
   |   1 |  v3    |
   |   1 |  v1    |
   +-----+--------+

Table 2:

   +-----+----+----+----+
   |id   |v1  |v2  |v3  |
   +-----+----+----+----+
   |   0 |a1  |b1  |-   |
   |   1 |a2  |-   |c2  |
   +-----+----+----+----+

I want to generate a new table using the two tables above.

Table 3:

   +-----+--------+--------+
   |id   |values  | field  |
   +-----+--------+--------+
   |   0 |  v1    | a1     |
   |   0 |  v2    | b1     |
   |   1 |  v3    | c2     |
   |   1 |  v1    | a2     |
   +-----+--------+--------+

Here v1 is of the form:

 v1: struct (nullable = true)
    |    |-- level1: string (nullable = true)
    |    |-- level2: string (nullable = true)
    |    |-- level3: string (nullable = true)
    |    |-- level4: string (nullable = true)
    |    |-- level5: string (nullable = true)

I am using Spark SQL in Scala.

Is it possible to do this by writing a SQL query or by using Spark functions on the dataframes?

Answer

Here is sample code you can use that will generate the desired output.

The code is as follows:

// Build the two example dataframes (in spark-shell, spark.implicits._ is already in scope)
val df1 = sc.parallelize(Seq((0, "v1"), (0, "v2"), (1, "v3"), (1, "v1"))).toDF("id", "values")
val df2 = sc.parallelize(Seq((0, "a1", "b1", "-"), (1, "a2", "-", "c2"))).toDF("id", "v1", "v2", "v3")

// Join on id, then use the string held in `values` to look up the
// column of the same name dynamically on each row
val joinedDF = df1.join(df2, "id")
val resultDF = joinedDF.rdd.map { row =>
  val id     = row.getAs[Int]("id")
  val values = row.getAs[String]("values")
  val field  = row.getAs[String](values) // dynamic lookup: the column name comes from the data
  (id, values, field)
}.toDF("id", "values", "field")
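As a side note (not part of the original answer): if the set of value columns is known up front, the same lookup can stay in the DataFrame API instead of dropping to an RDD. A minimal sketch, assuming the candidate columns are exactly v1, v2 and v3:

import org.apache.spark.sql.functions.{coalesce, col, when}

// For each candidate column, emit its value only when its name matches
// the string in `values`; coalesce then picks the single non-null match.
val valueCols = Seq("v1", "v2", "v3")
val fieldCol  = coalesce(valueCols.map(c => when(col("values") === c, col(c))): _*)

val resultDF2 = df1.join(df2, "id")
  .select(col("id"), col("values"), fieldCol.as("field"))

This avoids the RDD round-trip and keeps the computation in Catalyst-optimized expressions, at the cost of having to enumerate the columns.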

When tested in the console:

scala> val df1=sc.parallelize(Seq((0,"v1"),(0,"v2"),(1,"v3"),(1,"v1"))).toDF("id","values")
df1: org.apache.spark.sql.DataFrame = [id: int, values: string]

scala> df1.show
+---+------+
| id|values|
+---+------+
|  0|    v1|
|  0|    v2|
|  1|    v3|
|  1|    v1|
+---+------+


scala> val df2=sc.parallelize(Seq((0,"a1","b1","-"),(1,"a2","-","c2"))).toDF("id","v1","v2","v3")
df2: org.apache.spark.sql.DataFrame = [id: int, v1: string ... 2 more fields]

scala> df2.show
+---+---+---+---+
| id| v1| v2| v3|
+---+---+---+---+
|  0| a1| b1|  -|
|  1| a2|  -| c2|
+---+---+---+---+


scala> val joinedDF=df1.join(df2,"id")
joinedDF: org.apache.spark.sql.DataFrame = [id: int, values: string ... 3 more fields]

scala> joinedDF.show
+---+------+---+---+---+                                                        
| id|values| v1| v2| v3|
+---+------+---+---+---+
|  1|    v3| a2|  -| c2|
|  1|    v1| a2|  -| c2|
|  0|    v1| a1| b1|  -|
|  0|    v2| a1| b1|  -|
+---+------+---+---+---+


scala> val resultDF=joinedDF.rdd.map{row=>
     | val id=row.getAs[Int]("id")
     | val values=row.getAs[String]("values")
     | val field=row.getAs[String](values)
     | (id,values,field)
     | }.toDF("id","values","field")
resultDF: org.apache.spark.sql.DataFrame = [id: int, values: string ... 1 more field]

scala> 

scala> resultDF.show
+---+------+-----+
| id|values|field|
+---+------+-----+
|  1|    v3|   c2|
|  1|    v1|   a2|
|  0|    v1|   a1|
|  0|    v2|   b1|
+---+------+-----+
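One caveat, since the question mentions that v1 is actually a struct with level1..level5 fields: in that case the dynamically picked column comes back as a nested Row, not a String. A minimal sketch of that variant (assuming every vN column shares the struct schema shown in the question, and keeping just level1 for illustration):

import org.apache.spark.sql.Row

val structResult = joinedDF.rdd.map { row =>
  val id     = row.getAs[Int]("id")
  val values = row.getAs[String]("values")
  // A struct column is read back as a nested Row; guard against nulls
  val nested = row.getAs[Row](values)
  (id, values, Option(nested).map(_.getAs[String]("level1")).orNull)
}.toDF("id", "values", "field_level1")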

I hope this solves your problem. Thanks!
