Re: Pipeline is passing on local runner and failing on Dataflow runner - help with error


Hi Eila,

It seems that the package providing indexes.base is not installed on the workers. Could you try one of the methods in "Managing Python Pipeline Dependencies" [1] to stage that dependency?
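For example, the simplest of those methods is to list the missing package in a requirements.txt and point the pipeline options at it, so the Dataflow workers install it at startup. A rough sketch (the file name and package list are up to you; the project and staging bucket below are just the ones from your logs):

    from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

    options = PipelineOptions(
        runner='DataflowRunner',
        project='orielresearch-188115',
        temp_location='gs://archs4/staging')
    # requirements.txt lists the packages your DoFn code imports, one per line;
    # the workers pip-install them before running your code.
    options.view_as(SetupOptions).requirements_file = 'requirements.txt'
    # For code organized as its own package, setup_file can be used instead:
    # options.view_as(SetupOptions).setup_file = './setup.py'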

Ahmet

[1] https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/

On Thu, Jun 21, 2018 at 9:40 AM, OrielResearch Eila Arich-Landkof <eila@xxxxxxxxxxxxxxxxx> wrote:
Hello all,

Exploring this issue (the local runner works great and Dataflow fails), there might be a mismatch between the apache_beam version and the google-cloud-dataflow version.

Please let me know what your thoughts are. If it is a version issue, what updates should be executed, and how do I cover the installation on both the Datalab VM and Google Cloud Platform?

Should I run the following command (or a different one), and should it run from the shell or from Datalab?

I tried running it in Datalab and it didn't solve the issue (see the full logs below):
pip install --upgrade apache_beam google-cloud-dataflow
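For reference, this is how I am checking which SDK versions are currently installed on the Datalab VM (a quick sketch run from a notebook cell; just standard version lookups, nothing specific to my pipeline):

    import apache_beam
    import pkg_resources

    # Versions currently installed in the Datalab (py2env) environment
    print('apache_beam:', apache_beam.__version__)
    print('google-cloud-dataflow:',
          pkg_resources.get_distribution('google-cloud-dataflow').version)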
Please advise.

Thanks,
Eila


All logs:


INFO:root:Staging the SDK tarball from PyPI to gs://archs4/staging/label-archs4-tsv.1529598693.453095/dataflow_python_sdk.tar
INFO:root:Executing command: ['/usr/local/envs/py2env/bin/python', '-m', 'pip', 'install', '--download', '/tmp/tmp5MM5wr', 'google-cloud-dataflow==2.0.0', '--no-binary', ':all:', '--no-deps']
INFO:root:file copy from /tmp/tmp5MM5wr/google-cloud-dataflow-2.0.0.tar.gz to gs://archs4/staging/label-archs4-tsv.1529598693.453095/dataflow_python_sdk.tar.
INFO:oauth2client.client:Attempting refresh to obtain initial access_token
INFO:oauth2client.client:Attempting refresh to obtain initial access_token
INFO:root:Create job: <Job
 createTime: u'2018-06-21T16:31:51.304121Z'
 currentStateTime: u'1970-01-01T00:00:00Z'
 id: u'2018-06-21_09_31_50-17545183031487377678'
 location: u'us-central1'
 name: u'label-archs4-tsv'
 projectId: u'orielresearch-188115'
 stageStates: []
 steps: []
 tempFiles: []
 type: TypeValueValuesEnum(JOB_TYPE_BATCH, 1)>
INFO:root:Created job with id: [2018-06-21_09_31_50-17545183031487377678]
INFO:root:To access the Dataflow monitoring console, please navigate to https://console.developers.google.com/project/orielresearch-188115/dataflow/job/2018-06-21_09_31_50-17545183031487377678
INFO:root:Job 2018-06-21_09_31_50-17545183031487377678 is in state JOB_STATE_PENDING
INFO:root:2018-06-21T16:31:50.476Z: JOB_MESSAGE_DETAILED: Autoscaling is enabled for job 2018-06-21_09_31_50-17545183031487377678. The number of workers will be between 1 and 1000.
INFO:root:2018-06-21T16:31:50.506Z: JOB_MESSAGE_DETAILED: Autoscaling was automatically enabled for job 2018-06-21_09_31_50-17545183031487377678.
INFO:root:2018-06-21T16:31:53.079Z: JOB_MESSAGE_DETAILED: Checking required Cloud APIs are enabled.
INFO:root:2018-06-21T16:31:53.385Z: JOB_MESSAGE_DETAILED: Checking permissions granted to controller Service Account.
INFO:root:2018-06-21T16:31:54.161Z: JOB_MESSAGE_BASIC: Worker configuration: n1-standard-1 in us-central1-b.
INFO:root:2018-06-21T16:31:54.910Z: JOB_MESSAGE_DETAILED: Expanding CoGroupByKey operations into optimizable parts.
INFO:root:2018-06-21T16:31:54.936Z: JOB_MESSAGE_DEBUG: Combiner lifting skipped for step writing to TSV files/Write/WriteImpl/GroupByKey: GroupByKey not followed by a combiner.
INFO:root:2018-06-21T16:31:54.968Z: JOB_MESSAGE_DETAILED: Expanding GroupByKey operations into optimizable parts.
INFO:root:2018-06-21T16:31:54.992Z: JOB_MESSAGE_DETAILED: Lifting ValueCombiningMappingFns into MergeBucketsMappingFns
INFO:root:2018-06-21T16:31:55.056Z: JOB_MESSAGE_DEBUG: Annotating graph with Autotuner information.
INFO:root:2018-06-21T16:31:55.168Z: JOB_MESSAGE_DETAILED: Fusing adjacent ParDo, Read, Write, and Flatten operations
INFO:root:2018-06-21T16:31:55.195Z: JOB_MESSAGE_DETAILED: Fusing consumer create more columns into Extract the rows from dataframe
INFO:root:2018-06-21T16:31:55.221Z: JOB_MESSAGE_DETAILED: Fusing consumer writing to TSV files/Write/WriteImpl/GroupByKey/Reify into writing to TSV files/Write/WriteImpl/WindowInto(WindowIntoFn)
INFO:root:2018-06-21T16:31:55.244Z: JOB_MESSAGE_DETAILED: Fusing consumer writing to TSV files/Write/WriteImpl/GroupByKey/Write into writing to TSV files/Write/WriteImpl/GroupByKey/Reify
INFO:root:2018-06-21T16:31:55.271Z: JOB_MESSAGE_DETAILED: Fusing consumer writing to TSV files/Write/WriteImpl/WriteBundles/Do into writing to TSV files/Write/WriteImpl/GroupByKey/GroupByWindow
INFO:root:2018-06-21T16:31:55.303Z: JOB_MESSAGE_DETAILED: Fusing consumer writing to TSV files/Write/WriteImpl/Map(<lambda at iobase.py:895>) into create more columns
INFO:root:2018-06-21T16:31:55.328Z: JOB_MESSAGE_DETAILED: Fusing consumer writing to TSV files/Write/WriteImpl/WindowInto(WindowIntoFn) into writing to TSV files/Write/WriteImpl/Map(<lambda at iobase.py:895>)
INFO:root:2018-06-21T16:31:55.341Z: JOB_MESSAGE_DETAILED: Fusing consumer writing to TSV files/Write/WriteImpl/GroupByKey/GroupByWindow into writing to TSV files/Write/WriteImpl/GroupByKey/Read
INFO:root:2018-06-21T16:31:55.365Z: JOB_MESSAGE_DETAILED: Fusing consumer writing to TSV files/Write/WriteImpl/InitializeWrite into writing to TSV files/Write/WriteImpl/DoOnce/Read
INFO:root:2018-06-21T16:31:55.396Z: JOB_MESSAGE_DEBUG: Workflow config is missing a default resource spec.
INFO:root:2018-06-21T16:31:55.432Z: JOB_MESSAGE_DEBUG: Adding StepResource setup and teardown to workflow graph.
INFO:root:2018-06-21T16:31:55.461Z: JOB_MESSAGE_DEBUG: Adding workflow start and stop steps.
INFO:root:2018-06-21T16:31:55.486Z: JOB_MESSAGE_DEBUG: Assigning stage ids.
INFO:root:2018-06-21T16:31:55.641Z: JOB_MESSAGE_DEBUG: Executing wait step start15
INFO:root:Job 2018-06-21_09_31_50-17545183031487377678 is in state JOB_STATE_RUNNING
INFO:root:2018-06-21T16:31:55.701Z: JOB_MESSAGE_BASIC: Executing operation writing to TSV files/Write/WriteImpl/DoOnce/Read+writing to TSV files/Write/WriteImpl/InitializeWrite
INFO:root:2018-06-21T16:31:55.727Z: JOB_MESSAGE_BASIC: Executing operation writing to TSV files/Write/WriteImpl/GroupByKey/Create
INFO:root:2018-06-21T16:31:55.739Z: JOB_MESSAGE_DEBUG: Starting worker pool setup.
INFO:root:2018-06-21T16:31:55.753Z: JOB_MESSAGE_BASIC: Starting 1 workers in us-central1-b...
INFO:root:2018-06-21T16:31:55.839Z: JOB_MESSAGE_DEBUG: Value "writing to TSV files/Write/WriteImpl/GroupByKey/Session" materialized.
INFO:root:2018-06-21T16:31:55.901Z: JOB_MESSAGE_BASIC: Executing operation Extract the rows from dataframe+create more columns+writing to TSV files/Write/WriteImpl/Map(<lambda at iobase.py:895>)+writing to TSV files/Write/WriteImpl/WindowInto(WindowIntoFn)+writing to TSV files/Write/WriteImpl/GroupByKey/Reify+writing to TSV files/Write/WriteImpl/GroupByKey/Write
INFO:root:2018-06-21T16:31:56.332Z: JOB_MESSAGE_BASIC: BigQuery export job "dataflow_job_576766793008965363" started. You can check its status with the bq tool: "bq show -j --project_id=orielresearch-188115 dataflow_job_576766793008965363".
INFO:root:2018-06-21T16:32:03.683Z: JOB_MESSAGE_DETAILED: Autoscaling: Raised the number of workers to 0 based on the rate of progress in the currently running step(s).
INFO:root:2018-06-21T16:32:14.181Z: JOB_MESSAGE_DETAILED: Autoscaling: Raised the number of workers to 1 based on the rate of progress in the currently running step(s).
INFO:root:2018-06-21T16:32:26.827Z: JOB_MESSAGE_DETAILED: BigQuery export job progress: "dataflow_job_576766793008965363" observed total of 1 exported files thus far.
INFO:root:2018-06-21T16:32:26.850Z: JOB_MESSAGE_BASIC: BigQuery export job finished: "dataflow_job_576766793008965363"
INFO:root:2018-06-21T16:32:33.078Z: JOB_MESSAGE_DETAILED: Workers have started successfully.
INFO:root:2018-06-21T16:35:35.511Z: JOB_MESSAGE_ERROR: Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 581, in do_work
    work_executor.execute()
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 166, in execute
    op.start()
  File "dataflow_worker/operations.py", line 283, in dataflow_worker.operations.DoOperation.start (dataflow_worker/operations.c:10680)
    def start(self):
  File "dataflow_worker/operations.py", line 284, in dataflow_worker.operations.DoOperation.start (dataflow_worker/operations.c:10574)
    with self.scoped_start_state:
  File "dataflow_worker/operations.py", line 289, in dataflow_worker.operations.DoOperation.start (dataflow_worker/operations.c:9775)
    pickler.loads(self.spec.serialized_fn))
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/internal/pickler.py", line 225, in loads
    return dill.loads(s)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 277, in loads
    return load(file)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 266, in load
    obj = pik.load()
  File "/usr/lib/python2.7/pickle.py", line 858, in load
    dispatch[key](self)
  File "/usr/lib/python2.7/pickle.py", line 1090, in load_global
    klass = self.find_class(module, name)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 423, in find_class
    return StockUnpickler.find_class(self, module, name)
  File "/usr/lib/python2.7/pickle.py", line 1124, in find_class
    __import__(module)
ImportError: No module named indexes.base

INFO:root:2018-06-21T16:35:38.897Z: JOB_MESSAGE_ERROR: Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 581, in do_work
    work_executor.execute()
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 166, in execute
    op.start()
  File "dataflow_worker/operations.py", line 283, in dataflow_worker.operations.DoOperation.start (dataflow_worker/operations.c:10680)
    def start(self):
  File "dataflow_worker/operations.py", line 284, in dataflow_worker.operations.DoOperation.start (dataflow_worker/operations.c:10574)
    with self.scoped_start_state:
  File "dataflow_worker/operations.py", line 289, in dataflow_worker.operations.DoOperation.start (dataflow_worker/operations.c:9775)
    pickler.loads(self.spec.serialized_fn))
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/internal/pickler.py", line 225, in loads
    return dill.loads(s)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 277, in loads
    return load(file)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 266, in load
    obj = pik.load()
  File "/usr/lib/python2.7/pickle.py", line 858, in load
    dispatch[key](self)
  File "/usr/lib/python2.7/pickle.py", line 1090, in load_global
    klass = self.find_class(module, name)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 423, in find_class
    return StockUnpickler.find_class(self, module, name)
  File "/usr/lib/python2.7/pickle.py", line 1124, in find_class
    __import__(module)
ImportError: No module named indexes.base

INFO:root:2018-06-21T16:35:42.245Z: JOB_MESSAGE_ERROR: Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 581, in do_work
    work_executor.execute()
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 166, in execute
    op.start()
  File "dataflow_worker/operations.py", line 283, in dataflow_worker.operations.DoOperation.start (dataflow_worker/operations.c:10680)
    def start(self):
  File "dataflow_worker/operations.py", line 284, in dataflow_worker.operations.DoOperation.start (dataflow_worker/operations.c:10574)
    with self.scoped_start_state:
  File "dataflow_worker/operations.py", line 289, in dataflow_worker.operations.DoOperation.start (dataflow_worker/operations.c:9775)
    pickler.loads(self.spec.serialized_fn))
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/internal/pickler.py", line 225, in loads
    return dill.loads(s)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 277, in loads
    return load(file)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 266, in load
    obj = pik.load()
  File "/usr/lib/python2.7/pickle.py", line 858, in load
    dispatch[key](self)
  File "/usr/lib/python2.7/pickle.py", line 1090, in load_global
    klass = self.find_class(module, name)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 423, in find_class
    return StockUnpickler.find_class(self, module, name)
  File "/usr/lib/python2.7/pickle.py", line 1124, in find_class
    __import__(module)
ImportError: No module named indexes.base

INFO:root:2018-06-21T16:35:45.619Z: JOB_MESSAGE_ERROR: Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 581, in do_work
    work_executor.execute()
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 166, in execute
    op.start()
  File "dataflow_worker/operations.py", line 283, in dataflow_worker.operations.DoOperation.start (dataflow_worker/operations.c:10680)
    def start(self):
  File "dataflow_worker/operations.py", line 284, in dataflow_worker.operations.DoOperation.start (dataflow_worker/operations.c:10574)
    with self.scoped_start_state:
  File "dataflow_worker/operations.py", line 289, in dataflow_worker.operations.DoOperation.start (dataflow_worker/operations.c:9775)
    pickler.loads(self.spec.serialized_fn))
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/internal/pickler.py", line 225, in loads
    return dill.loads(s)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 277, in loads
    return load(file)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 266, in load
    obj = pik.load()
  File "/usr/lib/python2.7/pickle.py", line 858, in load
    dispatch[key](self)
  File "/usr/lib/python2.7/pickle.py", line 1090, in load_global
    klass = self.find_class(module, name)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 423, in find_class
    return StockUnpickler.find_class(self, module, name)
  File "/usr/lib/python2.7/pickle.py", line 1124, in find_class
    __import__(module)
ImportError: No module named indexes.base

INFO:root:2018-06-21T16:35:45.668Z: JOB_MESSAGE_DEBUG: Executing failure step failure14
INFO:root:2018-06-21T16:35:45.695Z: JOB_MESSAGE_ERROR: Workflow failed. Causes: S04:Extract the rows from dataframe+create more columns+writing to TSV files/Write/WriteImpl/Map(<lambda at iobase.py:895>)+writing to TSV files/Write/WriteImpl/WindowInto(WindowIntoFn)+writing to TSV files/Write/WriteImpl/GroupByKey/Reify+writing to TSV files/Write/WriteImpl/GroupByKey/Write failed., A work item was attempted 4 times without success. Each time the worker eventually lost contact with the service. The work item was attempted on: 
  label-archs4-tsv-06210931-a4r1-harness-rlqz,
  label-archs4-tsv-06210931-a4r1-harness-rlqz,
  label-archs4-tsv-06210931-a4r1-harness-rlqz,
  label-archs4-tsv-06210931-a4r1-harness-rlqz
INFO:root:2018-06-21T16:35:45.799Z: JOB_MESSAGE_DETAILED: Cleaning up.
INFO:root:2018-06-21T16:35:46Z: JOB_MESSAGE_DEBUG: Starting worker pool teardown.
INFO:root:2018-06-21T16:35:46.027Z: JOB_MESSAGE_BASIC: Stopping worker pool...


On Wed, Jun 20, 2018 at 5:02 PM, OrielResearch Eila Arich-Landkof <eila@xxxxxxxxxxxxxxxxx> wrote:
Hello,

I am running the following pipeline on the local runner with no issues.

logging.info('Define the pipeline')
p = beam.Pipeline(options=options)
samplePath = outputPath
ExploreData = (p | "Extract the rows from dataframe" >> beam.io.Read(beam.io.BigQuerySource('archs4.Debug_annotation'))
                 | "create more columns" >> beam.ParDo(CreateColForSampleFn(colListSubset, outputPath)))
(ExploreData | 'writing to TSV files' >> beam.io.WriteToText('gs://archs4/output/dataExploration.txt',
                                                             file_name_suffix='.tsv', num_shards=1,
                                                             append_trailing_newlines=True,
                                                             header=colListStrHeader))


Running on Dataflow raises the error below. I don't have any idea where to look for the issue; the error does not point to my pipeline code but to Apache Beam modules.
I will try debugging by elimination. Please let me know if you have any direction for me.
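
As a first elimination step, I will try a stripped-down version of the pipeline that drops the ParDo and writes the BigQuery rows straight to text, to see whether the workers still fail (a rough sketch; same source table and bucket as above, the debug output name is just a placeholder):

    import apache_beam as beam

    # 'options' is the same PipelineOptions object used for the full pipeline.
    p = beam.Pipeline(options=options)
    rows = (p | "Extract the rows from dataframe"
              >> beam.io.Read(beam.io.BigQuerySource('archs4.Debug_annotation')))
    (rows | 'writing to TSV files'
          >> beam.io.WriteToText('gs://archs4/output/dataExplorationDebug.txt',
                                 file_name_suffix='.tsv', num_shards=1))
    p.run().wait_until_finish()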

Many thanks,
Eila


======================================================
DataflowRuntimeExceptionTraceback (most recent call last)
<ipython-input-151-1e5aeb8b7d9b> in <module>()
----> 1 p.run().wait_until_finish()

/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.pyc in wait_until_finish(self, duration)
    776         raise DataflowRuntimeException(
    777             'Dataflow pipeline failed. State: %s, Error:\n%s' %
--> 778             (self.state, getattr(self._runner, 'last_error_msg', None)), self)
    779     return self.state
    780 

DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 581, in do_work
    work_executor.execute()
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 166, in execute
    op.start()
  File "dataflow_worker/operations.py", line 283, in dataflow_worker.operations.DoOperation.start (dataflow_worker/operations.c:10680)
    def start(self):
  File "dataflow_worker/operations.py", line 284, in dataflow_worker.operations.DoOperation.start (dataflow_worker/operations.c:10574)
    with self.scoped_start_state:
  File "dataflow_worker/operations.py", line 289, in dataflow_worker.operations.DoOperation.start (dataflow_worker/operations.c:9775)
    pickler.loads(self.spec.serialized_fn))
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/internal/pickler.py", line 225, in loads
    return dill.loads(s)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 277, in loads
    return load(file)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 266, in load
    obj = pik.load()
  File "/usr/lib/python2.7/pickle.py", line 858, in load
    dispatch[key](self)
  File "/usr/lib/python2.7/pickle.py", line 1090, in load_global
    klass = self.find_class(module, name)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 423, in find_class
    return StockUnpickler.find_class(self, module, name)
  File "/usr/lib/python2.7/pickle.py", line 1124, in find_class
    __import__(module)
ImportError: No module named indexes.base
======================================================
