前言
上篇博客介绍了ActiveMQ的安装,并实现了简单的PTP模型。这篇博客我们来看一下Pub-Sub模型,之后来总结一下JMS。
实现
项目结构
其中,一个消息发布者Producer,两个接收者Consumer1、Consumer2,还有两个监听类Listener1和2,负责监听消费者是否收到消息。
消息生产者Producer
Producer类的代码实现跟上篇博客中的Sender类十分类似,但要注意的是,destination = session.createTopic(“TestTopic2”); 我们创建的是topic而不再是queue。
//消息个数 private static final int SEND_NUMBER = 10; public static void main(String[] args) { //初始化开始,包括连接工厂、连接、会话、消息目的、消息生产者 ConnectionFactory connectionFactory; Connection connection = null; Session session; Destination destination; MessageProducer producer; //创建连接工厂,使用默认用户名和密码。这里tcp://localhost:61616为连接地址,当然也可以使用默认地址。 connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616"); try { // 构造从工厂得到连接对象 connection = connectionFactory.createConnection(); // 启动 connection.start(); // 获取操作连接 session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); //创建一个名称为TestQueue的消息队列 destination = session.createTopic("TestTopic2"); //得到producer producer = session.createProducer(destination); // 构造消息 sendMessage(session, producer); session.commit(); } catch (JMSException e) { e.printStackTrace(); } finally { try { if (null != connection) connection.close(); } catch (Throwable ignore) { } } } private static void sendMessage(Session session, MessageProducer producer) throws JMSException { for (int i = 0; i < SEND_NUMBER; i++) { TextMessage message=session.createTextMessage("I am a Producer"+i); System.out.println("发送消息:"+message.getText()); producer.send(message); } }
消息的消费者Consumer
这里就不再给出全部代码了,但依然创建的是Topic,设置了消息监听Listener1。
// 使用同一个消息队列 destination = session.createTopic("TestTopic2"); consumer=session.createConsumer(destination); consumer.setMessageListener(new Listener1());
监听者Listener
public class Listener1 implements MessageListener{ @Override public void onMessage(Message message) { try { System.out.println("订阅者一收到消息:"+((TextMessage)message).getText()); } catch (Exception e) { e.printStackTrace(); } }}
此处的MessageListener是jms包下提供的一个接口
package javax.jms;/** * @version $Rev: 467553 $ $Date: 2006-10-25 06:01:51 +0200 (Wed, 25 Oct 2006) $ */public interface MessageListener { void onMessage(Message message);}
测试
1 订阅
发布订阅模型要先执行订阅操作,依次执行Consumer1、Consumer2,之后在浏览器中查看Topic。
红框中显示已经存在2个Consumer。
2 发布
执行Producer,这里我们在代码中给出的10个消息。因为有2个订阅者,所以入队10个,出队20。
此时,我们的监听类Listener也发出了消息。查看控制台输出:
小结
ActiveMQ是基于JMS规范和J2EE规范的JMSProducer,也可以直接理解成是消息中间件。
1、与RMI、RPC相比,他的耦合性较小,更灵活。发布消息者往往不需要了解谁会收到消息,这与远程调用有着明显的不同。
2、再有,一般的调用是同步的、耗时的。而JMS是异步的,大大改善了用户体验。