问题描述:项目中接收到rabbitmq消息后,先进行一系列的处理,等所有处理完成后,将消息推送到前台,但是在处理消息的过程中,每个方法中都有与数据库交互的代码,直接导致消息推送不及时。
单线程代码模型:
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;public class MqHandler implements MessageListener {//消息处理public void onMessage(Message msg) {//处理AdealA(msg);//处理BdealB(msg);//处理CdealC(msg);//处理DdealD(msg);//处理EdealE(msg);//处理FdealF(msg);}public void dealA(Message msg){}public void dealB(Message msg){}public void dealC(Message msg){}public void dealD(Message msg){}public void dealE(Message msg){}public void dealF(Message msg){}
}
单线程处理图示:
解决方案:采用多线程的方式,每个线程处理一个或多个逻辑,提高CPU的使用率,优化消息响应时间。
多线程处理图示:
代码实现
一:配置spring线程池
<bean id="taskExecutor"class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"><!-- 核心线程数 --><property name="corePoolSize" value="10" /><!-- 最大线程数 --><property name="maxPoolSize" value="50" /><!-- 队列最大长度 >=mainExecutor.maxSize --><property name="queueCapacity" value="1000" /><!-- 线程池维护线程所允许的空闲时间 --><property name="keepAliveSeconds" value="300" /><!-- 线程池对拒绝任务(无线程可用)的处理策略 --><property name="rejectedExecutionHandler"><bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" /></property>
</bean>
二:线程类
ABProcess:将方法A和方法B中的逻辑交给该线程处理
import java.util.ArrayList;
import java.util.List;import org.springframework.amqp.core.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;public class ABProcess implements Runnable {@Autowiredprivate ThreadPoolTaskExecutor taskExecutor;private List<Message> ABList = new ArrayList<Message>();// 初始化方法:启动线程public void init() {taskExecutor.execute(this);}/*** 对外提供添加数据的方法* ABList是共享资源,主线程MqHandler对此进行添加,子线程ABProcess对此进行删除,存在线程安全问题,所以需要加同步* notify():此方法必须在synchronized修饰的代码块或者方法中使用* @param msg*/public synchronized void addList(Message msg) {ABList.add(msg);notify(); // 唤醒在此对象监视器(锁)上等待的单个线程,}@Overridepublic void run() {while (true) {try {thread(); //调用实现方法} catch (Exception e) {e.printStackTrace();}}}/*** 因为涉及共享资源的操作,需要同步* wait():此方法必须在synchronized修饰的代码块或者方法中使用* @throws Exception*/public synchronized void thread() throws Exception {if (ABList.size() > 0) { // 判断集合中是否有消息dealA(ABList.get(0)); //方法AdealB(ABList.get(0));//方法BABList.remove(0); // 处理完后,删除这条数据System.out.println("dealABSuccess");} else {wait(); // 若集合中没有消息,让线程等待,}}public void dealA(Message msg) {}public void dealB(Message msg) {}}
spring配置配置Bean,并初始化init方法
<bean id="ABProcess" class="com.thread.ABProcess" init-method="init"/>
CDProcess:将方法C和方法D的逻辑交给该线程处理,具体实现与ABProcess一致
三:修改主线程MqHandler逻辑
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.beans.factory.annotation.Autowired;public class MqHandler implements MessageListener {@Autowiredprivate ABProcess acProcess;@Autowiredprivate CDProcess cdProcess;//消息处理public void onMessage(Message msg) {acProcess.addList(msg);//主线程将消息添加到集合,交给子线程ABProcess处理cdProcess.addList(msg);//主线程将消息添加到集合,交给子线程CDProcess处理//E和F逻辑代码简单,直接交给主线程dealE(msg);dealF(msg);}public void dealE(Message msg){}public void dealF(Message msg){}
}
需要熟悉回忆线程通信和线程安全/同步的同仁,参考:https://blog.csdn.net/qq_37936542/article/details/81944583