寫在開頭

  • 想學習造輪子技術,可以看我之前的原創文章大集合: https://mp.weixin.qq.com/s/RsvI5AFzbp3rm6sOlTmiYQ
  • 如果你想領取 3700G 免費學習資料、或者加入技術交流羣(禁止發廣告),可以文末+我微信,專注技術不閒聊

什麼是protobuffer?

  • protocol buffer是Google的一種獨立的數據交換格式,可運用於多種領域。

  • protocolbuffer(以下簡稱PB)是google 的一種數據交換的格式,它獨立於語言,獨立於平臺。

  • google 提供了多種語言的實現:java、c#、c++、go 和 python,每一種實現都包含了相應語言的編譯器以及庫文件。由於它是一種二進制的格式,比使用 xml 進行數據交換快許多。

  • 可以把它用於分佈式應用之間的數據通信或者異構環境下的數據交換。作爲一種效率和兼容性都很優秀的二進制數據傳輸格式,可以用於諸如網絡傳輸、配置文件、數據存儲等諸多領域。

總結一下優點

  • 簡單說來 Protobuf 的主要優點就是:簡潔,快。

  • 爲什麼這麼說呢?

  • 因爲Protocol Buffer 信息的表示非常緊湊,這意味着消息的體積減少,自然需要更少的資源。比如網絡上傳輸的字節數更少,需要的 IO 更少等,從而提高性能。

  • 對於一條消息,用 Protobuf 序列化後的字節序列爲:

08 65 12 06 48 65 6C 6C 6F 77
  • 而如果用 XML,則類似這樣:

31 30 31 3C 2F 69 64 3E 3C 6E 61 6D 65 3E 68 65 
6C 6C 6F 3C 2F 6E 61 6D 65 3E 3C 2F 68 65 6C 6C
6F 77 6F 72 6C 64 3E

在Node.js中引入PB

yarn add protobufjs -D
mkdir proto
cd proto
vi message.proto

....
//message.proto文件
package message;
option optimize_for = LITE_RUNTIME;
message Account{
required string accountName = 1;
required string pwd = 2;
}
message AccountList{
required int32 index = 1;
repeated Account list = 2;
}

開始使用PB協議

  • 引入 protobufjs
  • 讀取root對象

const ProtoBufJs = require("protobufjs");
const root = ProtoBufJs.loadSync("./proto/message.proto");
  • 讀取定義好的pb文件, 動態引入讀取

const ProtoBufJs = require("protobufjs");
const root = ProtoBufJs.loadSync("./proto/message.proto");
const AccountList = root.lookupType("message.AccountList");
const Account = root.lookupType("message.Account");
const accountListObj = AccountList.create();
for (let i = 0; i < 5;i++) {
const accountObj = Account.create();
accountObj.accountName = "前端巔峯" + i;
accountObj.pwd = "Peter醬要比技術胖還騷" + i;
accountListObj.list.push(accountObj);
}
const buffer = AccountList.encode(accountListObj).finish();

console.log(buffer)
  • 使用nodemon啓動項目

  • 打印出了Buffer,此時轉化成string

const ProtoBufJs = require("protobufjs");
const root = ProtoBufJs.loadSync("./proto/message.proto");
const AccountList = root.lookupType("message.AccountList");
const Account = root.lookupType("message.Account");
const accountListObj = AccountList.create();
const accountObj = Account.create();
accountObj.accountName = "前端巔峯";
accountObj.pwd = "Peter醬要比技術胖還騷";
accountObj.test = "大保健越做身體越虛是爲什麼";
accountListObj.list.push(accountObj);
const buffer = AccountList.encode(accountListObj).finish();
console.log(buffer.toString());
  • 打印得到

  • 那麼請問,大寶劍越做身體越虛弱,是爲什麼?

引入socket通信,二進制更好的支持

  • socket
    redis
    udp
    TCP
    redis
    
