osdir.com


[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: Beam Schemas: current status


Good point with identical types. You definitely want to avoid the following:

class Pojo {
  final String param1;
  final String param2;

  Pojo(String param2, String param1) {
    this.param1 = param1;
    this.param2 = param2;
  }
}

This would change the Pojo after deserialization. So this should only do its magic if there is only one possible way to feed data to the constructor. That's why a dedicated interface would be the easier and safer way to opt-in.

On 31.08.18 11:27, Robert Bradshaw wrote:
On Fri, Aug 31, 2018 at 11:22 AM Maximilian Michels <mxm@xxxxxxxxxx <mailto:mxm@xxxxxxxxxx>> wrote:

    Thanks Reuven. That's an OK restriction. Apache Flink also requires
    non-final fields to be able to generate TypeInformation (~=Schema) from
    PoJos.

    I agree that it's not very intuitive for Users.

    I suppose it would work to assume a constructor with the same parameter
    order as the fields in the class. So if instantiation with the default
    constructor doesn't work, it would try to look up a constructor
    based on
    the fields of the class.


I think this would make a lot of sense, but it would require some assumptions (e.g. the declared field order is the same as the constructor argument order (and/or the schema order), especially if there are fields of the same type). Probably still worth doing, either under a more limited set of constraints (all fields are of a different type), or as opt-in.

    Perhaps too much magic, having a dedicated interface for
    construction is
    a more programmatic approach.

    -Max

    On 30.08.18 16:55, Reuven Lax wrote:
     > Max,
     >
     > Nested Pojos are fully supported, as are nested array/collection
    and map
     > types (e.g. if your Pojo contains List<OtherPojo>).
     >
     > One limitation right now is that only mutable Pojos are
    supported. For
     > example, the following Pojo would _not_ work, because the fields
    aren't
     > mutable.
     >
     > public class Pojo {
     >    public final String field;
     > }
     >
     > This is an annoying restriction, because in practice Pojo types
    often
     > have final fields. The reason for the restriction is that the most
     > general way to create an instance of this Pojo (after decoding)
    is to
     > instantiate the object and then set the fields one by one (I also
    assume
     > that there's a default constructor).  I can remove this
    restriction if
     > there is an appropriate constructor or builder interface that
    lets us
     > construct the object directly.
     >
     > Reuven
     >
     > On Thu, Aug 30, 2018 at 6:51 AM Maximilian Michels
    <mxm@xxxxxxxxxx <mailto:mxm@xxxxxxxxxx>
     > <mailto:mxm@xxxxxxxxxx <mailto:mxm@xxxxxxxxxx>>> wrote:
     >
     >     That's a cool feature. Are there any limitations for the schema
     >     inference apart from being a Pojo/Bean? Does it supported
    nested PoJos,
     >     e.g. "wrapper.field"?
     >
     >     -Max
     >
     >     On 29.08.18 07:40, Reuven Lax wrote:
     >      > I wanted to send a quick note to the community about the
    current
     >     status
     >      > of schema-aware PCollections in Beam. As some might
    remember we
     >     had a
     >      > good discussion last year about the design of these schemas,
     >     involving
     >      > many folks from different parts of the community. I sent a
    summary
     >      > earlier this year explaining how schemas has been
    integrated into
     >     the
     >      > DoFn framework. Much has happened since then, and here are
    some
     >     of the
     >      > highlights.
     >      >
     >      >
     >      > First, I want to emphasize that all the schema-aware
    classes are
     >      > currently marked @Experimental. Nothing is set in stone
    yet, so
     >     if you
     >      > have questions about any decisions made, please start a
    discussion!
     >      >
     >      >
     >      >       SQL
     >      >
     >      > The first big milestone for schemas was porting all of
    BeamSQL to
     >     use
     >      > the framework, which was done in pr/5956. This was a lot
    of work,
     >      > exposed many bugs in the schema implementation, but now
    provides
     >     great
     >      > evidence that schemas work!
     >      >
     >      >
     >      >       Schema inference
     >      >
     >      > Beam can automatically infer schemas from Java POJOs
    (objects with
     >      > public fields) or JavaBean objects (objects with getter/setter
     >     methods).
     >      > Often you can do this by simply annotating the class. For
    example:
     >      >
     >      >
     >      > @DefaultSchema(JavaFieldSchema.class)
     >      >
     >      > publicclassUserEvent{
     >      >
     >      > publicStringuserId;
     >      >
     >      > publicLatLonglocation;
     >      >
     >      > PublicStringcountryCode;
     >      >
     >      > publiclongtransactionCost;
     >      >
     >      > publicdoubletransactionDuration;
     >      >
     >      > publicList<String>traceMessages;
     >      >
     >      > };
     >      >
     >      >
     >      > @DefaultSchema(JavaFieldSchema.class)
     >      >
     >      > publicclassLatLong{
     >      >
     >      > publicdoublelatitude;
     >      >
     >      > publicdoublelongitude;
     >      >
     >      > }
     >      >
     >      >
     >      > Beam will automatically infer schemas for these classes! So if
     >     you have
     >      > a PCollection<UserEvent>, it will automatically get the
    following
     >     schema:
     >      >
     >      >
     >      > UserEvent:
     >      >
     >      >   userId: STRING
     >      >
     >      >   location: ROW(LatLong)
     >      >
     >      >   countryCode: STRING
     >      >
     >      >   transactionCost: INT64
     >      >
     >      >   transactionDuration: DOUBLE
     >      >
     >      >   traceMessages: ARRAY[STRING]]
     >      >
     >      >
     >      > LatLong:
     >      >
     >      >   latitude: DOUBLE
     >      >
     >      >   longitude: DOUBLE
     >      >
     >      >
     >      > Now it’s not always possible to annotate the class like
    this (you
     >     may
     >      > not own the class definition), so you can also explicitly
     >     register this
     >      > using Pipeline:getSchemaRegistry:registerPOJO, and the
    same for
     >     JavaBeans.
     >      >
     >      >
     >      >       Coders
     >      >
     >      > Beam has a built-in coder for any schema-aware
    PCollection, largely
     >      > removing the need for users to care about coders. We generate
     >     low-level
     >      > bytecode (using ByteBuddy) to implement the coder for each
     >     schema, so
     >      > these coders are quite performant. This provides a better
    default
     >     coder
     >      > for Java POJO objects as well. In the past users were
    recommended
     >     to use
     >      > AvroCoder for pojos, which many have found inefficient. Now
     >     there’s a
     >      > more-efficient solution.
     >      >
     >      >
     >      >       Utility Transforms
     >      >
     >      > Schemas are already useful for implementers of extensions
    such as
     >     SQL,
     >      > but the goal was to use them to make Beam itself easier to
    use.
     >     To this
     >      > end, I’ve been implementing a library of transforms that allow
     >     for easy
     >      > manipulation of schema PCollections. So far Filter and
    Select are
     >      > merged, Group is about to go out for review (it needs some
    more
     >     javadoc
     >      > and unit tests), and Join is being developed but doesn’t
    yet have a
     >      > final interface.
     >      >
     >      >
     >      > Filter
     >      >
     >      > Given a PCollection<LatLong>, I want to keep only those in an
     >     area of
     >      > southern manhattan. Well this is easy!
     >      >
     >      >
     >      > PCollection<LatLong>manhattanEvents =allEvents.apply(Filter
     >      >
     >      > .whereFieldName("latitude",lat ->lat <40.720&&lat >40.699)
     >      >
     >      >
    .whereFieldName("longitude",long->long<-73.969&&long>-74.747));
     >      >
     >      >
     >      > Schemas along with lambdas allows us to write this transform
     >      > declaratively. The Filter transform also allows you to
    register
     >     filter
     >      > functions that operate on multiple fields at the same time.
     >      >
     >      >
     >      > Select
     >      >
     >      > Let’s say that I don’t need all the fields in a row. For
     >     instance, I’m
     >      > only interested in the userId and traceMessages, and don’t
    care
     >     about
     >      > the location. In that case I can write the following:
     >      >
     >      >
     >      > PCollection<Row>selected
     >      > =allEvents.apply(Select.fieldNames(“userId”,“traceMessages”));
     >      >
     >      >
     >      > BTW, Beam also keeps track of which fields are accessed by a
     >     transform
     >      > In the future we can automatically insert Selects in front of
     >     subgraphs
     >      > to drop fields that are not referenced in that subgraph.
     >      >
     >      >
     >      > Group
     >      >
     >      > Group is one of the more advanced transforms. In its most
    basic
     >     form, it
     >      > provides a convenient way to group by key:
     >      >
     >      >
     >      > PCollection<KV<Row,Iterable<UserEvent>>byUserAndCountry =
     >      >
>      >    allEvents.apply(Group.byFieldNames(“userId”,“countryCode”));
     >      >
     >      >
     >      > Notice how much more concise this is than using GroupByKey
    directly!
     >      >
     >      >
     >      > The Group transform really starts to shine however when
    you start
     >      > specifying aggregations. You can aggregate any field (or
    fields) and
     >      > build up an output schema based on these aggregations. For
    example:
     >      >
     >      >
     >      > PCollection<KV<Row,Row>>aggregated =allEvents.apply(
     >      >
     >      > Group.byFieldNames(“userId”,“countryCode”)
     >      >
     >      > .aggregateField("cost",Sum.ofLongs(),"total_cost")
     >      >
     >      >
    .aggregateField("cost",Top.<Long>largestFn(10),“top_purchases”)
     >      >
     >      >
>  .aggregateField("transationDuration",ApproximateQuantilesCombineFn.create(21),
     >      >
     >      >               “durationHistogram”)));
     >      >
     >      >
     >      > This will individually aggregate the specified fields of the
     >     input items
     >      > (by user and country), and generate an output schema for these
     >      > aggregations. In this case, the output schema will be the
    following:
     >      >
     >      >
     >      > AggregatedSchema:
     >      >
     >      >     total_cost: INT64
     >      >
     >      >     top_purchases: ARRAY[INT64]
     >      >
     >      >     durationHistogram: ARRAY[DOUBLE]
     >      >
     >      >
     >      > There are some more utility transforms I've written that
    are worth
     >      > looking at such as Convert (which can convert between user
    types
     >     that
     >      > share a schema) and Unnest (flattens nested schemas).
    There are also
     >      > some others such as Pivot that we should consider writing
     >      >
     >      >
     >      > There is still a lot to do. All the todo items are
    reflected in
     >     JIRA,
     >      > however here are some examples of current gaps:
     >      >
     >      >
     >      >   *
     >      >
     >      >     Support for read-only POJOs (those with final fields) and
     >     JavaBean
     >      >     (objects without setters).
     >      >
     >      >   *
     >      >
     >      >     Automatic schema inference from more Java types: protocol
     >     buffers,
     >      >     avro, AutoValue, etc.
     >      >
     >      >   *
     >      >
     >      >     Integration with sources (BigQueryIO, JdbcIO, AvroIO,
    etc.)
     >      >
     >      >   *
     >      >
     >      >     Support for JsonPath expressions so users can better
    express
     >     nested
     >      >     fields. E.g. support expressions of the form
     >      >     Select.fields(“field1.field2”, “field3.*”,
    “field4[0].field5”);
     >      >
     >      >   *
     >      >
     >      >     Schemas still need to be defined in our portability
    layer so they
     >      >     can be used cross language.
     >      >
     >      >
     >      > If anyone is interested in helping close these gaps, you'll be
     >     helping
     >      > make Beam a better, more-usable system!
     >      >
     >      > Reuven
     >      >
     >