實時數倉主要是為了解決傳統數倉數據時效性低的問題,實時數倉通常會用在實時的OLAP分析、實時的數據看板、業務指標實時監控等場景。雖然關于實時數倉的架構及技術選型與傳統的離線數倉會存在差異,但是關于數倉建設的基本方法論是一致的。本文會分享基于Flink SQL從0到1搭建一個實時數倉的demo,涉及數據采集、存儲、計算、可視化整個處理流程。通過本文你可以了解到:
* 實時數倉的基本架構
* 實時數倉的數據處理流程
* Flink1.11的SQL新特性
* Flink1.11存在的bug
* 完整的操作案例
> 古人學問無遺力,少壯工夫老始成。
>
> 紙上得來終覺淺,絕知此事要躬行。
## 案例簡介
本文會以電商業務為例,展示實時數倉的數據處理流程。另外,本文旨在說明實時數倉的構建流程,所以不會涉及太復雜的數據計算。為了保證案例的可操作性和完整性,本文會給出詳細的操作步驟。為了方便演示,本文的所有操作都是在Flink SQL Cli中完成的。
## 架構設計
具體的架構設計如圖所示:首先通過canal解析MySQL的binlog日志,將數據存儲在Kafka中。然后使用Flink SQL對原始數據進行清洗關聯,并將處理之后的明細寬表寫入kafka中。維表數據存儲在MySQL中,通過Flink SQL對明細寬表與維表進行JOIN,將聚合后的數據寫入MySQL,最后通過FineBI進行可視化展示。

