com.rapportive.storm.spout
Class AMQPSpout

java.lang.Object
  extended by com.rapportive.storm.spout.AMQPSpout
All Implemented Interfaces:
ISpout, IComponent, IRichSpout, Serializable

public class AMQPSpout
extends Object
implements IRichSpout

Spout to feed messages into Storm from an AMQP queue. Each message routed to the queue will be emitted as a Storm tuple. The message is acked once the topology has fully processed the corresponding tuple, or rejected if the topology fails it.

N.B. if you need to guarantee that all messages are reliably processed, have AMQPSpout consume from a queue that is neither 'exclusive' nor 'auto-delete'. Otherwise, if the spout task crashes or is restarted, the queue will be deleted, and both the messages in it and any messages published while the task remains down will be lost. See SharedQueueWithBinding to declare a shared queue that allows for guaranteed processing. (For prototyping, an ExclusiveQueueWithBinding may be simpler to manage.)

N.B. this does not currently handle malformed messages (those that cannot be deserialised by the provided Scheme) very well: the spout worker will crash if it fails to deserialise a message.

This consumes messages from AMQP asynchronously, so it may receive messages before Storm requests them as tuples; therefore it buffers messages in an internal queue. To avoid this buffer growing large and consuming too much RAM, set CONFIG_PREFETCH_COUNT.

This spout can be distributed among multiple workers, depending on the queue declaration: see QueueDeclaration.isParallelConsumable().

Author:
Sam Stokes (sam@rapportive.com)
See Also:
QueueDeclaration, SharedQueueWithBinding, ExclusiveQueueWithBinding, Serialized Form

Field Summary
static String CONFIG_PREFETCH_COUNT
          Storm config key to set the AMQP basic.qos prefetch-count parameter.
static long WAIT_AFTER_SHUTDOWN_SIGNAL
          Time in milliseconds to wait after losing connection to the AMQP broker before attempting to reconnect.
static long WAIT_FOR_NEXT_MESSAGE
          Time in milliseconds to wait for a message from the queue if there is no message ready when the topology requests a tuple (via nextTuple()).
 
Constructor Summary
AMQPSpout(String host, int port, String username, String password, String vhost, QueueDeclaration queueDeclaration, Scheme scheme)
          Create a new AMQP spout.
 
Method Summary
 void ack(Object msgId)
          Acks the message with the AMQP broker.
 void close()
          Cancels the queue subscription, and disconnects from the AMQP broker.
 void declareOutputFields(OutputFieldsDeclarer declarer)
          Declares the output fields of this spout according to the provided Scheme.
 void fail(Object msgId)
          Tells the AMQP broker to drop (Basic.Reject) the message.
 boolean isDistributed()
          This spout can be distributed among multiple workers if the QueueDeclaration supports it.
 void nextTuple()
          Emits the next message from the queue as a tuple.
 void open(Map config, TopologyContext context, SpoutOutputCollector collector)
          Connects to the AMQP broker, declares the queue and subscribes to incoming messages.
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

CONFIG_PREFETCH_COUNT

public static final String CONFIG_PREFETCH_COUNT
Storm config key to set the AMQP basic.qos prefetch-count parameter. Defaults to 100.

This caps the number of outstanding (i.e. unacked) messages that the broker will send to each spout worker at a time. Increasing it will improve throughput if the network round-trip time to the AMQP broker is significant compared to the time the topology takes to process each message; it will also increase RAM requirements, because the internal message buffer can grow larger.

AMQP allows a prefetch-count of zero, indicating unlimited delivery, but that is not allowed here to avoid unbounded buffer growth.
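
The rule above (default of 100, zero not allowed) can be illustrated with a minimal sketch. This is not the spout's actual code: the class name, the method resolvePrefetch, and the key string "example.prefetch.count" are all hypothetical, and the real key is whatever CONFIG_PREFETCH_COUNT holds (see Constant Field Values).

```java
import java.util.HashMap;
import java.util.Map;

final class PrefetchConfigSketch {
    // Hypothetical key string, used only for this illustration.
    static final String PREFETCH_KEY = "example.prefetch.count";
    // Default documented for CONFIG_PREFETCH_COUNT.
    static final int DEFAULT_PREFETCH = 100;

    /** Reads the prefetch count from a config map, rejecting zero and negatives. */
    static int resolvePrefetch(Map<String, Object> config) {
        Object raw = config.get(PREFETCH_KEY);
        if (raw == null) {
            return DEFAULT_PREFETCH;
        }
        long value = ((Number) raw).longValue();
        // Zero would mean "unlimited" in AMQP, which would let the buffer grow unbounded.
        if (value < 1 || value > Integer.MAX_VALUE) {
            throw new IllegalArgumentException(
                    "prefetch count must be a positive integer, got " + value);
        }
        return (int) value;
    }

    public static void main(String[] args) {
        Map<String, Object> config = new HashMap<>();
        System.out.println(resolvePrefetch(config)); // prints 100
        config.put(PREFETCH_KEY, 25);
        System.out.println(resolvePrefetch(config)); // prints 25
    }
}
```

In a real topology the value would be placed in the Storm config map under CONFIG_PREFETCH_COUNT before the topology is submitted.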

See Also:
Constant Field Values

WAIT_FOR_NEXT_MESSAGE

public static final long WAIT_FOR_NEXT_MESSAGE
Time in milliseconds to wait for a message from the queue if there is no message ready when the topology requests a tuple (via nextTuple()).

See Also:
Constant Field Values

WAIT_AFTER_SHUTDOWN_SIGNAL

public static final long WAIT_AFTER_SHUTDOWN_SIGNAL
Time in milliseconds to wait after losing connection to the AMQP broker before attempting to reconnect.

See Also:
Constant Field Values
Constructor Detail

AMQPSpout

public AMQPSpout(String host,
                 int port,
                 String username,
                 String password,
                 String vhost,
                 QueueDeclaration queueDeclaration,
                 Scheme scheme)
Create a new AMQP spout. When open(Map, TopologyContext, SpoutOutputCollector) is called, it will declare a queue according to the specified queueDeclaration, subscribe to the queue, and start consuming messages. It will use the provided scheme to deserialise each AMQP message into a Storm tuple.

Parameters:
host - hostname of the AMQP broker node
port - port number of the AMQP broker node
username - username to log in to the broker
password - password to authenticate to the broker
vhost - vhost on the broker
queueDeclaration - declaration of the queue / exchange bindings
scheme - Scheme used to deserialise each AMQP message into a Storm tuple
Method Detail

ack

public void ack(Object msgId)
Acks the message with the AMQP broker.

Specified by:
ack in interface ISpout

close

public void close()
Cancels the queue subscription, and disconnects from the AMQP broker.

Specified by:
close in interface ISpout

fail

public void fail(Object msgId)
Tells the AMQP broker to drop (Basic.Reject) the message.

N.B. this does not requeue the message: failed messages are simply dropped. This prevents infinite redelivery in the event of non-transient failures (e.g. malformed messages), but it also means that messages are not retried after transient failures.

TODO make this configurable.
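
To make the effect of rejecting without requeue concrete, here is a small in-memory model of a broker queue. It is only a sketch: RejectSketch and all of its methods are hypothetical and do not talk to a real broker. The requeue flag mirrors the one on AMQP's basic.reject, and fail(Object) behaves like the requeue = false case.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

final class RejectSketch {
    private final Deque<String> queue = new ArrayDeque<>();
    private final Map<Long, String> unacked = new HashMap<>();
    private long nextTag = 1;

    void publish(String body) {
        queue.addLast(body);
    }

    /** Delivers the next message and returns its delivery tag, or -1 if the queue is empty. */
    long deliver() {
        String body = queue.pollFirst();
        if (body == null) {
            return -1;
        }
        long tag = nextTag++;
        unacked.put(tag, body);
        return tag;
    }

    /** Successful processing: the message is gone for good. */
    void ack(long tag) {
        unacked.remove(tag);
    }

    /** Failure: the message is dropped unless requeue is true. */
    void reject(long tag, boolean requeue) {
        String body = unacked.remove(tag);
        if (body != null && requeue) {
            queue.addFirst(body);
        }
    }

    int queued() {
        return queue.size();
    }

    public static void main(String[] args) {
        RejectSketch broker = new RejectSketch();
        broker.publish("malformed");
        long tag = broker.deliver();
        broker.reject(tag, false);           // what fail(msgId) amounts to
        System.out.println(broker.queued()); // prints 0: the message is not redelivered
    }
}
```

With requeue = true the same message would return to the queue and be delivered again, which is the infinite-redelivery risk the note above describes for non-transient failures.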

Specified by:
fail in interface ISpout

nextTuple

public void nextTuple()
Emits the next message from the queue as a tuple.

If no message is ready to emit, this will wait a short time (WAIT_FOR_NEXT_MESSAGE) for one to arrive on the queue, to avoid a tight loop in the spout worker.
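
The bounded wait described above can be sketched with a blocking queue from the standard library. This is an illustration under assumptions, not the spout's implementation: NextTupleSketch, deliver, and the 1 ms value of WAIT_MS are hypothetical (the real wait is WAIT_FOR_NEXT_MESSAGE), and the method returns the message body instead of emitting a tuple.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class NextTupleSketch {
    // Assumed wait in milliseconds, standing in for WAIT_FOR_NEXT_MESSAGE.
    static final long WAIT_MS = 1L;

    // Internal buffer filled asynchronously by the AMQP consumer.
    private final BlockingQueue<String> buffer = new LinkedBlockingQueue<>();

    /** Called by the consumer side when a message arrives from the broker. */
    void deliver(String body) {
        buffer.add(body);
    }

    /**
     * Returns the next buffered message, waiting up to WAIT_MS for one to arrive.
     * Returns null if none arrives in time, so the caller never spins in a tight loop.
     */
    String nextTuple() {
        try {
            return buffer.poll(WAIT_MS, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return null;
        }
    }

    public static void main(String[] args) {
        NextTupleSketch spout = new NextTupleSketch();
        spout.deliver("hello");
        System.out.println(spout.nextTuple()); // prints hello
        System.out.println(spout.nextTuple()); // prints null after a short wait
    }
}
```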

Specified by:
nextTuple in interface ISpout

open

public void open(Map config,
                 TopologyContext context,
                 SpoutOutputCollector collector)
Connects to the AMQP broker, declares the queue and subscribes to incoming messages.

Specified by:
open in interface ISpout

declareOutputFields

public void declareOutputFields(OutputFieldsDeclarer declarer)
Declares the output fields of this spout according to the provided Scheme.

Specified by:
declareOutputFields in interface IComponent

isDistributed

public boolean isDistributed()
This spout can be distributed among multiple workers if the QueueDeclaration supports it.

Specified by:
isDistributed in interface IRichSpout
See Also:
QueueDeclaration.isParallelConsumable()


Copyright © 2011. All Rights Reserved.