## 40 打動面試官:線程池流程編排中的運用實戰
## 引導語
在線程池的面試中,面試官除了喜歡問 ThreadPoolExecutor 的底層源碼外,還喜歡問你有沒有在實際的工作中用過 ThreadPoolExecutor,我們在并發集合類的《場景集合:并發 List、Map 的應用場景》一文中說過一種簡單的流程引擎,如果沒有看過的同學,可以返回去看一下。
本章就在流程引擎的基礎上運用 ThreadPoolExecutor,使用線程池實現 SpringBean 的異步執行。
### 1 流程引擎關鍵代碼回顧
《場景集合:并發 List、Map 的應用場景》文中流程引擎執行 SpringBean 的核心代碼為:
```
// 批量執行 Spring Bean private void stageInvoke(String flowName, StageEnum stage, FlowContent content) { List<DomainAbilityBean> domainAbilitys = FlowCenter.flowMap.getOrDefault(flowName, Maps.newHashMap()).get(stage); if (CollectionUtils.isEmpty(domainAbilitys)) { throw new RuntimeException("找不到該流程對應的領域行為" + flowName); } for (DomainAbilityBean domainAbility : domainAbilitys) { // 執行 Spring Bean domainAbility.invoke(content); } }
```
入參是 flowName(流程名稱)、stage(階段)、content(上下文),其中 stage 中會執行很多 SpringBean,SpringBean 被執行的代碼是 domainAbility.invoke(content)。
### 2 異步執行 SpringBean
從上述代碼中,我們可以看到所有的 SpringBean 都是串行執行的,效率較低,我們在實際業務中發現,有的 SpringBean 完全可以異步執行,這樣既能完成業務請求,又能減少業務處理的 rt,對于這個需求,我們條件反射的有了兩個想法:
1. 需要新開線程來異步執行 SpringBean,可以使用 Runable 或者 Callable;
2. 業務請求量很大,我們不能每次來一個請求,就開一個線程,我們應該讓線程池來管理異步執行的線程。
于是我們決定使用線程池來完成這個需求。
### 3 如何區分異步的 SpringBean
我們的 SpringBean 都是實現 DomainAbilityBean 這個接口的,接口定義如下:
```
public interface DomainAbilityBean { /** * 領域行為的方法入口 */ FlowContent invoke(FlowContent content); }
```
從接口定義上來看,沒有預留任何地方來標識該 SpringBean 應該是同步執行還是異步執行,這時候我們可以采取注解的方式,我們新建一個注解,只要 SpringBean 上有該注解,表示該 SpringBean 應該異步執行,否則應該同步執行,新建的注解如下:
```
/** * 異步 SpringBean 執行注解 * SpringBean 需要異步執行的話,就打上該注解 *author wenhe *date 2019/10/7 */ @Target(ElementType.TYPE)// 表示該注解應該打在類上 @Retention(RetentionPolicy.RUNTIME) @Documented public @interface AsyncComponent { }
```
接著我們新建了兩個 SpringBean,并在其中一個 SpringBean 上打上異步的注解,并且打印出執行 SpringBean 的線程名稱,如下圖:

圖中實現了兩個 SpringBean:BeanOne 和 BeanTwo,其中 BeanTwo 被打上了 AsyncComponent 注解,表明 BeanTwo 應該被異步執行,兩個 SpringBean 都打印出執行的線程的名稱。
### 4 mock 流程引擎數據中心
《場景集合:并發 List、Map 的應用場景》一文中,我們說可以從數據庫中加載出流程引擎需要的數據,此時我們 mock 一下,mock 的代碼如下:
```
@Component public class FlowCenter { /** * flowMap 是共享變量,方便訪問 */ public static final Map<String, Map<StageEnum, List<DomainAbilityBean>>> flowMap = Maps.newConcurrentMap(); /** * PostConstruct 注解的意思就是 * 在容器啟動成功之后,初始化 flowMap */ @PostConstruct public void init() { // 初始化 flowMap mock Map<StageEnum, List<DomainAbilityBean>> stageMap = flowMap.getOrDefault("flow1",Maps.newConcurrentMap()); for (StageEnum value : StageEnum.values()) { List<DomainAbilityBean> domainAbilitys = stageMap.getOrDefault(value, Lists.newCopyOnWriteArrayList()); if(CollectionUtils.isEmpty(domainAbilitys)){ domainAbilitys.addAll(ImmutableList.of( ApplicationContextHelper.getBean(BeanOne.class),
ApplicationContextHelper.getBean(BeanTwo.class) )); stageMap.put(value,domainAbilitys); } } flowMap.put("flow1",stageMap); // 打印出加載完成之后的數據結果 log.info("init success,flowMap is {}", JSON.toJSONString(flowMap)); } }
```
### 5 新建線程池
在以上操作完成之后,只剩下最后一步了,之前我們執行 SpringBean 時,是這行代碼:
```
domainAbility.invoke(content);
```
現在我們需要區分 SpringBean 是否是異步的,如果是異步的,丟到線程池中去執行,如果是同步的,仍然使用原來的方法進行執行,于是我們把這些邏輯封裝到一個工具類中,工具類如下:
```
/** * 組件執行器 * author wenhe * date 2019/10/7 */ public class ComponentExecutor { // 我們新建了一個線程池 private static ExecutorService executor = new ThreadPoolExecutor(15, 15, 365L, TimeUnit.DAYS, new LinkedBlockingQueue<>()); // 如果 SpringBean 上有 AsyncComponent 注解,表示該 SpringBean 需要異步執行,就
丟到線程池中去 public static final void run(DomainAbilityBean component, FlowContent content) { // 判斷類上是否有 AsyncComponent 注解 if (AnnotationUtils.isAnnotationPresent(AsyncComponent.class, AopUtils.getTargetClass(component))) { // 提交到線程池中 executor.submit(() -> { component.invoke(content); }); return; } // 同步 SpringBean 直接執行。 component.invoke(content); } }
```
我們把原來的執行代碼替換成使用組件執行器執行,如下圖:

