设计初衷 为了解决业务服务较为复杂多层try嵌套的业务逻辑

假如有个业务场景:

第三方会同步一批数据到我方系统,我方使用mq异步消费来解耦,一次消费会同时消费100条数据,其中可能存在一些未知异常所以数据之间不能互相影响。

业务中会调用如下四个方法

public Demo insert(Demo demo); // 数据库插入一条数据
public Void restApi(Demo demo); // 调用其他微服务
public Void restApiNoCare(Demo demo);  // 调用其他微服务允许失败
public Void unsafe();  // 一段有几率报错的代码
@Data
    public static class Demo {
        private String name;
        private Integer code;
    }

下面是原始版写法

public void execute(List<Demo> demos) { // 模拟mq 的回调
        // 最外层会有一次try 防止 无限重试,导致mq阻塞 无法消费到后续消息
        try {
            unsafe(); //一个会发生异常的方法
            demos.stream().map(
                    i -> {
                        try { // 此处try是为了不让一条数据影响整个list的执行
                            restApi(i);
                            insert(i);
                            try {
                                restApiNoCare(i);
                            } catch (Throwable throwable) {
                                // 这是一个不重要的方法允许失败 不应该影响任何主业务
                            }
                            return i;
                        } catch (Throwable t) {
                            // 给系统告警服务发送失败信息
                        }
                        return null;
                    }).collect(Collectors.toList());
        } catch (Throwable t) {
        }
    }
try原因
第一次try防止 无限重试,导致mq阻塞 无法消费到后续消息
第二次try当一条数据出现问题 不应该影响其他数据
第三次try因为方法不重要,不应该影响主流程

如上try 进行了三次嵌套 显然代码是合法的,但可读性很差

解决思路扁平化try 处理,方法调用时传递表达式 解耦逻辑,使代码逻辑清晰

public void executeByTryStream(List<Demo> demos) {
        var trys = Trys.apply(() -> {
            var tryList =
                    Trys.TryStream.of(demos, i -> {
                        restApi(i);
                        insert(i);
                        return Trys.apply(() -> restApiNoCare(i), i);
                    });
            //  过滤出所有异常的数据
            tryList.partErrorDO((r, err) -> {
                err.printStackTrace();
                // 给系统告警服务发送失败信息
            });
            return true;
        }, demos);
        if (trys.isFail()) {
            trys.getThrowable().printStackTrace();
        }
    }
method-name解释入参返回值
Trys.apply执行表达式当成功返回Data当失败返回ExceptionSupplier<T>Trys对象
TryStream.of以流式并行或串行执行Trys中的方法Stream<Trys>TryStream,其中包含List<Trys>

具体代码如下

@Builder(toBuilder = true)
@Data
@NoArgsConstructor
@AllArgsConstructor
@Slf4j
public class Trys<R, T> {
    /**
     * 失败不会抛出异常而是把异常作为返回值返回
     */
    private Throwable throwable;

    /**
     * 处理成功后的返回值
     */
    private T t;
    /**
     * 元数据
     */
    private R resource;
    public Boolean isSuccess() {
        return throwable == null;
    }
    public Boolean isFail() {
        return throwable != null;
    }
    /**
     * @param supplier 具体执行的函数式
     * @param r        元数据
     * @param <R>      元数据类型
     * @param <T>      返回值类型
     * @return
     */
    public static <R, T> Trys<R, T> apply(Supplier<T> supplier, R r) {
        try {
            return Trys.<R, T>builder().t(supplier.get()).resource(r).build();
        } catch (Throwable throwable) {
            log.info("Trys has error {}", throwable.getMessage());
            return Trys.<R, T>builder().throwable(throwable).resource(r).build();
        }
    }
    /**
     * 多个trys 组合为一个TryStream
     *
     * @param <R> 同Trys
     * @param <T> 同Trys
     */
    @Data
    @Builder(toBuilder = true)
    public static class TryStream<R, T> {
        private List<Trys<R, T>> trys;
        
        public static <R, T> TryStream<R, T> of(Stream<Trys<R, T>> list) {
            return TryStream.<R, T>builder().trys(list.collect(Collectors.toList())).build();
        }

        public static <R, T> TryStream<R, T> of(List<R> list, Function<R,T> function) {
         var trys = list.stream().map(i -> Trys.apply(
                            () -> function.apply(i),i)).collect(Collectors.toList());
            return TryStream.<R, T>builder().trys(trys).build();
        }

        /**
         * 返回所有成功的trys
         *
         * @return
         */
        public List<T> allSuccess() {
            return trys.stream().filter(Trys::isSuccess).map(Trys::getT).collect(Collectors.toList());
        }

        /**
         * 对于异常数据执行某些特殊操作 如 打印异常或执行补偿动作等
         *
         * @param biConsumer R 代表初期话构建的资源内容 Throwable代表具体处理时发生的异常
         */
        public TryStream<R, T> partErrorDO(BiConsumer<R, Throwable> biConsumer) {
            trys.stream().filter(Trys::isFail)
                    .forEach(i -> biConsumer.accept(i.resource, i.throwable));
            return this;
        }

        /**
         * 打印基础日志
         */
        public TryStream<R, T> logInfo() {
            log.info("try Stream done success total:{},fail total:{}", trys.stream().filter(Trys::isSuccess).count(), trys.stream().filter(Trys::isFail).count());
            return this;
        }
    }
}

扩展功能

  1. 异步处理并聚合提升性能
  2. 由于返回了完整的Resource 以及响应 以及 异常 可以灵活的filter 异常|返回值 执行指定动作,这是try catch 之流完全做不到的事情。

未完待续

文章作者: 幽林萌逐
版权声明: 本站所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 幽林萌逐的blog
java java fp 函数式编程 解耦 原创
喜欢就支持一下吧