Re: [SDF] Why do we need markDone (or an equivalent claim)?

On Fri, Nov 30, 2018 at 4:43 PM Robert Bradshaw <robertwb@xxxxxxxxxx> wrote:
On Fri, Nov 30, 2018 at 10:28 PM Lukasz Cwik <lcwik@xxxxxxxxxx> wrote:
> On Fri, Nov 30, 2018 at 12:47 PM Robert Bradshaw <robertwb@xxxxxxxxxx> wrote:
>> On Fri, Nov 30, 2018 at 7:10 PM Lukasz Cwik <lcwik@xxxxxxxxxx> wrote:
>> >
>> > Uh, I'm not sure what you're asking.
>> I'm asking why we wanted a markDone in the first place.
> When looking at the byte key restriction tracker code, I found a couple of bugs around how ranges were being compared and how the byte key range was being claimed (we weren't computing the next key correctly). The usage of markDone seemed to be a crutch when attempting to correctly implement the tryClaim code. Also, all the framework code that powered SDF wasn't aware of markDone so it couldn't validate that the last claim failed. So I fixed the tryClaim code and then didn't need markDone and removed it since this was the only restriction tracker that had it.
>> > In the SDF API, a void return from processElement already means that a call to tryClaim must have returned false
>> We could widen this to "or finished the restriction."
> Yes, markDone could be added to the API. Is it a crutch for subtle bugs in tryClaim though?

I'm proposing removing the requirement of having either a markDone or
a tryClaim(EndKey).

Yes, I could see that.
>> > while a non-void return allows the caller to either return STOP (tryClaim must have returned false) or return RESUME (with a time at which to resume).
>> We could also return STOP if tryClaim never returned false but the
>> restriction was finished.
>> > This allows the framework code to prevent user errors by ensuring the restriction has been completed.
>> I don't think the framework can ensure this. (It can enforce the above
>> constraints that on a STOP tryClaim did indeed return false on the
>> last call, but I'm fuzzy on the value this actually provides when it
>> just means the user must artificially force it to return a false value.
>> It also means we can't make it an error to try claiming values outside
>> the initial restriction. If we want to make things more explicit, we
>> could require a STOP or RESUME return rather than allow a void
>> return.)
> I don't think we want SDF authors to ensure that their values are in the initial range first before attempting to claim them as this is the purpose of tryClaim. The SDF code would then be checking that the range is valid twice.
> processElement() {
>   readElement
>   isElementInRange?
>   if (!tryClaim) {
>     return
>   }
> }
> (both isElementInRange and tryClaim are now doing the same bounds checking which can lead to subtle bounds checking errors).

Generally code would be iterating over the range, and it would likely
be a bug to check past it, but if we want to support code that ignores
range.getEndPosition() and lets tryClaim do all the work I buy that as
a good argument to allow arbitrary claim attempts.
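
To make the "let tryClaim do all the work" style concrete, here is a rough Java sketch. SketchOffsetTracker and processElement below are hypothetical names for illustration only, not Beam's actual classes or signatures; the assumption is a half-open offset range [from, to) and a reader that never looks at the end position itself.

```java
final class ClaimSketch {
  /** Hypothetical tracker over the half-open offset range [from, to). Not Beam's API. */
  static final class SketchOffsetTracker {
    private final long to;
    private long lastClaimed;

    SketchOffsetTracker(long from, long to) {
      this.to = to;
      this.lastClaimed = from - 1; // nothing claimed yet
    }

    /** The only place where bounds are checked. */
    boolean tryClaim(long offset) {
      if (offset <= lastClaimed) {
        throw new IllegalArgumentException("claims must be increasing and not before the range start");
      }
      if (offset >= to) {
        return false; // past the end: the caller should stop
      }
      lastClaimed = offset;
      return true;
    }
  }

  /** Reads records in order and relies on tryClaim alone to detect the end of the range. */
  static int processElement(long[] recordOffsets, SketchOffsetTracker tracker) {
    int processed = 0;
    for (long offset : recordOffsets) {
      if (!tracker.tryClaim(offset)) {
        break;
      }
      processed++; // stand-in for emitting the record
    }
    return processed;
  }

  public static void main(String[] args) {
    long[] recordOffsets = {2, 5, 9, 12, 20};
    // Only the records at offsets 2, 5 and 9 fall inside [0, 10).
    System.out.println(processElement(recordOffsets, new SketchOffsetTracker(0, 10)));
  }
}
```

Here the reader attempts a claim at offset 12 without ever consulting the end position, which is only acceptable if claim attempts outside the initial restriction are allowed.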

>> Maybe I'm just not clever enough to come up with a kind of source
>> where this could be good at catching errors?
> I think the value is that we expect to implement a few restriction tracker classes which will be re-used across many SDF implementations. In this case, we could point out to the SDF author that they haven't claimed all that they said they would process. This would be true whether markDone existed or not.

The general pattern is

processRestriction() {
  for (element : source[restriction]) {
    if (!tryClaim(element)) {
      return STOP
    } else {
      // process element
    }
  }
  tryClaim(everything)
  return STOP  // or if CONTINUE is returned, omit the above line
}

and I'm having a hard time coming up with any bugs that would be
caught if we didn't require the (seemingly boilerplate)
tryClaim(everything) line. Maybe I'm not thinking of the right source?
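
For reference, a small Java sketch of what the strict rule does to the pattern above. Everything here (Tracker, process, passesStrictCheck) is a made-up stand-in rather than Beam's API, and it assumes an offset range [0, to) with Long.MAX_VALUE as the fake end position.

```java
final class DoneCheckSketch {
  enum Result { STOP, RESUME }

  /** Hypothetical tracker over offsets [0, to) that remembers whether the last claim failed. */
  static final class Tracker {
    final long to;
    boolean lastClaimFailed = false;

    Tracker(long to) {
      this.to = to;
    }

    boolean tryClaim(long offset) {
      lastClaimFailed = offset >= to;
      return !lastClaimFailed;
    }
  }

  /** The general pattern; finalClaim toggles the trailing fake-end-position claim. */
  static Result process(long[] offsets, Tracker tracker, boolean finalClaim) {
    for (long offset : offsets) {
      if (!tracker.tryClaim(offset)) {
        return Result.STOP;
      }
      // the element at this offset would be processed here
    }
    if (finalClaim) {
      tracker.tryClaim(Long.MAX_VALUE); // claim made only to force a false return
    }
    return Result.STOP;
  }

  /** Strict rule: a STOP is accepted only if the last tryClaim returned false. */
  static boolean passesStrictCheck(Result result, Tracker tracker) {
    return result != Result.STOP || tracker.lastClaimFailed;
  }

  public static void main(String[] args) {
    long[] offsets = {0, 3, 7}; // every element lies inside [0, 10)

    Tracker withoutFinalClaim = new Tracker(10);
    System.out.println(passesStrictCheck(process(offsets, withoutFinalClaim, false), withoutFinalClaim)); // false

    Tracker withFinalClaim = new Tracker(10);
    System.out.println(passesStrictCheck(process(offsets, withFinalClaim, true), withFinalClaim)); // true
  }
}
```

In this sketch both runs process exactly the same elements; the only thing the strict check distinguishes is whether the extra claim was made.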

I was looking at the RestrictionTracker (https://github.com/apache/beam/blob/176851192bba449dba0b2bc7cc45a2342b587dbd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L869) that is part of the Watch PTransform. It is significantly more complicated than the others, since the RestrictionTracker is responsible for "polling" for new elements to process and expects the caller to process everything that is part of the set. This may not be the best way to implement this RestrictionTracker, since it seems to misuse several of the RestrictionTracker concepts and ties its implementation tightly to the implementation of Watch, but it could be the example worth studying.

The other case I could see that is worth investigating is whether it helps ensure restrictions are completed in the case of poor/incorrect exception handling on the part of the SDF implementation.

>> > Also, "" is the byte key range, the code could have just passed range.getEndPosition() into the final tryClaim; it's just that "" is shorthand and would be similar to passing in Long.MAX_VALUE for the file offset range.
>> Having to choose a value to pass depending on the restriction tracker
>> type is something that could simply be eliminated.
>> > On Fri, Nov 30, 2018 at 2:45 AM Robert Bradshaw <robertwb@xxxxxxxxxx> wrote:
>> >>
>> >> In looking at the SDF examples, it seems error-prone to have to
>> >> remember to write
>> >>
>> >>     tryClaim([fake-end-position])
>> >>
>> >> to indicate that a restriction is finished. IIRC, this was done to
>> >> decide whether the entire restriction had been processed on return in
>> >> the case that tryClaim never returned false. It seems preferable to
>> >> encode this into the return value (with a void return meaning iff
>> >> tryClaim returned false, and a non-void return being able to indicate
>> >> any hints as to when, if ever, process should be called again).
>> >>
>> >> Can someone jog my memory as to whether there was a case in which this wouldn't work?