


Re: Question on GroupBy query results merging process


Hi Jihoon,

Thanks for the explanation, but I think I need some more clarification on
the reason not to use queryRunnerFactory.mergeRunners().

> In v2, the broker assumes that intermediate aggregates from historicals
> are always sorted by grouping keys. This enables merge-sorted aggregation
> on brokers which is much more efficient than hash aggregation in terms of
> speed as well as memory usage. However, queryRunnerFactory.mergeRunners()
> works based on hash aggregation. This would cause the same issue of groupBy
> v1 which requires the full materialization of intermediates on brokers
> (either in memory or on disk).

Could you please point me to the code for this?
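
For reference, here is a minimal sketch of the distinction as I understand it, in plain Java rather than Druid code. The class and method names (AggregationSketch, hashAggregate, sortedAggregate, row) are hypothetical, and summing a long per string key stands in for real aggregators. It only illustrates why sorted input allows streaming aggregation while hash aggregation keeps every group in memory until the input is exhausted.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

class AggregationSketch {
    // Hypothetical helper: one (grouping key, partial sum) row.
    static Map.Entry<String, Long> row(String key, long value) {
        return new SimpleEntry<>(key, value);
    }

    // Hash aggregation: input order does not matter, but every distinct key
    // stays in the map until the whole input has been consumed.
    static Map<String, Long> hashAggregate(Iterator<Map.Entry<String, Long>> rows) {
        Map<String, Long> sums = new HashMap<>();
        while (rows.hasNext()) {
            Map.Entry<String, Long> r = rows.next();
            sums.merge(r.getKey(), r.getValue(), Long::sum);
        }
        return sums;
    }

    // Merge-sorted aggregation: assumes rows arrive sorted by key, so only the
    // current group is held in memory. A finished group is emitted as soon as
    // the key changes (collected into a list here only to keep the sketch short).
    static List<Map.Entry<String, Long>> sortedAggregate(Iterator<Map.Entry<String, Long>> rows) {
        List<Map.Entry<String, Long>> out = new ArrayList<>();
        String currentKey = null;
        long currentSum = 0;
        while (rows.hasNext()) {
            Map.Entry<String, Long> r = rows.next();
            if (currentKey != null && !currentKey.equals(r.getKey())) {
                out.add(row(currentKey, currentSum));
                currentSum = 0;
            }
            currentKey = r.getKey();
            currentSum += r.getValue();
        }
        if (currentKey != null) {
            out.add(row(currentKey, currentSum));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> sorted = Arrays.asList(row("a", 1), row("a", 2), row("b", 5));
        List<Map.Entry<String, Long>> unsorted = Arrays.asList(row("a", 1), row("b", 5), row("a", 2));
        System.out.println(hashAggregate(unsorted.iterator()));
        System.out.println(sortedAggregate(sorted.iterator()));
        // With unsorted input the streaming version emits key "a" twice.
        System.out.println(sortedAggregate(unsorted.iterator()));
    }
}
```

If the input to sortedAggregate is not sorted by key, the same key is reported more than once, which is the kind of failure I would expect if a downstream merge assumes sorted input and does not get it.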

> Also, this requires to sort the result of
> queryRunnerFactory.mergeRunners() before calling
> queryToolChest.mergeResults() since it assumes the input is sorted by
> grouping keys.

How does queryToolChest.mergeResults() assume the input is sorted by
grouping keys? Seems like it's using ResultMergeQueryRunner with query's
row ordering within the method, and queryToolChest.mergeResults() gets
called in ServerManager as well where it uses
queryRunnerFactory.mergeRunners(). The difference between ServerManager and
ClientQuerySegmentWalker/CachingClusteredClient seems to be that
CachingClusteredClient returns MergeSequence with
query.getResultOrdering(), which I think uses rowOrdering. Is the use of
MergeSequence what makes the difference? In that case, is it OK
for ServerManager to call queryToolChest.mergeResults() on top of
queryRunnerFactory.mergeRunners()? I am curious about what differentiates
the Druid broker from historicals when it comes to merging the sub-query
results.
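
To make the question concrete, this is my mental model of merging several already-sorted inputs with a given ordering, written as a k-way merge in plain Java. It is a sketch under that assumption, not Druid's MergeSequence implementation; KWayMergeSketch, Cursor, and mergeSorted are hypothetical names, and in-memory lists stand in for per-server result sequences.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

class KWayMergeSketch {
    // One cursor per input: the current head element plus the rest of that input.
    private static final class Cursor<T> {
        T head;
        final Iterator<T> rest;

        Cursor(Iterator<T> nonEmpty) {
            this.rest = nonEmpty;
            this.head = nonEmpty.next();
        }
    }

    // Merges inputs that are each sorted by `ordering` into one sorted list.
    // The queue holds at most one element per input, so memory use depends on
    // the number of inputs rather than on the number of rows.
    static <T> List<T> mergeSorted(List<List<T>> inputs, Comparator<T> ordering) {
        Comparator<Cursor<T>> byHead = (a, b) -> ordering.compare(a.head, b.head);
        PriorityQueue<Cursor<T>> queue = new PriorityQueue<>(byHead);
        for (List<T> input : inputs) {
            Iterator<T> it = input.iterator();
            if (it.hasNext()) {
                queue.add(new Cursor<>(it));
            }
        }
        List<T> merged = new ArrayList<>();
        while (!queue.isEmpty()) {
            Cursor<T> smallest = queue.poll();
            merged.add(smallest.head);
            if (smallest.rest.hasNext()) {
                // Advance this input and put it back into the queue.
                smallest.head = smallest.rest.next();
                queue.add(smallest);
            }
        }
        return merged;
    }
}
```

The output is sorted only if every input is itself sorted by the same ordering. A step that then combines adjacent rows with equal grouping keys depends on that property.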

Sorry for the many questions, and thanks,
Jisoo

On Sat, Jul 21, 2018 at 7:28 PM, Jihoon Son <ghoonson@xxxxxxxxx> wrote:

> Hi Jisoo,
>
> I think it would work, but there is currently at least one reason to not
> use queryRunnerFactory.mergeRunners() in groupBy v2.
>
> In v2, the broker assumes that intermediate aggregates from historicals are
> always sorted by grouping keys. This enables merge-sorted aggregation on
> brokers which is much more efficient than hash aggregation in terms of
> speed as well as memory usage. However, queryRunnerFactory.mergeRunners()
> works based on hash aggregation. This would cause the same issue of groupBy
> v1 which requires the full materialization of intermediates on brokers
> (either in memory or on disk). Also, this requires to sort the result
> of queryRunnerFactory.mergeRunners() before
> calling queryToolChest.mergeResults() since it assumes the input is sorted
> by grouping keys.
>
> > I am wondering if the improvement that I gained from changing the logic
> > was mainly from deserializing the sub-query results in parallel (by
> > calling queryRunnerFactory.mergeRunners(), which seems to enable
> > parallelism), or if it was also benefitting from using
> > GroupByMergingQueryRunnerV2 that has parallel combining threads enabled.
>
> I assume you enabled parallel combining in GroupByMergingQueryRunnerV2
> (it's disabled by default). Then, it's difficult to tell where you gained
> the benefit. You might need to run more benchmarks to figure it out.
>
> Best,
> Jihoon
>
> On Thu, Jul 19, 2018 at 4:29 PM Jisoo Kim <jisoo.kim@xxxxxxxx.invalid>
> wrote:
>
> > Hi Jihoon,
> >
> > Thanks for the reply. So what I ended up doing for merging a list of
> > serialized result Sequences (which is a byte array) was:
> >
> > 1) Create a stream of serialized sequences out of the list
> > 2) For each serialized sequence in a list, create a query runner that
> > deserializes the byte array and returns a Sequence (along with applying
> > PreComputeManipulatorFn). Now the stream becomes Stream<QueryRunner>
> > 3) Call queryRunnerFactory.mergeRunners() (factory is created from the
> > injector and given query) on the materialized list of QueryRunner
> > 4) Create a FluentQueryRunner out of 3) and add necessary steps including
> > mergeResults(), which essentially calls queryToolChest.mergeResults() on
> > queryRunnerFactory.mergeRunners()
> >
> > Does my approach look valid or is it something that I shouldn't be doing
> > for merging query results? Before I changed the merging logic to the
> > above, I encountered a problem with merging sub-query results properly
> > for very heavy groupBy queries.
> >
> > I haven't had much chance to read through all the groupBy query
> > processing logic, but I am wondering if the improvement that I gained
> > from changing the logic was mainly from deserializing the sub-query
> > results in parallel (by calling queryRunnerFactory.mergeRunners(), which
> > seems to enable parallelism), or if it was also benefitting from using
> > GroupByMergingQueryRunnerV2 that has parallel combining threads enabled.
> >
> > Thanks,
> > Jisoo
> >
> > On Thu, Jul 19, 2018 at 3:06 PM, Jihoon Son <ghoonson@xxxxxxxxx> wrote:
> >
> > > Hi Jisoo,
> > >
> > > Sorry, the previous email was sent by accident.
> > >
> > > The initial version of groupBy v2 wasn't capable of combining
> > > intermediates in parallel. Some of our customers ran into an issue
> > > similar to yours, so I was working on improving groupBy v2 performance
> > > for a while.
> > >
> > > Parallel combining on brokers definitely makes sense. I was thinking of
> > > adding a sort of ParallelMergeSequence, which is a parallel version of
> > > MergeSequence, but it can be anything as long as it supports parallel
> > > combining on brokers.
> > >
> > > One thing I'm worried about is that most query processing interfaces in
> > > brokers use Sequence, so using something else for a specific query
> > > type might complicate the code. I think we need to avoid that if
> > > possible.
> > >
> > > Best,
> > > Jihoon
> > >
> > > On Thu, Jul 19, 2018 at 2:58 PM Jihoon Son <ghoonson@xxxxxxxxx> wrote:
> > >
> > > > Hi Jisoo,
> > > >
> > > > the initial version of groupBy v2
> > > >
> > > > On Thu, Jul 19, 2018 at 2:42 PM Jisoo Kim <jisoo.kim@xxxxxxxx.invalid>
> > > > wrote:
> > > >
> > > >> Hi all,
> > > >>
> > > > >> I am currently working on a project that uses Druid's QueryRunner and
> > > > >> other druid-processing classes. It uses Druid's own classes to
> > > > >> calculate query results. I have been testing large GroupBy queries
> > > > >> (using v2), and it seems like parallel combining threads for GroupBy
> > > > >> queries are only enabled on the historical level. I think it is only
> > > > >> getting called by GroupByStrategyV2.mergeRunners()
> > > > >> <https://github.com/apache/incubator-druid/blob/druid-0.12.1/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java#L335>
> > > > >> which is only called by GroupByQueryRunnerFactory.mergeRunners() on
> > > > >> historicals.
> > > > >>
> > > > >> Are GroupByMergingQueryRunnerV2 and parallel combining threads meant
> > > > >> for computing and merging per-segment results only, or can they also
> > > > >> be used on the broker level? I changed the logic of my project from
> > > > >> calling queryToolChest.mergeResults() on MergeSequence (created by
> > > > >> providing a list of per-segment/per-server sequences) to calling
> > > > >> queryToolChest.mergeResults() on queryRunnerFactory.mergeRunners()
> > > > >> (where each runner returns a deserialized result sequence), and that
> > > > >> seemed to have reduced really heavy groupby query computation time or
> > > > >> failures by quite a lot. Or is this just a coincidence and there
> > > > >> shouldn't be a performance difference in merging groupby query
> > > > >> results, and the only difference could've been by parallelizing the
> > > > >> deserialization of result sequences from sub-queries?
> > > >>
> > > >> Thanks,
> > > >> Jisoo
> > > >>
> > > >
> > >
> >
>