摘要 @Override。">| magic (2bytes) | version (1byte) | type (1byte) | reserved (7bits) | 。

# 需求與設計

## 需求分析

RPC 全稱 Remote Procedure Call ,簡單地來說,它能讓使用者像調用本地方法一樣,調用遠程的接口,而不需要關注底層的具體細節。

例如車輛違章代辦功能,如果車輛因爲某種原因違章,只需要通過這個違章代辦功能(它也許是個APP),我們就能動動手指,而省去了一些跑腿的工作。

不像微服務背景下大家所說的 RPC 框架,如 Dubbo 之類。 這個 RPC 框架不提供過多的關於服務註冊、服務發現、服務管理等功能。 它針對的是這樣的一些場景: 在內部網絡,或者局域網內,兩個屬於同個業務的系統之間需要通信,而我們又覺得去設計多一種二進制網絡協議過於繁瑣並且沒有必要,這時候如果給客戶端開發者一些明確的接口,讓他知道實現什麼功能該調用什麼接口,那麼省去的工作量以及開發效率上的提升不言而喻。

這個 RPC 系統基於 Java 語言實現,需求如下:

  • RPC 服務端可以通過一條長連接發佈多個接口(Interface),客戶端按需生成對應接口的代理。

  • RPC 客戶端也可以發佈接口,以便在必要的時候,服務端可以主動調用客戶端的接口實現

  • 客戶端與服務端之間保持長連接並且維持心跳

  • 服務端針對不同的接口實現,可以指定不同的線程池去處理

  • 序列化協議支持擴展

  • 通信協議與具體編程語言無關

  • 支持併發調用,一個RPC客戶端實例要求是線程安全的

## 通信協議設計

高效的通信協議一般是二進制格式的,比較常見的還有文本協議比如說HTTP,爲了追求效率,這個 RPC 框架就採用二進制格式。

### 協議的基本要素

#### 魔數

要了解到,報文是在網絡上傳輸的,安全性比較低,因此有必要採取一些措施使得並不是任何人都可以隨隨便便往我們的端口上發東西,因此我們對報文要有一個初步的識別功能,這時候“魔數(magic number)”就派上用場了。 魔數並不受任何規範約束,沒有人可以要求你的魔數應該遵循什麼規範,實際上魔數只是我們通信雙方都約定的一個“暗號”,不知道這個暗號的人就無法參與進通信中。 例如 Java 源文件編譯後的 class 文件開頭就有一個魔數:

0xCAFEBABE,隨隨便便打開一個class文件用十六進制編輯器查看,就能看到。

Java 虛擬機加載 class 的時候會先驗證魔數。如果不是 CAFEBABE 就認爲是不合法的 class 文件,並拒絕加載。

不過魔數起到的安全防範作用是非常有限的,“有心人”可以通過抓取網絡包就識別出魔數了。 因此魔數這個東西其實是“防君子不防小人”。

#### 協議版本

一個協議可能也會有多個版本,例如說 HTTP1.0 和 HTTP1.1,不同版本的協議元素可能發生了改變,解析方式也會發生改變,因此協議設計這一塊,需要預留出地方聲明協議的版本,通信雙方在解析協議或者拼裝協議的時候纔有跡可循。

#### 報文類型

對於RPC框架來說,報文可能有多種類型:心跳類型報文、認證類型報文、請求類型報文、響應類型報文等。

#### 上下文 ID

RPC 調用其實是一個“請求-響應”的過程,並且跨物理機器,因此每次請求和響應,都必須帶上上下文 ID,通信雙方纔能把請求和響應對應起來。

#### 狀態

狀態用來標識一次調用時正常結束還是異常結束,通常由被調用方置狀態。

#### 請求數據

即發送到服務端的調用請求,通常是序列化後的二進制流,長度不定。

#### 長度編碼字段

收報文的一方怎麼知道發報文的那一方發了多少字節呢?因此發送方必須在協議裏告訴接收方需要接受多少字節纔算一個完整的報文。

#### 保留字段

協議一旦被設計,並非一成不變的,日後可能有變動的可能,因此還需要考慮保留一些字節空間作爲保留字段,以備日後協議的擴展。

### 協議設計

結合以上的一些設計原則,具體協議設計如下:

------------------------------------------------------------------------

| magic (2bytes) | version (1byte) | type (1byte) | reserved (7bits) |

------------------------------------------------------------------------

| status (1byte) | id (8bytes) | body length (4bytes) |

------------------------------------------------------------------------

| |

| body ($body_length bytes) |

| |

------------------------------------------------------------------------

## 鏈路可靠性

