<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ??一站式輕松地調用各大LLM模型接口,支持GPT4、智譜、豆包、星火、月之暗面及文生圖、文生視頻 廣告
                ? ? ? ? ?繼續學習redis源碼下的Data數據相關文件的代碼分析,今天我看的是一個叫aof的文件,這個字母是append ONLY file的簡稱,意味只進行追加文件操作。這里的文件追加記錄時為了記錄數據操作的改變記錄,用以異常情況的數據恢復的。類似于之前我說的redo,undo日志的作用。我們都知道,redis作為一個內存數據庫,數據的每次操作改變是先放在內存中,等到內存數據滿了,在刷新到磁盤文件中,達到持久化的目的。所以aof的操作模式,也是采用了這樣的方式。這里引入了一個block塊的概念,其實就是一個緩沖區塊。關于塊的一些定義如下: ~~~ /* AOF的下面的一些代碼都用到了一個簡單buffer緩存塊來進行存儲,存儲了數據的一些改變操作記錄,等到 緩沖中的達到一定的數據規模時,在持久化地寫入到一個文件中,redis采用的方式是append追加的形式,這意味 每次追加都要調整存儲的塊的大小,但是不可能會有無限大小的塊空間,所以redis在這里引入了塊列表的概念, 設定死一個塊的大小,超過單位塊大小,存入另一個塊中,這里定義每個塊的大小為10M. */ #define AOF_RW_BUF_BLOCK_SIZE (1024*1024*10) /* 10 MB per block */ /* 標準的aof文件讀寫塊 */ typedef struct aofrwblock { //當前文件塊被使用了多少,空閑的大小 unsigned long used, free; //具體存儲內容,大小10M char buf[AOF_RW_BUF_BLOCK_SIZE]; } aofrwblock; ~~~ 也就是說,每個塊的大小默認為10M,這個大小說大不大,說小不小了,如果填入的數據超出長度了,系統會動態申請一個新的緩沖塊,在server端是通過一個塊鏈表的形式,組織整個塊的: ~~~ /* Append data to the AOF rewrite buffer, allocating new blocks if needed. */ /* 在緩沖區中追加數據,如果超出空間,會新申請一個緩沖塊 */ void aofRewriteBufferAppend(unsigned char *s, unsigned long len) { listNode *ln = listLast(server.aof_rewrite_buf_blocks); //定位到緩沖區的最后一塊,在最后一塊的位置上進行追加寫操作 aofrwblock *block = ln ? ln->value : NULL; while(len) { /* If we already got at least an allocated block, try appending * at least some piece into it. */ if (block) { //如果當前的緩沖塊的剩余空閑能支持len長度的內容時,直接寫入 unsigned long thislen = (block->free < len) ? block->free : len; if (thislen) { /* The current block is not already full. */ memcpy(block->buf+block->used, s, thislen); block->used += thislen; block->free -= thislen; s += thislen; len -= thislen; } } if (len) { /* First block to allocate, or need another block. */ int numblocks; //如果不夠的話,需要新創建,進行寫操作 block = zmalloc(sizeof(*block)); block->free = AOF_RW_BUF_BLOCK_SIZE; block->used = 0; //還要把緩沖塊追加到服務端的buffer列表中 listAddNodeTail(server.aof_rewrite_buf_blocks,block); /* Log every time we cross more 10 or 100 blocks, respectively * as a notice or warning. */ numblocks = listLength(server.aof_rewrite_buf_blocks); if (((numblocks+1) % 10) == 0) { int level = ((numblocks+1) % 100) == 0 ? REDIS_WARNING : REDIS_NOTICE; redisLog(level,"Background AOF buffer size: %lu MB", aofRewriteBufferSize()/(1024*1024)); } } } } ~~~ 當想要主動的將緩沖區中的數據刷新到持久化到磁盤中時,調用下面的方法: ~~~ /* Write the append only file buffer on disk. * * Since we are required to write the AOF before replying to the client, * and the only way the client socket can get a write is entering when the * the event loop, we accumulate all the AOF writes in a memory * buffer and write it on disk using this function just before entering * the event loop again. * * About the 'force' argument: * * When the fsync policy is set to 'everysec' we may delay the flush if there * is still an fsync() going on in the background thread, since for instance * on Linux write(2) will be blocked by the background fsync anyway. * When this happens we remember that there is some aof buffer to be * flushed ASAP, and will try to do that in the serverCron() function. * * However if force is set to 1 we'll write regardless of the background * fsync. */ #define AOF_WRITE_LOG_ERROR_RATE 30 /* Seconds between errors logging. */ /* 刷新緩存區的內容到磁盤中 */ void flushAppendOnlyFile(int force) { ssize_t nwritten; int sync_in_progress = 0; mstime_t latency; if (sdslen(server.aof_buf) == 0) return; if (server.aof_fsync == AOF_FSYNC_EVERYSEC) sync_in_progress = bioPendingJobsOfType(REDIS_BIO_AOF_FSYNC) != 0; if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) { /* With this append fsync policy we do background fsyncing. * If the fsync is still in progress we can try to delay * the write for a couple of seconds. */ if (sync_in_progress) { if (server.aof_flush_postponed_start == 0) { /* No previous write postponinig, remember that we are * postponing the flush and return. */ server.aof_flush_postponed_start = server.unixtime; return; } else if (server.unixtime - server.aof_flush_postponed_start < 2) { /* We were already waiting for fsync to finish, but for less * than two seconds this is still ok. Postpone again. */ return; } /* Otherwise fall trough, and go write since we can't wait * over two seconds. */ server.aof_delayed_fsync++; redisLog(REDIS_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis."); } } /* We want to perform a single write. This should be guaranteed atomic * at least if the filesystem we are writing is a real physical one. * While this will save us against the server being killed I don't think * there is much to do about the whole server stopping for power problems * or alike */ //在進行寫入操作的時候,還監聽了延遲 latencyStartMonitor(latency); nwritten = write(server.aof_fd,server.aof_buf,sdslen(server.aof_buf)); latencyEndMonitor(latency); /* We want to capture different events for delayed writes: * when the delay happens with a pending fsync, or with a saving child * active, and when the above two conditions are missing. * We also use an additional event name to save all samples which is * useful for graphing / monitoring purposes. */ if (sync_in_progress) { latencyAddSampleIfNeeded("aof-write-pending-fsync",latency); } else if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) { latencyAddSampleIfNeeded("aof-write-active-child",latency); } else { latencyAddSampleIfNeeded("aof-write-alone",latency); } latencyAddSampleIfNeeded("aof-write",latency); /* We performed the write so reset the postponed flush sentinel to zero. */ server.aof_flush_postponed_start = 0; if (nwritten != (signed)sdslen(server.aof_buf)) { static time_t last_write_error_log = 0; int can_log = 0; /* Limit logging rate to 1 line per AOF_WRITE_LOG_ERROR_RATE seconds. */ if ((server.unixtime - last_write_error_log) > AOF_WRITE_LOG_ERROR_RATE) { can_log = 1; last_write_error_log = server.unixtime; } /* Lof the AOF write error and record the error code. */ if (nwritten == -1) { if (can_log) { redisLog(REDIS_WARNING,"Error writing to the AOF file: %s", strerror(errno)); server.aof_last_write_errno = errno; } } else { if (can_log) { redisLog(REDIS_WARNING,"Short write while writing to " "the AOF file: (nwritten=%lld, " "expected=%lld)", (long long)nwritten, (long long)sdslen(server.aof_buf)); } if (ftruncate(server.aof_fd, server.aof_current_size) == -1) { if (can_log) { redisLog(REDIS_WARNING, "Could not remove short write " "from the append-only file. Redis may refuse " "to load the AOF the next time it starts. " "ftruncate: %s", strerror(errno)); } } else { /* If the ftrunacate() succeeded we can set nwritten to * -1 since there is no longer partial data into the AOF. */ nwritten = -1; } server.aof_last_write_errno = ENOSPC; } /* Handle the AOF write error. */ if (server.aof_fsync == AOF_FSYNC_ALWAYS) { /* We can't recover when the fsync policy is ALWAYS since the * reply for the client is already in the output buffers, and we * have the contract with the user that on acknowledged write data * is synched on disk. */ redisLog(REDIS_WARNING,"Can't recover from AOF write error when the AOF fsync policy is 'always'. Exiting..."); exit(1); } else { /* Recover from failed write leaving data into the buffer. However * set an error to stop accepting writes as long as the error * condition is not cleared. */ server.aof_last_write_status = REDIS_ERR; /* Trim the sds buffer if there was a partial write, and there * was no way to undo it with ftruncate(2). */ if (nwritten > 0) { server.aof_current_size += nwritten; sdsrange(server.aof_buf,nwritten,-1); } return; /* We'll try again on the next call... */ } } else { /* Successful write(2). If AOF was in error state, restore the * OK state and log the event. */ if (server.aof_last_write_status == REDIS_ERR) { redisLog(REDIS_WARNING, "AOF write error looks solved, Redis can write again."); server.aof_last_write_status = REDIS_OK; } } server.aof_current_size += nwritten; /* Re-use AOF buffer when it is small enough. The maximum comes from the * arena size of 4k minus some overhead (but is otherwise arbitrary). */ if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) { sdsclear(server.aof_buf); } else { sdsfree(server.aof_buf); server.aof_buf = sdsempty(); } /* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are * children doing I/O in the background. */ if (server.aof_no_fsync_on_rewrite && (server.aof_child_pid != -1 || server.rdb_child_pid != -1)) return; /* Perform the fsync if needed. */ if (server.aof_fsync == AOF_FSYNC_ALWAYS) { /* aof_fsync is defined as fdatasync() for Linux in order to avoid * flushing metadata. */ latencyStartMonitor(latency); aof_fsync(server.aof_fd); /* Let's try to get this data on the disk */ latencyEndMonitor(latency); latencyAddSampleIfNeeded("aof-fsync-always",latency); server.aof_last_fsync = server.unixtime; } else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC && server.unixtime > server.aof_last_fsync)) { if (!sync_in_progress) aof_background_fsync(server.aof_fd); server.aof_last_fsync = server.unixtime; } } ~~~ 當然有操作會對數據庫中的所有數據,做操作記錄,便宜用此文件進行全盤恢復: ~~~ /* Write a sequence of commands able to fully rebuild the dataset into * "filename". Used both by REWRITEAOF and BGREWRITEAOF. * * In order to minimize the number of commands needed in the rewritten * log Redis uses variadic commands when possible, such as RPUSH, SADD * and ZADD. However at max REDIS_AOF_REWRITE_ITEMS_PER_CMD items per time * are inserted using a single command. */ /* 將數據庫的內容按照鍵值,再次完全重寫入文件中 */ int rewriteAppendOnlyFile(char *filename) { dictIterator *di = NULL; dictEntry *de; rio aof; FILE *fp; char tmpfile[256]; int j; long long now = mstime(); /* Note that we have to use a different temp name here compared to the * one used by rewriteAppendOnlyFileBackground() function. */ snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid()); fp = fopen(tmpfile,"w"); if (!fp) { redisLog(REDIS_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno)); return REDIS_ERR; } rioInitWithFile(&aof,fp); if (server.aof_rewrite_incremental_fsync) rioSetAutoSync(&aof,REDIS_AOF_AUTOSYNC_BYTES); for (j = 0; j < server.dbnum; j++) { char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n"; redisDb *db = server.db+j; dict *d = db->dict; if (dictSize(d) == 0) continue; di = dictGetSafeIterator(d); if (!di) { fclose(fp); return REDIS_ERR; } /* SELECT the new DB */ if (rioWrite(&aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr; if (rioWriteBulkLongLong(&aof,j) == 0) goto werr; /* Iterate this DB writing every entry */ //遍歷數據庫中的每條記錄,進行日志記錄 while((de = dictNext(di)) != NULL) { sds keystr; robj key, *o; long long expiretime; keystr = dictGetKey(de); o = dictGetVal(de); initStaticStringObject(key,keystr); expiretime = getExpire(db,&key); /* If this key is already expired skip it */ if (expiretime != -1 && expiretime < now) continue; /* Save the key and associated value */ if (o->type == REDIS_STRING) { /* Emit a SET command */ char cmd[]="*3\r\n$3\r\nSET\r\n"; if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr; /* Key and value */ if (rioWriteBulkObject(&aof,&key) == 0) goto werr; if (rioWriteBulkObject(&aof,o) == 0) goto werr; } else if (o->type == REDIS_LIST) { if (rewriteListObject(&aof,&key,o) == 0) goto werr; } else if (o->type == REDIS_SET) { if (rewriteSetObject(&aof,&key,o) == 0) goto werr; } else if (o->type == REDIS_ZSET) { if (rewriteSortedSetObject(&aof,&key,o) == 0) goto werr; } else if (o->type == REDIS_HASH) { if (rewriteHashObject(&aof,&key,o) == 0) goto werr; } else { redisPanic("Unknown object type"); } /* Save the expire time */ if (expiretime != -1) { char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n"; if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr; if (rioWriteBulkObject(&aof,&key) == 0) goto werr; if (rioWriteBulkLongLong(&aof,expiretime) == 0) goto werr; } } dictReleaseIterator(di); } /* Make sure data will not remain on the OS's output buffers */ if (fflush(fp) == EOF) goto werr; if (fsync(fileno(fp)) == -1) goto werr; if (fclose(fp) == EOF) goto werr; /* Use RENAME to make sure the DB file is changed atomically only * if the generate DB file is ok. */ if (rename(tmpfile,filename) == -1) { redisLog(REDIS_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno)); unlink(tmpfile); return REDIS_ERR; } redisLog(REDIS_NOTICE,"SYNC append only file rewrite performed"); return REDIS_OK; werr: fclose(fp); unlink(tmpfile); redisLog(REDIS_WARNING,"Write error writing append only file on disk: %s", strerror(errno)); if (di) dictReleaseIterator(di); return REDIS_ERR; } ~~~ 系統同樣開放了后臺的此方法操作: ~~~ /* This is how rewriting of the append only file in background works: * * 1) The user calls BGREWRITEAOF * 2) Redis calls this function, that forks(): * 2a) the child rewrite the append only file in a temp file. * 2b) the parent accumulates differences in server.aof_rewrite_buf. * 3) When the child finished '2a' exists. * 4) The parent will trap the exit code, if it's OK, will append the * data accumulated into server.aof_rewrite_buf into the temp file, and * finally will rename(2) the temp file in the actual file name. * The the new file is reopened as the new append only file. Profit! */ /* 后臺進行AOF數據文件寫入操作 */ int rewriteAppendOnlyFileBackground(void) ~~~ 原理就是和昨天分析的一樣,用的是fork(),創建子線程,最后開放出API: ~~~ /* aof.c 中的API */ void aofRewriteBufferReset(void) /* 釋放server中舊的buffer,并創建一份新的buffer */ unsigned long aofRewriteBufferSize(void) /* 返回當前AOF的buffer的總大小 */ void aofRewriteBufferAppend(unsigned char *s, unsigned long len) /* 在緩沖區中追加數據,如果超出空間,會新申請一個緩沖塊 */ ssize_t aofRewriteBufferWrite(int fd) /* 將保存內存中的buffer內容寫入到文件中,也是分塊分塊的寫入 */ void aof_background_fsync(int fd) /* 開啟后臺線程進行文件同步操作 */ void stopAppendOnly(void) /* 停止追加數據操作,這里用的是一個命令模式 */ int startAppendOnly(void) /* 開啟追加模式 */ void flushAppendOnlyFile(int force) /* 刷新緩存區的內容到磁盤中 */ sds catAppendOnlyGenericCommand(sds dst, int argc, robj **argv) /* 根據輸入的字符串,進行參數包裝,再次輸出 */ sds catAppendOnlyExpireAtCommand(sds buf, struct redisCommand *cmd, robj *key, robj *seconds) /* 將過期等的命令都轉化為PEXPIREAT命令,把時間轉化為了絕對時間 */ void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) /* 根據cmd的不同操作,進行命令的不同轉化 */ struct redisClient *createFakeClient(void) /* 命令總是被客戶端所執行的,因此要引入客戶端的方法 */ void freeFakeClientArgv(struct redisClient *c) /* 釋放客戶端參數操作 */ void freeFakeClient(struct redisClient *c) /* 釋放客戶端參數操作 */ int loadAppendOnlyFile(char *filename) /* 加載AOF文件內容 */ int rioWriteBulkObject(rio *r, robj *obj) /* 寫入bulk對象,分為LongLong對象,和普通的String對象 */ int rewriteListObject(rio *r, robj *key, robj *o) /* 寫入List列表對象,分為ZIPLIST壓縮列表和LINEDLIST普通鏈表操作 */ int rewriteSetObject(rio *r, robj *key, robj *o) /* 寫入set對象數據 */ int rewriteSortedSetObject(rio *r, robj *key, robj *o) /* 寫入排序好的set對象 */ static int rioWriteHashIteratorCursor(rio *r, hashTypeIterator *hi, int what) /* 寫入哈希迭代器當前指向的對象 */ int rewriteHashObject(rio *r, robj *key, robj *o) /* 寫入哈希字典對象 */ int rewriteAppendOnlyFile(char *filename) /* 將數據庫的內容按照鍵值,再次完全重寫入文件中 */ int rewriteAppendOnlyFileBackground(void) /* 后臺進行AOF數據文件寫入操作 */ void bgrewriteaofCommand(redisClient *c) /* 后臺寫AOF文件操作命令模式 */ void aofRemoveTempFile(pid_t childpid) /* 移除某次子線程ID為childpid所生產的aof文件 */ void aofUpdateCurrentSize(void) /* 更新當前aof文件的大小 */ void backgroundRewriteDoneHandler(int exitcode, int bysignal) /* 后臺子線程寫操作完成后的回調方法 */ ~~~
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看