? ? ? ? 今天我是看完了t_list和t_string的代碼,從名字知道,這也是和之前的t_hash非常類似的,無非就是各種形式,轉化來轉化去的。先講講t_list,對比于昨天的t_hash,t_hash是ziplist和dict之間的轉換,t_list則是描述的是ziplist壓縮表和linedlist普通鏈表,換句話說,當client這個robj作為參數傳入的時候,都分為ENCODING_ZIIPLIST和ENCODING_LINKEDLIST 2類編碼方式處理。還有一個在t_list出現的一個 比較新穎的東西是出現了鎖的概念,操作系統的東西在這里也會碰到了,這里的代碼也非常值得學習。下面,我們看看一般的t_list中包括的一些方法和命令:
~~~
/* List方法 */
/* list的操作分為2種,LINKEDLIST普通鏈表和ZIPLIST壓縮列表的相關操作 */
void listTypeTryConversion(robj *subject, robj *value) /* 判斷是否需要將ziplist轉為linkedlist */
void listTypePush(robj *subject, robj *value, int where) /* 在頭部或尾部插入value元素 */
robj *listTypePop(robj *subject, int where) /* 在列表的頭部或尾彈出元素 */
unsigned long listTypeLength(robj *subject) /* 列表的長度 */
listTypeIterator *listTypeInitIterator(robj *subject, long index, unsigned char direction) /* 返回列表迭代器,方向有頭尾之分 */
void listTypeReleaseIterator(listTypeIterator *li) /* 釋放列表迭代器 */
int listTypeNext(listTypeIterator *li, listTypeEntry *entry) /* 根據列表迭代器,獲取下一個元素 */
robj *listTypeGet(listTypeEntry *entry) /* 獲取listType元素,有ziplist和linkedlist */
void listTypeInsert(listTypeEntry *entry, robj *value, int where) /* listType了類型插入元素操作 */
int listTypeEqual(listTypeEntry *entry, robj *o) /* 判斷2個元素是否相等 */
void listTypeDelete(listTypeEntry *entry) /* listType類型刪除元素 */
void listTypeConvert(robj *subject, int enc) /* listType類型的轉換操作,這里指的是往linkedList上轉 */
/* List的相關命令 */
void pushGenericCommand(redisClient *c, int where) /* 插入操作命令的原始操作 */
void lpushCommand(redisClient *c) /* 左邊插入元素命令 */
void rpushCommand(redisClient *c) /* 右邊插入元素命令 */
void pushxGenericCommand(redisClient *c, robj *refval, robj *val, int where) /* 有返回狀態的插入操作命令,假設首先都是能夠實現插入命令的 */
void lpushxCommand(redisClient *c) /* 左邊插入元素有返回消息的命令 */
void rpushxCommand(redisClient *c) /* 右邊插入元素有返回消息的命令 */
void linsertCommand(redisClient *c) /* 列表指定位置插入元素操作命令 */
void llenCommand(redisClient *c) /* 列表返回長度命令 */
void lindexCommand(redisClient *c) /* 獲取index位置的上的元素 */
void lsetCommand(redisClient *c) /* listType類型設置value命令 */
void popGenericCommand(redisClient *c, int where) /* 實現彈出操作的原始命令 */
void lpopCommand(redisClient *c) /* 左邊彈出元素命令 */
void rpopCommand(redisClient *c) /* 右邊彈出操作命令 */
void lrangeCommand(redisClient *c) /* 移動listType位置操作 */
void ltrimCommand(redisClient *c) /* listType實現截取操作,把多余范圍的元素刪除 */
void lremCommand(redisClient *c) /* 移除在listType中出現的與指定的元素相等的元素 */
void rpoplpushHandlePush(redisClient *c, robj *dstkey, robj *dstobj, robj *value) /* 元素從一個obj右邊彈出,在從左側推入另一個obj列表操作 */
void rpoplpushCommand(redisClient *c) /* 右邊彈出,左邊推入元素的命令 */
void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout, robj *target) >/* 設置客戶端為阻塞模式,并設置超時時間,當請求特定的key元素時 */
void unblockClientWaitingData(redisClient *c) /* 客戶端解鎖操作 */
void signalListAsReady(redisDb *db, robj *key) /* 將key存入server中,后續可以用于客戶端的存取 */
int serveClientBlockedOnList(redisClient *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int where) /* 根據server,Client的key,value情況,判斷server此時能否服務于Client,否則Client將被阻塞 */
void handleClientsBlockedOnLists(void) /* 服務端解除阻塞住的Client */
int getTimeoutFromObjectOrReply(redisClient *c, robj *object, time_t *timeout) /* 獲取超時時間 */
void blockingPopGenericCommand(redisClient *c, int where) /* 阻塞彈出命令的原始操作 */
void blpopCommand(redisClient *c) /* 左邊彈出數據的阻塞式命令 */
void brpopCommand(redisClient *c) /* 右邊彈出數據的阻塞式命令 */
void brpoplpushCommand(redisClient *c) /* 彈出推入阻塞式命令 */
~~~
看到這么多API,是否有被嚇到的感覺,但里面其實很多的方法都很簡單,我只挑幾個比較典型的方法做一下分析,向什么返回長度了,獲取,設置鍵值的大家可以自己看具體的方法,不難的。一般的t_list的處理流程,比如獲取迭代器的方法:
~~~
/* Stores pointer to current the entry in the provided entry structure
* and advances the position of the iterator. Returns 1 when the current
* entry is in fact an entry, 0 otherwise. */
int listTypeNext(listTypeIterator *li, listTypeEntry *entry) {
/* Protect from converting when iterating */
redisAssert(li->subject->encoding == li->encoding);
entry->li = li;
if (li->encoding == REDIS_ENCODING_ZIPLIST) {
entry->zi = li->zi;
if (entry->zi != NULL) {
if (li->direction == REDIS_TAIL)
//根據方向調用pre或next的方法
li->zi = ziplistNext(li->subject->ptr,li->zi);
else
li->zi = ziplistPrev(li->subject->ptr,li->zi);
return 1;
}
} else if (li->encoding == REDIS_ENCODING_LINKEDLIST) {
entry->ln = li->ln;
if (entry->ln != NULL) {
if (li->direction == REDIS_TAIL)
//普通的列表的調用方式同上
li->ln = li->ln->next;
else
li->ln = li->ln->prev;
return 1;
}
} else {
redisPanic("Unknown list encoding");
}
return 0;
}
~~~
很多API和t_hash的方法都是極其類似的,我就不多說了,我們將關注點,放在列表的鎖控制上,先講個前提,在redis客戶端執行操作的時候,會有個請求超時時間,在這個請求的時間內,客戶端如果沒有找到key對應的數據,是會被阻塞的,什么意思呢,比如:
~~~
/* 設置客戶端為阻塞模式,并設置超時時間,當請求特定的key元素時 */
/* 這個客戶端阻塞的意思:當客戶端請求list中某個特定key值時,如果key存在且列表非空,當然 */
/* 當然不會阻塞,正常返回數據,如果當客戶端請求某個key不存在,或列表為empty的時候,客戶端將被阻塞 */
/* 只有當有這個key被寫入的時候,客戶端才會被解鎖 *
~~~
這個其實就是操作系統中的PV操作類似的原理,跟java里的wait(),signal()方法的模型是一樣的,他的實現代碼如下:
~~~
/* This is how the current blocking POP works, we use BLPOP as example:
* - If the user calls BLPOP and the key exists and contains a non empty list
* then LPOP is called instead. So BLPOP is semantically the same as LPOP
* if blocking is not required.
* - If instead BLPOP is called and the key does not exists or the list is
* empty we need to block. In order to do so we remove the notification for
* new data to read in the client socket (so that we'll not serve new
* requests if the blocking request is not served). Also we put the client
* in a dictionary (db->blocking_keys) mapping keys to a list of clients
* blocking for this keys.
* - If a PUSH operation against a key with blocked clients waiting is
* performed, we mark this key as "ready", and after the current command,
* MULTI/EXEC block, or script, is executed, we serve all the clients waiting
* for this list, from the one that blocked first, to the last, accordingly
* to the number of elements we have in the ready list.
*/
/* Set a client in blocking mode for the specified key, with the specified
* timeout */
/* 設置客戶端為阻塞模式,并設置超時時間,當請求特定的key元素時 */
/* 這個客戶端阻塞的意思:當客戶端請求list中某個特定key值時,如果key存在且列表非空,當然 */
/* 當然不會阻塞,正常返回數據,如果當客戶端請求某個key不存在,或列表為empty的時候,客戶端將被阻塞 */
/* 只有當有這個key被寫入的時候,客戶端才會被解鎖 */
void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout, robj *target) {
dictEntry *de;
list *l;
int j;
//設置超時時間
c->bpop.timeout = timeout;
c->bpop.target = target;
if (target != NULL) incrRefCount(target);
for (j = 0; j < numkeys; j++) {
//下面為為找到的key上鎖
/* If the key already exists in the dict ignore it. */
//如果此時,某些key已經存在了,直接忽略
if (dictAdd(c->bpop.keys,keys[j],NULL) != DICT_OK) continue;
incrRefCount(keys[j]);
/* And in the other "side", to map keys -> clients */
//根據key找到對于請求該key的客戶端,也就是將要被阻塞的Client
de = dictFind(c->db->blocking_keys,keys[j]);
if (de == NULL) {
int retval;
/* For every key we take a list of clients blocked for it */
l = listCreate();
//為c客戶端添加一個阻塞的key
retval = dictAdd(c->db->blocking_keys,keys[j],l);
//增加key的引用次數
incrRefCount(keys[j]);
redisAssertWithInfo(c,keys[j],retval == DICT_OK);
} else {
l = dictGetVal(de);
}
listAddNodeTail(l,c);
}
/* Mark the client as a blocked client */
/* 標記Client為阻塞的客戶端 */
c->flags |= REDIS_BLOCKED;
//服務端的阻塞客戶端計數遞增
server.bpop_blocked_clients++;
}
~~~
所有的key未來都會存在于server.readykeys里面,客戶端判斷自己所請求的key是否存在于服務端中的readykeys中,如果不存在就會被阻塞了。將key存入服務單的數據庫的操作:
~~~
/* If the specified key has clients blocked waiting for list pushes, this
* function will put the key reference into the server.ready_keys list.
* Note that db->ready_keys is a hash table that allows us to avoid putting
* the same key again and again in the list in case of multiple pushes
* made by a script or in the context of MULTI/EXEC.
*
* The list will be finally processed by handleClientsBlockedOnLists() */
/* 將key存入server中,后續可以用于客戶端的存取 */
void signalListAsReady(redisDb *db, robj *key) {
readyList *rl;
/* No clients blocking for this key? No need to queue it. */
/* 如果沒有客戶端為此key所阻塞的,直接不添加 */
if (dictFind(db->blocking_keys,key) == NULL) return;
/* Key was already signaled? No need to queue it again. */
/* 如果key已經存在,不不要重復添加 */
if (dictFind(db->ready_keys,key) != NULL) return;
/* Ok, we need to queue this key into server.ready_keys. */
rl = zmalloc(sizeof(*rl));
rl->key = key;
rl->db = db;
incrRefCount(key);
//添加到list列表尾部
listAddNodeTail(server.ready_keys,rl);
/* We also add the key in the db->ready_keys dictionary in order
* to avoid adding it multiple times into a list with a simple O(1)
* check. */
incrRefCount(key);
redisAssert(dictAdd(db->ready_keys,key,NULL) == DICT_OK);
}
~~~
其實這里我不免會有個小問題,為什么list出現Client被阻塞的操作,而在t_hash中不會出現類似的操作呢?略疑惑。
? ? 下面說說t_string的用法,t_string里面沒有涉及什么特定的結構體,就是最直接的鍵值對的設置,直接改變數據庫中的鍵值,先列出里面的主要方法:
~~~
/* 方法 API */
static int checkStringLength(redisClient *c, long long size) /* 檢查字符串長度是否超出限制最大長度,512*1024*1024個字節長度 */
void setGenericCommand(redisClient *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) /* 設置的泛型指令操作 */
void setCommand(redisClient *c) /* 設置的綜合方法,設置都是根據Client中的參數而定 */
void setnxCommand(redisClient *c) /* key不存在的時候才設置值,flag為REDIS_SET_NX時 */
void setexCommand(redisClient *c) /* key存在的時候才設置值,flag為REDIS_SET_NO_FLAGS,命令到期時間單位為秒 */
void psetexCommand(redisClient *c) /* key存在的時候才設置值,flag為REDIS_SET_NO_FLAGS,命令到期時間單位為毫秒 */
int getGenericCommand(redisClient *c) /* 獲取命令的泛型命令 */
void getCommand(redisClient *c) /* 獲取命令的操作 */
void getsetCommand(redisClient *c) /* 獲取set命令的操作 */
void setrangeCommand(redisClient *c) /* 設置rangge命令的操作 */
void getrangeCommand(redisClient *c) /* 獲取range命令的操作 */
void mgetCommand(redisClient *c) /* 執行多次getCommand指令 */
void msetGenericCommand(redisClient *c, int nx) /* 一次運行多個設置操作泛型命令 */
void msetCommand(redisClient *c) /* 多次設置指令的非NX模式 */
void msetnxCommand(redisClient *c) /* 多次設置指令的NX模式,即只在不存在的時候才設置 */
void incrDecrCommand(redisClient *c, long long incr) /* 增加或減少固定值的操作,減少其實就是增加-1 */
void incrCommand(redisClient *c) /* 遞增1操作,incr遞增設置為1 */
void decrCommand(redisClient *c) /* 遞減1操作,incr遞增設置為-1 */
void incrbyCommand(redisClient *c) /* 增加指令,從Client中獲取遞增值 */
void decrbyCommand(redisClient *c) /* 減少指令,從Client中獲取減少值 */
void incrbyfloatCommand(redisClient *c) /* 增加float類型值的操作命令,在獲取原始值的時候,獲取的方法不一樣,為getLongDoubleFromObjectOrReply*/
void appendCommand(redisClient *c) /* 追加命令操作,其實也是key:value的形式 */
void strlenCommand(redisClient *c) /* 獲取命令長度 */
~~~
里面都是一些鍵值對的簡單操作,有一個不同點是,里面的設置操作分為3種:
~~~
/* SET操作的一些FLAG */
#define REDIS_SET_NO_FLAGS 0
#define REDIS_SET_NX (1<<0) /* Set if key not exists. */
#define REDIS_SET_XX (1<<1) /* Set if key exists. */
~~~
如上面所說,nx模式指的是key不存在的時候才設置值,xx為存在的時候設置,這種模式設置有命令到期時間的限制,分為單位為秒級和毫秒級,而nx模式下,沒有時間上的限制,調用的泛型方法:
~~~
void setGenericCommand(redisClient *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply)
~~~
注意其中的expire,和時間單位unit:
~~~
/* key不存在的時候才設置值,flag為REDIS_SET_NX時 */
void setnxCommand(redisClient *c) {
c->argv[2] = tryObjectEncoding(c->argv[2]);
setGenericCommand(c,REDIS_SET_NX,c->argv[1],c->argv[2],NULL,0,shared.cone,shared.czero);
}
/* key存在的時候才設置值,flag為REDIS_SET_NO_FLAGS,命令到期時間單位為秒 */
void setexCommand(redisClient *c) {
c->argv[3] = tryObjectEncoding(c->argv[3]);
setGenericCommand(c,REDIS_SET_NO_FLAGS,c->argv[1],c->argv[3],c->argv[2],UNIT_SECONDS,NULL,NULL);
}
/* key存在的時候才設置值,flag為REDIS_SET_NO_FLAGS,命令到期時間單位為毫秒 */
void psetexCommand(redisClient *c) {
c->argv[3] = tryObjectEncoding(c->argv[3]);
setGenericCommand(c,REDIS_SET_NO_FLAGS,c->argv[1],c->argv[3],c->argv[2],UNIT_MILLISECONDS,NULL,NULL);
}
~~~
nx模式時expire和unit參數分別為NULL,0,所以我做出了如下猜想,可能是因為,當鍵值對存在的時候,考慮多并發設置的情況,有些Client可能被阻塞,所以有超時時間的存在,而key不存在的時候,就看誰更快了吧,就是直接設置。下面還有一個這個文件比較有意思的方法,有點批處理的味道,換句話說,一個方法里執行多次set操作:
~~~
/* 一次運行多個設置操作泛型命令 */
void msetGenericCommand(redisClient *c, int nx) {
int j, busykeys = 0;
/* 設置操作一定是成對出現的 */
if ((c->argc % 2) == 0) {
addReplyError(c,"wrong number of arguments for MSET");
return;
}
/* Handle the NX flag. The MSETNX semantic is to return zero and don't
* set nothing at all if at least one already key exists. */
if (nx) {
for (j = 1; j < c->argc; j += 2) {
if (lookupKeyWrite(c->db,c->argv[j]) != NULL) {
busykeys++;
}
}
//如果key存在,返回0
if (busykeys) {
addReply(c, shared.czero);
return;
}
}
//隔2個執行設置key操作指令一次
for (j = 1; j < c->argc; j += 2) {
c->argv[j+1] = tryObjectEncoding(c->argv[j+1]);
setKey(c->db,c->argv[j],c->argv[j+1]);
notifyKeyspaceEvent(REDIS_NOTIFY_STRING,"set",c->argv[j],c->db->id);
}
server.dirty += (c->argc-1)/2;
addReply(c, nx ? shared.cone : shared.ok);
}
~~~
也不是什么特別的操作,應該redis編寫者為了方便效率的提高吧。其他的一些方法,請讀者可以學習t_string.c的文件,還有提醒一點,在redis數據庫中,存儲的形式都是K-V形式的,所有的獲取,設置都是通過KEY的形式操作,你會看到很多這樣的方法:
~~~
dbAdd(c->db,c->argv[1],new);
setKey(c->db,c->argv[j],c->argv[j+1]);
~~~
- 前言
- (一)--Redis結構解析
- (二)--結構體分析(1)
- (三)---dict哈希結構
- (四)-- sds字符串
- (五)--- sparkline微線圖
- (六)--- ziplist壓縮列表
- (七)--- zipmap壓縮圖
- (八)--- t_hash哈希轉換
- (九)--- t_list,t_string的分析
- (十)--- testhelp.h小型測試框架和redis-check-aof.c日志檢測
- (十一)--- memtest內存檢測
- (十二)--- redis-check-dump本地數據庫檢測
- (十三)--- redis-benchmark性能測試
- (十四)--- rdb.c本地數據庫操作
- (十五)--- aof-append only file解析
- (十六)--- config配置文件
- (十七)--- multi事務操作
- (十八)--- db.c內存數據庫操作
- (十九)--- replication主從數據復制的實現
- (二十)--- ae事件驅動
- (二十一)--- anet網絡通信的封裝
- (二十二)--- networking網絡協議傳輸
- (二十三)--- CRC循環冗余算法和RAND隨機數算法
- (二十四)--- tool工具類(2)
- (二十五)--- zmalloc內存分配實現
- (二十六)--- slowLog和hyperloglog
- (二十七)--- rio系統I/O的封裝
- (二十八)--- object創建和釋放redisObject對象
- (二十九)--- bio后臺I/O服務的實現
- (三十)--- pubsub發布訂閱模式
- (三十一)--- latency延遲分析處理
- (三十二)--- redis-cli.c客戶端命令行接口的實現(1)
- (三十三)--- redis-cli.c客戶端命令行接口的實現(2)
- (三十四)--- redis.h服務端的實現分析(1)
- (三十五)--- redis.c服務端的實現分析(2)
- (三十六)--- Redis中的11大優秀設計