功能设计:设计一个数据同步的方案(p6spy+注解+AOP)

这个方案只完成了一个简单的demo,估计完成了80%后,由于方案的修改,这个部分功能也就作废了,打算写篇文章记录一下,大致的思路

用到的两个知识点:

P6Spy是一个可以用来在应用程序中拦截和修改数据操作语句的开源框架。 通过P6Spy我们可以对SQL语句进行拦截,相当于一个SQL语句的记录器,这样我们可以用它来作相关的分析

Java 注解(Annotation)又称 Java 标注,是 JDK5.0 引入的一种注释机制。

Java 语言中的类、方法、变量、参数和包等都可以被标注。和 Javadoc 不同,Java 标注可以通过反射获取标注内容。在编译器生成类文件时,标注可以被嵌入到字节码中。Java 虚拟机可以保留标注内容,在运行时可以获取到标注内容 。 当然它也支持自定义 Java 标注。

需求背景

平台A和平台B是两个完全一样的平台,由于最终业务的处理只能在平台A,所以可以理解为A相当于服务端,平台B相当于客户端。客户可以在B发起申请,在A进行审批,然后返回给B平台,同时平台A需要备份B的全部数据。一开始的相互通过接口,但是不好的事情,两个平台的服务器不能互通,只能通过接口代理的方式,同时接口数量有限制,所以,设计了一个数据同步的方案。

  • B平台将全部的执行的Sql语句同步到平台A,
  • A将部分属于平台B的数据的更新Sql,返回给平台B

获取执行的语句这部分,由于mybatis的拦截器,并不能直接获取到执行的sql语句,选用了P6Spy框架,获取最终执行的SQL语句;

由于是同步数据更新的返回,要是通过使用P6Spy的方式,并不能知道当前的更新语句所属平台,一开始打算使用数据归属字段(dataBy)进行关键词正则匹配,但是数据的部分更新(不更新dataBy字段),依旧没法获取属于平台,这里采用了注解+AOP,使用自定义注解+部分mybatisplus注解,组装查询归属语句,以及最后的更新语句

前提条件

  1. 数据库业务表均有data_by字段,在插入数据库时拦截,插入平台的的唯一的code
  2. 两个平台的业务表结构类似,属于B平台是A平台的子集

流程图

数据上传
image.png
数据更新

image.png

功能设计

这部分完成了拦截器的配置,数据上传偏差纠正机制

数据上传

  1. 要求将全部的DML语句(除查询语句)
  2. 简单的数据数据偏差纠正机制

拦截语句

引入jar包

<!-- https://mvnrepository.com/artifact/p6spy/p6spy -->
        <dependency>
            <groupId>p6spy</groupId>
            <artifactId>p6spy</artifactId>
            <version>3.7.0</version>
        </dependency>

配置文件

spy.properties 放到resources目录下

# 使用日志系统记录 sql
appender=com.p6spy.engine.spy.appender.Slf4JLogger
# 自定义日志打印
logMessageFormat=org.jeecg.config.init.P6spySqlFormatConfigure
# 是否开启慢 SQL记录
outagedetection=true
# 慢 SQL记录标准 2 秒
outagedetectioninterval=2
# 开启过滤
filter=true
# 包含 QRTZ的不打印
exclude=QRTZ,select 1

拦截器

@Slf4j
@Configuration
public class P6spySqlFormatConfigure implements MessageFormattingStrategy {
    private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");


    @Override
    public String formatMessage(int connectionId, String now, long elapsed, String category, String prepared, String sql) {
       //Todo 异步执行
     	 //筛选需要同步sql语句
       //插入数据到同步的表中
      //打印语句
      System.out.println(sql);
      return !"".equals(sql.trim()) ?
                this.format.format(new Date())
                        + " | took " + elapsed+ "ms | " + category
                        + " | connection " + connectionId + "\n " + sql + ";" : "";
    }

拦截效果
image20210701234047072.png

数据纠正

由于两个平台的关联性不大,纠正的思路大致是这样的

  1. 平台B会将全部的需要上传sql语句保存在数据库,数据库为自增主键。
  2. 平台B上传执行语句和上传库的递增主键,平台A会记录B当前的上传记录的递增主键,并且语句执行完成,返回当前的递增主键给B
  3. B平台对比返回的主键和当前上传的主键,如何一致,则继续上传下条数据,如果不一致,跳到返回的主键位置上传数据

SyncAcceptInfoDTO.java

@Data
public class SyncAcceptInfoDTO {

    @NotBlank(message = "执行语句的id不能为空")
    private String source;

    /**
     * 执行语句
     */
    private String sqlExecuted;

