[TOC]
*****
# **Oracle 數據庫表備份**
這次我們將一個 Oracle 數據庫實例上的 LOGIN 表的數據導入到一個 Kafka 主題上,再將它同步到另一個 Oracle 數據庫實例上的 LOGIN 表。
<br >
## **準備 Oracle 數據庫實例**
1. 設置環境變量:
```
$ export CLASSPATH=.:/usr/local/share/java/kafka-connect-jdbc/ojdbc6.jar
```
2. 使用 Docker 快速安裝并運行 3 個 Oracle 11g 的實例,用戶名/初始密碼:`SYSTEM`/`oracle`。
```
$ docker run --name oracle11g -d -p 1521:1521 -p 8080:8080 -e ORACLE_ALLOW_REMOTE=true oracleinanutshell/oracle-xe-11g
$ docker run --name oracle11g-1 -d -p 11521:1521 -p 18080:8080 -e ORACLE_ALLOW_REMOTE=true oracleinanutshell/oracle-xe-11g
```
> 修改密碼:
> $ docker exec -it oracle11g bash
> $ sqlplus
> Enter user-name: system
> Enter password: oracle
> SQL> alter user system identified by admin123;
> SQL> exit;
> $ exit
在每個實例下創建 TEST 用戶和 LOGIN 表:
```
-- USER SQL
CREATE USER C##TEST IDENTIFIED BY "test1234"
DEFAULT TABLESPACE "USERS"
TEMPORARY TABLESPACE "TEMP";
GRANT "DBA" TO C##TEST ;
GRANT "CONNECT" TO C##TEST ;
GRANT "RESOURCE" TO C##TEST ;
-- TABLES
CREATE TABLE C##TEST.LOGIN (
ID NUMBER NOT NULL
, USERNAME VARCHAR2(30) NOT NULL
, LOGIN_TIME DATE NOT NULL
, CONSTRAINT LOGIN_PK PRIMARY KEY (ID) ENABLE
);
CREATE SEQUENCE C##TEST.SEQ_LOGIN CACHE 20;
CREATE INDEX C##TEST.IDX_LOGIN_LT ON C##TEST.LOGIN (LOGIN_TIME DESC);
```
<br >
## **安裝 JDBC Connector 插件**
參見 [JDBC Connector](JDBCConnector.md)
<br >
## **創建 Source/Sink Connector**
1. 創建 JDBC Source Connector:
```
$ echo '{"name":"oracle-jdbc-source","config":{"connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector","tasks.max":"1","dialect.name":"OracleDatabaseDialect","connection.url":"jdbc:oracle:thin:@localhost:1521:xe","connection.user":"TEST","connection.password":"test1234","mode":"timestamp","table.whitelist":"LOGIN","validate.non.null":false,"timestamp.column.name":"LOGIN_TIME","topic.prefix":"oracle.test."}}' | curl -X POST -d @- http://localhost:8083/connectors --header "Content-Type:application/json"
```
> **mode**:表加載模式,支持的模式有 `bulk`,每次輪詢時都要對整個表進行批量加載;`incrementing`,在每個表上使用遞增列以僅檢測新增的行。請注意,這不會檢測到對現有行的修改或刪除;`timestamp`,使用時間戳(或類似時間戳的列)來檢測新行和修改過的行。 假設每次寫入都會更新該列,并且值是單調遞增的,不需要保證唯一。`timestamp+incrementing`,使用兩列,用于檢測新行和已修改行的時間戳列,以及用于為更新提供全局唯一ID的遞增列,以便可以為每一行分配唯一的偏移量。
> **validate.non.null**:默認情況下,JDBC 連接器將驗證所有增量表和時間戳表是否將 ID 和時間戳列設置為 NOT NULL。 如果沒有,則 JDBC 連接器將無法啟動。 將此設置為 false 將禁用這些檢查。
> [**incrementing.column.name**](http://incrementing.column.name):遞增的列的名稱,用于檢測新行。 空值表示自動偵測遞增列。 該列不能為空。
> [**timestamp.column.name**](http://timestamp.column.name):用一個或多個時間戳列的逗號分隔列表,使用 COALESCE SQL 函數檢測新行或修改過的行。每次輪詢都會發現第一個非空時間戳值大于看到的最大先前時間戳值的行。 至少一列不能為空。
```
```
2. 創建 JDBC Sink Connector:
```
$ echo '{"name":"oracle-jdbc-sink","config":{"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector","topics":"oracle.test.LOGIN","tasks.max":"1","connection.url":"jdbc:oracle:thin:@localhost:11521:xe","connection.user":"TEST","connection.password":"test1234","auto.create":"false","insert.mode":"insert","table.name.format":"LOGIN"}}' | curl -X POST -d @- http://localhost:8083/connectors --header "Content-Type:application/json"
```
> **insert.mode**:要使用的插入模式,支持的模式有,`insert`、`upsert`、`update`。
> **pk.mode**:主鍵模式, 支持的模式有 `none`、`kafka`、`record_key`、`record_value`。
```
```
3. 列出所有活動的 Connector:
```
$ curl localhost:8083/connectors
["oracle-jdbc-source","oracle-jdbc-sink"]
```
<br >
## **測試**
1. 通過 SQL 腳本在數據庫實例 1 上新增測試數據:
```
DECLARE
i NUMBER;
BEGIN
i:=0;
LOOP
i:=i+1;
INSERT INTO TEST.LOGIN(ID, USERNAME, LOGIN_TIME)
VALUES (TEST.SEQ_LOGIN.NEXTVAL,to_char(systimestamp, 'yyyymmddhh24miss'), SYSDATE);
IF i > 100000 THEN
EXIT;
END IF;
END LOOP;
END;
```
<br >
1. 在數據庫實例 2 上檢查數據是否已同步
```
select count(1) from test.login;
select * from test.login order by login_time desc,id desc;
```
<br >
1. SQL 監控分析:
```
SELECT ADDRESS,HASH_VALUE,SQL_ID, SQL_FULLTEXT,PARSE_CALLS,EXECUTIONS,
APPLICATION_WAIT_TIME,CONCURRENCY_WAIT_TIME/1000000 CONCURRENCY_WAIT_TIME,CLUSTER_WAIT_TIME,USER_IO_WAIT_TIME,
OPTIMIZER_COST, CPU_TIME/1000000 CPU_TIME, ELAPSED_TIME/1000000 ELAPSED_TIME,
DECODE(EXECUTIONS,0,0,ROUND(CPU_TIME/1000000/EXECUTIONS,5)) AV_CPU_TIME,
DECODE(EXECUTIONS,0,0,ROUND(ELAPSED_TIME/1000000/EXECUTIONS,5)) AV_ELAPSED_TIME,
SHARABLE_MEM,PERSISTENT_MEM,RUNTIME_MEM,
DISK_READS,DIRECT_WRITES,BUFFER_GETS,
ROWS_PROCESSED,
FIRST_LOAD_TIME, TO_CHAR(LAST_ACTIVE_TIME,'YYYY-MM-DD/HH24:MI:SS') LAST_ACTIVE_TIME
FROM V$SQLAREA
WHERE PARSING_SCHEMA_NAME='TEST' AND LAST_ACTIVE_TIME >=SYSDATE-1/24
AND SQL_FULLTEXT NOT LIKE'%sys.%'
ORDER BY EXECUTIONS DESC;
```
<br >
## **清理測試環境**
1. 刪除連接器:
```
$ curl -X DELETE http://localhost:8083/connectors/oracle-jdbc-source
$ curl -X DELETE http://localhost:8083/connectors/oracle-jdbc-sink
```
<br >
1. 清空數據表:
```
TRUNCATE TABLE LOGIN;
```
<br >
1. 刪除主題 oracle.test.LOGIN