
Re: Routing events by key

Perhaps what you're looking for is WithKeys [1]? Any PCollection<KV<.,.>> is essentially a KeyedDataStream. GroupByKey colocates the values for a given key (and window). What isn't promised by Apache Beam is that successive (key, value) pairs with the same key must be processed sequentially, on the same machine. In practice, all runners do so after a GBK. (Note that in the face of worker failure, no runner can 100% provide this guarantee.)

[1] https://beam.apache.org/documentation/sdks/javadoc/2.5.0/org/apache/beam/sdk/transforms/WithKeys.html
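As a minimal sketch of the above (assuming an existing PCollection<Event> named events, where the Event type and its getEntityId() accessor are hypothetical placeholders), keying with WithKeys and then grouping might look like:

```java
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

// Key each event by its entity id. withKeyType is needed so Beam can
// infer a coder for keys produced by a lambda.
PCollection<KV<String, Event>> keyed =
    events.apply(WithKeys.of((Event e) -> e.getEntityId())
                         .withKeyType(TypeDescriptors.strings()));

// GroupByKey colocates all values for a given key (and window).
PCollection<KV<String, Iterable<Event>>> grouped =
    keyed.apply(GroupByKey.create());
```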

On Sun, Jul 8, 2018 at 9:08 AM Niels Basjes <Niels@xxxxxxxxx> wrote:
Hi all,

Thanks for the help.

I did some additional reading, and it looks like what I want is what Apache Flink calls a KeyedDataStream, which is created by applying the KeyBy operation to a DataStream ( https://cwiki.apache.org/confluence/display/FLINK/Streams+and+Operations+on+Streams ).
It seems that this is the feature I'm looking for in Beam.

Just to check if I understand it correctly:
1) the KeyedDataStream in Flink IS the same as what I am looking for in Beam?
2) in the current version of Beam such a concept does not yet exist out of the box. Yet this can be built using the construct Robert described?

Just to get my goal clear: I have change events coming in from various sources. I want to avoid the need for database-side locking, and as such I want to route all events for a specific entity into a single-threaded stream (i.e. a single mapper instance).

So what I understand right now to make it work in Beam:
- I should be able to use the Session window using my entity id as the session id.
- and then do a window operation that "does the work"

Because I'm interested in avoiding database locks, I have to make sure I base these windows on processing time (not event time).

Do you guys think this would work for my use case?

Towards the future:
To me this "KeyBy" operation seems like a very fundamental operation on a stream that I would expect to find in any stream processing tool.
Could this be added to Beam, so we can then see whether the runners can support it?

Niels Basjes

On Sun, Jul 8, 2018 at 1:43 AM, Robert Bradshaw <robertwb@xxxxxxxxxx> wrote:
Reshuffle for the purpose of ensuring stable inputs is deprecated, but this seems like a valid(ish) use case.

Currently, all runners implement GroupByKey by sending everything with the same key to the same machine, however this is not guaranteed by the Beam model, and changing it has been tossed around (e.g. when using fixed windows, one could send different key-window pairs to different machines). Even when this is the case, there's no guarantee that there won't be backup workers processing the same key, or even if the runner doesn't do backups, zombie workers (e.g. where we thought the worker died, and allocated its work elsewhere, but it turns out the worker is still churning away, possibly causing side effects). Such is the nature of a distributed system.

If you go the GBK route, rather than windowing by 1 second, you could use an AfterPane.elementCountAtLeast(1) trigger [1] (even in the global window) for absolute minimal latency. This is essentially what Reshuffle does.

[1] https://beam.apache.org/documentation/programming-guide/#data-driven-triggers
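A hedged sketch of that triggering setup (assuming a keyed PCollection<KV<String, Event>> named keyed, where Event is a hypothetical placeholder type):

```java
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

// Stay in the global window, but fire a pane as soon as each element
// arrives, so the downstream GroupByKey adds no windowing latency.
keyed
    .apply(Window.<KV<String, Event>>into(new GlobalWindows())
        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
        .discardingFiredPanes()
        .withAllowedLateness(Duration.ZERO))
    .apply(GroupByKey.create());
```

discardingFiredPanes() keeps each fired pane from re-including previously emitted elements, which matches the "no buffering" behavior Reshuffle aims for.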

- Robert

On Fri, Jul 6, 2018 at 11:11 PM Jean-Baptiste Onofré <jb@xxxxxxxxxxxx> wrote:
Hi Raghu,

AFAIR, Reshuffle is considered deprecated. Maybe it would be better
to avoid using it, no?


On 06/07/2018 18:28, Raghu Angadi wrote:
> I would use Reshuffle()[1] with entity id as the key. It internally does
> a GroupByKey and sets up windowing such that it does not buffer anything.
> [1]
> : https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java#L51 
> On Fri, Jul 6, 2018 at 8:01 AM Niels Basjes <Niels@xxxxxxxxx
> <mailto:Niels@xxxxxxxxx>> wrote:
>     Hi,
>     I have an unbounded stream of change events each of which has the id
>     of the entity that is changed.
>     To avoid the need for locking in the persistence layer that is
>     needed in part of my processing I want to route all events based on
>     this entity id.
>     That way I know for sure that all events around a single entity go
>     through the same instance of my processing sequentially, hence no
>     need for locking or other synchronization regarding this persistence.
>     At this point my best guess is that I need to use the GroupByKey but
>     that seems to need a Window. 
>     I think I don't want a window because the stream is unbounded and I
>     want the lowest possible latency (i.e. a Window of 1 second would be
>     ok for this use case).
>     Also I want to be 100% sure that all events for a specific id go to
>     only a single instance because I do not want any race conditions.
>     My simple question is: What does that look like in the Beam Java API?
>     --
>     Best regards / Met vriendelijke groeten,
>     Niels Basjes

Jean-Baptiste Onofré
Talend - http://www.talend.com

Best regards / Met vriendelijke groeten,

Niels Basjes