# Re: [DISCUSS] SplittableDoFn Java SDK User Facing API

I'll perform the swap for a fraction because as I try to map more of the spaces to an arbitrary byte[] I naturally first map the space onto natural numbers before mapping to a byte[].

Any preference between these options:
A:
// Represents a non-negative decimal number: unscaled_value * 10^(-scale)
message Decimal {
// Represents the unscaled value as a big endian unlimited precision non-negative integer.
bytes unscaled_value = 1;
// Represents the scale
uint32 scale = 2;
}

B:
// Textual representation of a decimal (i.e. "123.00")
string decimal = 1;

C:
// Represents a non-negative decimal number: "integer"."fraction"
message Decimal {
// Represents the integral part of the decimal in big endian as a big endian unlimited precision non-negative integer.
bytes integer = 1;
// Represents the fractional part of the decimal represented as a big endian unlimited precision non-negative integer.
bytes fraction = 2;
}

A is the most common and seems to be supported by Java (BigDecimal), Python (decimal module) and Go (via shopspring/decimal). B is a close second since many languages can convert it.

On Tue, Nov 20, 2018 at 3:09 AM Robert Bradshaw <robertwb@xxxxxxxxxx> wrote:
I'm still trying to wrap my head around what is meant by backlog here, as it's different than what I've seen in previous discussions.

Generally, the backlog represented a measure of the known but undone part of a restriction. This is useful for a runner to understand in some manner what progress is being made and where remaining work lies, and this is difficult to do if expressed as an opaque byte array, and more so if backlog is local to a restriction rather than an arbitrary quantity that can be compared (and aggregated) across restrictions. Even better if a similar measure can be applied to arbitrary (e.g. completed) restrictions for estimation of a mapping to the time domain. Does using a byte[] here have advantage over using a(n often integral) floating point number?

I like the idea of using an arbitrary precision floating point number (like SQL decimal, Java BigDecimal, python decimal) since it solves several questions such as to how to aggregate values and most languages have a native representation for a decimal type. The issue is about providing a mapping for key range based sources such as Bigtable/HBase. Imagine your at key 000 and you advance to key 0000 for the restriction [0, 1), what fraction of work have you advanced?

The only solution I can provide for the backlog is if I choose a maximum precision and clamp the length of the byte[] and then provide each possible byte string a number. For example I clamp the length to 3 and give each byte string a position:
index: byte string
1: 0
2: 00
3: 000
4: 001
5: 01
6: 010
7: 011
8: 1
9: 10
10: 100
11: 101
12: 11
13: 110
14: 111

Since each key is given a value including the "largest key" I can compute the distance between two keys.

I have thought about increasing the precision as I find significantly larger keys but don't know how this will impact scaling decisions in runners.

I'm also a bit unclear on why it's desirable to pass this backlog back to the SDF when trying to split restrictions. Here it seems much more natural (for both the runner and SDK) to simply pass a floating point value in [0, 1) for the proportion of work that should be split, rather than manipulate the given backlog in to try to approximate this. (There's some ambiguity here of whether multiple splits should be returned if less than half should be retained.)

Returning the backlog using the same space as the SDF will prevent skew in what is returned since the SDF may make progress in the meantime. For example you have 100mb to process and you ask for 40% of the work and the SDK has processed 10mb in the meantime which means you'll get 40% of 90mb = 36mb back instead of 40mb. I also believe that the backlog should subdivide the space so that a request for 20mb from a backlog of 100mb should subdivide the space into 5 segments.

Having a polymorphically-interpreted bytes[] backlog seems to add a lot of complexity that would need to be justified.

Each source needs to polymorphically interpret every generic representation such as an integral fraction onto their space. There is a natural mapping for remaining bytes in a file and also for number of messages on a queue but not as clean for key range based sources as shown above.

It seems there's consensus for the finalization protocol; perhaps that can be checked in as a separate PR? (Also, the idea of having to have a final key is not ideal, e.g. it means that for lexicographic range sources the empty key has different meanings depending on whether it's a start or end key, but I think we can provide a mark-as-done method in a future compatible way if this becomes too burdensome.)

I'll pull out the finalization to another PR.

I spoke with the Bigtable folks and they said that using "" as the start and end key is unambiguous since they don't allow the "" key to exist and also that all their requests are based upon a specific key or key ranges ["", "") is unambiguous. So being at "" and advancing to "" means that you advanced to the end.

The gotchas about having a markAsDone() method is that:
* The SDK harness code powering an SDF needs to be aware that this has happened
* The SDK harness code may want to know the "position" that was last used and mark as done obfuscates that

