Linux 下 Reactor高并发服务器

"learning……"

Posted by Ryan on March 1, 2025

人是生而自由的,但却无往不在枷锁之中。

Reactor模型

Reactor模型(Reactor Pattern)是一种用于处理高并发事件驱动应用的设计模式,常用于服务器架构(如Web服务器、网络服务器)中。它允许单个或少量的线程处理大量并发请求,提高系统的吞吐量和资源利用率。

最简单的非阻塞epoll程序

命令如下:

1
2
3
./tcpepoll 192.168.157.128 5085
./client 192.168.157.128 5085
netstat -na|grep 5085        //查询TCP链接

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
#include<stdio.h>
#include<stdlib.h>
#include<unistd.h>
#include<errno.h>
#include<string.h>
#include<netinet/in.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<time.h>


/*
 * 用于演示socket通信的客户端
 */
int main(int argc,char* argv[])
{
    if(argc!=3)
    {
        printf("Using \"ifconfig\" or \"ip addr\" to find ip");
        printf("example: ./client 192.168.76.128 5005\n\n");
        return -1;
    }

    //step1:create socket for client
    int sockfd;
    struct sockaddr_in servaddr;
    char buf[1024];

    if((sockfd=socket(AF_INET,SOCK_STREAM,0))<0)
    {
        printf("socket() failed.\n");return -1;
    }
    memset(&servaddr,0,sizeof(servaddr));
    servaddr.sin_family=AF_INET;
    servaddr.sin_port=htons(atoi(argv[2]));
    servaddr.sin_addr.s_addr=inet_addr(argv[1]);


    if(connect(sockfd,(struct sockaddr*)&servaddr,sizeof(servaddr))!=0)//向服务端发起请求连接
    {
        printf("connect(%s:%s)failed.\n",argv[1],argv[2]);
        close(sockfd);
        return -1;
    }
    printf("connect ok.\n");

    //step3:与服务端通信,发送一个请求报文后等待回复,然后发送下一个请求报文
    
    for(int i=0;i<100000;++i)//循环n次,与服务端进行n次通信。
    {
        memset(buf,0,sizeof(buf));
        printf("please input:");
        scanf("%s",buf);
        //向服务端发送请求报文
        if((send(sockfd,buf,strlen(buf),0))<=0)
        {
            printf("write()failed.\n");
            close(sockfd);
            return -1;
        }
        memset(buf,0,sizeof(buf));
        //接受服务端的回应报文,如果服务端没有发送回应报文,recv()函数将阻塞等待
        if((recv(sockfd,buf,sizeof(buf),0))<=0)
        {
            printf("read()failed.\n");
            close(sockfd);
            return -1;
        }
        printf("recv:%s\n",buf);
    }
    //step4:关闭socket,释放资源
    close(sockfd);
    return 0;
}

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
/*
 *此程序用于演示epoll模型实现网络通信服务端
 */
#include<stdio.h>
#include<unistd.h>
#include<stdlib.h>
#include<string.h>
#include<errno.h>
#include<sys/socket.h>
#include<sys/types.h>
#include<arpa/inet.h>
#include<sys/fcntl.h>
#include<sys/epoll.h>
#include<netinet/tcp.h>// TCP_NODELAY需要包含这个头文件
//TCP_NODELAY用于禁用Nagle算法


//设置非阻塞的IO
void setnonblocking(int fd)
{
    fcntl(fd,F_SETFL,fcntl(fd,F_GETFL)|O_NONBLOCK);
}

int main(int argc,char* argv[])
{
    
    if(argc!=3)
    {
        printf("usage: ./tcpepoll ip port\n");
        printf("example: ./tcpepoll 192.168.157.128 5005\n");
        return -1;
    }


    //创建服务端用于监听的端口
    int listenfd=socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);
    if(listenfd<0)
    {
        perror("socket() failed");
        return -1;
    }

    //设置listenfd的属性
    int opt=1;
    setsockopt(listenfd,SOL_SOCKET,SO_REUSEADDR,&opt,static_cast<socklen_t>(sizeof opt));
    setsockopt(listenfd,SOL_SOCKET,TCP_NODELAY,&opt,static_cast<socklen_t>(sizeof opt));
    setsockopt(listenfd,SOL_SOCKET,SO_REUSEPORT,&opt,static_cast<socklen_t>(sizeof opt));
    setsockopt(listenfd,SOL_SOCKET,SO_KEEPALIVE,&opt,static_cast<socklen_t>(sizeof opt));

    setnonblocking(listenfd);//把服务端的listenfd设置为非阻塞的

    struct sockaddr_in servaddr;//服务端地址结构体
    servaddr.sin_family=AF_INET;//ipv4网络套接字
    servaddr.sin_addr.s_addr = inet_addr(argv[1]);//服务端用于监听的ip地址
    servaddr.sin_port=htons(atoi(argv[2]));//服务端用于监听的端口


    if(bind(listenfd,(struct sockaddr*)&servaddr,sizeof(servaddr))<0)
    {
        perror("bind() failed");
        close(listenfd);
        return -1;
    }
    
    if(listen(listenfd,128)!=0)//在高并发服务其中,第二个参数要大一些
    {
        perror("listen() failed");
        close(listenfd);
        return -1;
    }
    int epollfd = epoll_create(1);//创建epoll句柄(红黑树)

    //为服务端的listenfd准备读事件

    //为服务端的listensock准备可读事件。
    struct epoll_event ev;     //申明事件的数据结构
    ev.data.fd=listenfd;//指定事件的自定义数据,会随着epoll_wait()返回的事件一并返回
    ev.events=EPOLLIN;  //打算让epoll监视listensock的读事件,采用水平触发

    epoll_ctl(epollfd,EPOLL_CTL_ADD,listenfd,&ev); //把需要监视的socket加入epollfd中。
    
    struct epoll_event evs[10];//存放epoll返回的事件。
    
    
    while(1)//事件循环
    {
        //等待监视的socket有事件发生
        int infds = epoll_wait(epollfd,evs,10,-1);

        
        //返回失败
        if(infds<0)
        {
            perror("epoll() failed\n");break;
        }
        //超时
        if(infds==0)
        {
            printf("epoll() timeout.\n");
            continue;
        }
        //如果infds>0,表示有事件发生的socket的数量
        for(int i=0;i<infds;i++)
        {
            //如果发生事件的是listensock,表示有新的客户端连上来
            if(evs[i].data.fd==listenfd)
            {
                struct sockaddr_in clientaddr;
                socklen_t len=sizeof(clientaddr);
                int clientfd=accept(listenfd,(struct sockaddr*)&clientaddr,&len);
                setnonblocking(clientfd);//客户端连接的fd必须设置为非阻塞的
                
                printf("accept client(fd=%d,ip=%s,port=%d)ok.\n",clientfd,inet_ntoa(clientaddr.sin_addr),ntohs(clientaddr.sin_port));

                //为新客户准备可读事件,并添加到epoll中
                ev.data.fd=clientfd;
                ev.events=EPOLLIN|EPOLLET;//边缘触发
                epoll_ctl(epollfd,EPOLL_CTL_ADD,clientfd,&ev);
            }
            else
            {
                //如果客户端连接的sock有事件,表示有报文发过来或者链接已经断开
                //////////////////////////////
                if(evs[i].events&EPOLLRDHUP)//对方已经关闭连接,有些系统检测不到,可以使用EPOLLIN,recv()返回
                {
                    //如果客户端已经断开
                    printf("client(eventfd=%d)disconnected.\n",evs[i].data.fd);
                    close(evs[i].data.fd);//关闭客户端的fd
                }
                else if(evs[i].events&(EPOLLIN|EPOLLPRI))//接受区有数据可以读
                {
                    char buffer[1024];//存放从客户端读取的数据
                    while(true)//使用非阻塞IO,一次读取buffer大小数据,直到全部读取完
                    {
                        bzero(&buffer,sizeof(buffer));
                        ssize_t nread = read(evs[i].data.fd,buffer,sizeof(buffer));
                        if(nread>0)//成功读取到了数据
                        {
                            //把接收到的数据原封不动的返回回去
                            printf("recv(eventfd=%d);%s\n",evs[i].data.fd,buffer);
                            send(evs[i].data.fd,buffer,strlen(buffer),0);
                        }
                        else if(nread==-1&&errno==EINTR)//读取数据的时候信号中断,继续读取
                        {
                            continue;
                        }
                        else if(nread==-1&&((errno==EAGAIN)||(errno==EWOULDBLOCK)))//全部的数据已读取完必
                        {
                            break;
                        }
                        else if(nread==0)//客户端连接已经断开
                        {
                            printf("client(eventfd=%d) disconnected.\n",evs[i].data.fd);
                            close(evs[i].data.fd);
                            break;
                        }
                    }
                }
                else if(evs[i].events&EPOLLOUT)//有数据要写,暂时没代码,以后在说
                {

                }
                else //其他事件,都视为错误,或者对方关闭了链接。
                {
                    printf("client(eventfd=%d)error.\n",evs[i].data.fd);
                    close(evs[i].data.fd);
                }
                /////////////////////////////
                
            }
        }
    }


    return 0;
}

最简单的非阻塞epoll程序优化

服务端程序

创建socket时加一个SOCK_NONBLOCK就能非阻塞。删除了非阻塞的函数。稍微调整了一下事件循环逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
/*
 *此程序用于演示epoll模型实现网络通信服务端
 */
#include<stdio.h>
#include<unistd.h>
#include<stdlib.h>
#include<string.h>
#include<errno.h>
#include<sys/socket.h>
#include<sys/types.h>
#include<arpa/inet.h>
#include<sys/fcntl.h>
#include<sys/epoll.h>
#include<netinet/tcp.h>// TCP_NODELAY需要包含这个头文件
//TCP_NODELAY用于禁用Nagle算法

