osdir.com


[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[jira] [Created] (FLINK-10845) Support DISTINCT aggregates for batch


Timo Walther created FLINK-10845:
------------------------------------

             Summary: Support DISTINCT aggregates for batch
                 Key: FLINK-10845
                 URL: https://issues.apache.org/jira/browse/FLINK-10845
             Project: Flink
          Issue Type: New Feature
          Components: Table API & SQL
            Reporter: Timo Walther


Currently, we support distinct aggregates for streaming. However, executing the same query on batch fails, as the following test shows:

{code}
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    val sqlQuery =
      "SELECT b, " +
      "  SUM(DISTINCT (a / 3)), " +
      "  COUNT(DISTINCT SUBSTRING(c FROM 1 FOR 2))," +
      "  COUNT(DISTINCT c) " +
      "FROM MyTable " +
      "GROUP BY b"

    val data = new mutable.MutableList[(Int, Long, String)]
    data.+=((1, 1L, "Hi"))
    data.+=((2, 2L, "Hello"))
    data.+=((3, 2L, "Hello world"))
    data.+=((4, 3L, "Hello world, how are you?"))
    data.+=((5, 3L, "I am fine."))
    data.+=((6, 3L, "Luke Skywalker"))
    data.+=((7, 4L, "Comment#1"))
    data.+=((8, 4L, "Comment#2"))
    data.+=((9, 4L, "Comment#3"))
    data.+=((10, 4L, "Comment#4"))
    data.+=((11, 5L, "Comment#5"))
    data.+=((12, 5L, "Comment#6"))
    data.+=((13, 5L, "Comment#7"))
    data.+=((14, 5L, "Comment#8"))
    data.+=((15, 5L, "Comment#9"))
    data.+=((16, 6L, "Comment#10"))
    data.+=((17, 6L, "Comment#11"))
    data.+=((18, 6L, "Comment#12"))
    data.+=((19, 6L, "Comment#13"))
    data.+=((20, 6L, "Comment#14"))
    data.+=((21, 6L, "Comment#15"))


    val t = env.fromCollection(data).toTable(tEnv).as('a, 'b, 'c)
    tEnv.registerTable("MyTable", t)

    tEnv.sqlQuery(sqlQuery).toDataSet[Row].print()
{code}

Fails with:

{code}
org.apache.flink.table.codegen.CodeGenException: Unsupported call: IS NOT DISTINCT FROM 
If you think this function should be supported, you can create an issue and start a discussion for it.

	at org.apache.flink.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:1027)
	at org.apache.flink.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:1027)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:1027)
	at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:66)
	at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
	at org.apache.flink.table.codegen.CodeGenerator.generateExpression(CodeGenerator.scala:247)
	at org.apache.flink.table.plan.nodes.dataset.DataSetJoin.addInnerJoin(DataSetJoin.scala:221)
	at org.apache.flink.table.plan.nodes.dataset.DataSetJoin.translateToPlan(DataSetJoin.scala:170)
	at org.apache.flink.table.plan.nodes.dataset.DataSetCalc.translateToPlan(DataSetCalc.scala:91)
	at org.apache.flink.table.plan.nodes.dataset.DataSetJoin.translateToPlan(DataSetJoin.scala:165)
	at org.apache.flink.table.plan.nodes.dataset.DataSetCalc.translateToPlan(DataSetCalc.scala:91)
	at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:498)
	at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:476)
	at org.apache.flink.table.api.scala.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:141)
	at org.apache.flink.table.api.scala.TableConversions.toDataSet(TableConversions.scala:50)
	at org.apache.flink.table.runtime.stream.sql.SqlITCase.testDistinctGroupBy(SqlITCase.scala:2
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)