RPC

远程过程调用 Remote Procedure Call,客户程序通过接口调用另外一台服务器内部的标准或自定义函数,获得函数返回的数据进行处理

传输协议

OSI 通信模型中,RPC 跨越 传输层应用层

传输层常见协议:

  • TCP, 连接是基于字节流,一种可以保证可靠数据传输的传输层协议,如网页请求资源。
  • UDP, 基于报文流,一种无连接的传输层协议,它无法保证数据传输可靠性,但传输效率更高,开销更小,如视频、语言电话。

Node.js net 模块

  • socket

单工通信

客户端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
const net = require('net');
const {getRandomBookId} = require('../code');

const socket = new net.Socket();

socket.connect({
host: '127.0.0.1',
port: 3000,
});

/**
* 数据打包
* @param {string} bookId
* @returns {buffer}
*/
const encode = (bookId) => {
const buffer = Buffer.alloc(2);
buffer.write(bookId);
console.log(buffer);
return buffer;
}

const bookId = getRandomBookId();
socket.write(encode(bookId));

socket.on('data', (buffer) => {
console.log(`书籍 ${bookId}: ${buffer.toString()}`);
});

服务端:

1
2
3
4
5
6
7
8
9
10
const net = require('net');
const {searchBookName} = require('../code');

const server = net.createServer((socket) => {
socket.on('data', (buffer) => {
socket.write(searchBookName(buffer.toString()));
});
});

server.listen(3000);

半双工通信

客户端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
const net = require('net');
const {getRandomBookId} = require('../code');

const socket = new net.Socket();

let currentBookId = null;

socket.connect({
host: '127.0.0.1',
port: 3000,
});

/**
* 数据打包
* @param {string} bookId
* @returns {buffer}
*/
const encode = (bookId) => {
const buffer = Buffer.alloc(2);
buffer.write(bookId);
return buffer;
}

/**
* 发送请求
*/
const sendRequest = () => {
currentBookId = getRandomBookId();
socket.write(encode(currentBookId));
}

sendRequest();

socket.on('data', (buffer) => {
console.log(`书籍 ${currentBookId}: ${buffer.toString()}`);
sendRequest();
});

服务端:

1
2
3
4
5
6
7
8
9
10
11
12
const net = require('net');
const {searchBookName} = require('../code');

const server = net.createServer((socket) => {
socket.on('data', (buffer) => {
setTimeout(() => {
socket.write(searchBookName(buffer.toString()));
}, 1000);
});
});

server.listen(3000);

全双工通信

TCP传输协议面向流的,没有消息保护边界,所有会出现 粘包拆包

解决方案:指定协议规则(包序号1-2字节 + 包长度3-6字节 + 包主体)

客户端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
const net = require('net');
const {getRandomBookId} = require('../code');

const socket = new net.Socket();

socket.connect({
host: '127.0.0.1',
port: 3000,
});

/**
* 计数器
* @returns {object}
*/
const count = () => {
let num = 0;
return {
get() {
return num;
},
add() {
num += 1;
}
}
}

// 数据包序号
const currentIndex = count();

/**
* 组装一个二进制包,默认协议内容为:1-2字节为包序号,3-6字节为包长度,后面是包内容
* @returns {buffer}
*/
const encode = () => {
// 内容
const body = Buffer.alloc(2);
const bookId = getRandomBookId();
body.write(bookId);

// 头内容
const header = Buffer.alloc(6);
header.writeInt16BE(currentIndex.get());
header.writeInt32BE(body.length, 2);

// 拼接数据包
const buffer = Buffer.concat([header, body]);

console.log(`包${currentIndex.get()}传输的课程 id 为${bookId}`);
currentIndex.add();
return buffer;
}

/**
* 拆解一个二进制包
* @param {buffer} 二进制数据包
* @returns {object}
*/
const decode = (buffer) => {
if (buffer instanceof Buffer) {
const header = buffer.slice(0, 6);
const body = buffer.slice(6);
const index = header.readUInt16BE();

return {
success: true,
data: {
index,
text: body.toString(),
}
}
}
return {
success: false,
message: '数据包格式不正确',
}
}

/**
* 检查获取一个包的长度,默认公示为:头部长度(6)+ 存储在头部的包长度
* 如果长度小于6,则说明被拆包,需要与下个包数据拼接使用,故返回 0 。
* @param {buffer} buffer
* @returns {number}
*/
const checkBufferLen = (buffer) => {
if (buffer instanceof Buffer) {
// 头部信息不全时,可能是因为内容超长被拆包了
if (buffer.length < 6) {
return 0;
}
const length = 6 + buffer.readInt32BE(2);
return buffer.length >= length ? length : 0;
}
return 0;
}

// 残留的数据
let residualBuffer = null;

