
Re: throttling/backpressure on a route

Hi Onder,

Onder SEZGIN <ondersezgin@xxxxxxxxx> writes:

> Hi,
> I would suggest
> 1- Take a look at throttle-eip. (Make sure you understand split, aggregate
> and mongodb3 producer adocs around mongodb aggregation. Try and see your
> case unless you can explain or create a unit test to pinpoint your issue
> and also check mongodb3 components test case for better examples.)

I looked at the throttle EIP before; unfortunately it doesn't seem to
solve the issue at hand. What if the producer is producing more than
downstream can manage? Memory will fill up. The throttler cannot know
how downstream is doing. Downstream needs to tell upstream whether it's
OK to send more.
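
To make concrete what I mean by downstream telling upstream whether it
may send more, here is a minimal plain-Java sketch. It is not Camel API:
the class BoundedHandoff and its run method are made up for
illustration, and the 1 ms sleep only stands in for a slow consumer. The
bounded queue is the feedback channel: put() blocks the producer
whenever the consumer has fallen `capacity` items behind.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration, not part of Camel.
final class BoundedHandoff {

    /** Pushes `total` items through a bounded queue; returns the largest backlog seen. */
    static int run(int total, int capacity) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        AtomicInteger maxBacklog = new AtomicInteger();

        // Slow consumer: takes one item at a time.
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < total; i++) {
                    queue.take();
                    Thread.sleep(1); // simulated slow downstream work
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        try {
            for (int i = 0; i < total; i++) {
                queue.put(i); // blocks while the queue is full: this is the backpressure
                maxBacklog.accumulateAndGet(queue.size(), Math::max);
            }
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted", e);
        }
        return maxBacklog.get();
    }

    public static void main(String[] args) {
        System.out.println("largest backlog: " + run(200, 8));
    }
}
```

However fast the producer is, the backlog never exceeds the queue
capacity, which is the property the throttler alone cannot give me.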

> 2- All in memory. Depending on your route design, they are either in
> MemoryAggregationRepository or flowing through downstream endpoints / routes.

Understood, thanks.

> 3- For your example, take a look at split, aggregate, mongodb3 aggregate
> functionality and possibly, throttle or delay eips.

As explained above, none of these seem to solve the issue. There needs
to be communication between the producers. Given

A -> B -> C -> D -> E -> F

E needs to tell B "Wait" and when it catches up tell B "OK please send
more". Otherwise things will flow downstream, filling up memory.
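
The "E tells B" handshake could be sketched with a counting semaphore,
again in plain Java rather than Camel. Everything here is an assumption
for illustration: the class InFlightLimiter, the run method, and a
single-threaded executor with an unbounded queue standing in for the
stages between B and E. "B" takes a permit before sending an item and
"E" returns it when the item is done, so at most maxInFlight items exist
between the two at any time.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration, not part of Camel.
final class InFlightLimiter {

    /** Sends `total` items downstream; returns the peak number in flight at once. */
    static int run(int total, int maxInFlight) {
        Semaphore permits = new Semaphore(maxInFlight);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        // Stands in for stages C..E; its internal queue is unbounded.
        ExecutorService downstream = Executors.newSingleThreadExecutor();
        try {
            for (int i = 0; i < total; i++) {
                permits.acquire(); // "B" waits here until "E" has caught up
                peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                downstream.execute(() -> {
                    try {
                        Thread.sleep(1); // simulated slow downstream work
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        inFlight.decrementAndGet();
                        permits.release(); // "E" says: OK, please send more
                    }
                });
            }
            downstream.shutdown();
            if (!downstream.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("downstream did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted", e);
        } finally {
            downstream.shutdownNow();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("peak in flight: " + run(100, 4));
    }
}
```

Even though the executor's queue is unbounded, the number of pending
items stays at or below the permit count, because the limit is enforced
where the items enter rather than where they are stored.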

Surely constraining memory usage is a solved problem? I'm just too new
to understand what pattern(s) solve it.

> On Tue, Oct 16, 2018 at 10:18 AM Peter Nagy (Jr) <pnagy@xxxxxxxxxx> wrote:
>> Newbie question incoming.
>> I have a route that looks like
>> ...
>> .setHeader("CamelMongoDbBatchSize", 128)
>> .to("mongodb3:mymongo?...&operation=aggregate&outputType=MongoIterable")
>> .split(constant(true))
>> .streaming()
>> ...
>> .aggregate(constant(true), new GroupedBodyAggregationStrategy())
>> .completionSize(128)
>> .to("mongodb3:myothermongo?...&operation=bulkWrite")
>> Pardon if there are some typos; I wrote this by hand since I'm using Camel
>> from Clojure and the Clojure code looks a bit different.
>> This works as expected, the aggregation is retrieved in batches and sent
>> downstream one-by-one, being processed with various Processors and
>> finally written in batches.
>> I had a bug before where I had
>> .aggregate().body().completionSize(128).completionTimeout(30000)
>> which resulted in the aggregation waiting for the timeout. However in
>> the meantime the mongo aggregation query finished processing everything.
>> This raised some questions for me.
>> 1. My process needs to be memory-friendly. Even after fixing the bug I
>> need to be sure the aggregation query won't fill the RAM when downstream
>> can't keep up. How can I apply some backpressure? How can I tell the
>> route "don't process more until someone downstream says you can"? E.g.
>> saying if there's more than N exchanges pending, wait.
>> 2. Who was buffering the exchanges? I had ~2k entries flowing through,
>> where did they end up queued?
>> 3. Did I miss a doc page where these questions are explained? I spent a
>> considerable amount of time searching for an answer thinking "This must
>> be a common requirement, surely there's an example somewhere".
>> --
>> To reach a goal one has to enjoy the journey.

To reach a goal one has to enjoy the journey.