# overt 财务公开项目

# 简介

通过门户网站,向所在农村集体的村民进行特定公开,其中 “三资” 信息中的招标公告、结果公示、待交易资产为全面公开;财务信息的更新频率为每月一次

# 框架

框架说明版本
springboot应用开发框架2.3.12.RELEASE
spring-boot-starter-data-jpa操作数据的框架
spring-boot-starter-webMVC 框架
spring-boot-starter-securitySpring 安全框架
spring-security-cascas 验证框架
knife4j-spring-boot-starterapi 文档框架
mapstruct对象转换工具
flowable-spring-boot-starter工作流 flowable 架包6.8.0
easyexcelexcel 导入导出工具
hibernate-validator参数校验6.0.17.Final

# 系统功能

# 动态线程池管理

# 功能点:

配置动态添加线程池,扩展了线程池:

  1. 可以清楚的知道线程的执行时间

  2. 可以知道执行的线程运行的什么任务

  3. 可以知道该任务执行占用的 cpu 时间和占总 cpu 的百分比

  4. 可动态修改线程池核心数、线程池拒绝策略等

  5. 可以记录线程池中运行线程任务的情况

# 思路:

我们都知道在创建一个线程池需要传参,线程池名称、核心线程池大小、最大线程池大小、空闲线程的存活时间、用于保存等待执行的任务的阻塞队列、无法执行时的拒绝策略 这些参数等等

我想随意的调节这些参数怎么办?

我想加一个执行特定的任务的线程池怎么办?

这里我是存储在配置文件中,项目启动的时候使用 事件的监听器bean定义注册器 ,读取配置文件中所有线程配置动态注册

# 具体实现步骤:

application.yml

#注: 这里也只是比较单一的动态线程池。
#动态线程池配置
dynamic:
  thread:
    pool:
      pool1:
        poolName: 业务线程
        #用注册到 spring 容器中的 bean 名称,方便使用
        beanName: bizPool
        corePoolSize: 30
        maximumPoolSize: 64
        keepAliveTime: 60
        blockQueueCapacity: 100
        #任务拒绝策略
        #1: AbortPolicy 默认的处理策略,会直接抛出 RejectedExecutionException 异常,阻止系统继续接受新的任务
        #2: CallerRunsPolicy 当任务被拒绝时,会在调用者的线程中执行该任务。
        #3: DiscardPolicy 当任务被拒绝时,会默默地丢弃该任务。
        #4: DiscardOldestPolicy 当任务被拒绝时,会丢弃最老的一个任务,然后尝试重新提交被拒绝的任务。
        rejectedType: 1
      pool2:
        poolName: 定时任务线程
        #用注册到 spring 容器中的 bean 名称,方便使用
        beanName: jobPool
        corePoolSize: 15
        maximumPoolSize: 32
        keepAliveTime: 30
        blockQueueCapacity: 50
        #任务拒绝策略
        #1: AbortPolicy 默认的处理策略,会直接抛出 RejectedExecutionException 异常,阻止系统继续接受新的任务
        #2: CallerRunsPolicy 当任务被拒绝时,会在调用者的线程中执行该任务。
        #3: DiscardPolicy 当任务被拒绝时,会默默地丢弃该任务。
        #4: DiscardOldestPolicy 当任务被拒绝时,会丢弃最老的一个任务,然后尝试重新提交被拒绝的任务。
        rejectedType: 1

动态线程池的配置类:

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
 * @author Tz
 * @date 2024/01/22 15:03
 * @Description 动态线程池的配置属性
 */
@Component
@ConfigurationProperties(prefix = "dynamic.thread")
public class DynamicThreadPoolProperties {
    private Map<String, PoolConfig> pool;
    public Map<String, PoolConfig> getPool() {
        return pool;
    }
    public void setPool(Map<String, PoolConfig> pool) {
        this.pool = pool;
    }
    @Data
    public static class PoolConfig {
        /**
         * 线程池名称
         */
        private String poolName;
        /**
         * 动态注册的 bean 名称
         */
        private String beanName;
        /**
         * 核心线程池大小,即线程池中始终保持存活的线程数量
         */
        private int corePoolSize;
        /**
         * 最大线程池大小,即线程池中允许的最大线程数量
         */
        private int maximumPoolSize;
        /**
         * 空闲线程的存活时间,超过这个时间的空闲线程会被回收  默认单位为:second
         */
        private long keepAliveTime;
        /**
         * 用于保存等待执行的任务的阻塞队列 默认队列为 new ArrayBlockingQueue<>(blockQueueCapacity, true)
         */
        private int blockQueueCapacity;
        /**
         * 当线程池已满,且任务无法执行时的拒绝策略
         * <p>
         * <li>
         *     1: AbortPolicy 默认的处理策略,会直接抛出 RejectedExecutionException 异常,阻止系统继续接受新的任务
         * <li>
         *     2: CallerRunsPolicy 当任务被拒绝时,会在调用者的线程中执行该任务。
         * <li>
         *     3: DiscardPolicy 当任务被拒绝时,会默默地丢弃该任务。
         * <li>
         *     4: DiscardOldestPolicy 当任务被拒绝时,会丢弃最老的一个任务,然后尝试重新提交被拒绝的任务。
         */
        private int rejectedType;
    }
}

扩展的线程池

package com.bright.ghj.common.config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.management.ManagementFactory;
import com.sun.management.OperatingSystemMXBean;
import java.lang.management.ThreadMXBean;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
 * @author Tz
 * @date 2024/01/22 15:03
 * @Description 业务线程池扩展
 */
public class BizThreadPoolExt extends ThreadPoolExecutor {
    private static final Logger LOGGER = LoggerFactory.getLogger(BizThreadPoolExt.class);
    /**
     * Java 虚拟机的线程系统的管理接口
     */
    private final ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
    /**
     * 运行 Java 虚拟机的操作系统的管理接口
     */
    private final OperatingSystemMXBean osBean = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
    /**
     * 默认拒绝策略
     */
    private static final RejectedExecutionHandler DEFAULT_DENIAL_STRATEGY = new AbortPolicy();
    /**
     * 线程池名称,一般以业务名称命名,方便区分
     */
    private String poolName;
    /**
     * 最短执行时间
     */
    private Long minCostTime;
    /**
     * 最长执行时间
     */
    private Long maxCostTime;
    /**
     * 总的耗时
     */
    private AtomicLong totalCostTime = new AtomicLong();
    private ThreadLocal<Long> startTimeThreadLocal = new ThreadLocal<>();
    /**
     * 调用父类的构造方法,并初始化 HashMap 和线程池名称
     *
     * @param corePoolSize    线程池核心线程数
     * @param maximumPoolSize 线程池最大线程数
     * @param keepAliveTime   线程的最大空闲时间
     * @param unit            空闲时间的单位
     * @param workQueue       保存被提交任务的队列
     * @param poolName        线程池名称
     */
    public BizThreadPoolExt(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                            TimeUnit unit, BlockingQueue<Runnable> workQueue, String poolName) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                Executors.defaultThreadFactory(), poolName);
    }
    /**
     * 调用父类的构造方法,并初始化 HashMap 和线程池名称
     *
     * @param corePoolSize    线程池核心线程数
     * @param maximumPoolSize 线程池最大线程数
     * @param keepAliveTime   线程的最大空闲时间
     * @param unit            空闲时间的单位
     * @param workQueue       保存被提交任务的队列
     * @param
     * @param poolName        线程池名称
     */
    public BizThreadPoolExt(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                            TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler, String poolName) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                Executors.defaultThreadFactory(), handler, poolName);
    }
    /**
     * 调用父类的构造方法,并初始化 HashMap 和线程池名称
     *
     * @param corePoolSize    线程池核心线程数
     * @param maximumPoolSize 线程池最大线程数
     * @param keepAliveTime   线程的最大空闲时间
     * @param unit            空闲时间的单位
     * @param workQueue       保存被提交任务的队列
     * @param threadFactory   线程工厂
     * @param poolName        线程池名称
     */
    public BizThreadPoolExt(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                            TimeUnit unit, BlockingQueue<Runnable> workQueue,
                            ThreadFactory threadFactory, String poolName) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, DEFAULT_DENIAL_STRATEGY);
        this.poolName = poolName;
        this.maxCostTime = 0L;
        this.minCostTime = 0L;
    }
    /**
     * 调用父类的构造方法,并初始化 HashMap 和线程池名称
     *
     * @param corePoolSize    线程池核心线程数
     * @param maximumPoolSize 线程池最大线程数
     * @param keepAliveTime   线程的最大空闲时间
     * @param unit            空闲时间的单位
     * @param workQueue       保存被提交任务的队列
     * @param threadFactory   线程工厂
     * @param handler         拒绝策略
     * @param poolName        线程池名称
     */
    public BizThreadPoolExt(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                            TimeUnit unit, BlockingQueue<Runnable> workQueue,
                            ThreadFactory threadFactory, RejectedExecutionHandler handler, String poolName) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        this.poolName = poolName;
        this.maxCostTime = 0L;
        this.minCostTime = 0L;
    }
    /**
     * 根据线程 id 获取线程执行占用总 cpu 的百分比
     * @param threadId 线程 id
     * @return         占用总 cpu 的百分比
     */
    private double getThreadCpuUsage(long threadId) {
        long threadCpuTime = threadMXBean.getThreadCpuTime(threadId);
        long systemCpuTime = osBean.getProcessCpuTime();
        double cpuUsage = ((double) threadCpuTime / (double) systemCpuTime) * 100;
        cpuUsage = Math.round(cpuUsage * 100.0) / 100.0;
        return cpuUsage;
    }
    /**
     * 线程池延迟关闭时(等待线程池里的任务都执行完毕),统计线程池情况
     */
    @Override
    public void shutdown() {
        // 统计已执行任务、正在执行任务、未执行任务数量
        LOGGER.info("{} 关闭线程池, 已执行任务: {}, 正在执行任务: {}, 未执行任务数量: {}",
                this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size());
        super.shutdown();
    }
    /**
     * 线程池立即关闭时,统计线程池情况
     */
    @Override
    public List<Runnable> shutdownNow() {
        // 统计已执行任务、正在执行任务、未执行任务数量
        LOGGER.info("{} 立即关闭线程池,已执行任务: {}, 正在执行任务: {}, 未执行任务数量: {}",
                this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size());
        return super.shutdownNow();
    }
    /**
     * 任务执行之前,记录任务开始时间
     */
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        startTimeThreadLocal.set(System.currentTimeMillis());
    }
    /**
     * 任务执行之后,计算任务结束时间
     */
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        long costTime = System.currentTimeMillis() - startTimeThreadLocal.get();
        String threadName = Thread.currentThread().getName();
        startTimeThreadLocal.remove();
        maxCostTime = maxCostTime > costTime ? maxCostTime : costTime;
        if (getCompletedTaskCount() == 0) {
            minCostTime = costTime;
        }
        minCostTime = minCostTime < costTime ? minCostTime : costTime;
        totalCostTime.addAndGet(costTime);
        LOGGER.info("{}-pool-monitor: " +
                        "任务名称: {}, 任务耗时: {} ms, 占用cpu时间: {} ms, 占总cpu: {} %, 初始线程数: {}, 核心线程数: {}, 执行的任务数量: {}, " +
                        "已完成任务数量: {}, 任务总数: {}, 队列里缓存的任务数量: {}, 池中存在的最大线程数: {}, " +
                        "最大允许的线程数: {},  线程空闲时间: {}, 线程池是否关闭: {}, 线程池是否终止: {}",
                this.poolName, threadName,
                costTime, threadMXBean.getThreadCpuTime(Thread.currentThread().getId()) / (1000 * 1000), getThreadCpuUsage(Thread.currentThread().getId()),
                this.getPoolSize(), this.getCorePoolSize(), this.getActiveCount(),
                this.getCompletedTaskCount(), this.getTaskCount(), this.getQueue().size(), this.getLargestPoolSize(),
                this.getMaximumPoolSize(), this.getKeepAliveTime(TimeUnit.MILLISECONDS), this.isShutdown(), this.isTerminated());
    }
    public Long getMinCostTime() {
        return minCostTime;
    }
    public Long getMaxCostTime() {
        return maxCostTime;
    }
    public void setMaxCostTime(Long maxCostTime) {
        this.maxCostTime = maxCostTime;
    }
    public void setMinCostTime(Long mixCostTime) {
        this.minCostTime = mixCostTime;
    }
    public long getAverageCostTime(){
        if(getCompletedTaskCount()==0||totalCostTime.get()==0){
            return 0;
        }
        return totalCostTime.get()/getCompletedTaskCount();
    }
    public String getPoolName() {
        return poolName;
    }
}

