osdir.com

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: Consume rate drops drastically with message process latency, even with many consumers per queue.


My guess is that this is because messages are sitting in the prefetch
buffer of consumers while they are being slow to consume their current
message, making those messages unavailable for other consumers who might be
idle.

If you change your consumers' prefetch size to 0, do you get the expected
throughput?

Tim

On Wed, Nov 7, 2018, 7:58 AM angeloslenis <lenis.angelos@xxxxxxxxx> wrote:

> Hello,
>
> TLDR: When we introduce some latency on the consumers, the maximum consume
> rate drops drastically, even though we have enough consumer threads to
> handle the rate.
>
>
> We are currently evaluating ActiveMQ, focusing on produce/consume rate:
> The setup is the following:
> - Virtual Topics: 100
> - Consumer Queues: 3 per virtual topic amounting to 300 queues.
> - Produce rate: 10 messages/second on each Virtual topic, amounting to
> 1000 messages/second
> -- PooledConnectionFactory with 16 connections
> - Consumers: 4 different consumers on each queue, amounting to 1200
> consumers (i.e. sessions + consumer)
> -- another PooledConnectionFactory with 16 connections
> - Broker configuration: default settings, producerFlowControl is enabled
> - Everything runs on a single machine, i7 2.5 GHz, 16 GB memory
>
> When the consumers process messages very fast we can achieve a consume rate
> of 3000 messages/sec, which is the expected one for the 1000 messages/sec
> that we produce (3 queues per topic).
>
> When we introduce some latency on the consumers, the maximum consume rate
> drops drastically, even though we have enough consumers (== threads) to
> handle the rate:
>
> - Latency 10 msec, Consume Rate drops to 1300 messages/sec
> - Latency 20 msec, Consume Rate drops to 650 messages/sec
> - Latency 100 msec, Consume Rate drops to 130 messages/sec
>
> In all cases produce rate also drops to approximately 1/3 of consume rate,
> due to flow control.
>
> Any advice on why the consume rate drops?
>
>
> Below are code samples, with the connections and producers/consumers
> configurations:
>
> // ------------------------------------
> // Producer Connection Settings
> ActiveMQConnectionFactory connectionFactory = new
> ActiveMQConnectionFactory(connectionUrl(amqConfiguration));
> connectionFactory.setUserName(amqConfiguration.getPublisherUsername());
> connectionFactory.setPassword(amqConfiguration.getPublisherPassword());
> connectionFactory.setClientIDPrefix("HermesMonitorAMQ-Producer");
> connectionFactory.setAlwaysSyncSend(true);
> connectionFactory.setDispatchAsync(false);
> connectionFactory.setSendTimeout(5000);
>
> publisherConnectionFactory = new
> PooledConnectionFactory(connectionFactory);
> publisherConnectionFactory.setMaxConnections(16);
> publisherConnectionFactory.start();
>
> // ------------------------------------
> // Producer: this is called by different threads to produce 10 messages/sec
> // on each topic
> private synchronized void publish(String topic) throws JMSException,
> TException {
>     // 1. Create connection
>     Connection producerConnection =
> publisherConnectionFactory.createConnection();
>     // 2. Create Session
>     Session producerSession = producerConnection.createSession(false,
> Session.AUTO_ACKNOWLEDGE);
>     // 3. Create Destination
>     Destination destination = producerSession.createTopic(topic);
>     // ^^ e.g. "VirtualTopic.ProducerYYY"
>     // 4. Create Producer
>     MessageProducer producer = producerSession.createProducer(destination);
>     producer.setDeliveryMode(DeliveryMode.PERSISTENT);
>
>     // 5. Create message
>     MapMessage producerMessage = producerSession.createMapMessage();
>     {
>         // Add some payload
>         byte[] payload = new
> byte[monitorParameters.getProducerMessageSize()];
>         new Random().nextBytes(payload);
>         producerMessage.setObject("payload", payload);
>     }
>
>     // 6. Send Message
>     producer.send(producerMessage);
>
>     producer.close();
>     producerSession.close();
>     producerConnection.close();
> }
>
> // ------------------------------------
> // Consumer Connection Settings
> ActiveMQConnectionFactory connectionFactory = new
> ActiveMQConnectionFactory(connectionUrl(amqConfiguration));
> connectionFactory.setClientIDPrefix("HermesMonitorAMQ-Consumer");
> connectionFactory.setUserName(amqConfiguration.getConsumerUsername());
> connectionFactory.setPassword(amqConfiguration.getConsumerPassword());
> connectionFactory.setAlwaysSessionAsync(true);
> connectionFactory.setOptimizeAcknowledge(false);
> connectionFactory.setDispatchAsync(false);
> connectionFactory.setSendTimeout(5000);
> connectionFactory.setConnectResponseTimeout(5000);
> connectionFactory.setMessagePrioritySupported(true);
> ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
> prefetchPolicy.setAll(1);
> connectionFactory.setPrefetchPolicy(prefetchPolicy);
> RedeliveryPolicy redeliveryPolicy =
> connectionFactory.getRedeliveryPolicy();
> redeliveryPolicy.setMaximumRedeliveries(0);
> connectionFactory.setRedeliveryPolicy(redeliveryPolicy);
>
> consumerConnectionFactory = new PooledConnectionFactory(connectionFactory);
> consumerConnectionFactory.setMaxConnections(16);
>
> consumerConnectionFactory.start();
>
> //---------------------------------------------------
> // Consumer: this is called 4 times per queue => 4 * 300 = 1200 sessions +
> // consumers
> void startConsumer(String queue) {
>     // 1. Establish a connection for the consumer.
>     consumerConnection = consumerConnectionFactory.createConnection();
>     // 2. Create a session.
>     consumerSession = consumerConnection.createSession(false,
> Session.CLIENT_ACKNOWLEDGE);
>     // 3. Create a message consumer from the session to the queue.
>     Destination destination = consumerSession.createQueue(queue));
>     // ^^ e.g. "Consumer.ConsumerXXX.VirtualTopic.ProducerYYY"
>     // 4. Create consumer
>     consumer = consumerSession.createConsumer(destination);
>     // 5. Add a listener for handling received messages
>     consumer.setMessageListener((consumerMessage) -> {
>         try {
>             MapMessage consumerByteMessage = (MapMessage) consumerMessage;
>                 try {
>                     Thread.sleep(N); // Introduce some artificial delay
>                 } catch (InterruptedException e) {
>                     logger.error("", e);
>                 }
>                 consumerMessage.acknowledge();
>         } catch (JMSException e) {
>             try {
>                 consumerSession.recover();
>             } catch (JMSException e1) {
>                 throw new RuntimeException(e1);
>             }
>         }
>     });
>     consumerConnection.start();
> }
>
>
>
> --
> Sent from:
> http://activemq.2283324.n4.nabble.com/ActiveMQ-User-f2341805.html
>