From 0517b7805170a265d732fe5690201ea91ac06fa3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=B2=20Scarpa?= Date: Thu, 3 Feb 2022 10:43:08 +0100 Subject: [PATCH] allow to specify messageId + allow "" exchangeName --- pom.xml | 73 ++-- .../nickshoe/storm/rabbitmq/Declarator.java | 4 +- .../it/nickshoe/storm/rabbitmq/Message.java | 378 +++++++++++------- .../nickshoe/storm/rabbitmq/RabbitMQBolt.java | 88 ++-- .../storm/rabbitmq/RabbitMQProducer.java | 302 +++++++------- .../storm/rabbitmq/TupleToMessage.java | 208 +++++----- .../rabbitmq/config/ConnectionConfig.java | 2 +- 7 files changed, 595 insertions(+), 460 deletions(-) diff --git a/pom.xml b/pom.xml index 6a94409..cacb42a 100644 --- a/pom.xml +++ b/pom.xml @@ -68,33 +68,66 @@ UTF-8 - 1.6 + + 1.8 + + ${java.version} + ${java.version} + + 2.3.0 + 5.14.1 + 1.7.33 + + 3.8.1 + 2.8.2 + 3.0.1 + 2.5.3 + 3.2.1 + 3.3.1 - + + + org.apache.storm + storm-client + ${storm.version} + provided + + + com.rabbitmq + amqp-client + ${amqp-client.version} + + + org.slf4j + slf4j-api + ${slf4j-api.version} + + + org.apache.maven.plugins maven-gpg-plugin - 1.5 + ${maven-gpg-plugin.version} org.apache.maven.plugins maven-release-plugin - 2.5 + ${maven-release-plugin.version} org.apache.maven.plugins maven-source-plugin - 2.2.1 + ${maven-source-plugin.version} org.apache.maven.plugins maven-javadoc-plugin - 2.9.1 + ${maven-javadoc-plugin.version} @@ -103,16 +136,12 @@ org.apache.maven.plugins maven-compiler-plugin - 2.5.1 - - ${jdk.version} - ${jdk.version} - + ${maven-compiler-plugin.version} org.apache.maven.plugins maven-deploy-plugin - 2.5 + ${maven-deploy-plugin.version} org.apache.maven.plugins @@ -123,24 +152,4 @@ - - - - org.apache.storm - storm-core - 1.0.2 - provided - - - com.rabbitmq - amqp-client - 4.0.0 - - - org.slf4j - slf4j-api - 1.7.5 - - - diff --git a/src/main/java/it/nickshoe/storm/rabbitmq/Declarator.java b/src/main/java/it/nickshoe/storm/rabbitmq/Declarator.java index c9fb371..9f168be 100644 --- a/src/main/java/it/nickshoe/storm/rabbitmq/Declarator.java +++ b/src/main/java/it/nickshoe/storm/rabbitmq/Declarator.java @@ -8,7 +8,9 @@ public interface Declarator extends Serializable { void execute(Channel channel); public static class NoOp implements Declarator { - @Override + private static final long serialVersionUID = 1L; + + @Override public void execute(Channel channel) {} } } diff --git a/src/main/java/it/nickshoe/storm/rabbitmq/Message.java b/src/main/java/it/nickshoe/storm/rabbitmq/Message.java index 674b47e..4dbd8bd 100644 --- a/src/main/java/it/nickshoe/storm/rabbitmq/Message.java +++ b/src/main/java/it/nickshoe/storm/rabbitmq/Message.java @@ -4,160 +4,230 @@ import java.util.HashMap; import java.util.Map; -import com.rabbitmq.client.QueueingConsumer; +import com.rabbitmq.client.Delivery; public class Message { - public static final Message NONE = new None(); - - private final byte[] body; - - public Message(byte[] body) { - this.body = body; - } - - public static Message forDelivery(QueueingConsumer.Delivery delivery) { - return (delivery != null) ? new DeliveredMessage(delivery) : NONE; - } - - public static Message forSending(byte[] body, - Map headers, - String exchangeName, - String routingKey, - String contentType, - String contentEncoding, - boolean persistent) { - return (body != null && exchangeName != null && exchangeName.length() > 0) ? - new MessageForSending(body, headers, exchangeName, routingKey, contentType, contentEncoding, persistent) : - NONE; - } - - public byte[] getBody() { - return body; - } - - public static class DeliveredMessage extends Message { - private final boolean redelivery; - private final long deliveryTag; - private final String routingKey; - private final String exchange; - private final String className; - private final String clusterId; - private final String contentEncoding; - private final String contentType; - private final String correlationId; - private final Integer deliveryMode; - private final String expiration; - private final Map headers; - private final String messageId; - private final Integer priority; - private final String replyTo; - private final Date timestamp; - private final String type; - private final String userId; - - private DeliveredMessage(QueueingConsumer.Delivery delivery) { - super(delivery.getBody()); - redelivery = delivery.getEnvelope().isRedeliver(); - deliveryTag = delivery.getEnvelope().getDeliveryTag(); - routingKey = delivery.getEnvelope().getRoutingKey(); - exchange = delivery.getEnvelope().getExchange(); - className = delivery.getProperties().getClassName(); - clusterId = delivery.getProperties().getClusterId(); - contentEncoding = delivery.getProperties().getContentEncoding(); - contentType = delivery.getProperties().getContentType(); - correlationId = delivery.getProperties().getCorrelationId(); - deliveryMode = delivery.getProperties().getDeliveryMode(); - expiration = delivery.getProperties().getExpiration(); - headers = delivery.getProperties().getHeaders(); - messageId = delivery.getProperties().getMessageId(); - priority = delivery.getProperties().getPriority(); - replyTo = delivery.getProperties().getReplyTo(); - timestamp = delivery.getProperties().getTimestamp(); - type = delivery.getProperties().getType(); - userId = delivery.getProperties().getUserId(); - } - - public boolean isRedelivery() { return redelivery; } - public long getDeliveryTag() { return deliveryTag; } - public String getRoutingKey() { return routingKey; } - public String getExchange() { return exchange; } - public String getClassName() { return className;} - public String getClusterId(){ return clusterId; } - public String getContentEncoding() { return contentEncoding; } - public String getContentType() { return contentType; } - public String getCorrelationId() { return correlationId; } - public Integer getDeliveryMode() { return deliveryMode; } - public String getExpiration() { return expiration; } - public Map getHeaders() { return headers; } - public String getMessageId() { return messageId; } - public Integer getPriority() { return priority; } - public String getReplyTo() { return replyTo; } - public Date getTimestamp() { return timestamp; } - public String getType() { return type; } - public String getUserId() { return userId; } - } - - public static class None extends Message { - private None() { - super(null); - } - - @Override - public byte[] getBody() { throw new UnsupportedOperationException(); }; - } - - public static class MessageForSending extends Message { - private final Map headers; - private final String exchangeName; - private final String routingKey; - private final String contentType; - private final String contentEncoding; - private final boolean persistent; - - private MessageForSending(byte[] body, - Map headers, - String exchangeName, - String routingKey, - String contentType, - String contentEncoding, - boolean persistent) { - super(body); - this.headers = (headers != null) ? headers : new HashMap(); - this.exchangeName = exchangeName; - this.routingKey = routingKey; - this.contentType = contentType; - this.contentEncoding = contentEncoding; - this.persistent = persistent; - } - - public Map getHeaders() - { - return headers; - } - - public String getExchangeName() - { - return exchangeName; - } - - public String getRoutingKey() - { - return routingKey; - } - - public String getContentType() - { - return contentType; - } - - public String getContentEncoding() - { - return contentEncoding; - } - - public boolean isPersistent() - { - return persistent; - } - } + + public static final Message NONE = new None(); + + private final byte[] body; + + public Message(byte[] body) { + this.body = body; + } + + public static Message forDelivery(Delivery delivery) { + if (delivery != null) { + return new DeliveredMessage(delivery); + } else { + return NONE; + } + } + + public static Message forSending( + byte[] body, + Map headers, + String exchangeName, + String routingKey, + String contentType, + String contentEncoding, + boolean persistent, + String messageId + ) { + if (body != null && exchangeName != null) { + return new MessageForSending(body, headers, exchangeName, routingKey, contentType, contentEncoding, persistent, messageId); + } else { + return NONE; + } + } + + public byte[] getBody() { + return body; + } + + public static class DeliveredMessage extends Message { + private final boolean redelivery; + private final long deliveryTag; + private final String routingKey; + private final String exchange; + private final String className; + private final String clusterId; + private final String contentEncoding; + private final String contentType; + private final String correlationId; + private final Integer deliveryMode; + private final String expiration; + private final Map headers; + private final String messageId; + private final Integer priority; + private final String replyTo; + private final Date timestamp; + private final String type; + private final String userId; + + private DeliveredMessage(Delivery delivery) { + super(delivery.getBody()); + redelivery = delivery.getEnvelope().isRedeliver(); + deliveryTag = delivery.getEnvelope().getDeliveryTag(); + routingKey = delivery.getEnvelope().getRoutingKey(); + exchange = delivery.getEnvelope().getExchange(); + className = delivery.getProperties().getClassName(); + clusterId = delivery.getProperties().getClusterId(); + contentEncoding = delivery.getProperties().getContentEncoding(); + contentType = delivery.getProperties().getContentType(); + correlationId = delivery.getProperties().getCorrelationId(); + deliveryMode = delivery.getProperties().getDeliveryMode(); + expiration = delivery.getProperties().getExpiration(); + headers = delivery.getProperties().getHeaders(); + messageId = delivery.getProperties().getMessageId(); + priority = delivery.getProperties().getPriority(); + replyTo = delivery.getProperties().getReplyTo(); + timestamp = delivery.getProperties().getTimestamp(); + type = delivery.getProperties().getType(); + userId = delivery.getProperties().getUserId(); + } + + public boolean isRedelivery() { + return redelivery; + } + + public long getDeliveryTag() { + return deliveryTag; + } + + public String getRoutingKey() { + return routingKey; + } + + public String getExchange() { + return exchange; + } + + public String getClassName() { + return className; + } + + public String getClusterId() { + return clusterId; + } + + public String getContentEncoding() { + return contentEncoding; + } + + public String getContentType() { + return contentType; + } + + public String getCorrelationId() { + return correlationId; + } + + public Integer getDeliveryMode() { + return deliveryMode; + } + + public String getExpiration() { + return expiration; + } + + public Map getHeaders() { + return headers; + } + + public String getMessageId() { + return messageId; + } + + public Integer getPriority() { + return priority; + } + + public String getReplyTo() { + return replyTo; + } + + public Date getTimestamp() { + return timestamp; + } + + public String getType() { + return type; + } + + public String getUserId() { + return userId; + } + } + + public static class None extends Message { + private None() { + super(null); + } + + @Override + public byte[] getBody() { + throw new UnsupportedOperationException(); + }; + } + + public static class MessageForSending extends Message { + + private final Map headers; + private final String exchangeName; + private final String routingKey; + private final String contentType; + private final String contentEncoding; + private final boolean persistent; + private final String messageId; + + private MessageForSending( + byte[] body, + Map headers, + String exchangeName, + String routingKey, + String contentType, + String contentEncoding, + boolean persistent, + String messageId + ) { + super(body); + this.headers = (headers != null) ? headers : new HashMap(); + this.exchangeName = exchangeName; + this.routingKey = routingKey; + this.contentType = contentType; + this.contentEncoding = contentEncoding; + this.persistent = persistent; + this.messageId = messageId; + } + + public Map getHeaders() { + return headers; + } + + public String getExchangeName() { + return exchangeName; + } + + public String getRoutingKey() { + return routingKey; + } + + public String getContentType() { + return contentType; + } + + public String getContentEncoding() { + return contentEncoding; + } + + public boolean isPersistent() { + return persistent; + } + + public String getMessageId() { + return messageId; + } + + } } diff --git a/src/main/java/it/nickshoe/storm/rabbitmq/RabbitMQBolt.java b/src/main/java/it/nickshoe/storm/rabbitmq/RabbitMQBolt.java index 0da3712..b92ae45 100644 --- a/src/main/java/it/nickshoe/storm/rabbitmq/RabbitMQBolt.java +++ b/src/main/java/it/nickshoe/storm/rabbitmq/RabbitMQBolt.java @@ -20,52 +20,62 @@ * */ public class RabbitMQBolt extends BaseRichBolt { - private static final long serialVersionUID = 97236452008970L; - private final TupleToMessage scheme; - private final Declarator declarator; + private static final long serialVersionUID = 97236452008970L; - private transient Logger logger; - private transient RabbitMQProducer producer; - private transient OutputCollector collector; + private final TupleToMessage scheme; + private final Declarator declarator; - public RabbitMQBolt(final TupleToMessage scheme) { - this(scheme, new Declarator.NoOp()); - } + private transient Logger logger; + private transient RabbitMQProducer producer; + private transient OutputCollector collector; - public RabbitMQBolt(final TupleToMessage scheme, final Declarator declarator) { - this.scheme = scheme; - this.declarator = declarator; - } + public RabbitMQBolt(final TupleToMessage scheme) { + this(scheme, new Declarator.NoOp()); + } - @Override - public void prepare(@SuppressWarnings("rawtypes") final Map stormConf, final TopologyContext context, final OutputCollector collector) { - producer = new RabbitMQProducer(declarator); - producer.open(stormConf); - logger = LoggerFactory.getLogger(this.getClass()); - this.collector = collector; - this.scheme.prepare(stormConf); - logger.info("Successfully prepared RabbitMQBolt"); - } + public RabbitMQBolt(final TupleToMessage scheme, final Declarator declarator) { + this.scheme = scheme; + this.declarator = declarator; + } - @Override - public void execute(final Tuple tuple) { - publish(tuple); - // tuples are always acked, even when transformation by scheme yields Message.NONE as - // if it failed once it's unlikely to succeed when re-attempted (i.e. serialization/deserilization errors). - acknowledge(tuple); - } + @Override + public void prepare(Map topoConf, TopologyContext context, OutputCollector collector) { + this.logger = LoggerFactory.getLogger(this.getClass()); + + this.collector = collector; + this.producer = new RabbitMQProducer(declarator); + + this.producer.open(topoConf); + this.scheme.prepare(topoConf); + + this.logger.info("Successfully prepared RabbitMQBolt"); + } - protected void acknowledge(Tuple tuple) { - collector.ack(tuple); - } + @Override + public void execute(final Tuple tuple) { + this.publish(tuple); + + // tuples are always acked, even when transformation by scheme yields Message.NONE + // as if it failed once it's unlikely to succeed when re-attempted + // (i.e. serialization/deserilization errors). + this.acknowledge(tuple); + } - protected void publish(Tuple tuple) { - producer.send(scheme.produceMessage(tuple)); - } + protected void publish(Tuple tuple) { + Message message = scheme.produceMessage(tuple); + + this.producer.send(message); + } + + protected void acknowledge(Tuple tuple) { + this.collector.ack(tuple); + } + + @Override + public void declareOutputFields(final OutputFieldsDeclarer declarer) { + // Since this is a sink, no downstream bolts should be specified. + // Thus, it's not necessary to specify the output fields. + } - @Override - public void declareOutputFields(final OutputFieldsDeclarer declarer) { - //No fields are emitted from this drain. - } } diff --git a/src/main/java/it/nickshoe/storm/rabbitmq/RabbitMQProducer.java b/src/main/java/it/nickshoe/storm/rabbitmq/RabbitMQProducer.java index 2a97d41..9fc82e2 100644 --- a/src/main/java/it/nickshoe/storm/rabbitmq/RabbitMQProducer.java +++ b/src/main/java/it/nickshoe/storm/rabbitmq/RabbitMQProducer.java @@ -12,143 +12,167 @@ import java.util.concurrent.TimeoutException; public class RabbitMQProducer implements Serializable { - private final Declarator declarator; - - private transient Logger logger; - - private transient ConnectionConfig connectionConfig; - private transient Connection connection; - private transient Channel channel; - - private boolean blocked = false; - - public RabbitMQProducer() - { - this(new Declarator.NoOp()); - } - - public RabbitMQProducer(Declarator declarator) { - this.declarator = declarator; - } - - public void send(Message message) { - if (message == Message.NONE) return; - sendMessageWhenNotBlocked((Message.MessageForSending) message); - } - - private void sendMessageWhenNotBlocked(Message.MessageForSending message) { - while (true) { - if (blocked) { - try { Thread.sleep(100); } catch (InterruptedException ie) { } - } else { - sendMessageActual(message); - return; - } - } - } - - private void sendMessageActual(Message.MessageForSending message) { - - reinitIfNecessary(); - if (channel == null) throw new ReportedFailedException("No connection to RabbitMQ"); - try { - AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() - .contentType(message.getContentType()) - .contentEncoding(message.getContentEncoding()) - .deliveryMode((message.isPersistent()) ? 2 : 1) - .headers(message.getHeaders()) - .build(); - channel.basicPublish(message.getExchangeName(), message.getRoutingKey(), properties, message.getBody()); - } catch (AlreadyClosedException ace) { - logger.error("already closed exception while attempting to send message", ace); - reset(); - throw new ReportedFailedException(ace); - } catch (IOException ioe) { - logger.error("io exception while attempting to send message", ioe); - reset(); - throw new ReportedFailedException(ioe); - } catch (Exception e) { - logger.warn("Unexpected error while sending message. Backing off for a bit before trying again (to allow time for recovery)", e); - try { Thread.sleep(1000); } catch (InterruptedException ie) { } - } - } - - public void open(final Map config) { - logger = LoggerFactory.getLogger(RabbitMQProducer.class); - connectionConfig = ConnectionConfig.getFromStormConfig(config); - internalOpen(); - } - - private void internalOpen() { - try { - connection = createConnection(); - channel = connection.createChannel(); - - // run any declaration prior to message sending - declarator.execute(channel); - } catch (Exception e) { - logger.error("could not open connection on rabbitmq", e); - reset(); - } - } - - public void close() { - try { - if (channel != null && channel.isOpen()) { - channel.close(); - } - } catch (Exception e) { - logger.debug("error closing channel", e); - } - try { - logger.info("closing connection to rabbitmq: " + connection); - connection.close(); - } catch (Exception e) { - logger.debug("error closing connection", e); - } - channel = null; - connection = null; - } - - private void reset() { - channel = null; - } - - private void reinitIfNecessary() { - if (channel == null) { - close(); - internalOpen(); - } - } - - private Connection createConnection() throws IOException, TimeoutException { - ConnectionFactory connectionFactory = connectionConfig.asConnectionFactory(); - Connection connection = connectionConfig.getHighAvailabilityHosts().isEmpty() ? connectionFactory.newConnection() - : connectionFactory.newConnection(connectionConfig.getHighAvailabilityHosts().toAddresses()); - connection.addShutdownListener(new ShutdownListener() { - @Override - public void shutdownCompleted(ShutdownSignalException cause) { - logger.error("shutdown signal received", cause); - reset(); - } - }); - connection.addBlockedListener(new BlockedListener() - { - @Override - public void handleBlocked(String reason) throws IOException - { - blocked = true; - logger.warn(String.format("Got blocked by rabbitmq with reason = %s", reason)); - } - - @Override - public void handleUnblocked() throws IOException - { - blocked = false; - logger.warn(String.format("Got unblocked by rabbitmq")); - } - }); - logger.info("connected to rabbitmq: " + connection); - return connection; - } + + private static final long serialVersionUID = -4893550590096622136L; + + private final Declarator declarator; + + private transient Logger logger; + + private transient ConnectionConfig connectionConfig; + private transient Connection connection; + private transient Channel channel; + + private boolean blocked = false; + + public RabbitMQProducer() { + this(new Declarator.NoOp()); + } + + public RabbitMQProducer(Declarator declarator) { + this.declarator = declarator; + } + + public void send(Message message) { + if (message == Message.NONE) { + return; + } + + sendMessageWhenNotBlocked((Message.MessageForSending) message); + } + + private void sendMessageWhenNotBlocked(Message.MessageForSending message) { + while (true) { + if (blocked) { + try { + Thread.sleep(100); + } catch (InterruptedException ie) { + } + } else { + sendMessageActual(message); + return; + } + } + } + + private void sendMessageActual(Message.MessageForSending message) { + reinitIfNecessary(); + + if (channel == null) { + throw new ReportedFailedException("No connection to RabbitMQ"); + } + + try { + AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() + .contentType(message.getContentType()) + .contentEncoding(message.getContentEncoding()) + .deliveryMode((message.isPersistent()) ? 2 : 1) + .messageId(message.getMessageId()) + .headers(message.getHeaders()).build(); + + channel.basicPublish(message.getExchangeName(), message.getRoutingKey(), properties, message.getBody()); + } catch (AlreadyClosedException ace) { + logger.error("already closed exception while attempting to send message", ace); + reset(); + throw new ReportedFailedException(ace); + } catch (IOException ioe) { + logger.error("io exception while attempting to send message", ioe); + reset(); + throw new ReportedFailedException(ioe); + } catch (Exception e) { + logger.warn("Unexpected error while sending message. Backing off for a bit before trying again (to allow time for recovery)", e); + try { + Thread.sleep(1000); + } catch (InterruptedException ie) {} + } + } + + public void open(final Map stormConf) { + this.logger = LoggerFactory.getLogger(RabbitMQProducer.class); + + this.connectionConfig = ConnectionConfig.getFromStormConfig(stormConf); + + internalOpen(); + } + + private void internalOpen() { + try { + connection = createConnection(); + channel = connection.createChannel(); + + // run any declaration prior to message sending + declarator.execute(channel); + } catch (Exception e) { + logger.error("could not open connection on rabbitmq", e); + reset(); + } + } + + public void close() { + try { + if (channel != null && channel.isOpen()) { + channel.close(); + } + } catch (Exception e) { + logger.debug("error closing channel", e); + } + + try { + logger.info("closing connection to rabbitmq: " + connection); + connection.close(); + } catch (Exception e) { + logger.debug("error closing connection", e); + } + + channel = null; + connection = null; + } + + private void reset() { + channel = null; + } + + private void reinitIfNecessary() { + if (channel == null) { + close(); + internalOpen(); + } + } + + private Connection createConnection() throws IOException, TimeoutException { + ConnectionFactory connectionFactory = connectionConfig.asConnectionFactory(); + + Connection connection; + if (connectionConfig.getHighAvailabilityHosts().isEmpty()) { + connection = connectionFactory.newConnection(); + } else { + connection = connectionFactory.newConnection(connectionConfig.getHighAvailabilityHosts().toAddresses()); + } + + connection.addShutdownListener(new ShutdownListener() { + @Override + public void shutdownCompleted(ShutdownSignalException cause) { + logger.error("shutdown signal received", cause); + reset(); + } + }); + + connection.addBlockedListener(new BlockedListener() { + @Override + public void handleBlocked(String reason) throws IOException { + blocked = true; + logger.warn(String.format("Got blocked by rabbitmq with reason = %s", reason)); + } + + @Override + public void handleUnblocked() throws IOException { + blocked = false; + logger.warn(String.format("Got unblocked by rabbitmq")); + } + }); + + logger.info("connected to rabbitmq: " + connection); + + return connection; + } } diff --git a/src/main/java/it/nickshoe/storm/rabbitmq/TupleToMessage.java b/src/main/java/it/nickshoe/storm/rabbitmq/TupleToMessage.java index 593942f..38ef26b 100644 --- a/src/main/java/it/nickshoe/storm/rabbitmq/TupleToMessage.java +++ b/src/main/java/it/nickshoe/storm/rabbitmq/TupleToMessage.java @@ -10,108 +10,128 @@ * This interface describes an object that will perform the work of mapping * incoming {@link Tuple}s to {@link Message} objects for posting on a RabbitMQ * exchange. - * */ public abstract class TupleToMessage implements Serializable { - protected void prepare(@SuppressWarnings("rawtypes") Map stormConfig) {} - /** - * Convert the incoming {@link Tuple} on the Storm stream to a {@link Message} - * for posting to RabbitMQ. - * - * @param input - * The incoming {@link Tuple} from Storm - * @return The {@link Message} for the {@link RabbitMQProducer} to publish. If - * transformation fails this should return Message.NONE. - */ - protected Message produceMessage(Tuple input) { - return Message.forSending( - extractBody(input), - specifyHeaders(input), - determineExchangeName(input), - determineRoutingKey(input), - specifyContentType(input), - specifyContentEncoding(input), - specifyMessagePersistence(input) - ); - } + private static final long serialVersionUID = 1L; - /** - * Extract message body as a byte array from the incoming tuple. This is required. - * This implementation must handle errors and should return null upon on unresolvable - * errors. - * - * @param input the incoming tuple - * @return message body as a byte array or null if extraction cannot be performed - */ - protected abstract byte[] extractBody(Tuple input); + /** + * Setup the mapper + * + * @param stormConfig + */ + protected void prepare(@SuppressWarnings("rawtypes") Map stormConfig) { + } - /** - * Determine the exchange where the message is published to. This can be - * derived based on the incoming tuple or a fixed value. - * - * @param input the incoming tuple - * @return the exchange where the message is published to. - */ - protected abstract String determineExchangeName(Tuple input); + /** + * Convert the incoming {@link Tuple} on the Storm stream to a {@link Message} + * for posting to RabbitMQ. + * + * @param input The incoming {@link Tuple} from Storm + * @return The {@link Message} for the {@link RabbitMQProducer} to publish. If + * transformation fails this should return Message.NONE. + */ + protected Message produceMessage(Tuple input) { + return Message.forSending( + extractBody(input), + specifyHeaders(input), + determineExchangeName(input), + determineRoutingKey(input), + specifyContentType(input), + specifyContentEncoding(input), + specifyMessagePersistence(input), + specifyMessageId(input) + ); + } - /** - * Determine the routing key used for this message. This can be derived based on - * the incoming tuple or a fixed value. Default implementation provides no - * routing key. - * - * @param input the incoming tuple - * @return the routing key for this message - */ - protected String determineRoutingKey(Tuple input) { - return ""; // rabbitmq java client library treats "" as no routing key - } + /** + * Extract message body as a byte array from the incoming tuple. This is + * required. This implementation must handle errors and should return null upon + * on unresolvable errors. + * + * @param input the incoming tuple + * @return message body as a byte array or null if extraction cannot be + * performed + */ + protected abstract byte[] extractBody(Tuple input); - /** - * Specify the headers to be sent along with this message. The default implementation - * return an empty map. - * - * @param input the incoming tuple - * @return the headers as a map - */ - protected Map specifyHeaders(Tuple input) - { - return new HashMap(); - } + /** + * Determine the exchange where the message is published to. This can be derived + * based on the incoming tuple or a fixed value. + * + * @param input the incoming tuple + * @return the exchange where the message is published to. + */ + protected abstract String determineExchangeName(Tuple input); - /** - * Specify message body content type. Default implementation skips the provision - * of this detail. - * - * @param input the incoming tuple - * @return content type - */ - protected String specifyContentType(Tuple input) { - return null; - } + /** + * Determine the routing key used for this message. This can be derived based on + * the incoming tuple or a fixed value. Default implementation provides no + * routing key. + * + * @param input the incoming tuple + * @return the routing key for this message + */ + protected String determineRoutingKey(Tuple input) { + return ""; // rabbitmq java client library treats "" as no routing key + } - /** - * Specify message body content encoding. Default implementation skips the provision - * of this detail. - * - * @param input the incoming tuple - * @return content encoding - */ - protected String specifyContentEncoding(Tuple input) { - return null; - } + /** + * Specify the headers to be sent along with this message. The default + * implementation return an empty map. + * + * @param input the incoming tuple + * @return the headers as a map + */ + protected Map specifyHeaders(Tuple input) { + return new HashMap(); + } - /** - * Specify whether each individual message should make use of message persistence - * when it's on a rabbitmq queue. This does imply queue durability or high availability - * or just avoidance of message loss. To accomplish that please read rabbitmq docs - * on High Availability, Publisher Confirms and Queue Durability in addition to - * having this return true. By default, message persistence returns false. - * - * @param input the incoming tuple - * @return whether the message should be persistent to disk or not. Defaults to not. - */ - protected boolean specifyMessagePersistence(Tuple input) { - return false; - } + /** + * Specify message body content type. Default implementation skips the provision + * of this detail. + * + * @param input the incoming tuple + * @return content type + */ + protected String specifyContentType(Tuple input) { + return null; + } + + /** + * Specify message body content encoding. Default implementation skips the + * provision of this detail. + * + * @param input the incoming tuple + * @return content encoding + */ + protected String specifyContentEncoding(Tuple input) { + return null; + } + + /** + * Specify whether each individual message should make use of message + * persistence when it's on a rabbitmq queue. This does imply queue durability + * or high availability or just avoidance of message loss. To accomplish that + * please read rabbitmq docs on High Availability, Publisher Confirms and Queue + * Durability in addition to having this return true. By default, message + * persistence returns false. + * + * @param input the incoming tuple + * @return whether the message should be persistent to disk or not. Defaults to + * not. + */ + protected boolean specifyMessagePersistence(Tuple input) { + return false; + } + + /** + * Specify message-id property + * + * @param input the incoming tuple + * @return the message id + */ + protected String specifyMessageId(Tuple input) { + return null; + } } diff --git a/src/main/java/it/nickshoe/storm/rabbitmq/config/ConnectionConfig.java b/src/main/java/it/nickshoe/storm/rabbitmq/config/ConnectionConfig.java index da3481c..e5f8d26 100644 --- a/src/main/java/it/nickshoe/storm/rabbitmq/config/ConnectionConfig.java +++ b/src/main/java/it/nickshoe/storm/rabbitmq/config/ConnectionConfig.java @@ -211,4 +211,4 @@ public Map asMap() { } return map; } -} +} \ No newline at end of file