simon

simon

如何开发一个自己的 RPC 框架 (二)

如何开发一个自己的 RPC 框架 (二)#

上篇文章中,从客户端的调用流程,详细的了解了各个环节的数据是如何串联起来的。那么本章内容,将主要从注册中心的角度出发,看看注册中心是如何维护客户端和服务端各自上报的信息。

能够作为注册中心的中间件非常多,常见的比如 ZooKeeperConsulNacos 等,本项目中采用的是 Zookeeper 作为注册中心中间件

功能梳理#

同样,开始之前我们先来理一下注册中心主要实现那些功能

Mermaid Loading...

接口定义#

定义一个名为 IRegisterCenter 的接口,其中接口主要实现如下功能:

  1. 服务提供方的节点上报
  2. 服务提供方节点信息更新
  3. 服务提供方的本地数据缓存和节点信息订阅
  4. 服务消费方根据服务名获取节点地址
  5. 服务消费方订阅服务节点信息
  6. 服务消费方信息注册

有了上面 6 大需求,就可以知道接口接口中主要包含的方法如下:

// IRegisterCenter.class
	public interface IRegisterCenter {

    /**
     * 服务提供方节点注册
     */
    void register(List<Producer> producerList);

    /**
     * 服务提供方获取节点列表
     */
    Map<String, List<Producer>> getProducersMap();

    /**
     * 服务提供方服务销毁
     */
    void destroy(String name);

    /**
     * 服务消费方初始化服务节点列表
     *
     * @param remoteAppKey 服务提供者唯一标识
     * @param groupName    服务组名
     */
    void initProviderMap(String remoteAppKey, String groupName);

    /**
     * 服务消费方获取所有节点列表
     */
    List<Producer> getServiceNode();

    /**
     * 服务消费方获取节点信息
     *
     * @param serviceName 服务名称
     * @param methodName  服务方法名称
     */
    List<Producer> getServiceProducer(String serviceName, String methodName);

    /**
     * 注册服务消费者信息 用于监控
     */
    void registerConsumer(Consumer consumer);

}

配置文件读取#

注册中心的接口定义完成之后,就需要使用 zookeeper 进行接口实现。但是在接口实现之前,需要实现一个帮助方法读取 RPC 框架的相关配置项。

根据前面的设计,可以大概罗列一下需要用到的配置参数

  1. appkey: 服务的唯一标识
  2. groupName: 服务的分组名称
  3. zookServer: zk 中间件服务地址
  4. nettyIp: 服务提供方调用地址
  5. nettyPort: 服务提供方调用端口

当然还包含一些其他参数,比如服务调用超时时间、服务会话超时时间等

完成上面需求整理之后,就可以开始 RpcPropertiesUtil 方法的编写了

@Slf4j
public class RpcPropertiesUtil {
    private static final Map<String, Object> PRODUCER_MAP;

    private static final String PROPERTIES_FILE_NAME = "rpc.properties";

    static {
        PRODUCER_MAP = Maps.newConcurrentMap();
        try {
            Properties properties = PropertiesLoaderUtils.loadAllProperties(PROPERTIES_FILE_NAME);
            for (Map.Entry<Object, Object> entry : properties.entrySet()) {
                PRODUCER_MAP.put(entry.getKey().toString(), entry.getValue());
            }

        } catch (IOException e) {
            log.error("读取配置文件异常", e);
            throw new RuntimeException("Producer Load Properties Exception");
        }
    }

		// 先从环境变量读取
		private static String getString(Map<String, Object> map, String key, String defaultValue) {
        String envKey = System.getenv().get(key.toUpperCase());
        String result = envKey != null ? envKey : MapUtils.getString(map, key);
        return StringUtils.isEmpty(result) ? defaultValue : result;
    }

