asio之socket的创建和连接
终端节点的创建
所谓终端节点就是用来通信的端对端的节点,可以通过ip地址和端口构造,其的节点可以连接这个终端节点做通信。
如果我们是客户端,我们可以通过对端的ip和端口构造一个endpoint,用这个endpoint和其通信。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| int client_end_point() { std::string raw_ip_address = "127.0.0.1"; unsigned short port_num = 3333; boost::system::error_code ec; asio::ip::address ip_address = asio::ip::address::from_string(raw_ip_address, ec); if (ec.value() != 0) { std::cout << "Failed to parse the IP address. Error code = " << ec.value() << ". Message: " << ec.message(); return ec.value(); } asio::ip::tcp::endpoint ep(ip_address, port_num); return 0; }
|
如果是服务端,则只需根据本地地址绑定就可以生成endpoint
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| int server_end_point(){ unsigned short port_num = 3333; asio::ip::address ip_address = asio::ip::address_v6::any(); asio::ip::tcp::endpoint ep(ip_address, port_num); return 0; }
|
创建socket
- socket进行通信必须要一个参数,这个参数就是上下文
- 上下文是boost的asio的一个核心服务,它的所有的服务都是通过上下文服务来通信的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| int create_tcp_socket() { asio::io_context ios;
asio::ip::tcp protocol = asio::ip::tcp::v4();
asio::ip::tcp::socket sock(ios);
boost::system::error_code ec;
sock.open(protocol, ec); if (ec.value() != 0) { std::cout << "Failed to open the socket! Error code = " << ec.value() << ". Message: " << ec.message(); return ec.value(); } return 0; }
|
上述socket只是通信的socket,如果是服务端,我们还需要生成一个acceptor的socket,用来接收新的连接。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| int create_acceptor_socket() {
asio::ip::tcp::acceptor a(ios,asio::ip::tcp::endpoint(asio::ip::tcp::v4(),3333));
return 0; }
|
绑定acceptor
对于acceptor类型的socket,服务器要将其绑定到指定的断点,所有连接这个端点的连接都可以被接收到。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| int bind_acceptor_socket() { unsigned short port_num = 3333; asio::ip::tcp::endpoint ep(asio::ip::address_v4::any(), port_num); asio::io_context ios; asio::ip::tcp::acceptor acceptor(ios, ep.protocol()); boost::system::error_code ec; acceptor.bind(ep, ec); if (ec.value() != 0) { std::cout << "无法绑定acceptor socket。" << "错误代码 = " << ec.value() << "。消息:" << ec.message(); return ec.value(); } return 0; }
|
连接指定的端点
作为客户端可以连接服务器指定的端点进行连接
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| int connect_to_end() { std::string raw_ip_address = "127.0.0.1"; unsigned short port_num = 3333; try { asio::ip::tcp::endpoint ep(asio::ip::address::from_string(raw_ip_address), port_num); asio::io_context ios; asio::ip::tcp::socket sock(ios, ep.protocol()); sock.connect(ep); } catch (system::system_error &e) { std::cout << "发生错误!错误代码 = " << e.code() << "。消息:" << e.what(); return e.code().value(); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| #include <asio.hpp> #include <iostream> #include <string> int dns_connect_to_end(){ std::string host = "llfc.club"; std::string port_num = "3333";
asio::io_context ios;
asio::ip::tcp::resolver::query resolver_query(host, port_num, asio::ip::tcp::resolver::query::numeric_service);
asio::ip::tcp::resolver resolver(ios);
try { asio::ip::tcp::resolver::iterator it = resolver.resolve(resolver_query);
asio::ip::tcp::socket sock(ios);
asio::connect(sock, it); } catch (asio::system_error &e) { std::cout << "Error occurred! Error code = " << e.code() << ". Message: " << e.what(); return e.code().value(); } return 0; }
|
服务器接收连接
- 当有客户端连接时,服务器需要接收连接
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| int accept_new_connection(){ const int BACKLOG_SIZE = 30; unsigned short port_num = 3333; asio::ip::tcp::endpoint ep(asio::ip::address_v4::any(), port_num); asio::io_context ios; try { asio::ip::tcp::acceptor acceptor(ios, ep.protocol()); acceptor.bind(ep); acceptor.listen(BACKLOG_SIZE); asio::ip::tcp::socket sock(ios); acceptor.accept(sock); } catch (system::system_error& e) { std::cout << "发生错误!错误代码 = " << e.code() << "。消息:" << e.what(); return e.code().value(); } }
|
asio socket同步读写
关于buffer
- 任何网络库都有提供buffer的数据结构,所谓buffer就是接收和发送数据时缓存数据的结构。
boost::asio
提供了asio::mutable_buffer
和asio::const_buffer
这两个结构,他们是一段连续的空间,首字节存储了后续数据的长度。
asio::mutable_buffer
用于写服务,asio::const_buffer
用于读服务。但是这两个结构都没有被asio的api直接使用。
- 对于api的buffer参数,asio提出了
MutableBufferSequence
和ConstBufferSequence
概念,他们是由多个asio::mutable_buffer
和asio::const_buffer
组成的。也就是说boost::asio
为了节省空间,将一部分连续的空间组合起来,作为参数交给api使用。
- 我们可以理解为
MutableBufferSequence
的数据结构为std::vector<asio::mutable_buffer>
结构如下
- 每隔
vector
存储的都是mutable_buffer
的地址,每个mutable_buffer
的第一个字节表示数据的长度,后面跟着数据内容。
- 这么复杂的结构交给用户使用并不合适,所以asio提出了
buffer()
函数,该函数接收多种形式的字节流,该函数返回asio::mutable_buffers_1 o
或者asio::const_buffers_1
结构的对象。
- 如果传递给
buffer()
的参数是一个只读类型,则函数返回asio::const_buffers_1
类型对象。
- 如果传递给
buffer()
的参数是一个可写类型,则返回asio::mutable_buffers_1
类型对象。
asio::const_buffers_1
和asio::mutable_buffers_1
是asio::mutable_buffer
和asio::const_buffer
的适配器,提供了符合MutableBufferSequence
和ConstBufferSequence
概念的接口,所以他们可以作为boost::asio
的api函数的参数使用。
- 简单概括一下,我们可以用
buffer()
函数生成我们要用的缓存存储数据。
- 比如boost的发送接口send要求的参数为
ConstBufferSequence
类型
1 2
| template<typename ConstBufferSequence> std::size_t send(const ConstBufferSequence & buffers);
|
我们需要将”Hello Word转化为该类型”
1 2 3 4 5 6
| void use_const_buffer() { std::string buf = "hello world!"; asio::const_buffer asio_buf(buf.c_str(), buf.length()); std::vector<asio::const_buffer> buffers_sequence; buffers_sequence.push_back(asio_buf); }
|
最终buffers_sequence就是可以传递给发送接口send的类型。但是这太复杂了,可以直接用buffer函数转化为send需要的参数类型
1 2 3
| void use_buffer_str() { asio::const_buffers_1 output_buf = asio::buffer("hello world"); }
|
output_buf可以直接传递给该send接口。我们也可以将数组转化为send接受的类型
1 2 3 4 5
| void use_buffer_array(){ const size_t BUF_SIZE_BYTES = 20; std::unique_ptr<char[] > buf(new char[BUF_SIZE_BYTES]); auto input_buf = asio::buffer(static_cast<void*>(buf.get()), BUF_SIZE_BYTES); }
|
对于流式操作,我们可以用streambuf,将输入输出流和streambuf绑定,可以实现流式输入和输出。
1 2 3 4 5 6 7 8 9 10 11 12 13
| void use_stream_buffer() { asio::streambuf buf; std::ostream output(&buf); output << "Message1\nMessage2"; std::istream input(&buf); std::string message1; std::getline(input, message1); }
|
同步写write_some
boost::asio
提供了几种同步写的api,write_som
e可以每次向指定的空间写入固定的字节数,如果写缓冲区满了,就只写一部分,返回写入的字节数。
1 2 3 4 5 6 7 8 9 10 11
| void write_to_socket(asio::ip::tcp::socket& sock) { std::string buf = "Hello World!"; std::size_t total_bytes_written = 0; while (total_bytes_written != buf.length()) { total_bytes_written += sock.write_some(asio::buffer(buf.c_str() + total_bytes_written, buf.length() - total_bytes_written)); } }
|
同步写send
write_some
使用起来比较麻烦,需要多次调用,asio提供了send函数。send函数会一次性将buffer中的内容发送给对端,如果有部分字节因为发送缓冲区满无法发送,则阻塞等待,直到发送缓冲区可用,则继续发送完成。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| int send_data_by_send() { std::string raw_ip_address = "127.0.0.1"; unsigned short port_num = 3333; try { asio::ip::tcp::endpoint ep(asio::ip::address::from_string(raw_ip_address), port_num); asio::io_context ios; asio::ip::tcp::socket sock(ios, ep.protocol()); sock.connect(ep); std::string buf = "Hello World!"; int send_length = sock.send(asio::buffer(buf.c_str(), buf.length())); if (send_length <= 0) { cout << "发送失败" << endl; return 0; } } catch (system::system_error& e) { std::cout << "发生错误!错误代码 = " << e.code() << "。消息:" << e.what(); return e.code().value(); } return 0; }
|
同步写write
- 类似send方法,asio还提供了一个write函数,可以一次性将所有数据发送给对端,如果发送缓冲区满了则阻塞,直到发送缓冲区可用,将数据发送完成。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| int send_data_by_wirte() { std::string raw_ip_address = "127.0.0.1"; unsigned short port_num = 3333; try { asio::ip::tcp::endpoint ep(asio::ip::address::from_string(raw_ip_address), port_num); asio::io_service ios; asio::ip::tcp::socket sock(ios, ep.protocol()); sock.connect(ep); std::string buf = "Hello World!"; int send_length = asio::write(sock, asio::buffer(buf.c_str(), buf.length())); if (send_length <= 0) { cout << "send failed" << endl; return 0; } } catch (system::system_error& e) { std::cout << "Error occured! Error code = " << e.code() << ". Message: " << e.what(); return e.code().value(); } return 0; }
|
同步读read_some
- 同步读和同步写类似,提供了读取指定字节数的接口read_some
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| std::string read_from_socket(asio::ip::tcp::socket& sock) { const unsigned char MESSAGE_SIZE = 7; char buf[MESSAGE_SIZE]; std::size_t total_bytes_read = 0; while (total_bytes_read != MESSAGE_SIZE) { total_bytes_read += sock.read_some( asio::buffer(buf + total_bytes_read, MESSAGE_SIZE - total_bytes_read)); } return std::string(buf, total_bytes_read); } int read_data_by_read_some() { std::string raw_ip_address = "127.0.0.1"; unsigned short port_num = 3333; try { asio::ip::tcp::endpoint ep(asio::ip::address::from_string(raw_ip_address), port_num); asio::io_service ios; asio::ip::tcp::socket sock(ios, ep.protocol()); sock.connect(ep); read_from_socket(sock); } catch (system::system_error& e) { std::cout << "Error occured! Error code = " << e.code() << ". Message: " << e.what(); return e.code().value(); } return 0; }
|
同步读receive
- 可以一次性同步接收对方发送的数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| int read_data_by_receive() { std::string raw_ip_address = "127.0.0.1"; unsigned short port_num = 3333; try { asio::ip::tcp::endpoint ep(asio::ip::address::from_string(raw_ip_address), port_num); asio::io_service ios; asio::ip::tcp::socket sock(ios, ep.protocol()); sock.connect(ep); const unsigned char BUFF_SIZE = 7; char buffer_receive[BUFF_SIZE]; int receive_length = sock.receive(asio::buffer(buffer_receive, BUFF_SIZE)); if (receive_length <= 0) { cout << "receive failed" << endl; } } catch (system::system_error& e) { std::cout << "Error occured! Error code = " << e.code() << ". Message: " << e.what(); return e.code().value(); } return 0; }
|
同步读read
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| int read_data_by_read() { std::string raw_ip_address = "127.0.0.1"; unsigned short port_num = 3333; try { asio::ip::tcp::endpoint ep(asio::ip::address::from_string(raw_ip_address), port_num); asio::io_service ios; asio::ip::tcp::socket sock(ios, ep.protocol()); sock.connect(ep); const unsigned char BUFF_SIZE = 7; char buffer_receive[BUFF_SIZE]; int receive_length = asio::read(sock, asio::buffer(buffer_receive, BUFF_SIZE)); if (receive_length <= 0) { cout << "receive failed" << endl; } } catch (system::system_error& e) { std::cout << "Error occured! Error code = " << e.code() << ". Message: " << e.what(); return e.code().value(); } return 0; }
|
读取直到指定字符
1 2 3 4 5 6 7 8 9 10
| std::string read_data_by_until(asio::ip::tcp::socket& sock) { asio::streambuf buf; asio::read_until(sock, buf, '\n'); std::string message; std::istream input_stream(&buf); std::getline(input_stream, message); return message; }
|
asio异步读写操作
异步写操作
在写操作前,我们先封装一个Node结构,用来管理要发送和接收的数据,该结构包含数据域首地址,数据的总长度,以及已经处理的长度(已读的长度或者已写的长度)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| const int RECVSIZE = 1024; class MsgNode { public : MsgNode(const char* msg, int total_len): _total_len(total_len), _cur_len(0){ _msg = new char[total_len]; memcpy(_msg, msg, total_len); } MsgNode(int total_len) :_total_len(total_len), _cur_len(0) { _msg = new char[total_len]; } ~MsgNode(){ delete[]_msg; } char* _msg; int _total_len; int _cur_len; };
|
写了两个构造函数,两个参数的负责构造写节点,一个参数的负责构造读节点。
接下来为Session添加异步写操作和负责发送写数据的节点
- 需要一个回调函数
WriteCallBackErr
,其中bytes_transferred
代表我们写了多少字符
WriteToSocketErr
是封装的异步写函数
1 2 3 4 5 6 7
| class Session{ public: void WriteCallBackErr(const boost::system::error_code & ec, std::size_t bytes_transferred,std::shared_ptr<MsgNode>); void WriteToSocketErr(const std::string& buf); private: std::shared_ptr<MsgNode> _send_node; };
|
WriteToSocketErr函数为我们封装的写操作,WriteCallBackErr为异步写操作回调的函数,为什么会有三个参数呢,我们可以看一下asio源码
1 2 3 4 5 6 7 8
| BOOST_ASIO_COMPLETION_TOKEN_FOR(void (boost::system::error_code, std::size_t)) WriteToken BOOST_ASIO_DEFAULT_COMPLETION_TOKEN_TYPE(executor_type)> BOOST_ASIO_INITFN_AUTO_RESULT_TYPE_PREFIX(WriteToken, void (boost::system::error_code, std::size_t)) async_write_some(const ConstBufferSequence& buffers, BOOST_ASIO_MOVE_ARG(WriteToken)token BOOST_ASIO_DEFAULT_COMPLETION_TOKEN(executor_type))
|
async_write_some
是异步写的函数,这个异步写函数有两个参数,第一个参数为ConstBufferSequence
常引用类型的buffers,第二个参数为WriteToken
类型,而WriteToken
在上面定义了,是一个函数对象类型,返回值为void,参数为error_code
和size_t,所以我们为了调用async_write_some
函数也要传入一个符合WriteToken定义的函数,就是我们声明的WriteCallBackErr
函数,前两个参数为WriteToken
规定的参数,第三个参数为MsgNode的智能指针,这样通过智能指针保证我们发送的Node生命周期延长。我们看一下WriteToSocketErr
函数的具体实现
1 2 3 4 5 6 7 8
| void Session::WriteToSocketErr(const std::string& buf) { _send_node = make_shared<MsgNode>(buf.c_str(), buf.length()); this->_socket->async_write_some(asio::buffer(_send_node->_msg, _send_node->_total_len), std::bind(&Session::WriteCallBackErr, this, std::placeholders::_1, std::placeholders::_2, _send_node)); }
|
因为WriteCallBackErr
函数为三个参数且为成员函数,而async_write_some
需要的回调函数为两个参数,所以我们通过bind将三个参数转换为两个参数的普通函数。
我们看看回调函数的实现
1 2 3 4 5 6 7 8 9 10 11
| void Session::WriteCallBackErr(const boost::system::error_code& ec, std::size_t bytes_transferred, std::shared_ptr<MsgNode> msg_node) { if (bytes_transferred + msg_node->_cur_len < msg_node->_total_len) { _send_node->_cur_len += bytes_transferred; this->_socket->async_write_some(asio::buffer(_send_node->_msg+_send_node->_cur_len, _send_node->_total_len-_send_node->_cur_len), std::bind(&Session::WriteCallBackErr, this, std::placeholders::_1, std::placeholders::_2, _send_node)); } }
|
在WriteCallBackErr
函数里判断如果已经发送的字节数没有达到要发送的总字节数,那么久更新节点已经发送的长度,然后计算剩余要发送的长度,如果有数据未发送完,再次调用async_write_some函数异步发送。
但是这个函数并不能投入实际应用,因为async_write_some
回调函数返回已发送的字节数可能并不是全部长度。比如TCP发送缓存区总大小为8字节,但是有3字节未发送(上一次未发送完),这样剩余空间为5字节
此时我们调用`async_write_some`发送hello world!实际发送的长度就是为5,也就是只发送了hello,剩余world!通过我们的回调继续发送。
而实际开发的场景用户是不清楚底层tcp的多路复用调用情况的,用户想发送数据的时候就调用WriteToSocketErr,或者循环调用WriteToSocketErr,很可能在一次没发送完数据还未调用回调函数时再次调用`WriteToSocketErr`,因为boost::asio封装的时epoll和iocp等多路复用模型,当写事件就绪后就发数据,发送的数据按照async_write_some调用的顺序发送,所以回调函数内调用的async_write_some可能并没有被及时调用。
比如我们如下代码
1 2 3 4
| WriteToSocketErr("Hello World!");
WriteToSocketErr("Hello World!");
|
那么很可能第一次只发送了Hello,后面的数据没发完,第二次发送了Hello World!之后又发送了World!
所以对端收到的数据很可能是”HelloHello World! World!”
那怎么解决这个问题呢,我们可以通过队列保证应用层的发送顺序。我们在Session中定义一个发送队列,然后重新定义正确的异步发送函数和回调处理
1 2 3 4 5 6 7 8 9
| class Session{ public: void WriteCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred); void WriteToSocket(const std::string &buf); private: std::queue<std::shared_ptr<MsgNode>> _send_queue; std::shared_ptr<asio::ip::tcp::socket> _socket; bool _send_pending; };
|
定义了bool变量_send_pending
,该变量为true表示一个节点还未发送完。
_send_queue
用来缓存要发送的消息节点,是一个队列。
我们实现异步发送功能
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| void Session::WriteToSocket(const std::string& buf){ _send_queue.emplace(new MsgNode(buf.c_str(), buf.length())); if (_send_pending) { return; } this->_socket->async_write_some(asio::buffer(buf), std::bind(&Session::WriteCallBack, this, std::placeholders::_1, std::placeholders::_2)); _send_pending = true; } void Session::WriteCallBack(const boost::system::error_code & ec, std::size_t bytes_transferred){ if (ec.value() != 0) { std::cout << "Error , code is " << ec.value() << " . Message is " << ec.message(); return; } auto & send_data = _send_queue.front(); send_data->_cur_len += bytes_transferred; if (send_data->_cur_len < send_data->_total_len) { this->_socket->async_write_some(asio::buffer(send_data->_msg + send_data->_cur_len, send_data->_total_len-send_data->_cur_len), std::bind(&Session::WriteCallBack, this, std::placeholders::_1, std::placeholders::_2)); return; } _send_queue.pop(); if (_send_queue.empty()) { _send_pending = false; } if (!_send_queue.empty()) { auto& send_data = _send_queue.front(); this->_socket->async_write_some(asio::buffer(send_data->_msg + send_data->_cur_len, send_data->_total_len - send_data->_cur_len), std::bind(&Session::WriteCallBack, this, std::placeholders::_1, std::placeholders::_2)); } }
|
async_write_some
函数不能保证每次回调函数触发时发送的长度为要总长度,这样我们每次都要在回调函数判断发送数据是否完成,asio提供了一个更简单的发送函数async_send
,这个函数在发送的长度未达到我们要求的长度时就不会触发回调,所以触发回调函数时要么时发送出错了要么是发送完成了,其内部的实现原理就是帮我们不断的调用async_write_some
直到完成发送,所以async_send
不能和async_write_some
混合使用,我们基于async_send
封装另外一个发送函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| void Session::WriteAllToSocket(const std::string& buf) { _send_queue.emplace(new MsgNode(buf.c_str(), buf.length())); if (_send_pending) { return; } this->_socket->async_send(asio::buffer(buf), std::bind(&Session::WriteAllCallBack, this, std::placeholders::_1, std::placeholders::_2)); _send_pending = true; } void Session::WriteAllCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred){ if (ec.value() != 0) { std::cout << "Error occured! Error code = " << ec.value() << ". Message: " << ec.message(); return; } _send_queue.pop(); if (_send_queue.empty()) { _send_pending = false; } if (!_send_queue.empty()) { auto& send_data = _send_queue.front(); this->_socket->async_send(asio::buffer(send_data->_msg + send_data->_cur_len, send_data->_total_len - send_data->_cur_len), std::bind(&Session::WriteAllCallBack, this, std::placeholders::_1, std::placeholders::_2)); } }
|
异步读操作
接下来介绍异步读操作,异步读操作和异步的写操作类似同样又async_read_some和async_receive函数,前者触发的回调函数获取的读数据的长度可能会小于要求读取的总长度,后者触发的回调函数读取的数据长度等于读取的总长度。
先基于async_read_some
封装一个读取的函数ReadFromSocket
,同样在Session类的声明中添加一些变量
1 2 3 4 5 6 7 8 9
| class Session { public: void ReadFromSocket(); void ReadCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred); private: std::shared_ptr<asio::ip::tcp::socket> _socket; std::shared_ptr<MsgNode> _recv_node; bool _recv_pending; };
|
_recv_node
用来缓存接收的数据,_recv_pending
为true表示节点正在接收数据,还未接受完。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| void Session::ReadFromSocket() { if (_recv_pending) { return; }
_recv_node = std::make_shared<MsgNode>(RECVSIZE); _socket->async_read_some(asio::buffer(_recv_node->_msg, _recv_node->_total_len), std::bind(&Session::ReadCallBack, this, std::placeholders::_1, std::placeholders::_2)); _recv_pending = true; } void Session::ReadCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred){ _recv_node->_cur_len += bytes_transferred; if (_recv_node->_cur_len < _recv_node->_total_len) { _socket->async_read_some(asio::buffer(_recv_node->_msg+_recv_node->_cur_len, _recv_node->_total_len - _recv_node->_cur_len), std::bind(&Session::ReadCallBack, this, std::placeholders::_1, std::placeholders::_2)); return; } _recv_pending = false; _recv_node = nullptr; }
|
我们基于async_receive
再封装一个接收数据的函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| void Session::ReadAllFromSocket(const std::string& buf) { if (_recv_pending) { return; }
_recv_node = std::make_shared<MsgNode>(RECVSIZE); _socket->async_receive(asio::buffer(_recv_node->_msg, _recv_node->_total_len), std::bind(&Session::ReadAllCallBack, this, std::placeholders::_1, std::placeholders::_2)); _recv_pending = true; } void Session::ReadAllCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred) { _recv_node->_cur_len += bytes_transferred; _recv_pending = false; _recv_node = nullptr; }
|
同样async_read_some
和async_receive
不能混合使用,否则会出现逻辑问题。
异步读写的服务器示例
Session类
Session类主要是处理客户端消息收发的会话类,为了简单起见,我们不考虑粘包问题,也不考虑支持手动调用发送的接口,只以应答的方式发送和接收固定长度(1024字节长度)的数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| class Session { public: Session(boost::asio::io_context& ioc):_socket(ioc){ } tcp::socket& Socket() { return _socket; } void Start(); private: void handle_read(const boost::system::error_code & error, size_t bytes_transfered); void handle_write(const boost::system::error_code& error); tcp::socket _socket; enum {max_length = 1024}; char _data[max_length]; };
|
_data
用来接收客户端传递的数据
_socket
为单独处理客户端读写的socket。
handle_read
和handle_write
分别为读回调函数和写回调函数。
接下来我们实现Session类1 2 3 4 5 6 7
| void Session::Start(){ memset(_data, 0, max_length); _socket.async_read_some(boost::asio::buffer(_data, max_length), std::bind(&Session::handle_read, this, placeholders::_1, placeholders::_2) ); }
|
在Start方法中我们调用异步读操作,监听对端发送的消息。当对端发送数据后,触发handle_read
函数
1 2 3 4 5 6 7 8 9 10
| void Session::handle_read(const boost::system::error_code& error, size_t bytes_transfered) { if (!error) { cout << "server receive data is " << _data << endl; boost::asio::async_write(_socket, boost::asio::buffer(_data, bytes_transfered), std::bind(&Session::handle_write, this, placeholders::_1)); } else { delete this; } }
|
handle_read
函数内将收到的数据发送给对端,当发送完成后触发handle_write
回调函数。
1 2 3 4 5 6 7 8 9 10
| void Session::handle_write(const boost::system::error_code& error) { if (!error) { memset(_data, 0, max_length); _socket.async_read_some(boost::asio::buffer(_data, max_length), std::bind(&Session::handle_read, this, placeholders::_1, placeholders::_2)); } else { delete this; } }
|
handle_write
函数内又一次监听了读事件,如果对端有数据发送过来则触发handle_read,我们再将收到的数据发回去。从而达到应答式服务的效果。
Server类
Server类为服务器接收连接的管理类
1 2 3 4 5 6 7 8 9
| class Server { public: Server(boost::asio::io_context& ioc, short port); private: void start_accept(); void handle_accept(Session* new_session, const boost::system::error_code& error); boost::asio::io_context& _ioc; tcp::acceptor _acceptor; };
|
start_accept
将要接收连接的acceptor
绑定到服务上,其内部就是将accpeptor
对应的socket描述符绑定到epoll
或iocp
模型上,实现事件驱动。
handle_accept
为新连接到来后触发的回调函数。
下面是具体实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| Server::Server(boost::asio::io_context& ioc, short port) :_ioc(ioc), _acceptor(ioc, tcp::endpoint(tcp::v4(), port)) { start_accept(); } void Server::start_accept() { Session* new_session = new Session(_ioc); _acceptor.async_accept(new_session->Socket(), std::bind(&Server::handle_accept, this, new_session, placeholders::_1)); } void Server::handle_accept(Session* new_session, const boost::system::error_code& error) { if (!error) { new_session->Start(); } else { delete new_session; } start_accept(); }
|