apache-kafka - 反序列化主题XXX到Avro的数据失败,在使用Kafka时检索id XXX的Avro架构时出错 - 连接到接收器数据到PostgreSQL

  显示原文与译文双语对照的内容

ERROR WorkerSinkTask{id=sink-postgresql-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)


org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler


 at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)


 at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)


 at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487)


 at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)


 at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)


 at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)


 at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)


 at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)


 at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)


 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)


 at java.util.concurrent.FutureTask.run(FutureTask.java:266)


 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)


 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)


 at java.lang.Thread.run(Thread.java:748)


Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic XXX to Avro:


 at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:110)


 at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)


 at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)


 at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)


 ... 13 more


Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id XXX


Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject not found.; error code: 40401


 at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:230)


 at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:256)


 at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:323)


 at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:311)


 at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersionFromRegistry(CachedSchemaRegistryClient.java:184)


 at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersion(CachedSchemaRegistryClient.java:297)


 at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.schemaVersion(AbstractKafkaAvroDeserializer.java:175)


 at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:144)


 at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:206)


 at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:148)


 at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:93)


 at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)


 at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)


 at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)


 at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)


 at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487)


 at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)


 at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)


 at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)


 at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)


 at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)


 at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)


 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)


 at java.util.concurrent.FutureTask.run(FutureTask.java:266)


 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)


 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)


 at java.lang.Thread.run(Thread.java:748)


[2020-01-29 16:57:17,346] ERROR WorkerSinkTask{id=sink-postgresql-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:180)




./connect-standalone ../etc/schema-registry/connect-avro-standalone.properties ../etc/kafka-connect-jdbc/sink-quickstart-postgresql.properties



connect-avro-standalone.properties文件


bootstrap.servers=localhost:9092


key.converter=org.apache.kafka.connect.storage.StringConverter


key.converter.schema.registry.url=http://localhost:8081


value.converter=io.confluent.connect.avro.AvroConverter


value.converter.schema.registry.url=http://localhost:8081


internal.key.converter=org.apache.kafka.connect.json.JsonConverter


internal.value.converter=org.apache.kafka.connect.json.JsonConverter


internal.key.converter.schemas.enable=false


internal.value.converter.schemas.enable=false


offset.storage.file.filename=/tmp/connect.offsets


plugin.path=<path2confluent>/confluent-5.3.1/share/java



sink-quickstart-postgresql.properties


auto.create=true


name=sink-postgresql


connector.class=io.confluent.connect.jdbc.JdbcSinkConnector


tasks.max=1


insert.mode=upsert


topics=topic-name


connection.url=jdbc:postgresql://localhost:5432/postgres


connection.user=postgres


connection.password=postgres


mode=incrementing


key.converter=org.apache.kafka.connect.storage.StringConverter


key.converter.schema.registry.url=http://localhost:8081


value.converter=io.confluent.connect.avro.AvroConverter


value.converter.schemas.enable=true


value.converter.schema.registry.url=http://localhost:8081


pk.mode=record_value


pk.fields=schema_field


config.action.reload=restart


errors.log.enable=true


errors.log.include.messages=true


print.key=true



我没有在 PostgreSQL 中创建表,但数据库是存在的;当我使用 kafka-avro-console-consumer 检索数据时,数据也能正常读取到。

回答:

看起来你的主题名字有错别字。

你说你的 kafka-avro-console-consumer 在使用 topic_name(带下划线)时可以正常工作,但在你的连接器配置中写的却是 topic-name(带连字符)。两者不一致会导致 Schema Registry 找不到对应的 subject(错误码 40401),请把配置中的 topics 改成与实际主题名完全一致。

...