Hello Robert and Beam community,
I have added the element count metrics to the Java SDK in this PR. In doing so, I extended the Metrics.counter call with a LabeledMetrics.counter() call, which constructs a metric from a MonitoringInfo URN and a set of labels. This lets the element counts be extracted with their PCollection label and packaged into a MonitoringInfo.
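To make the labeled-counter idea concrete, here is a rough sketch of counters keyed by a URN plus a set of labels. It is not the code in the PR: the class LabeledCounterSketch and its counter() method are hypothetical stand-ins, and the URN and label strings in the usage note are only illustrative values.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch only; this is not LabeledMetrics from the PR.
class LabeledCounterSketch {
  // One counter per (urn, labels) pair.
  private final Map<String, AtomicLong> counters = new HashMap<>();

  // Returns the counter for this urn and label set, creating it on first use.
  AtomicLong counter(String urn, Map<String, String> labels) {
    // A TreeMap sorts the labels, so equal label sets always yield the same key.
    String key = urn + new TreeMap<>(labels);
    return counters.computeIfAbsent(key, k -> new AtomicLong());
  }
}
```

For example, two calls to counter("beam:metric:element_count:v1", labels) with the same PCOLLECTION label resolve to the same counter, while a different PCOLLECTION label gets its own.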
I was hoping you could take a look and let me know whether I am putting the code in the correct packages/projects:
- runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/LabeledMetrics.java # visible to runner harness and SDK implementations only, not pipeline authors
- runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoMetricName.java # visible to runner harness and SDK implementations only, not pipeline authors
- sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/ElementCountFnDataReceiver.java # visible to SDK implementations only, not pipeline authors
- sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java # visible to SDK implementations only, not pipeline authors
The PR also refactors the construction of PCollection consumers to go through a PCollectionConsumerRegistry, which gives us a single place in the code to wrap all PCollection consumption with an ElementCount counter.
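The wrapping idea can be sketched roughly as follows. This is a simplified illustration under my own assumptions, not the PR's PCollectionConsumerRegistry or ElementCountFnDataReceiver: the class ConsumerRegistrySketch and its method names are hypothetical, and it uses a plain AtomicLong in place of a real metric.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical sketch only; these are not the classes in the PR.
class ConsumerRegistrySketch<T> {
  private final Map<String, List<Consumer<T>>> consumers = new HashMap<>();
  private final Map<String, AtomicLong> elementCounts = new HashMap<>();

  // Registers a downstream consumer for the given PCollection id.
  void register(String pCollectionId, Consumer<T> consumer) {
    consumers.computeIfAbsent(pCollectionId, k -> new ArrayList<>()).add(consumer);
  }

  // Returns one consumer that counts each element and then forwards it
  // to every consumer registered so far for this PCollection.
  Consumer<T> getMultiplexingConsumer(String pCollectionId) {
    List<Consumer<T>> registered =
        new ArrayList<>(consumers.getOrDefault(pCollectionId, new ArrayList<>()));
    AtomicLong count = elementCounts.computeIfAbsent(pCollectionId, k -> new AtomicLong());
    return element -> {
      count.incrementAndGet();
      for (Consumer<T> consumer : registered) {
        consumer.accept(element);
      }
    };
  }

  // Number of elements seen for the PCollection, or 0 if none were consumed.
  long elementCount(String pCollectionId) {
    AtomicLong count = elementCounts.get(pCollectionId);
    return count == null ? 0L : count.get();
  }
}
```

Because every producer obtains its output consumer from the registry, the counting wrapper is applied in one place instead of in each producer.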
Please let me know what you think. If you would prefer that I split this PR, I can do that tomorrow morning.