
Re: CSVSplitter - Splittable DoFn

Looking closely at https://tools.ietf.org/html/rfc4180,

* Lone CRs and LFs are only allowed inside escaped fields (so if you find
one of those, you can know which quotes are opening vs. closing), and
* Lone double quotes are not allowed inside escaped fields (so if you find
one of those and an adjacent character is not a comma or newline, then
that character is inside the field).

I don't know how many CSV files actually follow that second point strictly
(vs., say, escaping with a backslash), but it seems the most likely
candidate, and a backslashed double quote could be accepted if the
subsequent character was not a newline or comma (or just accepted
regardless as an escaped quote). Whitespace after a comma has inconsistent
handling as well. The first point seems dangerous to lean on if differing
newline conventions are in play (e.g. in transmitting, not just producing,
the data). Or you could go with some probabilistic models, but that seems

There are also headers, but it could make sense to add header skipping
support to TextIO (as it has header writing support).
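
As a rough illustration of the lone-double-quote observation above, here is
a minimal Python sketch. It assumes strict RFC 4180 quoting (quotes inside a
field are always doubled) and that a missing neighbour means start or end of
the file. The function name and return labels are made up for this example.

```python
DELIMS = {",", "\r", "\n"}

def classify_lone_quote(data, i):
    """Classify the double quote at data[i] from its two neighbours only.

    Returns "not-lone", "opening", "closing", "ambiguous" or "invalid".
    Assumes strict RFC 4180 quoting; backslash escapes are not handled.
    """
    if data[i] != '"':
        raise ValueError("data[i] is not a double quote")
    prev_ch = data[i - 1] if i > 0 else None
    next_ch = data[i + 1] if i + 1 < len(data) else None
    if prev_ch == '"' or next_ch == '"':
        return "not-lone"  # part of a run of quotes; needs wider context
    prev_outside = prev_ch is None or prev_ch in DELIMS
    next_outside = next_ch is None or next_ch in DELIMS
    if prev_outside and not next_outside:
        return "opening"   # the next character is field content
    if next_outside and not prev_outside:
        return "closing"   # the previous character is field content
    if prev_outside and next_outside:
        return "ambiguous" # e.g. a quoted field that starts or ends with a comma
    return "invalid"       # a bare quote in the middle of an unquoted field

row = 'foo,"bar baz",qux\r\n'
opening = classify_lone_quote(row, 4)
closing = classify_lone_quote(row, 12)
unclear = classify_lone_quote('a,",",b', 2)
```

The "ambiguous" and "not-lone" results are the cases where a reader would
have to keep scanning, which is where the caveats above come in.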

On Tue, Apr 24, 2018 at 3:27 PM Eugene Kirpichov <kirpichov@xxxxxxxxxx> wrote:

> Robert - you're right, but this is a pathological case. It signals that
there *might* be cases where we'll need to scan the whole file, however for
practical purposes it's more important whether we need to scan the whole
file in *all* (or most) cases - i.e. whether no amount of backward scanning
of a non-pathological file can give us confidence that we've truly located
a record boundary.

> On Tue, Apr 24, 2018 at 3:21 PM Robert Bradshaw <robertwb@xxxxxxxxxx> wrote:

>> On Tue, Apr 24, 2018 at 3:18 PM Eugene Kirpichov <kirpichov@xxxxxxxxxx>
>> wrote:

>> > I think the first question that has to be answered here is: Is it
>> possible *at all* to implement parallel reading of RFC 4180?

>> No. Consider a multi-record CSV file with no quotes. Placing a quote at
>> start and end gives a new CSV file with exactly one element.
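
The counterexample above can be checked with Python's standard csv module.
This is only a sketch with made-up sample data:

```python
import csv
import io

def parse(text):
    # newline="" hands the raw line endings to the csv module untouched.
    return list(csv.reader(io.StringIO(text, newline="")))

plain = "a,b\r\nc,d\r\ne,f\r\n"   # three records, no quotes anywhere
wrapped = '"' + plain + '"'        # same bytes plus one quote at each end

records_plain = parse(plain)
records_wrapped = parse(wrapped)
```

Here records_plain holds three two-field records, while records_wrapped
holds a single record whose only field is the whole original text. A reader
that starts in the middle of the file cannot tell the two files apart
without seeing the first byte.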

>> > I.e., given a start byte offset, is it possible to reliably locate the
>> first record boundary at or after that offset while scanning only a small
>> amount of data?
>> > If it is possible, then that's what the SDF (or BoundedSource, etc.)
>> should do - split into blind byte ranges, and use this algorithm to assign
>> consistent meaning to byte ranges.

>> > To answer your questions 2 and 3: think of it this way.
>> > The SDF's ProcessElement takes an element and a restriction.
>> > ProcessElement must make only one promise: that it will correctly perform
>> exactly the work associated with this element and restriction.
>> > The challenge is that the restriction can become smaller while
>> ProcessElement runs - in which case, ProcessElement must also do less
>> work. This can happen concurrently to ProcessElement running, so really the
>> guarantee should be rephrased as "By the time ProcessElement completes, it
>> should have performed exactly the work associated with the element and
>> tracker.currentRestriction() at the moment of completion".

>> > This is all that is asked of ProcessElement. If Beam decides to ask the
>> tracker to split itself into two ranges (making the current one - "primary"
>> - smaller, and producing an additional one - "residual"), Beam of course
>> takes the responsibility for executing the residual restriction somewhere
>> else: it won't be lost.

>> > E.g. if ProcessElement was invoked with [a, b), but while it was running
>> it was split into [a, b-100) and [b-100, b), then the current
>> ProcessElement call must process [a, b-100), and Beam guarantees that it
>> will fire up another ProcessElement call for [b-100, b) (Of course, both of
>> these calls may end up being recursively split further).
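
The contract described above can be mimicked with a toy tracker in plain
Python. This is a hedged sketch, not Beam's API: ToyOffsetTracker,
try_claim, try_split and process_element are hypothetical names, and the
"runner" is just the demo code at the bottom.

```python
import threading

class ToyOffsetTracker:
    """Hypothetical stand-in for a restriction tracker over [start, end)."""

    def __init__(self, start, end):
        self.start, self.end = start, end
        self.last_claimed = None
        self._lock = threading.Lock()  # splits may arrive from another thread

    def try_claim(self, offset):
        with self._lock:
            if offset >= self.end:
                return False
            self.last_claimed = offset
            return True

    def try_split(self, split_at):
        """Shrink to the primary [start, split_at); return the residual."""
        with self._lock:
            floor = self.start if self.last_claimed is None else self.last_claimed + 1
            if not (floor <= split_at < self.end):
                return None  # cannot give away work that is already claimed
            residual = (split_at, self.end)
            self.end = split_at
            return residual

def process_element(tracker, output, on_claim=None):
    """Do exactly the work of the restriction as it stands at completion."""
    offset = tracker.start
    while tracker.try_claim(offset):
        output.append(offset)
        if on_claim:
            on_claim(offset)
        offset += 1

# Demo: [0, 1000) is split into [0, 900) and [900, 1000) while it is running.
tracker = ToyOffsetTracker(0, 1000)
residuals, first, second = [], [], []

def split_once(offset):
    if offset == 10:
        residuals.append(tracker.try_split(900))

process_element(tracker, first, split_once)
process_element(ToyOffsetTracker(*residuals[0]), second)
```

The first call ends up processing only [0, 900), and the residual is handed
to a second call, so nothing is lost and nothing is processed twice.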

>> > I'm not quite sure what you mean by "recombining" - please let me know if
>> the explanation above makes things clear enough or not.

>> > On Tue, Apr 24, 2018 at 2:55 PM Peter Brumblay <
>> wrote:

>> >> Hi Eugene, thank you for the feedback!

>> >> TextIO.read() can't handle RFC 4180 in full (at least I don't think it
>> does!) - we have a lot of source data with embedded newlines. These records
>> get split improperly because TextIO.read() blindly looks for newline
>> characters. We need something which natively understands embedded newlines
>> in quoted fields ... like so:

>> >> foo,bar,"this has an\r\nembedded newline",192928\r\n
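
To make the problem with the sample line concrete, here is a small Python
sketch that compares a blind newline split with a quote-aware one. It
assumes RFC 4180 style quoting (a doubled quote inside a quoted field) and
scans from the start of the data. split_records is a made-up helper, not
part of TextIO.

```python
def split_records(text):
    """Split CSV text into raw records, ignoring line breaks inside quotes."""
    records, start, in_quotes = [], 0, False
    i = 0
    while i < len(text):
        ch = text[i]
        if ch == '"':
            # A doubled quote toggles twice, so the state stays correct.
            in_quotes = not in_quotes
        elif not in_quotes and ch in "\r\n":
            records.append(text[start:i])
            if text[i:i + 2] == "\r\n":
                i += 1  # treat CRLF as a single record terminator
            start = i + 1
        i += 1
    if start < len(text):
        records.append(text[start:])  # last record without a terminator
    return records

data = ('foo,bar,"this has an\r\nembedded newline",192928\r\n'
        'baz,qux,"plain",1\r\n')

naive = data.split("\r\n")      # what a blind newline scan sees
records = split_records(data)   # what a quote-aware scan sees
```

The blind split breaks the first record in two, while the quote-aware scan
returns two records with the embedded line break left inside the field.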

>> >> As for the other feedback:

>> >> 1. Claiming the entire range - yes, I figured this was a major problem.
>> Thanks for the confirmation.
>> >> 2. The code for initial splitting of the restriction seems very
>> complex...

>> >> Follow-up question: if I process (and claim) only a subset of a range,
>> say [a, b - 100), and [b - 100, b) represents an incomplete block, will
>> Beam SDF dynamically recombine ranges such that [b - 100, b + N) is sent to
>> a worker with a (potentially) complete block?

>> >> 3. Fine-tuning the evenness .... if Beam SDF re-combines ranges for
>> split blocks then it sounds like arbitrary splits in splitFunction() make
>> more sense.

>> >> I'll try to take another pass at this with your feedback in mind.

>> >> Peter

>> >> On Tue, Apr 24, 2018 at 3:08 PM, Eugene Kirpichov <
>> wrote:

>> >>> Hi Peter,

>> >>> Thanks for experimenting with SDF! However, in this particular case:
>> any reason why you can't just use TextIO.read() and parse each line as CSV?
>> Seems like that would require considerably less code.

>> >>> A few comments on this code per se:
>> >>> - The ProcessElement implementation immediately claims the entire
>> range, which means that there can be no dynamic splitting and the code
>> behaves equivalently to a regular DoFn
>> >>> - The code for initial splitting of the restriction seems very complex
>> - can you just split it blindly into a bunch of byte ranges of about equal
>> size? Looking at the actual data while splitting should never be necessary
>> - you should be able to just look at the file size (say, 100MB) and split
>> it into a bunch of splits, say, [0, 10MB), [10MB, 20MB) etc.
>> >>> - It seems that the splitting code tries to align splits with record
>> boundaries - this is not useful: it does not matter whether the split
>> boundaries fall onto record boundaries or not; instead, the reading code
>> should be able to read an arbitrary range of bytes in a meaningful way.
>> That typically means that reading [a, b) means "start at the first record
>> boundary located at or after "a", end at the first record boundary located
>> at or after "b""
>> >>> - Fine-tuning the evenness of initial splitting is also not useful:
>> dynamic splitting will even things out anyway; moreover, even if you are
>> able to achieve an equal amount of data read by different restrictions, that
>> does not translate into equal time to process the data with the ParDo's
>> fused into the same bundle (and that time is unpredictable).
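
The blind splitting and the "first boundary at or after" reading rule from
the comments above can be sketched in Python as follows. To keep it short,
the sketch assumes records are terminated by a single LF and never contain
one, which is exactly what does not hold for quoted CSV. All three function
names are made up for this example.

```python
def split_blindly(size, chunk):
    """Cut [0, size) into ranges of about `chunk` bytes without reading data."""
    return [(a, min(a + chunk, size)) for a in range(0, size, chunk)]

def next_boundary(data, pos):
    # First record boundary at or after pos: offset 0, the byte just after
    # an LF, or the end of the data.
    if pos == 0:
        return 0
    nl = data.find(b"\n", pos - 1)
    return len(data) if nl == -1 else nl + 1

def read_range(data, a, b):
    """Read the records that begin at a boundary located in [a, b)."""
    start, stop = next_boundary(data, a), next_boundary(data, b)
    return data[start:stop].splitlines()

data = b"".join(b"record-%d\n" % i for i in range(50))
ranges = split_blindly(len(data), 37)  # 37 is deliberately unaligned
all_records = [rec for a, b in ranges for rec in read_range(data, a, b)]
```

Because each range stops where the next one starts, every record is read
exactly once even though no split point was chosen by looking at the data.
For RFC 4180 input, next_boundary would need to be quote-aware, which is the
open question in this thread.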

>> >>> On Tue, Apr 24, 2018 at 1:24 PM Peter Brumblay
>> >>> <pbrumblay@xxxxxxxxxxxxxx>
>> wrote:

>> >>>> Hi All,

>> >>>> I noticed that there is no support for CSV file reading (e.g.
>> in Apache Beam - at least no native transform. There's an issue to add
>> support: https://issues.apache.org/jira/browse/BEAM-51.

>> >>>> I've seen examples which use the Apache Commons CSV parser. I took a
>> shot at implementing a SplittableDoFn transform. I have the full code and
>> some questions in a gist here:
>> https://gist.github.com/pbrumblay/9474dcc6cd238c3f1d26d869a20e863d.

>> >>>> I suspect it could be improved quite a bit. If anyone has time to
>> provide feedback I would really appreciate it.

>> >>>> Regards,

>> >>>> Peter Brumblay
>> >>>> Fearless Technology Group, Inc.