Re: Question on GroupBy query results merging process
I think it would work, but there is currently at least one reason not to
use queryRunnerFactory.mergeRunners() in groupBy v2.
In v2, the broker assumes that intermediate aggregates from historicals are
always sorted by grouping keys. This enables merge-sorted aggregation on
brokers, which is much more efficient than hash aggregation in terms of
both speed and memory usage. However, queryRunnerFactory.mergeRunners()
is based on hash aggregation. This would reintroduce the same issue as
groupBy v1, which requires full materialization of intermediates on brokers
(either in memory or on disk). Also, it would require sorting the result
of queryRunnerFactory.mergeRunners() before
calling queryToolChest.mergeResults(), since that method assumes its input
is sorted by grouping keys.
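To illustrate why the sorted-input assumption matters, here is a minimal, Druid-free sketch (plain Java with hypothetical names, not Druid code) of the merge-sorted combining a broker can do when every historical's partial aggregates arrive sorted by grouping key. The broker holds only one row per input in memory, whereas hash aggregation would have to materialize every distinct key:

```java
import java.util.*;

// Hedged sketch (plain Java, not Druid code): k-way merge-combine of
// per-historical partial aggregates that are already sorted by grouping key.
public class MergeSortedCombine {
    // One cursor per sorted input stream.
    private static class Cursor {
        Map.Entry<String, Long> row;
        final Iterator<Map.Entry<String, Long>> it;
        Cursor(Iterator<Map.Entry<String, Long>> it) { this.it = it; this.row = it.next(); }
    }

    static List<Map.Entry<String, Long>> mergeCombine(List<Iterator<Map.Entry<String, Long>>> inputs) {
        // Min-heap keyed on the current grouping key of each cursor.
        PriorityQueue<Cursor> heap = new PriorityQueue<>(Comparator.comparing((Cursor c) -> c.row.getKey()));
        for (Iterator<Map.Entry<String, Long>> it : inputs) {
            if (it.hasNext()) heap.add(new Cursor(it));
        }
        List<Map.Entry<String, Long>> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            Cursor c = heap.poll();
            Map.Entry<String, Long> row = c.row;
            if (c.it.hasNext()) { c.row = c.it.next(); heap.add(c); }
            int last = out.size() - 1;
            if (last >= 0 && out.get(last).getKey().equals(row.getKey())) {
                // Same grouping key as the previous output row: combine (sum) in place.
                out.set(last, Map.entry(row.getKey(), out.get(last).getValue() + row.getValue()));
            } else {
                out.add(row);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Two historicals, each returning key-sorted partial sums.
        List<Iterator<Map.Entry<String, Long>>> inputs = List.of(
            List.of(Map.entry("a", 1L), Map.entry("c", 2L)).iterator(),
            List.of(Map.entry("a", 3L), Map.entry("b", 4L)).iterator());
        System.out.println(mergeCombine(inputs)); // [a=4, b=4, c=2]
    }
}
```

If the inputs were produced by hash aggregation instead, the combine step above would break: equal keys would no longer arrive adjacently, which is exactly why a sort is needed before queryToolChest.mergeResults().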
> I am wondering if the improvement that I gained from changing the logic
was mainly from deserializing the sub-query results in parallel (by calling
queryRunnerFactory.mergeRunners(), which seems to enable parallelism), or if
it was also benefitting from using GroupByMergingQueryRunnerV2, which has
parallel combining threads enabled.
I assume you enabled parallel combining in GroupByMergingQueryRunnerV2
(it's disabled by default). If so, it's difficult to tell where the benefit
came from. You might need to run more benchmarks to figure it out.
On Thu, Jul 19, 2018 at 4:29 PM Jisoo Kim <firstname.lastname@example.org> wrote:
> Hi Jihoon,
> Thanks for the reply. So what I ended up doing for merging a list of
> serialized result Sequences (each a byte array) was:
> 1) Create a stream out of the list
> 2) For each serialized sequence in the list, create a query runner that
> deserializes the byte array and returns a Sequence (along with applying
> PreComputeManipulatorFn). Now the stream becomes Stream<QueryRunner>
> 3) Call queryRunnerFactory.mergeRunners() (the factory is created from the
> injector and the given query) on the materialized list of QueryRunners
> 4) Create a FluentQueryRunner out of 3) and add the necessary steps,
> including mergeResults(), which essentially calls
> queryToolChest.mergeResults() on the merged runner
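A rough, Druid-free sketch of the pipeline in these four steps (plain Java; the byte[] decoding and the final sort are stand-ins for Druid's deserialization and queryToolChest.mergeResults(), and all names are illustrative, not Druid APIs):

```java
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;

// Hedged sketch: one "runner" per serialized result, all deserializing in
// parallel on a pool, followed by a single merge step over sorted output.
public class MergePipelineSketch {
    // Step 2 stand-in: each serialized result becomes a task that
    // deserializes on demand, playing the role of a per-result QueryRunner.
    static Callable<List<String>> runnerFor(byte[] serialized) {
        return () -> Arrays.asList(new String(serialized).split(","));
    }

    // Steps 3-4 stand-in: run all "runners" on a pool in parallel (the likely
    // source of the speedup from mergeRunners()), then merge and key-sort the
    // combined output before a final mergeResults()-style pass.
    static List<String> mergeAll(List<byte[]> serializedResults) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Future<List<String>>> futures = pool.invokeAll(
                serializedResults.stream()
                    .map(MergePipelineSketch::runnerFor)
                    .collect(Collectors.toList()));
            List<String> merged = new ArrayList<>();
            for (Future<List<String>> f : futures) {
                merged.addAll(f.get());
            }
            Collections.sort(merged); // mergeResults() expects key-sorted input
            return merged;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<byte[]> results = List.of("b,c".getBytes(), "a,d".getBytes());
        System.out.println(mergeAll(results)); // [a, b, c, d]
    }
}
```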
> Does my approach look valid, or is it something that I shouldn't be doing
> for merging query results? Before I changed the merging logic to the above,
> I had encountered a problem with merging sub-query results properly for
> very heavy groupBy queries.
> I haven't had much chance to read through all the groupBy query processing
> logic, but I am wondering if the improvement that I gained from changing
> the logic was mainly from deserializing the sub-query results in parallel
> (by calling queryRunnerFactory.mergeRunners(), which seems to enable
> parallelism), or if it was also benefitting from using
> GroupByMergingQueryRunnerV2, which has parallel combining threads enabled.
> On Thu, Jul 19, 2018 at 3:06 PM, Jihoon Son <ghoonson@xxxxxxxxx> wrote:
> > Hi Jisoo,
> > sorry, the previous email was sent by accident.
> > The initial version of groupBy v2 wasn't capable of combining
> > in parallel. Some of our customers ran into an issue similar to yours,
> > and so I was working on improving groupBy v2 performance for a while.
> > Parallel combining on brokers definitely makes sense. I was thinking of
> > adding a sort of ParallelMergeSequence, which is a parallel version of
> > MergeSequence, but it can be anything as long as it supports parallel
> > combining on brokers.
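As a rough illustration of what such a ParallelMergeSequence could do (plain Java fork/join over lists, not Druid's Sequence interface; all names here are illustrative), merging the sorted inputs pairwise in a tree lets independent merges run on different threads:

```java
import java.util.*;
import java.util.concurrent.*;

// Hedged sketch of parallel pairwise merging of N key-sorted inputs.
public class ParallelMergeSketch {
    // Classic two-way merge of sorted lists.
    static List<Integer> mergeTwo(List<Integer> a, List<Integer> b) {
        List<Integer> out = new ArrayList<>(a.size() + b.size());
        int i = 0, j = 0;
        while (i < a.size() && j < b.size()) {
            out.add(a.get(i) <= b.get(j) ? a.get(i++) : b.get(j++));
        }
        while (i < a.size()) out.add(a.get(i++));
        while (j < b.size()) out.add(b.get(j++));
        return out;
    }

    // Recursively split the inputs; sibling subtrees merge concurrently.
    static class MergeTask extends RecursiveTask<List<Integer>> {
        final List<List<Integer>> inputs;
        final int lo, hi; // half-open range of inputs this task merges
        MergeTask(List<List<Integer>> inputs, int lo, int hi) {
            this.inputs = inputs; this.lo = lo; this.hi = hi;
        }
        @Override
        protected List<Integer> compute() {
            if (hi - lo == 1) return inputs.get(lo);
            int mid = (lo + hi) / 2;
            MergeTask left = new MergeTask(inputs, lo, mid);
            MergeTask right = new MergeTask(inputs, mid, hi);
            left.fork(); // left subtree runs on another worker thread
            return mergeTwo(right.compute(), left.join());
        }
    }

    static List<Integer> parallelMerge(List<List<Integer>> inputs) {
        return ForkJoinPool.commonPool().invoke(new MergeTask(inputs, 0, inputs.size()));
    }

    public static void main(String[] args) {
        List<List<Integer>> sorted = List.of(List.of(1, 5), List.of(2, 6), List.of(3, 4));
        System.out.println(parallelMerge(sorted)); // [1, 2, 3, 4, 5, 6]
    }
}
```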
> > One thing I'm worried about is that most query processing interfaces in
> > brokers use Sequence, and thus using another construct for a specific
> > query type might make the code complicated. I think we need to avoid
> > that if possible.
> > Best,
> > Jihoon
> > On Thu, Jul 19, 2018 at 2:58 PM Jihoon Son <ghoonson@xxxxxxxxx> wrote:
> > > Hi Jisoo,
> > >
> > > the initial version of groupBy v2
> > >
> > > On Thu, Jul 19, 2018 at 2:42 PM Jisoo Kim <email@example.com>
> > > wrote:
> > >
> > >> Hi all,
> > >>
> > >> I am currently working on a project that uses Druid's QueryRunner and
> > >> other
> > >> druid-processing classes. It uses Druid's own classes to calculate
> > >> results. I have been testing large GroupBy queries (using v2), and it
> > >> seems
> > >> like parallel combining threads for GroupBy queries are only enabled
> > >> at the historical level. I think it is only getting called by
> > >> GroupByStrategyV2.mergeRunners()
> > >> <https://github.com/apache/incubator-druid/blob/druid-0.12.1/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java#L335>
> > >> which is only called by GroupByQueryRunnerFactory.mergeRunners() on
> > >> historicals.
> > >>
> > >> Are GroupByMergingQueryRunnerV2 and parallel combining threads meant
> > >> for computing and merging per-segment results only, or can they also
> > >> be used on
> > >> the broker level? I changed the logic of my project from calling
> > >> queryToolChest.mergeResults() on MergeSequence (created by providing a
> > >> list
> > >> of per-segment/per-server sequences) to calling
> > >> queryToolChest.mergeResults() on queryRunnerFactory.mergeRunners()
> > >> (where
> > >> each runner returns a deserialized result sequence), and that seemed
> > >> to have reduced computation time and failures for really heavy groupBy
> > >> queries quite a lot. Or is this just a coincidence and there shouldn't
> > >> be a performance difference in merging groupBy query results, and
> > >> could the only difference have come from parallelizing the
> > >> deserialization of sequences from sub-queries?
> > >>
> > >> Thanks,
> > >> Jisoo
> > >>
> > >