Neither are insurmountable but we don't have many SDF implementations to provide guidance as to what is terrible with the API so would rather be more restrictive now and fix these issues as we go on.

On Tue, Nov 20, 2018 at 1:22 AM Lukasz Cwik <lcwik@xxxxxxxxxx> wrote:
I also addressed a bunch of PR comments which clarified the contract/expectations as described in my previous e-mail and the splitting/backlog reporting/bundle finalization docs.

On Mon, Nov 19, 2018 at 3:19 PM Lukasz Cwik <lcwik@xxxxxxxxxx> wrote:

On Mon, Nov 19, 2018 at 3:06 PM Lukasz Cwik <lcwik@xxxxxxxxxx> wrote:

On Thu, Nov 15, 2018 at 8:53 AM Ismaël Mejía <iemejia@xxxxxxxxx> wrote:
Some late comments, and my pre excuses if some questions look silly,
but the last documents were a lot of info that I have not yet fully
digested.

I have some questions about the ‘new’ Backlog concept following a
quick look at the PR
https://github.com/apache/beam/pull/6969/files

1. Is the Backlog a specific concept for each IO? Or in other words:
ByteKeyRestrictionTracker can be used by HBase and Bigtable, but I am
assuming from what I could understand that the Backlog implementation
will be data store specific, is this the case? or it can be in some
case generalized (for example for Filesystems)?

The backlog is tied heavily to the restriction tracker implementation, any data store using the same restriction tracker will provide the same backlog computation. For example, if HBase/Bigtable use the ByteKeyRestrictionTracker then they will use the same backlog calculation. Note that an implementation could subclass a restriction tracker if the data store could provide additional information. For example, the default backlog for a ByteKeyRestrictionTracker over [startKey, endKey) is distance(currentKey, lastKey) where distance is represented as byte array subtraction (which can be wildly inaccurrate as the density of data is not well reflected) but if HBase/Bigtable could provide the number of bytes from current key to last key, a better representation could be provided.

Other common examples of backlogs would be:
* files: backlog = length of file - current byte offset
* message queues: backlog = number of outstanding messages

2. Since the backlog is a byte[] this means that it is up to the user
to give it a meaning depending on the situation, is this correct? Also
since splitRestriction has now the Backlog as an argument, what do we
expect the person that implements this method in a DoFn to do ideally
with it? Maybe a more concrete example of how things fit for
File/Offset or HBase/Bigtable/ByteKey will be helpful (maybe also for
the BundleFinalizer concept too).

