simon

simon

自分自身の RPC フレームワークを開発する方法 (二)

自分の RPC フレームワークを開発する方法 (2)#

前回の記事では、クライアントの呼び出しプロセスから、各段階のデータがどのように連携しているかを詳しく理解しました。今回の内容では、主にレジストリセンターの観点から、レジストリセンターがクライアントとサーバーそれぞれの報告情報をどのように維持しているかを見ていきます。

レジストリセンターとして使用できるミドルウェアは非常に多く、一般的なものには ZooKeeperConsulNacos などがあります。本プロジェクトでは、Zookeeper をレジストリセンターのミドルウェアとして採用しています。

機能整理#

同様に、始める前にレジストリセンターが主に実現する機能を整理してみましょう。

Mermaid Loading...

インターフェース定義#

IRegisterCenter という名前のインターフェースを定義し、そのインターフェースは主に以下の機能を実現します。

  1. サービス提供者のノード報告
  2. サービス提供者ノード情報の更新
  3. サービス提供者のローカルデータキャッシュとノード情報の購読
  4. サービス消費者がサービス名に基づいてノードアドレスを取得
  5. サービス消費者がサービスノード情報を購読
  6. サービス消費者情報の登録

上記の 6 つの要件があれば、インターフェースに主に含まれるメソッドは以下のようになります。

// IRegisterCenter.class
	public interface IRegisterCenter {

    /**
     * サービス提供者ノードの登録
     */
    void register(List<Producer> producerList);

    /**
     * サービス提供者がノードリストを取得
     */
    Map<String, List<Producer>> getProducersMap();

    /**
     * サービス提供者サービスの破棄
     */
    void destroy(String name);

    /**
     * サービス消費者がサービスノードリストを初期化
     *
     * @param remoteAppKey サービス提供者のユニーク識別子
     * @param groupName    サービスグループ名
     */
    void initProviderMap(String remoteAppKey, String groupName);

    /**
     * サービス消費者がすべてのノードリストを取得
     */
    List<Producer> getServiceNode();

    /**
     * サービス消費者がノード情報を取得
     *
     * @param serviceName サービス名
     * @param methodName  サービスメソッド名
     */
    List<Producer> getServiceProducer(String serviceName, String methodName);

    /**
     * サービス消費者情報を登録して監視する
     */
    void registerConsumer(Consumer consumer);

}

設定ファイルの読み込み#

レジストリセンターのインターフェース定義が完了したら、zookeeper を使用してインターフェースを実装する必要があります。しかし、インターフェースの実装の前に、RPC フレームワークの関連設定項目を読み込むヘルパーメソッドを実装する必要があります。

前の設計に基づいて、必要な設定パラメータを大まかに列挙できます。

  1. appkey: サービスのユニーク識別子
  2. groupName: サービスのグループ名
  3. zookServer: zk ミドルウェアサービスアドレス
  4. nettyIp: サービス提供者の呼び出しアドレス
  5. nettyPort: サービス提供者の呼び出しポート

もちろん、サービス呼び出しのタイムアウト時間やサービスセッションのタイムアウト時間など、他のパラメータも含まれます。

上記の要件整理が完了したら、RpcPropertiesUtil メソッドの作成を開始できます。

@Slf4j
public class RpcPropertiesUtil {
    private static final Map<String, Object> PRODUCER_MAP;

    private static final String PROPERTIES_FILE_NAME = "rpc.properties";

    static {
        PRODUCER_MAP = Maps.newConcurrentMap();
        try {
            Properties properties = PropertiesLoaderUtils.loadAllProperties(PROPERTIES_FILE_NAME);
            for (Map.Entry<Object, Object> entry : properties.entrySet()) {
                PRODUCER_MAP.put(entry.getKey().toString(), entry.getValue());
            }

        } catch (IOException e) {
            log.error("設定ファイルの読み込み異常", e);
            throw new RuntimeException("Producer Load Properties Exception");
        }
    }

		// 先に環境変数から読み取る
		private static String getString(Map<String, Object> map, String key, String defaultValue) {
        String envKey = System.getenv().get(key.toUpperCase());
        String result = envKey != null ? envKey : MapUtils.getString(map, key);
        return StringUtils.isEmpty(result) ? defaultValue : result;
    }

