如何開發一個自己的 RPC 框架 (二)#
上篇文章中,從客戶端的調用流程,詳細的了解了各個環節的數據是如何串聯起來的。那么本章內容,將主要從註冊中心的角度出發,看看註冊中心是如何維護客戶端和服務端各自上報的信息。
能夠作為註冊中心的中間件非常多,常見的比如 ZooKeeper、Consul、Nacos 等,本項目中採用的是 Zookeeper 作為註冊中心中間件
功能梳理#
同樣,開始之前我們先來理一下註冊中心主要實現那些功能
接口定義#
定義一個名為 IRegisterCenter
的接口,其中接口主要實現如下功能:
- 服務提供方的節點上報
- 服務提供方節點信息更新
- 服務提供方的本地數據緩存和節點信息訂閱
- 服務消費方根據服務名獲取節點地址
- 服務消費方訂閱服務節點信息
- 服務消費方信息註冊
有了上面 6 大需求,就可以知道接口中主要包含的方法如下:
// IRegisterCenter.class
public interface IRegisterCenter {
/**
* 服務提供方節點註冊
*/
void register(List<Producer> producerList);
/**
* 服務提供方獲取節點列表
*/
Map<String, List<Producer>> getProducersMap();
/**
* 服務提供方服務銷毀
*/
void destroy(String name);
/**
* 服務消費方初始化服務節點列表
*
* @param remoteAppKey 服務提供者唯一標識
* @param groupName 服務組名
*/
void initProviderMap(String remoteAppKey, String groupName);
/**
* 服務消費方獲取所有節點列表
*/
List<Producer> getServiceNode();
/**
* 服務消費方獲取節點信息
*
* @param serviceName 服務名稱
* @param methodName 服務方法名稱
*/
List<Producer> getServiceProducer(String serviceName, String methodName);
/**
* 註冊服務消費者信息 用於監控
*/
void registerConsumer(Consumer consumer);
}
配置文件讀取#
註冊中心的接口定義完成之後,就需要使用 zookeeper 進行接口實現。但是在接口實現之前,需要實現一個幫助方法讀取 RPC 框架的相關配置項。
根據前面的設計,可以大概羅列一下需要用到的配置參數
- appkey: 服務的唯一標識
- groupName: 服務的分組名稱
- zookServer: zk 中間件服務地址
- nettyIp: 服務提供方調用地址
- nettyPort: 服務提供方調用端口
當然還包含一些其他參數,比如服務調用超時時間、服務會話超時時間等
完成上面需求整理之後,就可以開始 RpcPropertiesUtil
方法的編寫了
@Slf4j
public class RpcPropertiesUtil {
private static final Map<String, Object> PRODUCER_MAP;
private static final String PROPERTIES_FILE_NAME = "rpc.properties";
static {
PRODUCER_MAP = Maps.newConcurrentMap();
try {
Properties properties = PropertiesLoaderUtils.loadAllProperties(PROPERTIES_FILE_NAME);
for (Map.Entry<Object, Object> entry : properties.entrySet()) {
PRODUCER_MAP.put(entry.getKey().toString(), entry.getValue());
}
} catch (IOException e) {
log.error("讀取配置文件異常", e);
throw new RuntimeException("Producer Load Properties Exception");
}
}
// 先從環境變量讀取
private static String getString(Map<String, Object> map, String key, String defaultValue) {
String envKey = System.getenv().get(key.toUpperCase());
String result = envKey != null ? envKey : MapUtils.getString(map, key);
return StringUtils.isEmpty(result) ? defaultValue : result;
}
private static Integer getInteger(Map<String, Object> map, String key, Integer defaultValue) {
String envKey = System.getenv().get(key.toUpperCase());
Integer result = envKey != null ? Integer.parseInt(envKey) : MapUtils.getInteger(map, key);
return result == null ? defaultValue : result;
}
}
- 定義一個
PRODUCER_MAP
用來做配置文件緩存 - static 靜態方法中,讀取
[rpc.properties](http://rpc.properties)
配置文件,轉換成 Map - 定義了兩個
getString
和getInteger
的方法,表示先從環境變量中讀取配置,不存在時從配置文件中讀取
完成上述基礎編寫之後,實例一個 rpc_app_key
的讀取
public static String getAppKey() {
return getString(PRODUCER_MAP, "rpc_app_key", "test");
}
zookeeper 接口實現#
由于接口的方法都已經定義完成,只需要根據接口的方法含義,進行對應的實現即可
定義一個類 IRegisterCenterZkImpl
實現 IRegisterCenter
接口
成員變量介紹#
private static final IRegisterCenterZkImpl instance = new IRegisterCenterZkImpl();
private volatile ZkClient zkClient = null;
// ZK 相關信息
private static final String ZK_ADDRESS = RpcPropertiesUtil.getZkServers();
private static final int ZK_SESSION_TIMEOUT = RpcPropertiesUtil.getSessionTimeout();
private static final int ZK_CONNECTION_TIMEOUT = RpcPropertiesUtil.getConnectionTimeout();
// 本地服務地址
private static final String LOCAL_IP = RpcPropertiesUtil.getServerIp();
// 服務緩存(服務端緩存)
private static final Map<String, List<Producer>> PROVIDER_MAP = new ConcurrentHashMap<>();
// 服務緩存(客戶端緩存)
private static final Map<String, List<Producer>> SERVICE_METADATA = Maps.newConcurrentMap();
// ZK 的服務地址
private static final String ROOT_PATH = "/config_register";
public static final String PROVIDER_TYPE = "/provider";
public static final String CONSUMER_TYPE = "/consumer";
/**
* service name to bean map
*/
public static final Map<String, Object> PRODUCER_BEAN_MAP = Maps.newConcurrentMap();
register 方法實現#
register
應該是整個註冊中心最重要的方法了,他主要目的是接收服務提供方的節點信息上報
//IRegisterCenterZkImpl.class
@Override
public void register(List<Producer> producerList) {
if (CollectionUtils.isEmpty(producerList)) {
log.debug("RegisterCenterImpl registerProvider providers is empty, ignore it, providers={}", producerList);
return;
}
synchronized (IRegisterCenterZkImpl.class) {
// 初始化 zk 客戶端
1️⃣ this.initZkClient();
// 本地緩存
2️⃣ this.setLocalCache(producerList);
3️⃣ for (Map.Entry<String, List<Producer>> entry : PROVIDER_MAP.entrySet()) {
String serviceName = entry.getKey();
List<Producer> producers = entry.getValue();
// 創建根節點
Producer firstProducer = producers.get(0);
String appKey = firstProducer.getAppKey();
String rootNode = getRootPath(appKey);
4️⃣ this.createRootNode(rootNode);
// 服務組名
String groupName = firstProducer.getGroupName();
// 創建服務提供者節點
5️⃣ String servicePath = getProducerServicePath(appKey, groupName, serviceName);
for (Producer producer : producers) {
this.createServiceNode(servicePath);
String producerMathodPath = producerToPath(servicePath, producer);
6️⃣ this.createCurrentServiceIpNode(producerMathodPath);
log.debug("create current service node success, node path = {} ,method path = {}", servicePath, producerMathodPath);
}
// 本地監聽
7️⃣ subscribeChildChanges(serviceName, servicePath, PROVIDER_MAP);
}
}
}
1️⃣:本地初始化 zk 的請求客戶端
//IRegisterCenterZkImpl.class
private void initZkClient() {
if (zkClient == null) {
zkClient = new ZkClient(ZK_ADDRESS, ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT);
}
}
對象定義成員變量 zkClient
2️⃣:設置本地節點緩存
//IRegisterCenterZkImpl.class
private void setLocalCache(List<Producer> producerList) {
for (Producer producer : producerList) {
String name = producer.getServiceItf().getName();
List<Producer> producerListCache = PROVIDER_MAP.get(name);
if (producerListCache == null) {
producerListCache = Lists.newArrayList();
}
producerListCache.add(producer);
PROVIDER_MAP.put(name, producerListCache);
}
}
存在本地成員變量 PROVIDER_MAP
,其中 Map 的 Key 為服務接口的類名
3️⃣:根據類名遍歷節點信息
4️⃣:創建根節點信息,根節點地址為 /config_register/{appKey}
5️⃣:根據服務名稱創建服務節點信息,服務節點地址為: /config_register/{appKey}/{groupName}/{serviceName}
其中 serviceName
就是服務接口的類名
6️⃣:根據接口類名創建具體的方法節點信息,其中 producerToPath
方法,主要目的是將 Producer
對象轉換成 Zk 的路徑信息,其中主要實現如下:
private String producerToPath(String servicePath, Producer producer) {
return servicePath + "/" + producer.getIp() + "|"
+ producer.getPort() + "|"
+ producer.getWeight() + "|"
+ producer.getWorkerThreads() + "|"
+ producer.getMethod().getName() + "|"
+ producer.getGroupName();
}
返回的地址信息示例如下: /config_register/{appKey}/{groupName}/{serviceName}/127.0.0.1|9999|1|10|get|defaut
127.0.0.1|9999|1|10|get|defaut
這段參數的含義就是:服務地址 + 服務端口 + 權重 + 工作線程數量 + 方法名稱 + 服務組名稱
createCurrentServiceIpNode
方法實現如下:
private void createCurrentServiceIpNode(String currentServiceIpNode) {
if (!zkClient.exists(currentServiceIpNode)) {
// 臨時節點
zkClient.createEphemeral(currentServiceIpNode);
}
}
7️⃣:服務地址監聽
subscribeChildChanges
方法的實現如下:
private void subscribeChildChanges(String serviceName, String servicePath, Map<String, List<Producer>> dataMap) {
// 1. 從 zk 中讀取路徑
// 2. 根據路徑值 反序列化成 producer 對象
// 3. 將 producer 對象放入 dataMap 中
zkClient.subscribeChildChanges(servicePath, (parentPath, currentChilds) -> {
if (currentChilds == null) {
currentChilds = new ArrayList<>();
}
List<Producer> producers = currentChilds.stream().map(currentChild -> pathToProducer(serviceName, currentChild)).collect(Collectors.toList());
dataMap.put(serviceName, producers);
});
}
initProviderMap 服務消費方節點獲取#
@Override
public void initProviderMap(String remoteAppKey, String groupName) {
if (MapUtils.isEmpty(SERVICE_METADATA)) {
SERVICE_METADATA.putAll(this.fetchOrUpdateServiceMetaData(remoteAppKey, groupName));
}
}
private Map<String, List<Producer>> fetchOrUpdateServiceMetaData(String remoteAppKey, String groupName) {
final Map<String, List<Producer>> providerServiceMap = Maps.newHashMap();
this.initZkClient();
// 根據分組獲取服務提供方的地址
String providerNode = getRootPath(remoteAppKey) + UrlConstants.SLASH + groupName;
// 獲取所有 serverName 地址
1️⃣ List<String> producerServices = zkClient.getChildren(providerNode);
for (String serviceName : producerServices) {
String servicePath = getProducerServicePath(remoteAppKey, groupName, serviceName);
2️⃣ List<String> producerPaths = zkClient.getChildren(servicePath);
// 獲取所有 producer 地址(method)
for (String producerPath : producerPaths) {
3️⃣ Producer pathToProducer = pathToProducer(serviceName, producerPath);
List<Producer> providerList = providerServiceMap.get(serviceName);
if (providerList == null) {
providerList = new ArrayList<>();
}
providerList.add(pathToProducer);
4️⃣ providerServiceMap.put(serviceName, providerList);
}
5️⃣ subscribeChildChanges(serviceName, servicePath, SERVICE_METADATA);
}
return providerServiceMap;
}
1️⃣:根據 appKey
和 groupName
從 zk 中查詢註冊的所有服務節點
2️⃣:根據 serviceName
獲取該服務下的所有節點信息
3️⃣:使用 pathToProducer
將 zk 的 path 信息,轉換成 Producer
對象
4️⃣:將節點信息緩存到本地
5️⃣:和服務提供方註冊類似,訂閱服務下方節點變化,然後刷新本地 Map
getServiceProducer 服務消費方獲取服務節點#
@Override
public List<Producer> getServiceProducer(String serviceName, String methodName) {
List<Producer> producers = SERVICE_METADATA.get(serviceName);
return producers == null ? null :
producers.stream().filter(producer ->
producer.getMethod() != null &&
producer.getMethod().getName().equals(methodName)).collect(Collectors.toList());
}
這裡的實現代碼就非常簡單了,只需要遍歷本地緩存的 SERVICE_METADATA 對象即可
由于其他方法的操作都是對本地緩存 Map
的操作,所以這裡就不再詳細展開了
到這裡,已經完成了註冊中心最重要方法編寫,處理服務提供方的服務註冊,和服務消費方的服務節點訂閱。
下一章我們將完成服務提供方節點信息的註冊和 Netty
消息的處理。