自分の RPC フレームワークを開発する方法 (2)#
前回の記事では、クライアントの呼び出しプロセスから、各段階のデータがどのように連携しているかを詳しく理解しました。今回の内容では、主にレジストリセンターの観点から、レジストリセンターがクライアントとサーバーそれぞれの報告情報をどのように維持しているかを見ていきます。
レジストリセンターとして使用できるミドルウェアは非常に多く、一般的なものには ZooKeeper、Consul、Nacos などがあります。本プロジェクトでは、Zookeeper をレジストリセンターのミドルウェアとして採用しています。
機能整理#
同様に、始める前にレジストリセンターが主に実現する機能を整理してみましょう。
インターフェース定義#
IRegisterCenter
という名前のインターフェースを定義し、そのインターフェースは主に以下の機能を実現します。
- サービス提供者のノード報告
- サービス提供者ノード情報の更新
- サービス提供者のローカルデータキャッシュとノード情報の購読
- サービス消費者がサービス名に基づいてノードアドレスを取得
- サービス消費者がサービスノード情報を購読
- サービス消費者情報の登録
上記の 6 つの要件があれば、インターフェースに主に含まれるメソッドは以下のようになります。
// IRegisterCenter.class
public interface IRegisterCenter {
/**
* サービス提供者ノードの登録
*/
void register(List<Producer> producerList);
/**
* サービス提供者がノードリストを取得
*/
Map<String, List<Producer>> getProducersMap();
/**
* サービス提供者サービスの破棄
*/
void destroy(String name);
/**
* サービス消費者がサービスノードリストを初期化
*
* @param remoteAppKey サービス提供者のユニーク識別子
* @param groupName サービスグループ名
*/
void initProviderMap(String remoteAppKey, String groupName);
/**
* サービス消費者がすべてのノードリストを取得
*/
List<Producer> getServiceNode();
/**
* サービス消費者がノード情報を取得
*
* @param serviceName サービス名
* @param methodName サービスメソッド名
*/
List<Producer> getServiceProducer(String serviceName, String methodName);
/**
* サービス消費者情報を登録して監視する
*/
void registerConsumer(Consumer consumer);
}
設定ファイルの読み込み#
レジストリセンターのインターフェース定義が完了したら、zookeeper を使用してインターフェースを実装する必要があります。しかし、インターフェースの実装の前に、RPC フレームワークの関連設定項目を読み込むヘルパーメソッドを実装する必要があります。
前の設計に基づいて、必要な設定パラメータを大まかに列挙できます。
- appkey: サービスのユニーク識別子
- groupName: サービスのグループ名
- zookServer: zk ミドルウェアサービスアドレス
- nettyIp: サービス提供者の呼び出しアドレス
- nettyPort: サービス提供者の呼び出しポート
もちろん、サービス呼び出しのタイムアウト時間やサービスセッションのタイムアウト時間など、他のパラメータも含まれます。
上記の要件整理が完了したら、RpcPropertiesUtil
メソッドの作成を開始できます。
@Slf4j
public class RpcPropertiesUtil {
private static final Map<String, Object> PRODUCER_MAP;
private static final String PROPERTIES_FILE_NAME = "rpc.properties";
static {
PRODUCER_MAP = Maps.newConcurrentMap();
try {
Properties properties = PropertiesLoaderUtils.loadAllProperties(PROPERTIES_FILE_NAME);
for (Map.Entry<Object, Object> entry : properties.entrySet()) {
PRODUCER_MAP.put(entry.getKey().toString(), entry.getValue());
}
} catch (IOException e) {
log.error("設定ファイルの読み込み異常", e);
throw new RuntimeException("Producer Load Properties Exception");
}
}
// 先に環境変数から読み取る
private static String getString(Map<String, Object> map, String key, String defaultValue) {
String envKey = System.getenv().get(key.toUpperCase());
String result = envKey != null ? envKey : MapUtils.getString(map, key);
return StringUtils.isEmpty(result) ? defaultValue : result;
}
private static Integer getInteger(Map<String, Object> map, String key, Integer defaultValue) {
String envKey = System.getenv().get(key.toUpperCase());
Integer result = envKey != null ? Integer.parseInt(envKey) : MapUtils.getInteger(map, key);
return result == null ? defaultValue : result;
}
}
- 設定ファイルキャッシュ用に
PRODUCER_MAP
を定義 - static メソッド内で、
[rpc.properties](http://rpc.properties)
設定ファイルを読み込み、Map に変換 - 環境変数から設定を読み取るための
getString
とgetInteger
の 2 つのメソッドを定義
上記の基本的な作成が完了したら、rpc_app_key
の読み取りをインスタンス化します。
public static String getAppKey() {
return getString(PRODUCER_MAP, "rpc_app_key", "test");
}
zookeeper インターフェース実装#
インターフェースのメソッドがすでに定義されているので、インターフェースのメソッドの意味に基づいて、対応する実装を行うだけです。
IRegisterCenterZkImpl
というクラスを定義し、IRegisterCenter
インターフェースを実装します。
メンバー変数の紹介#
private static final IRegisterCenterZkImpl instance = new IRegisterCenterZkImpl();
private volatile ZkClient zkClient = null;
// ZK 関連情報
private static final String ZK_ADDRESS = RpcPropertiesUtil.getZkServers();
private static final int ZK_SESSION_TIMEOUT = RpcPropertiesUtil.getSessionTimeout();
private static final int ZK_CONNECTION_TIMEOUT = RpcPropertiesUtil.getConnectionTimeout();
// ローカルサービスアドレス
private static final String LOCAL_IP = RpcPropertiesUtil.getServerIp();
// サービスキャッシュ(サーバーキャッシュ)
private static final Map<String, List<Producer>> PROVIDER_MAP = new ConcurrentHashMap<>();
// サービスキャッシュ(クライアントキャッシュ)
private static final Map<String, List<Producer>> SERVICE_METADATA = Maps.newConcurrentMap();
// ZK のサービスアドレス
private static final String ROOT_PATH = "/config_register";
public static final String PROVIDER_TYPE = "/provider";
public static final String CONSUMER_TYPE = "/consumer";
/**
* サービス名からビーンマップ
*/
public static final Map<String, Object> PRODUCER_BEAN_MAP = Maps.newConcurrentMap();
register メソッドの実装#
register
はレジストリセンターで最も重要なメソッドであり、主にサービス提供者のノード情報の報告を受け取ることを目的としています。
//IRegisterCenterZkImpl.class
@Override
public void register(List<Producer> producerList) {
if (CollectionUtils.isEmpty(producerList)) {
log.debug("RegisterCenterImpl registerProvider providers is empty, ignore it, providers={}", producerList);
return;
}
synchronized (IRegisterCenterZkImpl.class) {
// zk クライアントの初期化
1️⃣ this.initZkClient();
// ローカルキャッシュ
2️⃣ this.setLocalCache(producerList);
3️⃣ for (Map.Entry<String, List<Producer>> entry : PROVIDER_MAP.entrySet()) {
String serviceName = entry.getKey();
List<Producer> producers = entry.getValue();
// ルートノードの作成
Producer firstProducer = producers.get(0);
String appKey = firstProducer.getAppKey();
String rootNode = getRootPath(appKey);
4️⃣ this.createRootNode(rootNode);
// サービスグループ名
String groupName = firstProducer.getGroupName();
// サービス提供者ノードの作成
5️⃣ String servicePath = getProducerServicePath(appKey, groupName, serviceName);
for (Producer producer : producers) {
this.createServiceNode(servicePath);
String producerMathodPath = producerToPath(servicePath, producer);
6️⃣ this.createCurrentServiceIpNode(producerMathodPath);
log.debug("create current service node success, node path = {} ,method path = {}", servicePath, producerMathodPath);
}
// ローカルリスニング
7️⃣ subscribeChildChanges(serviceName, servicePath, PROVIDER_MAP);
}
}
}
1️⃣:ローカルで zk のリクエストクライアントを初期化
//IRegisterCenterZkImpl.class
private void initZkClient() {
if (zkClient == null) {
zkClient = new ZkClient(ZK_ADDRESS, ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT);
}
}
オブジェクト定義メンバー変数 zkClient
2️⃣:ローカルノードキャッシュを設定
//IRegisterCenterZkImpl.class
private void setLocalCache(List<Producer> producerList) {
for (Producer producer : producerList) {
String name = producer.getServiceItf().getName();
List<Producer> producerListCache = PROVIDER_MAP.get(name);
if (producerListCache == null) {
producerListCache = Lists.newArrayList();
}
producerListCache.add(producer);
PROVIDER_MAP.put(name, producerListCache);
}
}
ローカルメンバー変数 PROVIDER_MAP
が存在し、Map の Key はサービスインターフェースのクラス名です。
3️⃣:クラス名に基づいてノード情報を反復処理
4️⃣:ルートノード情報を作成、ルートノードアドレスは /config_register/{appKey}
です。
5️⃣:サービス名に基づいてサービスノード情報を作成、サービスノードアドレスは /config_register/{appKey}/{groupName}/{serviceName}
であり、serviceName
はサービスインターフェースのクラス名です。
6️⃣:インターフェースクラス名に基づいて具体的なメソッドノード情報を作成します。producerToPath
メソッドは、Producer
オブジェクトを Zk のパス情報に変換することを目的としています。主な実装は以下の通りです。
private String producerToPath(String servicePath, Producer producer) {
return servicePath + "/" + producer.getIp() + "|"
+ producer.getPort() + "|"
+ producer.getWeight() + "|"
+ producer.getWorkerThreads() + "|"
+ producer.getMethod().getName() + "|"
+ producer.getGroupName();
}
返されるアドレス情報の例は次のとおりです: /config_register/{appKey}/{groupName}/{serviceName}/127.0.0.1|9999|1|10|get|defaut
127.0.0.1|9999|1|10|get|defaut
というパラメータの意味は、サービスアドレス + サービスポート + 重み + ワーカースレッド数 + メソッド名 + サービスグループ名です。
createCurrentServiceIpNode
メソッドの実装は以下の通りです。
private void createCurrentServiceIpNode(String currentServiceIpNode) {
if (!zkClient.exists(currentServiceIpNode)) {
// 一時ノード
zkClient.createEphemeral(currentServiceIpNode);
}
}
7️⃣:サービスアドレスのリスニング
subscribeChildChanges
メソッドの実装は以下の通りです。
private void subscribeChildChanges(String serviceName, String servicePath, Map<String, List<Producer>> dataMap) {
// 1. zk からパスを読み取る
// 2. パス値に基づいて producer オブジェクトを逆シリアル化
// 3. producer オブジェクトを dataMap に追加
zkClient.subscribeChildChanges(servicePath, (parentPath, currentChilds) -> {
if (currentChilds == null) {
currentChilds = new ArrayList<>();
}
List<Producer> producers = currentChilds.stream().map(currentChild -> pathToProducer(serviceName, currentChild)).collect(Collectors.toList());
dataMap.put(serviceName, producers);
});
}
initProviderMap サービス消費者ノードの取得#
@Override
public void initProviderMap(String remoteAppKey, String groupName) {
if (MapUtils.isEmpty(SERVICE_METADATA)) {
SERVICE_METADATA.putAll(this.fetchOrUpdateServiceMetaData(remoteAppKey, groupName));
}
}
private Map<String, List<Producer>> fetchOrUpdateServiceMetaData(String remoteAppKey, String groupName) {
final Map<String, List<Producer>> providerServiceMap = Maps.newHashMap();
this.initZkClient();
// グループに基づいてサービス提供者のアドレスを取得
String providerNode = getRootPath(remoteAppKey) + UrlConstants.SLASH + groupName;
// すべての serverName アドレスを取得
1️⃣ List<String> producerServices = zkClient.getChildren(providerNode);
for (String serviceName : producerServices) {
String servicePath = getProducerServicePath(remoteAppKey, groupName, serviceName);
2️⃣ List<String> producerPaths = zkClient.getChildren(servicePath);
// すべての producer アドレス(メソッド)を取得
for (String producerPath : producerPaths) {
3️⃣ Producer pathToProducer = pathToProducer(serviceName, producerPath);
List<Producer> providerList = providerServiceMap.get(serviceName);
if (providerList == null) {
providerList = new ArrayList<>();
}
providerList.add(pathToProducer);
4️⃣ providerServiceMap.put(serviceName, providerList);
}
5️⃣ subscribeChildChanges(serviceName, servicePath, SERVICE_METADATA);
}
return providerServiceMap;
}
1️⃣:appKey
と groupName
に基づいて zk から登録されたすべてのサービスノードを照会
2️⃣:serviceName
に基づいてそのサービスのすべてのノード情報を取得
3️⃣:pathToProducer
を使用して zk のパス情報を Producer
オブジェクトに変換
4️⃣:ノード情報をローカルにキャッシュ
5️⃣:サービス提供者の登録と同様に、サービス下のノードの変化を購読し、ローカルの Map
を更新します。
getServiceProducer サービス消費者がサービスノードを取得#
@Override
public List<Producer> getServiceProducer(String serviceName, String methodName) {
List<Producer> producers = SERVICE_METADATA.get(serviceName);
return producers == null ? null :
producers.stream().filter(producer ->
producer.getMethod() != null &&
producer.getMethod().getName().equals(methodName)).collect(Collectors.toList());
}
ここでの実装コードは非常にシンプルで、ローカルキャッシュの SERVICE_METADATA
オブジェクトを反復処理するだけです。
他のメソッドの操作はすべてローカルキャッシュ Map
に対する操作であるため、ここでは詳細に展開することはありません。
これで、レジストリセンターの最も重要なメソッドの作成が完了し、サービス提供者のサービス登録とサービス消費者のサービスノード購読を処理しました。
次の章では、サービス提供者ノード情報の登録と Netty
メッセージの処理を完了します。