<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                合規國際互聯網加速 OSASE為企業客戶提供高速穩定SD-WAN國際加速解決方案。 廣告
                ## 背景 我們知道MySQL的主備同步是通過binlog在備庫重放進行的,IO線程把主庫binlog拉過去存入relaylog,然后SQL線程重放 relaylog 中的event,然而這種模式有一個問題就是SQL線程只有一個,在主庫壓力大的時候,備庫單個SQL線程是跑不過主庫的多個用戶線程的,這樣備庫延遲是不可避免的。為了解決這種n對1造成的備庫延遲問題,5.6 引入了并行復制機制,即SQL線程在執行的時候可以并發跑。 關于其背后的設計思想,可以參考這幾個worklog?[WL#4648](http://dev.mysql.com/worklog/task/?id=4648),[WL#5563](http://dev.mysql.com/worklog/task/?id=5563),[WL#5569](http://dev.mysql.com/worklog/task/?id=5569),[WL#5754](http://dev.mysql.com/worklog/task/?id=5754),[WL#5599](http://dev.mysql.com/worklog/task/?id=5599),之前的月報也對并行復制原理進程了闡述,讀者朋友可以[回顧下](http://mysql.taobao.org/index.php?title=MySQL%E5%86%85%E6%A0%B8%E6%9C%88%E6%8A%A5_2015.01#MySQL_.C2.B7_.E4.BC.98.E5.8C.96.E6.94.B9.E8.BF.9B.C2.B7_.E5.A4.8D.E5.88.B6.E6.80.A7.E8.83.BD.E6.94.B9.E8.BF.9B.E8.BF.87.E7.A8.8B)。 本篇將從代碼實現角度講述并行復制是如何做的,分析基于MySQL 5.6.26。 ## 準備知識 ### binlog binlog 是對數據庫更改操作的記錄,里面是一個個的event,如類似下面的event序列: ~~~ Query_log Table_map Write/Delete/Update_row_event Xid ~~~ 關于每個event的含義可以參考[官方文檔](https://dev.mysql.com/doc/internals/en/binlog-event.html)。 ### 配置 并行復制提供了幾個參數配置,可以通過修改參數值對其進行調節。 ~~~ slave_parallel_workers // worker 線程個數 slave-checkpoint-group // 隔多少個事務做一次 checkpoint slave-checkpoint-period // 隔多長時間做一次 checkpoint slave-pending-jobs-size-max // 分發給worker的、處于等待狀態的event的大小上限 ~~~ ### 概念術語 下面是并行復制中用到幾個概念: ~~~ MTS // Multi-Threaded Slave,并行復制 group // 一個事務在binlog中對應的一組event序列 worker // 簡稱W,event 執行線程,MTS新引入的 Coordinator // 簡稱C,分發協作線程,就是之前的 SQL線程 checkpoint // 簡稱CP,檢查點,C線程在滿足一定條件下去做,目的是收集W線程執行完信息,向前推動執行位點 B-event // 標志事務開始的event,BEGIN 這種Query或者GTID G-event // 包含分發信息的event,如Table_map、Query T-event // 標志事務結束的event,COMMIT/ROLLBACK 這種Query 或者XID ~~~ ### 相關代碼文件 sql/rpl_rli_pdb.h // pdb的是 parallelized by db name簡寫[WL#5563](http://dev.mysql.com/worklog/task/?id=5563) sql/rpl_rli_pdb.cc sql/rpl_slave.cc sql/log_event.cc sql/rpl_rli.h ### 并行執行原則 1. 并行執行的基本模型是生產者-消費者,C線程將event按db插入各W線程的任務隊列,W線程從隊列里取出event執行; ![MTS 并行復制模型](https://box.kancloud.cn/2015-09-24_56039cf4ca6d2.png "MTS 并行復制模型") 2. 同一個group(事務)內的event都發給同一個worker,保證事務的一致性; 3. 分發關系由包含db信息的event(G-evnet)決定,其它event按決定好的關系進行分發; ## 重要數據結構 1. `db_worker_hash_entry`,db->worker 映射關系,也即分發關系,所有的分發關系緩存在C的一個HASH表中(APH) ~~~ - db // db 名 - worker // 指向worker的指針,表示被分發到的W線程 - usage // 有多少正在分發的group用到這個關系 - temporary_tables // 用于在C和W之前傳遞臨時表 ~~~ 2. `slave_job_item`,worker的jobs隊列的成員 ~~~ - data // 就是一個binlog event ~~~ 3. `circular_buffer_queue`,用DYNAMIC_ARRAY arrary實現的一個首尾相連的環形隊列,是其他重要數據結構的基類 ~~~ - Q // 底層用到的 DYNAMIC_ARRAY - size // Queue 的容量 - avail // 隊列尾 - entry // 隊列頭 - len // 隊列實際大小 - de_queue() // 出隊操作 - de_tail() // 尾部出隊 - en_queue() // 入隊 - head_queue() // 取隊列頭,但是不出隊 ~~~ 4. `Slave_job_group`,維護一個正在執行的事務的信息,如對應的位點信息、事務分發到的worker、有沒有執行完等。 ~~~ - group_master_log_name // 對應主庫的 binlog 文件名 - group_master_log_pos // 對應在主庫 binlog 中的位置 - group_relay_log_name // 對應備庫 relaylog 文件名 - group_relay_log_pos // 對應在備庫 relaylog 中的位置 - worker_id // 對應的worker的id - worker // worker 指針 - total_seqno // 當前group是啟動以來執行的第幾個group - master_log_pos // group中B-event的位置 - checkpoint_seqno // 當前group是從上次做完CP后的第幾個group - checkpoint_log_pos // worker收到checkpoint信號后更新 - checkpoint_log_name // 同上 - checkpoint_relay_log_pos // 同上 - checkpoint_relay_log_name // 同上 - done // 這個group是否已經被worker commit掉 - shifted // checkpoint 的時候shift值 - ts // 時間,更新SBM - reset() // 重置上面的成員變量 ~~~ 5. `Slave_committed_queue`,維護分發執行的group信息,是`circular_buffer_queue`的子類,隊列里存的時?`Slave_job_group` ~~~ - lwm // 類型是Slave_job_group,低水位(Low-Water-Mark),表示上次CP執行到的位置 - last_done // 類型是一個DYNAMIC_ARRAY,里面存的是Slave_job_group:total_seqno,表示每個worker執行到第幾個group - assigned_group_index // 正在分發的group在GAQ中的位置 - move_queue_head() // 做checkpoint時,把已經commit的group移出隊列 - get_job_group() // 返回隊列指定位置的Slave_job_group - en_queue() // 入隊一個 Slave_job_group ~~~ 6. `Slave_jobs_queue`,任務隊列,也是`circular_buffer_queue`的子類,隊列里存的是`slave_job_item`,每個worker有一個這樣的任務隊列 ~~~ - overfill // 隊列滿標志 - waited_overfill // 隊列滿的次數 ~~~ 7. `Slave_worker`,對應一個worker,`Relay_log_info`?的子類 ~~~ - jobs // 類型是 Slave_jobs_queue,C分發過來的event都放在這里面 - c_rli // 指向C的指針 - curr_group_exec_parts // 類型是 DYNAMIC_ARRAY,里面存的是當前group用到的分發關系,是指向APH成員的指針,簡寫CGEP - curr_group_seen_begin // 當前所在 group 有沒有解析到 B-event - id // worker 的id標識 - last_group_done_index // worker上一次執行的group在GAQ中的位置 - gaq_index // worker 當前執行的的事務在GAQ中的位置 - usage_partition // worker用到的分發關系個數 - end_group_sets_max_dbs // 和串行執行相關的 - bitmap_shifted // CP后bitmap需要偏移的距離,用于調整 group_executed - wq_overrun_cnt // 超載多少 - overrun_level // 超載指標 - underrun_level // 饑餓指標 - excess_cnt // 用于往mts_wq_excess_cnt累計 - group_executed // 類型是 MY_BITMAP,標示CP后執行的group - group_shifted // 類型是 MY_BITMAP,計算group_executed,臨時用作中間變量 - running_status // 標識 worker 線程的狀態,可以有 NOT_RUNNING、RUNNING、ERROR_LEAVING、KILLED - slave_worker_ends_group () // 當一個group執行完或者異常終止時會調用 - commit_positions() // group執行完是調用,用于更新位點和bitmap - rollback_positions() // 回滾bitmap ~~~ 8. `Relay_log_info`,對應C線程,在MTS之前對應SQL線程,為了支持并行復制,在原來的基礎上又加了一些成員 ~~~ - mapping_db_to_worker // 非常重要的成員,類型是HASH,用于緩存所有的分發關系,APH(Assigned Partition Hash),目的能通過db快速找到映射關系,但HASH長度大于mts_partition_hash_soft_max(固定16)時,會對沒有使用的映射關系進行回收。 - workers // 類型是 DYNAMIC_ARRAY,成員是一個個Slave_worker - pending_jobs // 一個統計信息,表示待執行job個數 - mts_slave_worker_queue_len_max // 每個worker最多能容納jobs的個數,目前hard code是16384 - mts_pending_jobs_size // 所有worker的job占的內存 - mts_pending_jobs_size_max // 所有worker的job占的內存,對應配置 slave_pending_jobs_size_max - mts_wq_oversize // 標示job占用內存已達上限 - gaq // 非常重要的成員,代碼注釋里經常提到的GAQ,類型是Slave_committed_queue,存的成員是Slave_job_group,大小對應配置 slave-checkpoint-group,用于W和C交互 - curr_group_assigned_parts // 類型是 DYNAMIC_ARRAY,當前group中已經分配的event的映射關系,可以和Slave_worker的curr_group_exec_parts對應,簡寫CGAP - curr_group_da // 類型是DYNAMIC_ARRAY,對于還無法決定分發worker的event,先存在這里 - mts_wq_underrun_w_id // 標識比較空閑的worker的id - mts_wq_excess_cnt // 標示worker的超載情況 - mts_worker_underrun_level // 當W的任務隊列大小低于這個值的認為處于饑餓狀態 - mts_coordinator_basic_nap // 當work負載較大時,C線程sleep,會用到這個值 - opt_slave_parallel_workers // 對應配置 slave_parallel_workers - slave_parallel_workers // 當前實際的worker數 - exit_counter // 退出時用 - max_updated_index // 退出時用 - checkpoint_seqno // 上次CP后分發的group個數 - checkpoint_group // 對應配置 mts_checkpoint_group - recovery_groups // 類型是 MY_BITMAP,恢復時用到 - mts_group_status // 分發線程所處的狀態,取值為 MTS_NOT_IN_GROUP、MTS_IN_GROUP、MTS_END_GROUP、MTS_KILLED_GROUP - mts_events_assigned // 分發的event計數 - mts_groups_assigned // 分發的group計數 - least_occupied_workers // 類型是 DYNAMIC_ARRAY,從注釋將worker按從空閑到繁忙排序的一個數組,用于先worker用,但是實際并未用到。 - last_clock // 上次做checkpoint的時間 ~~~ 9. 其它方法 ~~~ map_db_to_worker() // 把db映射給worker get_least_occupied_worker() // 獲取負載最小的worker wait_for_workers_to_finish() // 等待worker完成,并發臨時轉成串行是用到 append_item_to_jobs() // 把任務分發給 worker mts_move_temp_table_to_entry() // 用于傳遞臨時表 mts_move_temp_tables_to_thd() // 同上 ~~~ ## 初始化 和單線程SQL相比,MTS需要初始化新加的MTS變量和啟動worker線程。 主要是`slave_start_workers()`這個函數。會初始化C線程的MTS變量,如workers、curr_group_assigned_parts、curr_group_da、gaq等,接著調用`init_hash_workers()`?初始化HASH表mapping_db_to_worker,在這些做完后依次調用?`slave_start_single_worker()`?初始化每個worker并啟動W線程。worker 的的初始化包括jobs任務隊列、curr_group_exec_parts 等相關變量,其中jobs長度目前是固定的16384,目前還不可配置;worker線程的主函數是`handle_slave_worker()`,不停的調用`slave_worker_exec_job()`來執行C分配的event。 ## Coordinator 分發協作 分發線程主體和之前的SQL線程基本是一樣的,不停的調用?`exec_relay_log_event()`?函數。`exec_relay_log_event()`主要分2部分,一是調用`next_event()`讀取relay log,一是`apply_event_and_update_pos()`?做分發。 `next_event()`?比較簡單,就是不停的用?`Log_event::read_log_event()`?從relay log 讀取event,除此之外還會調用`mts_checkpoint_routine()`?做checkpoint,后面會詳細講checkpiont過程。 `apply_event_and_update_pos()`進行分發的入口是`Log_event::apply_event()`,如果沒有開MTS,就是原來的邏輯,SQL線程直接執行event,如果開了MTS的話,調用`get_slave_worker()`,這個是分發的主邏輯。 在介紹分發邏輯前,先將所有的binlog event 可以分下類(代碼里是這么分的): ~~~ B-event // BEGIN(Query) 或者 GTID G-event // 包含db信息的event,Table_map 或者 Query P-event // 一般放在G-event前的,如int_var、rand、user_var等 R-event // 一般放在G-event后的,如各種Rows event T-event // COMMIT/ROLLBACK(Query) 或者XID ~~~ 分發邏輯是這樣的: 1. 如果是B-event,表明是事務的開始,mts_groups_assigned 計數加1,同時GAQ中入隊一個新的group,表示一個新的事務開始,然后把event放入curr_group_da,因為B-event沒有db信息,還不知道分發給哪個worker; 2. 如果是G-event,event里包含db信息,就需要按這個db找到一個分發到的worker,worker選擇機制是`map_db_to_worker()`實現。調用`map_db_to_worker()`時,有2個參數比較重要,一個是dbname,這個就是分發關系的key,一個是last_worker,表示當前group中event上一次分發到的worker(last_assigned_worker); * 在當前group已經用到的映射關系(curr_group_assigned_parts CGAP)中找,如果有同db的映射關系,就直接返回last_worker;如果找不到,就去APH中按db名搜索; * 如果APH中搜到的話,分3種情況,a) 這個映射關系沒有group用到,就直接把db映射為last_worker,如果last_worker為空的主話,就找一個最空閑的worker,`get_least_occupied_worker()`?b) 這個映射關系有group用,并且對應的worker和last_worker一樣,就用last_worker,映射關系引用計數加1 c) 如果映射關系對應的worker和last_worker不一樣,這表示有沖突,就需要等到引用這個映射關系的group全部執行完,然后把db映射為last_worker; * 如果沒搜到的話,就新生成一個映射關系,key用db,value用last_worker,如果last_worker為空的話,選最空閑的worker,`get_least_occupied_worker()`,并把新生成的映射插入到APH中,如果HASP表長度大于 mts_partition_hash_soft_max 的話,在插入前會對APH做一次收縮,從中去除掉沒有被group引用的映射關系; * 把選擇的映射關系插入到 curr_group_assigned_parts 中。 3. 如果是其它event,worker直接用last_assigned_worker。 什么時候切換為串行? 如果G-event包含的db個數大于MAX_DBS_IN_EVENT_MTS(16個),或者更新的表被外鍵依賴,那么就需要串行執行當前group。串行固定選用第0個worker來執行,在分發前會等待其它worker全部執行完,在分發后會等待所有worker執行完。gropu執行完后自動切換為并行執行。 worker 確定好了,下一步就是分發event了,入口函數?`append_item_to_jobs()`。這個函數的作用非常明確,就是把event插入到worker的jobs隊列中,在插入前會有對event大小有檢查: 1. 如果event大小已經超過了等待任務大小的上限(配置slave-pending-jobs-size-max ),就報event太大的錯,然后返回; 2. 如果event大小+已經在等待的任務大小超過了slave-pending-jobs-size-max,就等待,至到等待隊列變小; 3. 如果當前的worker的隊列滿的話,也等待。 ## Worker 執行 W線程執行的主邏輯是?`slave_worker_exec_job()`: 1. 從自己的job隊列里取出event; 2. 根據event的信息,來更新worker中的變量,如curr_group_exec_parts(CGEP)、future_event_relay_log_pos、gaq_index等; 3. 執行event,`do_apply_event_worker()`,最終調用每個event的`do_apply_event()`方法,和單線程下一樣; 4. 如果是T event,調用?`slave_worker_ends_group()`,表示一個事務已經執行完了,a) 更新位點,通過`commit_positions()`,更新事務在GAQ中對應的`Slave_job_group`,這樣C就知道W執行到哪了,另外還會更新W的bitmap信息(如果是xid event,在apply_event中就會調用commit_positions) b) 清空 curr_group_exec_parts,將映射關系中的引用數減1; 5. 更新C的隊列統計信息,如等待執行任務數pending_jobs,等待執行任務大小mts_pending_jobs_size等; 6. 更新 overrun 和 underrun 狀態。 分發和執行邏輯可以用下圖簡單表示: ![MTS 分發邏輯](https://box.kancloud.cn/2015-09-24_56039cf545e37.png "MTS 分發邏輯") C線程在GAQ中插入group,標示一個要執行的事務,接著確定分發關系(從CGAP或者APH中,或者生成新的),然后按映射關系把event分發給對應worker的job隊列;worker在執行event過程中更新自己的CGEP,在執行完整個group后,根據CGEP中的記錄去更新APH中引用關系的計數,同時把GAQ中的對應group標示為done。 ## checkpoint 過程 如前所述,C線程會在從relaylog讀取event后,會嘗試做checkpoint,入口函數是`mts_checkpoint_routine()`。checkpoint的作用是把worker執行完的事務從GAQ中去除,向前推進事務完成點。 有2個條件會觸發checkpoint: 1. 當前時間距上次checkpoint已經超過配置 mts-checkpoint-period,這時會嘗試做一次checkpoint,不管有沒有向前推進事務; 2. 上一次checkpoint后分發的事務數已經到達checkpoint設置上限(slave-checkpoint-group),這時會強制做checkpoint,如果一次checkpoint沒成功,會一直重試,直至成功。 GAQ中的事務推進通過?`Slave_committed_queue::move_queue_head()`?實現,從前向后掃描GAQ中的group: 1. 如果當前group已經完成(通過標志`Slave_job_group.done`標志確認),就把這個group出隊,同時把這個出隊的group信息賦給低水位lwm,向前推進; 2. 如果遇到沒有完成的group,就是遇到一個gap,表示對應worker還沒執行完當前group,checkpoint不能再向前推進了,到此結束,返回值就是退出前已經推進的group個數。 ![MTS checkpoint邏輯](https://box.kancloud.cn/2015-09-24_56039cf58c81c.png "MTS checkpoint 邏輯") ## slave 停止 類似單線程復制,stop slave 命令會終止C線程和W線程的運行。 C線程收到退出信號后,會先調用`slave_stop_workers()`終止W線程,過程如下: 1. 依次把每個運行中的 worker 的 runnig_status 設置`Slave_worker::STOP`,同時設置worker執行終止位置`rli->max_updated_index`; 2. C線程等待所有W線程終止(`w->running_status == Slave_worker::NOT_RUNNING`); 3. 調用`mts_checkpoint_routine()`,做一次checkpoint; 4. 釋放資源,如APH、GAQ、CGDA(curr_group_da)、CGAP(curr_group_assigned_parts)等。 W線程在`pop_jobs_item()`中會調用`set_max_updated_index_on_stop()`,會檢查2個條件 1) job隊列是空的,2) 當前worker執行的事務在GAQ中的位置,是否已經超過`rli->max_updated_index`;任一條件滿足就設置狀態 running_status 為?`Slave_worker::STOP_ACCEPTED`,表示開始退出。 從上面的邏輯可以看出,在收到stop信號后,worker線程會等正在執行的group完成后,才會退出。 ## 異常退出 W被kill或者執行出錯 1. `slave_worker_exec_job()`?進入錯誤處理邏輯,調用`Slave_worker::slave_worker_ends_group()`,給C線程發KILL_QUERY信號,然后做相關變量的清理,把job隊列的任務全部清理掉,最終把running_status置為`Slave_worker::NOT_RUNNING`,表示結束; 2. C線程收到kill信號后,停止分發,然后進入`slave_stop_workers()`邏輯,給活躍的W線程發送STOP信號; 3. 其它W線程收到STOP信號后,會處理job隊列中所有的event; 4. 和stop slave不同的是,C線程最后不會做checkpoint。 C被kill C被kill的處理邏輯和stop slave差不多,不同之處在于等worker全部終止后,不會做checkpoint。 ## 恢復 Slave線程重啟(正常關閉或者異常kill)后,需要根據Coordinator和每個Worker的記錄信息來進行恢復,推進到一個一致狀態后再開始并行,詳細過程我們下期月報再分析。 ## 存在的問題 5.6 的MTS是按db來進行分發的,分發粒度太大,如果只有一個db的時候,就沒有并發性了,所有group都分給一個worker,就變成單線程執行了。一個簡單的優化改進是改成按table來分發,只需要把分發的key從dbname改成dbname + tablename,整體分發邏輯不需要變動。再進一步,如果遇到熱點表更新呢,這時候binlog里記錄的event都是針對一個表的更新,又會變成串行執行。這個時候就需要變化一下分發測略嘍,如按事務維度進行分發,這個策略對源碼的改動就會比較大些,有需要的同學可以試試:-)
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看