int main(int argc,char* argv[])
{
    
    if(argc!=3)
    {
        printf("usage: ./tcpepoll ip port\n");
        printf("example: ./tcpepoll 192.168.157.128 5005\n");
        return -1;
    }


    //创建服务端用于监听的端口
    int listenfd=socket(AF_INET,SOCK_STREAM|SOCK_NONBLOCK,IPPROTO_TCP);//加一个SOCK_NONBLOCK就是非阻塞IO
    if(listenfd<0)
    {
        perror("socket() failed");
        return -1;
    }

    //设置listenfd的属性
    int opt=1;
    setsockopt(listenfd,SOL_SOCKET,SO_REUSEADDR,&opt,static_cast<socklen_t>(sizeof opt));
    setsockopt(listenfd,SOL_SOCKET,TCP_NODELAY,&opt,static_cast<socklen_t>(sizeof opt));
    setsockopt(listenfd,SOL_SOCKET,SO_REUSEPORT,&opt,static_cast<socklen_t>(sizeof opt));
    setsockopt(listenfd,SOL_SOCKET,SO_KEEPALIVE,&opt,static_cast<socklen_t>(sizeof opt));

    //setnonblocking(listenfd);//把服务端的listenfd设置为非阻塞的

    struct sockaddr_in servaddr;//服务端地址结构体
    servaddr.sin_family=AF_INET;//ipv4网络套接字
    servaddr.sin_addr.s_addr = inet_addr(argv[1]);//服务端用于监听的ip地址
    servaddr.sin_port=htons(atoi(argv[2]));//服务端用于监听的端口


    if(bind(listenfd,(struct sockaddr*)&servaddr,sizeof(servaddr))<0)
    {
        perror("bind() failed");
        close(listenfd);
        return -1;
    }
    
    if(listen(listenfd,128)!=0)//在高并发服务其中,第二个参数要大一些
    {
        perror("listen() failed");
        close(listenfd);
        return -1;
    }
    int epollfd = epoll_create(1);//创建epoll句柄(红黑树)

    //为服务端的listenfd准备读事件

    //为服务端的listensock准备可读事件。
    struct epoll_event ev;     //申明事件的数据结构
    ev.data.fd=listenfd;//指定事件的自定义数据,会随着epoll_wait()返回的事件一并返回
    ev.events=EPOLLIN;  //打算让epoll监视listensock的读事件,采用水平触发

    epoll_ctl(epollfd,EPOLL_CTL_ADD,listenfd,&ev); //把需要监视的socket加入epollfd中。
    
    struct epoll_event evs[10];//存放epoll返回的事件。
    
    
    while(1)//事件循环
    {
        //等待监视的socket有事件发生
        int infds = epoll_wait(epollfd,evs,10,-1);

        
        //返回失败
        if(infds<0)
        {
            perror("epoll() failed\n");break;
        }
        //超时
        if(infds==0)
        {
            printf("epoll() timeout.\n");
            continue;
        }
        //如果infds>0,表示有事件发生的socket的数量
        for(int i=0;i<infds;i++)
        {
            //如果客户端连接的sock有事件,表示有报文发过来或者链接已经断开
            //////////////////////////////
            if(evs[i].events&EPOLLRDHUP)//对方已经关闭连接,有些系统检测不到,可以使用EPOLLIN,recv()返回
            {
                //如果客户端已经断开
                printf("client(eventfd=%d)disconnected.\n",evs[i].data.fd);
                close(evs[i].data.fd);//关闭客户端的fd
            }
            else if(evs[i].events&(EPOLLIN|EPOLLPRI))//接受区有数据可以读
            {
                //如果发生事件的是listensock,表示有新的客户端连上来
                if(evs[i].data.fd==listenfd)
                {
                    struct sockaddr_in clientaddr;
                    socklen_t len=sizeof(clientaddr);
                    int clientfd=accept4(listenfd,(struct sockaddr*)&clientaddr,&len,SOCK_NONBLOCK);
                    //setnonblocking(clientfd);//客户端连接的fd必须设置为非阻塞的
                    
                    printf("accept client(fd=%d,ip=%s,port=%d)ok.\n",clientfd,inet_ntoa(clientaddr.sin_addr),ntohs(clientaddr.sin_port));

                    //为新客户准备可读事件,并添加到epoll中
                    ev.data.fd=clientfd;
                    ev.events=EPOLLIN|EPOLLET;//边缘触发
                    epoll_ctl(epollfd,EPOLL_CTL_ADD,clientfd,&ev);
                }
                else
                {
                    char buffer[1024];//存放从客户端读取的数据
                    while(true)//使用非阻塞IO,一次读取buffer大小数据,直到全部读取完
                    {
                        bzero(&buffer,sizeof(buffer));
                        ssize_t nread = read(evs[i].data.fd,buffer,sizeof(buffer));
                        if(nread>0)//成功读取到了数据
                        {
                            //把接收到的数据原封不动的返回回去
                            printf("recv(eventfd=%d);%s\n",evs[i].data.fd,buffer);
                            send(evs[i].data.fd,buffer,strlen(buffer),0);
                        }
                        else if(nread==-1&&errno==EINTR)//读取数据的时候信号中断,继续读取
                        {
                            continue;
                        }
                        else if(nread==-1&&((errno==EAGAIN)||(errno==EWOULDBLOCK)))//全部的数据已读取完必
                        {
                            break;
                        }
                        else if(nread==0)//客户端连接已经断开
                        {
                            printf("client(eventfd=%d) disconnected.\n",evs[i].data.fd);
                            close(evs[i].data.fd);
                            break;
                        }
                    }
                }
                
            }
            else if(evs[i].events&EPOLLOUT)//有数据要写,暂时没代码,以后在说
            {

            }
            else //其他事件,都视为错误,或者对方关闭了链接。
            {
                printf("client(eventfd=%d)error.\n",evs[i].data.fd);
                close(evs[i].data.fd);
            }
            /////////////////////////////
                
        }
    }
    return 0;
}

封装InetAddress类

头文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#pragma once

#include<arpa/inet.h>
#include<netinet/in.h>
#include<string>

//socket的地址协议簇
class InetAddress
{
private:
    sockaddr_in addr_;      //表示地址协议的结构体
public:
    InetAddress();
    InetAddress(const std::string &ip,uint16_t port);//如果是监听的fd,用这个构造函数
    InetAddress(const sockaddr_in addr);//如果是客户端连上来的fd,用这个构造函数
    
    ~InetAddress();

    const char*ip() const;  //返回字符串表示的IP地址,例如192.168.216.128
    uint16_t port() const;  //返回整数表示的端口,例如80、8080
    const sockaddr*addr() const;//返回addr_成员的地址,转换成了sockaddr类型
    void setaddr(sockaddr_in clientaddr); //设置addr_成员的值
};

源文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
#include "InetAddress.h"

/*
class InetAddress
{
private:
    sockaddr_in addr_;      //表示地址协议的结构体
public:
    InetAddress();
    InetAddress(const std::string &ip,uint16_t port);//如果是监听的fd,用这个构造函数
    InetAddress(const sockaddr_in addr):addr_(addr){}//如果是客户端连上来的fd,用这个构造函数
    ~InetAddress();

    const char*ip() const;  //返回字符串表示的IP地址,例如192.168.216.128
    uint16_t port() const;  //返回整数表示的端口,例如80、8080
    const sockaddr*addr() const;//返回addr_成员的地址,转换成了sockaddr类型
    void setaddr(sockaddr_in clientaddr); //设置addr_成员的值
};
*/
InetAddress::InetAddress()
{
    
}
InetAddress::InetAddress(const std::string &ip,uint16_t port)//如果是监听的fd,用这个构造函数
{
    addr_.sin_family=AF_INET;//ipv4网络套接字
    addr_.sin_addr.s_addr = inet_addr(ip.c_str());//服务端用于监听的ip地址
    addr_.sin_port=htons(port);//服务端用于监听的端口
}
InetAddress::InetAddress(const sockaddr_in addr):addr_(addr)//如果是客户端连上来的fd,用这个构造函数
{

}
InetAddress::~InetAddress()
{

}

const char* InetAddress::ip() const  //返回字符串表示的IP地址,例如192.168.216.128
{
    return inet_ntoa(addr_.sin_addr);
    
}
uint16_t InetAddress::port() const  //返回整数表示的端口,例如80、8080
{
    return ntohs(addr_.sin_port);
}
const sockaddr* InetAddress::addr() const//返回addr_成员的地址,转换成了sockaddr类型
{
    return (sockaddr*)&addr_;
}
void InetAddress::setaddr(sockaddr_in clientaddr) //设置addr_成员的值
{
    addr_=clientaddr;
}


Makefile

1
2
3
4
5
6
7
8
9
10
all:client tcpepoll

client:client.cpp
	g++ -g client.cpp -o client

tcpepoll:tcpepoll.cpp InetAddress.cpp
	g++ -g tcpepoll.cpp InetAddress.cpp -o tcpepoll

clean:
	rm -f client tcpepoll

修改服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
/*
 *此程序用于演示epoll模型实现网络通信服务端
 */
#include<stdio.h>
#include<unistd.h>
#include<stdlib.h>
#include<string.h>
#include<errno.h>
#include<sys/socket.h>
#include<sys/types.h>
#include<arpa/inet.h>
#include<sys/fcntl.h>
#include<sys/epoll.h>
#include<netinet/tcp.h>// TCP_NODELAY需要包含这个头文件
//TCP_NODELAY用于禁用Nagle算法
#include"InetAddress.h"


