Re: Flink table api


Hello Xingcan,

As mentioned earlier in this thread, I am streaming the MongoDB oplog in order
to join multiple MongoDB tables on a unique key (the primary key). To achieve
this I have created the Java POJO below, where o is the generic MongoDB
document type (BasicDBObject) that holds my table fields, which are dynamic.
I now want to use a Table API join over this BasicDBObject, but it seems Flink
does not allow generic POJOs. Please advise.

public class Oplog {
    private OplogTimestamp ts;  // timestamp of the oplog entry
    private BasicDBObject o;    // the document itself; its fields are dynamic
}
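
One possible workaround, sketched here under assumptions rather than taken from the thread: copy the fields needed for the join out of the generic document into a flat class with typed public fields before converting the stream to a table, so that each join key becomes an ordinary column. In this sketch Map<String, Object> stands in for BasicDBObject, and MasterRow, OplogFlattener, toMasterRow and the customerName column are hypothetical names. loanApplicationId is kept as a String because its real type is not stated.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical flat row type: every column needed by the join is a typed
// public field, and the class has a public no-argument constructor.
class MasterRow {
    public String ns;
    public String loanApplicationId;
    public String customerName; // hypothetical column, for illustration only

    public MasterRow() {}
}

class OplogFlattener {
    // Copies known keys of a generic document into typed fields.
    // Map<String, Object> is an assumed stand-in for BasicDBObject.
    static MasterRow toMasterRow(String ns, Map<String, Object> o) {
        MasterRow row = new MasterRow();
        row.ns = ns;
        row.loanApplicationId = asString(o.get("loanApplicationId"));
        row.customerName = asString(o.get("customerName"));
        return row;
    }

    // Missing keys become null instead of raising an exception.
    private static String asString(Object value) {
        return value == null ? null : value.toString();
    }

    public static void main(String[] args) {
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("loanApplicationId", 42);
        doc.put("customerName", "Asha");
        MasterRow row = toMasterRow("local.customerMISMaster", doc);
        System.out.println(row.loanApplicationId + " " + row.customerName);
    }
}
```

In a Flink job a function like toMasterRow would typically run in a map step before the stream is converted to a table; that placement is an assumption about the pipeline, not something shown in this thread.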



-----------------------------------------------
*Amol Suryawanshi*
Java Developer
amols@xxxxxxxxxxxxxxx


*iProgrammer Solutions Pvt. Ltd.*



*Office 103, 104, 1st Floor Pride Portal,Shivaji Housing Society,
Bahiratwadi,Near Hotel JW Marriott, Off Senapati Bapat Road, Pune - 411016,
MH, INDIA.**Phone: +91 9689077510 | Skype: amols_iprogrammer*
www.iprogrammer.com <sachin@xxxxxxxxxxxxxxx>
------------------------------------------------

On Mon, Jul 2, 2018 at 3:03 PM, Amol S - iProgrammer <amols@xxxxxxxxxxxxxxx>
wrote:

> Hello Xingcan
>
> DataStream<Oplog> streamSource = env
>         .addSource(kafkaConsumer)
>         .setParallelism(4);
>
> StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
> // Convert the DataStream into a Table with default fields "f0", "f1"
> Table table1 = tableEnv.fromDataStream(streamSource);
>
>
> Table customerMISMaster = table1
>         .filter("ns === 'local.customerMISMaster'")
>         .select("o as master");
> Table customerMISChild1 = table1
>         .filter("ns === 'local.customerMISChild1'")
>         .select("o as child1");
> Table customerMISChild2 = table1
>         .filter("ns === 'local.customerMISChild2'")
>         .select("o as child2");
> Table result = customerMISMaster
>         .join(customerMISChild1)
>         .where("master.loanApplicationId=child1.loanApplicationId");
>
>
> This throws the error "Method threw 'org.apache.flink.table.api.ValidationException' exception. Undefined function: LOANAPPLICATIONID".
>
> On Mon, Jul 2, 2018 at 2:56 PM, Xingcan Cui <xingcanc@xxxxxxxxx> wrote:
>
>> Hi Amol,
>>
>> The “dynamic table” is just a logical concept on which the design of the
>> Flink Table API is based.
>> That means you don’t need to implement dynamic tables yourself.
>>
>> The Flink Table API provides different kinds of stream-to-stream joins in
>> recent versions (from 1.4).
>> The related docs can be found here:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/tableApi.html#joins
>>
>> Best,
>> Xingcan
>>
>>
>> > On Jul 2, 2018, at 4:49 PM, Amol S - iProgrammer <amols@xxxxxxxxxxxxxxx> wrote:
>> >
>> > Hello,
>> >
>> > I am streaming the MongoDB oplog using Kafka and Flink and want to join
>> > multiple tables using the Flink Table API, but I have some concerns: is it
>> > possible to join streamed tables in Flink, and if so, could you please
>> > provide an example of a stream join using the Table API?
>> >
>> > I went through your dynamic table API doc. It is quite interesting, but I
>> > haven't found any example or tutorial on how to implement a dynamic table.
>> >
>> > I have tried to implement a Table API join using a POJO class, but it
>> > throws org.apache.flink.table.api.TableException: Cannot generate a valid
>> > execution plan for the given query
>>
>>
>
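
As a conceptual aid for the stream-to-stream join discussed in this thread, the sketch below models an unbounded inner join in plain Java: each side remembers the rows it has seen per key, and every arriving row is matched against the rows stored for the other side. It illustrates the idea only and is not Flink's implementation; SymmetricJoin, onLeft and onRight are hypothetical names, and rows are reduced to strings.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Conceptual model of an inner join between two unbounded streams.
class SymmetricJoin {
    // Rows seen so far on each side, grouped by join key.
    private final Map<String, List<String>> leftState = new HashMap<>();
    private final Map<String, List<String>> rightState = new HashMap<>();

    // A row arrives on the left: store it, then pair it with stored right rows.
    List<String> onLeft(String key, String value) {
        leftState.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        List<String> out = new ArrayList<>();
        for (String right : rightState.getOrDefault(key, Collections.emptyList())) {
            out.add(value + "|" + right);
        }
        return out;
    }

    // A row arrives on the right: store it, then pair it with stored left rows.
    List<String> onRight(String key, String value) {
        rightState.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        List<String> out = new ArrayList<>();
        for (String left : leftState.getOrDefault(key, Collections.emptyList())) {
            out.add(left + "|" + value);
        }
        return out;
    }

    public static void main(String[] args) {
        SymmetricJoin join = new SymmetricJoin();
        System.out.println(join.onLeft("42", "master-A"));  // no match stored yet
        System.out.println(join.onRight("42", "child-X"));  // pairs with master-A
    }
}
```

Because both maps only ever grow, a join of this shape needs some form of state cleanup on long-running streams.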