Re: Performance issue in Beam 2.4 onwards


Instead of reverting or working around specific checks that the DirectRunner performs, have you considered using one of the other runners, such as Flink or Spark, with a local execution cluster? You won't hit the validation/verification bottlenecks that the DirectRunner specifically imposes.

On Mon, Jul 9, 2018 at 8:46 AM Jean-Baptiste Onofré <jb@xxxxxxxxxxxx> wrote:
Thanks for the update Eugene.

@Vojta: would you mind creating a Jira? I will tackle a fix for that.

Regards
JB

On 09/07/2018 17:33, Eugene Kirpichov wrote:
> Hi -
>
> If I remember correctly, the reason for this change was to ensure that
> the state is encodable at all. Prior to the change, there had been
> situations where the coder specified on a state cell is buggy, absent or
> set incorrectly (due to some issue in coder inference), but direct
> runner did not detect this because it never tried to encode the state
> cells - this would have blown up in any distributed runner.
>
> I think it should be possible to relax this and clone only values being
> added to the state, rather than cloning the whole state on copy(). I
> don't have time to work on this change myself, but I can review a PR if
> someone else does.
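
A rough sketch of the relaxation described above, in plain Java rather than Beam code. BagCellSketch and the cloneViaCoder function are made-up names; the function stands in for a coder encode/decode round trip. Values are round-tripped once when they are added, and copy() only shares the already-cloned references. This assumes stored values are never mutated afterwards.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.UnaryOperator;

// Hypothetical illustration only; this is not InMemoryStateInternals.
class BagCellSketch<T> {
    // Stand-in for an encode/decode round trip through the cell's coder.
    private final UnaryOperator<T> cloneViaCoder;
    private final List<T> contents = new ArrayList<>();

    BagCellSketch(UnaryOperator<T> cloneViaCoder) {
        this.cloneViaCoder = cloneViaCoder;
    }

    // Clone on the way in, so a broken coder still fails on the first add.
    void add(T value) {
        contents.add(cloneViaCoder.apply(value));
    }

    // Copy the cell without touching the coder: element references are shared.
    BagCellSketch<T> copy() {
        BagCellSketch<T> that = new BagCellSketch<>(cloneViaCoder);
        that.contents.addAll(this.contents);
        return that;
    }

    // Return a snapshot so callers cannot modify the cell's internal list.
    List<T> read() {
        return new ArrayList<>(contents);
    }

    public static void main(String[] args) {
        AtomicInteger roundTrips = new AtomicInteger();
        BagCellSketch<String> cell = new BagCellSketch<>(s -> {
            roundTrips.incrementAndGet();
            return new String(s);
        });
        cell.add("a");
        cell.add("b");
        BagCellSketch<String> copy = cell.copy();
        copy.add("c");
        System.out.println(roundTrips.get()); // 3: one round trip per add, none for copy()
        System.out.println(cell.read());      // [a, b]
        System.out.println(copy.read());      // [a, b, c]
    }
}
```

With this shape the number of round trips grows with the number of adds, not with the size of the state multiplied by the number of copies.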
>
> On Mon, Jul 9, 2018 at 8:28 AM Jean-Baptiste Onofré <jb@xxxxxxxxxxxx> wrote:
>
>     Hi Vojta,
>
>     I fully agree, that's why it makes sense to wait for Eugene's feedback.
>
>     I remember we had some performance regression on the direct runner
>     identified thanks to Nexmark, but it has been addressed by reverting a
>     change.
>
>     Good catch anyway!
>
>     Regards
>     JB
>
>     On 09/07/2018 17:20, Vojtech Janota wrote:
>     > Hi Reuven,
>     >
>     > I'm not really complaining about DirectRunner. In fact, it seems to me
>     > as if what was previously considered part of the "expensive extra
>     > checks" done by the DirectRunner is now done within the
>     > beam-runners-core-java library. Considering that all objects involved
>     > are immutable (in our case at least) and simple assignment is
>     > sufficient, the serialization-deserialization really seems like an
>     > unwanted and hugely expensive correctness check. If there was a problem
>     > with identity copy, wasn't DirectRunner supposed to reveal it?
>     >
>     > Regards,
>     > Vojta
>     >
>     > On Mon, Jul 9, 2018 at 4:46 PM, Reuven Lax <relax@xxxxxxxxxx> wrote:
>     >
>     >     Hi Vojta,
>     >
>     >     One problem is that the DirectRunner is designed for testing, not
>     >     for performance. The DirectRunner currently does many
>     >     purposely-inefficient things, the point of which is to better
>     >     expose potential bugs in tests. For example, the DirectRunner will
>     >     randomly shuffle the order of PCollections to ensure that your code
>     >     does not rely on ordering. All of this adds cost, because the
>     >     current runner is designed for testing. There have been requests in
>     >     the past for an "optimized" local runner, however we don't
>     >     currently have such a thing.
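
A small sketch of why shuffling the input exposes order dependence. This is plain Java, not DirectRunner code, and ShuffleCheckSketch and its methods are made-up names. A computation that is safe on an unordered collection should give the same answer for every permutation of its input.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.function.ToIntFunction;

// Hypothetical illustration only; none of these names come from Beam.
class ShuffleCheckSketch {
    // Order-insensitive: the sum is the same for every permutation.
    static int sum(List<Integer> xs) {
        int total = 0;
        for (int x : xs) {
            total += x;
        }
        return total;
    }

    // Order-sensitive: depends on which element happens to come first.
    static int first(List<Integer> xs) {
        return xs.get(0);
    }

    // Returns true if fn yields the same result on the input and on
    // several random permutations of it.
    static boolean stableUnderShuffle(
            List<Integer> input, ToIntFunction<List<Integer>> fn, long seed) {
        Random random = new Random(seed);
        int expected = fn.applyAsInt(input);
        for (int trial = 0; trial < 20; trial++) {
            List<Integer> shuffled = new ArrayList<>(input);
            Collections.shuffle(shuffled, random);
            if (fn.applyAsInt(shuffled) != expected) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        // sum() passes the check; first() is expected to fail it as soon as
        // any permutation starts with a different element.
        System.out.println(stableUnderShuffle(input, ShuffleCheckSketch::sum, 42L));
        System.out.println(stableUnderShuffle(input, ShuffleCheckSketch::first, 42L));
    }
}
```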
>     >
>     >     In this case, using coders to clone values is more correct. In a
>     >     distributed environment using encode/decode is the only way to
>     >     copy values, and the DirectRunner is trying to ensure that your
>     >     code is correct in a distributed environment.
>     >
>     >     Reuven
>     >
>     >     On Mon, Jul 9, 2018 at 7:22 AM Vojtech Janota
>     >     <vojta.janota@xxxxxxxxx> wrote:
>     >
>     >         Hi,
>     >
>     >         We have been using Apache Beam in our project for some time
>     >         now. Since our datasets are of modest size, we have so far used
>     >         DirectRunner, as the computation easily fits onto a single
>     >         machine. Recently we upgraded Beam from 2.2 to 2.4 and found
>     >         that the performance of our pipelines deteriorated drastically.
>     >         Pipelines that took ~3 minutes with 2.2 now do not finish within
>     >         hours. We tried to isolate the change that causes the
>     >         slowdown and traced it to these commits to the
>     >         "InMemoryStateInternals" class:
>     >
>     >         * https://github.com/apache/beam/commit/32a427c
>     >         * https://github.com/apache/beam/commit/8151d82
>     >
>     >         In a nutshell, where previously the copy() method simply assigned:
>     >
>     >           that.value = this.value
>     >
>     >         There is now a coder encode/decode combo hidden behind:
>     >
>     >           that.value = uncheckedClone(coder, this.value)
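
To make the cost difference concrete, here is a minimal sketch of a clone done through an encode/decode round trip, assuming a simplified coder interface. SimpleCoder, STRING_LIST_CODER and cloneViaCoder are made-up names for illustration and are not Beam's Coder API or the actual uncheckedClone implementation. Plain assignment is constant time, while the round trip walks and re-allocates the whole value on every copy.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; none of these names come from Beam.
class CloneSketch {
    // Simplified stand-in for a coder: writes a value to bytes and reads it back.
    interface SimpleCoder<T> {
        void encode(T value, DataOutputStream out) throws IOException;
        T decode(DataInputStream in) throws IOException;
    }

    // Encodes a list of strings as an element count followed by each string.
    static final SimpleCoder<List<String>> STRING_LIST_CODER =
            new SimpleCoder<List<String>>() {
                @Override
                public void encode(List<String> value, DataOutputStream out) throws IOException {
                    out.writeInt(value.size());
                    for (String s : value) {
                        out.writeUTF(s);
                    }
                }

                @Override
                public List<String> decode(DataInputStream in) throws IOException {
                    int size = in.readInt();
                    List<String> result = new ArrayList<>(size);
                    for (int i = 0; i < size; i++) {
                        result.add(in.readUTF());
                    }
                    return result;
                }
            };

    // Clones a value by encoding it to a byte array and decoding it again.
    static <T> T cloneViaCoder(SimpleCoder<T> coder, T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            coder.encode(value, out);
            out.flush();
            DataInputStream in =
                    new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
            return coder.decode(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        List<String> state = new ArrayList<>(Arrays.asList("a", "b", "c"));
        List<String> assigned = state;                                 // O(1), same object
        List<String> cloned = cloneViaCoder(STRING_LIST_CODER, state); // O(size), new object
        System.out.println(assigned == state);    // true
        System.out.println(cloned.equals(state)); // true
        System.out.println(cloned == state);      // false
    }
}
```

If copy() is called often on a large state cell, the round trip repeats this full traversal each time, which would be consistent with the slowdown described here.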
>     >
>     >         Can somebody explain the purpose of this change? Is it meant as
>     >         an additional "enforcement" point, similar to DirectRunner's
>     >         enforceImmutability and enforceEncodability? Or is it something
>     >         that is genuinely needed to provide correct behaviour of the
>     >         pipeline?
>     >
>     >         Any hints or thoughts are appreciated.
>     >
>     >         Regards,
>     >         Vojta
>     >
>
>     --
>     Jean-Baptiste Onofré
>     jbonofre@xxxxxxxxxx <mailto:jbonofre@xxxxxxxxxx>
>     http://blog.nanthrax.net
>     Talend - http://www.talend.com
>