simon

simon

如何開發一個自己的 RPC 框架 (一)

如何開發一個自己的 RPC 框架 (一)#

2023 年 12 月 11 日 12:00 AM

寫在前面:本文參考了開源項目 https://github.com/MIracleCczs/simple-rpc,其中大部分代碼參考了該項目,本章主要從客戶端的調用出發,講講一個 RPC 框架的實現在客戶端測需要實現那些功能

RPC 的定義#

可以參考 wiki: https://zh.wikipedia.org/wiki/ 遠程過程調用

一個簡單的 RPC 框架是如何組成的?#

一個基礎的 RPC 框架,需要包含三大部分:1. 註冊中心 2. 服務提供方 3. 服務消費方

Mermaid Loading...

從上圖可以看出,服務提供方和消費方都需要和註冊中心通信

一個遠程方法的調用是如何實現的?#

Mermaid Loading...

下面,我們將根據上面流程圖,一步步進行講解。為了方便更加清楚的講清整個邏輯,我們從實際的業務需求出發。

需求:

存在服務提供方 Producer (後面統稱服務端),提供方法 get

存在服務消費方 Consumer (後面統稱客戶端), 需要調用 Producer 中的 get 方法

Mermaid Loading...

基礎接口定義#

定義 UserService 接口,接口內包含 get 方法

public interface UserService {
    String get(String username);
}

客戶端發起服務調用#

客戶端註解定義#

客戶端如何才能夠像調用本地方法一樣調用遠程服務呢?RPC 框架就是用來解決這個問題。我們一般本地方法的調用都是採用

@Autowired
private UserService userService;

通過 Spring 依賴注入的方式,將需要用到的方法注入到調用對象中,那麼我們 RPC 調用能不能也採用這種形式呢?答案當然是可以的。那么為了實現上面的需求,我們最簡單的辦法就是自定義一個註解 RpcClient

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD})
public @interface RpcClient {
}

註解定義完成後,我們就應該考慮註解中需要設置那些屬性呢?

那麼冒出來的第一個問題就是:客戶端如何知道是調用的那個遠程服務呢?這時我們就需要設置第一個屬性 remoteAppKey 服務的唯一標識,通過 remoteAppKey 客戶端可以輕鬆地在註冊中心找到目標服務。

這個時候又會有第二個疑問,如果一個服務多個版本如何處理呢?比如進行灰度升級等操作的時,那麼這個時候就需要第二個參數 groupName 找到具體服務下的具體分組

剩餘的參數就比較簡單了,完成的參數配置如下:

 		/**
     * 服務接口:匹配從註冊中心獲取到本地的服務提供者,得到服務提供者列表,再根據負載均衡策略選取一個發起服務調用
     */
    Class<?> targetItf() default Object.class;

    /**
     * 超時時間:服務調用超時時間
     */
    long timeout() default 3000 * 10L;

    /**
     * 調用者線程數
     */
    int consumeThreads() default 10;

    /**
     * 服務提供者唯一標識
     */
    String remoteAppKey() default "";

    /**
     * 服務分組組名
     */
    String groupName() default "default";

客戶端初始化#

為了實現類似 @Autowired 的功能,框架需要在 Bean 初始化之時,將所有被 RpcClient 註解的對象進行依賴注入,那麼如何實現這個功能呢? SpringInstantiationAwareBeanPostProcessor 接口,可以在 Bean 的實例化的各個階段執行自定義邏輯。定義一個 ConsumerAnnotaionBean 方法,實現 InstantiationAwareBeanPostProcessor 接口。

public class ConsumerAnnotaionBean implements InstantiationAwareBeanPostProcessor {
		...其他方法省略
		@Override
    public PropertyValues postProcessProperties(PropertyValues pvs, Object bean, String beanName) throws BeansException {
	}
}

主要實現接口的 postProcessProperties 方法,設置 Bean 對象的屬性值

進行具體代碼編寫之前,我們需要先理清楚這裡需要實現那些目的:

  1. 客戶端服務註冊 (監控目的)
  2. 對象依賴的注入
  3. 查詢服務節點,預創建 Netty 連接

