


Re: Side effect of DataStreamRel#translateToPlan


Hi Wangsan,

the behavior of DataStreamRel#translateToPlan is more or less intended. That's why you call `toAppendStream` on the table environment: it adds your pipeline (from the source to the current operator) to the environment.

However, the explain() method should not cause those side-effects.
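To make this distinction concrete, here is a minimal sketch in Scala. It does not use Flink at all: MockEnv, MockScan, explainInPlace and explainIsolated are hypothetical stand-ins. It assumes that translation simply appends a source, scan and map operator to whatever environment it receives. An explain that translates into a throwaway environment leaves the user's environment untouched. Translating into the shared environment, which toAppendStream does on purpose, keeps adding operators.

```scala
import scala.collection.mutable.ArrayBuffer

object IsolatedExplain {
  // Hypothetical stand-in for an execution environment that only records
  // the operators added to it.
  final class MockEnv { val operators = ArrayBuffer.empty[String] }

  // Hypothetical stand-in for a physical node: like translateToPlan, it appends
  // its pipeline (source -> scan -> map) to whatever environment it is given.
  final class MockScan(name: String) {
    def translateToPlan(env: MockEnv): Seq[String] = {
      val ops = Seq(s"Source($name)", s"Scan($name)", "Map")
      env.operators ++= ops
      ops
    }
  }

  // Models the behavior reported in this thread: explain mutates the user's environment.
  def explainInPlace(node: MockScan, env: MockEnv): String = {
    node.translateToPlan(env)
    env.operators.mkString(" -> ")
  }

  // Side-effect-free alternative: translate into a throwaway environment instead.
  def explainIsolated(node: MockScan): String = {
    val scratch = new MockEnv
    node.translateToPlan(scratch)
    scratch.operators.mkString(" -> ")
  }
}
```

Calling explainIsolated any number of times leaves a user-owned MockEnv empty. Two calls to explainInPlace leave six operators in it, the same doubling visible in the appendix below.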

Regards,
Timo

On 21.08.18 at 17:29, wangsan wrote:
Hi Timo,

I think this may affect more than just the explain() method. DataStreamRel#translateToPlan is called whenever we need to translate a FlinkRelNode into a DataStream or DataSet, and it adds the required operators to the execution environment. By side effect, I mean that if we call DataStreamRel#translateToPlan on the same RelNode several times, the same operators are added to the execution environment more than once, even though we only need them once. Correct me if I have misunderstood.

I will open an issue later today if this is indeed a problem.

Best,
wangsan



On Aug 21, 2018, at 10:16 PM, Timo Walther <twalthr@xxxxxxxxxx> wrote:

Hi,

this sounds like a bug to me. Maybe the explain() method is not implemented correctly. Can you open an issue for it in Jira?

Thanks,
Timo


On 21.08.18 at 15:04, wangsan wrote:
Hi all,

I noticed that DataStreamRel#translateToPlan is not idempotent, which may cause the execution plan to differ from what we expect. Every time we call DataStreamRel#translateToPlan (in TableEnvironment#explain, TableEnvironment#writeToSink, etc.), the same operators are added to the execution environment again.

Should we eliminate the side effect of DataStreamRel#translateToPlan?
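One possible way to remove the side effect is to memoize the translation per node and per environment. This is a sketch under stated assumptions, not Flink's actual design. In the sketch, the hypothetical MockEnv keeps a cache from each node to the result of its first translation. Translating the same hypothetical MockScan twice into one environment therefore adds its operators only once, while a fresh environment still gets its own copy.

```scala
import scala.collection.mutable

object MemoizedTranslation {
  // Hypothetical environment: records added operators and caches, per node,
  // the result of that node's first translation into this environment.
  final class MockEnv {
    val operators: mutable.ArrayBuffer[String] = mutable.ArrayBuffer.empty[String]
    private val translated = mutable.HashMap.empty[AnyRef, Seq[String]]

    // Evaluates `translate` only if `node` has not been translated into this env yet.
    def translateOnce(node: AnyRef)(translate: => Seq[String]): Seq[String] =
      translated.getOrElseUpdate(node, translate)
  }

  // Hypothetical node: its translation appends source -> scan -> map operators,
  // but goes through the environment's cache, so repeated calls are idempotent.
  final class MockScan(name: String) {
    def translateToPlan(env: MockEnv): Seq[String] =
      env.translateOnce(this) {
        val ops = Seq(s"Source($name)", s"Scan($name)", "Map")
        env.operators ++= ops
        ops
      }
  }
}
```

The cache lives in the environment rather than in the node, so separate environments stay independent. MockScan is a plain (non-case) class with reference equality, which means two distinct node instances never share a cache entry.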

Best,  Wangsan

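Appendix:

     // Assumes tenv (a StreamTableEnvironment) and sourceTable (a table source) are defined elsewhere.
     import org.apache.flink.api.common.typeinfo.TypeInformation
     import org.apache.flink.types.Row

     tenv.registerTableSource("test_source", sourceTable)

     val t = tenv.sqlQuery("SELECT * from test_source")
     // Explain the same table twice.
     println(tenv.explain(t))
     println(tenv.explain(t))

     // Convert the table into an append-only DataStream of Rows, then explain it again.
     implicit val typeInfo: TypeInformation[Row] = TypeInformation.of(classOf[Row])
     tenv.toAppendStream[Row](t)
     println(tenv.explain(t))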
We call explain() three times, and the three Physical Execution Plans are all different.

== Abstract Syntax Tree ==
LogicalProject(f1=[$0], f2=[$1])
   LogicalTableScan(table=[[test_source]])

== Optimized Logical Plan ==
StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], source=[CsvTableSource(read fields: f1, f2)])

== Physical Execution Plan ==
Stage 1 : Data Source
     content : collect elements with CollectionInputFormat

     Stage 2 : Operator
         content : CsvTableSource(read fields: f1, f2)
         ship_strategy : FORWARD

         Stage 3 : Operator
             content : Map
             ship_strategy : FORWARD


== Abstract Syntax Tree ==
LogicalProject(f1=[$0], f2=[$1])
   LogicalTableScan(table=[[test_source]])

== Optimized Logical Plan ==
StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], source=[CsvTableSource(read fields: f1, f2)])

== Physical Execution Plan ==
Stage 1 : Data Source
     content : collect elements with CollectionInputFormat

     Stage 2 : Operator
         content : CsvTableSource(read fields: f1, f2)
         ship_strategy : FORWARD

         Stage 3 : Operator
             content : Map
             ship_strategy : FORWARD

Stage 4 : Data Source
     content : collect elements with CollectionInputFormat

     Stage 5 : Operator
         content : CsvTableSource(read fields: f1, f2)
         ship_strategy : FORWARD

         Stage 6 : Operator
             content : Map
             ship_strategy : FORWARD


== Abstract Syntax Tree ==
LogicalProject(f1=[$0], f2=[$1])
   LogicalTableScan(table=[[test_source]])

== Optimized Logical Plan ==
StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], source=[CsvTableSource(read fields: f1, f2)])

== Physical Execution Plan ==
Stage 1 : Data Source
     content : collect elements with CollectionInputFormat

     Stage 2 : Operator
         content : CsvTableSource(read fields: f1, f2)
         ship_strategy : FORWARD

         Stage 3 : Operator
             content : Map
             ship_strategy : FORWARD

Stage 4 : Data Source
     content : collect elements with CollectionInputFormat

     Stage 5 : Operator
         content : CsvTableSource(read fields: f1, f2)
         ship_strategy : FORWARD

         Stage 6 : Operator
             content : Map
             ship_strategy : FORWARD

Stage 7 : Data Source
     content : collect elements with CollectionInputFormat

     Stage 8 : Operator
         content : CsvTableSource(read fields: f1, f2)
         ship_strategy : FORWARD

         Stage 9 : Operator
             content : Map
             ship_strategy : FORWARD

             Stage 10 : Operator
                 content : to: Row
                 ship_strategy : FORWARD

Stage 11 : Data Source
     content : collect elements with CollectionInputFormat

     Stage 12 : Operator
         content : CsvTableSource(read fields: f1, f2)
         ship_strategy : FORWARD

         Stage 13 : Operator
             content : Map
             ship_strategy : FORWARD
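The stage numbers above follow from translating the same table into the shared environment on every call. Each translation contributes three stages: the collection source, CsvTableSource and Map. toAppendStream adds a fourth stage, "to: Row". The hypothetical model below involves no Flink classes. It assumes explain() translates into the shared environment and then prints every stage in it, and it replays the appendix sequence to count the stages each explain() call would show.

```scala
import scala.collection.mutable.ArrayBuffer

object AppendixStageCount {
  // Hypothetical model: the environment is just the list of stages added so far.
  final class MockEnv { val stages = ArrayBuffer.empty[String] }

  // Each translation of the scan adds the same three stages again,
  // mirroring the collection source -> CsvTableSource -> Map chain in the appendix.
  def translate(env: MockEnv): Unit =
    env.stages ++= Seq("Data Source", "CsvTableSource", "Map")

  // Assumed explain behavior: translate into the shared environment,
  // then report every stage it now contains.
  def explain(env: MockEnv): Int = { translate(env); env.stages.size }

  // toAppendStream translates once more and appends the "to: Row" conversion.
  def toAppendStream(env: MockEnv): Unit = { translate(env); env.stages += "to: Row" }

  // Replays the appendix: explain, explain, toAppendStream, explain.
  def run(): Seq[Int] = {
    val env = new MockEnv
    val first = explain(env)
    val second = explain(env)
    toAppendStream(env)
    val third = explain(env)
    Seq(first, second, third)
  }
}
```

AppendixStageCount.run() returns Seq(3, 6, 13), which matches the last stage number of each of the three plans: 3 + 3 = 6, then 6 + 4 = 10 after toAppendStream, then 10 + 3 = 13.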