osdir.com

Reading conditionally from different sources


Hi All,

When my Beam app starts, I want to fetch a single object and store it in a (singleton) PCollectionView for use as a side input.

This object will *either* be read from a file (FileIO) or fetched via a remote call, depending on a runtime value (ValueProvider).

I cannot figure out how to code this conditional operation, and would appreciate any tips.

One approach I tried is to have two initial branches of the pipeline, each mapping a PBegin to a collection of the desired type, e.g.
  PCollection<T> c1 = pipeline.apply(myreader1); // read from file
  PCollection<T> c2 = pipeline.apply(myreader2); // custom fetch
  // Flatten is applied to a PCollectionList, not to the pipeline itself
  PCollectionView<T> c = PCollectionList.of(c1).and(c2).apply(Flatten.pCollections()).apply(View.asSingleton());

However the problems with this approach are:
* afaict I need to write my "custom fetch" via Read.from(source) - which is a moderately large amount of boilerplate to wrap my code
  up as a BoundedSource which only ever returns one object. Seems doable, but ugly.
* I can't figure out how to execute the file-read only when the ValueProvider has a specific value at runtime (the FileIO class accepts
  only a PBegin as input)

Another approach I tried is to have a single pipeline:
*  begin.apply(Create.of(Optional.<T>empty()))..
*  each reader then just passes on the input if !empty, else if "active" then executes its read-op and outputs a non-empty optional.
*  and then use MapElements to unwrap the optional (which should be non-empty) and make a view of the result.
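
To make the pass-through logic of this "linear with optional" idea concrete, here is a minimal sketch in plain Java, without any Beam classes. All names (OptionalChainSketch, stage, load, the "file"/"remote" modes and the dummy readers) are hypothetical, and a BooleanSupplier merely stands in for the ValueProvider test. It only models the per-stage decision, not how FileIO would be wired in:

```java
import java.util.Optional;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;

class OptionalChainSketch {

    /**
     * Wraps a read operation as a stage (hypothetical helper):
     * a non-empty input is passed through untouched; an empty input
     * triggers the read only when this stage is "active".
     */
    static <T> UnaryOperator<Optional<T>> stage(BooleanSupplier active, Supplier<T> reader) {
        return in -> {
            if (in.isPresent()) {
                return in;                        // an earlier stage already produced the value
            }
            if (active.getAsBoolean()) {
                return Optional.of(reader.get()); // this stage is responsible for the read
            }
            return in;                            // still empty; a later stage may fill it
        };
    }

    /** Runs both stages in sequence; "mode" plays the role of the runtime value. */
    static String load(String mode) {
        Supplier<String> fromFile = () -> "from-file";     // placeholder for the file read
        Supplier<String> fromRemote = () -> "from-remote"; // placeholder for the remote call

        Optional<String> seed = Optional.empty();
        Optional<String> afterFile =
            stage(() -> "file".equals(mode), fromFile).apply(seed);
        Optional<String> afterRemote =
            stage(() -> !"file".equals(mode), fromRemote).apply(afterFile);

        // Exactly one stage should have been active, so the optional is non-empty here.
        return afterRemote.orElseThrow(() -> new IllegalStateException("no reader was active"));
    }

    public static void main(String[] args) {
        System.out.println(load("file"));
        System.out.println(load("remote"));
    }
}
```

In this sketch each reader is an ordinary function, which is why the chaining is easy; the difficulty described next is that FileIO is a transform over a PBegin rather than a function that can be called per element.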

For my own reader, I can just use a MapElements operation whose body does the "if empty" and "if active" tests, then executes my reading logic.
However, for the FileIO path, I need to somehow run the "if empty" and "if active" tests outside the FileIO.match() operation, and then cause
the pipeline to output either the input or the result of executing FileIO.match() and the three following operations. If I use a MapElements operation,
then the return type is a PCollection of PCollections, which is obviously not useful. And even if the types worked, I am not sure that calling
pipeline.begin().apply(..) from within a MapElements.via callback would work.

I tried searching for "apache beam conditional logic" and related terms, and found nothing useful. Both the "flatten" and "linear with optional"
approaches seem to be ways to implement conditional logic in general, but neither seems to work well with classes that take a PBegin as input.

Any tips welcome!

Thanks in advance,
Simon