const net = require("net");
const listenPort = 6380; //監聽端口
const server = net
.createServer(function (socket) {
// 創建socket服務端
console.log("connect: " + socket.remoteAddress + ":" + socket.remotePort);
socket.setKeepAlive(true);
socket.setEncoding("utf-8");
//接收到數據
socket.on("data", function (data) {
console.log("client send:" + data);
});
socket.write("Hello client!\r\n");
//數據錯誤事件
socket.on("error", function (exception) {
socket.end();
});
//客戶端關閉事件
socket.on("close", function (data) {
});
})
.listen(listenPort);
//服務器監聽事件
server.on("listening", function () {
console.log("server listening:" + server.address().port);
});
//服務器錯誤事件
server.on("error", function (exception) {
console.log("server error:" + exception);
});

  • redis
    6379
    6380
    keep-alive
    socket.setKeepAlive(true)
    

編寫redis客戶端

  • 引入 Socket 通信
const { Socket } = require("net");
//其他引入pb文件的代碼不變
  • 引入 pb 文件的代碼不變,客戶端一份,服務端一份,雙工通訊兩邊 pb 文件都要各自有一份
const port = 6380;
const host = "127.0.0.1";
const client = new Socket();
client.setKeepAlive(true);
client.setEncoding("utf-8");
//連接到服務端
client.connect(port, host, function () {
client.write("hello server");
//向端口寫入數據到達服務端
});
client.on("data", function (data) {
console.log("from server:" + data);
//得到服務端返回來的數據
});
client.on("error", function (error) {
//錯誤出現之後關閉連接
console.log("error:" + error);
client.destory();
});
client.on("close", function () {
//正常關閉連接
console.log("Connection closed");
});
  • 通過 socket 鏈接 6380 端口服務器,建立長鏈接

應用層心跳保活& 重連

  • 重新定義pb文件,PingPong

package message;
syntax = "proto3";

message PingPong {
string message_type = 1; // 會變成 messageType
string ping = 2;
string pong = 3;
}
  • 客戶端編譯pb文件,使用pb協議進行通訊與服務端

const root = ProtoBufJs.loadSync('./proto/message.proto');
const PingPong = root.lookupType('message.PingPong');
setInterval(() => {
const payload = { message_type: '1', ping: '1', pong: '2' };
const errMsg = PingPong.verify(payload);
if (errMsg) throw Error(errMsg);
const message = PingPong.create(payload);
const buffer = PingPong.encode(message).finish();
client.write(buffer);
}, 3000);
  • 每隔3秒發送心跳包一次

服務端接受心跳

  • 引入pb

const root = ProtoBufJs.loadSync('./proto/message.proto');
const PingPong = root.lookupType('message.PingPong');
  • 接受心跳包

const server = createServer(function (socket) {
socket.setKeepAlive(true);
// 創建socket服務端
//接收到數據
socket.on('data', function (data) {
const decodedMessage = PingPong.decode(data);
console.log(decodedMessage, 'decodedMessage');
});
socket.write('Hello client!\r\n');
//數據錯誤事件
socket.on('error', function (exception) {
console.log('socket error:' + exception);
socket.end();
});
//客戶端關閉事件
socket.on('close', function (data) {
console.log('client closed!');
});
}).listen(listenPort);
  • 此時已經能接收到PB協議傳輸的Buffer,並且解析了

  • 心跳保持,客戶端發送心跳

  timer = setInterval(() => {
count++;
const payload = { messageType: '1', ping: '1' };
const errMsg = PingPong.verify(payload);
if (errMsg) throw Error(errMsg);
const message = PingPong.create(payload);
const buffer = PingPong.encode(message).finish();
client.write(buffer);
}, 3000);
  • 服務端返回心跳

 socket.on('data', function (data) {
const decodedMessage = PingPong.decode(data);
console.log(decodedMessage,'decodedMessage')
if(decodedMessage.messageType ==='1'){
console.log('進入判斷')
const payload = { messageType: '1', pong: '1'};
const errMsg = PingPong.verify(payload);
if (errMsg) throw Error(errMsg);
const message = PingPong.create(payload);
const buffer = PingPong.encode(message).finish();
socket.write(buffer);
}
});
  • 客戶端記錄心跳,做超時、斷了的處理

