WebSocket


起源

​ HTTP 协议有一个缺陷:通信只能由客户端发起。 这种单向请求的特点,注定了如果服务器有连续的状态变化,客户端要获知就非常麻烦。我们只能使用“轮询”:每隔一段时候,就发出一个询问,了解服务器有没有新的信息。最典型的场景就是聊天室。

img

​ 轮询的效率低,非常浪费资源(因为必须不停连接,或者 HTTP 连接始终打开)。WebSocket 就是这样发明的。Websocket诞生于2008年,2011年成为国际标准,现在所有的浏览器都已支持。一种全新的应用层协议,专门为web客户端和服务端设计的真正的全双工通信协议,可以类比HTTP协议来了解websocket协议。

其他特点包括:

(1)建立在 TCP 协议之上,服务器端的实现比较容易。

(2)与 HTTP 协议有着良好的兼容性。默认端口也是80和443,并且握手阶段采用 HTTP 协议,因此握手时不容易屏蔽,能通过各种 HTTP 代理服务器。

(3)数据格式比较轻量,性能开销小,通信高效。

(4)可以发送文本,也可以发送二进制数据。

(5)没有同源限制,客户端可以与任意服务器通信。

(6)协议标识符是ws(如果加密,则为wss),服务器网址就是 URL。

ws://example.com:80/some/path

img

不同点:

  • HTTP协议标识符是httpwebsocketws
  • HTTP请求只能由客户端发起,服务器无法主动向客户端推送消息,而websocket可以
  • HTTP请求有同源限制,不同源之间通信需要跨域,而websocket没有同源限制

相同点:

  • 都是应用层的通信协议
  • 默认端口一样,都是80或443
  • 都可以用于浏览器和服务器间的通信
  • 都基于TCP协议

重连机制

重连过程拆解

首先考虑一个问题,何时需要重连?

最容易想到websocket连接断了为了接下来能收发消息,需要再发起一次连接。但在很多场景下,即便websocket连接没有断开,实际上也不可用了。比方设备切换网络、链路中间路由崩溃、服务器负载继续过高无法响应等,这些场景下的websocket都没有断开,但对上层来说,都没办法正常的收发数据了因此在重连前,需要一种机制来感知连接是否可用、服务是否可用,而且要能快速感知,以便能够快速从不可用状态中恢复。

一旦感知到连接不可用,那便可以弃旧图新了弃用并断开旧连接,然后发起一次新连接。这两个方法看似简单,但若想达到快,且不是那么容易的

首先是断开旧连接,对客户端来说,如何快速快速断开?

协议规定客户端必需要和服务器协商后才能断开websocket连接,但是当客户端已经联系不上服务器、无法协商时,如何断开并快速恢复?

其次是快速发起新连接。此快非彼快,这里的快并非是立即发起连接,立即发起连接会对服务器带来不可预估的影响。重连时通常会采用一些退避算法,延迟一段时间后再发起重连。但如何在重连间隔和性能消耗间做出权衡?如何在恰当的时间点”快速发起连接?

带着这些疑问,来细看下这三个过程。

快速感知何时需要重连

需要重连的场景可以细分为三种,一是连接断开了二是连接没断但是不可用,三是连接对端的服务不可用了

第一种场景很简单,连接直接断开了肯定需要重连了

而对于后两者,无论是连接不可用,还是服务不可用,对上层应用的影响都是不能再收发即时消息了所以从这个角度动身,感知何时需要重连的一种简单粗暴的方法就是通过心跳包超时:发送一个心跳包,如果超越特定的时间后还没有收到服务器回包,则认为服务不可用,如下图中左侧的方案;这种方法最直接。那如果想要快速感知呢,就只能多发心跳包,加快心跳频率。但是心跳太快对移动端流量、电量的消耗又会太多,所以使用这种方法没办法做到快速感知,可以作为检测连接和服务可用的兜底机制。

