序列化-任务不可序列化- sparkJava


0

我正在获取Spark中的任务不可序列化错误。我已经搜索并尝试使用一些帖子中建议的静态函数,但它仍然给出相同的错误。

代码如下:

public class Rating implements Serializable {
    private SparkSession spark;
    private SparkConf sparkConf;
    private JavaSparkContext jsc;
    private static Function<String, Rating> mapFunc;

public Rating() {
    mapFunc = new Function&lt;String, Rating&gt;() {
        public Rating call(String str) {
            return Rating.parseRating(str);
        }
    };
}

public void runProcedure() { 
    sparkConf = new SparkConf().setAppName("Filter Example").setMaster("local");
    jsc = new JavaSparkContext(sparkConf);
    SparkSession spark = SparkSession.builder().master("local").appName("Word Count")
        .config("spark.some.config.option", "some-value").getOrCreate();        

    JavaRDD&lt;Rating&gt; ratingsRDD = spark.read().textFile("sample_movielens_ratings.txt")
            .javaRDD()
            .map(mapFunc);
}

public static void main(String[] args) {
    Rating newRating = new Rating();
    newRating.runProcedure();
}

}

错误给出:

如何解决此错误?

2 答案

0

显然,评级不能序列化,因为它包含对spark结构(即sparksession、sparkconf等)的引用作为属性。

问题出在这里

JavaRDD<Rating> ratingsRD = spark.read().textFile("sample_movielens_ratings.txt")
            .javaRDD()
            .map(mapFunc);

如果查看mapfunc的定义,则返回一个分级对象。

mapFunc = new Function<String, Rating>() {
    public Rating call(String str) {
        return Rating.parseRating(str);
    }
};

此函数用于 map(Spark术语中的 transformation)中。因为 transformation是直接执行到 job节点而不是在驱动程序节点中,所以它们的代码必须是可序列化的。这迫使spark尝试序列化评级类,但这是不可能的。

尝试从评级中提取所需的特性,并将它们放在不具有任何 spark结构的其他类中。最后,使用这个新类作为mapfunc函数的返回类型。


0

此外,您必须确保在类中不包括不可序列化的变量,如JavasAspkContext和SparkSession。如果您需要包括它们,您应该这样定义:

private transient JavaSparkContext sparkCtx;
private transient SparkSession spark;

祝你好运。


我来回答