# 批處理中心
## 基礎理論


[參考鏈接](https://gitee.com/kailing/partitionjob)
本工程不依賴mq,改為本地作業


# 設定讀取處理寫入規則
```
@Bean("slaveStep")
public Step slaveStep(DeliverPostProcessorItem processorItem,
JdbcCursorItemReader reader) {
CompositeItemProcessor itemProcessor = new CompositeItemProcessor();
List<ItemProcessor> processorList = new ArrayList<>();
processorList.add(processorItem);
itemProcessor.setDelegates(processorList);
return stepBuilderFactory.get("slaveStep")
.<DeliverPost, DeliverPost>chunk(1000)//事務提交批次
.reader(reader)
.processor(itemProcessor)
.writer(dbItemWriter)
.build();
}
```
## 數據分片
```
/**
* @create 2019年4月2日
* Content :根據數據ID分片
*/
public class ColumnRangePartitioner implements Partitioner {
private JdbcOperations jdbcTemplate;
ColumnRangePartitioner(DataSource dataSource){
this.jdbcTemplate = new JdbcTemplate(dataSource);
}
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
Map<String, ExecutionContext> result = new LinkedHashMap<String, ExecutionContext>();
int current_thread = 1 ;
int total_thread = gridSize ;
while (current_thread <= total_thread) {
ExecutionContext value = new ExecutionContext();
result.put("partition" + current_thread, value);
value.putInt("current_thread", current_thread);
value.putInt("total_thread", total_thread);
current_thread++;
}
return result;
}
}
```
## 本地基于游標方式讀取分片信息
```
@Bean(destroyMethod = "")
@StepScope
public JdbcCursorItemReader<DeliverPost> JdbcCursorItemReader(
@Value("#{stepExecutionContext['current_thread']}") Long current_thread,
@Value("#{stepExecutionContext['total_thread']}") Long total_thread) {
System.err.println("接收到分片參數["+total_thread+"->"+current_thread+"]");
JdbcCursorItemReader<DeliverPost> reader = new JdbcCursorItemReader<>();
reader.setDataSource(this.dataSource); // 設置數據源
reader.setFetchSize(100); // 設置一次最大讀取條數
reader.setRowMapper(new DeliverPostRowMapper()); // 把數據庫中的每條數據映射到Person對中
reader.setSql("select order_id , post_id from oc_deliver_post_t where post_id is not null and post_id <> '0' and mod(substring(order_id , -4) ,? )= ( ? -1 )");
reader.setPreparedStatementSetter(new PreparedStatementSetter() {
public void setValues(PreparedStatement preparedStatement) throws SQLException {
preparedStatement.setLong(1, total_thread);
preparedStatement.setLong(2, current_thread);
}
});
return reader;
}
```
### 分片數據處理過程
```
/**
* @create 2019年4月2日
* Content :數據處理Item
*/
@Service
public class DeliverPostProcessorItem implements ItemProcessor<DeliverPost, DeliverPost> {
Logger logger = LoggerFactory.getLogger(DeliverPostProcessorItem.class);
@Autowired
private CommonDao commonDao ;
@Autowired
private ThirdServiceProp thirdServiceProp;
@Override
public DeliverPost process(DeliverPost deliverPost) throws Exception {
logger.info("訂單號:【{}】經過處理器 ", deliverPost.getOrderId());
{
// ems是否簽收
String resp = this.getEms(deliverPost.getPostId());
try {
Map respMap = JSONObject.parseObject(resp, Map.class);
if ("0000".equals(respMap.get("code"))) {
Map rep = (Map) respMap.get("rep");
Map msg = (Map) rep.get("msg");
List<Map> traces = (List<Map>) msg.get("traces");
for (Iterator<Map> it = traces.iterator(); it.hasNext();) {
Map temp = it.next();
if ("10".equals(temp.get("code"))) {
// 已簽收
deliverPost.setIsArrived(1);
}
}
}
} catch (Exception e) {
System.out.println(e);
}
}
{
// 中通是否簽收
String resp = this.getZT(deliverPost.getPostId());
try {
Map respMap = JSONObject.parseObject(resp, Map.class);
if ("0000".equals(respMap.get("code"))) {
Map rep = (Map) respMap.get("rep");
Map msg = (Map) rep.get("msg");
List<Map> data = (List<Map>) msg.get("data");
for (Iterator<Map> it = data.iterator(); it.hasNext();) {
Map temp = it.next();
List<Map> traces = (List<Map>) temp.get("traces");
for (Iterator<Map> it1 = traces.iterator(); it1.hasNext();) {
Map tempT = it1.next();
if ("收件".equals(tempT.get("scanType"))) {
// 已簽收
deliverPost.setIsArrived(1);
}
}
}
}
} catch (Exception e) {
}
}
return deliverPost;
}
public String getEms(String postId) {
String transid= PointUtil.getRandom() ;
// JSONObject resultJosn = JSONObject.fromObject(result);
StringBuffer strbuf = new StringBuffer();
String jsonOut = "";
try {
com.alibaba.fastjson.JSONObject obj = new com.alibaba.fastjson.JSONObject();
obj.put("method", "ems.inland.trace.query");
obj.put("action", "3th_ems");
Date date = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
obj.put("prea", sdf.format(date));// 163315236523523
com.alibaba.fastjson.JSONObject req = new com.alibaba.fastjson.JSONObject();
obj.put("req", req);
com.alibaba.fastjson.JSONObject msg = new com.alibaba.fastjson.JSONObject();
req.put("msg", msg);
msg.put("mailNo", postId);
msg.put("authorization", "408a6c32e61d3ad5cb5c4e0cb3d2b089");
msg.put("timestamp", System.currentTimeMillis());
// 請求數據
jsonOut = obj.toString();
logger.info("EMS請求處理開始: transid=【{}】 ,req=【{}】", transid ,jsonOut);
String callurl = commonDao.getHttpUrl("104");
int timeOut = 3000;
URL url = new URL(thirdServiceProp.getUrl() + callurl);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("POST");
byte[] contentbyte = jsonOut.getBytes("UTF-8");
conn.setRequestProperty("Content-Type", "application/json;charset=UTF-8");
conn.setRequestProperty("Content-Length", contentbyte.length + "");
conn.setRequestProperty("Accept-Encoding", "");
conn.setRequestProperty("Accept", "application/json");
conn.setConnectTimeout(3000);
conn.setReadTimeout(3000);
conn.setUseCaches(false);
conn.setDoInput(true);
conn.setDoOutput(true);
conn.connect();
OutputStream out = conn.getOutputStream();
out.write(contentbyte); // 發送請求報文
out.flush();
out.close();
InputStream in = conn.getInputStream();
BufferedReader dr = new BufferedReader(new InputStreamReader(in, "utf-8"));
String text_rsp = null;
while ((text_rsp = dr.readLine()) != null) {
strbuf.append(text_rsp);
}
in.close();
logger.info("EMS請求處理結束: transid=【{}】 ,res=【{}】 ",transid, strbuf);
} catch (Exception e) {
strbuf.setLength(0);
strbuf.append("{\"code\":\"8888\",\"detail\":\"失敗\"}");
logger.error(postId + "EMS轉發接口報錯!!!");
}
return strbuf.toString();
}
public String getZT(String postId) {
String transid= PointUtil.getRandom() ;
// JSONObject resultJosn = JSONObject.fromObject(result);
StringBuffer strbuf = new StringBuffer();
String jsonOut = "";
try {
com.alibaba.fastjson.JSONObject obj = new com.alibaba.fastjson.JSONObject();
obj.put("method", "api.traceInterfaceNewTraces");
obj.put("action", "3th_zto");
Date date = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
obj.put("prea", sdf.format(date));// 163315236523523
com.alibaba.fastjson.JSONObject req = new com.alibaba.fastjson.JSONObject();
obj.put("req", req);
com.alibaba.fastjson.JSONObject msg = new com.alibaba.fastjson.JSONObject();
req.put("msg", msg);
msg.put("company_id", "20f74746141c4433a15e7ddd5aade604");
msg.put("data", Arrays.asList(postId));
msg.put("msg_type", "NEW_TRACES");
// 請求數據
jsonOut = obj.toString();
logger.info("中通請求處理開始: transid=【{}】 ,req=【{}】 ",transid , jsonOut);
String callurl = commonDao.getHttpUrl("103");
//固定token
callurl =callurl.replace("tokenid", "798d3ed2ebaec83ae608c10207f783d6") ;
int timeOut = 3000;
URL url = new URL(thirdServiceProp.getUrl() + callurl);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("POST");
byte[] contentbyte = jsonOut.getBytes("UTF-8");
conn.setRequestProperty("Content-Type", "application/json;charset=UTF-8");
conn.setRequestProperty("Content-Length", contentbyte.length + "");
conn.setRequestProperty("Accept-Encoding", "");
conn.setRequestProperty("Accept", "application/json");
conn.setConnectTimeout(3000);
conn.setReadTimeout(3000);
conn.setUseCaches(false);
conn.setDoInput(true);
conn.setDoOutput(true);
conn.connect();
OutputStream out = conn.getOutputStream();
out.write(contentbyte); // 發送請求報文
out.flush();
out.close();
InputStream in = conn.getInputStream();
BufferedReader dr = new BufferedReader(new InputStreamReader(in, "utf-8"));
String text_rsp = null;
while ((text_rsp = dr.readLine()) != null) {
strbuf.append(text_rsp);
}
in.close();
logger.info("中通請求處理結束: transid=【{}】 ,res=【{}】 ",transid, strbuf);
} catch (Exception e) {
strbuf.setLength(0);
strbuf.append("{\"code\":\"8888\",\"detail\":\"失敗\"}");
logger.error(postId + "中通轉發接口報錯!!!");
}
return strbuf.toString();
}
}
```
分片數據輸出item
```
/**
* @create 2019年4月2日
* Content :數據輸出item
*/
@Component
@StepScope
public class DBWriterItem<T> implements ItemWriter<T> {
@Autowired
private DeliverPostDao deliverPostDao ;
@Override
public void write(List<? extends T> list) throws Exception {
deliverPostDao.batchInsert((List<? extends DeliverPost>) list);
}
}
```
- 前言
- 1.項目說明
- 2.項目更新日志
- 3.文檔更新日志
- 01.快速開始
- 01.maven構建項目
- 02.環境安裝
- 03.STS項目導入
- 03.IDEA項目導入
- 04.數據初始化
- 05.項目啟動
- 06.付費文檔說明
- 02.總體流程
- 1.oauth接口
- 2.架構設計圖
- 3.微服務介紹
- 4.功能介紹
- 5.梳理流程
- 03.模塊詳解
- 01.老版本1.0.1分支模塊講解
- 01.db-core模塊
- 02.api-commons模塊
- 03.log-core模塊
- 04.security-core模塊
- 05.swagger-core模塊
- 06.eureka-server模塊
- 07.auth-server模塊
- 08.auth-sso模塊解析
- 09.user-center模塊
- 10.api-gateway模塊
- 11.file-center模塊
- 12.log-center模塊
- 13.batch-center模塊
- 14.back-center模塊
- 02.spring-boot-starter-web那點事
- 03.自定義db-spring-boot-starter
- 04.自定義log-spring-boot-starter
- 05.自定義redis-spring-boot-starter
- 06.自定義common-spring-boot-starter
- 07.自定義swagger-spring-boot-starter
- 08.自定義uaa-server-spring-boot-starter
- 09.自定義uaa-client-spring-boot-starter
- 10.自定義ribbon-spring-boot-starter
- 11.springboot啟動原理
- 12.eureka-server模塊
- 13.auth-server模塊
- 14.user-center模塊
- 15.api-gateway模塊
- 16.file-center模塊
- 17.log-center模塊
- 18.back-center模塊
- 19.auth-sso模塊
- 20.admin-server模塊
- 21.zipkin-center模塊
- 22.job-center模塊
- 23.batch-center
- 04.全新網關
- 01.基于spring cloud gateway的new-api-gateway
- 02.spring cloud gateway整合Spring Security Oauth
- 03.基于spring cloud gateway的redis動態路由
- 04.spring cloud gateway聚合swagger文檔
- 05.技術詳解
- 01.互聯網系統設計原則
- 02.系統冪等性設計與實踐
- 03.Oauth最簡向導開發指南
- 04.oauth jdbc持久化策略
- 05.JWT token方式啟用
- 06.token有效期的處理
- 07.@PreAuthorize注解分析
- 08.獲取當前用戶信息
- 09.認證授權白名單配置
- 10.OCP權限設計
- 11.服務安全流程
- 12.認證授權詳解
- 13.驗證碼技術
- 14.短信驗證碼登錄
- 15.動態數據源配置
- 16.分頁插件使用
- 17.緩存擊穿
- 18.分布式主鍵生成策略
- 19.分布式定時任務
- 20.分布式鎖
- 21.網關多維度限流
- 22.跨域處理
- 23.容錯限流
- 24.應用訪問次數控制
- 25.統一業務異常處理
- 26.日志埋點
- 27.GPRC內部通信
- 28.服務間調用
- 29.ribbon負載均衡
- 30.微服務分布式跟蹤
- 31.異步與線程傳遞變量
- 32.死信隊列延時消息
- 33.單元測試用例
- 34.Greenwich.RELEASE升級
- 35.混沌工程質量保證
- 06.開發初探
- 1.開發技巧
- 2.crud例子
- 3.新建服務
- 4.區分前后臺用戶
- 07.分表分庫
- 08.分布式事務
- 1.Seata介紹
- 2.Seata部署
- 09.shell部署
- 01.eureka-server
- 02.user-center
- 03.auth-server
- 04.api-gateway
- 05.file-center
- 06.log-center
- 07.back-center
- 08.編寫shell腳本
- 09.集群shell部署
- 10.集群shell啟動
- 11.部署阿里云問題
- 10.網關安全
- 1.openresty https保障服務安全
- 2.openresty WAF應用防火墻
- 3.openresty 高可用
- 11.docker配置
- 01.docker安裝
- 02.Docker 開啟遠程API
- 03.采用docker方式打包到服務器
- 04.docker創建mysql
- 05.docker網絡原理
- 06.docker實戰
- 6.01.安裝docker
- 6.02.管理鏡像基本命令
- 6.03.容器管理
- 6.04容器數據持久化
- 6.05網絡模式
- 6.06.Dockerfile
- 6.07.harbor部署
- 6.08.使用自定義鏡像
- 12.統一監控中心
- 01.spring boot admin監控
- 02.Arthas診斷利器
- 03.nginx監控(filebeat+es+grafana)
- 04.Prometheus監控
- 05.redis監控(redis+prometheus+grafana)
- 06.mysql監控(mysqld_exporter+prometheus+grafana)
- 07.elasticsearch監控(elasticsearch-exporter+prometheus+grafana)
- 08.linux監控(node_exporter+prometheus+grafana)
- 09.micoservice監控
- 10.nacos監控
- 11.druid數據源監控
- 12.prometheus.yml
- 13.grafana告警
- 14.Alertmanager告警
- 15.監控微信告警
- 16.關于接口監控告警
- 17.prometheus-HA架構
- 18.總結
- 13.統一日志中心
- 01.統一日志中心建設意義
- 02.通過ELK收集mysql慢查詢日志
- 03.通過elk收集微服務模塊日志
- 04.通過elk收集nginx日志
- 05.統一日志中心性能優化
- 06.kibana安裝部署
- 07.日志清理方案
- 08.日志性能測試指標
- 09.總結
- 14.數據查詢平臺
- 01.數據查詢平臺架構
- 02.mysql配置bin-log
- 03.單節點canal-server
- 04.canal-ha部署
- 05.canal-kafka部署
- 06.實時增量數據同步mysql
- 07.canal監控
- 08.clickhouse運維常見腳本
- 15.APM監控
- 1.Elastic APM
- 2.Skywalking
- 01.docker部署es
- 02.部署skywalking-server
- 03.部署skywalking-agent
- 16.壓力測試
- 1.ocp.jmx
- 2.test.bat
- 3.壓測腳本
- 4.壓力報告
- 5.報告分析
- 6.壓測平臺
- 7.并發測試
- 8.wrk工具
- 9.nmon
- 10.jmh測試
- 17.SQL優化
- 1.oracle篇
- 01.基線測試
- 02.調優前奏
- 03.線上瓶頸定位
- 04.執行計劃解讀
- 05.高級SQL語句
- 06.SQL tuning
- 07.數據恢復
- 08.深入10053事件
- 09.深入10046事件
- 2.mysql篇
- 01.innodb存儲引擎
- 02.BTree索引
- 03.執行計劃
- 04.查詢優化案例分析
- 05.為什么會走錯索引
- 06.表連接優化問題
- 07.Connection連接參數
- 08.Centos7系統參數調優
- 09.mysql監控
- 10.高級SQL語句
- 11.常用維護腳本
- 12.percona-toolkit
- 18.redis高可用方案
- 1.免密登錄
- 2.安裝部署
- 3.配置文件
- 4.啟動腳本
- 19.消息中間件搭建
- 19-01.rabbitmq集群搭建
- 01.rabbitmq01
- 02.rabbitmq02
- 03.rabbitmq03
- 04.鏡像隊列
- 05.haproxy搭建
- 06.keepalived
- 19-02.rocketmq搭建
- 19-03.kafka集群
- 20.mysql高可用方案
- 1.環境
- 2.mysql部署
- 3.Xtrabackup部署
- 4.Galera部署
- 5.galera for mysql 集群
- 6.haproxy+keepalived部署
- 21.es集群部署
- 22.生產實施優化
- 1.linux優化
- 2.jvm優化
- 3.feign優化
- 4.zuul性能優化
- 23.線上問題診斷
- 01.CPU性能評估工具
- 02.內存性能評估工具
- 03.IO性能評估工具
- 04.網絡問題工具
- 05.綜合診斷評估工具
- 06.案例診斷01
- 07.案例診斷02
- 08.案例診斷03
- 09.案例診斷04
- 10.遠程debug
- 24.fiddler抓包實戰
- 01.fiddler介紹
- 02.web端抓包
- 03.app抓包
- 25.疑難解答交流
- 01.有了auth/token獲取token了為啥還要配置security的登錄配置
- 02.權限數據存放在redis嗎,代碼在哪里啊
- 03.其他微服務和認證中心的關系
- 04.改包問題
- 05.use RequestContextListener or RequestContextFilter to expose the current request
- 06./oauth/token對應代碼在哪里
- 07.驗證碼出不來
- 08./user/login
- 09.oauth無法自定義權限表達式
- 10.sleuth引發線程數過高問題
- 11.elk中使用7x版本問題
- 12.RedisCommandTimeoutException問題
- 13./oauth/token CPU過高
- 14.feign與權限標識符問題
- 15.動態路由RedisCommandInterruptedException: Command interrupted
- 26.學習資料
- 海量學習資料等你來拿
- 27.持續集成
- 01.git安裝
- 02.代碼倉庫gitlab
- 03.代碼倉庫gogs
- 04.jdk&&maven
- 05.nexus安裝
- 06.sonarqube
- 07.jenkins
- 28.Rancher部署
- 1.rancher-agent部署
- 2.rancher-server部署
- 3.ocp后端部署
- 4.演示前端部署
- 5.elk部署
- 6.docker私服搭建
- 7.rancher-server私服
- 8.rancher-agent docker私服
- 29.K8S部署OCP
- 01.準備OCP的構建環境和部署環境
- 02.部署順序
- 03.在K8S上部署eureka-server
- 04.在K8S上部署mysql
- 05.在K8S上部署redis
- 06.在K8S上部署auth-server
- 07.在K8S上部署user-center
- 08.在K8S上部署api-gateway
- 09.在K8S上部署back-center
- 30.Spring Cloud Alibaba
- 01.統一的依賴管理
- 02.nacos-server
- 03.生產可用的Nacos集群
- 04.nacos配置中心
- 05.common.yaml
- 06.user-center
- 07.auth-server
- 08.api-gateway
- 09.log-center
- 10.file-center
- 11.back-center
- 12.sentinel-dashboard
- 12.01.sentinel流控規則
- 12.02.sentinel熔斷降級規則
- 12.03.sentinel熱點規則
- 12.04.sentinel系統規則
- 12.05.sentinel規則持久化
- 12.06.sentinel總結
- 13.sentinel整合openfeign
- 14.sentinel整合網關
- 1.sentinel整合zuul
- 2.sentinel整合scg
- 15.Dubbo與Nacos共存
- 31.Java源碼剖析
- 01.基礎數據類型和String
- 02.Arrays工具類
- 03.ArrayList源碼分析
- 32.面試專題匯總
- 01.JVM專題匯總
- 02.多線程專題匯總
- 03.Spring專題匯總
- 04.springboot專題匯總
- 05.springcloud面試匯總
- 文檔問題跟蹤處理