client.on('data', function (data) {
const decodedMessage = PingPong.decode(data);
if (decodedMessage.messageType === '1') {
console.log('收到心跳回包');
count = 0;
}
console.log('from server:' + decodedMessage.messageType);
//得到服務端返回來的數據
});
  • 發送心跳時候判斷,三次後收不到心跳,拋出錯誤,不再發送心跳

  timer = setInterval(() => {
if (count > 3) {
clearInterval(timer);
client.end();
throw Error('timeout')
}
count++;
const payload = { messageType: '1', ping: '1' };
const errMsg = PingPong.verify(payload);
if (errMsg) throw Error(errMsg);
const message = PingPong.create(payload);
const buffer = PingPong.encode(message).finish();
client.write(buffer);
}, 3000);
  • 服務端故意不回覆心跳

  socket.write(buffer);
  • 客戶端拋出錯誤,取消心跳發送,斷開 socket 鏈接
  • 此時應該還有重連機制,這裏就不做處理了,還有發送隊列這些

實現redis的get,set方法

  • 數據存儲,服務端用 Map 類型存儲
  • 傳輸使用 PB 協議
  • 接受到消息回覆 ACK

定義數據傳輸的 Payload pb字段

  • 定義字段

message Data {
string message_type = 1; // 會變成 messageType
Payload data = 2;
}


message Payload {
required string key = 1;
required string value =2;
}
  • 此時定義RedisSet函數:

const Data = root.lookupType('message.Data');
function RedisSet() {
const msg = { messageType: '2', data: { key: '1', value: '2' } };
const errMsg = Data.verify(msg);
if (errMsg) throw Error(errMsg);
const message = Data.create(msg);
const buffer = Data.encode(message).finish();
client.write(buffer);
}
  • 服務端decode解析反序列化

  socket.on('data', function (data) {
const decodedMessage = PingPong.decode(data);
console.log(decodedMessage,'decodedMessage');
if(decodedMessage.messageType ==='1'){
const payload = { messageType: '1', pong: '1'};
const errMsg = PingPong.verify(payload);
if (errMsg) throw Error(errMsg);
const message = PingPong.create(payload);
const buffer = PingPong.encode(message).finish();
socket.write(buffer);
}
});
  • 反序列化成功

  • 此時已經拿到了數據,但是細心觀察的會發現,我們拿錯了反序列的對象去處理,導致數據有問題,那麼我們需要告知收包方應該用什麼對象去反序列化

  • 此時最佳方案應該定義common字段去先反序列化一次

message Common {
string message_type = 1;
}
  • 在服務端先反序列化一次,用 common ,得到messageType後再進行處理,再反序列化一次
  socket.on('data', function (data) {
const res = Common.decode(data);
if (res.messageType=== '1') {
const payload = { messageType: '1', pong: '1' };
const errMsg = PingPong.verify(payload);
if (errMsg) throw Error(errMsg);
const message = PingPong.create(payload);
const buffer = PingPong.encode(message).finish();
socket.write(buffer);
} else if(res.messageType=== '2'){
const message = Data.decode(data)
const payload = message.data;
console.log(payload.key,'payload');
M.set(payload.key,payload.value);
console.log(M,'m')
}
});
  • 完成簡單的set方法

  • 定義 RedisGet 方法:
const M = new Map();
M.set('1','Peter醬牛逼')

function RedisGet() {
const msg = { messageType: '3', data: { key: '1' } };
const errMsg = Data.verify(msg);
if (errMsg) throw Error(errMsg);
const message = Data.create(msg);
const buffer = Data.encode(message).finish();
client.write(buffer);
}
  • 服務端對類型messageType爲'3'的進行處理

else if (res.messageType === '3') {
const message = Query.decode(data);
const res = M.get(message.key);
console.log(res, 'res');
}
  • 此時get方法完成,得到數據,再定義一個GetData傳輸下,先序列化再反序列化就完成了? 肯定不會這麼簡單

  • redis的set、get的非常高頻的操作,即便是緩存,不是存入數據庫,但還是有失敗風險,因爲我們是通過socket通訊,如果網絡抖動或者其他原因導致通訊失敗,這個數據沒有進入cache,那麼就有問題

