管道-过滤器模式的体系结构是面向数据流的软件体系结构。它最典型的应用是在编译系统。一个普通的编译系统包括词法分析器,语法分析器,语义分析与中间代码生成器,优化器,目标代码生成器等一系列对源程序进行处理的过程。人们可以将编译系统看作一系列过滤器的连接体,按照管道-过滤器的体系结构进行设计。此外,这种体系结构在其它一些领域也有广泛的应用。因此它成为软件工程和软件开发中的一个突出的研究领域。

简单点的说,管道模式,整个控制的结构类似一个单链表,上个步骤将数据处理后,传递到下一个步骤进行处理,直到全部步骤处理完毕,所以需要提前将步骤处理顺序维护到链上。
管道模式是责任链模式的一种,但是不同责任链模式,责任链模式会将数据在在链上传递,直到有一个适合的一个处理器用于处理数据。而管道模式,每个处理器都会处理数据。

需求背景

需要将设备基础信息表中未进行检测的数据,按照类型划分进行数据的规范格式校验,每个类型的处理都会剔除不符合其类型规范的数据,所有的类型步骤处理后,剩余的数据就是合格的数据。于是两个问题

  1. 如何校验数据?
  2. 怎么将类型的处理整合到一起?

校验数据可以采用了模板方法模式进行处理;由于每个步骤都会进行数据处理,结束传递给下个步骤,采用管道模式比较合适

流程图

image.png

校验数据

这次我们主要讲管道模式,关于校验数据这块,就简单的提一下,校验这块使用使用了javax.validation标签,在类的属性上使用标签进行处理,由于存在分组groups,所以定义了一个接口

public interface IExcValidEntity<T> {
    /**
     * IExcValidEntity::getValidateGroups
     * <p>TO:返回当前参数需要校验的参数的组
     * <p>HISTORY: 2020/12/28 liuha : Created.
     *
     * @param t 待校验的参数
     * @return Class<? extends Default> 参数校验的分组
     */
    Class<? extends Default>[] getValidateGroups(T t);

    /**
     * IExcValidEntity:: customValidate
     * <p>TO:自定义的校验参数的方法
     * <p>HISTORY: 2020/12/28 liuha : Created.
     *
     * @param t 待校验的参数
     * @return Map   key 校验的数字字段 value 提示语
     */
    Map<String, String> customValidate(T t);
}

这里Default指的是javax.validation.groups.Default;,如果自定义的校验分组class不继承Default类,会在校验数据的时,默认不校验无分组的属性,如果validation不能解决的校验,则需要自定义校验

校验的工具类

public class ValidationUtil {

    private static Validator VALIDATOR;

    private static Validator getValidatorOrCreate() {
        if (VALIDATOR == null) {
            synchronized (ValidationUtil.class) {
                // Init the validation
                VALIDATOR = Validation.buildDefaultValidatorFactory().getValidator();
            }
        }

        return VALIDATOR;
    }
    public static void validateGroups(Object obj, Class<?>... groups) {

        Validator validator = getValidatorOrCreate();

        // Validate the object
        Set<ConstraintViolation<Object>> constraintViolations = validator.validate(obj, groups);

        if (!CollectionUtils.isEmpty(constraintViolations)) {
            // If contain some errors then throw constraint violation exception
            throw new ConstraintViolationException(constraintViolations);
        }
    }


    public static Map<String, String> validate(Object obj, Class<?>... groups) {
        Validator validator = getValidatorOrCreate();
        Set<ConstraintViolation<Object>> constraintViolations=null;
        if(groups.length>0){
            constraintViolations = validator.validate(obj,groups);
        }else {
            //不存在分组校验,默认分组
            constraintViolations = validator.validate(obj, Default.class);
        }
        if (CollectionUtils.isEmpty(constraintViolations)) {
            return Collections.emptyMap();
        }
        Map<String, String> errMap = new HashMap<>(16);
        for (ConstraintViolation constraintViolation : constraintViolations) {
            errMap.put(String.valueOf(constraintViolation.getPropertyPath()), constraintViolation.getMessage());
        }
        return errMap;
    }

}

获取校验结果

public class CommonValidEntity<T> {


    @TableField(exist = false)
    private IExcValidEntity handler;


