
Re: [DISCUSS] SplittableDoFn Java SDK User Facing API

Some late comments, and my apologies in advance if some questions look
silly, but the latest documents contained a lot of information that I
have not yet fully digested.

I have some questions about the ‘new’ Backlog concept after a quick
look at the PR:

1. Is the Backlog a concept specific to each IO? In other words:
ByteKeyRestrictionTracker can be used by both HBase and Bigtable, but
from what I could understand I am assuming that the Backlog
implementation will be data store specific. Is this the case, or can it
be generalized in some cases (for example for Filesystems)?

2. Since the backlog is a byte[], it is up to the user to give it a
meaning depending on the situation, is this correct? Also, since
splitRestriction now takes the Backlog as an argument, what do we
ideally expect the person who implements this method in a DoFn to do
with it? A more concrete example of how things fit together for
File/Offset or HBase/Bigtable/ByteKey would be helpful (maybe also for
the BundleFinalizer concept).

3. By default all Restrictions are assumed to be unbounded, but there
is this new Restrictions.IsBounded method. Can’t this behavior be
inferred (adapted) from the DoFn UnboundedPerElement/Bounded
annotation, or are these independent concepts?
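
To make question 2 more concrete, here is a rough sketch of one way a byte[] backlog could be given a meaning for the File/Offset case. Everything in it is an assumption for illustration: the class and method names are hypothetical and not taken from the PR, the backlog is assumed to encode a non-negative count of remaining offsets, and the suggestion passed to the split is assumed to mean "at most this much backlog per sub-range".

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; these names are illustrative and are not taken from the PR.
final class OffsetBacklogSketch {

  /** Encodes a non-negative count of remaining offsets as an opaque byte[] backlog. */
  static byte[] encode(long remaining) {
    return BigInteger.valueOf(remaining).toByteArray();
  }

  /** Decodes a backlog produced by encode(). */
  static long decode(byte[] backlog) {
    return new BigInteger(backlog).longValueExact();
  }

  /**
   * Splits the offset range [from, to) into sub-ranges that each cover at most
   * the suggested backlog. Assumes non-negative offsets. Each result is {from, to}.
   */
  static List<long[]> split(long from, long to, byte[] suggestedBacklog) {
    long step = Math.max(1L, decode(suggestedBacklog));
    List<long[]> parts = new ArrayList<>();
    long start = from;
    while (start < to) {
      // Avoid overflow by comparing the remaining width instead of adding first.
      long end = (to - start <= step) ? to : start + step;
      parts.add(new long[] {start, end});
      start = end;
    }
    return parts;
  }

  public static void main(String[] args) {
    // Prints [0, 40), [40, 80) and [80, 100), one per line.
    for (long[] part : split(0L, 100L, encode(40L))) {
      System.out.println("[" + part[0] + ", " + part[1] + ")");
    }
  }
}
```

For ByteKey ranges the same byte[] could presumably hold something else entirely (for example an estimated size in bytes), which is why the question of whether the interpretation is per data store matters.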

Extra unrelated comment:
Since SDF is still @Experimental we should probably rename
OffsetRangeTracker and ByteKeyRangeTracker to use the RestrictionTracker
suffix (I don’t know why the new trackers share the RangeTracker
suffix). WDYT?

On Wed, Nov 7, 2018 at 5:47 PM Lukasz Cwik <lcwik@xxxxxxxxxx> wrote:
> On Wed, Nov 7, 2018 at 8:33 AM Robert Bradshaw <robertwb@xxxxxxxxxx> wrote:
>> I think that not returning the user's specific subclass should be fine.
>> Does the removal of markDone imply that the consumer always knows a
>> "final" key to claim on any given restriction?
> Yes, each restriction needs to support claiming a "final" key that would make the restriction "done". In the BigTable/HBase case it is the empty key ""; for files it can be a file offset beyond the end of the file. Generally, restriction trackers written by SDF authors could also take an instance of an object that they can compare instance equality against for a final key. Alternatively we could allow restriction trackers to implement markDone(), but the SDK would need to have knowledge of the method, either by having the RestrictionTracker implement an interface or extend an abstract base class, or by finding the method reflectively, so that we would be able to wrap it to provide synchronization guarantees. I had toyed with the idea of using something like the ProxyInvocationHandler that backs PipelineOptions to provide a modified version of the user's instance that had the appropriate synchronization guarantees, but couldn't get it to work.
>> On Wed, Nov 7, 2018 at 1:45 AM Lukasz Cwik <lcwik@xxxxxxxxxx> wrote:
>> >
>> > I have started to work on how to change the user facing API within the Java SDK to support splitting/checkpointing[1], backlog reporting[2] and bundle finalization[3].
>> >
>> > I have this PR[4] which contains minimal interface/type definitions to convey how the API surface would change with these 4 changes:
>> > 1) Exposes the ability for @SplitRestriction to take a backlog suggestion on how to perform splitting and for how many restrictions should be returned.
>> > 2) Adds the ability for RestrictionTrackers to report backlog.
>> > 3) Updates @ProcessElement to be required to take a generic RestrictionTracker instead of the user's own restriction tracker type.
>> > 4) Adds the ability for @StartBundle/@ProcessElement/@FinishBundle to register a callback that is invoked after bundle finalization.
>> >
>> > The details are in the javadoc comments as to how I would expect the contract to play out.
>> > Feel free to comment on the ML/PR around the contract. After the feedback is received/digested/implemented, I would like to get the changes submitted so that work can start towards providing an implementation in the Java SDK, Python SDK, and Go SDK and the shared runner portability library.
>> >
>> > I would like to draw special attention to 3: this change will enable us to remove the synchronization requirement for users, since we will wrap the underlying restriction tracker, allowing us to add appropriate synchronization as needed and also to watch any calls that pass through the object, such as the claim calls. I also believe this prevents people from writing RestrictionTrackers where the contract of tryClaim is subverted, since markDone is outside the purview of tryClaim, as in ByteKeyRangeTracker[5].
>> >
>> > 1: https://s.apache.org/beam-checkpoint-and-split-bundles
>> > 2: https://s.apache.org/beam-bundles-backlog-splitting
>> > 3: https://s.apache.org/beam-finalizing-bundles
>> > 4: https://github.com/apache/beam/pull/6969
>> > 5: https://github.com/apache/beam/pull/6949
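
The wrapping idea in item 3 of the quoted proposal, together with the "final key" claim described in the reply, could look roughly like the sketch below. It is a minimal illustration under stated assumptions, not Beam's actual API: the Tracker interface, OffsetTrackerSketch, and SynchronizedTracker are hypothetical names, and the tracker treats a claim at or beyond the end of the range as the final claim that marks the restriction done, so no separate markDone() is needed.

```java
// Hypothetical sketch; these types are illustrative and are not Beam's interfaces.
interface Tracker<PositionT> {
  /** Returns true if the position was claimed, false if the restriction is exhausted. */
  boolean tryClaim(PositionT position);
}

/** User-written tracker for offsets [from, to); it does no synchronization itself. */
final class OffsetTrackerSketch implements Tracker<Long> {
  private final long to;
  private long lastAttempted;
  private boolean done;

  OffsetTrackerSketch(long from, long to) {
    this.to = to;
    this.lastAttempted = from - 1;
  }

  @Override
  public boolean tryClaim(Long offset) {
    if (offset <= lastAttempted) {
      throw new IllegalArgumentException("offsets must be claimed in increasing order");
    }
    lastAttempted = offset;
    if (offset >= to) {
      // Claiming an offset beyond the end acts as the "final" claim.
      done = true;
      return false;
    }
    return true;
  }

  boolean isDone() {
    return done;
  }
}

/** SDK-side wrapper: serializes access and observes every claim that passes through. */
final class SynchronizedTracker<PositionT> implements Tracker<PositionT> {
  private final Tracker<PositionT> delegate;
  private long claimAttempts;

  SynchronizedTracker(Tracker<PositionT> delegate) {
    this.delegate = delegate;
  }

  @Override
  public synchronized boolean tryClaim(PositionT position) {
    claimAttempts++;
    return delegate.tryClaim(position);
  }

  synchronized long claimAttempts() {
    return claimAttempts;
  }
}
```

In this sketch the processing code would only ever see the wrapper, for example `new SynchronizedTracker<>(new OffsetTrackerSketch(0L, 3L))`, which is why it can no longer rely on methods of the user's own tracker subclass.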