set方法應該有cb(回調),get方法應該有返回值

  • 基於以上兩種需求,需要設計新的模式去完成這個set、get功能

  • 無論成功、失敗都能知道結果

真正的開始實現Redis

  • 首先確定通訊依然使用socket,長連接

  • 心跳保活需要

  • 需要引入發送隊列

  • set能觸發cb,get能返回數據(基於 promise | generator|async )
  • 基於pb協議傳輸

  • 有ACK回覆機制,這樣能確保cb調用

處理隊列

  • set和set的回調隊列

  • 我之前想set成功後,應該把數據在客戶端保護一份,這樣redis.get就可以直接拿到數據了,不需要通過socket,後面考慮到多個機器連接redis,應該保持數據一致性,此處應該有多種方法保證數據一致性..

const cbQueue = []; //回調隊列
const setQueue = []; //set的隊列
const getQueue = []; //get的隊列
  • 實現set隊列,觸發 cb ,改造redisSet
function RedisSet(data, cb) {
cbQueue.push(cb);
setQueue.push(data);
console.log(cbQueue, setQueue, "queue");
const errMsg = Data.verify(data);
if (errMsg) throw Error(errMsg);
const message = Data.create(data);
const buffer = Data.encode(message).finish();
client.write(buffer);
}

  • 服務端收到set後,在Map中追加數據,用socket寫入通入客戶端

else if (res.messageType === '2') {
const message = Data.decode(data);
const payload = message.data;
M.set(payload.key, payload.value);
}

M.set後,使用socket通知客戶端緩存寫入成功

  • 首先定義pb字段,我們使用message_type = "5"來通知

message setSucceed {
string message_type = 1;
}
const msg = { messageType: "5" };
const errMsg = setSucceed.verify(msg);
if (errMsg) throw Error(errMsg);
const m = setSucceed.create(msg);
const buffer = setSucceed.encode(m).finish();
socket.write(buffer);
  • 前端觸發set隊列的cb,並且消費這個隊列

  RedisSet(data, () => {
console.log("set成功,觸發cb");
});

else if (decodedMessage.messageType === "5") {
const cb = cbQueue.shift();
cb && cb();
}
  • 結果,符合預期

但是這個操作,是有BUG的

  • 因爲socket寫入都是異步,等返回的時候,那麼就有可能亂序,這裏需要加入ACK回覆機制

  • 在客戶端set的時候,生成一個UUID,將這個UUID帶着給服務端,服務端的Map數據存儲完成後,就可以將這個UUID帶着回來給客戶端(相當於ACK機制)

  • 客戶端接受到 ACK ,觸發cbQueue中的cb(此時將cbQueue數組類型改成Map,方便處理),觸發完成後remove掉 cb 即可
  • 加入UUID

yarn add node-uuid
const uuid = require('node-uuid');

// v1 根據時間戳和隨機數生成的uuid
const creatuuid= uuid.v1()
  • 修改Data的pb文件,增加uuid字段

message Data {
string message_type = 1; // 會變成 messageType
string uuid = 2;
Payload data = 3;
}
  • 修改set方法,每次set用UUID生成key,value爲cb,存儲在Map中

function RedisSet(data, cb) {
// v1 根據時間戳和隨機數生成的uuid
const creatuuid = uuid.v1();
data.uuid = creatuuid;
cbQueue.set(creatuuid, cb);
const errMsg = Data.verify(data);
if (errMsg) throw Error(errMsg);
const message = Data.create(data);
const buffer = Data.encode(message).finish();
client.write(buffer);
}
  • 服務端修改,返回ACK字段,通知客戶端消費cb

else if (res.messageType === '2') {
const message = Data.decode(data);
const payload = message.data;
M.set(payload.key, payload.value);
const msg = { messageType: '5', uuid: message.uuid };
const errMsg = setSucceed.verify(msg);
if (errMsg) throw Error(errMsg);
const m = setSucceed.create(msg);
const buffer = setSucceed.encode(m).finish();
socket.write(buffer);
}
  • 客戶端收到set成功的ACK,根據UUId,消費 cb
 else if (decodedMessage.messageType === '5') {
const res = setSucceed.decode(data);
const cb = cbQueue.get(res.uuid);
cb && cb() && cbQueue.remove(res.uuid);
}