动态线程池的管理类(管理所有线程池的什么周期, 其实这里也只是管理释放线程池)

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
 * @author Tz
 * @date 2024/02/23 17:16
 * @Description 线程池管理
 */
@Slf4j
@Component
public class DynamicThreadPoolManager {
    private final Map<String, BizThreadPoolExt> bizThreadPoolExtMap = new HashMap<>();
    /**
     * 注册线程池到管理器中
     * @param poolName         线程池名称
     * @param bizThreadPoolExt 线程池
     */
    public void registerThreadPool(String poolName, BizThreadPoolExt bizThreadPoolExt) {
        bizThreadPoolExtMap.put(poolName, bizThreadPoolExt);
    }
    /**
     * 释放所有线程池
     */
    public void shutdownAllThreadPools(){
        for (BizThreadPoolExt bizThreadPoolExt : bizThreadPoolExtMap.values()) {
            try {
                // 关闭线程池
                bizThreadPoolExt.shutdown();
                // 等待线程池内部的线程任务完成,并设置超时时间 30 秒
                boolean execFlag = bizThreadPoolExt.awaitTermination(30L, TimeUnit.SECONDS);
                if (execFlag) {
                    log.info("线程池:[{}] 已经停止, 该线程池已经关闭并且没有还在运行的线程", bizThreadPoolExt.getPoolName());
                } else {
                    log.info("线程池:[{}] 中可能还存在正在运行的线程", bizThreadPoolExt.getPoolName());
                }
            } catch (InterruptedException e) {
                if (!bizThreadPoolExt.isShutdown()) {
                    bizThreadPoolExt.shutdown();
                }
            }
        }
    }
}

动态线程池的变更事件类

import org.springframework.context.ApplicationEvent;
/**
 * @author Tz
 * @date 2024/01/22 15:03
 * @Description 动态线程池的变更事件
 */
public class DynamicThreadPoolChangeEvent extends ApplicationEvent {
    public DynamicThreadPoolChangeEvent(Object source) {
        super(source);
    }
}

动态线程池的变更事件发布类

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
/**
 * @author Tz
 * @date 2024/01/22 15:03
 * @Description 动态线程池的变更事件发布类
 */
@Service
public class DynamicThreadPoolChangePublisher {
    @Autowired
    private ApplicationEventPublisher applicationEventPublisher;
    /**
     * PostConstruct: 初始化执行发布
     */
    @PostConstruct
    public void init() {
        // 会在 spring 容器启动的时候执行监听器的方法
        applicationEventPublisher.publishEvent(new DynamicThreadPoolChangeEvent(this));
    }
}

动态线程池的变更事件监听类(核心)

import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.ConstructorArgumentValues;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * @author Tz
 * @date 2024/01/22 15:03
 * @Description 动态线程池变更事件的监听器
 */
@Component
public class DynamicThreadPoolChangeListener implements ApplicationListener<DynamicThreadPoolChangeEvent> {
    private final DynamicThreadPoolProperties dynamicThreadPoolProperties;
    private final DynamicThreadPoolManager dynamicThreadPoolManager;
    private final ApplicationContext context;
    public DynamicThreadPoolChangeListener(ApplicationContext context
            , DynamicThreadPoolProperties dynamicThreadPoolProperties
            , DynamicThreadPoolManager dynamicThreadPoolManager) {
        this.context = context;
        this.dynamicThreadPoolProperties = dynamicThreadPoolProperties;
        this.dynamicThreadPoolManager = dynamicThreadPoolManager;
    }
    @Override
    public void onApplicationEvent(DynamicThreadPoolChangeEvent event) {
        // 获取 BeanDefinitionRegistry
        BeanDefinitionRegistry beanDefinitionRegistry = (BeanDefinitionRegistry) context.getAutowireCapableBeanFactory();
        // 获取配置文件中的线程池配置
        Map<String, DynamicThreadPoolProperties.PoolConfig> poolConfigMap = dynamicThreadPoolProperties.getPool();
        for (Map.Entry<String, DynamicThreadPoolProperties.PoolConfig> entry : poolConfigMap.entrySet()) {
            DynamicThreadPoolProperties.PoolConfig poolConfig = entry.getValue();
            // 获取相关属性
            String poolName = poolConfig.getPoolName();
            int corePoolSize = poolConfig.getCorePoolSize();
            String beanName = poolConfig.getBeanName();
            long keepAliveTime = poolConfig.getKeepAliveTime();
            int blockQueueCapacity = poolConfig.getBlockQueueCapacity();
            int maximumPoolSize = poolConfig.getMaximumPoolSize();
            int rejectedType = poolConfig.getRejectedType();
            // 需要注册的线程池
            BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(BizThreadPoolExt.class);
            // 构建构造器的参数
            ConstructorArgumentValues constructorArgs = new ConstructorArgumentValues();
            constructorArgs.addIndexedArgumentValue(0, corePoolSize);
            constructorArgs.addIndexedArgumentValue(1, maximumPoolSize);
            constructorArgs.addIndexedArgumentValue(2, keepAliveTime);
            constructorArgs.addIndexedArgumentValue(3, TimeUnit.SECONDS);
            constructorArgs.addIndexedArgumentValue(4, new ArrayBlockingQueue<>(blockQueueCapacity, true));
            // 处理设置的拒绝策略
            switch (rejectedType) {
                case 2:
                    constructorArgs.addIndexedArgumentValue(5, new ThreadPoolExecutor.CallerRunsPolicy());
                    break;
                case 3:
                    constructorArgs.addIndexedArgumentValue(5, new ThreadPoolExecutor.DiscardPolicy());
                    break;
                case 4:
                    constructorArgs.addIndexedArgumentValue(5, new ThreadPoolExecutor.DiscardOldestPolicy());
                    break;
                default:
                    constructorArgs.addIndexedArgumentValue(5, new ThreadPoolExecutor.AbortPolicy());
                    break;
            }
            // 设置线程池名称
            constructorArgs.addIndexedArgumentValue(6, poolName);
            // 设置 bean
            builder.getRawBeanDefinition().setConstructorArgumentValues(constructorArgs);
            // 注册 bean
            BeanDefinition beanDefinition = builder.getRawBeanDefinition();
            beanDefinitionRegistry.registerBeanDefinition(beanName, beanDefinition);
            // 获取线程池实例
            BizThreadPoolExt bizThreadPoolExt = (BizThreadPoolExt) context.getBean(beanName);
            // 注册到线程管理器中
            dynamicThreadPoolManager.registerThreadPool(poolName
                    , bizThreadPoolExt);
        }
    }
}

还需要动态线程池关闭的监听器,因为动态注册后关闭线程不会触发关闭事件需要手动触发关闭事件

