osdir.com



Re: Flink cluster crashing going from 1.4.0 -> 1.5.3


I believe the only thing you can do is disable latency tracking by setting `latencyTrackingInterval` in `env.getExecutionConfig()` (via `setLatencyTrackingInterval(...)`) to 0 or a negative value.

The update frequency is not configurable and is currently fixed at 10 seconds.

Latency metrics are tracked as the cross-product of all subtasks of all operators and all subtasks of all sources. That is, with 2 sources, 2 other operators and a parallelism of 10, you can end up with 400 latency metrics: 10 (subtasks per source) * 10 (subtasks per operator) * 2 (operators) * 2 (sources) = 400.
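As a quick check of the arithmetic, here is a minimal sketch of the cross-product count. `LatencyMetricCount` and `latencyHistograms` are hypothetical names and are not part of the Flink API. The sketch assumes that every source runs with the same parallelism and that every other operator does too, as in the example.

```java
final class LatencyMetricCount {

    /**
     * Latency is tracked per (source subtask, operator subtask) pair, so the
     * number of latency histograms is the product of all four factors.
     * Assumes a uniform parallelism for all sources and for all operators.
     */
    static long latencyHistograms(int sources, int sourceParallelism,
                                  int operators, int operatorParallelism) {
        // Widen to long before multiplying to avoid int overflow at high parallelism.
        return (long) sources * sourceParallelism * operators * operatorParallelism;
    }

    public static void main(String[] args) {
        // Example from the reply: 2 sources, 2 other operators, parallelism 10.
        System.out.println(latencyHistograms(2, 10, 2, 10)); // prints 400
    }
}
```

Take the production job described further down, which runs at parallelism 100. There, a single source feeding just one other operator already yields 100 * 100 = 10,000 histograms, and each one is then expanded into its individual statistics.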

On 24.08.2018 11:28, Jozef Vilcek wrote:
For my small job, I see ~24k of those latency metrics at '/jobs/.../metrics'. That job is much smaller than production in terms of parallelism.

Are there any options here? Can it be turned off, or can the histogram metrics or the update frequency be reduced?
Also, keeping it flat like this seems to use quite a lot of JobManager memory:
{"id":"latency.source_id.2f6436c1f4f0c70e401663acf945a822.source_subtask_index.2.operator_id.4060d9664a78e1d82671ac80921843cd.operator_subtask_index.1.latency_stddev"}
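The key above packs two things into one dotted name: a (source subtask, operator subtask) pair and a single histogram statistic. Below is a minimal, hypothetical parser that splits such a key apart; it is not part of Flink. It assumes every key follows exactly this layout: the `latency` prefix, then label/value pairs, then the statistic name.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class LatencyKeyParser {

    /**
     * Splits a flattened latency metric key into its labelled parts.
     * Assumed layout:
     * latency.source_id.<id>.source_subtask_index.<n>.operator_id.<id>.operator_subtask_index.<n>.<stat>
     */
    static Map<String, String> parse(String key) {
        String[] parts = key.split("\\.");
        if (parts.length != 10 || !parts[0].equals("latency")) {
            throw new IllegalArgumentException("Unexpected key layout: " + key);
        }
        Map<String, String> result = new LinkedHashMap<>();
        // parts[1..8] are alternating label/value pairs.
        for (int i = 1; i < 9; i += 2) {
            result.put(parts[i], parts[i + 1]);
        }
        // The last segment is the statistic, e.g. "latency_stddev".
        result.put("stat", parts[9]);
        return result;
    }

    public static void main(String[] args) {
        String key = "latency.source_id.2f6436c1f4f0c70e401663acf945a822"
                + ".source_subtask_index.2"
                + ".operator_id.4060d9664a78e1d82671ac80921843cd"
                + ".operator_subtask_index.1.latency_stddev";
        System.out.println(parse(key));
    }
}
```

For the key above, it yields `source_subtask_index=2`, `operator_subtask_index=1` and `stat=latency_stddev`. Each distinct combination of source and operator subtask is a separate histogram, which is where the cross-product growth comes from.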

On Fri, Aug 24, 2018 at 10:08 AM Chesnay Schepler <chesnay@xxxxxxxxxx> wrote:

    In 1.5 the latency metric was changed to be reported on the job level,
    which is why you now see it under /jobs/.../metrics, but not in 1.4.
    In 1.4 you would see something similar under
    /jobs/.../vertices/.../metrics, for each vertex.

    Additionally, it is now a proper histogram, which significantly
    increases the number of accesses to the ConcurrentHashMaps that store
    metrics for the UI. It could be that this code is just too slow for
    the number of metrics.
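The sketch below makes the cost difference concrete. It flattens one histogram snapshot into a `ConcurrentHashMap`, one key per statistic, and contrasts that with a single-valued metric. It is a hypothetical illustration, not Flink's `MetricStore` code. The list of suffixes is an assumption modelled on keys such as `latency_stddev` and may not match Flink exactly.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class HistogramFlattening {

    // Assumed statistic suffixes; the real set may differ.
    static final List<String> STATS = List.of(
            "min", "max", "mean", "median", "stddev",
            "p75", "p90", "p95", "p98", "p99", "p999");

    /** Writes one entry per statistic and returns the number of map writes. */
    static int storeHistogram(Map<String, String> store, String name, double[] values) {
        if (values.length != STATS.size()) {
            throw new IllegalArgumentException("Expected " + STATS.size() + " values");
        }
        for (int i = 0; i < values.length; i++) {
            store.put(name + "_" + STATS.get(i), String.valueOf(values[i]));
        }
        return values.length;
    }

    /** A single-valued metric (counter or gauge) is a single map write. */
    static int storeGauge(Map<String, String> store, String name, double value) {
        store.put(name, String.valueOf(value));
        return 1;
    }

    public static void main(String[] args) {
        Map<String, String> store = new ConcurrentHashMap<>();
        int writes = storeHistogram(store, "latency", new double[11]);
        writes += storeGauge(store, "numRecordsIn", 42);
        System.out.println(writes + " writes, " + store.size() + " keys");
    }
}
```

In this model, every histogram costs one map write per statistic each time metrics are stored, whereas a counter or gauge costs one. Multiplied by the cross-product of subtasks, that adds up quickly.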

    On 23.08.2018 19:06, Jozef Vilcek wrote:
    > Parallelism is 100. I tried clusters with 1 and 2 slots per TM,
    > yielding 100 or 50 TMs in the cluster.
    >
    > I did notice that the URL http://jobmanager:port/jobs/job_id/metrics
    > in 1.5.x returns a huge list of "latency.source_id. ...." IDs. A heap
    > dump shows that the hash map takes 1.6GB for me. I am guessing that is
    > the one the dispatcher threads keep updating. Not sure what those
    > are. In 1.4.0 that URL returns something else, a very short list.
    >
    > On Thu, Aug 23, 2018 at 6:44 PM Piotr Nowojski <piotr@xxxxxxxxxxxxxxxxx> wrote:
    >
    >> Hi,
    >>
    >> How many task slots do you have in the cluster and per machine, and
    >> what parallelism are you using?
    >>
    >> Piotrek
    >>
    >>> On 23 Aug 2018, at 16:21, Jozef Vilcek <jozo.vilcek@xxxxxxxxx> wrote:
    >>>
    >>> Yes, on smaller data, and therefore with smaller resources and
    >>> parallelism, exactly the same job runs fine.
    >>>
    >>> On Thu, Aug 23, 2018, 16:11 Aljoscha Krettek <aljoscha@xxxxxxxxxx> wrote:
    >>>> Hi,
    >>>>
    >>>> So with Flink 1.5.3 but a smaller parallelism the job works fine?
    >>>>
    >>>> Best,
    >>>> Aljoscha
    >>>>
    >>>>> On 23. Aug 2018, at 15:25, Jozef Vilcek <jozo.vilcek@xxxxxxxxx> wrote:
    >>>>>
    >>>>> Hello,
    >>>>>
    >>>>> I am trying to run my Beam application on a newer version of Flink
    >>>>> (1.5.3) but am having trouble with it. When I submit the application,
    >>>>> everything works fine, but after a few minutes (as soon as 2 minutes
    >>>>> after job start) the cluster just goes bad. Logs are full of heartbeat
    >>>>> timeouts, JobManager lost leadership, TaskExecutor timed out, etc.
    >>>>>
    >>>>> At that time the web UI is also unusable. Looking into the JobManager,
    >>>>> I noticed that all of the "flink-akka.actor.default-dispatcher" threads
    >>>>> are busy or blocked. Most are blocked on metrics:
    >>>>>
    >>>>> =======================================
    >>>>> java.lang.Thread.State: BLOCKED (on object monitor)
    >>>>>        at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore.addAll(MetricStore.java:84)
    >>>>>        - waiting to lock <0x000000053df75510> (a org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore)
    >>>>>        at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher.lambda$queryMetrics$5(MetricFetcher.java:205)
    >>>>>        at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher$$Lambda$201/995076607.accept(Unknown Source)
    >>>>>        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
    >>>>>        ...
    >>>>> =======================================
    >>>>>
    >>>>> I tried increasing memory, as MetricStore seems to hold quite a lot of
    >>>>> stuff, but it is not helping. On 1.4.0 the JobManager was running with a
    >>>>> 4GB heap; now this behaviour also occurs with 10GB.
    >>>>>
    >>>>> Any suggestions?
    >>>>>
    >>>>> Best,
    >>>>> Jozef
    >>>>>
    >>>>> P.S.: The Beam app has this problem in a setup with parallelism 100,
    >>>>> 100 task slots, 2100 running tasks, in streaming mode. A smaller job
    >>>>> runs without problems.