simon

simon

自分自身のRPCフレームワークを開発する方法 (一)

自分の RPC フレームワークを開発する方法 (1)#

2023 年 12 月 11 日 午前 12:00

前書き:この記事はオープンソースプロジェクト https://github.com/MIracleCczs/simple-rpc を参考にしており、その大部分のコードはこのプロジェクトを参考にしています。本章では、クライアントの呼び出しから出発し、RPC フレームワークの実装においてクライアント側で実現する必要がある機能について説明します。

RPC の定義#

ウィキを参照できます: https://zh.wikipedia.org/wiki/ 遠程過程調用

シンプルな RPC フレームワークはどのように構成されているのか?#

基本的な RPC フレームワークは、以下の三つの主要部分を含む必要があります:1. 登録センター 2. サービス提供者 3. サービス消費者

Mermaid Loading...

上の図からわかるように、サービス提供者と消費者はどちらも登録センターと通信する必要があります。

リモートメソッドの呼び出しはどのように実現されるのか?#

Mermaid Loading...

次に、上記のフローチャートに基づいて、一歩ずつ説明します。全体のロジックをより明確に説明するために、実際のビジネスニーズから出発します。

ニーズ:

サービス提供者 Producer(以下、サービスサイドと呼ぶ)が get メソッドを提供します。

サービス消費者 Consumer(以下、クライアントと呼ぶ)が、Producer の get メソッドを呼び出す必要があります。

Mermaid Loading...

基本インターフェース定義#

UserService インターフェースを定義し、インターフェース内に get メソッドを含めます。

public interface UserService {
    String get(String username);
}

クライアントがサービス呼び出しを開始する#

クライアント注釈定義#

クライアントは、どのようにしてローカルメソッドのようにリモートサービスを呼び出すことができるのでしょうか?RPC フレームワークはこの問題を解決するために存在します。一般的に、ローカルメソッドの呼び出しは次のように行います。

@Autowired
private UserService userService;

Spring の依存性注入の方法を使用して、必要なメソッドを呼び出しオブジェクトに注入します。それでは、RPC 呼び出しもこの形式を採用できるのでしょうか?もちろん、答えは「はい」です。したがって、上記のニーズを実現するための最も簡単な方法は、カスタム注釈 RpcClient を定義することです。

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD})
public @interface RpcClient {
}

注釈の定義が完了したら、注釈に設定する必要がある属性について考える必要があります。

最初の問題は、クライアントがどのリモートサービスを呼び出しているかをどのように知るかということです。このとき、最初の属性 remoteAppKey サービスの一意の識別子を設定する必要があります。remoteAppKey を使用することで、クライアントは登録センターでターゲットサービスを簡単に見つけることができます。

次に、第二の疑問が生じます。複数のバージョンのサービスがある場合はどう処理するのでしょうか?たとえば、グレードアップなどの操作を行う場合、このときは第二のパラメータ groupName を設定して、特定のサービスの具体的なグループを見つける必要があります。

残りのパラメータは比較的簡単で、完成したパラメータ設定は次のようになります。

 		/**
     * サービスインターフェース: 登録センターから取得したローカルのサービス提供者と一致し、サービス提供者リストを取得し、負荷分散戦略に基づいて一つを選択してサービス呼び出しを開始します。
     */
    Class<?> targetItf() default Object.class;

    /**
     * タイムアウト時間:サービス呼び出しのタイムアウト時間
     */
    long timeout() default 3000 * 10L;

    /**
     * 呼び出しスレッド数
     */
    int consumeThreads() default 10;

    /**
     * サービス提供者の一意の識別子
     */
    String remoteAppKey() default "";

    /**
     * サービスグループ名
     */
    String groupName() default "default";

クライアント初期化#

@Autowired の機能を実現するために、フレームワークは Bean の初期化時に、すべての RpcClient 注釈が付けられたオブジェクトに依存性注入を行う必要があります。この機能をどのように実現するのでしょうか? SpringInstantiationAwareBeanPostProcessor インターフェースは、Bean のインスタンス化の各段階でカスタムロジックを実行できます。ConsumerAnnotaionBean メソッドを定義し、InstantiationAwareBeanPostProcessor インターフェースを実装します。

public class ConsumerAnnotaionBean implements InstantiationAwareBeanPostProcessor {
		...他のメソッドは省略
		@Override
    public PropertyValues postProcessProperties(PropertyValues pvs, Object bean, String beanName) throws BeansException {
	}
}