Yes, the restriction tracker/restriction/SplittableDoFn must give the byte[] a meaning. This can have any meaning but we would like that the backlog byte[] representation to be lexicograhically comparable (when viewing the byte[] in big endian format and prefixes are smaller (e.g. 001 is smaller then 0010) and preferably a linear representation. Note that all restriction trackers of the same type should use the same "space" so that backlogs are comparable across multiple restriction tracker instances.

The backlog when provided to splitRestriction should be used to subdivide the restriction into smaller restrictions where each would have the backlog if processed (except for potentially the last).

A concrete example would be to represent the remaining bytes to process in a file as a 64 bit big endian integer, lets say that is 500MiB (524288000 bytes) or 00000000 00000000 00000000 00000000 00011111 01000000 (note that the trailing zeros are optional and doesn't impact the calculation). The runner could notice that processing the restriction will take 10 hrs, so it asks the SDF to split at 1/16 segments by shifting the bits over by 4 and asks to split using backlog 00000000 00000000 00000000 00000000 00000001 11110100. The SDK is able to convert this request back into 32768000 bytes and returns 16 restrictions. Another example would be for a message queue where we have 10000 messages on the queue remaining so the backlog would be 00000000 00000000 00000000 00000000 00000000 00000000 00100111 00010000 when represented as a 64 bit unsigned big endian integer. The runner could ask the SDK to split using a 1/8th backlog of 00000000 00000000 00000000 00000000 00000000 00000000 00000100 11100010 which the SDK would break out into 8 restrictions, the first 7 responsible for reading 1250 messages and stopping while the last restriction would read 1250 messages and then continue to read anything else that has been enqueued.

Bundle finalization is unrelated to backlogs but is needed since there is a class of data stores which need acknowledgement that says I have successfully received your data and am now responsible for it such as acking a message from a message queue.

Note that this does bring up the question of whether SDKs should expose coders for backlogs since ByteKeyCoder and BigEndianLongCoder exist which would cover a good number of scenarios described above. This coder doesn't have to be understood by the runner nor does it have to be part of the portability APIs (either Runner of Fn API). WDYT?

3. By default all Restrictions are assumed to be unbounded but there
is this new Restrictions.IsBounded method, can’t this behavior be
inferred (adapted) from the DoFn UnboundedPerElement/Bounded
annotation or are these independent concepts?

UnboundedPerElement/BoundedPerElement tells us during pipeline construction time what type of PCollection we will be creating since we may have a bounded PCollection goto an UnboundedPerElement DoFn and that will produce an unbounded PCollection and similarly we could have an unbounded PCollection goto a BoundedPerElement DoFn and that will produce an unbounded PCollection. Restrictions.IsBounded is used during pipeline execution to inform the runner whether a restriction being returned is bounded or not since unbounded restrictions can return bounded restrictions during splitting. So in the above example using the message queue, the first 7 restrictions that only read 1250 messages would be marked with the Restrictions.IsBounded interface while the last one would not be. This could also be a method on restrictions such as "IsBounded isBounded()" on PCollections.

Extra unrelated comment:
Since SDF is still @Experimental we should probably rename
OffsetRangeTracker and ByteKeyRangeTracker into the RestrictionTracker
suffix (I don’t know why they share the RangeTracker suffix for the
new trackers, WDYT?

Agree, will perform in a follow-up PR.

On Wed, Nov 7, 2018 at 5:47 PM Lukasz Cwik <lcwik@xxxxxxxxxx> wrote:
>
>
>
> On Wed, Nov 7, 2018 at 8:33 AM Robert Bradshaw <robertwb@xxxxxxxxxx> wrote:
>>
>> I think that not returning the users specific subclass should be fine.
>> Does the removal of markDone imply that the consumer always knows a
>> "final" key to claim on any given restriction?
>
>
> Yes, each restriction needs to support claiming a "final" key that would make the restriction "done". In the BigTable/HBase case it is the empty key "", for files it can be a file offset beyond the end of the file. Generally, restriction trackers written by SDF authors could also take an instance of an object that they can compare instance equality against for a final key. Alternatively we could allow restriction trackers to implement markDone() but would need the SDK have knowledge of the method by having the RestrictionTracker implement interface, extend abstract base class, or reflectively found so that we would be able to wrap it to provide synchronization guarantees. I had toyed with the idea of using something like the ProxyInvocationHandler that backs PipelineOptions to be able to provide a modified version of the users instance that had the appropriate synchronization guarantees but couldn't get it to work.
>
>>
>> On Wed, Nov 7, 2018 at 1:45 AM Lukasz Cwik <lcwik@xxxxxxxxxx> wrote:
>> >
>> > I have started to work on how to change the user facing API within the Java SDK to support splitting/checkpointing[1], backlog reporting[2] and bundle finalization[3].
>> >
>> > I have this PR[4] which contains minimal interface/type definitions to convey how the API surface would change with these 4 changes:
>> > 1) Exposes the ability for @SplitRestriction to take a backlog suggestion on how to perform splitting and for how many restrictions should be returned.
>> > 2) Adds the ability for RestrictionTrackers to report backlog
>> > 3) Updates @ProcessElement to be required to take a generic RestrictionTracker instead of the users own restriction tracker type.
>> > 4) Adds the ability for @StartBundle/@ProcessElement/@FinishBundle to register a callback that is invoked after bundle finalization.
>> >
>> > The details are in the javadoc comments as to how I would expect the contract to play out.
>> > Feel free to comment on the ML/PR around the contract and after the feedback is received/digested/implemented, I would like to get the changes submitted so that work can start  towards providing an implementation in the Java SDK, Python SDK, and Go SDK and the shared runner portability library.
>> >
>> > I would like to call out special attention to 3 since with this change it will enable us to remove the synchronization requirement for users as we will wrap the underlying restriction tracker allowing us to add appropriate synchronization as needed and also to watch any calls that pass through the object such as the claim calls. I also believe this prevents people from writing RestrictionTrackers where the contract of tryClaim is subverted since markDone is outside the purview of tryClaim as in ByteKeyRangeTracker[5].
>> >
>> > 1: https://s.apache.org/beam-checkpoint-and-split-bundles
>> > 2: https://s.apache.org/beam-bundles-backlog-splitting
>> > 3: https://s.apache.org/beam-finalizing-bundles
>> > 4: https://github.com/apache/beam/pull/6969
>> > 5: https://github.com/apache/beam/pull/6949