activemq和kafka区别 activemq消息持久化方式
队列模式(点对点模式,P2P)特点:1、客户端包括生产者和消费者;
2、队列中的消息只能被一个消费者消费;
3、消费者可以随时消费队列中的消息;

文章插图
队列模式和主题模式的区别:
1、提前订阅,队列模式:消费者不需要提前订阅也可以消费消息;主题模式:只有提前进行订阅的消费者才能成功消费消息;
2、多个消费者分配消息:队列模式:只能平均消费消息,被别的消费者消费的消息不能重复被其他的消费者消费;主题模式:每个订阅者都可以消费主题模式中的每一条消息;
案例代码:【activemq和kafka区别 activemq消息持久化方式】生产者:
import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;public class ActiveMQProducer {public static final String ACTIVE_URL = "failover://(tcp://192.168.1.16:61616,tcp://192.168.1.17:61616,tcp://192.168.1.17:61616)";public static final String QUEUE_NAME = "queue01";public static void main(String[] args) throws JMSException {//创建连接工厂 ,,按照定的url地址给定默认的用户名和密码ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);//通过连接工厂获取connection连接 并启动访问Connection connection = activeMQConnectionFactory.createConnection();connection.start();//创建会话session需要两个参数,第一个事务,第二个签收Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//创建目的地(选择是队列还是主题)Queue queue = session.createQueue(QUEUE_NAME);//创建消息的生产者MessageProducer messageProducer = session.createProducer(queue);//通过使用消息生产者messageProducer生产3条消息发送到队列中for (int i = 1; i <= 7; i++) {//创建消息一个字符串消息TextMessage textMessage = session.createTextMessage("msg---->" + i);//通过messageProducer 发布消息messageProducer.send(textMessage);}//关闭资源messageProducer.close();session.close();connection.close();System.out.println("消息发送到MQ成功");}}
消费者1:import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;public class ActiveMQConsumer {public static final String ACTIVE_URL = "failover://(tcp://192.168.1.16:61616,tcp://192.168.1.17:61616,tcp://192.168.1.17:61616)";public static final String QUEUE_NAME="queue01";public static void main(String[] args) throws JMSException {ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);//通过连接工厂获取connection连接 并启动访问Connection connection = activeMQConnectionFactory.createConnection();connection.start();//创建会话session需要两个参数,第一个事务,第二个签收Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//创建目的地(选择是队列还是主题)Queue queue = session.createQueue(QUEUE_NAME);//创建消息的消费者MessageConsumer messageConsumer = session.createConsumer(queue);while (true){//从队列中获取消息receive未设置最大时间 是阻塞的,TextMessage textMessage = (TextMessage) messageConsumer.receive();if (textMessage !=null){System.out.println("消费者接受到消息---->"+textMessage.getText());}else {break;}}messageConsumer.close();session.close();connection.close();}}
输出: INFO | Successfully connected to tcp://192.168.1.17:61616消费者接受到消息---->msg---->2消费者接受到消息---->msg---->4消费者接受到消息---->msg---->6
消费者2:import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;import java.io.IOException;public class ActiveMQConsumerListener {public static final String ACTIVE_URL = "failover://(tcp://192.168.1.16:61616,tcp://192.168.1.17:61616,tcp://192.168.1.17:61616)";public static final String QUEUE_NAME = "queue01";public static void main(String[] args) throws JMSException, IOException {ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);//通过连接工厂获取connection连接 并启动访问Connection connection = activeMQConnectionFactory.createConnection();connection.start();//创建会话session需要两个参数,第一个事务,第二个签收Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//创建目的地(选择是队列还是主题)Queue queue = session.createQueue(QUEUE_NAME);//创建消息的消费者MessageConsumer messageConsumer = session.createConsumer(queue);//通过监听的机制消费消息messageConsumer.setMessageListener((message) -> {if (message != null && message instanceof TextMessage) {TextMessage textMessage = (TextMessage) message;try {System.out.println("消费者接受到消息---->" + textMessage.getText());} catch (JMSException e) {e.printStackTrace();}}});//不关闭控制台如果不加这句话,在下面可能在连接的时候直接关闭了,造成无法消费的问题System.in.read();messageConsumer.close();session.close();connection.close();}}
输出: INFO | Successfully connected to tcp://192.168.1.17:61616消费者接受到消息---->msg---->1消费者接受到消息---->msg---->3消费者接受到消息---->msg---->5消费者接受到消息---->msg---->7

文章插图
Number Of Consumers:表示消费者数量;
Number Of Pending Messages:等待消费的消息,这个是当前未出队列的数量;
Messages Enqueued:进入队列的消息;( 这个数量只增不减,重启后会清零);
Messages Dequeued:出了队列的消息 可以理解为是消费者消费掉的数量 (重启后会清零);
持久化案例代码:ActiveMQ持久化,生产者产生的数据,在没有被消费者消费时,先保存到数据库中,当数据被消费者消费后,再从数据库中删除 。
生产者:
import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;public class ActiveMQProducer {public static final String ACTIVE_URL = "failover://(tcp://192.168.1.16:61616,tcp://192.168.1.17:61616,tcp://192.168.1.17:61616)";public static final String QUEUE_NAME = "queue02";public static void main(String[] args) throws JMSException {//创建连接工厂 ,,按照定的url地址给定默认的用户名和密码ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);//通过连接工厂获取connection连接 并启动访问Connection connection = activeMQConnectionFactory.createConnection();connection.start();//创建会话session需要两个参数,第一个事务,第二个签收Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//创建目的地(选择是队列还是主题)Queue queue = session.createQueue(QUEUE_NAME);//创建消息的生产者MessageProducer messageProducer = session.createProducer(queue);// 消息持久化messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);//通过使用消息生产者messageProducer生产3条消息发送到队列中for (int i = 1; i <= 7; i++) {//创建消息一个字符串消息TextMessage textMessage = session.createTextMessage("msg---->" + i);//通过messageProducer 发布消息messageProducer.send(textMessage);}//关闭资源messageProducer.close();session.close();connection.close();System.out.println("消息发送到MQ成功");}}
推荐阅读
- 虾和胡萝卜能一起吃吗 虾和胡萝卜是否能一起吃的解析
- 北疆和南疆哪个大
- 煮粥和煮稀饭有什么区别
- 糯米胶和改性淀粉胶哪个好
- 孜然怎么炒
- 毛木耳和木耳有区别毛木耳
- 5年保修和5年质保的区别
- 会计和金融考研哪个更好
- 龙族优美句子
- 微信心碎心情签名