之前我们将 CocoaAsyncSocket 作为底层实现,在其上面封装了一套 Socket 通信机制以及业务接口,最近我们开始研究 WebSocket ,并用来替换掉原先的 CocoaAsyncSocket ,简单来说一下两者的关系,WebSocket 和 Socket 虽然名称上很像,但两者是完全不同的东西, WebSocket 是建立在 TCP/IP 协议之上,属于应用层的协议,而 Socket 是在应用层和传输层中的一个抽象层,它是将 TCP/IP 层的复杂操作抽象成几个简单的接口来提供给应用层调用。为什么要做这次替换呢?原因是我们服务端在做改造,同时网页版 IM 已经使用了 WebSocket ,客户端也采用的话对于服务端来说维护一套代码会更好更方便,而且 WebSocket 在体积、实时性和扩展上都具有一定的优势。
WebSocket 最新的协议是 13 RFC 6455 ,要理解 WebSocket 的实现,一定要去理解它的协议!~
WebSocket 的实现分为握手,数据发送/读取
握手要从请求头去理解。
WebSocket 首先发起一个 HTTP 请求,在请求头加上 Upgrade 字段,该字段用于改变 HTTP 协议版本或者是换用其他协议,这里我们把 Upgrade 的值设为 websocket ,将它升级为 WebSocket 协议。
同时要注意 Sec-WebSocket-Key 字段,它由客户端生成并发给服务端,用于证明服务端接收到的是一个可受信的连接握手,可以帮助服务端排除自身接收到的由非 WebSocket 客户端发起的连接,该值是一串随机经过 base64 编码的字符串。
GET /chat HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Origin: http://example.com
Sec-WebSocket-Protocol: chat, superchat
Sec-WebSocket-Version: 13
我们可以简化请求头,将请求以字符串方式发送出去,当然别忘了最后的两个空行作为包结束:
const char * fmt = "GET %s HTTP/1.1\r\n"
"Upgrade: websocket\r\n"
"Connection: Upgrade\r\n"
"Host: %s\r\n"
"Sec-WebSocket-Key: %s\r\n"
"Sec-WebSocket-Version: 13\r\n"
"\r\n";
size = strlen(fmt) + strlen(path) + strlen(host) + strlen(ws->key);
buf = (char *)malloc(size);
sprintf(buf, fmt, path, host, ws->key);
size = strlen(buf);
nbytes = ws->io_send(ws, ws->context, buf, size);
收到请求后,服务端也会做一次响应:
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
里面重要的是 Sec-WebSocket-Accept ,服务端通过从客户端请求头中读取 Sec-WebSocket-Key 与一串全局唯一的标识字符串(俗称魔串)“258EAFA5-E914-47DA- 95CA-C5AB0DC85B11”做拼接,生成长度为160字节的 SHA-1 字符串,然后进行 base64 编码,作为 Sec-WebSocket-Accept 的值回传给客户端。
处理握手 HTTP 响应解析的时候,可以用 nodejs 的 http-paser ,解析方式也比较简单,就是对头信息的逐字读取再处理,具体处理你可以看一下它的状态机实现。解析完成后你需要对其内容进行解析,看返回是否正确,同时去管理你的握手状态。
数据的处理就要拿这个帧协议图来说明了:
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-------+-+-------------+-------------------------------+
|F|R|R|R| opcode|M| Payload len | Extended payload length |
|I|S|S|S| (4) |A| (7) | (16/64) |
|N|V|V|V| |S| | (if payload len==126/127) |
| |1|2|3| |K| | |
+-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
| Extended payload length continued, if payload len == 127 |
+ - - - - - - - - - - - - - - - +-------------------------------+
| |Masking-key, if MASK set to 1 |
+-------------------------------+-------------------------------+
| Masking-key (continued) | Payload Data |
+-------------------------------- - - - - - - - - - - - - - - - +
: Payload Data continued ... :
+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
| Payload Data continued ... |
+---------------------------------------------------------------+
首先我们来看看数字的含义,数字表示位,0-7表示有8位,等于1个字节。
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
所以如果要组装一个帧数据可以这样子:
char *rev = (rev *)malloc(4);
rev[0] = (char)(0x81 & 0xff);
rev[1] = 126 & 0x7f;
rev[2] = 1;
rev[3] = 0;
ok,了解了帧数据的样子,我们反过来去理解值对应的帧字段。
首先0x81是什么,这个是十六进制数据,转换成二进制就是1000 0001, 是一个字节的长度,也就是这一段里面每一位的值:
0 1 2 3 4 5 6 7 8
+-+-+-+-+-------+
|F|R|R|R| opcode|
|I|S|S|S| (4) |
|N|V|V|V| |
| |1|2|3| |
+-+-+-+-+-------+
0xff 作用就是取出需要的二进制值。
下面再来看126,126则表示的是 Payload len ,也就是 Payload 的长度:
8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-------------+-------------------------------+
|M| Payload len | Extended payload length |
|A| (7) | (16/64) |
|S| | (if payload len==126/127) |
|K| | |
+-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
| Extended payload length continued, if payload len == 127 |
+ - - - - - - - - - - - - - - - +-------------------------------+
| |Masking-key, if MASK set to 1 |
+-------------------------------+-------------------------------+
| Masking-key (continued) | Payload Data |
+-------------------------------- - - - - - - - - - - - - - - - +
: Payload Data continued ... :
+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
| Payload Data continued ... |
+---------------------------------------------------------------+
而数据的发送和读取就是对帧的封装和解析。
数据发送:
int ws_recv(websocket_t *ws) {
if (ws->state != WS_STATE_HANDSHAKE_COMPLETED) {
return ws_do_handshake(ws);
}
int ret;
while(TRUE) {
ret = ws__recv(ws);
if (ret != OK) {
break;
}
}
return ret;
}
int ws__recv(websocket_t *ws) {
int nbytes;
int ret = OK, i;
int state = ws->rd_state;
char *rd_buf;
uint64_t rd_buf_len = 0;
switch(state) {
case WS_READ_IDLE: {
if (ws->buf_pos < 2) {
rd_buf_len = 2 - ws->buf_pos;
rd_buf = malloc(rd_buf_len);
nbytes = ws->io_recv(ws, ws->context, rd_buf, (size_t) (rd_buf_len));
if (nbytes < 0) {
free(rd_buf);
//TODO errono fix
ret = nbytes;
break;
}
ws__enqueue_buf(ws, rd_buf, (size_t)nbytes) ;
free(rd_buf);
}
if (ws->buf_pos < 2) {
ret = WS_WANT_READ;
break;
}
ws_frame_t * frame;
if (ws->frame == NULL) {
frame__alloc(&ws->frame);
frame = ws->frame;
} else {
frame = ws->frame;
}
rd_buf = ws->buf;
frame->fin = (*(rd_buf) & 0x80) == 0x80 ? 1 : 0;
frame->op_code = *(rd_buf) & 0x0f;
frame->payload_len = *(rd_buf + 1) & 0x7f;
if (frame->payload_len < 126) {
frame->payload_bit_offset = 2;
ws->rd_state = WS_READ_PAYLOAD;
} else if (frame -> payload_len == 126) {
frame->payload_bit_offset = 4;
ws->rd_state = WS_READ_EXTEND_PAYLOAD_2_WORDS;
} else {
frame->payload_bit_offset = 8;
ws->rd_state = WS_READ_EXTEND_PAYLOAD_8_WORDS;
}
ws__reset_buf(ws, 2);
break;
}
case WS_READ_EXTEND_PAYLOAD_2_WORDS: {
#define PAYLOAD_LEN_BITS 2
if (ws->buf_pos < PAYLOAD_LEN_BITS) {
rd_buf_len = PAYLOAD_LEN_BITS - ws->buf_pos;
rd_buf = malloc(rd_buf_len);
nbytes = ws->io_recv(ws, ws->context, rd_buf, (size_t) (rd_buf_len));
if (nbytes < 0) {
free(rd_buf);
ret = nbytes;
break;
}
ws__enqueue_buf(ws, rd_buf, (size_t)nbytes) ;
free(rd_buf);
}
if (ws->buf_pos < PAYLOAD_LEN_BITS) {
ret = WS_WANT_READ;
break;
}
rd_buf = ws->buf;
ws_frame_t * frame = ws->frame;
//rd_buf[0] = 0; rd_buf[1] = 255
for (i = 0; i < PAYLOAD_LEN_BITS; i++) {
*(((char *)&frame->payload_len) + i) = rd_buf[PAYLOAD_LEN_BITS - 1 - i];
}
ws__reset_buf(ws, PAYLOAD_LEN_BITS);
ws->rd_state = WS_READ_PAYLOAD;
#undef PAYLOAD_LEN_BITS
break;
}
case WS_READ_EXTEND_PAYLOAD_8_WORDS: {
#define PAYLOAD_LEN_BITS 8
if (ws->buf_pos < PAYLOAD_LEN_BITS) {
rd_buf_len = PAYLOAD_LEN_BITS - ws->buf_pos;
rd_buf = malloc(rd_buf_len);
nbytes = ws->io_recv(ws, ws->context, rd_buf, (size_t) (rd_buf_len));
if (nbytes < 0) {
free(rd_buf);
ret = nbytes;
break;
}
ws__enqueue_buf(ws, rd_buf, (size_t)nbytes) ;
free(rd_buf);
}
if (ws->buf_pos < PAYLOAD_LEN_BITS) {
ret = WS_WANT_READ;
break;
}
rd_buf = ws->buf;
ws_frame_t * frame = ws->frame;
for (i = 0; i < PAYLOAD_LEN_BITS; i++) {
*(((char *)&frame->payload_len) + i) = rd_buf[PAYLOAD_LEN_BITS - 1 - i];
}
ws__reset_buf(ws, PAYLOAD_LEN_BITS);
ws->rd_state = WS_READ_PAYLOAD;
#undef PAYLOAD_LEN_BITS
break;
}
case WS_READ_PAYLOAD: {
ws_frame_t * frame = ws->frame;
uint64_t payload_len = frame->payload_len;
if (ws->buf_pos < payload_len) {
rd_buf_len = payload_len - ws->buf_pos;
rd_buf = malloc(rd_buf_len);
nbytes = ws->io_recv(ws, ws->context, rd_buf, (size_t) (rd_buf_len));
if (nbytes < 0) {
free(rd_buf);
ret = nbytes;
break;
}
ws__enqueue_buf(ws, rd_buf, (size_t)nbytes) ;
free(rd_buf);
}
if (ws->buf_pos < payload_len) {
ret = WS_WANT_READ;
break;
}
rd_buf = ws->buf;
frame->payload = malloc(payload_len);
memcpy(frame->payload, rd_buf, payload_len);
ws__reset_buf(ws, payload_len);
if (frame->fin == 1) {
// is control frame
if (frame->op_code == OP_CLOSE) {
// TODO if should response a close frame
// close connection
if (ws->close_cb) {
ws->close_cb(ws);
}
} else {
ws__dispatch_msg(ws, frame);
ws->frame = NULL;
}
} else {
ws_frame_t *new_frame;
frame__alloc(&new_frame);
frame->next = new_frame;
new_frame->prev = frame;
ws->frame = new_frame;
}
ws->rd_state = WS_READ_IDLE;
break;
}
}
return ret;
}
数据解析:
void ws__wrap_packet(_WS_IN websocket_t *ws,
_WS_IN const char *payload,
_WS_IN unsigned long long payload_size,
_WS_IN int flags,
_WS_OUT char** out,
_WS_OUT uint64_t *out_size) {
struct timeval tv;
char mask[4];
unsigned int mask_int;
unsigned int payload_len_bits;
unsigned int payload_bit_offset = 6;
unsigned int extend_payload_len_bits, i;
unsigned long long frame_size;
const int MASK_BIT_LEN = 4;
gettimeofday(&tv, NULL);
srand(tv.tv_usec * tv.tv_sec);
mask_int = rand();
memcpy(mask, &mask_int, 4);
/**
* payload_len bits
* ref to https://tools.ietf.org/html/rfc6455#section-5.2
* If 0-125, that is the payload length
*
* If payload length is equals 126, the following 2 bytes interpreted as a
* 16-bit unsigned integer are the payload length
*
* If 127, the following 8 bytes interpreted as a 64-bit unsigned integer (the
* most significant bit MUST be 0) are the payload length.
*/
if (payload_size <= 125) {
// consts of ((fin + rsv1/2/3 + opcode) + payload-len bits + mask bit len + payload len)
extend_payload_len_bits = 0;
frame_size = 1 + 1 + MASK_BIT_LEN + payload_size;
payload_len_bits = payload_size;
} else if (payload_size > 125 && payload_size <= 0xffff) {
extend_payload_len_bits = 2;
// consts of ((fin + rsv1/2/3 + opcode) + payload-len bits + extend-payload-len bites + mask bit len + payload len)
frame_size = 1 + 1 + extend_payload_len_bits + MASK_BIT_LEN + payload_size;
payload_len_bits = 126;
payload_bit_offset += extend_payload_len_bits;
} else if (payload_size > 0xffff && payload_size <= 0xffffffffffffffffLL) {
extend_payload_len_bits = 8;
// consts of ((fin + rsv1/2/3 + opcode) + payload-len bits + extend-payload-len bites + mask bit len + payload len)
frame_size = 1 + 1 + extend_payload_len_bits + MASK_BIT_LEN + payload_size;
payload_len_bits = 127;
payload_bit_offset += extend_payload_len_bits;
} else {
if (ws->error_cb) {
ws_error_t *err = ws_new_error(WS_SEND_DATA_TOO_LARGE_ERR);
ws->error_cb(ws, err);
free(err);
}
return ;
}
*out_size = frame_size;
char *data = (*out) = (char *)malloc(frame_size);
char *buf_offset = data;
bzero(data, frame_size);
*data = flags & 0xff;
buf_offset = data + 1;
// set mask bit = 1
*(buf_offset) = payload_len_bits | 0x80; //payload length with mask bit on
buf_offset = data + 2;
if (payload_len_bits == 126) {
payload_size &= 0xffff;
} else if (payload_len_bits == 127) {
payload_size &= 0xffffffffffffffffLL;
}
for (i = 0; i < extend_payload_len_bits; i++) {
*(buf_offset + i) = *((char *)&payload_size + (extend_payload_len_bits - i - 1));
}
/**
* according to https://tools.ietf.org/html/rfc6455#section-5.3
*
* buf_offset is set to mask bit
*/
buf_offset = data + payload_bit_offset - 4;
for (i = 0; i < 4; i++) {
*(buf_offset + i) = mask[i] & 0xff;
}
/**
* mask the payload data
*/
buf_offset = data + payload_bit_offset;
memcpy(buf_offset, payload, payload_size);
mask_payload(mask, buf_offset, payload_size);
}
对WebSocket的学习主要是对协议的理解,理解了协议,上面复杂的代码自然而然就会明白~
websocket是实现浏览器和web服务器沟通的一个好方法。一个比较好的使用方法就是在web网页上使用websocket,然后再起一个webdocketd服务器。
HTML5定义了WebSocket协议,能更好的节省服务器资源和带宽并达到实时通讯。在 WebSocket API,浏览器和服务器只需要做一个握手的动作,然后,浏览器和服务器之间就形成了一条快速通道。两者之间就直接可以数据互相传送。
WebSocket是基于TCP的一种新的网络协议,并在2011年被IETF定为标准的全双工通信协议,它实现了客户端与服务器全双工通信。 WebSocket 使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。
Web Sockets定义了一种在通过一个单一的 socket 在网络上进行全双工通讯的通道。它不仅仅是传统的 HTTP 通讯的一个增量的提高,尤其对于实时、事件驱动的应用来说是一个飞跃。
WebSocket是HTML5出的东西(协议),也就是说HTTP协议没有变化,或者说没关系,但HTTP是不支持持久连接的(长连接,循环连接的不算)
在使用websocket的过程中,有时候会遇到网络断开的情况,但是在网络断开的时候服务器端并没有触发onclose的事件。这样会有:服务器会继续向客户端发送多余的链接,并且这些数据还会丢失
在建立web socket通信后,发送数据时,出现下图所示现象:要明白这个问题产生的原因,就需要了解websocket的几个状态。通常在实例化一个websocket对象之后,客户端就会与服务器进行连接
在 websocket 出现之前,为了实现 web 端的实时通信,通常采用的是 Ajax 轮询技术,(轮询是在特定的时间间隔内,由浏览器向服务器发送 HTTP 请求,再由服务器返回最新的数据)
websocket服务端往往需要和服务层打交道,因此需要将服务层的一些bean注入到websocket实现类中使用,但是呢,websocket实现类虽然顶部加上了@Component注解,依然无法通过@Resource和@Autowire注入spring容器管理下的bean
以下 API 用于创建 WebSocket 对象。以上代码中的第一个参数 url, 指定连接的 URL。第二个参数 protocol 是可选的,指定了可接受的子协议。以下是 WebSocket 对象的属性。
内容以共享、参考、研究为目的,不存在任何商业目的。其版权属原作者所有,如有侵权或违规,请与小编联系!情况属实本人将予以删除!