1. 首页
  2. > 公司注册 >

activemq集群是什么意思(activemq高可用)


集群架构图


  • 业务使用场景,生产者和消费者系统说明
  • 是否需要持久化消息
  • 发送方/接收方模式(点对点,主题订阅)
  • 发送方QPS均值、峰值预估
  • 发送方,接收方的连接数均值、峰值预估

2 MQ管理员会评估需求,确认现有资源能否满足。如果可以满足,将提供线上集群连接配置以及线下测试环境集群配置。


3 业务方根据自身系统特点,选择相应开发库(比如Java、PHP已有成熟库和使用方案)。在线下联调测试。


4 告知MQ管理员业务上线时间,MQ管理员负责监控线上服务是否正常运行。


5 在AMQ业务接入方登记页,填写自己的服务信息。


五、基本规则

1.客户端配置


  • MQ允许持久化消息,和非持久化消息。持久化消息会在集群所有节点持久化存储,集群保证不丢消息,但对性能影响较大。消息量不大时,或者消息非常需要可靠传达时,可用持久化方式
  • 消息体内容在2KB以内时,性能较好。建议控制消息体长度,消息体长度超过1MB时,谨慎使用MQ
  • 与任何通信协议相同,生产者/消费者和MQ建立连接和释放连接时会消耗较多资源。确保自己的客户端每次subscribe时,尽量处理完queue所有消息再退出!一定要释放连接
  • MQ可工作在两种方式:点对点、主题订阅模型。请根据自己的业务特点选择合适的模式。

2.主要参数


amq主要参数


六、Java客户端配置与代码实现

  1. spring与activemq 的maven依赖

