Structured Streaming reading from Kafka: single record too large



Problem: following suggestions found online, I modified the Kafka configuration and restarted the service, but the change did not take effect (the reported fetch size is still 1048576 both before and after the change):

#server.properties(10M)
message.max.bytes=10485760
max.partition.fetch.bytes=10485760

#consumer.properties(10M)
max.partition.fetch.bytes=10485760

#producer.properties(10M)
message.max.bytes=10485760

Error message:

org.apache.spark.sql.streaming.StreamingQueryException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 4, 192.168.10.202, executor 0): org.apache.kafka.common.errors.RecordTooLargeException: There are some messages at [Partition=Offset]: {kafka_obinlog-0=447577} whose size is larger than the fetch size 1048576 and hence cannot be ever returned. Increase the fetch size on the client (using max.partition.fetch.bytes), or decrease the maximum message size the broker will allow (using message.max.bytes).

2 Answers


Try specifying the batch size when building the SparkSession; I forget the parameter's name, you'll have to look it up.

  OK, thanks, I'll look it up - 冰玉 2019-11-08
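
For reference, a minimal Scala sketch of that idea (the topic name is taken from the error message above; the broker address and everything else are placeholders): with the Structured Streaming Kafka source, options prefixed with "kafka." are forwarded to the underlying consumer, so the 1048576-byte fetch limit from the error can be raised via kafka.max.partition.fetch.bytes on the reader rather than through a setting on the SparkSession itself.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("kafka-large-records")   // illustrative app name
  .getOrCreate()

// "kafka."-prefixed options are passed straight to the Kafka consumer,
// so the 10 MB limit configured on the broker can also be applied here.
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:9092")          // placeholder broker address
  .option("subscribe", "kafka_obinlog")                    // topic from the error message
  .option("kafka.max.partition.fetch.bytes", "10485760")   // 10 MB, matches message.max.bytes
  .load()

df.writeStream.format("console").start().awaitTermination()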



maxRatePerPartition: the number of records fetched per second per partition
batchDuration: the batch interval
spark.streaming.kafka.maxRatePerPartition: sets the maximum number of messages per partition per second. Combined with batchDuration, this controls the batch size. You want maxRatePerPartition to be set and large (otherwise you are effectively throttling your job), alongside a suitable batchDuration.
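
Note that these two settings belong to the older DStream-based Spark Streaming API rather than Structured Streaming. A rough sketch of where they go, with purely illustrative values; neither of them changes the per-record size limit from the error, they only cap how many records each batch pulls:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Cap each Kafka partition at 1000 records per second (illustrative value).
val conf = new SparkConf()
  .setAppName("kafka-rate-limit")
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")

// batchDuration of 5 seconds: each micro-batch then holds at most about
// 1000 records * number of partitions * 5 seconds.
val ssc = new StreamingContext(conf, Seconds(5))

In Structured Streaming, the closest equivalent is the maxOffsetsPerTrigger option on the Kafka source, which caps how many offsets are read per trigger.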

  On the consumer, add the Kafka parameter max.request.size; its value should be smaller than the message.max.bytes value configured in server.properties - 拾肆 2019-11-09
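
As a side note, in the Kafka documentation max.request.size is a producer setting rather than a consumer one; a minimal sketch of where it would be set, keeping it at or below the broker's message.max.bytes (10485760 in the question):

#producer.properties (producer-side request limit, at most the broker's message.max.bytes)
max.request.size=10485760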

