[subject change for discussion fork]
Thanks for the steps. I'm able to run the Python wordcount example, though it fails when writing its output to a local file. Did you test with a distributed FS or the local FS?
Some more observations so far:
* Job server does not currently provide log output.
* The Gradle task that runs the job server takes a long time to start the server (> 1 minute on my MacBook), and every restart appears to incur the same penalty, regardless of whether anything has changed. The following can be used to start just the server (first command) and then submit the Python wordcount example to it (second command):
java -jar ./runners/flink/job-server/build/install/beam-runners-flink_2.11-job-server-shadow/lib/beam-runners-flink_2.11-job-server-2.6.0-SNAPSHOT-shaded.jar "--job-host=localhost:8099" "--artifacts-dir=/tmp/flink-artifacts"
python -m apache_beam.examples.wordcount --input /etc/profile --output /tmp/py-wordcount-direct --experiments=beam_fn_api --runner=PortableRunner --job_endpoint=localhost:8099 --sdk_location=container
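Since the point of bypassing Gradle is a faster restart, a small launcher can start the shaded jar and block until the job endpoint accepts connections before the wordcount command is submitted. This is a minimal sketch using only the Python standard library; the jar path and flags are copied from the java -jar command above, while the helper names (job_server_command, wait_for_port, start_job_server) are hypothetical and the 60-second timeout is an arbitrary assumption.

```python
import socket
import subprocess
import time

# Jar path copied from the java -jar command above; adjust the version to your build.
JOB_SERVER_JAR = (
    "./runners/flink/job-server/build/install/"
    "beam-runners-flink_2.11-job-server-shadow/lib/"
    "beam-runners-flink_2.11-job-server-2.6.0-SNAPSHOT-shaded.jar"
)


def job_server_command(host="localhost", port=8099, artifacts_dir="/tmp/flink-artifacts"):
    """Build the argv for launching the job server jar directly (no Gradle)."""
    return [
        "java", "-jar", JOB_SERVER_JAR,
        f"--job-host={host}:{port}",
        f"--artifacts-dir={artifacts_dir}",
    ]


def wait_for_port(host, port, timeout=60.0, interval=0.5):
    """Poll until a TCP connection to host:port succeeds; return False on timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            # Not listening yet (or connection timed out); retry shortly.
            time.sleep(interval)
    return False


def start_job_server(host="localhost", port=8099):
    """Start the job server in the background and block until its endpoint is reachable."""
    proc = subprocess.Popen(job_server_command(host, port))
    if not wait_for_port(host, port):
        proc.terminate()
        raise RuntimeError(f"job server did not open {host}:{port} in time")
    return proc
```

Usage: call proc = start_job_server(), run the python -m apache_beam.examples.wordcount command above, and call proc.terminate() when done.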
Updated the Flink section.
To run a basic Python wordcount (sent to you in a separate thread, but repeating here too for others to play with):
Step 1: Run once to build a container: "./gradlew -p sdks/python/container docker"
Step 2: ./gradlew :beam-runners-flink_2.11-job-server:runShadow - this starts up a local Flink portable JobService endpoint on localhost:8099
The command to run the job server appears to be: ./gradlew -p runners/flink/job-server runShadow
Can you please provide the equivalent of the super basic Python example from the prototype:
It looks as if the Python-side runner has changed:
Traceback (most recent call last):
  File "flink-example.py", line 7, in <module>
    from apache_beam.runners.portability import universal_local_runner
ImportError: cannot import name universal_local_runner
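The prototype example itself isn't reproduced here. As a minimal sketch of a "super basic" pipeline that avoids importing a runner module directly, the runner can be selected through pipeline options, reusing the flags from the wordcount command earlier in this thread (this assumes those flags still apply on master). The helper names portable_pipeline_args and run_basic_pipeline are hypothetical; Beam is imported inside the function so the flag helper can be used without Beam installed.

```python
def portable_pipeline_args(job_endpoint="localhost:8099"):
    """Pipeline flags taken from the wordcount command earlier in this thread."""
    return [
        "--runner=PortableRunner",
        f"--job_endpoint={job_endpoint}",
        "--experiments=beam_fn_api",
        "--sdk_location=container",
    ]


def run_basic_pipeline(argv=None):
    """Submit a tiny word-count pipeline to the portable job endpoint."""
    # Imported here so that portable_pipeline_args() works without Beam installed.
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    options = PipelineOptions(argv or portable_pipeline_args())
    with beam.Pipeline(options=options) as p:
        (
            p
            | "Create" >> beam.Create(["a", "b", "a"])
            | "PairWithOne" >> beam.Map(lambda word: (word, 1))
            | "Count" >> beam.CombinePerKey(sum)
            | "Print" >> beam.Map(print)
        )
```

With the job server from the earlier steps running on localhost:8099, calling run_basic_pipeline() submits the pipeline; pass a different flag list to point it at another endpoint.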
Those instructions are not current and I think they should be discarded, as they referred to a particular effort that is over. +Ankur Goenka is, I believe, working on the remaining finishing touches for running from a clean clone of Beam master and documenting how to do that; could you help Thomas so we can start looking at what the streaming runner is missing?
We'll need to document this in a more prominent place. When we get to a state where we can run Python WordCount from master, we'll need to document it somewhere on the main portability page and/or the getting started guide; when we can run something more serious, e.g. TensorFlow pipelines, that will be worth a Beam blog post and worth documenting in the TFX documentation.
The basic streaming translation is already in place from the prototype, though I have not verified it on the master branch yet.
(I don't have a dependency on SDF since we are going to use custom native Flink sources/sinks at this time.)
Wanted to let you know that I've just merged the PR that adds checkpointable SDF support to the portable reference runner (ULR) and the Java SDK harness:
So now we have a reference implementation of SDF support in a portable runner, and a reference implementation of SDF support in a portable SDK harness.
From here on, we need to replicate this support in other portable runners and other harnesses. The obvious targets are Flink and Python respectively.
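To make "checkpointable SDF" concrete for whoever picks up the Flink side: the harness claims positions in a restriction, and a checkpoint splits off the unprocessed remainder as a residual restriction that the runner resumes later. The following is a toy model in plain Python, not Beam's API; OffsetRangeTracker, process and run_to_completion are hypothetical names, and checkpointing after a fixed record count stands in for whatever policy a real runner uses.

```python
class OffsetRangeTracker:
    """Toy restriction tracker over [start, stop); loosely modeled on the idea of
    Beam's offset-range tracker, not on its actual API."""

    def __init__(self, start, stop):
        self.start = start
        self.stop = stop
        self.last_claimed = None

    def try_claim(self, offset):
        # A claim succeeds only inside the (possibly shrunk) range.
        if offset >= self.stop:
            return False
        self.last_claimed = offset
        return True

    def checkpoint(self):
        """Split off everything after the last claimed offset as the residual."""
        split = self.start if self.last_claimed is None else self.last_claimed + 1
        residual = (split, self.stop)
        self.stop = split  # the primary now ends right after the claimed work
        return residual


def process(records, restriction, max_per_call):
    """Hypothetical SDF body: emit records in the restriction, checkpointing
    after max_per_call records so the runner can resume from the residual."""
    tracker = OffsetRangeTracker(*restriction)
    out = []
    offset = tracker.start
    while tracker.try_claim(offset):
        out.append(records[offset])
        offset += 1
        if len(out) == max_per_call:
            return out, tracker.checkpoint()
    return out, None  # restriction fully processed, nothing to resume


def run_to_completion(records, max_per_call):
    """Stand-in for the runner: keep rescheduling residuals until none remain."""
    restriction = (0, len(records))
    emitted = []
    while restriction is not None:
        out, restriction = process(records, restriction, max_per_call)
        emitted.extend(out)
    return emitted
```

Each call does a bounded amount of work and hands back a residual, which is what lets a streaming runner interleave and persist long-running reads.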
Chamikara was going to work on the Python harness. +Thomas Weise
Would you be interested in the Flink portable streaming runner side? It is, of course, blocked on getting the rest of that runner working in streaming mode (the batch mode is practically done; I will send you a separate note about its status).
Luke is right - unbounded sources should go through SDF. I am currently working on adding such support to Fn API.
Ways you can help speed up this effort:
- Make the necessary changes to the Apex runner itself to support regular SDFs in streaming (without portability). They will likely carry over largely to the portable world. I recall that the Apex runner had some level of support for SDFs, but did not yet pass the ValidatesRunner tests.
- (general to Beam, not Apex-related per se) Implement the translation of Read.from(UnboundedSource) via impulse, which will require implementing an SDF that reads from a given UnboundedSource (taking the UnboundedSource as an element). This should be fairly straightforward and will allow all portable runners to take advantage of existing UnboundedSources.
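The second item can be sketched as a toy model in plain Python: Impulse emits a single element, that element is mapped to the source, and an SDF takes the source as its element and reads a bounded number of records per call, returning a residual that resumes from the reader's checkpoint mark. Using the checkpoint mark as the restriction is an assumption of this sketch. CountingSource and CountingReader are hypothetical stand-ins whose method names only loosely mirror the Java UnboundedSource/UnboundedReader API; the rounds loop stands in for the runner rescheduling residuals.

```python
class CountingSource:
    """Toy stand-in for an UnboundedSource that emits 0, 1, 2, ... forever."""

    def create_reader(self, checkpoint):
        return CountingReader(0 if checkpoint is None else checkpoint)


class CountingReader:
    """Toy reader; method names loosely mirror the Java UnboundedReader."""

    def __init__(self, next_value):
        self.next_value = next_value
        self.current = None

    def start(self):
        return self.advance()

    def advance(self):
        # A real reader may return False when no data is available yet.
        self.current = self.next_value
        self.next_value += 1
        return True

    def get_current(self):
        return self.current

    def get_checkpoint_mark(self):
        # Resuming from this mark continues after the last returned record.
        return self.next_value


def impulse():
    """Impulse emits a single empty element that starts the read."""
    yield b""


def read_unbounded_sdf(source, checkpoint, max_records):
    """Hypothetical SDF body: element = source, restriction = checkpoint mark.
    Reads up to max_records and returns them with the residual checkpoint."""
    reader = source.create_reader(checkpoint)
    out = []
    available = reader.start()
    while available:
        out.append(reader.get_current())
        if len(out) == max_records:
            break
        available = reader.advance()
    return out, reader.get_checkpoint_mark()


def expand_read(source, rounds, max_records):
    """Impulse -> map to the source -> run the SDF repeatedly, resuming each
    time from the previous residual checkpoint."""
    emitted = []
    for _ in impulse():
        checkpoint = None
        for _ in range(rounds):
            out, checkpoint = read_unbounded_sdf(source, checkpoint, max_records)
            emitted.extend(out)
    return emitted
```

Because the SDF only needs the generic reader operations, the same wrapper would work for any existing source, which is why this translation benefits every portable runner at once.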
Using impulse is a precursor for both bounded and unbounded SDF.
So for streaming, we will need the Impulse translation for bounded input, identical with batch, and then in addition to that support for SDF?
Any pointers on what's involved in adding the SDF support? Is it runner-specific? Does the ULR cover it?
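For the bounded case asked about here, an Impulse-based read expansion can be sketched as: Impulse emits one element, a step splits the bounded source into sub-sources, and a second step reads each sub-source. This is a toy model in plain Python; RangeSource and read_bounded_via_impulse are hypothetical names, not Beam's BoundedSource API. In an actual pipeline the split results would typically be redistributed (for example with a Reshuffle) before the read step so that reading runs in parallel; that step is only noted in a comment here.

```python
class RangeSource:
    """Toy bounded source over the integers [start, stop)."""

    def __init__(self, start, stop):
        self.start = start
        self.stop = stop

    def split(self, desired_size):
        """Yield sub-sources covering [start, stop) in chunks of desired_size."""
        for lo in range(self.start, self.stop, desired_size):
            yield RangeSource(lo, min(lo + desired_size, self.stop))

    def read(self):
        yield from range(self.start, self.stop)


def impulse():
    """Impulse emits a single empty element."""
    yield b""


def read_bounded_via_impulse(source, desired_size):
    """Impulse -> split into sub-sources -> read each sub-source.
    A real runner would redistribute the sub-sources between the two steps."""
    sub_sources = [s for _ in impulse() for s in source.split(desired_size)]
    return [record for s in sub_sources for record in s.read()]
```

Since nothing in this expansion depends on batch versus streaming execution, the same translation can serve both modes, which matches the expectation that the bounded Impulse translation is identical with batch.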