import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.stereotype.Component;
/**
 * @author Tz
 * @date 2024/02/23 17:15
 * @Description 动态线程池的销毁时间事件 ContextClosedEvent 容器关闭的事件
 */
@Component
public class DynamicThreadPoolCleanupListener implements ApplicationListener<ContextClosedEvent> {
    private final DynamicThreadPoolManager dynamicThreadPoolManager;
    public DynamicThreadPoolCleanupListener(DynamicThreadPoolManager dynamicThreadPoolManager) {
        this.dynamicThreadPoolManager = dynamicThreadPoolManager;
    }
    @Override
    public void onApplicationEvent(ContextClosedEvent event) {
        // 进行销毁前对线程池进行手动释放
        dynamicThreadPoolManager.shutdownAllThreadPools();
    }
}

项目结构:

# 使用:

直接注入使用

/**
     * 使用 beanname 为 bizPool 的线程池
     */
    @Resource
    private BizThreadPoolExt bizPool;
    
    
	/**
     * 使用 beanname 为 jobPool 的线程池
     */
    @Resource
    private BizThreadPoolExt jobPool;

# 异步任务提交器

# 功能点:

由于在该项目中的功能大部分都是需要异步执行的任务,比如上传数据到上级平台、上传附件数据解析保存、上传 access 数据库的数据保存等等。所以这里进行了异步任务的封装。

# 思路:

可以把一个任务看作一个有状态的对象,该对象包括以下内容:

  1. 任务对象的基础信息,执行状态、进度、等信息
  2. 任务的行为,开始执行任务,停止任务,为自己加锁,推进进度等等。具体执行的任务内容由自己实现决定。
  3. 选哟异步任务提交的情况的对象, 能够知道任务是否进入线程池了

通过上述问题:

定义一个通用的异步任务处理器,提供通用被调用的方法,该方法做状态的处理,并在该方法中在调用子类实现的逻辑

在定义一个通用的返回方法,可以知道任务是否被提交, 子类实现可以根据判断提交的情况针对业务做自定义返回

异步任务提交的方法接受异步任务执行器,并异步调用执行中的通用方法,提交成功在返回执行结果

# 具体实现步骤:

项目结构

首先创建一张表保存任务对象执行情况的信息,因为需要查询任务最后的执行情况

/*
 Navicat Premium Data Transfer
 Source Server         : 192.168.0.36
 Source Server Type    : SQL Server
 Source Server Version : 10501600
 Source Host           : 192.168.0.36:1433
 Source Catalog        : zcjy_th_test
 Source Schema         : dbo
 Target Server Type    : SQL Server
 Target Server Version : 10501600
 File Encoding         : 65001
 Date: 26/04/2024 11:57:32
*/
-- ----------------------------
-- Table structure for async_message
-- ----------------------------
IF EXISTS (SELECT * FROM sys.all_objects WHERE object_id = OBJECT_ID(N'[dbo].[async_message]') AND type IN ('U'))
	DROP TABLE [dbo].[async_message]
GO
CREATE TABLE [dbo].[async_message] (
  [id] int IDENTITY(1,1) NOT NULL,
  [keyword] varchar(255) COLLATE Chinese_PRC_CI_AS NULL,
  [username] varchar(255) COLLATE Chinese_PRC_CI_AS NULL,
  [task_type] varchar(255) COLLATE Chinese_PRC_CI_AS NULL,
  [task_name] varchar(255) COLLATE Chinese_PRC_CI_AS NULL,
  [years] int NULL,
  [months] int NULL,
  [days] int NULL,
  [dist_no] varchar(255) COLLATE Chinese_PRC_CI_AS NULL,
  [dist_name] varchar(255) COLLATE Chinese_PRC_CI_AS NULL,
  [zt_id] varchar(255) COLLATE Chinese_PRC_CI_AS NULL,
  [zt_name] varchar(255) COLLATE Chinese_PRC_CI_AS NULL,
  [content_type] int NULL,
  [content_info] text COLLATE Chinese_PRC_CI_AS NULL,
  [error_info] text COLLATE Chinese_PRC_CI_AS NULL,
  [exec_status] int NULL,
  [exec_flag] bit NULL,
  [run_flag] bit NULL,
  [read_flag] bit NULL,
  [exec_time] int NULL,
  [created_by] varchar(255) COLLATE Chinese_PRC_CI_AS NULL,
  [created_time] datetime NULL,
  [updated_by] varchar(255) COLLATE Chinese_PRC_CI_AS NULL,
  [updated_time] datetime NULL
)
GO
ALTER TABLE [dbo].[async_message] SET (LOCK_ESCALATION = TABLE)
GO
EXEC sp_addextendedproperty
'MS_Description', N'主键',
'SCHEMA', N'dbo',
'TABLE', N'async_message',
'COLUMN', N'id'
GO
EXEC sp_addextendedproperty
'MS_Description', N'关键值;关键值',
'SCHEMA', N'dbo',
'TABLE', N'async_message',
'COLUMN', N'keyword'
GO
EXEC sp_addextendedproperty
'MS_Description', N'发送任务的用户名;发送任务的用户名',
'SCHEMA', N'dbo',
'TABLE', N'async_message',
'COLUMN', N'username'
GO
EXEC sp_addextendedproperty
'MS_Description', N'任务类型;任务类型',
'SCHEMA', N'dbo',
'TABLE', N'async_message',
'COLUMN', N'task_type'
GO
EXEC sp_addextendedproperty
'MS_Description', N'任务名称;任务名称',
'SCHEMA', N'dbo',
'TABLE', N'async_message',
'COLUMN', N'task_name'
GO
EXEC sp_addextendedproperty
'MS_Description', N'年份;年份',
'SCHEMA', N'dbo',
'TABLE', N'async_message',
'COLUMN', N'years'
GO
EXEC sp_addextendedproperty
'MS_Description', N'月份;月份',
'SCHEMA', N'dbo',
'TABLE', N'async_message',
'COLUMN', N'months'
GO
EXEC sp_addextendedproperty
'MS_Description', N'天数;天数',
'SCHEMA', N'dbo',
'TABLE', N'async_message',
'COLUMN', N'days'
GO
EXEC sp_addextendedproperty
'MS_Description', N'地区号;地区号',
'SCHEMA', N'dbo',
'TABLE', N'async_message',
'COLUMN', N'dist_no'
GO
EXEC sp_addextendedproperty
'MS_Description', N'地区名称;地区名称',
'SCHEMA', N'dbo',
'TABLE', N'async_message',
'COLUMN', N'dist_name'
GO
EXEC sp_addextendedproperty
'MS_Description', N'账套id;账套id',
'SCHEMA', N'dbo',
'TABLE', N'async_message',
'COLUMN', N'zt_id'
GO
EXEC sp_addextendedproperty
'MS_Description', N'账套名称;账套名称',
'SCHEMA', N'dbo',
'TABLE', N'async_message',
'COLUMN', N'zt_name'
GO
EXEC sp_addextendedproperty
'MS_Description', N'消息体类型;消息响应的类型。0:普通、1:页面跳转、2:嵌套窗口',
'SCHEMA', N'dbo',
'TABLE', N'async_message',
'COLUMN', N'content_type'
GO
EXEC sp_addextendedproperty
'MS_Description', N'消息内容;执行任务的响应消息',
'SCHEMA', N'dbo',
'TABLE', N'async_message',
'COLUMN', N'content_info'
GO
EXEC sp_addextendedproperty
'MS_Description', N'错误信息;执行错误情况',
'SCHEMA', N'dbo',
'TABLE', N'async_message',
'COLUMN', N'error_info'
GO
EXEC sp_addextendedproperty
'MS_Description', N'执行情况;0、待执行。1、执行中。2、执行成功。-1、执行失败',
'SCHEMA', N'dbo',
'TABLE', N'async_message',
'COLUMN', N'exec_status'
GO
EXEC sp_addextendedproperty
'MS_Description', N'是否执行;消息执行状态(1、成功。0、失败)',
'SCHEMA', N'dbo',
'TABLE', N'async_message',
'COLUMN', N'exec_flag'
GO
EXEC sp_addextendedproperty
'MS_Description', N'是否可用;消息是否可用(1、可用、0、不可用)',
'SCHEMA', N'dbo',
'TABLE', N'async_message',
'COLUMN', N'run_flag'
GO
EXEC sp_addextendedproperty
'MS_Description', N'是否已读消息;消息是否已读状态',
'SCHEMA', N'dbo',
'TABLE', N'async_message',
'COLUMN', N'read_flag'
GO
EXEC sp_addextendedproperty
'MS_Description', N'执行时间;执行花费的时间(毫秒)',
'SCHEMA', N'dbo',
'TABLE', N'async_message',
'COLUMN', N'exec_time'
GO
EXEC sp_addextendedproperty
'MS_Description', N'创建人',
'SCHEMA', N'dbo',
'TABLE', N'async_message',
'COLUMN', N'created_by'
GO
EXEC sp_addextendedproperty
'MS_Description', N'创建时间',
'SCHEMA', N'dbo',
'TABLE', N'async_message',
'COLUMN', N'created_time'
GO
EXEC sp_addextendedproperty
'MS_Description', N'更新人',
'SCHEMA', N'dbo',
'TABLE', N'async_message',
'COLUMN', N'updated_by'
GO
EXEC sp_addextendedproperty
'MS_Description', N'更新时间',
'SCHEMA', N'dbo',
'TABLE', N'async_message',
'COLUMN', N'updated_time'
GO
EXEC sp_addextendedproperty
'MS_Description', N'异步任务消息表',
'SCHEMA', N'dbo',
'TABLE', N'async_message'
GO
-- ----------------------------
-- Primary Key structure for table async_message
-- ----------------------------
ALTER TABLE [dbo].[async_message] ADD CONSTRAINT [PK__async_me__3213E83F100234CD] PRIMARY KEY CLUSTERED ([id])
WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON)  
ON [PRIMARY]
GO

