? ? ? I/O操作對于每個系統來說都是必不可少的一部分。而且I/O操作的好壞,在一定程度上也會影響著系統的效率問題。今天我學習了一下在Redis中的I/O是怎么處理的,同樣的,Redis在他自己的系統中,也封裝了一個I/O層。簡稱RIO。得先看看RIO中有什么東西嘍:
~~~
struct _rio {
/* Backend functions.
* Since this functions do not tolerate short writes or reads the return
* value is simplified to: zero on error, non zero on complete success. */
/* 數據流的讀方法 */
size_t (*read)(struct _rio *, void *buf, size_t len);
/* 數據流的寫方法 */
size_t (*write)(struct _rio *, const void *buf, size_t len);
/* 獲取當前的讀寫偏移量 */
off_t (*tell)(struct _rio *);
/* The update_cksum method if not NULL is used to compute the checksum of
* all the data that was read or written so far. The method should be
* designed so that can be called with the current checksum, and the buf
* and len fields pointing to the new block of data to add to the checksum
* computation. */
/* 當讀入新的數據塊的時候,會更新當前的校驗和 */
void (*update_cksum)(struct _rio *, const void *buf, size_t len);
/* The current checksum */
/* 當前的校驗和 */
uint64_t cksum;
/* number of bytes read or written */
/* 當前讀取的或寫入的字節大小 */
size_t processed_bytes;
/* maximum single read or write chunk size */
/* 最大的單次讀寫的大小 */
size_t max_processing_chunk;
/* Backend-specific vars. */
/* rio中I/O變量 */
union {
//buffer結構體
struct {
//buffer具體內容
sds ptr;
//偏移量
off_t pos;
} buffer;
//文件結構體
struct {
FILE *fp;
off_t buffered; /* Bytes written since last fsync. */
//同步的最小大小
off_t autosync; /* fsync after 'autosync' bytes written. */
} file;
} io;
};
~~~
里面除了3個必須的方法,read,write方法,還有獲取偏移量的tell方法,還有2個結構體變量,一個buffer結構體,一個file結構體,作者針對不同的I/O情況,做了不同的處理,當執行臨時的I/O操作時,都與rio.buffer打交道,當與文件進行I/O操作時,則執行與rio.file之間的操作。下面看看rio統一定義的讀寫方法:
~~~
/* The following functions are our interface with the stream. They'll call the
* actual implementation of read / write / tell, and will update the checksum
* if needed. */
/* rio的寫方法 */
static inline size_t rioWrite(rio *r, const void *buf, size_t len) {
while (len) {
//判斷當前操作字節長度是否超過最大長度
size_t bytes_to_write = (r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len;
//寫入新的數據時,更新校驗和
if (r->update_cksum) r->update_cksum(r,buf,bytes_to_write);
//執行寫方法
if (r->write(r,buf,bytes_to_write) == 0)
return 0;
buf = (char*)buf + bytes_to_write;
len -= bytes_to_write;
//操作字節數增加
r->processed_bytes += bytes_to_write;
}
return 1;
}
/* rio的讀方法 */
static inline size_t rioRead(rio *r, void *buf, size_t len) {
while (len) {
//判斷當前操作字節長度是否超過最大長度
size_t bytes_to_read = (r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len;
//讀數據方法
if (r->read(r,buf,bytes_to_read) == 0)
return 0;
//讀數據時,更新校驗和
if (r->update_cksum) r->update_cksum(r,buf,bytes_to_read);
buf = (char*)buf + bytes_to_read;
len -= bytes_to_read;
r->processed_bytes += bytes_to_read;
}
return 1;
}
~~~
這里有一個比較不錯的地方,每次當有數據發生改變的時候,Redis都會做一個計算校驗和的處理算法,表明了數據操作的改變動作,用的算法就是之前介紹過CRC64算法,針對RIO的buffer IO和File IO,Redis定義了2個RIO結構體:
~~~
/* 根據上面描述的方法,定義了BufferRio */
static const rio rioBufferIO = {
rioBufferRead,
rioBufferWrite,
rioBufferTell,
NULL, /* update_checksum */
0, /* current checksum */
0, /* bytes read or written */
0, /* read/write chunk size */
{ { NULL, 0 } } /* union for io-specific vars */
};
/* 根據上面描述的方法,定義了FileRio */
static const rio rioFileIO = {
rioFileRead,
rioFileWrite,
rioFileTell,
NULL, /* update_checksum */
0, /* current checksum */
0, /* bytes read or written */
0, /* read/write chunk size */
{ { NULL, 0 } } /* union for io-specific vars */
};
~~~
里面分別定義了相對應的讀寫方法,比如buffer的Read方法和File的Read方法:
~~~
/* Returns 1 or 0 for success/failure. */
/* 讀取rio中的buffer內容到傳入的參數 */
static size_t rioBufferRead(rio *r, void *buf, size_t len) {
if (sdslen(r->io.buffer.ptr)-r->io.buffer.pos < len)
return 0; /* not enough buffer to return len bytes. */
memcpy(buf,r->io.buffer.ptr+r->io.buffer.pos,len);
r->io.buffer.pos += len;
return 1;
}
~~~
~~~
/* Returns 1 or 0 for success/failure. */
/* 讀取rio中的fp文件內容 */
static size_t rioFileRead(rio *r, void *buf, size_t len) {
return fread(buf,len,1,r->io.file.fp);
}
~~~
作用的rio的對象變量不一樣,最后在Redis的聲明中給出了4種不同類型數據的寫入方法:
~~~
/* rio寫入不同類型數據方法,最終調用的是riowrite方法 */
size_t rioWriteBulkCount(rio *r, char prefix, int count);
size_t rioWriteBulkString(rio *r, const char *buf, size_t len);
size_t rioWriteBulkLongLong(rio *r, long long l);
size_t rioWriteBulkDouble(rio *r, double d);
~~~
舉其中的一個方法實現:
~~~
/* Write multi bulk count in the format: "*<count>\r\n". */
/* rio寫入不同類型數據方法,調用的是riowrite方法 */
size_t rioWriteBulkCount(rio *r, char prefix, int count) {
char cbuf[128];
int clen;
cbuf[0] = prefix;
clen = 1+ll2string(cbuf+1,sizeof(cbuf)-1,count);
cbuf[clen++] = '\r';
cbuf[clen++] = '\n';
if (rioWrite(r,cbuf,clen) == 0) return 0;
return clen;
}
~~~
調用的還是里面的rioWrite方法,根據你定義的是buffer IO還是File IO,.各自有各自不同的實現而已。在文件的write方法時,有一個細節,當你把內容讀入到rio.file.buffer時,buffer超過給定的同步最小字節,你得必須將buffer內容刷新到文件中了。
~~~
/* Returns 1 or 0 for success/failure. */
/* 將buf寫入rio中的file文件中 */
static size_t rioFileWrite(rio *r, const void *buf, size_t len) {
size_t retval;
retval = fwrite(buf,len,1,r->io.file.fp);
r->io.file.buffered += len;
if (r->io.file.autosync &&
r->io.file.buffered >= r->io.file.autosync)
{
//判讀是否需要同步
fflush(r->io.file.fp);
aof_fsync(fileno(r->io.file.fp));
r->io.file.buffered = 0;
}
return retval;
}
~~~
- 前言
- (一)--Redis結構解析
- (二)--結構體分析(1)
- (三)---dict哈希結構
- (四)-- sds字符串
- (五)--- sparkline微線圖
- (六)--- ziplist壓縮列表
- (七)--- zipmap壓縮圖
- (八)--- t_hash哈希轉換
- (九)--- t_list,t_string的分析
- (十)--- testhelp.h小型測試框架和redis-check-aof.c日志檢測
- (十一)--- memtest內存檢測
- (十二)--- redis-check-dump本地數據庫檢測
- (十三)--- redis-benchmark性能測試
- (十四)--- rdb.c本地數據庫操作
- (十五)--- aof-append only file解析
- (十六)--- config配置文件
- (十七)--- multi事務操作
- (十八)--- db.c內存數據庫操作
- (十九)--- replication主從數據復制的實現
- (二十)--- ae事件驅動
- (二十一)--- anet網絡通信的封裝
- (二十二)--- networking網絡協議傳輸
- (二十三)--- CRC循環冗余算法和RAND隨機數算法
- (二十四)--- tool工具類(2)
- (二十五)--- zmalloc內存分配實現
- (二十六)--- slowLog和hyperloglog
- (二十七)--- rio系統I/O的封裝
- (二十八)--- object創建和釋放redisObject對象
- (二十九)--- bio后臺I/O服務的實現
- (三十)--- pubsub發布訂閱模式
- (三十一)--- latency延遲分析處理
- (三十二)--- redis-cli.c客戶端命令行接口的實現(1)
- (三十三)--- redis-cli.c客戶端命令行接口的實現(2)
- (三十四)--- redis.h服務端的實現分析(1)
- (三十五)--- redis.c服務端的實現分析(2)
- (三十六)--- Redis中的11大優秀設計