主にインターフェースの postProcessProperties メソッドを実装し、Bean オブジェクトの属性値を設定します。

具体的なコードを書く前に、ここで実現する必要がある目的を整理する必要があります:

  1. クライアントサービスの登録(監視目的)
  2. オブジェクト依存の注入
  3. サービスノードの検索、Netty 接続の事前作成

ニーズを整理した後、対応するロジックの記述を開始します。

@Override
    public PropertyValues postProcessProperties(PropertyValues pvs, Object bean, String beanName) throws BeansException {
        Class<?> beanClass = bean.getClass();
        // 依存性注入のサービス
        Field[] fields = beanClass.getDeclaredFields();
        for (Field field : fields) {
            RpcClient rpcClient = field.getAnnotation(RpcClient.class);
            if (rpcClient == null) {
                continue;
            }
        }
    }

初期化された bean からオブジェクト内のすべてのパラメータを取得し、getAnnotation(RpcClient.class); を使用してパラメータが RpcClient で注釈されているかどうかを判断します。

クライアントサービス登録#

このステップの目的は、登録センターに消費者情報を記録し、後の監視を容易にすることです。したがって、このステップは比較的簡単で、消費者情報を構築して登録センターに提出するだけです。

// 消費者オブジェクトを構築
Consumer consumer = Consumer
                    .builder()
                    .groupName(rpcClient.groupName())
                    .remoteAppKey(rpcClient.remoteAppKey())
                    .targetItf(targetItf)
                    .build();
// 登録センターに消費者を登録
registerCenter.registerConsumer(consumer);

オブジェクト依存の注入#

同様に、ここでのニーズを整理します。動的プロキシを実現する必要があります。メソッド呼び出し時に、メソッド呼び出し名 + クラス名に基づいてリモートサービス提供者のノード情報を取得し、NettyRequest 情報を構築してサービス側に送信し、最後にサービス側から返された NettyResponse を受信してメソッドの戻り値として返す必要があります。

Mermaid Loading...

上記の全体的なプロセスを整理したので、具体的なコードの記述を開始します。

まず、ClientProxyBeanFactory オブジェクトを定義し、InvocationHandler インターフェースを実装します。

インターフェースの invoke メソッドを実装する必要があります。

@Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {}

invoke メソッド内では、クラス名 + メソッド名に基づいて登録センターから利用可能なノードを取得する必要があります。このとき、具体的なプロキシクラスはオブジェクトのインスタンス化から渡す必要があるため、ClientProxyBeanFactory を定義する際にいくつかのメンバー変数を定義する必要があります。

@Slf4j
public class ClientProxyBeanFactory implements InvocationHandler {

    // 呼び出し接続プール(Netty リクエスト)
    private ExecutorService executorService;

    // 目標プロキシクラス
    private Class<?> targetItf;

    // タイムアウト
    private long timeout;

    // 呼び出しスレッド数
    private int consumeThreads;
}

オブジェクト初期化時にメンバー変数の値を設定する必要があります。

private static volatile ClientProxyBeanFactory instance;

    public ClientProxyBeanFactory(Class<?> targetItf, long timeout, int consumeThreads) {
        this.executorService = new ThreadPoolExecutor(consumeThreads, consumeThreads,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(), new ThreadFactoryBuilder()
                .setNameFormat("simple-rpc-%d").build(), new ThreadPoolExecutor.AbortPolicy());

        this.targetItf = targetItf;
        this.timeout = timeout;
        this.consumeThreads = consumeThreads;
    }

    /**
     * プロキシオブジェクトを取得
     *
     * @return
     */
    public Object getProxy() {
        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[]{targetItf}, this);
    }

    public static ClientProxyBeanFactory getInstance(Class<?> targetItf, long timeout, int consumeThreads) {
        if (null == instance) {
            synchronized (ClientProxyBeanFactory.class) {
                if (null == instance) {
                    instance = new ClientProxyBeanFactory(targetItf, timeout, consumeThreads);
                }
            }
        }

        return instance;
    }

上記のメンバー変数の値を設定した後、登録センターからサービスノードを取得することができます。

// ConsumerAnnotaionBean.class
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
				// プロキシクラス名
        String serviceName = targetItf.getName();
				// 登録センターサービス
        IRegisterCenter registerCenter = IRegisterCenterZkImpl.getInstance();
				// サービス名 + メソッド名に基づいて利用可能なノードを取得
        List<Producer> producerList = registerCenter.getServiceProducer(serviceName, method.getName());
				...
}