创建记录任务对象执行情况的信息的对象

import lombok.Data;
import javax.persistence.*;
import java.io.Serializable;
import java.util.Date;
/**
 * @author Tz
 * @date 2024/01/22 15:03
 * @Description 异步任务消息实体
 */
@Data
@Entity
@Table(name ="async_message" , schema = "dbo")
public class AsyncMessage implements Serializable {
    private static final long serialVersionUID =  1L;
    /** 主键 */
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    @Column(name = "id")
    private Integer id ;
    /** 关键值;关键值 */
    @Column(name = "keyword")
    private String keyword ;
    /** 发送任务的用户名;发送任务的用户名 */
    @Column(name = "username")
    private String username ;
    /** 任务类型;任务类型 */
    @Column(name = "task_type")
    private String taskType ;
    /** 任务名称;任务名称 */
    @Column(name = "task_name")
    private String taskName ;
    /** 年份;年份 */
    @Column(name = "years")
    private Integer years ;
    /** 月份;月份 */
    @Column(name = "months")
    private Integer months ;
    /** 天数;天数 */
    @Column(name = "days")
    private Integer days ;
    /** 地区号;地区号 */
    @Column(name = "dist_no")
    private String distNo ;
    /** 地区名称;地区名称 */
    @Column(name = "dist_name")
    private String distName ;
    /** 账套 id; 账套 id */
    @Column(name = "zt_id")
    private String ztId ;
    /** 账套名称;账套名称 */
    @Column(name = "zt_name")
    private String ztName ;
    /** 消息体类型;执行任务的类型 */
    @Column(name = "content_type")
    private Integer contentType ;
    /** 消息内容;消息响应的类型。0:普通、1:页面跳转、2:嵌套窗口 */
    @Column(name = "content_info")
    private String contentInfo ;
    /** 错误信息;消息响应的类型。执行错误情况 */
    @Column(name = "error_info")
    private String errorInfo ;
    /** 执行情况;0、待执行。1、执行中。2、执行成功。-1、执行失败。-2、执行取消。-3、执行错误 */
    @Column(name = "exec_status")
    private Integer execStatus ;
    /** 状态类别;0、全部。1、执行中。2、执行成功。3、执行失败 */
    @Transient
    private Integer statusCategory ;
    /** 是否执行;消息执行状态(1、成功。0、失败) */
    @Column(name = "exec_flag")
    private Boolean execFlag ;
    /** 是否可用;消息是否可用(1、可用、0、不可用) */
    @Column(name = "run_flag")
    private Boolean runFlag ;
    /** 是否已读消息;消息是否已读状态 */
    @Column(name = "read_flag")
    private Boolean readFlag ;
    /** 执行时间;执行花费的时间(毫秒) */
    @Column(name = "exec_time")
    private Integer execTime ;
    /** 创建人 */
    @Column(name = "created_by")
    private String createdBy ;
    /** 创建时间 */
    @Column(name = "created_time")
    private Date createdTime ;
    /** 更新人 */
    @Column(name = "updated_by")
    private String updatedBy ;
    /** 更新时间 */
    @Column(name = "updated_time")
    private Date updatedTime ;
}

相关的 VO、DTO 字段和 PO 实体类是一样的, 这里使用了 mapstruct 做了对象转换, 转换器:

import org.mapstruct.Mapper;
import java.util.ArrayList;
import java.util.List;
/**
 * @author Tz
 * @date 2024/01/22 15:03
 * @Description 异步任务消息实体的 转换器
 */
@Mapper(componentModel = "spring")
public interface AsyncMessageConverter extends BaseConverter<AsyncMessage, AsyncMessageDTO, AsyncMessageVO, CommonCommand> {
    /**
     * 自定义 List-PO 转 List-DTO 处理状态分类的字段
     * @param poList
     * @return
     */
    @Override
    default List<AsyncMessageDTO> poConvertDTO(List<AsyncMessage> poList) {
        if ( poList == null ) {
            return null;
        }
        List<AsyncMessageDTO> list = new ArrayList<AsyncMessageDTO>( poList.size() );
        for ( AsyncMessage asyncMessage : poList ) {
            asyncMessage.setStatusCategory(getCategoryFromStatus(asyncMessage.getExecStatus()));
            list.add( poConvertDTO( asyncMessage ) );
        }
        return list;
    }
}

创建一个异步任务执行器抽象类,提供异步任务通用方法,具体实现由任务子类来实现,这里继承了 AsyncMessageDTO 获得执行情况的状态,和保存状态

import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.io.PrintWriter;
import java.io.Serializable;
import java.io.StringWriter;
import java.util.Enumeration;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import static com.bright.ghj.overt.enums.AsyncTaskExecInfo.*;
/**
 * @author Tz
 * @date 2024/01/22 15:03
 * @Description 异步任务执行器的抽象类 继承 AsyncMessageDTO 获得执行情况的状态
 */
@Slf4j
@Data
@NoArgsConstructor
@EqualsAndHashCode(callSuper = true)
public abstract class AsyncTaskExecutor extends AsyncMessageDTO implements Serializable {
    /**
     * 异步任务执行器名称
     */
    private String asyncTaskExecutorName;
    /**
     * 原子操作
     * <p>
     * key:   实际的异步任务执行器名称
     * value: 实际的异步任务同步锁集合
     */
    private static ConcurrentHashMap<String, ConcurrentHashMap> BASIC_TASK_LOCK = new ConcurrentHashMap<>();
    /**
     * 进度百分比
     */
    protected static final Integer PERCENTAGE  = 100;
    /**
     * 完成度
     */
    protected Integer completionDegree;
    /**
     * 进度步长
     */
    protected Integer progressStepSize;
    /**
     * 任务是否是运行状态
     */
    protected Boolean isRunning = true;
    /**
     * 同步锁的关键值
     * <p>
     * 相同的 key 值会进行同步操作
     * <p>
     * 注意:不设置也是同步执行的
     */
    protected String syncKeyword = "";
    /**
     * 处理异步消息任务的接口
     */
    protected AsyncMessageService asyncMessageService;
    /**
     * 初始化值
     */
    public AsyncTaskExecutor(String asyncTaskExecutorName){
        //======= 初始化相关参数 =======
        // 初始进度
        setCompletionDegree(0);
        // 默认未读状态
        setReadFlag(false);
        // 默认待执行状态
        setExecStatus(TODO_EXEC.getStatus());
        // 消息默认可以用
        setRunFlag(true);
        // 消息默认未开始执行
        setExecFlag(false);
        setCreatedBy(SecurityUtil.getLoginUser().getUsername());
        setCreatedTime(DateUtil.getCurrDate());
        this.asyncTaskExecutorName = asyncTaskExecutorName;
        this.asyncMessageService = ApplicationContextProvider.getApplicationContext().getBean(AsyncMessageServiceImpl.class);
    }
    /**
     * 基础执行器,只是做异步任务的消息记录 具体的业务由子类实现 {@link #taskContent ()} 完成
     */
    public void baseExec() {
        AsyncMessageDTO asyncMessageDTO = asyncMessageService.getById(this.getId());
        // 记录时间
        long startTime = System.currentTimeMillis();
        try {
            // 锁对象
            Object object = null;
            if (!BASIC_TASK_LOCK.containsKey(asyncTaskExecutorName)) {
                BASIC_TASK_LOCK.put(asyncTaskExecutorName, new ConcurrentHashMap<String, Object>());
            }
            //1、校验任务是否取消
            // 需要二次判断,处理锁
            synchronized (this){
                // 查看这条消息是否是未执行且可用的
                AsyncMessageDTO messageDTO = asyncMessageService.getById(asyncMessageDTO.getId());
                // 如果不可用 代表该消息在没被消费前被取消了
                if(!messageDTO.getRunFlag()){
                    messageDTO.setContentInfo("任务被取消!");
                    messageDTO.setExecStatus(EXECUTION_CANCEL.getStatus());
                    messageDTO.setExecFlag(true);
                    messageDTO.setUpdatedBy(asyncMessageDTO.getUsername());
                    messageDTO.setUpdatedTime(DateUtil.getCurrDate());
                    asyncMessageService.save(messageDTO);
                    return;
                }
                // 如果根据 key 查询到了对象锁
                ConcurrentHashMap concurrentHashMap = BASIC_TASK_LOCK.get(asyncTaskExecutorName);
                Enumeration enumeration = concurrentHashMap.keys();
                while(enumeration.hasMoreElements()){
                    String key = (String) enumeration.nextElement();
                    if(key.equalsIgnoreCase(this.syncKeyword)){
                        object = concurrentHashMap.get(key);
                    }
                }
                // 没有获取到锁则新增
                if(object == null){
                    object = new Object();
                }
            }
            // 进入消费 表示该消息已经开始消费了
            //3、锁住自己定义的同步锁 进行业务处理
            // 如果上一次执行的锁标识没有释放,现在有需要没释放的锁,就会进行等待
            synchronized(object) {
                try {
                    // 查看这条消息是否是未消费且可用的
                    AsyncMessageDTO resMqMessage = asyncMessageService.getById(asyncMessageDTO.getId());
                    // 如果不可用 代表该消息在没被消费前被取消了
                    if(!resMqMessage.getRunFlag()){
                        resMqMessage.setContentInfo("任务被取消!");
                        resMqMessage.setExecStatus(EXECUTION_CANCEL.getStatus());
                        resMqMessage.setUpdatedBy(resMqMessage.getUsername());
                        resMqMessage.setUpdatedTime(DateUtil.getCurrDate());
                        asyncMessageService.save(resMqMessage);
                        return;
                    }
                    // 开始执行 将锁标识放入
                    BASIC_TASK_LOCK.get(asyncTaskExecutorName).put(this.syncKeyword, object);
                    asyncMessageDTO.setExecFlag(true);
                    asyncMessageDTO.setExecStatus(EXECUTING.getStatus());
                    AsyncMessageDTO updResMessage = asyncMessageService.save(asyncMessageDTO);
                    if (Objects.isNull(updResMessage)) {
                        throw new RuntimeException("更新异步任务状态异常!");
                    }
                    // 执行实际的任务内容
                    boolean execFlag = taskContent();
                    if (execFlag) {
                        // 执行成功
                        asyncMessageDTO.setExecStatus(EXECUTION_SUCCESSFUL.getStatus());
                        asyncMessageDTO.setContentInfo(String.format("执行: %s 成功!", asyncMessageDTO.getTaskName()));
                    } else {
                        // 执行失败
                        asyncMessageDTO.setExecStatus(EXECUTION_FAILED.getStatus());
                        asyncMessageDTO.setContentInfo(String.format("执行: %s 失败!", asyncMessageDTO.getTaskName()));
                        // 设置执行失败的错误信息
                        asyncMessageDTO.setErrorInfo(this.getErrorInfo());
                    }
                } catch (Exception e){
                    // 将异常信息保存到 String 中
                    StringWriter sw = new StringWriter();
                    PrintWriter pw = new PrintWriter(sw);
                    e.printStackTrace(pw);
                    String exceptionAsString = sw.toString();
                    System.out.println(exceptionAsString);
                    // 执行失败
                    asyncMessageDTO.setExecStatus(EXECUTION_ERROR.getStatus());
                    asyncMessageDTO.setContentInfo(String.format("执行: %s 失败!", asyncMessageDTO.getTaskName()));
                    asyncMessageDTO.setErrorInfo(exceptionAsString);
                } finally {
                    //4、释放当前执行地区持有的锁
                    BASIC_TASK_LOCK.get(asyncTaskExecutorName).remove(this.syncKeyword);
                }
            }
            long endTime = System.currentTimeMillis();
            asyncMessageDTO.setExecTime(endTime - startTime);
            asyncMessageDTO.setUpdatedBy(asyncMessageDTO.getUsername());
            asyncMessageDTO.setUpdatedTime(DateUtil.getCurrDate());
            asyncMessageService.save(asyncMessageDTO);
        } catch (Exception e) {
            // 将异常信息保存到 String 中
            StringWriter sw = new StringWriter();
            PrintWriter pw = new PrintWriter(sw);
            e.printStackTrace(pw);
            String exceptionAsString = sw.toString();
            // 执行错误
            asyncMessageDTO.setExecStatus(EXECUTION_ERROR.getStatus());
            asyncMessageDTO.setContentInfo(e.getMessage());
            asyncMessageDTO.setErrorInfo(exceptionAsString);
            long endTime = System.currentTimeMillis();
            asyncMessageDTO.setExecTime(endTime - startTime);
            asyncMessageDTO.setUpdatedBy(asyncMessageDTO.getUsername());
            asyncMessageDTO.setUpdatedTime(DateUtil.getCurrDate());
            asyncMessageService.save(asyncMessageDTO);
        } finally {
            this.isRunning = false;
        }
    }
    /**
     * 实际异步执行的任务体抽象方法、由具体的异步任务实现
     * @return            执行结果
     * @throws Exception  可能出现的业务异常
     */
    public abstract boolean taskContent() throws Exception;
    /**
     * 推进进度
     */
    public void advanceProgress() {
        completionDegree = completionDegree + progressStepSize;
        if (completionDegree > PERCENTAGE) {
            completionDegree = PERCENTAGE;
        }
        log.info("当前任务keyword:[{}], 任务类型: [{}], 任务名称: [{}], 任务进度: [{}%], 是否正在运行: [{}]"
                , getKeyword(), getTaskType(), getTaskName(), getCompletionDegree(), getIsRunning() ? "是" : "否");
    }
    /**
     * 对执行器进行必要的校验
     * @return 校验结果
     */
    public boolean checkAsyncTaskExecutor() {
        if (StringUtil.isBlank(this.asyncTaskExecutorName)
                || StringUtil.isBlank(this.syncKeyword)) {
            return false;
        }
        return true;
    }
}

