
Re: Sharing state between subtasks


Hey all,

I think all we need for this on the state sharing side is pretty simple.  I
opened a JIRA to track this work and submitted a PR for the state sharing
bit.

https://issues.apache.org/jira/browse/FLINK-10886
https://github.com/apache/flink/pull/7099

Please provide feedback :)

-Jamie


On Thu, Nov 1, 2018 at 3:33 AM Till Rohrmann <trohrmann@xxxxxxxxxx> wrote:

> Hi Thomas,
>
> using Akka directly would further manifest our dependency on Scala in
> flink-runtime. This is something we are currently trying to get rid of. For
> that purpose we have added the RpcService abstraction which encapsulates
> all Akka specific logic. We hope that we can soon get rid of the Scala
> dependency in flink-runtime by using a special class loader only for
> loading the AkkaRpcService implementation.
>
> I think the easiest way to sync the task information is actually going
> through the JobMaster because the subtasks don't know on which other TMs
> the other subtasks run. Otherwise, we would need to have some TM detection
> mechanism between TMs. If you choose this way, then you should be able to
> use the RpcService by extending the JobMasterGateway with additional RPCs.
>
> Cheers,
> Till
>
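A minimal sketch of what such an RPC extension might look like: subtasks report their local watermark to the JobMaster and read back the global minimum. The method names below are invented for illustration; they are not part of the actual JobMasterGateway interface.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the RPC extension Till describes: subtasks report
// their watermark to the JobMaster, which tracks the global minimum.
public class WatermarkSyncSketch {

    // What an additional RPC on the JobMaster side might look like
    // (illustrative names, not the real JobMasterGateway API).
    public interface WatermarkRpcGateway {
        void reportWatermark(int subtaskIndex, long watermark);
        long getGlobalMinimumWatermark();
    }

    // Simple in-memory implementation standing in for the JobMaster state.
    public static class InMemoryWatermarkAggregator implements WatermarkRpcGateway {
        private final Map<Integer, Long> watermarks = new ConcurrentHashMap<>();

        @Override
        public void reportWatermark(int subtaskIndex, long watermark) {
            // Watermarks only move forward; keep the maximum per subtask.
            watermarks.merge(subtaskIndex, watermark, Math::max);
        }

        @Override
        public long getGlobalMinimumWatermark() {
            // The slowest subtask determines the global event-time frontier.
            return watermarks.values().stream()
                    .mapToLong(Long::longValue)
                    .min()
                    .orElse(Long.MIN_VALUE);
        }
    }
}
```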
> On Wed, Oct 31, 2018 at 6:52 PM Thomas Weise <thw@xxxxxxxxxx> wrote:
>
> > Hi,
> >
> > We are planning to work on the Kinesis consumer in the following order:
> >
> > 1. Add per shard watermarking:
> > https://issues.apache.org/jira/browse/FLINK-5697 - this will be code we
> > already use internally; I will open a PR to add it to the Flink Kinesis
> > consumer
> > 2. Exchange of per subtask watermarks between all subtasks of one or
> > multiple sources
> > 3. Implement the queue approach described in Jamie's document, utilizing
> > 1.) and 2.) to align the shard consumers with respect to event time
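Step 1 of this plan, per-shard watermarking, can be sketched roughly as follows: each shard consumer tracks the highest event timestamp it has seen, and the subtask's local watermark is the minimum across its shards. The names here are illustrative only, not the actual FLINK-5697 implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of per-shard watermark tracking in a single subtask.
public class PerShardWatermarks {
    private final Map<String, Long> shardWatermarks = new HashMap<>();

    // Called for each record read from a shard.
    public void onRecord(String shardId, long eventTimestamp) {
        // Track the highest timestamp seen per shard.
        shardWatermarks.merge(shardId, eventTimestamp, Math::max);
    }

    // The subtask's local watermark is bounded by its slowest shard.
    public long localWatermark() {
        return shardWatermarks.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(Long.MIN_VALUE);
    }
}
```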
> >
> > There was some discussion regarding the mechanism to share the watermarks
> > between subtasks. If there is something that can be re-used it would be
> > great. Otherwise I'm going to further investigate the Akka or JGroups
> > route. Regarding Akka, since it is used within Flink already, is there an
> > abstraction that you would recommend to consider to avoid direct
> > dependency?
> >
> > Thanks,
> > Thomas
> >
> >
> >
> > On Thu, Oct 18, 2018 at 8:34 PM Zhijiang(wangzhijiang999)
> > <wangzhijiang999@xxxxxxxxxx.invalid> wrote:
> >
> > > Not yet. We only have some initial thoughts and have not worked on it
> > > yet. We will update the progress in this discussion if we make any.
> > >
> > > Best,
> > > Zhijiang
> > > ------------------------------------------------------------------
> > > From: Aljoscha Krettek <aljoscha@xxxxxxxxxx>
> > > Sent: Thursday, October 18, 2018 17:53
> > > To: dev <dev@xxxxxxxxxxxxxxxx>; Zhijiang(wangzhijiang999) <
> > > wangzhijiang999@xxxxxxxxxx>
> > > Cc: Till Rohrmann <trohrmann@xxxxxxxxxx>
> > > Subject: Re: Sharing state between subtasks
> > >
> > > Hi Zhijiang,
> > >
> > > do you already have working code or a design doc for the second
> approach?
> > >
> > > Best,
> > > Aljoscha
> > >
> > > > On 18. Oct 2018, at 08:42, Zhijiang(wangzhijiang999) <
> > > wangzhijiang999@xxxxxxxxxx.INVALID> wrote:
> > > >
> > > > Just noticed this discussion from @Till Rohrmann's weekly community
> > > update and I want to share some thoughts from our experiences.
> > > >
> > > > We also encountered the source consumption skew issue before, and we
> > > > looked at two possible ways of improving it.
> > > >
> > > > 1. Control the read strategy on the downstream side. In detail, every
> > > > input channel in a downstream task corresponds to the consumption of
> > > > one upstream source task, and we tag each input channel with a
> > > > watermark to find the lowest channel to read with high priority. In
> > > > essence, we rely on the backpressure mechanism: if the channel with
> > > > the highest timestamp is not read by the downstream task for a while,
> > > > the corresponding source task is blocked from reading once its buffers
> > > > are exhausted. There is no need to change the source interface in this
> > > > way, but there are two major concerns: first, it affects barrier
> > > > alignment, resulting in delayed or expired checkpoints. Second, it
> > > > cannot align source consumption very precisely; it is just a
> > > > best-effort approach. So we finally gave up on this way.
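The channel-selection part of this first approach might be sketched like this: among the input channels that currently have buffered data, pick the one with the lowest watermark to read next. This is plain Java with invented names, not actual Flink runtime code.

```java
import java.util.List;

// Illustrative sketch: select the readable input channel with the lowest
// watermark, so the downstream task prioritizes the slowest upstream source.
public class LowestWatermarkChannelSelector {

    public static class Channel {
        final int index;
        final long watermark;
        final boolean hasData;

        public Channel(int index, long watermark, boolean hasData) {
            this.index = index;
            this.watermark = watermark;
            this.hasData = hasData;
        }
    }

    // Returns the index of the readable channel with the lowest watermark,
    // or -1 if no channel currently has buffered data.
    public static int selectChannel(List<Channel> channels) {
        int best = -1;
        long bestWatermark = Long.MAX_VALUE;
        for (Channel c : channels) {
            if (c.hasData && c.watermark < bestWatermark) {
                bestWatermark = c.watermark;
                best = c.index;
            }
        }
        return best;
    }
}
```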
> > > >
> > > > 2. Add a new SourceCoordinator component to coordinate the source
> > > > consumption in a distributed way. For example, we can start this
> > > > component in the JobManager, like the current role of the
> > > > CheckpointCoordinator. Then every source task would communicate with
> > > > the JobManager via the current RPC mechanism; maybe we can rely on the
> > > > heartbeat message to attach the consumption progress as the payload.
> > > > The JobManager will accumulate all the reported progress and then give
> > > > responses to the different source tasks. We can define a protocol, for
> > > > example one that tells a fast source task to sleep for a specific
> > > > time. This way the coordinator has the global information to make the
> > > > proper decision for individual tasks, so it seems more precise. And it
> > > > does not affect barrier alignment, because a sleeping fast source can
> > > > still release the lock to emit barriers as normal. The only concern is
> > > > the change to the source interface, which may affect all related
> > > > source implementations.
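The decision logic of such a coordinator could look roughly like the sketch below: collect per-source-task event-time progress (e.g. from heartbeat payloads) and tell any task that is further ahead of the slowest one than some threshold to pause. All names and the threshold are assumptions, not an actual Flink API; the class is also not thread-safe, as it only illustrates the protocol.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the SourceCoordinator idea running on the
// JobManager side.
public class SourceCoordinatorSketch {
    private final Map<Integer, Long> progress = new HashMap<>();
    private final long maxAheadMillis;

    public SourceCoordinatorSketch(long maxAheadMillis) {
        this.maxAheadMillis = maxAheadMillis;
    }

    // Called when a source task reports its event-time progress
    // (e.g. attached to a heartbeat message).
    public void reportProgress(int taskIndex, long eventTime) {
        progress.merge(taskIndex, eventTime, Math::max);
    }

    // Response to a task: should it sleep before fetching more data?
    public boolean shouldSleep(int taskIndex) {
        long min = progress.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(Long.MIN_VALUE);
        long own = progress.getOrDefault(taskIndex, Long.MIN_VALUE);
        // Sleep only if this task is ahead of the slowest task by more
        // than the configured threshold.
        return own - min > maxAheadMillis;
    }
}
```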
> > > >
> > > > Currently we prefer to implement the second way and will refer to
> > > > the other good points above. :)
> > > >
> > > > Best,
> > > > Zhijiang
> > > > ------------------------------------------------------------------
> > > > From: Jamie Grier <jgrier@xxxxxxxx.INVALID>
> > > > Sent: Wednesday, October 17, 2018 03:28
> > > > To: dev <dev@xxxxxxxxxxxxxxxx>
> > > > Subject: Re: Sharing state between subtasks
> > > >
> > > > Here's a doc I started describing some changes we would like to make
> > > > starting with the Kinesis Source.. It describes a refactoring of that
> > > code
> > > > specifically and also hopefully a pattern and some reusable code we
> can
> > > use
> > > > in the other sources as well.  The end goal would be best-effort
> > > event-time
> > > > synchronization across all Flink sources but we are going to start
> with
> > > the
> > > > Kinesis Source first.
> > > >
> > > > Please take a look and please provide thoughts and opinions about the
> > > best
> > > > state sharing mechanism to use -- that section is left blank and
> we're
> > > > especially looking for input there.
> > > >
> > > >
> > >
> >
> https://docs.google.com/document/d/13HxfsucoN7aUls1FX202KFAVZ4IMLXEZWyRfZNd6WFM/edit?usp=sharing
> > > >
> > > > -Jamie
> > > >
> > > >
> > > > On Wed, Oct 10, 2018 at 11:57 PM Till Rohrmann <trohrmann@xxxxxxxxxx
> >
> > > wrote:
> > > >
> > > >> But on the Kafka source level it should be perfectly fine to do what
> > > >> Elias proposed. This of course is not the perfect solution, but it
> > > >> could bring us forward quite a bit. The changes required for this
> > > >> should also be minimal. This would become obsolete once we have
> > > >> something like shared state. But until then, I think it would be
> > > >> worth a try.
> > > >>
> > > >> Cheers,
> > > >> Till
> > > >>
> > > >> On Thu, Oct 11, 2018 at 8:03 AM Aljoscha Krettek <
> aljoscha@xxxxxxxxxx
> > >
> > > >> wrote:
> > > >>
> > > >>> The reason this selective reading doesn't work well in Flink at the
> > > >>> moment
> > > >>> is because of checkpointing. For checkpointing, checkpoint barriers
> > > >> travel
> > > >>> within the streams. If we selectively read from inputs based on
> > > >> timestamps
> > > >>> this is akin to blocking an input if that input is very far ahead
> in
> > > >> event
> > > >>> time, which can happen when you have a very fast source and a slow
> > > source
> > > >>> (in event time), maybe because you're in a catchup phase. In those
> > > cases
> > > >>> it's better to simply not read the data at the sources, as Thomas
> > said.
> > > >>> This is also because with Kafka Streams, each operator is basically
> > its
> > > >> own
> > > >>> job: it's reading from Kafka and writing to Kafka and there is not
> a
> > > >>> complex graph of different operations with network shuffles in
> > between,
> > > >> as
> > > >>> you have with Flink.
> > > >>>
> > > >>> This different nature of Flink is also why I think that readers
> need
> > > >>> awareness of other readers to do the event-time alignment, and this
> > is
> > > >>> where shared state comes in.
> > > >>>
> > > >>>> On 10. Oct 2018, at 20:47, Elias Levy <
> fearsome.lucidity@xxxxxxxxx>
> > > >>> wrote:
> > > >>>>
> > > >>>> On Wed, Oct 10, 2018 at 9:33 AM Fabian Hueske <fhueske@xxxxxxxxx>
> > > >> wrote:
> > > >>>>
> > > >>>>> I think the new source interface would be designed to be able to
> > > >>> leverage
> > > >>>>> shared state to achieve time alignment.
> > > >>>>> I don't think this would be possible without some kind of shared
> > > >> state.
> > > >>>>>
> > > >>>>> The problem of tasks that are far ahead in time cannot be solved
> > with
> > > >>>>> back-pressure.
> > > >>>>> That's because a task cannot choose from which source task it
> > > >>>>> accepts events and from which it doesn't.
> > > >>>>> If it blocks an input, all downstream tasks that are connected to
> > the
> > > >>>>> operator are affected. This can easily lead to deadlocks.
> > > >>>>> Therefore, all operators need to be able to handle events when
> they
> > > >>> arrive.
> > > >>>>> If they cannot process them yet because they are too far ahead in
> > > >> time,
> > > >>>>> they are put in state.
> > > >>>>>
> > > >>>>
> > > >>>> The idea I was suggesting is not for operators to block an input.
> > > >>> Rather,
> > > >>>> it is that they selectively choose from which input to process the
> > > >>>> next message based on its timestamp, so long as there are
> buffered
> > > >>>> messages waiting to be processed.  That is a best-effort alignment
> > > >>>> strategy.  Seems to work relatively well in practice, at least
> > within
> > > >>> Kafka
> > > >>>> Streams.
> > > >>>>
> > > >>>> E.g. at the moment StreamTwoInputProcessor uses a UnionInputGate
> for
> > > >> both
> > > >>>> its inputs.  Instead, it could keep them separate and selectively
> > > >>>> consume from the one that has a buffer available, and if both have
> > > >>>> buffers available, from the buffer whose messages have the lower
> > > >>>> timestamp.
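Elias's two-input selection idea can be sketched in a few lines: keep the two inputs' buffers separate and, when both hold data, read from the one whose head record has the lower timestamp. This is plain Java with invented names, not the actual StreamTwoInputProcessor code.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Illustrative sketch of best-effort event-time selection between two inputs.
public class SelectiveTwoInputReader {
    // Each buffered record is represented by just its timestamp here.
    private final Deque<Long> input1 = new ArrayDeque<>();
    private final Deque<Long> input2 = new ArrayDeque<>();

    public void buffer(int input, long timestamp) {
        (input == 1 ? input1 : input2).addLast(timestamp);
    }

    // Returns 1 or 2 for the input to read next, or 0 if both are empty.
    // Best effort: if only one side has data, read it regardless of time.
    public int nextInput() {
        if (input1.isEmpty() && input2.isEmpty()) return 0;
        if (input1.isEmpty()) return 2;
        if (input2.isEmpty()) return 1;
        return input1.peekFirst() <= input2.peekFirst() ? 1 : 2;
    }

    public long poll(int input) {
        return (input == 1 ? input1 : input2).removeFirst();
    }
}
```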
> > > >>>
> > > >>>
> > > >>
> > > >
> > >
> > >
> > >
> >
>