hadoop - How to get the desired output from a Spark RDD without using combineByKey and aggregateByKey



Here is my data:

val keysWithValuesList = Array("foo=A", "foo=A", "foo=A", "foo=A", "foo=B", "bar=C", "bar=D", "bar=D")

Now I need the following two kinds of output, but without using combineByKey or aggregateByKey:

1) Array[(String, Int)] = Array((foo,5), (bar,3))  
2) Array((foo,Set(B, A)), (bar,Set(C, D)))

Here is what I have tried:

scala> val keysWithValuesList = Array("foo=A", "foo=A", "foo=A", "foo=A", "foo=B", "bar=C",
     | "bar=D", "bar=D")  
scala> val sample=keysWithValuesList.map(_.split("=")).map(p=>(p(0),(p(1))))
sample: Array[(String, String)] = Array((foo,A), (foo,A), (foo,A), (foo,A), (foo,B), (bar,C), (bar,D), (bar,D))  

Now, when I type the variable name followed by Tab to see which methods apply to the RDD produced by map, I only get the options below, none of which does what I need:

scala> sample.
apply          asInstanceOf   clone          isInstanceOf   length         toString       update         

How can I achieve this?

1 Answer


Here is a standard way to do it.

Note: you need to be working with an RDD. I think that is the sticking point - your sample is a plain Array, which is why Tab completion only shows Array methods.

Here you go:

val keysWithValuesList = Array("foo=A", "foo=A", "foo=A", "foo=A", "foo=B", "bar=C","bar=D", "bar=D") 

val sample=keysWithValuesList.map(_.split("=")).map(p=>(p(0),(p(1))))

// 1) Occurrence count per key: map each pair to (key, 1) and sum with reduceByKey
val sample2 = sc.parallelize(sample.map(x => (x._1, 1)))
val sample3 = sample2.reduceByKey(_ + _)
sample3.collect()

// 2) Distinct values per key: group the values, then turn each group into a Set
val sample4 = sc.parallelize(sample.map(x => (x._1, x._2))).groupByKey()
sample4.collect()

val sample5 = sample4.map(x => (x._1, x._2.toSet))
sample5.collect()
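
If you also want to avoid groupByKey (it collects every value for a key before you deduplicate), the second result can be built the same way as the first: wrap each value in a one-element Set and merge the sets with reduceByKey. This is just an alternative sketch; it assumes the usual spark-shell SparkContext sc and the parsed sample array from above:

val pairs = sc.parallelize(sample)   // RDD[(String, String)]

// counts per key, e.g. Array((foo,5), (bar,3))
val counts = pairs.mapValues(_ => 1).reduceByKey(_ + _)
counts.collect()

// distinct values per key, e.g. Array((foo,Set(A, B)), (bar,Set(C, D)))
val valueSets = pairs.mapValues(v => Set(v)).reduceByKey(_ ++ _)
valueSets.collect()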

