simon

simon

如何開發一個自己的 RPC 框架 (二)

如何開發一個自己的 RPC 框架 (二)#

上篇文章中,從客戶端的調用流程,詳細的了解了各個環節的數據是如何串聯起來的。那么本章內容,將主要從註冊中心的角度出發,看看註冊中心是如何維護客戶端和服務端各自上報的信息。

能夠作為註冊中心的中間件非常多,常見的比如 ZooKeeperConsulNacos 等,本項目中採用的是 Zookeeper 作為註冊中心中間件

功能梳理#

同樣,開始之前我們先來理一下註冊中心主要實現那些功能

Mermaid Loading...

接口定義#

定義一個名為 IRegisterCenter 的接口,其中接口主要實現如下功能:

  1. 服務提供方的節點上報
  2. 服務提供方節點信息更新
  3. 服務提供方的本地數據緩存和節點信息訂閱
  4. 服務消費方根據服務名獲取節點地址
  5. 服務消費方訂閱服務節點信息
  6. 服務消費方信息註冊

有了上面 6 大需求,就可以知道接口中主要包含的方法如下:

// IRegisterCenter.class
	public interface IRegisterCenter {

    /**
     * 服務提供方節點註冊
     */
    void register(List<Producer> producerList);

    /**
     * 服務提供方獲取節點列表
     */
    Map<String, List<Producer>> getProducersMap();

    /**
     * 服務提供方服務銷毀
     */
    void destroy(String name);

    /**
     * 服務消費方初始化服務節點列表
     *
     * @param remoteAppKey 服務提供者唯一標識
     * @param groupName    服務組名
     */
    void initProviderMap(String remoteAppKey, String groupName);

    /**
     * 服務消費方獲取所有節點列表
     */
    List<Producer> getServiceNode();

    /**
     * 服務消費方獲取節點信息
     *
     * @param serviceName 服務名稱
     * @param methodName  服務方法名稱
     */
    List<Producer> getServiceProducer(String serviceName, String methodName);

    /**
     * 註冊服務消費者信息 用於監控
     */
    void registerConsumer(Consumer consumer);

}

配置文件讀取#

註冊中心的接口定義完成之後,就需要使用 zookeeper 進行接口實現。但是在接口實現之前,需要實現一個幫助方法讀取 RPC 框架的相關配置項。

根據前面的設計,可以大概羅列一下需要用到的配置參數

  1. appkey: 服務的唯一標識
  2. groupName: 服務的分組名稱
  3. zookServer: zk 中間件服務地址
  4. nettyIp: 服務提供方調用地址
  5. nettyPort: 服務提供方調用端口

當然還包含一些其他參數,比如服務調用超時時間、服務會話超時時間等

完成上面需求整理之後,就可以開始 RpcPropertiesUtil 方法的編寫了

@Slf4j
public class RpcPropertiesUtil {
    private static final Map<String, Object> PRODUCER_MAP;

    private static final String PROPERTIES_FILE_NAME = "rpc.properties";

    static {
        PRODUCER_MAP = Maps.newConcurrentMap();
        try {
            Properties properties = PropertiesLoaderUtils.loadAllProperties(PROPERTIES_FILE_NAME);
            for (Map.Entry<Object, Object> entry : properties.entrySet()) {
                PRODUCER_MAP.put(entry.getKey().toString(), entry.getValue());
            }

        } catch (IOException e) {
            log.error("讀取配置文件異常", e);
            throw new RuntimeException("Producer Load Properties Exception");
        }
    }

		// 先從環境變量讀取
		private static String getString(Map<String, Object> map, String key, String defaultValue) {
        String envKey = System.getenv().get(key.toUpperCase());
        String result = envKey != null ? envKey : MapUtils.getString(map, key);
        return StringUtils.isEmpty(result) ? defaultValue : result;
    }

