The TCP "sticky packet" problem
TCP is a connection-oriented, stream-oriented, reliable transport protocol. To improve efficiency, TCP may merge several small pieces of data that are written at short intervals into one larger block before sending it, so a single read on the receiving side may contain several application messages (or only part of one). Stream-oriented communication preserves no message boundaries; this is what is commonly called the TCP "sticky packet" problem. To solve it, the receiver has to split the byte stream and reassemble the messages itself.
To solve the TCP sticky-packet problem, we define a common message header that precedes every message sent over TCP. The header generally contains the message type and the message size. The receiver uses the header to separate the byte stream into individual messages, i.e. to mark the boundary of each one.
The following implements a TCP client and a TCP server in C++, and tests them from a Qt application.
TCP Client
The TCP client actively connects to the TCP server and receives the data the server sends. The received byte stream is split into messages according to the common header defined above. Whenever a complete message has been assembled, its details are printed.
#ifndef TCPCLIENT_H
#define TCPCLIENT_H

// NOTE(review): the header names inside the #include directives were lost when
// this listing was extracted. The list below is reconstructed from the APIs the
// client uses (sockets, epoll, fcntl, pthreads, printf/strerror) - confirm
// against the real project.
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <pthread.h>
#include <new>

#define MAX_PKT_SIZE (256<<20) // Maximum network packet length

// The server header in this article defines the very same types. The extra
// guard keeps a translation unit that includes both headers from redefining
// them.
#ifndef TCP_COMM_TYPES_DEFINED
#define TCP_COMM_TYPES_DEFINED

// Business package header that precedes every message on the wire.
// uTotalLen is the length of the whole message INCLUDING this header.
// NOTE(review): the struct is sent as raw memory, so both peers must agree on
// its padding and byte order - confirm if the peers can be built differently.
struct CommMsgHdr
{
    uint16_t uMsgType;   // Message type
    uint32_t uTotalLen;  // Total message length (header + body)
};

// Per-connection state: the socket plus the reassembly progress of the
// message currently being received.
typedef struct _TcpHandle_
{
    int32_t fd;                      // Socket descriptor, -1 when not open
    uint32_t uRcvLen;                // Received data size
    uint32_t uAllLen;                // Total message length
    struct sockaddr_in local_addr;
    struct sockaddr_in remote_addr;

    _TcpHandle_() : fd(-1), uRcvLen(0), uAllLen(0)
    {
        // Start from a fully defined state so no indeterminate bytes
        // (e.g. sin_zero) are ever handed to connect()/bind().
        memset(&local_addr, 0, sizeof(local_addr));
        memset(&remote_addr, 0, sizeof(remote_addr));
    }
} TcpHandle;

#endif // TCP_COMM_TYPES_DEFINED

// TCP client that connects to a server and reassembles the incoming byte
// stream into messages delimited by CommMsgHdr.
class TcpClient
{
public:
    TcpClient();

    // Connects to serverIp:serverPort and starts the receive thread.
    // Returns 0 on success, a negative value on failure.
    int32_t create_tcpClient(char *serverIp, int32_t serverPort);

    // Sends len bytes from data; returns the result of send().
    int32_t SendData(char *data, int32_t len);

    bool m_runing = false;            // Receive thread keeps running while true
    int epoll_fd = -1;                // epoll instance watching the socket
    TcpHandle* pTcpHandle = nullptr;  // Connection state

private:
    pthread_t threadId;
};

