How to Develop Your Own RPC Framework (Part 2)#
In the previous article, we traced the client call process to show how data flows from one stage to the next. In this chapter, we switch to the registration center's perspective and look at how it maintains the information reported by clients and servers.
Many middleware options can serve as a registration center; common choices include ZooKeeper, Consul, and Nacos. In this project, we use ZooKeeper as the registration center middleware.
Function Overview#
Similarly, before we start, let's clarify the main functions that the registration center implements.
Interface Definition#
Define an interface named IRegisterCenter, which mainly implements the following functions:
- Service provider node reporting
- Service provider node information updating
- Local data caching and node information subscription for service providers
- Service consumers obtaining node addresses based on service names
- Service consumers subscribing to service node information
- Service consumer information registration
With the above six major requirements, we can see that the main methods included in the interface are as follows:
// IRegisterCenter.class
import java.util.List;
import java.util.Map;

public interface IRegisterCenter {

    /**
     * Service provider node registration
     */
    void register(List<Producer> producerList);

    /**
     * Service provider obtains its node list
     */
    Map<String, List<Producer>> getProducersMap();

    /**
     * Service provider service destruction
     */
    void destroy(String name);

    /**
     * Service consumer initializes service node list
     *
     * @param remoteAppKey Unique identifier for service provider
     * @param groupName    Service group name
     */
    void initProviderMap(String remoteAppKey, String groupName);

    /**
     * Service consumer obtains all node lists
     */
    List<Producer> getServiceNode();

    /**
     * Service consumer obtains node information
     *
     * @param serviceName Service name
     * @param methodName  Service method name
     */
    List<Producer> getServiceProducer(String serviceName, String methodName);

    /**
     * Register service consumer information for monitoring
     */
    void registerConsumer(Consumer consumer);
}
Configuration File Reading#
After defining the registration center interface, we need to implement it using ZooKeeper. Before doing so, however, we need a helper class that reads the RPC framework's configuration items.
Based on the previous design, we can roughly list the configuration parameters needed:
- appkey: Unique identifier for the service
- groupName: Group name of the service
- zookServer: zk middleware service address
- nettyIp: Service provider call address
- nettyPort: Service provider call port
Of course, there are also some other parameters, such as service call timeout, service session timeout, etc.
With the requirements organized, we can start writing the RpcPropertiesUtil class.
@Slf4j
public class RpcPropertiesUtil {

    private static final Map<String, Object> PRODUCER_MAP;
    private static final String PROPERTIES_FILE_NAME = "rpc.properties";

    static {
        PRODUCER_MAP = Maps.newConcurrentMap();
        try {
            Properties properties = PropertiesLoaderUtils.loadAllProperties(PROPERTIES_FILE_NAME);
            for (Map.Entry<Object, Object> entry : properties.entrySet()) {
                PRODUCER_MAP.put(entry.getKey().toString(), entry.getValue());
            }
        } catch (IOException e) {
            log.error("Error reading configuration file", e);
            // Keep the original exception as the cause so the stack trace is not lost
            throw new RuntimeException("Producer Load Properties Exception", e);
        }
    }

    // Environment variables (looked up by the upper-cased key) take precedence over the file
    private static String getString(Map<String, Object> map, String key, String defaultValue) {
        String envValue = System.getenv(key.toUpperCase());
        String result = envValue != null ? envValue : MapUtils.getString(map, key);
        return StringUtils.isEmpty(result) ? defaultValue : result;
    }

    private static Integer getInteger(Map<String, Object> map, String key, Integer defaultValue) {
        String envValue = System.getenv(key.toUpperCase());
        Integer result = envValue != null ? Integer.parseInt(envValue) : MapUtils.getInteger(map, key);
        return result == null ? defaultValue : result;
    }
}
- Define a PRODUCER_MAP to cache the configuration file.
- In the static initializer block, read the rpc.properties configuration file and convert it into a Map.
- Define two methods, getString and getInteger, which read a value from the environment variables first (using the upper-cased key) and fall back to the configuration file if it is not present there.
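The lookup order described above can be exercised in isolation. The following is a minimal sketch, not the project's actual class: EnvFirstConfigSketch, its method signatures, and the rpc_port key are hypothetical, and the environment is passed in as a Map so the behavior can be checked without touching real environment variables. Unlike the original getInteger, this version falls back to the default when a number cannot be parsed instead of throwing.

```java
import java.util.Map;
import java.util.Properties;

/** Hypothetical stand-in for the lookup order used by RpcPropertiesUtil. */
final class EnvFirstConfigSketch {

    private EnvFirstConfigSketch() {
    }

    /** Returns the env value for the upper-cased key, else the file value, else the default. */
    static String getString(Map<String, String> env, Properties file, String key, String defaultValue) {
        String fromEnv = env.get(key.toUpperCase());
        String result = fromEnv != null ? fromEnv : file.getProperty(key);
        return (result == null || result.isEmpty()) ? defaultValue : result;
    }

    /** Same lookup order for integers; an unparsable value falls back to the default. */
    static int getInteger(Map<String, String> env, Properties file, String key, int defaultValue) {
        String raw = getString(env, file, key, null);
        if (raw == null) {
            return defaultValue;
        }
        try {
            return Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            return defaultValue;
        }
    }

    public static void main(String[] args) {
        Properties file = new Properties();
        file.setProperty("rpc_app_key", "order-service");
        // With the real environment, RPC_APP_KEY (if set) overrides the file value.
        System.out.println(getString(System.getenv(), file, "rpc_app_key", "test"));
    }
}
```

Passing the environment as a parameter is a design choice made here for testability; the original reads System.getenv() directly.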
With these basics in place, add an accessor that reads rpc_app_key:
public static String getAppKey() {
    return getString(PRODUCER_MAP, "rpc_app_key", "test");
}
Zookeeper Interface Implementation#
Since the interface methods are already defined, we only need to implement each one according to its purpose.
Define a class IRegisterCenterZkImpl that implements the IRegisterCenter interface.
Member Variable Introduction#
private static final IRegisterCenterZkImpl instance = new IRegisterCenterZkImpl();
private volatile ZkClient zkClient = null;
// ZK related information
private static final String ZK_ADDRESS = RpcPropertiesUtil.getZkServers();
private static final int ZK_SESSION_TIMEOUT = RpcPropertiesUtil.getSessionTimeout();
private static final int ZK_CONNECTION_TIMEOUT = RpcPropertiesUtil.getConnectionTimeout();
// Local service address
private static final String LOCAL_IP = RpcPropertiesUtil.getServerIp();
// Service cache (server cache)
private static final Map<String, List<Producer>> PROVIDER_MAP = new ConcurrentHashMap<>();
// Service cache (client cache)
private static final Map<String, List<Producer>> SERVICE_METADATA = Maps.newConcurrentMap();
// ZK node paths
private static final String ROOT_PATH = "/config_register";
public static final String PROVIDER_TYPE = "/provider";
public static final String CONSUMER_TYPE = "/consumer";
/**
 * service name to bean map
 */
public static final Map<String, Object> PRODUCER_BEAN_MAP = Maps.newConcurrentMap();
Register Method Implementation#
The register method is arguably the most important method in the registration center: it receives the node information reported by service providers.
// IRegisterCenterZkImpl.class
@Override
public void register(List<Producer> producerList) {
    if (CollectionUtils.isEmpty(producerList)) {
        log.debug("RegisterCenterImpl registerProvider providers is empty, ignore it, providers={}", producerList);
        return;
    }
    synchronized (IRegisterCenterZkImpl.class) {
        // 1️⃣ Initialize zk client
        this.initZkClient();
        // 2️⃣ Local cache
        this.setLocalCache(producerList);
        // 3️⃣ Iterate over the cached services
        for (Map.Entry<String, List<Producer>> entry : PROVIDER_MAP.entrySet()) {
            String serviceName = entry.getKey();
            List<Producer> producers = entry.getValue();
            Producer firstProducer = producers.get(0);
            String appKey = firstProducer.getAppKey();
            // 4️⃣ Create root node
            String rootNode = getRootPath(appKey);
            this.createRootNode(rootNode);
            // Service group name
            String groupName = firstProducer.getGroupName();
            // 5️⃣ Create service provider node
            String servicePath = getProducerServicePath(appKey, groupName, serviceName);
            for (Producer producer : producers) {
                this.createServiceNode(servicePath);
                // 6️⃣ Create a node for each method
                String producerMethodPath = producerToPath(servicePath, producer);
                this.createCurrentServiceIpNode(producerMethodPath);
                log.debug("create current service node success, node path = {} ,method path = {}", servicePath, producerMethodPath);
            }
            // 7️⃣ Local listening
            subscribeChildChanges(serviceName, servicePath, PROVIDER_MAP);
        }
    }
}
1️⃣: Local initialization of the zk request client.
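// IRegisterCenterZkImpl.class
private void initZkClient() {
    if (zkClient == null) {
        // Double-checked locking: consumer-side callers are not inside register()'s lock
        synchronized (IRegisterCenterZkImpl.class) {
            if (zkClient == null) {
                zkClient = new ZkClient(ZK_ADDRESS, ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT);
            }
        }
    }
}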
The client is stored in the zkClient member variable, which is declared volatile and initialized lazily on first use.
2️⃣: Set local node cache.
// IRegisterCenterZkImpl.class
private void setLocalCache(List<Producer> producerList) {
    for (Producer producer : producerList) {
        String name = producer.getServiceItf().getName();
        // Create the list for this interface on first use, then append the producer
        PROVIDER_MAP.computeIfAbsent(name, key -> Lists.newArrayList()).add(producer);
    }
}
PROVIDER_MAP is a member variable whose keys are the fully qualified class names of the service interfaces.
3️⃣: Iterate over the cached nodes, grouped by class name.
4️⃣: Create the root node, whose path is /config_register/{appKey}.
5️⃣: Create the service node for each service name. Its path is /config_register/{appKey}/{groupName}/{serviceName}, where serviceName is the class name of the service interface.
6️⃣: Create a node for each method of the interface. The producerToPath method converts a Producer object into a ZK path and is implemented as follows:
private String producerToPath(String servicePath, Producer producer) {
    return servicePath + "/" + producer.getIp() + "|"
            + producer.getPort() + "|"
            + producer.getWeight() + "|"
            + producer.getWorkerThreads() + "|"
            + producer.getMethod().getName() + "|"
            + producer.getGroupName();
}
An example of the returned path: /config_register/{appKey}/{groupName}/{serviceName}/127.0.0.1|9999|1|10|get|default.
The final segment, 127.0.0.1|9999|1|10|get|default, encodes, in order: service address, service port, weight, number of worker threads, method name, and service group name.
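The article does not show getRootPath, getProducerServicePath, or pathToProducer. The sketch below is one way the path layout described above could be built and parsed back. ZkPathSketch, NodeInfo, and every method name in it are illustrative assumptions rather than the project's code, and the appKey, group, and service names in main are made up.

```java
/** Illustrative helpers for the node layout; all names are assumptions, not the project's API. */
final class ZkPathSketch {

    static final String ROOT_PATH = "/config_register";

    private ZkPathSketch() {
    }

    /** Parsed form of the last path segment (ip|port|weight|workerThreads|method|group). */
    static final class NodeInfo {
        final String ip;
        final int port;
        final int weight;
        final int workerThreads;
        final String methodName;
        final String groupName;

        NodeInfo(String ip, int port, int weight, int workerThreads, String methodName, String groupName) {
            this.ip = ip;
            this.port = port;
            this.weight = weight;
            this.workerThreads = workerThreads;
            this.methodName = methodName;
            this.groupName = groupName;
        }
    }

    static String rootPath(String appKey) {
        return ROOT_PATH + "/" + appKey;
    }

    static String servicePath(String appKey, String groupName, String serviceName) {
        return rootPath(appKey) + "/" + groupName + "/" + serviceName;
    }

    /** Builds the last path segment in the same field order as producerToPath. */
    static String encode(NodeInfo node) {
        return node.ip + "|" + node.port + "|" + node.weight + "|"
                + node.workerThreads + "|" + node.methodName + "|" + node.groupName;
    }

    /** Splits a segment back into its six fields; rejects anything with a different shape. */
    static NodeInfo decode(String segment) {
        String[] parts = segment.split("\\|", -1);
        if (parts.length != 6) {
            throw new IllegalArgumentException("Unexpected node name: " + segment);
        }
        return new NodeInfo(parts[0], Integer.parseInt(parts[1]), Integer.parseInt(parts[2]),
                Integer.parseInt(parts[3]), parts[4], parts[5]);
    }

    public static void main(String[] args) {
        NodeInfo node = new NodeInfo("127.0.0.1", 9999, 1, 10, "get", "default");
        // prints /config_register/demo/default/com.demo.HelloService/127.0.0.1|9999|1|10|get|default
        System.out.println(servicePath("demo", "default", "com.demo.HelloService") + "/" + encode(node));
    }
}
```

This encoding only round-trips if none of the fields contains the | separator, which holds for IPv4 addresses and Java method names.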
The createCurrentServiceIpNode method is implemented as follows:
private void createCurrentServiceIpNode(String currentServiceIpNode) {
    if (!zkClient.exists(currentServiceIpNode)) {
        // Ephemeral node: ZooKeeper removes it automatically when this client's session ends
        zkClient.createEphemeral(currentServiceIpNode);
    }
}
7️⃣: Subscribe to changes in the service's child nodes.
The subscribeChildChanges method is implemented as follows:
private void subscribeChildChanges(String serviceName, String servicePath, Map<String, List<Producer>> dataMap) {
    // On every change to the children of servicePath:
    // 1. Read the current child paths from zk
    // 2. Deserialize each path into a Producer object
    // 3. Put the Producer objects into dataMap
    zkClient.subscribeChildChanges(servicePath, (parentPath, currentChilds) -> {
        if (currentChilds == null) {
            currentChilds = new ArrayList<>();
        }
        List<Producer> producers = currentChilds.stream()
                .map(currentChild -> pathToProducer(serviceName, currentChild))
                .collect(Collectors.toList());
        dataMap.put(serviceName, producers);
    });
}
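The refresh behavior of this callback can be tried without a ZooKeeper server. The following is a toy sketch under stated assumptions: ChildWatchSketch, subscribe, fireChildChange, and watchInto are hypothetical names, child nodes are kept as plain strings instead of Producer objects, and change notifications are triggered by hand rather than by ZooKeeper.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiConsumer;

/** Toy replacement for child watches; not a ZooKeeper client. */
final class ChildWatchSketch {

    // path -> listeners registered for that path
    private final Map<String, List<BiConsumer<String, List<String>>>> listeners = new ConcurrentHashMap<>();

    void subscribe(String path, BiConsumer<String, List<String>> listener) {
        listeners.computeIfAbsent(path, key -> new CopyOnWriteArrayList<>()).add(listener);
    }

    /** Simulates a child-change notification; a null list stands for a deleted parent node. */
    void fireChildChange(String path, List<String> children) {
        for (BiConsumer<String, List<String>> listener : listeners.getOrDefault(path, Collections.emptyList())) {
            listener.accept(path, children);
        }
    }

    /** Mirrors the article's callback: replace the cached list wholesale on every change. */
    static void watchInto(ChildWatchSketch zk, String serviceName, String servicePath,
                          Map<String, List<String>> cache) {
        zk.subscribe(servicePath, (parentPath, children) -> {
            List<String> current = children == null ? new ArrayList<String>() : new ArrayList<String>(children);
            cache.put(serviceName, current);
        });
    }

    public static void main(String[] args) {
        ChildWatchSketch zk = new ChildWatchSketch();
        Map<String, List<String>> cache = new ConcurrentHashMap<>();
        watchInto(zk, "com.demo.HelloService", "/config_register/demo/default/com.demo.HelloService", cache);
        zk.fireChildChange("/config_register/demo/default/com.demo.HelloService",
                Arrays.asList("127.0.0.1|9999|1|10|get|default"));
        System.out.println(cache);
    }
}
```

Replacing the whole list, rather than patching it, keeps the callback simple: whatever ZooKeeper reports as the current children becomes the cache entry.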
InitProviderMap Service Consumer Node Retrieval#
@Override
public void initProviderMap(String remoteAppKey, String groupName) {
    if (MapUtils.isEmpty(SERVICE_METADATA)) {
        SERVICE_METADATA.putAll(this.fetchOrUpdateServiceMetaData(remoteAppKey, groupName));
    }
}

private Map<String, List<Producer>> fetchOrUpdateServiceMetaData(String remoteAppKey, String groupName) {
    final Map<String, List<Producer>> providerServiceMap = Maps.newHashMap();
    this.initZkClient();
    // Path of the group node that holds this provider's services
    String providerNode = getRootPath(remoteAppKey) + UrlConstants.SLASH + groupName;
    // 1️⃣ Get all service names under the group
    List<String> producerServices = zkClient.getChildren(providerNode);
    for (String serviceName : producerServices) {
        String servicePath = getProducerServicePath(remoteAppKey, groupName, serviceName);
        // 2️⃣ Get all producer nodes (one per method) under the service
        List<String> producerPaths = zkClient.getChildren(servicePath);
        for (String producerPath : producerPaths) {
            // 3️⃣ Convert the zk path into a Producer object
            Producer producer = pathToProducer(serviceName, producerPath);
            // 4️⃣ Cache the node locally
            providerServiceMap.computeIfAbsent(serviceName, key -> new ArrayList<>()).add(producer);
        }
        // 5️⃣ Subscribe to child node changes
        subscribeChildChanges(serviceName, servicePath, SERVICE_METADATA);
    }
    return providerServiceMap;
}
1️⃣: Query all registered service nodes from zk based on appKey and groupName.
2️⃣: Get all node information under the service based on serviceName.
3️⃣: Use pathToProducer to convert zk path information into Producer objects.
4️⃣: Cache node information locally.
5️⃣: As with service provider registration, subscribe to changes in the service's child nodes and refresh the local Map.
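The two-level traversal in the steps above (group node, then service nodes, then their children) can be sketched against an in-memory tree. This is an illustration only: MetadataFetchSketch and its methods are hypothetical, a Map from parent path to child names stands in for zkClient.getChildren, and child nodes are kept as raw strings rather than Producer objects.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** In-memory illustration of the consumer-side metadata fetch; not a ZooKeeper client. */
final class MetadataFetchSketch {

    private MetadataFetchSketch() {
    }

    /** Returns serviceName -> child node names for every service under groupPath. */
    static Map<String, List<String>> fetch(Map<String, List<String>> tree, String groupPath) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (String serviceName : children(tree, groupPath)) {
            String servicePath = groupPath + "/" + serviceName;
            for (String node : children(tree, servicePath)) {
                // As in the article's loop, a service with no children gets no entry
                result.computeIfAbsent(serviceName, key -> new ArrayList<>()).add(node);
            }
        }
        return result;
    }

    /** Stand-in for getChildren: an unknown path simply has no children. */
    private static List<String> children(Map<String, List<String>> tree, String path) {
        return tree.getOrDefault(path, Collections.emptyList());
    }

    public static void main(String[] args) {
        String group = "/config_register/demo/default";
        Map<String, List<String>> tree = new HashMap<>();
        tree.put(group, Arrays.asList("com.demo.HelloService"));
        tree.put(group + "/com.demo.HelloService",
                Arrays.asList("127.0.0.1|9999|1|10|get|default", "127.0.0.1|9999|1|10|save|default"));
        System.out.println(fetch(tree, group));
    }
}
```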
GetServiceProducer Service Consumer Obtaining Service Nodes#
@Override
public List<Producer> getServiceProducer(String serviceName, String methodName) {
    List<Producer> producers = SERVICE_METADATA.get(serviceName);
    return producers == null ? null :
            producers.stream()
                    .filter(producer -> producer.getMethod() != null
                            && producer.getMethod().getName().equals(methodName))
                    .collect(Collectors.toList());
}
The implementation here is very simple: it looks up the service in the locally cached SERVICE_METADATA object and filters the nodes by method name.
Since the other methods all operate on the locally cached Map, we will not elaborate on them here.
At this point, we have completed the writing of the most important methods of the registration center, handling service registration for service providers and service node subscriptions for service consumers.
In the next chapter, we will complete the registration of service provider node information and the handling of Netty messages.