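Cluster architecture diagram
- Business use case, with a description of the producer and consumer systems
- Whether messages need to be persisted
- Sender/receiver model (point-to-point or topic subscription)
- Estimated average and peak QPS on the sending side
- Estimated average and peak connection counts for senders and receivers
2 The MQ administrator evaluates the request and confirms whether existing resources can meet it. If they can, the administrator provides the connection configuration for the production cluster and the configuration for the offline test-environment cluster.
3 The business team picks a client library that suits its own system (mature libraries and usage patterns already exist for Java and PHP, for example) and runs integration tests in the offline environment.
4 Tell the MQ administrator the planned go-live time. The MQ administrator is responsible for monitoring whether the production service runs normally.
5 Fill in your service information on the AMQ onboarding registration page.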
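V. Basic rules
1. Client configuration
- MQ supports both persistent and non-persistent messages. Persistent messages are stored durably on every node of the cluster, and the cluster guarantees they are not lost, but this has a significant performance cost. Use persistence when message volume is low or when reliable delivery is essential.
- Performance is good when the message body is within 2 KB. Keep message bodies small; when a message body exceeds 1 MB, use MQ with caution.
- As with any communication protocol, establishing and releasing a connection between a producer/consumer and MQ consumes considerable resources. Make sure that each time your client subscribes, it processes as many of the queue's messages as possible before exiting, and always release the connection!
- MQ can work in two modes: point-to-point and topic subscription. Choose the mode that fits your business.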
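The size guidance above can be checked on the producer side before a message is handed to the broker. The following is a minimal sketch and not part of the original setup: the class name `MessageSizeGuard`, the `SizeClass` enum, and the choice to measure the UTF-8 encoded length are assumptions made for illustration. Only the 2 KB and 1 MB thresholds come from the rules above.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: classifies a text payload against the size guidance above. */
final class MessageSizeGuard {
    /** Payloads within 2 KB perform well according to the guidance above. */
    static final int RECOMMENDED_MAX_BYTES = 2 * 1024;
    /** Payloads above 1 MB should go through MQ only with caution. */
    static final int CAUTION_BYTES = 1024 * 1024;

    enum SizeClass { OK, LARGE, TOO_LARGE }

    /** Measures the UTF-8 encoded size (an assumption) and maps it to a size class. */
    static SizeClass classify(String payload) {
        int bytes = payload.getBytes(StandardCharsets.UTF_8).length;
        if (bytes <= RECOMMENDED_MAX_BYTES) {
            return SizeClass.OK;
        }
        if (bytes <= CAUTION_BYTES) {
            return SizeClass.LARGE;
        }
        return SizeClass.TOO_LARGE;
    }

    public static void main(String[] args) {
        // A small JSON payload, similar in shape to the one sent by the producer later on.
        System.out.println(classify("{\"brokerUid\":1}"));
        // A 4 KB payload made of 'x' characters.
        System.out.println(classify(new String(new char[4096]).replace('\0', 'x')));
    }
}
```

A producer could log a warning for `LARGE` and refuse or reroute `TOO_LARGE` payloads; which reaction is appropriate depends on the business and is not prescribed by the rules above.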
2. Main parameters
AMQ main parameters
VI. Java client configuration and code implementation
- Maven dependencies for Spring and ActiveMQ
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.7.0</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>3.16</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>4.2.1.RELEASE</version>
</dependency>
- Properties file config-activemq.properties
#######activemq######
# In a real project these values are configured per environment; they are listed here for ease of explanation
#activemq.broker.url=${activemq.broker.url}
activemq.broker.url=failover:(tcp://172.30.xx.xx:61616,tcp://172.30.xx.xx:61616,tcp://172.30.xx.xx:61616)?initialReconnectDelay=1000&maxReconnectDelay=10000&jms.prefetchPolicy.all=1000&timeout=3000
feed.queue.subject=FEED.QUEUE.DEV1
feed.topic.subject=FEED.TOPIC.DEV
mobile.queue.subject=CONSUMER.MESSAGE.QUEUE.FOLIO
queue.interlink_customer_prod=interlink_customer_prod
queue.interlink_house_prod=interlink_house_prod
queue.check_customer_prod=check_customer_prod
queue.redstar_task=HONGXING.TASK.SCHEDULE.DOCKING
queue.callRecord_customer_prod=callRecord_customer_prod
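The `activemq.broker.url` value must be a single line with no spaces between the broker list and the query string. One way to avoid hand-editing mistakes is to assemble it from a host list. The sketch below only concatenates strings and does not depend on ActiveMQ. The class name `FailoverUrlBuilder` and the host names in `main` are hypothetical, while the option names are the ones used in the properties file above.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper that assembles a failover broker URL as a single line. */
final class FailoverUrlBuilder {

    /**
     * @param hostPorts broker addresses in "host:port" form
     * @param options   URL options, appended in insertion order
     */
    static String build(List<String> hostPorts, Map<String, String> options) {
        // failover:(tcp://a:61616,tcp://b:61616)
        StringJoiner brokers = new StringJoiner(",", "failover:(", ")");
        for (String hostPort : hostPorts) {
            brokers.add("tcp://" + hostPort.trim());
        }
        // key1=value1&key2=value2
        StringJoiner query = new StringJoiner("&");
        for (Map.Entry<String, String> option : options.entrySet()) {
            query.add(option.getKey() + "=" + option.getValue());
        }
        String url = brokers.toString();
        return query.length() == 0 ? url : url + "?" + query;
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<String, String>();
        options.put("initialReconnectDelay", "1000");
        options.put("maxReconnectDelay", "10000");
        options.put("jms.prefetchPolicy.all", "1000");
        options.put("timeout", "3000");
        // Placeholder host names; the real addresses come from the MQ administrator.
        List<String> hosts = Arrays.asList("mq1.example.internal:61616", "mq2.example.internal:61616");
        System.out.println(build(hosts, options));
    }
}
```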
- Top-level (umbrella) MQ configuration file spring-activemq.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- For the latest schemaLocation, see http://www.springframework.org/schema/ -->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.1.xsd">
<context:component-scan base-package="com.lianjia.customer.message" />
<context:property-placeholder location="classpath:config-activemq.properties"
ignore-unresolvable="true" />
<bean id="jmsSimpleMessageConverter"
class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
<!-- import activemq producer conf -->
<import resource="spring-producer-basic.xml"/>
<import resource="spring-producer-interlink.xml"/>
<import resource="spring-producer-check.xml"/>
<import resource="spring-producer-queue.xml"/>
<import resource="spring-producer-redstar.xml"/>
<import resource="spring-producer-callrecord.xml"/>
<!--import activemq consumer conf -->
<import resource="spring-consumer-basic.xml"/>
<import resource="spring-consumer-interlink.xml"/>
<import resource="spring-consumer-check.xml"/>
<import resource="spring-consumer-callrecord.xml"/>
<import resource="spring-consumer-resblock.xml"/>
</beans>
- Basic producer configuration file spring-producer-basic.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- For the latest schemaLocation, see http://www.springframework.org/schema/ -->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.1.xsd">
<!-- The ConnectionFactory that actually creates connections, supplied by the JMS provider -->
<bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="${activemq.broker.url}"/>
<property name="userName" value="admin"/>
<property name="password" value="admin"/>
<property name="useAsyncSend" value="true"/>
<property name="sendTimeout" value="300"/>
</bean>
<bean id="pooledConnectionFactory"
class="org.apache.activemq.pool.PooledConnectionFactory">
<property name="connectionFactory" ref="amqConnectionFactory"/>
<property name="maxConnections" value="3"/>
</bean>
<!-- The ConnectionFactory Spring uses to manage the real ConnectionFactory -->
<bean id="jmsConnectionFactory"
class="org.springframework.jms.connection.SingleConnectionFactory">
<!-- targetConnectionFactory refers to the real ConnectionFactory that creates JMS connections -->
<property name="targetConnectionFactory" ref="pooledConnectionFactory"/>
</bean>
</beans>
- Producer instance configuration spring-producer-callrecord.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- For the latest schemaLocation, see http://www.springframework.org/schema/ -->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.1.xsd">
<!-- Configure the destination that messages are sent to -->
<!-- Queue: only one consumer receives each message; once processed, the message no longer remains in the queue -->
<bean id="callRecordProductDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="${queue.callRecord_customer_prod}"></constructor-arg>
</bean>
<!-- Spring JMS Template -->
<bean id="callRecordJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="jmsConnectionFactory" />
<!-- <property name="defaultDestination" value="subject" /> -->
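<!-- Selects the messaging model: false is point-to-point (queue), true is publish/subscribe (topic) -->
<property name="pubSubDomain" value="false" />
<!-- When pubSubNoLocal is true, a consumer does not receive messages published on its own connection.
The SingleConnectionFactory configured above always hands out the same connection,
so with pubSubNoLocal set to true a consumer sharing that connection would receive nothing.
Only when pubSubNoLocal is false can a consumer receive messages sent over the same connection. -->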
<property name="pubSubNoLocal" value="false" />
<property name="messageConverter" ref="jmsSimpleMessageConverter" />
<property name="sessionTransacted" value="true" />
<!-- Makes the deliveryMode, priority, and timeToLive settings take effect -->
<property name="explicitQosEnabled" value="true" />
<property name="deliveryMode" value="2" />
<property name="priority" value="4" />
<property name="timeToLive" value="0" />
</bean>
</beans>
Other producer configurations, such as spring-producer-redstar.xml, are similar to spring-producer-callrecord.xml and are not listed here.
- Producer Java example CallRecordMessageProducer.java
package com.lianjia.customer.message.producer;
import com.alibaba.fastjson.JSONObject;
import com.lianjia.blacklist.api.dto.PhoneHasMarkedDTO;
import com.lianjia.common.datasource.DataRegion;
import com.lianjia.common.log.RlaLogUtil;
import com.lianjia.customer.log.CustomerRlaLogCode;
import com.lianjia.customer.service.constant.Constants;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import javax.jms.Destination;
import java.util.*;
@Service
public class CallRecordMessageProducer {
protected Logger logger = LoggerFactory.getLogger(this.getClass());
@Resource
JmsTemplate callRecordJmsTemplate;
@Resource
Destination callRecordProductDestination;
public void sendMessageForDealCallRecord(final String regionStr, final Long brokerUid,
final Map<String, PhoneHasMarkedDTO> phoneHasMarkedDTOMap,
final Collection<String> phones,final Boolean isEncrypt) {
try {
if(CollectionUtils.isNotEmpty(phones)){
Constants.THREAD_POOL_MQ.execute(new Runnable() {
@Override
public void run() {
long beginTime = System.currentTimeMillis();
JSONObject jo = new JSONObject();
jo.put("brokerUid", brokerUid);
jo.put("phones", phones);
jo.put("regionStr", regionStr);
jo.put("phoneHasMarkedDTOMap", phoneHasMarkedDTOMap);
jo.put("isEncrypt", isEncrypt);
//jo.put("businessId", businessId);
callRecordJmsTemplate.convertAndSend(callRecordProductDestination, jo.toJSONString());
logger.info("==========sendCallRecordMessage:{},threadId:{},regionStr:{},brokerUid:{},phones:{},time:{}",
jo, Thread.currentThread().getId(), regionStr, brokerUid, phones,
System.currentTimeMillis()- beginTime);
}
});
}
} catch (Exception e) {
RlaLogUtil.log(logger, e, CustomerRlaLogCode.errorMessageSendCheck, brokerUid, phones);
}
}
}
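One detail of the producer above is worth spelling out: the `try`/`catch` wraps the call to `execute(...)`, so it sees failures in submitting the task, while an exception thrown later inside `run()` on a pool thread does not reach that `catch` block. The sketch below shows one way to handle failures inside the task itself. The definition of `Constants.THREAD_POOL_MQ` is not shown in this article, so the pool here is a stand-in with assumed sizes, and `AsyncSendSketch`, `Sender`, and the failure counter are hypothetical names.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: asynchronous sending with error handling inside the task. */
final class AsyncSendSketch {

    /** Stand-in for the real send call, e.g. a JmsTemplate.convertAndSend wrapper. */
    interface Sender {
        void send(String payload) throws Exception;
    }

    // Assumed sizes; the pool behind Constants.THREAD_POOL_MQ is not shown in the article.
    private final ExecutorService pool = new ThreadPoolExecutor(
            1, 4, 60, TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(200),
            new ThreadPoolExecutor.CallerRunsPolicy());

    private final AtomicInteger failures = new AtomicInteger();

    void sendAsync(final Sender sender, final String payload) {
        pool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    sender.send(payload);
                } catch (Exception e) {
                    // Handle the failure where it happens; real code would log it
                    // (the producer above uses RlaLogUtil for that).
                    failures.incrementAndGet();
                }
            }
        });
    }

    /** Stops accepting tasks, waits for queued sends to finish, and returns the failure count. */
    int shutdownAndCountFailures() {
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return failures.get();
    }
}
```

Calling `shutdownAndCountFailures()` at application shutdown also gives queued messages a chance to be sent before the JVM exits.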
- Basic consumer configuration file spring-consumer-basic.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- For the latest schemaLocation, see http://www.springframework.org/schema/ -->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.1.xsd">
<!-- Wait 15 seconds first re-delivery, then 45, 135, 405, 1215, 3645 seconds -->
<bean id="amqRedeliveryPolicyConsumer"
class="org.apache.activemq.RedeliveryPolicy">
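<!-- Multiplier applied to the redelivery delay; takes effect only when the value is greater than 1 and useExponentialBackOff is enabled -->
<property name="backOffMultiplier" value="1" />
<!-- Initial redelivery delay -->
<property name="initialRedeliveryDelay" value="5000" />
<!-- Maximum number of redeliveries; an exception is raised once the limit is reached. -1 means unlimited,
0 means no redelivery. -->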
<property name="maximumRedeliveries" value="6" />
<property name="queue" value="*" />
<property name="redeliveryDelay" value="5000" />
<!-- Enable exponential back-off to increase the delay between redeliveries -->
<property name="useExponentialBackOff" value="true" />
<property name="useCollisionAvoidance" value="true" />
<property name="collisionAvoidancePercent" value="15" />
</bean>
<!-- Prefetch: number of messages pulled to the consumer in advance -->
<bean id="amqPrefetchPolicyConsumer"
class="org.apache.activemq.ActiveMQPrefetchPolicy">
<property name="queuePrefetch" value="100" />
<property name="topicPrefetch" value="100" />
</bean>
<bean id="jmsTransactionManagerConsumer"
class="org.springframework.jms.connection.JmsTransactionManager">
<!-- can also refer to amq.connectionFactory -->
<property name="connectionFactory" ref="jmsConnectionFactoryConsumer" />
</bean>
<!-- The ConnectionFactory that actually creates connections, supplied by the JMS provider -->
<bean id="amqConnectionFactoryConsumer"
class="org.apache.activemq.ActiveMQConnectionFactory">
<!-- <property name="brokerURL" value="failover:(tcp://172.30.xx.xx:61616,
tcp://172.30.xx.xx:61616,tcp://172.30.xx.xx:61616)
?initialReconnectDelay=1000"/> -->
<property name="brokerURL" value="${activemq.broker.url}"/>
<property name="userName" value="admin" />
<property name="password" value="admin" />
<property name="useAsyncSend" value="true"/>
<property name="optimizeAcknowledge" value="false" />
<property name="optimizeAcknowledgeTimeOut" value="300" />
<property name="redeliveryPolicy" ref="amqRedeliveryPolicyConsumer" />
<property name="prefetchPolicy" ref="amqPrefetchPolicyConsumer" />
</bean>
<bean id="taskExecutor"
class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="1" />
<property name="maxPoolSize" value="10" />
<property name="queueCapacity" value="200" />
<property name="keepAliveSeconds" value="300" />
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" />
</property>
</bean>
<!-- The ConnectionFactory Spring uses to manage the real ConnectionFactory -->
<bean id="jmsConnectionFactoryConsumer"
class="org.springframework.jms.connection.SingleConnectionFactory">
<!-- targetConnectionFactory refers to the real ConnectionFactory that creates JMS connections -->
<property name="targetConnectionFactory" ref="amqConnectionFactoryConsumer"/>
</bean>
</beans>
Other consumer configurations, such as spring-consumer-resblock.xml, follow the one above.
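The redelivery settings in spring-consumer-basic.xml interact in a way that is easy to misread, so a small model of the delay schedule may help. This is a simplified sketch under stated assumptions, not ActiveMQ's actual implementation: it ignores collision avoidance and any maximum delay cap, and the class and method names are hypothetical. It only encodes the rule stated in the configuration comments, namely that the delay grows only when exponential back-off is enabled and the multiplier is greater than 1.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of a redelivery delay schedule. */
final class RedeliveryScheduleSketch {

    /** Returns the delay in milliseconds before each redelivery attempt. */
    static List<Long> delays(long initialDelayMs, double multiplier,
                             boolean exponential, int maxRedeliveries) {
        List<Long> result = new ArrayList<Long>();
        long delay = initialDelayMs;
        for (int attempt = 1; attempt <= maxRedeliveries; attempt++) {
            result.add(delay);
            // The delay grows only with back-off enabled and a multiplier above 1.
            if (exponential && multiplier > 1) {
                delay = (long) (delay * multiplier);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Values from the XML above: 5000 ms initial delay, multiplier 1, 6 redeliveries.
        System.out.println(delays(5000, 1, true, 6));
        // An initial delay of 15000 ms with multiplier 3, for comparison.
        System.out.println(delays(15000, 3, true, 6));
    }
}
```

Under this model, the values configured above (multiplier 1) give six redeliveries spaced a constant 5 seconds apart, even though useExponentialBackOff is true. The schedule described in the XML comment (15, 45, 135, 405, 1215, 3645 seconds) would correspond to an initial delay of 15000 ms and a multiplier of 3.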
- Consumer Java example CallRecordMessageConsumer.java
package com.lianjia.customer.message.consumer;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.lianjia.common.datasource.DataRegion;
import com.lianjia.common.datasource.impl.CustomerRegion;
import com.lianjia.common.datasource.transaction.DataRegionsTransactional;
import com.lianjia.common.log.RlaLogUtil;
import com.lianjia.customer.log.CustomerRlaLogCode;
import com.lianjia.customer.service.facade.DataFacade;
import com.lianjia.customer.service.facade.LogicFacade;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import javax.jms.Message;
import javax.jms.MessageListener;
import java.util.*;
@Service
public class CallRecordMessageConsumer implements MessageListener {
protected Logger logger = LoggerFactory.getLogger(this.getClass());
@Resource
MessageConverter jmsSimpleMessageConverter;
@Resource
private LogicFacade logicFacade;
@Resource
private DataFacade dataFacade;
@DataRegionsTransactional
public void onMessage(Message message) {
try {
String objMessage = (String) this.jmsSimpleMessageConverter.fromMessage(message);
RlaLogUtil.log(logger, CustomerRlaLogCode.infoCallRecordMessageAccepted, objMessage);
JSONObject jo = JSON.parseObject(objMessage);
String regionStr = jo.getString("regionStr");
Long brokerUid = jo.getLong("brokerUid");
Boolean isEncrypt = jo.getBoolean("isEncrypt");
Object phoneHasMarkedDTOMapObject = jo.get("phoneHasMarkedDTOMap");
JSONArray phoneArr = jo.getJSONArray("phones");
if (phoneArr == null || phoneArr.size() == 0) {
logger.warn("phones is empty: {}", phoneArr);
return;
}
Collection<String> phones = new ArrayList<String>();
for (int i = 0; i < phoneArr.size(); i++) {
phones.add((String) phoneArr.get(i));
}
DataRegion region = CustomerRegion.getByRegionName(regionStr);
// Sync the blacklist
// TODO code omitted
// Sync customer leads
// TODO code omitted
} catch (Exception e) {
RlaLogUtil.log(logger, e, CustomerRlaLogCode.errorCallRecordMessageAccepted);
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
RlaLogUtil.log(logger, e1, CustomerRlaLogCode.errorInternalServer);
}
throw new RuntimeException(e);
}
}
}
With the configuration above, the demo for producing and consuming MQ messages is ready to use.