osdir.com


[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [2/2] calcite git commit: [CALCITE-2528] Support Aggregates in ElasticSearch adapter (Andrei Sereda)


Thank you Julian, for correction.
 I wasn't sure if I should remove it since the original commit (and PR) was
created when I wasn't yet a committer. Going forward I will not include my
name for personal commits.

On Wed, Sep 19, 2018 at 10:34 AM Julian Hyde <jhyde.apache@xxxxxxxxx> wrote:

> Thanks Andrei. One thing: now you’re a committer your commit messages
> should not (must not) end with “(Andrei Sereda)”.
>
> Julian
>
> > On Sep 18, 2018, at 7:57 PM, sereda@xxxxxxxxxx wrote:
> >
> > [CALCITE-2528] Support Aggregates in ElasticSearch adapter (Andrei
> Sereda)
> >
> > Aggregate functions (count/sum/min/max/avg) are pushed down to ES.
> >
> > Add ElasticsearchAggregate relational expression to convert SQL into
> native Elastic aggregations (value_count, min, max etc.).
> > Enhance ElasticsearchTable to prepare correct aggregate ES JSON query.
> >
> > Create special classes to parse recursively elastic aggregation response
> or buckets (located in ElasticJson). They're inspired from existing Elastic
> high-level client source.
> >
> > For tests, make Json input more human friendly. Single quotes are
> accepted and fields can be unquoted (unless
> > they contain special characters). Also field with dots 'a.b.c' are
> automatically auto-expanded. This reduces JSON noise.
> >
> > Fix single projections which previously returned map (see [CALCITE-2485])
> >
> > Close apache/calcite#801
> > Close apache/calcite#822
> >
> >
> > Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
> > Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/79af1c9b
> > Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/79af1c9b
> > Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/79af1c9b
> >
> > Branch: refs/heads/master
> > Commit: 79af1c9ba735286653697deed3ff849b7c921fe4
> > Parents: ce05146
> > Author: Andrei Sereda <25229979+asereda-gs@xxxxxxxxxxxxxxxxxxxxxxxx>
> > Authored: Tue Sep 18 22:53:24 2018 -0400
> > Committer: Andrei Sereda <25229979+asereda-gs@xxxxxxxxxxxxxxxxxxxxxxxx>
> > Committed: Tue Sep 18 22:53:24 2018 -0400
> >
> > ----------------------------------------------------------------------
> > elasticsearch/pom.xml                           |   6 +
> > .../AbstractElasticsearchTable.java             | 150 -----
> > .../elasticsearch/ElasticsearchAggregate.java   | 165 +++++
> > .../elasticsearch/ElasticsearchConstants.java   |   9 -
> > .../elasticsearch/ElasticsearchEnumerators.java |  44 +-
> > .../elasticsearch/ElasticsearchFilter.java      |  17 +-
> > .../elasticsearch/ElasticsearchJson.java        | 614 +++++++++++++++++++
> > .../elasticsearch/ElasticsearchMethod.java      |  13 +-
> > .../elasticsearch/ElasticsearchProject.java     |   6 +-
> > .../adapter/elasticsearch/ElasticsearchRel.java |  66 +-
> > .../elasticsearch/ElasticsearchRules.java       |  38 +-
> > .../elasticsearch/ElasticsearchSchema.java      |  30 +-
> > .../elasticsearch/ElasticsearchSort.java        |  41 +-
> > .../elasticsearch/ElasticsearchTable.java       | 313 +++++++++-
> > .../elasticsearch/ElasticsearchTableScan.java   |   6 +-
> > .../ElasticsearchToEnumerableConverter.java     |  46 +-
> > .../elasticsearch/PredicateAnalyzer.java        |  10 +
> > .../adapter/elasticsearch/QueryBuilders.java    | 106 +++-
> > .../adapter/elasticsearch/AggregationTest.java  | 235 +++++++
> > .../adapter/elasticsearch/BooleanLogicTest.java |   1 +
> > .../elasticsearch/ElasticSearchAdapterTest.java | 309 +++++++---
> > .../elasticsearch/ElasticsearchJsonTest.java    | 183 ++++++
> > .../EmbeddedElasticsearchPolicy.java            |  41 +-
> > .../adapter/elasticsearch/Projection2Test.java  | 107 ++++
> > .../adapter/elasticsearch/ProjectionTest.java   |  37 +-
> > .../elasticsearch/QueryBuildersTest.java        |  63 ++
> > .../calcite/test/ElasticsearchChecker.java      |  90 ++-
> > 27 files changed, 2340 insertions(+), 406 deletions(-)
> > ----------------------------------------------------------------------
> >
> >
> >
> http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/pom.xml
> > ----------------------------------------------------------------------
> > diff --git a/elasticsearch/pom.xml b/elasticsearch/pom.xml
> > index e3a044d..4700fee 100644
> > --- a/elasticsearch/pom.xml
> > +++ b/elasticsearch/pom.xml
> > @@ -124,6 +124,12 @@ limitations under the License.
> >       <scope>test</scope>
> >     </dependency>
> >     <dependency>
> > +      <groupId>org.hamcrest</groupId>
> > +      <artifactId>hamcrest-core</artifactId>
> > +      <version>${hamcrest.version}</version>
> > +      <scope>test</scope>
> > +    </dependency>
> > +    <dependency>
> >       <groupId>org.slf4j</groupId>
> >       <artifactId>slf4j-api</artifactId>
> >     </dependency>
> >
> >
> http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/AbstractElasticsearchTable.java
> > ----------------------------------------------------------------------
> > diff --git
> a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/AbstractElasticsearchTable.java
> b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/AbstractElasticsearchTable.java
> > deleted file mode 100644
> > index 1a0f6d0..0000000
> > ---
> a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/AbstractElasticsearchTable.java
> > +++ /dev/null
> > @@ -1,150 +0,0 @@
> > -/*
> > - * Licensed to the Apache Software Foundation (ASF) under one or more
> > - * contributor license agreements.  See the NOTICE file distributed with
> > - * this work for additional information regarding copyright ownership.
> > - * The ASF licenses this file to you under the Apache License, Version
> 2.0
> > - * (the "License"); you may not use this file except in compliance with
> > - * the License.  You may obtain a copy of the License at
> > - *
> > - * http://www.apache.org/licenses/LICENSE-2.0
> > - *
> > - * Unless required by applicable law or agreed to in writing, software
> > - * distributed under the License is distributed on an "AS IS" BASIS,
> > - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
> > - * See the License for the specific language governing permissions and
> > - * limitations under the License.
> > - */
> > -package org.apache.calcite.adapter.elasticsearch;
> > -
> > -import org.apache.calcite.adapter.java.AbstractQueryableTable;
> > -import org.apache.calcite.linq4j.Enumerable;
> > -import org.apache.calcite.linq4j.Enumerator;
> > -import org.apache.calcite.linq4j.QueryProvider;
> > -import org.apache.calcite.linq4j.Queryable;
> > -import org.apache.calcite.plan.RelOptCluster;
> > -import org.apache.calcite.plan.RelOptTable;
> > -import org.apache.calcite.rel.RelNode;
> > -import org.apache.calcite.rel.type.RelDataType;
> > -import org.apache.calcite.rel.type.RelDataTypeFactory;
> > -import org.apache.calcite.schema.SchemaPlus;
> > -import org.apache.calcite.schema.TranslatableTable;
> > -import org.apache.calcite.schema.impl.AbstractTableQueryable;
> > -import org.apache.calcite.sql.type.SqlTypeName;
> > -
> > -import com.fasterxml.jackson.databind.ObjectMapper;
> > -
> > -import java.util.List;
> > -import java.util.Map;
> > -import java.util.Objects;
> > -
> > -/**
> > - * Table based on an Elasticsearch type.
> > - */
> > -abstract class AbstractElasticsearchTable extends AbstractQueryableTable
> > -    implements TranslatableTable {
> > -
> > -  final String indexName;
> > -  final String typeName;
> > -  final ObjectMapper mapper;
> > -
> > -  /**
> > -   * Creates an ElasticsearchTable.
> > -   * @param indexName Elastic Search index
> > -   * @param typeName Elastic Search index type
> > -   * @param mapper Jackson API to parse (and created) JSON documents
> > -   */
> > -  AbstractElasticsearchTable(String indexName, String typeName,
> ObjectMapper mapper) {
> > -    super(Object[].class);
> > -    this.indexName = Objects.requireNonNull(indexName, "indexName");
> > -    this.typeName = Objects.requireNonNull(typeName, "typeName");
> > -    this.mapper = Objects.requireNonNull(mapper, "mapper");
> > -  }
> > -
> > -  @Override public String toString() {
> > -    return "ElasticsearchTable{" + indexName + "/" + typeName + "}";
> > -  }
> > -
> > -  public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) {
> > -    final RelDataType mapType = relDataTypeFactory.createMapType(
> > -        relDataTypeFactory.createSqlType(SqlTypeName.VARCHAR),
> > -        relDataTypeFactory.createTypeWithNullability(
> > -            relDataTypeFactory.createSqlType(SqlTypeName.ANY),
> > -            true));
> > -    return relDataTypeFactory.builder().add("_MAP", mapType).build();
> > -  }
> > -
> > -  public <T> Queryable<T> asQueryable(QueryProvider queryProvider,
> SchemaPlus schema,
> > -      String tableName) {
> > -    return new ElasticsearchQueryable<>(queryProvider, schema, this,
> tableName);
> > -  }
> > -
> > -  public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable
> relOptTable) {
> > -    final RelOptCluster cluster = context.getCluster();
> > -    return new ElasticsearchTableScan(cluster,
> cluster.traitSetOf(ElasticsearchRel.CONVENTION),
> > -        relOptTable, this, null);
> > -  }
> > -
> > -  /**
> > -   * In ES 5.x scripted fields start with {@code params._source.foo}
> while in ES2.x
> > -   * {@code _source.foo}. Helper method to build correct query based on
> runtime version of elastic.
> > -   * Used to keep backwards compatibility with ES2.
> > -   *
> > -   * @see <a href="
> https://github.com/elastic/elasticsearch/issues/20068";>_source
> variable</a>
> > -   * @see <a href="
> https://www.elastic.co/guide/en/elasticsearch/reference/master/modules-scripting-fields.html";>Scripted
> Fields</a>
> > -   * @return string to be used for scripted fields
> > -   */
> > -  protected abstract String scriptedFieldPrefix();
> > -
> > -  /** Executes a "find" operation on the underlying type.
> > -   *
> > -   * <p>For example,
> > -   * <code>client.prepareSearch(index).setTypes(type)
> > -   * .setSource("{\"fields\" : [\"state\"]}")</code></p>
> > -   *
> > -   * @param index Elasticsearch index
> > -   * @param ops List of operations represented as Json strings.
> > -   * @param fields List of fields to project; or null to return map
> > -   * @return Enumerator of results
> > -   */
> > -  protected abstract Enumerable<Object> find(String index, List<String>
> ops,
> > -      List<Map.Entry<String, Class>> fields);
> > -
> > -  /**
> > -   * Implementation of {@link Queryable} based on
> > -   * a {@link AbstractElasticsearchTable}.
> > -   *
> > -   * @param <T> element type
> > -   */
> > -  public static class ElasticsearchQueryable<T> extends
> AbstractTableQueryable<T> {
> > -    ElasticsearchQueryable(QueryProvider queryProvider, SchemaPlus
> schema,
> > -        AbstractElasticsearchTable table, String tableName) {
> > -      super(queryProvider, schema, table, tableName);
> > -    }
> > -
> > -    public Enumerator<T> enumerator() {
> > -      return null;
> > -    }
> > -
> > -    private String getIndex() {
> > -      return schema.unwrap(ElasticsearchSchema.class).getIndex();
> > -    }
> > -
> > -    private AbstractElasticsearchTable getTable() {
> > -      return (AbstractElasticsearchTable) table;
> > -    }
> > -
> > -    /** Called via code-generation.
> > -     * @param ops list of queries (as strings)
> > -     * @param fields projection
> > -     * @see ElasticsearchMethod#ELASTICSEARCH_QUERYABLE_FIND
> > -     * @return result as enumerable
> > -     */
> > -    @SuppressWarnings("UnusedDeclaration")
> > -    public Enumerable<Object> find(List<String> ops,
> > -        List<Map.Entry<String, Class>> fields) {
> > -      return getTable().find(getIndex(), ops, fields);
> > -    }
> > -  }
> > -}
> > -
> > -// End AbstractElasticsearchTable.java
> >
> >
> http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchAggregate.java
> > ----------------------------------------------------------------------
> > diff --git
> a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchAggregate.java
> b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchAggregate.java
> > new file mode 100644
> > index 0000000..9627aca
> > --- /dev/null
> > +++
> b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchAggregate.java
> > @@ -0,0 +1,165 @@
> > +/*
> > + * Licensed to the Apache Software Foundation (ASF) under one or more
> > + * contributor license agreements.  See the NOTICE file distributed with
> > + * this work for additional information regarding copyright ownership.
> > + * The ASF licenses this file to you under the Apache License, Version
> 2.0
> > + * (the "License"); you may not use this file except in compliance with
> > + * the License.  You may obtain a copy of the License at
> > + *
> > + * http://www.apache.org/licenses/LICENSE-2.0
> > + *
> > + * Unless required by applicable law or agreed to in writing, software
> > + * distributed under the License is distributed on an "AS IS" BASIS,
> > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
> > + * See the License for the specific language governing permissions and
> > + * limitations under the License.
> > + */
> > +package org.apache.calcite.adapter.elasticsearch;
> > +
> > +import org.apache.calcite.plan.RelOptCluster;
> > +import org.apache.calcite.plan.RelOptCost;
> > +import org.apache.calcite.plan.RelOptPlanner;
> > +import org.apache.calcite.plan.RelTraitSet;
> > +import org.apache.calcite.rel.InvalidRelException;
> > +import org.apache.calcite.rel.RelNode;
> > +import org.apache.calcite.rel.core.Aggregate;
> > +import org.apache.calcite.rel.core.AggregateCall;
> > +import org.apache.calcite.rel.metadata.RelMetadataQuery;
> > +import org.apache.calcite.rel.type.RelDataType;
> > +import org.apache.calcite.rel.type.RelDataTypeField;
> > +import org.apache.calcite.sql.SqlKind;
> > +import org.apache.calcite.util.ImmutableBitSet;
> > +
> > +import java.util.ArrayList;
> > +import java.util.EnumSet;
> > +import java.util.List;
> > +import java.util.Locale;
> > +import java.util.Set;
> > +
> > +/**
> > + * Implementation of
> > + * {@link org.apache.calcite.rel.core.Aggregate} relational expression
> > + * for ElasticSearch.
> > + */
> > +public class ElasticsearchAggregate extends Aggregate implements
> ElasticsearchRel {
> > +
> > +  private static final Set<SqlKind> SUPPORTED_AGGREGATIONS =
> > +      EnumSet.of(SqlKind.COUNT, SqlKind.MAX, SqlKind.MIN, SqlKind.AVG,
> SqlKind.SUM);
> > +
> > +  /** Creates a ElasticsearchAggregate */
> > +  ElasticsearchAggregate(RelOptCluster cluster,
> > +      RelTraitSet traitSet,
> > +      RelNode input,
> > +      boolean indicator,
> > +      ImmutableBitSet groupSet,
> > +      List<ImmutableBitSet> groupSets,
> > +      List<AggregateCall> aggCalls) throws InvalidRelException  {
> > +    super(cluster, traitSet, input, indicator, groupSet, groupSets,
> aggCalls);
> > +
> > +    if (getConvention() != input.getConvention()) {
> > +      String message = String.format(Locale.ROOT, "%s != %s",
> getConvention(),
> > +          input.getConvention());
> > +      throw new AssertionError(message);
> > +    }
> > +
> > +    assert getConvention() == input.getConvention();
> > +    assert getConvention() == ElasticsearchRel.CONVENTION;
> > +    assert this.groupSets.size() == 1 : "Grouping sets not supported";
> > +
> > +    for (AggregateCall aggCall : aggCalls) {
> > +      if (aggCall.isDistinct()) {
> > +        throw new InvalidRelException("distinct aggregation not
> supported");
> > +      }
> > +
> > +      SqlKind kind = aggCall.getAggregation().getKind();
> > +      if (!SUPPORTED_AGGREGATIONS.contains(kind)) {
> > +        final String message = String.format(Locale.ROOT,
> > +            "Aggregation %s not supported (use one of %s)", kind,
> SUPPORTED_AGGREGATIONS);
> > +        throw new InvalidRelException(message);
> > +      }
> > +    }
> > +
> > +    if (getGroupType() != Group.SIMPLE) {
> > +      final String message = String.format(Locale.ROOT, "Only %s
> grouping is supported. "
> > +              + "Yours is %s", Group.SIMPLE, getGroupType());
> > +      throw new InvalidRelException(message);
> > +    }
> > +
> > +  }
> > +
> > +  @Override public Aggregate copy(RelTraitSet traitSet, RelNode input,
> boolean indicator,
> > +      ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets,
> > +      List<AggregateCall> aggCalls) {
> > +    try {
> > +      return new ElasticsearchAggregate(getCluster(), traitSet, input,
> > +          indicator, groupSet, groupSets,
> > +          aggCalls);
> > +    } catch (InvalidRelException e) {
> > +      throw new AssertionError(e);
> > +    }
> > +  }
> > +
> > +  @Override public RelOptCost computeSelfCost(RelOptPlanner planner,
> RelMetadataQuery mq) {
> > +    return super.computeSelfCost(planner, mq).multiplyBy(0.1);
> > +  }
> > +
> > +  @Override public void implement(Implementor implementor) {
> > +    implementor.visitChild(0, getInput());
> > +    List<String> inputFields = fieldNames(getInput().getRowType());
> > +
> > +    for (int group : groupSet) {
> > +      implementor.addGroupBy(inputFields.get(group));
> > +    }
> > +
> > +    for (AggregateCall aggCall : aggCalls) {
> > +      List<String> names = new ArrayList<>();
> > +      for (int i : aggCall.getArgList()) {
> > +        names.add(inputFields.get(i));
> > +      }
> > +
> > +      final String name = names.isEmpty() ? ElasticsearchConstants.ID :
> names.get(0);
> > +
> > +      String op = String.format(Locale.ROOT, "\"%s\":{\"field\":
> \"%s\"}",
> > +          toElasticAggregate(aggCall),
> > +          name);
> > +
> > +      implementor.addAggregation(aggCall.getName(), op);
> > +    }
> > +  }
> > +
> > +  /**
> > +   * Most of the aggregations can be retrieved with single
> > +   * <a href="
> https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-stats-aggregation.html
> ">stats</a>
> > +   * function. But currently only one-to-one mapping is supported
> between sql agg and elastic
> > +   * aggregation.
> > +   */
> > +  private String toElasticAggregate(AggregateCall call) {
> > +    SqlKind kind = call.getAggregation().getKind();
> > +    switch (kind) {
> > +    case COUNT:
> > +      return call.isApproximate() ? "cardinality" : "value_count";
> > +    case SUM:
> > +      return "sum";
> > +    case MIN:
> > +      return "min";
> > +    case MAX:
> > +      return "max";
> > +    case AVG:
> > +      return "avg";
> > +    default:
> > +      throw new IllegalArgumentException("Unknown aggregation kind " +
> kind + " for " + call);
> > +    }
> > +  }
> > +
> > +  private List<String> fieldNames(RelDataType relDataType) {
> > +    List<String> names = new ArrayList<>();
> > +
> > +    for (RelDataTypeField rdtf : relDataType.getFieldList()) {
> > +      names.add(rdtf.getName());
> > +    }
> > +    return names;
> > +  }
> > +
> > +}
> > +
> > +// End ElasticsearchAggregate.java
> >
> >
> http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchConstants.java
> > ----------------------------------------------------------------------
> > diff --git
> a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchConstants.java
> b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchConstants.java
> > index ed628cc..2c4c42c 100644
> > ---
> a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchConstants.java
> > +++
> b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchConstants.java
> > @@ -30,18 +30,9 @@ interface ElasticsearchConstants {
> >   String FIELDS = "fields";
> >   String SOURCE_PAINLESS = "params._source";
> >   String SOURCE_GROOVY = "_source";
> > -  String SOURCE = SOURCE_GROOVY;
> >   String ID = "_id";
> >   String UID = "_uid";
> >
> > -  /* Aggregation pushdown operations supported */
> > -  String AGG_SUM = "SUM";
> > -  String AGG_SUM0 = "$SUM0";
> > -  String AGG_COUNT = "COUNT";
> > -  String AGG_MIN = "MIN";
> > -  String AGG_MAX = "MAX";
> > -  String AGG_AVG = "AVG";
> > -
> >   Set<String> META_COLUMNS = ImmutableSet.of(UID, ID, TYPE, INDEX);
> >
> > }
> >
> >
> http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchEnumerators.java
> > ----------------------------------------------------------------------
> > diff --git
> a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchEnumerators.java
> b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchEnumerators.java
> > index d87de7e..16ac92d 100644
> > ---
> a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchEnumerators.java
> > +++
> b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchEnumerators.java
> > @@ -26,27 +26,27 @@ import java.util.Map;
> >
> > /**
> >  * Util functions which convert
> > - * {@link
> org.apache.calcite.adapter.elasticsearch.ElasticsearchSearchResult.SearchHit}
> > + * {@link ElasticsearchJson.SearchHit}
> >  * into calcite specific return type (map, object[], list etc.)
> >  */
> > class ElasticsearchEnumerators {
> >
> >   private ElasticsearchEnumerators() {}
> >
> > -  private static Function1<ElasticsearchSearchResult.SearchHit, Map>
> mapGetter() {
> > -    return new Function1<ElasticsearchSearchResult.SearchHit, Map>() {
> > -      public Map apply(ElasticsearchSearchResult.SearchHit hits) {
> > +  private static Function1<ElasticsearchJson.SearchHit, Map>
> mapGetter() {
> > +    return new Function1<ElasticsearchJson.SearchHit, Map>() {
> > +      public Map apply(ElasticsearchJson.SearchHit hits) {
> >         return hits.sourceOrFields();
> >       }
> >     };
> >   }
> >
> > -  private static Function1<ElasticsearchSearchResult.SearchHit, Object>
> singletonGetter(
> > +  private static Function1<ElasticsearchJson.SearchHit, Object>
> singletonGetter(
> >       final String fieldName,
> >       final Class fieldClass) {
> > -    return new Function1<ElasticsearchSearchResult.SearchHit, Object>()
> {
> > -      public Object apply(ElasticsearchSearchResult.SearchHit hits) {
> > -        return convert(hits.sourceOrFields(), fieldClass);
> > +    return new Function1<ElasticsearchJson.SearchHit, Object>() {
> > +      public Object apply(ElasticsearchJson.SearchHit hits) {
> > +        return convert(hits.valueOrNull(fieldName), fieldClass);
> >       }
> >     };
> >   }
> > @@ -59,30 +59,38 @@ class ElasticsearchEnumerators {
> >    *
> >    * @return function that converts the search result into a generic
> array
> >    */
> > -  private static Function1<ElasticsearchSearchResult.SearchHit,
> Object[]> listGetter(
> > +  private static Function1<ElasticsearchJson.SearchHit, Object[]>
> listGetter(
> >       final List<Map.Entry<String, Class>> fields) {
> > -    return new Function1<ElasticsearchSearchResult.SearchHit,
> Object[]>() {
> > -      public Object[] apply(ElasticsearchSearchResult.SearchHit hit) {
> > +    return new Function1<ElasticsearchJson.SearchHit, Object[]>() {
> > +      public Object[] apply(ElasticsearchJson.SearchHit hit) {
> >         Object[] objects = new Object[fields.size()];
> >         for (int i = 0; i < fields.size(); i++) {
> >           final Map.Entry<String, Class> field = fields.get(i);
> >           final String name = field.getKey();
> >           final Class type = field.getValue();
> > -          objects[i] = convert(hit.value(name), type);
> > +          objects[i] = convert(hit.valueOrNull(name), type);
> >         }
> >         return objects;
> >       }
> >     };
> >   }
> >
> > -  static Function1<ElasticsearchSearchResult.SearchHit, Object> getter(
> > +  static Function1<ElasticsearchJson.SearchHit, Object> getter(
> >       List<Map.Entry<String, Class>> fields) {
> >     //noinspection unchecked
> > -    return fields == null
> > -      ? (Function1) mapGetter()
> > -      : fields.size() == 1
> > -      ? singletonGetter(fields.get(0).getKey(),
> fields.get(0).getValue())
> > -      : (Function1) listGetter(fields);
> > +    final Function1 getter;
> > +    if (fields == null || fields.size() == 1 &&
> "_MAP".equals(fields.get(0).getKey())) {
> > +      // select * from table
> > +      getter = mapGetter();
> > +    } else if (fields.size() == 1) {
> > +      // select foo from table
> > +      getter = singletonGetter(fields.get(0).getKey(),
> fields.get(0).getValue());
> > +    } else {
> > +      // select a, b, c from table
> > +      getter = listGetter(fields);
> > +    }
> > +
> > +    return getter;
> >   }
> >
> >   private static Object convert(Object o, Class clazz) {
> >
> >
> http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchFilter.java
> > ----------------------------------------------------------------------
> > diff --git
> a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchFilter.java
> b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchFilter.java
> > index 4d187b1..c339671 100644
> > ---
> a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchFilter.java
> > +++
> b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchFilter.java
> > @@ -23,7 +23,6 @@ import org.apache.calcite.plan.RelOptUtil;
> > import org.apache.calcite.plan.RelTraitSet;
> > import org.apache.calcite.rel.RelNode;
> > import org.apache.calcite.rel.core.Filter;
> > -import org.apache.calcite.rel.core.Project;
> > import org.apache.calcite.rel.metadata.RelMetadataQuery;
> > import org.apache.calcite.rex.RexCall;
> > import org.apache.calcite.rex.RexInputRef;
> > @@ -70,24 +69,13 @@ public class ElasticsearchFilter extends Filter
> implements ElasticsearchRel {
> >
> >   @Override public void implement(Implementor implementor) {
> >     implementor.visitChild(0, getInput());
> > -    List<String> fieldNames;
> > -    if (input instanceof Project) {
> > -      final List<RexNode> projects = ((Project) input).getProjects();
> > -      fieldNames = new ArrayList<>(projects.size());
> > -      for (RexNode project : projects) {
> > -        String name =
> project.accept(MapProjectionFieldVisitor.INSTANCE);
> > -        fieldNames.add(name);
> > -      }
> > -    } else {
> > -      fieldNames =
> ElasticsearchRules.elasticsearchFieldNames(getRowType());
> > -    }
> >     ObjectMapper mapper = implementor.elasticsearchTable.mapper;
> >     PredicateAnalyzerTranslator translator = new
> PredicateAnalyzerTranslator(mapper);
> >     try {
> >       implementor.add(translator.translateMatch(condition));
> >     } catch (IOException e) {
> >       throw new UncheckedIOException(e);
> > -    } catch (ExpressionNotAnalyzableException e) {
> > +    } catch (PredicateAnalyzer.ExpressionNotAnalyzableException e) {
> >       throw new RuntimeException(e);
> >     }
> >   }
> > @@ -103,7 +91,8 @@ public class ElasticsearchFilter extends Filter
> implements ElasticsearchRel {
> >       this.mapper = Objects.requireNonNull(mapper, "mapper");
> >     }
> >
> > -    String translateMatch(RexNode condition) throws IOException,
> ExpressionNotAnalyzableException {
> > +    String translateMatch(RexNode condition) throws IOException,
> > +        PredicateAnalyzer.ExpressionNotAnalyzableException {
> >
> >       StringWriter writer = new StringWriter();
> >       JsonGenerator generator =
> mapper.getFactory().createGenerator(writer);
> >
> >
> http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java
> > ----------------------------------------------------------------------
> > diff --git
> a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java
> b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java
> > new file mode 100644
> > index 0000000..7c80e82
> > --- /dev/null
> > +++
> b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java
> > @@ -0,0 +1,614 @@
> > +/*
> > + * Licensed to the Apache Software Foundation (ASF) under one or more
> > + * contributor license agreements.  See the NOTICE file distributed with
> > + * this work for additional information regarding copyright ownership.
> > + * The ASF licenses this file to you under the Apache License, Version
> 2.0
> > + * (the "License"); you may not use this file except in compliance with
> > + * the License.  You may obtain a copy of the License at
> > + *
> > + * http://www.apache.org/licenses/LICENSE-2.0
> > + *
> > + * Unless required by applicable law or agreed to in writing, software
> > + * distributed under the License is distributed on an "AS IS" BASIS,
> > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
> > + * See the License for the specific language governing permissions and
> > + * limitations under the License.
> > + */
> > +package org.apache.calcite.adapter.elasticsearch;
> > +
> > +import com.fasterxml.jackson.annotation.JsonCreator;
> > +import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
> > +import com.fasterxml.jackson.annotation.JsonProperty;
> > +import com.fasterxml.jackson.core.JsonParser;
> > +import com.fasterxml.jackson.core.JsonProcessingException;
> > +import com.fasterxml.jackson.databind.DeserializationContext;
> > +import com.fasterxml.jackson.databind.JsonNode;
> > +import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
> > +import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
> > +import com.fasterxml.jackson.databind.node.ArrayNode;
> > +import com.fasterxml.jackson.databind.node.JsonNodeFactory;
> > +import com.fasterxml.jackson.databind.node.ObjectNode;
> > +
> > +import java.io.IOException;
> > +import java.time.Duration;
> > +import java.util.ArrayList;
> > +import java.util.Arrays;
> > +import java.util.Collections;
> > +import java.util.HashSet;
> > +import java.util.Iterator;
> > +import java.util.LinkedHashMap;
> > +import java.util.List;
> > +import java.util.Locale;
> > +import java.util.Map;
> > +import java.util.Objects;
> > +import java.util.Set;
> > +import java.util.function.BiConsumer;
> > +import java.util.function.Consumer;
> > +import java.util.stream.StreamSupport;
> > +
> > +import static java.util.Collections.unmodifiableMap;
> > +
> > +/**
> > + * Internal objects (and deserializers) used to parse elastic search
> results
> > + * (which are in JSON format).
> > + *
> > + * <p>Since we're using basic row-level rest client http response has
> to be
> > + * processed manually using JSON (jackson) library.
> > + */
> > +class ElasticsearchJson {
> > +
> > +  /**
> > +   * Used as special aggregation key for missing values (documents
> which are missing a field).
> > +   * Buckets with that value are then converted to {@code null}s in
> flat tabular format.
> > +   * @see <a href="
> https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-sum-aggregation.html";>Missing
> Value</a>
> > +   */
> > +  static final JsonNode MISSING_VALUE =
> JsonNodeFactory.instance.textNode("__MISSING__");
> > +
> > +  private ElasticsearchJson() {}
> > +
> > +  /**
> > +   * Visits leaves of the aggregation where all values are stored.
> > +   */
> > +  static void visitValueNodes(Aggregations aggregations,
> Consumer<Map<String, Object>> consumer) {
> > +    Objects.requireNonNull(aggregations, "aggregations");
> > +    Objects.requireNonNull(consumer, "consumer");
> > +
> > +    List<Bucket> buckets = new ArrayList<>();
> > +
> > +    Map<RowKey, List<MultiValue>> rows = new LinkedHashMap<>();
> > +
> > +    BiConsumer<RowKey, MultiValue> cons = (r, v) ->
> > +        rows.computeIfAbsent(r, ignore -> new ArrayList<>()).add(v);
> > +    aggregations.forEach(a -> visitValueNodes(a, buckets, cons));
> > +    rows.forEach((k, v) -> {
> > +      Map<String, Object> row = new LinkedHashMap<>(k.keys);
> > +      v.forEach(val -> row.put(val.getName(), val.value()));
> > +      consumer.accept(row);
> > +    });
> > +  }
> > +
> > +  /**
> > +   * Identifies a calcite row (as in relational algebra)
> > +   */
> > +  private static class RowKey {
> > +    private final Map<String, Object> keys;
> > +    private final int hashCode;
> > +
> > +    private RowKey(final Map<String, Object> keys) {
> > +      this.keys = Objects.requireNonNull(keys, "keys");
> > +      this.hashCode = Objects.hashCode(keys);
> > +    }
> > +
> > +    private RowKey(List<Bucket> buckets) {
> > +      this(toMap(buckets));
> > +    }
> > +
> > +    private static Map<String, Object> toMap(Iterable<Bucket> buckets) {
> > +      return StreamSupport.stream(buckets.spliterator(), false)
> > +          .collect(LinkedHashMap::new,
> > +              (m, v) -> m.put(v.getName(), v.key()),
> > +              LinkedHashMap::putAll);
> > +    }
> > +
> > +    @Override public boolean equals(final Object o) {
> > +      if (this == o) {
> > +        return true;
> > +      }
> > +      if (o == null || getClass() != o.getClass()) {
> > +        return false;
> > +      }
> > +      final RowKey rowKey = (RowKey) o;
> > +      return hashCode == rowKey.hashCode
> > +          && Objects.equals(keys, rowKey.keys);
> > +    }
> > +
> > +    @Override public int hashCode() {
> > +      return this.hashCode;
> > +    }
> > +  }
> > +
> > +  private static void visitValueNodes(Aggregation aggregation,
> List<Bucket> parents,
> > +      BiConsumer<RowKey, MultiValue> consumer) {
> > +
> > +    if (aggregation instanceof MultiValue) {
> > +      // publish one value of the row
> > +      RowKey key = new RowKey(parents);
> > +      consumer.accept(key, (MultiValue) aggregation);
> > +      return;
> > +    }
> > +
> > +    if (aggregation instanceof Bucket) {
> > +      Bucket bucket = (Bucket) aggregation;
> > +      parents.add(bucket);
> > +      bucket.getAggregations().forEach(a -> visitValueNodes(a, parents,
> consumer));
> > +      parents.remove(parents.size() - 1);
> > +    } else if (aggregation instanceof HasAggregations) {
> > +      HasAggregations children = (HasAggregations) aggregation;
> > +      children.getAggregations().forEach(a -> visitValueNodes(a,
> parents, consumer));
> > +    } else if (aggregation instanceof MultiBucketsAggregation) {
> > +      MultiBucketsAggregation multi = (MultiBucketsAggregation)
> aggregation;
> > +      multi.buckets().forEach(b -> {
> > +        parents.add(b);
> > +        b.getAggregations().forEach(a -> visitValueNodes(a, parents,
> consumer));
> > +        parents.remove(parents.size() - 1);
> > +      });
> > +    }
> > +
> > +  }
> > +
> > +  /**
> > +   * Response from Elastic
> > +   */
> > +  @JsonIgnoreProperties(ignoreUnknown = true)
> > +  static class Result {
> > +    private final SearchHits hits;
> > +    private final Aggregations aggregations;
> > +    private final long took;
> > +
> > +    /**
> > +     * Constructor for this instance.
> > +     * @param hits list of matched documents
> > +     * @param took time taken (in took) for this query to execute
> > +     */
> > +    @JsonCreator
> > +    Result(@JsonProperty("hits") SearchHits hits,
> > +        @JsonProperty("aggregations") Aggregations aggregations,
> > +        @JsonProperty("took") long took) {
> > +      this.hits = Objects.requireNonNull(hits, "hits");
> > +      this.aggregations = aggregations;
> > +      this.took = took;
> > +    }
> > +
> > +    SearchHits searchHits() {
> > +      return hits;
> > +    }
> > +
> > +    Aggregations aggregations() {
> > +      return aggregations;
> > +    }
> > +
> > +    public Duration took() {
> > +      return Duration.ofMillis(took);
> > +    }
> > +
> > +  }
> > +
> > +  /**
> > +   * Similar to {@code SearchHits} in ES. Container for {@link
> SearchHit}
> > +   */
> > +  @JsonIgnoreProperties(ignoreUnknown = true)
> > +  static class SearchHits {
> > +
> > +    private final long total;
> > +    private final List<SearchHit> hits;
> > +
> > +    @JsonCreator
> > +    SearchHits(@JsonProperty("total")final long total,
> > +               @JsonProperty("hits") final List<SearchHit> hits) {
> > +      this.total = total;
> > +      this.hits = Objects.requireNonNull(hits, "hits");
> > +    }
> > +
> > +    public List<SearchHit> hits() {
> > +      return this.hits;
> > +    }
> > +
> > +    public long total() {
> > +      return total;
> > +    }
> > +
> > +  }
> > +
> > +  /**
> > +   * Concrete result record which matched the query. Similar to {@code
> SearchHit} in ES.
> > +   */
> > +  @JsonIgnoreProperties(ignoreUnknown = true)
> > +  static class SearchHit {
> > +    private final String id;
> > +    private final Map<String, Object> source;
> > +    private final Map<String, Object> fields;
> > +
> > +    @JsonCreator
> > +    SearchHit(@JsonProperty("_id") final String id,
> > +                      @JsonProperty("_source") final Map<String,
> Object> source,
> > +                      @JsonProperty("fields") final Map<String, Object>
> fields) {
> > +      this.id = Objects.requireNonNull(id, "id");
> > +
> > +      // both can't be null
> > +      if (source == null && fields == null) {
> > +        final String message = String.format(Locale.ROOT,
> > +            "Both '_source' and 'fields' are missing for %s", id);
> > +        throw new IllegalArgumentException(message);
> > +      }
> > +
> > +      // both can't be non-null
> > +      if (source != null && fields != null) {
> > +        final String message = String.format(Locale.ROOT,
> > +            "Both '_source' and 'fields' are populated (non-null) for
> %s", id);
> > +        throw new IllegalArgumentException(message);
> > +      }
> > +
> > +      this.source = source;
> > +      this.fields = fields;
> > +    }
> > +
> > +    /**
> > +     * Returns id of this hit (usually document id)
> > +     * @return unique id
> > +     */
> > +    public String id() {
> > +      return id;
> > +    }
> > +
> > +    Object valueOrNull(String name) {
> > +      Objects.requireNonNull(name, "name");
> > +      if (fields != null && fields.containsKey(name)) {
> > +        Object field = fields.get(name);
> > +        if (field instanceof Iterable) {
> > +          // return first element (or null)
> > +          Iterator<?> iter = ((Iterable<?>) field).iterator();
> > +          return iter.hasNext() ? iter.next() : null;
> > +        }
> > +
> > +        return field;
> > +      }
> > +
> > +      return valueFromPath(source, name);
> > +    }
> > +
> > +    /**
> > +     * Returns property from nested maps given a path like {@code
> a.b.c}.
> > +     * @param map current map
> > +     * @param path field path(s), optionally with dots ({@code a.b.c}).
> > +     * @return value located at path {@code path} or {@code null} if
> not found.
> > +     */
> > +    private static Object valueFromPath(Map<String, Object> map, String
> path) {
> > +      if (map == null) {
> > +        return null;
> > +      }
> > +
> > +      if (map.containsKey(path)) {
> > +        return map.get(path);
> > +      }
> > +
> > +      // maybe pattern of type a.b.c
> > +      final int index = path.indexOf('.');
> > +      if (index == -1) {
> > +        return null;
> > +      }
> > +
> > +      final String prefix = path.substring(0, index);
> > +      final String suffix = path.substring(index + 1);
> > +
> > +      Object maybeMap = map.get(prefix);
> > +      if (maybeMap instanceof Map) {
> > +        return valueFromPath((Map<String, Object>) maybeMap, suffix);
> > +      }
> > +
> > +      return null;
> > +    }
> > +
> > +    Map<String, Object> source() {
> > +      return source;
> > +    }
> > +
> > +    Map<String, Object> fields() {
> > +      return fields;
> > +    }
> > +
> > +    Map<String, Object> sourceOrFields() {
> > +      return source != null ? source : fields;
> > +    }
> > +  }
> > +
> > +
> > +  /**
> > +   * {@link Aggregation} container.
> > +   */
> > +  @JsonDeserialize(using = AggregationsDeserializer.class)
> > +  static class Aggregations implements Iterable<Aggregation> {
> > +
> > +    private final List<? extends Aggregation> aggregations;
> > +    private Map<String, Aggregation> aggregationsAsMap;
> > +
> > +    Aggregations(List<? extends Aggregation> aggregations) {
> > +      this.aggregations = Objects.requireNonNull(aggregations,
> "aggregations");
> > +    }
> > +
> > +    /**
> > +     * Iterates over the {@link Aggregation}s.
> > +     */
> > +    @Override public final Iterator<Aggregation> iterator() {
> > +      return asList().iterator();
> > +    }
> > +
> > +    /**
> > +     * The list of {@link Aggregation}s.
> > +     */
> > +    final List<Aggregation> asList() {
> > +      return Collections.unmodifiableList(aggregations);
> > +    }
> > +
> > +    /**
> > +     * Returns the {@link Aggregation}s keyed by aggregation name. Lazy
> init.
> > +     */
> > +    final Map<String, Aggregation> asMap() {
> > +      if (aggregationsAsMap == null) {
> > +        Map<String, Aggregation> map = new
> LinkedHashMap<>(aggregations.size());
> > +        for (Aggregation aggregation : aggregations) {
> > +          map.put(aggregation.getName(), aggregation);
> > +        }
> > +        this.aggregationsAsMap = unmodifiableMap(map);
> > +      }
> > +      return aggregationsAsMap;
> > +    }
> > +
> > +    /**
> > +     * Returns the aggregation that is associated with the specified
> name.
> > +     */
> > +    @SuppressWarnings("unchecked")
> > +    public final <A extends Aggregation> A get(String name) {
> > +      return (A) asMap().get(name);
> > +    }
> > +
> > +    @Override public final boolean equals(Object obj) {
> > +      if (obj == null || getClass() != obj.getClass()) {
> > +        return false;
> > +      }
> > +      return aggregations.equals(((Aggregations) obj).aggregations);
> > +    }
> > +
> > +    @Override public final int hashCode() {
> > +      return Objects.hash(getClass(), aggregations);
> > +    }
> > +
> > +  }
> > +
> > +  /**
> > +   * Identifies all aggregations
> > +   */
> > +  interface Aggregation {
> > +
> > +    /**
> > +     * @return The name of this aggregation.
> > +     */
> > +    String getName();
> > +
> > +  }
> > +
> > +  /**
> > +   * Allows traversing aggregations tree
> > +   */
> > +  interface HasAggregations {
> > +    Aggregations getAggregations();
> > +  }
> > +
> > +  /**
> > +   * An aggregation that returns multiple buckets
> > +   */
> > +  static class MultiBucketsAggregation implements Aggregation {
> > +
> > +    private final String name;
> > +    private final List<Bucket> buckets;
> > +
> > +    MultiBucketsAggregation(final String name,
> > +        final List<Bucket> buckets) {
> > +      this.name = name;
> > +      this.buckets = buckets;
> > +    }
> > +
> > +    /**
> > +     * @return  The buckets of this aggregation.
> > +     */
> > +    List<Bucket> buckets() {
> > +      return buckets;
> > +    }
> > +
> > +    @Override public String getName() {
> > +      return name;
> > +    }
> > +  }
> > +
> > +  /**
> > +   * A bucket represents a criteria to which all documents that fall in
> it adhere to.
> > +   * It is also uniquely identified
> > +   * by a key, and can potentially hold sub-aggregations computed over
> all documents in it.
> > +   */
> > +  static class Bucket implements HasAggregations, Aggregation {
> > +    private final Object key;
> > +    private final String name;
> > +    private final Aggregations aggregations;
> > +
> > +    Bucket(final Object key,
> > +        final String name,
> > +        final Aggregations aggregations) {
> > +      this.key = key; // key can be set after construction
> > +      this.name = Objects.requireNonNull(name, "name");
> > +      this.aggregations = Objects.requireNonNull(aggregations,
> "aggregations");
> > +    }
> > +
> > +    /**
> > +     * @return The key associated with the bucket
> > +     */
> > +    Object key() {
> > +      return key;
> > +    }
> > +
> > +    /**
> > +     * @return The key associated with the bucket as a string
> > +     */
> > +    String keyAsString() {
> > +      return Objects.toString(key());
> > +    }
> > +
> > +    /**
> > +     * @return  The sub-aggregations of this bucket
> > +     */
> > +    @Override public Aggregations getAggregations() {
> > +      return aggregations;
> > +    }
> > +
> > +    @Override public String getName() {
> > +      return name;
> > +    }
> > +  }
> > +
> > +  /**
> > +   * Multi value aggregatoin like
> > +   * <a href="
> https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-stats-aggregation.html
> ">Stats</a>
> > +   */
> > +  static class MultiValue implements Aggregation {
> > +    private final String name;
> > +    private final Map<String, Object> values;
> > +
> > +    MultiValue(final String name, final Map<String, Object> values) {
> > +      this.name = Objects.requireNonNull(name, "name");
> > +      this.values = Objects.requireNonNull(values, "values");
> > +    }
> > +
> > +    @Override public String getName() {
> > +      return name;
> > +    }
> > +
> > +    Map<String, Object> values() {
> > +      return values;
> > +    }
> > +
> > +    /**
> > +     * For single value. Returns single value represented by this leaf
> aggregation.
> > +     * @return value corresponding to {@code value}
> > +     */
> > +    Object value() {
> > +      if (!values().containsKey("value")) {
> > +        throw new IllegalStateException("'value' field not present in
> this aggregation");
> > +      }
> > +
> > +      return values().get("value");
> > +    }
> > +
> > +  }
> > +
> > +  /**
> > +   * Allows to de-serialize nested aggregation structures.
> > +   */
> > +  static class AggregationsDeserializer extends
> StdDeserializer<Aggregations> {
> > +
> > +    private static final Set<String> IGNORE_TOKENS = new
> HashSet<>(Arrays.asList("meta",
> > +        "buckets", "value", "values", "value_as_string", "doc_count",
> "key", "key_as_string"));
> > +
> > +    AggregationsDeserializer() {
> > +      super(Aggregations.class);
> > +    }
> > +
> > +    @Override public Aggregations deserialize(final JsonParser parser,
> > +        final DeserializationContext ctxt)
> > +        throws IOException  {
> > +
> > +      ObjectNode node = parser.getCodec().readTree(parser);
> > +      return parseAggregations(parser, node);
> > +    }
> > +
> > +    private static Aggregations parseAggregations(JsonParser parser,
> ObjectNode node)
> > +        throws JsonProcessingException {
> > +
> > +      List<Aggregation> aggregations = new ArrayList<>();
> > +
> > +      Iterable<Map.Entry<String, JsonNode>> iter = node::fields;
> > +      for (Map.Entry<String, JsonNode> entry : iter) {
> > +        final String name = entry.getKey();
> > +        final JsonNode value = entry.getValue();
> > +
> > +        Aggregation agg = null;
> > +        if (value.has("buckets")) {
> > +          agg = parseBuckets(parser, name, (ArrayNode)
> value.get("buckets"));
> > +        } else if (value.isObject() && !IGNORE_TOKENS.contains(name)) {
> > +          // leaf
> > +          agg = parseValue(parser, name, (ObjectNode) value);
> > +        }
> > +
> > +        if (agg != null) {
> > +          aggregations.add(agg);
> > +        }
> > +      }
> > +
> > +      return new Aggregations(aggregations);
> > +    }
> > +
> > +
> > +
> > +    private static MultiValue parseValue(JsonParser parser, String
> name, ObjectNode node)
> > +        throws JsonProcessingException {
> > +
> > +      return new MultiValue(name, parser.getCodec().treeToValue(node,
> Map.class));
> > +    }
> > +
> > +    private static Aggregation parseBuckets(JsonParser parser, String
> name, ArrayNode nodes)
> > +        throws JsonProcessingException {
> > +
> > +      List<Bucket> buckets = new ArrayList<>(nodes.size());
> > +      for (JsonNode b: nodes) {
> > +        buckets.add(parseBucket(parser, name, (ObjectNode) b));
> > +      }
> > +
> > +      return new MultiBucketsAggregation(name, buckets);
> > +    }
> > +
> > +    /**
> > +     * Determines if current key is a missing field key. Missing key is
> returned when document
> > +     * does not have pivoting attribute (example {@code GROUP BY
> _MAP['a.b.missing']}). It helps
> > +     * grouping documents which don't have a field. In relational
> algebra this
> > +     * would be {@code null}.
> > +     *
> > +     * @param key current {@code key} (usually string) as returned by ES
> > +     * @return {@code true} if this value
> > +     * @see #MISSING_VALUE
> > +     */
> > +    private static boolean isMissingBucket(JsonNode key) {
> > +      return MISSING_VALUE.equals(key);
> > +    }
> > +
> > +    private static Bucket parseBucket(JsonParser parser, String name,
> ObjectNode node)
> > +        throws JsonProcessingException  {
> > +
> > +      final JsonNode keyNode = node.get("key");
> > +      final Object key;
> > +      if (isMissingBucket(keyNode) || keyNode.isNull()) {
> > +        key = null;
> > +      } else if (keyNode.isTextual()) {
> > +        key = keyNode.textValue();
> > +      } else if (keyNode.isNumber()) {
> > +        key = keyNode.numberValue();
> > +      } else if (keyNode.isBoolean()) {
> > +        key = keyNode.booleanValue();
> > +      } else {
> > +        // don't usually expect keys to be Objects
> > +        key = parser.getCodec().treeToValue(node, Map.class);
> > +      }
> > +
> > +      return new Bucket(key, name, parseAggregations(parser, node));
> > +    }
> > +
> > +  }
> > +}
> > +
> > +// End ElasticsearchJson.java
> >
> >
> http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMethod.java
> > ----------------------------------------------------------------------
> > diff --git
> a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMethod.java
> b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMethod.java
> > index 72753e6..709156f 100644
> > ---
> a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMethod.java
> > +++
> b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMethod.java
> > @@ -27,8 +27,17 @@ import java.util.List;
> >  * Builtin methods in the Elasticsearch adapter.
> >  */
> > enum ElasticsearchMethod {
> > -
> ELASTICSEARCH_QUERYABLE_FIND(AbstractElasticsearchTable.ElasticsearchQueryable.class,
> > -      "find", List.class, List.class);
> > +
> > +
> ELASTICSEARCH_QUERYABLE_FIND(ElasticsearchTable.ElasticsearchQueryable.class,
> > +      "find",
> > +      List.class, // ops  - projections and other stuff
> > +      List.class, // fields
> > +      List.class, // sort
> > +      List.class, // groupBy
> > +      List.class, // aggregations
> > +      Long.class, // offset
> > +      Long.class // fetch
> > +      );
> >
> >   public final Method method;
> >
> >
> >
> http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java
> > ----------------------------------------------------------------------
> > diff --git
> a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java
> b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java
> > index 7d5811c..d0841c3 100644
> > ---
> a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java
> > +++
> b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java
> > @@ -101,11 +101,7 @@ public class ElasticsearchProject extends Project
> implements ElasticsearchRel {
> >       query.append("\"script_fields\": {" + String.join(", ",
> scriptFields) + "}");
> >     }
> >
> > -    for (String opfield : implementor.list) {
> > -      if (opfield.startsWith("\"_source\"")) {
> > -        implementor.list.remove(opfield);
> > -      }
> > -    }
> > +    implementor.list.removeIf(l -> l.startsWith("\"_source\""));
> >     implementor.add(query.toString());
> >   }
> > }
> >
> >
> http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRel.java
> > ----------------------------------------------------------------------
> > diff --git
> a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRel.java
> b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRel.java
> > index 436adf9..1dad691 100644
> > ---
> a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRel.java
> > +++
> b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRel.java
> > @@ -18,10 +18,14 @@ package org.apache.calcite.adapter.elasticsearch;
> >
> > import org.apache.calcite.plan.Convention;
> > import org.apache.calcite.plan.RelOptTable;
> > +import org.apache.calcite.rel.RelFieldCollation;
> > import org.apache.calcite.rel.RelNode;
> > +import org.apache.calcite.util.Pair;
> >
> > import java.util.ArrayList;
> > import java.util.List;
> > +import java.util.Map;
> > +import java.util.Objects;
> >
> > /**
> >  * Relational expression that uses Elasticsearch calling convention.
> > @@ -39,19 +43,75 @@ public interface ElasticsearchRel extends RelNode {
> >    * {@link ElasticsearchRel} nodes into an Elasticsearch query.
> >    */
> >   class Implementor {
> > +
> >     final List<String> list = new ArrayList<>();
> >
> > +    /**
> > +     * Sorting clauses.
> > +     * @see <a href="
> https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-sort.html
> ">Sort</a>
> > +     */
> > +    final List<Map.Entry<String, RelFieldCollation.Direction>> sort =
> new ArrayList<>();
> > +
> > +    /**
> > +     * Elastic aggregation ({@code MIN / MAX / COUNT} etc.) statements
> (functions).
> > +     * @see <a href="
> https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations.html
> ">aggregations</a>
> > +     */
> > +    final List<Map.Entry<String, String>> aggregations = new
> ArrayList<>();
> > +
> > +    /**
> > +     * Allows bucketing documents together. Similar to {@code select
> ... from table group by field1}
> > +     * @see <a href="
> https://www.elastic.co/guide/en/elasticsearch/reference/6.3/search-aggregations-bucket.html";>Bucket
> Aggregrations</a>
> > +     */
> > +    final List<String> groupBy = new ArrayList<>();
> > +
> > +    /**
> > +     * Starting index (default {@code 0}). Equivalent to {@code start}
> in ES query.
> > +     * @see <a href="
> https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-from-size.html
> ">From/Size</a>
> > +     */
> > +    Long offset;
> > +
> > +    /**
> > +     * Number of records to return. Equivalent to {@code size} in ES
> query.
> > +     * @see <a href="
> https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-from-size.html
> ">From/Size</a>
> > +     */
> > +    Long fetch;
> > +
> >     RelOptTable table;
> > -    AbstractElasticsearchTable elasticsearchTable;
> > +    ElasticsearchTable elasticsearchTable;
> >
> > -    public void add(String findOp) {
> > +    void add(String findOp) {
> >       list.add(findOp);
> >     }
> >
> > -    public void visitChild(int ordinal, RelNode input) {
> > +    void addGroupBy(String field) {
> > +      Objects.requireNonNull(field, "field");
> > +      groupBy.add(field);
> > +    }
> > +
> > +    void addSort(String field, RelFieldCollation.Direction direction) {
> > +      Objects.requireNonNull(field, "field");
> > +      sort.add(new Pair<>(field, direction));
> > +    }
> > +
> > +    void addAggregation(String field, String expression) {
> > +      Objects.requireNonNull(field, "field");
> > +      Objects.requireNonNull(expression, "expression");
> > +      aggregations.add(new Pair<>(field, expression));
> > +    }
> > +
> > +    void offset(long offset) {
> > +      this.offset = offset;
> > +    }
> > +
> > +    void fetch(long fetch) {
> > +      this.fetch = fetch;
> > +    }
> > +
> > +    void visitChild(int ordinal, RelNode input) {
> >       assert ordinal == 0;
> >       ((ElasticsearchRel) input).implement(this);
> >     }
> > +
> >   }
> > }
> >
> >
> >
> http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRules.java
> > ----------------------------------------------------------------------
> > diff --git
> a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRules.java
> b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRules.java
> > index 97e934c..b442ddd 100644
> > ---
> a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRules.java
> > +++
> b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRules.java
> > @@ -23,10 +23,12 @@ import org.apache.calcite.plan.Convention;
> > import org.apache.calcite.plan.RelOptRule;
> > import org.apache.calcite.plan.RelTrait;
> > import org.apache.calcite.plan.RelTraitSet;
> > +import org.apache.calcite.rel.InvalidRelException;
> > import org.apache.calcite.rel.RelCollations;
> > import org.apache.calcite.rel.RelNode;
> > import org.apache.calcite.rel.convert.ConverterRule;
> > import org.apache.calcite.rel.core.Sort;
> > +import org.apache.calcite.rel.logical.LogicalAggregate;
> > import org.apache.calcite.rel.logical.LogicalFilter;
> > import org.apache.calcite.rel.logical.LogicalProject;
> > import org.apache.calcite.rel.type.RelDataType;
> > @@ -53,7 +55,8 @@ class ElasticsearchRules {
> >   static final RelOptRule[] RULES = {
> >       ElasticsearchSortRule.INSTANCE,
> >       ElasticsearchFilterRule.INSTANCE,
> > -      ElasticsearchProjectRule.INSTANCE
> > +      ElasticsearchProjectRule.INSTANCE,
> > +      ElasticsearchAggregateRule.INSTANCE
> >   };
> >
> >   private ElasticsearchRules() {}
> > @@ -147,7 +150,7 @@ class ElasticsearchRules {
> >         }
> >       }
> >       throw new IllegalArgumentException("Translation of " +
> call.toString()
> > -        + "is not supported by ElasticsearchProject");
> > +        + " is not supported by ElasticsearchProject");
> >     }
> >
> >     List<String> visitList(List<RexNode> list) {
> > @@ -217,6 +220,37 @@ class ElasticsearchRules {
> >   }
> >
> >   /**
> > +   * Rule to convert an {@link
> org.apache.calcite.rel.logical.LogicalAggregate}
> > +   * to an {@link ElasticsearchAggregate}.
> > +   */
> > +  private static class ElasticsearchAggregateRule extends
> ElasticsearchConverterRule {
> > +    static final RelOptRule INSTANCE = new ElasticsearchAggregateRule();
> > +
> > +    private ElasticsearchAggregateRule() {
> > +      super(LogicalAggregate.class, Convention.NONE,
> ElasticsearchRel.CONVENTION,
> > +          "ElasticsearchAggregateRule");
> > +    }
> > +
> > +    public RelNode convert(RelNode rel) {
> > +      final LogicalAggregate agg = (LogicalAggregate) rel;
> > +      final RelTraitSet traitSet = agg.getTraitSet().replace(out);
> > +      try {
> > +        return new ElasticsearchAggregate(
> > +            rel.getCluster(),
> > +            traitSet,
> > +            convert(agg.getInput(), traitSet.simplify()),
> > +            agg.indicator,
> > +            agg.getGroupSet(),
> > +            agg.getGroupSets(),
> > +            agg.getAggCallList());
> > +      } catch (InvalidRelException e) {
> > +        return null;
> > +      }
> > +    }
> > +  }
> > +
> > +
> > +  /**
> >    * Rule to convert a {@link
> org.apache.calcite.rel.logical.LogicalProject}
> >    * to an {@link ElasticsearchProject}.
> >    */
> >
> >
> http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchema.java
> > ----------------------------------------------------------------------
> > diff --git
> a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchema.java
> b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchema.java
> > index 1c630ad..80a94be 100644
> > ---
> a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchema.java
> > +++
> b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchema.java
> > @@ -30,6 +30,7 @@ import org.elasticsearch.client.RestClient;
> > import java.io.IOException;
> > import java.io.InputStream;
> > import java.io.UncheckedIOException;
> > +import java.util.Collections;
> > import java.util.Locale;
> > import java.util.Map;
> > import java.util.Objects;
> > @@ -48,6 +49,8 @@ public class ElasticsearchSchema extends
> AbstractSchema {
> >
> >   private final ObjectMapper mapper;
> >
> > +  private final Map<String, Table> tableMap;
> > +
> >   /**
> >    * Allows schema to be instantiated from existing elastic search
> client.
> >    * This constructor is used in tests.
> > @@ -56,20 +59,33 @@ public class ElasticsearchSchema extends
> AbstractSchema {
> >    * @param index name of ES index
> >    */
> >   public ElasticsearchSchema(RestClient client, ObjectMapper mapper,
> String index) {
> > +    this(client, mapper, index, null);
> > +  }
> > +
> > +  public ElasticsearchSchema(RestClient client, ObjectMapper mapper,
> String index, String type) {
> >     super();
> >     this.client = Objects.requireNonNull(client, "client");
> >     this.mapper = Objects.requireNonNull(mapper, "mapper");
> >     this.index = Objects.requireNonNull(index, "index");
> > +    if (type == null) {
> > +      try {
> > +        this.tableMap = createTables(listTypesFromElastic());
> > +      } catch (IOException e) {
> > +        throw new UncheckedIOException("Couldn't get types for " +
> index, e);
> > +      }
> > +    } else {
> > +      this.tableMap = createTables(Collections.singleton(type));
> > +    }
> >   }
> >
> >   @Override protected Map<String, Table> getTableMap() {
> > +    return tableMap;
> > +  }
> > +
> > +  private Map<String, Table> createTables(Iterable<String> types) {
> >     final ImmutableMap.Builder<String, Table> builder =
> ImmutableMap.builder();
> > -    try {
> > -      for (String type: listTypes()) {
> > -        builder.put(type, new ElasticsearchTable(client, mapper, index,
> type));
> > -      }
> > -    } catch (IOException e) {
> > -      throw new UncheckedIOException("Failed to get types for " +
> index, e);
> > +    for (String type : types) {
> > +      builder.put(type, new ElasticsearchTable(client, mapper, index,
> type));
> >     }
> >     return builder.build();
> >   }
> > @@ -81,7 +97,7 @@ public class ElasticsearchSchema extends
> AbstractSchema {
> >    * @throws IOException for any IO related issues
> >    * @throws IllegalStateException if reply is not understood
> >    */
> > -  private Set<String> listTypes() throws IOException  {
> > +  private Set<String> listTypesFromElastic() throws IOException  {
> >     final String endpoint = "/" + index + "/_mapping";
> >     final Response response = client.performRequest("GET", endpoint);
> >     try (InputStream is = response.getEntity().getContent()) {
> >
> >
> http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSort.java
> > ----------------------------------------------------------------------
> > diff --git
> a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSort.java
> b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSort.java
> > index ed669aa..9078b72 100644
> > ---
> a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSort.java
> > +++
> b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSort.java
> > @@ -23,15 +23,12 @@ import org.apache.calcite.plan.RelTraitSet;
> > import org.apache.calcite.rel.RelCollation;
> > import org.apache.calcite.rel.RelFieldCollation;
> > import org.apache.calcite.rel.RelNode;
> > -import org.apache.calcite.rel.core.Project;
> > import org.apache.calcite.rel.core.Sort;
> > import org.apache.calcite.rel.metadata.RelMetadataQuery;
> > import org.apache.calcite.rel.type.RelDataTypeField;
> > import org.apache.calcite.rex.RexLiteral;
> > import org.apache.calcite.rex.RexNode;
> > -import org.apache.calcite.util.Util;
> >
> > -import java.util.ArrayList;
> > import java.util.List;
> >
> > /**
> > @@ -57,48 +54,22 @@ public class ElasticsearchSort extends Sort
> implements ElasticsearchRel {
> >
> >   @Override public void implement(Implementor implementor) {
> >     implementor.visitChild(0, getInput());
> > -    if (!collation.getFieldCollations().isEmpty()) {
> > -      final List<String> keys = new ArrayList<>();
> > -      if (input instanceof Project) {
> > -        final List<RexNode> projects = ((Project) input).getProjects();
> > +    final List<RelDataTypeField> fields = getRowType().getFieldList();
> >
> > -        for (RelFieldCollation fieldCollation :
> collation.getFieldCollations()) {
> > -          RexNode project =
> projects.get(fieldCollation.getFieldIndex());
> > -          String name =
> project.accept(MapProjectionFieldVisitor.INSTANCE);
> > -          keys.add(ElasticsearchRules.quote(name) + ": " +
> direction(fieldCollation));
> > -        }
> > -      } else {
> > -        final List<RelDataTypeField> fields =
> getRowType().getFieldList();
> > -
> > -        for (RelFieldCollation fieldCollation :
> collation.getFieldCollations()) {
> > -          final String name =
> fields.get(fieldCollation.getFieldIndex()).getName();
> > -          keys.add(ElasticsearchRules.quote(name) + ": " +
> direction(fieldCollation));
> > -        }
> > -      }
> > -
> > -      implementor.add("\"sort\": [ " + Util.toString(keys, "{", "}, {",
> "}") + "]");
> > +    for (RelFieldCollation fieldCollation :
> collation.getFieldCollations()) {
> > +      final String name =
> fields.get(fieldCollation.getFieldIndex()).getName();
> > +      implementor.addSort(name, fieldCollation.getDirection());
> >     }
> >
> >     if (offset != null) {
> > -      implementor.add("\"from\": " + ((RexLiteral) offset).getValue