


Re: Integrating Stateful DoFns from the Python SDK


I think Create() simply ignores any type hints it's given (which should be fixed). 

On Wed, Oct 17, 2018 at 4:17 PM Maximilian Michels <mxm@xxxxxxxxxx> wrote:
Type hints turn out to be not so predictable:

1) WORKS
   p | beam.Impulse() \
     | beam.ParDo(MyCreate()).with_output_types(typehints.KV[K, V]) \
     | "statefulParDo" >> beam.ParDo(AddIndex())

2) DOES NOT (no KvCoder)
   p | beam.Create(inputs).with_output_types(typehints.KV[K, V]) \
     | "statefulParDo" >> beam.ParDo(AddIndex())

Do you know a way to make 2) work, i.e. set the KvCoder for the Create?
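To make the difference between 1) and 2) concrete, here is a minimal pure-Python sketch of coder selection driven by type hints. It does not use Beam. `choose_coder`, `apply_output_hint`, the labels `"KvCoder"`/`"FallbackCoder"`, and the use of `typing.Tuple` to stand in for `typehints.KV` are all illustrative assumptions, not the SDK's real coder registry. It shows how a transform that drops its output type hint, as `Create()` is suspected to do, ends up with a non-KV coder.

```python
from typing import Any, Optional, Tuple, get_args, get_origin


def choose_coder(hint: Optional[Any]) -> str:
    """Hypothetical coder selection: a 2-element tuple hint gets a KV coder;
    anything else (including a missing hint) gets an opaque fallback coder."""
    if hint is not None and get_origin(hint) is tuple:
        args = get_args(hint)
        # Tuple[K, ...] is a variable-length tuple, not a KV pair.
        if len(args) == 2 and args[1] is not Ellipsis:
            return "KvCoder"
    return "FallbackCoder"


def apply_output_hint(hint: Optional[Any], transform_honors_hints: bool) -> str:
    """Simulate a transform that either propagates or silently drops its hint."""
    effective_hint = hint if transform_honors_hints else None
    return choose_coder(effective_hint)


kv_hint = Tuple[str, int]
# Case 1): a ParDo that propagates .with_output_types(...)
print(apply_output_hint(kv_hint, transform_honors_hints=True))
# Case 2): a transform that ignores the hint it was given
print(apply_output_hint(kv_hint, transform_honors_hints=False))
```

The point of the sketch is only that the coder is derived from whatever hint survives construction, so a transform that discards the hint cannot produce a KV coder no matter what the user declared.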


In the first example, the Create runs in a ParDo; in the second example, it is the built-in `beam.Create` transform.
On 17.10.18 15:34, Maximilian Michels wrote:
> Thanks Robert. I was able to get it working by adding this to the
> transform before my stateful DoFn:
>
>    .with_output_types(typehints.KV[K, V])
>
> For some reason `.with_input_types(typehints.KV[K, V])` on my stateful
> DoFn did not work.
>
> Until we enforce KV during pipeline construction, we will have to throw
> an informative exception in the Runner.
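A runner-side check of the kind described above could look roughly like the sketch below. `check_stateful_input`, `StatefulInputError`, and the message text are hypothetical. The only Beam detail assumed is that the portable pipeline representation identifies the standard KV coder by the URN `beam:coder:kv:v1` and the standard bytes coder by `beam:coder:bytes:v1`.

```python
KV_CODER_URN = "beam:coder:kv:v1"  # assumed standard URN of Beam's KV coder


class StatefulInputError(ValueError):
    """Raised when a stateful DoFn's input is not KV-encoded."""


def check_stateful_input(transform_name: str, input_coder_urn: str) -> None:
    """Hypothetical runner check: fail fast with an actionable message
    instead of trying to extract a key from opaque bytes."""
    if input_coder_urn != KV_CODER_URN:
        raise StatefulInputError(
            f"Stateful DoFn '{transform_name}' requires KV input encoded with "
            f"{KV_CODER_URN}, but its input uses '{input_coder_urn}'. "
            "Declare a KV output type on the upstream transform, e.g. "
            ".with_output_types(typehints.KV[K, V])."
        )


# A KV-coded input passes silently.
check_stateful_input("statefulParDo", KV_CODER_URN)

# A byte-array-coded input is rejected with an explanation.
try:
    check_stateful_input("statefulParDo", "beam:coder:bytes:v1")
except StatefulInputError as err:
    print(err)
```

Raising at translation time keeps the failure close to the user's mistake, rather than surfacing later as state being shared across unrelated elements.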
>
> On 17.10.18 15:03, Robert Bradshaw wrote:
>> Yes, we should be enforcing keyness (and the use of KvCoder) for
>> stateful DoFns, similar to what we do for GBKs. See e.g.
>> https://github.com/apache/beam/pull/6304#issuecomment-421935375
>>
>> (This possibly relates to a long-standing issue that the coder
>> inference should be moved up into construction, or at least before we
>> pass the graph to the runner.)
>>
>> On Wed, Oct 17, 2018 at 2:52 PM Maximilian Michels <mxm@xxxxxxxxxx>
>> wrote:
>>
>>     Hi everyone,
>>
>>     While integrating portable state with the FlinkRunner, I hit a
>>     problem and wanted to get your opinion.
>>
>>     Stateful DoFns require their input to be KV records. The reason for
>>     this is that state is isolated by key. The (non-portable) FlinkRunner
>>     uses Flink's `keyBy(key)` construct to partition state by key [1].
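Why state needs a key can be sketched without Flink: state lives in a table indexed by key, so each key only ever sees its own state. The class below is a hypothetical stand-in for a keyed stateful DoFn, similar in spirit to the thread's `AddIndex` (whose actual code is not shown). It assumes the input is already a sequence of `(key, value)` tuples and assigns a running index per key.

```python
from collections import defaultdict
from typing import Dict, Hashable, Iterable, Iterator, List, Tuple

Output = Tuple[Hashable, Tuple[int, object]]


class KeyedIndexer:
    """Hypothetical per-key stateful processor: emits (key, (index, value)),
    where index counts the elements seen so far for that key only."""

    def __init__(self) -> None:
        # One state cell per key, analogous to state partitioned by keyBy(key).
        self._counters: Dict[Hashable, int] = defaultdict(int)

    def process(self, element: Tuple[Hashable, object]) -> Iterator[Output]:
        key, value = element  # only works if the element is KV-shaped
        index = self._counters[key]
        self._counters[key] = index + 1
        yield key, (index, value)


def run(elements: Iterable[Tuple[Hashable, object]]) -> List[Output]:
    indexer = KeyedIndexer()
    return [out for element in elements for out in indexer.process(element)]


print(run([("a", "x"), ("b", "y"), ("a", "z")]))
# [('a', (0, 'x')), ('b', (0, 'y')), ('a', (1, 'z'))]
```

Key `"b"` starts at index 0 even though an `"a"` element came first: that isolation is exactly what the runner cannot provide if it cannot see the key.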
>>
>>     That works fine for portable Java pipelines, where we enforce the `KV`
>>     class for stateful DoFns. After running tests with the Python SDK, I
>>     came to the conclusion that tuples such as `(key, value)`, which are
>>     used for KV functionality, do not go through the KvCoder but are
>>     encoded using a byte array encoder.
>>
>>     How do we infer the key in the Runner from an opaque sequence of
>>     bytes? Should we also require the KvCoder for stateful DoFns in the
>>     Python SDK?
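The question of inferring a key from bytes can be illustrated with a deliberately simplified encoding. This sketch does not reproduce Beam's wire format: it assumes a hypothetical KV layout in which the key carries a 4-byte big-endian length prefix followed by the raw value bytes. `encode_kv`, `extract_key`, and `decode_kv` are illustrative names only.

```python
import struct
from typing import Tuple

_LEN = struct.Struct(">I")  # hypothetical 4-byte big-endian length prefix


def encode_kv(key: bytes, value: bytes) -> bytes:
    """Simplified KV encoding: length-prefixed key, then the raw value bytes."""
    return _LEN.pack(len(key)) + key + value


def extract_key(encoded: bytes) -> bytes:
    """What a runner can do with KV-structured bytes: read the key
    without decoding, or even understanding, the value."""
    (key_len,) = _LEN.unpack_from(encoded, 0)
    start = _LEN.size
    return encoded[start:start + key_len]


def decode_kv(encoded: bytes) -> Tuple[bytes, bytes]:
    """Split an encoded record back into its key and value bytes."""
    key = extract_key(encoded)
    return key, encoded[_LEN.size + len(key):]


record = encode_kv(b"user-42", b"\x00\x01payload")
print(extract_key(record))  # b'user-42'
```

By contrast, if the whole `(key, value)` tuple is serialized as one opaque blob by an SDK-specific encoder, there is no boundary the runner can read, which is why requiring a KV coder for stateful DoFns is the natural fix.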
>>
>>     Thanks,
>>     Max
>>
>>     [1] https://github.com/apache/beam/blob/22f59deaf2a0aa77d98f2f024ce4b2a399014382/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java#L471