客戶端與服務端之間的連接採用 TCP 長連接,一個客戶端與服務端之間保持至少一條長連接。 接口調用請求的發送,在多條連接之間進行負載均衡。

每條連接在空閒的時候,由客戶端主動向服務端發送心跳報文,並且客戶端在發現連接失效或斷開的時候,自動進行重連。

每個客戶端向服務端建立連接後,在正式發起接口調用請求之前,都需要進行check in 操作, check in 操作主要是將客戶端的身份標識(identifier)和客戶端的心跳間隔告訴服務端。 利用 netty 的 handler 責任鏈機制和自帶的 IdleStateHandler,自動檢測出連接是否空閒,並在空閒時觸發心跳報文的發送。 而服務端在客戶端 checkin 後,根據客戶端的心跳頻率,在自己的 handler pipeline 上動態加入一個 IdleStateHandler,來檢測出客戶端是否已經失聯,如果是,則主動關閉連接。

同時,客戶端本地將會起一個定時執行任務的線程,定期檢查連接是否失效,如果失效,則關閉舊連接,並進行連接的重建。

# 基於 netty 的協議編解碼

## 概述

在上文 需求與設計 中已經給出了協議的具體細節,協議類型爲二進制協議,如下:

<span style="font-size: 17px;font-family: Optima-Regular, PingFangTC-light;">------------------------------------------------------------------------</span>

<span style="font-size: 17px;font-family: Optima-Regular, PingFangTC-light;">| magic (2bytes) | version (1byte) | type (1byte) | reserved (7bits) | </span>

<span style="font-size: 17px;font-family: Optima-Regular, PingFangTC-light;"> ------------------------------------------------------------------------</span>

<span style="font-size: 17px;font-family: Optima-Regular, PingFangTC-light;"> | status (1byte) | id (8bytes) | body length (4bytes) |</span>

<span style="font-size: 17px;font-family: Optima-Regular, PingFangTC-light;"> ------------------------------------------------------------------------</span>

<span style="font-size: 17px;font-family: Optima-Regular, PingFangTC-light;"> | |</span>

<span style="font-size: 17px;font-family: Optima-Regular, PingFangTC-light;"> | body ($body_length bytes) |</span>

<span style="font-size: 17px;font-family: Optima-Regular, PingFangTC-light;"> | |</span>

<span style="font-size: 17px;font-family: Optima-Regular, PingFangTC-light;"> ------------------------------------------------------------------------</span>

協議的解碼我們稱爲 decode,編碼我們成爲 encode,下文我們將直接使用 decode 和 encode 術語。

decode 的本質就是講接收到的一串二進制報文,轉化爲具體的消息對象,在 Java 中,就是將這串二進制報文所包含的信息,用某種類型的對象存儲起來。

encode 則是將存儲了信息的對象,轉化爲具有相同含義的一串二進制報文,然後網絡收發模塊再將報文發出去。

無論是 rpc 客戶端還是服務端,都需要有一個 decode 和 encode 的邏輯。

## 消息類型

rpc 客戶端與服務端之間的通信,需要通過發送不同類型的消息來實現,例如: client 向 server 端發送的消息,可能是請求消息,可能是心跳消息,可能是認證消息,而 server 向 client 發送的消息,一般就是響應消息。

利用 Java 中的枚舉類型,可以將消息類型進行如下定義:

/**

* 消息類型

*

* @author beanlam

* @version 1.0

*/


public enum MessageType {


REQUEST((byte) 0x01), HEARTBEAT((byte) 0x02), CHECKIN((byte) 0x03), RESPONSE(

(byte) 0x04), UNKNOWN((byte) 0xFF);


private byte code;


MessageType(byte code) {

this.code = code;

}


public static MessageType valueOf(byte code) {

for (MessageType instance : values()) {

if (instance.code == code) {

return instance;

}

}

return UNKNOWN;

}


public byte getCode() {

return code;

}

}

在這個類中設計了 valueOf 方法,方便進行具體的 byte 字節與具體的消息枚舉類型之間的映射和轉換。

## 調用狀態設計

client 主動發起的一次 rpc 調用,要麼成功,要麼失敗,server 端有責任告知 client 此次調用的結果,client 也有責任去感知調用失敗的原因,因爲不一定是 server 端造成的失敗,可能是因爲 client 端在對消息進行預處理的時候,例如序列化,就已經出錯了,這種錯誤也應該作爲一次調用的調用結果返回給 client 調用者。 因此引入一個調用狀態,與消息類型一樣,它也藉助了 Java 語言裏的枚舉類型來實現,並實現了方便的 valueOf 方法:

/**


* 調用狀態


*


* @author beanlam


* @version 1.0


*/


public enum InvocationStatus {


OK((byte) 0x01), CLIENT_TIMEOUT((byte) 0x02), SERVER_TIMEOUT(


(byte) 0x03), BAD_REQUEST((byte) 0x04), BAD_RESPONSE(


(byte) 0x05), SERVICE_NOT_FOUND((byte) 0x06), SERVER_SERIALIZATION_ERROR(


(byte) 0x07), CLIENT_SERIALIZATION_ERROR((byte) 0x08), CLIENT_CANCELED(


(byte) 0x09), SERVER_BUSY((byte) 0x0A), CLIENT_BUSY(


(byte) 0x0B), SERIALIZATION_ERROR((byte) 0x0C), INTERNAL_ERROR(


(byte) 0x0D), SERVER_METHOD_INVOKE_ERROR((byte) 0x0E), UNKNOWN((byte) 0xFF);


private byte code;


InvocationStatus(byte code) {


this.code = code;


}


public static InvocationStatus valueOf(byte code) {


for (InvocationStatus instance : values()) {


if (code == instance.code) {


return instance;


}


}


return UNKNOWN;


}


public byte getCode() {


return code;


}


}

## 消息實體設計

我們將 client 往 server 端發送的統一稱爲 rpc 請求消息,一個請求對應着一個響應,因此在 client 和 server 端間流動的信息大體上其實就只有兩種,即要麼是請求,要麼是響應。 我們將會定義兩個類,分別是 RpcRequest 和 RpcResponse 來代表請求消息和響應消息。

另外由於無論是請求消息還是響應消息,它們都有一些共同的屬性,例如說“調用上下文ID”,或者消息類型。 因此會再定義一個 RpcMessage 類,作爲父類。

### RpcMessage

/**


* rpc消息


*


* @author beanlam


* @version 1.0


*/


public class RpcMessage {


private MessageType type;


private long contextId;


private Object data;


public long getContextId() {


return this.contextId;


}


public void setContextId(long id) {


this.contextId = id;


}


public Object getData() {


return this.data;


}


public void setData(Object data) {


this.data = data;


}


public void setType(byte code) {


this.type = MessageType.valueOf(code);


}


public MessageType getType() {


return this.type;


}


public void setType(MessageType type) {


this.type = type;


}


@Override


public String toString() {


return "[messageType=" + type.name() + ", contextId=" + contextId + ", data="


+ data + "]";


}


}

### RpcRequest

import java.util.concurrent.atomic.AtomicLong;


/**


* rpc請求消息


*


* @author beanlam


* @version 1.0


*/


public class RpcRequest extends RpcMessage {


private static final AtomicLong ID_GENERATOR = new AtomicLong(0);


public RpcRequest() {


this(ID_GENERATOR.incrementAndGet());


}


public RpcRequest(long contextId) {


setContextId(contextId);


setType(MessageType.REQUEST);


}


}

### RpcResponse

<span>/**</span>

<span><br /></span>

<span> *</span>

<span><br /></span>

<span> * rpc響應消息</span>

<span><br /></span>

<span> *</span>

<span><br /></span>

<span> * @author beanlam</span>

<span><br /></span>

<span> * @version 1.0</span>

<span><br /></span>

<span> */</span>

<span><br /></span>

<span>public class RpcResponse extends RpcMessage {</span>

<span><br /></span>

<span> private InvocationStatus status = InvocationStatus.OK;</span>

<span><br /></span>

<span> public RpcResponse(long contextId) {</span>

<span><br /></span>

<span> setContextId(contextId);</span>

<span><br /></span>

<span> setType(MessageType.RESPONSE);</span>

<span><br /></span>

<span> }</span>

<span><br /></span>

<span> public InvocationStatus getStatus() {</span>

<span><br /></span>

<span> return this.status;</span>

<span><br /></span>

<span> }</span>

<span><br /></span>

<span> public void setStatus(InvocationStatus status) {</span>

<span><br /></span>

<span> this.status = status;</span>

<span><br /></span>

<span> }</span>

<span><br /></span>

<span> @Override</span>

<span><br /></span>

<span> public String toString() {</span>

<span><br /></span>

<span> return &quot;RpcResponse[contextId=&quot; + getContextId() + &quot;, status=&quot; + status.name() + &quot;]&quot;;</span>

<span><br /></span>

<span> }</span>

<span><br /></span>

<span>}</span>

## netty 編解碼介紹

