定時任務使用spring-task,分布式是基于redis實現,可保證同一任務同時只有一個節點在執行。
*****
**使用技術:**
* 分布式鎖,基于redis實現
* 隊列,基于redis實現
* 發布/訂閱,基于redis實現
* 分布式緩存,基于redis實現
*****
實現原理:
1. 每個啟動的節點服務都執行一個內置任務,每秒執行一次,基于分布式鎖嘗試鎖定自己為master節點,集群環境中只有一個master節點。
2. master節點的每個定時任務執行時向隊列推入一個令牌,同時向集群所有節點廣播一條消息,各節點收到消息后從隊列中搶令牌,誰搶到誰執行真正的任務
```
/**
* 嘗試鎖定本服務作為master
*
* @param timeoutMillis
*/
@SuppressWarnings("unchecked")
public void tryLockMaster(final long timeoutMillis) {
final ICache healthCache = cacheManager.getICache(TaskConstants.TASK_HEALTH_CACHE_NAME);
if (healthCache.getNativeCache() instanceof RedisOperations) {
RedisOperations redisOperations = (RedisOperations) healthCache.getNativeCache();
//利用redis全局鎖
RedisLock lock = new RedisLock(redisOperations, TaskConstants.TASK_HEALTH_CACHE_NAME);
lock.execute(new LockCallback<Boolean>() {
@Override
public Boolean doInLock(RedisConnection connection) {
if (!existMaster() || isMaster()) {
//將全局唯一ID置入緩存
healthCache.put(TaskConstants.TASK_HEALTH_KEY_NAME, TaskConstants.UU_ID);
//設置過期時間
healthCache.expire(TaskConstants.TASK_HEALTH_KEY_NAME, timeoutMillis / 1000);
log.info("I'm master! UUID[{}]", TaskConstants.UU_ID);
}
return true;
}
});
}
}
```
```
/**
* 集群中本任務只有一個服務(master)在執行
*
* @return
*/
@SuppressWarnings("unchecked")
public void doTask() {
MasterFactory masterFactory = getMasterFactory();
//如果使用集群模式則發送消息
if (masterFactory.isCluster()) {
if (masterFactory.isMaster()) {
//令牌入隊。使用隊列保證集群中同時只有一個服務執行任務,即集群中只有拿到令牌的服務才會執行任務
masterFactory.getQueueManager().getQueue(CURRENT_QUEUE).offer(1);
//發送執行任務命令
masterFactory.getPubSubManager().publish(1, CURRENT_CHANNEL);
}
}
//否則直接執行任務
else {
doExecute();
}
}
```
```
/**
* 接收消息并處理
*
* @param channel
* @param messageBody
* @param pattern
*/
@SuppressWarnings("unchecked")
@Override
public void onMessage(String channel, Object messageBody, String pattern) {
if (CURRENT_CHANNEL.equals(channel)) {
//使用隊列保證集群中同時只有一個服務執行任務
IQueueManager queueManager = masterFactory.getQueueManager();
IQueue queue = queueManager.getQueue(CURRENT_QUEUE);
if (queue.poll() != null) {
//同步執行任務,即同一個任務直到上一次執行完成時再執行下次。單機運行時,spring task內部已經實現,但集群任務下需要另外實現,此處基于redis全局鎖實現
if (queueManager instanceof RedisQueueManager) {
//基于redis全局鎖實現
RedisOperations redisOperations = ((RedisQueueManager) queueManager).getRedisOperations();
RedisLock lock = new RedisLock(redisOperations, CURRENT_TASK_LOCK_NAME);
lock.execute(new LockCallback<Boolean>() {
@Override
public Boolean doInLock(RedisConnection connection) {
//執行具體的任務
doExecute();
return true;
}
});
}
//其他方式待實現...
else {
doExecute();
}
}
}
}
```
要想啟用分布式任務模式需設置以下參數
--cache.cacheDriver=rediscache --mq.pubsub.import=true --mq.queue.import=true --redis.host=xxx.xxx.xxx.xxx
- walk簡介
- 核心模塊
- walk-data
- IData
- EntityHelper
- walk-cache
- 緩存管理器
- 緩存對象
- 緩存注解
- walk-batis
- 單表操作
- 批量操作
- 列表/分頁查詢
- 所有方法列表
- sql熱部署
- 二級緩存
- 數據庫方言
- 其他使用技巧
- 實體類生成工具
- walk-mq
- 隊列管理器
- 隊列對象
- 訂閱/發布管理器
- 訂閱器
- 發布器
- walk-shiro
- 用戶認證/授權
- url動態授權/回收
- 分布式會話
- 無狀態會話支持
- walk-base
- 前端基礎框架
- 公共頁面
- 自定義標簽
- 自定義函數
- 組件及工具
- 后端基礎框架
- 基礎結構
- 表單校驗
- 數據導入
- 數據導出
- 上傳下載
- 靜態參數加載器
- 靜態參數翻譯器
- 實體類翻譯器
- sql翻譯器
- 自定義翻譯器
- 靜態參數校驗器
- 分布式任務
- 增刪改查代碼生成器
- walk-restful
- 請求報文
- 返回報文
- 節點翻譯器
- api代碼生成
- walk-activiti
- 接口封裝
- 模型管理
- 流程圖展示
- 集成方法
- walk-console
- 在線會話管理
- 靜態參數表緩存管理
- 緩存管理
- 隊列管理
- 發布/訂閱管理
- walk-boot
- 常用功能
- 持久層操作
- 分布式緩存
- 分布式會話
- 分布式任務
- 前端常用功能
- 后端常用功能
- 工作流封裝
- 多數據源支持
- 關于讀寫分離
- 常用工具類
- 代碼生成工具
- SpringCloud集成
- 阿里edas平臺支持
- 其他
- 開發規約
- 環境要求
- 工程示例
- 工程結構
- web工程
- API工程
- 后臺任務
- 常見問題
- 事務不生效
- 分布式任務不生效
- 事務鎖
- 變更歷史