Apache Flink SQL InvalidProgrammException:所选排序 key不是可排序类型


0

这是我的燧石SQL

SELECT t.reportCode FROM query_record_info as t LEFT JOIN credit_report_head as c ON t.reportCode = c.reportCode

当我运行它时,我出错了

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Selected sort key is not a sortable type
    at org.apache.flink.api.java.operators.SortPartitionOperator.ensureSortableKey(SortPartitionOperator.java:145)
    at org.apache.flink.api.java.operators.SortPartitionOperator.sortPartition(SortPartitionOperator.java:111)
    at org.apache.flink.table.plan.nodes.dataset.DataSetJoin$$anonfun$partitionAndSort$1.apply(DataSetJoin.scala:468)
    at org.apache.flink.table.plan.nodes.dataset.DataSetJoin$$anonfun$partitionAndSort$1.apply(DataSetJoin.scala:467)
    at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
    at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
    at scala.collection.mutable.ArrayOps$ofInt.foldLeft(ArrayOps.scala:234)
    at org.apache.flink.table.plan.nodes.dataset.DataSetJoin.partitionAndSort(DataSetJoin.scala:467)
    at org.apache.flink.table.plan.nodes.dataset.DataSetJoin.addLeftOuterJoin(DataSetJoin.scala:270)
    at org.apache.flink.table.plan.nodes.dataset.DataSetJoin.translateToPlan(DataSetJoin.scala:178)
    at org.apache.flink.table.plan.nodes.dataset.DataSetCalc.translateToPlan(DataSetCalc.scala:91)
    at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:498)
    at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:476)
    at org.apache.flink.table.api.java.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:147)
    at org.myorg.quickstart.CreditTest.main(CreditTest.java:108)

但如果我把左边的sql.it没问题

SELECT t.reportCode FROM query_record_info as t JOIN credit_report_head as c ON t.reportCode = c.reportCode

我是火石的发起者…期待你的回复

1 答案


0
    // 创建环境
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);

    CreditReport creditReport = JSONUtil.toBean(jsonStr, CreditReport.class);
    DataSource<CreditReport> reportDataSource = env.fromElements(creditReport);



    //从资源中抽取出,记录信息
    FlatMapFunction<CreditReport, QueryRecordInfo> queryRecordInfoFlat = new FlatMapFunction<CreditReport,QueryRecordInfo>() {
        @Override
        public void flatMap(CreditReport value, Collector<QueryRecordInfo> out) throws Exception {
            List<QueryRecordInfo> queryRecordInfos = creditReport.getQueryRecordInfos();
            for (QueryRecordInfo queryRecordInfo : queryRecordInfos) {
                out.collect(queryRecordInfo);
            }
        }

    };

    //从资源中抽取出,记录报告头
    FlatMapFunction<CreditReport, CreditReportHead> queryRecordHeaderFlat = new FlatMapFunction<CreditReport,CreditReportHead>() {
        @Override
        public void flatMap(CreditReport value, Collector<CreditReportHead> out) throws Exception {
            out.collect(value.getCreditReportHead());
        }


    };

    DataSet<QueryRecordInfo> records = reportDataSource.flatMap(queryRecordInfoFlat);

    tableEnv.createTemporaryView(QueryRecordInfo, records);
    DataSet<CreditReportHead> headers = reportDataSource.flatMap(queryRecordHeaderFlat);
    tableEnv.createTemporaryView(CreditReportHead ,headers);

    Table queryResult = tableEnv.sqlQuery("SELECT t.reportCode as reportCode,reason as reason FROM credit_report_head as t left JOIN query_record_info as c ON t.reportCode = c.reportCode");
    DataSet<ReportCode> reportHeadDataSet = tableEnv.toDataSet(queryResult, ReportCode.class);

    reportHeadDataSet.print();

我来回答

写文章

提问题

面试题