netty 是一個 NIO 框架,應該這麼說,netty 是一個有良好設計思想的 NIO 框架。 一個 NIO 框架必備的要素就是 reactor 線程模型,目前有一些比較優秀而且開源的小型 NIO 框架,例如分庫分表中間件 mycat 實現的一個簡易 NIO 框架。

netty 的主要特點有: 微內核設計、責任鏈模式的業務邏輯處理、內存和資源泄露的檢測等。 其中編解碼在 netty 中,都被設計成責任鏈上的一個一個 Handler。

decode 對於 netty 來說,它提供了 ByteToMessageDecoder,它也提供了 MessageToByteEncoder。

藉助 netty 來實現協議編解碼,實際上就是去在這兩個handler裏面實現編解碼的邏輯。

## decode

在實現 decode 邏輯時需要注意的一個問題是,由於二進制報文是在網絡上發送的,因此一個完整的報文可能經過多個分組來發送的,什麼意思呢,就是當有報文進來後,要確認報文是否完整,decode邏輯代碼不能假設收到的報文就是一個完整報文,一般稱這爲“TCP半包問題”。 同樣,報文是連着報文發送的,意味着decode代碼邏輯還要負責在一長串二進制序列中,分割出一個一個獨立的報文,這稱之爲“TCP粘包問題”。

netty 本身有提供一些方便的 decoder handler 來處理 TCP 半包和粘包的問題。 不過一般情況下我們不會直接去用它,因爲我們的協議比較簡單,自己在代碼裏處理一下就可以了。

完整的 decode 代碼邏輯如下所示:

import cn.com.agree.ats.rpc.message.*;


import cn.com.agree.ats.util.logfacade.AbstractPuppetLoggerFactory;


import cn.com.agree.ats.util.logfacade.IPuppetLogger;


import io.netty.buffer.ByteBuf;


import io.netty.channel.ChannelHandlerContext;


import io.netty.handler.codec.ByteToMessageDecoder;


import java.util.List;


/**


* 協議解碼器


*


* @author beanlam


* @version 1.0


*/


public class ProtocolDecoder extends ByteToMessageDecoder {


private static final IPuppetLogger logger = AbstractPuppetLoggerFactory


.getInstance(ProtocolDecoder.class);


private boolean magicChecked = false;


@Override


protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> list)


throws Exception {


if (!magicChecked) {


if (in.readableBytes() < ProtocolMetaData.MAGIC_LENGTH_IN_BYTES) {


return;


}


magicChecked = true;


if (!(in.getShort(in.readerIndex()) == ProtocolMetaData.MAGIC)) {


logger.warn(


"illegal data received without correct magic number, channel will be close");


ctx.close();


magicChecked = false; //this line of code makes no any sense, but it's good for a warning


return;


}


}


if (in.readableBytes() < ProtocolMetaData.HEADER_LENGTH_IN_BYTES) {


return;


}


int bodyLength = in


.getInt(in.readerIndex() + ProtocolMetaData.BODY_LENGTH_OFFSET);


if (in.readableBytes() < bodyLength + ProtocolMetaData.HEADER_LENGTH_IN_BYTES) {


return;


}


magicChecked = false;// so far the whole packet was received


in.readShort(); // skip the magic


in.readByte(); // dont care about the protocol version so far


byte type = in.readByte();


byte status = in.readByte();


long contextId = in.readLong();


byte[] body = new byte[in.readInt()];


in.readBytes(body);


RpcMessage message = null;


MessageType messageType = MessageType.valueOf(type);


if (messageType == MessageType.RESPONSE) {


message = new RpcResponse(contextId);


((RpcResponse) message).setStatus(InvocationStatus.valueOf(status));


} else {


message = new RpcRequest(contextId);


}


message.setType(messageType);


message.setData(body);


list.add(message);


}


}

可以看到,我們解決半包問題的時候,是判斷有沒有收到我們期望收到的報文,如果沒有,直接在 decode 方法裏面 return,等有更多的報文被收到的時候,netty 會自動幫我們調起 decode 方法。 而我們解決粘包問題的思路也很清晰,那就是一次只處理一個報文,不去動後面的報文內容。

還需要注意的是,在 netty 中,對於 ByteBuf 的 get 是不會消費掉報文的,而 read 是會消費掉報文的。 當不確定報文是否收完整的時候,我們都是用 get開頭的方法去試探性地驗證報文是否接收完全,當確定報文接收完全後,我們才用 read 開頭的方法去消費這段報文。

## encode

直接貼代碼,參考前文提到的協議格式閱讀以下代碼:

/**


*


* 協議編碼器


*


* @author beanlam


* @version 1.0


*/


