
Need help to understand memory consumption


My use case is:
- I use Flink 1.4.1 in a standalone cluster of 5 VMs (1 VM = 1 JobManager + 1 TaskManager)
- I run N jobs per day. N may vary (one day N=20, another day N=50, ...). All jobs are the same. They connect to Kafka topics and have two DB2 connectors.
- Depending on a special event, a job can self-restart via the command: bin/flink cancel <JobID>
- At the end of the day, I cancel all jobs
- Each VM is configured with 16 GB of RAM
- The memory allocated to one TaskManager is 10 GB

After several days, the memory saturates (we exceed 14 GB of used memory).

I read the following posts, but they did not help me understand my problem:
- https://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html
- http://mail-archives.apache.org/mod_mbox/flink-user/201711.mbox/browser

I did some tests on a machine (outside the cluster) with the top command, and this is what I observed (please see the attached file Flink_memory.PNG):
- When a job is started and running, it consumes memory
- When a job is cancelled, a large part of the memory is still used
- When another job is started and running (after cancelling the previous job), even more memory is consumed
- When I restart the JobManager and TaskManager, memory returns to normal
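
The top command only shows the total memory of the process, so it cannot tell whether the growth is in the JVM heap or outside it. The observations above could be refined by reading the JVM's own counters. The following is only a minimal sketch using the standard java.lang.management API; MemorySnapshot is a hypothetical helper name, not part of Flink, and it reports on the JVM it runs in, so it would have to be called from code running inside the TaskManager (or the same beans read over JMX) to describe that process.

```java
import java.lang.management.ClassLoadingMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;

/** Hypothetical helper (not part of Flink): a snapshot of JVM memory counters. */
final class MemorySnapshot {
    final long heapUsedBytes;     // used heap memory, in bytes
    final long nonHeapUsedBytes;  // used non-heap memory (e.g. metaspace, code cache), in bytes
    final int loadedClassCount;   // classes currently loaded in this JVM

    private MemorySnapshot(long heapUsedBytes, long nonHeapUsedBytes, int loadedClassCount) {
        this.heapUsedBytes = heapUsedBytes;
        this.nonHeapUsedBytes = nonHeapUsedBytes;
        this.loadedClassCount = loadedClassCount;
    }

    /** Reads the counters of the JVM in which this method is called. */
    static MemorySnapshot take() {
        MemoryMXBean memory = ManagementFactory.getMemoryMXBean();
        ClassLoadingMXBean classes = ManagementFactory.getClassLoadingMXBean();
        MemoryUsage heap = memory.getHeapMemoryUsage();
        MemoryUsage nonHeap = memory.getNonHeapMemoryUsage();
        return new MemorySnapshot(heap.getUsed(), nonHeap.getUsed(), classes.getLoadedClassCount());
    }

    /** Formats the snapshot as one log line, with sizes in whole megabytes. */
    String format() {
        long mb = 1024L * 1024L;
        return String.format("heap=%d MB, nonHeap=%d MB, loadedClasses=%d",
                heapUsedBytes / mb, nonHeapUsedBytes / mb, loadedClassCount);
    }

    public static void main(String[] args) {
        // Log one snapshot; comparing snapshots taken before a job starts,
        // while it runs, and after it is cancelled mirrors the top-based test.
        System.out.println(MemorySnapshot.take().format());
    }
}
```

If the heap value drops after a cancel while the non-heap value or the loaded class count keeps growing with each new job, that would point away from the heap itself; this is an assumption to check, not a diagnosis.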

Why is the memory not released when a job is cancelled?

I added another attachment, Graph.PNG, which shows the graph of a job.
In case it is useful: we use MapFunction, FlatMapFunction, FilterFunction, triggers, windows, ...

Thanks in advance,

Attachment: Flink_memory.xlsx
Description: MS-Excel 2007 spreadsheet

Attachment: Graph.PNG
Description: PNG image

Attachment: Flink_memory.PNG
Description: PNG image