好了,理清楚完需求之後,我們便開始對應的邏輯編寫

@Override
    public PropertyValues postProcessProperties(PropertyValues pvs, Object bean, String beanName) throws BeansException {
        Class<?> beanClass = bean.getClass();
        // 依賴注入的服務
        Field[] fields = beanClass.getDeclaredFields();
        for (Field field : fields) {
            RpcClient rpcClient = field.getAnnotation(RpcClient.class);
            if (rpcClient == null) {
                continue;
            }
        }
    }

根據初始化的 bean 獲取對象中的所有參數,然後使用 getAnnotation(RpcClient.class); 判斷參數是否被 RpcClient 所註解

客戶端服務註冊#

這一步的目的是為了在註冊中心中記錄消費者信息,方便後續監控,所以這一步相對來說非常簡單,只需要構造客戶端信息然後提交到註冊中心即可。

// 構造消費者對象
Consumer consumer = Consumer
                    .builder()
                    .groupName(rpcClient.groupName())
                    .remoteAppKey(rpcClient.remoteAppKey())
                    .targetItf(targetItf)
                    .build();
// 註冊中心註冊消費者
registerCenter.registerConsumer(consumer);

對象依賴的注入#

同樣的,我們先梳理一下這裡的需求。我們需要實現一個動態代理,在方法調用時,根據方法調用名 + 類名獲取遠程服務提供方的節點信息,然後構造一個 NettyRequest 信息,發送到服務方,最後需要接收服務放返回的 NettyResponse 解析成方法的返回值進行返回

Mermaid Loading...

好了,我們已經理清楚了上面整體流程,那麼就開始具體的代碼編寫吧

首先,定義一個對象 ClientProxyBeanFactory 實現 InvocationHandler 接口

主要是實現接口的 invoke 方法

@Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {}

invoke 方法中,需要根據類名 + 方法名從註冊中心中獲取可用的節點,那麼具體代理的類這個時候就需要從對象實例化中傳入,所以我們在定義 ClientProxyBeanFactory 時,需要定義幾個成員變量

@Slf4j
public class ClientProxyBeanFactory implements InvocationHandler {

    // 調用連接池(Netty 請求)
    private ExecutorService executorService;

    // 目標代理類
    private Class<?> targetItf;

    // 超時
    private long timeout;

    // 調用線程數
    private int consumeThreads;
}

對象初始化的時候,需要設置成員變量的值

private static volatile ClientProxyBeanFactory instance;

    public ClientProxyBeanFactory(Class<?> targetItf, long timeout, int consumeThreads) {
        this.executorService = new ThreadPoolExecutor(consumeThreads, consumeThreads,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(), new ThreadFactoryBuilder()
                .setNameFormat("simple-rpc-%d").build(), new ThreadPoolExecutor.AbortPolicy());

        this.targetItf = targetItf;
        this.timeout = timeout;
        this.consumeThreads = consumeThreads;
    }

    /**
     * 獲取代理對象
     *
     * @return
     */
    public Object getProxy() {
        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[]{targetItf}, this);
    }

    public static ClientProxyBeanFactory getInstance(Class<?> targetItf, long timeout, int consumeThreads) {
        if (null == instance) {
            synchronized (ClientProxyBeanFactory.class) {
                if (null == instance) {
                    instance = new ClientProxyBeanFactory(targetItf, timeout, consumeThreads);
                }
            }
        }

        return instance;
    }

完成上述成員變量賦值後,便可以開始從註冊中心中獲取服務節點了

// ConsumerAnnotaionBean.class
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
				// 代理 className
        String serviceName = targetItf.getName();
				// 註冊中心服務
        IRegisterCenter registerCenter = IRegisterCenterZkImpl.getInstance();
				// 根據 serviceName + methodName 獲取可以使用的節點
        List<Producer> producerList = registerCenter.getServiceProducer(serviceName, method.getName());
				...
}

獲取到服務節點後,這裡可以根據設置的負載均衡策略獲取本次使用的節點信息,假設這裡採用隨機獲取的方法獲取得到節點 Producer

開啟 Netty 連接,進行消息收發