    private static Integer getInteger(Map<String, Object> map, String key, Integer defaultValue) {
        String envKey = System.getenv().get(key.toUpperCase());
        Integer result = envKey != null ? Integer.parseInt(envKey) : MapUtils.getInteger(map, key);
        return result == null ? defaultValue : result;
    }
}
  1. 定义一个 PRODUCER_MAP 用来做配置文件缓存
  2. static 静态方法中,读取 [rpc.properties](http://rpc.properties) 配置文件,转换成 Map
  3. 定义了两个 getStringgetInteger 的方法,表示先从环境变量中读取配置,不存在时从配置文件中读取

完成上述基础编写之后,实例一个 rpc_app_key 的读取

public static String getAppKey() {
        return getString(PRODUCER_MAP, "rpc_app_key", "test");
 }

zookeeper 接口实现#

由于接口的方法都已经定义完成,只需要根据接口的方法含义,进行对应的实现即可

定义一个类 IRegisterCenterZkImpl 实现 IRegisterCenter 接口

成员变量介绍#

private static final IRegisterCenterZkImpl instance = new IRegisterCenterZkImpl();

    private volatile ZkClient zkClient = null;

    // ZK 相关信息
    private static final String ZK_ADDRESS = RpcPropertiesUtil.getZkServers();
    private static final int ZK_SESSION_TIMEOUT = RpcPropertiesUtil.getSessionTimeout();
    private static final int ZK_CONNECTION_TIMEOUT = RpcPropertiesUtil.getConnectionTimeout();
    // 本地服务地址
    private static final String LOCAL_IP = RpcPropertiesUtil.getServerIp();

    // 服务缓存(服务端缓存)
    private static final Map<String, List<Producer>> PROVIDER_MAP = new ConcurrentHashMap<>();
    // 服务缓存(客户端缓存)
    private static final Map<String, List<Producer>> SERVICE_METADATA = Maps.newConcurrentMap();

    // ZK 的服务地址
    private static final String ROOT_PATH = "/config_register";
    public static final String PROVIDER_TYPE = "/provider";
    public static final String CONSUMER_TYPE = "/consumer";

    /**
     * service name to bean map
     */
    public static final Map<String, Object> PRODUCER_BEAN_MAP = Maps.newConcurrentMap();

register 方法实现#

register 应该是整个注册中心最重要的方法了,他主要目的是接收服务提供方的节点信息上报

//IRegisterCenterZkImpl.class
@Override
    public void register(List<Producer> producerList) {
        if (CollectionUtils.isEmpty(producerList)) {
            log.debug("RegisterCenterImpl registerProvider providers is empty, ignore it, providers={}", producerList);
            return;
        }

        synchronized (IRegisterCenterZkImpl.class) {
            // 初始化 zk 客户端
            1️⃣ this.initZkClient();

						// 本地缓存
            2️⃣ this.setLocalCache(producerList);							
            3️⃣ for (Map.Entry<String, List<Producer>> entry : PROVIDER_MAP.entrySet()) {
                String serviceName = entry.getKey();
                List<Producer> producers = entry.getValue();

                // 创建根节点
                Producer firstProducer = producers.get(0);
                String appKey = firstProducer.getAppKey();
                String rootNode = getRootPath(appKey);
               4️⃣ this.createRootNode(rootNode);

                // 服务组名
                String groupName = firstProducer.getGroupName();
                // 创建服务提供者节点
              5️⃣  String servicePath = getProducerServicePath(appKey, groupName, serviceName);

                for (Producer producer : producers) {
                    this.createServiceNode(servicePath);
                    String producerMathodPath = producerToPath(servicePath, producer);
                6️⃣  this.createCurrentServiceIpNode(producerMathodPath);
                    log.debug("create current service node success, node path = {} ,method path = {}", servicePath, producerMathodPath);
                }

                // 本地监听
                7️⃣ subscribeChildChanges(serviceName, servicePath, PROVIDER_MAP);
            }
        }
}

1️⃣:本地初始化 zk 的请求客户端

//IRegisterCenterZkImpl.class
private void initZkClient() {
        if (zkClient == null) {
            zkClient = new ZkClient(ZK_ADDRESS, ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT);
        }
    }

对象定义成员变量 zkClient

2️⃣:设置本地节点缓存

//IRegisterCenterZkImpl.class
private void setLocalCache(List<Producer> producerList) {
        for (Producer producer : producerList) {
            String name = producer.getServiceItf().getName();
            List<Producer> producerListCache = PROVIDER_MAP.get(name);
            if (producerListCache == null) {
                producerListCache = Lists.newArrayList();
            }

            producerListCache.add(producer);
            PROVIDER_MAP.put(name, producerListCache);
        }
    }

存在本地成员变量 PROVIDER_MAP,其中 Map 的 Key 为服务接口的类名

3️⃣:根据类名遍历节点信息

4️⃣:创建根节点信息,根节点地址为 /config_register/{appKey}

5️⃣:根据服务名称创建服务节点信息,服务节点地址为: /config_register/{appKey}/{groupName}/{serviceName} 其中 serviceName 就是服务接口的类名

6️⃣:根据接口类名创建具体的方法节点信息,其中 producerToPath 方法,主要目的是将 Producer 对象转换成 Zk 的路径信息,其中主要实现如下:

private String producerToPath(String servicePath, Producer producer) {
        return servicePath + "/" + producer.getIp() + "|"
                + producer.getPort() + "|"
                + producer.getWeight() + "|"
                + producer.getWorkerThreads() + "|"
                + producer.getMethod().getName() + "|"
                + producer.getGroupName();
    }

返回的地址信息示例如下: /config_register/{appKey}/{groupName}/{serviceName}/127.0.0.1|9999|1|10|get|defaut

127.0.0.1|9999|1|10|get|defaut 这段参数的含义就是:服务地址 + 服务端口 + 权重 + 工作线程数量 + 方法名称 + 服务组名称

createCurrentServiceIpNode 方法实现如下:

private void createCurrentServiceIpNode(String currentServiceIpNode) {
        if (!zkClient.exists(currentServiceIpNode)) {
            // 临时节点
            zkClient.createEphemeral(currentServiceIpNode);
        }
    }

7️⃣:服务地址监听

subscribeChildChanges 方法的实现如下:

private void subscribeChildChanges(String serviceName, String servicePath, Map<String, List<Producer>> dataMap) {
        // 1. 从 zk 中读取路径
        // 2. 根据路径值 反序列化成 producer 对象
        // 3. 将 producer 对象放入 dataMap 中
        zkClient.subscribeChildChanges(servicePath, (parentPath, currentChilds) -> {
            if (currentChilds == null) {
                currentChilds = new ArrayList<>();
            }

            List<Producer> producers = currentChilds.stream().map(currentChild -> pathToProducer(serviceName, currentChild)).collect(Collectors.toList());
            dataMap.put(serviceName, producers);
        });
    }

initProviderMap 服务消费方节点获取#

@Override
    public void initProviderMap(String remoteAppKey, String groupName) {
        if (MapUtils.isEmpty(SERVICE_METADATA)) {
            SERVICE_METADATA.putAll(this.fetchOrUpdateServiceMetaData(remoteAppKey, groupName));
        }
    }

private Map<String, List<Producer>> fetchOrUpdateServiceMetaData(String remoteAppKey, String groupName) {
        final Map<String, List<Producer>> providerServiceMap = Maps.newHashMap();
        this.initZkClient();

        // 根据分组获取服务提供方的地址
        String providerNode = getRootPath(remoteAppKey) + UrlConstants.SLASH + groupName;
        // 获取所有 serverName 地址
       1️⃣ List<String> producerServices = zkClient.getChildren(providerNode);

        for (String serviceName : producerServices) {
             String servicePath = getProducerServicePath(remoteAppKey, groupName, serviceName);
            2️⃣ List<String> producerPaths = zkClient.getChildren(servicePath);

            // 获取所有 producer 地址(method)
            for (String producerPath : producerPaths) {
              3️⃣  Producer pathToProducer = pathToProducer(serviceName, producerPath);
                List<Producer> providerList = providerServiceMap.get(serviceName);
                if (providerList == null) {
                    providerList = new ArrayList<>();
                }

                providerList.add(pathToProducer);
            4️⃣     providerServiceMap.put(serviceName, providerList);
            }

            5️⃣ subscribeChildChanges(serviceName, servicePath, SERVICE_METADATA);
        }

        return providerServiceMap;
    }

1️⃣:根据 appKeygroupName 从 zk 中查询注册的所有服务节点

2️⃣:根据 serviceName 获取该服务下的所有节点信息

3️⃣:使用 pathToProducer 将 zk 的 path 信息,转换成 Producer 对象

4️⃣:将节点信息缓存到本地

5️⃣:和服务提供方注册类似,订阅服务下方节点变化,然后刷新本地 Map

getServiceProducer 服务消费方获取服务节点#

@Override
    public List<Producer> getServiceProducer(String serviceName, String methodName) {
        List<Producer> producers = SERVICE_METADATA.get(serviceName);
        return producers == null ? null :
                producers.stream().filter(producer ->
                        producer.getMethod() != null &&
                                producer.getMethod().getName().equals(methodName)).collect(Collectors.toList());
    }

这里的实现代码就非常简单了,只需要遍历本地缓存的 SERVICE_METADATA 对象即可

由于其他方法的操作都是对本地缓存 Map 的操作,所以这里就不再详细展开了

到这里,已经完成了注册中心最重要方法编写,处理服务提供方的服务注册,和服务消费放的服务节点订阅。

下一章我们将完成服务提供方节点信息的注册和 Netty 消息的处理。

加载中...
此文章数据所有权由区块链加密技术和智能合约保障仅归创作者所有。