如果要检测连接不可用,除了用心跳检测,还可以通过判断网络状态来实现,因为断网、切换wifi切换网络是导致连接不可用的最直接原因,所以在网络状态由offline变为online时,大多数情况下需要重连下,但也不一定,因为webscoket底层是基于TCPTCP连接不能敏锐的感知到应用层的网络变化,所以有时候即便网络断开了一小会,对websocket连接是不会有影响的网络恢复后,仍然能够正常地进行通信。因此在网络由断开到连接上时,立即判断下连接是否可用,可以通过发一个心跳包判断,如果能够正常收到服务器的心跳回包,则说明连接仍是可用的如果等待超时后仍没有收到心跳回包,则需要重连,如上图中的右侧。这种方法的优点是速度快,网络恢复后能够第一时间感知连接是否可用,不可用的话可以快速执行恢复,但它只能覆盖应用层网络变化导致websocket不可用的情况。

综上,定时发送心跳包检测的方案贵在稳定,能够覆盖所有场景,但速度不太可;而判断网络状态的方案速度快,无需等待心跳间隔,较为灵敏,但覆盖场景较为局限。因此,可以结合两种方案:定时以不太快的频率发送心跳包,比方40s/次、60s/次等,具体可以根据应用场景来定,然后在网络状态由offline变为online时立即发送一次心跳,检测当前连接是否可用,不可用的话立即进行恢复处置。这样在大多数情况下,上层的应用通信都能较快从不可用状态中恢复,对于少部分场景,有定时心跳作为兜底,一个心跳周期内也能够恢复。

快速断开旧连接

通常情况下,发起下一次连接前,如果旧连接还存在话,应该先把旧连接断开,这样一来可以释放客户端和服务器的资源,二来可以防止之后误从旧连接收发数据。

知道websocket底层是基于TCP协议传输数据的连接两端分别是服务器和客户端,而TCPTIME_WA IT状态是由服务器端维持的因此在大多数正常情况下,应该由服务器发起断开底层TCP连接,而不是客户端。也就是说,要断开websocket连接时,如果是服务器收到指示要断开websocket那它应该立即发起断开TCP连接;如果是客户端收到指示要断开websocket那它应该发信号给服务器,然后等待底层TCP连接被服务器断开或直至超时。

那如果客户端想要断开旧的websocket可以分websocket连接可用和不可用两种情况来讨论。当旧连接可用时,客户端可以直接给服务器发送断开信号,然后服务器发起断开连接即可;当旧连接不可用时,比方客户端切换了wifi客户端发送了断开信号,但是服务器收不到客户端只能迟迟等待,直至超时才干被允许断开。超时断开的过程相对来说是比拟久的那有没有方法可以快点断开?

上层应用无法改变只能由服务器发起断开连接这种协议层面的规则,所以只能从应用逻辑入手,比方在上层通过业务逻辑保证旧连接完全失效,模拟连接断开,然后在发起新连接,恢复通讯。这种方法相当于尝试断开旧连接不行时,直接弃之,然后就能快速进入下一流程,所以在使用时一定要确保在业务逻辑上旧连接已完全失效,比方:保证丢掉从旧连接收到所有数据、旧连接不能阻碍新连接的建立,旧连接超时断开后不能影响新连接和上层业务逻辑等等。

快速发起新连接

有IM开发经验的同学应该有所了解,遇到因网络原因导致的重连时,万万不能立即发起一次新连接的否则当呈现网络抖动时,所有的设备都会立即同时向服务器发起连接,这无异于黑客通过发起大量请求消耗网络带宽引起的拒绝服务攻击,这对服务器来说简直是灾难。所以在重连时通常采用一些退避算法,延迟一段时间再发起重连,如下图中左侧的流程。

如果要快速连上呢?最直接的做法就是缩短重试间隔,重试间隔越短,网络恢复后就能越快的恢复通讯。但是太频繁的重试对性能、带宽、电量的消耗就比较严重。如何在这之间做一个较好的权衡呢?

一种比较合理的方式是随着重试次数增多,逐渐增大重试间隔;另一方面监听网络变化,网络状态由offline变为online这种比拟可能重连上的时刻,可以适当地减小重连间隔,如上图中的右侧(随重试次数的增多,重连间隔也会变大)两种方式配合使用。

除此之外,还可以结合业务逻辑,根据胜利重连上的可能性适当的调整间隔,如网络未连接时或应用在后台时重连间隔可以调大一些,网络正常的状态下可以适当调小一些等等,加快重连上的速度。

结尾

