scala - Joining a Spark RDD with a Cassandra table


I am joining a Spark RDD with a Cassandra table (a lookup), and there is something I cannot figure out:

    Will Spark pull all records from the Cassandra table that fall between range-start and range-end and then join them with the RDD in Spark memory, or will it push all the values from the RDD down to Cassandra and perform the join there?

The code is as follows:

// creating a dataframe with the fields required for the join with the Cassandra table,
// and converting it to an RDD
val df_for_join = src_df.select(src_df("col1"), src_df("col2"))
val rdd_for_join = df_for_join.rdd

val result_rdd = rdd_for_join
  .joinWithCassandraTable("my_keyspace", "my_table",
    selectedColumns = SomeColumns("col1", "col2", "col3", "col4"),
    joinColumns = SomeColumns("col1", "col2"))
  .where("created_at > 'range_start' and created_at <= 'range_end'")
  .clusteringOrder(Ascending)
  .limit(1)
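
For context, a snippet like this assumes the Spark Cassandra Connector implicits are already in scope. A minimal setup sketch might look like the following; the app name and connection host are illustrative assumptions, and the exact import for Ascending can vary between connector versions:

// Imports assumed by the snippet above: joinWithCassandraTable, SomeColumns
// and the Ascending clustering order all come from the connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.rdd.ClusteringOrder.Ascending
import org.apache.spark.sql.SparkSession

// Hypothetical session; point spark.cassandra.connection.host at your cluster
val spark = SparkSession.builder()
  .appName("rdd-cassandra-join")
  .config("spark.cassandra.connection.host", "127.0.0.1")
  .getOrCreate()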

Cassandra table details:

PRIMARY KEY ((col1, col2), created_at) WITH CLUSTERING ORDER BY (created_at ASC)

1 Answer


joinWithCassandraTable extracts the partition/primary key values from the RDD you pass in and converts them into individual requests against the corresponding partitions in Cassandra. On top of that, SCC can apply additional filtering, such as your where condition. If I remember correctly (I may be wrong here), the limit is not pushed down to Cassandra completely: it may still fetch up to limit rows for each partition.
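
If you want to verify that behaviour yourself, one rough sketch (my own check, not part of the original answer) is to count how many rows come back per join key from the result_rdd built above: if limit(1) were applied globally you would see one row in total, whereas per-partition application gives up to one row per key.

// result_rdd elements are (left-side row, matching CassandraRow) pairs,
// so counting by the left side shows how many rows each key produced
val rowsPerKey = result_rdd
  .map { case (left, _) => (left, 1) }
  .reduceByKey(_ + _)

rowsPerKey.collect().foreach { case (key, count) =>
  println(s"$key -> $count row(s)")
}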

You can always check where the join takes place by running result_rdd.toDebugString. For my code:

val df_for_join = Seq((2, 5), (5, 2)).toDF("col1", "col2")
val rdd_for_join = df_for_join.rdd

val result_rdd = rdd_for_join
  .joinWithCassandraTable("test", "jt",
    selectedColumns = SomeColumns("col1", "col2", "v"),
    joinColumns = SomeColumns("col1", "col2"))
  .where("created_at > '2020-03-13T00:00:00Z' and created_at <= '2020-03-14T00:00:00Z'")
  .limit(1)

It gives the following:

scala> result_rdd.toDebugString
res7: String =
(2) CassandraJoinRDD[14] at RDD at CassandraRDD.scala:19 []
 |  MapPartitionsRDD[2] at rdd at <console>:45 []
 |  MapPartitionsRDD[1] at rdd at <console>:45 []
 |  ParallelCollectionRDD[0] at rdd at <console>:45 []

Whereas if you do a "normal" join, you get the following:

scala> val rdd1 = sc.parallelize(Seq((2, 5),(5, 2)))
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[21] at parallelize at <console>:44
scala> val ct = sc.cassandraTable[(Int, Int)]("test", "jt").select("col1", "col2")
ct: com.datastax.spark.connector.rdd.CassandraTableScanRDD[(Int, Int)] = CassandraTableScanRDD[31] at RDD at CassandraRDD.scala:19

scala> rdd1.join(ct)
res15: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = MapPartitionsRDD[34] at join at <console>:49
scala> rdd1.join(ct).toDebugString
res16: String =
(6) MapPartitionsRDD[37] at join at <console>:49 []
 |  MapPartitionsRDD[36] at join at <console>:49 []
 |  CoGroupedRDD[35] at join at <console>:49 []
 +-(3) ParallelCollectionRDD[21] at parallelize at <console>:44 []
 +-(6) CassandraTableScanRDD[31] at RDD at CassandraRDD.scala:19 []

See the corresponding section of the SCC documentation for more details.
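
That documentation also describes repartitionByCassandraReplica, which can be combined with joinWithCassandraTable so that each request is issued from an executor holding a replica of the corresponding key. A rough sketch under the same keyspace/table/column assumptions as in the question (whether it actually helps depends on your cluster and data volume):

// Optional: co-locate RDD partitions with the Cassandra replicas that own
// the matching partition keys before performing the join
val joined = rdd_for_join
  .repartitionByCassandraReplica("my_keyspace", "my_table")
  .joinWithCassandraTable("my_keyspace", "my_table",
    selectedColumns = SomeColumns("col1", "col2", "col3", "col4"),
    joinColumns = SomeColumns("col1", "col2"))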