這樣我們set觸發cb已經完成,剩下get得到返回值

  • 其實這個get,也要推敲一下,我當初想粗糙點,直接把所有數據同步到客戶端,然後客戶端根據setQueue & cbQueue去追加數據,後面覺得很不優雅,因爲redis還有集羣,數據同步,預熱,兩種不同數據持久化等等

  • 此處可以通過curl、http請求等形式拿到,因爲我沒看過redis源碼,不清楚怎麼實現的

  • 但是基於Node.js的redis使用,是直接通過redis.get(),傳入回調函數後得到一個數據,沒有使用promise和await(我記得是這樣)

定義get的pb字段

  • 定義Query

message Query {
string message_type = 1;
string key = 2;
string uuid =3;
}
  • 定義get方法

get = function (key, cb) {
// v1 根據時間戳和隨機數生成的uuid
const creatuuid = uuid.v1();
getCbQueue.set(creatuuid, cb);
const msg = { messageType: '6', key, uuid: creatuuid };
const errMsg = Query.verify(msg);
if (errMsg) throw Error(errMsg);
const message = Query.create(msg);
const buffer = Query.encode(message).finish();
TCPClient.write(buffer);
};
  • 首先發送messageType爲6的包給服務端,服務端對6的type做處理

else if (res.messageType === "6") {
const message = Query.decode(data);
const res = M.get(message.key);
const msg = { messageType: "6", uuid: message.uuid, data: res };
const errMsg = getSucceed.verify(msg);
if (errMsg) throw Error(errMsg);
const m = getSucceed.create(msg);
const buffer = getSucceed.encode(m).finish();
socket.write(buffer);
}

  • 如果是6,代表是客戶端的get操作,我們先去Map中查詢,然後返回通知給客戶端,type還是6

  • 客戶端接受到6的msgtype後,通過拿到的data和uuid,調用getCbQueue中的對應回掉,並且delete掉

else if (decodedMessage.messageType === '6') {
const res = getSucceed.decode(data);
const cb = getCbQueue.get(res.uuid);
cb && cb(res.data);
getCbQueue.delete(res.uuid);
}

很多人想看我真實的代碼,我貼出來我優化後的代碼吧,我覺得真的很整潔.

  • 通過類實現redis,靜態方法定義

  • 如何使用我的Redis?

const Redis = require('./redis');
const port = 6380;
const host = '127.0.0.1';
const RedisStore = Redis.connect(port, host);

const data = { messageType: '2', data: { key: '1', value: '2' } };

RedisStore.set(data, () => {
console.log('set成功,觸發cb');
});

RedisStore.get('1', (data) => {
console.log('get成功data:', data);
});


  • 達到預期

還缺守護進程、數據持久化

  • 守護進程,我之前寫過 cluster 源碼解析,用pm2 docker誰都會,但是真的要自己實現,還是要思考一下
  • 有興趣學習的,可以看我之前的解析Cluster源碼、PM2原理文章 https://segmentfault.com/a/1190000021230376
  • PM2這個輪子造起來,可能比redis不相上下,以後有機會可以寫一個,我們今天直接用PM2啓動即可達到守護進程效果

pm2 start server.js

實現redis數據持久化

  • redis數據持久化兩種方式

    • RDB:指定的時間間隔內保存數據快照

    • AOF:先把命令追加到操作日誌的尾部,保存所有的歷史操作

  • 這裏持久化,其實有點麻煩,redis的key數據類型非常豐富

  • redis數據持久化用來做什麼?

    • redis數據存儲在內存中,如果服務器重啓或者redis掛了/重啓,如果不做數據持久化,那麼數據就丟了

先是實現AOF,追加到日誌尾部

  • 在服務端接受到redis.set的時候進行日誌追加

 M.set(payload.key, payload.value);