最后总结一下,本文在开头将websocket断网重连细分为三个步骤:确定何时需要重连、断开旧连接和发起新连接。然后分别分析了websocket不同状态下、不同的网络状态下,如何快速完成这个三个步骤:首先通过定时发送心跳包的方式检测当前连接是否可用,同时监测网络恢复事件,恢复后立即发送一次心跳,快速感知当前状态,判断是否需要重连;其次正常情况下由服务器断开旧连接,与服务器失去联系时直接弃用旧连接,上层模拟断开,来实现快速断开;最后发起新连接时使用退避算法延迟一段时间再发起连接,同时考虑到资源浪费和重连速度,可以在网络离线时调大重连间隔,网络正常或网络由offline变为online时缩小重连间隔,使之尽可能快地重连上。

QWebsocket实现断线重连

class WebSocketClient : public QThread
{
Q_OBJECT
public:
WebSocketClient();
~WebSocketClient();


protected:
    virtual void run();

public:
    // 启动连接 - 必须把url地址 协议头 设置完成后再启动
    bool startConnect();
    
    // 断开连接
    void disconnect();
    
    //设置链接的URL
    void setConnectUrl(QString val);
    
    // 向协议队列中添加要发送的内容
    bool addSendText(QString val);
    
    // 设置websocket 的 Sec—Websocket-Protocol 头
    void setSecWebSocketProtocolHeader(QString val);
    
    // 设置心跳数据发送时间间隔 单位秒
    void setHeartbeatTimer(int val);
    
    // 设置判断需要重连的心跳发送失败次数
    void setReconnetHeartbeatTimerCount(int val);
    
    // 设置重连的时间间隔 单位秒
    void setReconnectTimer(int val);
    
    // 设置判断断开的重连次数
    void setDisconnectReconnectCount(int val);
    
    // 获取当前状态
    webSocketState getCurrentState();

private slots :
    // 连接成功
    void onConnected();
    
    // 连接断开
    void onDisconnected();
    
    // 接收到 协议内容
    void onTextMessageReceived(const QString &val);
    
    // 定时发送心跳
    void onSendHeartbeatText();
    
    // 定时重连
    void onReconnect();

private:
    webSocketState            m_webSocketState;                    // websocket 状态
    bool                    m_bCloseByHand;                        // 手动关闭
    QMutex                    m_mutex;                            // 数据同步
    QLinkedList<QString>    m_sendTextLinkedList;                // 协议链表
    QString                    m_strURL;                            // URL地址
    QString                    m_strSecWebsocketProtocol;            // 协议头 目前使用此协议头 与网页端保持一致
    QString                    m_strHeartbeatText;                    // 心跳内容
    QWebSocket                m_websocket;                        // websocket 客户端
    int                        m_nHeartbeatTimer;                    // 心跳数据发送时间间隔 单位ms
    int                        m_nReconnectHeartbatTimerCount;        // 需要重连的心跳发送失败次数
    int                        m_nReconnectTimer;                    // 重连的时间间隔 单位ms
    int                        m_nDisconnectReconnectCount;        // 重连失败次数
    int                        m_nHeartbeatFailedCount;            // 心跳发送失败次数
    int                        m_nReconnectFailedCount;            // 重连失败次数
    QTimer                    m_timerHeartbeat;                    // 心跳发送定时器
    QTimer                    m_timerReconnect;                    // 断线重连定时器

};
#include "WebSocketClient.h"

WebSocketClient::WebSocketClient()
    : m_bCloseByHand(false)
    , m_webSocketState(stateNone)
    , m_nHeartbeatTimer(5000)
    , m_nReconnectHeartbatTimerCount(3)
    , m_nReconnectTimer(5000)
    , m_nDisconnectReconnectCount(3)
    , m_nHeartbeatFailedCount(0)
    , m_nReconnectFailedCount(0)
    , m_strURL(tr(""))
    , m_strSecWebsocketProtocol(tr(""))
    , m_strHeartbeatText(tr(""))
{
    connect(&m_websocket, SIGNAL(connected()), this, SLOT(onConnected()));
    connect(&m_websocket, SIGNAL(disconnected()), this, SLOT(onDisconnected()));
    connect(&m_websocket, SIGNAL(textMessageReceived(QString)), this, SLOT(onTextMessageReceived(QString)));
    connect(&m_timerHeartbeat, SIGNAL(timeout()), this, SLOT(onSendHeartbeatText()));
    connect(&m_timerReconnect, SIGNAL(timeout()), this, SLOT(onReconnect()));
}