异步任务执行情况的枚举类

import lombok.AllArgsConstructor;
import lombok.Getter;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
/**
 * @author Tz
 * @date 2024/01/22 15:03
 * @Description 异步任务执行情况信息的枚举
 */
@Getter
@AllArgsConstructor
public enum AsyncTaskExecInfo {
    /**
     * 待执行
     */
    TODO_EXEC(0, "待执行"),
    /**
     * 正在执行
     */
    EXECUTING(1, "正在执行"),
    /**
     * 执行成功
     */
    EXECUTION_SUCCESSFUL(2, "执行成功"),
    /**
     * 执行失败 (程序运行后,结果错误)
     */
    EXECUTION_FAILED(-1, "执行失败"),
    /**
     * 执行取消
     */
    EXECUTION_CANCEL(-2, "执行取消"),
    /**
     * 执行错误 (程序异常导致的错误)
     */
    EXECUTION_ERROR(-3, "执行错误");
    private final Integer status;
    private final String info;
    /**
     * 根据类别获取状态列表
     * @param  category 类别 0:全部,1:处理中,2:成功,3:失败
     * @return 状态列表
     */
    public static List<Integer> getStatusFromCategory(Integer category) {
        switch (category) {
            case 1:
                return Arrays.asList(TODO_EXEC.status, EXECUTING.status);
            case 2:
                return Collections.singletonList(EXECUTION_SUCCESSFUL.status);
            case 3:
                return Arrays.asList(EXECUTION_FAILED.status, EXECUTION_CANCEL.status, EXECUTION_ERROR.status);
            default:
                return Arrays.asList(TODO_EXEC.status
                        , EXECUTING.status
                        , EXECUTION_SUCCESSFUL.status
                        , EXECUTION_FAILED.status
                        , EXECUTION_CANCEL.status
                        , EXECUTION_ERROR.status);
        }
    }
    /**
     * 根据执行状态获取对应的状态分类
     * @param status 执行状态
     * @return       状态分类
     */
    public static Integer getCategoryFromStatus(Integer status) {
        if (TODO_EXEC.status.equals(status) || EXECUTING.status.equals(status)){
            return 1;
        } else if (EXECUTION_SUCCESSFUL.status.equals(status)) {
            return 2;
        } else if (EXECUTION_FAILED.status.equals(status) || EXECUTION_CANCEL.status.equals(status) || EXECUTION_ERROR.status.equals(status)) {
            return 3;
        } else {
            return 0;
        }
    }
}

保存异步任务执行情况信息的 service

import java.util.List;
/**
 * @author Tz
 * @date 2024/01/22 15:03
 * @Description 异步任务消息接口 额外提供可以提交异步执行任务的功能
 */
public interface AsyncMessageService {
    /**
     * 保存异步任务消息
     * @param asyncMessageDTO 异步任务消息 DTO
     * @return                异步任务消息 DTO
     */
    AsyncMessageDTO save(AsyncMessageDTO asyncMessageDTO);
    /**
     * 根据 id 查询异步任务消息
     * @param id 主键
     * @return   异步任务消息 DTO
     */
    AsyncMessageDTO getById(Integer id);
    /**
     * 通用的提交执行异步任务
     * <p>
     * 会提交实现 {@link AsyncTaskExecutor#baseExec ()} 方法中的任务内容,
     * 实际执行的是子类的 {@link AsyncTaskExecutor#taskContent ()}
     * @param asyncTaskExecutor  需要执行的异步任务内容
     * @return                   执行生成的结果
     */
    AsyncTaskResultVO execAsyncTask(AsyncTaskExecutor asyncTaskExecutor);
    /**
     * 根据主键标识获取异步任务的进度
     * <p>
     * 当项目重启 如果获取历史执行任务的进度会报错 直接返回 100
     * <p>
     * 添加一个可以自动清理已经完成任务的功能 避免一直占用内存 {@link AsyncMessageServiceImpl#cleanAsyncTaskTimer ()}
     * @param keyword 主键标识
     * @return        进度百分比
     */
    Integer getCompletionDegree(String keyword);
    /**
     * 根据 id 撤回发送的异步任务
     * @param id 主键
     * @return   返回撤回的数量
     */
    Integer revokeAsyncTask(Integer id);
    /**
     * 根据条件获取异步任务消息列表
     * @param asyncTaskMassageQuery 查询条件
     * @return                      异步消息列表
     */
    PageVO<AsyncMessageVO> pageAsyncMessage(AsyncTaskMassageQuery asyncTaskMassageQuery);
    /**
     * 根据 keyword 集合查询异步消息列表
     * @param keywords keyword 集合
     * @return         异步消息列表
     */
    List<AsyncMessageVO> listAsyncMessageByKeywords(List<String> keywords);
    /**
     * 一键已读
     *
     * @param statusCategory 需要已读的状态类别
     * @param username       需要已读的用户
     * @return 已读记录数
     */
    Integer oneClickRead(Integer statusCategory, String username);
    /**
     * 阅读一条未读消息
     * @param id 主键
     * @return   读取结果
     */
    Integer read(Integer id);
    /**
     * 获取异步任务执行的情况结果
     * @param username  执行任务的用户名
     * @return          异步任务的执行情况结果的 VO
     */
    AsyncTaskExecInfoResultVO getAsyncTaskExecInfo(String username);
    /**
     * 确认读取执行的结果
     * @param username 执行任务的用户名
     */
    void confirmReadingExecResult(String username);
}

