# overt 财务公开项目
# 简介
通过门户网站,向所在农村集体的村民进行特定公开,其中 “三资” 信息中的招标公告、结果公示、待交易资产为全面公开;财务信息的更新频率为每月一次
# 框架
框架 | 说明 | 版本 |
---|---|---|
springboot | 应用开发框架 | 2.3.12.RELEASE |
spring-boot-starter-data-jpa | 操作数据的框架 | |
spring-boot-starter-web | MVC 框架 | |
spring-boot-starter-security | Spring 安全框架 | |
spring-security-cas | cas 验证框架 | |
knife4j-spring-boot-starter | api 文档框架 | |
mapstruct | 对象转换工具 | |
flowable-spring-boot-starter | 工作流 flowable 架包 | 6.8.0 |
easyexcel | excel 导入导出工具 | |
hibernate-validator | 参数校验 | 6.0.17.Final |
# 系统功能
# 动态线程池管理
# 功能点:
配置动态添加线程池,扩展了线程池:
可以清楚的知道线程的执行时间
可以知道执行的线程运行的什么任务
可以知道该任务执行占用的 cpu 时间和占总 cpu 的百分比
可动态修改线程池核心数、线程池拒绝策略等
可以记录线程池中运行线程任务的情况
# 思路:
我们都知道在创建一个线程池需要传参,线程池名称、核心线程池大小、最大线程池大小、空闲线程的存活时间、用于保存等待执行的任务的阻塞队列、无法执行时的拒绝策略 这些参数等等
我想随意的调节这些参数怎么办?
我想加一个执行特定的任务的线程池怎么办?
这里我是存储在配置文件中,项目启动的时候使用 事件的监听器
、 bean定义注册器
,读取配置文件中所有线程配置动态注册
# 具体实现步骤:
application.yml
#注: 这里也只是比较单一的动态线程池。 | |
#动态线程池配置 | |
dynamic: | |
thread: | |
pool: | |
pool1: | |
poolName: 业务线程 | |
#用注册到 spring 容器中的 bean 名称,方便使用 | |
beanName: bizPool | |
corePoolSize: 30 | |
maximumPoolSize: 64 | |
keepAliveTime: 60 | |
blockQueueCapacity: 100 | |
#任务拒绝策略 | |
#1: AbortPolicy 默认的处理策略,会直接抛出 RejectedExecutionException 异常,阻止系统继续接受新的任务 | |
#2: CallerRunsPolicy 当任务被拒绝时,会在调用者的线程中执行该任务。 | |
#3: DiscardPolicy 当任务被拒绝时,会默默地丢弃该任务。 | |
#4: DiscardOldestPolicy 当任务被拒绝时,会丢弃最老的一个任务,然后尝试重新提交被拒绝的任务。 | |
rejectedType: 1 | |
pool2: | |
poolName: 定时任务线程 | |
#用注册到 spring 容器中的 bean 名称,方便使用 | |
beanName: jobPool | |
corePoolSize: 15 | |
maximumPoolSize: 32 | |
keepAliveTime: 30 | |
blockQueueCapacity: 50 | |
#任务拒绝策略 | |
#1: AbortPolicy 默认的处理策略,会直接抛出 RejectedExecutionException 异常,阻止系统继续接受新的任务 | |
#2: CallerRunsPolicy 当任务被拒绝时,会在调用者的线程中执行该任务。 | |
#3: DiscardPolicy 当任务被拒绝时,会默默地丢弃该任务。 | |
#4: DiscardOldestPolicy 当任务被拒绝时,会丢弃最老的一个任务,然后尝试重新提交被拒绝的任务。 | |
rejectedType: 1 |
动态线程池的配置类:
import lombok.Data; | |
import org.springframework.boot.context.properties.ConfigurationProperties; | |
import org.springframework.stereotype.Component; | |
import java.util.Map; | |
/** | |
* @author Tz | |
* @date 2024/01/22 15:03 | |
* @Description 动态线程池的配置属性 | |
*/ | |
@Component | |
@ConfigurationProperties(prefix = "dynamic.thread") | |
public class DynamicThreadPoolProperties { | |
private Map<String, PoolConfig> pool; | |
public Map<String, PoolConfig> getPool() { | |
return pool; | |
} | |
public void setPool(Map<String, PoolConfig> pool) { | |
this.pool = pool; | |
} | |
@Data | |
public static class PoolConfig { | |
/** | |
* 线程池名称 | |
*/ | |
private String poolName; | |
/** | |
* 动态注册的 bean 名称 | |
*/ | |
private String beanName; | |
/** | |
* 核心线程池大小,即线程池中始终保持存活的线程数量 | |
*/ | |
private int corePoolSize; | |
/** | |
* 最大线程池大小,即线程池中允许的最大线程数量 | |
*/ | |
private int maximumPoolSize; | |
/** | |
* 空闲线程的存活时间,超过这个时间的空闲线程会被回收 默认单位为:second | |
*/ | |
private long keepAliveTime; | |
/** | |
* 用于保存等待执行的任务的阻塞队列 默认队列为 new ArrayBlockingQueue<>(blockQueueCapacity, true) | |
*/ | |
private int blockQueueCapacity; | |
/** | |
* 当线程池已满,且任务无法执行时的拒绝策略 | |
* <p> | |
* <li> | |
* 1: AbortPolicy 默认的处理策略,会直接抛出 RejectedExecutionException 异常,阻止系统继续接受新的任务 | |
* <li> | |
* 2: CallerRunsPolicy 当任务被拒绝时,会在调用者的线程中执行该任务。 | |
* <li> | |
* 3: DiscardPolicy 当任务被拒绝时,会默默地丢弃该任务。 | |
* <li> | |
* 4: DiscardOldestPolicy 当任务被拒绝时,会丢弃最老的一个任务,然后尝试重新提交被拒绝的任务。 | |
*/ | |
private int rejectedType; | |
} | |
} |
扩展的线程池
package com.bright.ghj.common.config; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import java.lang.management.ManagementFactory; | |
import com.sun.management.OperatingSystemMXBean; | |
import java.lang.management.ThreadMXBean; | |
import java.util.List; | |
import java.util.Objects; | |
import java.util.concurrent.*; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import java.util.concurrent.atomic.AtomicLong; | |
/** | |
* @author Tz | |
* @date 2024/01/22 15:03 | |
* @Description 业务线程池扩展 | |
*/ | |
public class BizThreadPoolExt extends ThreadPoolExecutor { | |
private static final Logger LOGGER = LoggerFactory.getLogger(BizThreadPoolExt.class); | |
/** | |
* Java 虚拟机的线程系统的管理接口 | |
*/ | |
private final ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean(); | |
/** | |
* 运行 Java 虚拟机的操作系统的管理接口 | |
*/ | |
private final OperatingSystemMXBean osBean = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean(); | |
/** | |
* 默认拒绝策略 | |
*/ | |
private static final RejectedExecutionHandler DEFAULT_DENIAL_STRATEGY = new AbortPolicy(); | |
/** | |
* 线程池名称,一般以业务名称命名,方便区分 | |
*/ | |
private String poolName; | |
/** | |
* 最短执行时间 | |
*/ | |
private Long minCostTime; | |
/** | |
* 最长执行时间 | |
*/ | |
private Long maxCostTime; | |
/** | |
* 总的耗时 | |
*/ | |
private AtomicLong totalCostTime = new AtomicLong(); | |
private ThreadLocal<Long> startTimeThreadLocal = new ThreadLocal<>(); | |
/** | |
* 调用父类的构造方法,并初始化 HashMap 和线程池名称 | |
* | |
* @param corePoolSize 线程池核心线程数 | |
* @param maximumPoolSize 线程池最大线程数 | |
* @param keepAliveTime 线程的最大空闲时间 | |
* @param unit 空闲时间的单位 | |
* @param workQueue 保存被提交任务的队列 | |
* @param poolName 线程池名称 | |
*/ | |
public BizThreadPoolExt(int corePoolSize, int maximumPoolSize, long keepAliveTime, | |
TimeUnit unit, BlockingQueue<Runnable> workQueue, String poolName) { | |
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, | |
Executors.defaultThreadFactory(), poolName); | |
} | |
/** | |
* 调用父类的构造方法,并初始化 HashMap 和线程池名称 | |
* | |
* @param corePoolSize 线程池核心线程数 | |
* @param maximumPoolSize 线程池最大线程数 | |
* @param keepAliveTime 线程的最大空闲时间 | |
* @param unit 空闲时间的单位 | |
* @param workQueue 保存被提交任务的队列 | |
* @param | |
* @param poolName 线程池名称 | |
*/ | |
public BizThreadPoolExt(int corePoolSize, int maximumPoolSize, long keepAliveTime, | |
TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler, String poolName) { | |
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, | |
Executors.defaultThreadFactory(), handler, poolName); | |
} | |
/** | |
* 调用父类的构造方法,并初始化 HashMap 和线程池名称 | |
* | |
* @param corePoolSize 线程池核心线程数 | |
* @param maximumPoolSize 线程池最大线程数 | |
* @param keepAliveTime 线程的最大空闲时间 | |
* @param unit 空闲时间的单位 | |
* @param workQueue 保存被提交任务的队列 | |
* @param threadFactory 线程工厂 | |
* @param poolName 线程池名称 | |
*/ | |
public BizThreadPoolExt(int corePoolSize, int maximumPoolSize, long keepAliveTime, | |
TimeUnit unit, BlockingQueue<Runnable> workQueue, | |
ThreadFactory threadFactory, String poolName) { | |
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, DEFAULT_DENIAL_STRATEGY); | |
this.poolName = poolName; | |
this.maxCostTime = 0L; | |
this.minCostTime = 0L; | |
} | |
/** | |
* 调用父类的构造方法,并初始化 HashMap 和线程池名称 | |
* | |
* @param corePoolSize 线程池核心线程数 | |
* @param maximumPoolSize 线程池最大线程数 | |
* @param keepAliveTime 线程的最大空闲时间 | |
* @param unit 空闲时间的单位 | |
* @param workQueue 保存被提交任务的队列 | |
* @param threadFactory 线程工厂 | |
* @param handler 拒绝策略 | |
* @param poolName 线程池名称 | |
*/ | |
public BizThreadPoolExt(int corePoolSize, int maximumPoolSize, long keepAliveTime, | |
TimeUnit unit, BlockingQueue<Runnable> workQueue, | |
ThreadFactory threadFactory, RejectedExecutionHandler handler, String poolName) { | |
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); | |
this.poolName = poolName; | |
this.maxCostTime = 0L; | |
this.minCostTime = 0L; | |
} | |
/** | |
* 根据线程 id 获取线程执行占用总 cpu 的百分比 | |
* @param threadId 线程 id | |
* @return 占用总 cpu 的百分比 | |
*/ | |
private double getThreadCpuUsage(long threadId) { | |
long threadCpuTime = threadMXBean.getThreadCpuTime(threadId); | |
long systemCpuTime = osBean.getProcessCpuTime(); | |
double cpuUsage = ((double) threadCpuTime / (double) systemCpuTime) * 100; | |
cpuUsage = Math.round(cpuUsage * 100.0) / 100.0; | |
return cpuUsage; | |
} | |
/** | |
* 线程池延迟关闭时(等待线程池里的任务都执行完毕),统计线程池情况 | |
*/ | |
@Override | |
public void shutdown() { | |
// 统计已执行任务、正在执行任务、未执行任务数量 | |
LOGGER.info("{} 关闭线程池, 已执行任务: {}, 正在执行任务: {}, 未执行任务数量: {}", | |
this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size()); | |
super.shutdown(); | |
} | |
/** | |
* 线程池立即关闭时,统计线程池情况 | |
*/ | |
@Override | |
public List<Runnable> shutdownNow() { | |
// 统计已执行任务、正在执行任务、未执行任务数量 | |
LOGGER.info("{} 立即关闭线程池,已执行任务: {}, 正在执行任务: {}, 未执行任务数量: {}", | |
this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size()); | |
return super.shutdownNow(); | |
} | |
/** | |
* 任务执行之前,记录任务开始时间 | |
*/ | |
@Override | |
protected void beforeExecute(Thread t, Runnable r) { | |
startTimeThreadLocal.set(System.currentTimeMillis()); | |
} | |
/** | |
* 任务执行之后,计算任务结束时间 | |
*/ | |
@Override | |
protected void afterExecute(Runnable r, Throwable t) { | |
long costTime = System.currentTimeMillis() - startTimeThreadLocal.get(); | |
String threadName = Thread.currentThread().getName(); | |
startTimeThreadLocal.remove(); | |
maxCostTime = maxCostTime > costTime ? maxCostTime : costTime; | |
if (getCompletedTaskCount() == 0) { | |
minCostTime = costTime; | |
} | |
minCostTime = minCostTime < costTime ? minCostTime : costTime; | |
totalCostTime.addAndGet(costTime); | |
LOGGER.info("{}-pool-monitor: " + | |
"任务名称: {}, 任务耗时: {} ms, 占用cpu时间: {} ms, 占总cpu: {} %, 初始线程数: {}, 核心线程数: {}, 执行的任务数量: {}, " + | |
"已完成任务数量: {}, 任务总数: {}, 队列里缓存的任务数量: {}, 池中存在的最大线程数: {}, " + | |
"最大允许的线程数: {}, 线程空闲时间: {}, 线程池是否关闭: {}, 线程池是否终止: {}", | |
this.poolName, threadName, | |
costTime, threadMXBean.getThreadCpuTime(Thread.currentThread().getId()) / (1000 * 1000), getThreadCpuUsage(Thread.currentThread().getId()), | |
this.getPoolSize(), this.getCorePoolSize(), this.getActiveCount(), | |
this.getCompletedTaskCount(), this.getTaskCount(), this.getQueue().size(), this.getLargestPoolSize(), | |
this.getMaximumPoolSize(), this.getKeepAliveTime(TimeUnit.MILLISECONDS), this.isShutdown(), this.isTerminated()); | |
} | |
public Long getMinCostTime() { | |
return minCostTime; | |
} | |
public Long getMaxCostTime() { | |
return maxCostTime; | |
} | |
public void setMaxCostTime(Long maxCostTime) { | |
this.maxCostTime = maxCostTime; | |
} | |
public void setMinCostTime(Long mixCostTime) { | |
this.minCostTime = mixCostTime; | |
} | |
public long getAverageCostTime(){ | |
if(getCompletedTaskCount()==0||totalCostTime.get()==0){ | |
return 0; | |
} | |
return totalCostTime.get()/getCompletedTaskCount(); | |
} | |
public String getPoolName() { | |
return poolName; | |
} | |
} |
动态线程池的管理类(管理所有线程池的什么周期, 其实这里也只是管理释放线程池)
import lombok.extern.slf4j.Slf4j; | |
import org.springframework.stereotype.Component; | |
import java.util.HashMap; | |
import java.util.Map; | |
import java.util.concurrent.TimeUnit; | |
/** | |
* @author Tz | |
* @date 2024/02/23 17:16 | |
* @Description 线程池管理 | |
*/ | |
@Slf4j | |
@Component | |
public class DynamicThreadPoolManager { | |
private final Map<String, BizThreadPoolExt> bizThreadPoolExtMap = new HashMap<>(); | |
/** | |
* 注册线程池到管理器中 | |
* @param poolName 线程池名称 | |
* @param bizThreadPoolExt 线程池 | |
*/ | |
public void registerThreadPool(String poolName, BizThreadPoolExt bizThreadPoolExt) { | |
bizThreadPoolExtMap.put(poolName, bizThreadPoolExt); | |
} | |
/** | |
* 释放所有线程池 | |
*/ | |
public void shutdownAllThreadPools(){ | |
for (BizThreadPoolExt bizThreadPoolExt : bizThreadPoolExtMap.values()) { | |
try { | |
// 关闭线程池 | |
bizThreadPoolExt.shutdown(); | |
// 等待线程池内部的线程任务完成,并设置超时时间 30 秒 | |
boolean execFlag = bizThreadPoolExt.awaitTermination(30L, TimeUnit.SECONDS); | |
if (execFlag) { | |
log.info("线程池:[{}] 已经停止, 该线程池已经关闭并且没有还在运行的线程", bizThreadPoolExt.getPoolName()); | |
} else { | |
log.info("线程池:[{}] 中可能还存在正在运行的线程", bizThreadPoolExt.getPoolName()); | |
} | |
} catch (InterruptedException e) { | |
if (!bizThreadPoolExt.isShutdown()) { | |
bizThreadPoolExt.shutdown(); | |
} | |
} | |
} | |
} | |
} |
动态线程池的变更事件类
import org.springframework.context.ApplicationEvent; | |
/** | |
* @author Tz | |
* @date 2024/01/22 15:03 | |
* @Description 动态线程池的变更事件 | |
*/ | |
public class DynamicThreadPoolChangeEvent extends ApplicationEvent { | |
public DynamicThreadPoolChangeEvent(Object source) { | |
super(source); | |
} | |
} |
动态线程池的变更事件发布类
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.context.ApplicationEventPublisher; | |
import org.springframework.stereotype.Service; | |
import javax.annotation.PostConstruct; | |
/** | |
* @author Tz | |
* @date 2024/01/22 15:03 | |
* @Description 动态线程池的变更事件发布类 | |
*/ | |
@Service | |
public class DynamicThreadPoolChangePublisher { | |
@Autowired | |
private ApplicationEventPublisher applicationEventPublisher; | |
/** | |
* PostConstruct: 初始化执行发布 | |
*/ | |
@PostConstruct | |
public void init() { | |
// 会在 spring 容器启动的时候执行监听器的方法 | |
applicationEventPublisher.publishEvent(new DynamicThreadPoolChangeEvent(this)); | |
} | |
} |
动态线程池的变更事件监听类(核心)
import org.springframework.beans.factory.config.BeanDefinition; | |
import org.springframework.beans.factory.config.ConstructorArgumentValues; | |
import org.springframework.beans.factory.support.BeanDefinitionBuilder; | |
import org.springframework.beans.factory.support.BeanDefinitionRegistry; | |
import org.springframework.context.ApplicationContext; | |
import org.springframework.context.ApplicationListener; | |
import org.springframework.stereotype.Component; | |
import java.util.Map; | |
import java.util.concurrent.ArrayBlockingQueue; | |
import java.util.concurrent.ThreadPoolExecutor; | |
import java.util.concurrent.TimeUnit; | |
/** | |
* @author Tz | |
* @date 2024/01/22 15:03 | |
* @Description 动态线程池变更事件的监听器 | |
*/ | |
@Component | |
public class DynamicThreadPoolChangeListener implements ApplicationListener<DynamicThreadPoolChangeEvent> { | |
private final DynamicThreadPoolProperties dynamicThreadPoolProperties; | |
private final DynamicThreadPoolManager dynamicThreadPoolManager; | |
private final ApplicationContext context; | |
public DynamicThreadPoolChangeListener(ApplicationContext context | |
, DynamicThreadPoolProperties dynamicThreadPoolProperties | |
, DynamicThreadPoolManager dynamicThreadPoolManager) { | |
this.context = context; | |
this.dynamicThreadPoolProperties = dynamicThreadPoolProperties; | |
this.dynamicThreadPoolManager = dynamicThreadPoolManager; | |
} | |
@Override | |
public void onApplicationEvent(DynamicThreadPoolChangeEvent event) { | |
// 获取 BeanDefinitionRegistry | |
BeanDefinitionRegistry beanDefinitionRegistry = (BeanDefinitionRegistry) context.getAutowireCapableBeanFactory(); | |
// 获取配置文件中的线程池配置 | |
Map<String, DynamicThreadPoolProperties.PoolConfig> poolConfigMap = dynamicThreadPoolProperties.getPool(); | |
for (Map.Entry<String, DynamicThreadPoolProperties.PoolConfig> entry : poolConfigMap.entrySet()) { | |
DynamicThreadPoolProperties.PoolConfig poolConfig = entry.getValue(); | |
// 获取相关属性 | |
String poolName = poolConfig.getPoolName(); | |
int corePoolSize = poolConfig.getCorePoolSize(); | |
String beanName = poolConfig.getBeanName(); | |
long keepAliveTime = poolConfig.getKeepAliveTime(); | |
int blockQueueCapacity = poolConfig.getBlockQueueCapacity(); | |
int maximumPoolSize = poolConfig.getMaximumPoolSize(); | |
int rejectedType = poolConfig.getRejectedType(); | |
// 需要注册的线程池 | |
BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(BizThreadPoolExt.class); | |
// 构建构造器的参数 | |
ConstructorArgumentValues constructorArgs = new ConstructorArgumentValues(); | |
constructorArgs.addIndexedArgumentValue(0, corePoolSize); | |
constructorArgs.addIndexedArgumentValue(1, maximumPoolSize); | |
constructorArgs.addIndexedArgumentValue(2, keepAliveTime); | |
constructorArgs.addIndexedArgumentValue(3, TimeUnit.SECONDS); | |
constructorArgs.addIndexedArgumentValue(4, new ArrayBlockingQueue<>(blockQueueCapacity, true)); | |
// 处理设置的拒绝策略 | |
switch (rejectedType) { | |
case 2: | |
constructorArgs.addIndexedArgumentValue(5, new ThreadPoolExecutor.CallerRunsPolicy()); | |
break; | |
case 3: | |
constructorArgs.addIndexedArgumentValue(5, new ThreadPoolExecutor.DiscardPolicy()); | |
break; | |
case 4: | |
constructorArgs.addIndexedArgumentValue(5, new ThreadPoolExecutor.DiscardOldestPolicy()); | |
break; | |
default: | |
constructorArgs.addIndexedArgumentValue(5, new ThreadPoolExecutor.AbortPolicy()); | |
break; | |
} | |
// 设置线程池名称 | |
constructorArgs.addIndexedArgumentValue(6, poolName); | |
// 设置 bean | |
builder.getRawBeanDefinition().setConstructorArgumentValues(constructorArgs); | |
// 注册 bean | |
BeanDefinition beanDefinition = builder.getRawBeanDefinition(); | |
beanDefinitionRegistry.registerBeanDefinition(beanName, beanDefinition); | |
// 获取线程池实例 | |
BizThreadPoolExt bizThreadPoolExt = (BizThreadPoolExt) context.getBean(beanName); | |
// 注册到线程管理器中 | |
dynamicThreadPoolManager.registerThreadPool(poolName | |
, bizThreadPoolExt); | |
} | |
} | |
} |
还需要动态线程池关闭的监听器,因为动态注册后关闭线程不会触发关闭事件需要手动触发关闭事件
import org.springframework.context.ApplicationListener; | |
import org.springframework.context.event.ContextClosedEvent; | |
import org.springframework.stereotype.Component; | |
/** | |
* @author Tz | |
* @date 2024/02/23 17:15 | |
* @Description 动态线程池的销毁时间事件 ContextClosedEvent 容器关闭的事件 | |
*/ | |
@Component | |
public class DynamicThreadPoolCleanupListener implements ApplicationListener<ContextClosedEvent> { | |
private final DynamicThreadPoolManager dynamicThreadPoolManager; | |
public DynamicThreadPoolCleanupListener(DynamicThreadPoolManager dynamicThreadPoolManager) { | |
this.dynamicThreadPoolManager = dynamicThreadPoolManager; | |
} | |
@Override | |
public void onApplicationEvent(ContextClosedEvent event) { | |
// 进行销毁前对线程池进行手动释放 | |
dynamicThreadPoolManager.shutdownAllThreadPools(); | |
} | |
} |
项目结构:
# 使用:
直接注入使用
/** | |
* 使用 beanname 为 bizPool 的线程池 | |
*/ | |
@Resource | |
private BizThreadPoolExt bizPool; | |
/** | |
* 使用 beanname 为 jobPool 的线程池 | |
*/ | |
@Resource | |
private BizThreadPoolExt jobPool; |
# 异步任务提交器
# 功能点:
由于在该项目中的功能大部分都是需要异步执行的任务,比如上传数据到上级平台、上传附件数据解析保存、上传 access 数据库的数据保存等等。所以这里进行了异步任务的封装。
# 思路:
可以把一个任务看作一个有状态的对象,该对象包括以下内容:
- 任务对象的基础信息,执行状态、进度、等信息
- 任务的行为,开始执行任务,停止任务,为自己加锁,推进进度等等。具体执行的任务内容由自己实现决定。
- 选哟异步任务提交的情况的对象, 能够知道任务是否进入线程池了
通过上述问题:
定义一个通用的异步任务处理器,提供通用被调用的方法,该方法做状态的处理,并在该方法中在调用子类实现的逻辑
在定义一个通用的返回方法,可以知道任务是否被提交, 子类实现可以根据判断提交的情况针对业务做自定义返回
异步任务提交的方法接受异步任务执行器,并异步调用执行中的通用方法,提交成功在返回执行结果
# 具体实现步骤:
项目结构
首先创建一张表保存任务对象执行情况的信息,因为需要查询任务最后的执行情况
/* | |
Navicat Premium Data Transfer | |
Source Server : 192.168.0.36 | |
Source Server Type : SQL Server | |
Source Server Version : 10501600 | |
Source Host : 192.168.0.36:1433 | |
Source Catalog : zcjy_th_test | |
Source Schema : dbo | |
Target Server Type : SQL Server | |
Target Server Version : 10501600 | |
File Encoding : 65001 | |
Date: 26/04/2024 11:57:32 | |
*/ | |
-- ---------------------------- | |
-- Table structure for async_message | |
-- ---------------------------- | |
IF EXISTS (SELECT * FROM sys.all_objects WHERE object_id = OBJECT_ID(N'[dbo].[async_message]') AND type IN ('U')) | |
DROP TABLE [dbo].[async_message] | |
GO | |
CREATE TABLE [dbo].[async_message] ( | |
[id] int IDENTITY(1,1) NOT NULL, | |
[keyword] varchar(255) COLLATE Chinese_PRC_CI_AS NULL, | |
[username] varchar(255) COLLATE Chinese_PRC_CI_AS NULL, | |
[task_type] varchar(255) COLLATE Chinese_PRC_CI_AS NULL, | |
[task_name] varchar(255) COLLATE Chinese_PRC_CI_AS NULL, | |
[years] int NULL, | |
[months] int NULL, | |
[days] int NULL, | |
[dist_no] varchar(255) COLLATE Chinese_PRC_CI_AS NULL, | |
[dist_name] varchar(255) COLLATE Chinese_PRC_CI_AS NULL, | |
[zt_id] varchar(255) COLLATE Chinese_PRC_CI_AS NULL, | |
[zt_name] varchar(255) COLLATE Chinese_PRC_CI_AS NULL, | |
[content_type] int NULL, | |
[content_info] text COLLATE Chinese_PRC_CI_AS NULL, | |
[error_info] text COLLATE Chinese_PRC_CI_AS NULL, | |
[exec_status] int NULL, | |
[exec_flag] bit NULL, | |
[run_flag] bit NULL, | |
[read_flag] bit NULL, | |
[exec_time] int NULL, | |
[created_by] varchar(255) COLLATE Chinese_PRC_CI_AS NULL, | |
[created_time] datetime NULL, | |
[updated_by] varchar(255) COLLATE Chinese_PRC_CI_AS NULL, | |
[updated_time] datetime NULL | |
) | |
GO | |
ALTER TABLE [dbo].[async_message] SET (LOCK_ESCALATION = TABLE) | |
GO | |
EXEC sp_addextendedproperty | |
'MS_Description', N'主键', | |
'SCHEMA', N'dbo', | |
'TABLE', N'async_message', | |
'COLUMN', N'id' | |
GO | |
EXEC sp_addextendedproperty | |
'MS_Description', N'关键值;关键值', | |
'SCHEMA', N'dbo', | |
'TABLE', N'async_message', | |
'COLUMN', N'keyword' | |
GO | |
EXEC sp_addextendedproperty | |
'MS_Description', N'发送任务的用户名;发送任务的用户名', | |
'SCHEMA', N'dbo', | |
'TABLE', N'async_message', | |
'COLUMN', N'username' | |
GO | |
EXEC sp_addextendedproperty | |
'MS_Description', N'任务类型;任务类型', | |
'SCHEMA', N'dbo', | |
'TABLE', N'async_message', | |
'COLUMN', N'task_type' | |
GO | |
EXEC sp_addextendedproperty | |
'MS_Description', N'任务名称;任务名称', | |
'SCHEMA', N'dbo', | |
'TABLE', N'async_message', | |
'COLUMN', N'task_name' | |
GO | |
EXEC sp_addextendedproperty | |
'MS_Description', N'年份;年份', | |
'SCHEMA', N'dbo', | |
'TABLE', N'async_message', | |
'COLUMN', N'years' | |
GO | |
EXEC sp_addextendedproperty | |
'MS_Description', N'月份;月份', | |
'SCHEMA', N'dbo', | |
'TABLE', N'async_message', | |
'COLUMN', N'months' | |
GO | |
EXEC sp_addextendedproperty | |
'MS_Description', N'天数;天数', | |
'SCHEMA', N'dbo', | |
'TABLE', N'async_message', | |
'COLUMN', N'days' | |
GO | |
EXEC sp_addextendedproperty | |
'MS_Description', N'地区号;地区号', | |
'SCHEMA', N'dbo', | |
'TABLE', N'async_message', | |
'COLUMN', N'dist_no' | |
GO | |
EXEC sp_addextendedproperty | |
'MS_Description', N'地区名称;地区名称', | |
'SCHEMA', N'dbo', | |
'TABLE', N'async_message', | |
'COLUMN', N'dist_name' | |
GO | |
EXEC sp_addextendedproperty | |
'MS_Description', N'账套id;账套id', | |
'SCHEMA', N'dbo', | |
'TABLE', N'async_message', | |
'COLUMN', N'zt_id' | |
GO | |
EXEC sp_addextendedproperty | |
'MS_Description', N'账套名称;账套名称', | |
'SCHEMA', N'dbo', | |
'TABLE', N'async_message', | |
'COLUMN', N'zt_name' | |
GO | |
EXEC sp_addextendedproperty | |
'MS_Description', N'消息体类型;消息响应的类型。0:普通、1:页面跳转、2:嵌套窗口', | |
'SCHEMA', N'dbo', | |
'TABLE', N'async_message', | |
'COLUMN', N'content_type' | |
GO | |
EXEC sp_addextendedproperty | |
'MS_Description', N'消息内容;执行任务的响应消息', | |
'SCHEMA', N'dbo', | |
'TABLE', N'async_message', | |
'COLUMN', N'content_info' | |
GO | |
EXEC sp_addextendedproperty | |
'MS_Description', N'错误信息;执行错误情况', | |
'SCHEMA', N'dbo', | |
'TABLE', N'async_message', | |
'COLUMN', N'error_info' | |
GO | |
EXEC sp_addextendedproperty | |
'MS_Description', N'执行情况;0、待执行。1、执行中。2、执行成功。-1、执行失败', | |
'SCHEMA', N'dbo', | |
'TABLE', N'async_message', | |
'COLUMN', N'exec_status' | |
GO | |
EXEC sp_addextendedproperty | |
'MS_Description', N'是否执行;消息执行状态(1、成功。0、失败)', | |
'SCHEMA', N'dbo', | |
'TABLE', N'async_message', | |
'COLUMN', N'exec_flag' | |
GO | |
EXEC sp_addextendedproperty | |
'MS_Description', N'是否可用;消息是否可用(1、可用、0、不可用)', | |
'SCHEMA', N'dbo', | |
'TABLE', N'async_message', | |
'COLUMN', N'run_flag' | |
GO | |
EXEC sp_addextendedproperty | |
'MS_Description', N'是否已读消息;消息是否已读状态', | |
'SCHEMA', N'dbo', | |
'TABLE', N'async_message', | |
'COLUMN', N'read_flag' | |
GO | |
EXEC sp_addextendedproperty | |
'MS_Description', N'执行时间;执行花费的时间(毫秒)', | |
'SCHEMA', N'dbo', | |
'TABLE', N'async_message', | |
'COLUMN', N'exec_time' | |
GO | |
EXEC sp_addextendedproperty | |
'MS_Description', N'创建人', | |
'SCHEMA', N'dbo', | |
'TABLE', N'async_message', | |
'COLUMN', N'created_by' | |
GO | |
EXEC sp_addextendedproperty | |
'MS_Description', N'创建时间', | |
'SCHEMA', N'dbo', | |
'TABLE', N'async_message', | |
'COLUMN', N'created_time' | |
GO | |
EXEC sp_addextendedproperty | |
'MS_Description', N'更新人', | |
'SCHEMA', N'dbo', | |
'TABLE', N'async_message', | |
'COLUMN', N'updated_by' | |
GO | |
EXEC sp_addextendedproperty | |
'MS_Description', N'更新时间', | |
'SCHEMA', N'dbo', | |
'TABLE', N'async_message', | |
'COLUMN', N'updated_time' | |
GO | |
EXEC sp_addextendedproperty | |
'MS_Description', N'异步任务消息表', | |
'SCHEMA', N'dbo', | |
'TABLE', N'async_message' | |
GO | |
-- ---------------------------- | |
-- Primary Key structure for table async_message | |
-- ---------------------------- | |
ALTER TABLE [dbo].[async_message] ADD CONSTRAINT [PK__async_me__3213E83F100234CD] PRIMARY KEY CLUSTERED ([id]) | |
WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) | |
ON [PRIMARY] | |
GO |
创建记录任务对象执行情况的信息的对象
import lombok.Data; | |
import javax.persistence.*; | |
import java.io.Serializable; | |
import java.util.Date; | |
/** | |
* @author Tz | |
* @date 2024/01/22 15:03 | |
* @Description 异步任务消息实体 | |
*/ | |
@Data | |
@Entity | |
@Table(name ="async_message" , schema = "dbo") | |
public class AsyncMessage implements Serializable { | |
private static final long serialVersionUID = 1L; | |
/** 主键 */ | |
@Id | |
@GeneratedValue(strategy = GenerationType.IDENTITY) | |
@Column(name = "id") | |
private Integer id ; | |
/** 关键值;关键值 */ | |
@Column(name = "keyword") | |
private String keyword ; | |
/** 发送任务的用户名;发送任务的用户名 */ | |
@Column(name = "username") | |
private String username ; | |
/** 任务类型;任务类型 */ | |
@Column(name = "task_type") | |
private String taskType ; | |
/** 任务名称;任务名称 */ | |
@Column(name = "task_name") | |
private String taskName ; | |
/** 年份;年份 */ | |
@Column(name = "years") | |
private Integer years ; | |
/** 月份;月份 */ | |
@Column(name = "months") | |
private Integer months ; | |
/** 天数;天数 */ | |
@Column(name = "days") | |
private Integer days ; | |
/** 地区号;地区号 */ | |
@Column(name = "dist_no") | |
private String distNo ; | |
/** 地区名称;地区名称 */ | |
@Column(name = "dist_name") | |
private String distName ; | |
/** 账套 id; 账套 id */ | |
@Column(name = "zt_id") | |
private String ztId ; | |
/** 账套名称;账套名称 */ | |
@Column(name = "zt_name") | |
private String ztName ; | |
/** 消息体类型;执行任务的类型 */ | |
@Column(name = "content_type") | |
private Integer contentType ; | |
/** 消息内容;消息响应的类型。0:普通、1:页面跳转、2:嵌套窗口 */ | |
@Column(name = "content_info") | |
private String contentInfo ; | |
/** 错误信息;消息响应的类型。执行错误情况 */ | |
@Column(name = "error_info") | |
private String errorInfo ; | |
/** 执行情况;0、待执行。1、执行中。2、执行成功。-1、执行失败。-2、执行取消。-3、执行错误 */ | |
@Column(name = "exec_status") | |
private Integer execStatus ; | |
/** 状态类别;0、全部。1、执行中。2、执行成功。3、执行失败 */ | |
@Transient | |
private Integer statusCategory ; | |
/** 是否执行;消息执行状态(1、成功。0、失败) */ | |
@Column(name = "exec_flag") | |
private Boolean execFlag ; | |
/** 是否可用;消息是否可用(1、可用、0、不可用) */ | |
@Column(name = "run_flag") | |
private Boolean runFlag ; | |
/** 是否已读消息;消息是否已读状态 */ | |
@Column(name = "read_flag") | |
private Boolean readFlag ; | |
/** 执行时间;执行花费的时间(毫秒) */ | |
@Column(name = "exec_time") | |
private Integer execTime ; | |
/** 创建人 */ | |
@Column(name = "created_by") | |
private String createdBy ; | |
/** 创建时间 */ | |
@Column(name = "created_time") | |
private Date createdTime ; | |
/** 更新人 */ | |
@Column(name = "updated_by") | |
private String updatedBy ; | |
/** 更新时间 */ | |
@Column(name = "updated_time") | |
private Date updatedTime ; | |
} |
相关的 VO、DTO 字段和 PO 实体类是一样的, 这里使用了 mapstruct
做了对象转换, 转换器:
import org.mapstruct.Mapper; | |
import java.util.ArrayList; | |
import java.util.List; | |
/** | |
* @author Tz | |
* @date 2024/01/22 15:03 | |
* @Description 异步任务消息实体的 转换器 | |
*/ | |
@Mapper(componentModel = "spring") | |
public interface AsyncMessageConverter extends BaseConverter<AsyncMessage, AsyncMessageDTO, AsyncMessageVO, CommonCommand> { | |
/** | |
* 自定义 List-PO 转 List-DTO 处理状态分类的字段 | |
* @param poList | |
* @return | |
*/ | |
@Override | |
default List<AsyncMessageDTO> poConvertDTO(List<AsyncMessage> poList) { | |
if ( poList == null ) { | |
return null; | |
} | |
List<AsyncMessageDTO> list = new ArrayList<AsyncMessageDTO>( poList.size() ); | |
for ( AsyncMessage asyncMessage : poList ) { | |
asyncMessage.setStatusCategory(getCategoryFromStatus(asyncMessage.getExecStatus())); | |
list.add( poConvertDTO( asyncMessage ) ); | |
} | |
return list; | |
} | |
} |
创建一个异步任务执行器抽象类,提供异步任务通用方法,具体实现由任务子类来实现,这里继承了 AsyncMessageDTO 获得执行情况的状态,和保存状态
import lombok.Data; | |
import lombok.EqualsAndHashCode; | |
import lombok.NoArgsConstructor; | |
import lombok.extern.slf4j.Slf4j; | |
import java.io.PrintWriter; | |
import java.io.Serializable; | |
import java.io.StringWriter; | |
import java.util.Enumeration; | |
import java.util.Objects; | |
import java.util.concurrent.ConcurrentHashMap; | |
import static com.bright.ghj.overt.enums.AsyncTaskExecInfo.*; | |
/** | |
* @author Tz | |
* @date 2024/01/22 15:03 | |
* @Description 异步任务执行器的抽象类 继承 AsyncMessageDTO 获得执行情况的状态 | |
*/ | |
@Slf4j | |
@Data | |
@NoArgsConstructor | |
@EqualsAndHashCode(callSuper = true) | |
public abstract class AsyncTaskExecutor extends AsyncMessageDTO implements Serializable { | |
/** | |
* 异步任务执行器名称 | |
*/ | |
private String asyncTaskExecutorName; | |
/** | |
* 原子操作 | |
* <p> | |
* key: 实际的异步任务执行器名称 | |
* value: 实际的异步任务同步锁集合 | |
*/ | |
private static ConcurrentHashMap<String, ConcurrentHashMap> BASIC_TASK_LOCK = new ConcurrentHashMap<>(); | |
/** | |
* 进度百分比 | |
*/ | |
protected static final Integer PERCENTAGE = 100; | |
/** | |
* 完成度 | |
*/ | |
protected Integer completionDegree; | |
/** | |
* 进度步长 | |
*/ | |
protected Integer progressStepSize; | |
/** | |
* 任务是否是运行状态 | |
*/ | |
protected Boolean isRunning = true; | |
/** | |
* 同步锁的关键值 | |
* <p> | |
* 相同的 key 值会进行同步操作 | |
* <p> | |
* 注意:不设置也是同步执行的 | |
*/ | |
protected String syncKeyword = ""; | |
/** | |
* 处理异步消息任务的接口 | |
*/ | |
protected AsyncMessageService asyncMessageService; | |
/** | |
* 初始化值 | |
*/ | |
public AsyncTaskExecutor(String asyncTaskExecutorName){ | |
//======= 初始化相关参数 ======= | |
// 初始进度 | |
setCompletionDegree(0); | |
// 默认未读状态 | |
setReadFlag(false); | |
// 默认待执行状态 | |
setExecStatus(TODO_EXEC.getStatus()); | |
// 消息默认可以用 | |
setRunFlag(true); | |
// 消息默认未开始执行 | |
setExecFlag(false); | |
setCreatedBy(SecurityUtil.getLoginUser().getUsername()); | |
setCreatedTime(DateUtil.getCurrDate()); | |
this.asyncTaskExecutorName = asyncTaskExecutorName; | |
this.asyncMessageService = ApplicationContextProvider.getApplicationContext().getBean(AsyncMessageServiceImpl.class); | |
} | |
/** | |
* 基础执行器,只是做异步任务的消息记录 具体的业务由子类实现 {@link #taskContent ()} 完成 | |
*/ | |
public void baseExec() { | |
AsyncMessageDTO asyncMessageDTO = asyncMessageService.getById(this.getId()); | |
// 记录时间 | |
long startTime = System.currentTimeMillis(); | |
try { | |
// 锁对象 | |
Object object = null; | |
if (!BASIC_TASK_LOCK.containsKey(asyncTaskExecutorName)) { | |
BASIC_TASK_LOCK.put(asyncTaskExecutorName, new ConcurrentHashMap<String, Object>()); | |
} | |
//1、校验任务是否取消 | |
// 需要二次判断,处理锁 | |
synchronized (this){ | |
// 查看这条消息是否是未执行且可用的 | |
AsyncMessageDTO messageDTO = asyncMessageService.getById(asyncMessageDTO.getId()); | |
// 如果不可用 代表该消息在没被消费前被取消了 | |
if(!messageDTO.getRunFlag()){ | |
messageDTO.setContentInfo("任务被取消!"); | |
messageDTO.setExecStatus(EXECUTION_CANCEL.getStatus()); | |
messageDTO.setExecFlag(true); | |
messageDTO.setUpdatedBy(asyncMessageDTO.getUsername()); | |
messageDTO.setUpdatedTime(DateUtil.getCurrDate()); | |
asyncMessageService.save(messageDTO); | |
return; | |
} | |
// 如果根据 key 查询到了对象锁 | |
ConcurrentHashMap concurrentHashMap = BASIC_TASK_LOCK.get(asyncTaskExecutorName); | |
Enumeration enumeration = concurrentHashMap.keys(); | |
while(enumeration.hasMoreElements()){ | |
String key = (String) enumeration.nextElement(); | |
if(key.equalsIgnoreCase(this.syncKeyword)){ | |
object = concurrentHashMap.get(key); | |
} | |
} | |
// 没有获取到锁则新增 | |
if(object == null){ | |
object = new Object(); | |
} | |
} | |
// 进入消费 表示该消息已经开始消费了 | |
//3、锁住自己定义的同步锁 进行业务处理 | |
// 如果上一次执行的锁标识没有释放,现在有需要没释放的锁,就会进行等待 | |
synchronized(object) { | |
try { | |
// 查看这条消息是否是未消费且可用的 | |
AsyncMessageDTO resMqMessage = asyncMessageService.getById(asyncMessageDTO.getId()); | |
// 如果不可用 代表该消息在没被消费前被取消了 | |
if(!resMqMessage.getRunFlag()){ | |
resMqMessage.setContentInfo("任务被取消!"); | |
resMqMessage.setExecStatus(EXECUTION_CANCEL.getStatus()); | |
resMqMessage.setUpdatedBy(resMqMessage.getUsername()); | |
resMqMessage.setUpdatedTime(DateUtil.getCurrDate()); | |
asyncMessageService.save(resMqMessage); | |
return; | |
} | |
// 开始执行 将锁标识放入 | |
BASIC_TASK_LOCK.get(asyncTaskExecutorName).put(this.syncKeyword, object); | |
asyncMessageDTO.setExecFlag(true); | |
asyncMessageDTO.setExecStatus(EXECUTING.getStatus()); | |
AsyncMessageDTO updResMessage = asyncMessageService.save(asyncMessageDTO); | |
if (Objects.isNull(updResMessage)) { | |
throw new RuntimeException("更新异步任务状态异常!"); | |
} | |
// 执行实际的任务内容 | |
boolean execFlag = taskContent(); | |
if (execFlag) { | |
// 执行成功 | |
asyncMessageDTO.setExecStatus(EXECUTION_SUCCESSFUL.getStatus()); | |
asyncMessageDTO.setContentInfo(String.format("执行: %s 成功!", asyncMessageDTO.getTaskName())); | |
} else { | |
// 执行失败 | |
asyncMessageDTO.setExecStatus(EXECUTION_FAILED.getStatus()); | |
asyncMessageDTO.setContentInfo(String.format("执行: %s 失败!", asyncMessageDTO.getTaskName())); | |
// 设置执行失败的错误信息 | |
asyncMessageDTO.setErrorInfo(this.getErrorInfo()); | |
} | |
} catch (Exception e){ | |
// 将异常信息保存到 String 中 | |
StringWriter sw = new StringWriter(); | |
PrintWriter pw = new PrintWriter(sw); | |
e.printStackTrace(pw); | |
String exceptionAsString = sw.toString(); | |
System.out.println(exceptionAsString); | |
// 执行失败 | |
asyncMessageDTO.setExecStatus(EXECUTION_ERROR.getStatus()); | |
asyncMessageDTO.setContentInfo(String.format("执行: %s 失败!", asyncMessageDTO.getTaskName())); | |
asyncMessageDTO.setErrorInfo(exceptionAsString); | |
} finally { | |
//4、释放当前执行地区持有的锁 | |
BASIC_TASK_LOCK.get(asyncTaskExecutorName).remove(this.syncKeyword); | |
} | |
} | |
long endTime = System.currentTimeMillis(); | |
asyncMessageDTO.setExecTime(endTime - startTime); | |
asyncMessageDTO.setUpdatedBy(asyncMessageDTO.getUsername()); | |
asyncMessageDTO.setUpdatedTime(DateUtil.getCurrDate()); | |
asyncMessageService.save(asyncMessageDTO); | |
} catch (Exception e) { | |
// 将异常信息保存到 String 中 | |
StringWriter sw = new StringWriter(); | |
PrintWriter pw = new PrintWriter(sw); | |
e.printStackTrace(pw); | |
String exceptionAsString = sw.toString(); | |
// 执行错误 | |
asyncMessageDTO.setExecStatus(EXECUTION_ERROR.getStatus()); | |
asyncMessageDTO.setContentInfo(e.getMessage()); | |
asyncMessageDTO.setErrorInfo(exceptionAsString); | |
long endTime = System.currentTimeMillis(); | |
asyncMessageDTO.setExecTime(endTime - startTime); | |
asyncMessageDTO.setUpdatedBy(asyncMessageDTO.getUsername()); | |
asyncMessageDTO.setUpdatedTime(DateUtil.getCurrDate()); | |
asyncMessageService.save(asyncMessageDTO); | |
} finally { | |
this.isRunning = false; | |
} | |
} | |
/** | |
* 实际异步执行的任务体抽象方法、由具体的异步任务实现 | |
* @return 执行结果 | |
* @throws Exception 可能出现的业务异常 | |
*/ | |
public abstract boolean taskContent() throws Exception; | |
/** | |
* 推进进度 | |
*/ | |
public void advanceProgress() { | |
completionDegree = completionDegree + progressStepSize; | |
if (completionDegree > PERCENTAGE) { | |
completionDegree = PERCENTAGE; | |
} | |
log.info("当前任务keyword:[{}], 任务类型: [{}], 任务名称: [{}], 任务进度: [{}%], 是否正在运行: [{}]" | |
, getKeyword(), getTaskType(), getTaskName(), getCompletionDegree(), getIsRunning() ? "是" : "否"); | |
} | |
/** | |
* 对执行器进行必要的校验 | |
* @return 校验结果 | |
*/ | |
public boolean checkAsyncTaskExecutor() { | |
if (StringUtil.isBlank(this.asyncTaskExecutorName) | |
|| StringUtil.isBlank(this.syncKeyword)) { | |
return false; | |
} | |
return true; | |
} | |
} |
异步任务执行情况的枚举类
import lombok.AllArgsConstructor; | |
import lombok.Getter; | |
import java.util.Arrays; | |
import java.util.Collections; | |
import java.util.List; | |
/** | |
* @author Tz | |
* @date 2024/01/22 15:03 | |
* @Description 异步任务执行情况信息的枚举 | |
*/ | |
@Getter | |
@AllArgsConstructor | |
public enum AsyncTaskExecInfo { | |
/** | |
* 待执行 | |
*/ | |
TODO_EXEC(0, "待执行"), | |
/** | |
* 正在执行 | |
*/ | |
EXECUTING(1, "正在执行"), | |
/** | |
* 执行成功 | |
*/ | |
EXECUTION_SUCCESSFUL(2, "执行成功"), | |
/** | |
* 执行失败 (程序运行后,结果错误) | |
*/ | |
EXECUTION_FAILED(-1, "执行失败"), | |
/** | |
* 执行取消 | |
*/ | |
EXECUTION_CANCEL(-2, "执行取消"), | |
/** | |
* 执行错误 (程序异常导致的错误) | |
*/ | |
EXECUTION_ERROR(-3, "执行错误"); | |
private final Integer status; | |
private final String info; | |
/** | |
* 根据类别获取状态列表 | |
* @param category 类别 0:全部,1:处理中,2:成功,3:失败 | |
* @return 状态列表 | |
*/ | |
public static List<Integer> getStatusFromCategory(Integer category) { | |
switch (category) { | |
case 1: | |
return Arrays.asList(TODO_EXEC.status, EXECUTING.status); | |
case 2: | |
return Collections.singletonList(EXECUTION_SUCCESSFUL.status); | |
case 3: | |
return Arrays.asList(EXECUTION_FAILED.status, EXECUTION_CANCEL.status, EXECUTION_ERROR.status); | |
default: | |
return Arrays.asList(TODO_EXEC.status | |
, EXECUTING.status | |
, EXECUTION_SUCCESSFUL.status | |
, EXECUTION_FAILED.status | |
, EXECUTION_CANCEL.status | |
, EXECUTION_ERROR.status); | |
} | |
} | |
/** | |
* 根据执行状态获取对应的状态分类 | |
* @param status 执行状态 | |
* @return 状态分类 | |
*/ | |
public static Integer getCategoryFromStatus(Integer status) { | |
if (TODO_EXEC.status.equals(status) || EXECUTING.status.equals(status)){ | |
return 1; | |
} else if (EXECUTION_SUCCESSFUL.status.equals(status)) { | |
return 2; | |
} else if (EXECUTION_FAILED.status.equals(status) || EXECUTION_CANCEL.status.equals(status) || EXECUTION_ERROR.status.equals(status)) { | |
return 3; | |
} else { | |
return 0; | |
} | |
} | |
} |
保存异步任务执行情况信息的 service
import java.util.List; | |
/** | |
* @author Tz | |
* @date 2024/01/22 15:03 | |
* @Description 异步任务消息接口 额外提供可以提交异步执行任务的功能 | |
*/ | |
public interface AsyncMessageService { | |
/** | |
* 保存异步任务消息 | |
* @param asyncMessageDTO 异步任务消息 DTO | |
* @return 异步任务消息 DTO | |
*/ | |
AsyncMessageDTO save(AsyncMessageDTO asyncMessageDTO); | |
/** | |
* 根据 id 查询异步任务消息 | |
* @param id 主键 | |
* @return 异步任务消息 DTO | |
*/ | |
AsyncMessageDTO getById(Integer id); | |
/** | |
* 通用的提交执行异步任务 | |
* <p> | |
* 会提交实现 {@link AsyncTaskExecutor#baseExec ()} 方法中的任务内容, | |
* 实际执行的是子类的 {@link AsyncTaskExecutor#taskContent ()} | |
* @param asyncTaskExecutor 需要执行的异步任务内容 | |
* @return 执行生成的结果 | |
*/ | |
AsyncTaskResultVO execAsyncTask(AsyncTaskExecutor asyncTaskExecutor); | |
/** | |
* 根据主键标识获取异步任务的进度 | |
* <p> | |
* 当项目重启 如果获取历史执行任务的进度会报错 直接返回 100 | |
* <p> | |
* 添加一个可以自动清理已经完成任务的功能 避免一直占用内存 {@link AsyncMessageServiceImpl#cleanAsyncTaskTimer ()} | |
* @param keyword 主键标识 | |
* @return 进度百分比 | |
*/ | |
Integer getCompletionDegree(String keyword); | |
/** | |
* 根据 id 撤回发送的异步任务 | |
* @param id 主键 | |
* @return 返回撤回的数量 | |
*/ | |
Integer revokeAsyncTask(Integer id); | |
/** | |
* 根据条件获取异步任务消息列表 | |
* @param asyncTaskMassageQuery 查询条件 | |
* @return 异步消息列表 | |
*/ | |
PageVO<AsyncMessageVO> pageAsyncMessage(AsyncTaskMassageQuery asyncTaskMassageQuery); | |
/** | |
* 根据 keyword 集合查询异步消息列表 | |
* @param keywords keyword 集合 | |
* @return 异步消息列表 | |
*/ | |
List<AsyncMessageVO> listAsyncMessageByKeywords(List<String> keywords); | |
/** | |
* 一键已读 | |
* | |
* @param statusCategory 需要已读的状态类别 | |
* @param username 需要已读的用户 | |
* @return 已读记录数 | |
*/ | |
Integer oneClickRead(Integer statusCategory, String username); | |
/** | |
* 阅读一条未读消息 | |
* @param id 主键 | |
* @return 读取结果 | |
*/ | |
Integer read(Integer id); | |
/** | |
* 获取异步任务执行的情况结果 | |
* @param username 执行任务的用户名 | |
* @return 异步任务的执行情况结果的 VO | |
*/ | |
AsyncTaskExecInfoResultVO getAsyncTaskExecInfo(String username); | |
/** | |
* 确认读取执行的结果 | |
* @param username 执行任务的用户名 | |
*/ | |
void confirmReadingExecResult(String username); | |
} |
保存异步任务执行情况信息的 service 的实现
import lombok.RequiredArgsConstructor; | |
import lombok.extern.slf4j.Slf4j; | |
import org.springframework.scheduling.annotation.Scheduled; | |
import org.springframework.stereotype.Service; | |
import java.util.*; | |
import java.util.concurrent.CompletableFuture; | |
import java.util.concurrent.ConcurrentHashMap; | |
import java.util.concurrent.ConcurrentMap; | |
/** | |
* @author Tz | |
* @date 2024/01/22 15:03 | |
* @Description 异步任务消息接口 实现 | |
*/ | |
@Slf4j | |
@Service | |
@RequiredArgsConstructor | |
public class AsyncMessageServiceImpl implements AsyncMessageService { | |
private final AsyncMessageManager asyncMessageManager; | |
private final AsyncMessageConverter asyncMessageConverter; | |
/** | |
* 保存执行任务对象,可以获取进度等信息 | |
*/ | |
public static ConcurrentMap<String, AsyncTaskExecutor> CONCURRENT_MAP = new ConcurrentHashMap<>(); | |
/** | |
* 异步任务执行的扩展线程池 (这里用到上面的动态线程池) | |
*/ | |
private final BizThreadPoolExt bizPool; | |
/** | |
* 定时任务执行的扩展线程池 | |
*/ | |
private final BizThreadPoolExt jobPool; | |
@Override | |
public AsyncMessageDTO save(AsyncMessageDTO asyncMessageDTO) { | |
return asyncMessageManager.save(asyncMessageDTO); | |
} | |
/** | |
* 根据 id 查询异步任务消息 | |
* | |
* @param id 主键 | |
* @return 异步任务消息 DTO | |
*/ | |
@Override | |
public AsyncMessageDTO getById(Integer id) { | |
return asyncMessageManager.getById(id); | |
} | |
@Override | |
public AsyncTaskResultVO execAsyncTask(AsyncTaskExecutor asyncTaskExecutor) { | |
AsyncTaskResultVO asyncTaskResultVO = new AsyncTaskResultVO(); | |
boolean checkAsyncTaskExecutor = asyncTaskExecutor.checkAsyncTaskExecutor(); | |
if (!checkAsyncTaskExecutor) { | |
// 提交任务失败 | |
asyncTaskResultVO.setIsSubmitTask(false); | |
asyncTaskResultVO.setNoSubmitIllustrate("执行器不通过校验,可能的原因: 未设置同步锁或未识别的执行器"); | |
return asyncTaskResultVO; | |
} | |
asyncTaskResultVO.setKeyword(asyncTaskExecutor.getKeyword()); | |
// 保存执行记录 | |
AsyncMessageDTO save = save(asyncTaskExecutor); | |
if (save != null) { | |
asyncTaskExecutor.setId(save.getId()); | |
CompletableFuture.runAsync(() -> { | |
// 设置当前执行任务的线程名称,方便追踪 | |
Thread.currentThread().setName(asyncTaskExecutor.getTaskName() + "_" + asyncTaskExecutor.getKeyword()); | |
asyncTaskExecutor.baseExec(); | |
}, bizPool) | |
.exceptionally(throwable -> { | |
throwable.printStackTrace(); | |
return null; | |
}); | |
// 保存执行的任务 | |
CONCURRENT_MAP.put(asyncTaskExecutor.getKeyword(), asyncTaskExecutor); | |
// 提交任务成功 | |
asyncTaskResultVO.setIsSubmitTask(true); | |
} else { | |
// 提交任务失败 | |
asyncTaskResultVO.setIsSubmitTask(false); | |
} | |
return asyncTaskResultVO; | |
} | |
/** | |
* 根据主键标识获取异步任务的进度 | |
* <p> | |
* 当项目重启 如果获取历史执行任务的进度会报错 直接返回 100 | |
* <p> | |
* 添加一个可以自动清理已经完成任务的功能 避免一直占用内存 {@link AsyncMessageServiceImpl#cleanAsyncTaskTimer ()} | |
* @param keyword 主键标识 | |
* @return 进度百分比 | |
*/ | |
@Override | |
public Integer getCompletionDegree(String keyword) { | |
// 如果不存在 可能是任务完成后已经被清除了 | |
if (!CONCURRENT_MAP.containsKey(keyword)) { | |
// 直接返回 100 | |
return 100; | |
} | |
return CONCURRENT_MAP.get(keyword).getCompletionDegree(); | |
} | |
/** | |
* 根据 id 撤回发送的异步任务 | |
* | |
* @param id 主键 | |
* @return 返回撤回的数量 | |
*/ | |
@Override | |
public Integer revokeAsyncTask(Integer id) { | |
Integer resultSize = asyncMessageManager.revokeAsyncTask(id); | |
if (resultSize <= 0) { | |
throw new RuntimeException("没有可撤回的任务或当前任务正在执行中!"); | |
} | |
return resultSize; | |
} | |
/** | |
* 根据条件获取异步任务消息列表 | |
* | |
* @param asyncTaskMassageQuery 查询条件 | |
* @return 异步消息列表 | |
*/ | |
@Override | |
public PageVO<AsyncMessageVO> pageAsyncMessage(AsyncTaskMassageQuery asyncTaskMassageQuery) { | |
PageDTO<AsyncMessageDTO> pageDTO = asyncMessageManager.pageAsyncMessage(asyncTaskMassageQuery, PageUtil.toPageable(asyncTaskMassageQuery)); | |
PageVO<AsyncMessageVO> pageVO = new PageVO<>(); | |
pageVO.setData(asyncMessageConverter.dtoConvertVO(pageDTO.getData())); | |
pageVO.setTotal(pageDTO.getTotal()); | |
pageVO.setPageCount(pageDTO.getPageCount()); | |
return pageVO; | |
} | |
/** | |
* 根据 keyword 集合查询异步消息列表 | |
* | |
* @param keywords keyword 集合 | |
* @return 异步消息列表 | |
*/ | |
@Override | |
public List<AsyncMessageVO> listAsyncMessageByKeywords(List<String> keywords) { | |
List<AsyncMessageDTO> asyncMessageDTOS = asyncMessageManager.listAsyncMessageByKeywords(keywords); | |
return asyncMessageConverter.dtoConvertVO(asyncMessageDTOS); | |
} | |
/** | |
* 一键已读 | |
* | |
* @param statusCategory 需要已读的状态类别 | |
* @param username 需要已读的用户 | |
* @return 已读记录数 | |
*/ | |
@Override | |
public Integer oneClickRead(Integer statusCategory, String username) { | |
// 状态类别需要转实际的状态 | |
List<Integer> execStatusList = AsyncTaskExecInfo.getStatusFromCategory(statusCategory); | |
return asyncMessageManager.oneClickRead(execStatusList, username); | |
} | |
/** | |
* 阅读一条未读消息 | |
* | |
* @param id 主键 | |
* @return 读取结果 | |
*/ | |
@Override | |
public Integer read(Integer id) { | |
return asyncMessageManager.read(id); | |
} | |
/** | |
* 获取异步任务执行的情况结果 | |
* @param username 执行任务的用户名 | |
* @return 异步任务的执行情况结果的 VO | |
*/ | |
@Override | |
public AsyncTaskExecInfoResultVO getAsyncTaskExecInfo(String username) { | |
//TODO 集群情况下需要考虑换种方式 | |
AsyncTaskExecInfoResultVO asyncTaskExecInfoResultVO = new AsyncTaskExecInfoResultVO(); | |
asyncTaskExecInfoResultVO.setIsExistsCompleteTask(false); | |
asyncTaskExecInfoResultVO.setCompleteTaskMessages(new ArrayList<>()); | |
Map<String, Integer> taskCount = new HashMap<>(); | |
// 循环遍历所有提交的任务 | |
Collection<AsyncTaskExecutor> asyncTaskExecutors = CONCURRENT_MAP.values(); | |
for (AsyncTaskExecutor asyncTaskExecutor : asyncTaskExecutors) { | |
if (asyncTaskExecutor.getUsername().equalsIgnoreCase(username) | |
&& !asyncTaskExecutor.getIsRunning()) { | |
asyncTaskExecInfoResultVO.setIsExistsCompleteTask(true); | |
taskCount.put(asyncTaskExecutor.getTaskType(), taskCount.getOrDefault(asyncTaskExecutor.getTaskType(), 0) + 1); | |
} | |
} | |
// 纪录执行任务的情况 | |
if (!CollectionUtil.isEmpty(taskCount) && asyncTaskExecInfoResultVO.getIsExistsCompleteTask()) { | |
for (String key : taskCount.keySet()) { | |
asyncTaskExecInfoResultVO.getCompleteTaskMessages().add(key + ",有完成的任务!"); | |
} | |
} | |
return asyncTaskExecInfoResultVO; | |
} | |
/** | |
* 确认读取执行的结果 | |
* | |
* @param username 执行任务的用户名 | |
*/ | |
@Override | |
public void confirmReadingExecResult(String username) { | |
//TODO 集群情况下需要考虑换种方式 | |
// 循环遍历所有提交的任务 | |
Collection<AsyncTaskExecutor> asyncTaskExecutors = CONCURRENT_MAP.values(); | |
for (AsyncTaskExecutor asyncTaskExecutor : asyncTaskExecutors) { | |
if (asyncTaskExecutor.getUsername().equalsIgnoreCase(username) | |
&& !asyncTaskExecutor.getIsRunning()) { | |
CONCURRENT_MAP.remove(asyncTaskExecutor.getKeyword()); | |
} | |
} | |
} | |
/** | |
* 清理异步消息的定时器 | |
* <p> | |
* initialDelay = 300000: 延迟执行时间 单位毫秒 这里是 5 分钟 | |
* <p> | |
* fixedRate = 1800000: 执行间隔时间 单位毫秒 这里是 30 分钟 | |
* <p> | |
* Scheduled (initialDelay = 3000, fixedRate = 3000): 测试时用的 | |
*/ | |
@Scheduled(initialDelay = 300000, fixedRate = 1800000) | |
public void cleanAsyncTaskTimer() { | |
List<AsyncMessageVO> asyncMessageVOS = listAsyncMessageByKeywords(new ArrayList<>(CONCURRENT_MAP.keySet())); | |
if (!CollectionUtil.isEmpty(asyncMessageVOS)) { | |
for (AsyncMessageVO asyncMessageVO : asyncMessageVOS) { | |
// 清除不是待处理和正在运行的任务 | |
if (!asyncMessageVO.getExecStatus().equals(TODO_EXEC.getStatus()) | |
&& !asyncMessageVO.getExecStatus().equals(EXECUTING.getStatus())) { | |
log.info("清除keyword:[{}], 异步任务的缓存", asyncMessageVO.getKeyword()); | |
CompletableFuture.runAsync(() -> { | |
Thread.currentThread().setName("clean_task_" + asyncMessageVO.getTaskName() + "_" + asyncMessageVO.getKeyword()); | |
CONCURRENT_MAP.remove(asyncMessageVO.getKeyword()); | |
}, jobPool); | |
} | |
} | |
} | |
} | |
} |
还需要提交任务的结果对象的抽象类,子类可以实现自己提交任务成功或者失败时的返回信息
import lombok.Data; | |
/** | |
* @author Tz | |
* @date 2024/01/22 15:03 | |
* @Description 通用异步提交任务的返回结果类 | |
*/ | |
@Data | |
public class AsyncTaskResultVO extends CommonVO { | |
/** | |
* 异步执行任务的消息 key 值 | |
*/ | |
private String keyword; | |
/** | |
* 任务是否已经提交 | |
*/ | |
private Boolean isSubmitTask; | |
/** | |
* 未提交说明 | |
*/ | |
private String noSubmitIllustrate; | |
} |
由于项目用的 JPA 分多了一层 manager
import org.springframework.data.domain.Pageable; | |
import java.util.List; | |
/** | |
* @author Tz | |
* @date 2024/01/22 15:03 | |
* @Description 异步消息的 Manager | |
*/ | |
public interface AsyncMessageManager { | |
/** | |
* 保存异步任务消息 | |
* | |
* @param asyncMessageDTO 异步任务消息的 DTO | |
* @return 异步任务消息的 DTO | |
*/ | |
AsyncMessageDTO save(AsyncMessageDTO asyncMessageDTO); | |
/** | |
* 根据 id 查询异步任务消息 | |
* | |
* @param id 主键 | |
* @return 异步任务消息 DTO | |
*/ | |
AsyncMessageDTO getById(Integer id); | |
/** | |
* 根据 id 撤回当前任务 | |
* | |
* @param id 主键 id | |
* @return 返回结果数 | |
*/ | |
Integer revokeAsyncTask(Integer id); | |
/** | |
* 根据条件获取异步任务消息列表 | |
* | |
* @param asyncTaskMassageQuery 查询条件 | |
* @param pageable 分页条件 | |
* @return 异步消息列表 | |
*/ | |
PageDTO<AsyncMessageDTO> pageAsyncMessage(AsyncTaskMassageQuery asyncTaskMassageQuery, Pageable pageable); | |
/** | |
* 根据 keyword 集合查询异步消息列表 | |
* | |
* @param keywords keyword 集合 | |
* @return 异步消息列表 | |
*/ | |
List<AsyncMessageDTO> listAsyncMessageByKeywords(List<String> keywords); | |
/** | |
* 一键已读 | |
* | |
* @param execStatus 需要已读的状态列表 | |
* @param username 需要已读的用户 | |
* @return 已读记录数 | |
*/ | |
Integer oneClickRead(List<Integer> execStatus, String username); | |
/** | |
* 阅读一条未读消息 | |
* | |
* @param id 主键 | |
* @return 读取结果 | |
*/ | |
Integer read(Integer id); | |
} |
manager 实现
import lombok.RequiredArgsConstructor; | |
import org.springframework.data.domain.Page; | |
import org.springframework.data.domain.Pageable; | |
import org.springframework.stereotype.Component; | |
import javax.persistence.criteria.Predicate; | |
import java.util.ArrayList; | |
import java.util.List; | |
import java.util.Optional; | |
/** | |
* @author Tz | |
* @date 2024/01/22 15:03 | |
* @Description 异步消息的 Manager 实现 | |
*/ | |
@Component | |
@RequiredArgsConstructor | |
public class AsyncMessageManagerImpl implements AsyncMessageManager { | |
private final AsyncMessageRepository asyncMessageRepository; | |
private final AsyncMessageConverter asyncMessageConverter; | |
/** | |
* 保存异步任务消息 | |
* | |
* @param asyncMessageDTO 异步任务消息的 DTO | |
* @return 异步任务消息的 DTO | |
*/ | |
@Override | |
public AsyncMessageDTO save(AsyncMessageDTO asyncMessageDTO) { | |
AsyncMessage asyncMessage = asyncMessageConverter.dtoConvertPO(asyncMessageDTO); | |
AsyncMessage save = asyncMessageRepository.save(asyncMessage); | |
return asyncMessageConverter.poConvertDTO(save); | |
} | |
/** | |
* 根据 id 查询异步任务消息 | |
* | |
* @param id 主键 | |
* @return 异步任务消息 DTO | |
*/ | |
@Override | |
public AsyncMessageDTO getById(Integer id) { | |
Optional<AsyncMessage> asyncMessageOptional = asyncMessageRepository.findById(id); | |
if (!asyncMessageOptional.isPresent()) { | |
throw new RuntimeException("无提交信息!"); | |
} | |
return asyncMessageConverter.poConvertDTO(asyncMessageOptional.get()); | |
} | |
/** | |
* 根据 id 撤回当前任务 | |
* | |
* @param id 主键 id | |
* @return 返回结果数 | |
*/ | |
@Override | |
public Integer revokeAsyncTask(Integer id) { | |
return asyncMessageRepository.revokeAsyncTask(id); | |
} | |
/** | |
* 根据条件获取异步任务消息列表 | |
* | |
* @param asyncTaskMassageQuery 查询条件 | |
* @param pageable 分页条件 | |
* @return 异步消息列表 | |
*/ | |
@Override | |
public PageDTO<AsyncMessageDTO> pageAsyncMessage(AsyncTaskMassageQuery asyncTaskMassageQuery, Pageable pageable) { | |
Page<AsyncMessage> page = asyncMessageRepository.findAll((root, criteriaQuery, criteriaBuilder) -> { | |
List<Predicate> ps = new ArrayList<>(); | |
if (StringUtil.isNotBlank(asyncTaskMassageQuery.getUsername())) { | |
ps.add(criteriaBuilder.equal(root.get("username"), asyncTaskMassageQuery.getUsername())); | |
} | |
if (StringUtil.isNotBlank(asyncTaskMassageQuery.getTaskType())) { | |
ps.add(criteriaBuilder.equal(root.get("taskType"), asyncTaskMassageQuery.getTaskType())); | |
} | |
// 根据状态类别条件查询执行状态 | |
if (asyncTaskMassageQuery.getStatusCategory() != null) { | |
ps.add(criteriaBuilder.in(root.get("execStatus")).value(AsyncTaskExecInfo.getStatusFromCategory(asyncTaskMassageQuery.getStatusCategory()))); | |
} | |
if (asyncTaskMassageQuery.getReadFlag() != null) { | |
ps.add(criteriaBuilder.equal(root.get("readFlag"), asyncTaskMassageQuery.getReadFlag())); | |
} | |
criteriaQuery.where(ps.toArray(new Predicate[0])); | |
criteriaQuery.orderBy(criteriaBuilder.desc(root.get("updatedTime"))); | |
return criteriaQuery.getRestriction(); | |
}, pageable); | |
List<AsyncMessageDTO> list = asyncMessageConverter.poConvertDTO(page.getContent()); | |
return PageDTO.of(page.getTotalElements(), page.getTotalPages(), list); | |
} | |
/** | |
* 根据 keyword 集合查询异步消息列表 | |
* | |
* @param keywords keyword 集合 | |
* @return 异步消息列表 | |
*/ | |
@Override | |
public List<AsyncMessageDTO> listAsyncMessageByKeywords(List<String> keywords) { | |
List<AsyncMessage> asyncMessages = asyncMessageRepository.listAsyncMessageByKeywords(keywords); | |
return asyncMessageConverter.poConvertDTO(asyncMessages); | |
} | |
/** | |
* 一键已读 | |
* | |
* @param execStatus 需要已读的状态列表 | |
* @param username 需要已读的用户 | |
* @return 已读记录数 | |
*/ | |
@Override | |
public Integer oneClickRead(List<Integer> execStatus, String username) { | |
return asyncMessageRepository.oneClickRead(execStatus, username); | |
} | |
/** | |
* 阅读一条未读消息 | |
* | |
* @param id 主键 | |
* @return 读取结果 | |
*/ | |
@Override | |
public Integer read(Integer id) { | |
return asyncMessageRepository.read(id); | |
} | |
} |
# 使用:
比如我需要执行上传采集数据的异步任务, 那么可以继承 AsyncTaskExecutor, 实现上传的逻辑:
import lombok.Data; | |
import lombok.EqualsAndHashCode; | |
import lombok.Getter; | |
import lombok.Setter; | |
import lombok.extern.slf4j.Slf4j; | |
import org.apache.commons.lang3.StringUtils; | |
import org.springframework.http.HttpEntity; | |
import org.springframework.http.HttpHeaders; | |
import org.springframework.http.MediaType; | |
import org.springframework.jdbc.core.JdbcTemplate; | |
import org.springframework.jdbc.core.RowMapper; | |
import org.springframework.transaction.PlatformTransactionManager; | |
import org.springframework.transaction.TransactionStatus; | |
import org.springframework.transaction.support.DefaultTransactionDefinition; | |
import org.springframework.web.client.ResourceAccessException; | |
import org.springframework.web.client.RestTemplate; | |
import java.sql.ResultSet; | |
import java.sql.SQLException; | |
import java.util.*; | |
import java.util.regex.Matcher; | |
import java.util.regex.Pattern; | |
import java.util.stream.Collectors; | |
/** | |
* @author Tz | |
* @date 2024/01/22 15:03 | |
* @Description 上传采集数据的异步任务执行器实现 | |
*/ | |
@Slf4j | |
@Data | |
@EqualsAndHashCode(callSuper = true) | |
public class AsyncCollectPlatformExecutor extends AsyncTaskExecutor { | |
private static final String ASYNC_TASK_EXECUTOR_NAME = "上传平台任务处理器"; | |
private String collectDataUrl; | |
/** | |
* 当前地区标识 | |
*/ | |
private String region; | |
private JdbcTemplate jdbcTemplatePrimary; | |
private JdbcTemplate jdbcTemplateSecond; | |
private RestTemplate restTemplate; | |
private PlatformTransactionManager transactionManagerPrimary; | |
private PlatformTransactionManager transactionManagerSecond; | |
private List<FileListDFLZ> fileListDFLZList; | |
private ZtDTO ztDTO; | |
private PublicInfoMasterTableManager publicInfoMasterTableManager; | |
private List<OpenReportRecordVO> auditedData; | |
/** | |
* 初始化设置执行器名称 | |
*/ | |
public AsyncCollectPlatformExecutor() { | |
super(ASYNC_TASK_EXECUTOR_NAME); | |
} | |
/** | |
* 实际异步执行的任务体抽象方法、由具体的异步任务实现 | |
* @return 执行结果 | |
* @throws Exception 可能出现的业务异常 | |
*/ | |
@Override | |
public boolean taskContent() { | |
return handleUpload(); | |
} | |
/** | |
* 具体的逻辑 | |
* @return 返回执行的结果 | |
*/ | |
public boolean handleUpload() { | |
// 执行上传平台逻辑 | |
// 还需要区分上传的是党务、事务、还是财务、或者其他类型的大类报表 | |
如果是多数据远 | |
// 实际执行的业务需要手动开启事务、手动提交回滚 | |
DefaultTransactionDefinition def = new DefaultTransactionDefinition(); | |
TransactionStatus primaryStatus = transactionManagerPrimary.getTransaction(def); | |
TransactionStatus secondStatus = transactionManagerSecond.getTransaction(def); | |
try { | |
// 决定进度步长由它决定 | |
int execCount = dataList.size(); | |
setProgressStepSize(PERCENTAGE / execCount); | |
// 推进进度 | |
advanceProgress(); | |
// 提交事务 | |
// 先进后出提交 否则会出现:Transaction synchronization is not active 的错误 | |
transactionManagerSecond.commit(secondStatus); | |
transactionManagerPrimary.commit(primaryStatus); | |
// 根据自己的业务情况进行返回 | |
return true; | |
} catch (Exception e) { | |
e.printStackTrace(); | |
// 回滚事务 | |
// 先进后出提交 否则会出现:Transaction synchronization is not active 的错误 | |
transactionManagerSecond.rollback(secondStatus); | |
transactionManagerPrimary.rollback(primaryStatus); | |
throw new RuntimeException(e); | |
} | |
} | |
} |
实现自己的异步任务提交失败或成功的返回信息
import lombok.Data; | |
import java.util.List; | |
/** | |
* @author Tz | |
* @date 2024/01/22 15:03 | |
* @Description 上传平台异步执行任务返回结果 | |
*/ | |
@Data | |
public class CollectPlatformAsyncTaskVO extends AsyncTaskResultVO { | |
/** | |
* 执行的账套号 | |
*/ | |
private String ztId; | |
/** | |
* 执行的账套名称 | |
*/ | |
private String ztName; | |
/** | |
* 执行的状态 进行中、拒绝 | |
*/ | |
private String status; | |
/** | |
* 拒绝原因 | |
*/ | |
private String remark; | |
/** | |
* 错误的表 | |
*/ | |
private List<String> errorTables; | |
} |
完成了执行器的实现,开始提交该异步任务
// 申请一个 keyword (uuid) | |
String keyword = getKeyword(username); | |
//1.2. 校验通过生成一个记录信息,可以让用户实时感知到上传的情况 | |
//1.3. 异步执行任务、监控该任务执行的状态和进度、 还要能够查看实际的采集情况 | |
AsyncCollectPlatformExecutor asyncTaskDTO = new AsyncCollectPlatformExecutor(); | |
asyncTaskDTO.setKeyword(keyword); | |
asyncTaskDTO.setTaskType("上传平台"); | |
asyncTaskDTO.setUsername(username); | |
asyncTaskDTO.setYears(years); | |
asyncTaskDTO.setMonths(months); | |
asyncTaskDTO.setDistNo(ztByZtId.getDistNo()); | |
asyncTaskDTO.setDistName(ztByZtId.getDistName()); | |
asyncTaskDTO.setZtId(ztByZtId.getZtId()); | |
asyncTaskDTO.setZtName(ztByZtId.getZtName()); | |
asyncTaskDTO.setTaskName("上传平台"); | |
// 设置任务的事务管理器 | |
asyncTaskDTO.setTransactionManagerPrimary(transactionManagerPrimary); | |
asyncTaskDTO.setTransactionManagerSecond(transactionManagerSecond); | |
// 设置数据库操作工具 | |
asyncTaskDTO.setJdbcTemplatePrimary(jdbcTemplatePrimary); | |
asyncTaskDTO.setJdbcTemplateSecond(jdbcTemplateSecond); | |
// 这里传入实际业务需要用到的 | |
asyncTaskDTO.setPublicInfoMasterTableManager(publicInfoMasterTableManager); | |
/** | |
* 同步锁的标识值 这里任务根据指定规则来锁(保证唯一排斥即可) | |
*/ | |
asyncTaskDTO.setSyncKeyword(ztByZtId.getZtId() + "_" + years + "_" + months); | |
// 开始异步执行任务 | |
collectPlatformAsyncTaskVO.setKeyword(keyword); | |
AsyncTaskResultVO asyncTaskResultVO = asyncMessageService.execAsyncTask(asyncTaskDTO); | |
// 如果任务成功提交 | |
if (asyncTaskResultVO != null && asyncTaskResultVO.getIsSubmitTask()) { | |
collectPlatformAsyncTaskVO.setStatus(IN_PROGRESS.getMessage()); | |
collectPlatformAsyncTaskVO.setIsSubmitTask(asyncTaskResultVO.getIsSubmitTask()); | |
collectPlatformAsyncTaskVO.setRemark("任务执行中。。。"); | |
} else { | |
collectPlatformAsyncTaskVO.setStatus(ABANDON_EXECUTION.getMessage()); | |
collectPlatformAsyncTaskVO.setRemark("执行失败"); | |
collectPlatformAsyncTaskVO.setNoSubmitIllustrate(asyncTaskResultVO != null ? asyncTaskResultVO.getNoSubmitIllustrate() : null); | |
} | |
collectPlatformAsyncTaskVOS.add(collectPlatformAsyncTaskVO); |
请求提交响应的结果
{ | |
"code": 200, | |
"message": "请求成功", | |
"data": [ | |
{ | |
"keyword": "超级用户_547af385-e6d1-4e5c-bb11-9ea6b4c84776", | |
"isSubmitTask": true, | |
"noSubmitIllustrate": null, | |
"ztId": "010651001000", | |
"ztName": "111广州市天河区员村街石东股份合作经济联社", | |
"status": "执行中", | |
"remark": "任务执行中。。。", | |
"errorTables": null | |
}, | |
{ | |
"keyword": null, | |
"isSubmitTask": false, | |
"noSubmitIllustrate": null, | |
"ztId": "010651001001", | |
"ztName": "广州市天河区员村街石东第一股份合作经济社", | |
"status": "放弃执行", | |
"remark": "该账套没有需要上传到廉政平台的报表", | |
"errorTables": null | |
} | |
] | |
} |
执行任务打印的日志
2024-04-26 12:59:57.613 INFO 29332 --- [11-9ea6b4c84776] c.b.g.o.task.timely.AsyncTaskExecutor : 当前任务keyword:[超级用户_547af385-e6d1-4e5c-bb11-9ea6b4c84776], 任务类型: [上传平台], 任务名称: [上传平台], 任务进度: [4%], 是否正在运行: [是] | |
2024-04-26 12:59:58.052 DEBUG 29332 --- [11-9ea6b4c84776] c.b.g.o.t.t.AsyncCollectPlatformExecutor : 任务类型: [上传平台], 任务步骤: , 结果: [无] | |
2024-04-26 12:59:58.052 INFO 29332 --- [11-9ea6b4c84776] c.b.g.o.task.timely.AsyncTaskExecutor : 当前任务keyword:[超级用户_547af385-e6d1-4e5c-bb11-9ea6b4c84776], 任务类型: [上传平台], 任务名称: [上传平台], 任务进度: [8%], 是否正在运行: [是] | |
2024-04-26 12:59:58.052 DEBUG 29332 --- [11-9ea6b4c84776] c.b.g.o.t.t.AsyncCollectPlatformExecutor : 任务类型: [上传平台], 任务步骤: , 结果: [无] | |
2024-04-26 12:59:57.613 INFO 29332 --- [11-9ea6b4c84776] c.b.g.o.task.timely.AsyncTaskExecutor : 当前任务keyword:[超级用户_547af385-e6d1-4e5c-bb11-9ea6b4c84776], 任务类型: [上传平台], 任务名称: [上传平台], 任务进度: [16%], 是否正在运行: [是] | |
从上面可以看出执行的过程的信息 | |
2024-04-26 12:59:58.273 INFO 29332 --- [11-9ea6b4c84776] c.b.ghj.common.config.BizThreadPoolExt : 业务线程-pool-monitor: 任务名称: 上传平台_超级用户_547af385-e6d1-4e5c-bb11-9ea6b4c84776, 任务耗时: 837 ms, 占用cpu时间: 31 ms, 占总cpu: 0.1 %, 初始线程数: 1, 核心线程数: 30, 执行的任务数量: 1, 已完成任务数量: 0, 任务总数: 1, 队列里缓存的任务数量: 0, 池中存在的最大线程数: 1, 最大允许的线程数: 64, 线程空闲时间: 60000, 线程池是否关闭: false, 线程池是否终止: false | |
可以看到任务在线程池中执行的情况 |
关闭线程池的打印日志
2024-04-26 13:12:09.390 INFO 29332 --- [extShutdownHook] c.b.ghj.common.config.BizThreadPoolExt : 业务线程 关闭线程池, 已执行任务: 1, 正在执行任务: 0, 未执行任务数量: 0 | |
2024-04-26 13:12:09.390 INFO 29332 --- [extShutdownHook] c.b.g.c.e.t.DynamicThreadPoolManager : 线程池:[业务线程] 已经停止, 该线程池已经关闭并且没有还在运行的线程 | |
2024-04-26 13:12:09.390 INFO 29332 --- [extShutdownHook] c.b.ghj.common.config.BizThreadPoolExt : 定时任务线程 关闭线程池, 已执行任务: 1, 正在执行任务: 0, 未执行任务数量: 0 | |
2024-04-26 13:12:09.390 INFO 29332 --- [extShutdownHook] c.b.g.c.e.t.DynamicThreadPoolManager : 线程池:[定时任务线程] 已经停止, 该线程池已经关闭并且没有还在运行的线程 |
数据表的情况
select * from async_message where keyword = '超级用户_547af385-e6d1-4e5c-bb11-9ea6b4c84776' order by id desc | |
id keyword username task_type task_name years months days dist_no dist_name zt_id zt_name content_type content_info error_info exec_status exec_flag run_flag read_flag exec_time created_by created_time updated_by updated_time | |
529 超级用户_547af385-e6d1-4e5c-bb11-9ea6b4c84776 超级用户 上传平台 上传平台 2024 2 010651001 石东 010651001000 111广州市天河区员村街石东股份合作经济联社 1 执行: 上传平台 失败! 上传到远端平台 - 无上传公开数据,结束操作! -1 1 1 0 792 超级用户 2024-04-26 12:59:57.000 超级用户 2024-04-26 12:59:58.000 |
# 后续扩展
- 能否统一在一台服务器上做跨平台业务处理, 类似支付宝支付的流程, 调接口传递对应的执行指令,执行完在调用回调接口响应结果。
# 总结
以上是项目的一些核心功能,因为上面提到的功能还可以继续封装,不仅限于在该项目使用。
上面的功能有些点可以优化:
可以配置动态数据源的配置可以放到配置中心,或者放到数据库,项目启动的时候加载
可以动态更新线程池的状态(更新线程不会影响正在执行的线程)
异步任务处理器可以抽象成接口,供其他项目调用
如果是集群环境下需要优化异步任务锁的方式