int main(int argc,char* argv[])
{
    
    if(argc!=3)
    {
        printf("usage: ./tcpepoll ip port\n");
        printf("example: ./tcpepoll 192.168.157.128 5005\n");
        return -1;
    }


    //创建服务端用于监听的端口
    int listenfd=socket(AF_INET,SOCK_STREAM|SOCK_NONBLOCK,IPPROTO_TCP);//加一个SOCK_NONBLOCK就是非阻塞IO
    if(listenfd<0)
    {
        perror("socket() failed");
        return -1;
    }

    //设置listenfd的属性
    int opt=1;
    setsockopt(listenfd,SOL_SOCKET,SO_REUSEADDR,&opt,static_cast<socklen_t>(sizeof opt));
    setsockopt(listenfd,SOL_SOCKET,TCP_NODELAY,&opt,static_cast<socklen_t>(sizeof opt));
    setsockopt(listenfd,SOL_SOCKET,SO_REUSEPORT,&opt,static_cast<socklen_t>(sizeof opt));
    setsockopt(listenfd,SOL_SOCKET,SO_KEEPALIVE,&opt,static_cast<socklen_t>(sizeof opt));

    //setnonblocking(listenfd);//把服务端的listenfd设置为非阻塞的

    //sockaddr_in servaddr;//服务端地址结构体
    //servaddr.sin_family=AF_INET;//ipv4网络套接字
    //servaddr.sin_addr.s_addr = inet_addr(argv[1]);//服务端用于监听的ip地址
    //servaddr.sin_port=htons(atoi(argv[2]));//服务端用于监听的端口
    InetAddress servaddr(argv[1],atoi(argv[2]));

    if(bind(listenfd,servaddr.addr(),sizeof(sockaddr))<0)
    {
        perror("bind() failed");
        close(listenfd);
        return -1;
    }
    
    if(listen(listenfd,128)!=0)//在高并发服务其中,第二个参数要大一些
    {
        perror("listen() failed");
        close(listenfd);
        return -1;
    }
    int epollfd = epoll_create(1);//创建epoll句柄(红黑树)

    //为服务端的listenfd准备读事件

    //为服务端的listensock准备可读事件。
    epoll_event ev;     //申明事件的数据结构
    ev.data.fd=listenfd;//指定事件的自定义数据,会随着epoll_wait()返回的事件一并返回
    ev.events=EPOLLIN;  //打算让epoll监视listensock的读事件,采用水平触发

    epoll_ctl(epollfd,EPOLL_CTL_ADD,listenfd,&ev); //把需要监视的socket加入epollfd中。
    
    epoll_event evs[10];//存放epoll返回的事件。
    
    
    while(1)//事件循环
    {
        //等待监视的socket有事件发生
        int infds = epoll_wait(epollfd,evs,10,-1);

        
        //返回失败
        if(infds<0)
        {
            perror("epoll() failed\n");break;
        }
        //超时
        if(infds==0)
        {
            printf("epoll() timeout.\n");
            continue;
        }
        //如果infds>0,表示有事件发生的socket的数量
        for(int i=0;i<infds;i++)
        {
            //如果客户端连接的sock有事件,表示有报文发过来或者链接已经断开
            //////////////////////////////
            if(evs[i].events&EPOLLRDHUP)//对方已经关闭连接,有些系统检测不到,可以使用EPOLLIN,recv()返回
            {
                //如果客户端已经断开
                printf("client(eventfd=%d)disconnected.\n",evs[i].data.fd);
                close(evs[i].data.fd);//关闭客户端的fd
            }
            else if(evs[i].events&(EPOLLIN|EPOLLPRI))//接受区有数据可以读
            {
                //如果发生事件的是listensock,表示有新的客户端连上来
                if(evs[i].data.fd==listenfd)
                {
                    sockaddr_in peeraddr;
                    socklen_t len=sizeof(peeraddr);
                    int clientfd=accept4(listenfd,(sockaddr*)&peeraddr,&len,SOCK_NONBLOCK);
                    
                    InetAddress clientaddr(peeraddr);

                    printf("accept client(fd=%d,ip=%s,port=%d)ok.\n",clientfd,clientaddr.ip(),clientaddr.port());

                    //为新客户准备可读事件,并添加到epoll中
                    ev.data.fd=clientfd;
                    ev.events=EPOLLIN|EPOLLET;//边缘触发
                    epoll_ctl(epollfd,EPOLL_CTL_ADD,clientfd,&ev);
                }
                else
                {
                    char buffer[1024];//存放从客户端读取的数据
                    while(true)//使用非阻塞IO,一次读取buffer大小数据,直到全部读取完
                    {
                        bzero(&buffer,sizeof(buffer));
                        ssize_t nread = read(evs[i].data.fd,buffer,sizeof(buffer));
                        if(nread>0)//成功读取到了数据
                        {
                            //把接收到的数据原封不动的返回回去
                            printf("recv(eventfd=%d);%s\n",evs[i].data.fd,buffer);
                            send(evs[i].data.fd,buffer,strlen(buffer),0);
                        }
                        else if(nread==-1&&errno==EINTR)//读取数据的时候信号中断,继续读取
                        {
                            continue;
                        }
                        else if(nread==-1&&((errno==EAGAIN)||(errno==EWOULDBLOCK)))//全部的数据已读取完必
                        {
                            break;
                        }
                        else if(nread==0)//客户端连接已经断开
                        {
                            printf("client(eventfd=%d) disconnected.\n",evs[i].data.fd);
                            close(evs[i].data.fd);
                            break;
                        }
                    }
                }
                
            }
            else if(evs[i].events&EPOLLOUT)//有数据要写,暂时没代码,以后在说
            {

            }
            else //其他事件,都视为错误,或者对方关闭了链接。
            {
                printf("client(eventfd=%d)error.\n",evs[i].data.fd);
                close(evs[i].data.fd);
            }
            /////////////////////////////
                
        }
    }


    return 0;
}

封装socket类

头文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#pragma once
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/tcp.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include "InetAddress.h"

//创建一个非阻塞的socket
int createnonblocking();

//socekt 类
class Socket
{
private:
    const int fd_;  //Socket持有的fd,在构造函数中传进来
public:
    Socket(int fd); //构造函数,传入一个已经准备好的fd
    ~Socket();      //在析构函数中,关闭fd_

    int fd() const; //返回fd成员
    void setreuseaddr(bool on);  //设置SO_REUSEADDR选项,true打开,false关闭
    void setreuseport(bool on);  //设置SO_REUSEPORT选项
    void settcpnodelay(bool on); //设置TCP_NODELAY选项
    void setkeepalive(bool on);  //设置SO_KEEPALIVE选项
    void bind(const InetAddress& servaddr); //服务端的socket将调用此函数
    void listen(int nn=128);                //服务端的socket将调用此函数
    int accept(InetAddress& clientaddr);   //服务端的socket将调用此函数
};

源文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
#include "Socket.h"
//创建一个非阻塞的socket
int createnonblocking()
{
    //创建服务端用于监听的端口
    int listenfd=socket(AF_INET,SOCK_STREAM|SOCK_NONBLOCK,IPPROTO_TCP);//加一个SOCK_NONBLOCK就是非阻塞IO
    if(listenfd<0)
    {
        printf("%s:%s:%d listen socket create error:%d\n",__FILE__,__FUNCTION__,__LINE__,errno);
        exit(-1);
    }
    return listenfd;
}

//构造函数,传入一个已经准备好的fd
Socket::Socket(int fd):fd_(fd)
{

}

//在析构函数中,关闭fd_
Socket::~Socket()      
{
    ::close(fd_);
}


//返回fd成员
int Socket::fd() const
{
    return fd_;
} 


//设置SO_REUSEADDR选项,true打开,false关闭
void Socket::setreuseaddr(bool on)
{
    int optval = on?1:0;
    ::setsockopt(fd_,SOL_SOCKET,SO_REUSEADDR,&optval,sizeof(optval));
}

void Socket::setreuseport(bool on)  //设置SO_REUSEPORT选项
{
    int optval = on?1:0;
    ::setsockopt(fd_,SOL_SOCKET,SO_REUSEPORT,&optval,sizeof(optval));
}

void Socket::settcpnodelay(bool on) //设置TCP_NODELAY选项
{
    int optval = on?1:0;
    ::setsockopt(fd_,IPPROTO_TCP,TCP_NODELAY,&optval,sizeof(optval));
}



void Socket::setkeepalive(bool on)  //设置SO_KEEPALIVE选项
{
    int optval = on?1:0;
    ::setsockopt(fd_,SOL_SOCKET,SO_KEEPALIVE,&optval,sizeof(optval));    
}

void Socket::bind(const InetAddress& servaddr) //服务端的socket将调用此函数
{
    if(::bind(fd_,servaddr.addr(),sizeof(sockaddr))<0)
    {
        perror("bind() failed");
        close(fd_);
        exit(-1);
    }
}


void Socket::listen(int nn)                //服务端的socket将调用此函数
{
    if(::listen(fd_,nn)!=0)//在高并发服务其中,第二个参数要大一些
    {
        perror("listen() failed");
        close(fd_);
        exit(-1);
    }
}

int Socket::accept(InetAddress& clientaddr)   //服务端的socket将调用此函数
{
    sockaddr_in peeraddr;
    socklen_t len=sizeof(peeraddr);
    int clientfd=accept4(fd_,(sockaddr*)&peeraddr,&len,SOCK_NONBLOCK);
    
    clientaddr.setaddr(peeraddr);

    return clientfd;

}

Makefile

1
2
3
4
5
6
7
8
9
10
all:client tcpepoll

client:client.cpp
	g++ -g client.cpp -o client

tcpepoll:tcpepoll.cpp InetAddress.cpp Socket.cpp
	g++ -g tcpepoll.cpp InetAddress.cpp Socket.cpp -o tcpepoll

clean:
	rm -f client tcpepoll

修改服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
/*
 *此程序用于演示epoll模型实现网络通信服务端
 */
#include<stdio.h>
#include<unistd.h>
#include<stdlib.h>
#include<string.h>
#include<errno.h>
#include<sys/socket.h>
#include<sys/types.h>
#include<arpa/inet.h>
#include<sys/fcntl.h>
#include<sys/epoll.h>
#include<netinet/tcp.h>// TCP_NODELAY需要包含这个头文件
//TCP_NODELAY用于禁用Nagle算法
#include"InetAddress.h"
#include"Socket.h"

