[jira] [Created] (FLINK-10194) Serialization issue with Scala's AggregateDataSet[Row]


Alexis Sarda-Espinosa created FLINK-10194:
---------------------------------------------

             Summary: Serialization issue with Scala's AggregateDataSet[Row]
                 Key: FLINK-10194
                 URL: https://issues.apache.org/jira/browse/FLINK-10194
             Project: Flink
          Issue Type: Bug
         Environment: Flink v1.6.0
            Reporter: Alexis Sarda-Espinosa


 

Consider the following code, in which I had to jump through some hoops to manually create a DataSet[Row] that supports groupBy and sum as shown:
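{code:java}
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.java.io.CollectionInputFormat
import org.apache.flink.api.java.operators.DataSource
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala._
import org.apache.flink.types.Row

object Main {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Two-field rows of (String, Int)
    val letters = Seq("a", "a", "b").map(Row.of(_, 1.asInstanceOf[Object]))
    val typeInfo = new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO,
      BasicTypeInfo.INT_TYPE_INFO)

    import scala.collection.JavaConverters._
    val inputFormat = new CollectionInputFormat(letters.asJavaCollection,
      typeInfo.createSerializer(env.getConfig))

    // Java DataSource carrying the RowTypeInfo, wrapped in a Scala DataSet
    val source = new DataSource(env.getJavaEnv,
      inputFormat,
      typeInfo,
      "hello.flink.Main$.main(Main.scala:20)")

    val dataSet = new DataSet(source)

    dataSet.print() // works

    dataSet
      .groupBy(0)
      .sum(1)
      .print() // throws ClassCastException
  }
}
{code}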
The call to dataSet.print() works as expected, but the final print() throws an exception:
{noformat}
Caused by: java.lang.ClassCastException: org.apache.flink.api.java.typeutils.runtime.RowSerializer cannot be cast to org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase
at org.apache.flink.api.scala.operators.ScalaAggregateOperator$AggregatingUdf.open(ScalaAggregateOperator.java:260){noformat}
Changing the final print() to collect() throws the same exception.
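The stack trace shows that ScalaAggregateOperator$AggregatingUdf.open casts the input serializer to TupleSerializerBase, which the RowSerializer is not, so sum(1) on this DataSet[Row] fails at runtime. A possible workaround, sketched here under the assumption that field 0 is the key and field 1 always holds a non-null Int, is to swap the built-in aggregation for an explicit ReduceFunction on the grouped data set. This is a different technique (a reduce instead of the aggregate operator), not a fix for the underlying bug, and the names RowSumWorkaround, SumField1 and sumByKey are hypothetical.

{code:java}
import scala.collection.JavaConverters._

import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.java.io.CollectionInputFormat
import org.apache.flink.api.java.operators.DataSource
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala._
import org.apache.flink.types.Row

object RowSumWorkaround {

  // Hypothetical reducer: keeps the key from field 0 and adds the Ints in field 1.
  // Assumes field 1 is never null.
  class SumField1 extends ReduceFunction[Row] {
    override def reduce(a: Row, b: Row): Row = {
      val total = a.getField(1).asInstanceOf[Int] + b.getField(1).asInstanceOf[Int]
      Row.of(a.getField(0), Int.box(total))
    }
  }

  // Builds the same DataSet[Row] as in the report, then groups and reduces
  // instead of calling sum(1), and returns the collected rows.
  def sumByKey(env: ExecutionEnvironment): Seq[Row] = {
    val letters = Seq("a", "a", "b").map(Row.of(_, Int.box(1)))
    val typeInfo = new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO,
      BasicTypeInfo.INT_TYPE_INFO)
    val inputFormat = new CollectionInputFormat(letters.asJavaCollection,
      typeInfo.createSerializer(env.getConfig))
    val source = new DataSource(env.getJavaEnv, inputFormat, typeInfo, "RowSumWorkaround")
    val dataSet = new DataSet(source)

    dataSet
      .groupBy(0)
      .reduce(new SumField1)
      .collect()
  }

  def main(args: Array[String]): Unit =
    sumByKey(ExecutionEnvironment.getExecutionEnvironment).foreach(println)
}
{code}

With the input from the report, the collected result should contain one Row per key, holding the sums ("a", 2) and ("b", 1), in no guaranteed order. An anonymous or named ReduceFunction is used rather than a Scala lambda to avoid any overload ambiguity between GroupedDataSet.reduce's function and ReduceFunction variants.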

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)