
Re: Reading csv-files in parallel


The easiest approach is to read the CSV files line by line as regular text files (ExecutionEnvironment.readTextFile()) and apply custom parsing logic in a MapFunction.
Then you have complete freedom to handle records with different schemas.
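A minimal sketch of such parsing logic in plain Scala follows. It assumes (hypothetically) that every file has the timestamp as a Long in its first column, that there are no quoted fields, and that the two example schemas can be told apart by their column count. Record, Measurement, Alarm and LineParser are illustrative names, not Flink classes. Because readTextFile() yields bare lines without the file name, the schema has to be recognised from the line itself. Header rows, blank lines and malformed lines are dropped by returning None.

```scala
// Hypothetical record types: the two schemas are illustrative only.
// Both assume the timestamp (a Long) is the first column.
sealed trait Record { def timestamp: Long }
final case class Measurement(timestamp: Long, sensor: String, value: Double) extends Record
final case class Alarm(timestamp: Long, source: String, level: Int, text: String) extends Record

object LineParser {
  // Parses one CSV line into a Record, choosing the schema by column count.
  // Header rows, blank lines and unparsable numbers yield None.
  // Assumption: no quoted fields, so a comma always separates columns.
  def parse(line: String): Option[Record] = {
    val f = line.split(",", -1).map(_.trim)
    try {
      f.length match {
        case 3 => Some(Measurement(f(0).toLong, f(1), f(2).toDouble))
        case 4 => Some(Alarm(f(0).toLong, f(1), f(2).toInt, f(3)))
        case _ => None
      }
    } catch {
      case _: NumberFormatException => None
    }
  }
}

// In a Flink batch job this could be plugged in roughly like this
// (an untested assumption, shown only as a comment):
//   val env = ExecutionEnvironment.getExecutionEnvironment
//   val records = env.readTextFile("/path/to/csv-dir")
//                    .flatMap(line => LineParser.parse(line).toList)
```

Using flatMap instead of map lets the job silently skip lines that match no schema; a stricter variant could route them to a separate error output instead.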

Best, Fabian

2018-05-08 12:35 GMT+02:00 Esa Heikkinen <esa.heikkinen@xxxxxxxxxxxxxx>:



For now, a batch query is fine.


Do you know of any good (Scala) examples of how to query batches (different types of CSV files) in parallel?


Or do you have an example of a custom source function that reads CSV files in parallel?


Best, Esa


From: Fabian Hueske <fhueske@xxxxxxxxx>
Sent: Monday, May 7, 2018 3:48 PM
To: Esa Heikkinen <esa.heikkinen@xxxxxxxxxxxxxx>
Cc: user@xxxxxxxxxxxxxxxx
Subject: Re: Reading csv-files in parallel


Hi Esa,

you can certainly read CSV files in parallel. This works very well in a batch query.

For streaming queries that expect data to be ingested in timestamp order, this is much more challenging, because you 1) need to read the files in the right order and 2) cannot split files (unless you guarantee that the splits are read in the right order).
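To illustrate point 1), here is a minimal plain-Scala sketch (not a Flink source) of emitting records from several inputs in global timestamp order with a k-way merge. It assumes each input is already sorted by timestamp and is read whole, which is why splitting a file is problematic: the merge is only correct if every input delivers its records in order. TimestampMerge and merge are hypothetical names.

```scala
import scala.collection.mutable

object TimestampMerge {
  // Merges inputs that are each sorted by timestamp into one iterator that is
  // globally sorted by timestamp. `ts` extracts the timestamp of a record.
  def merge[A](sources: Seq[Iterator[A]])(ts: A => Long): Iterator[A] = {
    // PriorityQueue is a max-heap, so reverse the ordering to get the
    // smallest timestamp first. The Int remembers which input a record came from.
    implicit val ord: Ordering[(A, Int)] =
      Ordering.by[(A, Int), Long](p => ts(p._1)).reverse
    val heap = mutable.PriorityQueue.empty[(A, Int)]
    val inputs = sources.toIndexedSeq

    // Seed the heap with the first record of every non-empty input.
    inputs.zipWithIndex.foreach { case (it, i) =>
      if (it.hasNext) heap.enqueue((it.next(), i))
    }

    new Iterator[A] {
      def hasNext: Boolean = heap.nonEmpty
      def next(): A = {
        // Take the globally smallest record, then refill from the same input.
        val (record, i) = heap.dequeue()
        if (inputs(i).hasNext) heap.enqueue((inputs(i).next(), i))
        record
      }
    }
  }
}
```

The heap only ever holds one pending record per input, so memory stays proportional to the number of files, not their size. If any single input is out of order, the output is silently out of order as well.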

The CsvTableSource does not guarantee that files are read in timestamp order (it would have to know the timestamps in each file to do that).

Having files with different schemas is another problem. The SQL and Table APIs require a fixed schema per table (source).
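If the files differ only in their non-timestamp columns, one way to meet the fixed-schema requirement is to widen them, as a pre-processing step, to a single schema: the timestamp followed by the union of all other columns, with empty values where a file lacks a column. The plain-Scala sketch below assumes every file has a header row, an integer timestamp in its first column and no quoted fields; SchemaUnion, CsvFile, parse and widen are hypothetical names. Alternatively, each file type could keep its own schema by getting its own table source.

```scala
object SchemaUnion {
  // One parsed CSV file: header row plus data rows (all values kept as strings).
  final case class CsvFile(header: Vector[String], rows: Vector[Vector[String]])

  // Assumes the first non-blank line is the header.
  def parse(lines: Seq[String]): CsvFile = {
    val all = lines.filter(_.trim.nonEmpty)
      .map(_.split(",", -1).map(_.trim).toVector)
      .toVector
    CsvFile(all.head, all.tail)
  }

  // Returns the widened header and all rows, sorted by timestamp.
  // Column 0 of every file is treated as the timestamp, whatever it is named.
  def widen(files: Seq[CsvFile]): (Vector[String], Vector[Vector[String]]) = {
    val others = files.flatMap(_.header.tail).distinct.toVector
    val header = "timestamp" +: others
    val rows = for {
      f   <- files.toVector
      idx  = f.header.zipWithIndex.toMap // column name -> position in this file
      r   <- f.rows
    } yield r(0) +: others.map(c => idx.get(c).map(r).getOrElse(""))
    (header, rows.sortBy(_.head.toLong))
  }
}
```

The combined file then has one fixed schema, at the cost of many empty fields when the column sets barely overlap.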


For a streaming use case that reads files in parallel, my only recommendation is to implement a custom source function and to be careful when generating watermarks.
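One pitfall such a custom source has to handle: if one parallel instance interleaves several files, each sorted by timestamp, its watermark may only advance to the minimum of the latest timestamps of the files it still has open; otherwise records still coming from a slower file would arrive late. A minimal plain-Scala sketch of that bookkeeping follows. MinWatermarkTracker and its methods are hypothetical, and wiring it into a Flink source (e.g. passing the value to SourceContext.emitWatermark) is left out. It subtracts one from the minimum because a file may still contain more records with that same timestamp.

```scala
import scala.collection.mutable

// Hypothetical helper for a source instance that reads several files,
// each sorted by timestamp, and interleaves their records.
final class MinWatermarkTracker(files: Seq[String]) {
  // Latest timestamp emitted per still-open file; Long.MinValue = nothing yet.
  private val latest = mutable.Map(files.map(f => f -> Long.MinValue): _*)

  // Call after emitting a record from `file` (throws for unknown or finished files).
  def onRecord(file: String, ts: Long): Unit =
    latest(file) = math.max(latest(file), ts)

  // A finished file no longer holds the watermark back.
  def onFileFinished(file: String): Unit = latest -= file

  // Minimum over open files of their latest timestamp, minus 1.
  // Long.MinValue until every open file has emitted something;
  // Long.MaxValue once all files are finished.
  def currentWatermark: Long =
    if (latest.isEmpty) Long.MaxValue
    else {
      val m = latest.values.min
      if (m == Long.MinValue) Long.MinValue else m - 1
    }
}
```

A file that has not produced any record yet keeps the watermark at Long.MinValue, which is deliberately conservative: nothing is known about its timestamps.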

Best, Fabian


2018-05-07 12:44 GMT+02:00 Esa Heikkinen <esa.heikkinen@xxxxxxxxxxxxxx>:



I would like to read many different types of CSV files (time series data) in parallel using CsvTableSource. Is that possible in a Flink application? If so, are there any examples of that?


If not, do you have any advice on how to do that?


Should I combine all the CSV files into one CSV file in a pre-processing phase? That is a bit of a problem, though, because the files are not of the same type (the columns differ, except for the timestamp column).


Best, Esa