Re: Beam Schemas: current status


Thanks Reuven. That's an OK restriction. Apache Flink also requires non-final fields to be able to generate TypeInformation (~=Schema) from PoJos.

I agree that it's not very intuitive for users.

I suppose it would work to assume a constructor with the same parameter order as the fields in the class. So if instantiation with the default constructor doesn't work, the framework would try to look up a constructor whose parameters match the fields of the class.
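
For illustration, the Pojo from below could get a constructor whose parameters mirror the declared field order (just a sketch):

public class Pojo {
  public final String field;

  // Parameters in the same order as the declared fields, so the
  // framework could match them positionally.
  public Pojo(String field) {
    this.field = field;
  }
}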

Perhaps that's too much magic, though; having a dedicated interface for construction would be a more programmatic approach.

-Max

On 30.08.18 16:55, Reuven Lax wrote:
Max,

Nested Pojos are fully supported, as are nested array/collection and map types (e.g. if your Pojo contains List<OtherPojo>).

One limitation right now is that only mutable Pojos are supported. For example, the following Pojo would _not_ work, because the fields aren't mutable.

public class Pojo {
   public final String field;
}

This is an annoying restriction, because in practice Pojo types often have final fields. The reason for the restriction is that the most general way to create an instance of this Pojo (after decoding) is to instantiate the object and then set the fields one by one (I also assume that there's a default constructor).  I can remove this restriction if there is an appropriate constructor or builder interface that lets us construct the object directly.
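
By contrast, a minimal sketch of what works today: a mutable variant with a default constructor, which we can instantiate and then populate field by field.

public class Pojo {
  public String field; // non-final, so it can be set after construction

  // The implicit default constructor is used to create the instance
  // before the decoded field values are assigned.
}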

Reuven

On Thu, Aug 30, 2018 at 6:51 AM Maximilian Michels <mxm@xxxxxxxxxx> wrote:

    That's a cool feature. Are there any limitations for the schema
    inference apart from being a Pojo/Bean? Does it support nested PoJos,
    e.g. "wrapper.field"?

    -Max

    On 29.08.18 07:40, Reuven Lax wrote:
     > I wanted to send a quick note to the community about the current
     > status of schema-aware PCollections in Beam. As some might remember
     > we had a good discussion last year about the design of these
     > schemas, involving many folks from different parts of the community.
     > I sent a summary earlier this year explaining how schemas have been
     > integrated into the DoFn framework. Much has happened since then,
     > and here are some of the highlights.
     >
     >
     > First, I want to emphasize that all the schema-aware classes are
     > currently marked @Experimental. Nothing is set in stone yet, so if
     > you have questions about any decisions made, please start a
     > discussion!
     >
     >
     >       SQL
     >
     > The first big milestone for schemas was porting all of BeamSQL to
     > use the framework, which was done in pr/5956. This was a lot of
     > work and exposed many bugs in the schema implementation, but it now
     > provides great evidence that schemas work!
     >
     >
     >       Schema inference
     >
     > Beam can automatically infer schemas from Java POJOs (objects with
     > public fields) or JavaBean objects (objects with getter/setter
     > methods). Often you can do this by simply annotating the class. For
     > example:
     >
     >
     > @DefaultSchema(JavaFieldSchema.class)
     > public class UserEvent {
     >   public String userId;
     >   public LatLong location;
     >   public String countryCode;
     >   public long transactionCost;
     >   public double transactionDuration;
     >   public List<String> traceMessages;
     > }
     >
     >
     > @DefaultSchema(JavaFieldSchema.class)
     > public class LatLong {
     >   public double latitude;
     >   public double longitude;
     > }
     >
     >
     > Beam will automatically infer schemas for these classes! So if you
     > have a PCollection<UserEvent>, it will automatically get the
     > following schema:
     >
     >
     > UserEvent:
     >   userId: STRING
     >   location: ROW(LatLong)
     >   countryCode: STRING
     >   transactionCost: INT64
     >   transactionDuration: DOUBLE
     >   traceMessages: ARRAY[STRING]
     >
     >
     > LatLong:
     >   latitude: DOUBLE
     >   longitude: DOUBLE
     >
     >
     > Now it’s not always possible to annotate the class like this (you
     > may not own the class definition), so you can also explicitly
     > register this using Pipeline:getSchemaRegistry:registerPOJO, and the
     > same for JavaBeans.
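     >
     > For example, registering the LatLong class from above might look
     > roughly like this (a sketch; the exact registration call may differ):
     >
     > Pipeline pipeline = Pipeline.create(options);
     > pipeline.getSchemaRegistry().registerPOJO(LatLong.class);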
     >
     >
     >       Coders
     >
     > Beam has a built-in coder for any schema-aware PCollection, largely
     > removing the need for users to care about coders. We generate
     > low-level bytecode (using ByteBuddy) to implement the coder for
     > each schema, so these coders are quite performant. This provides a
     > better default coder for Java POJO objects as well. In the past
     > users were recommended to use AvroCoder for pojos, which many have
     > found inefficient. Now there’s a more-efficient solution.
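     >
     > Concretely, with the annotated UserEvent class above no setCoder
     > call is needed anymore (ParseUserEventFn here is just a placeholder
     > DoFn producing UserEvent):
     >
     > PCollection<UserEvent> events =
     >     input.apply(ParDo.of(new ParseUserEventFn()));
     > // Previously you might have written:
     > //   events.setCoder(AvroCoder.of(UserEvent.class));
     > // With an inferred schema, the schema coder is used automatically.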
     >
     >
     >       Utility Transforms
     >
     > Schemas are already useful for implementers of extensions such as
     > SQL, but the goal was to use them to make Beam itself easier to
     > use. To this end, I’ve been implementing a library of transforms
     > that allow for easy manipulation of schema PCollections. So far
     > Filter and Select are merged, Group is about to go out for review
     > (it needs some more javadoc and unit tests), and Join is being
     > developed but doesn’t yet have a final interface.
     >
     >
     > Filter
     >
     > Given a PCollection<LatLong>, I want to keep only those in an area
     > of southern Manhattan. Well, this is easy!
     >
     >
     > PCollection<LatLong> manhattanEvents = allEvents.apply(Filter
     >     .whereFieldName("latitude", lat -> lat < 40.720 && lat > 40.699)
     >     .whereFieldName("longitude", lng -> lng < -73.969 && lng > -74.747));
     >
     >
     > Schemas along with lambdas allow us to write this transform
     > declaratively. The Filter transform also allows you to register
     > filter functions that operate on multiple fields at the same time,
     > as sketched below.
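     >
     > A rough sketch of such a multi-field predicate (assuming a variant
     > that takes the field names plus a predicate over the resulting Row):
     >
     > PCollection<LatLong> filtered = allEvents.apply(Filter
     >     .whereFieldNames(Arrays.asList("latitude", "longitude"),
     >         row -> row.getDouble("latitude") > 40.699
     >             && row.getDouble("longitude") < -73.969));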
     >
     >
     > Select
     >
     > Let’s say that I don’t need all the fields in a row. For instance,
     > I’m only interested in the userId and traceMessages, and don’t care
     > about the location. In that case I can write the following:
     >
     >
     > PCollection<Row> selected =
     >     allEvents.apply(Select.fieldNames("userId", "traceMessages"));
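     >
     > The resulting rows keep just the selected fields, with the types
     > inferred for UserEvent, i.e. a schema along the lines of:
     >
     >   userId: STRING
     >   traceMessages: ARRAY[STRING]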
     >
     >
     > BTW, Beam also keeps track of which fields are accessed by a
     > transform. In the future we can automatically insert Selects in
     > front of subgraphs to drop fields that are not referenced in that
     > subgraph.
     >
     >
     > Group
     >
     > Group is one of the more advanced transforms. In its most basic
     > form, it provides a convenient way to group by key:
     >
     >
     > PCollection<KV<Row, Iterable<UserEvent>>> byUserAndCountry =
     >     allEvents.apply(Group.byFieldNames("userId", "countryCode"));
     >
     >
     > Notice how much more concise this is than using GroupByKey directly!
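     >
     > For comparison, a rough equivalent with plain GroupByKey (cheating
     > here with a composite String key instead of a Row key):
     >
     > PCollection<KV<String, Iterable<UserEvent>>> grouped = allEvents
     >     .apply(MapElements
     >         .into(TypeDescriptors.kvs(
     >             TypeDescriptors.strings(),
     >             TypeDescriptor.of(UserEvent.class)))
     >         .via((UserEvent e) -> KV.of(e.userId + ":" + e.countryCode, e)))
     >     .apply(GroupByKey.create());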
     >
     >
     > The Group transform really starts to shine, however, when you start
     > specifying aggregations. You can aggregate any field (or fields) and
     > build up an output schema based on these aggregations. For example:
     >
     >
     > PCollection<KV<Row, Row>> aggregated = allEvents.apply(
     >     Group.byFieldNames("userId", "countryCode")
     >         .aggregateField("transactionCost", Sum.ofLongs(), "total_cost")
     >         .aggregateField("transactionCost", Top.<Long>largestFn(10), "top_purchases")
     >         .aggregateField("transactionDuration",
     >             ApproximateQuantilesCombineFn.create(21),
     >             "durationHistogram"));
     >
     >
     > This will individually aggregate the specified fields of the input
     > items (by user and country), and generate an output schema for
     > these aggregations. In this case, the output schema will be the
     > following:
     >
     >
     > AggregatedSchema:
     >     total_cost: INT64
     >     top_purchases: ARRAY[INT64]
     >     durationHistogram: ARRAY[DOUBLE]
     >
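     > The aggregated values can then be read off the result Row by field
     > name, e.g. (aggRow being one of the aggregated Row values above):
     >
     > long totalCost = aggRow.getInt64("total_cost");
     >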
     >
     > There are some more utility transforms I've written that are worth
     > looking at, such as Convert (which can convert between user types
     > that share a schema) and Unnest (which flattens nested schemas).
     > There are also some others, such as Pivot, that we should consider
     > writing.
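     >
     > For instance, Convert might be used like this (OtherUserEvent being
     > a hypothetical Pojo with the same fields as UserEvent):
     >
     > PCollection<OtherUserEvent> converted =
     >     allEvents.apply(Convert.to(OtherUserEvent.class));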
     >
     >
     > There is still a lot to do. All the todo items are reflected in
     > JIRA; however, here are some examples of current gaps:
     >
     >
     >   * Support for read-only POJOs (those with final fields) and
     >     JavaBeans (objects without setters).
     >
     >   * Automatic schema inference from more Java types: protocol
     >     buffers, avro, AutoValue, etc.
     >
     >   * Integration with sources (BigQueryIO, JdbcIO, AvroIO, etc.).
     >
     >   * Support for JsonPath expressions so users can better express
     >     nested fields. E.g. support expressions of the form
     >     Select.fields("field1.field2", "field3.*", "field4[0].field5");
     >
     >   * Schemas still need to be defined in our portability layer so
     >     they can be used cross-language.
     >
     >
     > If anyone is interested in helping close these gaps, you'll be
     > helping make Beam a better, more-usable system!
     >
     > Reuven
     >