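The English word "replication" originally means "copying". replication.c is the last file I analyze under the Data directory, which says something about its importance. At 1,800+ lines of code it is genuinely hard to chew through. I can only give the rough impression I got from reading the code; drawing a structure diagram that sorts out the relationships among all the APIs in this file is beyond me for now. Master-slave replication is one of the most common ways to implement read/write splitting: once the user base reaches a certain size and a single server can no longer handle tens of millions of page views, a master/slave database setup is something most architects will think of. In this article I call Redis's master and slave databases the master client and the slave client, because each client belongs to a db and replication operates on the client itself. In other words, a Redis deployment has one master client and multiple slave clients, and the slaves replicate from the master. Because there are many APIs, I have grouped them roughly: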
~~~
/* ---------------------------------- MASTER -------------------------------- */
void createReplicationBacklog(void) /* Create the backlog buffer */
void resizeReplicationBacklog(long long newsize) /* Resize the replication backlog; called when the configured backlog size changes */
void freeReplicationBacklog(void) /* Free the replication backlog */
void feedReplicationBacklog(void *ptr, size_t len) /* Append data to the backlog; this increases master_repl_offset */
void feedReplicationBacklogWithObject(robj *o) /* Append data to the backlog, taking a Redis string object as the argument */
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) /* Propagate write commands from the master to its slaves */
void replicationFeedMonitors(redisClient *c, list *monitors, int dictid, robj **argv, int argc) /* Send the command to the MONITOR clients */
long long addReplyReplicationBacklog(redisClient *c, long long offset) /* Send the slave the backlog data starting from the given offset */
int masterTryPartialResynchronization(redisClient *c) /* Master side: try a partial resynchronization */
void syncCommand(redisClient *c) /* Handler for the SYNC command */
void replconfCommand(redisClient *c) /* REPLCONF: lets a slave set parameters used during replication */
void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) /* Send the bulk data to a slave */
void updateSlavesWaitingBgsave(int bgsaveerr) /* Called when the background save finishes, to update the slaves waiting for it */
/* ----------------------------------- SLAVE -------------------------------- */
void replicationAbortSyncTransfer(void) /* Abort the synchronization transfer with the master */
void replicationSendNewlineToMaster(void) /* Slave sends a bare newline to the master (still valid protocol) so the master does not decide the slave has timed out */
void replicationEmptyDbCallback(void *privdata) /* Callback used while emptying the database: old data is flushed before the new data is loaded */
void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) /* Slave reads the bulk payload of the SYNC */
char *sendSynchronousCommand(int fd, ...) /* Slave sends a command to the master synchronously, e.g. authentication or configuration parameters */
int slaveTryPartialResynchronization(int fd) /* Slave side: try a partial resynchronization */
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) /* Synchronize with the master, including the socket check, port announcement and so on */
int connectWithMaster(void) /* Connect to the master */
void undoConnectWithMaster(void) /* Undo the connection to the master */
int cancelReplicationHandshake(void) /* Abort a non-blocking replication attempt if one is in progress */
void replicationSetMaster(char *ip, int port) /* Set the master's IP address and port */
void replicationUnsetMaster(void)
void slaveofCommand(redisClient *c)
void roleCommand(redisClient *c)
void replicationSendAck(void) /* Send an ACK to the master, reporting the current replication offset */
/* ---------------------- MASTER CACHING FOR PSYNC -------------------------- */
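void replicationCacheMaster(redisClient *c) /* Cache the master client's state */
void replicationDiscardCachedMaster(void) /* Free the cached master once a partial resync is no longer possible */
void replicationResurrectCachedMaster(int newfd) /* Bring the cached master back as the current master */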
/* ------------------------- MIN-SLAVES-TO-WRITE --------------------------- */
void refreshGoodSlavesCount(void) /* Refresh the count of good slaves (used by min-slaves-to-write) */
void replicationScriptCacheInit(void)
void replicationScriptCacheFlush(void)
void replicationScriptCacheAdd(sds sha1)
int replicationScriptCacheExists(sds sha1)
void replicationCron(void)
~~~
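To make the backlog functions in the MASTER group a bit more concrete, here is a minimal sketch of a circular backlog buffer. It is not the Redis implementation: the `Backlog` struct, its field names, the tiny 16-byte capacity and the 0-based offsets are all assumptions made for illustration. It only shows the idea noted above, namely that feeding the backlog advances a global offset while old bytes are overwritten.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical, simplified stand-in for the backlog fields that Redis
 * keeps in its server struct. Names and sizes are illustrative only. */
typedef struct {
    char buf[16];      /* fixed-size circular buffer (tiny on purpose) */
    size_t size;       /* capacity of buf */
    size_t idx;        /* next write position inside buf */
    size_t histlen;    /* number of valid bytes currently stored */
    long long offset;  /* global offset: total bytes ever fed */
} Backlog;

void backlog_init(Backlog *b) {
    memset(b, 0, sizeof(*b));
    b->size = sizeof(b->buf);
}

/* Append len bytes, overwriting the oldest data once the buffer is full. */
void backlog_feed(Backlog *b, const void *ptr, size_t len) {
    const char *p = ptr;
    b->offset += (long long)len;
    while (len) {
        size_t chunk = b->size - b->idx;    /* room left before wrapping */
        if (chunk > len) chunk = len;
        memcpy(b->buf + b->idx, p, chunk);
        b->idx += chunk;
        if (b->idx == b->size) b->idx = 0;  /* wrap around */
        p += chunk;
        len -= chunk;
        b->histlen += chunk;
    }
    if (b->histlen > b->size) b->histlen = b->size;
    assert(b->idx < b->size);
}

/* Global offset of the oldest byte still available in the buffer. */
long long backlog_first_offset(const Backlog *b) {
    return b->offset - (long long)b->histlen;
}
```

After feeding 20 bytes into this 16-byte buffer, only the last 16 bytes remain, so a slave asking for anything older than `backlog_first_offset()` could not be served from the backlog.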
Here is a typical example of a slave client synchronizing with the master client:
~~~
/* Synchronize with the master, including the socket check, port announcement and so on */
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
    char tmpfile[256], *err;
    int dfd, maxtries = 5;
    int sockerr = 0, psync_result;
    socklen_t errlen = sizeof(sockerr);
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(privdata);
    REDIS_NOTUSED(mask);

    /* If this event fired after the user turned the instance into a master
     * with SLAVEOF NO ONE we must just return ASAP. */
    if (server.repl_state == REDIS_REPL_NONE) {
        close(fd);
        return;
    }

    /* Check for errors in the socket. */
    /* Is the socket connection healthy? */
    if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1)
        sockerr = errno;
    if (sockerr) {
        aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
        redisLog(REDIS_WARNING,"Error condition on socket for SYNC: %s",
            strerror(sockerr));
        goto error;
    }

    /* If we were connecting, it's time to send a non blocking PING, we want to
     * make sure the master is able to reply before going into the actual
     * replication process where we have long timeouts in the order of
     * seconds (in the meantime the slave would block). */
    /* Connectivity test: the slave sends a PING to the master and waits
     * for the reply within the timeout */
    if (server.repl_state == REDIS_REPL_CONNECTING) {
        redisLog(REDIS_NOTICE,"Non blocking connect for SYNC fired the event.");
        /* Delete the writable event so that the readable event remains
         * registered and we can wait for the PONG reply. */
        aeDeleteFileEvent(server.el,fd,AE_WRITABLE);
        server.repl_state = REDIS_REPL_RECEIVE_PONG;
        /* Send the PING, don't check for errors at all, we have the timeout
         * that will take care about this. */
        /* Send the PING command */
        syncWrite(fd,"PING\r\n",6,100);
        return;
    }

    /* Receive the PONG command. */
    /* The reply has arrived */
    if (server.repl_state == REDIS_REPL_RECEIVE_PONG) {
        char buf[1024];

        /* Delete the readable event, we no longer need it now that there is
         * the PING reply to read. */
        aeDeleteFileEvent(server.el,fd,AE_READABLE);

        /* Read the reply with explicit timeout. */
        buf[0] = '\0';
        if (syncReadLine(fd,buf,sizeof(buf),
            server.repl_syncio_timeout*1000) == -1)
        {
            redisLog(REDIS_WARNING,
                "I/O error reading PING reply from master: %s",
                strerror(errno));
            goto error;
        }

        /* We accept only two replies as valid, a positive +PONG reply
         * (we just check for "+") or an authentication error.
         * Note that older versions of Redis replied with "operation not
         * permitted" instead of using a proper error code, so we test
         * both. */
        if (buf[0] != '+' &&
            strncmp(buf,"-NOAUTH",7) != 0 &&
            strncmp(buf,"-ERR operation not permitted",28) != 0)
        {
            redisLog(REDIS_WARNING,"Error reply to PING from master: '%s'",buf);
            goto error;
        } else {
            redisLog(REDIS_NOTICE,
                "Master replied to PING, replication can continue...");
        }
    }

    /* AUTH with the master if required. */
    /* AUTH identity check */
    if(server.masterauth) {
        err = sendSynchronousCommand(fd,"AUTH",server.masterauth,NULL);
        if (err[0] == '-') {
            redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",err);
            sdsfree(err);
            goto error;
        }
        sdsfree(err);
    }

    /* Set the slave port, so that Master's INFO command can list the
     * slave listening port correctly. */
    /* Announce the slave's listening port */
    {
        sds port = sdsfromlonglong(server.port);
        err = sendSynchronousCommand(fd,"REPLCONF","listening-port",port,
                                     NULL);
        sdsfree(port);
        /* Ignore the error if any, not all the Redis versions support
         * REPLCONF listening-port. */
        if (err[0] == '-') {
            redisLog(REDIS_NOTICE,"(Non critical) Master does not understand REPLCONF listening-port: %s", err);
        }
        sdsfree(err);
    }

    /* Try a partial resynchronization. If we don't have a cached master
     * slaveTryPartialResynchronization() will at least try to use PSYNC
     * to start a full resynchronization so that we get the master run id
     * and the global offset, to try a partial resync at the next
     * reconnection attempt. */
    psync_result = slaveTryPartialResynchronization(fd);
    if (psync_result == PSYNC_CONTINUE) {
        redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Master accepted a Partial Resynchronization.");
        return;
    }

    /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC
     * and the server.repl_master_runid and repl_master_initial_offset are
     * already populated. */
    if (psync_result == PSYNC_NOT_SUPPORTED) {
        redisLog(REDIS_NOTICE,"Retrying with SYNC...");
        if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) {
            redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
                strerror(errno));
            goto error;
        }
    }

    /* Prepare a suitable temp file for bulk transfer */
    while(maxtries--) {
        snprintf(tmpfile,256,
            "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
        dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
        if (dfd != -1) break;
        sleep(1);
    }
    if (dfd == -1) {
        redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
        goto error;
    }

    /* Setup the non blocking download of the bulk file. */
    if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
            == AE_ERR)
    {
        redisLog(REDIS_WARNING,
            "Can't create readable event for SYNC: %s (fd=%d)",
            strerror(errno),fd);
        goto error;
    }

    server.repl_state = REDIS_REPL_TRANSFER;
    server.repl_transfer_size = -1;
    server.repl_transfer_read = 0;
    server.repl_transfer_last_fsync_off = 0;
    server.repl_transfer_fd = dfd;
    server.repl_transfer_lastio = server.unixtime;
    server.repl_transfer_tmpfile = zstrdup(tmpfile);
    return;

error:
    close(fd);
    server.repl_transfer_s = -1;
    server.repl_state = REDIS_REPL_CONNECT;
    return;
}
~~~
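The handshake above is easier to follow once the decision points are pulled out. The sketch below is my own condensed model, not code from Redis: `ping_reply_is_acceptable`, `state_after_handshake` and the `SKETCH_*` names are hypothetical, and the AUTH and REPLCONF steps are left out. It captures only two things visible in the listing: which PING replies let replication continue, and which state the slave ends up in depending on the PSYNC result.

```c
#include <assert.h>
#include <string.h>

/* Hypothetical state names for this sketch; the real code stores
 * REDIS_REPL_* constants in server.repl_state. */
typedef enum {
    SKETCH_CONNECT,    /* error path: reconnect to the master later */
    SKETCH_TRANSFER,   /* full resync: the bulk payload is downloaded */
    SKETCH_CONNECTED   /* partial resync accepted, link is live again */
} SketchState;

typedef enum {
    SKETCH_PSYNC_CONTINUE,
    SKETCH_PSYNC_FULLRESYNC,
    SKETCH_PSYNC_NOT_SUPPORTED
} SketchPsync;

/* Returns 1 if the master's reply to PING lets the handshake go on.
 * Same three cases as the check in syncWithMaster(). */
int ping_reply_is_acceptable(const char *reply) {
    assert(reply != NULL);
    if (reply[0] == '+') return 1;                    /* +PONG */
    if (strncmp(reply, "-NOAUTH", 7) == 0) return 1;  /* AUTH is required */
    if (strncmp(reply, "-ERR operation not permitted", 28) == 0)
        return 1;                                     /* older masters */
    return 0;
}

/* Simplified outcome of one handshake attempt (AUTH/REPLCONF omitted). */
SketchState state_after_handshake(const char *ping_reply, SketchPsync psync) {
    if (!ping_reply_is_acceptable(ping_reply)) return SKETCH_CONNECT;
    if (psync == SKETCH_PSYNC_CONTINUE) return SKETCH_CONNECTED;
    /* FULLRESYNC, or the fallback to plain SYNC: download the payload. */
    return SKETCH_TRANSFER;
}
```

For example, `state_after_handshake("+PONG", SKETCH_PSYNC_NOT_SUPPORTED)` yields `SKETCH_TRANSFER`, matching the "Retrying with SYNC..." branch that still goes on to download the bulk file.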
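replication also has the concept of a cached master: the master client's state can be cached temporarily, typically when the master and slave suddenly disconnect, so that the next master-slave synchronization can recover quickly: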
~~~
/* Cache the master client's state */
void replicationCacheMaster(redisClient *c) {
    listNode *ln;

    redisAssert(server.master != NULL && server.cached_master == NULL);
    redisLog(REDIS_NOTICE,"Caching the disconnected master state.");

    /* Remove from the list of clients, we don't want this client to be
     * listed by CLIENT LIST or processed in any way by batch operations. */
    /* First remove this client from the list */
    ln = listSearchKey(server.clients,c);
    redisAssert(ln != NULL);
    listDelNode(server.clients,ln);

    /* Save the master. Server.master will be set to null later by
     * replicationHandleMasterDisconnection(). */
    /* Keep it as the cached master */
    server.cached_master = server.master;

    /* Remove the event handlers and close the socket. We'll later reuse
     * the socket of the new connection with the master during PSYNC. */
    /* Delete the read and write events registered for this client */
    aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
    aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
    close(c->fd);

    /* Set fd to -1 so that we can safely call freeClient(c) later. */
    c->fd = -1;

    /* Invalidate the Peer ID cache. */
    if (c->peerid) {
        sdsfree(c->peerid);
        c->peerid = NULL;
    }

    /* Caching the master happens instead of the actual freeClient() call,
     * so make sure to adjust the replication state. This function will
     * also set server.master to NULL. */
    replicationHandleMasterDisconnection();
}
~~~
To bring this master back to life, call the following function:
~~~
/* Turn the cached master into the current master, using the file descriptor
 * passed as argument as the socket for the new master.
 *
 * This function is called when successfully setup a partial resynchronization
 * so the stream of data that we'll receive will start from where this
 * master left. */
/* Bring the cached master back as the current master */
void replicationResurrectCachedMaster(int newfd) {
    /* Make cached_master the current master */
    server.master = server.cached_master;
    server.cached_master = NULL;
    server.master->fd = newfd;
    server.master->flags &= ~(REDIS_CLOSE_AFTER_REPLY|REDIS_CLOSE_ASAP);
    server.master->authenticated = 1;
    server.master->lastinteraction = server.unixtime;
    server.repl_state = REDIS_REPL_CONNECTED;

    /* Re-add to the list of clients. */
    /* Put it back into the client list */
    listAddNodeTail(server.clients,server.master);
    if (aeCreateFileEvent(server.el, newfd, AE_READABLE,
                          readQueryFromClient, server.master)) {
        redisLog(REDIS_WARNING,"Error resurrecting the cached master, impossible to add the readable handler: %s", strerror(errno));
        freeClientAsync(server.master); /* Close ASAP. */
    }

    /* We may also need to install the write handler as well if there is
     * pending data in the write buffers. */
    if (server.master->bufpos || listLength(server.master->reply)) {
        if (aeCreateFileEvent(server.el, newfd, AE_WRITABLE,
                          sendReplyToClient, server.master)) {
            redisLog(REDIS_WARNING,"Error resurrecting the cached master, impossible to add the writable handler: %s", strerror(errno));
            freeClientAsync(server.master); /* Close ASAP. */
        }
    }
}
~~~
Of course, once it is certain that the cached master will not be used again, it can be destroyed for good:
~~~
/* Free a cached master, called when there are no longer the conditions for
 * a partial resync on reconnection. */
/* Release the cached master once a partial resync is no longer possible */
void replicationDiscardCachedMaster(void) {
    if (server.cached_master == NULL) return;

    redisLog(REDIS_NOTICE,"Discarding previously cached master state.");
    server.cached_master->flags &= ~REDIS_MASTER;
    /* Free the client directly */
    freeClient(server.cached_master);
    /* Reset the server's cached master to NULL */
    server.cached_master = NULL;
}
~~~
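The three functions above form a small lifecycle: cache, then either resurrect or discard. Below is a stripped-down model of that lifecycle as I read it. The `Client` and `Server` structs and the function names are hypothetical, and everything except the pointer hand-off (event handlers, client list, freeing memory) is deliberately left out.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical, minimal stand-ins for redisClient and the server struct. */
typedef struct {
    int fd;             /* socket, -1 while the client is only cached */
    long long reploff;  /* replication offset remembered across reconnects */
} Client;

typedef struct {
    Client *master;         /* live master link, or NULL */
    Client *cached_master;  /* disconnected master kept for PSYNC, or NULL */
} Server;

/* Link dropped: keep the client object, give up only its socket. */
void cache_master(Server *s) {
    assert(s->master != NULL && s->cached_master == NULL);
    s->cached_master = s->master;
    s->cached_master->fd = -1;
    s->master = NULL;
}

/* Partial resync accepted: reuse the cached state on the new socket. */
void resurrect_cached_master(Server *s, int newfd) {
    assert(s->cached_master != NULL);
    s->master = s->cached_master;
    s->cached_master = NULL;
    s->master->fd = newfd;
}

/* Partial resync no longer possible: forget the cached state.
 * (The real function also frees the client; here the caller owns it.) */
void discard_cached_master(Server *s) {
    s->cached_master = NULL;
}
```

The point of the design is that the replication offset survives the disconnect: after `cache_master()` and `resurrect_cached_master()`, the same `Client` object is the master again with its `reploff` intact, only the file descriptor has changed.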
All of this relies on the server.cached_master field. When a slave connects to a master, the master's IP address and port have to be set:
~~~
/* Set replication to the specified master address and port. */
/* Set the master's IP address and port */
void replicationSetMaster(char *ip, int port) {
    sdsfree(server.masterhost);
    server.masterhost = sdsdup(ip);
    server.masterport = port;
    /* Once they are set, drop all connections and abort any replication in progress */
    if (server.master) freeClient(server.master);
    disconnectSlaves(); /* Force our slaves to resync with us as well. */
    replicationDiscardCachedMaster(); /* Don't try a PSYNC. */
    freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */
    cancelReplicationHandshake();
    server.repl_state = REDIS_REPL_CONNECT;
    server.master_repl_offset = 0;
}
~~~
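The comments "Don't try a PSYNC" and "Don't allow our chained slaves to PSYNC" make more sense with the conditions for a partial resync in mind. The sketch below is a simplified guess at the kind of check a master performs, not the body of masterTryPartialResynchronization(): the `MasterSketch` struct, the `can_partial_resync` name and the exact-match run id comparison are assumptions.

```c
#include <assert.h>
#include <string.h>

/* Hypothetical summary of the master-side state a PSYNC request is
 * checked against. */
typedef struct {
    const char *runid;        /* run id of this master */
    int has_backlog;          /* 0 once the backlog has been freed */
    long long backlog_first;  /* offset of the oldest byte in the backlog */
    long long backlog_len;    /* number of bytes held in the backlog */
} MasterSketch;

/* Returns 1 if the request can be served from the backlog, 0 if a full
 * resynchronization is needed. */
int can_partial_resync(const MasterSketch *m, const char *wanted_runid,
                       long long wanted_offset) {
    assert(m != NULL && wanted_runid != NULL);
    /* The slave must have been following this very master. */
    if (strcmp(m->runid, wanted_runid) != 0) return 0;
    /* Without a backlog there is nothing to replay. */
    if (!m->has_backlog) return 0;
    /* The requested offset must still be inside the backlog window. */
    if (wanted_offset < m->backlog_first) return 0;
    if (wanted_offset > m->backlog_first + m->backlog_len) return 0;
    return 1;
}
```

Seen this way, replicationSetMaster() knocks out each precondition on purpose: freeing the backlog makes `has_backlog` false for chained slaves, and discarding the cached master leaves this instance with no previous run id or offset to offer the new master.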
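There are many more details and steps in the implementation of master-slave replication. This was only a light analysis; I hope to dig deeper when I get the chance.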