### 6 測試
以上步驟完成之后,簡單的流程引擎就已經完成了,我們簡單地在項目啟動的時候加上測試,代碼如下:

更嚴謹的做法,是會寫單元測試來測試流程引擎,為了快一點,我們直接在項目啟動類上加上了測試代碼。
運行之后的關鍵結果如下:
```
[main] demo.sixth.SynchronizedDemo: SynchronizedDemo init begin [main] demo.sixth.SynchronizedDemo: SynchronizedDemo init end [main] demo.three.flow.FlowCenter : init success,flowMap is {"flow1":{"PARAM_VALID":[{},{}],"AFTER_TRANSACTION":[{"$ref":"$.flow1.PARAM_VALID[0]"},{"$ref":"$.flow1.PARAM_VALID[1]"}],"BUSINESS_VALID":[{"$ref":"$.flow1.PARAM_VALID[0]"},{"$ref":"$.flow1.PARAM_VALID[1]"}],"IN_TRANSACTION":[{"$ref":"$.flow1.PARAM_VALID[0]"},{"$ref":"$.flow1.PARAM_VALID[1]"}]}} o.s.j.e.a.AnnotationMBeanExporter : Registering beans for JMX exposure on startup
[main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 8080 (http) [main] demo.DemoApplication : Started DemoApplication in 5.377 seconds (JVM running for 6.105) [main] demo.three.flow.BeanOne : BeanOne is run,thread name is main [main] demo.three.flow.BeanOne : BeanOne is run,thread name is main [pool-1-thread-1] demo.three.flow.BeanTwo : BeanTwo is run,thread name is pool-1-thread-1 [main] demo.three.flow.BeanOne : BeanOne is run,thread name is main [pool-1-thread-2] demo.three.flow.BeanTwo : BeanTwo is run,thread name is pool-1-thread-2 [pool-1-thread-3] demo.three.flow.BeanTwo : BeanTwo is run,thread name is pool-1-thread-3 [main] demo.three.flow.BeanOne : BeanOne is run,thread name is main [pool-1-thread-4] demo.three.flow.BeanTwo : BeanTwo is run,thread name is pool-1-thread-4
```
從運行結果中,我們可以看到 BeanTwo 已經被多個不同的線程異步執行了。
### 總結
這是一個線程池在簡單流程引擎上的運用實站,雖然這個流程引擎看起來比較簡單,但在實際工作中,還是非常好用的,大家可以把代碼拉下來,自己嘗試一下,調試一下參數,比如當我新增 SpringBean 的時候,流程引擎的表現如何。
- 前言
- 第1章 基礎
- 01 開篇詞:為什么學習本專欄
- 02 String、Long 源碼解析和面試題
- 03 Java 常用關鍵字理解
- 04 Arrays、Collections、Objects 常用方法源碼解析
- 第2章 集合
- 05 ArrayList 源碼解析和設計思路
- 06 LinkedList 源碼解析
- 07 List 源碼會問哪些面試題
- 08 HashMap 源碼解析
- 09 TreeMap 和 LinkedHashMap 核心源碼解析
- 10 Map源碼會問哪些面試題
- 11 HashSet、TreeSet 源碼解析
- 12 彰顯細節:看集合源碼對我們實際工作的幫助和應用
- 13 差異對比:集合在 Java 7 和 8 有何不同和改進
- 14 簡化工作:Guava Lists Maps 實際工作運用和源碼
- 第3章 并發集合類
- 15 CopyOnWriteArrayList 源碼解析和設計思路
- 16 ConcurrentHashMap 源碼解析和設計思路
- 17 并發 List、Map源碼面試題
- 18 場景集合:并發 List、Map的應用場景
- 第4章 隊列
- 19 LinkedBlockingQueue 源碼解析
- 20 SynchronousQueue 源碼解析
- 21 DelayQueue 源碼解析
- 22 ArrayBlockingQueue 源碼解析
- 23 隊列在源碼方面的面試題
- 24 舉一反三:隊列在 Java 其它源碼中的應用
- 25 整體設計:隊列設計思想、工作中使用場景
- 26 驚嘆面試官:由淺入深手寫隊列
- 第5章 線程
- 27 Thread 源碼解析
- 28 Future、ExecutorService 源碼解析
- 29 押寶線程源碼面試題
- 第6章 鎖
- 30 AbstractQueuedSynchronizer 源碼解析(上)
- 31 AbstractQueuedSynchronizer 源碼解析(下)
- 32 ReentrantLock 源碼解析
- 33 CountDownLatch、Atomic 等其它源碼解析
- 34 只求問倒:連環相扣系列鎖面試題
- 35 經驗總結:各種鎖在工作中使用場景和細節
- 36 從容不迫:重寫鎖的設計結構和細節
- 第7章 線程池
- 37 ThreadPoolExecutor 源碼解析
- 38 線程池源碼面試題
- 39 經驗總結:不同場景,如何使用線程池
- 40 打動面試官:線程池流程編排中的運用實戰
- 第8章 Lambda 流
- 41 突破難點:如何看 Lambda 源碼
- 42 常用的 Lambda 表達式使用場景解析和應用
- 第9章 其他
- 43 ThreadLocal 源碼解析
- 44 場景實戰:ThreadLocal 在上下文傳值場景下的實踐
- 45 Socket 源碼及面試題
- 46 ServerSocket 源碼及面試題
- 47 工作實戰:Socket 結合線程池的使用
- 第10章 專欄總結
- 48 一起看過的 Java 源碼和面試真題