介绍
RabbitMQ是作为一个消息代理中间件,其设计的目的很简单:收集消息然后转发消息。你可以把它当成一个邮局:当你发送邮件到邮箱时你肯定相信邮递员肯定会把这封邮件送到收件人手上。在这个比喻中,RabbitMQ实际充当了邮箱、邮局以及邮递员的角色。与邮局最大的不同是,RabbitMQ它不处理纸张文件,而是负责接收,储存以及转发二进制数据message。在详细介绍之前,我们首先了解一些RabbitMQ的术语:生产者:我们称用来发送消息的程序端为生产者,下面以图“P”表示:
消息队列:消息队列存在RabbitMQ server端中;虽然消息流经你的应用程序和RabbitMQ server,但他们只可以储存在RabbitMQ server队列中。队列是不受任何限制的,它可以储存你发送的所有消息,因为本质上它就是一个无限缓冲区。可以多个生产者同时发送消息给一个队列,同时 也可以多个消费者从一个队列接收数据。以下图来表示消息队列:
消费者:我们称大部分时间都在等待接收消息的程序端为消费者;以下图来表示消费者:
”Hello world“在本节我们将用java编写两个程序:一个生产者用来发送消息以及一个消费者用来接收消息并且打印出来。在实现过程中我们主要关注如何实现功能而对于java API的细节一笔带过。在下面这个图中,”P”代表生产者,”C”代表消费者,在中间的盒子表示队列(一个RabbitMQ用来保存消息的缓冲区)。基于java的客户端:RabbitMQ遵循AMQP协议(高级消息队列协议)–这是一个开放的、通用的消息协议。现在网上已存在很多基于的不同语言实现的AMQP客户端。在本例中我们使用java语言的客户端。maven引入如下:com.rabbitmq rabbitmq-client 1.3.0 生产者
在这个例子中,我们分别以Send和Recv来表示生产者、消费者。Send主要用来连接RabbitMQ server及发送一个消息,之后退出。下面是Send.class实现:import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.Connection;import com.rabbitmq.client.Channel;public class Send { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws java.io.IOException { //--------------1-------------- ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //--------------2----------------- channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); //---------------3---------------- channel.close(); connection.close(); }}
代码的第一部分主要创建一个与RabbitMQ服务器的连接,该链接抽象了socket操作,负责判断协议版本和身份认证等等。另外因为我们把消息 代理broker搭建在本地,所以连接的地址是localhost。如果你想连接到其他机器的代理broker,你可以指定它的域名或者IP地址。
在第二部分,我们创建了一个通道;通道为我们提供了很多API用于完成消息的发送。比如,我们可以声明一个队列用来发送消息以及把消息发布到声明的队列。值得注意的是,声明一个队列是幂等的,只有在队列不存在的时候才创建。另外由于消息的内容是二进制的字节数组,所以收到数据后你可以编码成任意你需要的数据。最后,也就是第三部分,我们需要关掉通道channel以及连接。发送失败如果你是第一次使用RabbitMQ,那么当你没有看到发送的消息的时候,你可能会挠头到底想问题出在了哪呢?不要紧,有可能是broker(代理,以后 统称broker)没有足够的磁盘空间(默认情况下它至少需要1Gb的剩余空间)因此拒绝接收任何消息。你可以查看broker的日志文件来确认并且减少 一下限制。地址http://www.rabbitmq.com/configure.html#config-items里说明了如何设置 disk_free_limit.消费者
下面实现我们的消费者,它主要用来持续监听RabbitMQ推送消息并且打印。下面是Recv.class的实现:import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.Connection;import com.rabbitmq.client.Channel;import com.rabbitmq.client.QueueingConsumer;public class Recv { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws java.io.IOException, java.lang.InterruptedException { //-------------1------------------- ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); //--------------2------------------- QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); } }}
QueueingConsumer类主要用来缓存RabbitMQ服务器推送的消息。
建立Recv的过程跟send类似;首先创建一个到服务器的连接和一个通道channel(后面统一叫channel),以及定义一个我们将要消费的队列。注意队列必须与send的匹配,也就是名字要一样。在第一部分,我们声明了一个队列;另外由于我们可能在sender运行前启动了Recv,因此我们要确保在我们消费消息前队列已经存在。在第二部分中我们告诉服务器要把QUEUE_NAME队列中的消息推送给我们。因为服务器是异步推送消息给我们,因此我们需要提供一个回调对象QueueingConsumer来缓存消息直到被程序消费掉。QueueingConsumer.nextDelivery()会堵塞直到RabbitMQ服务器推送消息过来。