osdir.com



Re: How to read messages from a queue in parallel


Hi Valdis,

Many thanks for your feedback, appreciate it.

I believe static or dynamic routing is what I was looking for:

   - Aggregation is not suitable for my use case, as I don't have a
deterministic message flow. I also want to process messages in a real-time
manner.

   - Static or dynamic message routing might be suitable, though the groups
should be treated as a dynamic set. In my use case, I have `m` groups and
`n` available executor threads. Here, `m` can be any (large) number while
`n` is usually small, say 8. I will have to try it out with static or dynamic
routing; it would be great if I could apply a modulo hash to map an
arbitrary group identifier to an executor thread.
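
To make the modulo-hash idea concrete, here is a minimal sketch in plain Java, independent of Camel. The class name `GroupExecutors` and its methods are hypothetical, and using `String.hashCode()` as the hash is an assumption; any stable hash of the group identifier would do. Each of the `n` executors is single-threaded, so tasks of the same group run in submission order while different groups may run in parallel.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical helper: pins every group to one of n single-threaded executors. */
final class GroupExecutors implements AutoCloseable {
    private final ExecutorService[] executors;

    GroupExecutors(int n) {
        executors = new ExecutorService[n];
        for (int i = 0; i < n; i++) {
            // One thread per executor keeps tasks of a group strictly sequential.
            executors[i] = Executors.newSingleThreadExecutor();
        }
    }

    /** Math.floorMod keeps the index in [0, n) even when hashCode() is negative. */
    static int indexFor(String groupId, int n) {
        return Math.floorMod(groupId.hashCode(), n);
    }

    /** Submits the task to the executor that owns the given group. */
    Future<?> submit(String groupId, Runnable task) {
        return executors[indexFor(groupId, executors.length)].submit(task);
    }

    @Override
    public void close() {
        for (ExecutorService executor : executors) {
            executor.shutdown();
        }
    }
}
```

Several groups can land on the same executor, which costs some parallelism but never breaks the per-group ordering.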

It's important that the RabbitMQ ACK/NACK happens *after* the successful
execution on the executor thread; that is, not when it has been routed by
the router. I don't want to risk dropping messages, for example during a
service restart.
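
The ordering I am after can be sketched as follows, again in plain Java. The `Acknowledger` interface is a hypothetical stand-in for whatever ends up issuing the ACK/NACK towards RabbitMQ; how that is wired into the Camel RabbitMQ consumer is not shown here and is something I still have to work out. The point is only that the acknowledgement is chained after the work, not after the routing.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/** Hypothetical stand-in for the code that sends ACK/NACK to the broker. */
interface Acknowledger {
    void ack(long deliveryTag);
    void nack(long deliveryTag);
}

final class AckAfterProcessing {
    /**
     * Runs the work on the executor that owns the group and acknowledges only
     * once it has finished: ACK on success, NACK if the work threw.
     */
    static CompletableFuture<Void> handle(long deliveryTag,
                                          Runnable work,
                                          Executor groupExecutor,
                                          Acknowledger acker) {
        return CompletableFuture.runAsync(work, groupExecutor)
            .<Void>handle((ignored, error) -> {
                if (error == null) {
                    acker.ack(deliveryTag);
                } else {
                    acker.nack(deliveryTag);
                }
                return null;
            });
    }
}
```

If the service stops before the work completes, nothing has been acknowledged yet, so the broker can redeliver the message.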

I'll experiment and come back with my results.

Thanks.

On Mon, Dec 17, 2018 at 4:41 PM Valdis Andersons <valdis.andersons@xxxxxx>
wrote:

> Hi Peter,
>
> Given you have a header already identifying the message categories, you
> could try to look into the Aggregator EIP:
>
> http://camel.apache.org/aggregator2.html
>
> It will allow you to group all the messages by the header into 'batches'
> for each of the groups. Each batch can then be processed by a single
> consumer and it would allow you to have multiple consumers taking care of
> the parallel processing of different groups. This is very suitable for
> periodic batch processing of messages, but requires finite batch sizes
> and/or finite time limits.
>
> If you have a fixed set of group headers but not a deterministic size of
> the messages per group or a non-deterministic time period then a Message
> Router might be suitable:
>
> http://camel.apache.org/message-router.html
>
> You can directly check for the header in the choice expression and then
> route the different group messages to their respective group queues where
> you can have single consumers processing them in sequence in a continuous
> way. The separate queues would allow you to fulfil the parallel processing
> per group requirement.
>
> There is also a Dynamic Router option. I personally don't have any
> experience with it, but if none of the above suits your needs it might be
> worth looking into:
>
> http://camel.apache.org/dynamic-router.html
>
>
> Thanks,
> Valdis
>
> -----Original Message-----
> From: Peter Billen [mailto:peter.billen@xxxxxxxxx]
> Sent: 17 December 2018 11:08
> To: users@xxxxxxxxxxxxxxxx
> Subject: How to read messages from a queue in parallel
>
> Hi all,
>
> I am reading from a RabbitMQ queue as follows:
>
>
> from("rabbitmq://localhost/?queue=camel&autoAck=false&concurrentConsumers=1&threadPoolSize=1&prefetchEnabled=true&prefetchCount=50")
>
> Some remarks about the configuration parameters:
>
> - I set `autoAck` to false to be able to acknowledge manually in my
> implementation.
> - I set `concurrentConsumers` and `threadPoolSize` to 1 to guarantee that
> I consume the messages in the same order as they were added to the queue.
> - I set `prefetchCount` to 50 to have at most 50 inflight messages in
> memory.
>
> Now, I want to process these 50 messages in an asynchronous fashion and
> manually acknowledge when done. Each message has a `group identifier`
> header set. Messages from the same group will be processed sequentially,
> while messages from other groups will be processed concurrently.
>
> I tried to start with the following:
>
> from("rabbitmq://...")
> .process(new AsyncProcessor() {
>     @Override
>     public boolean process(final Exchange exchange, final AsyncCallback callback) {
>         System.out.println("ASYNC");
>         // TODO: (1) read group identifier
>         //       (2) submit task to executor responsible for that particular group
>         //       (3) call callback.done() in the task once done
>         return false;
>     }
>
>     @Override
>     public void process(final Exchange exchange) {
>         throw new UnsupportedOperationException();
>     }
> })
>
> The problem here is that only the first message is given to
> `process(exchange, callback)`. Is there a way to also receive the other
> inflight messages?
>
> Note that I do *not* want to increase the number of RabbitMQ consumers, as
> this would interfere with the message order. It is important that messages
> from the same group are executed sequentially, hence the necessity to have
> one single RabbitMQ consumer.
>
> Thanks!
>