    /**
     * CommonValidEntity:: buildValidEntity
     * <p>TO:获取构建校验参数的实现类
     * <p>HISTORY: 2020/12/28 liuha : Created.
     *
     * @param clazz 实现类
     * @return CommonValidEntity  返回当前的对象
     */
    public CommonValidEntity buildValidEntity(Class<? extends IExcValidEntity> clazz) {
        handler = SpringContextUtils.getBean(clazz);

        if (handler == null) {
            throw new JeecgBootException("未找到实现类");
        }
        return this;
    }

    public Map<String, String> getValidateResult() {
        T date = (T) this;
        Map<String, String> validate = new HashMap();

        //如果存在自定义的实现类,则调用查询的类
        if (handler != null) {
            Class[] validateGroups = handler.getValidateGroups(date);
            validate.putAll(ValidationUtil.validate(date, validateGroups));
            //自定义的查询

            Map<String, String> customValidate = handler.customValidate(date);
            if (customValidate!=null && customValidate.size() > 0) {
                validate.putAll(customValidate);
            }
        } else {
            //默认的查询
            validate = ValidationUtil.validate(date);
        }
        return validate;
    }
    }

存在分组或者自定义校验类型,实现了IExcValidEntity接口,由于代码类似,我定义了一个公共的类,校验的entity继承这个类即可,由于java的单继承特性,所以IExcValidEntity定义为接口,而不是抽象类
image.png
标签和分组的使用
image.png
校验的代码

/**
 * @data 校验的数据,类继承了CommonValidEntit类
 * @param buildValidEntitySpeed 实现IExcValidEntity的类.
 */
Map<String, String> validateResult =data.buildValidEntity(buildValidEntity).getValidateResult();

后面由于处理方法代码类似,抽出代码为一个公共的类

@Component
@Slf4j
public class TjExcSpeeqpValid<T extends CommonValidEntity, D> {
    @Autowired
    private ITjExcSpeeqpService tjExcSpeeqpService;


    /**
     * TjExcSpeeqpValid:: buildValid
     * <p>TO:实现buildValidEntity的类的调用的校验方法
     * <p>HISTORY: 2021/1/9 liuha : Created.
     * @param records 管道中传递数据,需要剔除掉不合格的数据的集合
     * @param dataList 需要校验的数据
     * @param buildValidEntity 实现类
     */
    public void buildValid(List<TjExcSpeeqp> records, List<T> dataList,
                           Class<? extends IExcValidEntity<T>> buildValidEntity) {
        if (dataList.size() == 0) {
            return;
        }

            for (T data : dataList) {
                Map<String, String> validateResult = data.buildValidEntity(buildValidEntity).getValidateResult();
                //存在校验不通过
                if (validateResult.size() > 0) {

                }
            }
        //TODO 移出不合格的数据
        // TODO更新不合格的数据结果

    }

    /**
     * TjExcSpeeqpValid:: valid
     * <p>TO:不存在分组和自定义校验的调用方法
     * <p>HISTORY: 2021/1/9 liuha : Created.
     * @param records 管道中传递数据,提出不合格的数据的集合
     * @param dataList 需要校验的数据,类未实现buildValidEntity接口
     */
    public void valid(List<TjExcSpeeqp> records, List<D> dataList) {
        if (dataList.size() == 0) {
            return;
        }
            for (D data : dataList) {
                Map<String, String> validateResult = ValidationUtil.validate(data);
                //存在校验不通过
                if (validateResult.size() > 0) {
                }
            }

        //TODO 移出不合格的数据
        // TODO更新不合格的数据结果
    }


}

使用管道模式

定义处理步骤

public abstract class BaseValidTjExcHandler<T> {

    /**
     * BaseValidTjExcHandler:: handleRequest
     * <p>TO:处理请求
     * <p>HISTORY: 2020/12/31 liuha : Created.
     *
     * @param records 需要处理的数据
     */
    public abstract void handleData(List<T> records);

}

这里先定义一个抽象类,每个类型处理步骤都需要实现这个类的handleData方法,那怎么把步骤放到一个链上呢?这里有两种思路,

  1. 讲处理步骤的类放到数组或者List,然后遍历,依次处理数据
  2. 基础spring实现管道模式
    由于使用框架基于spring boot,既然可以让框架帮忙,何乐而不为?可以参照基于 Spring 实现管道模式的最佳实践

