java 之自定义 扁平化 try catch 函数式工具类
设计初衷 为了解决业务服务较为复杂多层try嵌套的业务逻辑
假如有个业务场景:
第三方会同步一批数据到我方系统,我方使用mq异步消费来解耦,一次消费会同时消费100条数据,其中可能存在一些未知异常所以数据之间不能互相影响。
业务中会调用如下四个方法
public Demo insert(Demo demo); // 数据库插入一条数据
public Void restApi(Demo demo); // 调用其他微服务
public Void restApiNoCare(Demo demo); // 调用其他微服务允许失败
public Void unsafe(); // 一段有几率报错的代码
@Data
public static class Demo {
private String name;
private Integer code;
}
下面是原始版写法
public void execute(List<Demo> demos) { // 模拟mq 的回调
// 最外层会有一次try 防止 无限重试,导致mq阻塞 无法消费到后续消息
try {
unsafe(); //一个会发生异常的方法
demos.stream().map(
i -> {
try { // 此处try是为了不让一条数据影响整个list的执行
restApi(i);
insert(i);
try {
restApiNoCare(i);
} catch (Throwable throwable) {
// 这是一个不重要的方法允许失败 不应该影响任何主业务
}
return i;
} catch (Throwable t) {
// 给系统告警服务发送失败信息
}
return null;
}).collect(Collectors.toList());
} catch (Throwable t) {
}
}
try | 原因 |
---|---|
第一次try | 防止 无限重试,导致mq阻塞 无法消费到后续消息 |
第二次try | 当一条数据出现问题 不应该影响其他数据 |
第三次try | 因为方法不重要,不应该影响主流程 |
如上try 进行了三次嵌套 显然代码是合法的,但可读性很差
解决思路扁平化try 处理,方法调用时传递表达式 解耦逻辑,使代码逻辑清晰
public void executeByTryStream(List<Demo> demos) {
var trys = Trys.apply(() -> {
var tryList =
Trys.TryStream.of(demos, i -> {
restApi(i);
insert(i);
return Trys.apply(() -> restApiNoCare(i), i);
});
// 过滤出所有异常的数据
tryList.partErrorDO((r, err) -> {
err.printStackTrace();
// 给系统告警服务发送失败信息
});
return true;
}, demos);
if (trys.isFail()) {
trys.getThrowable().printStackTrace();
}
}
method-name | 解释 | 入参 | 返回值 |
---|---|---|---|
Trys.apply | 执行表达式当成功返回Data当失败返回Exception | Supplier<T > | Trys对象 |
TryStream.of | 以流式并行或串行执行Trys中的方法 | Stream<Trys > | TryStream,其中包含List<Trys > |
具体代码如下
@Builder(toBuilder = true)
@Data
@NoArgsConstructor
@AllArgsConstructor
@Slf4j
public class Trys<R, T> {
/**
* 失败不会抛出异常而是把异常作为返回值返回
*/
private Throwable throwable;
/**
* 处理成功后的返回值
*/
private T t;
/**
* 元数据
*/
private R resource;
public Boolean isSuccess() {
return throwable == null;
}
public Boolean isFail() {
return throwable != null;
}
/**
* @param supplier 具体执行的函数式
* @param r 元数据
* @param <R> 元数据类型
* @param <T> 返回值类型
* @return
*/
public static <R, T> Trys<R, T> apply(Supplier<T> supplier, R r) {
try {
return Trys.<R, T>builder().t(supplier.get()).resource(r).build();
} catch (Throwable throwable) {
log.info("Trys has error {}", throwable.getMessage());
return Trys.<R, T>builder().throwable(throwable).resource(r).build();
}
}
/**
* 多个trys 组合为一个TryStream
*
* @param <R> 同Trys
* @param <T> 同Trys
*/
@Data
@Builder(toBuilder = true)
public static class TryStream<R, T> {
private List<Trys<R, T>> trys;
public static <R, T> TryStream<R, T> of(Stream<Trys<R, T>> list) {
return TryStream.<R, T>builder().trys(list.collect(Collectors.toList())).build();
}
public static <R, T> TryStream<R, T> of(List<R> list, Function<R,T> function) {
var trys = list.stream().map(i -> Trys.apply(
() -> function.apply(i),i)).collect(Collectors.toList());
return TryStream.<R, T>builder().trys(trys).build();
}
/**
* 返回所有成功的trys
*
* @return
*/
public List<T> allSuccess() {
return trys.stream().filter(Trys::isSuccess).map(Trys::getT).collect(Collectors.toList());
}
/**
* 对于异常数据执行某些特殊操作 如 打印异常或执行补偿动作等
*
* @param biConsumer R 代表初期话构建的资源内容 Throwable代表具体处理时发生的异常
*/
public TryStream<R, T> partErrorDO(BiConsumer<R, Throwable> biConsumer) {
trys.stream().filter(Trys::isFail)
.forEach(i -> biConsumer.accept(i.resource, i.throwable));
return this;
}
/**
* 打印基础日志
*/
public TryStream<R, T> logInfo() {
log.info("try Stream done success total:{},fail total:{}", trys.stream().filter(Trys::isSuccess).count(), trys.stream().filter(Trys::isFail).count());
return this;
}
}
}
扩展功能
- 异步处理并聚合提升性能
- 由于返回了完整的Resource 以及响应 以及 异常 可以灵活的filter 异常|返回值 执行指定动作,这是try catch 之流完全做不到的事情。
未完待续