SoFunction
Updated on 2025-03-10

Solving the TCP sticky-packet problem in C++

The TCP sticky-packet problem

TCP is a connection-oriented, reliable, stream-oriented transport protocol. To improve efficiency, TCP may merge several small pieces of data that are written at short intervals into one larger block before sending it, so a single read on the receiving side may contain several messages (or only part of one). A byte stream has no built-in message boundaries; this is what is commonly called the TCP "sticky packet" problem. The receiver therefore has to split and reassemble the data into messages by itself.

To solve the sticky-packet problem, we define a common packet header for every message sent over TCP. The header generally contains the message type and the message size. The receiver uses the header to split the byte stream into individual packets, which marks the boundary of each packet.

The following implements a TCP client and a TCP server in C++ and tests them from a Qt project.

TCP Client

The TCP client actively connects to the TCP server and receives the data sent by the TCP server. The received data is divided into packets according to the defined common packet header. Whenever a complete data packet is formed, relevant information is printed.

#ifndef TCPCLIENT_H
#define TCPCLIENT_H
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <unistd.h>
#include <errno.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <pthread.h>
#include <new>
#define MAX_PKT_SIZE          (256<<20) // Maximum network packet length in bytes (256 MiB)

// Common business header that prefixes every message on the TCP stream.
// The receiver uses uTotalLen to find where one message ends and the next
// begins. The struct is copied to/from the wire in its in-memory layout, so
// both peers must be built with the same layout and byte order.
// NOTE(review): on common ABIs the compiler inserts 2 padding bytes after
// uMsgType (sizeof == 8) - confirm before talking to a different platform.
struct CommMsgHdr
{
    uint16_t uMsgType;   // Message type identifier
    uint32_t uTotalLen;  // Total message length in bytes, header included
};
// Per-connection state: the socket descriptor, the reassembly progress of the
// message currently being received, and the local/remote socket addresses.
typedef struct _TcpHandle_{
    int32_t fd;                      // Socket descriptor; -1 while no socket is open
    uint32_t     uRcvLen;        // Received data size of the current message
    uint32_t     uAllLen;        // Total message length (0 until the header is parsed)
    struct sockaddr_in local_addr;
    struct sockaddr_in remote_addr;
    // Starts with no socket, no reassembly progress and zeroed addresses so
    // that no member is ever read uninitialised.
    _TcpHandle_()
        : fd(-1), uRcvLen(0), uAllLen(0), local_addr(), remote_addr()
    {
    }
}TcpHandle;
// TCP client: connects to a server and starts a background thread
// (DealTcpThread) that splits the received byte stream into messages using
// the CommMsgHdr length prefix.
class TcpClient
{
public:
    // Allocates pTcpHandle and creates the epoll instance stored in epoll_fd.
    TcpClient();
    // Connects to serverIp:serverPort, registers the socket with epoll and
    // starts the receive thread. Returns 0 on success and a negative value on
    // failure.
    // NOTE(review): serverIp is only read by the implementation; a
    // const char* parameter would allow passing string literals cleanly.
    int32_t create_tcpClient(char *serverIp, int32_t serverPort);
    // Sends len bytes from data on the connected socket with one send() call
    // and returns send()'s result.
    int32_t SendData(char *data, int32_t len);
    // Loop flag polled by the receive thread; set to true by create_tcpClient.
    // NOTE(review): plain bool shared between threads without synchronisation.
    bool m_runing;
    // epoll instance the receive thread waits on.
    int epoll_fd;
    // Connection state shared with the receive thread.
    TcpHandle* pTcpHandle;
private:
    // Receive thread created by create_tcpClient.
    pthread_t threadId;
};
#endif // TCPCLIENT_H

