Re: Question about the behavior of TM when it lost the zookeeper client session in HA mode


Hi all,

After I changed `high-availability.zookeeper.client.session-timeout` and `maxSessionTimeout` to 120000 ms, the exception still occurred.
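
For reference, after that change the two relevant lines look roughly like this (a sketch of just these settings, not my full `flink-conf.yaml` / `zoo.cfg`):

```
# flink-conf.yaml
high-availability.zookeeper.client.session-timeout: 120000

# zoo.cfg
maxSessionTimeout=120000
```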

Here is the log snippet. It seems this has nothing to do with the ZooKeeper client session timeout, but I still don't know why the Kafka producer would be closed without any task state change.

```
2018-05-14 05:18:53,468 WARN  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Client session timed out, have not heard from server in 82828ms for sessionid 0x305f957eb8d000a
2018-05-14 05:18:53,468 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Client session timed out, have not heard from server in 82828ms for sessionid 0x305f957eb8d000a, closing socket connection and attempting reconnect
2018-05-14 05:18:53,571 INFO  org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager  - State change: SUSPENDED
2018-05-14 05:18:53,574 WARN  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.
2018-05-14 05:18:53,850 WARN  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/mnt/jaas-466390940757021791.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it.
2018-05-14 05:18:53,850 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Opening socket connection to server XXX.XXX.XXX.XXX:2181
2018-05-14 05:18:53,852 ERROR org.apache.flink.shaded.curator.org.apache.curator.ConnectionState  - Authentication failed
2018-05-14 05:18:53,853 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Socket connection established to XXX.XXX.XXX.XXX:2181, initiating session
2018-05-14 05:18:53,859 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Session establishment complete on server XXX.XXX.XXX.XXX:2181, sessionid = 0x305f957eb8d000a, negotiated timeout = 120000
2018-05-14 05:18:53,860 INFO  org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager  - State change: RECONNECTED
2018-05-14 05:18:53,860 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.
2018-05-14 05:28:54,781 INFO  org.apache.kafka.clients.producer.KafkaProducer               - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2018-05-14 05:28:54,829 INFO  org.apache.kafka.clients.producer.KafkaProducer               - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2018-05-14 05:28:54,918 INFO  org.apache.flink.runtime.taskmanager.Task                     - match-rule -> (get-ordinary -> Sink: kafka-sink, get-cd -> Sink: kafka-sink-cd) (1/32) (e3462ff8bb565bb0cf4de49ffc2595fb) switched from RUNNING to FAILED.
java.lang.Exception: Failed to send data to Kafka: The server disconnected before a response was received.
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invoke(FlinkKafkaProducer010.java:288)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:464)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:441)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:415)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:464)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:441)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:415)
at org.apache.flink.streaming.api.collector.selector.CopyingDirectedOutput.collect(CopyingDirectedOutput.java:62)
at org.apache.flink.streaming.api.collector.selector.CopyingDirectedOutput.collect(CopyingDirectedOutput.java:34)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
at com.appier.rt.rt_match.flink.operator.MatchRuleOperator$$anonfun$flatMap1$4.apply(MatchRuleOperator.scala:39)
at com.appier.rt.rt_match.flink.operator.MatchRuleOperator$$anonfun$flatMap1$4.apply(MatchRuleOperator.scala:38)
at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at scala.collection.immutable.Map$Map2.foreach(Map.scala:137)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
at com.appier.rt.rt_match.flink.operator.MatchRuleOperator.flatMap1(MatchRuleOperator.scala:38)
at com.appier.rt.rt_match.flink.operator.MatchRuleOperator.flatMap1(MatchRuleOperator.scala:14)
at org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.processElement1(CoStreamFlatMap.java:53)
at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:243)
at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
```

Best Regards,
Tony Wei

2018-05-14 11:36 GMT+08:00 Tony Wei <tony19920430@xxxxxxxxx>:
Hi all,

Recently, my Flink job hit a problem that caused it to fail and restart.

The log looks like this screen snapshot (image not shown here), or like the following:

```
2018-05-11 13:21:04,582 WARN  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Client session timed out, have not heard from server in 61054ms for sessionid 0x3054b165fe2006a
2018-05-11 13:21:04,583 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Client session timed out, have not heard from server in 61054ms for sessionid 0x3054b165fe2006a, closing socket connection and attempting reconnect
2018-05-11 13:21:04,683 INFO  org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager  - State change: SUSPENDED
2018-05-11 13:21:04,686 WARN  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.
2018-05-11 13:21:04,689 INFO  org.apache.kafka.clients.producer.KafkaProducer               - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2018-05-11 13:21:04,694 INFO  org.apache.kafka.clients.producer.KafkaProducer               - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2018-05-11 13:21:04,698 INFO  org.apache.flink.runtime.taskmanager.Task                     - match-rule -> (get-ordinary -> Sink: kafka-sink, get-cd -> Sink: kafka-sink-cd) (4/32) (65a4044ac963e083f2635fe24e7f2403) switched from RUNNING to FAILED.
java.lang.Exception: Failed to send data to Kafka: The server disconnected before a response was received.
```

The logs showed `org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.` That timeout value is Long.MAX_VALUE, which is used when someone calls `producer.close()` without an explicit timeout.
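
As a quick sanity check (my own sketch, not code from Flink or Kafka), that number is exactly `Long.MaxValue`, which is what a no-argument `KafkaProducer#close()` ends up using:

```
// Sanity check: the timeout printed in the log equals Long.MaxValue.
// Assumption: a no-arg KafkaProducer#close() internally closes with
// Long.MAX_VALUE milliseconds, hence the huge value in the log line.
object ProducerCloseTimeoutCheck extends App {
  val loggedTimeoutMillis = 9223372036854775807L
  println(loggedTimeoutMillis == Long.MaxValue) // prints: true
}
```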

I also saw these log lines: `org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Client session timed out, have not heard from server in 61054ms for sessionid 0x3054b165fe2006a, closing socket connection and attempting reconnect`
and `org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.`.

I have checked ZooKeeper and Kafka and there were no errors during that period.
I was wondering whether the TM stops its tasks when it loses its ZooKeeper client session in HA mode. Since I didn't find any documentation or mailing-list thread discussing this, I'm not sure whether this is what caused the Kafka producer to be closed.
Could someone who knows HA well help, or does anyone know what happened in my job?

My Flink cluster version is 1.4.0, with 2 masters and 10 slaves. My ZooKeeper cluster version is 3.4.11, with 3 nodes.
The `high-availability.zookeeper.client.session-timeout` is the default value: 60000 ms.
The `maxSessionTimeout` in zoo.cfg is 40000 ms.
I have already changed `maxSessionTimeout` to 120000 ms this morning.
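
In config-file terms, the relevant lines currently look roughly like this (just these two settings, the rest of the files omitted):

```
# flink-conf.yaml (left at the default)
high-availability.zookeeper.client.session-timeout: 60000

# zoo.cfg (was 40000 ms; raised to 120000 ms this morning)
maxSessionTimeout=120000
```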

This problem happened many times during the last weekend and made my Kafka lag grow. Please help me. Thank you very much!

Best Regards,
Tony Wei