保存异步任务执行情况信息的 service 的实现

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
 * @author Tz
 * @date 2024/01/22 15:03
 * @Description 异步任务消息接口 实现
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class AsyncMessageServiceImpl implements AsyncMessageService {
    private final AsyncMessageManager asyncMessageManager;
    private final AsyncMessageConverter asyncMessageConverter;
    /**
     * 保存执行任务对象,可以获取进度等信息
     */
    public static ConcurrentMap<String, AsyncTaskExecutor> CONCURRENT_MAP = new ConcurrentHashMap<>();
    /**
     * 异步任务执行的扩展线程池 (这里用到上面的动态线程池)
     */
    private final BizThreadPoolExt bizPool;
    /**
     * 定时任务执行的扩展线程池
     */
    private final BizThreadPoolExt jobPool;
    @Override
    public AsyncMessageDTO save(AsyncMessageDTO asyncMessageDTO) {
        return asyncMessageManager.save(asyncMessageDTO);
    }
    /**
     * 根据 id 查询异步任务消息
     *
     * @param id 主键
     * @return   异步任务消息 DTO
     */
    @Override
    public AsyncMessageDTO getById(Integer id) {
        return asyncMessageManager.getById(id);
    }
    @Override
    public AsyncTaskResultVO execAsyncTask(AsyncTaskExecutor asyncTaskExecutor) {
        AsyncTaskResultVO asyncTaskResultVO = new AsyncTaskResultVO();
        boolean checkAsyncTaskExecutor = asyncTaskExecutor.checkAsyncTaskExecutor();
        if (!checkAsyncTaskExecutor) {
            // 提交任务失败
            asyncTaskResultVO.setIsSubmitTask(false);
            asyncTaskResultVO.setNoSubmitIllustrate("执行器不通过校验,可能的原因: 未设置同步锁或未识别的执行器");
            return asyncTaskResultVO;
        }
        asyncTaskResultVO.setKeyword(asyncTaskExecutor.getKeyword());
        // 保存执行记录
        AsyncMessageDTO save = save(asyncTaskExecutor);
        if (save != null) {
            asyncTaskExecutor.setId(save.getId());
            CompletableFuture.runAsync(() -> {
                		// 设置当前执行任务的线程名称,方便追踪
                        Thread.currentThread().setName(asyncTaskExecutor.getTaskName() + "_" + asyncTaskExecutor.getKeyword());
                        asyncTaskExecutor.baseExec();
                    }, bizPool)
                    .exceptionally(throwable -> {
                        throwable.printStackTrace();
                        return null;
                    });
            // 保存执行的任务
            CONCURRENT_MAP.put(asyncTaskExecutor.getKeyword(), asyncTaskExecutor);
            // 提交任务成功
            asyncTaskResultVO.setIsSubmitTask(true);
        } else {
            // 提交任务失败
            asyncTaskResultVO.setIsSubmitTask(false);
        }
        return asyncTaskResultVO;
    }
    /**
     * 根据主键标识获取异步任务的进度
     * <p>
     * 当项目重启 如果获取历史执行任务的进度会报错 直接返回 100
     * <p>
     * 添加一个可以自动清理已经完成任务的功能 避免一直占用内存 {@link AsyncMessageServiceImpl#cleanAsyncTaskTimer ()}
     * @param keyword 主键标识
     * @return        进度百分比
     */
    @Override
    public Integer getCompletionDegree(String keyword) {
        // 如果不存在 可能是任务完成后已经被清除了
        if (!CONCURRENT_MAP.containsKey(keyword)) {
            // 直接返回 100
            return 100;
        }
        return CONCURRENT_MAP.get(keyword).getCompletionDegree();
    }
    /**
     * 根据 id 撤回发送的异步任务
     *
     * @param id 主键
     * @return 返回撤回的数量
     */
    @Override
    public Integer revokeAsyncTask(Integer id) {
        Integer resultSize = asyncMessageManager.revokeAsyncTask(id);
        if (resultSize <= 0) {
            throw new RuntimeException("没有可撤回的任务或当前任务正在执行中!");
        }
        return resultSize;
    }
    /**
     * 根据条件获取异步任务消息列表
     *
     * @param asyncTaskMassageQuery 查询条件
     * @return                      异步消息列表
     */
    @Override
    public PageVO<AsyncMessageVO> pageAsyncMessage(AsyncTaskMassageQuery asyncTaskMassageQuery) {
        PageDTO<AsyncMessageDTO> pageDTO = asyncMessageManager.pageAsyncMessage(asyncTaskMassageQuery, PageUtil.toPageable(asyncTaskMassageQuery));
        PageVO<AsyncMessageVO> pageVO = new PageVO<>();
        pageVO.setData(asyncMessageConverter.dtoConvertVO(pageDTO.getData()));
        pageVO.setTotal(pageDTO.getTotal());
        pageVO.setPageCount(pageDTO.getPageCount());
        return pageVO;
    }
    /**
     * 根据 keyword 集合查询异步消息列表
     *
     * @param keywords keyword 集合
     * @return         异步消息列表
     */
    @Override
    public List<AsyncMessageVO> listAsyncMessageByKeywords(List<String> keywords) {
        List<AsyncMessageDTO> asyncMessageDTOS = asyncMessageManager.listAsyncMessageByKeywords(keywords);
        return asyncMessageConverter.dtoConvertVO(asyncMessageDTOS);
    }
    /**
     * 一键已读
     *
     * @param statusCategory 需要已读的状态类别
     * @param username       需要已读的用户
     * @return 已读记录数
     */
    @Override
    public Integer oneClickRead(Integer statusCategory, String username) {
        // 状态类别需要转实际的状态
        List<Integer> execStatusList = AsyncTaskExecInfo.getStatusFromCategory(statusCategory);
        return asyncMessageManager.oneClickRead(execStatusList, username);
    }
    /**
     * 阅读一条未读消息
     *
     * @param id 主键
     * @return 读取结果
     */
    @Override
    public Integer read(Integer id) {
        return asyncMessageManager.read(id);
    }
    /**
     * 获取异步任务执行的情况结果
     * @param username 执行任务的用户名
     * @return         异步任务的执行情况结果的 VO
     */
    @Override
    public AsyncTaskExecInfoResultVO getAsyncTaskExecInfo(String username) {
        //TODO 集群情况下需要考虑换种方式
        AsyncTaskExecInfoResultVO asyncTaskExecInfoResultVO = new AsyncTaskExecInfoResultVO();
        asyncTaskExecInfoResultVO.setIsExistsCompleteTask(false);
        asyncTaskExecInfoResultVO.setCompleteTaskMessages(new ArrayList<>());
        Map<String, Integer> taskCount = new HashMap<>();
        // 循环遍历所有提交的任务
        Collection<AsyncTaskExecutor> asyncTaskExecutors = CONCURRENT_MAP.values();
        for (AsyncTaskExecutor asyncTaskExecutor : asyncTaskExecutors) {
            if (asyncTaskExecutor.getUsername().equalsIgnoreCase(username)
                    && !asyncTaskExecutor.getIsRunning()) {
                asyncTaskExecInfoResultVO.setIsExistsCompleteTask(true);
                taskCount.put(asyncTaskExecutor.getTaskType(), taskCount.getOrDefault(asyncTaskExecutor.getTaskType(), 0) + 1);
            }
        }
        // 纪录执行任务的情况
        if (!CollectionUtil.isEmpty(taskCount) && asyncTaskExecInfoResultVO.getIsExistsCompleteTask()) {
            for (String key : taskCount.keySet()) {
                asyncTaskExecInfoResultVO.getCompleteTaskMessages().add(key + ",有完成的任务!");
            }
        }
        return asyncTaskExecInfoResultVO;
    }
    /**
     * 确认读取执行的结果
     *
     * @param username 执行任务的用户名
     */
    @Override
    public void confirmReadingExecResult(String username) {
        //TODO 集群情况下需要考虑换种方式
        // 循环遍历所有提交的任务
        Collection<AsyncTaskExecutor> asyncTaskExecutors = CONCURRENT_MAP.values();
        for (AsyncTaskExecutor asyncTaskExecutor : asyncTaskExecutors) {
            if (asyncTaskExecutor.getUsername().equalsIgnoreCase(username)
                    && !asyncTaskExecutor.getIsRunning()) {
                CONCURRENT_MAP.remove(asyncTaskExecutor.getKeyword());
            }
        }
    }
    /**
     * 清理异步消息的定时器
     * <p>
     * initialDelay = 300000: 延迟执行时间 单位毫秒 这里是 5 分钟
     * <p>
     * fixedRate = 1800000:  执行间隔时间 单位毫秒 这里是 30 分钟
     * <p>
     * Scheduled (initialDelay = 3000, fixedRate = 3000): 测试时用的
     */
    @Scheduled(initialDelay = 300000, fixedRate = 1800000)
    public void cleanAsyncTaskTimer() {
        List<AsyncMessageVO> asyncMessageVOS = listAsyncMessageByKeywords(new ArrayList<>(CONCURRENT_MAP.keySet()));
        if (!CollectionUtil.isEmpty(asyncMessageVOS)) {
            for (AsyncMessageVO asyncMessageVO : asyncMessageVOS) {
                // 清除不是待处理和正在运行的任务
                if (!asyncMessageVO.getExecStatus().equals(TODO_EXEC.getStatus())
                        && !asyncMessageVO.getExecStatus().equals(EXECUTING.getStatus())) {
                    log.info("清除keyword:[{}], 异步任务的缓存", asyncMessageVO.getKeyword());
                    CompletableFuture.runAsync(() -> {
                        Thread.currentThread().setName("clean_task_" + asyncMessageVO.getTaskName() + "_" + asyncMessageVO.getKeyword());
                        CONCURRENT_MAP.remove(asyncMessageVO.getKeyword());
                    }, jobPool);
                }
            }
        }
    }
}