#include "tcpclient.h"
// Performs one recv() of at most len bytes from fd into buff.
//
// Returns:
//   > 0  number of bytes received
//     0  nothing available right now (EINTR / EWOULDBLOCK / EAGAIN); retry later
//    -1  the peer closed the connection (recv() returned 0) or a hard error occurred
int32_t TcpRcv(const int32_t& fd, void* buff, const uint32_t& len)
{
    // Cap the request so the byte count always fits the int32_t return type.
    const size_t want = (len > (uint32_t)INT32_MAX) ? (size_t)INT32_MAX : (size_t)len;
    const ssize_t received = recv(fd, buff, want, MSG_NOSIGNAL);
    if (received > 0) {
        return (int32_t)received;
    }
    if (received == 0) {
        // recv() reports 0 when the peer has shut the connection down.
        return -1;
    }
    if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN) {
        return 0;
    }
    return -1;
}
void* DealTcpThread(void* obj)
{
    TcpClient* pTcpClient = (TcpClient*)obj;
    TcpHandle* pTcpHandle = pTcpClient-&gt;pTcpHandle;
    const int kEpollDefaultWait = 1;//Timeout time, unit ms    struct epoll_event alive_events[256];
    uint32_t recv_buffer_max = 1024 * 1024;
    uint8_t *recv_buffer = nullptr;
    recv_buffer = new uint8_t[recv_buffer_max];
    uint32_t head_len = (uint32_t)sizeof(CommMsgHdr);
    while (pTcpClient-&gt;m_runing)
    {
        int num = epoll_wait(pTcpClient-&gt;epoll_fd, alive_events, 256, kEpollDefaultWait);
        for (int i = 0; i &lt; num; ++i)
        {
            int fd = alive_events[i].;
            int events = alive_events[i].events;
            if ( events &amp; EPOLLIN )
            {
                //1. Start receiving the head                if(pTcpHandle-&gt;uRcvLen &lt; head_len)
                {
                    int32_t iRecvLen = TcpRcv(fd, recv_buffer + pTcpHandle-&gt;uRcvLen, head_len - pTcpHandle-&gt;uRcvLen);
                    if (0 == iRecvLen) continue;
                    else if (0 &gt; iRecvLen) {
                        printf("Recv head data, return [%d] and err[%s],fd=[%d].", iRecvLen, strerror(errno),fd);
                        close(fd);//Close socket                        continue;
                    }
                    pTcpHandle-&gt;uRcvLen += iRecvLen;
                    //If the complete header has been received                    if(pTcpHandle-&gt;uRcvLen &gt;= head_len)
                    {
                        CommMsgHdr* pHdr = (CommMsgHdr *)recv_buffer;
                        pTcpHandle-&gt;uAllLen = pHdr-&gt;uTotalLen;
                        //If the uTotalLen in the message header is too small or too large, exception handling                        if ( pHdr-&gt;uTotalLen &lt; head_len || pHdr-&gt;uTotalLen &gt; MAX_PKT_SIZE )
                        {
                            printf("uTotalLen invalid,uTotalLen=%u,fd=[%d]",
                                   pHdr-&gt;uTotalLen,fd);
                            close(fd);//Close socket                            continue;
                        }
                        //If uTotalLen is greater than the allocated cache, reassign                        if (((CommMsgHdr *)recv_buffer)-&gt;uTotalLen &gt; recv_buffer_max)
                        {
                            uint8_t *new_recv_buffer = new uint8_t[((CommMsgHdr *)recv_buffer)-&gt;uTotalLen];
                            memcpy(new_recv_buffer, recv_buffer,head_len);
                            delete [] recv_buffer;// Free up the original space                            recv_buffer = new_recv_buffer;// Re-point to the newly opened space                            recv_buffer_max = ((CommMsgHdr *)recv_buffer)-&gt;uTotalLen;// Reassign the maximum buffer length                        }
                    }
                }
                //2. Start receiving data body                else
                {
                    int32_t iRecvLen = TcpRcv(fd, recv_buffer + pTcpHandle-&gt;uRcvLen, pTcpHandle-&gt;uAllLen - pTcpHandle-&gt;uRcvLen);
                    if (0 == iRecvLen) continue;
                    else if (0 &gt; iRecvLen) {
                        printf("Recv body data, return [%d] and err[%s],fd=[%d].", iRecvLen, strerror(errno),fd);
                        close(fd);//Close socket                        continue;
                    }
                    pTcpHandle-&gt;uRcvLen += iRecvLen;
                    // Complete reception                    if(pTcpHandle-&gt;uRcvLen == pTcpHandle-&gt;uAllLen)
                    {
                        CommMsgHdr* pHdr = (CommMsgHdr*)recv_buffer;
                        printf("Rcv completed,msgType=%d,uTotalLen=%u\n",pHdr-&gt;uMsgType,pHdr-&gt;uTotalLen);
                        pTcpHandle-&gt;uRcvLen = 0;
                        pTcpHandle-&gt;uAllLen = 0;
                    }
                }
            }
        }
    }
    delete [] recv_buffer;
    recv_buffer = nullptr;
    return nullptr;
}
// Allocates the connection handle and creates the epoll instance used by the
// receive thread. Every member starts in a defined state: not running, no
// socket open.
TcpClient::TcpClient()
    : m_runing(false),
      epoll_fd(-1),
      pTcpHandle(new TcpHandle),
      threadId()
{
    pTcpHandle->fd = -1; // no socket until create_tcpClient succeeds
    epoll_fd = epoll_create(1);
    if (epoll_fd < 0) {
        // Cannot report failure from a constructor here; create_tcpClient
        // will fail later when it tries to use the invalid descriptor.
        printf("epoll_create err=%s\n", strerror(errno));
    }
}
int32_t TcpClient::create_tcpClient(char *serverIp, int32_t serverPort)
{
    if (pTcpHandle == NULL)        return -1;
    pTcpHandle-&gt;fd = -1;
    if((pTcpHandle-&gt;fd = socket(AF_INET, SOCK_STREAM, 0)) &lt; 0)
    {
        printf("socket err=%s\n",strerror(errno));
        return -2;
    }
    pTcpHandle-&gt;remote_addr.sin_family = AF_INET;
    pTcpHandle-&gt;remote_addr.sin_port = htons(serverPort);
    pTcpHandle-&gt;remote_addr.sin_addr.s_addr = inet_addr(serverIp);
    if(connect(pTcpHandle-&gt;fd, (struct sockaddr *)&amp;pTcpHandle-&gt;remote_addr, sizeof(pTcpHandle-&gt;remote_addr)) &lt; 0)
    {
        printf("connect err=%s\n",strerror(errno));
        return -3;
    }
    struct epoll_event evt;
     = EPOLLIN;
    fcntl(pTcpHandle-&gt;fd, F_SETFL, O_NONBLOCK);//Set non-blocking     = pTcpHandle-&gt;fd;
    epoll_ctl(epoll_fd,EPOLL_CTL_ADD,pTcpHandle-&gt;fd,&amp;evt);
    m_runing = true;
    pthread_create(&amp;threadId,NULL,DealTcpThread,this);
    return 0;
}
int32_t TcpClient::SendData(char *data, int32_t len)
{
    int32_t ret = send(pTcpHandle-&gt;fd, data, len, MSG_NOSIGNAL);
    return ret;
}

TCP server

The server starts listening; when a client connects, it sends data packets of different sizes to the client in a loop.

#ifndef TCPSERVER_H
#define TCPSERVER_H
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <unistd.h>
#include <errno.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <pthread.h>
#include <new>
#define MAX_PKT_SIZE          (256<<20) // Maximum network packet length in bytes (256 MiB)

// Common business header written at the start of every message the server
// sends. uTotalLen lets the receiver find the end of the message in the byte
// stream. The struct is copied to the wire in its in-memory layout, so both
// peers must be built with the same layout and byte order.
// NOTE(review): on common ABIs the compiler inserts 2 padding bytes after
// uMsgType (sizeof == 8) - confirm before talking to a different platform.
struct CommMsgHdr
{
    uint16_t uMsgType;   // Message type identifier
    uint32_t uTotalLen;  // Total message length in bytes, header included
};
// Socket state used by the server: the listening descriptor, message progress
// counters, and the local/remote socket addresses.
typedef struct _TcpHandle_{
    int32_t fd;                      // Socket descriptor; -1 while no socket is open
    uint32_t     uRcvLen;        // Received data size
    uint32_t     uAllLen;        // Total message length
    struct sockaddr_in local_addr;
    struct sockaddr_in remote_addr;
    // Starts with no socket, zero counters and zeroed addresses so that no
    // member is ever read uninitialised.
    _TcpHandle_()
        : fd(-1), uRcvLen(0), uAllLen(0), local_addr(), remote_addr()
    {
    }
}TcpHandle;
// TCP server: listens on a port and runs a background thread (DealTcpThread)
// that accepts clients and sends each one a series of length-prefixed
// messages.
class TcpServer
{
public:
    // Allocates pTcpSerHandle.
    TcpServer();
    // Binds to listenPort on all interfaces, starts listening and launches
    // the accept/send thread. Returns 0 on success and a negative value on
    // failure.
    int32_t create_tcpServer(int32_t listenPort);
    // Loop flag polled by the accept thread; set to true by create_tcpServer.
    // NOTE(review): plain bool shared between threads without synchronisation.
    bool m_runing;
    // NOTE(review): not assigned or read by the server code shown here.
    int epoll_fd;
    // Listening-socket state shared with the accept thread.
    TcpHandle* pTcpSerHandle;
private:
    // Accept/send thread created by create_tcpServer.
    pthread_t threadId;
};
#endif // TCPSERVER_H

#include "tcpserver.h"
// Sends exactly len bytes from buff on fd, calling send() repeatedly until
// everything has been written.
//
// Returns 0 when all bytes were sent (including len == 0) and -1 on failure.
// EINTR / EAGAIN / EWOULDBLOCK are retried immediately, so on a non-blocking
// socket this busy-waits until the kernel buffer drains.
int SendLoop(int32_t fd, uint8_t * buff, uint32_t len) {
    if (buff == NULL && len > 0) {
        return -1;
    }
    uint32_t sent = 0;
    while (sent < len) {
        const ssize_t n = send(fd, buff + sent, (size_t)(len - sent), MSG_NOSIGNAL);
        if (n < 0) {
            if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
                continue;
            }
            return -1;
        }
        if (n == 0) {
            // No progress although bytes remain: bail out rather than spin
            // forever.
            return -1;
        }
        sent += (uint32_t)n;
    }
    return 0;
}
void* DealTcpThread(void* obj)
{
    TcpServer* pTcpServer = (TcpServer*)obj;
    TcpHandle* pTcpSerHandle = (TcpHandle*)pTcpServer-&gt;pTcpSerHandle;
    socklen_t src_len = sizeof(struct sockaddr_in);
    while (pTcpServer-&gt;m_runing)
    {
        struct sockaddr_in src;
        memset(&amp;src, 0, src_len);
        int connfd = accept(pTcpSerHandle-&gt;fd, (struct sockaddr*) &amp;src, &amp;src_len);
        if(connfd &gt; -1)
        {
            //Start send            for(int index=0;index&lt;100;index++)
            {
                uint32_t dataLength = 1024*1024*16 + index*10;
                void *sendbuff = new char[dataLength];
                CommMsgHdr* pHead = (CommMsgHdr*)sendbuff;
                pHead-&gt;uMsgType = 1001;
                pHead-&gt;uTotalLen = dataLength;
                SendLoop(connfd,(uint8_t * )sendbuff,dataLength);
            }
        }
    }
    return nullptr;
}
// Allocates the listening-socket handle and puts every member into a defined
// state: not running, no descriptors open.
TcpServer::TcpServer()
    : m_runing(false),
      epoll_fd(-1),
      pTcpSerHandle(new TcpHandle),
      threadId()
{
    pTcpSerHandle->fd = -1; // no socket until create_tcpServer succeeds
}
int32_t TcpServer::create_tcpServer(int32_t listenPort)
{
    pTcpSerHandle-&gt;fd = -1;
    pTcpSerHandle-&gt;local_addr.sin_family = AF_INET;
    pTcpSerHandle-&gt;local_addr.sin_port = htons(listenPort);
    pTcpSerHandle-&gt;local_addr.sin_addr.s_addr = INADDR_ANY;
    pTcpSerHandle-&gt;fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    int opt = 1;
    setsockopt(pTcpSerHandle-&gt;fd,SOL_SOCKET,SO_REUSEADDR,&amp;opt,sizeof(opt));//Multiplex port    if (bind(pTcpSerHandle-&gt;fd, (struct sockaddr*) &amp;pTcpSerHandle-&gt;local_addr,sizeof(struct sockaddr_in)) &lt; 0)
    {
        printf("http server bind error(%s)",strerror(errno));
        return -1;
    }
    listen(pTcpSerHandle-&gt;fd, 32);
    m_runing = true;
    pthread_create(&amp;threadId,NULL,DealTcpThread,this);
    return 0;
}

Source code testing

Start the server first

    // Create the server and start listening on port 9090.
    TcpServer *pTcpServer = new TcpServer;
    pTcpServer->create_tcpServer(9090);

Then start the client

    // Create the client and connect to the local server on port 9090.
    TcpClient *pTcpClient = new TcpClient;
    pTcpClient->create_tcpClient("127.0.0.1",9090);

Client printing

Rcv completed,msgType=1001,uTotalLen=16777216
Rcv completed,msgType=1001,uTotalLen=16777226
Rcv completed,msgType=1001,uTotalLen=16777236
Rcv completed,msgType=1001,uTotalLen=16777246
Rcv completed,msgType=1001,uTotalLen=16777256
Rcv completed,msgType=1001,uTotalLen=16777266
Rcv completed,msgType=1001,uTotalLen=16777276
Rcv completed,msgType=1001,uTotalLen=16777286
...
...
...

This is the end of this article about C++ solving the problem of TCP sticking packets. For more related contents of C++ TCP sticking packets, please search for my previous articles or continue browsing the related articles below. I hope everyone will support me in the future!