WebSocketClient::~WebSocketClient()
{
    if (disconnected  != m_webSocketState)
    {
        disconnect();
    }
}


void WebSocketClient::run()
{
    while (true)
        {
            if (m_webSocketState == connected)
            {
                m_mutex.lock();
                if (m_sendTextLinkedList.size())
                {
                    QString strVal = m_sendTextLinkedList.first();
                    if (m_websocket.sendTextMessage(strVal) == strVal.toLocal8Bit().length())
                    {
                        m_sendTextLinkedList.erase(m_sendTextLinkedList.begin());
                    }
                    m_mutex.unlock();
                }
                else
                {
                    //当没有数据时,睡眠,否则cpu会过高
                    QThread::msleep(20);
                    m_mutex.unlock();
                }

            }
            else if(m_bCloseByHand)
            {
                break;
            }
        }
}


bool WebSocketClient::startConnect()
{
    bool bRet = false;

    // 如果URL地址 或者 特定的协议头为空 返回false
    if (!m_strURL.isEmpty() && !m_strSecWebsocketProtocol.isEmpty())
    {
        if (!isRunning())
        {
            start();
        }
        QNetworkRequest request;
        request.setUrl(QUrl(m_strURL));
        QByteArray byteHeader = "Protocol";
        request.setRawHeader(byteHeader, m_strSecWebsocketProtocol.toLocal8Bit());
        m_websocket.open(request);
        bRet = true;
    }

    return bRet;

}

void WebSocketClient::disconnect()
{
    m_bCloseByHand = true;
    m_websocket.close();
    m_timerHeartbeat.stop();
    m_timerReconnect.stop();
}


void WebSocketClient::setConnectUrl(QString val)
{
    m_strURL = val;
}

bool WebSocketClient::addSendText(QString val)
{
    bool bRet = false;
    if (m_webSocketState == disconnected || m_webSocketState == stateNone)
    {
        bRet = false;
    }
    else
    {
        m_mutex.lock();
        m_sendTextLinkedList.append(val);
        m_mutex.unlock();
        bRet = true;
    }
    return bRet;
}

void WebSocketClient::setSecWebSocketProtocolHeader(QString val)
{
    m_strSecWebsocketProtocol = val;
}

void WebSocketClient::setHeartbeatTimer(int val)
{
    if (val > 5000)
    {
        m_nHeartbeatTimer = val;
        }
        }

        void WebSocketClient::setReconnetHeartbeatTimerCount(int val)
    {
        if (val > 3)
    {
        m_nReconnectHeartbatTimerCount = val;
        }
        }


        void WebSocketClient::setReconnectTimer(int val)
    {
        if (val > 5000)
    {
        m_nReconnectTimer = val;
        }
        }

        void WebSocketClient::setDisconnectReconnectCount(int val)
    {
        if (val > 3)
    {
        m_nDisconnectReconnectCount = val;
        }
        }


        WebSocketClient::webSocketState WebSocketClient::getCurrentState()
    {
        return m_webSocketState;
        }

        void WebSocketClient::onConnected()
    {
        // 如果是重连成功 停止重连的定时Timer
        if (m_webSocketState == reconnecting)
    {
        m_timerReconnect.stop();
        }
        m_webSocketState = connected;
        m_timerHeartbeat.start(m_nHeartbeatTimer);
        m_nReconnectFailedCount = 0;
        m_nHeartbeatFailedCount = 0;
        }


        void WebSocketClient::onDisconnected()
    {
        // 如果不是手动关闭 则需要重连
        if (!m_bCloseByHand)
    {
        m_timerHeartbeat.stop();
        m_timerReconnect.start(m_nReconnectTimer);
        m_webSocketState = reconnecting;
        }
        }


        void WebSocketClient::onTextMessageReceived(const QString &val)
    {
        //收到网页后端发来的内容,处理内容
        }


        void WebSocketClient::onSendHeartbeatText()
    {
        // 加锁 保证websocket 只在同一时间发送一条数据
        m_mutex.lock();
        int nSendByte =    m_websocket.sendTextMessage(m_strHeartbeatText);
        m_mutex.unlock();
        // 发送心跳失败
        if (nSendByte != m_strHeartbeatText.toLocal8Bit().length())
    {
        m_nHeartbeatFailedCount++;
        // 失败次数等于启动重连的次数 停止发送心跳 开启重连 更新状态
        if (m_nHeartbeatFailedCount == m_nReconnectHeartbatTimerCount)
    {
        m_timerHeartbeat.stop();
        m_timerReconnect.start(m_nReconnectTimer);
        m_webSocketState = reconnecting;
        }
        }
        }


        void WebSocketClient::onReconnect()
    {
        // close websocket
        m_websocket.close();
        //如果重连次数已到 不再重连 处于断开状态
        if (m_nReconnectFailedCount == m_nReconnectHeartbatTimerCount)
    {
        m_webSocketState = disconnected;
        m_timerReconnect.stop();
        }
    else // 开始连接并计数
    {
        startConnect();
        m_nReconnectFailedCount++;
        }
        }