    /**
     * 执行语句的id
     */
    @NotNull(message = "执行语句的id不能为空")
    private Integer executedId;
}

由于是demo,所以代码还是有点混乱

public Integer accept(SyncAcceptInfoDTO syncAcceptInfo) {

        Integer executedId = syncAcceptInfo.getExecutedId();

        String source = syncAcceptInfo.getSource();
        String executeSql = syncAcceptInfo.getSqlExecuted();
        //获取当前执行的缓冲类型的执行id 是多少
        Integer lastExecutedId = (Integer) redisUtil.get(source);
        /* 如果 lastExecutedId 为空,直接执行当前的语句  返回 executedId
           =1 下条执行的语句 返回executedId
           =0 语句已经执行返回 返回 lastExecutedId
           >1 语句未执行  返回 lastExecutedId
           <0 语句已执行  返回 lastExecutedId
         */
        if (lastExecutedId == null) {
            //查询当前库中记录的上传位置
            LambdaQueryWrapper<SyncSqlRecord> queryWrapper = new LambdaQueryWrapper();
            queryWrapper.eq(SyncSqlRecord::getSource, source);
            SyncSqlRecord syncSqlRecord = syncSqlRecordService.getOne(queryWrapper);
            if (syncSqlRecord != null) {
                lastExecutedId = Integer.parseInt(syncSqlRecord.getExecutedId());
            } else {
                //执行sql语句
                executeSql(executeSql, source);
                //存入配置信息
                SyncSqlRecord newSyncSqlRecord = new SyncSqlRecord();
                newSyncSqlRecord.setSource(source);
                newSyncSqlRecord.setExecutedId(String.valueOf(executedId));
                syncSqlRecordService.save(newSyncSqlRecord);
                ///配置缓冲
                redisUtil.set(source, executedId);
                return executedId;
            }
        }
        if (executedId - lastExecutedId == 1) {
            executeSql(executeSql, source);
            UpdateWrapper<SyncSqlRecord> updateWrapper = new UpdateWrapper();
            updateWrapper.set("executed_id", executedId);
            updateWrapper.eq("source", source);
            syncSqlRecordService.update(updateWrapper);
            redisUtil.set(source, executedId);
            return executedId;
        } else {
            //返回当前的之前的sql
            return lastExecutedId;
        }
    }


		/**
     * SyncSqlAcceptServiceImpl:: executeSql
     * <p>TO:执行同步的sql语句
     * <p>HISTORY: 2021/6/30 liuhao : Created.
     *
     * @param executeSql 执行的语句
     * @param source     数据来源
     */
    private void executeSql(String executeSql, String source) {
        SyncSqlAccept syncSqlAccept = new SyncSqlAccept();
        syncSqlAccept.setSource(source);
        syncSqlAccept.setSqlExecuted(executeSql);
        try {
            jdbcTemplate.execute(executeSql);
            syncSqlAccept.setResults("success");
        } catch (DataAccessException exception) {
            log.info("执行的语句异常为:{}", exception);
            syncSqlAccept.setResults(exception.getMessage());
            throw exception;
        } finally {
          	//保存执行的记录
            this.save(syncSqlAccept);
        }
    }

数据更新

  1. 通过注解,获取到需要更新的对象
  2. 使用mybatisplus的注解获取到主键对应字段和表名,组装s q l 语句

之前也提到了,无法通过执行的sql语句知道当前的对象是否需要推向给B平台的数据,所以通过注解的方式获取需要推送给B平台的数据

使用了两个注解

  1. 一个是方法上的注解,表名该方法需要将数据的推送对应平台
  2. 一个是参数上的注解,表明该参数是更新数据的来源

为什么使用两个注解,由于使用的@annotation只能作用于方法上才有效果,如果使用@Pointcut()写入全部包的切点,则会导致每个方法都能进入切点。所以使用了方法级的注解,先进入我们的切面,然后查找参数级的注解

方法级的注解

@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface SyncSql {
}

参数级的注解

@Target({ElementType.PARAMETER})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface SyncSqlEntity {
    SyncSqlType value();
}

public enum SyncSqlType {
    /**
     * 更新类型的同步
     */
    UPDATE,
    /**
     * 删除类型的同步
     */
    DELETE,
}

切面的方法

@Aspect
@Component
@Slf4j
public class SyncSqlAspect {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Value("${sys.dataBy}")
    private String dataBy;

    @Pointcut("@annotation(org.jeecg.common.aspect.annotation.SyncSql)")
    public void SyncSqCut() {

    }

