喜悦国际村 专业PHP开发者社区's Archiver

奶瓶 发表于 2010-3-8 12:37 AM

《大型》(四)——我的连接我做主

[i=s] 本帖最后由 奶瓶 于 2010-3-8 10:20 PM 编辑 [/i]

在这个特殊的时刻——女生->女人的关键时刻,我发表了跳票两年的《大型》系列第四篇,多谢大家支持。
                                                                                                          —— NP博士

又是两年多过去了,我很有“暴雪”的风范……

期间经历了很多事情,有好的也有坏的,感谢还有人一如既往地等待我不断地跳票,希望这次能给大家带来一些新鲜的东西。

之前三篇的主要对象已经定位在“Web开发”人员中,每天与HTTP协议为伍的程序员们。其中有好多人从一开始就是直接“与HTTP亲密接触”的。我经常从他们的谈论、发帖的问题中看出,这些人对网络通讯的认知,被每天接触的HTTP曲解了,他们一直这样认为网络通讯就是“请求->反馈->断开->下次请求”,并且对聊天室、网络游戏这种“长”连接感觉非常神秘。

这次希望可以给朋友们一块真正了解网络通讯的敲门砖,memcached不再神秘,“长”连接不比“短”连接“长”在哪里,聊天室原来这么简单,epoll原来比fopen还容易理解……我的连接我做主。

以下有一些文字资料,是直接copy来的。多谢各位大师。

[b]@网络是这样通讯的[/b]
我们熟知的OSI七层网络模型(不熟知的自己去google……),在现实中有各种实现方式。我们经常谈论的TCP /UDP协议就是传输层中的两种协议实现,流行操作系统都对TCP /UDP有完整的支持,UNIX系统还支持本地socket方式。那么网络实际是个什么东西呢?

[b]×网络是进程间通讯的一种方式[/b]
这样理解计算机网络。实际网络的实现目的和IPC、共享内存、信号这些没有什么区别,都是为了提供两个进程之间的信息交流,所不同的是,其它方式都是只能在一台计算机上实现,而网络支持不同计算机上的两个进程间的通讯,它的适应性更广,可以实现的功能更多。

计算机是多种多样的,操作系统更是五花八门。所以为了实现在不同计算机之间的通讯,就需要一个被所有操作系统支持的桥梁,这就是套接字(socket)。其实socket也不是什么神秘的东西,按照*NIX系统中“一切都可以映射成文件”的理念,socket只是两个端点都是文件描述符的一条管道。

socket是成对出现的,一端位于服务器上,一端位于客户端上。应用程序要使用网络连接,就需要首先生成套接字。一个成功连接的socket实现是一个五元组,它是由两个具有相同协议族的本地三元组组成,三元包括协议族、网络地址、传输层端口。这样,网络上每一个通讯实体都是这样一个三元组。网络通讯中的套接字分为多种类型,如SOCK_STREAM、SOCK_DGRAM等。SOCK_STREAM的实现协议是TCP,这是一种面向连接的协议,双方通过三次握手建立“可靠”的连接,而SOCK_DGRAM的实现协议是UDP,这是一种无连接通讯方式,使用数据包交互,协议本身无法保证传输可靠。UDP的实现比TCP更底层,某些应用场合中,效率也要高过TCP。

更多详细内容,请参考计算机网络相关资料,广告之后更精彩……

[b]×HTTP是什么[/b]
HTTP属于应用层协议了,它是依靠TCP的可靠连接实现的(大家注意:HTTP、FTP、RTSP、SMTP这些应用层协议和TCP / UDP不是同一个层级上的东西。应用层协议制定了数据传输的格式标准,数据的各段含义,可以说它们是面向内容的,并不关心是怎样从一台计算机传输到另一台计算机的,而传输层协议只是关心怎样把数据从一端搬到另一端,并不理会数据的具体含义。)。

HTTP是一个客户端和服务器端请求和应答的标准(TCP)。客户端是终端用户,服务器端是网站。通过使用Web浏览器、网络爬虫或者其它的工具,客户端发起一个到服务器上指定端口(默认端口为 80)的HTTP请求。(我们称这个客户端)叫用户代理(useragent)。应答的服务器上存储着(一些)资源,比如HTML文件和图像。(我们称)这个应答服务器为源服务器(originserver)。在用户代理和源服务器中间可能存在多个中间层,比如代理,网关,或者隧道(tunnels)。尽管TCP/IP协议是互联网上最流行的应用,HTTP协议并没有规定必须使用它和(基于)它支持的层。事实上,HTTP可以在任何其他互联网协议上,或者在其他网络上实现。HTTP只假定(其下层协议提供)可靠的传输,任何能够提供这种保证的协议都可以被其使用。

HTTP消息头和实体中的各部分定义,有一端时间经验的web程序员都再熟悉不过了,不再多说。它的核心思想就是客户端发送了一些本地数据和一条“请求”去申请服务器返回相应内容,服务器收到请求之后,返回一个描述数据实体的头,然后就是客户端所需的数据。

[b]@HTTP是“短”的?[/b]
好多程序员都把非HTTP的那些可以提供持续服务的协议叫做“长”连接,那么HTTP是不是“短”的?

其实“长”和“短”与协议本身无关,HTTP被认为是“短”的只是因为本身规定服务器将请求结果完全反馈给客户端之后,这条连接就结束了。如果客户端还需要请求其它数据,需要再次建立一次连接。

这种特性决定了HTTP是客户端单向触发的,服务器不会主动通知客户端消息。因为HTTP本身是一种匿名协议,服务器端不需要获取和存储客户端信息,不同的客户端之间也不需要有什么交流,服务器只承担解析请求,反馈结果,并不负责客户端之间的消息转发。

那么为了做到“实时”效果,人们想出了很多基于HTTP实现的方法,也就是各种“伪push”。

早期的聊天室使用的就是“长”的HTTP连接。由于HTTP协议在传输结束之后就会断开连接,所以就人为地让它不结束,就可以保持连接一直有效,服务器就可以把消息源源不断地推给客户端。其实这是一种真正的“推”,只不过它的资源消耗是巨大的。还记得10年前的IDC都明令禁止“安装聊天室、江湖类程序”么?由于HTTP本身的设计并不是为了实现这种服务的,所以当连接数大到一定程度时,服务器端就会不堪重负。后来的Ajax技术催生了异步刷新技术,一段时间去请求一次服务器,服务器把“累计”的数据发送回来。再后来两种技术结合在一起,成了半长半刷新的comet。Web版的GoogleTalk就是这种技术的最好体现。

那么当应用规模更大了怎么办,好多web程序员就手麻了,因为在他们的印象中,网络就应该是这样的。其实,应该把视野放得更开,“长”连接并不神奇,重要的是——

[b]×忘掉自己是个web程序员[/b]
HTTP只是应用层协议,当我们的应用并不是绝对依赖于HTTP本身时,使用什么样的应用协议并不重要,我们完全可以自己发明一种协议。例如memcached的连接协议,就是简单地发送一些GET / ADD一类的明文消息(新版本的memcached支持了二进制命令),完全不需要使用HTTP复杂的冗长的消息结构。

[b]×网站也需要“长”连接[/b]
“聊天”就是一种最直接的需求。WebIM的各种实现方式也见证了“长”连接技术的发展路线。在大型网络架构中,我们需要更多的“直接”通讯,比如web服务器到缓存服务器、多服务器之间的消息同步、连接池到数据库服务器,当规模越来越大,节点越来越多,直到神神秘秘的“云”技术中,越来越多的数据传输依赖于持久稳定的网络连接,而不是全部依靠HTTP。

[b]@Echo Server[/b]
接触过一些简单的socket编程的人,会比每天asp、php的程序员对socket有更深入的理解。echoserver几乎是所有socket入门例程中必不可少的东西,它实现了一个服务器和一个客户端,客户端连接到服务器上,向服务器上发送一些数据,服务器原样地返回,这是最简单的一种C/S应用。其实各种应用也都是简单模型的升级和堆叠。返回原数据是没有什么意义的,但是经过服务器的计算,不就可以返回有用的数据了吗。比如DNS服务,客户端连接到DNS服务器上,发送数据(域名)到服务器,服务器在自身的存储介质中寻找这个域名的信息,如果找到了就对客户端返回,如果没找到就返回错误信息。

但是这些例程千篇一律地是最初级的TCP连接:
[list][*]服务器:socket->bind->listen->accept[*]客户端:socket->connect[/list]然后就是read / write了。虽然简单,我们也可以从中得出一些信息:套接字的表现就是文件描述符,这与*NIX操作系统的理念吻合。当然,socket就是BSD提出的。

我们所看到的大约就是这样的程序:
[code]
/*

ECHOSERV.C
==========
(c) Paul Griffiths, 1999
Email: mail@paulgriffiths.net

Simple TCP/IP echo server.

*/

#include <sys/socket.h>       /*  socket definitions        */
#include <sys/types.h>        /*  socket types              */
#include <arpa/inet.h>        /*  inet (3) funtions         */
#include <unistd.h>           /*  misc. UNIX functions      */

#include "helper.h"           /*  our own helper functions  */

#include <stdlib.h>
#include <stdio.h>

/*  Global constants  */

#define ECHO_PORT          (2002)
#define MAX_LINE           (1000)

int main(int argc, char *argv[])
{
    int       list_s;                /*  listening socket          */
    int       conn_s;                /*  connection socket         */
    short int port;                  /*  port number               */
    struct    sockaddr_in servaddr;  /*  socket address structure  */
    char      buffer[MAX_LINE];      /*  character buffer          */
    char     *endptr;                /*  for strtol()              */

    /*  Get port number from the command line, and
    set to default port if no arguments were supplied  */

    if ( argc == 2 ) {
        port = strtol(argv[1], &endptr, 0);
        if ( *endptr ) {
            fprintf(stderr, "ECHOSERV: Invalid port number.\n");
            exit(EXIT_FAILURE);
        }
    }
    else if ( argc < 2 ) {
        port = ECHO_PORT;
    }
    else {
        fprintf(stderr, "ECHOSERV: Invalid arguments.\n");
        exit(EXIT_FAILURE);
    }

    /*  Create the listening socket  */

    if ( (list_s = socket(AF_INET, SOCK_STREAM, 0)) < 0 ) {
        fprintf(stderr, "ECHOSERV: Error creating listening socket.\n");
        exit(EXIT_FAILURE);
    }

    /*  Set all bytes in socket address structure to
    zero, and fill in the relevant data members   */

    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family      = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port        = htons(port);

    /*  Bind our socket addresss to the
    listening socket, and call listen()  */

    if ( bind(list_s, (struct sockaddr *) &servaddr, sizeof(servaddr)) < 0 ) {
        fprintf(stderr, "ECHOSERV: Error calling bind()\n");
        exit(EXIT_FAILURE);
    }

    if ( listen(list_s, LISTENQ) < 0 ) {
        fprintf(stderr, "ECHOSERV: Error calling listen()\n");
        exit(EXIT_FAILURE);
    }

    /*  Enter an infinite loop to respond
    to client requests and echo input  */

    while ( 1 ) {

        /*  Wait for a connection, then accept() it  */

        if ( (conn_s = accept(list_s, NULL, NULL) ) < 0 ) {
            fprintf(stderr, "ECHOSERV: Error calling accept()\n");
            exit(EXIT_FAILURE);
        }

        /*  Retrieve an input line from the connected socket
        then simply write it back to the same socket.     */

        Readline(conn_s, buffer, MAX_LINE-1);
        Writeline(conn_s, buffer, strlen(buffer));

        /*  Close the connected socket  */

        if ( close(conn_s) < 0 ) {
            fprintf(stderr, "ECHOSERV: Error calling close()\n");
            exit(EXIT_FAILURE);
        }
    }
}
[/code]
客户端代码和一些头文件就不在这里贴了。