python实现websocket

#/usr/bin/env python
# -*- coding:utf-8 -*-

import websocket
from websocket import WebSocketApp

try:
    import thread
except ImportError:
    import _thread as thread
import time


class Test(object):
    def __init__(self):
        super(Test, self).__init__()
        self.url = "ws://echo.websocket.org/"
        self.ws = None

    def on_message(self, message):
        print("####### on_message #######")
        print("message:%s" % message)

    def on_error(self, error):
        print("####### on_error #######")
        print("error:%s" % error)

    def on_close(self):
        print("####### on_close #######")

    def on_ping(self, message):
        print("####### on_ping #######")
        print("ping message:%s" % message)

    def on_pong(self, message):
        print("####### on_pong #######")
        print("pong message:%s" % message)

    def on_open(self):
        print("####### on_open #######")

        thread.start_new_thread(self.run, ())

    def run(self, *args):
        while True:
            time.sleep(1)
            input_msg = input("输入要发送的消息(ps:输入关键词 close 结束程序):\n")
            if input_msg == "close":
                self.ws.close()  # 关闭
                print("thread terminating...")
                break
            else:
                self.ws.send(input_msg)

    def start(self):
        websocket.enableTrace(True)  # 开启运行状态追踪。debug 的时候最好打开他,便于追踪定位问题。

        self.ws = WebSocketApp(self.url,
                               on_open=self.on_open,
                               on_message=self.on_message,
                               on_error=self.on_error,
                               on_close=self.on_close)
        # self.ws.on_open = self.on_open  # 也可以先创建对象再这样指定回调函数。run_forever 之前指定回调函数即可。

        self.ws.run_forever()


if __name__ == '__main__':
    Test().start()

"""
--- request header ---
GET / HTTP/1.1
Upgrade: websocket
Host: echo.websocket.org
Origin: http://echo.websocket.org
Sec-WebSocket-Key: AXR9yvs3Ucn9LE35KkhXfw==
Sec-WebSocket-Version: 13
Connection: upgrade


-----------------------
--- response header ---
HTTP/1.1 101 Web Socket Protocol Handshake
Connection: Upgrade
Date: Wed, 04 Aug 2021 06:29:05 GMT
Sec-WebSocket-Accept: WoOPLeAQpWaV2Bqd4sDOFkSpUuw=
Server: Kaazing Gateway
Upgrade: websocket
-----------------------
####### on_open #######
输入要发送的消息(ps:输入关键词 close 结束程序):
aaadbbbbb
send: b'\x81\x89\x82-\xdfj\xe3L\xbe\x0e\xe0O\xbd\x08\xe0'
####### on_message #######
message:aaadbbbbb
输入要发送的消息(ps:输入关键词 close 结束程序):
sakdnakjf
send: b'\x81\x89\xa8\xe0g\x8b\xdb\x81\x0c\xef\xc6\x81\x0c\xe1\xce'
####### on_message #######
message:sakdnakjf
输入要发送的消息(ps:输入关键词 close 结束程序):
123456
send: b'\x81\x86(\x84>\xb7\x19\xb6\r\x83\x1d\xb2'
####### on_message #######
message:123456
输入要发送的消息(ps:输入关键词 close 结束程序):
send: b'\x8a\x80.\xf3`+'
send: b'\x8a\x80P\x0c\xc6W'
send: b'\x8a\x807j\x03l'
send: b'\x8a\x80\xd0\xac%v'
send: b'\x8a\x80\xb9\x9do\x08'
send: b'\x8a\x80s\xbb\xad\x8f'
send: b'\x8a\x80\xf4-\xd9\x8b'
close
send: b'\x88\x82\xf5L>\xc4\xf6\xa4'
####### on_close #######

Process finished with exit code 0

"""

