ApacheSpark-如何用pySpark transformation结构化流?


0

这看起来应该很明显,但是在查看文档和示例时,我不确定是否可以找到一种使用pyspark进行结构化流和 transformation的方法。

例如:

from pyspark.sql import SparkSession

spark = (
SparkSession
.builder
.appName('StreamingWordCount')
.getOrCreate()
)

raw_records = (
spark
.readStream
.format('socket')
.option('host', 'localhost')
.option('port', 9999)
.load()
)

I realize there's a SQL function for upper-case, just illustrating a sample

use of an arbitrary map function

records = raw_records.rdd.map(lambda w: w.upper()).toDF()

counts = (
records
.groupBy(records.value)
.count()
)

query = (
counts
.writeStream
.outputMode('complete')
.format('console')
.start()
)
query.awaitTermination()

这将引发以下异常:

Queries with streaming sources must be executed with writeStream.start

但是,如果我移除对rdd.map(….todf()的调用,事情似乎可以正常 job。

似乎对rdd.map的调用从流上下文分支了执行,并导致spark警告它从未启动过?

是否有一种“正确”的方法来使用结构化流和pyspark应用 map或 map样式 transformation?

1 答案

0

结构化流中应用的每个 transformation都必须完全包含在 DataSet 世界中-如果是pyspark,则意味着您只能使用 DataFrame 或SQL,不支持到RDD(或DStream或 local集合)的 transformation。

如果要使用纯Python代码,必须使用userdefinedFunction。

from pyspark.sql.functions import udf

@udf
def to_upper(s)
return s.upper()

raw_records.select(to_upper("value"))

另请参见Spark结构化流和Spark ML回归


我来回答