
Re: Filter columns of a csv file with Flink


Ok Hequn,

I'll open 2 JIRA issues for this (FLINK-9813 and FLINK-9814), and maybe propose a draft of a CsvTableSource class handling Avro schemas.

Thank you for your answers and best regards

François

2018-07-11 8:11 GMT+02:00 Hequn Cheng <chenghequn@xxxxxxxxx>:
Hi francois,

Is there any plan to give avro schemas a better role in Flink in further versions?
Haven't heard about Avro for CSV. You can open a JIRA for it. Maybe also contribute to Flink :-)


On Tue, Jul 10, 2018 at 11:32 PM, françois lacombe <francois.lacombe@xxxxxxxxxxx> wrote:
Hi Hequn,

2018-07-10 3:47 GMT+02:00 Hequn Cheng <chenghequn@xxxxxxxxx>:
Maybe I misunderstand you. So you don't want to skip the whole file?
Yes I do
By skipping the whole file I mean "throw an exception to stop the process and inform the user that the file is invalid for a given reason", not "the process completes successfully and imports 0 rows".
 
If so, then "extending CsvTableSource and provide the avro schema to the constructor without creating a custom AvroInputFormat" is ok.

Then we agree on this
Is there any plan to give avro schemas a better role in Flink in further versions?
Avro schemas are perfect for building a CsvTableSource, with code like:

for (Schema.Field field_nfo : sch.getFields()) {
     // Test if the csv file header actually contains a field corresponding to the schema
     if (!csv_headers.contains(field_nfo.name())) {
          throw new NoSuchFieldException(field_nfo.name());
     }

     // Declare the field in the source Builder
     src_builder.field(field_nfo.name(), primitiveTypes.get(field_nfo.schema().getType()));
}
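Just to illustrate the idea outside of Flink: the header check in the loop above can be sketched standalone with plain Java collections (the field and header names below are hypothetical, not from any real schema):

```java
import java.util.Arrays;
import java.util.List;

public class HeaderCheck {

    // Returns the first schema field missing from the CSV header, or null if all are present.
    static String findMissingField(List<String> schemaFields, List<String> csvHeaders) {
        for (String field : schemaFields) {
            if (!csvHeaders.contains(field)) {
                return field;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<String> schemaFields = Arrays.asList("id", "name", "price");
        List<String> csvHeaders = Arrays.asList("id", "name");
        // "price" is declared in the schema but absent from the header, so the check fails here
        System.out.println(findMissingField(schemaFields, csvHeaders));
    }
}
```

In the real source this is where one would throw, so the job stops instead of silently importing 0 rows.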

All the best

François



On Mon, Jul 9, 2018 at 11:03 PM, françois lacombe <francois.lacombe@xxxxxxxxxxx> wrote:
Hi Hequn,

2018-07-09 15:09 GMT+02:00 Hequn Cheng <chenghequn@xxxxxxxxx>:
The first step requires an AvroInputFormat because the source needs AvroInputFormat to read Avro data if the data matches the schema.

I don't want Avro data, I just want to check whether my csv file has the same fields as defined in a given Avro schema.
Processing should stop if and only if I find missing columns.

A record which does not match the schema (mainly types) should be rejected and logged in a dedicated file, but the processing can go on.
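That per-record behaviour could be sketched like this in plain Java (a simplified stand-in, assuming a two-column layout where the second column must be an integer — the column names and types are hypothetical):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RowFilter {

    // Keeps rows whose second column parses as an integer;
    // collects the rest into `rejects` instead of aborting the job.
    static List<String> filterRows(List<String> rows, List<String> rejects) {
        List<String> accepted = new ArrayList<>();
        for (String row : rows) {
            String[] cols = row.split(",", -1);
            try {
                Integer.parseInt(cols[1].trim());   // type check on the second column
                accepted.add(row);
            } catch (NumberFormatException | ArrayIndexOutOfBoundsException e) {
                rejects.add(row);                   // logged for the user, processing goes on
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        List<String> rejects = new ArrayList<>();
        List<String> ok = filterRows(Arrays.asList("a,1", "b,x", "c,3"), rejects);
        System.out.println(ok);       // "b,x" fails the integer check
        System.out.println(rejects);  // and ends up here instead
    }
}
```

In an actual pipeline the rejects list would be written to the dedicated reject file rather than kept in memory.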

How about extending CsvTableSource and provide the avro schema to the constructor without creating a custom AvroInputFormat?


François