osdir.com


Re: Go SDK: bad parameter type for reflect.methodValueCall: func(Record)


Got it. Thanks for the explanation.

On 2018/06/18 21:40:48, Robert Burke <robert@xxxxxxxxxxx> wrote: 
> Hello, and thanks for trying out the Go SDK!
> 
> tl;dr: You can bypass the Beam correctness checks by pretending your type
> is a Protocol Buffer. See create_test.go
> <https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/create_test.go#L52>
> for an example of what needs to be done. This is not recommended, but it works.
> 
> To answer your questions:
> You're right: at present, the Go SDK does not accept a map as part of an
> element value. As a rule, it tries to avoid free-floating pointers and
> reference types (like interface{}, slices, and maps), and some will likely
> never be allowed (like channels). The reason is that they open up the risk
> of the value being modified *after* it has been emitted, which leads to
> unpredictable bugs.
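The danger described above can be reproduced in plain Go, without Beam. In this sketch the `emit` closure is hypothetical: it simply buffers elements in a slice, standing in for a runner that encodes elements at some later point, and is not how Beam's runners are implemented. Copying a struct copies only the map header, so a mutation made after `emit` is visible in the buffered element; cloning the map first (the hypothetical `cloneLabels` helper) avoids that.

```go
package main

import "fmt"

// Record mirrors the struct from the question, including the map field.
type Record struct {
	Payload string
	Labels  map[string]string
}

// emitAll simulates a hypothetical runner that buffers emitted elements and
// only encodes them later. It is NOT Beam's implementation.
func emitAll() (buffered []Record) {
	emit := func(r Record) { buffered = append(buffered, r) }

	labels := map[string]string{"env": "prod"}
	r := Record{Payload: "a", Labels: labels}
	emit(r) // the struct is copied, but its map still refers to the same data

	labels["env"] = "dev" // mutation after emit leaks into the buffered element
	return buffered
}

// emitWithCopy clones the map before emitting, so later mutations are not observed.
func emitWithCopy() (buffered []Record) {
	emit := func(r Record) { buffered = append(buffered, r) }

	labels := map[string]string{"env": "prod"}
	emit(Record{Payload: "a", Labels: cloneLabels(labels)})

	labels["env"] = "dev" // only the original map changes
	return buffered
}

// cloneLabels returns a shallow copy of m (hypothetical helper).
func cloneLabels(m map[string]string) map[string]string {
	out := make(map[string]string, len(m))
	for k, v := range m {
		out[k] = v
	}
	return out
}

func main() {
	fmt.Println(emitAll()[0].Labels["env"])      // dev
	fmt.Println(emitWithCopy()[0].Labels["env"]) // prod
}
```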
> 
> At best, it's a feature that isn't implemented yet, because of the danger
> mentioned above.
> 
> You are not doing anything wrong. AFAICT your pipeline is correct.
> 
> There is a workaround, but we can't guarantee it is a viable long term
> solution, since it involves depending on a quirk of protocol buffers.
> 
> Essentially, you can convince both Beam and the protocol buffer package
> that your custom type is a proto, and write your own Marshal and Unmarshal
> methods to convert it to and from a []byte (any encoding that can do that
> should work).
> 
> See create_test.go
> <https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/create_test.go#L52>
> for an example of what needs to be done.
> 
> When the Go SDK has a proper Coder Registry or similar, we might have a
> better solution, but the danger will still be there.
> 
> Please let me know if you need help.
> 
> Cheers,
> Robert Burke
> 
> On 2018/06/18 21:10:33, ed...@xxxxxxxxx <e...@xxxxxxxxx> wrote:
> > I have the following code:
> >
> >     type Record struct {
> >         Timestamp time.Time
> >         Payload   string
> >     }
> >
> >     type processFn struct {
> >         // etc...
> >     }
> >
> >     func (f *processFn) ProcessElement(ctx context.Context, data []byte, emit func(Record)) {
> >         // etc...
> >         emit(someRecord)
> >         // etc...
> >     }
> >
> > Which is eventually invoked as:
> >
> >     beam.ParDo(scope, &processFn{}, pcoll)
> >
> > This seems to work fine using the direct runner until I add a map to the
> > Record struct as follows:
> >
> >     type Record struct {
> >         Timestamp time.Time
> >         Payload   string
> >         Labels    map[string]string
> >     }
> >
> > Then I get the error mentioned in the subject line.
> >
> > My questions are:
> > - Are maps illegal? What is a legal structure? or
> > - Is the feature yet to be implemented? or
> > - Am I doing something wrong? Am I failing to set up my pipeline correctly?
> >
> > Thanks for your help.
> >
>