Re: Pipeline is passing on local runner and failing on Dataflow runner - help with error


Hello all,

While exploring this issue (the pipeline works on the local runner and fails on Dataflow), I suspect there may be a mismatch between the apache_beam version and the Dataflow version.

Please let me know what your thoughts are. If it is a version issue, what should be updated? How do I handle the installation on both the Datalab VM and Google Cloud Platform?

Should I run the following command or a different one, and should I run it in the shell or in Datalab?

I tried running this in Datalab and it didn't solve the issue (see the full log report below):
pip install --upgrade apache_beam google-cloud-dataflow
Please advise.
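
One way to test the version-mismatch theory is to record exactly which package versions are installed in the environment that submits the job, so they can be compared with what the Dataflow workers run. The following is only a sketch under stated assumptions: it is written for Python 3.8+ (the thread itself uses Python 2.7), the helper names `installed_versions` and `pinned_requirements` are hypothetical, and the choice of packages to inspect (`pandas` and `dill` in particular) is a guess based on the traceback, not something the logs confirm.

```python
from importlib import metadata  # assumption: Python 3.8+, unlike the 2.7 env in the thread


def installed_versions(names):
    """Map each distribution name to its installed version, or None if absent."""
    versions = {}
    for name in names:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


def pinned_requirements(versions):
    """Render 'name==version' lines for the packages that are installed."""
    return ["%s==%s" % (name, version)
            for name, version in sorted(versions.items())
            if version is not None]


if __name__ == "__main__":
    # Package list is an assumption; adjust it to whatever the pipeline imports.
    found = installed_versions(
        ["apache-beam", "google-cloud-dataflow", "pandas", "dill"])
    for line in pinned_requirements(found):
        print(line)
```

The printed lines have the shape of a requirements file. Beam's `--requirements_file` pipeline option is one way to ask the workers to install the same pinned versions, although whether that resolves this particular failure is not established by the logs.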

Thanks,
Eila


All logs:


INFO:root:Staging the SDK tarball from PyPI to gs://archs4/staging/label-archs4-tsv.1529598693.453095/dataflow_python_sdk.tar
INFO:root:Executing command: ['/usr/local/envs/py2env/bin/python', '-m', 'pip', 'install', '--download', '/tmp/tmp5MM5wr', 'google-cloud-dataflow==2.0.0', '--no-binary', ':all:', '--no-deps']
INFO:root:file copy from /tmp/tmp5MM5wr/google-cloud-dataflow-2.0.0.tar.gz to gs://archs4/staging/label-archs4-tsv.1529598693.453095/dataflow_python_sdk.tar.
INFO:oauth2client.client:Attempting refresh to obtain initial access_token
INFO:oauth2client.client:Attempting refresh to obtain initial access_token
INFO:root:Create job: <Job
 createTime: u'2018-06-21T16:31:51.304121Z'
 currentStateTime: u'1970-01-01T00:00:00Z'
 id: u'2018-06-21_09_31_50-17545183031487377678'
 location: u'us-central1'
 name: u'label-archs4-tsv'
 projectId: u'orielresearch-188115'
 stageStates: []
 steps: []
 tempFiles: []
 type: TypeValueValuesEnum(JOB_TYPE_BATCH, 1)>
INFO:root:Created job with id: [2018-06-21_09_31_50-17545183031487377678]
INFO:root:To access the Dataflow monitoring console, please navigate to https://console.developers.google.com/project/orielresearch-188115/dataflow/job/2018-06-21_09_31_50-17545183031487377678
INFO:root:Job 2018-06-21_09_31_50-17545183031487377678 is in state JOB_STATE_PENDING
INFO:root:2018-06-21T16:31:50.476Z: JOB_MESSAGE_DETAILED: Autoscaling is enabled for job 2018-06-21_09_31_50-17545183031487377678. The number of workers will be between 1 and 1000.
INFO:root:2018-06-21T16:31:50.506Z: JOB_MESSAGE_DETAILED: Autoscaling was automatically enabled for job 2018-06-21_09_31_50-17545183031487377678.
INFO:root:2018-06-21T16:31:53.079Z: JOB_MESSAGE_DETAILED: Checking required Cloud APIs are enabled.
INFO:root:2018-06-21T16:31:53.385Z: JOB_MESSAGE_DETAILED: Checking permissions granted to controller Service Account.
INFO:root:2018-06-21T16:31:54.161Z: JOB_MESSAGE_BASIC: Worker configuration: n1-standard-1 in us-central1-b.
INFO:root:2018-06-21T16:31:54.910Z: JOB_MESSAGE_DETAILED: Expanding CoGroupByKey operations into optimizable parts.
INFO:root:2018-06-21T16:31:54.936Z: JOB_MESSAGE_DEBUG: Combiner lifting skipped for step writing to TSV files/Write/WriteImpl/GroupByKey: GroupByKey not followed by a combiner.
INFO:root:2018-06-21T16:31:54.968Z: JOB_MESSAGE_DETAILED: Expanding GroupByKey operations into optimizable parts.
INFO:root:2018-06-21T16:31:54.992Z: JOB_MESSAGE_DETAILED: Lifting ValueCombiningMappingFns into MergeBucketsMappingFns
INFO:root:2018-06-21T16:31:55.056Z: JOB_MESSAGE_DEBUG: Annotating graph with Autotuner information.
INFO:root:2018-06-21T16:31:55.168Z: JOB_MESSAGE_DETAILED: Fusing adjacent ParDo, Read, Write, and Flatten operations
INFO:root:2018-06-21T16:31:55.195Z: JOB_MESSAGE_DETAILED: Fusing consumer create more columns into Extract the rows from dataframe
INFO:root:2018-06-21T16:31:55.221Z: JOB_MESSAGE_DETAILED: Fusing consumer writing to TSV files/Write/WriteImpl/GroupByKey/Reify into writing to TSV files/Write/WriteImpl/WindowInto(WindowIntoFn)
INFO:root:2018-06-21T16:31:55.244Z: JOB_MESSAGE_DETAILED: Fusing consumer writing to TSV files/Write/WriteImpl/GroupByKey/Write into writing to TSV files/Write/WriteImpl/GroupByKey/Reify
INFO:root:2018-06-21T16:31:55.271Z: JOB_MESSAGE_DETAILED: Fusing consumer writing to TSV files/Write/WriteImpl/WriteBundles/Do into writing to TSV files/Write/WriteImpl/GroupByKey/GroupByWindow
INFO:root:2018-06-21T16:31:55.303Z: JOB_MESSAGE_DETAILED: Fusing consumer writing to TSV files/Write/WriteImpl/Map(<lambda at iobase.py:895>) into create more columns
INFO:root:2018-06-21T16:31:55.328Z: JOB_MESSAGE_DETAILED: Fusing consumer writing to TSV files/Write/WriteImpl/WindowInto(WindowIntoFn) into writing to TSV files/Write/WriteImpl/Map(<lambda at iobase.py:895>)
INFO:root:2018-06-21T16:31:55.341Z: JOB_MESSAGE_DETAILED: Fusing consumer writing to TSV files/Write/WriteImpl/GroupByKey/GroupByWindow into writing to TSV files/Write/WriteImpl/GroupByKey/Read
INFO:root:2018-06-21T16:31:55.365Z: JOB_MESSAGE_DETAILED: Fusing consumer writing to TSV files/Write/WriteImpl/InitializeWrite into writing to TSV files/Write/WriteImpl/DoOnce/Read
INFO:root:2018-06-21T16:31:55.396Z: JOB_MESSAGE_DEBUG: Workflow config is missing a default resource spec.
INFO:root:2018-06-21T16:31:55.432Z: JOB_MESSAGE_DEBUG: Adding StepResource setup and teardown to workflow graph.
INFO:root:2018-06-21T16:31:55.461Z: JOB_MESSAGE_DEBUG: Adding workflow start and stop steps.
INFO:root:2018-06-21T16:31:55.486Z: JOB_MESSAGE_DEBUG: Assigning stage ids.
INFO:root:2018-06-21T16:31:55.641Z: JOB_MESSAGE_DEBUG: Executing wait step start15
INFO:root:Job 2018-06-21_09_31_50-17545183031487377678 is in state JOB_STATE_RUNNING
INFO:root:2018-06-21T16:31:55.701Z: JOB_MESSAGE_BASIC: Executing operation writing to TSV files/Write/WriteImpl/DoOnce/Read+writing to TSV files/Write/WriteImpl/InitializeWrite
INFO:root:2018-06-21T16:31:55.727Z: JOB_MESSAGE_BASIC: Executing operation writing to TSV files/Write/WriteImpl/GroupByKey/Create
INFO:root:2018-06-21T16:31:55.739Z: JOB_MESSAGE_DEBUG: Starting worker pool setup.
INFO:root:2018-06-21T16:31:55.753Z: JOB_MESSAGE_BASIC: Starting 1 workers in us-central1-b...
INFO:root:2018-06-21T16:31:55.839Z: JOB_MESSAGE_DEBUG: Value "writing to TSV files/Write/WriteImpl/GroupByKey/Session" materialized.
INFO:root:2018-06-21T16:31:55.901Z: JOB_MESSAGE_BASIC: Executing operation Extract the rows from dataframe+create more columns+writing to TSV files/Write/WriteImpl/Map(<lambda at iobase.py:895>)+writing to TSV files/Write/WriteImpl/WindowInto(WindowIntoFn)+writing to TSV files/Write/WriteImpl/GroupByKey/Reify+writing to TSV files/Write/WriteImpl/GroupByKey/Write
INFO:root:2018-06-21T16:31:56.332Z: JOB_MESSAGE_BASIC: BigQuery export job "dataflow_job_576766793008965363" started. You can check its status with the bq tool: "bq show -j --project_id=orielresearch-188115 dataflow_job_576766793008965363".
INFO:root:2018-06-21T16:32:03.683Z: JOB_MESSAGE_DETAILED: Autoscaling: Raised the number of workers to 0 based on the rate of progress in the currently running step(s).
INFO:root:2018-06-21T16:32:14.181Z: JOB_MESSAGE_DETAILED: Autoscaling: Raised the number of workers to 1 based on the rate of progress in the currently running step(s).
INFO:root:2018-06-21T16:32:26.827Z: JOB_MESSAGE_DETAILED: BigQuery export job progress: "dataflow_job_576766793008965363" observed total of 1 exported files thus far.
INFO:root:2018-06-21T16:32:26.850Z: JOB_MESSAGE_BASIC: BigQuery export job finished: "dataflow_job_576766793008965363"
INFO:root:2018-06-21T16:32:33.078Z: JOB_MESSAGE_DETAILED: Workers have started successfully.
INFO:root:2018-06-21T16:35:35.511Z: JOB_MESSAGE_ERROR: Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 581, in do_work
    work_executor.execute()
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 166, in execute
    op.start()
  File "dataflow_worker/operations.py", line 283, in dataflow_worker.operations.DoOperation.start (dataflow_worker/operations.c:10680)
    def start(self):
  File "dataflow_worker/operations.py", line 284, in dataflow_worker.operations.DoOperation.start (dataflow_worker/operations.c:10574)
    with self.scoped_start_state:
  File "dataflow_worker/operations.py", line 289, in dataflow_worker.operations.DoOperation.start (dataflow_worker/operations.c:9775)
    pickler.loads(self.spec.serialized_fn))
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/internal/pickler.py", line 225, in loads
    return dill.loads(s)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 277, in loads
    return load(file)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 266, in load
    obj = pik.load()
  File "/usr/lib/python2.7/pickle.py", line 858, in load
    dispatch[key](self)
  File "/usr/lib/python2.7/pickle.py", line 1090, in load_global
    klass = self.find_class(module, name)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 423, in find_class
    return StockUnpickler.find_class(self, module, name)
  File "/usr/lib/python2.7/pickle.py", line 1124, in find_class
    __import__(module)
ImportError: No module named indexes.base

INFO:root:2018-06-21T16:35:38.897Z: JOB_MESSAGE_ERROR: Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 581, in do_work
    work_executor.execute()
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 166, in execute
    op.start()
  File "dataflow_worker/operations.py", line 283, in dataflow_worker.operations.DoOperation.start (dataflow_worker/operations.c:10680)
    def start(self):
  File "dataflow_worker/operations.py", line 284, in dataflow_worker.operations.DoOperation.start (dataflow_worker/operations.c:10574)
    with self.scoped_start_state:
  File "dataflow_worker/operations.py", line 289, in dataflow_worker.operations.DoOperation.start (dataflow_worker/operations.c:9775)
    pickler.loads(self.spec.serialized_fn))
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/internal/pickler.py", line 225, in loads
    return dill.loads(s)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 277, in loads
    return load(file)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 266, in load
    obj = pik.load()
  File "/usr/lib/python2.7/pickle.py", line 858, in load
    dispatch[key](self)
  File "/usr/lib/python2.7/pickle.py", line 1090, in load_global
    klass = self.find_class(module, name)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 423, in find_class
    return StockUnpickler.find_class(self, module, name)
  File "/usr/lib/python2.7/pickle.py", line 1124, in find_class
    __import__(module)
ImportError: No module named indexes.base

INFO:root:2018-06-21T16:35:42.245Z: JOB_MESSAGE_ERROR: Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 581, in do_work
    work_executor.execute()
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 166, in execute
    op.start()
  File "dataflow_worker/operations.py", line 283, in dataflow_worker.operations.DoOperation.start (dataflow_worker/operations.c:10680)
    def start(self):
  File "dataflow_worker/operations.py", line 284, in dataflow_worker.operations.DoOperation.start (dataflow_worker/operations.c:10574)
    with self.scoped_start_state:
  File "dataflow_worker/operations.py", line 289, in dataflow_worker.operations.DoOperation.start (dataflow_worker/operations.c:9775)
    pickler.loads(self.spec.serialized_fn))
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/internal/pickler.py", line 225, in loads
    return dill.loads(s)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 277, in loads
    return load(file)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 266, in load
    obj = pik.load()
  File "/usr/lib/python2.7/pickle.py", line 858, in load
    dispatch[key](self)
  File "/usr/lib/python2.7/pickle.py", line 1090, in load_global
    klass = self.find_class(module, name)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 423, in find_class
    return StockUnpickler.find_class(self, module, name)
  File "/usr/lib/python2.7/pickle.py", line 1124, in find_class
    __import__(module)
ImportError: No module named indexes.base

INFO:root:2018-06-21T16:35:45.619Z: JOB_MESSAGE_ERROR: Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 581, in do_work
    work_executor.execute()
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 166, in execute
    op.start()
  File "dataflow_worker/operations.py", line 283, in dataflow_worker.operations.DoOperation.start (dataflow_worker/operations.c:10680)
    def start(self):
  File "dataflow_worker/operations.py", line 284, in dataflow_worker.operations.DoOperation.start (dataflow_worker/operations.c:10574)
    with self.scoped_start_state:
  File "dataflow_worker/operations.py", line 289, in dataflow_worker.operations.DoOperation.start (dataflow_worker/operations.c:9775)
    pickler.loads(self.spec.serialized_fn))
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/internal/pickler.py", line 225, in loads
    return dill.loads(s)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 277, in loads
    return load(file)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 266, in load
    obj = pik.load()
  File "/usr/lib/python2.7/pickle.py", line 858, in load
    dispatch[key](self)
  File "/usr/lib/python2.7/pickle.py", line 1090, in load_global
    klass = self.find_class(module, name)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 423, in find_class
    return StockUnpickler.find_class(self, module, name)
  File "/usr/lib/python2.7/pickle.py", line 1124, in find_class
    __import__(module)
ImportError: No module named indexes.base

INFO:root:2018-06-21T16:35:45.668Z: JOB_MESSAGE_DEBUG: Executing failure step failure14
INFO:root:2018-06-21T16:35:45.695Z: JOB_MESSAGE_ERROR: Workflow failed. Causes: S04:Extract the rows from dataframe+create more columns+writing to TSV files/Write/WriteImpl/Map(<lambda at iobase.py:895>)+writing to TSV files/Write/WriteImpl/WindowInto(WindowIntoFn)+writing to TSV files/Write/WriteImpl/GroupByKey/Reify+writing to TSV files/Write/WriteImpl/GroupByKey/Write failed., A work item was attempted 4 times without success. Each time the worker eventually lost contact with the service. The work item was attempted on: 
  label-archs4-tsv-06210931-a4r1-harness-rlqz,
  label-archs4-tsv-06210931-a4r1-harness-rlqz,
  label-archs4-tsv-06210931-a4r1-harness-rlqz,
  label-archs4-tsv-06210931-a4r1-harness-rlqz
INFO:root:2018-06-21T16:35:45.799Z: JOB_MESSAGE_DETAILED: Cleaning up.
INFO:root:2018-06-21T16:35:46Z: JOB_MESSAGE_DEBUG: Starting worker pool teardown.
INFO:root:2018-06-21T16:35:46.027Z: JOB_MESSAGE_BASIC: Stopping worker pool...


On Wed, Jun 20, 2018 at 5:02 PM, OrielResearch Eila Arich-Landkof <eila@xxxxxxxxxxxxxxxxx> wrote:
Hello,

I am running the following pipeline on the local runner with no issues.

import logging
import apache_beam as beam

# options, outputPath, colListSubset, colListStrHeader and CreateColForSampleFn
# are defined earlier in the notebook.
logging.info('Define the pipeline')
p = beam.Pipeline(options=options)
samplePath = outputPath
# Read rows from BigQuery, then derive the extra columns for each row.
ExploreData = (p | "Extract the rows from dataframe" >> beam.io.Read(beam.io.BigQuerySource('archs4.Debug_annotation'))
                 | "create more columns" >> beam.ParDo(CreateColForSampleFn(colListSubset, outputPath)))
(ExploreData | 'writing to TSV files' >> beam.io.WriteToText('gs://archs4/output/dataExploration.txt', file_name_suffix='.tsv', num_shards=1, append_trailing_newlines=True, header=colListStrHeader))


Running on Dataflow raises the error below. I don't know where to look for the issue: the error points not to my pipeline code but to Apache Beam modules.
I will try debugging by elimination. Please let me know if you have any direction for me.

Many thanks,
Eila


======================================================
DataflowRuntimeException                  Traceback (most recent call last)
<ipython-input-151-1e5aeb8b7d9b> in <module>()
----> 1 p.run().wait_until_finish()

/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.pyc in wait_until_finish(self, duration)
    776         raise DataflowRuntimeException(
    777             'Dataflow pipeline failed. State: %s, Error:\n%s' %
--> 778             (self.state, getattr(self._runner, 'last_error_msg', None)), self)
    779     return self.state
    780 

DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 581, in do_work
    work_executor.execute()
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 166, in execute
    op.start()
  File "dataflow_worker/operations.py", line 283, in dataflow_worker.operations.DoOperation.start (dataflow_worker/operations.c:10680)
    def start(self):
  File "dataflow_worker/operations.py", line 284, in dataflow_worker.operations.DoOperation.start (dataflow_worker/operations.c:10574)
    with self.scoped_start_state:
  File "dataflow_worker/operations.py", line 289, in dataflow_worker.operations.DoOperation.start (dataflow_worker/operations.c:9775)
    pickler.loads(self.spec.serialized_fn))
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/internal/pickler.py", line 225, in loads
    return dill.loads(s)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 277, in loads
    return load(file)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 266, in load
    obj = pik.load()
  File "/usr/lib/python2.7/pickle.py", line 858, in load
    dispatch[key](self)
  File "/usr/lib/python2.7/pickle.py", line 1090, in load_global
    klass = self.find_class(module, name)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 423, in find_class
    return StockUnpickler.find_class(self, module, name)
  File "/usr/lib/python2.7/pickle.py", line 1124, in find_class
    __import__(module)
ImportError: No module named indexes.base
======================================================
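
The traceback above ends with pickle's `find_class` calling `__import__`, which suggests that the worker could not import a module that the serialized function refers to by name. The sketch below reproduces that failure mode in isolation with the standard `pickle` module (the traceback shows dill delegating to it). It assumes Python 3, and the module name `fake_indexes_base` and the class `Index` are hypothetical stand-ins, not names taken from the pipeline.

```python
import pickle
import sys
import types


def unpickle_without_module():
    """Pickle an object, hide its defining module, and try to load it again.

    Returns the ImportError message, or None if loading unexpectedly worked.
    """
    module_name = "fake_indexes_base"  # hypothetical module name
    module = types.ModuleType(module_name)
    # Create a class that claims to live in the hypothetical module.
    index_cls = type("Index", (object,), {"__module__": module_name})
    module.Index = index_cls
    sys.modules[module_name] = module

    try:
        # Pickle stores the class by reference: module path plus class name.
        payload = pickle.dumps(index_cls())
    finally:
        # Simulate an environment where that module path does not exist.
        sys.modules.pop(module_name, None)

    try:
        pickle.loads(payload)
    except ImportError as exc:
        return str(exc)
    return None


if __name__ == "__main__":
    print(unpickle_without_module())
```

If the same mechanism is at work here, a module path recorded when the job was pickled on the Datalab VM does not exist in the library version installed on the Dataflow worker. That would be consistent with a version mismatch, though the logs alone do not prove it.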
