起源
HTTP 协议有一个缺陷:通信只能由客户端发起。 这种单向请求的特点,注定了如果服务器有连续的状态变化,客户端要获知就非常麻烦。我们只能使用“轮询”:每隔一段时候,就发出一个询问,了解服务器有没有新的信息。最典型的场景就是聊天室。
轮询的效率低,非常浪费资源(因为必须不停连接,或者 HTTP 连接始终打开)。WebSocket 就是这样发明的。Websocket诞生于2008年,2011年成为国际标准,现在所有的浏览器都已支持。一种全新的应用层协议,专门为web客户端和服务端设计的真正的全双工通信协议,可以类比HTTP协议来了解websocket协议。
其他特点包括:
(1)建立在 TCP 协议之上,服务器端的实现比较容易。
(2)与 HTTP 协议有着良好的兼容性。默认端口也是80和443,并且握手阶段采用 HTTP 协议,因此握手时不容易屏蔽,能通过各种 HTTP 代理服务器。
(3)数据格式比较轻量,性能开销小,通信高效。
(4)可以发送文本,也可以发送二进制数据。
(5)没有同源限制,客户端可以与任意服务器通信。
(6)协议标识符是ws(如果加密,则为wss),服务器网址就是 URL。
ws://example.com:80/some/path
不同点:
- HTTP协议标识符是httpwebsocketws
- HTTP请求只能由客户端发起,服务器无法主动向客户端推送消息,而websocket可以
- HTTP请求有同源限制,不同源之间通信需要跨域,而websocket没有同源限制
相同点:
- 都是应用层的通信协议
- 默认端口一样,都是80或443
- 都可以用于浏览器和服务器间的通信
- 都基于TCP协议
重连机制
重连过程拆解
首先考虑一个问题,何时需要重连?
最容易想到websocket连接断了为了接下来能收发消息,需要再发起一次连接。但在很多场景下,即便websocket连接没有断开,实际上也不可用了。比方设备切换网络、链路中间路由崩溃、服务器负载继续过高无法响应等,这些场景下的websocket都没有断开,但对上层来说,都没办法正常的收发数据了因此在重连前,需要一种机制来感知连接是否可用、服务是否可用,而且要能快速感知,以便能够快速从不可用状态中恢复。
一旦感知到连接不可用,那便可以弃旧图新了弃用并断开旧连接,然后发起一次新连接。这两个方法看似简单,但若想达到快,且不是那么容易的
首先是断开旧连接,对客户端来说,如何快速快速断开?
协议规定客户端必需要和服务器协商后才能断开websocket连接,但是当客户端已经联系不上服务器、无法协商时,如何断开并快速恢复?
其次是快速发起新连接。此快非彼快,这里的快并非是立即发起连接,立即发起连接会对服务器带来不可预估的影响。重连时通常会采用一些退避算法,延迟一段时间后再发起重连。但如何在重连间隔和性能消耗间做出权衡?如何在恰当的时间点”快速发起连接?
带着这些疑问,来细看下这三个过程。
快速感知何时需要重连
需要重连的场景可以细分为三种,一是连接断开了二是连接没断但是不可用,三是连接对端的服务不可用了
第一种场景很简单,连接直接断开了肯定需要重连了
而对于后两者,无论是连接不可用,还是服务不可用,对上层应用的影响都是不能再收发即时消息了所以从这个角度动身,感知何时需要重连的一种简单粗暴的方法就是通过心跳包超时:发送一个心跳包,如果超越特定的时间后还没有收到服务器回包,则认为服务不可用,如下图中左侧的方案;这种方法最直接。那如果想要快速感知呢,就只能多发心跳包,加快心跳频率。但是心跳太快对移动端流量、电量的消耗又会太多,所以使用这种方法没办法做到快速感知,可以作为检测连接和服务可用的兜底机制。
如果要检测连接不可用,除了用心跳检测,还可以通过判断网络状态来实现,因为断网、切换wifi切换网络是导致连接不可用的最直接原因,所以在网络状态由offline变为online时,大多数情况下需要重连下,但也不一定,因为webscoket底层是基于TCPTCP连接不能敏锐的感知到应用层的网络变化,所以有时候即便网络断开了一小会,对websocket连接是不会有影响的网络恢复后,仍然能够正常地进行通信。因此在网络由断开到连接上时,立即判断下连接是否可用,可以通过发一个心跳包判断,如果能够正常收到服务器的心跳回包,则说明连接仍是可用的如果等待超时后仍没有收到心跳回包,则需要重连,如上图中的右侧。这种方法的优点是速度快,网络恢复后能够第一时间感知连接是否可用,不可用的话可以快速执行恢复,但它只能覆盖应用层网络变化导致websocket不可用的情况。
综上,定时发送心跳包检测的方案贵在稳定,能够覆盖所有场景,但速度不太可;而判断网络状态的方案速度快,无需等待心跳间隔,较为灵敏,但覆盖场景较为局限。因此,可以结合两种方案:定时以不太快的频率发送心跳包,比方40s/次、60s/次等,具体可以根据应用场景来定,然后在网络状态由offline变为online时立即发送一次心跳,检测当前连接是否可用,不可用的话立即进行恢复处置。这样在大多数情况下,上层的应用通信都能较快从不可用状态中恢复,对于少部分场景,有定时心跳作为兜底,一个心跳周期内也能够恢复。
快速断开旧连接
通常情况下,发起下一次连接前,如果旧连接还存在话,应该先把旧连接断开,这样一来可以释放客户端和服务器的资源,二来可以防止之后误从旧连接收发数据。
知道websocket底层是基于TCP协议传输数据的连接两端分别是服务器和客户端,而TCPTIME_WA IT状态是由服务器端维持的因此在大多数正常情况下,应该由服务器发起断开底层TCP连接,而不是客户端。也就是说,要断开websocket连接时,如果是服务器收到指示要断开websocket那它应该立即发起断开TCP连接;如果是客户端收到指示要断开websocket那它应该发信号给服务器,然后等待底层TCP连接被服务器断开或直至超时。
那如果客户端想要断开旧的websocket可以分websocket连接可用和不可用两种情况来讨论。当旧连接可用时,客户端可以直接给服务器发送断开信号,然后服务器发起断开连接即可;当旧连接不可用时,比方客户端切换了wifi客户端发送了断开信号,但是服务器收不到客户端只能迟迟等待,直至超时才干被允许断开。超时断开的过程相对来说是比拟久的那有没有方法可以快点断开?
上层应用无法改变只能由服务器发起断开连接这种协议层面的规则,所以只能从应用逻辑入手,比方在上层通过业务逻辑保证旧连接完全失效,模拟连接断开,然后在发起新连接,恢复通讯。这种方法相当于尝试断开旧连接不行时,直接弃之,然后就能快速进入下一流程,所以在使用时一定要确保在业务逻辑上旧连接已完全失效,比方:保证丢掉从旧连接收到所有数据、旧连接不能阻碍新连接的建立,旧连接超时断开后不能影响新连接和上层业务逻辑等等。
快速发起新连接
有IM开发经验的同学应该有所了解,遇到因网络原因导致的重连时,万万不能立即发起一次新连接的否则当呈现网络抖动时,所有的设备都会立即同时向服务器发起连接,这无异于黑客通过发起大量请求消耗网络带宽引起的拒绝服务攻击,这对服务器来说简直是灾难。所以在重连时通常采用一些退避算法,延迟一段时间再发起重连,如下图中左侧的流程。
如果要快速连上呢?最直接的做法就是缩短重试间隔,重试间隔越短,网络恢复后就能越快的恢复通讯。但是太频繁的重试对性能、带宽、电量的消耗就比较严重。如何在这之间做一个较好的权衡呢?
一种比较合理的方式是随着重试次数增多,逐渐增大重试间隔;另一方面监听网络变化,网络状态由offline变为online这种比拟可能重连上的时刻,可以适当地减小重连间隔,如上图中的右侧(随重试次数的增多,重连间隔也会变大)两种方式配合使用。
除此之外,还可以结合业务逻辑,根据胜利重连上的可能性适当的调整间隔,如网络未连接时或应用在后台时重连间隔可以调大一些,网络正常的状态下可以适当调小一些等等,加快重连上的速度。
结尾
最后总结一下,本文在开头将websocket断网重连细分为三个步骤:确定何时需要重连、断开旧连接和发起新连接。然后分别分析了websocket不同状态下、不同的网络状态下,如何快速完成这个三个步骤:首先通过定时发送心跳包的方式检测当前连接是否可用,同时监测网络恢复事件,恢复后立即发送一次心跳,快速感知当前状态,判断是否需要重连;其次正常情况下由服务器断开旧连接,与服务器失去联系时直接弃用旧连接,上层模拟断开,来实现快速断开;最后发起新连接时使用退避算法延迟一段时间再发起连接,同时考虑到资源浪费和重连速度,可以在网络离线时调大重连间隔,网络正常或网络由offline变为online时缩小重连间隔,使之尽可能快地重连上。
QWebsocket实现断线重连
class WebSocketClient : public QThread
{
Q_OBJECT
public:
WebSocketClient();
~WebSocketClient();
protected:
virtual void run();
public:
// 启动连接 - 必须把url地址 协议头 设置完成后再启动
bool startConnect();
// 断开连接
void disconnect();
//设置链接的URL
void setConnectUrl(QString val);
// 向协议队列中添加要发送的内容
bool addSendText(QString val);
// 设置websocket 的 Sec—Websocket-Protocol 头
void setSecWebSocketProtocolHeader(QString val);
// 设置心跳数据发送时间间隔 单位秒
void setHeartbeatTimer(int val);
// 设置判断需要重连的心跳发送失败次数
void setReconnetHeartbeatTimerCount(int val);
// 设置重连的时间间隔 单位秒
void setReconnectTimer(int val);
// 设置判断断开的重连次数
void setDisconnectReconnectCount(int val);
// 获取当前状态
webSocketState getCurrentState();
private slots :
// 连接成功
void onConnected();
// 连接断开
void onDisconnected();
// 接收到 协议内容
void onTextMessageReceived(const QString &val);
// 定时发送心跳
void onSendHeartbeatText();
// 定时重连
void onReconnect();
private:
webSocketState m_webSocketState; // websocket 状态
bool m_bCloseByHand; // 手动关闭
QMutex m_mutex; // 数据同步
QLinkedList<QString> m_sendTextLinkedList; // 协议链表
QString m_strURL; // URL地址
QString m_strSecWebsocketProtocol; // 协议头 目前使用此协议头 与网页端保持一致
QString m_strHeartbeatText; // 心跳内容
QWebSocket m_websocket; // websocket 客户端
int m_nHeartbeatTimer; // 心跳数据发送时间间隔 单位ms
int m_nReconnectHeartbatTimerCount; // 需要重连的心跳发送失败次数
int m_nReconnectTimer; // 重连的时间间隔 单位ms
int m_nDisconnectReconnectCount; // 重连失败次数
int m_nHeartbeatFailedCount; // 心跳发送失败次数
int m_nReconnectFailedCount; // 重连失败次数
QTimer m_timerHeartbeat; // 心跳发送定时器
QTimer m_timerReconnect; // 断线重连定时器
};
#include "WebSocketClient.h"
WebSocketClient::WebSocketClient()
: m_bCloseByHand(false)
, m_webSocketState(stateNone)
, m_nHeartbeatTimer(5000)
, m_nReconnectHeartbatTimerCount(3)
, m_nReconnectTimer(5000)
, m_nDisconnectReconnectCount(3)
, m_nHeartbeatFailedCount(0)
, m_nReconnectFailedCount(0)
, m_strURL(tr(""))
, m_strSecWebsocketProtocol(tr(""))
, m_strHeartbeatText(tr(""))
{
connect(&m_websocket, SIGNAL(connected()), this, SLOT(onConnected()));
connect(&m_websocket, SIGNAL(disconnected()), this, SLOT(onDisconnected()));
connect(&m_websocket, SIGNAL(textMessageReceived(QString)), this, SLOT(onTextMessageReceived(QString)));
connect(&m_timerHeartbeat, SIGNAL(timeout()), this, SLOT(onSendHeartbeatText()));
connect(&m_timerReconnect, SIGNAL(timeout()), this, SLOT(onReconnect()));
}
WebSocketClient::~WebSocketClient()
{
if (disconnected != m_webSocketState)
{
disconnect();
}
}
void WebSocketClient::run()
{
while (true)
{
if (m_webSocketState == connected)
{
m_mutex.lock();
if (m_sendTextLinkedList.size())
{
QString strVal = m_sendTextLinkedList.first();
if (m_websocket.sendTextMessage(strVal) == strVal.toLocal8Bit().length())
{
m_sendTextLinkedList.erase(m_sendTextLinkedList.begin());
}
m_mutex.unlock();
}
else
{
//当没有数据时,睡眠,否则cpu会过高
QThread::msleep(20);
m_mutex.unlock();
}
}
else if(m_bCloseByHand)
{
break;
}
}
}
bool WebSocketClient::startConnect()
{
bool bRet = false;
// 如果URL地址 或者 特定的协议头为空 返回false
if (!m_strURL.isEmpty() && !m_strSecWebsocketProtocol.isEmpty())
{
if (!isRunning())
{
start();
}
QNetworkRequest request;
request.setUrl(QUrl(m_strURL));
QByteArray byteHeader = "Protocol";
request.setRawHeader(byteHeader, m_strSecWebsocketProtocol.toLocal8Bit());
m_websocket.open(request);
bRet = true;
}
return bRet;
}
void WebSocketClient::disconnect()
{
m_bCloseByHand = true;
m_websocket.close();
m_timerHeartbeat.stop();
m_timerReconnect.stop();
}
void WebSocketClient::setConnectUrl(QString val)
{
m_strURL = val;
}
bool WebSocketClient::addSendText(QString val)
{
bool bRet = false;
if (m_webSocketState == disconnected || m_webSocketState == stateNone)
{
bRet = false;
}
else
{
m_mutex.lock();
m_sendTextLinkedList.append(val);
m_mutex.unlock();
bRet = true;
}
return bRet;
}
void WebSocketClient::setSecWebSocketProtocolHeader(QString val)
{
m_strSecWebsocketProtocol = val;
}
void WebSocketClient::setHeartbeatTimer(int val)
{
if (val > 5000)
{
m_nHeartbeatTimer = val;
}
}
void WebSocketClient::setReconnetHeartbeatTimerCount(int val)
{
if (val > 3)
{
m_nReconnectHeartbatTimerCount = val;
}
}
void WebSocketClient::setReconnectTimer(int val)
{
if (val > 5000)
{
m_nReconnectTimer = val;
}
}
void WebSocketClient::setDisconnectReconnectCount(int val)
{
if (val > 3)
{
m_nDisconnectReconnectCount = val;
}
}
WebSocketClient::webSocketState WebSocketClient::getCurrentState()
{
return m_webSocketState;
}
void WebSocketClient::onConnected()
{
// 如果是重连成功 停止重连的定时Timer
if (m_webSocketState == reconnecting)
{
m_timerReconnect.stop();
}
m_webSocketState = connected;
m_timerHeartbeat.start(m_nHeartbeatTimer);
m_nReconnectFailedCount = 0;
m_nHeartbeatFailedCount = 0;
}
void WebSocketClient::onDisconnected()
{
// 如果不是手动关闭 则需要重连
if (!m_bCloseByHand)
{
m_timerHeartbeat.stop();
m_timerReconnect.start(m_nReconnectTimer);
m_webSocketState = reconnecting;
}
}
void WebSocketClient::onTextMessageReceived(const QString &val)
{
//收到网页后端发来的内容,处理内容
}
void WebSocketClient::onSendHeartbeatText()
{
// 加锁 保证websocket 只在同一时间发送一条数据
m_mutex.lock();
int nSendByte = m_websocket.sendTextMessage(m_strHeartbeatText);
m_mutex.unlock();
// 发送心跳失败
if (nSendByte != m_strHeartbeatText.toLocal8Bit().length())
{
m_nHeartbeatFailedCount++;
// 失败次数等于启动重连的次数 停止发送心跳 开启重连 更新状态
if (m_nHeartbeatFailedCount == m_nReconnectHeartbatTimerCount)
{
m_timerHeartbeat.stop();
m_timerReconnect.start(m_nReconnectTimer);
m_webSocketState = reconnecting;
}
}
}
void WebSocketClient::onReconnect()
{
// close websocket
m_websocket.close();
//如果重连次数已到 不再重连 处于断开状态
if (m_nReconnectFailedCount == m_nReconnectHeartbatTimerCount)
{
m_webSocketState = disconnected;
m_timerReconnect.stop();
}
else // 开始连接并计数
{
startConnect();
m_nReconnectFailedCount++;
}
}
python实现websocket
#/usr/bin/env python
# -*- coding:utf-8 -*-
import websocket
from websocket import WebSocketApp
try:
import thread
except ImportError:
import _thread as thread
import time
class Test(object):
def __init__(self):
super(Test, self).__init__()
self.url = "ws://echo.websocket.org/"
self.ws = None
def on_message(self, message):
print("####### on_message #######")
print("message:%s" % message)
def on_error(self, error):
print("####### on_error #######")
print("error:%s" % error)
def on_close(self):
print("####### on_close #######")
def on_ping(self, message):
print("####### on_ping #######")
print("ping message:%s" % message)
def on_pong(self, message):
print("####### on_pong #######")
print("pong message:%s" % message)
def on_open(self):
print("####### on_open #######")
thread.start_new_thread(self.run, ())
def run(self, *args):
while True:
time.sleep(1)
input_msg = input("输入要发送的消息(ps:输入关键词 close 结束程序):\n")
if input_msg == "close":
self.ws.close() # 关闭
print("thread terminating...")
break
else:
self.ws.send(input_msg)
def start(self):
websocket.enableTrace(True) # 开启运行状态追踪。debug 的时候最好打开他,便于追踪定位问题。
self.ws = WebSocketApp(self.url,
on_open=self.on_open,
on_message=self.on_message,
on_error=self.on_error,
on_close=self.on_close)
# self.ws.on_open = self.on_open # 也可以先创建对象再这样指定回调函数。run_forever 之前指定回调函数即可。
self.ws.run_forever()
if __name__ == '__main__':
Test().start()
"""
--- request header ---
GET / HTTP/1.1
Upgrade: websocket
Host: echo.websocket.org
Origin: http://echo.websocket.org
Sec-WebSocket-Key: AXR9yvs3Ucn9LE35KkhXfw==
Sec-WebSocket-Version: 13
Connection: upgrade
-----------------------
--- response header ---
HTTP/1.1 101 Web Socket Protocol Handshake
Connection: Upgrade
Date: Wed, 04 Aug 2021 06:29:05 GMT
Sec-WebSocket-Accept: WoOPLeAQpWaV2Bqd4sDOFkSpUuw=
Server: Kaazing Gateway
Upgrade: websocket
-----------------------
####### on_open #######
输入要发送的消息(ps:输入关键词 close 结束程序):
aaadbbbbb
send: b'\x81\x89\x82-\xdfj\xe3L\xbe\x0e\xe0O\xbd\x08\xe0'
####### on_message #######
message:aaadbbbbb
输入要发送的消息(ps:输入关键词 close 结束程序):
sakdnakjf
send: b'\x81\x89\xa8\xe0g\x8b\xdb\x81\x0c\xef\xc6\x81\x0c\xe1\xce'
####### on_message #######
message:sakdnakjf
输入要发送的消息(ps:输入关键词 close 结束程序):
123456
send: b'\x81\x86(\x84>\xb7\x19\xb6\r\x83\x1d\xb2'
####### on_message #######
message:123456
输入要发送的消息(ps:输入关键词 close 结束程序):
send: b'\x8a\x80.\xf3`+'
send: b'\x8a\x80P\x0c\xc6W'
send: b'\x8a\x807j\x03l'
send: b'\x8a\x80\xd0\xac%v'
send: b'\x8a\x80\xb9\x9do\x08'
send: b'\x8a\x80s\xbb\xad\x8f'
send: b'\x8a\x80\xf4-\xd9\x8b'
close
send: b'\x88\x82\xf5L>\xc4\xf6\xa4'
####### on_close #######
Process finished with exit code 0
"""
QT实现TcpSocket
//初始化服务端
void TaskCreate::initServer()
{
tcpserver = new QTcpServer(this);
//监听7789端口
if (!tcpserver->listen(QHostAddress::Any, 7789)){
qWarning("TcpServer failed to start. Error: %s", qPrintable(tcpserver->errorString()));
return;
}
//监听到新的客户端连接请求
connect(tcpserver,&QTcpServer::newConnection,this,[this]{
//如果有新的连接就取出
while(tcpserver->hasPendingConnections())
{
QTcpSocket *socket = tcpserver->nextPendingConnection();
//(结构体)记录连接的客户端
clientList.append(socket);
qDebug() << QString("[%1:%2] TcpSocket Connected")
.arg(socket->peerAddress().toString())
.arg(socket->peerPort());
//收到数据readyRead
connect(socket,&QTcpSocket::readyRead,[this,socket]{
//没有可读的数据就返回
if(socket->bytesAvailable()<=0)
return;
QByteArray recv_text=socket->readAll();
qDebug() << QString("[%1:%2]")
.arg(socket->peerAddress().toString())
.arg(socket->peerPort());
qDebug() << "接收到数据:" << recv_text;
// 处理坐标点
handle_point(recv_text);
});
//连接断开,销毁socket对象,这是为了开关server时socket正确释放
connect(socket,&QTcpSocket::disconnected,[this,socket]{
socket->deleteLater();
clientList.removeOne(socket);
qDebug() << QString("[%1:%2] TcpSocket Disonnected")
.arg(socket->peerAddress().toString())
.arg(socket->peerPort());
updateState();
});
}
updateState();
});
connect(tcpserver,&QTcpServer::acceptError,[this](QAbstractSocket::SocketError){
qDebug() << "Server Error:"+tcpserver->errorString();
});
}
void TaskCreate::closeServer()
{
//停止服务
tcpserver->close();
qDebug() << "断开TcpSocket连接";
for(QTcpSocket * socket:clientList)
{
//断开与客户端的连接
socket->disconnectFromHost();
if(socket->state()!=QAbstractSocket::UnconnectedState){
socket->abort();
}
}
}
void TaskCreate::updateState()
{
//将当前server地址和端口、客户端连接数写在标题栏
if(tcpserver->isListening()){
qDebug() << "已经连接上例程qt";
}else{
qDebug() << "例程qt连接断开";
}
}
void MainWindow::initClient()
{
if(client->state()==QTcpSocket::ConnectedState){
qDebug() << "连接中";
// client->abort();
}else if(client->state()==QTcpSocket::UnconnectedState){
client->connectToHost("127.0.0.1",7789);
}else{
qDebug() << "It is not ConnectedState or UnconnectedState";
}
connect(client,&QTcpSocket::connected,[this]{updateState();});
connect(client,&QTcpSocket::disconnected,[this]{updateState();});
}
//发送坐标
void MainWindow::send_point(QByteArray point)
{
const QByteArray send_data = point;
client->write(send_data);
}
//更新socket状态
void MainWindow::updateState()
{
if(client->state()==QTcpSocket::ConnectedState){
qDebug() << "连接上任务调度系统";
}else{
qDebug() << "任务调度系统断开连接";
}
}