拿到了 Producer 就意味著我們可以獲取到遠程服務 Netty 的 ip + port 信息了,這個時候就可以建立遠程服務連接了。但是這裡存在一個優化邏輯,就是如果我們每次都是方法調用時再去建立鏈接,那麼建立連接將會是一個非常耗時的操作,但是如果我們提前根據 ip + port 建立一個 Channel 池,方法調用時只需要從連接池中獲取 Channel ,那麼服務的效率是不是會大大提高了?

Mermaid Loading...

基於上面的邏輯,我們需要實現一個 NettyChannelPoolFactory 用來緩存客戶端的 Netty 的請求緩存,同時對外提供兩個方法: acquire 獲取 Channel 信息 release 釋放 Channel 信息

具體實現代碼如下:

@Slf4j
public class NettyChannelPoolFactory {

    private static final NettyChannelPoolFactory CHANNEL_POOL_FACTORY = new NettyChannelPoolFactory();

    // 連接池緩存 key 為服務提供者地址,value為Netty Channel阻塞隊列
    public static final Map<InetSocketAddress, ArrayBlockingQueue<Channel>> CHANNEL_POOL_MAP = Maps.newConcurrentMap();

    /**
     * 初始化Netty Channel阻塞隊列的長度,該值為可配置信息
     */
    private static final Integer CHANNEL_CONNECT_SIZE = 3;

    public static NettyChannelPoolFactory getInstance() {
        return CHANNEL_POOL_FACTORY;
    }

    /**
     * 初始化 netty 連接池
     */
    public void initChannelFactory(List<Producer> producerNodeList) {
        for (Producer producer : producerNodeList) {
            InetSocketAddress address = new InetSocketAddress(producer.getIp(), producer.getPort());
            while (CHANNEL_POOL_MAP.get(address) == null || CHANNEL_POOL_MAP.get(address).size() < CHANNEL_CONNECT_SIZE) {
                ArrayBlockingQueue<Channel> channels = CHANNEL_POOL_MAP.get(address);
                if (channels == null || channels.size() < CHANNEL_CONNECT_SIZE) {
                    // 初始化 Netty Channel 阻塞隊列
                    Channel channel = null;
                    while (channel == null) {
                        channel = registerChannel(address);
                    }

                    if (channels == null) {
                        channels = new ArrayBlockingQueue<>(CHANNEL_CONNECT_SIZE);
                    }

                    boolean offer = channels.offer(channel);
                    if (!offer) {
                        log.debug("channelArrayBlockingQueue fail");

                    } else {
                        CHANNEL_POOL_MAP.put(address, channels);
                    }
                }
            }
        }
    }

    /**
     * 根據 address 獲取客戶端隊列
     */
    public ArrayBlockingQueue<Channel> acquire(InetSocketAddress address) {
        return CHANNEL_POOL_MAP.get(address);
    }

    /**
     * 使用完成之後,將 channel 放回到 阻塞隊列
     */
    public void release(ArrayBlockingQueue<Channel> queue, Channel channel, InetSocketAddress address) {
        if (queue == null) {
            return;
        }

        // 回收之前判斷channel 是否可用
        if (channel == null || !channel.isOpen() || !channel.isActive() || !channel.isWritable()) {
            if (channel != null) {
                channel.deregister().syncUninterruptibly().awaitUninterruptibly();
                channel.closeFuture().syncUninterruptibly().awaitUninterruptibly();

            } else {
                while (channel == null) {
                    channel = registerChannel(address);
                }
            }
        }

        queue.offer(channel);
    }

