
Re: Beam Schemas: current status


On Fri, Aug 31, 2018 at 11:22 AM Maximilian Michels <mxm@xxxxxxxxxx> wrote:
Thanks Reuven. That's an OK restriction. Apache Flink also requires
non-final fields to be able to generate TypeInformation (~= Schema) from
POJOs.

I agree that it's not very intuitive for users.

I suppose it would work to assume a constructor with the same parameter
order as the fields in the class. So if instantiation with the default
constructor doesn't work, it would try to look up a constructor based on
the fields of the class.

I think this would make a lot of sense, but it would require some assumptions, e.g. that the declared field order is the same as the constructor argument order (and/or the schema order); this matters especially if there are fields of the same type. Probably still worth doing, either under a more limited set of constraints (all fields are of a different type) or as opt-in.
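
For concreteness, the convention would have to handle a class like this hypothetical one, where only constructor-parameter order (or parameter names, if compiled with -parameters) tells us which double is which:

public class LatLong {
  public final double latitude;
  public final double longitude;

  // Assumed convention: constructor parameters appear in
  // field-declaration order.
  public LatLong(double latitude, double longitude) {
    this.latitude = latitude;
    this.longitude = longitude;
  }
}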
 
Perhaps that's too much magic; having a dedicated interface for
construction is a more programmatic approach.

-Max

On 30.08.18 16:55, Reuven Lax wrote:
> Max,
>
> Nested POJOs are fully supported, as are nested array/collection and map
> types (e.g. if your POJO contains List<OtherPojo>).
>
> One limitation right now is that only mutable POJOs are supported. For
> example, the following POJO would _not_ work, because the fields aren't
> mutable.
>
> public class Pojo {
>    public final String field;
> }
>
> This is an annoying restriction, because in practice POJO types often
> have final fields. The reason for the restriction is that the most
> general way to create an instance of this POJO (after decoding) is to
> instantiate the object and then set the fields one by one (I also assume
> that there's a default constructor). I can remove this restriction if
> there is an appropriate constructor or builder interface that lets us
> construct the object directly.
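>
> To sketch what such a builder might look like (names here are
> illustrative, not a committed API):
>
> public class Pojo {
>    public final String field;
>
>    private Pojo(String field) { this.field = field; }
>
>    // Hypothetical builder Beam could discover and invoke after decoding,
>    // instead of setting fields one by one.
>    public static class Builder {
>      private String field;
>      public Builder setField(String f) { this.field = f; return this; }
>      public Pojo build() { return new Pojo(field); }
>    }
> }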
>
> Reuven
>
> On Thu, Aug 30, 2018 at 6:51 AM Maximilian Michels <mxm@xxxxxxxxxx> wrote:
>
>     That's a cool feature. Are there any limitations for the schema
>     inference apart from being a POJO/Bean? Does it support nested POJOs,
>     e.g. "wrapper.field"?
>
>     -Max
>
>     On 29.08.18 07:40, Reuven Lax wrote:
>      > I wanted to send a quick note to the community about the current
>      > status of schema-aware PCollections in Beam. As some might remember,
>      > we had a good discussion last year about the design of these schemas,
>      > involving many folks from different parts of the community. I sent a
>      > summary earlier this year explaining how schemas have been integrated
>      > into the DoFn framework. Much has happened since then, and here are
>      > some of the highlights.
>      >
>      >
>      > First, I want to emphasize that all the schema-aware classes are
>      > currently marked @Experimental. Nothing is set in stone yet, so if
>      > you have questions about any decisions made, please start a
>      > discussion!
>      >
>      >
>      >       SQL
>      >
>      > The first big milestone for schemas was porting all of BeamSQL to use
>      > the framework, which was done in pr/5956. This was a lot of work and
>      > exposed many bugs in the schema implementation, but it now provides
>      > great evidence that schemas work!
>      >
>      >
>      >       Schema inference
>      >
>      > Beam can automatically infer schemas from Java POJOs (objects with
>      > public fields) or JavaBean objects (objects with getter/setter
>      > methods). Often you can do this by simply annotating the class. For
>      > example:
>      >
>      >
>      > @DefaultSchema(JavaFieldSchema.class)
>      > public class UserEvent {
>      >   public String userId;
>      >   public LatLong location;
>      >   public String countryCode;
>      >   public long transactionCost;
>      >   public double transactionDuration;
>      >   public List<String> traceMessages;
>      > }
>      >
>      >
>      > @DefaultSchema(JavaFieldSchema.class)
>      > public class LatLong {
>      >   public double latitude;
>      >   public double longitude;
>      > }
>      >
>      >
>      > Beam will automatically infer schemas for these classes! So if you
>      > have a PCollection<UserEvent>, it will automatically get the
>      > following schema:
>      >
>      >
>      > UserEvent:
>      >   userId: STRING
>      >   location: ROW(LatLong)
>      >   countryCode: STRING
>      >   transactionCost: INT64
>      >   transactionDuration: DOUBLE
>      >   traceMessages: ARRAY[STRING]
>      >
>      > LatLong:
>      >   latitude: DOUBLE
>      >   longitude: DOUBLE
>      >
>      >
>      > Now it’s not always possible to annotate the class like this (you may
>      > not own the class definition), so you can also explicitly register
>      > this using Pipeline:getSchemaRegistry:registerPOJO, and the same for
>      > JavaBeans.
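>      >
>      > For example (a sketch; the exact registration call may differ):
>      >
>      > Pipeline pipeline = Pipeline.create(options);
>      > // ThirdPartyLatLong stands in for a class we don't own and
>      > // therefore can't annotate.
>      > pipeline.getSchemaRegistry().registerPOJO(ThirdPartyLatLong.class);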
>      >
>      >
>      >       Coders
>      >
>      > Beam has a built-in coder for any schema-aware PCollection, largely
>      > removing the need for users to care about coders. We generate
>      > low-level bytecode (using ByteBuddy) to implement the coder for each
>      > schema, so these coders are quite performant. This provides a better
>      > default coder for Java POJO objects as well. In the past users were
>      > advised to use AvroCoder for POJOs, which many have found
>      > inefficient. Now there’s a more efficient solution.
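>      >
>      > Concretely, where you previously might have written
>      >
>      > events.setCoder(AvroCoder.of(UserEvent.class));
>      >
>      > a schema-aware PCollection<UserEvent> now picks up the generated
>      > schema coder automatically, with no setCoder call needed.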
>      >
>      >
>      >       Utility Transforms
>      >
>      > Schemas are already useful for implementers of extensions such as
>      > SQL, but the goal was to use them to make Beam itself easier to use.
>      > To this end, I’ve been implementing a library of transforms that
>      > allow for easy manipulation of schema PCollections. So far Filter and
>      > Select are merged, Group is about to go out for review (it needs some
>      > more javadoc and unit tests), and Join is being developed but doesn’t
>      > yet have a final interface.
>      >
>      >
>      > Filter
>      >
>      > Given a PCollection<LatLong>, I want to keep only those in an area
>      > of southern Manhattan. Well, this is easy!
>      >
>      >
>      > PCollection<LatLong> manhattanEvents = allEvents.apply(Filter
>      >     .whereFieldName("latitude", lat -> lat < 40.720 && lat > 40.699)
>      >     .whereFieldName("longitude", lon -> lon < -73.969 && lon > -74.747));
>      >
>      >
>      > Schemas along with lambdas allow us to write this transform
>      > declaratively. The Filter transform also allows you to register
>      > filter functions that operate on multiple fields at the same time.
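>      >
>      > For example, a predicate over several fields at once might look
>      > roughly like the following sketch (assuming a whereFieldNames
>      > overload that takes a predicate over the whole Row):
>      >
>      > allEvents.apply(Filter.whereFieldNames(
>      >     Arrays.asList("transactionCost", "transactionDuration"),
>      >     row -> row.getInt64("transactionCost")
>      >         / row.getDouble("transactionDuration") > 2.0));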
>      >
>      >
>      > Select
>      >
>      > Let’s say that I don’t need all the fields in a row. For instance,
>      > I’m only interested in the userId and traceMessages, and don’t care
>      > about the location. In that case I can write the following:
>      >
>      >
>      > PCollection<Row> selected =
>      >     allEvents.apply(Select.fieldNames("userId", "traceMessages"));
>      >
>      >
>      > BTW, Beam also keeps track of which fields are accessed by a
>      > transform. In the future we can automatically insert Selects in
>      > front of subgraphs to drop fields that are not referenced in that
>      > subgraph.
>      >
>      >
>      > Group
>      >
>      > Group is one of the more advanced transforms. In its most basic
>      > form, it provides a convenient way to group by key:
>      >
>      >
>      > PCollection<KV<Row, Iterable<UserEvent>>> byUserAndCountry =
>      >     allEvents.apply(Group.byFieldNames("userId", "countryCode"));
>      >
>      >
>      > Notice how much more concise this is than using GroupByKey directly!
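>      >
>      > For comparison, a pre-schema equivalent would look something like
>      > this sketch (which also gives up the structured Row key):
>      >
>      > allEvents
>      >     .apply(MapElements
>      >         .into(TypeDescriptors.kvs(
>      >             TypeDescriptors.strings(), TypeDescriptor.of(UserEvent.class)))
>      >         .via(e -> KV.of(e.userId + "/" + e.countryCode, e)))
>      >     .apply(GroupByKey.create());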
>      >
>      >
>      > The Group transform really starts to shine, however, when you start
>      > specifying aggregations. You can aggregate any field (or fields) and
>      > build up an output schema based on these aggregations. For example:
>      >
>      >
>      > PCollection<KV<Row, Row>> aggregated = allEvents.apply(
>      >     Group.byFieldNames("userId", "countryCode")
>      >         .aggregateField("transactionCost", Sum.ofLongs(), "total_cost")
>      >         .aggregateField("transactionCost", Top.<Long>largestFn(10), "top_purchases")
>      >         .aggregateField("transactionDuration",
>      >             ApproximateQuantilesCombineFn.create(21), "durationHistogram"));
>      >
>      >
>      > This will individually aggregate the specified fields of the input
>      > items (by user and country), and generate an output schema for these
>      > aggregations. In this case, the output schema will be the following:
>      >
>      >
>      > AggregatedSchema:
>      >     total_cost: INT64
>      >     top_purchases: ARRAY[INT64]
>      >     durationHistogram: ARRAY[DOUBLE]
>      >
>      >
>      > There are some more utility transforms I've written that are worth
>      > looking at, such as Convert (which can convert between user types
>      > that share a schema) and Unnest (which flattens nested schemas).
>      > There are also some others, such as Pivot, that we should consider
>      > writing.
>      >
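>      > For instance, Convert can map between two user types that share a
>      > schema (UserEventV2 here stands in for a hypothetical class with the
>      > same fields as UserEvent):
>      >
>      > PCollection<UserEventV2> converted =
>      >     allEvents.apply(Convert.to(UserEventV2.class));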
>      >
>      > There is still a lot to do. All the TODO items are reflected in JIRA;
>      > here are some examples of current gaps:
>      >
>      >
>      >   * Support for read-only POJOs (those with final fields) and
>      >     JavaBeans (objects without setters).
>      >
>      >   * Automatic schema inference from more Java types: protocol
>      >     buffers, Avro, AutoValue, etc.
>      >
>      >   * Integration with sources (BigQueryIO, JdbcIO, AvroIO, etc.).
>      >
>      >   * Support for JsonPath expressions so users can better express
>      >     nested fields, e.g. expressions of the form
>      >     Select.fields(“field1.field2”, “field3.*”, “field4[0].field5”).
>      >
>      >   * Schemas still need to be defined in our portability layer so they
>      >     can be used cross-language.
>      >
>      >
>      > If anyone is interested in helping close these gaps, you'll be
>      > helping make Beam a better, more usable system!
>      >
>      > Reuven
>      >
>