
Sorting in runners

Hi Community,

I am trying to support ORDER BY in BeamSQL (currently in the global window only, see BEAM-5064). To do so, I need to sort a PCollection<Row>. The size of the dataset that ORDER BY operates on is unknown: it might be up to a TB-sized dataset if BeamSQL runs on some benchmarks. In most cases, though, the sort shouldn't have to handle a very large dataset.

The safe approach is to sort the PCollection<Row> in memory, because memory access should be guaranteed in runners. One possible way is Combine.globally(new sortCombineFn()), where sortCombineFn does a merge sort. This approach is bounded by the memory size of a single machine.
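
A minimal sketch of the in-memory approach, assuming the sort key is expressed as a Comparator. `SortCombineFn` is a hypothetical name standing in for the sortCombineFn above. To keep it runnable without Beam on the classpath, it is plain Java that only mirrors the four methods of Beam's Combine.CombineFn (createAccumulator, addInput, mergeAccumulators, extractOutput); in a pipeline it would instead extend Combine.CombineFn<Row, List<Row>, List<Row>> with a Comparator<Row> built from the ORDER BY keys, and may need an explicit accumulator coder.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-in for sortCombineFn. It mirrors the method names of Beam's
// Combine.CombineFn but has no Beam dependency, so it runs as plain Java.
class SortCombineFn<T> {
  private final Comparator<? super T> cmp;

  SortCombineFn(Comparator<? super T> cmp) {
    this.cmp = cmp;
  }

  // Each accumulator is a list kept in sorted order.
  List<T> createAccumulator() {
    return new ArrayList<>();
  }

  // Insert the element at its sorted position, found by binary search.
  List<T> addInput(List<T> acc, T input) {
    int idx = Collections.binarySearch(acc, input, cmp);
    acc.add(idx >= 0 ? idx : -idx - 1, input);
    return acc;
  }

  // Combine partial results with the merge step of merge sort.
  List<T> mergeAccumulators(Iterable<List<T>> accs) {
    List<T> result = new ArrayList<>();
    for (List<T> acc : accs) {
      result = mergeSorted(result, acc);
    }
    return result;
  }

  // The accumulator is already sorted, so it is the final output.
  List<T> extractOutput(List<T> acc) {
    return acc;
  }

  // Linear two-way merge of two sorted lists; on ties the left element goes first.
  private List<T> mergeSorted(List<T> left, List<T> right) {
    List<T> out = new ArrayList<>(left.size() + right.size());
    int i = 0;
    int j = 0;
    while (i < left.size() && j < right.size()) {
      if (cmp.compare(left.get(i), right.get(j)) <= 0) {
        out.add(left.get(i++));
      } else {
        out.add(right.get(j++));
      }
    }
    while (i < left.size()) out.add(left.get(i++));
    while (j < right.size()) out.add(right.get(j++));
    return out;
  }
}
```

Because extractOutput returns the whole sorted list as a single value, the final merge happens in one place, which is where the single-machine memory bound comes from.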

External sorting could be more scalable because it uses storage (e.g. disk). There is some code in beam/sdks/java/extensions/sorter that does this. However, it seems that GCS is not supported by ExternalSorter in that sorter module. Assuming ExternalSorter uses local disk by default, it is unclear whether runners can access disk and how disk space is provisioned. Another observation is that ExternalSorter does not clean up the files it generates during sorting.
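
For comparison, the external approach can be sketched as a generic external merge sort in plain Java. This is an illustration under stated assumptions, not the implementation in beam/sdks/java/extensions/sorter: `ExternalStringSorter` and its parameters are hypothetical, records are assumed to be strings without line breaks, and spill files go to a local directory passed in as `tmpDir`. It deletes its run files in a `finally` block, which is the clean-up behaviour noted above as missing.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.Consumer;

// Hypothetical helper, not the API of Beam's sorter extension: a textbook external
// merge sort over String records that contain no line breaks.
class ExternalStringSorter {

  // The smallest unread line of one sorted run, used as a heap entry.
  private static final class Head {
    final String line;
    final int run;

    Head(String line, int run) {
      this.line = line;
      this.run = run;
    }
  }

  // Sorts chunks of at most maxInMemory records in memory, spills each sorted chunk
  // (a "run") to a temp file under tmpDir, then k-way merges the runs into sink.
  // Run files are deleted in the finally block, even if the merge fails.
  static void sort(Iterator<String> input, int maxInMemory, Path tmpDir, Consumer<String> sink)
      throws IOException {
    if (maxInMemory < 1) {
      throw new IllegalArgumentException("maxInMemory must be positive");
    }
    List<Path> runs = new ArrayList<>();
    try {
      List<String> buffer = new ArrayList<>();
      while (input.hasNext()) {
        buffer.add(input.next());
        if (buffer.size() >= maxInMemory) {
          spill(buffer, tmpDir, runs);
        }
      }
      if (!buffer.isEmpty()) {
        spill(buffer, tmpDir, runs);
      }
      merge(runs, sink);
    } finally {
      for (Path run : runs) {
        Files.deleteIfExists(run);
      }
    }
  }

  // Sorts the buffer, writes it as one run file (one record per line), and clears it.
  private static void spill(List<String> buffer, Path tmpDir, List<Path> runs) throws IOException {
    Path run = Files.createTempFile(tmpDir, "sort-run-", ".txt");
    runs.add(run); // register before writing so a failed write is still cleaned up
    Collections.sort(buffer);
    Files.write(run, buffer, StandardCharsets.UTF_8);
    buffer.clear();
  }

  // Repeatedly emits the smallest head across all runs; the heap holds one line per run.
  private static void merge(List<Path> runs, Consumer<String> sink) throws IOException {
    List<BufferedReader> readers = new ArrayList<>();
    PriorityQueue<Head> heap = new PriorityQueue<>(Comparator.comparing((Head h) -> h.line));
    try {
      for (int i = 0; i < runs.size(); i++) {
        BufferedReader reader = Files.newBufferedReader(runs.get(i), StandardCharsets.UTF_8);
        readers.add(reader);
        String first = reader.readLine();
        if (first != null) {
          heap.add(new Head(first, i));
        }
      }
      while (!heap.isEmpty()) {
        Head head = heap.poll();
        sink.accept(head.line);
        String next = readers.get(head.run).readLine();
        if (next != null) {
          heap.add(new Head(next, head.run));
        }
      }
    } finally {
      for (BufferedReader reader : readers) {
        reader.close();
      }
    }
  }
}
```

The sketch simply assumes `tmpDir` sits on local disk with enough space for all runs; whether each runner's workers actually provide that is the open question here.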

My question is: in the major runners (e.g. direct, Dataflow, Spark, Flink), is disk accessible, so that it is safe to go with the external sorting approach regardless of disk space? Also, is there a better practice for sorting in Beam?