? ? ? ?在Redis系統中也存在后臺服務的概念,background Service,后臺線程在Redis中的表現主要為background I/O Service,有了后臺線程的支持,系統在執行的效率上也勢必會有不一樣的提高。在Redis代碼中,描述了此功能的文件為bio.c,同樣借此機會學習一下,在C語言中的多線程編程到底是怎么一回事。我們先來看看,在Redis中的background job的工作形式;
~~~
/* Background I/O service for Redis.
*
* 后臺I/O服務
* This file implements operations that we need to perform in the background.
* Currently there is only a single operation, that is a background close(2)
* system call. This is needed as when the process is the last owner of a
* reference to a file closing it means unlinking it, and the deletion of the
* file is slow, blocking the server.
*
* In the future we'll either continue implementing new things we need or
* we'll switch to libeio. However there are probably long term uses for this
* file as we may want to put here Redis specific background tasks (for instance
* it is not impossible that we'll need a non blocking FLUSHDB/FLUSHALL
* implementation).
*
* DESIGN
* ------
*
* The design is trivial, we have a structure representing a job to perform
* and a different thread and job queue for every job type.
* Every thread wait for new jobs in its queue, and process every job
* sequentially.
*
* Jobs of the same type are guaranteed to be processed from the least
* recently inserted to the most recently inserted (older jobs processed
* first).
*
* Currently there is no way for the creator of the job to be notified about
* the completion of the operation, this will only be added when/if needed.
*
* 作者定義了一個結構體代表一個工作,每個線程等待從相應的job Type工作隊列中獲取一個job,每個job的排列的都按照時間
* 有序排列的
* ----------------------------------------------------------------------------
~~~
這里總共與2種Background I/O Type:
~~~
/* Background job opcodes */
/* 定義了2種后臺工作的類別 */
#define REDIS_BIO_CLOSE_FILE 0 /* Deferred close(2) syscall.文件的關閉 */
#define REDIS_BIO_AOF_FSYNC 1 /* Deferred AOF fsync.AOF文件的同步 */
/* BIO后臺操作類型總數為2個 */
#define REDIS_BIO_NUM_OPS 2
~~~
一個是AOF文件的同步操作,AOF就是“Append ONLY File”的縮寫,記錄每次的數據改變的寫操作,用于數據的恢復。還有一個我好像沒碰到過,CLOSE FILE,難道是異步關閉文件的意思。
~~~
static pthread_t bio_threads[REDIS_BIO_NUM_OPS]; /* 定義了bio線程組變量 */
static pthread_mutex_t bio_mutex[REDIS_BIO_NUM_OPS]; /* 線程相對應的mutex變量,用于同步操作 */
static pthread_cond_t bio_condvar[REDIS_BIO_NUM_OPS];
static list *bio_jobs[REDIS_BIO_NUM_OPS]; /* 每種job類型都是一個列表 */
/* The following array is used to hold the number of pending jobs for every
* OP type. This allows us to export the bioPendingJobsOfType() API that is
* useful when the main thread wants to perform some operation that may involve
* objects shared with the background thread. The main thread will just wait
* that there are no longer jobs of this type to be executed before performing
* the sensible operation. This data is also useful for reporting. */
static unsigned long long bio_pending[REDIS_BIO_NUM_OPS]; /* 此類型job等待執行的數量 */
/* This structure represents a background Job. It is only used locally to this
* file as the API does not expose the internals at all. */
/* background Job結構體 */
struct bio_job {
//job創建的時間
time_t time; /* Time at which the job was created. */
/* Job specific arguments pointers. If we need to pass more than three
* arguments we can just pass a pointer to a structure or alike. */
/* job特定參數指針 */
void *arg1, *arg2, *arg3;
};
~~~
上面聲明了一些變量,包括bio_threads線程數組,總數2個,bio_jobs列表數組,存放每種Type的job。下面我們看主要的一些方法:
~~~
/* Exported API */
void bioInit(void); /* background I/O初始化操作 */
void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3); /* 創建后臺job,通過傳入的3個參數初始化 */
unsigned long long bioPendingJobsOfType(int type); /* 返回type類型的job正在等待被執行的個數 */
void bioWaitPendingJobsLE(int type, unsigned long long num); /* 返回type類型的job正在等待被執行的個數 */
time_t bioOlderJobOfType(int type);
void bioKillThreads(void); /* 殺死后臺所有線程 */
~~~
首先看初始化操作;
~~~
/* Initialize the background system, spawning the thread. */
/* background I/O初始化操作 */
void bioInit(void) {
pthread_attr_t attr;
pthread_t thread;
size_t stacksize;
int j;
/* Initialization of state vars and objects */
for (j = 0; j < REDIS_BIO_NUM_OPS; j++) {
pthread_mutex_init(&bio_mutex[j],NULL);
pthread_cond_init(&bio_condvar[j],NULL);
//創建每個job類型的List列表
bio_jobs[j] = listCreate();
bio_pending[j] = 0;
}
/* Set the stack size as by default it may be small in some system */
//設置線程棧空間
pthread_attr_init(&attr);
pthread_attr_getstacksize(&attr,&stacksize);
if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */
while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;
pthread_attr_setstacksize(&attr, stacksize);
/* Ready to spawn our threads. We use the single argument the thread
* function accepts in order to pass the job ID the thread is
* responsible of. */
for (j = 0; j < REDIS_BIO_NUM_OPS; j++) {
void *arg = (void*)(unsigned long) j;
//創建2個線程,專門運行相應類型的job
if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {
redisLog(REDIS_WARNING,"Fatal: Can't initialize Background Jobs.");
exit(1);
}
//賦值到相應的Thread中
bio_threads[j] = thread;
}
}
~~~
也就是說,執行完上述的操作之后,在bio_threads線程中就運行著2個線程,從各自的job列表中取出相應的等待執行的jo;
~~~
/* 創建后臺job,通過傳入的3個參數初始化 */
void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {
struct bio_job *job = zmalloc(sizeof(*job));
job->time = time(NULL);
job->arg1 = arg1;
job->arg2 = arg2;
job->arg3 = arg3;
pthread_mutex_lock(&bio_mutex[type]);
//加入相對應的job type列表
listAddNodeTail(bio_jobs[type],job);
//等待的job數量增加1
bio_pending[type]++;
pthread_cond_signal(&bio_condvar[type]);
pthread_mutex_unlock(&bio_mutex[type]);
}
~~~
簡潔的創建background job操作,上面利用了mutex變量實現了線程同步操作,保證線程安全。下面看一下最重要的執行background Job的操作實現(省略了部分代碼):
~~~
/* 執行后臺的job,參數內包含著哪種type */
void *bioProcessBackgroundJobs(void *arg) {
......
while(1) {
listNode *ln;
/* The loop always starts with the lock hold. */
if (listLength(bio_jobs[type]) == 0) {
pthread_cond_wait(&bio_condvar[type],&bio_mutex[type]);
continue;
}
/* Pop the job from the queue. */
//從工作列表中取出第一個job
ln = listFirst(bio_jobs[type]);
job = ln->value;
/* It is now possible to unlock the background system as we know have
* a stand alone job structure to process.*/
pthread_mutex_unlock(&bio_mutex[type]);
/* Process the job accordingly to its type. */
//執行具體的工作
if (type == REDIS_BIO_CLOSE_FILE) {
close((long)job->arg1);
} else if (type == REDIS_BIO_AOF_FSYNC) {
aof_fsync((long)job->arg1);
} else {
redisPanic("Wrong job type in bioProcessBackgroundJobs().");
}
zfree(job);
/* Lock again before reiterating the loop, if there are no longer
* jobs to process we'll block again in pthread_cond_wait(). */
pthread_mutex_lock(&bio_mutex[type]);
listDelNode(bio_jobs[type],ln);
bio_pending[type]--;
}
}
~~~
while循環,從隊列中取出一個,執行一個操作。當然,如果想馬上停止一切后臺線程,可以執行下面的方法,調用
pthread_cancel:
~~~
/* Kill the running bio threads in an unclean way. This function should be
* used only when it's critical to stop the threads for some reason.
* Currently Redis does this only on crash (for instance on SIGSEGV) in order
* to perform a fast memory check without other threads messing with memory. */
/* 殺死后臺所有線程 */
void bioKillThreads(void) {
int err, j;
for (j = 0; j < REDIS_BIO_NUM_OPS; j++) {
//調用pthread_cancel方法kill當前的后臺線程
if (pthread_cancel(bio_threads[j]) == 0) {
if ((err = pthread_join(bio_threads[j],NULL)) != 0) {
redisLog(REDIS_WARNING,
"Bio thread for job type #%d can be joined: %s",
j, strerror(err));
} else {
redisLog(REDIS_WARNING,
"Bio thread for job type #%d terminated",j);
}
}
}
}
~~~
- 前言
- (一)--Redis結構解析
- (二)--結構體分析(1)
- (三)---dict哈希結構
- (四)-- sds字符串
- (五)--- sparkline微線圖
- (六)--- ziplist壓縮列表
- (七)--- zipmap壓縮圖
- (八)--- t_hash哈希轉換
- (九)--- t_list,t_string的分析
- (十)--- testhelp.h小型測試框架和redis-check-aof.c日志檢測
- (十一)--- memtest內存檢測
- (十二)--- redis-check-dump本地數據庫檢測
- (十三)--- redis-benchmark性能測試
- (十四)--- rdb.c本地數據庫操作
- (十五)--- aof-append only file解析
- (十六)--- config配置文件
- (十七)--- multi事務操作
- (十八)--- db.c內存數據庫操作
- (十九)--- replication主從數據復制的實現
- (二十)--- ae事件驅動
- (二十一)--- anet網絡通信的封裝
- (二十二)--- networking網絡協議傳輸
- (二十三)--- CRC循環冗余算法和RAND隨機數算法
- (二十四)--- tool工具類(2)
- (二十五)--- zmalloc內存分配實現
- (二十六)--- slowLog和hyperloglog
- (二十七)--- rio系統I/O的封裝
- (二十八)--- object創建和釋放redisObject對象
- (二十九)--- bio后臺I/O服務的實現
- (三十)--- pubsub發布訂閱模式
- (三十一)--- latency延遲分析處理
- (三十二)--- redis-cli.c客戶端命令行接口的實現(1)
- (三十三)--- redis-cli.c客戶端命令行接口的實現(2)
- (三十四)--- redis.h服務端的實現分析(1)
- (三十五)--- redis.c服務端的實現分析(2)
- (三十六)--- Redis中的11大優秀設計