int main(int argc,char* argv[])
{
    
    if(argc!=3)
    {
        printf("usage: ./tcpepoll ip port\n");
        printf("example: ./tcpepoll 192.168.157.128 5005\n");
        return -1;
    }

    /*
    //创建服务端用于监听的端口
    int listenfd=socket(AF_INET,SOCK_STREAM|SOCK_NONBLOCK,IPPROTO_TCP);//加一个SOCK_NONBLOCK就是非阻塞IO
    if(listenfd<0)
    {
        perror("socket() failed");
        return -1;
    }

    //设置listenfd的属性
    int opt=1;
    setsockopt(listenfd,SOL_SOCKET,SO_REUSEADDR,&opt,static_cast<socklen_t>(sizeof opt));
    setsockopt(listenfd,SOL_SOCKET,TCP_NODELAY,&opt,static_cast<socklen_t>(sizeof opt));
    setsockopt(listenfd,SOL_SOCKET,SO_REUSEPORT,&opt,static_cast<socklen_t>(sizeof opt));
    setsockopt(listenfd,SOL_SOCKET,SO_KEEPALIVE,&opt,static_cast<socklen_t>(sizeof opt));

    InetAddress servaddr(argv[1],atoi(argv[2]));

    if(bind(listenfd,servaddr.addr(),sizeof(sockaddr))<0)
    {
        perror("bind() failed");
        close(listenfd);
        return -1;
    }
    
    if(listen(listenfd,128)!=0)//在高并发服务其中,第二个参数要大一些
    {
        perror("listen() failed");
        close(listenfd);
        return -1;
    }
    */
    Socket servsock(createnonblocking());
    InetAddress servaddr(argv[1],atoi(argv[2]));
    servsock.setreuseaddr(true);
    servsock.settcpnodelay(true);
    servsock.setreuseport(true);
    servsock.setkeepalive(true);
    servsock.bind(servaddr);
    servsock.listen();


    int epollfd = epoll_create(1);//创建epoll句柄(红黑树)

    //为服务端的listenfd准备读事件

    //为服务端的listensock准备可读事件。
    epoll_event ev;     //申明事件的数据结构
    ev.data.fd=servsock.fd();//指定事件的自定义数据,会随着epoll_wait()返回的事件一并返回
    ev.events=EPOLLIN;  //打算让epoll监视listensock的读事件,采用水平触发

    epoll_ctl(epollfd,EPOLL_CTL_ADD,servsock.fd(),&ev); //把需要监视的socket加入epollfd中。
    
    epoll_event evs[10];//存放epoll返回的事件。
    
    
    while(1)//事件循环
    {
        //等待监视的socket有事件发生
        int infds = epoll_wait(epollfd,evs,10,-1);

        
        //返回失败
        if(infds<0)
        {
            perror("epoll() failed\n");break;
        }
        //超时
        if(infds==0)
        {
            printf("epoll() timeout.\n");
            continue;
        }
        //如果infds>0,表示有事件发生的socket的数量
        for(int i=0;i<infds;i++)
        {
            //如果客户端连接的sock有事件,表示有报文发过来或者链接已经断开
            //////////////////////////////
            if(evs[i].events&EPOLLRDHUP)//对方已经关闭连接,有些系统检测不到,可以使用EPOLLIN,recv()返回
            {
                //如果客户端已经断开
                printf("client(eventfd=%d)disconnected.\n",evs[i].data.fd);
                close(evs[i].data.fd);//关闭客户端的fd
            }
            else if(evs[i].events&(EPOLLIN|EPOLLPRI))//接受区有数据可以读
            {
                //如果发生事件的是listensock,表示有新的客户端连上来
                if(evs[i].data.fd==servsock.fd())
                {
                    //sockaddr_in peeraddr;
                    //socklen_t len=sizeof(peeraddr);
                    //int clientfd=accept4(listenfd,(sockaddr*)&peeraddr,&len,SOCK_NONBLOCK);
                    
                    InetAddress clientaddr;
                    Socket* clientsock=new Socket(servsock.accept(clientaddr));

                    printf("accept client(fd=%d,ip=%s,port=%d)ok.\n",clientsock->fd(),clientaddr.ip(),clientaddr.port());

                    //为新客户准备可读事件,并添加到epoll中
                    ev.data.fd=clientsock->fd();
                    ev.events=EPOLLIN|EPOLLET;//边缘触发
                    epoll_ctl(epollfd,EPOLL_CTL_ADD,clientsock->fd(),&ev);
                }
                else
                {
                    char buffer[1024];//存放从客户端读取的数据
                    while(true)//使用非阻塞IO,一次读取buffer大小数据,直到全部读取完
                    {
                        bzero(&buffer,sizeof(buffer));
                        ssize_t nread = read(evs[i].data.fd,buffer,sizeof(buffer));
                        if(nread>0)//成功读取到了数据
                        {
                            //把接收到的数据原封不动的返回回去
                            printf("recv(eventfd=%d);%s\n",evs[i].data.fd,buffer);
                            send(evs[i].data.fd,buffer,strlen(buffer),0);
                        }
                        else if(nread==-1&&errno==EINTR)//读取数据的时候信号中断,继续读取
                        {
                            continue;
                        }
                        else if(nread==-1&&((errno==EAGAIN)||(errno==EWOULDBLOCK)))//全部的数据已读取完必
                        {
                            break;
                        }
                        else if(nread==0)//客户端连接已经断开
                        {
                            printf("client(eventfd=%d) disconnected.\n",evs[i].data.fd);
                            close(evs[i].data.fd);
                            break;
                        }
                    }
                }
                
            }
            else if(evs[i].events&EPOLLOUT)//有数据要写,暂时没代码,以后在说
            {

            }
            else //其他事件,都视为错误,或者对方关闭了链接。
            {
                printf("client(eventfd=%d)error.\n",evs[i].data.fd);
                close(evs[i].data.fd);
            }
            /////////////////////////////
                
        }
    }
    return 0;
}

封装epoll类

头文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#pragma once
#include<stdio.h>
#include<stdlib.h>
#include<errno.h>
#include<strings.h>
#include<string.h>
#include<sys/epoll.h>
#include<vector>
#include<unistd.h>

//Epoll类
class Epoll
{
private:
    static const int MaxEvents=100;//epoll_wait()返回事件数组的大小
    int epollfd_ = -1;//epoll句柄,在构造函数中创建
    epoll_event events_[MaxEvents];//存放epoll返回的事件。
public:
    Epoll();        //构造函数创建epollfd_
    ~Epoll();       //析构函数关闭epollfd_
    void addfd(int fd,uint32_t op); //把fd和他需要监视的事件添加到红黑树上
    std::vector<epoll_event> loop(int timeout=-1);//运行epoll_wait(),等待事件的发生,已发生的事件用vector容器返回
};

源文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
#include"Epoll.h"

Epoll::Epoll()        //构造函数创建epollfd_
{
    if((epollfd_ = epoll_create(1))==-1) //创建epoll句柄(红黑树)
    {
        printf("epoll_create() failed(%d).\n",errno);
        exit(-1);
    }

}
Epoll::~Epoll()      //析构函数关闭epollfd_
{
    close(epollfd_);
}
void Epoll::addfd(int fd,uint32_t op) //把fd和他需要监视的事件添加到红黑树上
{
    epoll_event ev;     //申明事件的数据结构
    ev.data.fd=fd;//指定事件的自定义数据,会随着epoll_wait()返回的事件一并返回
    ev.events=op;  //
    if(epoll_ctl(epollfd_,EPOLL_CTL_ADD,fd,&ev)==-1) //把需要监视的socket加入epollfd中。
    {
        printf("epoll_ctl() failed(%d).\n",errno);
        exit(-1);
    }
}
std::vector<epoll_event> Epoll::loop(int timeout)//运行epoll_wait(),等待事件的发生,已发生的事件用vector容器返回
{
    std::vector<epoll_event> evs;//存放返回的epoll_wait()事件
    bzero(events_,sizeof(events_));
    
    //等待监视的socket有事件发生
    int infds = epoll_wait(epollfd_,events_,MaxEvents,timeout);

    
    //返回失败
    if(infds<0)
    {
        perror("epoll() failed\n");exit(-1);
    }
    //超时
    if(infds==0)
    {
        printf("epoll() timeout.\n");
        return evs;
    }
    //如果infds>0,表示有事件发生的socket的数量
    for(int i=0;i<infds;i++)
    {
        evs.push_back(events_[i]);
    }
    return evs;
}

Makefile

1
2
3
4
5
6
7
8
9
10
all:client tcpepoll

client:client.cpp
	g++ -g client.cpp -o client

tcpepoll:tcpepoll.cpp InetAddress.cpp Socket.cpp Epoll.cpp
	g++ -g tcpepoll.cpp InetAddress.cpp Socket.cpp Epoll.cpp -o tcpepoll

clean:
	rm -f client tcpepoll

修改服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
/*
 *此程序用于演示epoll模型实现网络通信服务端
 */
#include<stdio.h>
#include<unistd.h>
#include<stdlib.h>
#include<string.h>
#include<errno.h>
#include<sys/socket.h>
#include<sys/types.h>
#include<arpa/inet.h>
#include<sys/fcntl.h>
#include<sys/epoll.h>
#include<netinet/tcp.h>// TCP_NODELAY需要包含这个头文件
#include"InetAddress.h"//TCP_NODELAY用于禁用Nagle算法
#include"Socket.h"
#include"Epoll.h"

