
Re: Invite to comment on the @RequiresStableInput design doc

The common use case for a timer is to read in data that was stored using the state API in processElement. There is no guarantee that this data is stable, and I believe no runner currently guarantees it. For example:

class MyDoFn extends DoFn<ElementT, Void> {
  @StateId("bag") private final StateSpec<BagState<ElementT>> buffer = StateSpecs.bag(ElementCoder.of());
  @TimerId("timer") private final TimerSpec timer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @ProcessElement public void processElement(@Element ElementT element, @StateId("bag") BagState<ElementT> bag, @TimerId("timer") Timer timer) {
    // Buffer the element in state (setting the timer is omitted here).
    bag.add(element);
  }

  @OnTimer("timer") public void onTimer(@StateId("bag") BagState<ElementT> bag) {
    // Read back the buffered elements via bag.read() and act on them.
  }
}

If you tagged onTimer with @RequiresStableInput, then you could guarantee that if the timer were retried, it would read the same elements out of the bag. Today this is not guaranteed - the data written to the bag might not even be persisted yet when the timer fires (for example, both the processElement and the onTimer might be executed by the runner in the same bundle).
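
To make the failure mode concrete, here is a toy model in plain Java. It is not Beam code and does not describe how any real runner works; ToyBag, commit, failBundle, and retrySeesSameInput are hypothetical names invented for illustration. The assumption is simply that writes made in a bundle are lost if the bundle fails before they are persisted. In a real runner the elements would more likely be replayed, possibly in a different bundle, rather than vanish, but the effect on a retried timer is similar: it may not see what the first attempt saw.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model only: an illustration of unstable timer input, not Beam code. */
class StableInputSketch {

    /** A bag whose not-yet-persisted writes are lost when the bundle fails. */
    static class ToyBag {
        private final List<String> committed = new ArrayList<>();
        private final List<String> pending = new ArrayList<>();

        void add(String element) {
            pending.add(element);
        }

        /** Within a bundle, reads see both persisted and pending writes. */
        List<String> read() {
            List<String> all = new ArrayList<>(committed);
            all.addAll(pending);
            return all;
        }

        /** Models persisting state to stable storage. */
        void commit() {
            committed.addAll(pending);
            pending.clear();
        }

        /** Models a bundle failure: anything not yet persisted is dropped. */
        void failBundle() {
            pending.clear();
        }
    }

    /**
     * Runs the timer callback twice around a simulated failure and reports
     * whether both attempts read the same bag contents.
     */
    static boolean retrySeesSameInput(boolean persistBeforeTimer) {
        ToyBag bag = new ToyBag();
        bag.add("a");
        bag.add("b");
        if (persistBeforeTimer) {
            bag.commit();                           // input made stable first
        }
        List<String> firstAttempt = bag.read();     // timer fires, attempt 1
        bag.failBundle();                           // failure after attempt 1
        List<String> secondAttempt = bag.read();    // timer retried
        return firstAttempt.equals(secondAttempt);
    }

    public static void main(String[] args) {
        System.out.println(retrySeesSameInput(true));   // prints true
        System.out.println(retrySeesSameInput(false));  // prints false
    }
}
```

In this sketch the retry only matches the first attempt when the bag contents were persisted before the timer callback ran, which is the property @RequiresStableInput on onTimer would ask the runner to provide.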

This particular example is a simplistic one of course - you could accomplish the same thing with triggers. When Raghu worked on the exactly-once Kafka sink this was very problematic. The final solution used some specific details of Kafka to work, and is complicated and not portable to other sinks.

BTW - you can of course just have OnTimer produce the output to another transform marked with RequiresStableInput. However this solution is very expensive - every element must be persisted to stable storage multiple times - and we tried hard to avoid doing this in the Kafka sink.


On Mon, Jul 2, 2018 at 6:24 PM Robert Bradshaw <robertwb@xxxxxxxxxx> wrote:
Could you give an example of such a use case? (I suppose I'm not quite following what it means for a timer to be unstable...)

On Mon, Jul 2, 2018 at 6:20 PM Reuven Lax <relax@xxxxxxxxxx> wrote:
One issue: we definitely have some strong use cases where we want this on OnTimer but not on ProcessElement. Since both are on the same DoFn, I'm not sure how you would represent this as a separate transform.

On Mon, Jul 2, 2018 at 5:05 PM Robert Bradshaw <robertwb@xxxxxxxxxx> wrote:
Thanks for the writeup. 

I'm wondering whether, rather than phrasing this as an annotation on DoFn methods that gets plumbed down through the portability representation, it would make more sense to introduce a new, primitive "EnsureStableInput" transform. Runners whose reshuffle provides stable inputs could use that as an implementation, and other runners could provide other suitable implementations.

On Mon, Jul 2, 2018 at 3:26 PM Robin Qiu <robinyq@xxxxxxxxxx> wrote:
Hi everyone,

Thanks for your feedback on the doc. I have revamped it according to all of the comments. The major changes I have made are:
* The problem description should be more general and accurate now.
* I added more background information, such as details about Reshuffle, so it should be easier to understand now.
* I made clear what the scope of my current project is and what could be left to future work.
* It now reflects the current progress of my work, and discusses how it should work with the portable pipeline representation (WIP)

Also, I forgot to mention last time that this doc may be interesting to those of you interested in Reshuffle, because Reshuffle is used as a current workaround for the problem described in the doc.

More comments are always welcome.


On Fri, Jun 15, 2018 at 7:34 AM Kenneth Knowles <klk@xxxxxxxxxx> wrote:
Thanks for the write up. It is great to see someone pushing this through.

I wanted to bring Luke's high-level question back to the list for visibility: what about portability and other SDKs?

Portability is probably trivial, but the "other SDKs" question is probably best answered by folks working on them, who can have opinions about how it works in their SDKs' idioms.