本文基于社区版redis 4.0.8
1、命令解析
redis接收到的命令请求首先存储在客户端对象的querybuf输入缓冲区,然后解析命令请求的各个参数,并存储在客户端对象的argv和argc字段。
客户端解析命令请求的入口函数为readqueryfromclient,会读取socket数据存储到客户端对象的输入缓冲区,并调用函数processinputbuffer解析命令请求。
注:内联命令:使用telnet会话输入命令的方式
void processinputbuffer(client *c) { ...... //循环遍历输入缓冲区,获取命令参数,调用processmultibulkbuffer解析命令参数和长度 while(sdslen(c->querybuf)) { if (c->reqtype == proto_req_inline) { if (processinlinebuffer(c) != c_ok) break;//处理telnet方式的内联命令 } else if (c->reqtype == proto_req_multibulk) { if (processmultibulkbuffer(c) != c_ok) break; //解析命令参数和长度暂存到客户端结构体中 } else { serverpanic("unknown request type"); } } } //解析命令参数和长度暂存到客户端结构体中 int processmultibulkbuffer(client *c) { //定位到行尾 newline = strchr(c->querybuf,'\r'); //解析命令请求参数数目,并存储在客户端对象的c->multibulklen字段 serverassertwithinfo(c,null,c->querybuf[0] == '*'); ok = string2ll(c->querybuf 1,newline-(c->querybuf 1),&ll); c->multibulklen = ll; pos = (newline-c->querybuf) 2;//记录已解析命令的请求长度resp的长度 /* setup argv array on client structure */ //分配请求参数存储空间 c->argv = zmalloc(sizeof(robj*)*c->multibulklen); // 开始循环解析每个请求参数 while(c->multibulklen) { ...... newline = strchr(c->querybuf pos,'\r'); if (c->querybuf[pos] != '$') { return c_err; ok = string2ll(c->querybuf pos 1,newline-(c->querybuf pos 1),&ll); pos = newline-(c->querybuf pos) 2; c->bulklen = ll;//字符串参数长度暂存在客户端对象的bulklen字段 //读取该长度的参数内容,并创建字符串对象,同时更新待解析参数multibulklen c->argv[c->argc ] =createstringobject(c->querybuf pos,c->bulklen); pos = c->bulklen 2; c->multibulklen--; }
2、命令调用
当multibulklen的值更新为0时,表示参数解析完成,开始调用processcommand来处理命令,处理命令前有很多校验逻辑,如下:
void processinputbuffer(client *c) { ...... //调用processcommand来处理命令 if (processcommand(c) == c_ok) { ...... } } //处理命令函数 int processcommand(client *c) { //校验是否是quit命令 if (!strcasecmp(c->argv[0]->ptr,"quit")) { addreply(c,shared.ok); c->flags |= client_close_after_reply; return c_err; } //调用lookupcommand,查看该命令是否存在 c->cmd = c->lastcmd = lookupcommand(c->argv[0]->ptr); if (!c->cmd) { flagtransaction(c); addreplyerrorformat(c,"unknown command '%s'", (char*)c->argv[0]->ptr); return c_ok; //检查用户权限 if (server.requirepass && !c->authenticated && c->cmd->proc != authcommand) { addreply(c,shared.noautherr); //还有很多检查,不一一列举,比如集群/持久化/复制等 /* 真正执行命令 */ if (c->flags & client_multi && c->cmd->proc != execcommand && c->cmd->proc != discardcommand && c->cmd->proc != multicommand && c->cmd->proc != watchcommand) queuemulticommand(c); //将结果写入outbuffer addreply(c,shared.queued); } // 调用execcommand执行命令 void execcommand(client *c) { call(c,cmd_call_full);//调用call执行命令 //调用execcommand调用call执行命令 void call(client *c, int flags) { start = ustime(); c->cmd->proc(c);//执行命令 duration = ustime()-start; //如果是慢查询,记录慢查询 if (flags & cmd_call_slowlog && c->cmd->proc != execcommand) { char *latency_event = (c->cmd->flags & cmd_fast) ? "fast-command" : "command"; latencyaddsampleifneeded(latency_event,duration/1000); //记录到慢日志中 slowlogpushentryifneeded(c,c->argv,c->argc,duration); //更新统计信息:当前命令执行时间和调用次数 if (flags & cmd_call_stats) { c->lastcmd->microseconds = duration; c->lastcmd->calls ;
3、返回结果
redis返回结果并不是直接返回给客户端,而是先写入到输出缓冲区(buf字段)或者输出链表(reply字段)
int processcommand(client *c) { ...... //将结果写入outbuffer addreply(c,shared.queued); ...... } //将结果写入outbuffer void addreply(client *c, robj *obj) { //调用listaddnodehead将客户端添加到服务端结构体的client_pending_write链表,以便后续能快速查找出哪些客户端有数据需要发送 if (prepareclienttowrite(c) != c_ok) return; //然后添加字符串到输出缓冲区 if (_addreplytobuffer(c,obj->ptr,sdslen(obj->ptr)) != c_ok) //如果添加失败,则添加到输出链表中 _addreplyobjecttolist(c,obj); }
addreply函数只是将待发送给客户端的数据暂存在输出链表或者输出缓冲区,那么什么时候将这些数据发送给客户端呢?答案是开启事件循环时,调用的beforesleep函数,该函数专门执行一些不是很费时的操作,如过期键删除,向客户端返回命令回复等
void beforesleep(struct aeeventloop *eventloop) { ...... /* handle writes with pending output buffers. */ handleclientswithpendingwrites(); } //回复客户端命令函数 int handleclientswithpendingwrites(void) { listiter li; listnode *ln; int processed = listlength(server.clients_pending_write); listrewind(server.clients_pending_write,&li); while((ln = listnext(&li))) { client *c = listnodevalue(ln); c->flags &= ~client_pending_write; listdelnode(server.clients_pending_write,ln); /* 发送客户端数据 */ if (writetoclient(c->fd,c,0) == c_err) continue; /* if there is nothing left, do nothing. otherwise install * the write handler. */ //如果数据量很大,一次性没有发送完成,则进行添加文件事件,监听当前客户端socket文件描述符的可写事件即可 if (clienthaspendingreplies(c) && aecreatefileevent(server.el, c->fd, ae_writable, sendreplytoclient, c) == ae_err) { freeclientasync(c); } } return processed;
以上就是redis命令处理过程实例源码分析的详细内容。