osdir.com


[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [2/2] calcite git commit: [CALCITE-2528] Support Aggregates in ElasticSearch adapter (Andrei Sereda)


Thanks Andrei. One thing: now you’re a committer your commit messages should not (must not) end with “(Andrei Sereda)”. 

Julian

> On Sep 18, 2018, at 7:57 PM, sereda@xxxxxxxxxx wrote:
> 
> [CALCITE-2528] Support Aggregates in ElasticSearch adapter (Andrei Sereda)
> 
> Aggregate functions (count/sum/min/max/avg) are pushed down to ES.
> 
> Add ElasticsearchAggregate relational expression to convert SQL into native Elastic aggregations (value_count, min, max etc.).
> Enhance ElasticsearchTable to prepare correct aggregate ES JSON query.
> 
> Create special classes to parse recursively elastic aggregation response or buckets (located in ElasticJson). They're inspired from existing Elastic high-level client source.
> 
> For tests, make Json input more human friendly. Single quotes are accepted and fields can be unquoted (unless
> they contain special characters). Also field with dots 'a.b.c' are automatically auto-expanded. This reduces JSON noise.
> 
> Fix single projections which previously returned map (see [CALCITE-2485])
> 
> Close apache/calcite#801
> Close apache/calcite#822
> 
> 
> Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
> Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/79af1c9b
> Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/79af1c9b
> Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/79af1c9b
> 
> Branch: refs/heads/master
> Commit: 79af1c9ba735286653697deed3ff849b7c921fe4
> Parents: ce05146
> Author: Andrei Sereda <25229979+asereda-gs@xxxxxxxxxxxxxxxxxxxxxxxx>
> Authored: Tue Sep 18 22:53:24 2018 -0400
> Committer: Andrei Sereda <25229979+asereda-gs@xxxxxxxxxxxxxxxxxxxxxxxx>
> Committed: Tue Sep 18 22:53:24 2018 -0400
> 
> ----------------------------------------------------------------------
> elasticsearch/pom.xml                           |   6 +
> .../AbstractElasticsearchTable.java             | 150 -----
> .../elasticsearch/ElasticsearchAggregate.java   | 165 +++++
> .../elasticsearch/ElasticsearchConstants.java   |   9 -
> .../elasticsearch/ElasticsearchEnumerators.java |  44 +-
> .../elasticsearch/ElasticsearchFilter.java      |  17 +-
> .../elasticsearch/ElasticsearchJson.java        | 614 +++++++++++++++++++
> .../elasticsearch/ElasticsearchMethod.java      |  13 +-
> .../elasticsearch/ElasticsearchProject.java     |   6 +-
> .../adapter/elasticsearch/ElasticsearchRel.java |  66 +-
> .../elasticsearch/ElasticsearchRules.java       |  38 +-
> .../elasticsearch/ElasticsearchSchema.java      |  30 +-
> .../elasticsearch/ElasticsearchSort.java        |  41 +-
> .../elasticsearch/ElasticsearchTable.java       | 313 +++++++++-
> .../elasticsearch/ElasticsearchTableScan.java   |   6 +-
> .../ElasticsearchToEnumerableConverter.java     |  46 +-
> .../elasticsearch/PredicateAnalyzer.java        |  10 +
> .../adapter/elasticsearch/QueryBuilders.java    | 106 +++-
> .../adapter/elasticsearch/AggregationTest.java  | 235 +++++++
> .../adapter/elasticsearch/BooleanLogicTest.java |   1 +
> .../elasticsearch/ElasticSearchAdapterTest.java | 309 +++++++---
> .../elasticsearch/ElasticsearchJsonTest.java    | 183 ++++++
> .../EmbeddedElasticsearchPolicy.java            |  41 +-
> .../adapter/elasticsearch/Projection2Test.java  | 107 ++++
> .../adapter/elasticsearch/ProjectionTest.java   |  37 +-
> .../elasticsearch/QueryBuildersTest.java        |  63 ++
> .../calcite/test/ElasticsearchChecker.java      |  90 ++-
> 27 files changed, 2340 insertions(+), 406 deletions(-)
> ----------------------------------------------------------------------
> 
> 
> http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/pom.xml
> ----------------------------------------------------------------------
> diff --git a/elasticsearch/pom.xml b/elasticsearch/pom.xml
> index e3a044d..4700fee 100644
> --- a/elasticsearch/pom.xml
> +++ b/elasticsearch/pom.xml
> @@ -124,6 +124,12 @@ limitations under the License.
>       <scope>test</scope>
>     </dependency>
>     <dependency>
> +      <groupId>org.hamcrest</groupId>
> +      <artifactId>hamcrest-core</artifactId>
> +      <version>${hamcrest.version}</version>
> +      <scope>test</scope>
> +    </dependency>
> +    <dependency>
>       <groupId>org.slf4j</groupId>
>       <artifactId>slf4j-api</artifactId>
>     </dependency>
> 
> http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/AbstractElasticsearchTable.java
> ----------------------------------------------------------------------
> diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/AbstractElasticsearchTable.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/AbstractElasticsearchTable.java
> deleted file mode 100644
> index 1a0f6d0..0000000
> --- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/AbstractElasticsearchTable.java
> +++ /dev/null
> @@ -1,150 +0,0 @@
> -/*
> - * Licensed to the Apache Software Foundation (ASF) under one or more
> - * contributor license agreements.  See the NOTICE file distributed with
> - * this work for additional information regarding copyright ownership.
> - * The ASF licenses this file to you under the Apache License, Version 2.0
> - * (the "License"); you may not use this file except in compliance with
> - * the License.  You may obtain a copy of the License at
> - *
> - * http://www.apache.org/licenses/LICENSE-2.0
> - *
> - * Unless required by applicable law or agreed to in writing, software
> - * distributed under the License is distributed on an "AS IS" BASIS,
> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> - * See the License for the specific language governing permissions and
> - * limitations under the License.
> - */
> -package org.apache.calcite.adapter.elasticsearch;
> -
> -import org.apache.calcite.adapter.java.AbstractQueryableTable;
> -import org.apache.calcite.linq4j.Enumerable;
> -import org.apache.calcite.linq4j.Enumerator;
> -import org.apache.calcite.linq4j.QueryProvider;
> -import org.apache.calcite.linq4j.Queryable;
> -import org.apache.calcite.plan.RelOptCluster;
> -import org.apache.calcite.plan.RelOptTable;
> -import org.apache.calcite.rel.RelNode;
> -import org.apache.calcite.rel.type.RelDataType;
> -import org.apache.calcite.rel.type.RelDataTypeFactory;
> -import org.apache.calcite.schema.SchemaPlus;
> -import org.apache.calcite.schema.TranslatableTable;
> -import org.apache.calcite.schema.impl.AbstractTableQueryable;
> -import org.apache.calcite.sql.type.SqlTypeName;
> -
> -import com.fasterxml.jackson.databind.ObjectMapper;
> -
> -import java.util.List;
> -import java.util.Map;
> -import java.util.Objects;
> -
> -/**
> - * Table based on an Elasticsearch type.
> - */
> -abstract class AbstractElasticsearchTable extends AbstractQueryableTable
> -    implements TranslatableTable {
> -
> -  final String indexName;
> -  final String typeName;
> -  final ObjectMapper mapper;
> -
> -  /**
> -   * Creates an ElasticsearchTable.
> -   * @param indexName Elastic Search index
> -   * @param typeName Elastic Search index type
> -   * @param mapper Jackson API to parse (and created) JSON documents
> -   */
> -  AbstractElasticsearchTable(String indexName, String typeName, ObjectMapper mapper) {
> -    super(Object[].class);
> -    this.indexName = Objects.requireNonNull(indexName, "indexName");
> -    this.typeName = Objects.requireNonNull(typeName, "typeName");
> -    this.mapper = Objects.requireNonNull(mapper, "mapper");
> -  }
> -
> -  @Override public String toString() {
> -    return "ElasticsearchTable{" + indexName + "/" + typeName + "}";
> -  }
> -
> -  public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) {
> -    final RelDataType mapType = relDataTypeFactory.createMapType(
> -        relDataTypeFactory.createSqlType(SqlTypeName.VARCHAR),
> -        relDataTypeFactory.createTypeWithNullability(
> -            relDataTypeFactory.createSqlType(SqlTypeName.ANY),
> -            true));
> -    return relDataTypeFactory.builder().add("_MAP", mapType).build();
> -  }
> -
> -  public <T> Queryable<T> asQueryable(QueryProvider queryProvider, SchemaPlus schema,
> -      String tableName) {
> -    return new ElasticsearchQueryable<>(queryProvider, schema, this, tableName);
> -  }
> -
> -  public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable relOptTable) {
> -    final RelOptCluster cluster = context.getCluster();
> -    return new ElasticsearchTableScan(cluster, cluster.traitSetOf(ElasticsearchRel.CONVENTION),
> -        relOptTable, this, null);
> -  }
> -
> -  /**
> -   * In ES 5.x scripted fields start with {@code params._source.foo} while in ES2.x
> -   * {@code _source.foo}. Helper method to build correct query based on runtime version of elastic.
> -   * Used to keep backwards compatibility with ES2.
> -   *
> -   * @see <a href="https://github.com/elastic/elasticsearch/issues/20068";>_source variable</a>
> -   * @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/master/modules-scripting-fields.html";>Scripted Fields</a>
> -   * @return string to be used for scripted fields
> -   */
> -  protected abstract String scriptedFieldPrefix();
> -
> -  /** Executes a "find" operation on the underlying type.
> -   *
> -   * <p>For example,
> -   * <code>client.prepareSearch(index).setTypes(type)
> -   * .setSource("{\"fields\" : [\"state\"]}")</code></p>
> -   *
> -   * @param index Elasticsearch index
> -   * @param ops List of operations represented as Json strings.
> -   * @param fields List of fields to project; or null to return map
> -   * @return Enumerator of results
> -   */
> -  protected abstract Enumerable<Object> find(String index, List<String> ops,
> -      List<Map.Entry<String, Class>> fields);
> -
> -  /**
> -   * Implementation of {@link Queryable} based on
> -   * a {@link AbstractElasticsearchTable}.
> -   *
> -   * @param <T> element type
> -   */
> -  public static class ElasticsearchQueryable<T> extends AbstractTableQueryable<T> {
> -    ElasticsearchQueryable(QueryProvider queryProvider, SchemaPlus schema,
> -        AbstractElasticsearchTable table, String tableName) {
> -      super(queryProvider, schema, table, tableName);
> -    }
> -
> -    public Enumerator<T> enumerator() {
> -      return null;
> -    }
> -
> -    private String getIndex() {
> -      return schema.unwrap(ElasticsearchSchema.class).getIndex();
> -    }
> -
> -    private AbstractElasticsearchTable getTable() {
> -      return (AbstractElasticsearchTable) table;
> -    }
> -
> -    /** Called via code-generation.
> -     * @param ops list of queries (as strings)
> -     * @param fields projection
> -     * @see ElasticsearchMethod#ELASTICSEARCH_QUERYABLE_FIND
> -     * @return result as enumerable
> -     */
> -    @SuppressWarnings("UnusedDeclaration")
> -    public Enumerable<Object> find(List<String> ops,
> -        List<Map.Entry<String, Class>> fields) {
> -      return getTable().find(getIndex(), ops, fields);
> -    }
> -  }
> -}
> -
> -// End AbstractElasticsearchTable.java
> 
> http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchAggregate.java
> ----------------------------------------------------------------------
> diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchAggregate.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchAggregate.java
> new file mode 100644
> index 0000000..9627aca
> --- /dev/null
> +++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchAggregate.java
> @@ -0,0 +1,165 @@
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to you under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License.  You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.calcite.adapter.elasticsearch;
> +
> +import org.apache.calcite.plan.RelOptCluster;
> +import org.apache.calcite.plan.RelOptCost;
> +import org.apache.calcite.plan.RelOptPlanner;
> +import org.apache.calcite.plan.RelTraitSet;
> +import org.apache.calcite.rel.InvalidRelException;
> +import org.apache.calcite.rel.RelNode;
> +import org.apache.calcite.rel.core.Aggregate;
> +import org.apache.calcite.rel.core.AggregateCall;
> +import org.apache.calcite.rel.metadata.RelMetadataQuery;
> +import org.apache.calcite.rel.type.RelDataType;
> +import org.apache.calcite.rel.type.RelDataTypeField;
> +import org.apache.calcite.sql.SqlKind;
> +import org.apache.calcite.util.ImmutableBitSet;
> +
> +import java.util.ArrayList;
> +import java.util.EnumSet;
> +import java.util.List;
> +import java.util.Locale;
> +import java.util.Set;
> +
> +/**
> + * Implementation of
> + * {@link org.apache.calcite.rel.core.Aggregate} relational expression
> + * for ElasticSearch.
> + */
> +public class ElasticsearchAggregate extends Aggregate implements ElasticsearchRel {
> +
> +  private static final Set<SqlKind> SUPPORTED_AGGREGATIONS =
> +      EnumSet.of(SqlKind.COUNT, SqlKind.MAX, SqlKind.MIN, SqlKind.AVG, SqlKind.SUM);
> +
> +  /** Creates a ElasticsearchAggregate */
> +  ElasticsearchAggregate(RelOptCluster cluster,
> +      RelTraitSet traitSet,
> +      RelNode input,
> +      boolean indicator,
> +      ImmutableBitSet groupSet,
> +      List<ImmutableBitSet> groupSets,
> +      List<AggregateCall> aggCalls) throws InvalidRelException  {
> +    super(cluster, traitSet, input, indicator, groupSet, groupSets, aggCalls);
> +
> +    if (getConvention() != input.getConvention()) {
> +      String message = String.format(Locale.ROOT, "%s != %s", getConvention(),
> +          input.getConvention());
> +      throw new AssertionError(message);
> +    }
> +
> +    assert getConvention() == input.getConvention();
> +    assert getConvention() == ElasticsearchRel.CONVENTION;
> +    assert this.groupSets.size() == 1 : "Grouping sets not supported";
> +
> +    for (AggregateCall aggCall : aggCalls) {
> +      if (aggCall.isDistinct()) {
> +        throw new InvalidRelException("distinct aggregation not supported");
> +      }
> +
> +      SqlKind kind = aggCall.getAggregation().getKind();
> +      if (!SUPPORTED_AGGREGATIONS.contains(kind)) {
> +        final String message = String.format(Locale.ROOT,
> +            "Aggregation %s not supported (use one of %s)", kind, SUPPORTED_AGGREGATIONS);
> +        throw new InvalidRelException(message);
> +      }
> +    }
> +
> +    if (getGroupType() != Group.SIMPLE) {
> +      final String message = String.format(Locale.ROOT, "Only %s grouping is supported. "
> +              + "Yours is %s", Group.SIMPLE, getGroupType());
> +      throw new InvalidRelException(message);
> +    }
> +
> +  }
> +
> +  @Override public Aggregate copy(RelTraitSet traitSet, RelNode input, boolean indicator,
> +      ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets,
> +      List<AggregateCall> aggCalls) {
> +    try {
> +      return new ElasticsearchAggregate(getCluster(), traitSet, input,
> +          indicator, groupSet, groupSets,
> +          aggCalls);
> +    } catch (InvalidRelException e) {
> +      throw new AssertionError(e);
> +    }
> +  }
> +
> +  @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
> +    return super.computeSelfCost(planner, mq).multiplyBy(0.1);
> +  }
> +
> +  @Override public void implement(Implementor implementor) {
> +    implementor.visitChild(0, getInput());
> +    List<String> inputFields = fieldNames(getInput().getRowType());
> +
> +    for (int group : groupSet) {
> +      implementor.addGroupBy(inputFields.get(group));
> +    }
> +
> +    for (AggregateCall aggCall : aggCalls) {
> +      List<String> names = new ArrayList<>();
> +      for (int i : aggCall.getArgList()) {
> +        names.add(inputFields.get(i));
> +      }
> +
> +      final String name = names.isEmpty() ? ElasticsearchConstants.ID : names.get(0);
> +
> +      String op = String.format(Locale.ROOT, "\"%s\":{\"field\": \"%s\"}",
> +          toElasticAggregate(aggCall),
> +          name);
> +
> +      implementor.addAggregation(aggCall.getName(), op);
> +    }
> +  }
> +
> +  /**
> +   * Most of the aggregations can be retrieved with single
> +   * <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-stats-aggregation.html";>stats</a>
> +   * function. But currently only one-to-one mapping is supported between sql agg and elastic
> +   * aggregation.
> +   */
> +  private String toElasticAggregate(AggregateCall call) {
> +    SqlKind kind = call.getAggregation().getKind();
> +    switch (kind) {
> +    case COUNT:
> +      return call.isApproximate() ? "cardinality" : "value_count";
> +    case SUM:
> +      return "sum";
> +    case MIN:
> +      return "min";
> +    case MAX:
> +      return "max";
> +    case AVG:
> +      return "avg";
> +    default:
> +      throw new IllegalArgumentException("Unknown aggregation kind " + kind + " for " + call);
> +    }
> +  }
> +
> +  private List<String> fieldNames(RelDataType relDataType) {
> +    List<String> names = new ArrayList<>();
> +
> +    for (RelDataTypeField rdtf : relDataType.getFieldList()) {
> +      names.add(rdtf.getName());
> +    }
> +    return names;
> +  }
> +
> +}
> +
> +// End ElasticsearchAggregate.java
> 
> http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchConstants.java
> ----------------------------------------------------------------------
> diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchConstants.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchConstants.java
> index ed628cc..2c4c42c 100644
> --- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchConstants.java
> +++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchConstants.java
> @@ -30,18 +30,9 @@ interface ElasticsearchConstants {
>   String FIELDS = "fields";
>   String SOURCE_PAINLESS = "params._source";
>   String SOURCE_GROOVY = "_source";
> -  String SOURCE = SOURCE_GROOVY;
>   String ID = "_id";
>   String UID = "_uid";
> 
> -  /* Aggregation pushdown operations supported */
> -  String AGG_SUM = "SUM";
> -  String AGG_SUM0 = "$SUM0";
> -  String AGG_COUNT = "COUNT";
> -  String AGG_MIN = "MIN";
> -  String AGG_MAX = "MAX";
> -  String AGG_AVG = "AVG";
> -
>   Set<String> META_COLUMNS = ImmutableSet.of(UID, ID, TYPE, INDEX);
> 
> }
> 
> http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchEnumerators.java
> ----------------------------------------------------------------------
> diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchEnumerators.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchEnumerators.java
> index d87de7e..16ac92d 100644
> --- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchEnumerators.java
> +++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchEnumerators.java
> @@ -26,27 +26,27 @@ import java.util.Map;
> 
> /**
>  * Util functions which convert
> - * {@link org.apache.calcite.adapter.elasticsearch.ElasticsearchSearchResult.SearchHit}
> + * {@link ElasticsearchJson.SearchHit}
>  * into calcite specific return type (map, object[], list etc.)
>  */
> class ElasticsearchEnumerators {
> 
>   private ElasticsearchEnumerators() {}
> 
> -  private static Function1<ElasticsearchSearchResult.SearchHit, Map> mapGetter() {
> -    return new Function1<ElasticsearchSearchResult.SearchHit, Map>() {
> -      public Map apply(ElasticsearchSearchResult.SearchHit hits) {
> +  private static Function1<ElasticsearchJson.SearchHit, Map> mapGetter() {
> +    return new Function1<ElasticsearchJson.SearchHit, Map>() {
> +      public Map apply(ElasticsearchJson.SearchHit hits) {
>         return hits.sourceOrFields();
>       }
>     };
>   }
> 
> -  private static Function1<ElasticsearchSearchResult.SearchHit, Object> singletonGetter(
> +  private static Function1<ElasticsearchJson.SearchHit, Object> singletonGetter(
>       final String fieldName,
>       final Class fieldClass) {
> -    return new Function1<ElasticsearchSearchResult.SearchHit, Object>() {
> -      public Object apply(ElasticsearchSearchResult.SearchHit hits) {
> -        return convert(hits.sourceOrFields(), fieldClass);
> +    return new Function1<ElasticsearchJson.SearchHit, Object>() {
> +      public Object apply(ElasticsearchJson.SearchHit hits) {
> +        return convert(hits.valueOrNull(fieldName), fieldClass);
>       }
>     };
>   }
> @@ -59,30 +59,38 @@ class ElasticsearchEnumerators {
>    *
>    * @return function that converts the search result into a generic array
>    */
> -  private static Function1<ElasticsearchSearchResult.SearchHit, Object[]> listGetter(
> +  private static Function1<ElasticsearchJson.SearchHit, Object[]> listGetter(
>       final List<Map.Entry<String, Class>> fields) {
> -    return new Function1<ElasticsearchSearchResult.SearchHit, Object[]>() {
> -      public Object[] apply(ElasticsearchSearchResult.SearchHit hit) {
> +    return new Function1<ElasticsearchJson.SearchHit, Object[]>() {
> +      public Object[] apply(ElasticsearchJson.SearchHit hit) {
>         Object[] objects = new Object[fields.size()];
>         for (int i = 0; i < fields.size(); i++) {
>           final Map.Entry<String, Class> field = fields.get(i);
>           final String name = field.getKey();
>           final Class type = field.getValue();
> -          objects[i] = convert(hit.value(name), type);
> +          objects[i] = convert(hit.valueOrNull(name), type);
>         }
>         return objects;
>       }
>     };
>   }
> 
> -  static Function1<ElasticsearchSearchResult.SearchHit, Object> getter(
> +  static Function1<ElasticsearchJson.SearchHit, Object> getter(
>       List<Map.Entry<String, Class>> fields) {
>     //noinspection unchecked
> -    return fields == null
> -      ? (Function1) mapGetter()
> -      : fields.size() == 1
> -      ? singletonGetter(fields.get(0).getKey(), fields.get(0).getValue())
> -      : (Function1) listGetter(fields);
> +    final Function1 getter;
> +    if (fields == null || fields.size() == 1 && "_MAP".equals(fields.get(0).getKey())) {
> +      // select * from table
> +      getter = mapGetter();
> +    } else if (fields.size() == 1) {
> +      // select foo from table
> +      getter = singletonGetter(fields.get(0).getKey(), fields.get(0).getValue());
> +    } else {
> +      // select a, b, c from table
> +      getter = listGetter(fields);
> +    }
> +
> +    return getter;
>   }
> 
>   private static Object convert(Object o, Class clazz) {
> 
> http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchFilter.java
> ----------------------------------------------------------------------
> diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchFilter.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchFilter.java
> index 4d187b1..c339671 100644
> --- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchFilter.java
> +++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchFilter.java
> @@ -23,7 +23,6 @@ import org.apache.calcite.plan.RelOptUtil;
> import org.apache.calcite.plan.RelTraitSet;
> import org.apache.calcite.rel.RelNode;
> import org.apache.calcite.rel.core.Filter;
> -import org.apache.calcite.rel.core.Project;
> import org.apache.calcite.rel.metadata.RelMetadataQuery;
> import org.apache.calcite.rex.RexCall;
> import org.apache.calcite.rex.RexInputRef;
> @@ -70,24 +69,13 @@ public class ElasticsearchFilter extends Filter implements ElasticsearchRel {
> 
>   @Override public void implement(Implementor implementor) {
>     implementor.visitChild(0, getInput());
> -    List<String> fieldNames;
> -    if (input instanceof Project) {
> -      final List<RexNode> projects = ((Project) input).getProjects();
> -      fieldNames = new ArrayList<>(projects.size());
> -      for (RexNode project : projects) {
> -        String name = project.accept(MapProjectionFieldVisitor.INSTANCE);
> -        fieldNames.add(name);
> -      }
> -    } else {
> -      fieldNames = ElasticsearchRules.elasticsearchFieldNames(getRowType());
> -    }
>     ObjectMapper mapper = implementor.elasticsearchTable.mapper;
>     PredicateAnalyzerTranslator translator = new PredicateAnalyzerTranslator(mapper);
>     try {
>       implementor.add(translator.translateMatch(condition));
>     } catch (IOException e) {
>       throw new UncheckedIOException(e);
> -    } catch (ExpressionNotAnalyzableException e) {
> +    } catch (PredicateAnalyzer.ExpressionNotAnalyzableException e) {
>       throw new RuntimeException(e);
>     }
>   }
> @@ -103,7 +91,8 @@ public class ElasticsearchFilter extends Filter implements ElasticsearchRel {
>       this.mapper = Objects.requireNonNull(mapper, "mapper");
>     }
> 
> -    String translateMatch(RexNode condition) throws IOException, ExpressionNotAnalyzableException {
> +    String translateMatch(RexNode condition) throws IOException,
> +        PredicateAnalyzer.ExpressionNotAnalyzableException {
> 
>       StringWriter writer = new StringWriter();
>       JsonGenerator generator = mapper.getFactory().createGenerator(writer);
> 
> http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java
> ----------------------------------------------------------------------
> diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java
> new file mode 100644
> index 0000000..7c80e82
> --- /dev/null
> +++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java
> @@ -0,0 +1,614 @@
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to you under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License.  You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.calcite.adapter.elasticsearch;
> +
> +import com.fasterxml.jackson.annotation.JsonCreator;
> +import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
> +import com.fasterxml.jackson.annotation.JsonProperty;
> +import com.fasterxml.jackson.core.JsonParser;
> +import com.fasterxml.jackson.core.JsonProcessingException;
> +import com.fasterxml.jackson.databind.DeserializationContext;
> +import com.fasterxml.jackson.databind.JsonNode;
> +import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
> +import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
> +import com.fasterxml.jackson.databind.node.ArrayNode;
> +import com.fasterxml.jackson.databind.node.JsonNodeFactory;
> +import com.fasterxml.jackson.databind.node.ObjectNode;
> +
> +import java.io.IOException;
> +import java.time.Duration;
> +import java.util.ArrayList;
> +import java.util.Arrays;
> +import java.util.Collections;
> +import java.util.HashSet;
> +import java.util.Iterator;
> +import java.util.LinkedHashMap;
> +import java.util.List;
> +import java.util.Locale;
> +import java.util.Map;
> +import java.util.Objects;
> +import java.util.Set;
> +import java.util.function.BiConsumer;
> +import java.util.function.Consumer;
> +import java.util.stream.StreamSupport;
> +
> +import static java.util.Collections.unmodifiableMap;
> +
> +/**
> + * Internal objects (and deserializers) used to parse elastic search results
> + * (which are in JSON format).
> + *
> + * <p>Since we're using basic row-level rest client http response has to be
> + * processed manually using JSON (jackson) library.
> + */
> +class ElasticsearchJson {
> +
> +  /**
> +   * Used as special aggregation key for missing values (documents which are missing a field).
> +   * Buckets with that value are then converted to {@code null}s in flat tabular format.
> +   * @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-sum-aggregation.html";>Missing Value</a>
> +   */
> +  static final JsonNode MISSING_VALUE = JsonNodeFactory.instance.textNode("__MISSING__");
> +
> +  private ElasticsearchJson() {}
> +
> +  /**
> +   * Visits leaves of the aggregation where all values are stored.
> +   */
> +  static void visitValueNodes(Aggregations aggregations, Consumer<Map<String, Object>> consumer) {
> +    Objects.requireNonNull(aggregations, "aggregations");
> +    Objects.requireNonNull(consumer, "consumer");
> +
> +    List<Bucket> buckets = new ArrayList<>();
> +
> +    Map<RowKey, List<MultiValue>> rows = new LinkedHashMap<>();
> +
> +    BiConsumer<RowKey, MultiValue> cons = (r, v) ->
> +        rows.computeIfAbsent(r, ignore -> new ArrayList<>()).add(v);
> +    aggregations.forEach(a -> visitValueNodes(a, buckets, cons));
> +    rows.forEach((k, v) -> {
> +      Map<String, Object> row = new LinkedHashMap<>(k.keys);
> +      v.forEach(val -> row.put(val.getName(), val.value()));
> +      consumer.accept(row);
> +    });
> +  }
> +
> +  /**
> +   * Identifies a calcite row (as in relational algebra)
> +   */
> +  private static class RowKey {
> +    private final Map<String, Object> keys;
> +    private final int hashCode;
> +
> +    private RowKey(final Map<String, Object> keys) {
> +      this.keys = Objects.requireNonNull(keys, "keys");
> +      this.hashCode = Objects.hashCode(keys);
> +    }
> +
> +    private RowKey(List<Bucket> buckets) {
> +      this(toMap(buckets));
> +    }
> +
> +    private static Map<String, Object> toMap(Iterable<Bucket> buckets) {
> +      return StreamSupport.stream(buckets.spliterator(), false)
> +          .collect(LinkedHashMap::new,
> +              (m, v) -> m.put(v.getName(), v.key()),
> +              LinkedHashMap::putAll);
> +    }
> +
> +    @Override public boolean equals(final Object o) {
> +      if (this == o) {
> +        return true;
> +      }
> +      if (o == null || getClass() != o.getClass()) {
> +        return false;
> +      }
> +      final RowKey rowKey = (RowKey) o;
> +      return hashCode == rowKey.hashCode
> +          && Objects.equals(keys, rowKey.keys);
> +    }
> +
> +    @Override public int hashCode() {
> +      return this.hashCode;
> +    }
> +  }
> +
> +  private static void visitValueNodes(Aggregation aggregation, List<Bucket> parents,
> +      BiConsumer<RowKey, MultiValue> consumer) {
> +
> +    if (aggregation instanceof MultiValue) {
> +      // publish one value of the row
> +      RowKey key = new RowKey(parents);
> +      consumer.accept(key, (MultiValue) aggregation);
> +      return;
> +    }
> +
> +    if (aggregation instanceof Bucket) {
> +      Bucket bucket = (Bucket) aggregation;
> +      parents.add(bucket);
> +      bucket.getAggregations().forEach(a -> visitValueNodes(a, parents, consumer));
> +      parents.remove(parents.size() - 1);
> +    } else if (aggregation instanceof HasAggregations) {
> +      HasAggregations children = (HasAggregations) aggregation;
> +      children.getAggregations().forEach(a -> visitValueNodes(a, parents, consumer));
> +    } else if (aggregation instanceof MultiBucketsAggregation) {
> +      MultiBucketsAggregation multi = (MultiBucketsAggregation) aggregation;
> +      multi.buckets().forEach(b -> {
> +        parents.add(b);
> +        b.getAggregations().forEach(a -> visitValueNodes(a, parents, consumer));
> +        parents.remove(parents.size() - 1);
> +      });
> +    }
> +
> +  }
> +
> +  /**
> +   * Response from Elastic
> +   */
> +  @JsonIgnoreProperties(ignoreUnknown = true)
> +  static class Result {
> +    private final SearchHits hits;
> +    private final Aggregations aggregations;
> +    private final long took;
> +
> +    /**
> +     * Constructor for this instance.
> +     * @param hits list of matched documents
> +     * @param took time taken (in took) for this query to execute
> +     */
> +    @JsonCreator
> +    Result(@JsonProperty("hits") SearchHits hits,
> +        @JsonProperty("aggregations") Aggregations aggregations,
> +        @JsonProperty("took") long took) {
> +      this.hits = Objects.requireNonNull(hits, "hits");
> +      this.aggregations = aggregations;
> +      this.took = took;
> +    }
> +
> +    SearchHits searchHits() {
> +      return hits;
> +    }
> +
> +    Aggregations aggregations() {
> +      return aggregations;
> +    }
> +
> +    public Duration took() {
> +      return Duration.ofMillis(took);
> +    }
> +
> +  }
> +
> +  /**
> +   * Similar to {@code SearchHits} in ES. Container for {@link SearchHit}
> +   */
> +  @JsonIgnoreProperties(ignoreUnknown = true)
> +  static class SearchHits {
> +
> +    private final long total;
> +    private final List<SearchHit> hits;
> +
> +    @JsonCreator
> +    SearchHits(@JsonProperty("total")final long total,
> +               @JsonProperty("hits") final List<SearchHit> hits) {
> +      this.total = total;
> +      this.hits = Objects.requireNonNull(hits, "hits");
> +    }
> +
> +    public List<SearchHit> hits() {
> +      return this.hits;
> +    }
> +
> +    public long total() {
> +      return total;
> +    }
> +
> +  }
> +
> +  /**
> +   * Concrete result record which matched the query. Similar to {@code SearchHit} in ES.
> +   */
> +  @JsonIgnoreProperties(ignoreUnknown = true)
> +  static class SearchHit {
> +    private final String id;
> +    private final Map<String, Object> source;
> +    private final Map<String, Object> fields;
> +
> +    @JsonCreator
> +    SearchHit(@JsonProperty("_id") final String id,
> +                      @JsonProperty("_source") final Map<String, Object> source,
> +                      @JsonProperty("fields") final Map<String, Object> fields) {
> +      this.id = Objects.requireNonNull(id, "id");
> +
> +      // both can't be null
> +      if (source == null && fields == null) {
> +        final String message = String.format(Locale.ROOT,
> +            "Both '_source' and 'fields' are missing for %s", id);
> +        throw new IllegalArgumentException(message);
> +      }
> +
> +      // both can't be non-null
> +      if (source != null && fields != null) {
> +        final String message = String.format(Locale.ROOT,
> +            "Both '_source' and 'fields' are populated (non-null) for %s", id);
> +        throw new IllegalArgumentException(message);
> +      }
> +
> +      this.source = source;
> +      this.fields = fields;
> +    }
> +
> +    /**
> +     * Returns id of this hit (usually document id)
> +     * @return unique id
> +     */
> +    public String id() {
> +      return id;
> +    }
> +
> +    Object valueOrNull(String name) {
> +      Objects.requireNonNull(name, "name");
> +      if (fields != null && fields.containsKey(name)) {
> +        Object field = fields.get(name);
> +        if (field instanceof Iterable) {
> +          // return first element (or null)
> +          Iterator<?> iter = ((Iterable<?>) field).iterator();
> +          return iter.hasNext() ? iter.next() : null;
> +        }
> +
> +        return field;
> +      }
> +
> +      return valueFromPath(source, name);
> +    }
> +
> +    /**
> +     * Returns property from nested maps given a path like {@code a.b.c}.
> +     * @param map current map
> +     * @param path field path(s), optionally with dots ({@code a.b.c}).
> +     * @return value located at path {@code path} or {@code null} if not found.
> +     */
> +    private static Object valueFromPath(Map<String, Object> map, String path) {
> +      if (map == null) {
> +        return null;
> +      }
> +
> +      if (map.containsKey(path)) {
> +        return map.get(path);
> +      }
> +
> +      // maybe pattern of type a.b.c
> +      final int index = path.indexOf('.');
> +      if (index == -1) {
> +        return null;
> +      }
> +
> +      final String prefix = path.substring(0, index);
> +      final String suffix = path.substring(index + 1);
> +
> +      Object maybeMap = map.get(prefix);
> +      if (maybeMap instanceof Map) {
> +        return valueFromPath((Map<String, Object>) maybeMap, suffix);
> +      }
> +
> +      return null;
> +    }
> +
> +    Map<String, Object> source() {
> +      return source;
> +    }
> +
> +    Map<String, Object> fields() {
> +      return fields;
> +    }
> +
> +    Map<String, Object> sourceOrFields() {
> +      return source != null ? source : fields;
> +    }
> +  }
> +
> +
> +  /**
> +   * {@link Aggregation} container.
> +   */
> +  @JsonDeserialize(using = AggregationsDeserializer.class)
> +  static class Aggregations implements Iterable<Aggregation> {
> +
> +    private final List<? extends Aggregation> aggregations;
> +    private Map<String, Aggregation> aggregationsAsMap;
> +
> +    Aggregations(List<? extends Aggregation> aggregations) {
> +      this.aggregations = Objects.requireNonNull(aggregations, "aggregations");
> +    }
> +
> +    /**
> +     * Iterates over the {@link Aggregation}s.
> +     */
> +    @Override public final Iterator<Aggregation> iterator() {
> +      return asList().iterator();
> +    }
> +
> +    /**
> +     * The list of {@link Aggregation}s.
> +     */
> +    final List<Aggregation> asList() {
> +      return Collections.unmodifiableList(aggregations);
> +    }
> +
> +    /**
> +     * Returns the {@link Aggregation}s keyed by aggregation name. Lazy init.
> +     */
> +    final Map<String, Aggregation> asMap() {
> +      if (aggregationsAsMap == null) {
> +        Map<String, Aggregation> map = new LinkedHashMap<>(aggregations.size());
> +        for (Aggregation aggregation : aggregations) {
> +          map.put(aggregation.getName(), aggregation);
> +        }
> +        this.aggregationsAsMap = unmodifiableMap(map);
> +      }
> +      return aggregationsAsMap;
> +    }
> +
> +    /**
> +     * Returns the aggregation that is associated with the specified name.
> +     */
> +    @SuppressWarnings("unchecked")
> +    public final <A extends Aggregation> A get(String name) {
> +      return (A) asMap().get(name);
> +    }
> +
> +    @Override public final boolean equals(Object obj) {
> +      if (obj == null || getClass() != obj.getClass()) {
> +        return false;
> +      }
> +      return aggregations.equals(((Aggregations) obj).aggregations);
> +    }
> +
> +    @Override public final int hashCode() {
> +      return Objects.hash(getClass(), aggregations);
> +    }
> +
> +  }
> +
> +  /**
> +   * Identifies all aggregations
> +   */
> +  interface Aggregation {
> +
> +    /**
> +     * @return The name of this aggregation.
> +     */
> +    String getName();
> +
> +  }
> +
> +  /**
> +   * Allows traversing aggregations tree
> +   */
> +  interface HasAggregations {
> +    Aggregations getAggregations();
> +  }
> +
> +  /**
> +   * An aggregation that returns multiple buckets
> +   */
> +  static class MultiBucketsAggregation implements Aggregation {
> +
> +    private final String name;
> +    private final List<Bucket> buckets;
> +
> +    MultiBucketsAggregation(final String name,
> +        final List<Bucket> buckets) {
> +      this.name = name;
> +      this.buckets = buckets;
> +    }
> +
> +    /**
> +     * @return  The buckets of this aggregation.
> +     */
> +    List<Bucket> buckets() {
> +      return buckets;
> +    }
> +
> +    @Override public String getName() {
> +      return name;
> +    }
> +  }
> +
> +  /**
> +   * A bucket represents a criteria to which all documents that fall in it adhere to.
> +   * It is also uniquely identified
> +   * by a key, and can potentially hold sub-aggregations computed over all documents in it.
> +   */
> +  static class Bucket implements HasAggregations, Aggregation {
> +    private final Object key;
> +    private final String name;
> +    private final Aggregations aggregations;
> +
> +    Bucket(final Object key,
> +        final String name,
> +        final Aggregations aggregations) {
> +      this.key = key; // key can be set after construction
> +      this.name = Objects.requireNonNull(name, "name");
> +      this.aggregations = Objects.requireNonNull(aggregations, "aggregations");
> +    }
> +
> +    /**
> +     * @return The key associated with the bucket
> +     */
> +    Object key() {
> +      return key;
> +    }
> +
> +    /**
> +     * @return The key associated with the bucket as a string
> +     */
> +    String keyAsString() {
> +      return Objects.toString(key());
> +    }
> +
> +    /**
> +     * @return  The sub-aggregations of this bucket
> +     */
> +    @Override public Aggregations getAggregations() {
> +      return aggregations;
> +    }
> +
> +    @Override public String getName() {
> +      return name;
> +    }
> +  }
> +
> +  /**
> +   * Multi value aggregatoin like
> +   * <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-stats-aggregation.html";>Stats</a>
> +   */
> +  static class MultiValue implements Aggregation {
> +    private final String name;
> +    private final Map<String, Object> values;
> +
> +    MultiValue(final String name, final Map<String, Object> values) {
> +      this.name = Objects.requireNonNull(name, "name");
> +      this.values = Objects.requireNonNull(values, "values");
> +    }
> +
> +    @Override public String getName() {
> +      return name;
> +    }
> +
> +    Map<String, Object> values() {
> +      return values;
> +    }
> +
> +    /**
> +     * For single value. Returns single value represented by this leaf aggregation.
> +     * @return value corresponding to {@code value}
> +     */
> +    Object value() {
> +      if (!values().containsKey("value")) {
> +        throw new IllegalStateException("'value' field not present in this aggregation");
> +      }
> +
> +      return values().get("value");
> +    }
> +
> +  }
> +
> +  /**
> +   * Allows to de-serialize nested aggregation structures.
> +   */
> +  static class AggregationsDeserializer extends StdDeserializer<Aggregations> {
> +
> +    private static final Set<String> IGNORE_TOKENS = new HashSet<>(Arrays.asList("meta",
> +        "buckets", "value", "values", "value_as_string", "doc_count", "key", "key_as_string"));
> +
> +    AggregationsDeserializer() {
> +      super(Aggregations.class);
> +    }
> +
> +    @Override public Aggregations deserialize(final JsonParser parser,
> +        final DeserializationContext ctxt)
> +        throws IOException  {
> +
> +      ObjectNode node = parser.getCodec().readTree(parser);
> +      return parseAggregations(parser, node);
> +    }
> +
> +    private static Aggregations parseAggregations(JsonParser parser, ObjectNode node)
> +        throws JsonProcessingException {
> +
> +      List<Aggregation> aggregations = new ArrayList<>();
> +
> +      Iterable<Map.Entry<String, JsonNode>> iter = node::fields;
> +      for (Map.Entry<String, JsonNode> entry : iter) {
> +        final String name = entry.getKey();
> +        final JsonNode value = entry.getValue();
> +
> +        Aggregation agg = null;
> +        if (value.has("buckets")) {
> +          agg = parseBuckets(parser, name, (ArrayNode) value.get("buckets"));
> +        } else if (value.isObject() && !IGNORE_TOKENS.contains(name)) {
> +          // leaf
> +          agg = parseValue(parser, name, (ObjectNode) value);
> +        }
> +
> +        if (agg != null) {
> +          aggregations.add(agg);
> +        }
> +      }
> +
> +      return new Aggregations(aggregations);
> +    }
> +
> +
> +
> +    private static MultiValue parseValue(JsonParser parser, String name, ObjectNode node)
> +        throws JsonProcessingException {
> +
> +      return new MultiValue(name, parser.getCodec().treeToValue(node, Map.class));
> +    }
> +
> +    private static Aggregation parseBuckets(JsonParser parser, String name, ArrayNode nodes)
> +        throws JsonProcessingException {
> +
> +      List<Bucket> buckets = new ArrayList<>(nodes.size());
> +      for (JsonNode b: nodes) {
> +        buckets.add(parseBucket(parser, name, (ObjectNode) b));
> +      }
> +
> +      return new MultiBucketsAggregation(name, buckets);
> +    }
> +
> +    /**
> +     * Determines if current key is a missing field key. Missing key is returned when document
> +     * does not have pivoting attribute (example {@code GROUP BY _MAP['a.b.missing']}). It helps
> +     * grouping documents which don't have a field. In relational algebra this
> +     * would be {@code null}.
> +     *
> +     * @param key current {@code key} (usually string) as returned by ES
> +     * @return {@code true} if this value
> +     * @see #MISSING_VALUE
> +     */
> +    private static boolean isMissingBucket(JsonNode key) {
> +      return MISSING_VALUE.equals(key);
> +    }
> +
> +    private static Bucket parseBucket(JsonParser parser, String name, ObjectNode node)
> +        throws JsonProcessingException  {
> +
> +      final JsonNode keyNode = node.get("key");
> +      final Object key;
> +      if (isMissingBucket(keyNode) || keyNode.isNull()) {
> +        key = null;
> +      } else if (keyNode.isTextual()) {
> +        key = keyNode.textValue();
> +      } else if (keyNode.isNumber()) {
> +        key = keyNode.numberValue();
> +      } else if (keyNode.isBoolean()) {
> +        key = keyNode.booleanValue();
> +      } else {
> +        // don't usually expect keys to be Objects
> +        key = parser.getCodec().treeToValue(node, Map.class);
> +      }
> +
> +      return new Bucket(key, name, parseAggregations(parser, node));
> +    }
> +
> +  }
> +}
> +
> +// End ElasticsearchJson.java
> 
> http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMethod.java
> ----------------------------------------------------------------------
> diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMethod.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMethod.java
> index 72753e6..709156f 100644
> --- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMethod.java
> +++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMethod.java
> @@ -27,8 +27,17 @@ import java.util.List;
>  * Builtin methods in the Elasticsearch adapter.
>  */
> enum ElasticsearchMethod {
> -  ELASTICSEARCH_QUERYABLE_FIND(AbstractElasticsearchTable.ElasticsearchQueryable.class,
> -      "find", List.class, List.class);
> +
> +  ELASTICSEARCH_QUERYABLE_FIND(ElasticsearchTable.ElasticsearchQueryable.class,
> +      "find",
> +      List.class, // ops  - projections and other stuff
> +      List.class, // fields
> +      List.class, // sort
> +      List.class, // groupBy
> +      List.class, // aggregations
> +      Long.class, // offset
> +      Long.class // fetch
> +      );
> 
>   public final Method method;
> 
> 
> http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java
> ----------------------------------------------------------------------
> diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java
> index 7d5811c..d0841c3 100644
> --- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java
> +++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java
> @@ -101,11 +101,7 @@ public class ElasticsearchProject extends Project implements ElasticsearchRel {
>       query.append("\"script_fields\": {" + String.join(", ", scriptFields) + "}");
>     }
> 
> -    for (String opfield : implementor.list) {
> -      if (opfield.startsWith("\"_source\"")) {
> -        implementor.list.remove(opfield);
> -      }
> -    }
> +    implementor.list.removeIf(l -> l.startsWith("\"_source\""));
>     implementor.add(query.toString());
>   }
> }
> 
> http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRel.java
> ----------------------------------------------------------------------
> diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRel.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRel.java
> index 436adf9..1dad691 100644
> --- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRel.java
> +++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRel.java
> @@ -18,10 +18,14 @@ package org.apache.calcite.adapter.elasticsearch;
> 
> import org.apache.calcite.plan.Convention;
> import org.apache.calcite.plan.RelOptTable;
> +import org.apache.calcite.rel.RelFieldCollation;
> import org.apache.calcite.rel.RelNode;
> +import org.apache.calcite.util.Pair;
> 
> import java.util.ArrayList;
> import java.util.List;
> +import java.util.Map;
> +import java.util.Objects;
> 
> /**
>  * Relational expression that uses Elasticsearch calling convention.
> @@ -39,19 +43,75 @@ public interface ElasticsearchRel extends RelNode {
>    * {@link ElasticsearchRel} nodes into an Elasticsearch query.
>    */
>   class Implementor {
> +
>     final List<String> list = new ArrayList<>();
> 
> +    /**
> +     * Sorting clauses.
> +     * @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-sort.html";>Sort</a>
> +     */
> +    final List<Map.Entry<String, RelFieldCollation.Direction>> sort = new ArrayList<>();
> +
> +    /**
> +     * Elastic aggregation ({@code MIN / MAX / COUNT} etc.) statements (functions).
> +     * @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations.html";>aggregations</a>
> +     */
> +    final List<Map.Entry<String, String>> aggregations = new ArrayList<>();
> +
> +    /**
> +     * Allows bucketing documents together. Similar to {@code select ... from table group by field1}
> +     * @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/6.3/search-aggregations-bucket.html";>Bucket Aggregrations</a>
> +     */
> +    final List<String> groupBy = new ArrayList<>();
> +
> +    /**
> +     * Starting index (default {@code 0}). Equivalent to {@code start} in ES query.
> +     * @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-from-size.html";>From/Size</a>
> +     */
> +    Long offset;
> +
> +    /**
> +     * Number of records to return. Equivalent to {@code size} in ES query.
> +     * @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-from-size.html";>From/Size</a>
> +     */
> +    Long fetch;
> +
>     RelOptTable table;
> -    AbstractElasticsearchTable elasticsearchTable;
> +    ElasticsearchTable elasticsearchTable;
> 
> -    public void add(String findOp) {
> +    void add(String findOp) {
>       list.add(findOp);
>     }
> 
> -    public void visitChild(int ordinal, RelNode input) {
> +    void addGroupBy(String field) {
> +      Objects.requireNonNull(field, "field");
> +      groupBy.add(field);
> +    }
> +
> +    void addSort(String field, RelFieldCollation.Direction direction) {
> +      Objects.requireNonNull(field, "field");
> +      sort.add(new Pair<>(field, direction));
> +    }
> +
> +    void addAggregation(String field, String expression) {
> +      Objects.requireNonNull(field, "field");
> +      Objects.requireNonNull(expression, "expression");
> +      aggregations.add(new Pair<>(field, expression));
> +    }
> +
> +    void offset(long offset) {
> +      this.offset = offset;
> +    }
> +
> +    void fetch(long fetch) {
> +      this.fetch = fetch;
> +    }
> +
> +    void visitChild(int ordinal, RelNode input) {
>       assert ordinal == 0;
>       ((ElasticsearchRel) input).implement(this);
>     }
> +
>   }
> }
> 
> 
> http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRules.java
> ----------------------------------------------------------------------
> diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRules.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRules.java
> index 97e934c..b442ddd 100644
> --- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRules.java
> +++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRules.java
> @@ -23,10 +23,12 @@ import org.apache.calcite.plan.Convention;
> import org.apache.calcite.plan.RelOptRule;
> import org.apache.calcite.plan.RelTrait;
> import org.apache.calcite.plan.RelTraitSet;
> +import org.apache.calcite.rel.InvalidRelException;
> import org.apache.calcite.rel.RelCollations;
> import org.apache.calcite.rel.RelNode;
> import org.apache.calcite.rel.convert.ConverterRule;
> import org.apache.calcite.rel.core.Sort;
> +import org.apache.calcite.rel.logical.LogicalAggregate;
> import org.apache.calcite.rel.logical.LogicalFilter;
> import org.apache.calcite.rel.logical.LogicalProject;
> import org.apache.calcite.rel.type.RelDataType;
> @@ -53,7 +55,8 @@ class ElasticsearchRules {
>   static final RelOptRule[] RULES = {
>       ElasticsearchSortRule.INSTANCE,
>       ElasticsearchFilterRule.INSTANCE,
> -      ElasticsearchProjectRule.INSTANCE
> +      ElasticsearchProjectRule.INSTANCE,
> +      ElasticsearchAggregateRule.INSTANCE
>   };
> 
>   private ElasticsearchRules() {}
> @@ -147,7 +150,7 @@ class ElasticsearchRules {
>         }
>       }
>       throw new IllegalArgumentException("Translation of " + call.toString()
> -        + "is not supported by ElasticsearchProject");
> +        + " is not supported by ElasticsearchProject");
>     }
> 
>     List<String> visitList(List<RexNode> list) {
> @@ -217,6 +220,37 @@ class ElasticsearchRules {
>   }
> 
>   /**
> +   * Rule to convert an {@link org.apache.calcite.rel.logical.LogicalAggregate}
> +   * to an {@link ElasticsearchAggregate}.
> +   */
> +  private static class ElasticsearchAggregateRule extends ElasticsearchConverterRule {
> +    static final RelOptRule INSTANCE = new ElasticsearchAggregateRule();
> +
> +    private ElasticsearchAggregateRule() {
> +      super(LogicalAggregate.class, Convention.NONE, ElasticsearchRel.CONVENTION,
> +          "ElasticsearchAggregateRule");
> +    }
> +
> +    public RelNode convert(RelNode rel) {
> +      final LogicalAggregate agg = (LogicalAggregate) rel;
> +      final RelTraitSet traitSet = agg.getTraitSet().replace(out);
> +      try {
> +        return new ElasticsearchAggregate(
> +            rel.getCluster(),
> +            traitSet,
> +            convert(agg.getInput(), traitSet.simplify()),
> +            agg.indicator,
> +            agg.getGroupSet(),
> +            agg.getGroupSets(),
> +            agg.getAggCallList());
> +      } catch (InvalidRelException e) {
> +        return null;
> +      }
> +    }
> +  }
> +
> +
> +  /**
>    * Rule to convert a {@link org.apache.calcite.rel.logical.LogicalProject}
>    * to an {@link ElasticsearchProject}.
>    */
> 
> http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchema.java
> ----------------------------------------------------------------------
> diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchema.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchema.java
> index 1c630ad..80a94be 100644
> --- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchema.java
> +++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchema.java
> @@ -30,6 +30,7 @@ import org.elasticsearch.client.RestClient;
> import java.io.IOException;
> import java.io.InputStream;
> import java.io.UncheckedIOException;
> +import java.util.Collections;
> import java.util.Locale;
> import java.util.Map;
> import java.util.Objects;
> @@ -48,6 +49,8 @@ public class ElasticsearchSchema extends AbstractSchema {
> 
>   private final ObjectMapper mapper;
> 
> +  private final Map<String, Table> tableMap;
> +
>   /**
>    * Allows schema to be instantiated from existing elastic search client.
>    * This constructor is used in tests.
> @@ -56,20 +59,33 @@ public class ElasticsearchSchema extends AbstractSchema {
>    * @param index name of ES index
>    */
>   public ElasticsearchSchema(RestClient client, ObjectMapper mapper, String index) {
> +    this(client, mapper, index, null);
> +  }
> +
> +  public ElasticsearchSchema(RestClient client, ObjectMapper mapper, String index, String type) {
>     super();
>     this.client = Objects.requireNonNull(client, "client");
>     this.mapper = Objects.requireNonNull(mapper, "mapper");
>     this.index = Objects.requireNonNull(index, "index");
> +    if (type == null) {
> +      try {
> +        this.tableMap = createTables(listTypesFromElastic());
> +      } catch (IOException e) {
> +        throw new UncheckedIOException("Couldn't get types for " + index, e);
> +      }
> +    } else {
> +      this.tableMap = createTables(Collections.singleton(type));
> +    }
>   }
> 
>   @Override protected Map<String, Table> getTableMap() {
> +    return tableMap;
> +  }
> +
> +  private Map<String, Table> createTables(Iterable<String> types) {
>     final ImmutableMap.Builder<String, Table> builder = ImmutableMap.builder();
> -    try {
> -      for (String type: listTypes()) {
> -        builder.put(type, new ElasticsearchTable(client, mapper, index, type));
> -      }
> -    } catch (IOException e) {
> -      throw new UncheckedIOException("Failed to get types for " + index, e);
> +    for (String type : types) {
> +      builder.put(type, new ElasticsearchTable(client, mapper, index, type));
>     }
>     return builder.build();
>   }
> @@ -81,7 +97,7 @@ public class ElasticsearchSchema extends AbstractSchema {
>    * @throws IOException for any IO related issues
>    * @throws IllegalStateException if reply is not understood
>    */
> -  private Set<String> listTypes() throws IOException  {
> +  private Set<String> listTypesFromElastic() throws IOException  {
>     final String endpoint = "/" + index + "/_mapping";
>     final Response response = client.performRequest("GET", endpoint);
>     try (InputStream is = response.getEntity().getContent()) {
> 
> http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSort.java
> ----------------------------------------------------------------------
> diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSort.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSort.java
> index ed669aa..9078b72 100644
> --- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSort.java
> +++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSort.java
> @@ -23,15 +23,12 @@ import org.apache.calcite.plan.RelTraitSet;
> import org.apache.calcite.rel.RelCollation;
> import org.apache.calcite.rel.RelFieldCollation;
> import org.apache.calcite.rel.RelNode;
> -import org.apache.calcite.rel.core.Project;
> import org.apache.calcite.rel.core.Sort;
> import org.apache.calcite.rel.metadata.RelMetadataQuery;
> import org.apache.calcite.rel.type.RelDataTypeField;
> import org.apache.calcite.rex.RexLiteral;
> import org.apache.calcite.rex.RexNode;
> -import org.apache.calcite.util.Util;
> 
> -import java.util.ArrayList;
> import java.util.List;
> 
> /**
> @@ -57,48 +54,22 @@ public class ElasticsearchSort extends Sort implements ElasticsearchRel {
> 
>   @Override public void implement(Implementor implementor) {
>     implementor.visitChild(0, getInput());
> -    if (!collation.getFieldCollations().isEmpty()) {
> -      final List<String> keys = new ArrayList<>();
> -      if (input instanceof Project) {
> -        final List<RexNode> projects = ((Project) input).getProjects();
> +    final List<RelDataTypeField> fields = getRowType().getFieldList();
> 
> -        for (RelFieldCollation fieldCollation : collation.getFieldCollations()) {
> -          RexNode project = projects.get(fieldCollation.getFieldIndex());
> -          String name = project.accept(MapProjectionFieldVisitor.INSTANCE);
> -          keys.add(ElasticsearchRules.quote(name) + ": " + direction(fieldCollation));
> -        }
> -      } else {
> -        final List<RelDataTypeField> fields = getRowType().getFieldList();
> -
> -        for (RelFieldCollation fieldCollation : collation.getFieldCollations()) {
> -          final String name = fields.get(fieldCollation.getFieldIndex()).getName();
> -          keys.add(ElasticsearchRules.quote(name) + ": " + direction(fieldCollation));
> -        }
> -      }
> -
> -      implementor.add("\"sort\": [ " + Util.toString(keys, "{", "}, {", "}") + "]");
> +    for (RelFieldCollation fieldCollation : collation.getFieldCollations()) {
> +      final String name = fields.get(fieldCollation.getFieldIndex()).getName();
> +      implementor.addSort(name, fieldCollation.getDirection());
>     }
> 
>     if (offset != null) {
> -      implementor.add("\"from\": " + ((RexLiteral) offset).getValue());
> +      implementor.offset(((RexLiteral) offset).getValueAs(Long.class));
>     }
> 
>     if (fetch != null) {
> -      implementor.add("\"size\": " + ((RexLiteral) fetch).getValue());
> +      implementor.fetch(((RexLiteral) fetch).getValueAs(Long.class));
>     }
>   }
> 
> -  private String direction(RelFieldCollation fieldCollation) {
> -    switch (fieldCollation.getDirection()) {
> -    case DESCENDING:
> -    case STRICTLY_DESCENDING:
> -      return "\"desc\"";
> -    case ASCENDING:
> -    case STRICTLY_ASCENDING:
> -    default:
> -      return "\"asc\"";
> -    }
> -  }
> }
> 
> // End ElasticsearchSort.java
> 
> http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java
> ----------------------------------------------------------------------
> diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java
> index 955636e..c404da7 100644
> --- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java
> +++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java
> @@ -16,9 +16,24 @@
>  */
> package org.apache.calcite.adapter.elasticsearch;
> 
> +import org.apache.calcite.adapter.java.AbstractQueryableTable;
> import org.apache.calcite.linq4j.Enumerable;
> +import org.apache.calcite.linq4j.Enumerator;
> import org.apache.calcite.linq4j.Linq4j;
> +import org.apache.calcite.linq4j.QueryProvider;
> +import org.apache.calcite.linq4j.Queryable;
> import org.apache.calcite.linq4j.function.Function1;
> +import org.apache.calcite.plan.RelOptCluster;
> +import org.apache.calcite.plan.RelOptTable;
> +import org.apache.calcite.rel.RelFieldCollation;
> +import org.apache.calcite.rel.RelNode;
> +import org.apache.calcite.rel.type.RelDataType;
> +import org.apache.calcite.rel.type.RelDataTypeFactory;
> +import org.apache.calcite.runtime.Hook;
> +import org.apache.calcite.schema.SchemaPlus;
> +import org.apache.calcite.schema.TranslatableTable;
> +import org.apache.calcite.schema.impl.AbstractTableQueryable;
> +import org.apache.calcite.sql.type.SqlTypeName;
> import org.apache.calcite.util.Util;
> 
> import org.apache.http.HttpEntity;
> @@ -29,26 +44,47 @@ import org.apache.http.util.EntityUtils;
> 
> import com.fasterxml.jackson.databind.JsonNode;
> import com.fasterxml.jackson.databind.ObjectMapper;
> +import com.fasterxml.jackson.databind.node.ArrayNode;
> +import com.fasterxml.jackson.databind.node.ObjectNode;
> 
> import org.elasticsearch.client.Response;
> import org.elasticsearch.client.RestClient;
> +import org.slf4j.Logger;
> +import org.slf4j.LoggerFactory;
> 
> import java.io.IOException;
> import java.io.InputStream;
> import java.io.UncheckedIOException;
> +import java.util.ArrayList;
> import java.util.Collections;
> +import java.util.HashMap;
> +import java.util.LinkedHashMap;
> +import java.util.LinkedHashSet;
> import java.util.List;
> import java.util.Locale;
> import java.util.Map;
> import java.util.Objects;
> +import java.util.Set;
> +import java.util.function.Predicate;
> +import java.util.stream.Collectors;
> 
> /**
>  * Table based on an Elasticsearch type.
>  */
> -public class ElasticsearchTable extends AbstractElasticsearchTable {
> +public class ElasticsearchTable extends AbstractQueryableTable implements TranslatableTable {
> +
> +  private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchTable.class);
> +
> +  /**
> +   * Used for constructing (possibly nested) Elastic aggregation nodes.
> +   */
> +  private static final String AGGREGATIONS = "aggregations";
> +
>   private final RestClient restClient;
>   private final ElasticsearchVersion version;
> -
> +  private final String indexName;
> +  private final String typeName;
> +  final ObjectMapper mapper;
> 
>   /**
>    * Creates an ElasticsearchTable.
> @@ -58,7 +94,7 @@ public class ElasticsearchTable extends AbstractElasticsearchTable {
>    * @param typeName elastic searh index type
>    */
>   ElasticsearchTable(RestClient client, ObjectMapper mapper, String indexName, String typeName) {
> -    super(indexName, typeName, Objects.requireNonNull(mapper, "mapper"));
> +    super(Object[].class);
>     this.restClient = Objects.requireNonNull(client, "client");
>     try {
>       this.version = detectVersion(client, mapper);
> @@ -67,6 +103,9 @@ public class ElasticsearchTable extends AbstractElasticsearchTable {
>           + "for %s/%s", indexName, typeName);
>       throw new UncheckedIOException(message, e);
>     }
> +    this.indexName = Objects.requireNonNull(indexName, "indexName");
> +    this.typeName = Objects.requireNonNull(typeName, "typeName");
> +    this.mapper = Objects.requireNonNull(mapper, "mapper");
> 
>   }
> 
> @@ -87,37 +126,211 @@ public class ElasticsearchTable extends AbstractElasticsearchTable {
>     return ElasticsearchVersion.fromString(node.get("version").get("number").asText());
>   }
> 
> -  @Override protected String scriptedFieldPrefix() {
> +  /**
> +   * In ES 5.x scripted fields start with {@code params._source.foo} while in ES2.x
> +   * {@code _source.foo}. Helper method to build correct query based on runtime version of elastic.
> +   * Used to keep backwards compatibility with ES2.
> +   *
> +   * @see <a href="https://github.com/elastic/elasticsearch/issues/20068";>_source variable</a>
> +   * @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/master/modules-scripting-fields.html";>Scripted Fields</a>
> +   * @return string to be used for scripted fields
> +   */
> +  String scriptedFieldPrefix() {
>     // ES2 vs ES5 scripted field difference
>     return version == ElasticsearchVersion.ES2
>         ? ElasticsearchConstants.SOURCE_GROOVY
>         : ElasticsearchConstants.SOURCE_PAINLESS;
>   }
> 
> -  @Override protected Enumerable<Object> find(String index, List<String> ops,
> -      List<Map.Entry<String, Class>> fields) {
> +  /**
> +   * Executes a "find" operation on the underlying type.
> +   *
> +   * <p>For example,
> +   * <code>client.prepareSearch(index).setTypes(type)
> +   * .setSource("{\"fields\" : [\"state\"]}")</code></p>
> +   *
> +   * @param ops List of operations represented as Json strings.
> +   * @param fields List of fields to project; or null to return map
> +   * @param sort list of fields to sort and their direction (asc/desc)
> +   * @param aggregations aggregation functions
> +   * @return Enumerator of results
> +   */
> +  protected Enumerable<Object> find(List<String> ops,
> +      List<Map.Entry<String, Class>> fields,
> +      List<Map.Entry<String, RelFieldCollation.Direction>> sort,
> +      List<String> groupBy,
> +      List<Map.Entry<String, String>> aggregations,
> +      Long offset, Long fetch) throws IOException {
> +
> +    if (!aggregations.isEmpty()) {
> +      // process aggregations separately
> +      return aggregate(ops, fields, sort, groupBy, aggregations, offset, fetch);
> +    }
> +
> +    final ObjectNode query = mapper.createObjectNode();
> 
> -    final String query;
> -    if (!ops.isEmpty()) {
> -      query = "{" + Util.toString(ops, "", ", ", "") + "}";
> -    } else {
> -      query = "{}";
> +    // manually parse from previously concatenated string
> +    query.setAll(
> +        (ObjectNode) mapper.readTree("{"
> +            + Util.toString(ops, "", ", ", "") + "}"));
> +
> +    if (!sort.isEmpty()) {
> +      ArrayNode sortNode = query.withArray("sort");
> +      sort.forEach(e ->
> +          sortNode.add(
> +            mapper.createObjectNode().put(e.getKey(), e.getValue().isDescending() ? "desc" : "asc"))
> +      );
> +    }
> +
> +    if (offset != null) {
> +      query.put("from", offset);
> +    }
> +
> +    if (fetch != null) {
> +      query.put("size", fetch);
>     }
> 
>     try {
> -      ElasticsearchSearchResult result = httpRequest(query);
> -      final Function1<ElasticsearchSearchResult.SearchHit, Object> getter =
> +      ElasticsearchJson.Result search = httpRequest(query);
> +      final Function1<ElasticsearchJson.SearchHit, Object> getter =
>           ElasticsearchEnumerators.getter(fields);
> -      return Linq4j.asEnumerable(result.searchHits().hits()).select(getter);
> +      return Linq4j.asEnumerable(search.searchHits().hits()).select(getter);
>     } catch (IOException e) {
>       throw new UncheckedIOException(e);
>     }
>   }
> 
> -  private ElasticsearchSearchResult httpRequest(String query) throws IOException {
> +  private Enumerable<Object> aggregate(List<String> ops,
> +      List<Map.Entry<String, Class>> fields,
> +      List<Map.Entry<String, RelFieldCollation.Direction>> sort,
> +      List<String> groupBy,
> +      List<Map.Entry<String, String>> aggregations,
> +      Long offset, Long fetch) throws IOException {
> +
> +    if (aggregations.isEmpty()) {
> +      throw new IllegalArgumentException("Missing Aggregations");
> +    }
> +
> +    if (!groupBy.isEmpty() && offset != null) {
> +      String message = "Currently ES doesn't support generic pagination "
> +          + "with aggregations. You can still use LIMIT keyword (without OFFSET). "
> +          + "For more details see https://github.com/elastic/elasticsearch/issues/4915";;
> +      throw new IllegalStateException(message);
> +    }
> +
> +    final ObjectNode query = mapper.createObjectNode();
> +
> +    // manually parse into JSON from previously concatenated strings
> +    query.setAll((ObjectNode) mapper.readTree("{" + Util.toString(ops, "", ", ", "") + "}"));
> +
> +    // remove / override attributes which are not applicable to aggregations
> +    query.put("_source", false);
> +    query.put("size", 0);
> +    query.remove("script_fields");
> +
> +    // allows to detect aggregation for count(*)
> +    final Predicate<Map.Entry<String, String>> isCountStar = e -> e.getValue()
> +            .contains("\"" + ElasticsearchConstants.ID + "\"");
> +
> +    // list of expressions which are count(*)
> +    final Set<String> countAll = aggregations.stream()
> +            .filter(isCountStar)
> +        .map(Map.Entry::getKey).collect(Collectors.toSet());
> +
> +    final Map<String, String> fieldMap = new HashMap<>();
> +
> +    // due to ES aggregation format. fields in "order by" clause should go first
> +    // if "order by" is missing. order in "group by" is un-important
> +    final Set<String> orderedGroupBy = new LinkedHashSet<>();
> +    orderedGroupBy.addAll(sort.stream().map(Map.Entry::getKey).collect(Collectors.toList()));
> +    orderedGroupBy.addAll(groupBy);
> +
> +    // construct nested aggregations node(s)
> +    ObjectNode parent = query.with(AGGREGATIONS);
> +    for (String name: orderedGroupBy) {
> +      final String aggName = "g_" + name;
> +      fieldMap.put(aggName, name);
> +
> +      final ObjectNode section = parent.with(aggName);
> +      final ObjectNode terms = section.with("terms");
> +      terms.put("field", name);
> +      terms.set("missing", ElasticsearchJson.MISSING_VALUE); // expose missing terms
> +
> +      if (fetch != null) {
> +        terms.put("size", fetch);
> +      }
> +
> +      sort.stream().filter(e -> e.getKey().equals(name)).findAny().ifPresent(s -> {
> +        terms.with("order").put("_key", s.getValue().isDescending() ? "desc" : "asc");
> +      });
> +
> +      parent = section.with(AGGREGATIONS);
> +    }
> +
> +    // simple version for queries like "select count(*), max(col1) from table" (no GROUP BY cols)
> +    if (!groupBy.isEmpty() || !aggregations.stream().allMatch(isCountStar)) {
> +      for (Map.Entry<String, String> aggregation : aggregations) {
> +        JsonNode value = mapper.readTree("{" + aggregation.getValue()  + "}");
> +        parent.set(aggregation.getKey(), value);
> +      }
> +    }
> +
> +    // cleanup query. remove empty AGGREGATIONS element (if empty)
> +    JsonNode agg = query;
> +    while (agg.has(AGGREGATIONS) && agg.get(AGGREGATIONS).elements().hasNext()) {
> +      agg = agg.get(AGGREGATIONS);
> +    }
> +    ((ObjectNode) agg).remove(AGGREGATIONS);
> +
> +    ElasticsearchJson.Result res = httpRequest(query);
> +
> +    final List<Map<String, Object>> result = new ArrayList<>();
> +    if (res.aggregations() != null) {
> +      // collect values
> +      ElasticsearchJson.visitValueNodes(res.aggregations(), m -> {
> +        Map<String, Object> newMap = new LinkedHashMap<>();
> +        for (String key: m.keySet()) {
> +          newMap.put(fieldMap.getOrDefault(key, key), m.get(key));
> +        }
> +        result.add(newMap);
> +      });
> +    } else {
> +      // probably no group by. add single result
> +      result.add(new LinkedHashMap<>());
> +    }
> +
> +    // elastic exposes total number of documents matching a query in "/hits/total" path
> +    // this can be used for simple "select count(*) from table"
> +    final long total = res.searchHits().total();
> +
> +    if (groupBy.isEmpty()) {
> +      // put totals automatically for count(*) expression(s), unless they contain group by
> +      for (String expr : countAll) {
> +        result.forEach(m -> m.put(expr, total));
> +      }
> +    }
> +
> +    final Function1<ElasticsearchJson.SearchHit, Object> getter =
> +        ElasticsearchEnumerators.getter(fields);
> +
> +    ElasticsearchJson.SearchHits hits =
> +        new ElasticsearchJson.SearchHits(total, result.stream()
> +            .map(r -> new ElasticsearchJson.SearchHit("_id", r, null))
> +            .collect(Collectors.toList()));
> +
> +    return Linq4j.asEnumerable(hits.hits()).select(getter);
> +  }
> +
> +  private ElasticsearchJson.Result httpRequest(ObjectNode query) throws IOException {
>     Objects.requireNonNull(query, "query");
>     String uri = String.format(Locale.ROOT, "/%s/%s/_search", indexName, typeName);
> -    HttpEntity entity = new StringEntity(query, ContentType.APPLICATION_JSON);
> +
> +    Hook.QUERY_PLAN.run(query);
> +    final String json = mapper.writeValueAsString(query);
> +
> +    LOGGER.debug("Elasticsearch Query: {}", json);
> +
> +    HttpEntity entity = new StringEntity(json, ContentType.APPLICATION_JSON);
>     Response response = restClient.performRequest("POST", uri, Collections.emptyMap(), entity);
>     if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
>       final String error = EntityUtils.toString(response.getEntity());
> @@ -128,9 +341,75 @@ public class ElasticsearchTable extends AbstractElasticsearchTable {
>     }
> 
>     try (InputStream is = response.getEntity().getContent()) {
> -      return mapper.readValue(is, ElasticsearchSearchResult.class);
> +      return mapper.readValue(is, ElasticsearchJson.Result.class);
>     }
>   }
> +
> +  @Override public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) {
> +    final RelDataType mapType = relDataTypeFactory.createMapType(
> +        relDataTypeFactory.createSqlType(SqlTypeName.VARCHAR),
> +        relDataTypeFactory.createTypeWithNullability(
> +            relDataTypeFactory.createSqlType(SqlTypeName.ANY),
> +            true));
> +    return relDataTypeFactory.builder().add("_MAP", mapType).build();
> +  }
> +
> +  @Override public String toString() {
> +    return "ElasticsearchTable{" + indexName + "/" + typeName + "}";
> +  }
> +
> +  @Override public <T> Queryable<T> asQueryable(QueryProvider queryProvider, SchemaPlus schema,
> +      String tableName) {
> +    return new ElasticsearchQueryable<>(queryProvider, schema, this, tableName);
> +  }
> +
> +  @Override public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable relOptTable) {
> +    final RelOptCluster cluster = context.getCluster();
> +    return new ElasticsearchTableScan(cluster, cluster.traitSetOf(ElasticsearchRel.CONVENTION),
> +        relOptTable, this, null);
> +  }
> +
> +  /**
> +   * Implementation of {@link Queryable} based on
> +   * a {@link ElasticsearchTable}.
> +   *
> +   * @param <T> element type
> +   */
> +  public static class ElasticsearchQueryable<T> extends AbstractTableQueryable<T> {
> +    ElasticsearchQueryable(QueryProvider queryProvider, SchemaPlus schema,
> +        ElasticsearchTable table, String tableName) {
> +      super(queryProvider, schema, table, tableName);
> +    }
> +
> +    public Enumerator<T> enumerator() {
> +      return null;
> +    }
> +
> +    private ElasticsearchTable getTable() {
> +      return (ElasticsearchTable) table;
> +    }
> +
> +    /** Called via code-generation.
> +     * @param ops list of queries (as strings)
> +     * @param fields projection
> +     * @see ElasticsearchMethod#ELASTICSEARCH_QUERYABLE_FIND
> +     * @return result as enumerable
> +     */
> +    @SuppressWarnings("UnusedDeclaration")
> +    public Enumerable<Object> find(List<String> ops,
> +         List<Map.Entry<String, Class>> fields,
> +         List<Map.Entry<String, RelFieldCollation.Direction>> sort,
> +         List<String> groupBy,
> +         List<Map.Entry<String, String>> aggregations,
> +         Long offset, Long fetch) {
> +      try {
> +        return getTable().find(ops, fields, sort, groupBy, aggregations, offset, fetch);
> +      } catch (IOException e) {
> +        throw new UncheckedIOException("Failed to query " + getTable().indexName, e);
> +      }
> +    }
> +
> +  }
> }
> 
> // End ElasticsearchTable.java
> 
> http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTableScan.java
> ----------------------------------------------------------------------
> diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTableScan.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTableScan.java
> index 7795ad3..3dd041a 100644
> --- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTableScan.java
> +++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTableScan.java
> @@ -37,7 +37,7 @@ import java.util.Objects;
>  * using the "find" method.</p>
>  */
> public class ElasticsearchTableScan extends TableScan implements ElasticsearchRel {
> -  private final AbstractElasticsearchTable elasticsearchTable;
> +  private final ElasticsearchTable elasticsearchTable;
>   private final RelDataType projectRowType;
> 
>   /**
> @@ -50,10 +50,10 @@ public class ElasticsearchTableScan extends TableScan implements ElasticsearchRe
>    * @param projectRowType Fields and types to project; null to project raw row
>    */
>   ElasticsearchTableScan(RelOptCluster cluster, RelTraitSet traitSet,
> -       RelOptTable table, AbstractElasticsearchTable elasticsearchTable,
> +       RelOptTable table, ElasticsearchTable elasticsearchTable,
>        RelDataType projectRowType) {
>     super(cluster, traitSet, table);
> -    this.elasticsearchTable = Objects.requireNonNull(elasticsearchTable);
> +    this.elasticsearchTable = Objects.requireNonNull(elasticsearchTable, "elasticsearchTable");
>     this.projectRowType = projectRowType;
> 
>     assert getConvention() == ElasticsearchRel.CONVENTION;
> 
> http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchToEnumerableConverter.java
> ----------------------------------------------------------------------
> diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchToEnumerableConverter.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchToEnumerableConverter.java
> index 51a2bd5..5e788a8 100644
> --- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchToEnumerableConverter.java
> +++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchToEnumerableConverter.java
> @@ -30,12 +30,10 @@ import org.apache.calcite.plan.RelOptCluster;
> import org.apache.calcite.plan.RelOptCost;
> import org.apache.calcite.plan.RelOptPlanner;
> import org.apache.calcite.plan.RelTraitSet;
> -import org.apache.calcite.prepare.CalcitePrepareImpl;
> import org.apache.calcite.rel.RelNode;
> import org.apache.calcite.rel.convert.ConverterImpl;
> import org.apache.calcite.rel.metadata.RelMetadataQuery;
> import org.apache.calcite.rel.type.RelDataType;
> -import org.apache.calcite.runtime.Hook;
> import org.apache.calcite.util.BuiltInMethod;
> import org.apache.calcite.util.Pair;
> 
> @@ -60,15 +58,15 @@ public class ElasticsearchToEnumerableConverter extends ConverterImpl implements
>     return super.computeSelfCost(planner, mq).multiplyBy(.1);
>   }
> 
> -  @Override public Result implement(EnumerableRelImplementor implementor, Prefer prefer) {
> -    final BlockBuilder list = new BlockBuilder();
> -    final ElasticsearchRel.Implementor elasticsearchImplementor =
> -        new ElasticsearchRel.Implementor();
> -    elasticsearchImplementor.visitChild(0, getInput());
> +  @Override public Result implement(EnumerableRelImplementor relImplementor, Prefer prefer) {
> +    final BlockBuilder block = new BlockBuilder();
> +    final ElasticsearchRel.Implementor implementor = new ElasticsearchRel.Implementor();
> +    implementor.visitChild(0, getInput());
> +
>     final RelDataType rowType = getRowType();
> -    final PhysType physType = PhysTypeImpl.of(implementor.getTypeFactory(), rowType,
> +    final PhysType physType = PhysTypeImpl.of(relImplementor.getTypeFactory(), rowType,
>         prefer.prefer(JavaRowFormat.ARRAY));
> -    final Expression fields = list.append("fields",
> +    final Expression fields = block.append("fields",
>         constantArrayList(
>             Pair.zip(ElasticsearchRules.elasticsearchFieldNames(rowType),
>                 new AbstractList<Class>() {
> @@ -81,20 +79,24 @@ public class ElasticsearchToEnumerableConverter extends ConverterImpl implements
>                   }
>                 }),
>             Pair.class));
> -    final Expression table = list.append("table",
> -        elasticsearchImplementor.table
> -            .getExpression(AbstractElasticsearchTable.ElasticsearchQueryable.class));
> -    List<String> opList = elasticsearchImplementor.list;
> -    final Expression ops = list.append("ops", constantArrayList(opList, String.class));
> -    Expression enumerable = list.append("enumerable",
> +    final Expression table = block.append("table",
> +        implementor.table
> +            .getExpression(ElasticsearchTable.ElasticsearchQueryable.class));
> +    List<String> opList = implementor.list;
> +    final Expression ops = block.append("ops", constantArrayList(opList, String.class));
> +    final Expression sort = block.append("sort", constantArrayList(implementor.sort, Pair.class));
> +    final Expression groupBy = block.append("groupBy", Expressions.constant(implementor.groupBy));
> +    final Expression aggregations = block.append("aggregations",
> +        constantArrayList(implementor.aggregations, Pair.class));
> +
> +    final Expression offset = block.append("offset", Expressions.constant(implementor.offset));
> +    final Expression fetch = block.append("fetch", Expressions.constant(implementor.fetch));
> +
> +    Expression enumerable = block.append("enumerable",
>         Expressions.call(table, ElasticsearchMethod.ELASTICSEARCH_QUERYABLE_FIND.method, ops,
> -            fields));
> -    if (CalcitePrepareImpl.DEBUG) {
> -      System.out.println("Elasticsearch: " + opList);
> -    }
> -    Hook.QUERY_PLAN.run(opList);
> -    list.add(Expressions.return_(null, enumerable));
> -    return implementor.result(physType, list.toBlock());
> +            fields, sort, groupBy, aggregations, offset, fetch));
> +    block.add(Expressions.return_(null, enumerable));
> +    return relImplementor.result(physType, block.toBlock());
>   }
> 
>   /** E.g. {@code constantArrayList("x", "y")} returns
> 
> http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/PredicateAnalyzer.java
> ----------------------------------------------------------------------
> diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/PredicateAnalyzer.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/PredicateAnalyzer.java
> index 97f7943..a866fe4 100644
> --- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/PredicateAnalyzer.java
> +++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/PredicateAnalyzer.java
> @@ -78,6 +78,16 @@ class PredicateAnalyzer {
>     }
>   }
> 
> +  /**
> +   * Thrown when {@link org.apache.calcite.rel.RelNode} expression can't be processed
> +   * (or converted into ES query)
> +   */
> +  static class ExpressionNotAnalyzableException extends Exception {
> +    ExpressionNotAnalyzableException(String message, Throwable cause) {
> +      super(message, cause);
> +    }
> +  }
> +
>   private PredicateAnalyzer() {}
> 
>   /**
>