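Before we begin
I've had a lot on my mind lately and wanted to unwind with some technical work. In this article I walk through the source code to show how AOF, using pipe communication between a Linux parent and child process, keeps receiving the AOF strings generated by the commands the server handles in real time while an asynchronous AOF rewrite is in progress, so that the result is as reliable as possible.
Hi, I'm sharkChili, a Java coder who keeps pushing his luck with hardcore technology. I'm a CSDN blog expert and one of the maintainers of the open-source project Java Guide. I know Java well, know a little Go, and occasionally wander around the edges of C source code. I've written many interesting technical posts and I'm still researching and writing. I hope my articles help you, and you're very welcome to follow my WeChat official account: 写代码的SharkChili.
Because I've recently received many private messages from readers, I've also set up a discussion group. Interested readers can get my contact details through the official account above and add me as a friend; include the note "加群" (join group) to chat in depth with me and my friends.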
The design of AOF pipe communication in detail
Inter-process communication over Linux pipes
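During an AOF rewrite, Redis forks a child process and lets the child do the rewrite asynchronously, so that the time spent rewriting the AOF file does not degrade Redis's command-processing performance. This raises another question: while the AOF child is rewriting, can the latest commands sent by users reach the child and be persisted to the file?
Redis solves this with Linux pipes. The parent sends the data over a pipe in real time, and once the child has received the strings for these commands it writes them into the AOF rewrite file.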
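Note that a Linux pipe is unidirectional. pipe() fills a two-element array with two file descriptors: index 0 is the read end and index 1 is the write end. The parent can therefore only send live client commands by writing to index 1, and the child can only receive them by reading from index 0. By the same token, sending data in the opposite direction (child writes, parent reads) requires a second two-element array, that is, a second pipe.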
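To make the direction of a pipe concrete, here is a minimal sketch, not Redis code. The helper name send_to_child is made up for illustration. It assumes a POSIX system and a message shorter than 256 bytes, so a single write() is enough.

```c
#include <assert.h>
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Hypothetical helper: forks a child, sends `msg` through a pipe, and
 * returns 0 if the child read exactly the same bytes, -1 otherwise. */
int send_to_child(const char *msg) {
    int fds[2];
    int status;
    size_t len;
    ssize_t written;
    pid_t pid;

    assert(msg != NULL);
    len = strlen(msg);
    if (len == 0 || len > 255) return -1;
    if (pipe(fds) == -1) return -1;   /* fds[0]: read end, fds[1]: write end */

    pid = fork();
    if (pid == -1) {
        close(fds[0]);
        close(fds[1]);
        return -1;
    }
    if (pid == 0) {
        /* Child: it only reads, so it closes the write end first. */
        char buf[256];
        size_t got = 0;
        ssize_t n;

        close(fds[1]);
        while (got < sizeof(buf) &&
               (n = read(fds[0], buf + got, sizeof(buf) - got)) > 0)
            got += (size_t)n;
        close(fds[0]);
        _exit(got == len && memcmp(buf, msg, len) == 0 ? 0 : 1);
    }
    /* Parent: it only writes, so it closes the read end first. */
    close(fds[0]);
    written = write(fds[1], msg, len);
    close(fds[1]);                    /* the child sees EOF after the data */
    if (waitpid(pid, &status, 0) == -1) return -1;
    if (written != (ssize_t)len) return -1;
    return (WIFEXITED(status) && WEXITSTATUS(status) == 0) ? 0 : -1;
}
```

Calling send_to_child with the RESP string for set k v should return 0. Swapping the two indexes, for example writing to fds[0], fails, which is exactly why Redis needs one pipe per direction.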
Here is the core code that creates the AOF child process, rewriteAppendOnlyFileBackground in aof.c. Before forking, Redis calls aofCreatePipes to create the pipes that the rewrite child and the parent will use afterwards:
int rewriteAppendOnlyFileBackground(void) {
    pid_t childpid;
    long long start;

    if (server.aof_child_pid != -1) return REDIS_ERR;
    //Create the pipes
    if (aofCreatePipes() != REDIS_OK) return REDIS_ERR;
    start = ustime();
    //Fork a child process that performs the AOF rewrite
    if ((childpid = fork()) == 0) {
        char tmpfile[256];

        //......
        //Name of the temporary file
        snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
        //Rewrite the AOF
        if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {
            size_t private_dirty = zmalloc_get_private_dirty();
            //......
            exitFromChild(0);
        } else {
            exitFromChild(1);
        }
    } else {
        //......
    }
    return REDIS_OK;
}
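Stepping into aofCreatePipes, we can see the pipe creation logic described above. It initializes an array of 6 file descriptors, and each consecutive pair forms one logical pipe. In order, the pipes are:
- The data pipe through which the parent sends data to the child.
- The pipe through which the child sends its ACK signal to the parent.
- The pipe through which the parent sends its ACK confirmation to the child.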
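Here is the core pipe-creation code, aofCreatePipes in aof.c. It creates a 6-element array fds which, as described above, forms the parent-data, child-ACK and parent-ACK pipes. Both ends of the data pipe are set to non-blocking with anetNonBlock, so that the main process never stalls the whole flow when it writes the latest data: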
int aofCreatePipes(void) {
    //Create 3 pipes
    int fds[6] = {-1, -1, -1, -1, -1, -1};
    int j;

    if (pipe(fds) == -1) goto error; /* parent -> children data. */
    if (pipe(fds+2) == -1) goto error; /* children -> parent ack. */
    if (pipe(fds+4) == -1) goto error; /* parent -> children ack. */
    /* Parent -> children data is non blocking. */
    if (anetNonBlock(NULL,fds[0]) != ANET_OK) goto error;
    if (anetNonBlock(NULL,fds[1]) != ANET_OK) goto error;
    //Register a readable event so the parent notices the child's ack
    if (aeCreateFileEvent(server.el, fds[2], AE_READABLE, aofChildPipeReadable, NULL) == AE_ERR) goto error;

    //Store each pipe end in its server field
    //Data pipe: parent writes, child reads
    server.aof_pipe_write_data_to_child = fds[1];
    server.aof_pipe_read_data_from_parent = fds[0];
    //Ack pipe: child writes, parent reads
    server.aof_pipe_write_ack_to_parent = fds[3];
    server.aof_pipe_read_ack_from_child = fds[2];
    //Ack pipe: parent writes, child reads
    server.aof_pipe_write_ack_to_child = fds[5];
    server.aof_pipe_read_ack_from_parent = fds[4];
    server.aof_stop_sending_diff = 0;
    return REDIS_OK;

error:
    redisLog(REDIS_WARNING,"Error opening /setting AOF rewrite IPC pipes: %s",
        strerror(errno));
    for (j = 0; j < 6; j++) if(fds[j] != -1) close(fds[j]);
    return REDIS_ERR;
}
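The layout of the six descriptors is easier to follow outside Redis. Below is a minimal standalone sketch of the same idea. The struct rewrite_pipes and the functions open_rewrite_pipes, close_rewrite_pipes and set_nonblock are names I made up for illustration. Plain fcntl() stands in for anetNonBlock, which I assume does something equivalent.

```c
#include <assert.h>
#include <fcntl.h>
#include <unistd.h>

/* Hypothetical struct mirroring the six server.aof_pipe_* fields. */
struct rewrite_pipes {
    int data_read, data_write;             /* parent -> child data */
    int child_ack_read, child_ack_write;   /* child -> parent ack  */
    int parent_ack_read, parent_ack_write; /* parent -> child ack  */
};

/* Stand-in for anetNonBlock: adds O_NONBLOCK to the fd's status flags. */
static int set_nonblock(int fd) {
    int flags = fcntl(fd, F_GETFL);
    if (flags == -1) return -1;
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1 ? -1 : 0;
}

/* Opens three pipes in one 6-element array; returns 0 on success. */
int open_rewrite_pipes(struct rewrite_pipes *p) {
    int fds[6] = {-1, -1, -1, -1, -1, -1};
    int j;

    assert(p != NULL);
    for (j = 0; j < 6; j += 2)
        if (pipe(fds + j) == -1) goto error;
    /* Only the data pipe is non-blocking. */
    if (set_nonblock(fds[0]) == -1 || set_nonblock(fds[1]) == -1) goto error;

    p->data_read = fds[0];       p->data_write = fds[1];
    p->child_ack_read = fds[2];  p->child_ack_write = fds[3];
    p->parent_ack_read = fds[4]; p->parent_ack_write = fds[5];
    return 0;

error:
    /* Close whatever was opened before the failure. */
    for (j = 0; j < 6; j++)
        if (fds[j] != -1) close(fds[j]);
    return -1;
}

void close_rewrite_pipes(struct rewrite_pipes *p) {
    assert(p != NULL);
    close(p->data_read);       close(p->data_write);
    close(p->child_ack_read);  close(p->child_ack_write);
    close(p->parent_ack_read); close(p->parent_ack_write);
}
```

Initializing every slot to -1 is what makes the error path simple: the cleanup loop can close exactly the descriptors that were opened, no matter which pipe() call failed.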
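How the AOF rewrite receives data from the parent
Later, once the parent receives a live command from a client, for example set k v, its main flow propagates that event to the AOF path, converts the command into a string in RESP format (the format required by the Redis protocol) and writes it to the first pipe, the parent-to-child data pipe. The child then picks up the latest data from the read end of that pipe, index 0 of the array.
When the server receives a client command, it calls call to execute it, then uses propagate to pass the command on to the AOF functions, which write it to the pipe: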
void call(redisClient *c, int flags) {
    //......
    //Run the command sent by the client (command pattern)
    c->cmd->proc(c);
    //......
    //Propagate the command to the AOF path
    if (flags & REDIS_CALL_PROPAGATE) {
        int flags = REDIS_PROPAGATE_NONE;
        //......
        //Hand the command cmd and its arguments argv over to AOF
        if (flags != REDIS_PROPAGATE_NONE)
            propagate(c->cmd,c->db->id,c->argv,c->argc,flags);
    }
    //......
}
Stepping into propagate, we see that if AOF is not off and AOF propagation is requested, it calls feedAppendOnlyFile to append the client command and its arguments:
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
               int flags)
{
    //If AOF is not off and AOF propagation is requested, feed the AOF
    if (server.aof_state != REDIS_AOF_OFF && flags & REDIS_PROPAGATE_AOF)
        feedAppendOnlyFile(cmd,dbid,argv,argc);
    //......
}
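Stepping into feedAppendOnlyFile, we see that Redis turns the command into a RESP string and appends it to the AOF buffer. If a rewrite child is running, it then calls aofRewriteBufferAppend, which registers an event that writes the buffered data to the pipe: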
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
    sds buf = sdsempty();
    robj *tmpargv[3];
    //......
    //Emit a SELECT command if the target database changed
    if (dictid != server.aof_selected_db) {
        char seldb[64];

        snprintf(seldb,sizeof(seldb),"%d",dictid);
        buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
            (unsigned long)strlen(seldb),seldb);
        server.aof_selected_db = dictid;
    }

    //Build the command string from the command and its arguments
    if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||
        cmd->proc == expireatCommand) {
        /* Translate EXPIRE/PEXPIRE/EXPIREAT into PEXPIREAT */
        buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
    } else if (cmd->proc == setexCommand || cmd->proc == psetexCommand) {
        /* Translate SETEX/PSETEX to SET and PEXPIREAT */
        tmpargv[0] = createStringObject("SET",3);
        tmpargv[1] = argv[1];
        tmpargv[2] = argv[3];
        buf = catAppendOnlyGenericCommand(buf,3,tmpargv);
        decrRefCount(tmpargv[0]);
        buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
    } else {
        /* All the other commands don't need translation or need the
         * same translation already operated in the command vector
         * for the replication itself. */
        buf = catAppendOnlyGenericCommand(buf,argc,argv);
    }

    //If AOF is on, append buf to aof_buf
    if (server.aof_state == REDIS_AOF_ON)
        server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf));

    //If a rewrite is in progress, also append the string to the rewrite buffer
    if (server.aof_child_pid != -1)
        aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));

    sdsfree(buf);
}
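To see what the RESP string for a command such as set k v actually looks like, here is a small sketch of the encoding. resp_encode is a helper I made up. It is not catAppendOnlyGenericCommand, and unlike Redis it relies on strlen, so it only handles NUL-terminated arguments, not binary-safe ones.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical helper: encodes argv[0..argc-1] as a RESP array of bulk
 * strings into `out`. Returns the encoded length, or -1 if `out` is too
 * small. Arguments must be NUL-terminated (not binary-safe). */
int resp_encode(char *out, size_t cap, int argc, const char **argv) {
    size_t used;
    int n, j;

    assert(out != NULL && argv != NULL && argc >= 0);
    /* Array header: "*<number of arguments>\r\n" */
    n = snprintf(out, cap, "*%d\r\n", argc);
    if (n < 0 || (size_t)n >= cap) return -1;
    used = (size_t)n;

    for (j = 0; j < argc; j++) {
        /* Each argument: "$<byte length>\r\n<bytes>\r\n" */
        n = snprintf(out + used, cap - used, "$%lu\r\n%s\r\n",
                     (unsigned long)strlen(argv[j]), argv[j]);
        if (n < 0 || (size_t)n >= cap - used) return -1;
        used += (size_t)n;
    }
    return (int)used;
}
```

With argv set to {"SET", "k", "v"}, the buffer ends up holding *3\r\n$3\r\nSET\r\n$1\r\nk\r\n$1\r\nv\r\n, the same shape as the strings that feedAppendOnlyFile appends to aof_buf and to the rewrite buffer.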
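Finally we reach aofRewriteBufferAppend. It appends the string built in the previous step to the 10 MB blocks kept in aof_rewrite_buf_blocks, then checks whether aeGetFileEvents returns 0 for aof_pipe_write_data_to_child. A return value of 0 means no event is registered on that descriptor yet. In that case it registers a writable event whose handler, aofChildWriteDiffData, writes the data to the pipe: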
void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {
    listNode *ln = listLast(server.aof_rewrite_buf_blocks);
    aofrwblock *block = ln ? ln->value : NULL;

    while(len) {
        /* If we already got at least an allocated block, try appending
         * at least some piece into it. */
        if (block) {
            unsigned long thislen = (block->free < len) ? block->free : len;
            if (thislen) { /* The current block is not already full. */
                //Append the data to a 10 MB block in aof_rewrite_buf_blocks
                memcpy(block->buf+block->used, s, thislen);
                block->used += thislen;
                block->free -= thislen;
                s += thislen;
                len -= thislen;
            }
        }
        //...... (allocation of a new block when more room is needed)
    }

    //Check whether an event is already registered on aof_pipe_write_data_to_child
    if (aeGetFileEvents(server.el,server.aof_pipe_write_data_to_child) == 0) {
        //Register a writable event; aofChildWriteDiffData will send the buffer
        aeCreateFileEvent(server.el, server.aof_pipe_write_data_to_child,
            AE_WRITABLE, aofChildWriteDiffData, NULL);
    }
}
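The block-filling loop is easier to observe with a tiny block size. The sketch below is a simplified stand-in, not the Redis implementation. struct block, struct block_list and the two functions are names I made up. BLOCK_SIZE is 16 bytes here purely so that a short input spans several blocks, whereas Redis uses 10 MB blocks in an adlist.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define BLOCK_SIZE 16 /* illustrative only; Redis uses 10 MB blocks */

struct block {
    size_t used, free;
    struct block *next;
    char buf[BLOCK_SIZE];
};

struct block_list {
    struct block *head, *tail;
    size_t nblocks;
};

/* Appends len bytes, filling the tail block first and allocating new
 * blocks as needed. Returns 0 on success, -1 if malloc fails. */
int block_list_append(struct block_list *l, const char *s, size_t len) {
    assert(l != NULL && s != NULL);
    while (len) {
        struct block *b = l->tail;
        if (b && b->free) {
            /* Copy as much as fits into the current tail block. */
            size_t n = b->free < len ? b->free : len;
            memcpy(b->buf + b->used, s, n);
            b->used += n;
            b->free -= n;
            s += n;
            len -= n;
        }
        if (len) {
            /* First block, or the tail is full: link a fresh block. */
            struct block *nb = malloc(sizeof(*nb));
            if (nb == NULL) return -1;
            nb->used = 0;
            nb->free = BLOCK_SIZE;
            nb->next = NULL;
            if (l->tail) l->tail->next = nb; else l->head = nb;
            l->tail = nb;
            l->nblocks++;
        }
    }
    return 0;
}

void block_list_free(struct block_list *l) {
    struct block *b = l->head;
    while (b) {
        struct block *next = b->next;
        free(b);
        b = next;
    }
    l->head = l->tail = NULL;
    l->nblocks = 0;
}
```

Appending 40 bytes to an empty list leaves three blocks holding 16, 16 and 8 bytes. Fixed-size blocks let the sender later drop a whole block once it has been written, instead of shifting one huge buffer.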
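Finally, once the pipe is writable, the Redis event loop dispatches the registered event to aofChildWriteDiffData, which takes the data out of the blocks and writes it to the descriptor aof_pipe_write_data_to_child, the write end of the parent-to-child data pipe: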
void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) {
    //......
    while(1) {
        //Take the first block
        ln = listFirst(server.aof_rewrite_buf_blocks);
        block = ln ? ln->value : NULL;
        if (server.aof_stop_sending_diff || !block) {
            aeDeleteFileEvent(server.el,server.aof_pipe_write_data_to_child,
                              AE_WRITABLE);
            return;
        }
        if (block->used > 0) {
            //Write to the write end of the data pipe (fds[1]) for the child
            nwritten = write(server.aof_pipe_write_data_to_child,
                             block->buf,block->used);
            if (nwritten <= 0) return;
            memmove(block->buf,block->buf+nwritten,block->used-nwritten);
            block->used -= nwritten;
        }
        if (block->used == 0) listDelNode(server.aof_rewrite_buf_blocks,ln);
    }
}
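The key detail in this handler is the partial write: on a non-blocking pipe, write() may accept only part of the buffer, or nothing at all when the pipe is full. Here is a standalone sketch of that pattern with a single flat buffer instead of a block list. drain_to_pipe and set_nonblock are hypothetical names, and I assume a POSIX system.

```c
#include <assert.h>
#include <fcntl.h>
#include <string.h>
#include <unistd.h>

/* Adds O_NONBLOCK to fd; the pattern below only makes sense on a
 * non-blocking descriptor. */
int set_nonblock(int fd) {
    int flags = fcntl(fd, F_GETFL);
    if (flags == -1) return -1;
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1 ? -1 : 0;
}

/* Writes as much of buf[0..*used) as the pipe accepts right now, moves
 * the unsent tail to the front of buf and updates *used.
 * Returns the number of bytes written during this call. */
ssize_t drain_to_pipe(int fd, char *buf, size_t *used) {
    ssize_t total = 0;

    assert(buf != NULL && used != NULL);
    while (*used > 0) {
        ssize_t n = write(fd, buf, *used);
        if (n <= 0) break; /* pipe full (EAGAIN) or error: retry later */
        /* Keep the bytes that were not accepted for the next attempt. */
        memmove(buf, buf + n, *used - (size_t)n);
        *used -= (size_t)n;
        total += n;
    }
    return total;
}
```

In Redis, "retry later" means returning from the handler and letting the event loop call it again when the pipe becomes writable. In this sketch the caller simply invokes drain_to_pipe again after the reader has consumed some data.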
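How the child process ensures reliable reception
The asynchronous AOF rewrite child calls rewriteAppendOnlyFile. After iterating over every key in the databases and finishing the rewrite, it waits for data on the pipe and reads it. The two sides then exchange ACKs, after which the child reads once more whatever the parent has written to the pipe, persists it to the file and flushes the file to disk: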
int rewriteAppendOnlyFile(char *filename) {
    dictIterator *di = NULL;
    dictEntry *de;
    rio aof;
    FILE *fp;
    char tmpfile[256];
    int j;
    long long now = mstime();
    char byte;
    size_t processed = 0;

    /* Note that we have to use a different temp name here compared to the
     * one used by rewriteAppendOnlyFileBackground() function. */
    snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
    fp = fopen(tmpfile,"w");
    if (!fp) {
        redisLog(REDIS_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno));
        return REDIS_ERR;
    }

    server.aof_child_diff = sdsempty();
    rioInitWithFile(&aof,fp);
    if (server.aof_rewrite_incremental_fsync)
        rioSetAutoSync(&aof,REDIS_AOF_AUTOSYNC_BYTES);
    for (j = 0; j < server.dbnum; j++) {
        //Pick the current database
        char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
        redisDb *db = server.db+j;
        dict *d = db->dict;
        if (dictSize(d) == 0) continue;
        //Get a safe iterator over the database's dict
        di = dictGetSafeIterator(d);
        if (!di) {
            fclose(fp);
            return REDIS_ERR;
        }

        /* SELECT the new DB */
        if (rioWrite(&aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr;
        if (rioWriteBulkLongLong(&aof,j) == 0) goto werr;

        /* Iterate this DB writing every entry */
        while((de = dictNext(di)) != NULL) {
            sds keystr;
            robj key, *o;
            long long expiretime;

            //Fetch the key and its value
            keystr = dictGetKey(de);
            o = dictGetVal(de);
            initStaticStringObject(key,keystr);

            expiretime = getExpire(db,&key);

            /* If this key is already expired skip it */
            if (expiretime != -1 && expiretime < now) continue;

            /* Save the key and associated value */
            if (o->type == REDIS_STRING) {
                //String values are recorded as a SET command
                /* Emit a SET command */
                char cmd[]="*3\r\n$3\r\nSET\r\n";
                if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
                /* Key and value */
                if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
                if (rioWriteBulkObject(&aof,o) == 0) goto werr;
            } else if (o->type == REDIS_LIST) {
                //Lists are rewritten with RPUSH (append to the tail)
                if (rewriteListObject(&aof,&key,o) == 0) goto werr;
            } else if (o->type == REDIS_SET) {
                //Sets are iterated and rewritten with SADD
                if (rewriteSetObject(&aof,&key,o) == 0) goto werr;
            } else if (o->type == REDIS_ZSET) {
                //Sorted sets are iterated and rewritten with ZADD
                if (rewriteSortedSetObject(&aof,&key,o) == 0) goto werr;
            } else if (o->type == REDIS_HASH) {
                //Hashes are rewritten with HMSET
                if (rewriteHashObject(&aof,&key,o) == 0) goto werr;
            } else {
                redisPanic("Unknown object type");
            }
            /* Save the expire time */
            if (expiretime != -1) {
                char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n";
                if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
                if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
                if (rioWriteBulkLongLong(&aof,expiretime) == 0) goto werr;
            }
            /* Read some diff from the parent process from time to time. */
            if (aof.processed_bytes > processed+1024*10) {
                processed = aof.processed_bytes;
                aofReadDiffFromParent();
            }
        }
        dictReleaseIterator(di);
        di = NULL;
    }

    //Flush to disk
    if (fflush(fp) == EOF) goto werr;
    if (fsync(fileno(fp)) == -1) goto werr;
    //......
    //Wait for data the parent writes to the pipe
    int nodata = 0;
    mstime_t start = mstime();
    while(mstime()-start < 1000 && nodata < 20) {
        //Wait for data to arrive
        if (aeWait(server.aof_pipe_read_data_from_parent, AE_READABLE, 1) <= 0)
        {
            nodata++;
            continue;
        }
        nodata = 0;
        //Pull the data from the pipe
        aofReadDiffFromParent();
    }

    /* Ask the master to stop sending diffs. */
    //Send "!" over the ack pipe to tell the parent to stop sending new diffs
    if (write(server.aof_pipe_write_ack_to_parent,"!",1) != 1) goto werr;
    if (anetNonBlock(NULL,server.aof_pipe_read_ack_from_parent) != ANET_OK)
        goto werr;
    //Wait for the parent's confirmation before the final read and flush
    if (syncRead(server.aof_pipe_read_ack_from_parent,&byte,1,5000) != 1 ||
        byte != '!') goto werr;
    redisLog(REDIS_NOTICE,"Parent agreed to stop sending diffs. Finalizing AOF...");
    //Read once more whatever the parent left in the pipe
    aofReadDiffFromParent();
    //......
    //Flush so the file contents reach the disk
    /* Make sure data will not remain on the OS's output buffers */
    if (fflush(fp) == EOF) goto werr;
    if (fsync(fileno(fp)) == -1) goto werr;
    if (fclose(fp) == EOF) goto werr;
    //......
}
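The "!" handshake involves two processes, and the code above only shows the child's half. The sketch below plays both roles in one small program so the ordering is visible. The function name stop_diff_handshake is made up, poll() stands in for syncRead's timeout, and the parent side is my assumption of what the handler registered on the ack pipe (aofChildPipeReadable) does, since that handler is not listed in this article. In Redis the parent reacts from its event loop rather than with a blocking read.

```c
#include <assert.h>
#include <poll.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Hypothetical demo: returns 0 if both sides completed the "!" handshake
 * within timeout_ms milliseconds, -1 otherwise. */
int stop_diff_handshake(int timeout_ms) {
    int to_parent[2], to_child[2];
    int status, ok;
    char byte = 0;
    pid_t pid;

    assert(timeout_ms >= 0);
    if (pipe(to_parent) == -1) return -1;
    if (pipe(to_child) == -1) {
        close(to_parent[0]); close(to_parent[1]);
        return -1;
    }
    pid = fork();
    if (pid == -1) {
        close(to_parent[0]); close(to_parent[1]);
        close(to_child[0]);  close(to_child[1]);
        return -1;
    }
    if (pid == 0) {
        /* Child (rewrite process): ask the parent to stop sending diffs. */
        struct pollfd pfd;

        close(to_parent[0]);
        close(to_child[1]);
        if (write(to_parent[1], "!", 1) != 1) _exit(1);
        /* Wait for the reply with a timeout instead of blocking forever. */
        pfd.fd = to_child[0];
        pfd.events = POLLIN;
        if (poll(&pfd, 1, timeout_ms) != 1) _exit(1);
        if (read(to_child[0], &byte, 1) != 1 || byte != '!') _exit(1);
        /* No new diff will arrive after this point: the final read of the
         * data pipe would happen here. */
        _exit(0);
    }
    /* Parent (server): wait for the request, stop sending, then confirm. */
    close(to_parent[1]);
    close(to_child[0]);
    ok = read(to_parent[0], &byte, 1) == 1 && byte == '!';
    if (ok) {
        /* Assumption: this is where a stop flag such as
         * aof_stop_sending_diff would be set before replying. */
        ok = write(to_child[1], "!", 1) == 1;
    }
    close(to_parent[0]);
    close(to_child[1]);
    if (waitpid(pid, &status, 0) == -1) return -1;
    return (ok && WIFEXITED(status) && WEXITSTATUS(status) == 0) ? 0 : -1;
}
```

The point of the two-way exchange is ordering. The child performs its last read only after the parent has confirmed it stopped writing, so nothing can slip into the pipe after that final read.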
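Finally, here is aofReadDiffFromParent. The AOF rewrite child simply uses read to pull the data the parent wrote from the descriptor aof_pipe_read_data_from_parent into a local buffer buf and appends it to aof_child_diff. Back in the outer function this accumulated data is written to the file, which completes one reliable AOF rewrite: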
//Called by the child during an AOF rewrite
ssize_t aofReadDiffFromParent(void) {
    char buf[65536]; /* Default pipe buffer size on most Linux systems. */
    ssize_t nread, total = 0;

    //Read into buf, then append to aof_child_diff
    while ((nread =
            read(server.aof_pipe_read_data_from_parent,buf,sizeof(buf))) > 0) {
        server.aof_child_diff = sdscatlen(server.aof_child_diff,buf,nread);
        total += nread;
    }
    return total;
}
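The same read-until-empty loop can be tried outside Redis. In this sketch a plain realloc-grown buffer replaces the sds string aof_child_diff. struct diff_buf and read_diff are names I made up for illustration.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

/* Hypothetical accumulator standing in for server.aof_child_diff. */
struct diff_buf {
    char *data;
    size_t len;
};

/* Reads from fd until read() returns 0 or -1 and appends everything to
 * acc. Returns the number of bytes appended, or -1 if out of memory. */
ssize_t read_diff(int fd, struct diff_buf *acc) {
    char buf[65536];
    ssize_t nread, total = 0;

    assert(acc != NULL);
    while ((nread = read(fd, buf, sizeof(buf))) > 0) {
        char *grown = realloc(acc->data, acc->len + (size_t)nread);
        if (grown == NULL) return -1;
        memcpy(grown + acc->len, buf, (size_t)nread);
        acc->data = grown;
        acc->len += (size_t)nread;
        total += nread;
    }
    /* 0 means the writer closed the pipe; -1 with EAGAIN means a
     * non-blocking pipe is simply empty for now. */
    return total;
}
```

On a blocking descriptor this loop only ends when the writer closes its end. Redis can call its version repeatedly during the rewrite because aofCreatePipes made the data pipe non-blocking, so an empty pipe ends the loop immediately.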
Summary
With this, over three articles we have covered the complete flow of AOF writing and rewriting. I hope it helps.