    private static Integer getInteger(Map<String, Object> map, String key, Integer defaultValue) {
        String envKey = System.getenv().get(key.toUpperCase());
        Integer result = envKey != null ? Integer.parseInt(envKey) : MapUtils.getInteger(map, key);
        return result == null ? defaultValue : result;
    }
}
  1. 定義一個 PRODUCER_MAP 用來做配置文件緩存
  2. static 靜態方法中,讀取 [rpc.properties](http://rpc.properties) 配置文件,轉換成 Map
  3. 定義了兩個 getStringgetInteger 的方法,表示先從環境變量中讀取配置,不存在時從配置文件中讀取

完成上述基礎編寫之後,實例一個 rpc_app_key 的讀取

public static String getAppKey() {
        return getString(PRODUCER_MAP, "rpc_app_key", "test");
 }

zookeeper 接口實現#

由于接口的方法都已經定義完成,只需要根據接口的方法含義,進行對應的實現即可

定義一個類 IRegisterCenterZkImpl 實現 IRegisterCenter 接口

成員變量介紹#

private static final IRegisterCenterZkImpl instance = new IRegisterCenterZkImpl();

    private volatile ZkClient zkClient = null;

    // ZK 相關信息
    private static final String ZK_ADDRESS = RpcPropertiesUtil.getZkServers();
    private static final int ZK_SESSION_TIMEOUT = RpcPropertiesUtil.getSessionTimeout();
    private static final int ZK_CONNECTION_TIMEOUT = RpcPropertiesUtil.getConnectionTimeout();
    // 本地服務地址
    private static final String LOCAL_IP = RpcPropertiesUtil.getServerIp();

    // 服務緩存(服務端緩存)
    private static final Map<String, List<Producer>> PROVIDER_MAP = new ConcurrentHashMap<>();
    // 服務緩存(客戶端緩存)
    private static final Map<String, List<Producer>> SERVICE_METADATA = Maps.newConcurrentMap();

    // ZK 的服務地址
    private static final String ROOT_PATH = "/config_register";
    public static final String PROVIDER_TYPE = "/provider";
    public static final String CONSUMER_TYPE = "/consumer";

    /**
     * service name to bean map
     */
    public static final Map<String, Object> PRODUCER_BEAN_MAP = Maps.newConcurrentMap();

register 方法實現#

register 應該是整個註冊中心最重要的方法了,他主要目的是接收服務提供方的節點信息上報

//IRegisterCenterZkImpl.class
@Override
    public void register(List<Producer> producerList) {
        if (CollectionUtils.isEmpty(producerList)) {
            log.debug("RegisterCenterImpl registerProvider providers is empty, ignore it, providers={}", producerList);
            return;
        }

        synchronized (IRegisterCenterZkImpl.class) {
            // 初始化 zk 客戶端
            1️⃣ this.initZkClient();

						// 本地緩存
            2️⃣ this.setLocalCache(producerList);							
            3️⃣ for (Map.Entry<String, List<Producer>> entry : PROVIDER_MAP.entrySet()) {
                String serviceName = entry.getKey();
                List<Producer> producers = entry.getValue();

                // 創建根節點
                Producer firstProducer = producers.get(0);
                String appKey = firstProducer.getAppKey();
                String rootNode = getRootPath(appKey);
               4️⃣ this.createRootNode(rootNode);

                // 服務組名
                String groupName = firstProducer.getGroupName();
                // 創建服務提供者節點
              5️⃣  String servicePath = getProducerServicePath(appKey, groupName, serviceName);

                for (Producer producer : producers) {
                    this.createServiceNode(servicePath);
                    String producerMathodPath = producerToPath(servicePath, producer);
                6️⃣  this.createCurrentServiceIpNode(producerMathodPath);
                    log.debug("create current service node success, node path = {} ,method path = {}", servicePath, producerMathodPath);
                }

                // 本地監聽
                7️⃣ subscribeChildChanges(serviceName, servicePath, PROVIDER_MAP);
            }
        }
}

1️⃣:本地初始化 zk 的請求客戶端

//IRegisterCenterZkImpl.class
private void initZkClient() {
        if (zkClient == null) {
            zkClient = new ZkClient(ZK_ADDRESS, ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT);
        }
    }

對象定義成員變量 zkClient

2️⃣:設置本地節點緩存

//IRegisterCenterZkImpl.class
private void setLocalCache(List<Producer> producerList) {
        for (Producer producer : producerList) {
            String name = producer.getServiceItf().getName();
            List<Producer> producerListCache = PROVIDER_MAP.get(name);
            if (producerListCache == null) {
                producerListCache = Lists.newArrayList();
            }

            producerListCache.add(producer);
            PROVIDER_MAP.put(name, producerListCache);
        }
    }

存在本地成員變量 PROVIDER_MAP,其中 Map 的 Key 為服務接口的類名

3️⃣:根據類名遍歷節點信息

4️⃣:創建根節點信息,根節點地址為 /config_register/{appKey}

5️⃣:根據服務名稱創建服務節點信息,服務節點地址為: /config_register/{appKey}/{groupName}/{serviceName} 其中 serviceName 就是服務接口的類名

6️⃣:根據接口類名創建具體的方法節點信息,其中 producerToPath 方法,主要目的是將 Producer 對象轉換成 Zk 的路徑信息,其中主要實現如下:

private String producerToPath(String servicePath, Producer producer) {
        return servicePath + "/" + producer.getIp() + "|"
                + producer.getPort() + "|"
                + producer.getWeight() + "|"
                + producer.getWorkerThreads() + "|"
                + producer.getMethod().getName() + "|"
                + producer.getGroupName();
    }

返回的地址信息示例如下: /config_register/{appKey}/{groupName}/{serviceName}/127.0.0.1|9999|1|10|get|defaut

127.0.0.1|9999|1|10|get|defaut 這段參數的含義就是:服務地址 + 服務端口 + 權重 + 工作線程數量 + 方法名稱 + 服務組名稱

createCurrentServiceIpNode 方法實現如下:

private void createCurrentServiceIpNode(String currentServiceIpNode) {
        if (!zkClient.exists(currentServiceIpNode)) {
            // 臨時節點
            zkClient.createEphemeral(currentServiceIpNode);
        }
    }

7️⃣:服務地址監聽

subscribeChildChanges 方法的實現如下:

private void subscribeChildChanges(String serviceName, String servicePath, Map<String, List<Producer>> dataMap) {
        // 1. 從 zk 中讀取路徑
        // 2. 根據路徑值 反序列化成 producer 對象
        // 3. 將 producer 對象放入 dataMap 中
        zkClient.subscribeChildChanges(servicePath, (parentPath, currentChilds) -> {
            if (currentChilds == null) {
                currentChilds = new ArrayList<>();
            }

            List<Producer> producers = currentChilds.stream().map(currentChild -> pathToProducer(serviceName, currentChild)).collect(Collectors.toList());
            dataMap.put(serviceName, producers);
        });
    }

initProviderMap 服務消費方節點獲取#

@Override
    public void initProviderMap(String remoteAppKey, String groupName) {
        if (MapUtils.isEmpty(SERVICE_METADATA)) {
            SERVICE_METADATA.putAll(this.fetchOrUpdateServiceMetaData(remoteAppKey, groupName));
        }
    }

private Map<String, List<Producer>> fetchOrUpdateServiceMetaData(String remoteAppKey, String groupName) {
        final Map<String, List<Producer>> providerServiceMap = Maps.newHashMap();
        this.initZkClient();

        // 根據分組獲取服務提供方的地址
        String providerNode = getRootPath(remoteAppKey) + UrlConstants.SLASH + groupName;
        // 獲取所有 serverName 地址
       1️⃣ List<String> producerServices = zkClient.getChildren(providerNode);

        for (String serviceName : producerServices) {
             String servicePath = getProducerServicePath(remoteAppKey, groupName, serviceName);
            2️⃣ List<String> producerPaths = zkClient.getChildren(servicePath);

            // 獲取所有 producer 地址(method)
            for (String producerPath : producerPaths) {
              3️⃣  Producer pathToProducer = pathToProducer(serviceName, producerPath);
                List<Producer> providerList = providerServiceMap.get(serviceName);
                if (providerList == null) {
                    providerList = new ArrayList<>();
                }

                providerList.add(pathToProducer);
            4️⃣     providerServiceMap.put(serviceName, providerList);
            }

            5️⃣ subscribeChildChanges(serviceName, servicePath, SERVICE_METADATA);
        }

        return providerServiceMap;
    }

1️⃣:根據 appKeygroupName 從 zk 中查詢註冊的所有服務節點

2️⃣:根據 serviceName 獲取該服務下的所有節點信息

3️⃣:使用 pathToProducer 將 zk 的 path 信息,轉換成 Producer 對象

4️⃣:將節點信息緩存到本地

5️⃣:和服務提供方註冊類似,訂閱服務下方節點變化,然後刷新本地 Map

getServiceProducer 服務消費方獲取服務節點#

@Override
    public List<Producer> getServiceProducer(String serviceName, String methodName) {
        List<Producer> producers = SERVICE_METADATA.get(serviceName);
        return producers == null ? null :
                producers.stream().filter(producer ->
                        producer.getMethod() != null &&
                                producer.getMethod().getName().equals(methodName)).collect(Collectors.toList());
    }

這裡的實現代碼就非常簡單了,只需要遍歷本地緩存的 SERVICE_METADATA 對象即可

由于其他方法的操作都是對本地緩存 Map 的操作,所以這裡就不再詳細展開了

到這裡,已經完成了註冊中心最重要方法編寫,處理服務提供方的服務註冊,和服務消費方的服務節點訂閱。

下一章我們將完成服務提供方節點信息的註冊和 Netty 消息的處理。

載入中......
此文章數據所有權由區塊鏈加密技術和智能合約保障僅歸創作者所有。