fs.appendFile(
'./redis.db',
`${payload.key},${payload.value}\n`,
(error) => {
if (error) return console.log('追加文件失敗' + error.message);
console.log('追加成功');
}
);
  • 結果

  • 這樣寫是有問題的,到時候取值的時候不好取,這裏可以用到我之前手寫富文本編輯器的原理,用零寬字符佔位,然後讀取數據時候再替換分割~

什麼是零寬度字符

  • 一種不可打印的Unicode字符, 在瀏覽器等環境不可見, 但是真是存在, 獲取字符串長度時也會佔位置, 表示某一種控制功能的字符.

  • 常見的零寬字符有哪些

  • 零寬空格(zero-width space, ZWSP)用於可能需要換行處。

    Unicode: U+200B  HTML: &#8203;
  • 零寬不連字 (zero-width non-joiner,ZWNJ)放在電子文本的兩個字符之間,抑制本來會發生的連字,而是以這兩個字符原本的字形來繪製。

    Unicode: U+200C  HTML: &#8204;
  • 零寬連字(zero-width joiner,ZWJ)是一個控制字符,放在某些需要複雜排版語言(如阿拉伯語、印地語)的兩個字符之間,使得這兩個本不會發生連字的字符產生了連字效果。

    Unicode: U+200D  HTML: &#8205;
  • 左至右符號(Left-to-right mark,LRM)是一種控制字符,用於計算機的雙向文稿排版中。

    Unicode: U+200E  HTML: ‎ &#x200E; 或 
  • 右至左符號(Right-to-left mark,RLM)是一種控制字符,用於計算機的雙向文稿排版中。

    Unicode: U+200F  HTML: ‏ &#x200F; 或 
  • 字節順序標記(byte-order mark,BOM)常被用來當做標示文件是以UTF-8、UTF-16或UTF-32編碼的標記。

    Unicode: U+FEFF
  • 零寬度字符在JavaScript的應用

  • 數據防爬

  • 將零寬度字符插入文本中,干擾關鍵字匹配。爬蟲得到的帶有零寬度字符的數據會影響他們的分析,但不會影響用戶的閱讀數據。

  • 信息傳遞

  • 將自定義組合的零寬度字符插入文本中,用戶複製後會攜帶不可見信息,達到傳遞作用。

使用零寬字符

  • 我喜歡用 \u200b\ ,因爲它夠2b
 `${payload.key},${payload.value}\u200b\n`,
  • 插入持久化效果

數據預熱

  • 在服務器監聽端口事件中進行數據預熱,讀取磁盤數據到內存中

//服務器監聽事件
server.on('listening', function () {
fs.readFile('./redis.db', (err, data) => {
console.log(data.toString(), 'xxx');
});
console.log('server listening:' + server.address().port);
});

  • 結果 符合預期

  • 上面這樣寫,其實有問題,爲了更好的分割提取磁盤冷數據,我換了下分割的零寬字符

 `${payload.key}-${payload.value}\u200b`,
  • 插入後的數據變成了這樣

  • 讀取數據算法,也是要思考下

//服務器監聽事件
server.on('listening', function () {
fs.readFile('./redis.db', (err, data) => {
const string = data.toString();
if (string.length > 0) {
const result = string.split('\u200b');
for (let i = 0; i < result.length; i++) {
const res = result[i];
for (let j = 0; j < res.length; j++) {
if (res[j] === '-') {
continue;
}
j === 0 ? M.set(res[j], null) : M.set(res[j - 2], res[j]);
}
}
}
});
console.log('server listening:' + server.address().port);
});
  • 最終效果,符合預期

  • 在redis出錯的時候,將數據刷入磁盤中以及定期持久化數據,如果要實現,也可以類似的思路,當然這並不是redis的真正實現,只是一個模擬.

往期精彩原創推薦閱讀

如果感覺寫得不錯,關注下微信公衆號 [ 前端巔峯 ]

  • 我是Peter,架構設計過20萬人端到端加密超級羣功能的桌面IM軟件,我的微信: CALASFxiaotan
  • 另外歡迎收藏我的資料網站:前端生活社區: https://qianduan.life ,感覺對你有幫助,可以右下角點個在看,關注一波公衆號:[ 前端巔峯 ]
相關文章