サービスノードを取得した後、設定された負荷分散戦略に基づいて、今回使用するノード情報を取得できます。ここでは、ランダム取得の方法を採用してノード Producer を取得すると仮定します。

Netty 接続を開き、メッセージの送受信を行う

Producer を取得したということは、リモートサービスの Netty の ip + port 情報を取得できることを意味します。このとき、リモートサービス接続を確立できます。しかし、ここには最適化ロジックが存在します。毎回メソッド呼び出し時に接続を確立するのではなく、ip + port に基づいて事前に Channel プールを確立すれば、メソッド呼び出し時に接続プールから Channel を取得するだけで済み、サービスの効率が大幅に向上します。

Mermaid Loading...

上記のロジックに基づいて、クライアントの Netty リクエストキャッシュをキャッシュするための NettyChannelPoolFactory を実装する必要があります。また、外部に二つのメソッドを提供します:acquire は Channel 情報を取得し、release は Channel 情報を解放します。

具体的な実装コードは次の通りです。

@Slf4j
public class NettyChannelPoolFactory {

    private static final NettyChannelPoolFactory CHANNEL_POOL_FACTORY = new NettyChannelPoolFactory();

    // 接続プールキャッシュ key はサービス提供者アドレス、valueはNetty Channelのブロッキングキュー
    public static final Map<InetSocketAddress, ArrayBlockingQueue<Channel>> CHANNEL_POOL_MAP = Maps.newConcurrentMap();

    /**
     * Netty Channelのブロッキングキューの長さを初期化します。この値は設定可能な情報です。
     */
    private static final Integer CHANNEL_CONNECT_SIZE = 3;

    public static NettyChannelPoolFactory getInstance() {
        return CHANNEL_POOL_FACTORY;
    }

    /**
     * netty 接続プールを初期化します。
     */
    public void initChannelFactory(List<Producer> producerNodeList) {
        for (Producer producer : producerNodeList) {
            InetSocketAddress address = new InetSocketAddress(producer.getIp(), producer.getPort());
            while (CHANNEL_POOL_MAP.get(address) == null || CHANNEL_POOL_MAP.get(address).size() < CHANNEL_CONNECT_SIZE) {
                ArrayBlockingQueue<Channel> channels = CHANNEL_POOL_MAP.get(address);
                if (channels == null || channels.size() < CHANNEL_CONNECT_SIZE) {
                    // Netty Channelのブロッキングキューを初期化
                    Channel channel = null;
                    while (channel == null) {
                        channel = registerChannel(address);
                    }

                    if (channels == null) {
                        channels = new ArrayBlockingQueue<>(CHANNEL_CONNECT_SIZE);
                    }

                    boolean offer = channels.offer(channel);
                    if (!offer) {
                        log.debug("channelArrayBlockingQueue fail");

                    } else {
                        CHANNEL_POOL_MAP.put(address, channels);
                    }
                }
            }
        }
    }

    /**
     * address に基づいてクライアントキューを取得します。
     */
    public ArrayBlockingQueue<Channel> acquire(InetSocketAddress address) {
        return CHANNEL_POOL_MAP.get(address);
    }

    /**
     * 使用後、channelをブロッキングキューに戻します。
     */
    public void release(ArrayBlockingQueue<Channel> queue, Channel channel, InetSocketAddress address) {
        if (queue == null) {
            return;
        }

        // 回収前にchannelが使用可能かどうかを判断します。
        if (channel == null || !channel.isOpen() || !channel.isActive() || !channel.isWritable()) {
            if (channel != null) {
                channel.deregister().syncUninterruptibly().awaitUninterruptibly();
                channel.closeFuture().syncUninterruptibly().awaitUninterruptibly();

            } else {
                while (channel == null) {
                    channel = registerChannel(address);
                }
            }
        }

        queue.offer(channel);
    }

