# 采集项目
# 简介
数据采集主要是将各区的数据推送到市平台,中间可能争对不同地区的推送做不同的处理。分接收端和发送端,接收端部署在市平台,发送端在各个地区。
# 项目描述
项目名 | 说明 | 架构 |
---|---|---|
cwgk-data-collect-client | 财务数据采集的客户端 | springboot+mybatis+springmvc |
cwgk-data-collect-service | 财务数据采集的服务端 | springboot+mybatis+springmvc |
# 功能点
- 推送主表记录到市进行数据校验,校验不通过后在区进行记录和日志的报错,进行时间重置等待下次重试推送
- 校验通过后封装主表子表数据通过网络传输到市,进行保存
- 对推送返回的结果进行处理
- 重试机制(对应推送失败的数据会自动重试,重试的次数,重试的数据间隔)
# 遇到的问题点和处理方式
# 问题
需要考虑结果是否正常的保存,如果没有正常保存如何做补偿 (可能是网络异常或返回结果保存的时候的错误)
从中大的库将数据转入到中间库,不同库之间如何保证事务
# 解决:
对于第一点这里的处理方式是 在查询出要推送的数据中加一个锁标识, 推送前先加锁,成功返回后在更新锁的状态,推送前校验成功返回的结果写入则不回滚,加锁失败则回滚,这样就不会重新校验
// 设置事务回滚点 如果下面的加锁失败直接回滚到当前点
lockDataSavePoint = TransactionAspectSupport.currentTransactionStatus().createSavepoint();
// 给要操作的数据加锁标识,其实就是更新列表数据的标识 锁住数据是为 1
lockData(subCollectDataList);
logger.info("============数据加锁成功!=====================");
// 对数据进行推送
....
// 对推送的每条结果进行循环判断, 处理完成后解锁更新
// 数据操作成功解锁锁标志
temp.setLocked(0);
update(temp);
使用事务管理队列,当队列中的所有事务提交成功最后才提交 否则就回滚 (需要注意先进先出的方式提交事务)
//JdbcTemplate 事务包装,实现多个 JdbcTemplate 的事务一致性
@Slf4j
public class JdbcTemplateTransactionWrap {
/**
* 必须使用 Stack
*/
private final Deque<Operate> operates = new ArrayDeque<>();
public void addOperate(JdbcTemplate jdbcTemplate, List<String> sql) {
Operate operate = new Operate(jdbcTemplate, sql);
operate.startTransaction();
this.operates.push(operate);
}
public void commit() {
while (!operates.isEmpty()) {
operates.pop().commit();
}
}
public void execute() {
operates.forEach(Operate::execute);
}
public List<Object> executeCallback() {
return operates.stream().map(Operate::executeCallback).collect(Collectors.toList());
}
public void rollback() {
while (!operates.isEmpty()) {
try {
operates.pop().rollback();
} catch (Exception e) {
log.error("JdbcTemplateTransactionWrap rollback error !", e);
}
}
}
@RequiredArgsConstructor
private static class Operate {
private final List<String> sql;
private final JdbcTemplate jdbcTemplate;
private final Function<JdbcTemplate, Object> callback;
private PlatformTransactionManager transactionManager;
private TransactionStatus transactionStatus;
public Operate(JdbcTemplate jdbcTemplate, List<String> sql) {
this(sql, jdbcTemplate, null);
}
public Operate(JdbcTemplate jdbcTemplate, Function<JdbcTemplate, Object> callback) {
this(null, jdbcTemplate, callback);
}
public void startTransaction() {
this.transactionManager = new DataSourceTransactionManager(jdbcTemplate.getDataSource());
this.transactionStatus = this.transactionManager.getTransaction(new DefaultTransactionDefinition());
}
public void execute() {
Objects.requireNonNull(sql);
sql.forEach(jdbcTemplate::execute);
}
public Object executeCallback() {
Objects.requireNonNull(callback);
return callback.apply(this.jdbcTemplate);
}
public void commit() {
if(transactionManager != null && transactionStatus != null) {
this.transactionManager.commit(this.transactionStatus);
}
transactionManager = null;
transactionStatus = null;
}
public void rollback() {
if(transactionManager != null && transactionStatus != null) {
this.transactionManager.rollback(this.transactionStatus);
}
transactionManager = null;
transactionStatus = null;
}
}
}
JdbcTemplateTransactionWrap jdbcTemplateTransactionWrap = new JdbcTemplateTransactionWrap();
// 使用
try{
jdbcTemplateTransactionWrap.addOperate(jdbcTemplateOne,sqlsOne);
jdbcTemplateTransactionWrap.addOperate(jdbcTemplateTwo,sqlsTwo);
jdbcTemplateTransactionWrap.execute();
jdbcTemplateTransactionWrap.commit();
}catch (Exception e){
jdbcTemplateTransactionWrap.rollback();
throw new RuntimeException("数据同步失败,请检查" + e.getMessage());
}