    /**
     * 註冊 netty 客戶端
     */
    public Channel registerChannel(InetSocketAddress address) {
        try {
            EventLoopGroup group = new NioEventLoopGroup(10);
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.remoteAddress(address);
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // 註冊Netty編碼器
                            ch.pipeline().addLast(new NettyEncoderHandler());
                            // 註冊Netty解碼器
                            ch.pipeline().addLast(new NettyDecoderHandler(NettyResponse.class));
                            // 註冊客戶端業務處理邏輯Handler
                            ch.pipeline().addLast(new NettyHandlerClient());
                        }
                    });

            ChannelFuture channelFuture = bootstrap.connect().sync();
            final Channel channel = channelFuture.channel();
            final CountDownLatch countDownLatch = new CountDownLatch(1);
            final List<Boolean> isSuccessHolder = Lists.newArrayListWithCapacity(1);
            // 監聽channel是否建立成功
            channelFuture.addListener(future -> {
                if (future.isSuccess()) {
                    isSuccessHolder.add(Boolean.TRUE);

                } else {
                    // 如果建立失敗,保存建立失敗標記
                    log.error("registerChannel fail , {}", future.cause().getMessage());
                    isSuccessHolder.add(Boolean.FALSE);
                }

                countDownLatch.countDown();
            });

            countDownLatch.await();
            // 如果Channel建立成功,返回新建的Channel
            if (Boolean.TRUE.equals(isSuccessHolder.get(0))) {
                return channel;
            }

        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("registerChannel fail", e);
        }

        return null;
    }
}

NettyChannelPoolFactory 對象中還定義了一個方法 registerChannel 接收 InetSocketAddress 的入參,返回值為 Channel 。方法中主要根據傳入的 address 信息,創建了 Netty 連接,設置了序列化和反序列化的編解碼器,然後增加了一個 NettyHandlerClient 的客戶端消息處理器。最後將初始化好的 Channel 連接進行返回

有了上面的 NettyChannelPoolFactory ,便可以將從註冊中心獲得到的 Producer 信息,根據 ip + port 獲取 Channel ,從而進行 NettyRequest 消息的發送

**NettyRequest 消息的構造 **

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    // 根據 serviceName + methodName 獲取可以使用的節點
List<Producer> producerList = registerCenter.getServiceProducer(serviceName, method.getName());
    // 直接取第0 個,這裡可以採用負載均衡策略進行獲取
    Producer providerCopy =producerList.get(0) ;
		// NettyRequest 構造
NettyRequest request = NettyRequest.builder()
								// 服務節點信息
                .producer(providerCopy)
							// 本次請求的唯一編號
                .uniqueKey(UUID.randomUUID() + "-" + Thread.currentThread().getId())
							// 請求超時時間
                .invokeTimeout(timeout)
							// 請求方法名稱
                .invokeMethodName(method.getName())
							// 請求參數
                .args(args)
                .build();

}

好了,現在 NettyRequest 和發送消息的 Channel 都已經有了,只需要將消息發送出去,然後接收消息然後序列成方法的出參即可。

這裡可以採用線程池的方式,進行 Netty 消息的發送和返回值的解碼

定義一個 ClientServiceCallable 集成自 Callable<NettyResponse> 帶返回值的任務的接口

Callable 只有一個需要實現的方法 call() , 在該方法中,需要完成 1. 獲取 Channel 對象 2. 發送請求 3. 結果值返回

@Slf4j
public class ClientServiceCallable implements Callable<NettyResponse> {

    /**
     * Netty 通信管道
     */
    private Channel channel;

    /**
     * 請求參數
     */
    private final NettyRequest request;

    public static ClientServiceCallable of(NettyRequest request) {
        return new ClientServiceCallable(request);
    }

    public ClientServiceCallable(NettyRequest request) {
        this.request = request;
    }

@Override
public NettyResponse call() throws Exception {
    InetSocketAddress inetSocketAddress = new InetSocketAddress(request.getProducer().getIp(), request.getProducer().getPort());
    // 獲取本地緩存 Channel 隊列
    ArrayBlockingQueue<Channel> blockingQueue = NettyChannelPoolFactory.getInstance().acquire(inetSocketAddress);
    try {
        if (channel == null) {
            // 從隊列中獲取 Channel
            channel = blockingQueue.take();
        }

        if (channel == null) {
            throw new RuntimeException("can't find channel to resolve this request");
        }
    } catch (Exception e) {
        log.error("client send request error", e);

    } finally {
        // 請求結束,隊列歸還 Channel
        NettyChannelPoolFactory.getInstance().release(blockingQueue, channel, inetSocketAddress);
    }
}
}

