Re: Side effect of DataStreamRel#translateToPlan


Hi Timo,

What confuses me is why we need to rebuild the pipeline each time we convert the RelNode into a DataStream.

Suppose I have this code (I want to write the query result into two target sinks):

val t = tenv.sqlQuery("SELECT * from test_source")
t.insertInto("sink_1")
t.insertInto("sink_2")

In the execution plan I got, there are two source operators, and they are exactly the same. But in some cases, when the backend connector does not support multiple consumers (e.g., an MQ that delivers each message only once), one of the two target sinks may not receive all the records (and that's the problem I ran into :( ).

I thought the behavior of the above code should be equivalent to this:

val t = tenv.sqlQuery("SELECT * from test_source")
val stream = tenv.toAppendStream[Row](t)
stream.writeAsText("path_1")
stream.writeAsText("path_2")


So, IMO, instead of returning a totally new one each time, why don't we make DataStreamRel#translateToPlan always return the same DataStream that it produced the first time it was called? What do you think?
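
To make the idea concrete, here is a minimal sketch using a toy model, not Flink's actual classes. ToyEnv, ToyScan and translateToPlanCached are hypothetical names invented for illustration; the real DataStreamRel#translateToPlan has a different signature. The sketch only shows that translating the same node twice adds the source twice, while caching the first result per environment adds it once.

```scala
import scala.collection.mutable

// Toy stand-in for an execution environment (hypothetical, not a Flink class):
// it only records which operators have been added to it.
final class ToyEnv {
  val operators: mutable.ArrayBuffer[String] = mutable.ArrayBuffer.empty

  // Adds an operator and returns its index as a handle to the resulting "stream".
  def addOperator(name: String): Int = {
    operators += name
    operators.size - 1
  }
}

// Toy stand-in for a RelNode whose translation has a side effect on the environment.
final class ToyScan(table: String) {
  // One cached translation result per environment (assumption of this sketch).
  private val cache = mutable.Map.empty[ToyEnv, Int]

  // Models the behavior described in this thread: every call adds the source again.
  def translateToPlan(env: ToyEnv): Int =
    env.addOperator(s"source($table)")

  // Models the proposal: translate once per environment, then reuse the result.
  def translateToPlanCached(env: ToyEnv): Int =
    cache.getOrElseUpdate(env, translateToPlan(env))
}

object TranslateDemo {
  def main(args: Array[String]): Unit = {
    val scan = new ToyScan("test_source")

    val env1 = new ToyEnv
    scan.translateToPlan(env1)
    scan.translateToPlan(env1)
    println(env1.operators.size) // 2: the source was added twice

    val env2 = new ToyEnv
    scan.translateToPlanCached(env2)
    scan.translateToPlanCached(env2)
    println(env2.operators.size) // 1: the second call reused the first result
  }
}
```

In this model both sinks would attach to the single cached handle, which is the behavior I expected from the two insertInto calls above.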

Best,
wangsan



On Aug 22, 2018, at 12:03 AM, Timo Walther <twalthr@xxxxxxxxxx> wrote:

Hi Wangsan,

The behavior of DataStreamRel#translateToPlan is more or less intended. That's why you call `toAppendStream` on the table environment: you add your pipeline to the environment (from the source to the current operator).

However, the explain() method should not cause those side-effects.

Regards,
Timo

Am 21.08.18 um 17:29 schrieb wangsan:
Hi Timo,

I think this may not only affect the explain() method. DataStreamRel#translateToPlan is called whenever we need to translate a FlinkRelNode into a DataStream or DataSet, and it adds the required operators to the execution environment. By side effect, I mean that if we call DataStreamRel#translateToPlan on the same RelNode several times, the same operators are added to the execution environment more than once, although we only need them once. Correct me if I have misunderstood.

I will open an issue later today, if this is indeed a problem.

Best,
wangsan



On Aug 21, 2018, at 10:16 PM, Timo Walther <twalthr@xxxxxxxxxx> wrote:

Hi,

this sounds like a bug to me. Maybe the explain() method is not implemented correctly. Can you open an issue for it in Jira?

Thanks,
Timo


Am 21.08.18 um 15:04 schrieb wangsan:
Hi all,

I noticed that DataStreamRel#translateToPlan is non-idempotent, and that may cause the execution plan to differ from what we expect. Every time we call DataStreamRel#translateToPlan (in TableEnvironment#explain, TableEnvironment#writeToSink, etc.), we add the same operators to the execution environment again.

Should we eliminate the side effect of DataStreamRel#translateToPlan ?

Best,  Wangsan

appendix

    tenv.registerTableSource("test_source", sourceTable)

    val t = tenv.sqlQuery("SELECT * from test_source")
    println(tenv.explain(t))
    println(tenv.explain(t))

    implicit val typeInfo = TypeInformation.of(classOf[Row])
    tenv.toAppendStream(t)
    println(tenv.explain(t))
We call explain three times, and the Physical Execution Plans are all different.

== Abstract Syntax Tree ==
LogicalProject(f1=[$0], f2=[$1])
  LogicalTableScan(table=[[test_source]])

== Optimized Logical Plan ==
StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], source=[CsvTableSource(read fields: f1, f2)])

== Physical Execution Plan ==
Stage 1 : Data Source
    content : collect elements with CollectionInputFormat

    Stage 2 : Operator
        content : CsvTableSource(read fields: f1, f2)
        ship_strategy : FORWARD

        Stage 3 : Operator
            content : Map
            ship_strategy : FORWARD


== Abstract Syntax Tree ==
LogicalProject(f1=[$0], f2=[$1])
  LogicalTableScan(table=[[test_source]])

== Optimized Logical Plan ==
StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], source=[CsvTableSource(read fields: f1, f2)])

== Physical Execution Plan ==
Stage 1 : Data Source
    content : collect elements with CollectionInputFormat

    Stage 2 : Operator
        content : CsvTableSource(read fields: f1, f2)
        ship_strategy : FORWARD

        Stage 3 : Operator
            content : Map
            ship_strategy : FORWARD

Stage 4 : Data Source
    content : collect elements with CollectionInputFormat

    Stage 5 : Operator
        content : CsvTableSource(read fields: f1, f2)
        ship_strategy : FORWARD

        Stage 6 : Operator
            content : Map
            ship_strategy : FORWARD


== Abstract Syntax Tree ==
LogicalProject(f1=[$0], f2=[$1])
  LogicalTableScan(table=[[test_source]])

== Optimized Logical Plan ==
StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], source=[CsvTableSource(read fields: f1, f2)])

== Physical Execution Plan ==
Stage 1 : Data Source
    content : collect elements with CollectionInputFormat

    Stage 2 : Operator
        content : CsvTableSource(read fields: f1, f2)
        ship_strategy : FORWARD

        Stage 3 : Operator
            content : Map
            ship_strategy : FORWARD

Stage 4 : Data Source
    content : collect elements with CollectionInputFormat

    Stage 5 : Operator
        content : CsvTableSource(read fields: f1, f2)
        ship_strategy : FORWARD

        Stage 6 : Operator
            content : Map
            ship_strategy : FORWARD

Stage 7 : Data Source
    content : collect elements with CollectionInputFormat

    Stage 8 : Operator
        content : CsvTableSource(read fields: f1, f2)
        ship_strategy : FORWARD

        Stage 9 : Operator
            content : Map
            ship_strategy : FORWARD

            Stage 10 : Operator
                content : to: Row
                ship_strategy : FORWARD

Stage 11 : Data Source
    content : collect elements with CollectionInputFormat

    Stage 12 : Operator
        content : CsvTableSource(read fields: f1, f2)
        ship_strategy : FORWARD

        Stage 13 : Operator
            content : Map
            ship_strategy : FORWARD