Re: [Fwd: [Apache Beam] Custom DataSourceV2 instanciation: parameters passing and Encoders]


Etienne,

Have you looked at https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala? It seems that's how Spark integrates Kafka with the DataFrame API.

Thanks,
Manu Zhang
On Dec 20, 2018, 5:31 PM +0800, Etienne Chauchot <echauchot@xxxxxxxxxx>, wrote:
@Manu, anyway, the Spark DataSource (V1) instantiation mechanism is the same as that of DataSourceV2, so the question remains even if we target V1 for stability reasons.

Etienne

On Wednesday, December 19, 2018 at 17:07 +0100, Etienne Chauchot wrote:
Yes, it was thanks to these Spark community meetings that I got Ryan's name. And indeed, when I saw the design sync meetings, I realized how recent the DataSourceV2 API is.
I think you are right: I should wait for it to be finished and use V1 in the meantime.

Etienne

On Wednesday, December 19, 2018 at 23:27 +0800, Manu Zhang wrote:
The Spark community has been holding a weekly sync meeting on DataSourceV2 and sharing notes back to their dev list https://lists.apache.org/list.html?dev@xxxxxxxxxxxxxxxx:lte=3M:DataSourceV2%20sync. At this time, there are still some moving pieces on Spark's side. Is it too early to target DataSourceV2?

Thanks,
Manu Zhang
On Dec 19, 2018, 6:40 PM +0800, Etienne Chauchot <echauchot@xxxxxxxxxx>, wrote:
Thanks, Kenn, for taking the time to take a look.

On Tuesday, December 18, 2018 at 11:39 -0500, Kenneth Knowles wrote:
I don't know DataSourceV2 well, but I am reading around to try to help. I see the problem with the SparkSession API. Is there no other way to instantiate a DataSourceV2 and read the data from it?
=> No, this is exactly what I'm looking for :)

Other thoughts:

 - Maybe start from Splittable DoFn since it is a new translator?
=> Yes, but I still need to translate BoundedSource and UnboundedSource for compatibility with IOs that have not migrated to SDF.

 - I wonder if the reason for this API is that the class name and options are what is shipped to workers, so the limited API makes serialization easy for them?
=> Yes, that, and also because DataSource is the entry point of the Spark pipeline, so it should not need to receive more than user-supplied configuration, hence the String-only support. But we are not users, we are DAG translators, hence our need to pass objects more complex than Strings.

 - As a total hack, you could serialize the Beam objects (maybe to portable protos) and pass that as a single "primitive type" option.
=> Yes, sure, it could work. Another hack would be to use ASM or ByteBuddy to "enhance" Spark classes, but that is fragile and risky :)
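
To make the serialization hack above concrete, here is a minimal sketch of the round trip, assuming the object to ship is java.io.Serializable. It uses only plain JDK serialization plus Base64 so that the result fits in a single String option. The class name OptionCodec and its two methods are hypothetical names for illustration, not part of Beam or Spark, and portable protos could replace Java serialization.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Base64;

// Hypothetical helper (not a Beam or Spark class): turns a Serializable
// object into a String that can travel as one data source option.
final class OptionCodec {
    private OptionCodec() {}

    // Java-serialize the value, then Base64-encode the bytes into a String.
    static String toOption(Serializable value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return Base64.getEncoder().encodeToString(bytes.toByteArray());
    }

    // Reverse of toOption: Base64-decode, then Java-deserialize.
    static Object fromOption(String encoded) {
        byte[] raw = Base64.getDecoder().decode(encoded);
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(raw))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Class of serialized object not on classpath", e);
        }
    }
}
```

On the translator side the encoded String would be passed as an option value, and the custom source would call fromOption on it and cast the result back. The option key and the place where the source reads it are left out here because they depend on the DataSource API version.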

You definitely need someone from Spark more than someone from Beam for this issue. At this point, I've read the scaladocs enough that I think I'd dig into Spark's code to see what is going on and if there is a way that is more obviously right.
=> Yes, this is what I tried, but I got no answer on the public Spark MLs. Luckily, I asked Ryan Blue of the Spark community directly, and he kindly answered. I'm digging into Catalog and Spark plans to find a different instantiation mechanism.

Etienne

Kenn

On Tue, Dec 18, 2018 at 11:09 AM Etienne Chauchot <echauchot@xxxxxxxxxx> wrote:
Hi everyone, 

Does anyone have comments on this question?

Thanks
Etienne

On Friday, December 14, 2018 at 10:37 +0100, Etienne Chauchot wrote:
Hi guys,
I'm currently coding a POC of a new Spark runner based on structured streaming and the new DataSourceV2 API, and I have a question. Having found no pointers on the internet, I asked the Spark community, with no luck. If any of you know the new Spark DataSourceV2 API, could you share your thoughts?

Also, I did not mention it in the email, but I did not find any way to get a reference to the automatically created DataSourceV2 instance, so I cannot lazily initialize the source either.

Thanks

Etienne

-------- Forwarded Message --------
From: Etienne Chauchot <echauchot@xxxxxxxxxx>
Subject: [Apache Beam] Custom DataSourceV2 instanciation: parameters passing and Encoders
Date: Tue, 11 Dec 2018 19:02:23 +0100

Hi Spark guys,

I'm Etienne Chauchot and I'm a committer on the Apache Beam project.

We have what we call runners. They are pieces of software that translate pipelines written with the Beam API into pipelines that use the API of a native execution engine. Currently, the Spark runner uses the old RDD / DStream APIs.
I'm writing a new runner that will use structured streaming (but not continuous processing, and with no schema for now).

I am just starting. I'm currently trying to map our sources to yours. I'm targeting the new DataSourceV2 API. It maps pretty well to Beam sources, but I have a problem with the instantiation of the custom source.
I searched for an answer on Stack Overflow and the user ML with no luck. I guess the question is too specific:

When visiting the Beam DAG, I have access to Beam objects such as Source and Reader that I need to map to MicroBatchReader and InputPartitionReader.
As far as I understand, a custom DataSourceV2 is instantiated automatically by Spark through sparkSession.readStream().format(providerClassName) or similar code. The problem is that I can only pass options of primitive types plus String, so I cannot pass the Beam Source to the DataSourceV2.
=> Is there a way to do so?


Also, I get a Dataset<Row> as output. Each Row contains an instance of Beam WindowedValue<T>, where T is the type parameter of the Source. I do a map on the Dataset to transform it into a Dataset<WindowedValue<T>>. I have a question related to the Encoder:
=> How do I properly create an Encoder for the generic type WindowedValue<T> to use in the map?

Here is the code:

And more specifically:

Thanks,

Etienne