AMQP and Akka

Posted on Thursday, September 15, 2011


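AMQP is a messaging protocol that deals with publishers and consumers. It looks a lot like JMS, but it is a wire-level protocol rather than a Java API. The main entities are Exchanges, Queues and Bindings: publishers send messages to an exchange, queues are attached to an exchange through bindings, and consumers read from queues. Look at the following diagram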

So a producer sends a message to the exchange, and it is the job of the message broker (RabbitMQ in our case) to ensure that the message is delivered to the right queue.
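To make the exchange, queue and binding relationship concrete, here is a minimal in-memory sketch of how a broker might route a message. It is a toy model only: ToyBroker, Binding and the names otherqueue and ToyBrokerDemo are made up for illustration and are neither RabbitMQ nor the Akka AMQP API. It also assumes exact key matching, which is how keys without wildcards behave.

```scala
import scala.collection.mutable

// A binding attaches a queue to an exchange under a binding key.
final case class Binding(exchange: String, queue: String, bindingKey: String)

// Toy broker (hypothetical, for illustration only).
final class ToyBroker {
  private val bindings = mutable.ListBuffer.empty[Binding]
  private val queues = mutable.Map.empty[String, mutable.Queue[String]]

  // Declare the queue if needed and bind it to the exchange.
  def bind(exchange: String, queue: String, bindingKey: String): Unit = {
    queues.getOrElseUpdate(queue, mutable.Queue.empty[String])
    bindings += Binding(exchange, queue, bindingKey)
  }

  // Deliver the message once to every queue with an exactly matching binding.
  def publish(exchange: String, routingKey: String, body: String): Unit =
    bindings
      .filter(b => b.exchange == exchange && b.bindingKey == routingKey)
      .map(_.queue)
      .distinct
      .foreach(q => queues(q) += body)

  // Messages currently waiting in a queue, oldest first.
  def pending(queue: String): List[String] =
    queues.get(queue).map(_.toList).getOrElse(Nil)
}

object ToyBrokerDemo {
  def main(args: Array[String]): Unit = {
    val broker = new ToyBroker
    broker.bind("myexchange", "myqueue", "@vajrafeed")
    broker.bind("myexchange", "otherqueue", "@nyse")
    broker.publish("myexchange", "@vajrafeed", "hello")
    println(broker.pending("myqueue"))    // List(hello)
    println(broker.pending("otherqueue")) // List()
  }
}
```

The publisher never names a queue. It only names the exchange and a routing key, and the bindings decide where the message ends up.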
Before any of that can happen, the connections have to be built so that publishers can publish to the exchange and consumers can receive from it. Let us see how this is done with Akka in our project. Our framework is initialized by the following method

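def initializeFramework(xml: Elem): Unit = {
    // Read the broker settings from the XML configuration
    connectionInfo = getAMQPChannelInformation(xml)
    // Open the connection and describe the exchange to publish to
    val connection = getAMQPChannel(connectionInfo)
    val exchangeParameters = getExchangeParameters(connectionInfo)
    // Create the producer actor that publishes to that exchange
    instantiateProducer(connection, exchangeParameters)

    logger.info("VajraPM is now initialized and ready to process messages...")
}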

I have taken out the business logic and kept only the methods that form the core of this post. Let us go method by method. First, we read all the information we need to make the connection

def getAMQPChannelInformation(xml: Elem): ConnectionInfo = {
    // `\` selects child elements, so <VAJRA> is expected to be a child of `xml`.
    // A missing element yields an empty string from .text.
    val momServer = (xml \ "VAJRA" \ "RABBITSERVER").text
    val momPort = (xml \ "VAJRA" \ "RABBITPORT").text
    val userName = (xml \ "VAJRA" \ "USERNAME").text
    val password = (xml \ "VAJRA" \ "PASSWORD").text
    val queueName = (xml \ "VAJRA" \ "QUEUENAME").text
    val exchangeName = (xml \ "VAJRA" \ "EXCHANGENAME").text
    val durable = (xml \ "VAJRA" \ "DURABLE").text
    new ConnectionInfo(momServer, momPort, userName, password, queueName, exchangeName, durable)
}

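Here we use Scala's built-in XML support to pull out the values in a few lines. The sample XML from which they are read looks like this. Note that the sample has no DURABLE element, so the durable value is read as an empty string.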

<VAJRA>
		<RABBITSERVER>localhost</RABBITSERVER>
		<RABBITPORT>5672</RABBITPORT>
		<USERNAME>guest</USERNAME>
		<PASSWORD>guest</PASSWORD>
		<QUEUENAME>myqueue</QUEUENAME>
		<EXCHANGENAME>myexchange</EXCHANGENAME>
	</VAJRA>
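As a small, hedged illustration of how these lookups behave, the sketch below parses a trimmed-down copy of the configuration and reads a present and an absent element. It assumes the scala-xml module is on the classpath, and the CONFIG root element and the XmlLookupDemo object are invented for the example; the real root element is not shown in this post.

```scala
// Assumes the scala-xml module is available on the classpath.
import scala.xml.XML

object XmlLookupDemo {
  def main(args: Array[String]): Unit = {
    // Hypothetical <CONFIG> root wrapping a shortened <VAJRA> block.
    val xml = XML.loadString(
      """<CONFIG>
        |  <VAJRA>
        |    <RABBITSERVER>localhost</RABBITSERVER>
        |    <RABBITPORT>5672</RABBITPORT>
        |  </VAJRA>
        |</CONFIG>""".stripMargin)

    // `\` projects onto child elements with the given label.
    val server = (xml \ "VAJRA" \ "RABBITSERVER").text
    val port = (xml \ "VAJRA" \ "RABBITPORT").text.toInt
    // DURABLE is absent, so the projection is empty and .text is "".
    val durable = (xml \ "VAJRA" \ "DURABLE").text

    println(server)          // localhost
    println(port)            // 5672
    println(durable.isEmpty) // true
  }
}
```

Because a missing element quietly becomes an empty string, a typo in an element name shows up later as a bad value rather than as an error at read time.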

Now let us see how we get a connection

def getAMQPChannel(connectionInfo: ConnectionInfo): ActorRef = {
    val myAddresses = Array(new Address(connectionInfo.momServer, connectionInfo.momPort.toInt))
    val connectionParameters = ConnectionParameters(myAddresses, connectionInfo.userName, connectionInfo.password)
    val connection = AMQP.newConnection(connectionParameters)
    connection
  }

As you can see, the broker address and credentials are wrapped in akka.amqp.AMQP.ConnectionParameters and passed to AMQP.newConnection, which hands back the connection as an ActorRef.

Once we have the connection, let us instantiate the producer. The method instantiateProducer(connection, exchangeParameters) looks like this

 def instantiateProducer(connection: ActorRef, exchangeParameters: ExchangeParameters) {
    producer = AMQP.newProducer(connection, ProducerParameters(Some(exchangeParameters)))
  }

As you can see, we pass exchangeParameters to the producer. The diagram above shows why: an AMQP producer needs a reference to the exchange to which it sends messages. We build the exchange information as follows

  def getExchangeParameters(connectionInfo: ConnectionInfo): ExchangeParameters =
    connectionInfo.durable match {
      // Durable topic exchange that is not deleted automatically
      case "DURABLE" => ExchangeParameters(connectionInfo.exchangeName, Topic, ActiveDeclaration(durable = true, autoDelete = false))
      // Any other value: topic exchange with the default declaration
      case _ => ExchangeParameters(connectionInfo.exchangeName, Topic)
    }

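If the DURABLE element in our configuration holds the string DURABLE, the exchange is declared with durable = true and autoDelete = false. For any other value, including a missing element, the exchange is created with the default declaration.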

This sets up our producer actor, and we can now send messages to it with the bang operator, like this

producer ! Message(message)

As soon as the producer actor receives the message, it publishes it to the exchange that we specified. Now, let us look at the consumer

 def createConsumer(exchangeParameters: ExchangeParameters, connection: ActorRef, queueName: String, actor: ActorRef, connectionInfo: ConnectionInfo): ActorRef = {
    connectionInfo.durable match {
      case "DURABLE" => {
        val queueDeclaration = ActiveDeclaration(durable = true, autoDelete = false)
        AMQP.newConsumer(connection, ConsumerParameters("@vajrafeed", actor, Some(queueName), Some(exchangeParameters), queueDeclaration))
      }
      case _ => AMQP.newConsumer(connection, ConsumerParameters("@vajrafeed", actor, Some(queueName), Some(exchangeParameters)))
    }
  }

So now, as shown in the figure above, we have a consumer listening on the queue called myqueue, which is bound to the exchange myexchange. Notice the "@vajrafeed" argument: this is the binding key, which lets the message broker decide which messages it should pass on to the consumer. If another consumer's queue were bound with a different key, say @nyse, then the messages for @vajrafeed would not be delivered to it. You can get more information about AMQP here.
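Since the exchange above is declared as a Topic exchange, binding keys may also contain wildcards. The sketch below is a plain Scala approximation of the topic matching rule, not the broker's actual implementation: words are separated by dots, "*" stands for exactly one word and "#" for zero or more words. The TopicMatch object and the stocks.* keys are hypothetical names used only for illustration.

```scala
object TopicMatch {
  // True when routingKey satisfies the binding pattern under topic rules.
  def matches(pattern: String, routingKey: String): Boolean = {
    def loop(p: List[String], k: List[String]): Boolean = (p, k) match {
      case (Nil, Nil) => true
      // "#" matches zero words, or consumes one word and stays in place
      case ("#" :: pRest, _) => loop(pRest, k) || (k.nonEmpty && loop(p, k.tail))
      // "*" consumes exactly one word
      case ("*" :: pRest, _ :: kRest) => loop(pRest, kRest)
      // A literal word must be equal on both sides
      case (w :: pRest, kw :: kRest) if w == kw => loop(pRest, kRest)
      case _ => false
    }
    loop(words(pattern), words(routingKey))
  }

  // Split a key into its dot-separated words; the empty key has no words.
  private def words(s: String): List[String] =
    if (s.isEmpty) Nil else s.split("\\.", -1).toList

  def main(args: Array[String]): Unit = {
    println(matches("@vajrafeed", "@vajrafeed"))    // true
    println(matches("@nyse", "@vajrafeed"))         // false
    println(matches("stocks.*", "stocks.nyse"))     // true
    println(matches("stocks.*", "stocks.nyse.ibm")) // false
    println(matches("stocks.#", "stocks.nyse.ibm")) // true
  }
}
```

A key without wildcards, such as @vajrafeed, only matches itself, which is why the @nyse binding in the example above never sees @vajrafeed messages.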

Posted in: Architecture, Scala