    /**
     * netty クライアントを登録します。
     */
    public Channel registerChannel(InetSocketAddress address) {
        try {
            EventLoopGroup group = new NioEventLoopGroup(10);
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.remoteAddress(address);
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // Nettyエンコーダーを登録
                            ch.pipeline().addLast(new NettyEncoderHandler());
                            // Nettyデコーダーを登録
                            ch.pipeline().addLast(new NettyDecoderHandler(NettyResponse.class));
                            // クライアントビジネス処理ロジックHandlerを登録
                            ch.pipeline().addLast(new NettyHandlerClient());
                        }
                    });

            ChannelFuture channelFuture = bootstrap.connect().sync();
            final Channel channel = channelFuture.channel();
            final CountDownLatch countDownLatch = new CountDownLatch(1);
            final List<Boolean> isSuccessHolder = Lists.newArrayListWithCapacity(1);
            // channelが確立されたかどうかを監視
            channelFuture.addListener(future -> {
                if (future.isSuccess()) {
                    isSuccessHolder.add(Boolean.TRUE);

                } else {
                    // 確立に失敗した場合、失敗のマークを保存
                    log.error("registerChannel fail , {}", future.cause().getMessage());
                    isSuccessHolder.add(Boolean.FALSE);
                }

                countDownLatch.countDown();
            });

            countDownLatch.await();
            // Channelが確立された場合、新しく作成されたChannelを返します。
            if (Boolean.TRUE.equals(isSuccessHolder.get(0))) {
                return channel;
            }

        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("registerChannel fail", e);
        }

        return null;
    }
}

NettyChannelPoolFactory オブジェクト内には、registerChannel メソッドも定義されており、InetSocketAddress を引数として受け取り、戻り値は Channel です。このメソッドでは、渡された address 情報に基づいて Netty 接続を作成し、シリアル化とデシリアル化のコーデックを設定し、最後に NettyHandlerClient のクライアントメッセージ処理器を追加します。初期化された Channel 接続を返します。

上記の NettyChannelPoolFactory を使用して、登録センターから取得した Producer 情報に基づいて ip + port を取得し、Channel を取得して NettyRequest メッセージを送信します。

**NettyRequest メッセージの構築 **

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    // サービス名 + メソッド名に基づいて利用可能なノードを取得
List<Producer> producerList = registerCenter.getServiceProducer(serviceName, method.getName());
    // 直接0番目を取得します。ここでは負荷分散戦略を採用できます。
    Producer providerCopy =producerList.get(0) ;
		// NettyRequest 構築
NettyRequest request = NettyRequest.builder()
								// サービスノード情報
                .producer(providerCopy)
							// 今回のリクエストの一意の番号
                .uniqueKey(UUID.randomUUID() + "-" + Thread.currentThread().getId())
							// リクエストのタイムアウト時間
                .invokeTimeout(timeout)
							// リクエストメソッド名
                .invokeMethodName(method.getName())
							// リクエストパラメータ
                .args(args)
                .build();

}

これで NettyRequest とメッセージを送信するための Channel が揃いました。メッセージを送信し、結果を受信してメソッドの出力としてデシリアライズするだけです。

ここでは、スレッドプールの方法を使用して Netty メッセージの送信と戻り値のデコードを行います。

ClientServiceCallable を定義し、Callable<NettyResponse> を継承した戻り値のあるタスクインターフェースを作成します。

Callable には実装する必要があるメソッドが一つだけあります。call() メソッド内で、1. Channel オブジェクトを取得 2. リクエストを送信 3. 結果値を返す必要があります。

@Slf4j
public class ClientServiceCallable implements Callable<NettyResponse> {

    /**
     * Netty 通信パイプライン
     */
    private Channel channel;

    /**
     * リクエストパラメータ
     */
    private final NettyRequest request;

    public static ClientServiceCallable of(NettyRequest request) {
        return new ClientServiceCallable(request);
    }

