Re: Connection leak with flink elastic Sink


Hi Gordon,
We are using a Flink 1.6.1 cluster with the Elasticsearch connector flink-connector-elasticsearch6_2.11.
I have attached the stack trace.

Below are the open file descriptors of the TaskManager process, which has reached its max open file descriptor limit, and the open connections to the Elasticsearch cluster.

Regards
Bhaskar
# lsof -p 62041 | wc -l

65583
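
Note that `lsof -p` also lists entries that are not file descriptors (cwd, txt, memory-mapped files) plus a header line, so its line count can overstate descriptor usage. A minimal sketch for comparing the actual descriptor count with the soft limit, assuming a Linux host with /proc; the function names are hypothetical and not from this thread:

```shell
# Print the soft "Max open files" limit from /proc/<pid>/limits-style text on stdin.
# In that file the soft limit is the 4th whitespace-separated field of the row.
soft_nofile_limit() {
    awk '/^Max open files/ { print $4 }'
}

# Report open descriptors against the soft limit for one PID (Linux /proc assumed).
fd_usage() {
    pid="$1"
    open="$(ls "/proc/$pid/fd" | wc -l)"
    limit="$(soft_nofile_limit < "/proc/$pid/limits")"
    echo "pid=$pid open_fds=$open soft_limit=$limit"
}

# Usage on the TaskManager host (needs permission to read /proc/<pid>):
#   fd_usage 62041
```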

The number of connections to the Elasticsearch cluster has reached:

netstat -aln | grep 9200 | wc -l

2333
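
`grep 9200` matches any line that contains 9200 anywhere (a local port such as 49200, for example) and lumps all TCP states together. A rough sketch that counts connections to remote port 9200 per state, assuming the Linux `netstat -ant` column layout (foreign address in column 5, state in column 6); `conn_states` is a hypothetical helper name:

```shell
# Summarise TCP connections to a given remote port by state.
# Reads `netstat -ant`-style lines on stdin; the port defaults to 9200.
conn_states() {
    port="${1:-9200}"
    awk -v port="$port" '
        # Only TCP rows whose foreign address ends in ":<port>" are counted.
        $1 ~ /^tcp/ && $5 ~ (":" port "$") { count[$6]++ }
        END { for (s in count) print s, count[s] }
    ' | sort
}

# Usage on a live host:
#   netstat -ant | conn_states 9200
```

A large CLOSE_WAIT count would suggest that the remote side has closed and the local process has not yet closed its sockets. A large ESTABLISHED count points to connections that are still being held open.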




On Thu, Dec 13, 2018 at 4:12 PM Tzu-Li (Gordon) Tai <tzulitai@xxxxxxxxxx> wrote:
Hi,

Besides the information that Chesnay requested, could you also provide a stack trace of the exception that caused the job to terminate in the first place?

The Elasticsearch sink does indeed close the internally used Elasticsearch client, which should in turn properly release all resources [1].
I would like to double-check whether, in this case, that part of the code was never reached.

Cheers,
Gordon


On 13 December 2018 at 5:59:34 PM, Chesnay Schepler (chesnay@xxxxxxxxxx) wrote:

Specifically which connector are you using, and which Flink version?

On 12.12.2018 13:31, Vijay Bhaskar wrote:
> Hi
> We are using the Flink Elasticsearch sink, which streams at a rate of 1000
> events/sec, as described in
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/elasticsearch.html.
> We are observing a leak of Elasticsearch connections. After a few
> minutes the open connections exceed the process limit on
> max open descriptors and the job is terminated. But the HTTP
> connections to the Elasticsearch server remain open forever. Am I
> missing any specific configuration setting to close an open
> connection after serving the request?
> No such setting is described in the above documentation
> of the Elasticsearch sink.
>
> Regards
> Bhaskar


2018-12-13 06:14:42,120 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - --------------------------------------------------------------------------------
2018-12-13 06:14:42,122 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  Starting StandaloneSessionClusterEntrypoint (Version: 1.6.1, Rev:23e2636, Date:14.09.2018 @ 19:56:46 UTC)
2018-12-13 06:14:42,122 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  OS current user: root
2018-12-13 06:14:42,122 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  Current Hadoop/Kerberos user: <no hadoop dependency found>
2018-12-13 06:14:42,122 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.181-b13
2018-12-13 06:14:42,122 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  Maximum heap size: 981 MiBytes
2018-12-13 06:14:42,123 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  JAVA_HOME: (not set)
2018-12-13 06:14:42,123 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  No Hadoop Dependency available
2018-12-13 06:14:42,123 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  JVM Options:
2018-12-13 06:14:42,123 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     -Xms1024m
2018-12-13 06:14:42,123 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     -Xmx1024m
2018-12-13 06:14:42,123 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     -Dlog.file=/root/flink-1.6.1/log/flink-root-standalonesession-0-contrail-eng-raisa-cloudsecure-eng-raisa-flink.log
2018-12-13 06:14:42,123 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     -Dlog4j.configuration=file:/root/flink-1.6.1/conf/log4j.properties
2018-12-13 06:14:42,124 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     -Dlogback.configurationFile=file:/root/flink-1.6.1/conf/logback.xml
2018-12-13 06:14:42,124 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  Program Arguments:
2018-12-13 06:14:42,124 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     --configDir
2018-12-13 06:14:42,124 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     /root/flink-1.6.1/conf
2018-12-13 06:14:42,124 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     --executionMode
2018-12-13 06:14:42,124 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     cluster
2018-12-13 06:14:42,124 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  Classpath: /root/flink-1.6.1/lib/flink-python_2.11-1.6.1.jar:/root/flink-1.6.1/lib/log4j-1.2.17.jar:/root/flink-1.6.1/lib/log4j-api-2.9.1.jar:/root/flink-1.6.1/lib/slf4j-log4j12-1.7.7.jar:/root/flink-1.6.1/lib/flink-dist_2.11-1.6.1.jar:::
2018-12-13 06:14:42,124 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - --------------------------------------------------------------------------------
2018-12-13 06:14:42,126 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Registered UNIX signal handlers for [TERM, HUP, INT]
2018-12-13 06:14:42,141 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.address, contrail-eng-raisa-cloudsecure-eng-raisa-flink
2018-12-13 06:14:42,141 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.port, 6123
2018-12-13 06:14:42,141 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.heap.size, 1024m
2018-12-13 06:14:42,142 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.heap.size, 8192m
2018-12-13 06:14:42,142 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.numberOfTaskSlots, 20
2018-12-13 06:14:42,142 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: parallelism.default, 1
2018-12-13 06:14:42,143 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: rest.port, 10000
2018-12-13 06:14:42,143 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: rest.server.max-content-length, 209715200
2018-12-13 06:14:42,220 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Starting StandaloneSessionClusterEntrypoint.
2018-12-13 06:14:42,220 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Install default filesystem.
2018-12-13 06:14:42,226 INFO  org.apache.flink.core.fs.FileSystem                           - Hadoop is not in the classpath/dependencies. The extended set of supported File Systems via Hadoop is not available.
2018-12-13 06:14:42,238 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Install security context.
2018-12-13 06:14:42,247 INFO  org.apache.flink.runtime.security.modules.HadoopModuleFactory  - Cannot create Hadoop Security Module because Hadoop cannot be found in the Classpath.
2018-12-13 06:14:42,269 INFO  org.apache.flink.runtime.security.SecurityUtils               - Cannot install HadoopSecurityContext because Hadoop cannot be found in the Classpath.
2018-12-13 06:14:42,270 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Initializing cluster services.
2018-12-13 06:14:42,293 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Trying to start actor system at contrail-eng-raisa-cloudsecure-eng-raisa-flink:6123
2018-12-13 06:14:42,903 INFO  akka.event.slf4j.Slf4jLogger                                  - Slf4jLogger started
2018-12-13 06:14:42,964 INFO  akka.remote.Remoting                                          - Starting remoting
2018-12-13 06:14:43,126 INFO  akka.remote.Remoting                                          - Remoting started; listening on addresses :[akka.tcp://flink@contrail-eng-raisa-cloudsecure-eng-raisa-flink:6123]
2018-12-13 06:14:43,133 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Actor system started at akka.tcp://flink@contrail-eng-raisa-cloudsecure-eng-raisa-flink:6123
2018-12-13 06:14:43,149 WARN  org.apache.flink.configuration.Configuration                  - Config uses deprecated configuration key 'jobmanager.rpc.address' instead of proper key 'rest.address'
2018-12-13 06:14:43,157 INFO  org.apache.flink.runtime.blob.BlobServer                      - Created BLOB server storage directory /tmp/blobStore-391ff1d3-92a9-4fee-bc05-528b89981d77
2018-12-13 06:14:43,161 INFO  org.apache.flink.runtime.blob.BlobServer                      - Started BLOB server at 0.0.0.0:37997 - max concurrent requests: 50 - max backlog: 1000
2018-12-13 06:14:43,178 INFO  org.apache.flink.runtime.metrics.MetricRegistryImpl           - No metrics reporter configured, no metrics will be exposed/reported.
2018-12-13 06:14:43,183 INFO  org.apache.flink.runtime.dispatcher.FileArchivedExecutionGraphStore  - Initializing FileArchivedExecutionGraphStore: Storage directory /tmp/executionGraphStore-0198f848-97b1-40b4-9579-8553f49ed785, expiration time 3600000, maximum cache size 52428800 bytes.
2018-12-13 06:14:43,283 INFO  org.apache.flink.runtime.blob.TransientBlobCache              - Created BLOB cache storage directory /tmp/blobStore-a1c092f4-e67f-4a62-857a-5098615bd006
2018-12-13 06:14:43,291 WARN  org.apache.flink.configuration.Configuration                  - Config uses deprecated configuration key 'jobmanager.rpc.address' instead of proper key 'rest.address'
2018-12-13 06:14:43,292 WARN  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - Upload directory /tmp/flink-web-049db008-0157-4bd2-9af2-ab6dfa9bbd22/flink-web-upload does not exist, or has been deleted externally. Previously uploaded files are no longer available.
2018-12-13 06:14:43,293 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - Created directory /tmp/flink-web-049db008-0157-4bd2-9af2-ab6dfa9bbd22/flink-web-upload for file uploads.
2018-12-13 06:14:43,306 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - Starting rest endpoint.
2018-12-13 06:14:43,548 INFO  org.apache.flink.runtime.webmonitor.WebMonitorUtils           - Determined location of main cluster component log file: /root/flink-1.6.1/log/flink-root-standalonesession-0-contrail-eng-raisa-cloudsecure-eng-raisa-flink.log
2018-12-13 06:14:43,548 INFO  org.apache.flink.runtime.webmonitor.WebMonitorUtils           - Determined location of main cluster component stdout file: /root/flink-1.6.1/log/flink-root-standalonesession-0-contrail-eng-raisa-cloudsecure-eng-raisa-flink.out
2018-12-13 06:14:43,691 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - Rest endpoint listening at contrail-eng-raisa-cloudsecure-eng-raisa-flink:10000
2018-12-13 06:14:43,691 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - http://contrail-eng-raisa-cloudsecure-eng-raisa-flink:10000 was granted leadership with leaderSessionID=00000000-0000-0000-0000-000000000000
2018-12-13 06:14:43,691 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - Web frontend listening at http://contrail-eng-raisa-cloudsecure-eng-raisa-flink:10000.
2018-12-13 06:14:43,706 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Starting RPC endpoint for org.apache.flink.runtime.resourcemanager.StandaloneResourceManager at akka://flink/user/resourcemanager .
2018-12-13 06:14:43,821 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Starting RPC endpoint for org.apache.flink.runtime.dispatcher.StandaloneDispatcher at akka://flink/user/dispatcher .
2018-12-13 06:14:43,864 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - ResourceManager akka.tcp://flink@contrail-eng-raisa-cloudsecure-eng-raisa-flink:6123/user/resourcemanager was granted leadership with fencing token 00000000000000000000000000000000
2018-12-13 06:14:43,865 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Starting the SlotManager.
2018-12-13 06:14:43,881 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Dispatcher akka.tcp://flink@contrail-eng-raisa-cloudsecure-eng-raisa-flink:6123/user/dispatcher was granted leadership with fencing token 00000000-0000-0000-0000-000000000000
2018-12-13 06:14:43,883 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Recovering all persisted jobs.
2018-12-13 06:14:45,132 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - Registering TaskManager with ResourceID dd42505b766f2f2bec54b01d4ec0da68 (akka.tcp://flink@contrail-eng-raisa-cloudsecure-eng-raisa-flink1:42215/user/taskmanager_0) at ResourceManager
2018-12-13 06:14:45,171 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Registering TaskManager dd42505b766f2f2bec54b01d4ec0da68 under 070da7ce60bf3b7b27fab6613502df08 at the SlotManager.
2018-12-13 06:14:45,318 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - Registering TaskManager with ResourceID 782993c5b0de4c91de664828db92cd07 (akka.tcp://flink@contrail-eng-raisa-cloudsecure-eng-raisa-flink:43675/user/taskmanager_0) at ResourceManager
2018-12-13 06:14:45,345 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Registering TaskManager 782993c5b0de4c91de664828db92cd07 under d226e2b9aa60e8ce77d62ef9e1035b79 at the SlotManager.
2018-12-13 06:14:45,631 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - Registering TaskManager with ResourceID 5e989a835acf03cda15267557da2d97d (akka.tcp://flink@contrail-eng-raisa-cloudsecure-eng-raisa-flink2:44768/user/taskmanager_0) at ResourceManager
2018-12-13 06:14:45,653 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Registering TaskManager 5e989a835acf03cda15267557da2d97d under d47e24549a6e98db24b10ee313c72fd2 at the SlotManager.
2018-12-13 06:15:03,966 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Submitting job 4f4693ec97180650392129f4906ac488 (events-sink).
2018-12-13 06:15:04,039 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at akka://flink/user/jobmanager_0 .
2018-12-13 06:15:04,047 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Initializing job events-sink (4f4693ec97180650392129f4906ac488).
2018-12-13 06:15:04,056 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Using restart strategy FixedDelayRestartStrategy(maxNumberRestartAttempts=5, delayBetweenRestartAttempts=15000) for events-sink (4f4693ec97180650392129f4906ac488).
2018-12-13 06:15:04,061 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Starting RPC endpoint for org.apache.flink.runtime.jobmaster.slotpool.SlotPool at akka://flink/user/b5031fa7-0a93-4192-9967-9283786ab6e4 .
2018-12-13 06:15:04,088 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job recovers via failover strategy: full graph restart
2018-12-13 06:15:04,129 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Running initialization on master for job events-sink (4f4693ec97180650392129f4906ac488).
2018-12-13 06:15:04,129 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Successfully ran initialization on master in 0 ms.
2018-12-13 06:15:04,151 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880)
2018-12-13 06:15:04,161 INFO  org.apache.flink.runtime.jobmaster.JobManagerRunner           - JobManager runner for job events-sink (4f4693ec97180650392129f4906ac488) was granted leadership with session id 00000000-0000-0000-0000-000000000000 at akka.tcp://flink@contrail-eng-raisa-cloudsecure-eng-raisa-flink:6123/user/jobmanager_0.
2018-12-13 06:15:04,164 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Starting execution of job events-sink (4f4693ec97180650392129f4906ac488)
2018-12-13 06:15:04,166 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job events-sink (4f4693ec97180650392129f4906ac488) switched from state CREATED to RUNNING.
2018-12-13 06:15:04,170 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source (1/1) (210bb0ece43b25acc2cdcbdc0db14a2f) switched from CREATED to SCHEDULED.
2018-12-13 06:15:04,174 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map -> Sink: Unnamed (1/1) (d8f250d8661964f1a2733f3dd5ff64c0) switched from CREATED to SCHEDULED.
2018-12-13 06:15:04,178 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Connecting to ResourceManager akka.tcp://flink@contrail-eng-raisa-cloudsecure-eng-raisa-flink:6123/user/resourcemanager(00000000000000000000000000000000)
2018-12-13 06:15:04,184 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Resolved ResourceManager address, beginning registration
2018-12-13 06:15:04,184 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Registration at ResourceManager attempt 1 (timeout=100ms)
2018-12-13 06:15:04,189 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - Registering job manager 00000000000000000000000000000000@akka.tcp://flink@contrail-eng-raisa-cloudsecure-eng-raisa-flink:6123/user/jobmanager_0 for job 4f4693ec97180650392129f4906ac488.
2018-12-13 06:15:04,191 INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPool          - Cannot serve slot request, no ResourceManager connected. Adding as pending request [SlotRequestId{37d5fe09fa8db3212fb03572e7ff8b6c}]
2018-12-13 06:15:04,198 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - Registered job manager 00000000000000000000000000000000@akka.tcp://flink@contrail-eng-raisa-cloudsecure-eng-raisa-flink:6123/user/jobmanager_0 for job 4f4693ec97180650392129f4906ac488.
2018-12-13 06:15:04,200 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - JobManager successfully registered at ResourceManager, leader id: 00000000000000000000000000000000.
2018-12-13 06:15:04,201 INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPool          - Requesting new slot [SlotRequestId{37d5fe09fa8db3212fb03572e7ff8b6c}] and profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0} from resource manager.
2018-12-13 06:15:04,203 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - Request slot with profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0} for job 4f4693ec97180650392129f4906ac488 with allocation id AllocationID{edcebade0296856844d6d7389fe739e1}.
2018-12-13 06:15:04,316 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source (1/1) (210bb0ece43b25acc2cdcbdc0db14a2f) switched from SCHEDULED to DEPLOYING.
2018-12-13 06:15:04,317 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying Source: Custom Source (1/1) (attempt #0) to contrail-eng-raisa-cloudsecure-eng-raisa-flink1
2018-12-13 06:15:04,331 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map -> Sink: Unnamed (1/1) (d8f250d8661964f1a2733f3dd5ff64c0) switched from SCHEDULED to DEPLOYING.
2018-12-13 06:15:04,331 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying Map -> Sink: Unnamed (1/1) (attempt #0) to contrail-eng-raisa-cloudsecure-eng-raisa-flink1
2018-12-13 06:15:05,529 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map -> Sink: Unnamed (1/1) (d8f250d8661964f1a2733f3dd5ff64c0) switched from DEPLOYING to RUNNING.
2018-12-13 06:15:05,781 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source (1/1) (210bb0ece43b25acc2cdcbdc0db14a2f) switched from DEPLOYING to RUNNING.
2018-12-13 06:17:08,364 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - The heartbeat of TaskManager with id dd42505b766f2f2bec54b01d4ec0da68 timed out.
2018-12-13 06:17:08,364 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - Closing TaskExecutor connection dd42505b766f2f2bec54b01d4ec0da68 because: The heartbeat of TaskManager with id dd42505b766f2f2bec54b01d4ec0da68  timed out.
2018-12-13 06:17:08,364 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Unregister TaskManager 070da7ce60bf3b7b27fab6613502df08 from the SlotManager.
2018-12-13 06:17:08,364 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map -> Sink: Unnamed (1/1) (d8f250d8661964f1a2733f3dd5ff64c0) switched from RUNNING to FAILED.
java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id dd42505b766f2f2bec54b01d4ec0da68 timed out.
	at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1609)
	at org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl$HeartbeatMonitor.run(HeartbeatManagerImpl.java:339)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:154)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2018-12-13 06:17:08,373 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job events-sink (4f4693ec97180650392129f4906ac488) switched from state RUNNING to FAILING.
java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id dd42505b766f2f2bec54b01d4ec0da68 timed out.
	at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1609)
	at org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl$HeartbeatMonitor.run(HeartbeatManagerImpl.java:339)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:154)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2018-12-13 06:17:08,380 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source (1/1) (210bb0ece43b25acc2cdcbdc0db14a2f) switched from RUNNING to CANCELING.
2018-12-13 06:17:08,386 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source (1/1) (210bb0ece43b25acc2cdcbdc0db14a2f) switched from CANCELING to CANCELED.
2018-12-13 06:17:08,386 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Try to restart or fail the job events-sink (4f4693ec97180650392129f4906ac488) if no longer possible.
2018-12-13 06:17:08,386 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job events-sink (4f4693ec97180650392129f4906ac488) switched from state FAILING to RESTARTING.
2018-12-13 06:17:08,387 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Restarting the job events-sink (4f4693ec97180650392129f4906ac488).
2018-12-13 06:17:23,388 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job events-sink (4f4693ec97180650392129f4906ac488) switched from state RESTARTING to CREATED.
2018-12-13 06:17:23,389 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job events-sink (4f4693ec97180650392129f4906ac488) switched from state CREATED to RUNNING.
2018-12-13 06:17:23,389 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source (1/1) (6dc1d23b1d3885238183406283742cc2) switched from CREATED to SCHEDULED.
2018-12-13 06:17:23,389 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map -> Sink: Unnamed (1/1) (2b7f87ccd486c14629367f306ce6fade) switched from CREATED to SCHEDULED.
2018-12-13 06:17:23,390 INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPool          - Requesting new slot [SlotRequestId{2eaebe14b705b6ef18723512329cb7d1}] and profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0} from resource manager.
2018-12-13 06:17:23,390 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - Request slot with profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0} for job 4f4693ec97180650392129f4906ac488 with allocation id AllocationID{3c8333cc905681db3b5470b550798b69}.
2018-12-13 06:17:23,459 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source (1/1) (6dc1d23b1d3885238183406283742cc2) switched from SCHEDULED to DEPLOYING.
2018-12-13 06:17:23,459 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying Source: Custom Source (1/1) (attempt #1) to contrail-eng-raisa-cloudsecure-eng-raisa-flink
2018-12-13 06:17:23,461 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map -> Sink: Unnamed (1/1) (2b7f87ccd486c14629367f306ce6fade) switched from SCHEDULED to DEPLOYING.
2018-12-13 06:17:23,461 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying Map -> Sink: Unnamed (1/1) (attempt #1) to contrail-eng-raisa-cloudsecure-eng-raisa-flink
2018-12-13 06:17:24,521 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map -> Sink: Unnamed (1/1) (2b7f87ccd486c14629367f306ce6fade) switched from DEPLOYING to RUNNING.
2018-12-13 06:17:24,833 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source (1/1) (6dc1d23b1d3885238183406283742cc2) switched from DEPLOYING to RUNNING.
2018-12-13 06:18:57,397 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map -> Sink: Unnamed (1/1) (2b7f87ccd486c14629367f306ce6fade) switched from RUNNING to FAILED.
java.util.concurrent.TimeoutException: Futures timed out after [60 seconds]
	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:223)
	at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:227)
	at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
	at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
	at scala.concurrent.Await$.result(package.scala:190)
	at com.sksamuel.elastic4s.ElasticApi$RichFuture.await(ElasticApi.scala:87)
	at com.netapp.df.uba.server.common.elastic.ElasticUtils$.searchAllHits(ElasticUtils.scala:324)
	at com.netapp.df.uba.server.sink.events.DbModelWrapper$$anon$1.getOlderEntityRecords(DbModelWrapper.scala:200)
	at com.netapp.df.uba.server.sink.events.DbModelWrapper$$anon$1.map(DbModelWrapper.scala:43)
	at com.netapp.df.uba.server.sink.events.DbModelWrapper$$anon$1.map(DbModelWrapper.scala:28)
	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:748)
2018-12-13 06:18:57,397 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job events-sink (4f4693ec97180650392129f4906ac488) switched from state RUNNING to FAILING.
java.util.concurrent.TimeoutException: Futures timed out after [60 seconds]
	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:223)
	at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:227)
	at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
	at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
	at scala.concurrent.Await$.result(package.scala:190)
	at com.sksamuel.elastic4s.ElasticApi$RichFuture.await(ElasticApi.scala:87)
	at com.netapp.df.uba.server.common.elastic.ElasticUtils$.searchAllHits(ElasticUtils.scala:324)
	at com.netapp.df.uba.server.sink.events.DbModelWrapper$$anon$1.getOlderEntityRecords(DbModelWrapper.scala:200)
	at com.netapp.df.uba.server.sink.events.DbModelWrapper$$anon$1.map(DbModelWrapper.scala:43)
	at com.netapp.df.uba.server.sink.events.DbModelWrapper$$anon$1.map(DbModelWrapper.scala:28)
	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:748)
2018-12-13 06:18:57,398 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source (1/1) (6dc1d23b1d3885238183406283742cc2) switched from RUNNING to CANCELING.
2018-12-13 06:18:57,629 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source (1/1) (6dc1d23b1d3885238183406283742cc2) switched from CANCELING to CANCELED.
2018-12-13 06:18:57,630 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Try to restart or fail the job events-sink (4f4693ec97180650392129f4906ac488) if no longer possible.
2018-12-13 06:18:57,630 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job events-sink (4f4693ec97180650392129f4906ac488) switched from state FAILING to RESTARTING.
2018-12-13 06:18:57,630 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Restarting the job events-sink (4f4693ec97180650392129f4906ac488).
2018-12-13 06:19:12,631 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job events-sink (4f4693ec97180650392129f4906ac488) switched from state RESTARTING to CREATED.
2018-12-13 06:19:12,631 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job events-sink (4f4693ec97180650392129f4906ac488) switched from state CREATED to RUNNING.
2018-12-13 06:19:12,631 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source (1/1) (a94b305bead6a91148f33f574e87ce8e) switched from CREATED to SCHEDULED.
2018-12-13 06:19:12,632 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map -> Sink: Unnamed (1/1) (5131d9f223d6fadd97059bf6e9f2dd45) switched from CREATED to SCHEDULED.
2018-12-13 06:19:12,636 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source (1/1) (a94b305bead6a91148f33f574e87ce8e) switched from SCHEDULED to DEPLOYING.
2018-12-13 06:19:12,636 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying Source: Custom Source (1/1) (attempt #2) to contrail-eng-raisa-cloudsecure-eng-raisa-flink
2018-12-13 06:19:12,637 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map -> Sink: Unnamed (1/1) (5131d9f223d6fadd97059bf6e9f2dd45) switched from SCHEDULED to DEPLOYING.
2018-12-13 06:19:12,637 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying Map -> Sink: Unnamed (1/1) (attempt #2) to contrail-eng-raisa-cloudsecure-eng-raisa-flink
2018-12-13 06:19:13,294 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map -> Sink: Unnamed (1/1) (5131d9f223d6fadd97059bf6e9f2dd45) switched from DEPLOYING to RUNNING.
2018-12-13 06:19:13,602 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source (1/1) (a94b305bead6a91148f33f574e87ce8e) switched from DEPLOYING to RUNNING.
2018-12-13 06:19:16,053 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source (1/1) (a94b305bead6a91148f33f574e87ce8e) switched from RUNNING to FAILED.
java.lang.UnsatisfiedLinkError: no snappyjava in java.library.path
	at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1867)
	at java.lang.Runtime.loadLibrary0(Runtime.java:870)
	at java.lang.System.loadLibrary(System.java:1122)
	at org.xerial.snappy.SnappyLoader.loadNativeLibrary(SnappyLoader.java:182)
	at org.xerial.snappy.SnappyLoader.loadSnappyApi(SnappyLoader.java:154)
	at org.xerial.snappy.Snappy.<clinit>(Snappy.java:47)
	at org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:435)
	at org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:466)
	at java.io.DataInputStream.readByte(DataInputStream.java:265)
	at org.apache.kafka.common.utils.ByteUtils.readVarint(ByteUtils.java:168)
	at org.apache.kafka.common.record.DefaultRecord.readFrom(DefaultRecord.java:292)
	at org.apache.kafka.common.record.DefaultRecordBatch$1.readNext(DefaultRecordBatch.java:264)
	at org.apache.kafka.common.record.DefaultRecordBatch$RecordIterator.next(DefaultRecordBatch.java:563)
	at org.apache.kafka.common.record.DefaultRecordBatch$RecordIterator.next(DefaultRecordBatch.java:532)
	at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.nextFetchedRecord(Fetcher.java:1104)
	at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1139)
	at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1400(Fetcher.java:993)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:527)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:488)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1185)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
	at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:257)
2018-12-13 06:19:16,053 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job events-sink (4f4693ec97180650392129f4906ac488) switched from state RUNNING to FAILING.
java.lang.UnsatisfiedLinkError: no snappyjava in java.library.path
	at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1867)
	at java.lang.Runtime.loadLibrary0(Runtime.java:870)
	at java.lang.System.loadLibrary(System.java:1122)
	at org.xerial.snappy.SnappyLoader.loadNativeLibrary(SnappyLoader.java:182)
	at org.xerial.snappy.SnappyLoader.loadSnappyApi(SnappyLoader.java:154)
	at org.xerial.snappy.Snappy.<clinit>(Snappy.java:47)
	at org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:435)
	at org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:466)
	at java.io.DataInputStream.readByte(DataInputStream.java:265)
	at org.apache.kafka.common.utils.ByteUtils.readVarint(ByteUtils.java:168)
	at org.apache.kafka.common.record.DefaultRecord.readFrom(DefaultRecord.java:292)
	at org.apache.kafka.common.record.DefaultRecordBatch$1.readNext(DefaultRecordBatch.java:264)
	at org.apache.kafka.common.record.DefaultRecordBatch$RecordIterator.next(DefaultRecordBatch.java:563)
	at org.apache.kafka.common.record.DefaultRecordBatch$RecordIterator.next(DefaultRecordBatch.java:532)
	at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.nextFetchedRecord(Fetcher.java:1104)
	at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1139)
	at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1400(Fetcher.java:993)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:527)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:488)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1185)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
	at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:257)
2018-12-13 06:19:16,055 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map -> Sink: Unnamed (1/1) (5131d9f223d6fadd97059bf6e9f2dd45) switched from RUNNING to CANCELING.
2018-12-13 06:19:16,636 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map -> Sink: Unnamed (1/1) (5131d9f223d6fadd97059bf6e9f2dd45) switched from CANCELING to CANCELED.
2018-12-13 06:19:16,637 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Try to restart or fail the job events-sink (4f4693ec97180650392129f4906ac488) if no longer possible.
2018-12-13 06:19:16,637 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job events-sink (4f4693ec97180650392129f4906ac488) switched from state FAILING to RESTARTING.
2018-12-13 06:19:16,637 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Restarting the job events-sink (4f4693ec97180650392129f4906ac488).
2018-12-13 06:19:31,638 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job events-sink (4f4693ec97180650392129f4906ac488) switched from state RESTARTING to CREATED.
2018-12-13 06:19:31,639 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job events-sink (4f4693ec97180650392129f4906ac488) switched from state CREATED to RUNNING.
2018-12-13 06:19:31,639 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source (1/1) (08882408327a871bfcab506d43a3fc83) switched from CREATED to SCHEDULED.
2018-12-13 06:19:31,639 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map -> Sink: Unnamed (1/1) (c18bcb280aeea33bb4b6a58b153a970a) switched from CREATED to SCHEDULED.
2018-12-13 06:19:31,640 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source (1/1) (08882408327a871bfcab506d43a3fc83) switched from SCHEDULED to DEPLOYING.
2018-12-13 06:19:31,641 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying Source: Custom Source (1/1) (attempt #3) to contrail-eng-raisa-cloudsecure-eng-raisa-flink
2018-12-13 06:19:31,641 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map -> Sink: Unnamed (1/1) (c18bcb280aeea33bb4b6a58b153a970a) switched from SCHEDULED to DEPLOYING.
2018-12-13 06:19:31,641 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying Map -> Sink: Unnamed (1/1) (attempt #3) to contrail-eng-raisa-cloudsecure-eng-raisa-flink
2018-12-13 06:19:32,551 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map -> Sink: Unnamed (1/1) (c18bcb280aeea33bb4b6a58b153a970a) switched from DEPLOYING to RUNNING.
2018-12-13 06:19:32,672 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source (1/1) (08882408327a871bfcab506d43a3fc83) switched from DEPLOYING to RUNNING.
2018-12-13 06:19:33,746 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map -> Sink: Unnamed (1/1) (c18bcb280aeea33bb4b6a58b153a970a) switched from RUNNING to FAILED.
java.io.IOException: Too many open files
	at org.elasticsearch.client.RestClient$SyncResponseListener.get(RestClient.java:728)
	at org.elasticsearch.client.RestClient.performRequest(RestClient.java:235)
	at org.elasticsearch.client.RestClient.performRequest(RestClient.java:198)
	at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:522)
	at org.elasticsearch.client.RestHighLevelClient.ping(RestHighLevelClient.java:275)
	at org.apache.flink.streaming.connectors.elasticsearch6.Elasticsearch6ApiCallBridge.createClient(Elasticsearch6ApiCallBridge.java:79)
	at org.apache.flink.streaming.connectors.elasticsearch6.Elasticsearch6ApiCallBridge.createClient(Elasticsearch6ApiCallBridge.java:45)
	at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.open(ElasticsearchSinkBase.java:296)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.SocketException: Too many open files
	at sun.nio.ch.Net.socket0(Native Method)
	at sun.nio.ch.Net.socket(Net.java:411)
	at sun.nio.ch.Net.socket(Net.java:404)
	at sun.nio.ch.SocketChannelImpl.<init>(SocketChannelImpl.java:105)
	at sun.nio.ch.SelectorProviderImpl.openSocketChannel(SelectorProviderImpl.java:60)
	at java.nio.channels.SocketChannel.open(SocketChannel.java:145)
	at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processSessionRequests(DefaultConnectingIOReactor.java:256)
	at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processEvents(DefaultConnectingIOReactor.java:139)
	at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor.execute(AbstractMultiworkerIOReactor.java:348)
	at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.execute(PoolingNHttpClientConnectionManager.java:192)
	at org.apache.http.impl.nio.client.CloseableHttpAsyncClientBase$1.run(CloseableHttpAsyncClientBase.java:64)
	... 1 more
2018-12-13 06:19:33,746 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job events-sink (4f4693ec97180650392129f4906ac488) switched from state RUNNING to FAILING.
java.io.IOException: Too many open files
	at org.elasticsearch.client.RestClient$SyncResponseListener.get(RestClient.java:728)
	at org.elasticsearch.client.RestClient.performRequest(RestClient.java:235)
	at org.elasticsearch.client.RestClient.performRequest(RestClient.java:198)
	at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:522)
	at org.elasticsearch.client.RestHighLevelClient.ping(RestHighLevelClient.java:275)
	at org.apache.flink.streaming.connectors.elasticsearch6.Elasticsearch6ApiCallBridge.createClient(Elasticsearch6ApiCallBridge.java:79)
	at org.apache.flink.streaming.connectors.elasticsearch6.Elasticsearch6ApiCallBridge.createClient(Elasticsearch6ApiCallBridge.java:45)
	at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.open(ElasticsearchSinkBase.java:296)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.SocketException: Too many open files
	at sun.nio.ch.Net.socket0(Native Method)
	at sun.nio.ch.Net.socket(Net.java:411)
	at sun.nio.ch.Net.socket(Net.java:404)
	at sun.nio.ch.SocketChannelImpl.<init>(SocketChannelImpl.java:105)
	at sun.nio.ch.SelectorProviderImpl.openSocketChannel(SelectorProviderImpl.java:60)
	at java.nio.channels.SocketChannel.open(SocketChannel.java:145)
	at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processSessionRequests(DefaultConnectingIOReactor.java:256)
	at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processEvents(DefaultConnectingIOReactor.java:139)
	at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor.execute(AbstractMultiworkerIOReactor.java:348)
	at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.execute(PoolingNHttpClientConnectionManager.java:192)
	at org.apache.http.impl.nio.client.CloseableHttpAsyncClientBase$1.run(CloseableHttpAsyncClientBase.java:64)
	... 1 more
2018-12-13 06:19:33,750 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source (1/1) (08882408327a871bfcab506d43a3fc83) switched from RUNNING to CANCELING.
2018-12-13 06:19:34,303 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source (1/1) (08882408327a871bfcab506d43a3fc83) switched from CANCELING to CANCELED.
2018-12-13 06:19:34,304 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Try to restart or fail the job events-sink (4f4693ec97180650392129f4906ac488) if no longer possible.
2018-12-13 06:19:34,304 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job events-sink (4f4693ec97180650392129f4906ac488) switched from state FAILING to RESTARTING.
2018-12-13 06:19:34,304 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Restarting the job events-sink (4f4693ec97180650392129f4906ac488).
2018-12-13 06:19:49,305 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job events-sink (4f4693ec97180650392129f4906ac488) switched from state RESTARTING to CREATED.
2018-12-13 06:19:49,306 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job events-sink (4f4693ec97180650392129f4906ac488) switched from state CREATED to RUNNING.
2018-12-13 06:19:49,306 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source (1/1) (481a725a6c66e52323886f8b269d6e5e) switched from CREATED to SCHEDULED.
2018-12-13 06:19:49,306 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map -> Sink: Unnamed (1/1) (bae42b6a5daa06fdc2d80d3aca4b006a) switched from CREATED to SCHEDULED.
2018-12-13 06:19:49,307 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map -> Sink: Unnamed (1/1) (bae42b6a5daa06fdc2d80d3aca4b006a) switched from SCHEDULED to DEPLOYING.
2018-12-13 06:19:49,307 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying Map -> Sink: Unnamed (1/1) (attempt #4) to contrail-eng-raisa-cloudsecure-eng-raisa-flink
2018-12-13 06:19:49,307 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source (1/1) (481a725a6c66e52323886f8b269d6e5e) switched from SCHEDULED to DEPLOYING.
2018-12-13 06:19:49,308 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying Source: Custom Source (1/1) (attempt #4) to contrail-eng-raisa-cloudsecure-eng-raisa-flink
2018-12-13 06:19:49,424 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map -> Sink: Unnamed (1/1) (bae42b6a5daa06fdc2d80d3aca4b006a) switched from DEPLOYING to RUNNING.
2018-12-13 06:19:50,215 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source (1/1) (481a725a6c66e52323886f8b269d6e5e) switched from DEPLOYING to RUNNING.
2018-12-13 06:19:51,389 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map -> Sink: Unnamed (1/1) (bae42b6a5daa06fdc2d80d3aca4b006a) switched from RUNNING to FAILED.
java.lang.IllegalStateException: Request cannot be executed; I/O reactor status: STOPPED
	at org.apache.http.util.Asserts.check(Asserts.java:46)
	at org.apache.http.impl.nio.client.CloseableHttpAsyncClientBase.ensureRunning(CloseableHttpAsyncClientBase.java:90)
	at org.apache.http.impl.nio.client.InternalHttpAsyncClient.execute(InternalHttpAsyncClient.java:123)
	at org.elasticsearch.client.RestClient.performRequestAsync(RestClient.java:366)
	at org.elasticsearch.client.RestClient.performRequestAsyncNoCatch(RestClient.java:351)
	at org.elasticsearch.client.RestClient.performRequest(RestClient.java:233)
	at org.elasticsearch.client.RestClient.performRequest(RestClient.java:198)
	at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:522)
	at org.elasticsearch.client.RestHighLevelClient.ping(RestHighLevelClient.java:275)
	at org.apache.flink.streaming.connectors.elasticsearch6.Elasticsearch6ApiCallBridge.createClient(Elasticsearch6ApiCallBridge.java:79)
	at org.apache.flink.streaming.connectors.elasticsearch6.Elasticsearch6ApiCallBridge.createClient(Elasticsearch6ApiCallBridge.java:45)
	at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.open(ElasticsearchSinkBase.java:296)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:748)
2018-12-13 06:19:51,389 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job events-sink (4f4693ec97180650392129f4906ac488) switched from state RUNNING to FAILING.
java.lang.IllegalStateException: Request cannot be executed; I/O reactor status: STOPPED
	at org.apache.http.util.Asserts.check(Asserts.java:46)
	at org.apache.http.impl.nio.client.CloseableHttpAsyncClientBase.ensureRunning(CloseableHttpAsyncClientBase.java:90)
	at org.apache.http.impl.nio.client.InternalHttpAsyncClient.execute(InternalHttpAsyncClient.java:123)
	at org.elasticsearch.client.RestClient.performRequestAsync(RestClient.java:366)
	at org.elasticsearch.client.RestClient.performRequestAsyncNoCatch(RestClient.java:351)
	at org.elasticsearch.client.RestClient.performRequest(RestClient.java:233)
	at org.elasticsearch.client.RestClient.performRequest(RestClient.java:198)
	at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:522)
	at org.elasticsearch.client.RestHighLevelClient.ping(RestHighLevelClient.java:275)
	at org.apache.flink.streaming.connectors.elasticsearch6.Elasticsearch6ApiCallBridge.createClient(Elasticsearch6ApiCallBridge.java:79)
	at org.apache.flink.streaming.connectors.elasticsearch6.Elasticsearch6ApiCallBridge.createClient(Elasticsearch6ApiCallBridge.java:45)
	at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.open(ElasticsearchSinkBase.java:296)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:748)
2018-12-13 06:19:51,390 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source (1/1) (481a725a6c66e52323886f8b269d6e5e) switched from RUNNING to CANCELING.
2018-12-13 06:19:51,605 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source (1/1) (481a725a6c66e52323886f8b269d6e5e) switched from CANCELING to CANCELED.
2018-12-13 06:19:51,605 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Try to restart or fail the job events-sink (4f4693ec97180650392129f4906ac488) if no longer possible.
2018-12-13 06:19:51,605 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job events-sink (4f4693ec97180650392129f4906ac488) switched from state FAILING to RESTARTING.
2018-12-13 06:19:51,606 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Restarting the job events-sink (4f4693ec97180650392129f4906ac488).
2018-12-13 06:20:06,606 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job events-sink (4f4693ec97180650392129f4906ac488) switched from state RESTARTING to CREATED.
2018-12-13 06:20:06,607 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job events-sink (4f4693ec97180650392129f4906ac488) switched from state CREATED to RUNNING.
2018-12-13 06:20:06,607 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source (1/1) (316c4ebfe71c6cfab09ffa4d710ffdf5) switched from CREATED to SCHEDULED.
2018-12-13 06:20:06,607 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map -> Sink: Unnamed (1/1) (d4ca7b9b70cbcf35bb129208f70a0fa7) switched from CREATED to SCHEDULED.
2018-12-13 06:20:06,608 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source (1/1) (316c4ebfe71c6cfab09ffa4d710ffdf5) switched from SCHEDULED to DEPLOYING.
2018-12-13 06:20:06,608 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying Source: Custom Source (1/1) (attempt #5) to contrail-eng-raisa-cloudsecure-eng-raisa-flink
2018-12-13 06:20:06,608 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map -> Sink: Unnamed (1/1) (d4ca7b9b70cbcf35bb129208f70a0fa7) switched from SCHEDULED to DEPLOYING.
2018-12-13 06:20:06,609 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying Map -> Sink: Unnamed (1/1) (attempt #5) to contrail-eng-raisa-cloudsecure-eng-raisa-flink
2018-12-13 06:20:07,034 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map -> Sink: Unnamed (1/1) (d4ca7b9b70cbcf35bb129208f70a0fa7) switched from DEPLOYING to RUNNING.
2018-12-13 06:20:08,100 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source (1/1) (316c4ebfe71c6cfab09ffa4d710ffdf5) switched from DEPLOYING to RUNNING.
2018-12-13 06:20:08,624 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source (1/1) (316c4ebfe71c6cfab09ffa4d710ffdf5) switched from RUNNING to FAILED.
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:793)
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:644)
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:624)
	at org.apache.flink.streaming.connectors.kafka.internal.Kafka09PartitionDiscoverer.initializeConnections(Kafka09PartitionDiscoverer.java:58)
	at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:469)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: java.io.IOException: Too many open files
	at org.apache.kafka.common.network.Selector.<init>(Selector.java:154)
	at org.apache.kafka.common.network.Selector.<init>(Selector.java:188)
	at org.apache.kafka.common.network.Selector.<init>(Selector.java:192)
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:718)
	... 11 more
Caused by: java.io.IOException: Too many open files
	at sun.nio.ch.IOUtil.makePipe(Native Method)
	at sun.nio.ch.EPollSelectorImpl.<init>(EPollSelectorImpl.java:65)
	at sun.nio.ch.EPollSelectorProvider.openSelector(EPollSelectorProvider.java:36)
	at java.nio.channels.Selector.open(Selector.java:227)
	at org.apache.kafka.common.network.Selector.<init>(Selector.java:152)
	... 14 more
2018-12-13 06:20:08,641 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job events-sink (4f4693ec97180650392129f4906ac488) switched from state RUNNING to FAILING.
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:793)
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:644)
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:624)
	at org.apache.flink.streaming.connectors.kafka.internal.Kafka09PartitionDiscoverer.initializeConnections(Kafka09PartitionDiscoverer.java:58)
	at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:469)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: java.io.IOException: Too many open files
	at org.apache.kafka.common.network.Selector.<init>(Selector.java:154)
	at org.apache.kafka.common.network.Selector.<init>(Selector.java:188)
	at org.apache.kafka.common.network.Selector.<init>(Selector.java:192)
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:718)
	... 11 more
Caused by: java.io.IOException: Too many open files
	at sun.nio.ch.IOUtil.makePipe(Native Method)
	at sun.nio.ch.EPollSelectorImpl.<init>(EPollSelectorImpl.java:65)
	at sun.nio.ch.EPollSelectorProvider.openSelector(EPollSelectorProvider.java:36)
	at java.nio.channels.Selector.open(Selector.java:227)
	at org.apache.kafka.common.network.Selector.<init>(Selector.java:152)
	... 14 more
2018-12-13 06:20:08,652 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map -> Sink: Unnamed (1/1) (d4ca7b9b70cbcf35bb129208f70a0fa7) switched from RUNNING to CANCELING.
2018-12-13 06:20:08,845 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map -> Sink: Unnamed (1/1) (d4ca7b9b70cbcf35bb129208f70a0fa7) switched from CANCELING to CANCELED.
2018-12-13 06:20:08,846 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Try to restart or fail the job events-sink (4f4693ec97180650392129f4906ac488) if no longer possible.
2018-12-13 06:20:08,846 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job events-sink (4f4693ec97180650392129f4906ac488) switched from state FAILING to FAILED.
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:793)
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:644)
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:624)
	at org.apache.flink.streaming.connectors.kafka.internal.Kafka09PartitionDiscoverer.initializeConnections(Kafka09PartitionDiscoverer.java:58)
	at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:469)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: java.io.IOException: Too many open files
	at org.apache.kafka.common.network.Selector.<init>(Selector.java:154)
	at org.apache.kafka.common.network.Selector.<init>(Selector.java:188)
	at org.apache.kafka.common.network.Selector.<init>(Selector.java:192)
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:718)
	... 11 more
Caused by: java.io.IOException: Too many open files
	at sun.nio.ch.IOUtil.makePipe(Native Method)
	at sun.nio.ch.EPollSelectorImpl.<init>(EPollSelectorImpl.java:65)
	at sun.nio.ch.EPollSelectorProvider.openSelector(EPollSelectorProvider.java:36)
	at java.nio.channels.Selector.open(Selector.java:227)
	at org.apache.kafka.common.network.Selector.<init>(Selector.java:152)
	... 14 more
2018-12-13 06:20:08,847 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Could not restart the job events-sink (4f4693ec97180650392129f4906ac488) because the restart strategy prevented it.
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:793)
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:644)
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:624)
	at org.apache.flink.streaming.connectors.kafka.internal.Kafka09PartitionDiscoverer.initializeConnections(Kafka09PartitionDiscoverer.java:58)
	at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:469)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: java.io.IOException: Too many open files
	at org.apache.kafka.common.network.Selector.<init>(Selector.java:154)
	at org.apache.kafka.common.network.Selector.<init>(Selector.java:188)
	at org.apache.kafka.common.network.Selector.<init>(Selector.java:192)
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:718)
	... 11 more
Caused by: java.io.IOException: Too many open files
	at sun.nio.ch.IOUtil.makePipe(Native Method)
	at sun.nio.ch.EPollSelectorImpl.<init>(EPollSelectorImpl.java:65)
	at sun.nio.ch.EPollSelectorProvider.openSelector(EPollSelectorProvider.java:36)
	at java.nio.channels.Selector.open(Selector.java:227)
	at org.apache.kafka.common.network.Selector.<init>(Selector.java:152)
	... 14 more
2018-12-13 06:20:08,847 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Stopping checkpoint coordinator for job 4f4693ec97180650392129f4906ac488.
2018-12-13 06:20:08,847 INFO  org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore  - Shutting down
2018-12-13 06:20:08,849 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Job 4f4693ec97180650392129f4906ac488 reached globally terminal state FAILED.
2018-12-13 06:20:08,864 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Stopping the JobMaster for job events-sink(4f4693ec97180650392129f4906ac488).
2018-12-13 06:20:08,866 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Close ResourceManager connection ebeb4ec81e03741c92f338650164b310: JobManager is shutting down..
2018-12-13 06:20:08,866 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - Disconnect job manager 00000000000000000000000000000000@akka.tcp://flink@contrail-eng-raisa-cloudsecure-eng-raisa-flink:6123/user/jobmanager_0 for job 4f4693ec97180650392129f4906ac488 from the resource manager.
2018-12-13 06:20:08,867 INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPool          - Suspending SlotPool.
2018-12-13 06:20:08,867 INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPool          - Stopping SlotPool.
2018-12-13 06:20:08,870 INFO  org.apache.flink.runtime.jobmaster.JobManagerRunner           - JobManagerRunner already shutdown.