com.rapportive.storm.amqp
Class SharedQueueWithBinding

java.lang.Object
  extended by com.rapportive.storm.amqp.SharedQueueWithBinding
All Implemented Interfaces:
QueueDeclaration, Serializable

public class SharedQueueWithBinding
extends Object
implements QueueDeclaration

Declares a named, durable queue and binds it to an existing exchange. This is a good choice for production use as the queue will survive spout restarts, so you won't miss messages if your spout crashes.

N.B. This can be risky in some circumstances. For example, if while prototyping you set up a development topology that consumes from a production AMQP server, then kill the topology and go home for the night, messages will continue to queue up, which could threaten the stability of the AMQP server if the exchange is high-volume. For prototyping, consider ExclusiveQueueWithBinding.

This queue is safe for multiple parallel spout tasks: as they all consume the same named queue, the AMQP broker will round-robin messages between them, so each message will get processed only once (barring redelivery due to outages).
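
To illustrate the round-robin behaviour described above, here is a minimal, idealised simulation in plain Java. RoundRobinSketch and its dispatch method are hypothetical names invented for this example and are not part of this library; a real broker's distribution also depends on factors such as prefetch settings and consumer readiness, so treat this only as a picture of why each message reaches exactly one consumer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only: models a broker handing messages from one
// shared queue to N consumers in strict rotation.
final class RoundRobinSketch {

    /** Gives message i to consumer (i mod consumerCount). */
    static List<List<String>> dispatch(List<String> messages, int consumerCount) {
        if (consumerCount < 1) {
            throw new IllegalArgumentException("need at least one consumer");
        }
        List<List<String>> perConsumer = new ArrayList<>();
        for (int c = 0; c < consumerCount; c++) {
            perConsumer.add(new ArrayList<String>());
        }
        for (int i = 0; i < messages.size(); i++) {
            // Each message is delivered to exactly one consumer.
            perConsumer.get(i % consumerCount).add(messages.get(i));
        }
        return perConsumer;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("m1", "m2", "m3", "m4", "m5");
        // prints [[m1, m3, m5], [m2, m4]]
        System.out.println(dispatch(messages, 2));
    }
}
```

No message appears in more than one consumer's list, which is the property that makes the shared queue safe for parallel spout tasks.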

See Also:
Serialized Form

Constructor Summary
SharedQueueWithBinding(String queueName, String exchange, String routingKey)
          Create a declaration of a named, durable, non-exclusive queue bound to the specified exchange.
 
Method Summary
 com.rabbitmq.client.AMQP.Queue.DeclareOk declare(com.rabbitmq.client.Channel channel)
          Verifies the exchange exists, creates the named queue if it does not exist, and binds it to the exchange.
 boolean isParallelConsumable()
          Returns true as this queue is safe for parallel consumers.
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Constructor Detail

SharedQueueWithBinding

public SharedQueueWithBinding(String queueName,
                              String exchange,
                              String routingKey)
Create a declaration of a named, durable, non-exclusive queue bound to the specified exchange.

Parameters:
queueName - name of the queue to be declared.
exchange - exchange to bind the queue to.
routingKey - routing key for the exchange binding. Use "#" to receive all messages published to the exchange.
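
The "#" advice above relies on AMQP topic-exchange matching, in which routing keys are dot-separated words, "*" matches exactly one word and "#" matches zero or more words. The sketch below re-implements that rule in plain Java purely for illustration; TopicMatchSketch is a hypothetical name, not part of this library or of the RabbitMQ client, and the broker performs the real matching. It assumes the exchange is a topic exchange (on a direct exchange, "#" would be compared literally).

```java
// Hypothetical illustration only: mimics topic-exchange binding-key matching.
final class TopicMatchSketch {

    /** Returns true if the binding pattern matches the routing key. */
    static boolean matches(String pattern, String routingKey) {
        String[] p = pattern.isEmpty() ? new String[0] : pattern.split("\\.");
        String[] k = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return match(p, 0, k, 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            // Pattern exhausted: succeed only if the key is exhausted too.
            return j == k.length;
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words; try every possibility.
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // pattern still needs a word, but none are left
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("#", "orders.created.eu"));
        System.out.println(matches("orders.*", "orders.created"));
        System.out.println(matches("orders.*", "orders.created.eu"));
    }
}
```

Under these rules a binding key of "#" accepts every routing key, which is why it receives all messages published to a topic exchange.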
Method Detail

declare

public com.rabbitmq.client.AMQP.Queue.DeclareOk declare(com.rabbitmq.client.Channel channel)
                                                 throws IOException
Verifies the exchange exists, creates the named queue if it does not exist, and binds it to the exchange.

Specified by:
declare in interface QueueDeclaration
Parameters:
channel - An open AMQP channel which can be used to send the declarations.
Returns:
the server's response to the successful queue declaration.
Throws:
IOException - if the exchange does not exist, the queue cannot be declared, or the AMQP connection drops.
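
The three steps described above (verify the exchange, declare the queue, bind it) can be sketched as follows. This is not the library's actual source. ChannelLike is a hypothetical stand-in for the corresponding methods on com.rabbitmq.client.Channel, kept here so the example compiles without the RabbitMQ client; the real queueDeclare also takes an arguments map and returns a DeclareOk. Using a passive exchange declaration for the existence check, and autoDelete=false, are assumptions consistent with the description rather than confirmed details.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the subset of com.rabbitmq.client.Channel used here.
interface ChannelLike {
    void exchangeDeclarePassive(String exchange) throws IOException;
    void queueDeclare(String queue, boolean durable, boolean exclusive,
                      boolean autoDelete) throws IOException;
    void queueBind(String queue, String exchange, String routingKey) throws IOException;
}

// Sketch of the declaration order; names are illustrative, not the library's.
final class SharedQueueDeclareSketch {
    static void declare(ChannelLike channel, String queueName,
                        String exchange, String routingKey) throws IOException {
        // 1. Fails with IOException if the exchange does not exist (assumed passive check).
        channel.exchangeDeclarePassive(exchange);
        // 2. Durable, non-exclusive; autoDelete=false is an assumption so the
        //    queue outlives spout restarts.
        channel.queueDeclare(queueName, true, false, false);
        // 3. Route messages from the exchange into the queue.
        channel.queueBind(queueName, exchange, routingKey);
    }
}

// Test double that records the calls it receives instead of talking to a broker.
final class RecordingChannel implements ChannelLike {
    final List<String> calls = new ArrayList<>();

    public void exchangeDeclarePassive(String exchange) {
        calls.add("exchangeDeclarePassive(" + exchange + ")");
    }

    public void queueDeclare(String queue, boolean durable, boolean exclusive,
                             boolean autoDelete) {
        calls.add("queueDeclare(" + queue + ", durable=" + durable
                + ", exclusive=" + exclusive + ", autoDelete=" + autoDelete + ")");
    }

    public void queueBind(String queue, String exchange, String routingKey) {
        calls.add("queueBind(" + queue + ", " + exchange + ", " + routingKey + ")");
    }
}
```

Checking the exchange first means a missing exchange surfaces as an IOException before any queue is created, which matches the Throws clause above.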

isParallelConsumable

public boolean isParallelConsumable()
Returns true as this queue is safe for parallel consumers.

Specified by:
isParallelConsumable in interface QueueDeclaration
Returns:
true if safe for parallel consumers, otherwise false.


Copyright © 2011. All Rights Reserved.