还需要提交任务的结果对象的抽象类,子类可以实现自己提交任务成功或者失败时的返回信息

import lombok.Data;
/**
 * @author Tz
 * @date 2024/01/22 15:03
 * @Description 通用异步提交任务的返回结果类
 */
@Data
public class AsyncTaskResultVO extends CommonVO {
    /**
     * 异步执行任务的消息 key 值
     */
    private String keyword;
    /**
     * 任务是否已经提交
     */
    private Boolean isSubmitTask;
    /**
     * 未提交说明
     */
    private String noSubmitIllustrate;
}

由于项目用的 JPA 分多了一层 manager

import org.springframework.data.domain.Pageable;
import java.util.List;
/**
 * @author Tz
 * @date 2024/01/22 15:03
 * @Description 异步消息的 Manager
 */
public interface AsyncMessageManager {
    /**
     * 保存异步任务消息
     *
     * @param asyncMessageDTO 异步任务消息的 DTO
     * @return                异步任务消息的 DTO
     */
    AsyncMessageDTO save(AsyncMessageDTO asyncMessageDTO);
    /**
     * 根据 id 查询异步任务消息
     *
     * @param id 主键
     * @return   异步任务消息 DTO
     */
    AsyncMessageDTO getById(Integer id);
    /**
     * 根据 id 撤回当前任务
     *
     * @param id 主键 id
     * @return   返回结果数
     */
    Integer revokeAsyncTask(Integer id);
    /**
     * 根据条件获取异步任务消息列表
     *
     * @param asyncTaskMassageQuery 查询条件
     * @param pageable              分页条件
     * @return                      异步消息列表
     */
    PageDTO<AsyncMessageDTO> pageAsyncMessage(AsyncTaskMassageQuery asyncTaskMassageQuery, Pageable pageable);
    /**
     * 根据 keyword 集合查询异步消息列表
     *
     * @param keywords keyword 集合
     * @return         异步消息列表
     */
    List<AsyncMessageDTO> listAsyncMessageByKeywords(List<String> keywords);
    /**
     * 一键已读
     *
     * @param execStatus 需要已读的状态列表
     * @param username   需要已读的用户
     * @return 已读记录数
     */
    Integer oneClickRead(List<Integer> execStatus, String username);
    /**
     * 阅读一条未读消息
     *
     * @param id 主键
     * @return 读取结果
     */
    Integer read(Integer id);
}

manager 实现

import lombok.RequiredArgsConstructor;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.stereotype.Component;
import javax.persistence.criteria.Predicate;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
/**
 * @author Tz
 * @date 2024/01/22 15:03
 * @Description 异步消息的 Manager 实现
 */
@Component
@RequiredArgsConstructor
public class AsyncMessageManagerImpl implements AsyncMessageManager {
    private final AsyncMessageRepository asyncMessageRepository;
    private final AsyncMessageConverter asyncMessageConverter;
    /**
     * 保存异步任务消息
     *
     * @param asyncMessageDTO 异步任务消息的 DTO
     * @return                异步任务消息的 DTO
     */
    @Override
    public AsyncMessageDTO save(AsyncMessageDTO asyncMessageDTO) {
        AsyncMessage asyncMessage = asyncMessageConverter.dtoConvertPO(asyncMessageDTO);
        AsyncMessage save = asyncMessageRepository.save(asyncMessage);
        return asyncMessageConverter.poConvertDTO(save);
    }
    /**
     * 根据 id 查询异步任务消息
     *
     * @param id 主键
     * @return   异步任务消息 DTO
     */
    @Override
    public AsyncMessageDTO getById(Integer id) {
        Optional<AsyncMessage> asyncMessageOptional = asyncMessageRepository.findById(id);
        if (!asyncMessageOptional.isPresent()) {
            throw new RuntimeException("无提交信息!");
        }
        return asyncMessageConverter.poConvertDTO(asyncMessageOptional.get());
    }
    /**
     * 根据 id 撤回当前任务
     *
     * @param id 主键 id
     * @return   返回结果数
     */
    @Override
    public Integer revokeAsyncTask(Integer id) {
        return asyncMessageRepository.revokeAsyncTask(id);
    }
    /**
     * 根据条件获取异步任务消息列表
     *
     * @param asyncTaskMassageQuery 查询条件
     * @param pageable              分页条件
     * @return                      异步消息列表
     */
    @Override
    public PageDTO<AsyncMessageDTO> pageAsyncMessage(AsyncTaskMassageQuery asyncTaskMassageQuery, Pageable pageable) {
        Page<AsyncMessage> page = asyncMessageRepository.findAll((root, criteriaQuery, criteriaBuilder) -> {
            List<Predicate> ps = new ArrayList<>();
            if (StringUtil.isNotBlank(asyncTaskMassageQuery.getUsername())) {
                ps.add(criteriaBuilder.equal(root.get("username"), asyncTaskMassageQuery.getUsername()));
            }
            if (StringUtil.isNotBlank(asyncTaskMassageQuery.getTaskType())) {
                ps.add(criteriaBuilder.equal(root.get("taskType"), asyncTaskMassageQuery.getTaskType()));
            }
            // 根据状态类别条件查询执行状态
            if (asyncTaskMassageQuery.getStatusCategory() != null) {
                ps.add(criteriaBuilder.in(root.get("execStatus")).value(AsyncTaskExecInfo.getStatusFromCategory(asyncTaskMassageQuery.getStatusCategory())));
            }
            if (asyncTaskMassageQuery.getReadFlag() != null) {
                ps.add(criteriaBuilder.equal(root.get("readFlag"), asyncTaskMassageQuery.getReadFlag()));
            }
            criteriaQuery.where(ps.toArray(new Predicate[0]));
            criteriaQuery.orderBy(criteriaBuilder.desc(root.get("updatedTime")));
            return criteriaQuery.getRestriction();
        }, pageable);
        List<AsyncMessageDTO> list = asyncMessageConverter.poConvertDTO(page.getContent());
        return PageDTO.of(page.getTotalElements(), page.getTotalPages(), list);
    }
    /**
     * 根据 keyword 集合查询异步消息列表
     *
     * @param keywords keyword 集合
     * @return         异步消息列表
     */
    @Override
    public List<AsyncMessageDTO> listAsyncMessageByKeywords(List<String> keywords) {
        List<AsyncMessage> asyncMessages = asyncMessageRepository.listAsyncMessageByKeywords(keywords);
        return asyncMessageConverter.poConvertDTO(asyncMessages);
    }
    /**
     * 一键已读
     *
     * @param execStatus 需要已读的状态列表
     * @param username   需要已读的用户
     * @return 已读记录数
     */
    @Override
    public Integer oneClickRead(List<Integer> execStatus, String username) {
        return asyncMessageRepository.oneClickRead(execStatus, username);
    }
    /**
     * 阅读一条未读消息
     *
     * @param id 主键
     * @return 读取结果
     */
    @Override
    public Integer read(Integer id) {
        return asyncMessageRepository.read(id);
    }
}

# 使用:

比如我需要执行上传采集数据的异步任务, 那么可以继承 AsyncTaskExecutor, 实现上传的逻辑:

import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.web.client.ResourceAccessException;
import org.springframework.web.client.RestTemplate;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
/**
 * @author Tz
 * @date 2024/01/22 15:03
 * @Description 上传采集数据的异步任务执行器实现
 */
@Slf4j
@Data
@EqualsAndHashCode(callSuper = true)
public class AsyncCollectPlatformExecutor extends AsyncTaskExecutor {
    private static final String ASYNC_TASK_EXECUTOR_NAME = "上传平台任务处理器";
    private String collectDataUrl;
    /**
     * 当前地区标识
     */
    private String region;
    private JdbcTemplate jdbcTemplatePrimary;
    private JdbcTemplate jdbcTemplateSecond;
    private RestTemplate restTemplate;
    private PlatformTransactionManager transactionManagerPrimary;
    private PlatformTransactionManager transactionManagerSecond;
    private List<FileListDFLZ> fileListDFLZList;
    private ZtDTO ztDTO;
    private PublicInfoMasterTableManager publicInfoMasterTableManager;
    private List<OpenReportRecordVO> auditedData;
    /**
     * 初始化设置执行器名称
     */
    public AsyncCollectPlatformExecutor() {
        super(ASYNC_TASK_EXECUTOR_NAME);
    }
    /**
     * 实际异步执行的任务体抽象方法、由具体的异步任务实现
     * @return            执行结果
     * @throws Exception  可能出现的业务异常
     */
    @Override
    public boolean taskContent() {
        return handleUpload();
    }
    /**
     * 具体的逻辑
     * @return 返回执行的结果
     */
    public boolean handleUpload() {
        // 执行上传平台逻辑
        // 还需要区分上传的是党务、事务、还是财务、或者其他类型的大类报表
        
        如果是多数据远
        // 实际执行的业务需要手动开启事务、手动提交回滚
        DefaultTransactionDefinition def = new DefaultTransactionDefinition();
        TransactionStatus primaryStatus = transactionManagerPrimary.getTransaction(def);
        TransactionStatus secondStatus = transactionManagerSecond.getTransaction(def);
        try {
            
            // 决定进度步长由它决定
            int execCount = dataList.size();
            setProgressStepSize(PERCENTAGE / execCount);
            // 推进进度
            advanceProgress();
            // 提交事务
            // 先进后出提交 否则会出现:Transaction synchronization is not active 的错误
            transactionManagerSecond.commit(secondStatus);
            transactionManagerPrimary.commit(primaryStatus);
            // 根据自己的业务情况进行返回
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            // 回滚事务
            // 先进后出提交 否则会出现:Transaction synchronization is not active 的错误
            transactionManagerSecond.rollback(secondStatus);
            transactionManagerPrimary.rollback(primaryStatus);
            throw new RuntimeException(e);
        }
    }
}

