


Re: Go SDK: bad parameter type for reflect.methodValueCall: func(Record)


Hello, and thanks for trying out the Go SDK!

tl;dr: You can bypass Beam's correctness checks by pretending your type is a Protocol Buffer. See create_test.go for an example of what needs to be done. This is not recommended, but it works.

To answer your questions:
You're right: at present, the Go SDK doesn't accept maps inside element values. As a rule, it tries to avoid free-floating pointers and reference types (like interface{}, slices, and maps), and some will likely never be allowed (like channels). The reason is that they open up the risk of modifying a value *after* it has been emitted, which leads to unpredictable bugs.
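To make that danger concrete, here is a small plain-Go sketch with no Beam dependency. The emitter is a hypothetical stand-in that just buffers elements, the way a runner that defers encoding might. It is not Beam's actual emitter. Copying the struct copies the map header, not the map contents, so a write made after the emit shows up in the "already emitted" element.

```go
package main

import "fmt"

// Record mirrors the element type from the question, trimmed to the
// fields that matter here.
type Record struct {
	Payload string
	Labels  map[string]string
}

// emitThenMutate emits a Record through a buffering stand-in emitter,
// then modifies the map it was built from, and reports what the
// buffered element now contains.
func emitThenMutate() string {
	var buffered []Record
	// Hypothetical emitter: it keeps the element for later, as a runner
	// that defers encoding might. It is not Beam's real emitter.
	emit := func(r Record) { buffered = append(buffered, r) }

	labels := map[string]string{"env": "prod"}
	emit(Record{Payload: "a", Labels: labels})

	// The Record was copied by value, but the copy still refers to the
	// same map, so this write is visible through the buffered element.
	labels["env"] = "dev"
	return buffered[0].Labels["env"]
}

func main() {
	fmt.Println(emitThenMutate()) // prints: dev
}
```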

At best, it's a feature that isn't implemented yet, because of the danger mentioned above.

You are not doing anything wrong. AFAICT your pipeline is correct.

There is a workaround, but we can't guarantee it is a viable long-term solution, since it depends on a quirk of protocol buffers.

Essentially, you can convince both Beam and the protocol buffer package that your custom type is a proto, and write your own Marshal and Unmarshal methods to convert it to and from a []byte (any encoding that can do that should work).

See create_test.go for an example of what needs to be done. 

When the Go SDK has a proper Coder Registry or similar, we might have a better solution, but the danger will still be there.

Please let me know if you need help.

Cheers,
Robert Burke

On 2018/06/18 21:10:33, ed...@xxxxxxxxx <e...@xxxxxxxxx> wrote:
> I have the following code:
>
> type Record struct {
>         Timestamp time.Time
>         Payload   string
> }
>
> type processFn struct {
>         // etc...
> }
>
> func (f *processFn) ProcessElement(ctx context.Context, data []byte, emit func(Record)) {
>         // etc...
>         emit(someRecord)
>         // etc...
> }
>
> Which is eventually invoked as:
> beam.ParDo(scope, &processFn{}, pcoll)
>
> This seems to work fine using the direct runner until I add a map to the Record struct as follows:
>
> type Record struct {
>         Timestamp time.Time
>         Payload   string
>         Labels    map[string]string
> }
>
> Then I get the error mentioned in the subject line.
>
> My questions are:
> - Are maps illegal? What is a legal structure? or
> - Is the feature yet to be implemented? or
> - Am I doing something wrong? Am I failing to set up my pipeline correctly?
>
> Thanks for your help.