
Re: Rethinking Timers as PCollections




On Wed, Sep 19, 2018 at 2:46 PM Robert Bradshaw <robertwb@xxxxxxxxxx> wrote:
On Wed, Sep 19, 2018 at 8:31 PM Lukasz Cwik <lcwik@xxxxxxxxxx> wrote:
How does modelling a timer as a PCollection help the Beam model?

The largest concern was how to model timers within Apache Beam in a way that:
1) removed the need for the watermark hold that typically accompanies state/timer implementations
2) made it possible to set an explicit output time, independent of the firing time, for all timer specifications [1]

I felt as though treating timers as a self-loop around the ParDo PTransform allowed us to use the natural definition of output watermark = min(all input watermarks) to define how timers hold output, and to use windowed values that contain timers as a natural way to represent an output time that is independent of the firing time. The purpose of the PCollection right now is to store the representation of how timers are encoded. I suspect that at some point in time we will have different timer encodings.
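To make the watermark argument concrete, here is a minimal sketch in plain Java (no Beam dependency). The class and method names are hypothetical, and it assumes the watermark of the timer self-loop is the smallest output timestamp among pending timers, which is one reading of the proposal rather than a statement of how any runner implements it.

```java
import java.util.Collection;
import java.util.List;

// Hypothetical illustration only; none of these names exist in the Beam API.
final class WatermarkSketch {

    /** Assumed watermark of the timer self-loop: the earliest pending output timestamp. */
    static long timerLoopWatermark(Collection<Long> pendingOutputTimestamps) {
        long min = Long.MAX_VALUE; // no pending timers means nothing is held back
        for (long ts : pendingOutputTimestamps) {
            min = Math.min(min, ts);
        }
        return min;
    }

    /** Output watermark = min(all input watermarks), with the timer loop as one of the inputs. */
    static long outputWatermark(long mainInputWatermark, Collection<Long> pendingOutputTimestamps) {
        return Math.min(mainInputWatermark, timerLoopWatermark(pendingOutputTimestamps));
    }
}
```

With a main input watermark of 100 and pending timers whose output timestamps are 40 and 70, `WatermarkSketch.outputWatermark(100L, List.of(40L, 70L))` is held at 40 without a separate watermark-hold mechanism.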

I agree that being able to separate the hold time from firing time of a timer is important, but in retrospect don't think timers as PCollections is the only (or most natural) way to represent that (in the model or in runner implementations). 
Can you go into more detail as to what you're suggesting as the replacement, and why you believe it fits the model more naturally, given that "state" doesn't have watermarks or produce output but timers can? I'm not disagreeing that timers as PCollections may not be a natural fit, but I don't see them as state either, since "user state" doesn't produce output.
 
 
Having this fit well with how timers are delivered between the SDK and Runner was an added bonus. Also, a good portion of the code that I needed to fix up was more related to the assumption that there was ever only a single input producer to an executable stage and plumbing of timer specifications through all the runner library support layers.

----------
There is no "clear" for timers.

The current Java API for timers only allows you to set them. Clearing timers is not exposed to users and is only used by internal implementations to support runners[2] via TimerInternals. Usage of a timer is like so:
  @TimerId("timer")
  private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(
      ProcessContext context,
      BoundedWindow window,
      @TimerId("timer") Timer myTimer) {

    myTimer.set(window.maxTimestamp().plus(allowedLateness));
  }

We'll probably want clear. But currently there's already exactly one timer per window per key, and setting another one overwrites the previous one, again making it more like state. Maybe, as you said, it could involve retractions (but every output being a retraction seems odd.)
Once retractions exist, most GBK firings will have a preceding retraction so I believe they will be very common. 
 

---------
I'm not a big fan of having timers as a separate field in the elements proto. I still think they should be treated as an input/output and we could update the representation so that inputs/outputs for PTransforms don't need to be "PCollections". I was thinking that our current PCollection representation assumes that we'll never want to change it to add extra information or do backwards incompatible changes like beam:pcollection:v2.

If the data never travels from one PTransform to another, but always goes directly to/from the runner harness, I think using explicit channels to communicate this information in the Fn API makes more sense than complicating the graph with special types of PCollections. This is consistent with how we do side inputs and state, and I think more consistent with the DAG a user has in their head when writing a pipeline. (And I could also see speculatively pushing state or side input information in the data channel too.)

Especially when writing, they feel a lot more like they belong to state. And it could make sense to try to read unfired timers as well. 
 

---------
Other points:
* side inputs already require a runner to introspect the ParDo payload to get the SideInputSpec; requiring it to have knowledge of the TimerSpec is no different.

My point was that once it has knowledge of the TimerSpec, there is no need for (meaning no additional information provided by) the timer PCollection nor its edges.
The way in which the timer is encoded is missing. This could be explicit on the TimerSpec like the other StateSpec definitions though.

 
* multimap side input over timers where the key is the key that the timer is associated with. iterable side input over timers would allow you to iterate over <key, timer> pairs. This could be useful for skew control in sources since they would want to know how far they are ahead vs other restrictions.
* user state as a PCollection can make sense, but I can't see how we can get past the problems that arise when we treat it as an "input", since the input watermark would be ignored or infinity? I do agree that this could open the door to sharing "state", such as multi-key transactions, but it is very speculative, as you say.



On Wed, Sep 19, 2018 at 6:28 AM Thomas Weise <thw@xxxxxxxxxx> wrote:
Robert,

Thanks for presenting these thoughts. Your attempt to implement the timer support in the Python runner is the first strong signal we have and it is the right time to make changes - AFAIK no other runner work has been done.

I'm also a bit concerned about the acrobatics required in the PR to make this work. Luke will be in the best position to comment, but as I recall we considered modeling timers as special PCollections a simplification for SDK <> Runner interaction and overall implementation. The special treatment (and slight confusion) at the graph level perhaps was an early warning sign; discovering the extra complexity of wiring this into a runner should be a reason to revisit.

Conceptually timers are special state; they are certainly more state than stream :) Regardless of how they are passed to the harness, the runner will need to treat them similarly to side inputs and user state.

Thanks,
Thomas




On Wed, Sep 19, 2018 at 3:33 AM Robert Bradshaw <robertwb@xxxxxxxxxx> wrote:
TLDR Perhaps we should revisit https://s.apache.org/beam-portability-timers in light of the fact that Timers are more like State than PCollections. 

--

While looking at implementing State and Timers in the Python SDK, I've been revisiting the ideas presented at https://s.apache.org/beam-portability-timers , and am now starting to wonder if this is actually the best way to model things (at least at the Runner level). Instead it seems Timers more closely resemble, and are tightly bound to, State than PCollections. 

This is especially clear when writing timers. These timers are not a bag of emitted elements, rather one sets (and clears) timers and the set of timers that end up firing are a result of this *ordered* sequence of operations. It is also often important that the setting of timers be ordered with respect to the setting and clearing of state itself (and is more often than not collocated with such requests). 

In addition, these self-loops add complexity to the graph but provide no additional information--they are entirely redundant with the TimerSpecs already present on DoFns. Generally I prefer less redundancy in the spec, rather than have it be over-constrained. It's unclear what a runner that didn't introspect the DoFn's TimerSpecs would do with these special edges, and also unclear how they would differ from possible self-loops due to more traditional iteration. 

The primary motivation to express timers in this way seems to be the desire to push them to workers using the data plane, rather than inventing another mechanism or making them pull-based like with state. I think this could be done by simply adding a Timer field to the Elements (or Data) proto. (Note that this is not the same as having a hacky ElementOrTimer element flow through the graph.) Writes would be state requests, and perhaps it would even make sense to "read" the current value of an unfired timer over the state API, to be able to set things like {min,max}(new_timestamp, old_timestamp).
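As a rough sketch of what "timers as state" could look like, the plain-Java class below keeps one timer per key where set overwrites, clear removes, and a read of the unfired timer enables a min(new_timestamp, old_timestamp) update. Every name here is hypothetical; it is not the Beam state API or the Fn API, only an illustration of the ordered set/clear/read semantics described above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical illustration only; not the Beam state API or the Fn API.
final class TimerStateSketch {
    // One unfired timer per key; a later set overwrites an earlier one.
    private final Map<String, Long> timers = new HashMap<>();

    void set(String key, long timestamp) {
        timers.put(key, timestamp);
    }

    void clear(String key) {
        timers.remove(key);
    }

    /** Reads the current unfired timer for the key, if any. */
    OptionalLong read(String key) {
        Long ts = timers.get(key);
        return ts == null ? OptionalLong.empty() : OptionalLong.of(ts);
    }

    /** Sets the timer to min(new_timestamp, old_timestamp), or to new_timestamp if unset. */
    void setToEarlier(String key, long newTimestamp) {
        OptionalLong old = read(key);
        set(key, old.isPresent() ? Math.min(old.getAsLong(), newTimestamp) : newTimestamp);
    }
}
```

Because the result depends on the order of the calls (set, then clear, then set again differs from set, set, clear), this behaves like a mutable store rather than a bag of emitted elements.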

(We could alternatively attempt to model State(s) as a PCollection(s), but this is more speculative and would likely exacerbate some of the issues above (though it could open the door for DoFns that somehow *share* state). They seem like different objects though: one is a mutable store, the other an immutable stream.) 

I realize this is a big shift, but we could probably adapt the existing Python/Java implementations fairly easily (and it would probably simplify them). And it's easier to do simplifications like this sooner rather than later. 

What do people think about this? Any obvious (or not-so-obvious) downsides that I'm missing? 

- Robert