
Re: Beam Schemas: current status


It's probably worth publishing this update as a blog post. 

On Fri, Aug 31, 2018 at 9:58 PM Reuven Lax <relax@xxxxxxxxxx> wrote:
In addition, JdbcIO is another source that could integrate with schemas.

Another point of integration could be with shared schema registries (such as the Kafka Schema Registry). Any source can integrate with an external registry and use it to set the schema on its output.
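
A minimal sketch of that idea (fetchSchemaFromRegistry and BytesToRowFn are hypothetical placeholders, and a setRowSchema-style method is assumed as the way to attach a schema to the output):

// Hypothetical: look up the topic's schema in an external registry.
Schema schema = fetchSchemaFromRegistry("user-events");
PCollection<Row> rows = rawRecords
    .apply(ParDo.of(new BytesToRowFn(schema)))  // hypothetical decoding DoFn
    .setRowSchema(schema);  // the source's output now carries the schema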

Reuven

On Fri, Aug 31, 2018 at 12:44 PM Robert Bradshaw <robertwb@xxxxxxxxxx> wrote:
On Fri, Aug 31, 2018 at 5:01 PM Alexey Romanenko <aromanenko.dev@xxxxxxxxx> wrote:
Thanks Reuven for updating community with this, great work!

One small question about IO integration. What kind of integration is this supposed to be?

Two IOs that I would love to see benefit from schemas are BigQuery and Avro (and really any source that already has a schema, even CSVs). This of course would require querying the source and possibly some of the data at pipeline construction time (which has pros and cons). Both of these examples also require a schema when writing, which under this scheme could be implicit rather than (re)provided by the user. 
 
Are there any IOs that already benefit from Schemas support? 

On 31 Aug 2018, at 16:46, Reuven Lax <relax@xxxxxxxxxx> wrote:



On Fri, Aug 31, 2018 at 2:22 AM Maximilian Michels <mxm@xxxxxxxxxx> wrote:
Thanks Reuven. That's an OK restriction. Apache Flink also requires
non-final fields to be able to generate TypeInformation (~=Schema) from
PoJos.

I agree that it's not very intuitive for users.

I suppose it would work to assume a constructor with the same parameter
order as the fields in the class. So if instantiation with the default
constructor doesn't work, it would try to look up a constructor based on
the fields of the class.

Actually, Java reflection doesn't guarantee any particular order of fields or methods when you query them. We would have to look for a constructor with the exact same parameter names as the fields. Unfortunately, users sometimes shorten the parameter names when creating such constructors, which would defeat this. We could also provide a set of dedicated annotations that allow the user to mark the constructor (or static builder method) used to create the class. 
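
As a sketch, a marked constructor might look like this (the @SchemaCreate name is purely illustrative here, not a committed API):

@DefaultSchema(JavaFieldSchema.class)
public class Pojo {
  public final String field;

  @SchemaCreate  // hypothetical: tells Beam which constructor to use
  public Pojo(String field) {
    this.field = field;
  }
}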


Perhaps too much magic; having a dedicated interface for construction is
a more programmatic approach.

-Max

On 30.08.18 16:55, Reuven Lax wrote:
> Max,
>
> Nested Pojos are fully supported, as are nested array/collection and map
> types (e.g. if your Pojo contains List<OtherPojo>).
>
> One limitation right now is that only mutable Pojos are supported. For
> example, the following Pojo would _not_ work, because the fields aren't
> mutable.
>
> public class Pojo {
>    public final String field;
> }
>
> This is an annoying restriction, because in practice Pojo types often
> have final fields. The reason for the restriction is that the most
> general way to create an instance of this Pojo (after decoding) is to
> instantiate the object and then set the fields one by one (I also assume
> that there's a default constructor).  I can remove this restriction if
> there is an appropriate constructor or builder interface that lets us
> construct the object directly.
>
> Reuven
>
> On Thu, Aug 30, 2018 at 6:51 AM Maximilian Michels <mxm@xxxxxxxxxx
> <mailto:mxm@xxxxxxxxxx>> wrote:
>
>     That's a cool feature. Are there any limitations for the schema
>     inference apart from being a Pojo/Bean? Does it support nested PoJos,
>     e.g. "wrapper.field"?
>
>     -Max
>
>     On 29.08.18 07:40, Reuven Lax wrote:
>      > I wanted to send a quick note to the community about the current
>     status
>      > of schema-aware PCollections in Beam. As some might remember we
>     had a
>      > good discussion last year about the design of these schemas,
>     involving
>      > many folks from different parts of the community. I sent a summary
>      > earlier this year explaining how schemas have been integrated into
>     the
>      > DoFn framework. Much has happened since then, and here are some
>     of the
>      > highlights.
>      >
>      >
>      > First, I want to emphasize that all the schema-aware classes are
>      > currently marked @Experimental. Nothing is set in stone yet, so
>     if you
>      > have questions about any decisions made, please start a discussion!
>      >
>      >
>      >       SQL
>      >
>      > The first big milestone for schemas was porting all of BeamSQL to
>     use
>      > the framework, which was done in pr/5956. This was a lot of work and
>      > exposed many bugs in the schema implementation, but it now provides
>     great
>      > evidence that schemas work!
>      >
>      >
>      >       Schema inference
>      >
>      > Beam can automatically infer schemas from Java POJOs (objects with
>      > public fields) or JavaBean objects (objects with getter/setter
>     methods).
>      > Often you can do this by simply annotating the class. For example:
>      >
>      >
>      > @DefaultSchema(JavaFieldSchema.class)
>      > public class UserEvent {
>      >   public String userId;
>      >   public LatLong location;
>      >   public String countryCode;
>      >   public long transactionCost;
>      >   public double transactionDuration;
>      >   public List<String> traceMessages;
>      > }
>      >
>      >
>      > @DefaultSchema(JavaFieldSchema.class)
>      > public class LatLong {
>      >   public double latitude;
>      >   public double longitude;
>      > }
>      >
>      >
>      > Beam will automatically infer schemas for these classes! So if
>     you have
>      > a PCollection<UserEvent>, it gets the following
>     schema:
>      >
>      >
>      > UserEvent:
>      >
>      >   userId: STRING
>      >
>      >   location: ROW(LatLong)
>      >
>      >   countryCode: STRING
>      >
>      >   transactionCost: INT64
>      >
>      >   transactionDuration: DOUBLE
>      >
>      >   traceMessages: ARRAY[STRING]
>      >
>      >
>      > LatLong:
>      >
>      >   latitude: DOUBLE
>      >
>      >   longitude: DOUBLE
>      >
>      >
>      > Now it’s not always possible to annotate the class like this (you
>     may
>      > not own the class definition), so you can also explicitly
>     register this
>      > using Pipeline.getSchemaRegistry().registerPOJO(), and the same for
>     JavaBeans.
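>      >
>      > A minimal sketch of explicit registration (assuming registerPOJO
>      > simply takes the class to register):
>      >
>      > Pipeline p = Pipeline.create(options);
>      > // Register a schema for a class we can't annotate ourselves:
>      > p.getSchemaRegistry().registerPOJO(LatLong.class);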
>      >
>      >
>      >       Coders
>      >
>      > Beam has a built-in coder for any schema-aware PCollection, largely
>      > removing the need for users to care about coders. We generate
>     low-level
>      > bytecode (using ByteBuddy) to implement the coder for each
>     schema, so
>      > these coders are quite performant. This provides a better default
>     coder
>      > for Java POJO objects as well. In the past, users were advised to use
>      > AvroCoder for POJOs, which many have found inefficient. Now
>     there’s a
>      > more-efficient solution.
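>      >
>      > Concretely (a sketch): where users previously wrote something like
>      >
>      > events.setCoder(AvroCoder.of(UserEvent.class));
>      >
>      > a PCollection<UserEvent> with an inferred schema now needs no setCoder
>      > call at all; the generated schema-based coder is picked up automatically.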
>      >
>      >
>      >       Utility Transforms
>      >
>      > Schemas are already useful for implementers of extensions such as
>     SQL,
>      > but the goal was to use them to make Beam itself easier to use.
>     To this
>      > end, I’ve been implementing a library of transforms that allow
>     for easy
>      > manipulation of schema PCollections. So far Filter and Select are
>      > merged, Group is about to go out for review (it needs some more
>     javadoc
>      > and unit tests), and Join is being developed but doesn’t yet have a
>      > final interface.
>      >
>      >
>      > Filter
>      >
>      > Given a PCollection<LatLong>, I want to keep only those in an
>     area of
>      > southern Manhattan. Well, this is easy!
>      >
>      >
>      > PCollection<LatLong> manhattanEvents = allEvents.apply(Filter
>      >     .whereFieldName("latitude", lat -> lat < 40.720 && lat > 40.699)
>      >     .whereFieldName("longitude", lng -> lng < -73.969 && lng > -74.747));
>      >
>      >
>      > Schemas along with lambdas allow us to write this transform
>      > declaratively. The Filter transform also allows you to register
>     filter
>      > functions that operate on multiple fields at the same time.
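>      >
>      > For instance (a sketch; this assumes a whereFieldNames variant that
>      > hands the predicate a Row containing just the named fields):
>      >
>      > PCollection<LatLong> manhattan = allEvents.apply(Filter
>      >     .whereFieldNames(Arrays.asList("latitude", "longitude"),
>      >         row -> row.getDouble("latitude") > 40.699
>      >             && row.getDouble("longitude") < -73.969));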
>      >
>      >
>      > Select
>      >
>      > Let’s say that I don’t need all the fields in a row. For
>     instance, I’m
>      > only interested in the userId and traceMessages, and don’t care
>     about
>      > the location. In that case I can write the following:
>      >
>      >
>      > PCollection<Row> selected =
>      >     allEvents.apply(Select.fieldNames("userId", "traceMessages"));
>      >
>      >
>      > BTW, Beam also keeps track of which fields are accessed by a
>     transform.
>      > In the future we can automatically insert Selects in front of
>     subgraphs
>      > to drop fields that are not referenced in that subgraph.
>      >
>      >
>      > Group
>      >
>      > Group is one of the more advanced transforms. In its most basic
>     form, it
>      > provides a convenient way to group by key:
>      >
>      >
>      > PCollection<KV<Row, Iterable<UserEvent>>> byUserAndCountry =
>      >     allEvents.apply(Group.byFieldNames("userId", "countryCode"));
>      >
>      >
>      > Notice how much more concise this is than using GroupByKey directly!
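>      >
>      > For comparison, a rough pre-schemas equivalent might look like this
>      > sketch (key coders and null handling omitted):
>      >
>      > PCollection<KV<KV<String, String>, Iterable<UserEvent>>> grouped =
>      >     allEvents
>      >         .apply(MapElements.via(
>      >             new SimpleFunction<UserEvent, KV<KV<String, String>, UserEvent>>() {
>      >               @Override
>      >               public KV<KV<String, String>, UserEvent> apply(UserEvent e) {
>      >                 // Hand-written key extraction for every grouping.
>      >                 return KV.of(KV.of(e.userId, e.countryCode), e);
>      >               }
>      >             }))
>      >         .apply(GroupByKey.create());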
>      >
>      >
>      > The Group transform really starts to shine however when you start
>      > specifying aggregations. You can aggregate any field (or fields) and
>      > build up an output schema based on these aggregations. For example:
>      >
>      >
>      > PCollection<KV<Row, Row>> aggregated = allEvents.apply(
>      >     Group.byFieldNames("userId", "countryCode")
>      >         .aggregateField("transactionCost", Sum.ofLongs(), "total_cost")
>      >         .aggregateField("transactionCost", Top.<Long>largestFn(10),
>      >             "top_purchases")
>      >         .aggregateField("transactionDuration",
>      >             ApproximateQuantilesCombineFn.create(21),
>      >             "durationHistogram"));
>      >
>      >
>      > This will individually aggregate the specified fields of the
>     input items
>      > (by user and country), and generate an output schema for these
>      > aggregations. In this case, the output schema will be the following:
>      >
>      >
>      > AggregatedSchema:
>      >
>      >     total_cost: INT64
>      >
>      >     top_purchases: ARRAY[INT64]
>      >
>      >     durationHistogram: ARRAY[DOUBLE]
>      >
>      >
>      > There are some more utility transforms I've written that are worth
>      > looking at, such as Convert (which can convert between user types
>     that
>      > share a schema) and Unnest (which flattens nested schemas). There are
>      > also some others, such as Pivot, that we should consider writing.
>      >
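>      > For example, Convert can move between a user type and generic Rows
>      > that share a schema. A minimal sketch (assuming Convert.toRows() and
>      > Convert.to(Class) entry points):
>      >
>      > PCollection<Row> rows = events.apply(Convert.toRows());
>      > PCollection<UserEvent> back = rows.apply(Convert.to(UserEvent.class));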
>      >
>      > There is still a lot to do. All the todo items are reflected in
>     JIRA;
>      > however, here are some examples of current gaps:
>      >
>      >
>      >   *
>      >
>      >     Support for read-only POJOs (those with final fields) and
>     JavaBeans
>      >     (objects without setters).
>      >
>      >   *
>      >
>      >     Automatic schema inference from more Java types: protocol
>     buffers,
>      >     Avro, AutoValue, etc.
>      >
>      >   *
>      >
>      >     Integration with sources (BigQueryIO, JdbcIO, AvroIO, etc.)
>      >
>      >   *
>      >
>      >     Support for JsonPath expressions so users can better express
>     nested
>      >     fields. E.g. support expressions of the form
>      >     Select.fields("field1.field2", "field3.*", "field4[0].field5");
>      >
>      >   *
>      >
>      >     Schemas still need to be defined in our portability layer so they
>      >     can be used cross language.
>      >
>      >
>      > If anyone is interested in helping close these gaps, you'll be
>     helping
>      > make Beam a better, more-usable system!
>      >
>      > Reuven
>      >
>