scala-如何使用SparkContext.提交作业调用REST API


0

有人能提供一个submitJob方法调用的例子吗

在这里找到参考:如何从map/filter/etc执行异步操作(即返回未来)。?

我相信我可以在我的用例中实现它

在我当前的实现中,我使用分区来调用并行调用,但是它们在调用下一个调用之前等待响应

Dataframe.rdd.reparition(TPS allowed on API)
.map(row => {
            val response = callApi(row)
            parse(response)
    })

但是由于API端有延迟,我在解析之前等待10秒的响应,然后进行下一次调用。我有100 TPS,但目前的逻辑只有4-7 TPS

如果有人用过SparkContext.提交作业,要进行异步调用,请提供一个示例,因为我是new spark和scala

我想在不等待响应的情况下调用调用,确保100 TPS,然后在收到响应后,我想在其上解析并创建 DataFrame 。

我以前尝试过从主节点收集行和调用API调用,但似乎受创建大型线程池的硬件限制

提交作业[T,U,R](rdd:rdd[T],processPartition:(迭代器[T])?.U,partitions:Seq[Int],resultHandler:(Int,U)?.Unit,resultFunc:?.R):SimpleFutureAction[R]

Rdd-Rdd超出我的 DataFrame

分区-我的rdd已分区,是否提供范围0到分区数在我的rdd里?

processPartition-它是我的callApi()吗?

resultHandler-不确定这里要做什么

resultFunc-我相信这将解析我的响应

如何在SimpleFutureAction之后创建 DataFrame

有人能帮忙吗

1 答案


0

submitJob不会自动加快API调用的速度。它是Spark并行处理的低级实现的一部分,Spark将操作拆分为作业,然后将它们提交给任何集群调度程序。调用submitJob就像启动一个Java线程-该作业将异步运行,但不会比在dataframe/RDD上调用操作快。

您最好的选择是使用mapPartitions,它允许您在每个分区的上下文中运行一个函数。您已经对数据进行了分区,因此为了确保最大的并发性,只需确保有足够的Spark executor来实际使这些分区同时运行:

df.rdd.repartition(#concurrent API calls)
  .mapPartitions(partition => {
    partition.map(row => {
      val response = callApi(row)
      parse(response)
    })
  })
  .toDF("col1", "col2", ...)

mapPartitions需要一个将迭代器[T] map到迭代器[U]并返回RDD[U]的函数。 transformation回 DataFrame 是用适当的列名链接对toDF()的调用的问题。

您可能希望在callApi中实现某种每线程速率限制,以确保没有单个 executor每秒触发大量请求。请记住, executor可以在单独的线程和/或单独的jvm中运行。

当然,仅仅调用mapPartitions什么也做不了。您需要在生成的 DataFrame 上触发一个操作,以便实际触发API调用。


我来回答

写文章

提问题

面试题