<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                企業??AI智能體構建引擎,智能編排和調試,一鍵部署,支持知識庫和私有化部署方案 廣告
                ? ? ? ? ? ? replication的英文單詞的原意是“復制”的意思,replication文件作為我在Data目錄下的分析的最后一個文件,足以說明他的重要性,代碼量1800+,的確非常難啃。只能說個我看代碼下來的大致印象吧,要我畫個結構圖好好理理這里面各個API的關系圖,這個我目前還真做不到。說到主從復制,這個是實現讀寫分離的最好手段了,也很常見,當用戶數達到一定量,當一個服務器承受不了達到上千萬的pv時,采取主從數據庫的形式也是一般架構師能夠想到的一種手段。Redis的主從數據庫在我這里就稱為主客戶端,從客戶端,因為客戶端中有所屬于的db,因為數據庫基于客戶單本身進行復制操作的。也就是說,一個Redis,存在一個master主客戶端,多個slave從客戶端,到時實現的就是slave向主客戶端進行復制操作。因為API比較多,進行了稍稍的歸類: ~~~ /* ---------------------------------- MASTER -------------------------------- */ void createReplicationBacklog(void) /* 創建backlog的buffer */ void resizeReplicationBacklog(long long newsize) /* 調整復制備份日志的大小,當replication backlog被修改的時候 */ void freeReplicationBacklog(void) /* 釋放備份日志 */ void feedReplicationBacklog(void *ptr, size_t len) /* 往備份日志中添加添加數據操作,會引起master_repl_offset偏移量的增加 */ void feedReplicationBacklogWithObject(robj *o) /* 往backlog添加數據,以Redis 字符串對象作為參數 */ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) /* 將主數據庫復制到從數據庫 */ void replicationFeedMonitors(redisClient *c, list *monitors, int dictid, robj **argv, int argc) /* 發送數據給monitor監聽者客戶端 */ long long addReplyReplicationBacklog(redisClient *c, long long offset) /* slave從客戶單添加備份日志 */ int masterTryPartialResynchronization(redisClient *c) /* 主數據庫嘗試分區同步 */ void syncCommand(redisClient *c) /* 同步命令函數 */ void replconfCommand(redisClient *c) /* 此函數用于從客戶端進行配置復制進程中的執行參數設置 */ void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) /* 給slave客戶端發送BULK數據 */ void updateSlavesWaitingBgsave(int bgsaveerr) /* 此方法將用于后臺保存進程快結束時調用,更新slave從客戶端 */ /* ----------------------------------- SLAVE -------------------------------- */ void replicationAbortSyncTransfer(void) /* 中止與master主數據的同步操作 */ void replicationSendNewlineToMaster(void) /* 從客戶端發送空行給主客戶端,破壞了原本的協議格式,避免讓主客戶端檢測出從客戶端超時的情況 */ void replicationEmptyDbCallback(void *privdata) /* 清空數據庫后的回調方法,當老數據被刷新出去之后等待加載新數據的時候調用 */ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) /* 從客戶端讀取同步的Sync的BULK數據 */ char *sendSynchronousCommand(int fd, ...) /* 從客戶端發送給主客戶端同步數據的命令,附上驗證信息,和一些參數配置信息 */ int slaveTryPartialResynchronization(int fd) /* 從客戶端嘗試分區同步操作 */ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) /* 與主客戶端保持同步,期間包括端口號等的確認,socket連接 */ int connectWithMaster(void) /* 連接主客戶端 */ void undoConnectWithMaster(void) /* 撤銷連接主客戶端 */ int cancelReplicationHandshake(void) /* 當已經存在一個復制進程時,中止一個非阻塞的replication復制的嘗試 */ void replicationSetMaster(char *ip, int port) /* 設定主客戶端的ip地址和端口號 */ void replicationUnsetMaster(void) void slaveofCommand(redisClient *c) void roleCommand(redisClient *c) void replicationSendAck(void) /* 發送ACK包給主客戶端 ,告知當前的進程偏移量 */ /* ---------------------- MASTER CACHING FOR PSYNC -------------------------- */ void replicationCacheMaster(redisClient *c) /* 緩存客戶端信息 */ void replicationDiscardCachedMaster(void) /* 當某個客戶端將不會再回復的時候,可以釋放掉緩存的主客戶端 */ void replicationResurrectCachedMaster(int newfd) /* 將緩存客戶端復活 */ /* ------------------------- MIN-SLAVES-TO-WRITE --------------------------- */ void refreshGoodSlavesCount(void) /* 更新slave從客戶端數量 */ void replicationScriptCacheInit(void) void replicationScriptCacheFlush(void) void replicationScriptCacheAdd(sds sha1) int replicationScriptCacheExists(sds sha1) void replicationCron(void) ~~~ 找一個標準的slave從客戶端向主客戶端實現同步的操作: ~~~ /* 與主客戶端保持同步,期間包括端口號等的確認,socket連接 */ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { char tmpfile[256], *err; int dfd, maxtries = 5; int sockerr = 0, psync_result; socklen_t errlen = sizeof(sockerr); REDIS_NOTUSED(el); REDIS_NOTUSED(privdata); REDIS_NOTUSED(mask); /* If this event fired after the user turned the instance into a master * with SLAVEOF NO ONE we must just return ASAP. */ if (server.repl_state == REDIS_REPL_NONE) { close(fd); return; } /* Check for errors in the socket. */ /* socket連接是否正常 */ if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1) sockerr = errno; if (sockerr) { aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE); redisLog(REDIS_WARNING,"Error condition on socket for SYNC: %s", strerror(sockerr)); goto error; } /* If we were connecting, it's time to send a non blocking PING, we want to * make sure the master is able to reply before going into the actual * replication process where we have long timeouts in the order of * seconds (in the meantime the slave would block). */ /* 連接測試,將由主客戶端發送PING命令給從客戶端,在給定的延遲時間內觀察是否有回復 */ if (server.repl_state == REDIS_REPL_CONNECTING) { redisLog(REDIS_NOTICE,"Non blocking connect for SYNC fired the event."); /* Delete the writable event so that the readable event remains * registered and we can wait for the PONG reply. */ aeDeleteFileEvent(server.el,fd,AE_WRITABLE); server.repl_state = REDIS_REPL_RECEIVE_PONG; /* Send the PING, don't check for errors at all, we have the timeout * that will take care about this. */ //發送PING命令 syncWrite(fd,"PING\r\n",6,100); return; } /* Receive the PONG command. */ //收到回復了 if (server.repl_state == REDIS_REPL_RECEIVE_PONG) { char buf[1024]; /* Delete the readable event, we no longer need it now that there is * the PING reply to read. */ aeDeleteFileEvent(server.el,fd,AE_READABLE); /* Read the reply with explicit timeout. */ buf[0] = '\0'; if (syncReadLine(fd,buf,sizeof(buf), server.repl_syncio_timeout*1000) == -1) { redisLog(REDIS_WARNING, "I/O error reading PING reply from master: %s", strerror(errno)); goto error; } /* We accept only two replies as valid, a positive +PONG reply * (we just check for "+") or an authentication error. * Note that older versions of Redis replied with "operation not * permitted" instead of using a proper error code, so we test * both. */ if (buf[0] != '+' && strncmp(buf,"-NOAUTH",7) != 0 && strncmp(buf,"-ERR operation not permitted",28) != 0) { redisLog(REDIS_WARNING,"Error reply to PING from master: '%s'",buf); goto error; } else { redisLog(REDIS_NOTICE, "Master replied to PING, replication can continue..."); } } /* AUTH with the master if required. */ //auth身份驗證 if(server.masterauth) { err = sendSynchronousCommand(fd,"AUTH",server.masterauth,NULL); if (err[0] == '-') { redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",err); sdsfree(err); goto error; } sdsfree(err); } /* Set the slave port, so that Master's INFO command can list the * slave listening port correctly. */ /* 設置從客戶端監聽端口 */ { sds port = sdsfromlonglong(server.port); err = sendSynchronousCommand(fd,"REPLCONF","listening-port",port, NULL); sdsfree(port); /* Ignore the error if any, not all the Redis versions support * REPLCONF listening-port. */ if (err[0] == '-') { redisLog(REDIS_NOTICE,"(Non critical) Master does not understand REPLCONF listening-port: %s", err); } sdsfree(err); } /* Try a partial resynchonization. If we don't have a cached master * slaveTryPartialResynchronization() will at least try to use PSYNC * to start a full resynchronization so that we get the master run id * and the global offset, to try a partial resync at the next * reconnection attempt. */ psync_result = slaveTryPartialResynchronization(fd); if (psync_result == PSYNC_CONTINUE) { redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Master accepted a Partial Resynchronization."); return; } /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC * and the server.repl_master_runid and repl_master_initial_offset are * already populated. */ if (psync_result == PSYNC_NOT_SUPPORTED) { redisLog(REDIS_NOTICE,"Retrying with SYNC..."); if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) { redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s", strerror(errno)); goto error; } } /* Prepare a suitable temp file for bulk transfer */ while(maxtries--) { snprintf(tmpfile,256, "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid()); dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644); if (dfd != -1) break; sleep(1); } if (dfd == -1) { redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno)); goto error; } /* Setup the non blocking download of the bulk file. */ if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL) == AE_ERR) { redisLog(REDIS_WARNING, "Can't create readable event for SYNC: %s (fd=%d)", strerror(errno),fd); goto error; } server.repl_state = REDIS_REPL_TRANSFER; server.repl_transfer_size = -1; server.repl_transfer_read = 0; server.repl_transfer_last_fsync_off = 0; server.repl_transfer_fd = dfd; server.repl_transfer_lastio = server.unixtime; server.repl_transfer_tmpfile = zstrdup(tmpfile); return; error: close(fd); server.repl_transfer_s = -1; server.repl_state = REDIS_REPL_CONNECT; return; } ~~~ ? ? ? ?在replication中,要一個cacheMaster的概念,就是可以臨時緩存主客戶端的信息,一般用于突然master和slave斷開連接的時候,可以下次進行主從同步的時候快速恢復: ~~~ /* 緩存客戶端信息 */ void replicationCacheMaster(redisClient *c) { listNode *ln; redisAssert(server.master != NULL && server.cached_master == NULL); redisLog(REDIS_NOTICE,"Caching the disconnected master state."); /* Remove from the list of clients, we don't want this client to be * listed by CLIENT LIST or processed in any way by batch operations. */ //首先移除此客戶端 ln = listSearchKey(server.clients,c); redisAssert(ln != NULL); listDelNode(server.clients,ln); /* Save the master. Server.master will be set to null later by * replicationHandleMasterDisconnection(). */ //保存為緩存客戶端 server.cached_master = server.master; /* Remove the event handlers and close the socket. We'll later reuse * the socket of the new connection with the master during PSYNC. */ //刪除在這個客戶端上的讀寫事件 aeDeleteFileEvent(server.el,c->fd,AE_READABLE); aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE); close(c->fd); /* Set fd to -1 so that we can safely call freeClient(c) later. */ c->fd = -1; /* Invalidate the Peer ID cache. */ if (c->peerid) { sdsfree(c->peerid); c->peerid = NULL; } /* Caching the master happens instead of the actual freeClient() call, * so make sure to adjust the replication state. This function will * also set server.master to NULL. */ replicationHandleMasterDisconnection(); } ~~~ 當想讓這個master的復活的時候,調用下面的方法: ~~~ /* Turn the cached master into the current master, using the file descriptor * passed as argument as the socket for the new master. * * This funciton is called when successfully setup a partial resynchronization * so the stream of data that we'll receive will start from were this * master left. */ /* 將緩存客戶端復活 */ void replicationResurrectCachedMaster(int newfd) { //將cached_master賦值為主客戶端 server.master = server.cached_master; server.cached_master = NULL; server.master->fd = newfd; server.master->flags &= ~(REDIS_CLOSE_AFTER_REPLY|REDIS_CLOSE_ASAP); server.master->authenticated = 1; server.master->lastinteraction = server.unixtime; server.repl_state = REDIS_REPL_CONNECTED; /* Re-add to the list of clients. */ //重新添加入客戶端列表中 listAddNodeTail(server.clients,server.master); if (aeCreateFileEvent(server.el, newfd, AE_READABLE, readQueryFromClient, server.master)) { redisLog(REDIS_WARNING,"Error resurrecting the cached master, impossible to add the readable handler: %s", strerror(errno)); freeClientAsync(server.master); /* Close ASAP. */ } /* We may also need to install the write handler as well if there is * pending data in the write buffers. */ if (server.master->bufpos || listLength(server.master->reply)) { if (aeCreateFileEvent(server.el, newfd, AE_WRITABLE, sendReplyToClient, server.master)) { redisLog(REDIS_WARNING,"Error resurrecting the cached master, impossible to add the writable handler: %s", strerror(errno)); freeClientAsync(server.master); /* Close ASAP. */ } } } ~~~ 當然如果確定在未來不糊在使用緩存的master的時,可以徹底摧毀: ~~~ /* Free a cached master, called when there are no longer the conditions for * a partial resync on reconnection. */ /* 當某個客戶端將不會再回復的時候,可以釋放掉緩存的主客戶端 */ void replicationDiscardCachedMaster(void) { if (server.cached_master == NULL) return; redisLog(REDIS_NOTICE,"Discarding previously cached master state."); server.cached_master->flags &= ~REDIS_MASTER; //直接釋放客戶端 freeClient(server.cached_master); //server的緩存客戶端賦值為NULL server.cached_master = NULL; } ~~~ 在這里面靠的就是server.cached_master屬性。slave在和master連接的時候,要進行master的ip地址和Port端口的確認: ~~~ /* Set replication to the specified master address and port. */ /* 設定主客戶端的ip地址和端口號 */ void replicationSetMaster(char *ip, int port) { sdsfree(server.masterhost); server.masterhost = sdsdup(ip); server.masterport = port; //設置完畢之后,斷開所有的連接,中止replication進程 if (server.master) freeClient(server.master); disconnectSlaves(); /* Force our slaves to resync with us as well. */ replicationDiscardCachedMaster(); /* Don't try a PSYNC. */ freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */ cancelReplicationHandshake(); server.repl_state = REDIS_REPL_CONNECT; server.master_repl_offset = 0; } ~~~ 主從復制的實現其實還有很多細節和步驟的。稍稍分析了一下,以后有機會研究的更深入一點
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看