QT实现TcpSocket

//初始化服务端
void TaskCreate::initServer()
{
    tcpserver = new QTcpServer(this);

    //监听7789端口
    if (!tcpserver->listen(QHostAddress::Any, 7789)){
        qWarning("TcpServer failed to start. Error: %s", qPrintable(tcpserver->errorString()));
        return;
    }

    //监听到新的客户端连接请求
    connect(tcpserver,&QTcpServer::newConnection,this,[this]{
        //如果有新的连接就取出
        while(tcpserver->hasPendingConnections())
            {
                QTcpSocket *socket = tcpserver->nextPendingConnection();
                //(结构体)记录连接的客户端
                clientList.append(socket);
                qDebug() << QString("[%1:%2] TcpSocket Connected")
                    .arg(socket->peerAddress().toString())
                    .arg(socket->peerPort());
                //收到数据readyRead
                connect(socket,&QTcpSocket::readyRead,[this,socket]{
                    //没有可读的数据就返回
                    if(socket->bytesAvailable()<=0)
                        return;

                    QByteArray recv_text=socket->readAll();
                    qDebug() << QString("[%1:%2]")
                        .arg(socket->peerAddress().toString())
                        .arg(socket->peerPort());
                    qDebug() << "接收到数据:"  << recv_text;

                    // 处理坐标点
                    handle_point(recv_text);
                });


                //连接断开,销毁socket对象,这是为了开关server时socket正确释放
                connect(socket,&QTcpSocket::disconnected,[this,socket]{
                    socket->deleteLater();
                    clientList.removeOne(socket);
                    qDebug() << QString("[%1:%2] TcpSocket Disonnected")
                        .arg(socket->peerAddress().toString())
                        .arg(socket->peerPort());
                    updateState();
                });
            }
        updateState();
    });


    connect(tcpserver,&QTcpServer::acceptError,[this](QAbstractSocket::SocketError){
        qDebug() << "Server Error:"+tcpserver->errorString();
    });
}

void TaskCreate::closeServer()
{
    //停止服务
    tcpserver->close();
    qDebug() << "断开TcpSocket连接";
    for(QTcpSocket * socket:clientList)
        {
            //断开与客户端的连接
            socket->disconnectFromHost();
            if(socket->state()!=QAbstractSocket::UnconnectedState){
                socket->abort();
            }
        }
}

void TaskCreate::updateState()
{
    //将当前server地址和端口、客户端连接数写在标题栏
    if(tcpserver->isListening()){
        qDebug() << "已经连接上例程qt";
    }else{
        qDebug() << "例程qt连接断开";
    }
}
void MainWindow::initClient()
{
    if(client->state()==QTcpSocket::ConnectedState){
        qDebug() << "连接中";
        // client->abort();
    }else if(client->state()==QTcpSocket::UnconnectedState){
        client->connectToHost("127.0.0.1",7789);
    }else{
        qDebug() << "It is not ConnectedState or UnconnectedState";
    }

    connect(client,&QTcpSocket::connected,[this]{updateState();});
    connect(client,&QTcpSocket::disconnected,[this]{updateState();});
}

//发送坐标
void MainWindow::send_point(QByteArray point)
{
    const QByteArray send_data = point;
    client->write(send_data);
}

//更新socket状态
void MainWindow::updateState()
{
    if(client->state()==QTcpSocket::ConnectedState){
        qDebug() << "连接上任务调度系统";
    }else{
        qDebug() << "任务调度系统断开连接";
    }
}

文章作者: Nico
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Nico !
  目录