上述代碼的 call 方法中,首先從本地緩存中獲取到了 Channel 隊列,然後在 finally 中將 Channel 歸還到隊列中。那么方法中剩下的邏輯就是發送 NettyRequest 請求,然後返回結果了

try {
            if (channel == null) {
                channel = blockingQueue.take();
            }

            if (channel == null) {
                throw new RuntimeException("can't find channel to resolve this request");

            } else {
                1️⃣ ClientResponseHolder.initResponseData(request.getUniqueKey());

                2️⃣while (!channel.isOpen() || !channel.isActive() || !channel.isWritable()) {
                    log.warn("retry get new channel");
                    channel = blockingQueue.poll(request.getInvokeTimeout(), TimeUnit.MILLISECONDS);
                    if (channel == null) {
                        // 若隊列中沒有可用的Channel,則重新註冊一個Channel
                        channel = NettyChannelPoolFactory.getInstance().registerChannel(inetSocketAddress);
                    }
                }

                // 將本次調用的信息寫入Netty通道,發起異步調用
                3️⃣ ChannelFuture channelFuture = channel.writeAndFlush(request);
                channelFuture.syncUninterruptibly();
                // 從返回結果容器中獲取返回結果,同時設置等待超時時間為invokeTimeout
                long invokeTimeout = request.getInvokeTimeout();
                4️⃣ return ClientResponseHolder.getValue(request.getUniqueKey(), invokeTimeout);
            }

        } catch (Exception e) {
            log.error("client send request error", e);

        } finally {
            NettyChannelPoolFactory.getInstance().release(blockingQueue, channel, inetSocketAddress);
        }

1️⃣ ClientResponseHolder 類

ClientResponseHolder.initResponseData(request.getUniqueKey()); 這裡又增加了一個新的類ClientResponseHolder, 那麼這個類是幹嘛的呢?

由於消息的發送都是異步的形式,这里使用了 Map<String,NettyResponseWrapper> 進行本地數據緩存,Map 的 KEY 是 NeettyRequestuniqueKey ,而 Value 就是 Netty 的返回結果,即是服務端執行之後的返回值

ClientResponseHolder 的具體實現如下:

@Slf4j
public class ClientResponseHolder {

    private static final Map<String, NettyResponseWrapper> RESPONSE_WRAPPER_MAP = Maps.newConcurrentMap();

    private static final ScheduledExecutorService executorService;

    static {
        executorService = new ScheduledThreadPoolExecutor(1, new RemoveExpireThreadFactory("simple-rpc", false));
        // 刪除過期的數據
        executorService.scheduleWithFixedDelay(() -> {
            for (Map.Entry<String, NettyResponseWrapper> entry : RESPONSE_WRAPPER_MAP.entrySet()) {
                boolean expire = entry.getValue().isExpire();
                if (expire) {
                    RESPONSE_WRAPPER_MAP.remove(entry.getKey());
                }
            }
        }, 1, 20, TimeUnit.MILLISECONDS);
    }

    /**
     * 初始化返回結果容器,requestUniqueKey唯一標識本次調用
     */
    public static void initResponseData(String requestUniqueKey) {
        RESPONSE_WRAPPER_MAP.put(requestUniqueKey, NettyResponseWrapper.of());
    }

    /**
     * 將Netty調用異步返回結果放入阻塞隊列
     */
    public static void putResultValue(NettyResponse response) {
        long currentTimeMillis = System.currentTimeMillis();
        NettyResponseWrapper responseWrapper = RESPONSE_WRAPPER_MAP.get(response.getUniqueKey());
        responseWrapper.setResponseTime(currentTimeMillis);
        responseWrapper.getResponseBlockingQueue().add(response);
        RESPONSE_WRAPPER_MAP.put(response.getUniqueKey(), responseWrapper);
    }