<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-core</artifactId> <version>5.7.0</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.12.0</version> </dependency> <depen高可用dency> <groupId>org.apache.xbean</groupId> <artifactId>xbean-spring</artifactId> <version>3.16</version是什么> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>4.2.1.RELEASE</version(> </dependency>

  1. 属性配置文件 config-activemq.properties

#######activemq###### #实际项目中属性值分多环境配置,为方便说明下面列出 #activemq.broker.url=${activemq.broker.url} activemq.broker.url=failover:(tcp://172.30.xx.xx:61616, tcp://172.30.xx.xx:61616, tcp://172.30.xx.xx:61616) ?initialReconnectDelay=1000 &maxReconnectDelay=10000 &jms.prefetchPolicy.all=1000&timeout=3000 feed.queue.subject=FEED.QUEUE.DEV1 feed.topic.subject=FEED.TOPIC.DEV mobile.queue.subject=CONSUMER.MESSAGE.QUEUE.FOLIO queue.interlink_customer_prod=interlink_customer_prod queue.interlink_house_prod=interlink_house_prod queue.chec集群k_customer_prod=check_customer_prod queue.redstar_task=HONGXING.TASK.SCHEDULE.DOCKING queue.callRecord_customer_prod=callRecord_customer_prod

  1. mq总(概括性)配置文件spring-activemq.xml

<?xml version="1.0" encoding="UTF-8"?> <!-- 查找最新的schemaLocation 访问 http://www.springframework.org/schema/ --> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd"> <context:component-scan base-package="com.lianjia.customer.message" /> <context:property-placeholder location="classpath:config-activemq.properties" ignore-unresolvable="true" /> <bean id="jmsSimpleMessageConverter" class="org.springframework.jms.support.converter.SimpleMessageConverter"/> <!--import activemq可用 producer conf --> <import Resource="spring-producer-basic.xml"/> <import resource="spring-producer-interlink.xml"/> <import resource="spring-producer-check.xml"/> <import resource="spring-producer-queue.xml"/> <import resource="spring-producer-redstar.xml"/> <import resource="spring-producer-callrecord.xml"/> <!--import activemq consumer conf --> <import resource="spring-consumer-basic.xml"/> <import resource="spring-consumer-interlink.xml"/> <import resource="spring-consumer-check.xml"/> <import resource="spring-consumer-callrecord.xml"/> <import resource="spring-consumer-resblock.xml"/> </beans>

  1. 生产者基本配置文件 spring-producer-basic.xml

<?xml version="1.0" encoding="UTF-8"?> <!-- 查找最新的schemaLocation 访问 http://www.springframework.org/schema/ --> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd"> <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供--> <bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="${activemq.broker.url}"/> <property name="userName" value="admin"/> <property name="password" value="admin"/> <property name="useAsyncSend" value="true"/> <property n)ame="sendTimeout" v集群alue="300"/> </bean> <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"> <property name="connectionFactory" ref="amqConnectionFactory"/> <property name="maxConnections" value="3"/> </bean> <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory --> <bean id="jm是什么sConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="pooledConnectionFactory"/> </bean> </beans>

  1. 生产者实例配置spring-producer-callrecord.xml

<?xml version="1.0" encoding="UTF-8"?> <!-- 查找最新的schemaLocation 访问 http://www.springframework.org/sactivemqche(ma/ --> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd"> <!-- 配置消息发送目的地方式 --> <!-- Queue队列:仅有一个订阅者会收到消息,消息一旦被处理就不会存在队列中 --> <bean id="callRecordProductDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg value="${queue.callRecord_customer_prod}"></constructor-高可用arg> </bean> <!-- Spring JMS Template --> <bean id="callRecordJmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="jmsConnectionFactory" /> <!-- <property name="defaultDestination" value="subject" /> --> <!-- 区别它采用的模式为false是p2p为true是订阅 --> <property name="pubSubDomain" value="false" /> <!-- 当设置pubSubNoLocal为true时,消费者不会接收来意思自同一个连接的消息。 因为我们在上面的配置文件中定义了连接池的最大连接数为1,因此每次使用的连接都是同一个连接, 所以就消费者就接收不到消息。只有当pubSubNoLocal为false时,消费者才能接收到来自同一个连接的 消息。 --> <property name="pubSubNoLocal" value="false" /> <property name="messageConverter" ref="jmsSimpleMessageConverter" /> <property name="sessionTransacted" value="true" /> <!-- 使 deliveryMode, priority, timeToLive设置生效 --> <property name="explicitQosEnabled" value="true" /> <property name="deliveryMode" value="2" /> <property name="priority" value="4" /> <property name="timeToLive" value="0" /> </bean> </beans>

其他生产者配置,如spring-producer-redstar.xml,类似spring-producer-callrecord.xml配置,不再啰嗦列举。


  1. 生产者java代码实例CallRecordMessageProducer.java

package com.lianjia.customer.message.producer; import com.alibaba.fastjson.JSONObject; import com.lianjia.blacklist.api.dto.PhoneHasMarkedDTO; import com.lianjia.common.datasource.DataRegion; import com.lianjia.common.log.RlaLogUtil; import com.lianjia.customer.log.CustomerRlaLogCode; import com.lianjia.customer.service.constant.Constants; import org.apache.commons.collections.CollectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.jms.core.JmsTemplate; import org.springframework.stereotype.Service; import javax.annotation.Resource; import javax.jms.Destination; import java.util.*; @Service public class CallRecordMessageProducer { protected Logger logger = LoggerFactory.getLogger(this.getClass()); @Resource JmsTemplate callRecordJmsTemplate; @Resource Destination callRecordProductDestination; public void sendMessageForDealCallRecord(final String regionStr, final Long brokerUid, final Map<String, PhoneHasMarkedDTO> phoneHasMarkedDTOMap, final Collection<String> phones,final Boolean isEncrypt) { try { if(CollectionUtils.isNotEmpty(phones)){ Constants.THREAD_POOL_MQ.execute(new Runnable() { @Override public void run() { long beginTime = System.currentTimeMillis(); JSONObject jo = new JSONObject(); jo.put("brokerUid", brokerUid); jo.put("phones", phones); jo.put("regionStr", regionStr); jo.put("phoneHasMarkedDTOMap", phoneHasMarkedDTOMap); jo.put("isEncrypt", isEncrypt); //jo.put("businessId", businessId); callRecordJmsTemplate.convertAndSend(callRecordProductDestination, jo.toJSONString()); logger.info("==========sendCallRecordMessage:{},threadId:{},brokerUid:{},phones:{},time:{}", jo, Thread.currentThread().getId(), regionStr, brokerUid, phones, System.currentTimeMillis()- beginTime); } }); } } catch (Exception e) { RlaLogUtil.log(logger, e, CustomerRlaLogCode.errorMessageSendCheck, brokerUid, phones); } } }

  1. 消费者基本配置文件 spring-consumer-basic.xml

<?xml version="1.0" encoding="UTF-8"?> <!-- 查找最新的schemaLocation 访问 http://www.springframework.org/schema/ --> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd"> <!-- Wait 15 seconds first re-delivery, then 45, 135, 405, 1215, 3645 seconds --> <bean id="amqRedeliveryPolicyConsumer" class="org.apache.activemq.RedeliveryPolicy"> <!-- 重连时间间隔递增倍数,只有值大于1和启用useExpone --> <p)roperty name="backOffMultiplier" value="1" /> <!-- 初始重发延迟时间 --> <property name="initialRedeliveryDelay" value="5000" /> <!-- 最大重传次数,达到最大重连次数后抛出异常。为-1时不限制次数, 为0时表示不进行重传。 --> <property name="maximumRedeliveries" value="6" /> <property name="queue" value="*" /> <property name="reactivemqdeliveryDelay" value="5000" /> <!-- 启用指数倍数递增的方式增加延迟时间。 --> <property name="useExponentialBackOff" value="true" /> <property name="useCollisionAvoidance" value="true" /> <property name="collisionAvoidancePercent" value="15" /> </bean> <!-- PreFetch预拉取 --> <bean id="amqPrefetchPolicyConsumer" class="org.apache.activemq.ActiveMQPrefetchPolicy"> <property name="queuePrefetch" value="100" /> <property nam高e="topicPrefetch" value="100" /> </bean> <bean id="jmsTransactionManagerConsumer" class="org.springframework.jms.connection.JmsTransactionManager"> <!-- can also refer to amq.connectionFactory --> <property name="connectionFactory" ref="jmsConnectionFactoryConsumer" /> </bean> <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供--> <bean id="amqConnectionFactoryConsumer" class="org.apache.activemq.ActiveMQConnectionFactory"> <!-- <property name="brokerURL" value="failover:(tcp://172.30.xx.xx:61616, tcp://172.30.xx.xx:61616,tcp://172.30.xx.xx:61616) ?initialReconnectDelay=1000"/> --> <property name="brokerURL" value="${activemq.broker.url}"/> <property name="userName" value="admin" /> <property name="password" value="admin" /> <property name="useAsyncSend" value="true"/> <property name="optimizeAcknowledge" value="false" /> <property name="optimizeAcknowledgeTimeOut" value="300" /> <property name="redeliveryPolicy" ref="amqRedeliveryPolicyConsumer" /> <property name="prefetchPolicy" ref="amqPrefetchPolicyConsumer" /> </bean> <bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> <property name="corePoolSize" value="1" /> <property name="maxPoolSize" value="10" /> <property name="queueCapacity" value="200" /> <property name="keepAliveSeconds" value="300" /> <property name="rejectedExecutionHandler"> <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" /> </property> </bean> <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory --> <bean id="jmsConnectionFactoryConsumer" class="org.springframework.jms.connection.SingleConnectionFactory"> <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="amqConnectionFactoryConsumer"/> </be可用an> </beans>

其他消费者配置,如spring-consumer-resblock.xml参考上面


  1. 消费者实例java代码实现 CallRecordMessageConsumer.java

package com.lianjia.customer.message.consumer; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.lianjia.common.datasource.DataRegion; import com.lianjia.common.datasource.impl.CustomerRegion; import com.lianjia.common.datasource.transaction.DataRegionsTransactional; import com.lianjia.common.log.RlaLogUtil; import com.lianjia.customer.log.CustomerRlaLogCode; import com.lianjia.customer.service.facade.DataFacade; import com.lianjia.customer.service.facade.LogicFacade; import org.slf4j.Lo高gger; import org.slf4j.LoggerFactory; import org.springframework.jms.support.converter.MessageConverter; import org.springframework.stereotype.Service; import javax.annotation.Resource; import javax.jms.Message; import javax.jms.MessageListener; import java.util.*; @Service public class CallRecordMessageConsumer implements MessageListener { protected意思 Logger logger = LoggerFactory.getLogger(this.getClass()); @Resource MessageConverter jmsSimpleMessageConverter; @Resource private LogicFacade logicFacade; @Resource private DataFacade dataFacade; @DataRegionsTransactional public void onMessage(Message message) { try { String objMessage = (String) this.jmsSimpleMessageConverter.fromMessage(message); RlaLogUtil.log(logger, CustomerRlaLogCode.infoCallRecordMessageAccepted, objMessage); JSONObject jo = JSON.parseObject(objMessage); String regionStr = jo.getString("regionStr"); Long brokerUid = jo.getLong("brokerUid"); Boolean isEncrypt = jo.getBoolean("isEncrypt"); Object phoneHasMarkedDTOMapObject = jo.get("phoneHasMarkedDTOMap"); JSONArray phoneArr = jo.getJSONArray("phones"); if (phoneArr == null || phoneArr.size() == 0) { logger.warn("phones is empty: {}", phoneArr); return; } Collection<String> phones = new ArrayList<String>(); for (int i = 0; i < phoneArr.size(); i ) { phones.add((String) phoneArr.get(i)); } DataRegion region = CustomerRegion.getByRegionName(regionStr); //同步黑名单 //TODO 代码省略 //同步客源潜客 //TODO 代码省略 } catch (Exception e) { RlaLogUtil.log(logger, e, CustomerRlaLogCode.errorCallRecordMessageAccepted); try { Thread.sleep(1000); } catch (InterruptedException e1) { RlaLogUtil.log(logger, e1, CustomerRlaLogCode.errorInternalServer); } throw new RuntimeException(e); } } }

按上面配置,生产、消费mq消息demo已经可用了。


版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至123456@qq.com 举报,一经查实,本站将立刻删除。

联系我们

工作日:9:30-18:30,节假日休息