Re: Side effect of DataStreamRel#translateToPlan


Hi,

this sounds like a bug to me. Maybe the explain() method is not implemented correctly. Can you open an issue for it in Jira?

Thanks,
Timo


On 21.08.18 at 15:04, wangsan wrote:
Hi all,

I noticed that DataStreamRel#translateToPlan is not idempotent, which can make the execution plan differ from what we expect. Every time DataStreamRel#translateToPlan is called (in TableEnvironment#explain, TableEnvironment#writeToSink, etc.), the same operators are added to the execution environment again.

Should we eliminate this side effect of DataStreamRel#translateToPlan?
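One possible way to remove the side effect is to make the translation idempotent by caching its result per execution environment. The following is a minimal sketch under that assumption, not Flink's implementation: ToyEnv and MemoizedScanRel are hypothetical stand-ins for StreamExecutionEnvironment and a DataStreamRel, and an Int operator index stands in for the returned DataStream.

```scala
import scala.collection.mutable

// Hypothetical stand-in for StreamExecutionEnvironment: it only records operator names.
final class ToyEnv {
  val operators: mutable.ArrayBuffer[String] = mutable.ArrayBuffer.empty[String]
  // Appends an operator and returns its index.
  def add(name: String): Int = { operators += name; operators.size - 1 }
}

// Hypothetical idempotent DataStreamRel: the first call per environment adds the
// operators; later calls return the cached result instead of adding them again.
final class MemoizedScanRel {
  private val cache = mutable.Map.empty[ToyEnv, Int]

  // Returns the index of the last operator (Map), standing in for the DataStream.
  def translateToPlan(env: ToyEnv): Int =
    cache.getOrElseUpdate(env, {
      env.add("Data Source: collect elements with CollectionInputFormat")
      env.add("Operator: CsvTableSource(read fields: f1, f2)")
      env.add("Operator: Map")
    })
}

object IdempotentDemo {
  // Number of operators in env after each of two translations (e.g. two explain() calls).
  def run(): Seq[Int] = {
    val env = new ToyEnv
    val rel = new MemoizedScanRel
    rel.translateToPlan(env)
    val afterFirst = env.operators.size
    rel.translateToPlan(env) // cache hit: nothing is added
    val afterSecond = env.operators.size
    Seq(afterFirst, afterSecond)
  }

  def main(args: Array[String]): Unit = println(run()) // prints List(3, 3)
}
```

Caching stops repeated calls from piling up operators, but the operators from the first translation still stay in the environment. Translating into a separate, throwaway environment inside explain() would be an alternative design with different trade-offs.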

Best,
Wangsan

Appendix:

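     import org.apache.flink.api.common.typeinfo.TypeInformation
     import org.apache.flink.types.Row

     // tenv is a Scala StreamTableEnvironment and sourceTable is the TableSource under test (definitions omitted)
     tenv.registerTableSource("test_source", sourceTable)

     val t = tenv.sqlQuery("SELECT * from test_source")
     println(tenv.explain(t))  // 1st explain()
     println(tenv.explain(t))  // 2nd explain() of the same, unchanged query

     implicit val typeInfo: TypeInformation[Row] = TypeInformation.of(classOf[Row])
     tenv.toAppendStream[Row](t)  // translates the query again and adds a "to: Row" conversion
     println(tenv.explain(t))  // 3rd explain()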
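We call explain() three times, and the Physical Execution Plan is different each time: every translation (each explain() call, and also toAppendStream()) adds another copy of the Data Source, CsvTableSource and Map operators to the execution environment, and toAppendStream() additionally adds a "to: Row" operator.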

== Abstract Syntax Tree ==
LogicalProject(f1=[$0], f2=[$1])
   LogicalTableScan(table=[[test_source]])

== Optimized Logical Plan ==
StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], source=[CsvTableSource(read fields: f1, f2)])

== Physical Execution Plan ==
Stage 1 : Data Source
     content : collect elements with CollectionInputFormat

     Stage 2 : Operator
         content : CsvTableSource(read fields: f1, f2)
         ship_strategy : FORWARD

         Stage 3 : Operator
             content : Map
             ship_strategy : FORWARD


== Abstract Syntax Tree ==
LogicalProject(f1=[$0], f2=[$1])
   LogicalTableScan(table=[[test_source]])

== Optimized Logical Plan ==
StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], source=[CsvTableSource(read fields: f1, f2)])

== Physical Execution Plan ==
Stage 1 : Data Source
     content : collect elements with CollectionInputFormat

     Stage 2 : Operator
         content : CsvTableSource(read fields: f1, f2)
         ship_strategy : FORWARD

         Stage 3 : Operator
             content : Map
             ship_strategy : FORWARD

Stage 4 : Data Source
     content : collect elements with CollectionInputFormat

     Stage 5 : Operator
         content : CsvTableSource(read fields: f1, f2)
         ship_strategy : FORWARD

         Stage 6 : Operator
             content : Map
             ship_strategy : FORWARD


== Abstract Syntax Tree ==
LogicalProject(f1=[$0], f2=[$1])
   LogicalTableScan(table=[[test_source]])

== Optimized Logical Plan ==
StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], source=[CsvTableSource(read fields: f1, f2)])

== Physical Execution Plan ==
Stage 1 : Data Source
     content : collect elements with CollectionInputFormat

     Stage 2 : Operator
         content : CsvTableSource(read fields: f1, f2)
         ship_strategy : FORWARD

         Stage 3 : Operator
             content : Map
             ship_strategy : FORWARD

Stage 4 : Data Source
     content : collect elements with CollectionInputFormat

     Stage 5 : Operator
         content : CsvTableSource(read fields: f1, f2)
         ship_strategy : FORWARD

         Stage 6 : Operator
             content : Map
             ship_strategy : FORWARD

Stage 7 : Data Source
     content : collect elements with CollectionInputFormat

     Stage 8 : Operator
         content : CsvTableSource(read fields: f1, f2)
         ship_strategy : FORWARD

         Stage 9 : Operator
             content : Map
             ship_strategy : FORWARD

             Stage 10 : Operator
                 content : to: Row
                 ship_strategy : FORWARD

Stage 11 : Data Source
     content : collect elements with CollectionInputFormat

     Stage 12 : Operator
         content : CsvTableSource(read fields: f1, f2)
         ship_strategy : FORWARD

         Stage 13 : Operator
             content : Map
             ship_strategy : FORWARD
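The growth in the plans above (3, then 6, then 13 stages) can be reproduced with a toy model. This is a minimal Scala sketch, not Flink code: ToyEnv, ToyScanRel and NonIdempotentDemo are hypothetical names, and the environment only records operator names. It assumes, as the output suggests, that explain() and toAppendStream() each call translateToPlan, and that every call appends fresh operators.

```scala
import scala.collection.mutable

// Hypothetical stand-in for StreamExecutionEnvironment: it only records operator names.
final class ToyEnv {
  val operators: mutable.ArrayBuffer[String] = mutable.ArrayBuffer.empty[String]
  def add(name: String): Unit = operators += name
}

// Hypothetical stand-in for the scan DataStreamRel: every call appends fresh
// operators, i.e. translateToPlan is not idempotent.
final class ToyScanRel {
  def translateToPlan(env: ToyEnv): Unit = {
    env.add("Data Source: collect elements with CollectionInputFormat")
    env.add("Operator: CsvTableSource(read fields: f1, f2)")
    env.add("Operator: Map")
  }
}

object NonIdempotentDemo {
  // Assumed behavior: explain() translates, then reports every operator in env.
  def explain(rel: ToyScanRel, env: ToyEnv): Int = {
    rel.translateToPlan(env)
    env.operators.size
  }

  // Assumed behavior: toAppendStream() translates again and adds a conversion operator.
  def toAppendStream(rel: ToyScanRel, env: ToyEnv): Unit = {
    rel.translateToPlan(env)
    env.add("Operator: to: Row")
  }

  // Stage count reported by each of the three explain() calls.
  def run(): Seq[Int] = {
    val env = new ToyEnv
    val rel = new ToyScanRel
    val first = explain(rel, env)
    val second = explain(rel, env)
    toAppendStream(rel, env)
    val third = explain(rel, env)
    Seq(first, second, third)
  }

  def main(args: Array[String]): Unit = println(run()) // prints List(3, 6, 13)
}
```

The third count is 3 + 3 + (3 + 1) + 3 = 13, matching stages 1 to 13 in the last plan: two earlier explain() translations, the toAppendStream() translation with its "to: Row" operator, and the third explain() translation.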