osdir.com

Counter Implementation in Flink


I'm using Flink 1.4.2 and deploying the job on a YARN cluster.

I have a streaming job which flattens the data and outputs it: it takes an
input record and produces n output records. I'm using a TableFunction for
this, and the logic to flatten the data is implemented in a UDF. The UDF has
a counter which counts the number of records produced:
           this.context.getMetricGroup().counter("output_records_counter");
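For anyone reading along, here is a minimal plain-Java model of that setup. It is not Flink code: `Counter` and `SimpleCounter` are local stand-ins shaped like Flink's `org.apache.flink.metrics.Counter`, and `FlattenFunction` is a hypothetical UDF that splits a comma-separated record into n output records, incrementing the counter once per emitted row. In the real job the class would extend `TableFunction`, register the counter in `open(FunctionContext)` and emit rows with `collect(...)`.

```java
import java.util.ArrayList;
import java.util.List;

public class FlattenCounterSketch {

    // Local stand-in shaped like Flink's org.apache.flink.metrics.Counter (not the real interface).
    interface Counter {
        void inc();
        long getCount();
    }

    // Plain, non-thread-safe counter; a hypothetical analogue of a simple metric counter.
    static final class SimpleCounter implements Counter {
        private long count;
        public void inc() { count++; }
        public long getCount() { return count; }
    }

    // Hypothetical flattening UDF: one input record in, n output records out.
    static final class FlattenFunction {
        private Counter outputRecords;
        private final List<String> collected = new ArrayList<>();

        // In a Flink TableFunction this would happen in open(FunctionContext), e.g.
        // context.getMetricGroup().counter("output_records_counter").
        void open() {
            outputRecords = new SimpleCounter();
        }

        // Splits a comma-separated record and counts every emitted row.
        void eval(String input) {
            for (String part : input.split(",")) {
                collected.add(part);   // stands in for TableFunction.collect(...)
                outputRecords.inc();   // one increment per emitted row
            }
        }

        long outputCount() { return outputRecords.getCount(); }
        List<String> output() { return collected; }
    }

    public static void main(String[] args) {
        FlattenFunction udf = new FlattenFunction();
        udf.open();
        udf.eval("a,b,c");   // 1 input record -> 3 output records
        udf.eval("d,e");     // 1 input record -> 2 output records
        System.out.println(udf.output() + " counted=" + udf.outputCount());
    }
}
```

Running `main` prints `[a, b, c, d, e] counted=5`: two input records, five output records, five increments.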

I know Flink provides the numRecordsOut metric, which essentially gives me the
same number.
When the job starts, the output record counts reported by the
`output_records_counter` counter and by `numRecordsOut` are exactly the same.

When a task manager is lost and the job is restarted, there's a huge
difference between the two output record counts. As seen in the graph, when
the job was started both counts overlap; after a task manager is lost and the
job is redeployed, they diverge. I'm not sure why the numbers differ so
much.
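One thing that may be relevant when reading the graph: as far as I know, Flink keeps metric values in TaskManager memory and does not restore them from checkpoints, so every redeployed subtask starts its counters at zero. The plain-Java model below (not Flink code) shows what that implies for a single subtask, assuming checkpointing is enabled and the source can be rewound to the last completed checkpoint; `SubtaskCounter`, `runAttempt` and the offsets are hypothetical. It prints 900 for the live counter, 1900 for the sum over both attempts and 1500 distinct records, so a plotted total can land above or below the real output depending on whether the pre-failure series is still included.

```java
public class RestartCounterSketch {

    // Hypothetical per-subtask counter. Each (re)deployment creates a new instance,
    // modelling metric values that live in memory and are not restored from a checkpoint.
    static final class SubtaskCounter {
        private long count;
        void inc() { count++; }
        long getCount() { return count; }
    }

    // Simulates one attempt that reads source offsets [from, to) and emits one output
    // record per input (a flatten with n = 1, to keep the arithmetic simple).
    static SubtaskCounter runAttempt(long from, long to) {
        SubtaskCounter counter = new SubtaskCounter();
        for (long offset = from; offset < to; offset++) {
            counter.inc();
        }
        return counter;
    }

    public static void main(String[] args) {
        long lastCheckpoint = 600;  // assumed offset of the last completed checkpoint
        long failureAt = 1_000;     // assumed offset reached when the TaskManager is lost
        long end = 1_500;           // offset reached at the time of observation

        SubtaskCounter firstAttempt = runAttempt(0, failureAt);
        // Recovery rewinds the (assumed replayable) source to the last checkpoint,
        // and the restarted subtask counts from zero again.
        SubtaskCounter secondAttempt = runAttempt(lastCheckpoint, end);

        System.out.println("live counter after restart: " + secondAttempt.getCount());
        System.out.println("sum over both attempts:     "
                + (firstAttempt.getCount() + secondAttempt.getCount()));
        System.out.println("distinct records emitted:   " + end);
    }
}
```

In this model the reset affects every per-subtask counter in the same way, so on its own it does not explain a gap between two metrics that count the same rows.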

Can someone please shed some light on how this counter is implemented, or
point me to the source code or any reference material?

For numRecordsOut, each task manager reports its own count. Is the same not
true for output_records_counter?
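As far as I understand, a counter registered through the function's metric group is, like `numRecordsOut`, created once per parallel subtask and reported under that subtask's scope (Flink's default operator scope is `<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>`), so a job-wide figure is a sum over subtasks. The sketch below models only that summation in plain Java; `Reported` and `totalPerMetric` are hypothetical names and the values are made up for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PerSubtaskAggregationSketch {

    // One reported value; the TaskManager id and subtask index loosely follow
    // the fields of Flink's default operator scope format.
    static final class Reported {
        final String tmId;
        final int subtask;
        final String metric;
        final long value;

        Reported(String tmId, int subtask, String metric, long value) {
            this.tmId = tmId;
            this.subtask = subtask;
            this.metric = metric;
            this.value = value;
        }
    }

    // Sums the reported values per metric name across all TaskManagers and subtasks.
    static Map<String, Long> totalPerMetric(List<Reported> reports) {
        Map<String, Long> totals = new TreeMap<>();
        for (Reported r : reports) {
            totals.merge(r.metric, r.value, Long::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        List<Reported> reports = List.of(
                new Reported("tm-1", 0, "numRecordsOut", 300),
                new Reported("tm-1", 0, "output_records_counter", 300),
                new Reported("tm-2", 1, "numRecordsOut", 250),
                new Reported("tm-2", 1, "output_records_counter", 250));
        System.out.println(totalPerMetric(reports));
    }
}
```

This prints `{numRecordsOut=550, output_records_counter=550}`; if one TaskManager's series were missing from (or duplicated in) the sum for only one of the metrics, the totals would no longer match.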

<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1468/Screen_Shot_2018-11-24_at_9.png> 



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/