完善资料让更多小伙伴认识你,还能领取20积分哦, 立即完善>
开发环境: 主机:Ubuntu 18.04 开发板:OK3568-C开发板 并发服务器支持多个客户端的同时连接,最大可接入的客户端数取决于内核控制块的个数。当使用Socket API时,要使服务器能够同时支持多个客户端的连接,必须引入多任务机制,为每个连接创建一个单独的任务来处理连接上的数据,多任务可以是多线程或者多进程,这是最常用的并发服务器设计。但是多线程/多进程消耗资源多,处理起来也比较复杂,本文将基于Select/Poll机制实现并发服务器。 在具体讲解基于Select/Poll机制实现并发服务器之前,我们需要了解IO的相关概念,所谓IO就是,就是数据的读写,一般分为网络IO(本质就是socket读写)和磁盘IO。 IO模型大致可以分为:同步阻塞、同步非阻塞、异步、信号驱动。 可细分为5种I/O模型: 1)阻塞I/O,进程处于阻塞模式时,让出CPU,进入休眠状态; 2)非阻塞I/O,非阻塞模式的使用并不普遍,因为非阻塞模式会浪费大量的CPU资源; 3)I/O复用(select和poll),针对批量IP操作时,使用I/O多路复用,非常有好; 4)异步I/O(POSIX的aio_系列函数) 5)信号驱动I/O(SIGIO) 一个输入操作通常包括两个不同的阶段: 1)等待数据准备好; 2)从内核向进程复制数据; 对于一个套接字的输入操作,第一步通常涉及等待数据从网络中到达。当所等待分组到达时,它被复制到内核中某个缓冲区。第二步就是把数据从内核缓冲区复制到应用进程缓冲区。 4.1.1阻塞I/O阻塞 I/O 模式是最普遍使用的 I/O 模式。一个套接字建立后所处于的模式就是阻塞 I/O 模式。(因为Linux系统默认的IO模式是阻塞模式)。对于一个 UDP 套接字来说,数据就绪的标志比较简单: (1)已经收到了一整个数据报 (2)没有收到。 而 TCP 这个概念就比较复杂,需要附加一些其他的变量。 最流行的I/O模型是阻塞式I/O(blocking I/O)模型,默认情况下,所有的套接字都是阻塞的。阻塞调用是指调用结果返回之前,当前线程会被挂起(线程进入非可执行状态,在这个状态下,CPU不会给线程分配时间片,即线程暂停运行)。函数只有在得到结果之后才会返回。以数据包套接字为例,如图。 进程调用recvfrom,其系统调用直到数据报到达且被拷贝到应用进程的缓冲区或者发生错误才返回。最常见的错误是系统调用被信号中断。我们说进程从调用recvfrom开始到它返回的整段时间内是被阻塞的,recvfrom成功返回后,进程开始处理数据报。 4.1.2非阻塞I/O进程把一个套接口设置成非阻塞是在通知内核:当所请求的I/O操作非得把本进程投入睡眠才能完成时,不要把本进程投入睡眠,而是返回一个错误。 前三次调用recvfrom 时没有数据可返回,因此内核转而立即返回一个EWOULDBLOCK 错误。第四次调用 recvfrom 时已有一个数据报准备好,它被复制到应用程序缓冲区,于是recvfrom 成功返回。我们接着处理数据。 当一个应用程序使用了非阻塞模式的套接字,它需要使用一个循环来不听的测试是否一个文件描述符有数据可读(称做 polling(轮询))。应用程序不停的 polling 内核来检查是否 I/O操作已经就绪。这将是一个极浪费 CPU资源的操作。这种模式使用中不是很普遍。 非阻塞和阻塞的概念相对应,指在不能立刻得到结果之前,该函数不会阻塞当前线程,而会立刻返回。 在使用 I/O 多路威廉希尔官方网站
的时候,我们调用 select()函数和 poll()函数,在调用它们的时候阻塞,而不是我们来调用 recvfrom(或recv)的时候阻塞。主要可以调用select和poll;对一个IO端口,两次调用,两次返回,比阻塞IO并没有什么优越性;关键是能实现同时对多个IO端口进行监听,可以等待多个描述符就绪。 I/O复用模型会用到select、poll,这几个函数也会使进程阻塞,但是和阻塞I/O所不同的的,这两个函数可以同时阻塞多个I/O操作。而且可以同时对多个读操作,多个写操作的I/O函数进行检测,直到有数据可读或可写时,才真正调用I/O操作函数。 当我们调用 select函数阻塞的时候,select 函数等待数据报套接字进入读就绪状态。当select函数返回的时候, 也就是套接字可以读取数据的时候。 这时候我们就可以调用 recvfrom函数来将数据拷贝到我们的程序缓冲区中。 对于单个I/O操作,和阻塞模式相比较,select()和poll()并没有什么高级的地方。而且,在阻塞模式下只需要调用一个函数:读取或发送函数。在使用了多路复用威廉希尔官方网站
后,我们需要调用两个函数了:先调用 select()函数或poll()函数,然后才能进行真正的读写。 多路复用的高级之处在于:它能同时等待多个文件描述符,而这些文件描述符(套接字描述符)其中的任意一个进入读就绪状态,select()函数就可以返回。 IO 多路威廉希尔官方网站
一般在下面这些情况中被使用: l 当一个客户端需要同时处理多个文件描述符的输入输出操作的时候(一般来说是标准的输入输出和网络套接字),I/O 多路复用威廉希尔官方网站
将会有机会得到使用。 l 当程序需要同时进行多个套接字的操作的时候。 l 如果一个 TCP 服务器程序同时处理正在侦听网络连接的套接字和已经连接好的套接字。 l 如果一个服务器程序同时使用 TCP 和 UDP 协议。 l 如果一个服务器同时使用多种服务并且每种服务可能使用不同的协议(比如 inetd就是这样的)。 异步I/O(asynchronous I/O)有POSIX规范定义。后来演变成当前POSIX规范的各种早期标准定义的实时函数中存在的差异已经取得一致。一般地说,这些函数的工作机制是:告知内核启动某个操作,并让内核在整个操作(包括将数据从内核拷贝到我们自己的缓冲区)完成后通知我们。这种模型与前与前面介绍的信号驱动模型的主要区别在于:信号驱动I/O是由内核通知我们何时可以启动一个I/O操作,而异步I/O模型是由内核通知我们I/O操作何时完成。 我们也可以用信号,让内核在描述字就绪时发送SIGIO信号通知我们。我们称这种模型为信号驱动I/O(signal-driven I/O)。 我们首先开启套接口的信号驱动I/O功能,并通过sigaction系统调用安装一个信号处理函数。该系统调用立即发回,我们的进程继续工作,也就是说它没有被阻塞。当数据报准备好时,内核就为该进程产生一个SIGIO信号。我们随后既可以在信号处理函数中调用recvfrom读取数据报,并通知主循环数据已经准备好待处理,也可以立即通知主循环,让它读取数据报。 无论如何处理SIGIO信号,这种模型的优势在于等待数据报到达期间,进程不被阻塞。主循环可以继续执行,只要不时等待来自信号处理函数的通知:既可以是数据已经准备好被处理,也可以是数据报已准备好被读取。 各种模型的比较如下图所示,可以看出,前4种模型的主要区别在于第一阶段,因为它们的第二阶段是一样的:在数据从内核复制到调用者的缓冲区起见,进程阻塞与recvfrom 调用,相反。异步I/O模型在这两个阶段都需要处理,从而不同于其他四种模型。 l 同步I/O与异步I/O对比 POSIX把这两个术语定义如下: ·同步I/O操作(synchronous I/Ooperation)导致请求进程阻塞,直到I/O操作完成。 ·异步I/O(asynchronous I/Ooperation)不导致请求进程阻塞。 根据上述定义,我们前4种模型----阻塞I/O模型、非阻塞I/O模型、I/O复用模型和信号去驱动I/O模型都是同步I/O模型,因为其中真正的I/O操作(recvfrom)将阻塞进程。只有异步I/O模型与POSIX定义的异步I/O相匹配。 本文的要将的I/O复用,本质就是select/poll机制。因此,其他IO有兴趣可以去了解。 Select/Poll则是POSIX所规定,一般操作系统或协议栈均有实现。 值得注意的是,poll和select都是基于内核函数sys_poll实现的,不同在于在Linux系统中select是从BSDUnix系统继承而来,poll则是从SystemV Unix系统继承而来,因此两种方式相差不大。poll函数没有最大文件描述符数量的限制。poll和 select与一样,大量文件描述符的数组被整体复制于用户和内核的地址空间之间,开销随着文件描述符数量的增加而线性增大。 在BSD Socket 中,select函数原型如下: int select(int nfds, fd_set *readfds, fd_set *writefds,fd_set *exceptfds,struct timeval *timeout); 【参数说明】 l nfds:select监视的文件句柄数,一般设为要监视各文件中的最大文件描述符值加1。 l readfds:文件描述符集合监视文件集中的任何文件是否有数据可读,当select函数返回的时候,readfds将清除其中不可读的文件描述符,只留下可读的文件描述符。 l writefds:文件描述符集合监视文件集中的任何文件是否有数据可写,当select函数返回的时候,writefds将清除其中不可写的文件描述符,只留下可写的文件描述符。 l exceptfds:文件集将监视文件集中的任何文件是否发生错误,可用于其他的用途,例如,监视带外数据OOB,带外数据使用MSG_OOB标志发送到套接字上。当select函数返回的时候,exceptfds将清除其中的其他文件描述符,只留下标记有OOB数据的文件描述符。 l timeout 参数是一个指向 struct timeval 类型的指针,它可以使 select()在等待 timeout 时间后若没有文件描述符准备好则返回。其timeval结构用于指定这段时间的秒数和微秒数。它可以使select处于三种状态: (1) 若将NULL以形参传入,即不传入时间结构,就是将select置于阻塞状态,一定等到监视文件描述符集合中某个文件描述符发生变化为止; (2) 若将时间值设为0秒0毫秒,就变成一个纯粹的非阻塞函数,不管文件描述符是否有变化,都立刻返回继续执行,文件无变化返回0,有变化返回一个正值; (3) timeout的值大于0,这就是等待的超时时间,即select在timeout时间内阻塞,超时时间之内有事件到来就返回了,否则在超时后不管怎样一定返回,返回值同上述。 timeval 结构体定义 struct timeval { int tv_sec;/* 秒 */ int tv_usec;/* 微妙 */ }; 【返回值】 l int:若有就绪描述符返回其数目,若超时则为0,若出错则为-1 下列操作用来设置、清除、判断文件描述符集合。 FD_ZERO(fd_set*set);//清除一个文件描述符集。 FD_SET(intfd,fd_set *set);//将一个文件描述符加入文件描述符集中。 FD_CLR(intfd,fd_set *set);//将一个文件描述符从文件描述符集中清除。 FD_ISSET(intfd,fd_set *set);//判断文件描述符是否被置位 fd_set可以理解为一个集合,这个集合中存放的是文件描述符(file descriptor),即文件句柄。中间的三个参数指定我们要让内核测试读、写和异常条件的文件描述符集合。如果对某一个的条件不感兴趣,就可以把它设为空指针。 select()的机制中提供一种fd_set的数据结构,实际上是一个long类型的数组,每一个数组元素都能与打开的文件句柄(不管是Socket句柄,还是其他文件或命名管道或设备句柄)建立联系,建立联系的工作由程序员完成,当调用select()时,由内核根据IO状态修改fd_set的内容,由此来通知执行了select()的进程哪一Socket或文件可读。 poll的函数原型: int poll(struct pollfd *fds, nfds_t nfds, int timeout); 【参数说明】 l fds:fds是一个struct pollfd类型的数组,用于存放需要检测其状态的socket描述符,并且调用poll函数之后fds数组不会被清空;一个pollfd结构体表示一个被监视的文件描述符,通过传递fds指示 poll() 监视多个文件描述符。 struct pollfd原型如下: typedef struct pollfd { int fd; // 需要被检测或选择的文件描述符 short events; // 对文件描述符fd上感兴趣的事件 short revents; // 文件描述符fd上当前实际发生的事件 } pollfd_t; 其中,结构体的events域是监视该文件描述符的事件掩码,由用户来设置这个域,结构体的revents域是文件描述符的操作结果事件掩码,内核在调用返回时设置这个域。 l nfds:记录数组fds中描述符的总数量。 l timeout:指定等待的毫秒数,无论 I/O 是否准备好,poll() 都会返回,和select函数是类似的。 【返回值】 l int:函数返回fds集合中就绪的读、写,或出错的描述符数量,返回0表示超时,返回-1表示出错; poll改变了文件描述符集合的描述方式,使用了pollfd结构而不是select的fd_set结构,使得poll支持的文件描述符集合限制远大于select的1024。这也是和select不同的地方。 接下来将使用select/poll来实现并发服务器。这里以select为例。 select并发服务器模型: socket(...); // 创建套接字 bind(...); // 绑定 listen(...); // 监听 while(1) { if(select(...) > 0) // 检测监听套接字是否可读 { if(FD_ISSET(...)>0) // 套接字可读,证明有新客户端连接服务器 { accpet(...);// 取出已经完成的连接 process(...);// 处理请求,反馈结果 } } close(...); // 关闭连接套接字:accept()返回的套接字 } 因此,基于select实现的并发服务器模型如下: 从流程上来看,使用select函数进行IO请求和同步阻塞模型没有太大的区别,甚至还多了添加监视socket,以及调用select函数的额外操作,效率更差。但是,使用select以后最大的优势是用户可以在一个线程内同时处理多个socket的IO请求。用户可以注册多个socket,然后不断地调用select读取被激活的socket,即可达到在同一个线程内同时处理多个IO请求的目的。而在同步阻塞模型中,必须通过多线程的方式才能达到这个目的。 Server: /** ****************************************************************************** * @file server.c * @author BruceOu * @version V1.0 * @date 2021-06-04 * @blog https://blog.bruceou.cn/ * @Official Accounts 嵌入式实验楼 * @Brief 基于select的服务器 ****************************************************************************** */ #include #include #include #include #include #include #include #include #include #include #define SERVER_PORT 8888 #define BUFF_SIZE 1024 static char recvbuff[BUFF_SIZE]; /** * @brief mian * @param None * @retval int */ int main(int argc,char *argv[]) { int sfd, cfd, maxfd, i, nready, n; char str[INET_ADDRSTRLEN]; struct sockaddr_in server_addr,client_addr; char sendbuff[ ] = "Hello client!"; socklen_t client_addr_len; fd_set all_set, read_set; //FD_SETSIZE里面包含了服务器的fd int clientfds[FD_SETSIZE - 1]; //创建socket if ((sfd = socket(AF_INET, SOCK_STREAM, 0))< 0) { printf("Socket createfailed.n"); } server_addr.sin_family = AF_INET; server_addr.sin_port = htons(SERVER_PORT); server_addr.sin_addr.s_addr =htonl(INADDR_ANY); //绑定socket if (bind(sfd, (struct sockaddr*)&server_addr, sizeof(struct sockaddr)) < 0) { printf("socket bindfailed.n"); } printf("socket bind network interfacesuccess!n"); //监听socket if(listen(sfd, 5) == -1) { printf("listen error"); } else { printf("listening...n"); } client_addr_len = sizeof(client_addr); //初始化 maxfd 等于 sfd maxfd = sfd; //清空fdset FD_ZERO(&all_set); //把sfd文件描述符添加到集合中 FD_SET(sfd, &all_set); //初始化客户端fd的集合 for(i = 0; i < FD_SETSIZE -1 ; i++) { //初始化为-1 clientfds = -1; } while(1) { //每次select返回之后,fd_set集合就会变化,再select时,就不能使用, //所以我们要保存设置fd_set 和 读取的fd_set read_set = all_set; nready = select(maxfd + 1,&read_set, NULL, NULL, NULL); //没有超时机制,不会返回0 if(nready < 0) { printf("select errorrn"); } //判断监听的套接字是否有数据 if(FD_ISSET(sfd, &read_set)) { //有客户端进行连接了 cfd = accept(sfd, (struct sockaddr*)&client_addr, &client_addr_len); if(cfd < 0) { printf("accept socketerrorrn"); //继续select continue; } printf("new client connect fd= %drn", cfd); //把新的cfd 添加到fd_set集合中 FD_SET(cfd, &all_set); //更新要select的maxfd maxfd = (cfd > maxfd)?cfd:maxfd; //把新的cfd 保存到cfds集合中 for(i = 0; i < FD_SETSIZE -1 ; i++) { if(clientfds == -1) { clientfds = cfd; //退出,不需要添加 break; } } //没有其他套接字需要处理:这里防止重复工作,就不去执行其他任务 if(--nready == 0) { //继续select continue; } } //遍历所有的客户端文件描述符 for(i = 0; i < FD_SETSIZE -1 ; i++) { if(clientfds == -1) { //继续遍历 continue; } //判断是否在fd_set集合里面 if(FD_ISSET(clientfds,&read_set)) { n = recv(clientfds,recvbuff, sizeof(recvbuff), 0); printf("Client from %s atPort %d, ", inet_ntop(AF_INET,&client_addr.sin_addr, str, sizeof(str)), ntohs(client_addr.sin_port)); printf("Clientfd %d: %s rn",clientfds, recvbuff); if(n <= 0) { //从集合里面清除 FD_CLR(clientfds,&all_set); //当前的客户端fd 赋值为-1 clientfds = -1; } else { //写回客户端 n = send(clientfds,sendbuff, strlen(sendbuff), 0); if(n < 0) { //从集合里面清除 FD_CLR(clientfds,&all_set); //当前的客户端fd 赋值为-1 clientfds = -1; } } } } } } Client: /** ****************************************************************************** * @file client.c * @author BruceOu * @version V1.0 * @date 2021-06-04 * @blog https://blog.bruceou.cn/ * @Official Accounts 嵌入式实验楼 * @brief client ****************************************************************************** */ #include #include #include #include #include #include #include #include #include #include #define SERVPORT 8888 /** * @brief mian * @param None * @retval int */ int main(int argc,char *argv[]) { char sendbuf[ ] = {"Hello server!"}; char recvbuf[1024]; int sockfd,sendbytes; struct sockaddr_in serv_addr;//需要连接的服务器地址信息 if (argc != 2) { perror("init error"); } //1.创建socket //AF_INET 表示IPV4 //SOCK_STREAM 表示TCP if((sockfd = socket(AF_INET,SOCK_STREAM,0))< 0) { perror("socket"); exit(1); } //填充服务器地址信息 serv_addr.sin_family = AF_INET; //网络层的IP协议: IPV4 serv_addr.sin_port = htons(SERVPORT); //传输层的端口号 serv_addr.sin_addr.s_addr = inet_addr(argv[1]); //网络层的IP地址: 实际的服务器IP地址 bzero(&(serv_addr.sin_zero),8); //保留的8字节置零 //2.发起对服务器的连接信息 //三次握手,需要将sockaddr_in类型的数据结构强制转换为sockaddr if((connect(sockfd,(struct sockaddr*)&serv_addr,sizeof(struct sockaddr))) < 0) { perror("connect failed!"); exit(1); } printf("connect successful! n"); //3.发送消息给服务器端 while (1) { send(sockfd, sendbuf, strlen(sendbuf),0); recv(sockfd, recvbuf, sizeof(recvbuf),0); printf("Server : %s n",recvbuf); sleep(2); } //4.关闭 close(sockfd); } 接下来就是验证了,现在OK3568上开启服务器: Server: 然后开启客户端,笔者的客户端在Ubuntu上运行的: Client: 笔者这里使用的客户端只有四个,有兴趣的也可以使用多个客户端。 当然啦,如果懒得写客户端,也可使用网络调试助手测试。 l select: select本质上是通过设置或者检查存放fd标志位的数据结构来进行下一步处理。这样所带来的缺点如下: 1、单个进程可监视的fd数量被限制,即能监听端口的大小有限。它由FD_SETSIZE设置,默认值是1024。 一般来说这个数目和系统内存关系很大。32位机默认是1024个。64位机默认是2048. 2、对socket进行扫描时是线性扫描,即采用轮询的方法,效率较低。 当套接字比较多的时候,每次select()都要通过遍历FD_SETSIZE个Socket来完成调度,不管哪个Socket是活跃的,都遍历一遍。这会浪费很多CPU时间。如果能给套接字注册某个回调函数,当他们活跃时,自动完成相关操作,那就避免了轮询。 3、需要维护一个用来存放大量fd的数据结构,这样会使得用户空间和内核空间在传递该结构时复制开销大。 l poll: poll本质上和select没有区别,它将用户传入的数组拷贝到内核空间,然后查询每个fd对应的设备状态,如果设备就绪则在设备等待队列中加入一项并继续遍历,如果遍历完所有fd后没有发现就绪设备,则挂起当前进程,直到设备就绪或者主动超时,被唤醒后它又要再次遍历fd。这个过程经历了多次无谓的遍历。 它没有最大连接数的限制,原因是它是基于链表来存储的,但是同样有一个缺点: 1、大量的fd的数组被整体复制于用户态和内核地址空间之间,而不管这样的复制是不是有意义。 2、poll还有一个特点是“水平触发”,如果报告了fd后,没有被处理,那么下次poll时会再次报告该fd。 使用select监听socket时是线性扫描的方式,即采用轮询的方式,当套接字比较多的时候,每次select都要遍历每个socket来完成调度,不管哪个socket是活跃的,都会遍历一遍,这很浪费CPU的时间,如果每个套接字都能注册一个回调函数,当套接字活跃时直接调用回调也是很方便的,这样就避免了轮训,这正是epoll和equeue做的事情。 select/poll能监听多个设备的文件描述符,只要有任何一个设备满足条件,select/poll就会返回,否则将进行睡眠等待。 select与poll需要轮询文件描述符集合,在文件描述符很多的情况下开销会比较大,select默认支持的文件描述符数量是1024。 Linux提供了epoll机制,改进了select与poll在效率与资源上的缺点。 |
|
相关推荐
|
|
只有小组成员才能发言,加入小组>>
【飞凌嵌入式OK3588J-C开发板体验】OK3588J-C开发板开箱评测
3473 浏览 1 评论
[威廉希尔官方网站 ] 【飞凌嵌入式OK3576-C开发板体验】llama2.c部署
7384 浏览 0 评论
7848 浏览 0 评论
【飞凌嵌入式OK3576-C开发板体验】ssh连接与文件传输
7778 浏览 0 评论
9228 浏览 0 评论
6828浏览 2评论
11242浏览 1评论
5320浏览 1评论
85697浏览 1评论
686浏览 1评论
小黑屋| 手机版| Archiver| 电子发烧友 ( 湘ICP备2023018690号 )
GMT+8, 2024-12-26 01:31 , Processed in 0.623777 second(s), Total 65, Slave 47 queries .
Powered by 电子发烧友网
© 2015 bbs.elecfans.com
关注我们的微信
下载发烧友APP
电子发烧友观察
版权所有 © 湖南华秋数字科技有限公司
电子发烧友 (电路图) 湘公网安备 43011202000918 号 电信与信息服务业务经营许可证:合字B2-20210191 工商网监 湘ICP备2023018690号