public class ProtocolEncoder extends MessageToByteEncoder<RpcMessage> {


@Override


protected void encode(ChannelHandlerContext ctx, RpcMessage rpcMessage, ByteBuf out)


throws Exception {


byte status;


byte[] data = (byte[]) rpcMessage.getData();


if (rpcMessage instanceof RpcRequest) {


RpcRequest request = (RpcRequest) rpcMessage;


status = InvocationStatus.OK.getCode();


} else {


RpcResponse response = (RpcResponse) rpcMessage;


status = response.getStatus().getCode();


}


out.writeShort(ProtocolMetaData.MAGIC);


out.writeByte(ProtocolMetaData.VERSION);


out.writeByte(rpcMessage.getType().getCode());


out.writeByte(status);


out.writeLong(rpcMessage.getContextId());


out.writeInt(data.length);


out.writeBytes(data);


}


}

# 序列化機制

## 概述

在上文 基於 netty 的協議編解碼 中談到對於協議的 decode 和 encode,在談 decode 之前,必須先要知道 encode 的過程是什麼,它把什麼東西轉化成了二進制協議。

由於我們還未談到具體的 RPC 調用機制,因此暫且認爲 encode 就是把一個包含了調用信息的 Java 對象,從 client 經過序列化,變成一串二進制流,發送到了 server 端。

這裏需要明確的是,encode 的職責是拼協議,它不負責序列化,同樣,decode 只是把整個二進制報文分割,哪部分是報文頭,哪部分是報文體,誠然,報文體就是被序列化成二進制流的一個 Java 對象。

對於調用方來說,先將調用信息封裝成一個 Java 對象,經過序列化後形成二進制流,再經過 encode 階段,拼接成一個完整的遵守我們定好的協議的報文。

對於被調用方來說,則是收取完整的報文,在 decode 階段將報文中的報文頭,報文體分割出來,在序列化階段將報文體反序列化爲一個 Java 對象,從而獲得調用信息。

本文探討序列化機制。

## 基於 netty handler

由於這個 RPC 框架基於 netty 實現,因此序列化機制其實體現在了 netty 的 pipeline 上的 handler 上。

例如對於調用方,它需要在 pipeline 上加上一個 序列化 encode handler,用來序列化發出去的請求,同時需要加上一個反序列化的 decode handler, 以便反序列化調用結果。 如下所示:

protected void initChannel(SocketChannel ch) throws Exception {

ChannelPipeline pipeline = ch.pipeline();

pipeline.addLast(new ProtocolEncoder())

.addLast(new ProtocolDecoder())

.addLast(new SerializationHandler(serialization))

.addLast(new DeserializationHandler(serialization));


}

其中的 SerializationHandler 和 DeserializationHandler 就是上文提到的序列化 encode handler 和反序列化 decode handler。

同樣,對於被調用方來說,它也需要這兩個handler,與調用方的 handler 編排順序一致。

其中,serialization 這個參數的對象代表具體的序列化機制策略。

## 序列化機制

上文中,SerializationHandler 和 DeserializationHandler 這兩個對象都需要一個 serialization 對象作爲參數,它是這麼定義的:

private ISerialization serialization = SerializationFactory.getSerialization(ServerDefaults.DEFAULT_SERIALIZATION_TYPE);

採用工廠模式來創建具體的序列化機制:

/**

* 序列化工廠

*

* @author beanlam

* @version 1.0

*/

public class SerializationFactory {


private SerializationFactory() {

}


public static ISerialization getSerialization(SerializationType type) {

if (type == SerializationType.JDK) {

return new JdkSerialization();

}

return new HessianSerialization();

}

}

這裏暫時只支持 JDK 原生序列化 和 基於 Hessian 的序列化機制,日後若有其他效率更高更適合的序列化機制,則可以在工廠類中進行添加。

這裏的 hessian 序列化是從 dubbo 中剝離出來的一塊代碼,感興趣可以從 dubbo 的源碼中的 com.caucho.hessian 包中獲得。

以 HessianSerialization 爲例:

/**

* @author beanlam

* @version 1.0

*/

public class HessianSerialization implements ISerialization {


private ISerializer serializer = new HessianSerializer();

private IDeserializer deserializer = new HessianDeserializer();


@Override

public ISerializer getSerializer() {

return serializer;

}


@Override

public IDeserializer getDeserializer() {

return deserializer;

}


@Override

public boolean accept(Class<?> clazz) {

return Serializable.class.isAssignableFrom(clazz);

}

}

根據 Hessian 的 API, 分別返回一個 hessian 的序列化器和反序列化器即可。

相關文章