image
## 業務數據準備
* 訂單表(order\_info)
~~~sql
CREATE TABLE `order_info` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '編號',
`consignee` varchar(100) DEFAULT NULL COMMENT '收貨人',
`consignee_tel` varchar(20) DEFAULT NULL COMMENT '收件人電話',
`total_amount` decimal(10,2) DEFAULT NULL COMMENT '總金額',
`order_status` varchar(20) DEFAULT NULL COMMENT '訂單狀態',
`user_id` bigint(20) DEFAULT NULL COMMENT '用戶id',
`payment_way` varchar(20) DEFAULT NULL COMMENT '付款方式',
`delivery_address` varchar(1000) DEFAULT NULL COMMENT '送貨地址',
`order_comment` varchar(200) DEFAULT NULL COMMENT '訂單備注',
`out_trade_no` varchar(50) DEFAULT NULL COMMENT '訂單交易編號(第三方支付用)',
`trade_body` varchar(200) DEFAULT NULL COMMENT '訂單描述(第三方支付用)',
`create_time` datetime DEFAULT NULL COMMENT '創建時間',
`operate_time` datetime DEFAULT NULL COMMENT '操作時間',
`expire_time` datetime DEFAULT NULL COMMENT '失效時間',
`tracking_no` varchar(100) DEFAULT NULL COMMENT '物流單編號',
`parent_order_id` bigint(20) DEFAULT NULL COMMENT '父訂單編號',
`img_url` varchar(200) DEFAULT NULL COMMENT '圖片路徑',
`province_id` int(20) DEFAULT NULL COMMENT '地區',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='訂單表';
~~~
* 訂單詳情表(order\_detail)
~~~sql
CREATE TABLE `order_detail` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '編號',
`order_id` bigint(20) DEFAULT NULL COMMENT '訂單編號',
`sku_id` bigint(20) DEFAULT NULL COMMENT 'sku_id',
`sku_name` varchar(200) DEFAULT NULL COMMENT 'sku名稱(冗余)',
`img_url` varchar(200) DEFAULT NULL COMMENT '圖片名稱(冗余)',
`order_price` decimal(10,2) DEFAULT NULL COMMENT '購買價格(下單時sku價格)',
`sku_num` varchar(200) DEFAULT NULL COMMENT '購買個數',
`create_time` datetime DEFAULT NULL COMMENT '創建時間',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='訂單詳情表';
~~~
* 商品表(sku\_info)
~~~sql
CREATE TABLE `sku_info` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'skuid(itemID)',
`spu_id` bigint(20) DEFAULT NULL COMMENT 'spuid',
`price` decimal(10,0) DEFAULT NULL COMMENT '價格',
`sku_name` varchar(200) DEFAULT NULL COMMENT 'sku名稱',
`sku_desc` varchar(2000) DEFAULT NULL COMMENT '商品規格描述',
`weight` decimal(10,2) DEFAULT NULL COMMENT '重量',
`tm_id` bigint(20) DEFAULT NULL COMMENT '品牌(冗余)',
`category3_id` bigint(20) DEFAULT NULL COMMENT '三級分類id(冗余)',
`sku_default_img` varchar(200) DEFAULT NULL COMMENT '默認顯示圖片(冗余)',
`create_time` datetime DEFAULT NULL COMMENT '創建時間',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='商品表';
~~~
* 商品一級類目表(base\_category1)
~~~sql
CREATE TABLE `base_category1` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '編號',
`name` varchar(10) NOT NULL COMMENT '分類名稱',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='一級分類表';
~~~
* 商品二級類目表(base\_category2)
~~~sql
CREATE TABLE `base_category2` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '編號',
`name` varchar(200) NOT NULL COMMENT '二級分類名稱',
`category1_id` bigint(20) DEFAULT NULL COMMENT '一級分類編號',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='二級分類表';
~~~
* 商品三級類目表(base\_category3)
~~~sql
CREATE TABLE `base_category3` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '編號',
`name` varchar(200) NOT NULL COMMENT '三級分類名稱',
`category2_id` bigint(20) DEFAULT NULL COMMENT '二級分類編號',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='三級分類表';
~~~
* 省份表(base\_province)
~~~sql
CREATE TABLE `base_province` (
`id` int(20) DEFAULT NULL COMMENT 'id',
`name` varchar(20) DEFAULT NULL COMMENT '省名稱',
`region_id` int(20) DEFAULT NULL COMMENT '大區id',
`area_code` varchar(20) DEFAULT NULL COMMENT '行政區位碼'
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
~~~
* 區域表(base\_region)
~~~sql
CREATE TABLE `base_region` (
`id` int(20) NOT NULL COMMENT '大區id',
`region_name` varchar(20) DEFAULT NULL COMMENT '大區名稱',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
~~~
> 注意:以上的建表語句是在MySQL中完成的,完整的建表及模擬數據生成腳本見:
>
> 鏈接:[https://pan.baidu.com/s/1fcMgDHGKedOpzqLbSRUGwA](https://links.jianshu.com/go?to=https%3A%2F%2Fpan.baidu.com%2Fs%2F1fcMgDHGKedOpzqLbSRUGwA) 提取碼:zuqw
## 數據處理流程
### ODS層數據同步
關于ODS層的數據同步參見我的另一篇文章[基于Canal與Flink實現數據實時增量同步(一)](https://links.jianshu.com/go?to=https%3A%2F%2Fmp.weixin.qq.com%2Fs%2FooPAScXAw2soqlgEoSbRAw)。主要使用canal解析MySQL的binlog日志,然后將其寫入到Kafka對應的topic中。由于篇幅限制,不會對具體的細節進行說明。同步之后的結果如下圖所示:

image
### DIM層維表數據準備
本案例中將維表存儲在了MySQL中,實際生產中會用HBase存儲維表數據。我們主要用到兩張維表:**區域維表**和**商品維表**。處理過程如下:
* 區域維表
首先將`mydw.base_province`和`mydw.base_region`這個主題對應的數據抽取到MySQL中,主要使用Flink SQL的Kafka數據源對應的canal-json格式,注意:在執行裝載之前,需要先在MySQL中創建對應的表,本文使用的MySQL數據庫的名字為**dim**,用于存放維表數據。如下:
~~~sql
-- -------------------------
-- 省份
-- kafka Source
-- -------------------------
DROP TABLE IF EXISTS `ods_base_province`;
CREATE TABLE `ods_base_province` (
`id` INT,
`name` STRING,
`region_id` INT ,
`area_code`STRING
) WITH(
'connector' = 'kafka',
'topic' = 'mydw.base_province',
'properties.bootstrap.servers' = 'kms-3:9092',
'properties.group.id' = 'testGroup',
'format' = 'canal-json' ,
'scan.startup.mode' = 'earliest-offset'
) ;
-- -------------------------
-- 省份
-- MySQL Sink
-- -------------------------
DROP TABLE IF EXISTS `base_province`;
CREATE TABLE `base_province` (
`id` INT,
`name` STRING,
`region_id` INT ,
`area_code`STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://kms-1:3306/dim',
'table-name' = 'base_province', -- MySQL中的待插入數據的表
'driver' = 'com.mysql.jdbc.Driver',
'username' = 'root',
'password' = '123qwe',
'sink.buffer-flush.interval' = '1s'
);
-- -------------------------
-- 省份
-- MySQL Sink Load Data
-- -------------------------
INSERT INTO base_province
SELECT *
FROM ods_base_province;
-- -------------------------
-- 區域
-- kafka Source
-- -------------------------
DROP TABLE IF EXISTS `ods_base_region`;
CREATE TABLE `ods_base_region` (
`id` INT,
`region_name` STRING
) WITH(
'connector' = 'kafka',
'topic' = 'mydw.base_region',
'properties.bootstrap.servers' = 'kms-3:9092',
'properties.group.id' = 'testGroup',
'format' = 'canal-json' ,
'scan.startup.mode' = 'earliest-offset'
) ;
-- -------------------------
-- 區域
-- MySQL Sink
-- -------------------------
DROP TABLE IF EXISTS `base_region`;
CREATE TABLE `base_region` (
`id` INT,
`region_name` STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://kms-1:3306/dim',
'table-name' = 'base_region', -- MySQL中的待插入數據的表
'driver' = 'com.mysql.jdbc.Driver',
'username' = 'root',
'password' = '123qwe',
'sink.buffer-flush.interval' = '1s'
);
-- -------------------------
-- 區域
-- MySQL Sink Load Data
-- -------------------------
INSERT INTO base_region
SELECT *
FROM ods_base_region;
~~~
經過上面的步驟,將創建維表所需要的原始數據已經存儲到了MySQL中,接下來就需要在MySQL中創建維表,我們使用上面的兩張表,創建一張視圖:`dim_province`作為維表:
~~~sql
-- ---------------------------------
-- DIM層,區域維表,
-- 在MySQL中創建視圖
-- ---------------------------------
DROP VIEW IF EXISTS dim_province;
CREATE VIEW dim_province AS
SELECT
bp.id AS province_id,
bp.name AS province_name,
br.id AS region_id,
br.region_name AS region_name,
bp.area_code AS area_code
FROM base_region br
JOIN base_province bp ON br.id= bp.region_id
;
~~~
這樣我們所需要的維表:dim\_province就創建好了,只需要在維表join時,使用Flink SQL創建JDBC的數據源,就可以使用該維表了。同理,我們使用相同的方法創建商品維表,具體如下:
~~~sql
-- -------------------------
-- 一級類目表
-- kafka Source
-- -------------------------
DROP TABLE IF EXISTS `ods_base_category1`;
CREATE TABLE `ods_base_category1` (
`id` BIGINT,
`name` STRING
)WITH(
'connector' = 'kafka',
'topic' = 'mydw.base_category1',
'properties.bootstrap.servers' = 'kms-3:9092',
'properties.group.id' = 'testGroup',
'format' = 'canal-json' ,
'scan.startup.mode' = 'earliest-offset'
) ;
-- -------------------------
-- 一級類目表
-- MySQL Sink
-- -------------------------
DROP TABLE IF EXISTS `base_category1`;
CREATE TABLE `base_category1` (
`id` BIGINT,
`name` STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://kms-1:3306/dim',
'table-name' = 'base_category1', -- MySQL中的待插入數據的表
'driver' = 'com.mysql.jdbc.Driver',
'username' = 'root',
'password' = '123qwe',
'sink.buffer-flush.interval' = '1s'
);
-- -------------------------
-- 一級類目表
-- MySQL Sink Load Data
-- -------------------------
INSERT INTO base_category1
SELECT *
FROM ods_base_category1;
-- -------------------------
-- 二級類目表
-- kafka Source
-- -------------------------
DROP TABLE IF EXISTS `ods_base_category2`;
CREATE TABLE `ods_base_category2` (
`id` BIGINT,
`name` STRING,
`category1_id` BIGINT
)WITH(
'connector' = 'kafka',
'topic' = 'mydw.base_category2',
'properties.bootstrap.servers' = 'kms-3:9092',
'properties.group.id' = 'testGroup',
'format' = 'canal-json' ,
'scan.startup.mode' = 'earliest-offset'
) ;
-- -------------------------
-- 二級類目表
-- MySQL Sink
-- -------------------------
DROP TABLE IF EXISTS `base_category2`;
CREATE TABLE `base_category2` (
`id` BIGINT,
`name` STRING,
`category1_id` BIGINT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://kms-1:3306/dim',
'table-name' = 'base_category2', -- MySQL中的待插入數據的表
'driver' = 'com.mysql.jdbc.Driver',
'username' = 'root',
'password' = '123qwe',
'sink.buffer-flush.interval' = '1s'
);
-- -------------------------
-- 二級類目表
-- MySQL Sink Load Data
-- -------------------------
INSERT INTO base_category2
SELECT *
FROM ods_base_category2;
-- -------------------------
-- 三級類目表
-- kafka Source
-- -------------------------
DROP TABLE IF EXISTS `ods_base_category3`;
CREATE TABLE `ods_base_category3` (
`id` BIGINT,
`name` STRING,
`category2_id` BIGINT
)WITH(
'connector' = 'kafka',
'topic' = 'mydw.base_category3',
'properties.bootstrap.servers' = 'kms-3:9092',
'properties.group.id' = 'testGroup',
'format' = 'canal-json' ,
'scan.startup.mode' = 'earliest-offset'
) ;
-- -------------------------
-- 三級類目表
-- MySQL Sink
-- -------------------------
DROP TABLE IF EXISTS `base_category3`;
CREATE TABLE `base_category3` (
`id` BIGINT,
`name` STRING,
`category2_id` BIGINT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://kms-1:3306/dim',
'table-name' = 'base_category3', -- MySQL中的待插入數據的表
'driver' = 'com.mysql.jdbc.Driver',
'username' = 'root',
'password' = '123qwe',
'sink.buffer-flush.interval' = '1s'
);
-- -------------------------
-- 三級類目表
-- MySQL Sink Load Data
-- -------------------------
INSERT INTO base_category3
SELECT *
FROM ods_base_category3;
-- -------------------------
-- 商品表
-- Kafka Source
-- -------------------------
DROP TABLE IF EXISTS `ods_sku_info`;
CREATE TABLE `ods_sku_info` (
`id` BIGINT,
`spu_id` BIGINT,
`price` DECIMAL(10,0),
`sku_name` STRING,
`sku_desc` STRING,
`weight` DECIMAL(10,2),
`tm_id` BIGINT,
`category3_id` BIGINT,
`sku_default_img` STRING,
`create_time` TIMESTAMP(0)
) WITH(
'connector' = 'kafka',
'topic' = 'mydw.sku_info',
'properties.bootstrap.servers' = 'kms-3:9092',
'properties.group.id' = 'testGroup',
'format' = 'canal-json' ,
'scan.startup.mode' = 'earliest-offset'
) ;
-- -------------------------
-- 商品表
-- MySQL Sink
-- -------------------------
DROP TABLE IF EXISTS `sku_info`;
CREATE TABLE `sku_info` (
`id` BIGINT,
`spu_id` BIGINT,
`price` DECIMAL(10,0),
`sku_name` STRING,
`sku_desc` STRING,
`weight` DECIMAL(10,2),
`tm_id` BIGINT,
`category3_id` BIGINT,
`sku_default_img` STRING,
`create_time` TIMESTAMP(0),
PRIMARY KEY (tm_id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://kms-1:3306/dim',
'table-name' = 'sku_info', -- MySQL中的待插入數據的表
'driver' = 'com.mysql.jdbc.Driver',
'username' = 'root',
'password' = '123qwe',
'sink.buffer-flush.interval' = '1s'
);
-- -------------------------
-- 商品
-- MySQL Sink Load Data
-- -------------------------
INSERT INTO sku_info
SELECT *
FROM ods_sku_info;
~~~
經過上面的步驟,我們可以將創建商品維表的基礎數據表同步到MySQL中,同樣需要提前創建好對應的數據表。接下來我們使用上面的基礎表在mySQL的dim庫中創建一張視圖:`dim_sku_info`,用作后續使用的維表。
~~~sql
-- ---------------------------------
-- DIM層,商品維表,
-- 在MySQL中創建視圖
-- ---------------------------------
CREATE VIEW dim_sku_info AS
SELECT
si.id AS id,
si.sku_name AS sku_name,
si.category3_id AS c3_id,
si.weight AS weight,
si.tm_id AS tm_id,
si.price AS price,
si.spu_id AS spu_id,
c3.name AS c3_name,
c2.id AS c2_id,
c2.name AS c2_name,
c3.id AS c1_id,
c3.name AS c1_name
FROM
(
sku_info si
JOIN base_category3 c3 ON si.category3_id = c3.id
JOIN base_category2 c2 ON c3.category2_id =c2.id
JOIN base_category1 c1 ON c2.category1_id = c1.id
);
~~~
至此,我們所需要的維表數據已經準備好了,接下來開始處理DWD層的數據。
### DWD層數據處理
經過上面的步驟,我們已經將所用的維表已經準備好了。接下來我們將對ODS的原始數據進行處理,加工成DWD層的明細寬表。具體過程如下:
~~~sql
-- -------------------------
-- 訂單詳情
-- Kafka Source
-- -------------------------
DROP TABLE IF EXISTS `ods_order_detail`;
CREATE TABLE `ods_order_detail`(
`id` BIGINT,
`order_id` BIGINT,
`sku_id` BIGINT,
`sku_name` STRING,
`img_url` STRING,
`order_price` DECIMAL(10,2),
`sku_num` INT,
`create_time` TIMESTAMP(0)
) WITH(
'connector' = 'kafka',
'topic' = 'mydw.order_detail',
'properties.bootstrap.servers' = 'kms-3:9092',
'properties.group.id' = 'testGroup',
'format' = 'canal-json' ,
'scan.startup.mode' = 'earliest-offset'
) ;
-- -------------------------
-- 訂單信息
-- Kafka Source
-- -------------------------
DROP TABLE IF EXISTS `ods_order_info`;
CREATE TABLE `ods_order_info` (
`id` BIGINT,
`consignee` STRING,
`consignee_tel` STRING,
`total_amount` DECIMAL(10,2),
`order_status` STRING,
`user_id` BIGINT,
`payment_way` STRING,
`delivery_address` STRING,
`order_comment` STRING,
`out_trade_no` STRING,
`trade_body` STRING,
`create_time` TIMESTAMP(0) ,
`operate_time` TIMESTAMP(0) ,
`expire_time` TIMESTAMP(0) ,
`tracking_no` STRING,
`parent_order_id` BIGINT,
`img_url` STRING,
`province_id` INT
) WITH(
'connector' = 'kafka',
'topic' = 'mydw.order_info',
'properties.bootstrap.servers' = 'kms-3:9092',
'properties.group.id' = 'testGroup',
'format' = 'canal-json' ,
'scan.startup.mode' = 'earliest-offset'
) ;
-- ---------------------------------
-- DWD層,支付訂單明細表dwd_paid_order_detail
-- ---------------------------------
DROP TABLE IF EXISTS dwd_paid_order_detail;
CREATE TABLE dwd_paid_order_detail
(
detail_id BIGINT,
order_id BIGINT,
user_id BIGINT,
province_id INT,
sku_id BIGINT,
sku_name STRING,
sku_num INT,
order_price DECIMAL(10,0),
create_time TIMESTAMP(0),
pay_time TIMESTAMP(0)
) WITH (
'connector' = 'kafka',
'topic' = 'dwd_paid_order_detail',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'kms-3:9092',
'format' = 'changelog-json'
);
-- ---------------------------------
-- DWD層,已支付訂單明細表
-- 向dwd_paid_order_detail裝載數據
-- ---------------------------------
INSERT INTO dwd_paid_order_detail
SELECT
od.id,
oi.id order_id,
oi.user_id,
oi.province_id,
od.sku_id,
od.sku_name,
od.sku_num,
od.order_price,
oi.create_time,
oi.operate_time
FROM
(
SELECT *
FROM ods_order_info
WHERE order_status = '2' -- 已支付
) oi JOIN
(
SELECT *
FROM ods_order_detail
) od
ON oi.id = od.order_id;
~~~

image
### ADS層數據
經過上面的步驟,我們創建了一張dwd\_paid\_order\_detail明細寬表,并將該表存儲在了Kafka中。接下來我們將使用這張明細寬表與維表進行JOIN,得到我們ADS應用層數據。
* **ads\_province\_index**
首先在MySQL中創建對應的ADS目標表:**ads\_province\_index**
~~~sql
CREATE TABLE ads.ads_province_index(
province_id INT(10),
area_code VARCHAR(100),
province_name VARCHAR(100),
region_id INT(10),
region_name VARCHAR(100),
order_amount DECIMAL(10,2),
order_count BIGINT(10),
dt VARCHAR(100),
PRIMARY KEY (province_id, dt)
) ;
~~~
向MySQL的ADS層目標裝載數據:
~~~sql
-- Flink SQL Cli操作
-- ---------------------------------
-- 使用 DDL創建MySQL中的ADS層表
-- 指標:1.每天每個省份的訂單數
-- 2.每天每個省份的訂單金額
-- ---------------------------------
CREATE TABLE ads_province_index(
province_id INT,
area_code STRING,
province_name STRING,
region_id INT,
region_name STRING,
order_amount DECIMAL(10,2),
order_count BIGINT,
dt STRING,
PRIMARY KEY (province_id, dt) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://kms-1:3306/ads',
'table-name' = 'ads_province_index',
'driver' = 'com.mysql.jdbc.Driver',
'username' = 'root',
'password' = '123qwe'
);
-- ---------------------------------
-- dwd_paid_order_detail已支付訂單明細寬表
-- ---------------------------------
CREATE TABLE dwd_paid_order_detail
(
detail_id BIGINT,
order_id BIGINT,
user_id BIGINT,
province_id INT,
sku_id BIGINT,
sku_name STRING,
sku_num INT,
order_price DECIMAL(10,2),
create_time STRING,
pay_time STRING
) WITH (
'connector' = 'kafka',
'topic' = 'dwd_paid_order_detail',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'kms-3:9092',
'format' = 'changelog-json'
);
-- ---------------------------------
-- tmp_province_index
-- 訂單匯總臨時表
-- ---------------------------------
CREATE TABLE tmp_province_index(
province_id INT,
order_count BIGINT,-- 訂單數
order_amount DECIMAL(10,2), -- 訂單金額
pay_date DATE
)WITH (
'connector' = 'kafka',
'topic' = 'tmp_province_index',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'kms-3:9092',
'format' = 'changelog-json'
);
-- ---------------------------------
-- tmp_province_index
-- 訂單匯總臨時表數據裝載
-- ---------------------------------
INSERT INTO tmp_province_index
SELECT
province_id,
count(distinct order_id) order_count,-- 訂單數
sum(order_price * sku_num) order_amount, -- 訂單金額
TO_DATE(pay_time,'yyyy-MM-dd') pay_date
FROM dwd_paid_order_detail
GROUP BY province_id,TO_DATE(pay_time,'yyyy-MM-dd')
;
-- ---------------------------------
-- tmp_province_index_source
-- 使用該臨時匯總表,作為數據源
-- ---------------------------------
CREATE TABLE tmp_province_index_source(
province_id INT,
order_count BIGINT,-- 訂單數
order_amount DECIMAL(10,2), -- 訂單金額
pay_date DATE,
proctime as PROCTIME() -- 通過計算列產生一個處理時間列
) WITH (
'connector' = 'kafka',
'topic' = 'tmp_province_index',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'kms-3:9092',
'format' = 'changelog-json'
);
-- ---------------------------------
-- DIM層,區域維表,
-- 創建區域維表數據源
-- ---------------------------------
DROP TABLE IF EXISTS `dim_province`;
CREATE TABLE dim_province (
province_id INT,
province_name STRING,
area_code STRING,
region_id INT,
region_name STRING ,
PRIMARY KEY (province_id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://kms-1:3306/dim',
'table-name' = 'dim_province',
'driver' = 'com.mysql.jdbc.Driver',
'username' = 'root',
'password' = '123qwe',
'scan.fetch-size' = '100'
);
-- ---------------------------------
-- 向ads_province_index裝載數據
-- 維表JOIN
-- ---------------------------------
INSERT INTO ads_province_index
SELECT
pc.province_id,
dp.area_code,
dp.province_name,
dp.region_id,
dp.region_name,
pc.order_amount,
pc.order_count,
cast(pc.pay_date as VARCHAR)
FROM
tmp_province_index_source pc
JOIN dim_province FOR SYSTEM_TIME AS OF pc.proctime as dp
ON dp.province_id = pc.province_id;
~~~
當提交任務之后:觀察Flink WEB UI:

image
查看ADS層的ads\_province\_index表數據:

image
* **ads\_sku\_index**
首先在MySQL中創建對應的ADS目標表:**ads\_sku\_index**
~~~sql
CREATE TABLE ads_sku_index
(
sku_id BIGINT(10),
sku_name VARCHAR(100),
weight DOUBLE,
tm_id BIGINT(10),
price DOUBLE,
spu_id BIGINT(10),
c3_id BIGINT(10),
c3_name VARCHAR(100) ,
c2_id BIGINT(10),
c2_name VARCHAR(100),
c1_id BIGINT(10),
c1_name VARCHAR(100),
order_amount DOUBLE,
order_count BIGINT(10),
sku_count BIGINT(10),
dt varchar(100),
PRIMARY KEY (sku_id,dt)
);
~~~
向MySQL的ADS層目標裝載數據:
~~~sql
-- ---------------------------------
-- 使用 DDL創建MySQL中的ADS層表
-- 指標:1.每天每個商品對應的訂單個數
-- 2.每天每個商品對應的訂單金額
-- 3.每天每個商品對應的數量
-- ---------------------------------
CREATE TABLE ads_sku_index
(
sku_id BIGINT,
sku_name VARCHAR,
weight DOUBLE,
tm_id BIGINT,
price DOUBLE,
spu_id BIGINT,
c3_id BIGINT,
c3_name VARCHAR ,
c2_id BIGINT,
c2_name VARCHAR,
c1_id BIGINT,
c1_name VARCHAR,
order_amount DOUBLE,
order_count BIGINT,
sku_count BIGINT,
dt varchar,
PRIMARY KEY (sku_id,dt) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://kms-1:3306/ads',
'table-name' = 'ads_sku_index',
'driver' = 'com.mysql.jdbc.Driver',
'username' = 'root',
'password' = '123qwe'
);
-- ---------------------------------
-- dwd_paid_order_detail已支付訂單明細寬表
-- ---------------------------------
CREATE TABLE dwd_paid_order_detail
(
detail_id BIGINT,
order_id BIGINT,
user_id BIGINT,
province_id INT,
sku_id BIGINT,
sku_name STRING,
sku_num INT,
order_price DECIMAL(10,2),
create_time STRING,
pay_time STRING
) WITH (
'connector' = 'kafka',
'topic' = 'dwd_paid_order_detail',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'kms-3:9092',
'format' = 'changelog-json'
);
-- ---------------------------------
-- tmp_sku_index
-- 商品指標統計
-- ---------------------------------
CREATE TABLE tmp_sku_index(
sku_id BIGINT,
order_count BIGINT,-- 訂單數
order_amount DECIMAL(10,2), -- 訂單金額
order_sku_num BIGINT,
pay_date DATE
)WITH (
'connector' = 'kafka',
'topic' = 'tmp_sku_index',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'kms-3:9092',
'format' = 'changelog-json'
);
-- ---------------------------------
-- tmp_sku_index
-- 數據裝載
-- ---------------------------------
INSERT INTO tmp_sku_index
SELECT
sku_id,
count(distinct order_id) order_count,-- 訂單數
sum(order_price * sku_num) order_amount, -- 訂單金額
sum(sku_num) order_sku_num,
TO_DATE(pay_time,'yyyy-MM-dd') pay_date
FROM dwd_paid_order_detail
GROUP BY sku_id,TO_DATE(pay_time,'yyyy-MM-dd')
;
-- ---------------------------------
-- tmp_sku_index_source
-- 使用該臨時匯總表,作為數據源
-- ---------------------------------
CREATE TABLE tmp_sku_index_source(
sku_id BIGINT,
order_count BIGINT,-- 訂單數
order_amount DECIMAL(10,2), -- 訂單金額
order_sku_num BIGINT,
pay_date DATE,
proctime as PROCTIME() -- 通過計算列產生一個處理時間列
) WITH (
'connector' = 'kafka',
'topic' = 'tmp_sku_index',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'kms-3:9092',
'format' = 'changelog-json'
);
-- ---------------------------------
-- DIM層,商品維表,
-- 創建商品維表數據源
-- ---------------------------------
DROP TABLE IF EXISTS `dim_sku_info`;
CREATE TABLE dim_sku_info (
id BIGINT,
sku_name STRING,
c3_id BIGINT,
weight DECIMAL(10,2),
tm_id BIGINT,
price DECIMAL(10,2),
spu_id BIGINT,
c3_name STRING,
c2_id BIGINT,
c2_name STRING,
c1_id BIGINT,
c1_name STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://kms-1:3306/dim',
'table-name' = 'dim_sku_info',
'driver' = 'com.mysql.jdbc.Driver',
'username' = 'root',
'password' = '123qwe',
'scan.fetch-size' = '100'
);
-- ---------------------------------
-- 向ads_sku_index裝載數據
-- 維表JOIN
-- ---------------------------------
INSERT INTO ads_sku_index
SELECT
sku_id ,
sku_name ,
weight ,
tm_id ,
price ,
spu_id ,
c3_id ,
c3_name,
c2_id ,
c2_name ,
c1_id ,
c1_name ,
sc.order_amount,
sc.order_count ,
sc.order_sku_num ,
cast(sc.pay_date as VARCHAR)
FROM
tmp_sku_index_source sc
JOIN dim_sku_info FOR SYSTEM_TIME AS OF sc.proctime as ds
ON ds.id = sc.sku_id
;
~~~
當提交任務之后:觀察Flink WEB UI:

image
查看ADS層的ads\_sku\_index表數據:

image
### FineBI結果展示

## 其他注意點
### Flink1.11.0存在的bug
當在代碼中使用Flink1.11.0版本時,如果將一個change-log的數據源insert到一個upsert sink時,會報如下異常:
~~~bash
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Provided trait [BEFORE_AND_AFTER] can't satisfy required trait [ONLY_UPDATE_AFTER]. This is a bug in planner, please file an issue.
Current node is TableSourceScan(table=[[default_catalog, default_database, t_pick_order]], fields=[order_no, status])
~~~
該bug目前已被修復,修復可以在Flink1.11.1中使用。
## 總結
本文主要分享了構建一個實時數倉的demo案例,通過本文可以了解實時數倉的數據處理流程,在此基礎之上,對Flink SQL的CDC會有更加深刻的認識。另外,本文給出了非常詳細的使用案例,你可以直接上手進行操作,在實踐中探索實時數倉的構建流程。
- php開發
- 常用技巧
- 字符數組對象
- php換行替換,PHP替換回車換行符的三種方法
- PHP 數組轉字符串,與字符串轉數組
- php將img中的寬高刪除,PHP刪除HTML中寬高樣式的詳解
- php去除換行(回車換行)的三種方法
- php 過濾word 樣式
- php如何設置隨機數
- 2個比較經典的PHP加密解密函數分享
- php怎么去除小數點后多余的0
- php中判斷是一維數組還是二維數組的解決方案
- php 獲取數組中出現次數最多的值(重復最多的值)與出現的次數
- PHP過濾掉換行符、特殊空格、制表符等
- PHP中json_endoce轉義反斜杠的問題
- PHP過濾Emoji表情和特殊符號的方法
- PHP完美的提取鏈接正則
- php很牛的圖片采集
- 日期處理
- php 獲取今日、昨日、上周、本月的起始時間戳和結束時間戳的方法非常簡單
- PHP指定時間戳/日期加一天,一年,一周,一月
- 使用php 獲取時間今天明天昨天時間戳的詳解
- php獲得當月的節假日函數(包含周末,年度節假日)
- PHP獲取本月起始和截止時間戳
- php 獲取每月開始結束時間,php 獲取指定月份的開始結束時間戳
- PHP獲取今天,昨天,本月,上個月,本年 起始時間戳
- php、mysql查詢當天,本周,本月的用法
- php獲取兩個時間戳之間相隔多少天多少小時多少分多少秒
- 毫秒級時間戳和日期格式轉換
- php-倒計時
- 請求提交上傳
- php+put+post,Curl和PHP-如何通過PUT,POST,GET通過curl傳遞json
- PHP put提交和獲取數據
- PHP curl put方式上傳文件
- 數據導入導出
- PHP快速導入大量數據到數據庫的方法
- PHP快速導出百萬級數據到CSV或者EXCEL文件
- PHP解析大型Excel表格的庫:box/spout
- PHP導入(百萬級)Excel表格數據
- PHP如何切割excel大文件
- 使用 PHP_XLSXWriter 代替 PHPExcel 10W+ 數據秒級導出
- 安裝php擴展XLSXWriter
- 解決php導入excel表格時獲取日期變成浮點數的方法
- xml處理
- PHP XML和數組互相轉換
- php解析xml字符串
- php 生成vcf通訊錄
- 文件操作相關
- php獲取文件后綴的9種方法
- PHP判斷遠程文件是否存在
- PHP獲取文件修改時間,訪問時間,inode修改時間
- php獲取遠程文件大小教程
- php 讀取文件并以文件方式下載
- php 把數字轉化為大寫中文
- 請求響應
- PHP 獲取當前訪問的URL
- 壓縮
- php生成zip壓縮包
- PHPMailer
- 整理PHPMailer 發送郵件 郵件內容為html 可以添加多個附件等
- 通達oa
- OA管理員密碼忘了怎么辦,這里教你分分鐘搞定…
- 跨域
- php解決多站點跨域
- php設置samesite cookie,有效防止CSRF
- Chrome 配置samesite=none方式
- Cookie 的 SameSite 屬性
- 圖片
- php pdf首頁截圖,PHP_PHP中使用Imagick讀取pdf并生成png縮略圖實例,pdf生成png首頁縮略圖
- PHP -- 七牛云 在線視頻 獲取某一幀作為封面圖
- PHP圖片壓縮方法
- 如何解決PHP curl或file_get_contents下載圖片損壞或無法打開的問題
- php遠程下載文章中圖片并保存源文件名不變
- 詳解PHP如何下載采集圖片到本地(附代碼實例)
- php如何將webp格式圖片轉為jpeg
- PHP獲取遠程圖片的寬高和體積大小
- php 軟件版本號比較
- 使用PHP通過SMTP發送電郵
- 常用正則表達式
- php如何用正則表達式匹配中文
- 用于分割字符串的 PHP preg_match_all 正則表達式
- 性能優化
- php.ini配置調優
- PHP 幾種常見超時的設置方法
- PHP函數in_array、array_key_exists和isset效率分析
- php array push 和array_merge 效率誰高,php 通過array_merge()和array+array合并數組的區別和效率比較...
- php 兩個數組取交集、并集、差集
- 設置PHP最大連接數及php-fpm 高并發 參數調整
- 小工具
- php 獲取代碼執行時間和消耗的內存
- PHP如何判斷某項擴展是否開啟
- centos7.x下php 導出擴展 XLSXWriter 安裝
- php生成mysql數據庫字典
- PHP 實現 word/excel/ppt 轉換為 PDF
- composer的使用
- showdoc sqlite3 找回管理員密碼
- php怎么將數組轉為xml
- PHP抖音最新視頻提取代碼
- yii
- Yii2 如何獲取Header參數?
- swoole
- Linux下搭建swoole服務的基本步驟
- 相關學習資料
- 帶你學習swoole_process詳解
- 按照官方文檔 在win10下安裝 docker for windows easyswoole鏡像 掛載目錄
- php常用框架
- Hyperf
- 常用算法PHP版
- thinkphp6
- TP6 事件綁定、監聽、訂閱
- Thinkphp 模板中輸出HTML的變量
- Thinkphp6(操作SQL數據庫)
- thinkphp6 mysql查詢語句對于為null和為空字符串給出特定值處理
- Thinkphp 6 - 連接配置多個數據庫并實現自由切換(詳細過程及實例demo)
- TP框架中的Db::name 和 dB::table 以及 db('') 的區別
- thinkphp6.0模型篇之模型的軟刪除
- thinkphp6自定義日志驅動,增加顯示全部請求信息
- 其他系統
- 微擎數據庫字段字典
- Flutter實現微信支付和iOS IAP支付
- Flutter上線項目實戰——蘋果內購
- PHP接入蘋果支付
- 調試
- php如何獲取當前腳本所有加載的文件
- php跟蹤所有調用方法,日志方法
- 解析phpstorm + xdebug 遠程斷點調試
- PHP XDEBUG調試 PHPSTORM配置
- 異常處理
- PHP 出現 502 解決方案
- php 語法解析錯誤 syntax error unexpected namespace T_NAMESPACE
- Composer 安裝與使用
- 數據庫相關
- php pdo怎么設置utf8
- php 如何根據最新聊天對用戶進行排序
- php lic&fpm
- 讓php程序在linux后臺執行
- PHPcli模式和fpm模式優缺點在哪里?
- 運行模式
- php運行模式之cli模式
- 自己庫
- php批量獲取所有公眾號粉絲openid
- 地圖
- php 判斷點在多邊形內,php百度地圖api判斷地址是否在多邊形區域內
- PHP,Mysql-根據一個給定經緯度的點,進行附近地點查詢
- MySQL 根據經緯度查找排序
- PHP+MySQL獲取坐標范圍內的數據
- 【百度地圖】刪除指定覆蓋物
- 百度地圖多點+畫連接線+數字標注
- laravel5.8
- laravel5.8(四)引入自定義常量文件及公共函數文件
- Lumen 查詢執行SQL
- 使你的 Laravel 項目模塊化
- Laravel 多條件 AND , OR條件組合查詢
- Laravel 查詢 多個or或者and條件
- laravel redis操作大全
- laravel中外部定義whereIn的用法和where中使用in
- lumen5.8
- 創建laravel5.8 lumen前后臺api項目--記錄請求和響應日志
- Laravel和Lumen開啟SQL日志記錄
- Laravel 5.8 常用操作(路徑+日志+分頁+其他操作)
- 升級php7.4 laravel lumen報錯Trying to access array offset on value of type null
- Laravel 任務調度(計劃任務,定時任務)
- laravel的command定時任務時間的設置
- Laravel任務調度的簡單使用
- laravel單數據庫執行事務和多數據庫執行事務
- laravel中鎖以及事務的簡單使用
- 申請其他相關
- 小程序地理位置接口申請
- PHP高并發
- php 高并發下 秒殺處理思路
- 記錄 PHP高并發 商品秒殺 問題的 Redis解決方案
- thinkphp3.2
- thinkphp3.2 數據庫 AND OR連綴使用
- laravel
- laravel的聯表查詢with方法的使用
- laravel獲取請求路由對應的控制器和方法
- Laravel 模型關聯建立與查詢
- Laravel多表(3張表以上)with[]關聯查詢,對關聯的模型做條件查詢(has,跟join一樣結果 )
- Laravel模型屬性的隱藏屬性、顯示屬性和臨時暴露隱藏屬性用法介紹
- aravel獲取當前的url以及當前的基礎域名方法匯總
- Laravel 模型實現多庫查詢或者多表映射
- 關于 Laravel 的 with 多表查詢問題
- Laravel 模型過濾(Filter)設計
- 懶加載、預加載、with()、load() 傻傻分不清楚?
- laravel模型$castsl屬性
- Laravel Query Builder 復雜查詢案例:子查詢實現分區查詢 partition by
- Laravel 模型關聯、關聯查詢、預加載使用實例
- laravel 中with關聯查詢限定查詢字段
- laravel 原生字段查詢 whereRaw 和 where(DB::raw(''))
- lavarel - where條件分組查詢(orWhere)
- 通過 Laravel 查詢構建器實現復雜的查詢語句
- 兩個結果集合并成一個
- Laravel 對某一列進行篩選然后求和 sum()
- laravel怎么優雅的拼接where,處理whereIn與where數組查詢的問題
- laravel查詢時判斷是否存在數據
- laravel中的whereNull和whereNotNull
- laravel框架中的子查詢
- Laravel框架中 orwhere 多條件查詢的使用
- Laravel中where的高級使用方法
- laravel復雜的數據庫查詢(事例)
- laravel多條件查詢方法(and,or嵌套查詢)
- Laravel 的 where or 查詢
- Laravel 進行where 多個or和and的條件查詢可用
- laravel Middleware 中間件 $next($request) 報錯不執行問題
- 基于Laravel框架--自定義CORS跨域中間件
- laravel9新增路由文件及解決跨域問題方法
- 解決在laravel中leftjoin帶條件查詢沒有返回右表為NULL的問題
- 【Laravel 】faker數據填充詳解
- 數據庫
- mysql
- mysql聯合索引(復合索引)詳解
- MYSQL 清空表和截斷表
- MySQL快速生成大量測試數據(100萬、1000萬、1億)
- 提高mysql千萬級大數據SQL查詢優化30條經驗(Mysql索引優化注意)
- MySQL常用命令
- MySQL(三)|《千萬級大數據查詢優化》第一篇:創建高性能的索引
- MySQL(一)|性能分析方法、SQL性能優化和MySQL內部配置優化
- MySQL(二)|深入理解MySQL的四種隔離級別及加鎖實現原理
- MySQL(四)|《千萬級大數據查詢優化》第一篇:創建高性能的索引(補充)
- MySQL(五)|《千萬級大數據查詢優化》第二篇:查詢性能優化(1)
- MySQL(六)|《千萬級大數據查詢優化》第二篇:查詢性能優化(2)
- MySQL(七)|MySQL分庫分表的那點事
- Mysql索引優化 Mysql通過索引提升查詢效率(第二棒)
- MySQL查詢的性能優化(查詢緩存、排序跟索引)
- 【總結】MySQL數據庫
- MySQL存儲引擎、事務日志并發訪問以及隔離級別
- 技巧
- 數據庫 SQL查詢重復記錄 方法
- 替換數據庫中某個字段中的部分字符
- mysql開啟bin log 并查看bin log日志(linux)
- 分表分區
- 千萬級別數據的mysql數據表優化
- MYSQL百萬級數據,如何優化
- MySQL備份和恢復
- MySQL間隙鎖死鎖問題
- 小技巧
- 基礎
- MySQL中sql_mode參數
- mysql數據庫異常
- this is incompatible with sql_mode=only_full_group_by
- mysql安全
- MySQL數據庫被比特幣勒索及安全調整
- mysql忘記密碼后重置(以windows系統mysql 8.4為例)
- MongoDB
- sql查詢
- MYSQL按時間段分組查詢當天,每小時,15分鐘數據分組
- 高級
- 基于 MySQL + Tablestore 分層存儲架構的大規模訂單系統實踐-架構篇
- 數據庫安全
- 服務器被黑,MySQL 數據庫遭比特幣勒索!該如何恢復?
- 數千臺MySQL數據庫遭黑客比特幣勒索,該怎么破?
- MySQL 數據庫規范
- MySQL數據庫開發的36條鐵律
- Elasticsearch
- 安裝與配置
- ElasticSearch關閉重啟命令
- 設置ES默認分詞器IK analyzer
- 查詢
- elasticsearch 模糊查詢不分詞,實現 mysql like
- elasticSearch多條件高級檢索語句,包含多個must、must_not、should嵌套示例,并考慮nested對象的特殊檢索
- elasticSearch按字段普通檢索,結果高亮
- Elasticsearch 如何實現查詢/聚合不區分大小寫?
- 索引更新&刷新
- refresh與批量操作的效率
- Elasticsearch 刪除type
- 分詞器
- ElasticSearch最全分詞器比較及使用方法
- 異常錯誤
- 解決ES因內存不足而無法查詢的錯誤,Data too large, data for [<http_request>]
- linux
- 基本知識
- CentOS7.5 通過wget下載文件到指定目錄
- 【CentOS】vi命令
- centos7查看硬盤使用情況
- CentOS7 查看目錄大小
- Centos 7下查看當前目錄大小及文件個數
- 普通用戶sudo\su 到root免密碼
- 普通用戶切換到root用戶下的免密配置方法
- linux 獲取進程啟動參數,linux查看進程啟動及運行時間
- Linux 查看進程
- linux刪除文件后不釋放磁盤的問題
- Linux查找大文件命令
- linux 如何關閉正在執行的php腳本
- linux三劍客(grep、sed、awk)基本使用
- centos 卸載軟件
- centos查看內存、cpu占用、占用前10,前X
- Centos 查看系統狀態
- 異常
- 問題解決:Failed to download metadata for repo ‘appstream‘: Cannot prepare internal mirrorlist:...
- php相關
- centos 安裝phpize
- Centos7.2下phpize安裝php擴展
- 切換版本
- 運營工具
- 資深Linux運維工程師常用的10款軟件/工具介紹
- 一款良心的終端連接工具
- 六款Linux常用遠程連接工具介紹,看看哪一款最適合你
- Finalshell
- Linux Finalshell連接centos7和文件無顯示問題
- WSL2:我在原生的Win10玩轉Linux系統
- MobaXterm
- 運維
- linux服務器上定時自動備份數據庫,并保留最新5天的數據
- Centos系統開啟及關閉端口
- CentOS7開放和關閉端口命令
- Linux中查看所有正在運行的進程
- 防火墻firewall-cmd命令詳解
- centos 7.8阿里云服務器掛載 數據盤
- Linux Finalshell連接centos7和文件無顯示問題
- Centos7系統端口被占用問題的解決方法
- vi
- 如何在Vim/Vi中復制,剪切和粘貼
- 命令
- [Linux kill進程] kill 進程pid的使用詳解
- 備份還原
- Linux的幾種備份、恢復系統方式
- Linux系統全盤備份方法
- 相關軟件安裝
- linux下 lua安裝
- python
- 升級pip之后出現sys.stderr.write(f“ERROR: {exc}“)
- lua
- centos源碼部署lua-5.3
- deepin
- deepin20.6設置默認的root密碼
- 任務相關
- 寶塔定時任務按秒執行
- CentOS 7 定時任務 crontab 入門
- centos7定時任務crontab
- Linux下定時任務的查看及取消
- Linux(CentOS7)定時執行任務Crond詳細說明
- Linux 查看所有定時任務
- linux查看所有用戶定時任務
- Linux 定時任務(超詳細)
- 防火墻
- Centos7開啟防火墻及特定端口
- CentOS防火墻操作:開啟端口、開啟、關閉、配置
- 生成 SSH 密鑰(windows+liunx)
- 阿里云,掛載云盤
- 前端
- layui
- layui多文件上傳
- layer.msg()彈框,彈框后繼續運行
- radio取值
- layui-數據表格排序
- Layui select選擇框添加搜索選項功能
- 保持原來樣式
- layui表格單元如何自動換行
- layui-laydate時間日歷控件使用方法詳解
- layui定時刷新數據表格
- layer 延時設置
- layer.open 回調函數
- 【Layui內置方法】layer.msg延時關閉msg對話框(代碼案例)
- layui多圖上傳圖片順序錯亂及重復上傳解決
- layer.confirm關閉彈窗
- vue
- Vue跨域解決方法
- vue 4.xx.xx版本降級至2.9.6
- vue-cli 2.x升級到3.x版本, 和3.x降級到2.x版本命令
- 最新版Vue或者指定版本
- Vue2.6.11按需模塊安裝配置
- jQuery
- jQuery在頁面加載時動態修改圖片尺寸的方法
- jquery操作select(取值,設置選中)
- 日歷
- FullCalendar中文文檔:Event日程事件
- js
- JS 之 重定向
- javascript截取video視頻第一幀作為封面方案
- HTML <video> preload 屬性
- jQuery使用ajax提交post數據
- JS截取視頻靚麗的幀作為封面
- H5案例分享:移動端touch事件判斷滑屏手勢的方向
- JS快速獲取圖片寬高的方法
- win
- Windows環境下curl的使用
- Cygwin
- Windows下安裝Cygwin及apt-cyg
- Cygwin 安裝、CMake 安裝
- mklink命令 詳細使用
- Nginx
- Nginx高級篇-性能優化
- Nginx常用命令(Linux)
- linux+docker+nginx如何配置環境并配置域名訪問
- Nginx的啟動(start),停止(stop)命令
- linux 查看nginx 安裝路徑
- 安裝配置
- Linux 查看 nginx 安裝目錄和配置文件路徑
- 【NGINX入門】3.Nginx的緩存服務器proxy_cache配置
- thinkphp6.0 偽靜態失效404(win下)
- 深入
- nginx rewrite及多upstream
- Nginx負載均衡(upstream)
- 專業術語
- 耦合?依賴?耦合和依賴的關系?耦合就是依賴
- PHP常用六大設計模式
- 高可用
- 分布式與集群
- Nginx 實踐案例:反向代理單臺web;反向代理多組web并實現負載均衡
- 容器
- Docker
- 30 分鐘快速入門 Docker 教程
- linux查看正在運行的容器,說說Docker 容器常用命令
- Windows 安裝Docker至D盤
- 配置
- win10 快速搭建 lnmp+swoole 環境 ,部署laravel6 與 swoole框架laravel-s項目1
- win10 快速搭建 lnmp+swoole 環境 ,部署laravel6 與 swoole框架laravel-s項目2
- docker 容器重命名
- Linux docker常用命令
- 使用
- docker 搭建php 開發環境 添加擴展redis、swoole、xdebug
- docker 單機部署redis集群
- Docker 退出容器不停止容器運行 并重新進入正在運行的容器
- 進入退出docker容器
- Docker的容器設置隨Docker的啟動而啟動
- 使用異常處理
- docker容器中bash: vi: command not found
- OCI runtime exec failed: exec failed:解決方法
- docker啟動容器慢,很慢,特別慢的坑
- 解決windows docker開發thinkphp6啟動慢的問題
- 【Windows Docker】docker掛載解決IO速度慢的問題
- Docker的網絡配置,導致Docker使用網路很慢的問題及解決辦法
- golang工程部署到docker容器
- Docker 容器設置自啟動
- 如何優雅地刪除Docker鏡像和容器(超詳細)
- 5 個好用的 Docker 圖形化管理工具
- Docker 可能會用到的命令
- Kubernetes
- 消息隊列
- RabbitMQ
- php7.3安裝使用rabbitMq
- Windows環境PHP如何使用RabbitMQ
- RabbitMQ學習筆記:4369、5672、15672、25672默認端口號修改
- Window10 系統 RabbitMQ的安裝和簡單使用
- RabbitMQ默認端口
- RabbitMQ可視化界面登錄不了解決方案
- RocketMQ
- Kafka
- ActiveMQ
- mqtt
- phpMQTT詳解以及處理使用過程中內存耗死問題
- MQTT--物聯網(IoT)消息推送協議
- php實現mqtt發布/發送 消息到主題
- Mqtt.js 的WSS鏈接
- emqx
- 如何在 PHP 項目中使用 MQTT
- emqx 修改dashboard 密碼
- 其他
- Windows 系統中單機最大TCP的連接數詳解
- EMQX
- Linux系統EMQX設置開機自啟
- Centos7 EMQX部署
- docker安裝 EMQX 免費版 docker安裝并配置持久化到服務器
- 實時數倉
- 網易云音樂基于 Flink + Kafka 的實時數倉建設實踐
- 實時數倉-基于Flink1.11的SQL構建實時數倉探索實踐
- 安全
- 網站如何保護用戶的密碼
- 關于web項目sessionID欺騙的問題
- php的sessionid可以偽造,不要用來做防刷新處理了
- DVWA-Weak Session IDs (弱會話)漏洞利用方式
- 保證接口數據安全的10種方案
- cookie和session的竊取
- 萬能密碼漏洞
- 黑客如何快速查找網站后臺地址方法整理
- 網站后臺萬能密碼/10大常用弱口令
- 萬能密碼漏洞02
- 大多數網站后臺管理的幾個常見的安全問題注意防范
- token可以被竊取嗎_盜取token
- token被劫持[token被劫持如何保證接口安全性]
- PHP給后臺管理系統加安全防護機制的一些方案
- php - 重新生成 session ID
- 隱藏響應中的server和X-Powered-By
- PHP會話控制之如何正確設置session_name
- Session攻擊001
- PHP防SQL注入代碼,PHP 預防CSRF、XSS、SQL注入攻擊
- php25個安全實踐
- php架構師 系統管理員必須知道的PHP安全實踐
- 版本控制
- Linux服務器關聯Git,通過執行更新腳本實現代碼同步
- PHP通過exec執行git pull
- git 在linux部署并從windows上提交代碼到linux
- git上傳到linux服務器,git一鍵部署代碼到遠程服務器(linux)
- linux更新git命令,使用Linux定時腳本更新服務器的git代碼
- git異常
- 如何解決remote: The project you were looking for could not be found
- git status顯示大量文件修改的原因是什么
- PHPstorm批量修改文件換行符CRLF為LF
- git使用
- git常用命令大全
- centos git保存賬戶密碼
- GIT 常用命令
- git怎樣還原修改
- Git 如何放棄所有本地修改的方法
- Git忽略文件模式改變
- git: 放棄所有本地修改
- Git三種方法從遠程倉庫拉取指定的某一個分支
- 雜七雜八
- h5視頻
- H5瀏覽器支持播放格式:H264 AVCA的MP4格式,不能轉換為mpeg-4格式,
- iOS無法播放MP4視頻文件的解決方案 mp4視頻iphone播放不了怎么辦
- h5點播播放mp4視頻遇到的坑,ios的h5不能播放視頻等
- 【Linux 并發請求數】支持多少并發請求數
- Linux下Apache服務器并發優化
- 緩存
- redis
- Linux啟動PHP的多進程任務與守護redis隊列
- 重啟redis命令
- golang