如何開發一個自己的 RPC 框架 (一)#
2023 年 12 月 11 日 12:00 AM
寫在前面:本文參考了開源項目 https://github.com/MIracleCczs/simple-rpc,其中大部分代碼參考了該項目,本章主要從客戶端的調用出發,講講一個 RPC 框架的實現在客戶端測需要實現那些功能
RPC 的定義#
可以參考 wiki: https://zh.wikipedia.org/wiki/ 遠程過程調用
一個簡單的 RPC 框架是如何組成的?#
一個基礎的 RPC 框架,需要包含三大部分:1. 註冊中心 2. 服務提供方 3. 服務消費方
從上圖可以看出,服務提供方和消費方都需要和註冊中心通信
一個遠程方法的調用是如何實現的?#
下面,我們將根據上面流程圖,一步步進行講解。為了方便更加清楚的講清整個邏輯,我們從實際的業務需求出發。
需求:
存在服務提供方 Producer (後面統稱服務端),提供方法 get
存在服務消費方 Consumer (後面統稱客戶端), 需要調用 Producer 中的 get
方法
基礎接口定義#
定義 UserService
接口,接口內包含 get
方法
public interface UserService {
String get(String username);
}
客戶端發起服務調用#
客戶端註解定義#
客戶端如何才能夠像調用本地方法一樣調用遠程服務呢?RPC 框架就是用來解決這個問題。我們一般本地方法的調用都是採用
@Autowired
private UserService userService;
通過 Spring
依賴注入的方式,將需要用到的方法注入到調用對象中,那麼我們 RPC 調用能不能也採用這種形式呢?答案當然是可以的。那么為了實現上面的需求,我們最簡單的辦法就是自定義一個註解 RpcClient
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD})
public @interface RpcClient {
}
註解定義完成後,我們就應該考慮註解中需要設置那些屬性呢?
那麼冒出來的第一個問題就是:客戶端如何知道是調用的那個遠程服務呢?這時我們就需要設置第一個屬性 remoteAppKey
服務的唯一標識,通過 remoteAppKey
客戶端可以輕鬆地在註冊中心找到目標服務。
這個時候又會有第二個疑問,如果一個服務多個版本如何處理呢?比如進行灰度升級等操作的時,那麼這個時候就需要第二個參數 groupName
找到具體服務下的具體分組
剩餘的參數就比較簡單了,完成的參數配置如下:
/**
* 服務接口:匹配從註冊中心獲取到本地的服務提供者,得到服務提供者列表,再根據負載均衡策略選取一個發起服務調用
*/
Class<?> targetItf() default Object.class;
/**
* 超時時間:服務調用超時時間
*/
long timeout() default 3000 * 10L;
/**
* 調用者線程數
*/
int consumeThreads() default 10;
/**
* 服務提供者唯一標識
*/
String remoteAppKey() default "";
/**
* 服務分組組名
*/
String groupName() default "default";
客戶端初始化#
為了實現類似 @Autowired
的功能,框架需要在 Bean
初始化之時,將所有被 RpcClient
註解的對象進行依賴注入,那麼如何實現這個功能呢? Spring
的 InstantiationAwareBeanPostProcessor
接口,可以在 Bean 的實例化的各個階段執行自定義邏輯。定義一個 ConsumerAnnotaionBean
方法,實現 InstantiationAwareBeanPostProcessor
接口。
public class ConsumerAnnotaionBean implements InstantiationAwareBeanPostProcessor {
...其他方法省略
@Override
public PropertyValues postProcessProperties(PropertyValues pvs, Object bean, String beanName) throws BeansException {
}
}
主要實現接口的 postProcessProperties
方法,設置 Bean 對象的屬性值
進行具體代碼編寫之前,我們需要先理清楚這裡需要實現那些目的:
- 客戶端服務註冊 (監控目的)
- 對象依賴的注入
- 查詢服務節點,預創建 Netty 連接
好了,理清楚完需求之後,我們便開始對應的邏輯編寫
@Override
public PropertyValues postProcessProperties(PropertyValues pvs, Object bean, String beanName) throws BeansException {
Class<?> beanClass = bean.getClass();
// 依賴注入的服務
Field[] fields = beanClass.getDeclaredFields();
for (Field field : fields) {
RpcClient rpcClient = field.getAnnotation(RpcClient.class);
if (rpcClient == null) {
continue;
}
}
}
根據初始化的 bean
獲取對象中的所有參數,然後使用 getAnnotation(RpcClient.class);
判斷參數是否被 RpcClient
所註解
客戶端服務註冊#
這一步的目的是為了在註冊中心中記錄消費者信息,方便後續監控,所以這一步相對來說非常簡單,只需要構造客戶端信息然後提交到註冊中心即可。
// 構造消費者對象
Consumer consumer = Consumer
.builder()
.groupName(rpcClient.groupName())
.remoteAppKey(rpcClient.remoteAppKey())
.targetItf(targetItf)
.build();
// 註冊中心註冊消費者
registerCenter.registerConsumer(consumer);
對象依賴的注入#
同樣的,我們先梳理一下這裡的需求。我們需要實現一個動態代理,在方法調用時,根據方法調用名 + 類名獲取遠程服務提供方的節點信息,然後構造一個 NettyRequest
信息,發送到服務方,最後需要接收服務放返回的 NettyResponse
解析成方法的返回值進行返回
好了,我們已經理清楚了上面整體流程,那麼就開始具體的代碼編寫吧
首先,定義一個對象 ClientProxyBeanFactory
實現 InvocationHandler
接口
主要是實現接口的 invoke
方法
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {}
invoke
方法中,需要根據類名 + 方法名從註冊中心中獲取可用的節點,那麼具體代理的類這個時候就需要從對象實例化中傳入,所以我們在定義 ClientProxyBeanFactory
時,需要定義幾個成員變量
@Slf4j
public class ClientProxyBeanFactory implements InvocationHandler {
// 調用連接池(Netty 請求)
private ExecutorService executorService;
// 目標代理類
private Class<?> targetItf;
// 超時
private long timeout;
// 調用線程數
private int consumeThreads;
}
對象初始化的時候,需要設置成員變量的值
private static volatile ClientProxyBeanFactory instance;
public ClientProxyBeanFactory(Class<?> targetItf, long timeout, int consumeThreads) {
this.executorService = new ThreadPoolExecutor(consumeThreads, consumeThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(), new ThreadFactoryBuilder()
.setNameFormat("simple-rpc-%d").build(), new ThreadPoolExecutor.AbortPolicy());
this.targetItf = targetItf;
this.timeout = timeout;
this.consumeThreads = consumeThreads;
}
/**
* 獲取代理對象
*
* @return
*/
public Object getProxy() {
return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[]{targetItf}, this);
}
public static ClientProxyBeanFactory getInstance(Class<?> targetItf, long timeout, int consumeThreads) {
if (null == instance) {
synchronized (ClientProxyBeanFactory.class) {
if (null == instance) {
instance = new ClientProxyBeanFactory(targetItf, timeout, consumeThreads);
}
}
}
return instance;
}
完成上述成員變量賦值後,便可以開始從註冊中心中獲取服務節點了
// ConsumerAnnotaionBean.class
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 代理 className
String serviceName = targetItf.getName();
// 註冊中心服務
IRegisterCenter registerCenter = IRegisterCenterZkImpl.getInstance();
// 根據 serviceName + methodName 獲取可以使用的節點
List<Producer> producerList = registerCenter.getServiceProducer(serviceName, method.getName());
...
}
獲取到服務節點後,這裡可以根據設置的負載均衡策略獲取本次使用的節點信息,假設這裡採用隨機獲取的方法獲取得到節點 Producer
開啟 Netty 連接,進行消息收發
拿到了 Producer
就意味著我們可以獲取到遠程服務 Netty 的 ip + port 信息了,這個時候就可以建立遠程服務連接了。但是這裡存在一個優化邏輯,就是如果我們每次都是方法調用時再去建立鏈接,那麼建立連接將會是一個非常耗時的操作,但是如果我們提前根據 ip + port 建立一個 Channel
池,方法調用時只需要從連接池中獲取 Channel
,那麼服務的效率是不是會大大提高了?
基於上面的邏輯,我們需要實現一個 NettyChannelPoolFactory
用來緩存客戶端的 Netty
的請求緩存,同時對外提供兩個方法: acquire
獲取 Channel 信息 release
釋放 Channel 信息
具體實現代碼如下:
@Slf4j
public class NettyChannelPoolFactory {
private static final NettyChannelPoolFactory CHANNEL_POOL_FACTORY = new NettyChannelPoolFactory();
// 連接池緩存 key 為服務提供者地址,value為Netty Channel阻塞隊列
public static final Map<InetSocketAddress, ArrayBlockingQueue<Channel>> CHANNEL_POOL_MAP = Maps.newConcurrentMap();
/**
* 初始化Netty Channel阻塞隊列的長度,該值為可配置信息
*/
private static final Integer CHANNEL_CONNECT_SIZE = 3;
public static NettyChannelPoolFactory getInstance() {
return CHANNEL_POOL_FACTORY;
}
/**
* 初始化 netty 連接池
*/
public void initChannelFactory(List<Producer> producerNodeList) {
for (Producer producer : producerNodeList) {
InetSocketAddress address = new InetSocketAddress(producer.getIp(), producer.getPort());
while (CHANNEL_POOL_MAP.get(address) == null || CHANNEL_POOL_MAP.get(address).size() < CHANNEL_CONNECT_SIZE) {
ArrayBlockingQueue<Channel> channels = CHANNEL_POOL_MAP.get(address);
if (channels == null || channels.size() < CHANNEL_CONNECT_SIZE) {
// 初始化 Netty Channel 阻塞隊列
Channel channel = null;
while (channel == null) {
channel = registerChannel(address);
}
if (channels == null) {
channels = new ArrayBlockingQueue<>(CHANNEL_CONNECT_SIZE);
}
boolean offer = channels.offer(channel);
if (!offer) {
log.debug("channelArrayBlockingQueue fail");
} else {
CHANNEL_POOL_MAP.put(address, channels);
}
}
}
}
}
/**
* 根據 address 獲取客戶端隊列
*/
public ArrayBlockingQueue<Channel> acquire(InetSocketAddress address) {
return CHANNEL_POOL_MAP.get(address);
}
/**
* 使用完成之後,將 channel 放回到 阻塞隊列
*/
public void release(ArrayBlockingQueue<Channel> queue, Channel channel, InetSocketAddress address) {
if (queue == null) {
return;
}
// 回收之前判斷channel 是否可用
if (channel == null || !channel.isOpen() || !channel.isActive() || !channel.isWritable()) {
if (channel != null) {
channel.deregister().syncUninterruptibly().awaitUninterruptibly();
channel.closeFuture().syncUninterruptibly().awaitUninterruptibly();
} else {
while (channel == null) {
channel = registerChannel(address);
}
}
}
queue.offer(channel);
}
/**
* 註冊 netty 客戶端
*/
public Channel registerChannel(InetSocketAddress address) {
try {
EventLoopGroup group = new NioEventLoopGroup(10);
Bootstrap bootstrap = new Bootstrap();
bootstrap.remoteAddress(address);
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 註冊Netty編碼器
ch.pipeline().addLast(new NettyEncoderHandler());
// 註冊Netty解碼器
ch.pipeline().addLast(new NettyDecoderHandler(NettyResponse.class));
// 註冊客戶端業務處理邏輯Handler
ch.pipeline().addLast(new NettyHandlerClient());
}
});
ChannelFuture channelFuture = bootstrap.connect().sync();
final Channel channel = channelFuture.channel();
final CountDownLatch countDownLatch = new CountDownLatch(1);
final List<Boolean> isSuccessHolder = Lists.newArrayListWithCapacity(1);
// 監聽channel是否建立成功
channelFuture.addListener(future -> {
if (future.isSuccess()) {
isSuccessHolder.add(Boolean.TRUE);
} else {
// 如果建立失敗,保存建立失敗標記
log.error("registerChannel fail , {}", future.cause().getMessage());
isSuccessHolder.add(Boolean.FALSE);
}
countDownLatch.countDown();
});
countDownLatch.await();
// 如果Channel建立成功,返回新建的Channel
if (Boolean.TRUE.equals(isSuccessHolder.get(0))) {
return channel;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("registerChannel fail", e);
}
return null;
}
}
NettyChannelPoolFactory
對象中還定義了一個方法 registerChannel
接收 InetSocketAddress
的入參,返回值為 Channel
。方法中主要根據傳入的 address
信息,創建了 Netty
連接,設置了序列化和反序列化的編解碼器,然後增加了一個 NettyHandlerClient
的客戶端消息處理器。最後將初始化好的 Channel
連接進行返回
有了上面的 NettyChannelPoolFactory
,便可以將從註冊中心獲得到的 Producer
信息,根據 ip + port 獲取 Channel
,從而進行 NettyRequest
消息的發送
**NettyRequest
消息的構造 **
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 根據 serviceName + methodName 獲取可以使用的節點
List<Producer> producerList = registerCenter.getServiceProducer(serviceName, method.getName());
// 直接取第0 個,這裡可以採用負載均衡策略進行獲取
Producer providerCopy =producerList.get(0) ;
// NettyRequest 構造
NettyRequest request = NettyRequest.builder()
// 服務節點信息
.producer(providerCopy)
// 本次請求的唯一編號
.uniqueKey(UUID.randomUUID() + "-" + Thread.currentThread().getId())
// 請求超時時間
.invokeTimeout(timeout)
// 請求方法名稱
.invokeMethodName(method.getName())
// 請求參數
.args(args)
.build();
}
好了,現在 NettyRequest
和發送消息的 Channel
都已經有了,只需要將消息發送出去,然後接收消息然後序列成方法的出參即可。
這裡可以採用線程池的方式,進行 Netty
消息的發送和返回值的解碼
定義一個 ClientServiceCallable
集成自 Callable<NettyResponse>
帶返回值的任務的接口
Callable
只有一個需要實現的方法 call()
, 在該方法中,需要完成 1. 獲取 Channel
對象 2. 發送請求 3. 結果值返回
@Slf4j
public class ClientServiceCallable implements Callable<NettyResponse> {
/**
* Netty 通信管道
*/
private Channel channel;
/**
* 請求參數
*/
private final NettyRequest request;
public static ClientServiceCallable of(NettyRequest request) {
return new ClientServiceCallable(request);
}
public ClientServiceCallable(NettyRequest request) {
this.request = request;
}
@Override
public NettyResponse call() throws Exception {
InetSocketAddress inetSocketAddress = new InetSocketAddress(request.getProducer().getIp(), request.getProducer().getPort());
// 獲取本地緩存 Channel 隊列
ArrayBlockingQueue<Channel> blockingQueue = NettyChannelPoolFactory.getInstance().acquire(inetSocketAddress);
try {
if (channel == null) {
// 從隊列中獲取 Channel
channel = blockingQueue.take();
}
if (channel == null) {
throw new RuntimeException("can't find channel to resolve this request");
}
} catch (Exception e) {
log.error("client send request error", e);
} finally {
// 請求結束,隊列歸還 Channel
NettyChannelPoolFactory.getInstance().release(blockingQueue, channel, inetSocketAddress);
}
}
}
上述代碼的 call
方法中,首先從本地緩存中獲取到了 Channel
隊列,然後在 finally
中將 Channel
歸還到隊列中。那么方法中剩下的邏輯就是發送 NettyRequest
請求,然後返回結果了
try {
if (channel == null) {
channel = blockingQueue.take();
}
if (channel == null) {
throw new RuntimeException("can't find channel to resolve this request");
} else {
1️⃣ ClientResponseHolder.initResponseData(request.getUniqueKey());
2️⃣while (!channel.isOpen() || !channel.isActive() || !channel.isWritable()) {
log.warn("retry get new channel");
channel = blockingQueue.poll(request.getInvokeTimeout(), TimeUnit.MILLISECONDS);
if (channel == null) {
// 若隊列中沒有可用的Channel,則重新註冊一個Channel
channel = NettyChannelPoolFactory.getInstance().registerChannel(inetSocketAddress);
}
}
// 將本次調用的信息寫入Netty通道,發起異步調用
3️⃣ ChannelFuture channelFuture = channel.writeAndFlush(request);
channelFuture.syncUninterruptibly();
// 從返回結果容器中獲取返回結果,同時設置等待超時時間為invokeTimeout
long invokeTimeout = request.getInvokeTimeout();
4️⃣ return ClientResponseHolder.getValue(request.getUniqueKey(), invokeTimeout);
}
} catch (Exception e) {
log.error("client send request error", e);
} finally {
NettyChannelPoolFactory.getInstance().release(blockingQueue, channel, inetSocketAddress);
}
1️⃣ ClientResponseHolder 類
ClientResponseHolder.initResponseData(request.getUniqueKey());
這裡又增加了一個新的類ClientResponseHolder
, 那麼這個類是幹嘛的呢?
由於消息的發送都是異步的形式,这里使用了 Map<String,NettyResponseWrapper>
進行本地數據緩存,Map
的 KEY 是 NeettyRequest
的 uniqueKey
,而 Value
就是 Netty
的返回結果,即是服務端執行之後的返回值
ClientResponseHolder
的具體實現如下:
@Slf4j
public class ClientResponseHolder {
private static final Map<String, NettyResponseWrapper> RESPONSE_WRAPPER_MAP = Maps.newConcurrentMap();
private static final ScheduledExecutorService executorService;
static {
executorService = new ScheduledThreadPoolExecutor(1, new RemoveExpireThreadFactory("simple-rpc", false));
// 刪除過期的數據
executorService.scheduleWithFixedDelay(() -> {
for (Map.Entry<String, NettyResponseWrapper> entry : RESPONSE_WRAPPER_MAP.entrySet()) {
boolean expire = entry.getValue().isExpire();
if (expire) {
RESPONSE_WRAPPER_MAP.remove(entry.getKey());
}
}
}, 1, 20, TimeUnit.MILLISECONDS);
}
/**
* 初始化返回結果容器,requestUniqueKey唯一標識本次調用
*/
public static void initResponseData(String requestUniqueKey) {
RESPONSE_WRAPPER_MAP.put(requestUniqueKey, NettyResponseWrapper.of());
}
/**
* 將Netty調用異步返回結果放入阻塞隊列
*/
public static void putResultValue(NettyResponse response) {
long currentTimeMillis = System.currentTimeMillis();
NettyResponseWrapper responseWrapper = RESPONSE_WRAPPER_MAP.get(response.getUniqueKey());
responseWrapper.setResponseTime(currentTimeMillis);
responseWrapper.getResponseBlockingQueue().add(response);
RESPONSE_WRAPPER_MAP.put(response.getUniqueKey(), responseWrapper);
}
/**
* 從阻塞隊列中獲取異步返回結果
*/
public static NettyResponse getValue(String requestUniqueKey, long timeout) {
NettyResponseWrapper responseWrapper = RESPONSE_WRAPPER_MAP.get(requestUniqueKey);
try {
return responseWrapper.getResponseBlockingQueue().poll(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("get value error", e);
} finally {
RESPONSE_WRAPPER_MAP.remove(requestUniqueKey);
}
return null;
}
}
- initResponseData: 根據
uniqueKey
初始化Map
- putResultValue: 插入
NettyResponse
返回結果 - getValue: 根據
uniqueKey
獲取結果
同時定義了一個定時執行的隊列,隊列中根據 responseTime
判斷消息是否過期進行內存數據清理
2️⃣ Channel 狀態判斷
判斷當前 Netty
通道狀態,如果當前 Channel
不可用,則需要重新申請通道
3️⃣ Netty 消息發送
4️⃣ 從本地緩存中獲取 Netty 返回結果
異步調用 Netty 服務,使用 Future
獲取返回結果
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 根據 serviceName + methodName 獲取可以使用的節點
List<Producer> producerList = registerCenter.getServiceProducer(serviceName, method.getName());
// 直接取第0 個,這裡可以採用負載均衡策略進行獲取
Producer providerCopy =producerList.get(0) ;
// NettyRequest 構造
NettyRequest request = NettyRequest.builder()
// 服務節點信息
.producer(providerCopy)
// 本次請求的唯一編號
.uniqueKey(UUID.randomUUID() + "-" + Thread.currentThread().getId())
// 請求超時時間
.invokeTimeout(timeout)
// 請求方法名稱
.invokeMethodName(method.getName())
// 請求參數
.args(args)
.build();
// 發起異步調用,通過 NettyClient 發送請求
try {
Future<NettyResponse> responseFuture = executorService.submit(ClientServiceCallable.of(request));
NettyResponse response = responseFuture.get(timeout, TimeUnit.MILLISECONDS);
if (response != null) {
return response.getResult();
}
} catch (Exception e) {
log.error("send request error", e);
}
}
到這裡,我們就完成了 ClientProxyBeanFactory
代理對象的完整編寫,現在就需要將初始化好的代理對象進行依賴注入
// ConsumerAnnotaionBean.class
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 代理 className
String serviceName = targetItf.getName();
// 註冊中心服務
IRegisterCenter registerCenter = IRegisterCenterZkImpl.getInstance();
// 根據 serviceName + methodName 獲取可以使用的節點
List<Producer> producerList = registerCenter.getServiceProducer(serviceName, method.getName());
// 3.獲取服務代理對象
Class<?> targetItf = rpcClient.targetItf();
if (targetItf == Object.class) {
targetItf = field.getType();
}
// 初始化代理對象
ClientProxyBeanFactory factory = ClientProxyBeanFactory.getInstance(targetItf, rpcClient.timeout(), rpcClient.consumeThreads());
ReflectionUtils.makeAccessible(field);
try {
// 設置代理對象
field.set(bean, factory.getProxy());
} catch (IllegalAccessException e) {
log.error("ReferenceBeanPostProcessor post process properties error, beanName={}", beanName, e);
throw new RuntimeException("ReferenceBeanPostProcessor post process properties error, beanName=" + beanName, e);
}
}
通過 ClientProxyBeanFactory.getInstance
獲取到代理對象後,使用 field.set
方法進行執行賦值
完成上述操作之後,當客戶端執行 get
方法時,便會 invoke
到 ClientProxyBeanFactory
的 invoke
方法上,隨後執行 開啟 Netty 連接,進行消息收發 內容,隨後將服務方結果進行返回
查詢服務節點,預創建 Netty 連接#
這部分內容和第二步有所重疊,其核心邏輯如下:
到這裡,客戶端的所有流程就都編寫完成了。但是為了理清楚主要思路,文章中對負載均衡策略、序列化和反序列化等都只是一筆帶過。這些也是一個 RPC 框架非常重要的一部分。