Re: Sorting in runners

IMO, SortValues is the right way to go. The idea is that a runner can always replace the SortValues PTransform with its own optimized variant. As you have already pointed out, the default in-memory implementation has strict limitations.

I would suggest going with the in-memory version to unblock yourself, then digging further to figure out why GCS was explicitly disallowed and, possibly, to find answers to your disk provisioning questions.

On Tue, Aug 7, 2018 at 11:08 AM Rui Wang <ruwang@xxxxxxxxxx> wrote:
Hi Community,

I am trying to support ORDER BY in BeamSQL (currently in the global window only, see BEAM-5064). To do so, I need to sort a PCollection<Row>. The scale of the datasets that ORDER BY will work on is unknown. It might be up to TB-sized if BeamSQL runs on some benchmarks, but in most cases the sort shouldn't have to handle a very large dataset.

The safe approach is to sort the PCollection<Row> in memory, because memory access should be guaranteed in runners. One possible way is Combine.globally(new sortCombineFn()), where sortCombineFn does a merge sort. This approach is bounded by the size of memory on a single machine.
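
To make the merge-sort idea concrete, here is a minimal plain-Java sketch of the logic such a combine function might carry. It deliberately does not depend on Beam: the class name SortCombineSketch is hypothetical, and its methods only mirror the accumulator lifecycle of a CombineFn (create, add input, merge, extract). A real sortCombineFn would extend Combine.CombineFn and also need coders for the accumulator.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical, Beam-free sketch: the method names mirror the CombineFn
// lifecycle, but this class does not extend any Beam type.
final class SortCombineSketch<T> {
    private final Comparator<T> order;

    SortCombineSketch(Comparator<T> order) {
        this.order = order;
    }

    // Each accumulator is a plain in-memory buffer.
    List<T> createAccumulator() {
        return new ArrayList<>();
    }

    // Inputs are buffered unsorted; sorting is deferred to merge/extract.
    List<T> addInput(List<T> accumulator, T input) {
        accumulator.add(input);
        return accumulator;
    }

    // Sort each partial buffer into a run, then merge the runs pairwise.
    List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
        List<T> merged = new ArrayList<>();
        for (List<T> accumulator : accumulators) {
            List<T> run = new ArrayList<>(accumulator);
            run.sort(order);
            merged = mergeSorted(merged, run);
        }
        return merged;
    }

    // The accumulator may never have gone through a merge, so sort once more.
    List<T> extractOutput(List<T> accumulator) {
        List<T> output = new ArrayList<>(accumulator);
        output.sort(order);
        return output;
    }

    // Standard two-way merge of two already sorted lists.
    private List<T> mergeSorted(List<T> a, List<T> b) {
        List<T> out = new ArrayList<>(a.size() + b.size());
        int i = 0;
        int j = 0;
        while (i < a.size() && j < b.size()) {
            out.add(order.compare(a.get(i), b.get(j)) <= 0 ? a.get(i++) : b.get(j++));
        }
        while (i < a.size()) {
            out.add(a.get(i++));
        }
        while (j < b.size()) {
            out.add(b.get(j++));
        }
        return out;
    }
}
```

Note that the final output list holds every element at once, which is exactly the single-machine memory bound mentioned above.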

External sorting could be more scalable because it uses storage (e.g. disk). There is some code in beam/sdks/java/extensions/sorter that does this. However, it seems that GCS is not allowed in ExternalSorter in that sorter module. Assuming ExternalSorter uses disk by default, it is unclear whether runners can access disk and how disk space is provisioned. Another observation is that ExternalSorter does not clean up the files it generates during sorting.
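
For reference, the general external-sort technique (spill sorted runs to local disk, then k-way merge them) can be sketched in plain Java as below. This is not the code in the sorter module; ExternalSortSketch, its sort method, and the maxInMemory parameter are all hypothetical names for illustration. The sketch assumes records are strings without line breaks, and it deletes its temporary files in a finally block, which is the cleanup step noted as missing above.

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch of a disk-backed sort; not Beam's ExternalSorter.
final class ExternalSortSketch {
    // One open spill file plus the record currently at its head.
    private static final class Run {
        final BufferedReader reader;
        String head;

        Run(BufferedReader reader) throws IOException {
            this.reader = reader;
            this.head = reader.readLine();
        }
    }

    // Sorts records while buffering at most maxInMemory of them during the
    // spill phase. The result is collected into a list only to keep the demo
    // short; a real implementation would stream the merged output instead.
    static List<String> sort(Iterator<String> input, int maxInMemory) throws IOException {
        List<Path> spills = new ArrayList<>();
        List<Run> runs = new ArrayList<>();
        try {
            List<String> buffer = new ArrayList<>();
            while (input.hasNext()) {
                buffer.add(input.next());
                if (buffer.size() >= maxInMemory) {
                    spill(buffer, spills);
                    buffer.clear();
                }
            }
            if (!buffer.isEmpty()) {
                spill(buffer, spills);
            }

            // K-way merge: the heap always exposes the run with the smallest head.
            PriorityQueue<Run> heap = new PriorityQueue<>((x, y) -> x.head.compareTo(y.head));
            for (Path file : spills) {
                Run run = new Run(Files.newBufferedReader(file, StandardCharsets.UTF_8));
                runs.add(run);
                if (run.head != null) {
                    heap.add(run);
                }
            }
            List<String> out = new ArrayList<>();
            while (!heap.isEmpty()) {
                Run run = heap.poll();
                out.add(run.head);
                run.head = run.reader.readLine();
                if (run.head != null) {
                    heap.add(run);
                }
            }
            return out;
        } finally {
            // Close readers and remove every spill file, even on failure.
            for (Run run : runs) {
                run.reader.close();
            }
            for (Path file : spills) {
                Files.deleteIfExists(file);
            }
        }
    }

    // Sorts the buffer and writes it to a temp file in the default temp directory.
    private static void spill(List<String> buffer, List<Path> spills) throws IOException {
        buffer.sort(null); // natural String ordering
        Path file = Files.createTempFile("sort-run-", ".txt");
        spills.add(file); // registered before writing so cleanup still sees it
        try (BufferedWriter writer = Files.newBufferedWriter(file, StandardCharsets.UTF_8)) {
            for (String record : buffer) {
                writer.write(record);
                writer.newLine();
            }
        }
    }
}
```

A call such as ExternalSortSketch.sort(records.iterator(), 100000) relies on the worker having a writable temp directory with enough free space, which is the open question for the runners.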

My question is: in the major runners (e.g. direct, Dataflow, Spark, Flink), is disk accessible, so that it is safe to go with the external sorting approach regardless of disk space? Also, is there a better practice for sorting in Beam?