#endif // TCPCLIENT_H
#include "" int32_t TcpRcv(const int32_t& fd, void* buff, const uint32_t& len) { int32_t iCurrRecv = recv(fd, buff, len, MSG_NOSIGNAL); if (0 < iCurrRecv) { return iCurrRecv; } else if (iCurrRecv < 0) { if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN) { return 0; } else return -1; } else return -1; } void* DealTcpThread(void* obj) { TcpClient* pTcpClient = (TcpClient*)obj; TcpHandle* pTcpHandle = pTcpClient->pTcpHandle; const int kEpollDefaultWait = 1;//Timeout time, unit ms struct epoll_event alive_events[256]; uint32_t recv_buffer_max = 1024 * 1024; uint8_t *recv_buffer = nullptr; recv_buffer = new uint8_t[recv_buffer_max]; uint32_t head_len = (uint32_t)sizeof(CommMsgHdr); while (pTcpClient->m_runing) { int num = epoll_wait(pTcpClient->epoll_fd, alive_events, 256, kEpollDefaultWait); for (int i = 0; i < num; ++i) { int fd = alive_events[i].; int events = alive_events[i].events; if ( events & EPOLLIN ) { //1. Start receiving the head if(pTcpHandle->uRcvLen < head_len) { int32_t iRecvLen = TcpRcv(fd, recv_buffer + pTcpHandle->uRcvLen, head_len - pTcpHandle->uRcvLen); if (0 == iRecvLen) continue; else if (0 > iRecvLen) { printf("Recv head data, return [%d] and err[%s],fd=[%d].", iRecvLen, strerror(errno),fd); close(fd);//Close socket continue; } pTcpHandle->uRcvLen += iRecvLen; //If the complete header has been received if(pTcpHandle->uRcvLen >= head_len) { CommMsgHdr* pHdr = (CommMsgHdr *)recv_buffer; pTcpHandle->uAllLen = pHdr->uTotalLen; //If the uTotalLen in the message header is too small or too large, exception handling if ( pHdr->uTotalLen < head_len || pHdr->uTotalLen > MAX_PKT_SIZE ) { printf("uTotalLen invalid,uTotalLen=%u,fd=[%d]", pHdr->uTotalLen,fd); close(fd);//Close socket continue; } //If uTotalLen is greater than the allocated cache, reassign if (((CommMsgHdr *)recv_buffer)->uTotalLen > recv_buffer_max) { uint8_t *new_recv_buffer = new uint8_t[((CommMsgHdr *)recv_buffer)->uTotalLen]; memcpy(new_recv_buffer, recv_buffer,head_len); 
delete [] recv_buffer;// Free up the original space recv_buffer = new_recv_buffer;// Re-point to the newly opened space recv_buffer_max = ((CommMsgHdr *)recv_buffer)->uTotalLen;// Reassign the maximum buffer length } } } //2. Start receiving data body else { int32_t iRecvLen = TcpRcv(fd, recv_buffer + pTcpHandle->uRcvLen, pTcpHandle->uAllLen - pTcpHandle->uRcvLen); if (0 == iRecvLen) continue; else if (0 > iRecvLen) { printf("Recv body data, return [%d] and err[%s],fd=[%d].", iRecvLen, strerror(errno),fd); close(fd);//Close socket continue; } pTcpHandle->uRcvLen += iRecvLen; // Complete reception if(pTcpHandle->uRcvLen == pTcpHandle->uAllLen) { CommMsgHdr* pHdr = (CommMsgHdr*)recv_buffer; printf("Rcv completed,msgType=%d,uTotalLen=%u\n",pHdr->uMsgType,pHdr->uTotalLen); pTcpHandle->uRcvLen = 0; pTcpHandle->uAllLen = 0; } } } } } delete [] recv_buffer; recv_buffer = nullptr; return nullptr; } TcpClient::TcpClient() { pTcpHandle = new TcpHandle; epoll_fd = epoll_create(1); } int32_t TcpClient::create_tcpClient(char *serverIp, int32_t serverPort) { if (pTcpHandle == NULL) return -1; pTcpHandle->fd = -1; if((pTcpHandle->fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { printf("socket err=%s\n",strerror(errno)); return -2; } pTcpHandle->remote_addr.sin_family = AF_INET; pTcpHandle->remote_addr.sin_port = htons(serverPort); pTcpHandle->remote_addr.sin_addr.s_addr = inet_addr(serverIp); if(connect(pTcpHandle->fd, (struct sockaddr *)&pTcpHandle->remote_addr, sizeof(pTcpHandle->remote_addr)) < 0) { printf("connect err=%s\n",strerror(errno)); return -3; } struct epoll_event evt; = EPOLLIN; fcntl(pTcpHandle->fd, F_SETFL, O_NONBLOCK);//Set non-blocking = pTcpHandle->fd; epoll_ctl(epoll_fd,EPOLL_CTL_ADD,pTcpHandle->fd,&evt); m_runing = true; pthread_create(&threadId,NULL,DealTcpThread,this); return 0; } int32_t TcpClient::SendData(char *data, int32_t len) { int32_t ret = send(pTcpHandle->fd, data, len, MSG_NOSIGNAL); return ret; }
TCP server
The server starts listening; when a client connects, it sends that client a series of data packets of varying sizes in a loop.
#ifndef TCPSERVER_H
#define TCPSERVER_H

// NOTE(review): the header names inside the #include directives were lost when
// this listing was extracted. The list below is reconstructed from the APIs the
// server uses (sockets, pthreads, printf/strerror) and mirrors the client
// header - confirm against the real project.
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <pthread.h>
#include <new>

#define MAX_PKT_SIZE (256<<20) // Maximum network packet length

// The client header in this article defines the very same types. The extra
// guard keeps a translation unit that includes both headers from redefining
// them.
#ifndef TCP_COMM_TYPES_DEFINED
#define TCP_COMM_TYPES_DEFINED

// Business package header that precedes every message on the wire.
// uTotalLen is the length of the whole message INCLUDING this header.
// NOTE(review): the struct is sent as raw memory, so both peers must agree on
// its padding and byte order - confirm if the peers can be built differently.
struct CommMsgHdr
{
    uint16_t uMsgType;   // Message type
    uint32_t uTotalLen;  // Total message length (header + body)
};

