
Re: Write bulks files from streaming app

Here is some pseudocode (sorry) for what I am doing right now:

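PCollection<KV<String, String>> writtenFiles = dataStream
    // first window: write files on early (processing-time) and on-time firings
    .withFixedWindow(
        duration = 1H,
        trigger = AfterWatermark.pastEndOfWindow()
            .withEarlyFirings(
                AfterProcessingTime.pastFirstElementInPane().plusDelayOf(5 minutes)),
        lateness = 1 day)
    .saveTo(fileIO)
    .getPerDestinationOutputFilenames()
    // second window over the written files: no early trigger
    .withFixedWindow(
        duration = 1H,
        trigger = AfterWatermark.pastEndOfWindow(),
        lateness = 1 day)
    .groupBy(x -> x.getKey())
    .map(x -> println(x); x)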

But in the second window, over the written files, I still observe EARLY
fires, and the group-by value iterator always holds exactly one file
(propagated from the write-files result). ON_TIME fires are always empty.

What am I missing here? How can I avoid getting early fires one by one and
instead get all written files when the window fires ON_TIME?
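
For illustration, the behaviour being asked for can be modelled outside Beam. The following is a minimal plain-Java sketch, not Beam API: the names `Timing`, `Publish` and `EarlyFileCollector` are hypothetical, and the key is assumed to combine destination and window. In a Beam pipeline the timing label would presumably come from `PaneInfo.getTiming()`. Note that the sketch relies on an ON_TIME signal arriving per key even when that pane carries no files.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Pane timing labels, mirroring the EARLY / ON_TIME / LATE terms used in this thread. */
enum Timing { EARLY, ON_TIME, LATE }

/** Hypothetical publish instruction: which files go to the target, merged or as-is. */
final class Publish {
    final String key;          // destination plus window (assumed key layout)
    final List<String> files;  // file names only, no record data
    final boolean merge;       // true: compact into one file; false: move unchanged

    Publish(String key, List<String> files, boolean merge) {
        this.key = key;
        this.files = files;
        this.merge = merge;
    }
}

/** Buffers file names of EARLY panes until the ON_TIME pane for the same key arrives. */
final class EarlyFileCollector {
    private final Map<String, List<String>> pending = new HashMap<>();

    /** Feeds one pane; returns a Publish when something is due, otherwise null. */
    Publish onPane(String key, Timing timing, List<String> files) {
        switch (timing) {
            case EARLY:
                pending.computeIfAbsent(key, k -> new ArrayList<>()).addAll(files);
                return null;
            case ON_TIME: {
                List<String> all = pending.remove(key);
                if (all == null) {
                    all = new ArrayList<>();
                }
                all.addAll(files);
                return all.isEmpty() ? null : new Publish(key, all, true);
            }
            default: // LATE: few files, publish immediately without merging
                return files.isEmpty() ? null : new Publish(key, new ArrayList<>(files), false);
        }
    }
}
```

Only file names are held between panes, so the buffered state stays small regardless of how much data each file contains.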

On Sun, Jul 22, 2018 at 4:27 PM Stephan Ewen <sewen@xxxxxxxxxx> wrote:

> For what it's worth, in Flink directly we found that this pattern generally
> does not work well: windowing data in large windows in order to perform
> large bulk writes.
> Instead, the sinks (to file systems) continuously write files (possibly
> across different destination files), ensure persistence at checkpoints, and
> can roll back the output to the previous checkpoint in a file-system-specific
> way. That way, there is no data buffering in state (memory, RocksDB, etc.)
> at all, only metadata tracking.
> For bulk encoders (like Parquet), one needs an additional step to
> encode/compress when the specific destination file is done (if you think in
> Hadoop terms, that would be in the "commit" step).
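
The "write continuously, persist at checkpoints, roll back on recovery" idea described above can be sketched in plain Java. This is an illustration under assumptions, not Flink's implementation: the class `CheckpointedFileSink` and its methods are hypothetical, it targets a local file through `FileChannel`, and it assumes the file system supports truncation. The only value a checkpoint has to remember is the file length.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical sink: streams records to a file and tracks only a length as metadata. */
final class CheckpointedFileSink implements AutoCloseable {
    private final FileChannel channel;

    CheckpointedFileSink(Path file) {
        try {
            channel = FileChannel.open(file, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
            channel.position(channel.size()); // continue after any existing content
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Writes a record straight to the file; nothing is buffered in application state. */
    void write(String record) {
        try {
            ByteBuffer buf = ByteBuffer.wrap((record + "\n").getBytes(StandardCharsets.UTF_8));
            while (buf.hasRemaining()) {
                channel.write(buf);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** At a checkpoint: force data to disk and return the length to store as metadata. */
    long checkpoint() {
        try {
            channel.force(true);
            return channel.position();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** On recovery: discard everything written after the given checkpointed length. */
    void restore(long checkpointedLength) {
        try {
            channel.truncate(checkpointedLength);
            channel.position(checkpointedLength);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public void close() {
        try {
            channel.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Demo: write, checkpoint, write more, roll back, write again; returns file content. */
    static String demo() {
        try {
            Path file = Files.createTempFile("sink", ".txt");
            try (CheckpointedFileSink sink = new CheckpointedFileSink(file)) {
                sink.write("a");
                long cp = sink.checkpoint();
                sink.write("b"); // written after the checkpoint, dropped by restore
                sink.restore(cp);
                sink.write("c");
            }
            return new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In the demo, the record written after the checkpoint is discarded by `restore`, so the file ends up containing `a` and `c` only. A bulk format such as Parquet would still need the separate encode step at "commit" time mentioned above.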
> On Sun, Jul 22, 2018 at 2:10 PM, Jozef Vilcek <jozo.vilcek@xxxxxxxxx>
> wrote:
>> I looked into Wait.on(), but the docs say it waits until the window is
>> completely done, so it is not quite a fit for my case: my lateness can be a
>> day or two, and I would like to compact and publish hourly data sooner.
>> What I am thinking of is writing the triggered output to a different
>> location than the target. I will have lots of EARLY fires for the main data
>> and then some LATE fires. What I would like to do is observe all EARLY fires
>> for a window (ideally at the ON_TIME firing in event time) in one group and
>> move those files to the target dir by merging them. Observed LATE fires
>> would just be moved immediately, because there are not many of them and it
>> does not hurt to keep them fragmented for now.
>> The question is whether this makes sense and can be done with Beam. FileIO
>> returns a WriteFilesResult on which I can call
>> `getPerDestinationOutputFilenames()`, which returns a collection of KVs
>> with the key being the destination and the value being a file that was
>> written. I tried to window it again with different triggers (no early
>> trigger) and group by key, but so far no luck: it never yields a collection
>> of the files that were emitted as EARLY in the first window.
>> On Fri, Jul 20, 2018 at 9:06 PM Raghu Angadi <rangadi@xxxxxxxxxx> wrote:
>>> On Fri, Jul 20, 2018 at 2:58 AM Jozef Vilcek <jozo.vilcek@xxxxxxxxx>
>>> wrote:
>>>> Hm, that is an interesting idea: make the write composite and merge the
>>>> files later. I do not know Beam well yet.
>>>> I will look into it and learn about the Wait.on() transform (I wonder how
>>>> it will work with late fires). Thanks!
>>>> But it keeps me thinking...
>>>> Does it make sense to have support for this in the SDK?
>>>> Is my use case that uncommon? Not a fit for Beam? How do others out
>>>> there do similar things?
>>> The SDK does allow it. It looks like you are running into scaling and
>>> memory limits with the amount of state stored in large windows. This is
>>> something that will improve. I am not familiar enough with the Flink runner
>>> to comment on specifics. I was mainly thinking of a workaround.
>>> Raghu.
>>>> On Thu, Jul 19, 2018 at 11:21 PM Raghu Angadi <rangadi@xxxxxxxxxx>
>>>> wrote:
>>>>> One option (but it requires more code): write to smaller files with
>>>>> frequent triggers to directory_X and, once the window properly closes,
>>>>> copy all the files to a single file in your own DoFn. This is certainly
>>>>> more code on your part, but might be worth it. You can use the Wait.on()
>>>>> transform to run your finalizer DoFn right after the window that writes
>>>>> the smaller files closes.
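
The merge step of this suggestion (many small files in directory_X copied into one file) might look roughly like the sketch below. It is plain Java against the local file system, with a hypothetical `SmallFileMerger` class; on HDFS the same steps would go through a Hadoop or Beam file-system API instead. It also assumes a format that survives byte concatenation, such as newline-delimited text. Avro container files, as used in this thread, would have to be read and re-written record by record rather than concatenated.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical finalizer step: compacts the small files of one closed window. */
final class SmallFileMerger {

    /**
     * Concatenates all regular files in stagingDir (sorted by name) into target,
     * then deletes the small files. The target must live outside stagingDir.
     * Returns the number of files merged.
     */
    static int mergeInto(Path stagingDir, Path target) {
        try {
            List<Path> parts;
            try (Stream<Path> listing = Files.list(stagingDir)) {
                parts = listing.filter(Files::isRegularFile).sorted().collect(Collectors.toList());
            }
            try (OutputStream out = Files.newOutputStream(target)) {
                for (Path part : parts) {
                    Files.copy(part, out); // append the bytes of each small file
                }
            }
            for (Path part : parts) {
                Files.delete(part); // clean up only after the merged file is closed
            }
            return parts.size();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Demo with two small text files; returns the content of the merged file. */
    static String demo() {
        try {
            Path staging = Files.createTempDirectory("directory_X");
            Files.write(staging.resolve("part-0"), "a\n".getBytes(StandardCharsets.UTF_8));
            Files.write(staging.resolve("part-1"), "b\n".getBytes(StandardCharsets.UTF_8));
            Path target = Files.createTempFile("merged", ".txt");
            mergeInto(staging, target);
            return new String(Files.readAllBytes(target), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Deleting the parts only after the merged file has been closed keeps the data recoverable if the merge fails halfway, at the cost of possibly re-merging on retry.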
>>>>> On Thu, Jul 19, 2018 at 2:43 AM Jozef Vilcek <jozo.vilcek@xxxxxxxxx>
>>>>> wrote:
>>>>>> Hey,
>>>>>> I am looking for advice.
>>>>>> I am trying to do stream processing with Beam on the Flink runtime:
>>>>>> reading data from Kafka, doing some processing on it (not important
>>>>>> here), and at the same time storing the consumed data in history
>>>>>> storage for archiving and reprocessing, which is HDFS.
>>>>>> Now, the part that writes batches to HDFS is giving me a hard time.
>>>>>> Logically, I want to do:
>>>>>> fileIO = FileIO.writeDynamic()
>>>>>>         .by(destinationFn)
>>>>>>         .via(AvroIO.sink(avroClass))
>>>>>>         .to(path)
>>>>>>         .withNaming(namingFn)
>>>>>>         .withTempDirectory(tmp)
>>>>>>         .withNumShards(shards)
>>>>>> data
>>>>>>    .withFixedWindow(1H, afterWatermarkTrigger, discardFiredPanes)
>>>>>>    .saveTo(fileIO)
>>>>>> This write generates 3 operators in the Flink execution graph, which I
>>>>>> do not fully understand yet.
>>>>>> Now, the problem is that I am not able to run this at scale.
>>>>>> If I want to write files big enough to avoid having lots of files on
>>>>>> HDFS, I keep running into OOM. With Flink, I use the RocksDB state
>>>>>> backend, and I was warned about this JIRA, which is probably related to
>>>>>> my OOM:
>>>>>> https://issues.apache.org/jira/browse/FLINK-8297
>>>>>> Therefore, I need to trigger more often and in smaller batches, which
>>>>>> leads to too many files on HDFS.
>>>>>> The question is whether there is some path I do not see to make this
>>>>>> work (writing bulks of data of my choosing to HDFS without running into
>>>>>> memory trouble). Also, keeping the whole window's data, which is
>>>>>> destined for output to the filesystem, in state involves more IO.
>>>>>> Thanks for any thoughts and guidelines,
>>>>>> Jozef
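
The trade-off in the original question (frequent triggers mean many small files) can be put into rough numbers. The sketch below is back-of-the-envelope arithmetic only: `FileCountEstimate` is a hypothetical helper, the inputs are made up, and it assumes every firing writes one file per shard per destination, which is an upper bound rather than a measured figure.

```java
/** Hypothetical helper for estimating how many files a trigger interval produces. */
final class FileCountEstimate {

    /**
     * Upper bound on files written per day, assuming each firing produces
     * one file per shard for every destination.
     */
    static long filesPerDay(int destinations, int shards, int triggerIntervalMinutes) {
        long firingsPerDay = (24L * 60L) / triggerIntervalMinutes;
        return firingsPerDay * destinations * shards;
    }
}
```

With, say, 10 destinations and 4 shards, firing every 5 minutes gives up to 288 * 10 * 4 = 11520 files per day, while firing once per hour gives 24 * 10 * 4 = 960.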