    /**
     * 從阻塞隊列中獲取異步返回結果
     */
    public static NettyResponse getValue(String requestUniqueKey, long timeout) {
        NettyResponseWrapper responseWrapper = RESPONSE_WRAPPER_MAP.get(requestUniqueKey);
        try {
            return responseWrapper.getResponseBlockingQueue().poll(timeout, TimeUnit.MILLISECONDS);
            
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("get value error", e);

        } finally {
            RESPONSE_WRAPPER_MAP.remove(requestUniqueKey);
        }
        return null;
    }

}
  • initResponseData: 根據 uniqueKey 初始化 Map
  • putResultValue: 插入 NettyResponse 返回結果
  • getValue: 根據 uniqueKey 獲取結果

同時定義了一個定時執行的隊列,隊列中根據 responseTime 判斷消息是否過期進行內存數據清理

2️⃣ Channel 狀態判斷

判斷當前 Netty 通道狀態,如果當前 Channel 不可用,則需要重新申請通道

3️⃣ Netty 消息發送

4️⃣ 從本地緩存中獲取 Netty 返回結果

異步調用 Netty 服務,使用 Future 獲取返回結果

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    // 根據 serviceName + methodName 獲取可以使用的節點
List<Producer> producerList = registerCenter.getServiceProducer(serviceName, method.getName());
    // 直接取第0 個,這裡可以採用負載均衡策略進行獲取
    Producer providerCopy =producerList.get(0) ;
		// NettyRequest 構造
NettyRequest request = NettyRequest.builder()
								// 服務節點信息
                .producer(providerCopy)
							// 本次請求的唯一編號
                .uniqueKey(UUID.randomUUID() + "-" + Thread.currentThread().getId())
							// 請求超時時間
                .invokeTimeout(timeout)
							// 請求方法名稱
                .invokeMethodName(method.getName())
							// 請求參數
                .args(args)
                .build();
        // 發起異步調用,通過 NettyClient 發送請求
        try {
            Future<NettyResponse> responseFuture = executorService.submit(ClientServiceCallable.of(request));
            NettyResponse response = responseFuture.get(timeout, TimeUnit.MILLISECONDS);
            if (response != null) {
                return response.getResult();
            }

        } catch (Exception e) {
            log.error("send request error", e);
        }
}

到這裡,我們就完成了 ClientProxyBeanFactory 代理對象的完整編寫,現在就需要將初始化好的代理對象進行依賴注入

// ConsumerAnnotaionBean.class
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
				// 代理 className
        String serviceName = targetItf.getName();
				// 註冊中心服務
        IRegisterCenter registerCenter = IRegisterCenterZkImpl.getInstance();
				// 根據 serviceName + methodName 獲取可以使用的節點
        List<Producer> producerList = registerCenter.getServiceProducer(serviceName, method.getName());
				// 3.獲取服務代理對象
            Class<?> targetItf = rpcClient.targetItf();
            if (targetItf == Object.class) {
                targetItf = field.getType();
           }
				// 初始化代理對象
				ClientProxyBeanFactory factory = ClientProxyBeanFactory.getInstance(targetItf, rpcClient.timeout(), rpcClient.consumeThreads());
            ReflectionUtils.makeAccessible(field);
            try {
                // 設置代理對象
                field.set(bean, factory.getProxy());

            } catch (IllegalAccessException e) {
                log.error("ReferenceBeanPostProcessor post process properties error, beanName={}", beanName, e);
                throw new RuntimeException("ReferenceBeanPostProcessor post process properties error, beanName=" + beanName, e);
            }
}

通過 ClientProxyBeanFactory.getInstance 獲取到代理對象後,使用 field.set 方法進行執行賦值

完成上述操作之後,當客戶端執行 get 方法時,便會 invokeClientProxyBeanFactoryinvoke 方法上,隨後執行 開啟 Netty 連接,進行消息收發 內容,隨後將服務方結果進行返回

查詢服務節點,預創建 Netty 連接#

這部分內容和第二步有所重疊,其核心邏輯如下:

Mermaid Loading...

到這裡,客戶端的所有流程就都編寫完成了。但是為了理清楚主要思路,文章中對負載均衡策略、序列化和反序列化等都只是一筆帶過。這些也是一個 RPC 框架非常重要的一部分。

載入中......
此文章數據所有權由區塊鏈加密技術和智能合約保障僅歸創作者所有。