? ? ? ? 上次我只分析了Redis網絡部分的代碼一部分,今天我把networking的代碼實現部分也學習了一遍,netWorking的代碼更多偏重的是Client客戶端的操作。里面addReply()系列的方法操作是主要的部分。光光這個系列的方法,應該占據了一半的API的數量。我把API分成了3個部分:
~~~
/* ------------ API ---------------------- */
void *dupClientReplyValue(void *o) /* 復制value一份 */
int listMatchObjects(void *a, void *b) /* 比價2個obj是否相等 */
robj *dupLastObjectIfNeeded(list *reply) /* 返回回復列表中最后一個元素對象 */
void copyClientOutputBuffer(redisClient *dst, redisClient *src) /* 將源Client的輸出buffer復制給目標Client */
static void acceptCommonHandler(int fd, int flags) /* 網絡連接后的調用方法 */
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask)
void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask)
void disconnectSlaves(void) /* 使server的slave失去連接 */
void replicationHandleMasterDisconnection(void)
void flushSlavesOutputBuffers(void) /* 從方法將會在freeMemoryIfNeeded(),釋放內存空間函數,將存在內存中數據操作結果刷新到磁盤中 */
int processEventsWhileBlocked(void)
/* ------------- addReply API ----------------- */
int _addReplyToBuffer(redisClient *c, char *s, size_t len) /* 往客戶端緩沖區中添加內容 */
void _addReplyObjectToList(redisClient *c, robj *o) /* robj添加到reply的列表中 */
void _addReplySdsToList(redisClient *c, sds s) /* 在回復列表中添加Sds字符串對象 */
void _addReplyStringToList(redisClient *c, char *s, size_t len) /* 在回復列表中添加字符串對象,參數中已經給定字符的長度 */
void addReply(redisClient *c, robj *obj) /* 在redisClient的buffer中寫入數據,數據存在obj->ptr的指針中 */
void addReplySds(redisClient *c, sds s) /* 在回復中添加Sds字符串,下面的額addReply()系列方法原理基本類似 */
void addReplyString(redisClient *c, char *s, size_t len)
void addReplyErrorLength(redisClient *c, char *s, size_t len)
void addReplyError(redisClient *c, char *err) /* 往Reply中添加error類的信息 */
void addReplyErrorFormat(redisClient *c, const char *fmt, ...)
void addReplyStatusLength(redisClient *c, char *s, size_t len)
void addReplyStatus(redisClient *c, char *status)
void addReplyStatusFormat(redisClient *c, const char *fmt, ...)
void *addDeferredMultiBulkLength(redisClient *c) /* 在reply list 中添加一個空的obj對象 */
void setDeferredMultiBulkLength(redisClient *c, void *node, long length)
void addReplyDouble(redisClient *c, double d) /* 在bulk reply中添加一個double類型值,bulk的意思為大塊的,bulk reply的意思為大數據量的回復 */
void addReplyLongLongWithPrefix(redisClient *c, long long ll, char prefix)
void addReplyLongLong(redisClient *c, long long ll)
void addReplyMultiBulkLen(redisClient *c, long length)
void addReplyBulkLen(redisClient *c, robj *obj) /* 添加bulk 大塊的數據的長度 */
void addReplyBulk(redisClient *c, robj *obj) /* 將一個obj的數據,拆分成大塊數據的添加 */
void addReplyBulkCBuffer(redisClient *c, void *p, size_t len)
void addReplyBulkCString(redisClient *c, char *s)
void addReplyBulkLongLong(redisClient *c, long long ll)
/* ------------- Client API ----------------- */
redisClient *createClient(int fd) /* 創建redisClient客戶端,1.建立連接,2.設置數據庫,3.屬性設置 */
int prepareClientToWrite(redisClient *c) /* 此方法將會被調用于Client準備接受新數據之前調用,在fileEvent為客戶端設定writer的handler處理事件 */
static void freeClientArgv(redisClient *c)
void freeClient(redisClient *c) /* 釋放freeClient,要分為Master和Slave2種情況作不同的處理 */
void freeClientAsync(redisClient *c)
void freeClientsInAsyncFreeQueue(void) /* 異步的free客戶端 */
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) /* 將Client中的reply數據存入文件中 */
void resetClient(redisClient *c)
int processInlineBuffer(redisClient *c) /* 處理redis Client的內鏈的buffer,就是c->querybuf */
static void setProtocolError(redisClient *c, int pos)
int processMultibulkBuffer(redisClient *c) /* 處理大塊的buffer */
void processInputBuffer(redisClient *c) /* 處理redisClient的查詢buffer */
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) /* 從Client獲取查詢query語句 */
void getClientsMaxBuffers(unsigned long *longest_output_list,
unsigned long *biggest_input_buffer) /* 獲取Client中輸入buffer和輸出buffer的最大長度值 */
void formatPeerId(char *peerid, size_t peerid_len, char *ip, int port) /* 格式化ip,port端口號的輸出,ip:port */
int genClientPeerId(redisClient *client, char *peerid, size_t peerid_len) /* 獲取Client客戶端的ip,port地址信息 */
char *getClientPeerId(redisClient *c) /* 獲取c->peerid客戶端的地址信息 */
sds catClientInfoString(sds s, redisClient *client) /* 格式化的輸出客戶端的屬性信息,直接返回一個拼接好的字符串 */
sds getAllClientsInfoString(void) /* 獲取所有Client客戶端的屬性信息,并連接成一個總的字符串并輸出 */
void clientCommand(redisClient *c) /* 執行客戶端的命令的作法 */
void rewriteClientCommandVector(redisClient *c, int argc, ...) /* 重寫客戶端的命令集合,舊的命令集合的應用計數減1,新的Command Vector的命令集合增1 */
void rewriteClientCommandArgument(redisClient *c, int i, robj *newval) /* 重寫Client中的第i個參數 */
unsigned long getClientOutputBufferMemoryUsage(redisClient *c) /* 獲取Client中已經用去的輸出buffer的大小 */
int getClientType(redisClient *c)
int getClientTypeByName(char *name) /* Client中的名字的3種類型,normal,slave,pubsub */
char *getClientTypeName(int class)
int checkClientOutputBufferLimits(redisClient *c) /* 判斷Clint的輸出緩沖區的已經占用大小是否超過軟限制或是硬限制 */
void asyncCloseClientOnOutputBufferLimitReached(redisClient *c) /* 異步的關閉Client,如果緩沖區中的軟限制或是硬限制已經到達的時候,緩沖區超出限制的結果會導致釋放不安全, */
~~~
我們從最簡單的_addReplyToBuffer在緩沖區中添加回復數據開始說起,因為后面的各種addReply的方法都或多或少的調用了和這個歌方法。
~~~
/* -----------------------------------------------------------------------------
* Low level functions to add more data to output buffers.
* -------------------------------------------------------------------------- */
/* 往客戶端緩沖區中添加內容 */
int _addReplyToBuffer(redisClient *c, char *s, size_t len) {
size_t available = sizeof(c->buf)-c->bufpos;
if (c->flags & REDIS_CLOSE_AFTER_REPLY) return REDIS_OK;
/* If there already are entries in the reply list, we cannot
* add anything more to the static buffer. */
//如果當前的reply已經存在內容,則操作出錯
if (listLength(c->reply) > 0) return REDIS_ERR;
/* Check that the buffer has enough space available for this string. */
if (len > available) return REDIS_ERR;
memcpy(c->buf+c->bufpos,s,len);
c->bufpos+=len;
return REDIS_OK;
}
~~~
最直接影響的一句話,就是memcpy(c->buf+c->bufpos,s,len);所以內容是加到c->buf中的,這也就是客戶端的輸出buffer,添加操作還有另外一種形式是添加對象類型:
~~~
/* robj添加到reply的列表中 */
void _addReplyObjectToList(redisClient *c, robj *o) {
robj *tail;
if (c->flags & REDIS_CLOSE_AFTER_REPLY) return;
if (listLength(c->reply) == 0) {
incrRefCount(o);
//在回復列表匯總添加robj內容
listAddNodeTail(c->reply,o);
c->reply_bytes += zmalloc_size_sds(o->ptr);
} else {
tail = listNodeValue(listLast(c->reply));
/* Append to this object when possible. */
if (tail->ptr != NULL &&
sdslen(tail->ptr)+sdslen(o->ptr) <= REDIS_REPLY_CHUNK_BYTES)
{
c->reply_bytes -= zmalloc_size_sds(tail->ptr);
tail = dupLastObjectIfNeeded(c->reply);
tail->ptr = sdscatlen(tail->ptr,o->ptr,sdslen(o->ptr));
c->reply_bytes += zmalloc_size_sds(tail->ptr);
} else {
incrRefCount(o);
listAddNodeTail(c->reply,o);
c->reply_bytes += zmalloc_size_sds(o->ptr);
}
}
asyncCloseClientOnOutputBufferLimitReached(c);
}
~~~
把robj對象加載reply列表中,并且改變reply的byte大小,最后還調用了一個asyncCloseClientOnOutputBufferLimitReached(c);方法,這個方法我是在這個文件的最底部找到的,一開始還真不知道什么意思,作用就是當添加完數據后,當客戶端的輸出緩沖的大小超出限制時,會被異步關閉:
~~~
/* Asynchronously close a client if soft or hard limit is reached on the
* output buffer size. The caller can check if the client will be closed
* checking if the client REDIS_CLOSE_ASAP flag is set.
*
* Note: we need to close the client asynchronously because this function is
* called from contexts where the client can't be freed safely, i.e. from the
* lower level functions pushing data inside the client output buffers. */
/* 異步的關閉Client,如果緩沖區中的軟限制或是硬限制已經到達的時候,緩沖區超出限制的結果會導致釋放不安全, */
void asyncCloseClientOnOutputBufferLimitReached(redisClient *c) {
redisAssert(c->reply_bytes < ULONG_MAX-(1024*64));
if (c->reply_bytes == 0 || c->flags & REDIS_CLOSE_ASAP) return;
if (checkClientOutputBufferLimits(c)) {
sds client = catClientInfoString(sdsempty(),c);
freeClientAsync(c);
redisLog(REDIS_WARNING,"Client %s scheduled to be closed ASAP for overcoming of output buffer limits.", client);
sdsfree(client);
}
}
~~~
在addReply方法調用的時候,有時是需要一個前提的,我說的是在寫數據事件發生的時候,你得先對寫的文件創建一個監聽事件:
~~~
/* 在回復中添加Sds字符串 */
void addReplySds(redisClient *c, sds s) {
//在調用添加操作之前,都要先執行prepareClientToWrite(c),設置文件事件的寫事件
if (prepareClientToWrite(c) != REDIS_OK) {
/* The caller expects the sds to be free'd. */
sdsfree(s);
return;
}
if (_addReplyToBuffer(c,s,sdslen(s)) == REDIS_OK) {
sdsfree(s);
} else {
/* This method free's the sds when it is no longer needed. */
_addReplySdsToList(c,s);
}
}
~~~
在這個prepareClientToWrite()里面是干嘛的呢?
~~~
/* This function is called every time we are going to transmit new data
* to the client. The behavior is the following:
*
* If the client should receive new data (normal clients will) the function
* returns REDIS_OK, and make sure to install the write handler in our event
* loop so that when the socket is writable new data gets written.
*
* If the client should not receive new data, because it is a fake client,
* a master, a slave not yet online, or because the setup of the write handler
* failed, the function returns REDIS_ERR.
*
* Typically gets called every time a reply is built, before adding more
* data to the clients output buffers. If the function returns REDIS_ERR no
* data should be appended to the output buffers. */
/* 此方法將會被調用于Client準備接受新數據之前調用,在fileEvent為客戶端設定writer的handler處理事件 */
int prepareClientToWrite(redisClient *c) {
if (c->flags & REDIS_LUA_CLIENT) return REDIS_OK;
if ((c->flags & REDIS_MASTER) &&
!(c->flags & REDIS_MASTER_FORCE_REPLY)) return REDIS_ERR;
if (c->fd <= 0) return REDIS_ERR; /* Fake client */
if (c->bufpos == 0 && listLength(c->reply) == 0 &&
(c->replstate == REDIS_REPL_NONE ||
c->replstate == REDIS_REPL_ONLINE) &&
//在這里創建寫的文件事件
aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
sendReplyToClient, c) == AE_ERR) return REDIS_ERR;
return REDIS_OK;
}
~~~
在addReply的方法里提到了一個addReplyBulk類型方法,Bulk的中文意思為大塊的,說明addReplyBulk添加的都是一些比較大塊的數據,找一個方法看看:
~~~
/* Add a Redis Object as a bulk reply */
/* 將一個obj的數據,拆分成大塊數據的添加 */
void addReplyBulk(redisClient *c, robj *obj) {
//reply添加長度
addReplyBulkLen(c,obj);
//reply添加對象
addReply(c,obj);
addReply(c,shared.crlf);
}
~~~
將原本一個robj的數據拆分成可3個普通的addReply的方法調用。就變成了數據量變大了的數據。大數據的回復一個比較不好的地方是到時解析的時候或者是Data的復制的時候會比較耗時。在networking的方法里還提供了freeClient()的操作:
~~~
/* 釋放freeClient,要分為Master和Slave2種情況作不同的處理 */
void freeClient(redisClient *c) {
listNode *ln;
/* If this is marked as current client unset it */
if (server.current_client == c) server.current_client = NULL;
/* If it is our master that's beging disconnected we should make sure
* to cache the state to try a partial resynchronization later.
*
* Note that before doing this we make sure that the client is not in
* some unexpected state, by checking its flags. */
if (server.master && c->flags & REDIS_MASTER) {
redisLog(REDIS_WARNING,"Connection with master lost.");
if (!(c->flags & (REDIS_CLOSE_AFTER_REPLY|
REDIS_CLOSE_ASAP|
REDIS_BLOCKED|
REDIS_UNBLOCKED)))
{
//如果是Master客戶端,需要做緩存Client的處理,可以迅速重新啟用
replicationCacheMaster(c);
return;
}
}
~~~
...后面代碼略去了
?當Client中的輸出buffer數據漸漸變多了的時候就要準備持久化到磁盤文件了,要調用下面這個方法了,
~~~
/* Helper function used by freeMemoryIfNeeded() in order to flush slave
* output buffers without returning control to the event loop. */
/* 從方法將會在freeMemoryIfNeeded(),釋放內存空間函數,將存在內存中數據操作結果刷新到磁盤中 */
void flushSlavesOutputBuffers(void) {
listIter li;
listNode *ln;
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
redisClient *slave = listNodeValue(ln);
int events;
events = aeGetFileEvents(server.el,slave->fd);
if (events & AE_WRITABLE &&
slave->replstate == REDIS_REPL_ONLINE &&
listLength(slave->reply))
{
//在這里調用了write的方法
sendReplyToClient(server.el,slave->fd,slave,0);
}
}
}
~~~
這個方法的核心調用又在sendReplyToClient()方法,就是把Client的reply內容和buf內容存入文件。以上就是我的理解了,代碼量有點大,的確看的我頭有點大。
- 前言
- (一)--Redis結構解析
- (二)--結構體分析(1)
- (三)---dict哈希結構
- (四)-- sds字符串
- (五)--- sparkline微線圖
- (六)--- ziplist壓縮列表
- (七)--- zipmap壓縮圖
- (八)--- t_hash哈希轉換
- (九)--- t_list,t_string的分析
- (十)--- testhelp.h小型測試框架和redis-check-aof.c日志檢測
- (十一)--- memtest內存檢測
- (十二)--- redis-check-dump本地數據庫檢測
- (十三)--- redis-benchmark性能測試
- (十四)--- rdb.c本地數據庫操作
- (十五)--- aof-append only file解析
- (十六)--- config配置文件
- (十七)--- multi事務操作
- (十八)--- db.c內存數據庫操作
- (十九)--- replication主從數據復制的實現
- (二十)--- ae事件驅動
- (二十一)--- anet網絡通信的封裝
- (二十二)--- networking網絡協議傳輸
- (二十三)--- CRC循環冗余算法和RAND隨機數算法
- (二十四)--- tool工具類(2)
- (二十五)--- zmalloc內存分配實現
- (二十六)--- slowLog和hyperloglog
- (二十七)--- rio系統I/O的封裝
- (二十八)--- object創建和釋放redisObject對象
- (二十九)--- bio后臺I/O服務的實現
- (三十)--- pubsub發布訂閱模式
- (三十一)--- latency延遲分析處理
- (三十二)--- redis-cli.c客戶端命令行接口的實現(1)
- (三十三)--- redis-cli.c客戶端命令行接口的實現(2)
- (三十四)--- redis.h服務端的實現分析(1)
- (三十五)--- redis.c服務端的實現分析(2)
- (三十六)--- Redis中的11大優秀設計