Re: KafkaIO - Deadletter output


This is a great question. I've added the dev list to be sure it gets noticed by whoever may know best.

Kenn

On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <tobias.kaymak@xxxxxxxxxx> wrote:

Hi,

Is there a way to get a dead-letter output from a pipeline that uses a KafkaIO
connector for its input? Since `TimestampPolicyFactory.withTimestampFn()` takes
only a SerializableFunction and not a ParDo, how would I be able to produce a
dead-letter output from it?

I have the following pipeline defined that reads from a KafkaIO input:

pipeline.apply(
  KafkaIO.<String, String>read()
    .withBootstrapServers(bootstrap)
    .withTopics(topics)
    .withKeyDeserializer(StringDeserializer.class)
    .withValueDeserializer(ConfigurableDeserializer.class)
    .updateConsumerProperties(
        ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME, inputMessagesConfig))
    .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
    .updateConsumerProperties(ImmutableMap.of("group.id", "beam-consumers"))
    .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
    .withTimestampPolicyFactory(
        TimestampPolicyFactory.withTimestampFn(
            new MessageTimestampExtractor(inputMessagesConfig)))
    .withReadCommitted()
    .commitOffsetsInFinalize())


I would like to get dead-letter outputs when my timestamp extraction fails.

Best,
Tobi
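
One possible direction, sketched here in plain Java without any Beam dependency: keep the function passed to `withTimestampFn()` total, so that it never throws and falls back to the broker timestamp, and detect the extraction failure again in a later step that is allowed to have two outputs. In Beam that later step would presumably be a multi-output ParDo (`withOutputTags`); the sketch below only models the routing logic with two lists. The `Message` class, the `tryExtract` helper, and the `"<epoch millis>,<body>"` payload format are all hypothetical and are not taken from the pipeline above.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class DeadLetterSketch {

    /** Hypothetical stand-in for a Kafka record: payload plus broker-assigned timestamp. */
    public static final class Message {
        public final String payload;
        public final Instant brokerTimestamp;

        public Message(String payload, Instant brokerTimestamp) {
            this.payload = payload;
            this.brokerTimestamp = brokerTimestamp;
        }
    }

    /** Two outputs of the routing step: good records and dead letters. */
    public static final class Routed {
        public final List<Message> main = new ArrayList<>();
        public final List<Message> deadLetter = new ArrayList<>();
    }

    /** Assumed payload format "<epoch millis>,<body>"; returns empty instead of throwing. */
    public static Optional<Instant> tryExtract(String payload) {
        try {
            String field = payload.substring(0, payload.indexOf(','));
            return Optional.of(Instant.ofEpochMilli(Long.parseLong(field)));
        } catch (RuntimeException e) {
            // Missing comma or non-numeric field: report failure to the caller.
            return Optional.empty();
        }
    }

    /** Total timestamp function: what a SerializableFunction-style callback could return. */
    public static Instant timestampOrFallback(Message m) {
        return tryExtract(m.payload).orElse(m.brokerTimestamp);
    }

    /** Routing step: records whose timestamp cannot be extracted go to the dead-letter list. */
    public static Routed route(List<Message> input) {
        Routed out = new Routed();
        for (Message m : input) {
            if (tryExtract(m.payload).isPresent()) {
                out.main.add(m);
            } else {
                out.deadLetter.add(m);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Instant broker = Instant.ofEpochMilli(1_000L);
        List<Message> input = new ArrayList<>();
        input.add(new Message("1540281900000,ok", broker));
        input.add(new Message("not-a-timestamp,bad", broker));

        Routed routed = route(input);
        System.out.println("main=" + routed.main.size()
            + " deadLetter=" + routed.deadLetter.size());
    }
}
```

The trade-off in this sketch is that extraction runs twice per record, once for the timestamp and once for routing, and that failed records carry the fallback timestamp rather than no timestamp at all. Whether that is acceptable depends on how the watermark should behave for bad records.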