int main(int argc,char* argv[])
{
    
    if(argc!=3)
    {
        printf("usage: ./tcpepoll ip port\n");
        printf("example: ./tcpepoll 192.168.157.128 5005\n");
        return -1;
    }

    Socket servsock(createnonblocking());
    InetAddress servaddr(argv[1],atoi(argv[2]));
    servsock.setreuseaddr(true);
    servsock.settcpnodelay(true);
    servsock.setreuseport(true);
    servsock.setkeepalive(true);
    servsock.bind(servaddr);
    servsock.listen();

    Epoll ep;
    ep.addfd(servsock.fd(),EPOLLIN);//水平触发,监听listenfd的读事件
    std::vector<epoll_event>evs;//存放epoll_wait()返回事件。

    while(1)//事件循环
    {
        evs=ep.loop();  //等待监视的fd有事件发生

        //如果infds>0,表示有事件发生的socket的数量
        for(auto& ev:evs)
        {
            //如果客户端连接的sock有事件,表示有报文发过来或者链接已经断开
            //////////////////////////////
            if(ev.events&EPOLLRDHUP)//对方已经关闭连接,有些系统检测不到,可以使用EPOLLIN,recv()返回
            {
                //如果客户端已经断开
                printf("client(eventfd=%d)disconnected.\n",ev.data.fd);
                close(ev.data.fd);//关闭客户端的fd
            }
            else if(ev.events&(EPOLLIN|EPOLLPRI))//接受区有数据可以读
            {
                //如果发生事件的是listensock,表示有新的客户端连上来
                if(ev.data.fd==servsock.fd())
                {
                           
                    InetAddress clientaddr;
                    Socket* clientsock=new Socket(servsock.accept(clientaddr));

                    printf("accept client(fd=%d,ip=%s,port=%d)ok.\n",clientsock->fd(),clientaddr.ip(),clientaddr.port());

                    //为新客户准备可读事件,并添加到epoll中
                    ep.addfd(clientsock->fd(),EPOLLIN|EPOLLET);   //客户端采用边缘触发
                    
                }
                else
                {
                    char buffer[1024];//存放从客户端读取的数据
                    while(true)//使用非阻塞IO,一次读取buffer大小数据,直到全部读取完
                    {
                        bzero(&buffer,sizeof(buffer));
                        ssize_t nread = read(ev.data.fd,buffer,sizeof(buffer));
                        if(nread>0)//成功读取到了数据
                        {
                            //把接收到的数据原封不动的返回回去
                            printf("recv(eventfd=%d);%s\n",ev.data.fd,buffer);
                            send(ev.data.fd,buffer,strlen(buffer),0);
                        }
                        else if(nread==-1&&errno==EINTR)//读取数据的时候信号中断,继续读取
                        {
                            continue;
                        }
                        else if(nread==-1&&((errno==EAGAIN)||(errno==EWOULDBLOCK)))//全部的数据已读取完必
                        {
                            break;
                        }
                        else if(nread==0)//客户端连接已经断开
                        {
                            printf("client(eventfd=%d) disconnected.\n",ev.data.fd);
                            close(ev.data.fd);
                            break;
                        }
                    }
                }
                
            }
            else if(ev.events&EPOLLOUT)//有数据要写,暂时没代码,以后在说
            {

            }
            else //其他事件,都视为错误,或者对方关闭了链接。
            {
                printf("client(eventfd=%d)error.\n",ev.data.fd);
                close(ev.data.fd);
            }
            /////////////////////////////
                
        }
    }
    return 0;
}

封装Channel类

epoll结构体

epoll 是 Linux 中用于高效 I/O 事件通知的机制,特别适合处理大量文件描述符(如网络套接字)。它的核心是三个系统调用:epoll_create、epoll_ctl 和 epoll_wait,以及一个关键的数据结构 struct epoll_event。

1
2
3
4
5
6
7
8
9
10
11
12
13
#include <sys/epoll.h>

struct epoll_event {
    uint32_t     events;  // 需要监听的事件类型
    epoll_data_t data;    // 用户数据,通常用于存储文件描述符或指针
};

typedef union epoll_data {
    void* ptr;  // 可以存储任意指针
    int fd;     // 通常用于存储文件描述符
    uint32_t u32;
    uint64_t u64;
} epoll_data_t;

之前用的都是fd,在reactor模型中要用void* ptr。

用ptr的服务端demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
/*
 *此程序用于演示epoll模型实现网络通信服务端
 */
#include<stdio.h>
#include<unistd.h>
#include<stdlib.h>
#include<string.h>
#include<errno.h>
#include<sys/socket.h>
#include<sys/types.h>
#include<arpa/inet.h>
#include<sys/fcntl.h>
#include<sys/epoll.h>
#include<netinet/tcp.h>// TCP_NODELAY需要包含这个头文件
//TCP_NODELAY用于禁用Nagle算法

class Channel
{
private:
    int fd_;
    bool islisten_=false;//true表示监听的fd,false表示客户端连上来的fd
    //.....
public:
    Channel(int fd,bool islisten=false):fd_(fd),islisten_(islisten)
    {

    }
    int fd()
    {
        return fd_;
    }
    bool islisten()
    {
        return islisten_;
    }
    //.....
};

//设置非阻塞的IO
void setnonblocking(int fd)
{
    fcntl(fd,F_SETFL,fcntl(fd,F_GETFL)|O_NONBLOCK);
}

int main(int argc,char* argv[])
{
    
    if(argc!=3)
    {
        printf("usage: ./tcpepoll ip port\n");
        printf("example: ./tcpepoll 192.168.157.128 5005\n");
        return -1;
    }


    //创建服务端用于监听的端口
    int listenfd=socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);
    if(listenfd<0)
    {
        perror("socket() failed");
        return -1;
    }

    //设置listenfd的属性
    int opt=1;
    setsockopt(listenfd,SOL_SOCKET,SO_REUSEADDR,&opt,static_cast<socklen_t>(sizeof opt));
    setsockopt(listenfd,SOL_SOCKET,TCP_NODELAY,&opt,static_cast<socklen_t>(sizeof opt));
    setsockopt(listenfd,SOL_SOCKET,SO_REUSEPORT,&opt,static_cast<socklen_t>(sizeof opt));
    setsockopt(listenfd,SOL_SOCKET,SO_KEEPALIVE,&opt,static_cast<socklen_t>(sizeof opt));

    setnonblocking(listenfd);//把服务端的listenfd设置为非阻塞的

    struct sockaddr_in servaddr;//服务端地址结构体
    servaddr.sin_family=AF_INET;//ipv4网络套接字
    servaddr.sin_addr.s_addr = inet_addr(argv[1]);//服务端用于监听的ip地址
    servaddr.sin_port=htons(atoi(argv[2]));//服务端用于监听的端口


    if(bind(listenfd,(struct sockaddr*)&servaddr,sizeof(servaddr))<0)
    {
        perror("bind() failed");
        close(listenfd);
        return -1;
    }
    
    if(listen(listenfd,128)!=0)//在高并发服务其中,第二个参数要大一些
    {
        perror("listen() failed");
        close(listenfd);
        return -1;
    }
    int epollfd = epoll_create(1);//创建epoll句柄(红黑树)

    Channel* servchannel=new Channel(listenfd,true);
    //为服务端的listenfd准备读事件

    //为服务端的listensock准备可读事件。
    struct epoll_event ev;     //申明事件的数据结构
    ev.data.ptr=servchannel;//指定事件的自定义数据,会随着epoll_wait()返回的事件一并返回
    ev.events=EPOLLIN;  //打算让epoll监视listensock的读事件,采用水平触发

    epoll_ctl(epollfd,EPOLL_CTL_ADD,listenfd,&ev); //把需要监视的socket加入epollfd中。
    
    struct epoll_event evs[10];//存放epoll返回的事件。
    
    
    while(1)//事件循环
    {
        //等待监视的socket有事件发生
        int infds = epoll_wait(epollfd,evs,10,-1);

        
        //返回失败
        if(infds<0)
        {
            perror("epoll() failed\n");break;
        }
        //超时
        if(infds==0)
        {
            printf("epoll() timeout.\n");
            continue;
        }
        //如果infds>0,表示有事件发生的socket的数量
        for(int i=0;i<infds;i++)
        {
            Channel* ch=(Channel*)evs[i].data.ptr;
            //如果发生事件的是listensock,表示有新的客户端连上来
            if(ch->islisten()==true)
            {
                struct sockaddr_in clientaddr;
                socklen_t len=sizeof(clientaddr);
                int clientfd=accept(listenfd,(struct sockaddr*)&clientaddr,&len);
                setnonblocking(clientfd);//客户端连接的fd必须设置为非阻塞的
                
                printf("accept client(fd=%d,ip=%s,port=%d)ok.\n",clientfd,inet_ntoa(clientaddr.sin_addr),ntohs(clientaddr.sin_port));

                //为新客户准备可读事件,并添加到epoll中
                Channel*clientchannel=new Channel(clientfd);
                ev.data.ptr=clientchannel;
                ev.events=EPOLLIN|EPOLLET;//边缘触发
                epoll_ctl(epollfd,EPOLL_CTL_ADD,clientfd,&ev);
            }
            else
            {
                //如果客户端连接的sock有事件,表示有报文发过来或者链接已经断开
                //////////////////////////////
                if(evs[i].events&EPOLLRDHUP)//对方已经关闭连接,有些系统检测不到,可以使用EPOLLIN,recv()返回
                {
                    //如果客户端已经断开
                    printf("client(eventfd=%d)disconnected.\n",ch->fd());
                    close(ch->fd());//关闭客户端的fd
                }
                else if(evs[i].events&(EPOLLIN|EPOLLPRI))//接受区有数据可以读
                {
                    char buffer[1024];//存放从客户端读取的数据
                    while(true)//使用非阻塞IO,一次读取buffer大小数据,直到全部读取完
                    {
                        bzero(&buffer,sizeof(buffer));
                        ssize_t nread = read(ch->fd(),buffer,sizeof(buffer));
                        if(nread>0)//成功读取到了数据
                        {
                            //把接收到的数据原封不动的返回回去
                            printf("recv(eventfd=%d);%s\n",ch->fd(),buffer);
                            send(ch->fd(),buffer,strlen(buffer),0);
                        }
                        else if(nread==-1&&errno==EINTR)//读取数据的时候信号中断,继续读取
                        {
                            continue;
                        }
                        else if(nread==-1&&((errno==EAGAIN)||(errno==EWOULDBLOCK)))//全部的数据已读取完必
                        {
                            break;
                        }
                        else if(nread==0)//客户端连接已经断开
                        {
                            printf("client(eventfd=%d) disconnected.\n",ch->fd());
                            close(ch->fd());
                            break;
                        }
                    }
                }
                else if(evs[i].events&EPOLLOUT)//有数据要写,暂时没代码,以后在说
                {

                }
                else //其他事件,都视为错误,或者对方关闭了链接。
                {
                    printf("client(eventfd=%d)error.\n",ch->fd());
                    close(ch->fd());
                }
                /////////////////////////////
                
            }
        }
    }

    return 0;
}