    @After("SyncSqCut()")
    public void AfterSyncSql(JoinPoint joinPoint) {
        Object[] args = joinPoint.getArgs();
        if (args.length == 0) {
            return;
        }
        MethodSignature signature = (MethodSignature) joinPoint.getSignature();
        Method method = signature.getMethod();

        //参数注解,1维是参数,2维是注解
        Annotation[][] annotations = method.getParameterAnnotations();
        for (int i = 0; i < annotations.length; i++) {
            Object param = args[i];
            Annotation[] paramAnn = annotations[i];
            //参数为空,直接下一个参数
            if (param == null || paramAnn.length == 0) {
                continue;
            }

            for (Annotation annotation : paramAnn) {
                //这里判断当前注解是否为SyncSqlEntity.class
                if (annotation.annotationType().equals(SyncSqlEntity.class)) {
                    //校验该参数,验证一次退出该注解
                    log.info("请求参数:" + param);
                    //获取到表名

                    if (param.getClass().getAnnotation(TableName.class) != null) {
                      
                        String tableName = param.getClass().getAnnotation(TableName.class).value();
                        log.info("表名:" + tableName);
												
                      	//转成JSONObject 便于获取值
                        ObjectMapper mapper = new ObjectMapper();
                        String json = "{}";
                        try {
                            //解决@JsonFormat注解解析不了的问题详见SysAnnouncement类的@JsonFormat
                            json = mapper.writeValueAsString(param);
                        } catch (JsonProcessingException e) {
                            log.error("json解析失败" + e.getMessage(), e);
                        }
                        JSONObject item = JSONObject.parseObject(json);
                      
                        //获取到主键
                        Field tableId = null;
                        for (Field field : oConvertUtils.getAllFields(param)) {

                            //存在主键
                            if (field.getAnnotation(TableId.class) != null) {
                                log.info("主键的属性:" + field.getName());
                                tableId = field;
                                break;
                            }
                        }
                        //获取对应的主键的值
                        if (tableId != null) {

                            String fieldIdName = tableId.getName();
                            String key = String.valueOf(item.get(tableId.getName()));
                            String sql = "select  data_by  from " + tableName + " where " + 			oConvertUtils.camelToUnderline(fieldIdName) + "= '" + key + "' ";
                            log.info("查询语句:" + sql);

                            Map<String, Object> resultMap;
                            try {
                                resultMap = jdbcTemplate.queryForMap(sql);
                                log.info("查询的结果为:", JSON.toJSON(resultMap));
                            } catch (DataAccessException exception) {
                                throw new JeecgBootException("查询同步字段异常" + exception);
                            }
                            //数据来源
                            String data_by = (String) resultMap.get("data_by");
                            log.info("data_by数据归属:" + data_by);
                            //非本地的数据,需要进行同步的操作
                            if (!dataBy.equalsIgnoreCase(data_by)) {
                                //组装语句为更新语句
                                String update = createUpdate(item, tableName, fieldIdName, key);
                                log.info("更新语句:" + update);
                            }
                        }
                    }
                    break;
                }
            }
        }
    }




    public String createUpdate(JSONObject item , String tableName, String fieldIdName, String fieldIdValue) {
        StringBuilder stringBuilder =new StringBuilder();
        stringBuilder.append("update ").append(tableName).append(" set ");

        final StringBuilder finalStringBuilder = stringBuilder;
        item.forEach((k, v)->{
            if (v != null && !fieldIdName.equalsIgnoreCase(k)) {
              //驼峰转成下划线
                finalStringBuilder.append(oConvertUtils.camelToUnderline(k)).append(" = ");
                if(v instanceof String){
                    finalStringBuilder.append("'").append(v).append("'");
                }else {
                    finalStringBuilder.append(v);
                }
                finalStringBuilder.append(",");
            }
        });
        stringBuilder=finalStringBuilder.deleteCharAt(stringBuilder.length()-1);
        stringBuilder.append(" where ").append("  ").append(fieldIdName).append(" = ").append("'").append(fieldIdValue).append("'");
        return stringBuilder.toString();

    }

}



@Slf4j
public class oConvertUtils {
/**
	 * 将驼峰命名转化成下划线
	 * @param para
	 * @return
	 */
	public static String camelToUnderline(String para){
        if(para.length()<3){
        	return para.toLowerCase(); 
        }
        StringBuilder sb=new StringBuilder(para);
        int temp=0;//定位
        //从第三个字符开始 避免命名不规范 
        for(int i=2;i<para.length();i++){
            if(Character.isUpperCase(para.charAt(i))){
                sb.insert(i+temp, "_");
                temp+=1;
            }
        }
        return sb.toString().toLowerCase(); 
	}
}

这里使用到了mybatisplus的@TableName@TableId 来寻找model对应的表和主键

总结

由于方案最后被放弃了,所以只完成了一个demo,接口的代理部分,还没来得及完成;基本上大体的功能也差不多完成,方案上也有存在问题,在后面具体实现上也会暴露出来。不过,对注解的使用上,也有了更深的理解,在后面使用使用AOP开发上也有很大的帮忙,减少对现有的代码的改造

最近的文章

项目工具:摆脱环境依赖缺失的问题(docker部署redis)

问题项目需要在内网(不联网)进行部署我们的后端服务,框架涉及使用redis做成缓存,由于内网的系统为centos7,我们在部署的时候,遇到了编译redis6缺少环境的文件问题,后来把环境包导入内网环境后,还是存在各种依赖的缺失问题。使用了之前其他服务器部署的解决方案,虽然能解决掉服务的部署问题,但是…

继续阅读
更早的文章

每日题解:LeetCode 897. 递增顺序搜索树

题目地址题目描述给你一棵二叉搜索树,请你 按中序遍历 将其重新排列为一棵递增顺序搜索树,使树中最左边的节点成为树的根节点,并且每个节点没有左子节点,只有一个右子节点。示例 1:输入:root = [5,3,6,2,4,null,8,1,null,null,null,7,9]输出:[1,null,2,…

继续阅读