socket.on('data', (buffer) => {
// 拼接残留的 buffer
if(residualBuffer) {
buffer = Buffer.concat([residualBuffer, buffer]);
}

let completeLen = 0;

// 分包解析
while(completeLen = checkBufferLen(buffer)) {
const package = buffer.slice(0, completeLen);
buffer = buffer.slice(completeLen);

const res = decode(package);
if (res.success) {
console.log(`包${res.data.index}, 返回值是${res.data.text}`);
}
}

residualBuffer = buffer;
});

// 是否模拟网络波动
const isRandom = true;

// 连续发出 10 个请求
for(let i = 0; i < 10; i++) {
if (isRandom) {
setTimeout(() => {
socket.write(encode());
}, Math.round(Math.random() * 1000));
} else {
socket.write(encode());
}
}

服务端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
const net = require('net');
const {searchBookName} = require('../code');

/**
* 组装一个二进制包
* @param {string} index 包序号
* @param {string} data 包内容
* @returns {buffer}
*/
const encode = (index, data) => {
const body = Buffer.from(data);
const header = Buffer.alloc(6);
header.writeInt16BE(index);
header.writeInt32BE(body.length, 2);

return Buffer.concat([header, body]);
}

/**
* 拆解一个二进制包
* @param {buffer} 二进制数据包
* @returns {object}
*/
const decode = (buffer) => {
if (buffer instanceof Buffer) {
const header = buffer.slice(0, 6);
const index = header.readUInt16BE();
const body = buffer.slice(6).toString();

return {
success: true,
data: {
index,
text: body,
}
}
}
return {
success: false,
message: '数据包格式不正确',
}
}

/**
* 检查获取一个包的长度,默认公示为:头部长度(6)+ 存储在头部的包长度
* 如果长度小于6,则说明被拆包,需要与下个包数据拼接使用,故返回 0 。
* @param {buffer} buffer
* @returns {number}
*/
const checkBufferLen = (buffer) => {
if (buffer instanceof Buffer) {
// 头部信息不全时,可能是因为内容超长被拆包了
if (buffer.length < 6) {
return 0;
}
const length = 6 + buffer.readInt32BE(2);
return buffer.length >= length ? length : 0;
}
return 0;
}

// 是否模拟网络波动
const isRandom = true;

const server = net.createServer((socket) => {
let residualBuffer = null;
socket.on('data', (buffer) => {

// 拼接残留的 buffer
if (residualBuffer) {
buffer = Buffer.concat([residualBuffer, buffer]);
}

let completeLen = 0;

// 分包解析
while(completeLen = checkBufferLen(buffer)) {
const package = buffer.slice(0, completeLen);
buffer = buffer.slice(completeLen);

const res = decode(package);
if (res.success) {
if(isRandom) {
setTimeout(() => {
socket.write(
encode(res.data.index, searchBookName(res.data.text))
);
}, Math.round(Math.random() * 1000));
} else {
socket.write(
encode(res.data.index, searchBookName(res.data.text))
)
}
}
}

residualBuffer = buffer;
});
});

server.listen(3000);

RPC框架内存使用

RPC 主要工作

网络收发 – 压缩解压 – 序列化反序列化

本质上一条 RPC 消息内存是离散的,都需要有自己的 buffer 类型

buffer 设计

链表每块大小怎么定义?

malloc/free

作为系统调度,比较耗费性能
一般来说内存分配交给 ptmalloc、jemalloc、tcmalloc

碎片管理

碎片管理会分很多层,大小、使用时间等都会影响这块内存回收到哪层。那么我们组织这些内存碎片的数据结构,必须满足能够互斥、快速找到适合的碎片、碎片及时回收等需求

RPC 核心

目的:调用远程方法像调用本地一样无差别
本质:屏蔽远程调用网络相关细节

调用基本流程通过动态代理实现

RPC 会给接口生成一个代理类,所以我们调用这个接口实际调用的是动态生成的代理类,由代理类来触发远程调用

动态代理: Spring(AOP)、JDK 动态代理、cglib、Dubbo(Javassist)

调用细节

调用细节

  1. 序列化处理参数,需要综合考虑通用性、性能、可读性和兼容性

  2. RPC协议:协议头 + 协议体

    协议头放一些元数据,包括:魔法位、协议的版本、消息的类型、序列化方式、整体长度、头长度、扩展位等。
    协议体就是放请求的数据

  3. 网络传输

    IO 模型:

    • IO 多路复用 (推荐大多数RPC调用场景都是高并发)
    • 同步阻塞 BIO
    • 同步非阻塞 NIO
    • 异步非阻塞 AIO

    JAVA 使用的轮子 Netty

  4. 工厂级别 RPC,集群部署,需要注册中心

  5. 路由分组策略:实现分组调用、灰度发布、流量隔离等

  6. 负载均衡

  7. 异常重试

  8. 限流熔断

泛化调用

常见 RPC 框架

RMI、Dubbo、gRPC、Hessian(二进制协议)、Thrift