路由配置

@Configuration
public class PipelineRouteConfig  implements ApplicationContextAware {

    private ApplicationContext appContext;
    /**
     * 数据类型->管道中处理器类型列表 的路由
     */
    public static final
    Map<Class,
            List<Class<? extends BaseValidTjExcHandler>>>
            PIPELINE_ROUTE_MAP = new HashMap<>(8);


    /*
     * 在这里配置各种上下文类型对应的处理管道:键类型,值为处理器类型的列表
     */
    static {
        PIPELINE_ROUTE_MAP.put(TjExcSpeeqp.class,
                Arrays.asList(
                        TjExcSpeeqpHandler.class,
                        TjExcBoilerTecParameterHandler.class,
                        TjExcContainerTecParameterHandler.class,
                        TjExcLiftTecParameterHandler.class,
                        TjExcVehicleTecParameterHandler.class,
                        TjExcHoistTecParameterHandler.class,
                        TjExcPenstockTecParameterHandler.class
                ));
    }

    /**
     * 在 Spring 启动时,根据路由表生成对应的管道映射关系
     */
    @Bean("pipelineRouteMap")
    public Map<Class,
            List<? extends BaseValidTjExcHandler>> getHandlerPipelineMap() { return PIPELINE_ROUTE_MAP.entrySet()
                .stream()
                .collect(Collectors.toMap(Map.Entry::getKey, this::toPipeline));
    }

    /**
     * 根据给定的管道中 ContextHandler 的类型的列表,构建管道
     */
    private List<? extends BaseValidTjExcHandler> toPipeline(
            Map.Entry<Class, List<Class<? extends BaseValidTjExcHandler
                    >>> entry) {
        return entry.getValue()
                .stream()
                .map(appContext::getBean)
                .collect(Collectors.toList());
    }



    @Override
    public void setApplicationContext(ApplicationContext applicationContext) {
        appContext = applicationContext;
    }
}

基于Spring的ApplicationContext ,可以很方便的构建管道的任务链,这里需要注意,如果想让处理链按照特定的顺序加载,在需要不同BaseValidTjExcHandler实现类上添加@Order()标签

管道执行

@Component
@Slf4j
public class PipelineExecutor {
    @Autowired
    TjExcSpeeqpValid excSpeeqpValid;
    /**
     * 引用 PipelineRouteConfig 中的 pipelineRouteMap
     */
    @Resource
    private Map<Class,
                List<? extends BaseValidTjExcHandler>> pipelineRouteMap;

    /**
     * 同步处理输入的上下文数据<br/>
     * 如果处理时上下文数据流通到最后一个处理器且最后一个处理器返回 true,则返回 true,否则返回 false
     *
     * @return 处理过程中管道是否畅通,畅通返回 true,不畅通返回 false
     */
    public boolean acceptSync(Class type,List<TjExcSpeeqp> records) {
        Objects.requireNonNull(records, "上下文数据不能为 null");
        // 拿到数据类型
        Class  dataType = type;
        // 获取数据处理管道
        List<? extends BaseValidTjExcHandler> pipeline = pipelineRouteMap.get(dataType);

        if (CollectionUtils.isEmpty(pipeline)) {
            log.error("{} 的管道为空", dataType.getSimpleName());
            return false;
        }

        // 管道是否畅通
        boolean lastSuccess = true;

        for (BaseValidTjExcHandler handler : pipeline) {
            try {
                // 待处理数据长度如果为0,则不处理数据
                if(records.size()>0){
                    handler.handleData(records);
                }
            } catch (Throwable ex) {
                lastSuccess = false;
                log.error("处理异常,handler={}",handler.getClass().getSimpleName());
            }
            // 不再向下处理
            if (!lastSuccess) { break; }
        }

        if (lastSuccess && records.size() > 0) {
         //处理正确的数据结果
        }
        return lastSuccess;
    }
}

最后调用

再实现了模板方法校验数据、管道模式处理数据后,最后调用时就只有一行代码,果然冰山一角

@Autowired
PipelineExecutor pipelineExecutor;

pipelineExecutor.acceptSync(TjExcSpeeqp.class,records);