Re: Elasticsearch InputFormat


Great to hear that it works :-)

On Tue, Sep 11, 2018 at 9:28 AM Michael Gendelman <genged@xxxxxxxxx> wrote:

> Hi Till,
>
> Thanks for the great suggestion!
> Seems like it does the job. Here is a sample of the code:
>
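> // Imports inferred from the calls below (mapreduce variant of HadoopInputFormat).
> import java.util.List;
>
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.io.Text;
> import org.elasticsearch.hadoop.mr.EsInputFormat;
> import org.elasticsearch.hadoop.mr.LinkedMapWritable;
>
> public class FlinkMain {
>     public static void main(String[] args) throws Exception {
>         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>
>         // Wrap the elasticsearch-hadoop input format in Flink's Hadoop adapter.
>         EsInputFormat<Text, LinkedMapWritable> kvEsInputFormat = new EsInputFormat<>();
>         HadoopInputFormat<Text, LinkedMapWritable> hadoopInputFormat =
>                 new HadoopInputFormat<>(kvEsInputFormat, Text.class, LinkedMapWritable.class);
>
>         // Index/type to read from and the query selecting the documents.
>         Configuration configuration = hadoopInputFormat.getConfiguration();
>         configuration.set("es.resource", "flink-1/flink_t");
>         configuration.set("es.query", "?q=*");
>
>         DataSet<Tuple2<Text, LinkedMapWritable>> input = env.createInput(hadoopInputFormat);
>
>         // collect() brings every record to the client, so this is only a smoke test.
>         List<Tuple2<Text, LinkedMapWritable>> collect = input.collect();
>         collect.forEach(e -> System.out.println(e));
>     }
> }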
>
>
> On Mon, Sep 10, 2018 at 9:47 AM Till Rohrmann <trohrmann@xxxxxxxxxx>
> wrote:
>
> > Hi Michael,
> >
> > have you considered trying out the EsInputFormat [1] with Flink's
> > HadoopInputFormatBase? That way reading from Elasticsearch might already
> > work out of the box. If not, then adding a dedicated Elasticsearch input
> > format would definitely be helpful.
> >
> > [1] https://github.com/elastic/elasticsearch-hadoop
> >
> > On Sat, Sep 8, 2018 at 11:48 PM Michael Gendelman <genged@xxxxxxxxx>
> > wrote:
> >
> > > Hi all,
> > >
> > > I have a workload where I need to read and transform large amounts of
> > > data from Elasticsearch. I'm currently using Flink only for streaming,
> > > but I thought that it could also be a good fit for this kind of batch
> > > job. However, I did not find a way to load data from Elasticsearch into
> > > Flink in parallel.
> > >
> > > I'd like to propose *ElasticsearchInputFormat* which will be able to
> > > load data from Elasticsearch in parallel by leveraging the InputSplit
> > > mechanism in Flink and the Elasticsearch scroll API.
> > >
> > > The API should look something like this:
> > > ElasticsearchInputFormat<MessageObj> elasticsearchInputFormat =
> > > ElasticsearchInputFormat.builder(esHostList, query, esMapper, typeInfo)
> > > .setParametersProvider(paramsProvider)
> > > .setIndex("index-name")
> > > .setClusterName("type-name")
> > > .build();
> > > DataSet<MessageObj> input = env.createInput(elasticsearchInputFormat);
> > >
> > > The '*query*' is a regular ES query specifying the data to fetch.
> > > The '*esMapper*' maps the JSON data returned from Elasticsearch to some
> > > object (*MessageObj* in the example above).
> > > For this to work in parallel, the InputFormat will work with an
> > > InputSplit, which gets its parameters on how to split a certain range
> > > from the '*paramsProvider*'.
> > >
> > > What do you think?
> > >
> > > Best,
> > > Michael.
> > >
> >
>
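
For anyone landing on this thread later: the InputSplit-plus-scroll idea from
the original proposal could map onto Elasticsearch's sliced scroll, where each
parallel reader asks for one slice of the same query. Below is a minimal
sketch of just that split step, in plain Java with no Flink or Elasticsearch
dependencies. The names SlicedScrollSketch, EsSlice, createSlices and
toRequestBody are made up for illustration and are not part of any proposed or
existing API. It also assumes that a "max" of 1 is rejected by Elasticsearch,
so a single reader falls back to an unsliced request.

```java
import java.util.ArrayList;
import java.util.List;

public class SlicedScrollSketch {

    /** Hypothetical split descriptor: one slice of a sliced scroll. */
    static final class EsSlice {
        final int id;   // slice index, 0 <= id < max
        final int max;  // total number of slices

        EsSlice(int id, int max) {
            this.id = id;
            this.max = max;
        }

        /**
         * Builds the JSON body for the initial scroll search of this slice.
         * The caller would send it to something like
         * POST /<index>/_search?scroll=1m (endpoint shown as an assumption).
         */
        String toRequestBody(String queryJson) {
            if (max == 1) {
                // Assumption: Elasticsearch requires max > 1, so skip slicing.
                return "{\"query\":" + queryJson + "}";
            }
            return "{\"slice\":{\"id\":" + id + ",\"max\":" + max + "},"
                    + "\"query\":" + queryJson + "}";
        }
    }

    /** Creates one slice per parallel reader. */
    static List<EsSlice> createSlices(int parallelism) {
        if (parallelism < 1) {
            throw new IllegalArgumentException("parallelism must be >= 1");
        }
        List<EsSlice> slices = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            slices.add(new EsSlice(i, parallelism));
        }
        return slices;
    }

    public static void main(String[] args) {
        String query = "{\"match_all\":{}}";
        // Print the request body each of three parallel readers would send.
        for (EsSlice slice : createSlices(3)) {
            System.out.println(slice.toRequestBody(query));
        }
    }
}
```

In a real input format, each EsSlice would presumably be carried by one
InputSplit, and the reader for that split would keep following the scroll id
it gets back until no more hits are returned.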