修改epoll头文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#pragma once
#include<stdio.h>
#include<stdlib.h>
#include<errno.h>
#include<strings.h>
#include<string.h>
#include<sys/epoll.h>
#include<vector>
#include<unistd.h>
#include"Channel.h"

class Channel;

//Epoll类
class Epoll
{
private:
    static const int MaxEvents=100;//epoll_wait()返回事件数组的大小
    int epollfd_ = -1;//epoll句柄,在构造函数中创建
    epoll_event events_[MaxEvents];//存放epoll返回的事件。
public:
    Epoll();        //构造函数创建epollfd_
    ~Epoll();       //析构函数关闭epollfd_
    //void addfd(int fd,uint32_t op); //把fd和他需要监视的事件添加到红黑树上
    void updatechannel(Channel *ch);//把channel添加/更新到红黑树上,channel中有fd,也有要监视的事件
    //std::vector<epoll_event> loop(int timeout=-1);//运行epoll_wait(),等待事件的发生,已发生的事件用vector容器返回
    std::vector<Channel*> loop(int timeout=-1);//运行epoll_wait(),等待事件的发生,已发生的事件用vector容器返回
};

修改epoll源文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
#include"Epoll.h"

Epoll::Epoll()        //构造函数创建epollfd_
{
    if((epollfd_ = epoll_create(1))==-1) //创建epoll句柄(红黑树)
    {
        printf("epoll_create() failed(%d).\n",errno);
        exit(-1);
    }

}
Epoll::~Epoll()      //析构函数关闭epollfd_
{
    close(epollfd_);
}
/*
//把fd和他需要监视的事件添加到红黑树上
void Epoll::addfd(int fd,uint32_t op) 
{
    epoll_event ev;     //申明事件的数据结构
    ev.data.fd=fd;//指定事件的自定义数据,会随着epoll_wait()返回的事件一并返回
    ev.events=op;  //
    if(epoll_ctl(epollfd_,EPOLL_CTL_ADD,fd,&ev)==-1) //把需要监视的socket加入epollfd中。
    {
        printf("epoll_ctl() failed(%d).\n",errno);
        exit(-1);
    }
}
*/
//把channel添加/更新到红黑树上,channel中有fd,也有要监视的事件
void Epoll::updatechannel(Channel *ch)
{
    epoll_event ev;//申明事件的数据结构
    ev.data.ptr=ch;//指定channel
    ev.events=ch->events();//指定事件
    if(ch->inpoll())
    {
        if(epoll_ctl(epollfd_,EPOLL_CTL_MOD,ch->fd(),&ev)==-1)
        {
            perror("epoll_ctl() failed .\n");
            exit(-1);
        }
    }
    else
    {
        if(epoll_ctl(epollfd_,EPOLL_CTL_ADD,ch->fd(),&ev)==-1)
        {
            perror("epoll_ctl() failed .\n");
            exit(-1);
        }
        ch->setinepoll();
    }
}


std::vector<Channel*> Epoll::loop(int timeout)//运行epoll_wait(),等待事件的发生,已发生的事件用vector容器返回
{
    std::vector<Channel*> channels;//存放返回的epoll_wait()事件
    bzero(events_,sizeof(events_));
    
    //等待监视的socket有事件发生
    int infds = epoll_wait(epollfd_,events_,MaxEvents,timeout);

    
    //返回失败
    if(infds<0)
    {
        perror("epoll() failed\n");exit(-1);
    }
    //超时
    if(infds==0)
    {
        printf("epoll() timeout.\n");
        return channels;
    }
    //如果infds>0,表示有事件发生的socket的数量
    for(int i=0;i<infds;i++)
    {
        //evs.push_back(events_[i]);
        Channel *ch=(Channel*)events_[i].data.ptr;
        ch->setrevents(events_[i].events);
        channels.push_back(ch);
    }
    return channels;
}

channel头文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#pragma once
#include<sys/epoll.h>
#include"Epoll.h"

class Epoll;

class Channel
{
private:
    int fd_=-1;        //channel和fd是一对一的关系
    Epoll * ep_=nullptr;//channel和epoll是一对一的关系,一个channel只在一个红黑树上
    bool inepoll_=false;//标记channel是否已经添加到epoll树上,如果未添加,调用epoll_ctl()的时候用EPOLL_CTL_ADD,否则用EPOLL_CTL_MOD
    uint32_t events_=0;//fd需要监视的事件,listenfd和clientfd需要监视EPOLLIN,clientfd还要监视EPOLLOUT
    uint32_t revents_=0;//存放fd已发生的事件
public:
    Channel(Epoll* ep,int fd);
    ~Channel();

    int fd();           //返回fd_成员
    void useet();        //采用边缘触发
    void enablereading();//让epoll_wait()监视fd_的读事件
    void setinepoll();  //把inepoll_成员的值设置为true
    void setrevents(uint32_t ev);//设置revents_成员的值为参数ev
    bool inpoll();      //返回inepoll_成员的值
    uint32_t events();  //返回events_成员的值
    uint32_t revents();  //返回revents_成员的值
};

channel源文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
#include"Channel.h"

Channel::Channel(Epoll* ep,int fd):ep_(ep),fd_(fd)
{

}

Channel::~Channel()
{
    //在析构函数中不要销毁ep_,也不能关闭fd_,因为这两个不属于Channel类,channel类只是需要他们,使用他们而已
}

int Channel::fd()           //返回fd_成员
{
    return fd_;
}
void Channel::useet()        //采用边缘触发
{
    events_=events_|EPOLLET;
}
void Channel::enablereading()//让epoll_wait()监视fd_的读事件
{
    events_=events_|EPOLLIN;
    ep_->updatechannel(this);
}
void Channel::setinepoll()  //把inepoll_成员的值设置为true
{
    inepoll_=true;
}
void Channel::setrevents(uint32_t ev)//设置revents_成员的值为参数ev
{
    revents_=ev;
}
bool Channel::inpoll()      //返回inepoll_成员的值
{
    return inepoll_;
}
uint32_t Channel::events()  //返回events_成员的值
{
    return events_;
}
uint32_t Channel::revents()  //返回revents_成员的值
{
    return revents_;
}

Makefile

1
2
3
4
5
6
7
8
9
10
all:client tcpepoll

client:client.cpp
	g++ -g client.cpp -o client

tcpepoll:tcpepoll.cpp InetAddress.cpp Socket.cpp Epoll.cpp Channel.cpp
	g++ -g tcpepoll.cpp InetAddress.cpp Socket.cpp Epoll.cpp Channel.cpp -o tcpepoll

clean:
	rm -f client tcpepoll

修改服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
/*
 *此程序用于演示epoll模型实现网络通信服务端
 */
#include<stdio.h>
#include<unistd.h>
#include<stdlib.h>
#include<string.h>
#include<errno.h>
#include<sys/socket.h>
#include<sys/types.h>
#include<arpa/inet.h>
#include<sys/fcntl.h>
#include<sys/epoll.h>
#include<netinet/tcp.h>// TCP_NODELAY需要包含这个头文件
#include"InetAddress.h"//TCP_NODELAY用于禁用Nagle算法
#include"Socket.h"
#include"Epoll.h"


int main(int argc,char* argv[])
{
    
    if(argc!=3)
    {
        printf("usage: ./tcpepoll ip port\n");
        printf("example: ./tcpepoll 192.168.157.128 5005\n");
        return -1;
    }

    Socket servsock(createnonblocking());
    InetAddress servaddr(argv[1],atoi(argv[2]));
    servsock.setreuseaddr(true);
    servsock.settcpnodelay(true);
    servsock.setreuseport(true);
    servsock.setkeepalive(true);
    servsock.bind(servaddr);
    servsock.listen();

    Epoll ep;
    //ep.addfd(servsock.fd(),EPOLLIN);//水平触发,监听listenfd的读事件
    Channel * servchannel=new Channel(&ep,servsock.fd());
    servchannel->enablereading();
    

    while(1)//事件循环
    {
        std::vector<Channel*>channels=ep.loop();  //等待监视的fd有事件发生

        //如果infds>0,表示有事件发生的socket的数量
        for(auto& ch:channels)
        {
            //如果客户端连接的sock有事件,表示有报文发过来或者链接已经断开
            //////////////////////////////
            if(ch->revents()&EPOLLRDHUP)//对方已经关闭连接,有些系统检测不到,可以使用EPOLLIN,recv()返回
            {
                //如果客户端已经断开
                printf("client(eventfd=%d)disconnected.\n",ch->fd());
                close(ch->fd());//关闭客户端的fd
            }
            else if(ch->revents()&(EPOLLIN|EPOLLPRI))//接受区有数据可以读
            {
                //如果发生事件的是listensock,表示有新的客户端连上来
                if(ch==servchannel)
                {
                    InetAddress clientaddr;
                    Socket* clientsock=new Socket(servsock.accept(clientaddr));

                    printf("accept client(fd=%d,ip=%s,port=%d)ok.\n",clientsock->fd(),clientaddr.ip(),clientaddr.port());

                    //为新客户准备可读事件,并添加到epoll中
                    //ep.addfd(clientsock->fd(),EPOLLIN|EPOLLET);   //客户端采用边缘触发
                    Channel * clientchannel=new Channel(&ep,clientsock->fd());
                    clientchannel->useet();
                    clientchannel->enablereading();
                }
                else
                {
                    char buffer[1024];//存放从客户端读取的数据
                    while(true)//使用非阻塞IO,一次读取buffer大小数据,直到全部读取完
                    {
                        bzero(&buffer,sizeof(buffer));
                        ssize_t nread = read(ch->fd(),buffer,sizeof(buffer));
                        if(nread>0)//成功读取到了数据
                        {
                            //把接收到的数据原封不动的返回回去
                            printf("recv(eventfd=%d);%s\n",ch->fd(),buffer);
                            send(ch->fd(),buffer,strlen(buffer),0);
                        }
                        else if(nread==-1&&errno==EINTR)//读取数据的时候信号中断,继续读取
                        {
                            continue;
                        }
                        else if(nread==-1&&((errno==EAGAIN)||(errno==EWOULDBLOCK)))//全部的数据已读取完必
                        {
                            break;
                        }
                        else if(nread==0)//客户端连接已经断开
                        {
                            printf("client(eventfd=%d) disconnected.\n",ch->fd());
                            close(ch->fd());
                            break;
                        }
                    }
                }
                
            }
            else if(ch->revents()&EPOLLOUT)//有数据要写,暂时没代码,以后在说
            {

            }
            else //其他事件,都视为错误,或者对方关闭了链接。
            {
                printf("client(eventfd=%d)error.\n",ch->fd());
                close(ch->fd());
            }
            /////////////////////////////
                
        }
    }


    return 0;
}