注意服务器程序,在socket->bind->listen之后,就进入了一个死循环,在这个循环中去等待客户端的连接,也就是accept,每当有客户端到来的时候,accept就会返回一个代表客户端的fd,这样一个五元组就建立了,然后读一些信息,再把信息原样返回,最后关闭连接。

注意这里的accept和read都是“阻塞”的,当没有客户端连接进来时候,accept函数会一直停在那里等待,当有客户端连接上之后,read会一直等待客户端发送消息。在这个过程中,第二个客户端是无法连接的。必须当第一个客户端完成了所有操作并断开之后,accept才会继续建立第二个客户端的连接。所以说,这个服务器的“并发”是1。

[color=#9999ee]C/S模式的“并发”和B/S的含义不一样(当然B/S也是C/S的一种特殊实现,它的“C”被指定为浏览器),C/S的“并发”是指同一时刻实实在在地和服务器保持着正确连接的客户端数目,而B/S由于依赖HTTP这种瞬时协议,它的并发表示一段时间(如1秒)服务器所完成的请求反馈数目。[/color]

以上,引出了更多的概念:阻塞、非阻塞、同步、异步,他们最终的目的是不同程度地实现“复用”。

奶瓶 发表于 2010-3-8 12:43 AM

[i=s] 本帖最后由 奶瓶 于 2010-3-8 10:47 PM 编辑 [/i]

[b]@阻塞、非阻塞[/b]
什么样的操作是阻塞的,什么样的操作是非阻塞的呢?举一个银行汇款的例子:我们到银行去汇款给别人,是把钱交给银行服务窗口,或者是塞进存款机,银行服务员或存款机系统通知我们成功了,我们就离开了。并不是等待钞票真的到了对方的手上而什么也不干。那么这次操作就是非阻塞的,一直等待对方反馈的傻瓜就是阻塞的。

阻塞、非阻塞通常应用于IO操作上(Socket的操作方式和文件IO是一样的)。那么如果对于一个文件描述符的IO操作,IO函数只是将消息放进去而不必等待对方真的接收成功,程序转而去继续做别的事情,这次操作就是非阻塞的。如果IO函数必须等待对方的成功反馈,这时程序只是在等待什么也不做,这次操作就是阻塞的。

对于TCP的send函数来说,如果这个描述符是非阻塞的,send()只将消息放进它的缓冲区中,就直接返回实际放进缓冲区的字节数,如果是阻塞的,它会一直等待,直到所有数据都安全送达对方,返回实际发送的字节数(这个时候缓冲区可能已经空了)。

那么,TCP协议本身会保证缓冲区内的数据安全有序地送达,如果传送失败,协议会有三次重发,当对方正确收到数据并ACK回来时,协议才会认为真正成功,并清空缓冲区中的相应数据。所以如果消息长度总是小于缓冲区大小,对于TCP协议来说,非阻塞方式有着重要的意义,它会节省很多“等待”的时间来做别的,尤其是面向一些慢速连接(如公共互联网、手机客户端等)时,这对提升程序的处理能力是有好处的。

UDP协议下,就不这么简单了。由于UDP协议本身并不保证连接有效,也不会确认消息正确发送,所以简单地把连接描述符置为非阻塞是一种不负责的做法。因为它只是一次性地把数据“发送”出去就再也不管了,为了保证安全有效,我们还要做很多的工作,自己来编写消息反馈和拼装。更有一点要注意,UDP是不会保证若干数据包的顺序到达的,所以我们可以看见memcached的UDP实现,是开辟了一个很大的SEND_BUFFER,争取一次性把数据放进去。UDP有它灵活高效的特点,但是为了保证安全稳定,需要付出更多的实现代价。

[b]@同步、异步[/b]
好多人会把阻塞 / 非阻塞与同步 / 异步混淆。阻塞和非阻塞是套接字层面上,而同步异步是操作系统本身概念。同步表示当一个操作被调用时,在操作真正结束时,调用不会返回。而异步表示调用返回时操作者并不会立即获取调用结果,而是需要在之后通过消息、回调等方式获取,操作实际在“后台”进行了。同样举银行的例子:

好多人一起去银行,过去的方式是排队,一个挨着一个,在排到你之前,你跑了,再回来只能重新排,一定要等到排到自己了,才可以办业务,这种方式就是同步的。同步与是否阻塞无关,因为好多人都在排队。

现在的方式,是进门在排号机拿一个小纸条,上面会写有顺序号,以及前面有多少人在排队,然后就可以坐下等。当轮到你的时候,银行会主动通知你,你再去办理业务,这就是“消息”和“回调”。这同样和是否阻塞无关,因为如果你觉得前面只有一两个人,就可以等,如果前面有四五十人,按照目前中国的银行营业厅办事效率,大可去附近吃个饭,如果前面有五六千人,直接回家就好了……这就是异步操作。

[b]@IO复用[/b]
著名的c10k,就是为了解决单个服务器的高并发量问题。之前的阻塞 / 非阻塞、同步 / 异步概念都是为了最终解释IO模型。

我们来看各种级别的IO模型:

1、阻塞IO
也就是之前贴出的echo server源代码,它的实现流程如下:(图非原创,是某位大神画的,膜拜……)
[attach]5881[/attach]

应用程序调用recvfrom(),转入内核,内核有两个过程:wait for data和copy data from kernel to user,直到copy结束后,操作才会返回。这就好像100多人都在银行排队,第一个人交了钱,营业员把钱收走,转给另一个人,另一个人反馈了(打电话?)说收到钱了,第一个人才离开,第二个人开始办理。整个过程从营业员收完钱之后包括他自己在内,银行里的所有人都在等,这是最没有效率的方式。

2、非阻塞IO
营业员稍微聪明了一些:
[attach]5882[/attach]

转入内核之后,在第一个过程wait for data成功后,recvfrom返回了。也就是营业员收到钱之后就告诉第一个人可以回去了,接着办理第二个人的业务。这样,至少营业员没有闲着。整个状态就是在轮询是不是有人交了钱。

3、IO multiplexing
这个时候,顾客也聪明了:
[attach]5883[/attach]

这是最常见的IO复用模式了,select和poll都是这种。所谓“复用”,就是可以同时处理多个套接字。银行有了排号机,会主动通知该谁了。select是一种阻塞操作,当没有fd有数据时候(没有空闲窗口),会有个人一直看着,直到有了可用资源。不过至少大家不用排队了,而是可以随便坐在哪里,或者到门口抽烟。

4、SIGIO
[attach]5884[/attach]

基于信号的IO复用模型,只有*NIX系统才支持,它的进步之处在于,select不再阻塞了,而是由系统注册的信号回调函数来通知,有一点像计时器了。这个时候,银行不需要大堂经理去一直看着是否有窗口空闲了,直接装了个提示牌和喇叭。当一个营业员办完了业务,按一下身边的按钮,就触发了一条信号,提示准备好服务下一个用户了,这是主动的。

5、AIO(Asynchronous IO)
[attach]5885[/attach]

以上四种,在内核的copy data from kernel touser都会阻塞,不过到了aio,则是完全非阻塞的了。对应的就是Windows的IOCP(完成端口)和posix_aio*,不过在*NIX系统上,AIO只是一个设想,从来没有变成现实。在这个时代,只要把钱交给银行,并告知要汇给什么人,就可以走了。成功之后银行会主动通知,不需要排队,也不需要等候。网银时代到来了……

可以看出以上5种模型,越往后阻塞越少,可以同时处理的能力越强。从中也可以解释为什么IIS的静态响应效率普遍好于apache,因为IOCP的模型比select要“高级”。

[b]@select & poll[/b]
我们从最初级的“复用”开始,就是select / poll,这是非常经典的复用模式,著名的Apache就是这种模式的。对比上面的echoserver代码,select的不同是在accept之后不再是直接等待数据,而是直接等待下一个accept。另外的select()来对已经连接的fd轮询,直到寻找到有数据可读的fd,并返回。

由此可以看出select()的问题,在于它的效率是线性的,也就是O(n),它的轮询规模会随着同时连接的fd数上涨,而且每个进程的select()不能同时处理1024(FD_SETSIZE)个以上的fd(所以apache使用了多进程)。

为了解决fd上限问题,提出了/dev/poll,它不再有FD_SETSIZE的限制,但是实现方式和select()类似。

[b]@epoll & kqueue[/b]
从select / poll的实现方式中可以看出,如果有一个非常大的fd集合,而同时又只有少量处于活动状态,应用程序会耗费大量的CPU时间用于轮询空闲的fd。linux从2.5.44开始加入了epoll支持,BSD系统也加入了kqueue,其实它们应该属于3、4级之间的一种实现方式。kqueue的实现方式和epoll很类似,所以下面直来说说epoll。epoll实现了对文件描述符的“开关”设定,也就是回调机制,当一个fd准备就绪之后,它会主动去触发回调,通知用户程序自己已经就绪,而不再需要轮询。这样它的代价就是恒定的O(1),也就是与fd总数无关。所以epoll可以同时处理非常大的fd集合。

不过,当几乎所有的fd都是活动状态时(这在高速网络中很少会有),epoll的性能并不会比select / poll高多少。

[b]@一致性接口[/b]
我们做开发的时候,尤其是跨平台的通用系统,经常会需要一个通用的接口,它会自动根据操作系统来选择最适合的IO模型,而对于开发者来说是透明的。我们只需要知道,我有多少多少钱要汇给谁谁,别的事情比如怎么个汇法,银行在哪就不用管了,接口会去完成。这类接口有很多,比如重型的ACE,和轻量级的libevent(这个很著名吧,虽然大部分web程序员是通过安装memcached才知道有这么个东西却不知道它是什么)、libev(与libevent类似,实现效率比libevent要高)等。

使用ACE的典型例子就是Mongos服务器(流行的WOW服务模拟器),而libevent的实例不用说就是著名的memcached了,有兴趣的可以去读它的源代码,我的Comoro第一个版本也是基于libevent的,之后的corelib使用的是原生epoll,因为我并不需要支持linux以外的系统。另:已经有人把libevent扩展成了支持IOCP,所以有了windows版的memcached,不过由于内存分配实现方式的不同,windows版的memcached并不稳定。

========================= 我是华丽的分割线 =========================

IOCP是优秀的,也是成熟的(它比epoll早好多),不过大部分的大型服务器都是基于*NIX系统的,只有一些经典的微软私有服务和一些游戏服务器是基于IOCP的,所以我们下面不去理会孤芳自赏的IOCP,主要讲述一下epoll。下面的代码来自corelib,也就是我的新版本的comoro的网络实现基础,我尽量做足够的文字描述,其中涉及了很多算法和数据结构相关的概念,有问题的可以去翻书。

所有实现基于epoll和pthread,不使用第三方库如libevent。如需测试,请准备2.6+内核的linux系统。

首先来熟悉一下epoll的三个调用,也是仅有的三个基本调用(成功的东西总是很简单):

[color=#33ee33]int epoll_create()[/color],创建一个epoll句柄,其实也是一个fd。

[color=#33ee33]int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)[/color],对一个epoll句柄epfd操作。op是操作方式,fd是操作数(也是一个文件句柄),events是事件属性。操作包括EPOLL_CTL_ADD,添加一个句柄(fd)到epoll中;EPOLL_CTL_MOD,修改句柄fd的监听方式;EPOLL_CTL_DEL,从epoll中移除句柄fd。看看,比数据库操作还简单。

事件属性包括两个部分:event.data.fd就是当前需要监听的句柄,event.events是需要监听的事件类型集合,事件类型包括:EPOLLIN——读事件;EPOLLOUT——写事件;EPOLLRDHUP——客户端半关闭;EPOLLPRI——高优先级读事件;EPOLLERR——连接错误,这个事件是不需要特别指定的,epoll总是会发现连接错误;EPOLLHUP——客户端主动关闭,这个事件也不需要被特别指定;EPOLLONESHOT——一次性使用,当一次事件触发之后,epoll不会再理会这个fd,除非使用EPOLL_CTL_MOD修改了监听事件;EPOLLET——边缘触发,下面会详细描述它。

[color=#33ee33]int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)[/color],等待epoll事件发生。epfd是需要检查的epoll句柄,events是结果暂存数组,maxevents是可以同时返回的事件最大数,timeout是等待事件。与其说是“等待”不如说是“检查”,因为epoll是主动触发的,epoll_wait的实现也与select()的阻塞遍历不同,它只是检查一下当前集合中有哪些fd触发了开关。不过,从形式上我们还是经常让它的timeout为-1,这样如果epoll中没有fd活动,epoll_wait会阻塞,直到有了活跃分子。当然我们也可以把timeout设为0,让它“点到为止”,不过如果是在无限循环中使用timeout为0的epoll_wait,又有大部分的fd处于非活动状态,所需要付出的代价和select类似,甚至更高,因为它是每次“遍历”所有fd,一直这样不停地遍历下去。

epoll有EPOLLLT和EPOLLET两种触发模式,LT是默认的模式,ET是“高速”模式。LT模式下,只要这个fd还有数据可读,每次epoll_wait都会返回它的事件,提醒用户程序去操作,而在ET(边缘触发)模式中,它只会提示一次,直到下次再有数据流入之前都不会再提示了,无论fd中是否还有数据可读。所以在ET模式下,read一个fd的时候一定要把它的buffer读光,也就是说一直读到read的返回值小于请求值,或者遇到EAGAIN错误。

下面了解一下目前大型服务器流行的程序模型:
[list][*]单线程单进程:早期的memcached。不过单线程单进程的高性能服务器必然是事件驱动的(epoll / kqueue / IOCP)[*]单线程多进程:经典的Apache prefork就是这种[*]多线程单进程:分为动态线程和静态线程池,简单地使用动态线程来实现复用的大型服务器由于性能问题已经不多了,静态线程池的是目前的主流方式,如新的memcached。[*]多线程多进程:Nginx就是这种结构,可以启动多个进程,每个进程有固定数目的子线程,并且每个线程都是独立的事件驱动循环。这是目前最高效的方式了。[/list]具体使用何种方式来开发服务器程序,要根据需求来决定。通常来说,对于WebServer这种上下文无关的应用,更适合创建多个工作进程,而像游戏服务器这类,由于用户之间有密切的关系,所以进程分离会带来很大的进程间通讯负担。另外,使用多进程或多线程在现时有一个更重要的意义就是充分利用多CPU和多核资源,而多进程比多线程可以更有效地利用多个处理器核心。

Apache prefork是早期的一种经典结构,它又一个父进程fork出多个子进程来“等待”连接(不是有连接到了才fork,所以叫做prefork),所以系统中总是有几个空闲的进程来等待操作,直到进程数达到上限。父进程只负责接受连接请求,子进程负责处理HTTP请求,通常来说,子进程在处理一定数量的请求之后会“自杀”,再fork出一个新的来。这种模型是很稳定的,不过多进程程序如果处理得不好,在进程数很高且负载很大的情况下容易发生“惊群”(多个进程同时被唤醒)。

后来在Apache2中,添加了新的模式,比如Worker,也就是多进程多线程模式。这样每个进程的处理工作看起来是“并行”的了,但是为了稳定性,很多大网站还是继续在使用prefork。

在Apache2.2中,终于不再完全依赖select(),提供了event方式(事件驱动)。Apache的发展是稳定的,保守的,这也是它所追求的方向。

而Nginx则不同,它所追求的就是性能。所以它以激进的方式使用了多线程多进程的epoll。而memcached由于上下文相关性(数据共享),所以使用了单进程静态线程池。

corelib同样使用的是单进程的静态线程池。所谓静态,就是在应用启动时候就已经创建好了若干数目的工作线程,且数目不会再改变。主线程会以轮询的方式把操作分配给工作线程,以达到最好的系统资源利用状态。一般来说,worker线程数应该和CPU核心数成整倍关系,这也算是最简单的一种“多核”开发吧~~

php5 发表于 2010-3-8 12:43 AM

[i=s] 本帖最后由 php5 于 2010-3-8 12:11 PM 编辑 [/i]

赶上第一时间  荣幸之至

广告位招租,有意者联系本人

奶瓶 发表于 2010-3-8 12:47 AM

[i=s] 本帖最后由 奶瓶 于 2010-3-8 10:53 PM 编辑 [/i]

我用了“伪OOP”的写法,这种c代码也许有点怪异,不过还是比较规整的。首先来看看core的类型定义:[code]
struct core_t
{
    int nfiles;
    int nworkers;
    int epoll_fd;                        // Epoll fd
    int is_running;

    /* Methods */
    void (* loop) ();
    void (* add_server) ();
};
[/code]这是corelib的根定义,每一个应用都有且只有一个core对象(原谅我这么叫)。nfiles是这个core可以处理的最大句柄数,nworkers是worker线程的数目,epoll_fd是主循环的epoll句柄,is_running是一个标志,给它置0可以让主循环停下来。

loop和add_server是两个函数地址,类型中带有函数定义,就是“伪”OOP的关键所在,看起来struct就是class,那些成员变量就是属性,函数指针就是方法~~

下面来看看core_init()的函数实现:[code]
/* Initialization, nworkers for thread */
/* A core means an absolute looper, which handles socket servers, timer events, and thread pool.
Every aplication based on corelib needs one(and only one) core object */
struct core_t * core_init(int nworkers)
{
    int nfiles = set_rlimit();
    struct core_t *ret = (struct core_t *) malloc(sizeof(struct core_t));
    struct memory_pool *p;

    if (!ret)
    {
        return (struct core_t *) NULL;
    }

    ret->nfiles = nfiles;
    ret->nworkers = (nworkers > 0) ? nworkers : 1;
    ret->epoll_fd = epoll_create(EPOLL_WAIT_CONNS);

    if (ret->epoll_fd < 0)
    {
        // Epoll error
        err(ERROR_EPOLL_CREATE, "Epoll create error");
    }

    ret->loop = core_loop;
    ret->add_server = core_add_server;

    c = ret;

    // Init memory pool
    p = pool_init();

    if (!p)
    {
        err(ERROR_MEMORY_POOL, "Memory pool init error");
    }
   
    // Init socket
    socket_init(nfiles, p);
   
    // Init thread pool
    thread_init(nworkers);

    return ret;
}
[/code]core初始化的时候需要传入参数nworkers,也就是告知这个core需要创建多少个worker线程,这个数字可以根据CPU核心数来确定。

nfiles的确定需要调用set_rlimit,这个函数根据当前环境设置一个进程可以接受的最大句柄数。关于它的相关信息,可以参看linux的ulimit定义(man ulimit),set_rlimit函数的实现如下:[code]
/* Set rlimit resources */
int set_rlimit()
{
    struct rlimit rlim, rlim_new;

    if (0 == getrlimit(RLIMIT_CORE, &amp;rlim))
    {
        // Increase RLIMIT_CORE to infinity if possible
        rlim_new.rlim_cur = rlim_new.rlim_max = RLIM_INFINITY;

        if (0 != setrlimit(RLIMIT_CORE, &amp;rlim_new))
        {
            // Set rlimit error
            rlim_new.rlim_cur = rlim_new.rlim_max = rlim.rlim_max;
            (void) setrlimit(RLIMIT_CORE, &amp;rlim_new);
        }
    }

    if (0 != getrlimit(RLIMIT_CORE, &amp;rlim) || rlim.rlim_cur == 0)
    {
        err(ERROR_RESOURCE_GET, "Set rlimit.CORE error");
    }

    // Read current RLIMIT_NOFILE
    if (0 != getrlimit(RLIMIT_NOFILE, &amp;rlim))
    {
        err(ERROR_RESOURCE_GET, "Get rlimit.NOFILE error");
    }

    else
    {
        // Enlarge RLIMIT_NOFILE to allow as many connections as we need
        int maxfiles = MAX_FILES;

        if (rlim.rlim_cur &lt; maxfiles)
        {
            rlim.rlim_cur = maxfiles;
        }

        if (rlim.rlim_max &lt; rlim.rlim_cur)
        {
            if (0 == getuid() || 0 == geteuid())
            {
                // You are root?
                rlim.rlim_max = rlim.rlim_cur;
            }

            else
            {
                // You are not root?
                rlim.rlim_cur = rlim.rlim_max;
            }
        }

        if (0 != setrlimit(RLIMIT_NOFILE, &amp;rlim))
        {
            // Root needed
            err(ERROR_RESOURCE_SET, "Set rlimit error, check your superuser privilege");
        }
    }

    return rlim.rlim_max;
}
[/code]只有超级用户才能在代码中改变进程的rlimit!所以好多服务程序都需要以root身份启动(另外一个原因是只有超级用户才有权限监听1024以下端口)

p是一个内存池对象,关于内存池,之后详述。

初始化了一个epoll,返回的句柄赋值给了epoll_fd属性,注意epoll_create()的参数,它只是表明epoll_wait一次可以返回的最多活动句柄数,不是表示epoll可以支持的最大fd数,最大fd数其实就是进程的上限,已经通过set_rlimit尽量增大了。

神奇的OOP……只是给两个函数指针赋值,指向两个真正的函数地址。为什么说是“伪”的呢,因为在这两个函数中,是无法识别出“this”来的,所以只是形式上的OOP。C毕竟是C。

初始化了内存池、socket服务、线程池,core的初始化就结束了。然后我们来看看主循环部分,看看epoll都在做什么:[code]
/* Main loop */
void core_loop()
{
    int nfds, i, client_id, fd_type, fd_id, what;
    struct epoll_event *events;
    struct server_t *s;
   
    if (!c)
    {
        // Not initialized
        err(ERROR_CORE, "Core not initialized");
    }
   
    c->is_running = 1;
    events = malloc(sizeof(struct epoll_event) * EPOLL_WAIT_CONNS);

    while (1)
    {
        memset(events, 0, sizeof(struct epoll_event) * EPOLL_WAIT_CONNS);
        nfds = epoll_wait(c->epoll_fd, events, EPOLL_WAIT_CONNS, -1);

        for (i = 0; i < nfds; i ++)
        {
            what = events.events;
            
            if (!what & EPOLLIN || events.data.fd < 0)
            {
                continue;
            }
            
            // Every fd
            fd_id = get_fd_info(events.data.fd, &fd_type);

            if (FD_TYPE_SERVER == fd_type)
            {
                // A valid server, accept here
                s = get_server(fd_id);
                client_id = socket_accept(s);

                // Dispatch into worker
                dispatch_to_worker(client_id);
            }

            else if (FD_TYPE_CONNECTOR == fd_type)
            {
                // A connector
            }

            else
            {
                // All clients will be dispatched into worker threads, but who are you?
                // Just ignore it
            }
        }
        
        if (0 == c->is_running)
        {
            // Stop signal
            break;
        }
    }
}[/code]因为我们在epoll初始化的参数中传入了参数EPOLL_WAIT_CONNS,所以,需要初始化一个可以最大容纳EPOLL_WAIT_CONNS个epoll事件的空间。

接着就是个无限循环,和最普通的echo server看起来是一样的,不过它是epoll,真的是epoll……。循环可以通过把core的is_running置零让它停下来。每次循环都会epoll_wait(),这个会检查当前epoll中是否有活动的连接,如果有的话,就会向events数组里填充数据,记得每次循环时候清空它。

epoll_wait会给每一个有效的fd返回相应的结构体,这个结构体主要包括event.data.fd,就是fd本身,和事件类型。如果fd等于服务器的fd,表示有新连接进入,需要accept,如果是别的,就是已经连接的客户端有了动作。

奶瓶 发表于 2010-3-8 12:49 AM

主循环就是这些东西,不过它现在是空的。下面是如何向core中添加服务器:
[code]
/* Add a server */
void core_add_server(struct server_t *s)
{
    if (!c)
    {
        // Not initialized
        err(ERROR_CORE, "Core not initialized");
    }

    if (!s)
    {
        err(ERROR_CORE, "Server not defined");
    }

    s->start_time = time(NULL);
    epoll_ctl(c->epoll_fd, EPOLL_CTL_ADD, s->fd, &s->ev);

    return;
}
[/code]
只是简单的epoll_ctl,使用EPOLL_CTL_ADD,把server的fd添加到了epoll中。server_t是一个服务器的结构描述:
[code]
struct server_t
{
    int id;                            // Server id
    int fd;                            // File descriptor
    int conn_type;                    // TCP / UDP / UNIX
    int epoll_mode;                    // Sets of ET(Edge Triggered) or LT(Level Triggered)(Which is the default mode of epoll)
    u_int32_t srv_addr;                // Integer format of IPv4 address
    int srv_port;                        // Server port(0 - 65535)
   
    time_t start_time;

    /* Callbacks */
    void (* on_data) (int, void *, size_t);
                                        // Callback on data event
    void (* on_conn) (int);
                                        // Callback on connect
    void (* on_close) (int);
                                        // Callback on client disconnect

    struct sockaddr_in listen_addr;
    struct sockaddr_un local_addr;
    struct epoll_event ev;            // Epoll event object
};
[/code]
每个server有一个id,因为每个core可以添加多个server,所以需要一个id来做区别。conn_type是协议类型,可以是CONN_TYPE_TYPE、CONN_TYPE_UDP和CONN_TYPE_UNIX,其中CONN_TYPE_UNIX是unix本地套接字。epoll_mode可以是EPOLL_MODE_LT或者EPOLL_MODE_ET,分别对应LT和ET模式。这个设置也会影响到所有连接到该服务器上的客户端。

on_data、on_conn和on_close分别是三个函数指针,对应于“当有数据到来”、“当连接”、“当断开”的回调。

我们来看看一个server的产生过程,new_server()函数:
[code]
/* Create a new socket server */
struct server_t * new_server(const char * addr, int port, int access_mask, int conn_type, int epoll_mode)
{
    int srv_id = -1, flag = 1, i;
    int old_mask = 0;
    struct timeval timeout;
    struct sockaddr *addr_bind;
    size_t addr_size;

    // Find an empty slot from server list
    for (i = 0; i < MAX_SERVERS; i ++)
    {
        if (!server_list[i])
        {
            srv_id = i;
            
            break;
        }
    }

    if (-1 == srv_id)
    {
        // No empty slot
        perror("Cannot create a new server now, server list full");
        
        return (struct server_t *) NULL;
    }
   
    struct server_t *ret = (struct server_t *) malloc(sizeof(struct server_t));
    memset(ret, 0, sizeof(struct server_t));

    ret->conn_type = (conn_type == CONN_TYPE_UDP || conn_type == CONN_TYPE_UNIX) ? conn_type : CONN_TYPE_TCP;
    ret->id = srv_id;
    server_list[srv_id] = ret;

    switch (ret->conn_type)
    {
        // TCP
        case CONN_TYPE_TCP :
            ret->fd = socket(AF_INET, SOCK_STREAM, 0);
            ret->listen_addr.sin_family = AF_INET;
            ret->listen_addr.sin_addr.s_addr = inet_addr(addr);
            ret->listen_addr.sin_port = htons(port);
            
            break;

        // UDP
        case CONN_TYPE_UDP :
            ret->fd = socket(AF_INET, SOCK_DGRAM, 0);
            ret->listen_addr.sin_family = AF_INET;
            ret->listen_addr.sin_addr.s_addr = inet_addr(addr);
            ret->listen_addr.sin_port = htons(port);
            
            break;

        // UNIX local sock pipe
        case CONN_TYPE_UNIX :
            ret->fd = socket(AF_UNIX, SOCK_STREAM, 0);
            ret->local_addr.sun_family = AF_UNIX;
            strncpy(ret->local_addr.sun_path, addr, sizeof(ret->local_addr.sun_path) -1);
            old_mask = umask(~(access_mask & 0777));
            
            break;
    }

    if (-1 == ret->fd)
    {
        // Socket error
        perror("Socket error");
        free(ret);

        return (struct server_t *) NULL;
    }

    if (0 != set_nonblock(ret->fd))
    {
        perror("Set_nonblock");
    }

    if (ret->conn_type == CONN_TYPE_UNIX)
    {
        addr_bind = (struct sockaddr *) &ret->local_addr;
        addr_size = sizeof(ret->local_addr);
    }

    else
    {
        addr_bind = (struct sockaddr *) &ret->listen_addr;
        addr_size = sizeof(ret->listen_addr);
    }
   
    if (bind(ret->fd, addr_bind, addr_size) < 0)
    {
        perror("Bind");
        close(ret->fd);

        if (ret->conn_type == CONN_TYPE_UNIX)
        {
            umask(old_mask);
        }

        free(ret);

        return (struct server_t *) NULL;
    }

    umask(old_mask);

    if (listen(ret->fd, SOCKET_LISTEN_QUEUES) < 0)
    {
        perror("Listen");
        close(ret->fd);

        free(ret);

        return (struct server_t *) NULL;
    }

    setsockopt(ret->fd, SOL_SOCKET, SO_KEEPALIVE, (void *) &flag, sizeof(flag));
    setsockopt(ret->fd, SOL_SOCKET, SO_LINGER, (void *) &flag, sizeof(flag));
    setsockopt(ret->fd, SOL_SOCKET, SO_REUSEADDR, (void *) &flag, sizeof(flag));
   
    if (ret->conn_type != CONN_TYPE_UNIX)
    {
        timeout.tv_sec = 3;
        setsockopt(ret->fd, SOL_SOCKET, SO_SNDTIMEO, (void *) &timeout, sizeof(struct timeval));
        setsockopt(ret->fd, IPPROTO_TCP, TCP_NODELAY, (void *) &flag, sizeof(flag));
    }

    if (epoll_mode == EPOLL_MODE_LT)
    {
        ret->ev.events = EPOLLIN;
    }

    else
    {
        ret->ev.events = EPOLLIN | EPOLLET;
    }

    ret->ev.data.fd = ret->fd;

    // Add into fds
    fds[ret->fd] = 0x80000000 + srv_id;

    return ret;
}
[/code]
这个函数是比较重要的,它的功能是创建一个socket服务器用于监听。传入参数有:服务器地址(IPv4点格式,如果是UNIX本地连接,传入文件路径)、端口号、权限掩码(用于UNIX本地连接的管道文件的权限)、连接类型(CONN_TYPE_TCP / CONN_TYPE_UDP / CONN_TYPE_UNIX)、epoll模式(EPOLL_MODE_LT / EPOLL_MODE_ET)。

首先要在服务器列表中寻找一个空的位置,由于不会频繁地添加服务器,每个core也不会有太多服务器(除非是个疯狂的肉鸡爱好者),所以这里只是使用了一个静态数组保持服务器信息,查找也是简单的遍历。接着根据连接类型分别创建套接字,如果成功,则把这个套接字的fd设置成非阻塞模式(这在使用ET的时候至关重要)。然后才是bind()和listen()。服务器的fd只用监听EPOLLIN事件就可以了。注意在函数最后,把fd加入了fd列表,这个列表对应了每个fd的属性(是服务器?连接器?客户端)和id(fd列表只是一个数组,长度就是进程最大句柄数nfiles,为什么可以这样做呢?因为操作系统分配fd总是选择这个进程内还没有被使用的最小整数,所以fd总是密集分配的,不会有超过nfiles的fd号出现)。

看看设置非阻塞模式的函数,很简单地fcntl:
[code]
/* Set fd non-blocking mode */
int set_nonblock(int fd)
{
    int flags;

    flags = fcntl(fd, F_GETFL);
    if (flags < 0)
    {
        return flags;
    }

    flags |= O_NONBLOCK;
    if (fcntl(fd, F_SETFL, flags) < 0)
    {
        return -1;
    }

    return 0;
}
[/code]

奶瓶 发表于 2010-3-8 12:53 AM

[i=s] 本帖最后由 奶瓶 于 2010-3-8 10:59 PM 编辑 [/i]

在new_server成功创建了一个服务器对象后,就可以core->add_server把它添加到core的epoll中了,在core的主循环中就会检测是否有客户端连接到这个server上,如果有了客户端连入,就会调用socket_accept()来生成客户端对象,客户端描述如下:[code]
struct client_t
{
    int id;
    int fd;
    int srv_id;
    int worker_id;
    int conn_type;
    int epoll_mode;
    u_int32_t remote_addr;
    int remote_port;
    int in_use;

    int read_buffer_size;
    int read_buffer_data_size;
    char *read_buffer;
    struct epoll_event ev;
    struct sockaddr_in client_addr;
    struct sockaddr_un local_addr;
};
[/code]与服务器类似,客户端也有id和fd,这个fd就是accept()来的,同时客户端还会记录它所连接的服务器id。worker_id是客户端被分配的worker线程号(所有客户端都会被分配给一个worker线程,主线程不会去处理客户端事务,所以core的epoll不应该收到服务器fd之外的任何fd的事件)。read_buffer是客户端的读缓存,这是应用级的buffer,不同于TCP的buffer:[code]
/* Socket accept */
int socket_accept(struct server_t *s)
{
    socklen_t len;
   
    if (!s)
    {
        return RTN_GENERAL;
    }

    struct client_t *c = new_client(s->conn_type, s->epoll_mode);
    if (!c)
    {
        return RTN_GENERAL;
    }

    if (c->conn_type == CONN_TYPE_UNIX)
    {
        c->fd = accept(s->fd, (struct sockaddr *) &c->local_addr, &len);
    }

    else
    {
        c->fd = accept(s->fd, (struct sockaddr *) &c->client_addr, &len);
        c->remote_addr = inet_netof(c->client_addr.sin_addr);
    }
   
    if (c->fd == -1)
    {
        // Accept failed
        client_free(c);

        return RTN_GENERAL;
    }

    c->srv_id = s->id;
    c->ev.data.fd = c->fd;
    if (c->epoll_mode == EPOLL_MODE_LT)
    {
        c->ev.events = EPOLLIN;
    }

    else
    {
        c->ev.events = EPOLLIN | EPOLLET;
    }
   
    fds[c->fd] = c->id;

    if (s->on_conn)
    {
        s->on_conn(c->id);
    }
   
    return c->id;
}
[/code]整个accept过程很简单,最后也会把成功连接的fd添加到fd列表中。这里使用了一个技巧,只用一个int值同时保存了类型和ID,其实也是位操作的变种。客户端保存的是原始id,服务器段保存的是id加上0x80000000,连接器保存的是id加上0xc0000000。由于我自信,每个进程不可能有0x7fffffff个客户端(冂……)。

客户端保存在client_list中,这个动态数组是会自增长的,以适应越来越多的连接:[code]
/* Get a free client object from list */
struct client_t * client_list_get_free()
{
    struct client_t *ret = (struct client_t *) NULL, **tmp_list;

    if (free_clients > 0)
    {
        free_clients --;
        ret = free_client_list[free_clients];

        return ret;
    }

    else
    {
        // No free objects
        ret = (struct client_t *) malloc(sizeof(struct client_t));
        if (!ret)
        {
            return ret;
        }

        memset(ret, 0, sizeof(struct client_t));

        // Add to list
        ret->id = clients;

        if (clients >= client_list_size)
        {
            // Enlarge list size
            tmp_list = realloc(client_list, 2 * client_list_size * sizeof(struct client_t *));
            if (tmp_list)
            {
                memset(tmp_list + client_list_size * sizeof(struct client_t *), 0, client_list_size * sizeof(struct client_t *));
                client_list = tmp_list;
                client_list_size *= 2;
            }
        }

        client_list[clients] = ret;
        clients ++;
    }

    return ret;
}
[/code]这个函数就是获取一个客户端“对象”空间的。它首先会检查free_client_list中(这也是一个自增长的数组)是否还有空闲的空间,如果有的话,直接弹出一个来。free_client_list中只是一些指向client_list的元素的指针,每次有客户端下线,它的对象空间就会被free_client_list“回收”,而不会被free()掉,这样就可以留给之后的客户端使用。记住我们的宗旨:

[b]×尽量少地malloc出内存碎片[/b]

如果free_client_list已经空了(比如服务器刚刚启动,还没有人离线过,或者你的服务器真的很忙……),就会malloc一个新的,并把它加入client_list。如果client_list已经写满了,这个神奇的数组空间会翻倍(realloc)。

如果有兴趣的,可以对比一下memcached的源代码,可以发现memcached只有free_client_list而没有client_list,因为它的所有客户端之间都是没有关系的,不会有通讯问题,也不需要定位,所以不需要一个大的列表来保存所有的对象指针。

client_list、free_client_list、fd_list都是在socket_init()的时候初始化的一小块空间,除了fd_list在开始就满满地申请了nfiles块,client_list和free_client_list都是这种自增长数组:[code]
/* Init socket */
int socket_init(int nfiles, struct memory_pool *p)
{
    fds = calloc(nfiles, sizeof(int));
    if (!fds)
    {
        return ERROR_MEMORY_ALLOC;
    }

    fds_size = nfiles;
    mpool = p;

    // Init send queue
    send_queue = (struct send_item *) malloc(sizeof(struct send_item) * SEND_QUEUE_BASE_SIZE);
    memset(send_queue, 0, sizeof(struct send_item) * SEND_QUEUE_BASE_SIZE);
    send_queue_head = send_queue_tail = 0;
    send_queue_size = SEND_QUEUE_BASE_SIZE;
   
    return server_list_init() + client_list_init();
}

/* Init server list */
int server_list_init()
{
    memset(server_list, 0, sizeof(struct server_t *) * MAX_SERVERS);

    return RTN_SUCCESS;
}

/* Init client list */
int client_list_init()
{
    client_list = (struct client_t **) malloc(sizeof(struct client_t *) * CLIENT_LIST_BASE_SIZE);

    if (!client_list)
    {
        return ERROR_MEMORY_ALLOC;
    }

    memset(client_list, 0, sizeof(struct client_t *) * CLIENT_LIST_BASE_SIZE);
    clients = 0;
    client_list_size = CLIENT_LIST_BASE_SIZE;

    free_client_list = (struct client_t **) malloc(sizeof(struct client_t *) * CLIENT_LIST_BASE_SIZE);

    if (!free_client_list)
    {
        return ERROR_MEMORY_ALLOC;
    }

    memset(free_client_list, 0, sizeof(struct client_t *) * CLIENT_LIST_BASE_SIZE);
    free_clients = 0;
    free_client_list_size = CLIENT_LIST_BASE_SIZE;

    return RTN_SUCCESS;
}
[/code]

奶瓶 发表于 2010-3-8 12:57 AM

[i=s] 本帖最后由 奶瓶 于 2010-3-8 11:03 PM 编辑 [/i]

当一个客户端连接成功之后,core会把它分配给一个worker,通过dispatch_to_worker():[code]
/* Dispatch a client to a random worker */
int dispatch_to_worker(int id)
{
    static char signal[5];
    struct client_t *c = get_client(id);
    if (!c)
    {
        return RTN_GENERAL;
    }
   
    struct thread_t *t = next_worker();
    if (!t)
    {
        return ERROR_THREAD_DISPATCH;
    }

    // Write data to pipe
    signal[0] = THREAD_TYPE_WORKER;
    put_u_int((u_int32_t) id, signal + 1);

    if (5 != write(t->notify_send_fd, signal, 5))
    {
        return ERROR_IO_PIPE;
    }

    c->worker_id = t->id;

    return RTN_SUCCESS;
}
[/code]这是一个神奇的函数,首先我们来看一下一个线程的描述:[code]
struct thread_t
{
    int id;
    pthread_t tid;
    pthread_attr_t attr;
    int thread_type;
    int epoll_fd;
    int notify_recv_fd;
    int notify_send_fd;
};
[/code]线程也有一个id,还会有一个tid,这是pthread分配的。thread_type是线程的类型(MAIN / WORKER / CONTROLLER),epoll_fd是这个线程私有的epoll句柄(主线程的已经在无限循环了),notify_recv_fd和notify_send_fd是一个管道的两端,就是这个管道,负责了主线程和子线程之间的通讯。

管道是一个很有趣的东西,它很像socket,两端都是fd,向一端的fd写入数据,就可以从另外一端读出,管道分为具名和匿名(PIPE & FIFO),请参考任意一本UNIX编程类书籍。不管怎样,我们只是使用了它的通知特性连接两个线程。

thread_init()负责产生这些worker线程和sender(专门处理发送的线程):[code]
/* Threads init */
int thread_init(int nworkers)
{
    int i, pipe_fds[2], ret;
    struct thread_t *tmp_thread;

    memset(thread_list, 0, sizeof(struct thread_t *) * MAX_THREADS);
    memset(working_workers, 0, sizeof(struct thread_t *) * MAX_THREADS);

    if (nworkers < 1)
    {
        // Are you crazy? one application should has one worker at least!
        nworkers = 1;
    }

    // We already have a main thread, so just create workers here.
    for (i = 0; i < nworkers; i ++)
    {
        tmp_thread = (struct thread_t *) malloc(sizeof(struct thread_t));
        if (!tmp_thread)
        {
            continue;
        }

        memset(tmp_thread, 0, sizeof(struct thread_t));
        
        tmp_thread->id = i;
        tmp_thread->epoll_fd = epoll_create(EPOLL_WAIT_CONNS);

        if (tmp_thread->epoll_fd < 0)
        {
            return ERROR_EPOLL_CREATE;
        }
        
        // Msg pipe
        if (pipe(pipe_fds))
        {
            return ERROR_IO_PIPE;
        }

        tmp_thread->notify_recv_fd = pipe_fds[0];
        tmp_thread->notify_send_fd = pipe_fds[1];

        set_nonblock(pipe_fds[0]);
        set_nonblock(pipe_fds[1]);

        pthread_attr_init(&tmp_thread->attr);
        ret = pthread_create(&tmp_thread->tid, &tmp_thread->attr, worker_init, tmp_thread);
        
        thread_list = tmp_thread;
        working_workers[total_workers ++] = tmp_thread;
    }

    // We need a send thread
    tmp_thread = (struct thread_t *) malloc(sizeof(struct thread_t));
    if (!tmp_thread)
    {
        return RTN_GENERAL;
    }

    memset(tmp_thread, 0, sizeof(struct thread_t));

    tmp_thread->id = nworkers;
    tmp_thread->epoll_fd = epoll_create(4);

    if (tmp_thread->epoll_fd < 0)
    {
        return ERROR_EPOLL_CREATE;
    }

    if (pipe(pipe_fds))
    {
        return ERROR_IO_PIPE;
    }

    tmp_thread->notify_recv_fd = pipe_fds[0];
    tmp_thread->notify_send_fd = pipe_fds[1];

    set_nonblock(pipe_fds[0]);
    set_nonblock(pipe_fds[1]);

    pthread_attr_init(&tmp_thread->attr);
    ret = pthread_create(&tmp_thread->tid, &tmp_thread->attr, sender_init, tmp_thread);

    thread_list = tmp_thread;
    sender = tmp_thread;

    return RTN_SUCCESS;
}
[/code]有两个list,一个是thread_list,一个是worker_list。为什么是两个呢?有点像前面的client_list和free_client_list,我们只要简单地轮询worker_list就可以获取“下一个”可用的worker而不用遍历整个thread_list,worker_list里只是一些指针,指向thread_list中的某些元素。

然后分别初始化每个worker,pipe()就是创建那个神奇的管道的玩意,它接收两个int的指针,并把它们分别赋值为fd,这样一个thread就有了3个fd:一个epoll句柄,一条管道的两个fd,最后在pthread_create处真正地将它从core中“分离”了出去,进入了各自的循环:[code]
/* Worker thread initialize */
void * worker_init(void *arg)
{
    struct thread_t *worker = (struct thread_t *) arg;
    struct epoll_event ev, events[EPOLL_WAIT_CONNS];
    struct client_t *c;
    int nfds, i, pfd, task_id, fd_type, what;
    char task_type, sig_dispatch[5];

    ev.data.fd = worker->notify_recv_fd;
    ev.events = EPOLLIN;
    (void) epoll_ctl(worker->epoll_fd, EPOLL_CTL_ADD, worker->notify_recv_fd, &ev);

    // Loop
    while (1)
    {
        memset(events, 0, sizeof(struct epoll_event) * EPOLL_WAIT_CONNS);
        nfds = epoll_wait(worker->epoll_fd, events, EPOLL_WAIT_CONNS, -1);

        for (i = 0; i < EPOLL_WAIT_CONNS; i ++)
        {
            pfd = events.data.fd;
            what = events.events;
            if (!what & EPOLLIN || pfd < 0)
            {
                continue;
            }
            
            if (pfd != worker->notify_recv_fd)
            {
                // Worker task
                task_id = get_fd_info(pfd, &fd_type);
                if (fd_type != FD_TYPE_CLIENT)
                {
                    continue;
                }

                c = get_client(task_id);
                if (!c)
                {
                    continue;
                }

                if (what & (EPOLLHUP | EPOLLERR))
                {
                    // Something wrong?
                }

                else
                {
                    // Data here
                    socket_read(c);
                }
            }

            else
            {
                // New notify event
                while (5 == read(pfd, sig_dispatch, 5))
                {
                    task_type = sig_dispatch[0];
                    task_id = (int) get_u_int(sig_dispatch + 1);

                    if (task_type == THREAD_TYPE_WORKER)
                    {
                        // Socket data worker
                        //fprintf(stderr, "Client dispatch here %d %d\n", task_type, task_id);
                        c = get_client(task_id);
                        if (!c)
                        {
                            continue;
                        }
                        
                        (void) epoll_ctl(worker->epoll_fd, EPOLL_CTL_ADD, c->fd, &c->ev);
                    }

                    else if (task_type == THREAD_TYPE_REMOVE)
                    {
                        // Remove listen fd
                        //fprintf(stderr, "FD %d removed from epoll\n", task_id);
                        (void) epoll_ctl(worker->epoll_fd, EPOLL_CTL_DEL, task_id, NULL);
                    }
                }
            }
        }
    }

    return NULL;
}
[/code]和core的主循环类似,它也是一个无限循环,等待着自己的epoll检测出事件。不过它的fd集合是怎样产生的呢?回顾一下前面的dispatch_to_worker(),这个函数在accept成功之后触发,它首先找到“下一个”worker(简单的轮询算法,你也可以把它改造成更复杂的,比如检查每个worker的负载,以选择一个最闲的),然后对它的管道一端写入5个字节,第一个字节是操作类型,后面是客户端id,然后主循环就不再理会了。这个worker收到消息后(管道的另一端是被这个worker的epoll监听的),会从client_list中取出这个客户端,并将它的fd添加到自己的epoll中来,从此这个客户端就有了归宿,不再是游离的了,它的所有动作也会被这个worker捕获到。

当客户端有数据可以读取的时候,epoll_wait()就会返回这个客户端的fd,socket_read()函数会去读取所有数据,记住,一定是“所有”可读的数据:[code]
/* Read data from client */
void socket_read(struct client_t *c)
{
    int rlen;
    char *tmp_buf;
    struct server_t *s;
   
    if (!c)
    {
        return;
    }

    if (!c->read_buffer)
    {
        c->read_buffer = (char *) malloc(READ_BUFFER_BASE_SIZE);
        if (!c->read_buffer)
        {
            // Alloc error
            return;
        }

        c->read_buffer_size = READ_BUFFER_BASE_SIZE;
    }

    c->read_buffer_data_size = 0;
    while (1)
    {
        // Read data until EAGAIN
        rlen = recv(c->fd, c->read_buffer + c->read_buffer_data_size, c->read_buffer_size - c->read_buffer_data_size, 0);
        c->read_buffer_data_size += rlen;
        
        if (errno == EAGAIN || rlen < (c->read_buffer_size - c->read_buffer_data_size))
        {
            // All data here.
            break;
        }

        else if (rlen == 0)
        {
            // Client socket closed normaly
            break;
        }

        else
        {
            // Go on, enlarge buffer size
            tmp_buf = realloc(c->read_buffer, c->read_buffer_size * 2);
            if (!tmp_buf)
            {
                // So ... alloc error
                break;
            }

            memset(tmp_buf + c->read_buffer_size, 0, c->read_buffer_size);
            c->read_buffer = tmp_buf;
            c->read_buffer_size *= 2;
        }
    }

    s = get_server(c->srv_id);
   
    if (0 == c->read_buffer_data_size)
    {
        // Normally close
        client_free(c);
    }

    else
    {
        // Hook
        if (s && s->on_data)
        {
            s->on_data(c->id, (void *) c->read_buffer, c->read_buffer_data_size);
        }
    }
   
    return;
}
[/code]这个函数在成功找到了这个客户端对象之后,就会去折腾它的read_buffer,向里面灌数据。这个read_buffer也是一个可以自增长的结构(我们在写这种程序的时候可能经常会用这玩意)。在“死”循环中一直去read(),直到读到的字节数小于read()请求的字节数或者产生了EAGAIN错误,表示真正把fd的buffer读光了。如果read()的返回值是0,表示这个客户端主动关闭了,如果是小于0,则发生了错误。成功之后把这一大堆数据返回给客户端所注册的服务器的on_data回调函数(如果有的话……)。

奶瓶 发表于 2010-3-8 01:02 AM

[i=s] 本帖最后由 奶瓶 于 2010-3-8 11:08 PM 编辑 [/i]

client_free()函数用来处理客户端“下线”的善后工作:[code]
/* Return a client object to free list */
int client_free(struct client_t *c)
{
    struct client_t **tmp_list;
    struct server_t *s;
    int id = c->id;
    char *tmp_buf = c->read_buffer;;

    s = get_server(c->srv_id);
    if (s && s->on_close)
    {
        s->on_close(id);
    }

    remove_from_worker(c->worker_id, c->fd, 1);
   
    memset(c, 0, sizeof(struct client_t));
    c->id = id;
    c->read_buffer = tmp_buf;

    if (free_clients >= free_client_list_size)
    {
        // Enlarge free list
        tmp_list = realloc(free_client_list, 2 * free_client_list_size * sizeof(struct client_t *));
        if (!tmp_list)
        {
            // Enlarge failed
            free(c);
            return ERROR_MEMORY_ALLOC;
        }

        free_client_list = tmp_list;
    }

    free_client_list[free_clients] = c;
    free_clients ++;

    return RTN_SUCCESS;
}
[/code]这里体现了free_client_list的自增长特性。它会调用一个remove_from_worker函数,把已经下线的客户端fd从worker的epoll中拿掉:[code]
/* Remove a client from worker */
int remove_from_worker(int worker_id, int fd, int mode)
{   
    struct thread_t *t = get_worker(worker_id);
    if (!t)
    {
        return RTN_GENERAL;
    }
   
    static char signal[5];

    signal[0] = THREAD_TYPE_REMOVE;
    put_u_int((u_int32_t) fd, signal + 1);

    if (5 != write(t->notify_send_fd, signal, 5))
    {
        return RTN_GENERAL;
    }

    if (mode)
    {
        close(fd);
    }

    return RTN_SUCCESS;
}
[/code]和dispatch_to_worker()类似,这个函数也像worker的管道发送了5个字节,第一个字节是THREAD_TYPE_REMOVE,后面是要拿掉的fd,worker收到这个通知后,就调用epoll_ctl的EPOLL_CTL_DEL方法把这个fd注销了。

至此,服务器就可以正常工作了,但是它不会向客户端发送消息。在普通的服务器中,发送动作通常都是发生在接收数据之后的反馈,直接混合在逻辑中的send(),但是这样有一个问题——它不够“异步”,当服务器面向的是大量的慢速连接,或者是经常要发送大的数据包,这个过程会占用很长时间,所以这里我们使用一个特殊的“队列”来处理发送操作:[code]
struct send_item
{
    char *msg;
    int msglen;
    int *id_list;
    int ids;
};
[/code]这是一个发送命令产生的记录,它包括:消息、消息长度、接收者列表、接收者数目。这个记录会被push到一个队列中。这个队列有两个指针,一个头指针,一个尾指针。尾指针用来向里面添加记录,头指针用来取出记录,由一个单独的线程sender来维护它。core只要把要发送的东西扔进去,就不用再管了,sender会一直不停地去处理“累积”的发送任务。由于在很多时候,发送的数据都是有逻辑顺序的,所以我们使用的是队列,而不是栈。这个队列如果使用常规的内存块方式来实现,每次pop都会导致一次memmove,把后面的数据“移”向前面一格,这会产生巨大的开销,所以我们使用数组来模拟这个队列,它的头指针是可以移动的,在每次pop之后,只是移动了这个指针,而没有搬动数据。当然,当数据累积过多的时候,这个数组也是会自增长的:[code]
/* Insert a send item into queue */
int send_queue_push(char *msg, int msglen, int *id_list, int ids)
{
    struct send_item *tmp_list;
   
    pthread_mutex_lock(&queue_lock);

    if (send_queue[send_queue_tail].msg)
    {
        free(send_queue[send_queue_tail].msg);
    }

    if (send_queue[send_queue_tail].id_list)
    {
        free(send_queue[send_queue_tail].id_list);
    }
   
    send_queue[send_queue_tail].msg = msg;
    send_queue[send_queue_tail].msglen = msglen;
    send_queue[send_queue_tail].id_list = id_list;
    send_queue[send_queue_tail].ids = ids;

    // Move tail to next
    if (send_queue_tail + 1 >= send_queue_size)
    {
        send_queue_tail = 0;
    }

    else
    {
        send_queue_tail ++;
    }
   
    if (send_queue_tail == send_queue_head)
    {
        // Queue full, enlarge it
        tmp_list = realloc(send_queue, 2 * sizeof(struct send_item) * send_queue_size);
        if (!tmp_list)
        {
            return ERROR_MEMORY_ALLOC;
        }

        send_queue = tmp_list;

        // Move last items
        memmove(send_queue + sizeof(struct send_item) * (send_queue_head + send_queue_size), send_queue + sizeof(struct send_item) * send_queue_head, sizeof(struct send_item) * (send_queue_size - send_queue_head));
        send_queue_head += send_queue_size;

        send_queue_size *= 2;
    }
   
    pthread_mutex_unlock(&queue_lock);

    return RTN_SUCCESS;
}
[/code]这个“增长”过程,数学概念比较弱的同学还真得比划一阵了,呵呵~~,主要是因为它是“循环”的,也就是数组两头被“粘”在了一起,当指针移动到数组尾部时候,再移动一格就会跑到头部去。这种东西叫做“环队列”。

弹出的时候,头指针后移,当两个指针重合的时候,表示队列已经空了:[code]
/* Pop an item from send queue */
struct send_item * send_queue_pop()
{
    struct send_item *ret;
   
    pthread_mutex_lock(&queue_lock);
   
    if (send_queue_tail == send_queue_head)
    {
        ret = (struct send_item *) NULL;
    }

    else
    {
        ret = &send_queue[send_queue_head];
    }

    send_queue_head ++;

    if (send_queue_head >= send_queue_size)
    {
        send_queue_head = 0;
    }

    pthread_mutex_unlock(&queue_lock);

    return ret;
}
[/code]于是,发送命令就变成了简单地把消息放到“环”队列中去:[code]
/* Send (Broadcast) data to client(s) */
int socket_write(char *msg, int msglen, int *id_list, int ids)
{
    if (RTN_SUCCESS != send_queue_push(msg, msglen, id_list, ids))
    {
        return RTN_GENERAL;
    }

    // Send nofity signal to sender thread
    dispatch_to_sender();
   
    return RTN_SUCCESS;
}
[/code]为什么使用了一个id_list呢,这是为了方便“群发”。在一些应用中,例如聊天室、网络游戏,经常需要把一条消息发给多个客户端,按照普通的做法,这条消息就要存在多个副本,而这个函数可以一次把消息发给一大堆客户端。当然,这和网络级的“广播”不同。

这个函数调用了dispatch_to_sender,和前面的dispatch_to_worker类似,它也是给sender线程发送了一个信号,表示有新的数据需要发送,不过信号只有一个字节:[code]
/* Send signal here */
int dispatch_to_sender()
{
    static char signal[1] = "c";
    if (!sender)
    {
        return RTN_GENERAL;
    }

    write(sender->notify_send_fd, signal, 1);
   
    return RTN_SUCCESS;
}
[/code]我发送了一个字符“c”,其实发什么无所谓,它并没有什么意义,只是poke一下sender线程。这个sender线程和worker类似,也有自己的epoll,所不同的是它只监听自己的管道一端,当收到通知后就去检查环队列中是不是有东西,如果有东西就去发送,一直把队列清空:[code]
/* A send thread */
void * sender_init(void *arg)
{
    struct thread_t *worker = (struct thread_t *) arg;
    struct epoll_event ev, events[4];
    struct send_item *sd;
    struct client_t *c;
    int nfds, i, j, t, len, pfd, what;
    char sig_dispatch[1];

    ev.data.fd = worker->notify_recv_fd;
    ev.events = EPOLLIN;
    (void) epoll_ctl(worker->epoll_fd, EPOLL_CTL_ADD, worker->notify_recv_fd, &ev);

    while (1)
    {
        memset(events, 0, sizeof(struct epoll_event) * 4);
        nfds = epoll_wait(worker->epoll_fd, events, 4, -1);

        for (i = 0; i < 4; i ++)
        {
            pfd = events.data.fd;
            what = events.events;
            if (!what & EPOLLIN || pfd < 0)
            {
                continue;
            }

            if (pfd == worker->notify_recv_fd)
            {
                // New notify
                while (1 == read(pfd, sig_dispatch, 1))
                {
                    sd = send_queue_pop();

                    // Send data
                    for (j = 0; j < sd->ids; j ++)
                    {
                        c = get_client(sd->id_list[j]);

                        if (c)
                        {
                            switch (c->conn_type)
                            {
                                case CONN_TYPE_TCP :
                                case CONN_TYPE_UDP :
                                case CONN_TYPE_UNIX :
                                    // Maybe unsafe for UDP
                                    len = send(c->fd, sd->msg, sd->msglen, 0);
                                    if (len == -1)
                                    {
                                        break;
                                    }

                                    if (len < sd->msglen)
                                    {
                                        while (len >= sd->msglen)
                                        {
                                            t = send(c->fd, sd->msg + len, sd->msglen - len, 0);

                                            if (t == -1)
                                            {
                                                break;
                                            }

                                            len += t;
                                        }
                                    }

                                    break;

                                default :
                                    // Unknown type
                                    break;
                            }
                        }
                    }
                }
            }
        }
    }
   
    return NULL;
}
[/code]注意里面的发送过程,由于我们使用了非阻塞的fd,send()函数仅仅是将数据复制到fd的buffer中就直接返回了,而有的时候数据长度比buffer的空闲空间大,所以我们不能直接就这样走掉,一定要等协议把buffer里的东西发送结束,再把剩余部分继续往buffer里写,一直等到全部写完。这里有一点要注意,就是在UDP方式下,我们可能需要使用setsockopt来给fd分配一个“足够”大的buffer,来保证可以一次性把数据都放进去,否则在另一端,无法保证已经支离破碎的数据会按照怎样的方式组合,是不是会有零件丢掉,因为UDP根本就不保证这些。

奶瓶 发表于 2010-3-8 01:06 AM

[i=s] 本帖最后由 奶瓶 于 2010-3-8 11:15 PM 编辑 [/i]

整个的服务器模型,我们已经有了,有一个部分还没有提及,就是内存池。内存池是个什么东西呢?它其实就是一些预先分配好的整块内存的集合。内存池是有效降低内存碎片率的措施,它预留了一些整块的内存空间,当应用程序需要的时候,则从池上去申请,内存池返回的是一个“整”块的内存空间地址,应用程序可以像普通alloc来的空间一样使用它。当用完之后,可以把它归还给池,这样就避免了频繁地malloc / free。内存池有很多实现方式,最常见的是多级链表,APR和boost中都有很好的内存池实现,corelib中的是一个简单原始但是有效的实现,可以给大家做个参考。

这里的内存池就是典型的二维链表,首先看看它的数据定义:[code]
struct memory_pool
{
    int pool_link_size;
    struct memory_pool_slot **pool_link;
};

struct memory_pool_slot
{
    int slot_id;
    void *block;
    int slot_size;
    struct memory_pool_item *item;
    struct memory_pool_slot *next;
};

struct memory_pool_item
{
    void *item_addr;
    char in_use;
};
[/code]memory_pool是一个池的“根”,它有一个成员,是第一级链表(其实是数组)的地址,每一个节点是一个memory_pool_slot的链表。每一个memory_pool_slot上有一个指向某块block的指针,block的大小默认是1MB,另一个指针指向memory_pool_item,这是每次分配的最小单位,也就是把block拆成了大小不等的“块”。

看看这个神奇的东西的实现:[code]
/* Memory pool initialization */
struct memory_pool * pool_init()
{
    struct memory_pool *tmp = (struct memory_pool *) malloc(sizeof(struct memory_pool));
    if (!tmp)
    {
        return (struct memory_pool *) NULL;
    }

    tmp->pool_link = (struct memory_pool_slot **) malloc(POOL_LINK_BASE_SIZE * sizeof(struct memory_pool_slot *));
    if (!tmp->pool_link)
    {
        free(tmp);
        return (struct memory_pool *) NULL;
    }

    memset(tmp->pool_link, 0, POOL_LINK_BASE_SIZE * sizeof(struct memory_pool_slot *));
    tmp->pool_link_size = POOL_LINK_BASE_SIZE;
   
    return tmp;
}

/* Clear a pool */
int pool_destroy(struct memory_pool *p)
{
    if (!p)
    {
        return RTN_GENERAL;
    }
   
    // Clean every slot here
    int i;
    struct memory_pool_slot *tmp, *prev;

    for (i = 0; i < p->pool_link_size; i ++)
    {
        tmp = p->pool_link;

        while (tmp)
        {
            if (tmp->item)
            {
                free(tmp->item);
            }

            if (tmp->block)
            {
                free(tmp->block);
            }

            prev = tmp;
            tmp = tmp->next;

            free(prev);
        }
    }

    free(p->pool_link);
    free(p);
   
    return RTN_SUCCESS;
}
[/code]这是内存池的创建和销毁过程,很简单的链表初始化和free(),所要注意的是销毁的时候要把里面的所有东西都free()掉,才不会产生内存泄漏。这段代码里还有一个预设的数组,这是另外一个神奇的东西:[code]
int item_size_freq[20] = {
    16, 16, 16, 16, 16,
    16, 16, 16, 1024, 1024,
    4096, 4096, 4096, 4096, 4096,
    2048, 1024, 256, 128, 64
};
[/code]看过squid源代码的同学可能会有一点印象,这是预分配的设置,分表表示1字节、2字节、4字节、8字节……512KB字节的块,各初始化多少个,一共20个等级,对应2^0到2^19。其中1到128字节的块不会被分配,最小单位就是256字节,所以前8个都是16,其实是没有意义的。这是什么意思呢?这表示每次即使申请1个字节,也会返回一个实际大小是256字节的块,因为把一个1MB的块拆城1048576个部分实在是一件很无聊的事情>_<。这些数值是经验性的,表示不同大小的内存块的使用频率,比如我们认为1KB、2KB、4KB等这些大小比较“常用”,所以我们就多预分配一些。在这个内存不值钱的年代,用这种方式去换取性能是值得的。

看看alloc和free的实现:[code]
/* Alloc space from pool */
void * pool_alloc(struct memory_pool *p, int size)
{
    if (!p)
    {
        return (void *) NULL;
    }
   
    int slot_id = ceil(log2(size + sizeof(int) * 2));
    void *tmp;

    if (slot_id < POOL_LINK_MIN_SIZE)
    {
        // The minimal item size is 256 byte
        slot_id = POOL_LINK_MIN_SIZE;
    }

    if (slot_id > p->pool_link_size)
    {
        // Enlarge link
        tmp = (struct memory_pool_slot **) realloc(p->pool_link, slot_id * sizeof(int));
        if (!tmp)
        {
            return (void *) NULL;
        }

        memset(tmp + p->pool_link_size * sizeof(struct memory_pool_slot *), 0, (slot_id - p->pool_link_size) * sizeof(struct memory_pool_slot *));
        p->pool_link = tmp;
        p->pool_link_size = slot_id;
    }

    if (!p->pool_link[slot_id])
    {
        p->pool_link[slot_id] = new_slot(slot_id, 0);
    }

    tmp = get_item(p->pool_link[slot_id]);

    return tmp;
}

/* Return space to pool */
int pool_free(struct memory_pool *p, void *s)
{
    if (!p)
    {
        return RTN_GENERAL;
    }
   
    struct memory_pool_slot *tmp;
    int slot_id = *(int *) (s - sizeof(int) * 2);
    int item_id = *(int *) (s - sizeof(int));
    int slot_size = slot_id < 20 ? item_size_freq[slot_id] : 32;
    int ct = (item_id / slot_size);
    int item_tid = item_id % slot_size;
    int i;

    if (!p->pool_link[slot_id])
    {
        return RTN_GENERAL;
    }

    tmp = p->pool_link[slot_id];
   
    for (i = 0; i < ct; i ++)
    {
        if (!tmp->next)
        {
            return RTN_GENERAL;
        }

        tmp = tmp->next;
    }

    if (item_tid >= tmp->slot_size)
    {
        return RTN_GENERAL;
    }

    tmp->item[item_tid].in_use = 0;
   
    return RTN_SUCCESS;
}
[/code]这里涉及了一些数学计算,主要是为了获知申请的字节数要怎样去寻找从哪里获取内存块。开始的时候,一级链表上一共有20个节点,每个节点表示的是不同大小的单元集合(前8个是空的)。这样根据一个既定的算法,当申请一定大小的空间时,程序可以立刻知道这块空间应该从哪里找。需要注意的是,每个内存块的前8个字节是保留的,用来保存这个内存块所属的链表位置,用于空间回收使用。所以当申请1000字节空间时,会得到一块1024字节的内存,实际可用的只有1016字节,而当申请1024字节时,会返回一块2048字节的内存,实际可用为2040字节。

当我们申请的空间大于512KB(2^19)的时候,一级链表会增长,直到适应了这个数值,而临时生成一块足够大的block,挂在上面。所以这个内存池和memcached区别就是memcached的每个节点限制为1MB,这个内存池申请500MB也可以正常工作(如果有的话)。

内存池是没有实际大小的,它也是会自增的,不过它不是“翻倍”,而是每次在链表上挂上一截新的。看看它是怎么增长的:[code]
/* Create a new slot */
struct memory_pool_slot *new_slot(int slot_id, int ct)
{
    struct memory_pool_slot *rtn = malloc(sizeof(struct memory_pool_slot));
    int slot_size = slot_id < 20 ? item_size_freq[slot_id] : 32;
    int item_size = pow(2, slot_id);
    int block_size = slot_size * item_size;
    int i, item_id;

    if (rtn)
    {
        rtn->item = malloc(sizeof(struct memory_pool_item) * slot_size);

        if (!rtn->item)
        {
            free(rtn);
            rtn = (struct memory_pool_slot *) NULL;
        }

        else
        {
            memset(rtn->item, 0, sizeof(struct memory_pool_item) * slot_size);

            rtn->block = (void *) malloc(block_size);

            if (rtn->block)
            {
                rtn->slot_id = slot_id;
                rtn->slot_size = slot_size;
                rtn->next = (struct memory_pool_slot *) NULL;

                for (i = 0; i < slot_size; i ++)
                {
                    item_id = i + ct * slot_size;
                    rtn->item.item_addr = rtn->block + i * item_size + sizeof(int) * 2;
                    memcpy(rtn->block + i * item_size, &slot_id, sizeof(int));
                    memcpy(rtn->block + i * item_size + sizeof(int), &item_id, sizeof(int));
                }
            }

            else
            {
                free(rtn->item);
                free(rtn);
                rtn = (struct memory_pool_slot *) NULL;
            }
        }
    }

    return rtn;
}

/* Get an usable item from slot */
void * get_item(struct memory_pool_slot *h)
{
    struct memory_pool_slot *th = h;
    int i, ct = 0;
    int slot_size = th->slot_id < 20 ? item_size_freq[th->slot_id] : 32;

    if (!th)
    {
        return (void *) NULL;
    }
   
    while (1)
    {
        ct ++;
        for (i = 0; i < slot_size; i ++)
        {
            if (!th->item.item_addr)
            {
                // Empty item??? where is my space?
                return (void *) NULL;
            }

            else if (!th->item.in_use)
            {
                // Unused item
                th->item.in_use = 1;
                //fprintf(stderr, "%d\t%d\n", th->slot_id, i);

                return th->item.item_addr;
            }
        }

        if (th->next)
        {
            th = th->next;
        }

        else
        {
            break;
        }
    }

    // No space? a new slot
    th->next = new_slot(th->slot_id, ct);

    if (th->next)
    {
        th = th->next;

        th->item[0].in_use = 1;

        return th->item[0].item_addr;
    }
   
    return (void *) NULL;
}
[/code]这段内存池的实现代码是我很久以前写的,并不是和corelib一起开发的。使用内存池来做什么呢?是为了保存使用频繁的send buffer。在上面的代码中我没有使用,因为它应该被显式地用在逻辑代码中,而不是core中。

一个完整的服务器还应该包括很多部分,比如心跳检查。心跳是保证确知连接是否有效的唯一手段,因为TCP / UDP本身的属性,除非主动断开连接,服务器会收到一个0字节的读请求,否则不会立刻得到这个客户端的“下线”通知的(例如网络突然断了,客户端死机了)。“心跳”顾名思义是一个有固定频率的动作,客户端在连接成功后按照一个固定的时间间隔对服务器发送一条特殊消息,服务器收到之后立刻反馈。如果服务器在一端时间内存没有收到客户端的心跳,就可以认为这个客户端离线了,同样的,客户端在一端时间内没有收到服务器的心跳反馈,也可以认为和服务器的连接断开了。所以说,心跳是双向的。

关于心跳的实现,可以参考之前版本的comoro,我在目前的版本中只会在corelib中添加timer事件回调函数,而不会加入心跳检查功能。因为corelib和逻辑是分离的,它不一定用来建立需要保持稳定的长连接,如果只是用来做webserver,是不需要心跳的。

奶瓶 发表于 2010-3-8 01:08 AM

最后,提供一个使用corelib的完整实例,说来说去还是个echo server,只不过它会把消息“广播”给所有人,可以认为是聊天室吧:
[code]
#include "core/core.h"

#include <stdio.h>
#include <stdlib.h>

void t_conn(int);
void t_close(int);
void t_data(int, void *, size_t);
int nworkers = 4;

int *id_list;
int ids = 0;

int main(int argc, char **argv)
{
    struct core_t *c = core_init(nworkers);
    struct server_t *s = new_server("0.0.0.0", 34567, 0, CONN_TYPE_TCP, EPOLL_MODE_ET);

    if (!s)
    {
        fprintf(stderr, "失败了哟~~\n");
    }

    s->on_conn = t_conn;
    s->on_close = t_close;
    s->on_data = t_data;
   
    c->add_server(s);

    // 1024 Max
    id_list = (int *) malloc(sizeof(int) * 1024);
   
    fprintf(stderr, "老子是服务器,端口34567,工作线程数%d,监听模式TCP...\n", nworkers);
    c->loop();
   
    return 0;
}

void t_conn(int client_id)
{
    fprintf(stderr, "%d号客户端上线了\n", client_id);

    if (ids >= 1024)
    {
        return;
    }
   
    id_list[ids] = client_id;

    ids ++;

    return;
}

void t_close(int client_id)
{
    int i;
   
    fprintf(stderr, "%d号客户端离线了\n", client_id);

    for (i = 0; i < ids; i ++)
    {
        if (id_list[i] == client_id)
        {
            // Remove it
            if (i < ids - 1)
            {
                memmove(id_list + sizeof(int) * i, id_list + sizeof(int) * (i + 1), sizeof(int) * (ids - i -1));
            }

            ids --;
        }
    }

    return;
}

void t_data(int client_id, void *msg, size_t len)
{
    char *o = strndup((const char *) msg, len);
    int *tmp_list = (int *) malloc(sizeof(int) * ids);
    memcpy(tmp_list, id_list, sizeof(int) * ids);

    // Broadcast
    socket_write(o, len, tmp_list, ids);

    return;
}
[/code]
这是一段非常无聊的程序,但是也很完整。它初始化了core,使用了4个worker线程,创建了一个TCP服务器,监听于所有本地IP的34567端口,使用EPOLLET,设置了三个回调函数t_data、t_conn和t_close,并“假装”不会有超过1024个人来一起“聊”(充分表现了我的无聊)。

这个程序运行起来看起来是这个样子:
[attach]5886[/attach]
服务器

[attach]5887[/attach]
客户端1

[attach]5888[/attach]
客户端2

服务器成了一个“长舌妇”,它把一个客户端发来的消息传给了所有人,不过好处是它没有“添油加醋”。

至此,一个完整的服务器就有了,之后我会整理一个有实际用途的程序放出来,也许是一个什么特殊服务,也许是一个webserver,大家可以测试看看它的实际响应性能,是不是apache可比的,嘿嘿~~

马甲 发表于 2010-3-8 09:12 AM

占个好位子~
HelloHello

isno 发表于 2010-3-8 11:02 AM

牛人啊,膜拜一下

奶瓶 发表于 2010-3-8 11:05 AM

原文地址:[url]http://54np.net/archives/56[/url]
欢迎剽窃

雪舞刀凌 发表于 2010-3-8 11:21 AM

这下子归我

isno 发表于 2010-3-8 11:21 AM

太长了,建议划分为几个小章节

无喱头 发表于 2010-3-8 12:07 PM

强力插入,,,顺便广告位招租。

menses 发表于 2010-3-8 12:20 PM

先顶,过会慢慢看

水清无鱼 发表于 2010-3-8 01:17 PM

非黄金地段了,不过怎么着也要跨入5万/月的广告行市了吧。

奶瓶 发表于 2010-3-8 01:25 PM

我发现我写了好多错别字,大家凑合看吧~

zw870307 发表于 2010-3-8 01:34 PM

我插。广告位招租!!!

页: [1] 2 3

Powered by Discuz! Archiver 7.0.0  © 2001-2009 Comsenz Inc.