// Socket descriptor plus addresses and receive progress of one endpoint.
typedef struct _TcpHandle_
{
    int32_t fd;                      // Socket descriptor, -1 when not open
    uint32_t uRcvLen;                // Received data size
    uint32_t uAllLen;                // Total message length
    struct sockaddr_in local_addr;
    struct sockaddr_in remote_addr;

    _TcpHandle_() : fd(-1), uRcvLen(0), uAllLen(0)
    {
        // Start from a fully defined state so no indeterminate bytes
        // (e.g. sin_zero) are ever handed to bind()/connect().
        memset(&local_addr, 0, sizeof(local_addr));
        memset(&remote_addr, 0, sizeof(remote_addr));
    }
} TcpHandle;

#endif // TCP_COMM_TYPES_DEFINED

// TCP server that listens on a port and sends test messages to every client
// that connects.
class TcpServer
{
public:
    TcpServer();

    // Binds to listenPort, starts listening and launches the accept thread.
    // Returns 0 on success, a negative value on failure.
    int32_t create_tcpServer(int32_t listenPort);

    bool m_runing = false;               // Accept thread keeps running while true
    int epoll_fd = -1;                   // Declared for symmetry with the client; not used by the server code shown in this article
    TcpHandle* pTcpSerHandle = nullptr;  // Listening socket state

private:
    pthread_t threadId;
};

#endif // TCPSERVER_H
#include "" int SendLoop(int32_t fd, uint8_t * buff, uint32_t len) { uint64_t total_send_bytes = 0; int64_t curr_send_len = 0; uint64_t left_bytes = len; while(total_send_bytes < len) { curr_send_len = send(fd, buff + total_send_bytes, left_bytes, MSG_NOSIGNAL); if(curr_send_len < 0) { if( errno == EINTR || errno == EAGAIN) continue; return -1; } else { total_send_bytes += curr_send_len; left_bytes -= curr_send_len; } } return 0; } void* DealTcpThread(void* obj) { TcpServer* pTcpServer = (TcpServer*)obj; TcpHandle* pTcpSerHandle = (TcpHandle*)pTcpServer->pTcpSerHandle; socklen_t src_len = sizeof(struct sockaddr_in); while (pTcpServer->m_runing) { struct sockaddr_in src; memset(&src, 0, src_len); int connfd = accept(pTcpSerHandle->fd, (struct sockaddr*) &src, &src_len); if(connfd > -1) { //Start send for(int index=0;index<100;index++) { uint32_t dataLength = 1024*1024*16 + index*10; void *sendbuff = new char[dataLength]; CommMsgHdr* pHead = (CommMsgHdr*)sendbuff; pHead->uMsgType = 1001; pHead->uTotalLen = dataLength; SendLoop(connfd,(uint8_t * )sendbuff,dataLength); } } } return nullptr; } TcpServer::TcpServer() { pTcpSerHandle = new TcpHandle; } int32_t TcpServer::create_tcpServer(int32_t listenPort) { pTcpSerHandle->fd = -1; pTcpSerHandle->local_addr.sin_family = AF_INET; pTcpSerHandle->local_addr.sin_port = htons(listenPort); pTcpSerHandle->local_addr.sin_addr.s_addr = INADDR_ANY; pTcpSerHandle->fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); int opt = 1; setsockopt(pTcpSerHandle->fd,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));//Multiplex port if (bind(pTcpSerHandle->fd, (struct sockaddr*) &pTcpSerHandle->local_addr,sizeof(struct sockaddr_in)) < 0) { printf("http server bind error(%s)",strerror(errno)); return -1; } listen(pTcpSerHandle->fd, 32); m_runing = true; pthread_create(&threadId,NULL,DealTcpThread,this); return 0; }
Source code testing
Start the server first
TcpServer *pTcpServer; pTcpServer = new TcpServer; pTcpServer->create_tcpServer(9090);
Then start the client
TcpClient* pTcpClient; pTcpClient = new TcpClient; pTcpClient->create_tcpClient("127.0.0.1",9090);
Client output
Rcv completed,msgType=1001,uTotalLen=16777216 Rcv completed,msgType=1001,uTotalLen=16777226 Rcv completed,msgType=1001,uTotalLen=16777236 Rcv completed,msgType=1001,uTotalLen=16777246 Rcv completed,msgType=1001,uTotalLen=16777256 Rcv completed,msgType=1001,uTotalLen=16777266 Rcv completed,msgType=1001,uTotalLen=16777276 Rcv completed,msgType=1001,uTotalLen=16777286 ... ... ...
This concludes the article on solving the TCP sticky-packet problem in C++. For more on C++ and TCP sticky packets, please see my previous articles or the related articles below. Thank you for your support!