    private static Integer getInteger(Map<String, Object> map, String key, Integer defaultValue) {
        String envKey = System.getenv().get(key.toUpperCase());
        Integer result = envKey != null ? Integer.parseInt(envKey) : MapUtils.getInteger(map, key);
        return result == null ? defaultValue : result;
    }
}
  1. 設定ファイルキャッシュ用に PRODUCER_MAP を定義
  2. static メソッド内で、[rpc.properties](http://rpc.properties) 設定ファイルを読み込み、Map に変換
  3. 環境変数から設定を読み取るための getStringgetInteger の 2 つのメソッドを定義

上記の基本的な作成が完了したら、rpc_app_key の読み取りをインスタンス化します。

public static String getAppKey() {
        return getString(PRODUCER_MAP, "rpc_app_key", "test");
 }

zookeeper インターフェース実装#

インターフェースのメソッドがすでに定義されているので、インターフェースのメソッドの意味に基づいて、対応する実装を行うだけです。

IRegisterCenterZkImpl というクラスを定義し、IRegisterCenter インターフェースを実装します。

メンバー変数の紹介#

private static final IRegisterCenterZkImpl instance = new IRegisterCenterZkImpl();

    private volatile ZkClient zkClient = null;

    // ZK 関連情報
    private static final String ZK_ADDRESS = RpcPropertiesUtil.getZkServers();
    private static final int ZK_SESSION_TIMEOUT = RpcPropertiesUtil.getSessionTimeout();
    private static final int ZK_CONNECTION_TIMEOUT = RpcPropertiesUtil.getConnectionTimeout();
    // ローカルサービスアドレス
    private static final String LOCAL_IP = RpcPropertiesUtil.getServerIp();

    // サービスキャッシュ(サーバーキャッシュ)
    private static final Map<String, List<Producer>> PROVIDER_MAP = new ConcurrentHashMap<>();
    // サービスキャッシュ(クライアントキャッシュ)
    private static final Map<String, List<Producer>> SERVICE_METADATA = Maps.newConcurrentMap();

    // ZK のサービスアドレス
    private static final String ROOT_PATH = "/config_register";
    public static final String PROVIDER_TYPE = "/provider";
    public static final String CONSUMER_TYPE = "/consumer";

    /**
     * サービス名からビーンマップ
     */
    public static final Map<String, Object> PRODUCER_BEAN_MAP = Maps.newConcurrentMap();

register メソッドの実装#

register はレジストリセンターで最も重要なメソッドであり、主にサービス提供者のノード情報の報告を受け取ることを目的としています。

//IRegisterCenterZkImpl.class
@Override
    public void register(List<Producer> producerList) {
        if (CollectionUtils.isEmpty(producerList)) {
            log.debug("RegisterCenterImpl registerProvider providers is empty, ignore it, providers={}", producerList);
            return;
        }

        synchronized (IRegisterCenterZkImpl.class) {
            // zk クライアントの初期化
            1️⃣ this.initZkClient();

						// ローカルキャッシュ
            2️⃣ this.setLocalCache(producerList);							
            3️⃣ for (Map.Entry<String, List<Producer>> entry : PROVIDER_MAP.entrySet()) {
                String serviceName = entry.getKey();
                List<Producer> producers = entry.getValue();

                // ルートノードの作成
                Producer firstProducer = producers.get(0);
                String appKey = firstProducer.getAppKey();
                String rootNode = getRootPath(appKey);
               4️⃣ this.createRootNode(rootNode);

                // サービスグループ名
                String groupName = firstProducer.getGroupName();
                // サービス提供者ノードの作成
              5️⃣  String servicePath = getProducerServicePath(appKey, groupName, serviceName);

                for (Producer producer : producers) {
                    this.createServiceNode(servicePath);
                    String producerMathodPath = producerToPath(servicePath, producer);
                6️⃣  this.createCurrentServiceIpNode(producerMathodPath);
                    log.debug("create current service node success, node path = {} ,method path = {}", servicePath, producerMathodPath);
                }

                // ローカルリスニング
                7️⃣ subscribeChildChanges(serviceName, servicePath, PROVIDER_MAP);
            }
        }
}

1️⃣:ローカルで zk のリクエストクライアントを初期化

//IRegisterCenterZkImpl.class
private void initZkClient() {
        if (zkClient == null) {
            zkClient = new ZkClient(ZK_ADDRESS, ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT);
        }
    }

オブジェクト定義メンバー変数 zkClient

2️⃣:ローカルノードキャッシュを設定

//IRegisterCenterZkImpl.class
private void setLocalCache(List<Producer> producerList) {
        for (Producer producer : producerList) {
            String name = producer.getServiceItf().getName();
            List<Producer> producerListCache = PROVIDER_MAP.get(name);
            if (producerListCache == null) {
                producerListCache = Lists.newArrayList();
            }

            producerListCache.add(producer);
            PROVIDER_MAP.put(name, producerListCache);
        }
    }

ローカルメンバー変数 PROVIDER_MAP が存在し、Map の Key はサービスインターフェースのクラス名です。

3️⃣:クラス名に基づいてノード情報を反復処理

4️⃣:ルートノード情報を作成、ルートノードアドレスは /config_register/{appKey} です。

5️⃣:サービス名に基づいてサービスノード情報を作成、サービスノードアドレスは /config_register/{appKey}/{groupName}/{serviceName} であり、serviceName はサービスインターフェースのクラス名です。

6️⃣:インターフェースクラス名に基づいて具体的なメソッドノード情報を作成します。producerToPath メソッドは、Producer オブジェクトを Zk のパス情報に変換することを目的としています。主な実装は以下の通りです。

private String producerToPath(String servicePath, Producer producer) {
        return servicePath + "/" + producer.getIp() + "|"
                + producer.getPort() + "|"
                + producer.getWeight() + "|"
                + producer.getWorkerThreads() + "|"
                + producer.getMethod().getName() + "|"
                + producer.getGroupName();
    }

返されるアドレス情報の例は次のとおりです: /config_register/{appKey}/{groupName}/{serviceName}/127.0.0.1|9999|1|10|get|defaut

127.0.0.1|9999|1|10|get|defaut というパラメータの意味は、サービスアドレス + サービスポート + 重み + ワーカースレッド数 + メソッド名 + サービスグループ名です。

createCurrentServiceIpNode メソッドの実装は以下の通りです。

private void createCurrentServiceIpNode(String currentServiceIpNode) {
        if (!zkClient.exists(currentServiceIpNode)) {
            // 一時ノード
            zkClient.createEphemeral(currentServiceIpNode);
        }
    }

7️⃣:サービスアドレスのリスニング

subscribeChildChanges メソッドの実装は以下の通りです。

private void subscribeChildChanges(String serviceName, String servicePath, Map<String, List<Producer>> dataMap) {
        // 1. zk からパスを読み取る
        // 2. パス値に基づいて producer オブジェクトを逆シリアル化
        // 3. producer オブジェクトを dataMap に追加
        zkClient.subscribeChildChanges(servicePath, (parentPath, currentChilds) -> {
            if (currentChilds == null) {
                currentChilds = new ArrayList<>();
            }

            List<Producer> producers = currentChilds.stream().map(currentChild -> pathToProducer(serviceName, currentChild)).collect(Collectors.toList());
            dataMap.put(serviceName, producers);
        });
    }

initProviderMap サービス消費者ノードの取得#

@Override
    public void initProviderMap(String remoteAppKey, String groupName) {
        if (MapUtils.isEmpty(SERVICE_METADATA)) {
            SERVICE_METADATA.putAll(this.fetchOrUpdateServiceMetaData(remoteAppKey, groupName));
        }
    }

private Map<String, List<Producer>> fetchOrUpdateServiceMetaData(String remoteAppKey, String groupName) {
        final Map<String, List<Producer>> providerServiceMap = Maps.newHashMap();
        this.initZkClient();

        // グループに基づいてサービス提供者のアドレスを取得
        String providerNode = getRootPath(remoteAppKey) + UrlConstants.SLASH + groupName;
        // すべての serverName アドレスを取得
       1️⃣ List<String> producerServices = zkClient.getChildren(providerNode);

        for (String serviceName : producerServices) {
             String servicePath = getProducerServicePath(remoteAppKey, groupName, serviceName);
            2️⃣ List<String> producerPaths = zkClient.getChildren(servicePath);

            // すべての producer アドレス(メソッド)を取得
            for (String producerPath : producerPaths) {
              3️⃣  Producer pathToProducer = pathToProducer(serviceName, producerPath);
                List<Producer> providerList = providerServiceMap.get(serviceName);
                if (providerList == null) {
                    providerList = new ArrayList<>();
                }

                providerList.add(pathToProducer);
            4️⃣     providerServiceMap.put(serviceName, providerList);
            }

            5️⃣ subscribeChildChanges(serviceName, servicePath, SERVICE_METADATA);
        }

        return providerServiceMap;
    }

1️⃣:appKeygroupName に基づいて zk から登録されたすべてのサービスノードを照会

2️⃣:serviceName に基づいてそのサービスのすべてのノード情報を取得

3️⃣:pathToProducer を使用して zk のパス情報を Producer オブジェクトに変換

4️⃣:ノード情報をローカルにキャッシュ

5️⃣:サービス提供者の登録と同様に、サービス下のノードの変化を購読し、ローカルの Map を更新します。

getServiceProducer サービス消費者がサービスノードを取得#

@Override
    public List<Producer> getServiceProducer(String serviceName, String methodName) {
        List<Producer> producers = SERVICE_METADATA.get(serviceName);
        return producers == null ? null :
                producers.stream().filter(producer ->
                        producer.getMethod() != null &&
                                producer.getMethod().getName().equals(methodName)).collect(Collectors.toList());
    }

ここでの実装コードは非常にシンプルで、ローカルキャッシュの SERVICE_METADATA オブジェクトを反復処理するだけです。

他のメソッドの操作はすべてローカルキャッシュ Map に対する操作であるため、ここでは詳細に展開することはありません。

これで、レジストリセンターの最も重要なメソッドの作成が完了し、サービス提供者のサービス登録とサービス消費者のサービスノード購読を処理しました。

次の章では、サービス提供者ノード情報の登録と Netty メッセージの処理を完了します。

読み込み中...
文章は、創作者によって署名され、ブロックチェーンに安全に保存されています。