    public ClientServiceCallable(NettyRequest request) {
        this.request = request;
    }

@Override
public NettyResponse call() throws Exception {
    InetSocketAddress inetSocketAddress = new InetSocketAddress(request.getProducer().getIp(), request.getProducer().getPort());
    // ローカルキャッシュ Channel キューを取得
    ArrayBlockingQueue<Channel> blockingQueue = NettyChannelPoolFactory.getInstance().acquire(inetSocketAddress);
    try {
        if (channel == null) {
            // キューから Channel を取得
            channel = blockingQueue.take();
        }

        if (channel == null) {
            throw new RuntimeException("このリクエストを解決するためのチャネルが見つかりません");
        }
    } catch (Exception e) {
        log.error("クライアントリクエスト送信エラー", e);

    } finally {
        // リクエスト終了後、キューに Channel を返す
        NettyChannelPoolFactory.getInstance().release(blockingQueue, channel, inetSocketAddress);
    }
}

上記のコードの call メソッド内では、まずローカルキャッシュから Channel キューを取得し、finally 内で Channel をキューに返します。メソッド内の残りのロジックは、NettyRequest リクエストを送信し、結果を返すことです。

try {
            if (channel == null) {
                channel = blockingQueue.take();
            }

            if (channel == null) {
                throw new RuntimeException("このリクエストを解決するためのチャネルが見つかりません");

            } else {
                1️⃣ ClientResponseHolder.initResponseData(request.getUniqueKey());

                2️⃣ while (!channel.isOpen() || !channel.isActive() || !channel.isWritable()) {
                    log.warn("新しいチャネルを取得するために再試行します");
                    channel = blockingQueue.poll(request.getInvokeTimeout(), TimeUnit.MILLISECONDS);
                    if (channel == null) {
                        // キューに利用可能なChannelがない場合、再度Channelを登録します。
                        channel = NettyChannelPoolFactory.getInstance().registerChannel(inetSocketAddress);
                    }
                }

                // 今回の呼び出し情報をNettyチャネルに書き込み、非同期呼び出しを開始します。
                3️⃣ ChannelFuture channelFuture = channel.writeAndFlush(request);
                channelFuture.syncUninterruptibly();
                // 戻り結果コンテナから戻り結果を取得し、待機タイムアウトをinvokeTimeoutに設定します。
                long invokeTimeout = request.getInvokeTimeout();
                4️⃣ return ClientResponseHolder.getValue(request.getUniqueKey(), invokeTimeout);
            }

        } catch (Exception e) {
            log.error("クライアントリクエスト送信エラー", e);

        } finally {
            NettyChannelPoolFactory.getInstance().release(blockingQueue, channel, inetSocketAddress);
        }

1️⃣ ClientResponseHolder クラス

ClientResponseHolder.initResponseData(request.getUniqueKey()); ここで新しいクラス ClientResponseHolder が追加されました。このクラスは何をするのでしょうか?

メッセージの送信はすべて非同期形式で行われるため、Map<String,NettyResponseWrapper> を使用してローカルデータをキャッシュします。Map の KEY は NeettyRequestuniqueKey であり、Value は Netty の戻り結果、つまりサービス側の実行後の戻り値です。

ClientResponseHolder の具体的な実装は次の通りです。

@Slf4j
public class ClientResponseHolder {

    private static final Map<String, NettyResponseWrapper> RESPONSE_WRAPPER_MAP = Maps.newConcurrentMap();

    private static final ScheduledExecutorService executorService;

    static {
        executorService = new ScheduledThreadPoolExecutor(1, new RemoveExpireThreadFactory("simple-rpc", false));
        // 過去データを削除します。
        executorService.scheduleWithFixedDelay(() -> {
            for (Map.Entry<String, NettyResponseWrapper> entry : RESPONSE_WRAPPER_MAP.entrySet()) {
                boolean expire = entry.getValue().isExpire();
                if (expire) {
                    RESPONSE_WRAPPER_MAP.remove(entry.getKey());
                }
            }
        }, 1, 20, TimeUnit.MILLISECONDS);
    }

    /**
     * 戻り結果コンテナを初期化します。requestUniqueKeyは今回の呼び出しを一意に識別します。
     */
    public static void initResponseData(String requestUniqueKey) {
        RESPONSE_WRAPPER_MAP.put(requestUniqueKey, NettyResponseWrapper.of());
    }

    /**
     * Netty呼び出しの非同期戻り結果をブロッキングキューに挿入します。
     */
    public static void putResultValue(NettyResponse response) {
        long currentTimeMillis = System.currentTimeMillis();
        NettyResponseWrapper responseWrapper = RESPONSE_WRAPPER_MAP.get(response.getUniqueKey());
        responseWrapper.setResponseTime(currentTimeMillis);
        responseWrapper.getResponseBlockingQueue().add(response);
        RESPONSE_WRAPPER_MAP.put(response.getUniqueKey(), responseWrapper);
    }