优化Channel类

优化Channel头文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#pragma once
#include<sys/epoll.h>
#include"Epoll.h"
#include"InetAddress.h"
#include"Socket.h"

class Epoll;

class Channel
{
private:
    int fd_=-1;        //channel和fd是一对一的关系
    Epoll * ep_=nullptr;//channel和epoll是一对一的关系,一个channel只在一个红黑树上
    bool inepoll_=false;//标记channel是否已经添加到epoll树上,如果未添加,调用epoll_ctl()的时候用EPOLL_CTL_ADD,否则用EPOLL_CTL_MOD
    uint32_t events_=0;//fd需要监视的事件,listenfd和clientfd需要监视EPOLLIN,clientfd还要监视EPOLLOUT
    uint32_t revents_=0;//存放fd已发生的事件
    bool islisten_=false;//如果为listenfd,取值为true,客户端连上来的fd为false
public:
    Channel(Epoll* ep,int fd,bool islisten);
    ~Channel();

    int fd();           //返回fd_成员
    void useet();        //采用边缘触发
    void enablereading();//让epoll_wait()监视fd_的读事件
    void setinepoll();  //把inepoll_成员的值设置为true
    void setrevents(uint32_t ev);//设置revents_成员的值为参数ev
    bool inpoll();      //返回inepoll_成员的值
    uint32_t events();  //返回events_成员的值
    uint32_t revents();  //返回revents_成员的值

    void handleevent(Socket *servsock); //事件处理函数,epoll_wait()返回的时候,执行它
};

优化Channel源文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
#include"Channel.h"

Channel::Channel(Epoll* ep,int fd,bool islisten):ep_(ep),fd_(fd),islisten_(islisten)
{

}

Channel::~Channel()
{
    //在析构函数中不要销毁ep_,也不能关闭fd_,因为这两个不属于Channel类,channel类只是需要他们,使用他们而已
}

int Channel::fd()           //返回fd_成员
{
    return fd_;
}
void Channel::useet()        //采用边缘触发
{
    events_=events_|EPOLLET;
}
void Channel::enablereading()//让epoll_wait()监视fd_的读事件
{
    events_=events_|EPOLLIN;
    ep_->updatechannel(this);
}
void Channel::setinepoll()  //把inepoll_成员的值设置为true
{
    inepoll_=true;
}
void Channel::setrevents(uint32_t ev)//设置revents_成员的值为参数ev
{
    revents_=ev;
}
bool Channel::inpoll()      //返回inepoll_成员的值
{
    return inepoll_;
}
uint32_t Channel::events()  //返回events_成员的值
{
    return events_;
}
uint32_t Channel::revents()  //返回revents_成员的值
{
    return revents_;
}

void Channel::handleevent(Socket *servsock)
{
    //如果客户端连接的sock有事件,表示有报文发过来或者链接已经断开
    //////////////////////////////
    if(revents_&EPOLLRDHUP)//对方已经关闭连接,有些系统检测不到,可以使用EPOLLIN,recv()返回
    {
        //如果客户端已经断开
        printf("client(eventfd=%d)disconnected.\n",fd_);
        close(fd_);//关闭客户端的fd
    }
    else if(revents_&(EPOLLIN|EPOLLPRI))//接受区有数据可以读
    {
        //如果发生事件的是listensock,表示有新的客户端连上来
        if(islisten_)
        {
            InetAddress clientaddr;
            Socket* clientsock=new Socket(servsock->accept(clientaddr));

            printf("accept client(fd=%d,ip=%s,port=%d)ok.\n",clientsock->fd(),clientaddr.ip(),clientaddr.port());

            //为新客户准备可读事件,并添加到epoll中
            Channel * clientchannel=new Channel(ep_,clientsock->fd(),false);
            clientchannel->useet();
            clientchannel->enablereading();
        }
        else
        {
            char buffer[1024];//存放从客户端读取的数据
            while(true)//使用非阻塞IO,一次读取buffer大小数据,直到全部读取完
            {
                bzero(&buffer,sizeof(buffer));
                ssize_t nread = read(fd_,buffer,sizeof(buffer));
                if(nread>0)//成功读取到了数据
                {
                    //把接收到的数据原封不动的返回回去
                    printf("recv(eventfd=%d);%s\n",fd_,buffer);
                    send(fd_,buffer,strlen(buffer),0);
                }
                else if(nread==-1&&errno==EINTR)//读取数据的时候信号中断,继续读取
                {
                    continue;
                }
                else if(nread==-1&&((errno==EAGAIN)||(errno==EWOULDBLOCK)))//全部的数据已读取完必
                {
                    break;
                }
                else if(nread==0)//客户端连接已经断开
                {
                    printf("client(eventfd=%d) disconnected.\n",fd_);
                    close(fd_);
                    break;
                }
            }
        }
        
    }
    else if(revents_&EPOLLOUT)//有数据要写,暂时没代码,以后在说
    {

    }
    else //其他事件,都视为错误,或者对方关闭了链接。
    {
        printf("client(eventfd=%d)error.\n",fd_);
        close(fd_);
    }
    /////////////////////////////
}

修改服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
/*
 *此程序用于演示epoll模型实现网络通信服务端
 */
#include<stdio.h>
#include<unistd.h>
#include<stdlib.h>
#include<string.h>
#include<errno.h>
#include<sys/socket.h>
#include<sys/types.h>
#include<arpa/inet.h>
#include<sys/fcntl.h>
#include<sys/epoll.h>
#include<netinet/tcp.h>// TCP_NODELAY需要包含这个头文件
#include"InetAddress.h"//TCP_NODELAY用于禁用Nagle算法
#include"Socket.h"
#include"Epoll.h"


int main(int argc,char* argv[])
{
    
    if(argc!=3)
    {
        printf("usage: ./tcpepoll ip port\n");
        printf("example: ./tcpepoll 192.168.157.128 5005\n");
        return -1;
    }

    Socket servsock(createnonblocking());
    InetAddress servaddr(argv[1],atoi(argv[2]));
    servsock.setreuseaddr(true);
    servsock.settcpnodelay(true);
    servsock.setreuseport(true);
    servsock.setkeepalive(true);
    servsock.bind(servaddr);
    servsock.listen();

    Epoll ep;
    //ep.addfd(servsock.fd(),EPOLLIN);//水平触发,监听listenfd的读事件
    Channel * servchannel=new Channel(&ep,servsock.fd(),true);
    servchannel->enablereading();
    
    while(1)//事件循环
    {
        std::vector<Channel*>channels=ep.loop();  //等待监视的fd有事件发生

        //如果infds>0,表示有事件发生的socket的数量
        for(auto& ch:channels)
        {
            ch->handleevent(&servsock);
                
        }
    }
    return 0;
}

回调优化

优化channel类头文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#pragma once
#include<sys/epoll.h>
#include<functional>
#include"Epoll.h"
#include"InetAddress.h"
#include"Socket.h"

class Epoll;

class Channel
{
private:
    int fd_=-1;        //channel和fd是一对一的关系
    Epoll * ep_=nullptr;//channel和epoll是一对一的关系,一个channel只在一个红黑树上
    bool inepoll_=false;//标记channel是否已经添加到epoll树上,如果未添加,调用epoll_ctl()的时候用EPOLL_CTL_ADD,否则用EPOLL_CTL_MOD
    uint32_t events_=0;//fd需要监视的事件,listenfd和clientfd需要监视EPOLLIN,clientfd还要监视EPOLLOUT
    uint32_t revents_=0;//存放fd已发生的事件
    
    std::function<void()> readcallback_;//fd_读事件的回调函数
public:
    Channel(Epoll* ep,int fd);
    ~Channel();

    int fd();           //返回fd_成员
    void useet();        //采用边缘触发
    void enablereading();//让epoll_wait()监视fd_的读事件
    void setinepoll();  //把inepoll_成员的值设置为true
    void setrevents(uint32_t ev);//设置revents_成员的值为参数ev
    bool inpoll();      //返回inepoll_成员的值
    uint32_t events();  //返回events_成员的值
    uint32_t revents();  //返回revents_成员的值

    void handleevent(); //事件处理函数,epoll_wait()返回的时候,执行它

    void newconnection(Socket *servsock); //处理客户端的新链接请求
    void onmessage();//处理对端发过来的报文
    void setreadcallback(std::function<void()> fn);//设置fd_读事件的回调函数
};

优化channel类源文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
#include"Channel.h"

Channel::Channel(Epoll* ep,int fd):ep_(ep),fd_(fd)
{

}

Channel::~Channel()
{
    //在析构函数中不要销毁ep_,也不能关闭fd_,因为这两个不属于Channel类,channel类只是需要他们,使用他们而已
}

