osdir.com

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: Flink table api


You can also use Row, but then you cannot rely on automatic type extraction
and provide TypeInformation.

Amol S - iProgrammer <amols@xxxxxxxxxxxxxxx> schrieb am Mo., 2. Juli 2018,
12:37:

> Hello Fabian,
>
> According to my requirement I can not create static pojo's for all classes
> because I want to create dynamic jobs for all tables based on rule engine
> config. Please suggest me if there any other way to achieve this.
>
> -----------------------------------------------
> *Amol Suryawanshi*
> Java Developer
> amols@xxxxxxxxxxxxxxx
>
>
> *iProgrammer Solutions Pvt. Ltd.*
>
>
>
> *Office 103, 104, 1st Floor Pride Portal,Shivaji Housing Society,
> Bahiratwadi,Near Hotel JW Marriott, Off Senapati Bapat Road, Pune - 411016,
> MH, INDIA.**Phone: +91 9689077510 | Skype: amols_iprogrammer*
> www.iprogrammer.com <sachin@xxxxxxxxxxxxxxx>
> ------------------------------------------------
>
> On Mon, Jul 2, 2018 at 4:02 PM, Fabian Hueske <fhueske@xxxxxxxxx> wrote:
>
> > Hi Amol,
> >
> > These are the requirements for POJOs [1] that are fully supported by
> Flink.
> >
> > Best, Fabian
> >
> > [1]
> > https://ci.apache.org/projects/flink/flink-docs-
> > release-1.5/dev/api_concepts.html#pojos
> >
> > 2018-07-02 12:19 GMT+02:00 Amol S - iProgrammer <amols@xxxxxxxxxxxxxxx>:
> >
> > > Hello Xingcan
> > >
> > > As mentioned in above mail thread I am streaming mongodb oplog to join
> > > multiple mongo tables based on some unique key (Primary key). To
> achieve
> > > this I have created one java pojo as below. where o represent generic
> > pojo
> > > type of mongodb which has my table fields i.e. dynamic. now I want to
> use
> > > table api join over this basic BasicDBObject but it seem flink does not
> > > allow generic pojo's. please suggest on this.
> > >
> > > public class Oplog {
> > >     private OplogTimestamp ts;
> > >     private BasicDBObject o;
> > > }
> > >
> > >
> > >
> > > -----------------------------------------------
> > > *Amol Suryawanshi*
> > > Java Developer
> > > amols@xxxxxxxxxxxxxxx
> > >
> > >
> > > *iProgrammer Solutions Pvt. Ltd.*
> > >
> > >
> > >
> > > *Office 103, 104, 1st Floor Pride Portal,Shivaji Housing Society,
> > > Bahiratwadi,Near Hotel JW Marriott, Off Senapati Bapat Road, Pune -
> > 411016,
> > > MH, INDIA.**Phone: +91 9689077510 | Skype: amols_iprogrammer*
> > > www.iprogrammer.com <sachin@xxxxxxxxxxxxxxx>
> > > ------------------------------------------------
> > >
> > > On Mon, Jul 2, 2018 at 3:03 PM, Amol S - iProgrammer <
> > > amols@xxxxxxxxxxxxxxx>
> > > wrote:
> > >
> > > > Hello Xingcan
> > > >
> > > > DataStream<Oplog> streamSource = env
> > > >         .addSource(kafkaConsumer)
> > > >         .setParallelism(4);
> > > >
> > > > StreamTableEnvironment tableEnv = TableEnvironment.
> > > getTableEnvironment(env);
> > > > // Convert the DataStream into a Table with default fields "f0", "f1"
> > > > Table table1 = tableEnv.fromDataStream(streamSource);
> > > >
> > > >
> > > > Table customerMISMaster = table1.filter("ns ===
> > > 'local.customerMISMaster'"
> > > > ).select("o as master");
> > > > Table customerMISChild1 = table1.filter("ns ===
> > > 'local.customerMISChild1'"
> > > > ).select("o as child1");
> > > > Table customerMISChild2 = table1.filter("ns ===
> > > 'local.customerMISChild2'"
> > > > ).select("o as child2");
> > > > Table result = customerMISMaster.join(customerMISChild1).where("
> > master.
> > > > loanApplicationId=child1.loanApplicationId");
> > > >
> > > >
> > > > it is throwing error "Method threw 'org.apache.flink.table.api.
> > ValidationException'
> > > exception. Undefined function: LOANAPPLICATIONID"
> > > >
> > > >
> > > >
> > > > -----------------------------------------------
> > > > *Amol Suryawanshi*
> > > > Java Developer
> > > > amols@xxxxxxxxxxxxxxx
> > > >
> > > >
> > > > *iProgrammer Solutions Pvt. Ltd.*
> > > >
> > > >
> > > >
> > > > *Office 103, 104, 1st Floor Pride Portal,Shivaji Housing Society,
> > > > Bahiratwadi,Near Hotel JW Marriott, Off Senapati Bapat Road, Pune -
> > > 411016,
> > > > MH, INDIA.**Phone: +91 9689077510 | Skype: amols_iprogrammer*
> > > > www.iprogrammer.com <sachin@xxxxxxxxxxxxxxx>
> > > > ------------------------------------------------
> > > >
> > > > On Mon, Jul 2, 2018 at 2:56 PM, Xingcan Cui <xingcanc@xxxxxxxxx>
> > wrote:
> > > >
> > > >> Hi Amol,
> > > >>
> > > >> The “dynamic table” is just a logical concept, following which the
> > Flink
> > > >> table API is designed.
> > > >> That means you don’t need to implement dynamic tables yourself.
> > > >>
> > > >> Flink table API provides different kinds of stream to stream joins
> in
> > > >> recent versions (from 1.4).
> > > >> The related docs can be found here https://ci.apache.org/projects
> > > >> /flink/flink-docs-release-1.5/dev/table/tableApi.html#joins <
> > > >> https://ci.apache.org/projects/flink/flink-docs-release-1.5
> > > >> /dev/table/tableApi.html#joins>.
> > > >>
> > > >> Best,
> > > >> Xingcan
> > > >>
> > > >>
> > > >> > On Jul 2, 2018, at 4:49 PM, Amol S - iProgrammer <
> > > amols@xxxxxxxxxxxxxxx>
> > > >> wrote:
> > > >> >
> > > >> > Hello,
> > > >> >
> > > >> > I am streaming mongodb oplog using kafka and flink and want to
> join
> > > >> > multiple tables using flink table api but i have some concerns
> like
> > is
> > > >> it
> > > >> > possible to join streamed tables in flink and if yes then please
> > > >> provide me
> > > >> > some example of stream join using table API.
> > > >> >
> > > >> > I gone through your dynamic table api doc. it is quit interesting
> > but
> > > >> > haven't found any example tutorial how to implement dynamic table.
> > > >> >
> > > >> > I have tried to implement table api join using pojo class but it
> is
> > > >> > giving org.apache.flink.table.api.TableException: Cannot generate
> a
> > > >> valid
> > > >> > execution plan for the given query
> > > >> >
> > > >> > -----------------------------------------------
> > > >> > *Amol Suryawanshi*
> > > >> > Java Developer
> > > >> > amols@xxxxxxxxxxxxxxx
> > > >> >
> > > >> >
> > > >> > *iProgrammer Solutions Pvt. Ltd.*
> > > >> >
> > > >> >
> > > >> >
> > > >> > *Office 103, 104, 1st Floor Pride Portal,Shivaji Housing Society,
> > > >> > Bahiratwadi,Near Hotel JW Marriott, Off Senapati Bapat Road, Pune
> -
> > > >> 411016,
> > > >> > MH, INDIA.**Phone: +91 9689077510 | Skype: amols_iprogrammer*
> > > >> > www.iprogrammer.com <sachin@xxxxxxxxxxxxxxx>
> > > >> > ------------------------------------------------
> > > >>
> > > >>
> > > >
> > >
> >
>