博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
JMS实战——ActiveMQ实现Pub-Sub
阅读量:5350 次
发布时间:2019-06-15

本文共 3094 字,大约阅读时间需要 10 分钟。

前言

上篇博客介绍了ActiveMQ的安装,并实现了简单的PTP模型。这篇博客我们来看一下Pub-Sub模型,之后来总结一下JMS。

实现

项目结构

其中,一个消息发布者Producer,两个接收者Consumer1、Consumer2,还有两个监听类Listener1和2,负责监听消费者是否收到消息。

这里写图片描述

消息生产者Producer

Producer类的代码实现跟上篇博客中的Sender类十分类似,但要注意的是,destination = session.createTopic(“TestTopic2”); 我们创建的是topic而不再是queue。

//消息个数    private static final int SEND_NUMBER = 10;    public static void main(String[] args) {        //初始化开始,包括连接工厂、连接、会话、消息目的、消息生产者        ConnectionFactory connectionFactory;        Connection connection = null;        Session session;        Destination destination;        MessageProducer producer;        //创建连接工厂,使用默认用户名和密码。这里tcp://localhost:61616为连接地址,当然也可以使用默认地址。        connectionFactory = new ActiveMQConnectionFactory(                ActiveMQConnection.DEFAULT_USER,                ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");        try {            // 构造从工厂得到连接对象            connection = connectionFactory.createConnection();            // 启动            connection.start();            // 获取操作连接            session = connection.createSession(Boolean.TRUE,                    Session.AUTO_ACKNOWLEDGE);            //创建一个名称为TestQueue的消息队列            destination = session.createTopic("TestTopic2");            //得到producer            producer = session.createProducer(destination);            // 构造消息            sendMessage(session, producer);            session.commit();        } catch (JMSException e) {            e.printStackTrace();        } finally {            try {                if (null != connection)                    connection.close();            } catch (Throwable ignore) {            }        }    }    private static void sendMessage(Session session, MessageProducer producer) throws JMSException {        for (int i = 0; i < SEND_NUMBER; i++) {            TextMessage message=session.createTextMessage("I am a Producer"+i);            System.out.println("发送消息:"+message.getText());            producer.send(message);        }    }

消息的消费者Consumer

这里就不再给出全部代码了,但依然创建的是Topic,设置了消息监听Listener1。

// 使用同一个消息队列            destination = session.createTopic("TestTopic2");             consumer=session.createConsumer(destination);             consumer.setMessageListener(new Listener1());

监听者Listener

public class Listener1 implements MessageListener{
@Override public void onMessage(Message message) { try { System.out.println("订阅者一收到消息:"+((TextMessage)message).getText()); } catch (Exception e) { e.printStackTrace(); } }}

此处的MessageListener是jms包下提供的一个接口

package javax.jms;/** * @version $Rev: 467553 $ $Date: 2006-10-25 06:01:51 +0200 (Wed, 25 Oct 2006) $ */public interface MessageListener {
void onMessage(Message message);}

测试

1 订阅

发布订阅模型要先执行订阅操作,依次执行Consumer1、Consumer2,之后在浏览器中查看Topic。

这里写图片描述

红框中显示已经存在2个Consumer。

2 发布

执行Producer,这里我们在代码中给出的10个消息。因为有2个订阅者,所以入队10个,出队20。

这里写图片描述

此时,我们的监听类Listener也发出了消息。查看控制台输出:

这里写图片描述

这里写图片描述

小结

ActiveMQ是基于JMS规范和J2EE规范的JMSProducer,也可以直接理解成是消息中间件。

1、与RMI、RPC相比,他的耦合性较小,更灵活。发布消息者往往不需要了解谁会收到消息,这与远程调用有着明显的不同。

2、再有,一般的调用是同步的、耗时的。而JMS是异步的,大大改善了用户体验。

转载于:https://www.cnblogs.com/saixing/p/6730205.html

你可能感兴趣的文章
awk工具-解析1
查看>>
推荐一款可以直接下载浏览器sources资源的Chrome插件
查看>>
CRM product UI里assignment block的显示隐藏逻辑
查看>>
AMH V4.5 – 基于AMH4.2的第三方开发版
查看>>
Web.Config文件配置之配置Session变量的生命周期
查看>>
mysql导入source注意点
查看>>
linux下编译安装nginx
查看>>
ArcScene 高程不同的表面无法叠加
查看>>
[ONTAK2010] Peaks
查看>>
DLL 导出函数
查看>>
windows超过最大连接数解决命令
查看>>
12个大调都是什么
查看>>
angular、jquery、vue 的区别与联系
查看>>
参数范围的选择
查看>>
使用 MarkDown & DocFX 升级 Rafy 帮助文档
查看>>
THUPC2019/CTS2019/APIO2019游记
查看>>
Nodejs Express模块server.address().address为::
查看>>
4.3.5 Sticks (POJ1011)
查看>>
POJ 2960 S-Nim 博弈论 sg函数
查看>>
Dijkstra模版
查看>>