Re: Flink table api


Hello Xingcan

DataStream<Oplog> streamSource = env
        .addSource(kafkaConsumer)
        .setParallelism(4);

StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// Convert the DataStream into a Table with default fields "f0", "f1"
Table table1 = tableEnv.fromDataStream(streamSource);


Table customerMISMaster = table1
        .filter("ns === 'local.customerMISMaster'")
        .select("o as master");
Table customerMISChild1 = table1
        .filter("ns === 'local.customerMISChild1'")
        .select("o as child1");
Table customerMISChild2 = table1
        .filter("ns === 'local.customerMISChild2'")
        .select("o as child2");
Table result = customerMISMaster.join(customerMISChild1)
        .where("master.loanApplicationId=child1.loanApplicationId");


The join fails with the following error:

"Method threw 'org.apache.flink.table.api.ValidationException' exception.
Undefined function: LOANAPPLICATIONID"
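
The intended result is an inner equi-join of master and child1 records on loanApplicationId. Below is a minimal plain-Java sketch of those semantics only. It is not Flink code and not a fix for the error: the Doc class, its payload field, and the sample values are assumptions made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class JoinSketch {

    /** Hypothetical stand-in for the document carried in the Oplog field "o". */
    public static final class Doc {
        public final String loanApplicationId;
        public final String payload; // assumed field, for illustration only

        public Doc(String loanApplicationId, String payload) {
            this.loanApplicationId = loanApplicationId;
            this.payload = payload;
        }
    }

    /** Inner equi-join on loanApplicationId; each match becomes "masterPayload|childPayload". */
    public static List<String> join(List<Doc> masters, List<Doc> children) {
        // Index the child side by join key.
        Map<String, List<Doc>> childrenById = new HashMap<>();
        for (Doc child : children) {
            childrenById
                    .computeIfAbsent(child.loanApplicationId, k -> new ArrayList<>())
                    .add(child);
        }

        // Probe the index with every master record; unmatched records are dropped.
        List<String> result = new ArrayList<>();
        for (Doc master : masters) {
            List<Doc> matches = childrenById.getOrDefault(
                    master.loanApplicationId, Collections.<Doc>emptyList());
            for (Doc child : matches) {
                result.add(master.payload + "|" + child.payload);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Made-up sample records.
        List<Doc> masters = Arrays.asList(
                new Doc("A1", "master-A1"), new Doc("B2", "master-B2"));
        List<Doc> children = Arrays.asList(
                new Doc("A1", "child1-A1"), new Doc("C3", "child1-C3"));

        System.out.println(join(masters, children)); // prints [master-A1|child1-A1]
    }
}
```

Only the A1 pair appears in the output, because B2 has no child1 record and C3 has no master record.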



-----------------------------------------------
*Amol Suryawanshi*
Java Developer
amols@xxxxxxxxxxxxxxx


*iProgrammer Solutions Pvt. Ltd.*
*Office 103, 104, 1st Floor Pride Portal, Shivaji Housing Society,
Bahiratwadi, Near Hotel JW Marriott, Off Senapati Bapat Road, Pune - 411016,
MH, INDIA.*
*Phone: +91 9689077510 | Skype: amols_iprogrammer*
www.iprogrammer.com <sachin@xxxxxxxxxxxxxxx>
------------------------------------------------

On Mon, Jul 2, 2018 at 2:56 PM, Xingcan Cui <xingcanc@xxxxxxxxx> wrote:

> Hi Amol,
>
> The “dynamic table” is just a logical concept on which the design of the
> Flink Table API is based.
> That means you don’t need to implement dynamic tables yourself.
>
> Recent versions of the Flink Table API (1.4 and later) provide several
> kinds of stream-to-stream joins.
> The related docs can be found here:
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/tableApi.html#joins
>
> Best,
> Xingcan
>
>
> > On Jul 2, 2018, at 4:49 PM, Amol S - iProgrammer <amols@xxxxxxxxxxxxxxx> wrote:
> >
> > Hello,
> >
> > I am streaming the MongoDB oplog through Kafka into Flink and want to
> > join multiple tables using the Flink Table API. Is it possible to join
> > streamed tables in Flink? If so, please provide an example of a stream
> > join using the Table API.
> >
> > I have gone through your dynamic table API doc. It is quite interesting,
> > but I haven't found an example or tutorial on how to implement a dynamic
> > table.
> >
> > I have tried to implement a Table API join using a POJO class, but it
> > fails with org.apache.flink.table.api.TableException: Cannot generate a
> > valid execution plan for the given query