Re: error with DirectRunner


+Robert Bradshaw I would be happy to debug and fix this, but I'd need more guidance on where to look.

On Tue, Oct 30, 2018 at 4:07 PM Udi Meiri <ehudm@xxxxxxxxxx> wrote:

On Tue, Oct 30, 2018 at 1:13 PM Lukasz Cwik <lcwik@xxxxxxxxxx> wrote:
Udi, do you know if we have a bug tracking this issue?

If not, can you file one referencing this e-mail thread?

On Tue, Oct 30, 2018 at 6:33 AM Allie Chen <yifangchen@xxxxxxxxxx> wrote:
Thanks Udi. I agree, since the pipeline works fine after removing either the side input or the last flatten and combine operation.

On Mon, Oct 29, 2018 at 9:02 PM Udi Meiri <ehudm@xxxxxxxxxx> wrote:
This looks like a FnApiRunner bug.
When I override use_fnapi_runner = False in direct_runner.py, the pipeline works.

It seems like either the side-input to _copy_number or the Flatten operation is the culprit.

On Mon, Oct 29, 2018 at 2:37 PM Allie Chen <yifangchen@xxxxxxxxxx> wrote:
Hi,

I have a project that started failing with DirectRunner but works well with DataflowRunner (the last working version is 2.4). The error message I received is:
line 1088, in run_stage 
  pipeline_components.pcollections[actual_pcoll_id].coder_id]] 
KeyError: u'ref_Coder_WindowedValueCoder_1'

I have simplified the pipeline to the following example. Can someone please take a look? Many thanks!

Allie

import apache_beam as beam
import argparse
from apache_beam import transforms
from apache_beam import pvalue
from apache_beam.options import pipeline_options


def _copy_number(number, side=None):
    yield number


def fn_sum(values):
    return sum(values)


def run(argv=None):
    parser = argparse.ArgumentParser()
    _, pipeline_args = parser.parse_known_args(argv)
    options = pipeline_options.PipelineOptions(pipeline_args)
    numbers = [1, 2]
    with beam.Pipeline(options=options) as p:
        sum_1 = (p
                 | 'ReadNumber1' >> transforms.Create(numbers)
                 | 'CalculateSum1' >> beam.CombineGlobally(fn_sum))

        sum_2 = (p
                 | 'ReadNumber2' >> transforms.Create(numbers)
                 | beam.ParDo(_copy_number, pvalue.AsSingleton(sum_1))
                 | 'CalculateSum2' >> beam.CombineGlobally(fn_sum))

        _ = ((sum_1, sum_2)
             | beam.Flatten()
             | 'CalculateSum3' >> beam.CombineGlobally(fn_sum)
             | beam.io.WriteToText('gs://BUCKET/sum'))