实现自己的异步任务提交失败或成功的返回信息

import lombok.Data;
import java.util.List;
/**
 * @author Tz
 * @date 2024/01/22 15:03
 * @Description 上传平台异步执行任务返回结果
 */
@Data
public class CollectPlatformAsyncTaskVO extends AsyncTaskResultVO {
    /**
     * 执行的账套号
     */
    private String ztId;
    /**
     * 执行的账套名称
     */
    private String ztName;
    /**
     * 执行的状态 进行中、拒绝
     */
    private String status;
    /**
     * 拒绝原因
     */
    private String remark;
    /**
     * 错误的表
     */
    private List<String> errorTables;
}

完成了执行器的实现,开始提交该异步任务

// 申请一个 keyword (uuid)
            String keyword = getKeyword(username);
            //1.2. 校验通过生成一个记录信息,可以让用户实时感知到上传的情况
            //1.3. 异步执行任务、监控该任务执行的状态和进度、 还要能够查看实际的采集情况
            AsyncCollectPlatformExecutor asyncTaskDTO = new AsyncCollectPlatformExecutor();
            asyncTaskDTO.setKeyword(keyword);
            asyncTaskDTO.setTaskType("上传平台");
            asyncTaskDTO.setUsername(username);
            asyncTaskDTO.setYears(years);
            asyncTaskDTO.setMonths(months);
            asyncTaskDTO.setDistNo(ztByZtId.getDistNo());
            asyncTaskDTO.setDistName(ztByZtId.getDistName());
            asyncTaskDTO.setZtId(ztByZtId.getZtId());
            asyncTaskDTO.setZtName(ztByZtId.getZtName());
            asyncTaskDTO.setTaskName("上传平台");
            // 设置任务的事务管理器
            asyncTaskDTO.setTransactionManagerPrimary(transactionManagerPrimary);
            asyncTaskDTO.setTransactionManagerSecond(transactionManagerSecond);
            // 设置数据库操作工具
            asyncTaskDTO.setJdbcTemplatePrimary(jdbcTemplatePrimary);
            asyncTaskDTO.setJdbcTemplateSecond(jdbcTemplateSecond);
			// 这里传入实际业务需要用到的
            asyncTaskDTO.setPublicInfoMasterTableManager(publicInfoMasterTableManager);
            /**
             * 同步锁的标识值 这里任务根据指定规则来锁(保证唯一排斥即可)
             */
            asyncTaskDTO.setSyncKeyword(ztByZtId.getZtId() + "_" + years + "_" + months);
            // 开始异步执行任务
            collectPlatformAsyncTaskVO.setKeyword(keyword);
            AsyncTaskResultVO asyncTaskResultVO = asyncMessageService.execAsyncTask(asyncTaskDTO);
            // 如果任务成功提交
            if (asyncTaskResultVO != null && asyncTaskResultVO.getIsSubmitTask()) {
                collectPlatformAsyncTaskVO.setStatus(IN_PROGRESS.getMessage());
                collectPlatformAsyncTaskVO.setIsSubmitTask(asyncTaskResultVO.getIsSubmitTask());
                collectPlatformAsyncTaskVO.setRemark("任务执行中。。。");
            } else {
                collectPlatformAsyncTaskVO.setStatus(ABANDON_EXECUTION.getMessage());
                collectPlatformAsyncTaskVO.setRemark("执行失败");
                collectPlatformAsyncTaskVO.setNoSubmitIllustrate(asyncTaskResultVO != null ? asyncTaskResultVO.getNoSubmitIllustrate() : null);
            }
            collectPlatformAsyncTaskVOS.add(collectPlatformAsyncTaskVO);

请求提交响应的结果

{
  "code": 200,
  "message": "请求成功",
  "data": [
    {
      "keyword": "超级用户_547af385-e6d1-4e5c-bb11-9ea6b4c84776",
      "isSubmitTask": true,
      "noSubmitIllustrate": null,
      "ztId": "010651001000",
      "ztName": "111广州市天河区员村街石东股份合作经济联社",
      "status": "执行中",
      "remark": "任务执行中。。。",
      "errorTables": null
    },
    {
      "keyword": null,
      "isSubmitTask": false,
      "noSubmitIllustrate": null,
      "ztId": "010651001001",
      "ztName": "广州市天河区员村街石东第一股份合作经济社",
      "status": "放弃执行",
      "remark": "该账套没有需要上传到廉政平台的报表",
      "errorTables": null
    }
  ]
}

执行任务打印的日志

2024-04-26 12:59:57.613  INFO 29332 --- [11-9ea6b4c84776] c.b.g.o.task.timely.AsyncTaskExecutor    : 当前任务keyword:[超级用户_547af385-e6d1-4e5c-bb11-9ea6b4c84776], 任务类型: [上传平台], 任务名称: [上传平台], 任务进度: [4%], 是否正在运行: []
2024-04-26 12:59:58.052 DEBUG 29332 --- [11-9ea6b4c84776] c.b.g.o.t.t.AsyncCollectPlatformExecutor : 任务类型: [上传平台], 任务步骤: , 结果: []
2024-04-26 12:59:58.052  INFO 29332 --- [11-9ea6b4c84776] c.b.g.o.task.timely.AsyncTaskExecutor    : 当前任务keyword:[超级用户_547af385-e6d1-4e5c-bb11-9ea6b4c84776], 任务类型: [上传平台], 任务名称: [上传平台], 任务进度: [8%], 是否正在运行: []
2024-04-26 12:59:58.052 DEBUG 29332 --- [11-9ea6b4c84776] c.b.g.o.t.t.AsyncCollectPlatformExecutor : 任务类型: [上传平台], 任务步骤: , 结果: []
2024-04-26 12:59:57.613  INFO 29332 --- [11-9ea6b4c84776] c.b.g.o.task.timely.AsyncTaskExecutor    : 当前任务keyword:[超级用户_547af385-e6d1-4e5c-bb11-9ea6b4c84776], 任务类型: [上传平台], 任务名称: [上传平台], 任务进度: [16%], 是否正在运行: []
从上面可以看出执行的过程的信息
2024-04-26 12:59:58.273  INFO 29332 --- [11-9ea6b4c84776] c.b.ghj.common.config.BizThreadPoolExt   : 业务线程-pool-monitor: 任务名称: 上传平台_超级用户_547af385-e6d1-4e5c-bb11-9ea6b4c84776, 任务耗时: 837 ms, 占用cpu时间: 31 ms, 占总cpu: 0.1 %, 初始线程数: 1, 核心线程数: 30, 执行的任务数量: 1, 已完成任务数量: 0, 任务总数: 1, 队列里缓存的任务数量: 0, 池中存在的最大线程数: 1, 最大允许的线程数: 64,  线程空闲时间: 60000, 线程池是否关闭: false, 线程池是否终止: false
可以看到任务在线程池中执行的情况

关闭线程池的打印日志

2024-04-26 13:12:09.390  INFO 29332 --- [extShutdownHook] c.b.ghj.common.config.BizThreadPoolExt   : 业务线程 关闭线程池, 已执行任务: 1, 正在执行任务: 0, 未执行任务数量: 0
2024-04-26 13:12:09.390  INFO 29332 --- [extShutdownHook] c.b.g.c.e.t.DynamicThreadPoolManager     : 线程池:[业务线程] 已经停止, 该线程池已经关闭并且没有还在运行的线程
2024-04-26 13:12:09.390  INFO 29332 --- [extShutdownHook] c.b.ghj.common.config.BizThreadPoolExt   : 定时任务线程 关闭线程池, 已执行任务: 1, 正在执行任务: 0, 未执行任务数量: 0
2024-04-26 13:12:09.390  INFO 29332 --- [extShutdownHook] c.b.g.c.e.t.DynamicThreadPoolManager     : 线程池:[定时任务线程] 已经停止, 该线程池已经关闭并且没有还在运行的线程

数据表的情况

select * from async_message where keyword = '超级用户_547af385-e6d1-4e5c-bb11-9ea6b4c84776' order by id desc
id	keyword	username	task_type	task_name	years	months	days	dist_no	dist_name	zt_id	zt_name	content_type	content_info	error_info	exec_status	exec_flag	run_flag	read_flag	exec_time	created_by	created_time	updated_by	updated_time
529	超级用户_547af385-e6d1-4e5c-bb11-9ea6b4c84776	超级用户	上传平台	上传平台	2024	2		010651001	石东	010651001000	111广州市天河区员村街石东股份合作经济联社	1	执行: 上传平台 失败!	上传到远端平台 - 无上传公开数据,结束操作!	-1	1	1	0	792	超级用户	2024-04-26 12:59:57.000	超级用户	2024-04-26 12:59:58.000

# 后续扩展

  1. 能否统一在一台服务器上做跨平台业务处理, 类似支付宝支付的流程, 调接口传递对应的执行指令,执行完在调用回调接口响应结果。

# 总结

以上是项目的一些核心功能,因为上面提到的功能还可以继续封装,不仅限于在该项目使用。

上面的功能有些点可以优化:

  • 可以配置动态数据源的配置可以放到配置中心,或者放到数据库,项目启动的时候加载

  • 可以动态更新线程池的状态(更新线程不会影响正在执行的线程)

  • 异步任务处理器可以抽象成接口,供其他项目调用

  • 如果是集群环境下需要优化异步任务锁的方式

# 参考资料