int Channel::fd()           //返回fd_成员
{
    return fd_;
}
void Channel::useet()        //采用边缘触发
{
    events_=events_|EPOLLET;
}
void Channel::enablereading()//让epoll_wait()监视fd_的读事件
{
    events_=events_|EPOLLIN;
    ep_->updatechannel(this);
}
void Channel::setinepoll()  //把inepoll_成员的值设置为true
{
    inepoll_=true;
}
void Channel::setrevents(uint32_t ev)//设置revents_成员的值为参数ev
{
    revents_=ev;
}
bool Channel::inpoll()      //返回inepoll_成员的值
{
    return inepoll_;
}
uint32_t Channel::events()  //返回events_成员的值
{
    return events_;
}
uint32_t Channel::revents()  //返回revents_成员的值
{
    return revents_;
}

void Channel::handleevent()
{
    //如果客户端连接的sock有事件,表示有报文发过来或者链接已经断开
    //////////////////////////////
    if(revents_&EPOLLRDHUP)//对方已经关闭连接,有些系统检测不到,可以使用EPOLLIN,recv()返回
    {
        //如果客户端已经断开
        printf("client(eventfd=%d)disconnected.\n",fd_);
        close(fd_);//关闭客户端的fd
    }
    else if(revents_&(EPOLLIN|EPOLLPRI))//接受区有数据可以读
    {
        //如果发生事件的是listensock,表示有新的客户端连上来
        readcallback_();
        
    }
    else if(revents_&EPOLLOUT)//有数据要写,暂时没代码,以后在说
    {

    }
    else //其他事件,都视为错误,或者对方关闭了链接。
    {
        printf("client(eventfd=%d)error.\n",fd_);
        close(fd_);
    }
    /////////////////////////////
}



void Channel::newconnection(Socket *servsock) //处理客户端的新链接请求
{
    InetAddress clientaddr;
    Socket* clientsock=new Socket(servsock->accept(clientaddr));

    printf("accept client(fd=%d,ip=%s,port=%d)ok.\n",clientsock->fd(),clientaddr.ip(),clientaddr.port());

    //为新客户准备可读事件,并添加到epoll中
    Channel * clientchannel=new Channel(ep_,clientsock->fd());
    clientchannel->setreadcallback(std::bind(&Channel::onmessage,clientchannel));
    clientchannel->useet();
    clientchannel->enablereading();
}
void Channel::onmessage()//处理对端发过来的报文
{
    char buffer[1024];//存放从客户端读取的数据
    while(true)//使用非阻塞IO,一次读取buffer大小数据,直到全部读取完
    {
        bzero(&buffer,sizeof(buffer));
        ssize_t nread = read(fd_,buffer,sizeof(buffer));
        if(nread>0)//成功读取到了数据
        {
            //把接收到的数据原封不动的返回回去
            printf("recv(eventfd=%d);%s\n",fd_,buffer);
            send(fd_,buffer,strlen(buffer),0);
        }
        else if(nread==-1&&errno==EINTR)//读取数据的时候信号中断,继续读取
        {
            continue;
        }
        else if(nread==-1&&((errno==EAGAIN)||(errno==EWOULDBLOCK)))//全部的数据已读取完必
        {
            break;
        }
        else if(nread==0)//客户端连接已经断开
        {
            printf("client(eventfd=%d) disconnected.\n",fd_);
            close(fd_);
            break;
        }
    }
}


void Channel::setreadcallback(std::function<void()> fn)//设置fd_读事件的回调函数
{
    readcallback_=fn;
}

修改服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
/*
 *此程序用于演示epoll模型实现网络通信服务端
 */
#include<stdio.h>
#include<unistd.h>
#include<stdlib.h>
#include<string.h>
#include<errno.h>
#include<sys/socket.h>
#include<sys/types.h>
#include<arpa/inet.h>
#include<sys/fcntl.h>
#include<sys/epoll.h>
#include<netinet/tcp.h>// TCP_NODELAY需要包含这个头文件
#include"InetAddress.h"//TCP_NODELAY用于禁用Nagle算法
#include"Socket.h"
#include"Epoll.h"


int main(int argc,char* argv[])
{
    
    if(argc!=3)
    {
        printf("usage: ./tcpepoll ip port\n");
        printf("example: ./tcpepoll 192.168.157.128 5005\n");
        return -1;
    }

    Socket servsock(createnonblocking());
    InetAddress servaddr(argv[1],atoi(argv[2]));
    servsock.setreuseaddr(true);
    servsock.settcpnodelay(true);
    servsock.setreuseport(true);
    servsock.setkeepalive(true);
    servsock.bind(servaddr);
    servsock.listen();

    Epoll ep;
    //ep.addfd(servsock.fd(),EPOLLIN);//水平触发,监听listenfd的读事件
    Channel * servchannel=new Channel(&ep,servsock.fd());
    servchannel->setreadcallback(std::bind(&Channel::newconnection,servchannel,&servsock));
    servchannel->enablereading();
    

    while(1)//事件循环
    {
        std::vector<Channel*>channels=ep.loop();  //等待监视的fd有事件发生

        //如果infds>0,表示有事件发生的socket的数量
        for(auto& ch:channels)
        {
            ch->handleevent();
                
        }
    }


    return 0;
}

增加EventLoop事件循环类

EventLoop头文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#pragma once
#include "Epoll.h"

//事件循环类
class EventLoop
{
private:
    Epoll *ep_;     //每个事件循环只有一个epoll

public:
    EventLoop();    //在构造函数中创建epoll对象ep_
    ~EventLoop();   //在析构函数中销毁ep_
    void run();     //运行事件循环
    Epoll* ep();    //返回ep_地址
};

EventLoop源文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#include"EventLoop.h"

EventLoop::EventLoop():ep_(new Epoll)    //在构造函数中创建epoll对象ep_
{

}
EventLoop::~EventLoop()   //在析构函数中销毁ep_
{
    delete ep_;
}
void EventLoop::run()     //运行事件循环
{
    while(1)//事件循环
    {
        std::vector<Channel*>channels=ep_->loop();  //等待监视的fd有事件发生

        //如果infds>0,表示有事件发生的socket的数量
        for(auto& ch:channels)
        {
            ch->handleevent();
                
        }
    }
}

Epoll* EventLoop::ep()    //返回ep_地址
{
    return ep_;
}

Makefile

1
2
3
4
5
6
7
8
9
10
all:client tcpepoll

client:client.cpp
	g++ -g client.cpp -o client

tcpepoll:tcpepoll.cpp InetAddress.cpp Socket.cpp Epoll.cpp Channel.cpp EventLoop.cpp
	g++ -g tcpepoll.cpp InetAddress.cpp Socket.cpp Epoll.cpp Channel.cpp EventLoop.cpp -o tcpepoll

clean:
	rm -f client tcpepoll

修改服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
/*
 *此程序用于演示epoll模型实现网络通信服务端
 */
#include<stdio.h>
#include<unistd.h>
#include<stdlib.h>
#include<string.h>
#include<errno.h>
#include<sys/socket.h>
#include<sys/types.h>
#include<arpa/inet.h>
#include<sys/fcntl.h>
#include<sys/epoll.h>
#include<netinet/tcp.h>// TCP_NODELAY需要包含这个头文件
#include"InetAddress.h"//TCP_NODELAY用于禁用Nagle算法
#include"Socket.h"
#include"Epoll.h"
#include"EventLoop.h"


int main(int argc,char* argv[])
{
    
    if(argc!=3)
    {
        printf("usage: ./tcpepoll ip port\n");
        printf("example: ./tcpepoll 192.168.157.128 5005\n");
        return -1;
    }

    Socket servsock(createnonblocking());
    InetAddress servaddr(argv[1],atoi(argv[2]));
    servsock.setreuseaddr(true);
    servsock.settcpnodelay(true);
    servsock.setreuseport(true);
    servsock.setkeepalive(true);
    servsock.bind(servaddr);
    servsock.listen();


    EventLoop loop;
    
    Channel * servchannel=new Channel(loop.ep(),servsock.fd());
    servchannel->setreadcallback(std::bind(&Channel::newconnection,servchannel,&servsock));
    servchannel->enablereading();

    loop.run();
    
    return 0;
}

少了EventLoop和Channel的连接处理没做

封装TcpServer类

TcpServer头文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#pragma once 
#include"EventLoop.h"
#include"Socket.h"
#include"Channel.h"

class TcpServer
{
private:
    EventLoop loop_;    //一个TcpServer可以有多个事件循环,现在是单线程,暂时只用一个事件循环
public:
    TcpServer(const std::string &ip,const uint16_t port);
    ~TcpServer();
    void start();
};

TcpServer源文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#include"TcpServer.h"

TcpServer::TcpServer(const std::string &ip,const uint16_t port)
{
    Socket * servsock=new Socket(createnonblocking());
    InetAddress servaddr(ip,port);
    servsock->setreuseaddr(true);
    servsock->settcpnodelay(true);
    servsock->setreuseport(true);
    servsock->setkeepalive(true);
    servsock->bind(servaddr);
    servsock->listen();


    EventLoop loop;
    
    Channel * servchannel=new Channel(loop.ep(),servsock->fd());
    servchannel->setreadcallback(std::bind(&Channel::newconnection,servchannel,servsock));
    servchannel->enablereading();
}
TcpServer::~TcpServer()
{

}

void TcpServer::start()
{
    loop_.run();
}

Makefile

1
2
3
4
5
6
7
8
9
10
all:client tcpepoll

client:client.cpp
	g++ -g client.cpp -o client

tcpepoll:tcpepoll.cpp InetAddress.cpp Socket.cpp Epoll.cpp Channel.cpp EventLoop.cpp TcpServer.cpp
	g++ -g tcpepoll.cpp InetAddress.cpp Socket.cpp Epoll.cpp Channel.cpp EventLoop.cpp TcpServer.cpp -o tcpepoll

clean:
	rm -f client tcpepoll

修改服务程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/*
 *此程序用于演示epoll模型实现网络通信服务端
 */

#include"TcpServer.h"


int main(int argc,char* argv[])
{
    
    if(argc!=3)
    {
        printf("usage: ./tcpepoll ip port\n");
        printf("example: ./tcpepoll 192.168.157.128 5005\n");
        return -1;
    }

    TcpServer tcpserver(argv[1],atoi(argv[2]));

    tcpserver.start();
    

    


    return 0;
}