从根源解决异步任务“丢失”的方案
你现在的实现做了两件关键事:
使用 ThreadPoolTaskExecutor 执行
CompletableFuture异步任务;自己实现了“背压提交”逻辑
submitWithQueueBackpressure,在队列满时自循环 sleep 重试。
现象是:即使设置等待时间(500ms -> 2000ms),仍出现“任务丢失”(部分三方结果没回来)。单用户点查询大概率能拿全,但并发用户时担心仍会丢。这说明问题不是单一维度的“等待时间不够”,而是整体的并发控制与任务生命周期管理不完善。
下面给出从根源上可落地的方案,按优先级与变更复杂度排序。
1) 用“令牌桶/信号量”进行可预测的限流,而不是靠自循环
当前“自循环+队列容量”属于乐观策略:抢不到再等。高并发下会:
CPU 空转(sleep/重试);
无法保证公平性(先来先服务);
提交点存在竞态(检查容量与提交之间仍可能被拒绝);
任务拥塞时上游不停重试放大压力。
更稳妥方式是:明确限制每类外部接口的并发度,拿不到令牌就同步排队(或者快速失败/返回友好提示)。推荐做法:
全局信号量(或每接口Key的信号量):
使用
Semaphore控制每类三方接口的并发量。tryAcquire(timeout)获取不到时选择快速失败或排队等待一定时长。保证 finally 释放令牌。
示例(以 step1 为例,按 url 或 callType 维度限流):
@Component
public class InterfaceRateLimiters {
// 针对不同三方接口或 url 做并发配额
private final ConcurrentHashMap<String, Semaphore> limiters = new ConcurrentHashMap<>();
public Semaphore getLimiter(String key, int maxConcurrent) {
return limiters.computeIfAbsent(key, k -> new Semaphore(maxConcurrent, true)); // fair=true
}
}
在 service 中使用:
@Autowired
private InterfaceRateLimiters rateLimiters;
// 例如按 callType 或 url 作为 key
private CompletableFuture<List<DataBackToForm>> step1Async(SysInterfaceInputFeilds userInput, InterfaceCallDeploy configInput) {
String key = "TCREDIT:" + configInput.getUrl();
Semaphore sem = rateLimiters.getLimiter(key, 10); // 这个配额可配置化
return CompletableFuture.supplyAsync(() -> {
boolean acquired = false;
try {
acquired = sem.tryAcquire(3, TimeUnit.SECONDS); // 等待获取令牌,超时可视业务改为快速失败
if (!acquired) {
// 返回一个明确的降级结果,而不是悄悄丢失
return Collections.emptyList();
}
// 真正的外部调用
// ...
return populateDataValuesFromJson(dataList, interfaceNode);
} catch (Exception e) {
// 记录日志及可重试策略
return new ArrayList<>();
} finally {
if (acquired) sem.release();
}
}, asyncTaskExecutor);
}
这样做的好处:
明确的并发上限,避免线程池队列堆积导致拒绝或饥饿;
公平性好(构造 fair 的 Semaphore);
不依赖自旋重试;
拿不到令牌时可以明确返回“系统忙/排队中”的可观测结果,前端可提示或重试。
建议按不同外部接口配置不同的并发度与超时,来源可以是数据库或配置中心。
2) 把“提交背压”改为“有界队列+拒绝策略+调用方重试/降级”
ThreadPoolTaskExecutor 背后是 ThreadPoolExecutor。要从根上稳定,建议:
使用有界阻塞队列(如
LinkedBlockingQueue(capacity));自定义
RejectedExecutionHandler,拒绝时不要静默:要么直接在调用方快速失败,要么在调用方做有限指数退避重试;避免在业务线程里“忙等”,改用阻塞提交或限时 offer:
如果你能拿到原生队列:用
queue.offer(runnable, timeout, unit);或者把
submitWithQueueBackpressure改为:当满时阻塞一段时间再尝试一次,超过总超时时间则抛出明确异常,交由上层捕获并返回降级。
如果继续使用 CompletableFuture.supplyAsync 不易做到阻塞提交,建议直接用 nativeThreadPoolExecutor.submit(Callable) 并对拒绝进行捕获处理;更进一步的,是由信号量前置限流(第1点),这样提交时基本不会遇到拒绝。
3) 任务“丢失”的另一常见根因:异常吞掉、超时未处理、结果合并策略隐式忽略
从代码看,很多 catch (Exception e) 后直接 return new ArrayList<>(),上层 thenApply 时 join() 也不会感知错误,这会造成逻辑上的“丢失”,但不是线程池丢,而是失败被吞了。
建议:
给每个step包装清晰的结果:成功/失败/错误码/重试建议;不要用空列表代替失败;
在
allOf后合并时,统计每个 step 的成功/失败数量,打日志、埋点告警;对外部调用设置超时控制,避免异步任务永远挂着:
可在 HTTP 调用层设置连接/读取超时;
CompletableFuture 层增加
orTimeout或completeOnTimeout:return submitWith...().orTimeout(5, TimeUnit.SECONDS) .exceptionally(ex -> { // 标记此 step 超时 return Collections.emptyList(); });
如果确实需要“确保全部返回”,就不要默默吞异常;要把失败显式记录,便于定位问题。
4) 合理的批次与限并发调度(按callType分批已经有了,但需要再加并发护栏)
你现在是按 callType 分组后,组内并发 allOf。若某一组配置了很多接口,会瞬时把所有子任务并发放出去。建议:
组内再加一个“每组并发窗口”,例如使用
CompletableFuture+ 自建一个小型的“异步限流器”(Semaphore 或 RateLimiter):提交前
acquire;完成后
release;
或使用 Reactor/RxJava 这类框架的
flatMap+concurrency参数控制最大并发度。
这样可以防止瞬时突刺,减少对线程池和下游接口的压力,降低被对方限流或网络超时概率。
5) 避免双重异步叠加造成调度不可控
你在 getAllData 返回 executeAsyncOperations,executeAsyncOperations 又标了 @Async("asyncTaskExecutor"),里面各 step 又用 supplyAsync 提交到同一个池。这是“三层异步”叠加:
Controller -> 调 service 异步 -> service 内再发多个异步 -> allOf join
问题:
容易让上层误判“任务结束”(方法很快返回了 CompletableFuture),但内部还有大量子任务未受控;
如果线程池容量不足,外层占了一些线程等待 allOf,而里层提交的任务在同一个池里争抢,可能出现“自阻塞”(虽然你用的是 CompletableFuture,不会阻塞线程,但合并/回调线程仍需要执行环境)。
建议:
保持“只有一层”异步:去掉
executeAsyncOperations的@Async,让其返回的 CompletableFuture 作为唯一的异步链;或者保留
@Async,但内部步骤不要再supplyAsync,而是同步执行,靠外层线程池并行分发不同 step(通常前者更灵活)。
结合第1点(信号量限流),推荐做法:
外层只负责组织任务,真正并发程度由信号量控制;
supplyAsync的线程池与@Async的线程池明确不同或只使用一处。
6) 配置线程池:有界、可观测、和业务峰值匹配
请确认:
corePoolSize、maxPoolSize、queueCapacity 是有界的,不能无界;
线程池的
RejectedExecutionHandler自定义,记录关键上下文(用户、callType、url、traceId);对于 I/O 密集型外部调用,线程数可适度 > CPU 核数,但要结合下游限流;
暴露指标:活跃线程数、队列剩余容量、拒绝次数、每接口成功率/超时率;
使用 MDC 透传 traceId,结合日志快速排查“丢失”源头。
7) 可选:令牌桶/漏桶的时间维度限流(而非仅并发数)
如果三方接口是“每秒 N 次”的限额,使用 Guava RateLimiter 或 Resilience4j RateLimiter 更合适:
例如为每个
url创建RateLimiter,获取不到在短时间内等待,超时则降级;和 Semaphore 组合使用:RateLimiter 控制速率,Semaphore 控制并发。
8) 重试与幂等:避免短暂故障造成“看起来丢失”
对三方超时/5xx/网络抖动,做有限重试(指数退避+抖动),同时确保幂等;
失败要有明确状态返回,不要直接当作“空数据”静默吞掉;
如果能接受“最终一致”,可以把失败任务投递到重试队列(如延时队列)后台异步补偿。
9) 前端/用户并发的治理
对“同一用户同一请求参数”的重复点击,做去重/合并(请求合并或缓存最近一次结果);
在服务端对同一个“查询key”使用去重Map,正在执行时让后续请求等待同一个 Future 的完成,避免把相同任务重复压给线程池。
结论与推荐落地顺序
引入按接口维度的 Semaphore 限流(令牌机制),替换/前置当前背压自循环。
移除多层异步叠加,保留一层 CompletableFuture;用信号量和有界线程池来控制并发。
在每个 step 的结果中不要吞异常,增加可观测性:超时、被限流、被拒绝、三方错误分别统计。
线程池配置为有界+拒绝策略+指标暴露;必要时组内再加并发窗口。
按需加入 RateLimiter(QPS 限制)与有限重试。
这样可以从“容量管理、调用治理、错误可观测”三方面根治“任务丢失”问题:即使在多用户并发点击时,也不会靠自循环碰运气,而是有明确的令牌配额、可预测的排队与降级路径,保证业务稳定和可诊断。需要的话,我可以根据你项目里的线程池配置类,给出完整的限流与线程池配置示例代码。