    /**
     * ブロッキングキューから非同期戻り結果を取得します。
     */
    public static NettyResponse getValue(String requestUniqueKey, long timeout) {
        NettyResponseWrapper responseWrapper = RESPONSE_WRAPPER_MAP.get(requestUniqueKey);
        try {
            return responseWrapper.getResponseBlockingQueue().poll(timeout, TimeUnit.MILLISECONDS);
            
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("値取得エラー", e);

        } finally {
            RESPONSE_WRAPPER_MAP.remove(requestUniqueKey);
        }
        return null;
    }

}
  • initResponseData: uniqueKey に基づいて Map を初期化します。
  • putResultValue: NettyResponse 戻り結果を挿入します。
  • getValue: uniqueKey に基づいて結果を取得します。

同時に、定期的に実行されるキューを定義し、responseTime に基づいてメッセージが期限切れかどうかを判断し、メモリデータをクリーンアップします。

2️⃣ Channel 状態判断

現在の Netty チャネルの状態を判断します。現在の Channel が使用できない場合、再度チャネルを申請する必要があります。

3️⃣ Netty メッセージ送信

4️⃣ ローカルキャッシュから Netty 戻り結果を取得します。

非同期呼び出し Netty サービス、Future を使用して戻り結果を取得

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    // サービス名 + メソッド名に基づいて利用可能なノードを取得
List<Producer> producerList = registerCenter.getServiceProducer(serviceName, method.getName());
    // 直接0番目を取得します。ここでは負荷分散戦略を採用できます。
    Producer providerCopy =producerList.get(0) ;
		// NettyRequest 構築
NettyRequest request = NettyRequest.builder()
								// サービスノード情報
                .producer(providerCopy)
							// 今回のリクエストの一意の番号
                .uniqueKey(UUID.randomUUID() + "-" + Thread.currentThread().getId())
							// リクエストのタイムアウト時間
                .invokeTimeout(timeout)
							// リクエストメソッド名
                .invokeMethodName(method.getName())
							// リクエストパラメータ
                .args(args)
                .build();
        // 非同期呼び出しを開始し、NettyClient を通じてリクエストを送信します。
        try {
            Future<NettyResponse> responseFuture = executorService.submit(ClientServiceCallable.of(request));
            NettyResponse response = responseFuture.get(timeout, TimeUnit.MILLISECONDS);
            if (response != null) {
                return response.getResult();
            }

        } catch (Exception e) {
            log.error("リクエスト送信エラー", e);
        }
}

これで、ClientProxyBeanFactory プロキシオブジェクトの完全な記述が完了しました。次に、初期化されたプロキシオブジェクトを依存性注入します。

// ConsumerAnnotaionBean.class
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
				// プロキシクラス名
        String serviceName = targetItf.getName();
				// 登録センターサービス
        IRegisterCenter registerCenter = IRegisterCenterZkImpl.getInstance();
				// サービス名 + メソッド名に基づいて利用可能なノードを取得
        List<Producer> producerList = registerCenter.getServiceProducer(serviceName, method.getName());
				// 3.サービスプロキシオブジェクトを取得
            Class<?> targetItf = rpcClient.targetItf();
            if (targetItf == Object.class) {
                targetItf = field.getType();
           }
				// プロキシオブジェクトを初期化
				ClientProxyBeanFactory factory = ClientProxyBeanFactory.getInstance(targetItf, rpcClient.timeout(), rpcClient.consumeThreads());
            ReflectionUtils.makeAccessible(field);
            try {
                // プロキシオブジェクトを設定
                field.set(bean, factory.getProxy());

            } catch (IllegalAccessException e) {
                log.error("ReferenceBeanPostProcessor プロパティ処理エラー, beanName={}", beanName, e);
                throw new RuntimeException("ReferenceBeanPostProcessor プロパティ処理エラー, beanName=" + beanName, e);
            }
}

ClientProxyBeanFactory.getInstance を通じてプロキシオブジェクトを取得した後、field.set メソッドを使用して実行値を設定します。

上記の操作を完了した後、クライアントが get メソッドを実行すると、ClientProxyBeanFactoryinvoke メソッドに invoke され、その後 Netty 接続を開き、メッセージの送受信を行う 内容が実行され、サービス側の結果が返されます。

サービスノードの検索、Netty 接続の事前作成#

この部分の内容は第二ステップと重複しており、核心ロジックは次の通りです:

Mermaid Loading...

これで、クライアントのすべてのプロセスが記述されました。しかし、主要な考え方を整理するために、記事内では負荷分散戦略、シリアル化とデシリアル化などについては簡単に触れただけです。これらも RPC フレームワークの非常に重要な部分です。

読み込み中...
文章は、創作者によって署名され、ブロックチェーンに安全に保存されています。