如何开发一个自己的 RPC 框架 (三)#
December 13, 2023 9:37 AM
之前的两篇文章中,已经介绍了实现一个 RPC 框架所需要的客户端和注册中心逻辑,那么这一章,将主要介绍如何实现 RPC 框架中的服务端 (服务提供方) 和完善框架的其他补充逻辑。
RPC 服务端实现#
服务注解 (RpcProducer)#
和客户端调用标识类似,服务提供方也需要设置服务标识,用来服务的注册和相关配置信息拓展
@RpcProducer
服务提供方注解
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
public @interface RpcProducer {
/**
* 服务接口:用于注册在服务注册中心,服务调用端获取后缓存再本地用于发起服务调用
*/
Class<?> serviceItf() default Object.class;
/**
* 服务分组组名:可用于分组灰度发布,配置不同分组,可以让调用都路由到配置了相同分组的路由上
*/
String groupName() default "default";
/**
* 超时时间:控制服务端超时时间 ms
*/
long timeout() default 3000L;
/**
* 服务提供者权重:配置该机器在集群中的权重,用于某些负载均衡算法
*/
int weight() default 1;
/**
* 服务端线程数:限制服务端改服务线程数,服务端限流
*/
int workThreads() default 10;
}
服务提供方节点注册#
为了实现所有被 @RpcProducer
注解的实现类都可以被注册到注册中心,在 Spring 中,需要实现
- ApplicationListener:
onApplicationEvent
监听应用程序事件 - ApplicationContextAware:
setApplicationContext
设置当前上下文 - DisposableBean:
destroy
方法,bean 被销毁时执行
这三个接口,通过 setApplicationContext
设置当前上下文,通过 onApplicationEvent
监听服务实现类然后提交到注册中心
理清楚上面流程后,便可以创建实现类 ProducerAnnotationBean
//ProducerAnnotationBean.class
@Slf4j
public class ProducerAnnotationBean implements ApplicationListener<ApplicationContextEvent>, ApplicationContextAware, DisposableBean {
private ApplicationContext applicationContext;
@Override
public void destroy() throws Exception {
/* 销毁 */
log.debug("AnnotationServicesPublisher bean destroy");
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
1️⃣ this.applicationContext = applicationContext;
}
@Override
public void onApplicationEvent(ApplicationContextEvent event) {2️⃣}
}
1️⃣:设置当前上下文
2️⃣:监听 Spring Bean 的动作
onApplicationEvent
的实现如下:
//ProducerAnnotationBean.class
@Override
public void onApplicationEvent(ApplicationContextEvent event) {
1️⃣ if (event.getApplicationContext() != applicationContext) {
log.debug("Received a event from another application context {}, ignoring it", event.getApplicationContext());
}
// 刷新事件
if (event instanceof ContextRefreshedEvent) {
Map<String, Object> annotation = applicationContext.getBeansWithAnnotation(RpcProducer.class);
if (MapUtils.isEmpty(annotation)) {
log.info("no simple rpc exist");
} else {
annotation.forEach((beanName, bean) -> {
log.info("simple rpc beanName: {}, bean: {}", beanName, bean);
2️⃣ if (bean.getClass().isAnnotationPresent(RpcProducer.class)) {
// 获取对象
3️⃣ List<Producer> producerList = this.buildProviderService(bean);
// 1. 启动服务
4️⃣ NettyService.getInstance().startService(RpcPropertiesUtil.getServerPort());
// 2. 服务注册
5️⃣ IRegisterCenterZkImpl.getInstance().register(producerList);
}
});
}
} else if (event instanceof ContextClosedEvent) {
// 销毁所有的服务
6️⃣ IRegisterCenterZkImpl.getInstance().destroy(null);
log.info("simple rpc closed");
}
}
1️⃣:判断上下文和当前 Spring 上下文是否一致
2️⃣:判断当前实例对象是否被 RpcProducer
注解
3️⃣:根据当前 bean
构造 Producer
对象
实现代码如下:
//ProducerAnnotationBean.class
public List<Producer> buildProviderService(Object bean) {
RpcProducer annotation = bean.getClass().getAnnotation(RpcProducer.class);
Class<?> serviceItf = annotation.serviceItf();
if (serviceItf == Object.class) {
serviceItf = bean.getClass().getInterfaces()[0];
}
NettyHandlerServer.PRODUCER_BEAN_MAP.put(serviceItf.getName(), bean);
// 获取注解信息
String groupName = annotation.groupName();
long timeout = annotation.timeout();
int weight = annotation.weight();
int workThreads = annotation.workThreads();
Class<?> finalServiceItf = serviceItf;
return Arrays.stream(bean.getClass().getDeclaredMethods())
// 过滤掉 Object.class 的 equals、notify 等方法
.filter(method -> !method.getDeclaringClass().equals(Object.class))
.map(method -> {
Producer producer = Producer.builder()
.serviceItf(finalServiceItf)
.serviceObject(bean)
.ip(IpUtil.getLocalIP())
.port(RpcPropertiesUtil.getServerPort())
.timeout(timeout)
.appKey(RpcPropertiesUtil.getAppKey())
.groupName(groupName)
.weight(weight)
.workerThreads(workThreads)
.method(method)
.build();
return producer;
}).collect(Collectors.toList());
}
4️⃣:启动 Netty Server 服务,这一段比较重要,稍后讲解
5️⃣:向注册中心注册服务提供方的节点信息
6️⃣:服务关闭时,清除所有的节点注册信息
完成上述步骤后,所有被 RpcProducer
注解的 class
就都将被注册到注册中心中。
服务提供方启动 Netty 服务#
上一步的步骤 4️⃣中,在向服务中心注册节点时,同时根据注解的端口启动了 Netty
服务,那么这个 Netty
服务是如何启动的,然后启动的目的是什么呢?
在第一章中我们有讲到,客户端的接口在执行反射操作时,会连接服务端的 Netty
服务,然后发送 NettyRequest
请求。那么服务端接收到 NettyRequest
请求后会做那么事情呢?服务端又是如何根据 NettyRequest
映射到具体的方法上,同时将方法执行的结果进行返回的呢?这一节我们一起弄清楚这一点。
新建一个 NettyService.class
//NettyService.class
@Slf4j
public class NettyService {
private static final NettyService instance = new NettyService();
private EventLoopGroup bossGroup;
private EventLoopGroup workGroup;
public static NettyService getInstance() {
return instance;
}
public void startService(int port) {
synchronized (NettyService.class) {
if (bossGroup != null || workGroup != null) {
log.debug("netty server is already start");
return;
}
bossGroup = new NioEventLoopGroup(1);
workGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 解码器
socketChannel.pipeline().addLast(new NettyDecoderHandler(NettyRequest.class));
// 编码器
socketChannel.pipeline().addLast(new NettyEncoderHandler());
// 服务处理
1️⃣ socketChannel.pipeline().addLast(new NettyHandlerServer());
}
});
try {
serverBootstrap.bind(port).sync().channel();
log.info("NettyServer start {} start now!!!", IpUtil.getLocalIP() + UrlConstants.COLON + port);
} catch (Exception e) {
log.error("NettyServer startServer error", e);
}
}
}
}
1️⃣:设置消息处理器
上述代码都比较常规,主要是使用 Netty
根据传入的端口进行了服务的启动,其中最主要的代码是
socketChannel.pipeline().addLast(new NettyHandlerServer());
设置了消息的处理器。
//NettyHandlerServer.class
public class NettyHandlerServer extends SimpleChannelInboundHandler<NettyRequest> {
/**
* service name to bean map
*/
public static final Map<String, Object> PRODUCER_BEAN_MAP = Maps.newConcurrentMap();
@Override
protected void channelRead0(ChannelHandlerContext ctx, NettyRequest nettyRequest) throws Exception {
if (!ctx.channel().isWritable()) {
log.error("channel closed!");
return;
}
Producer localProducer = this.getLocalProducer(nettyRequest);
if (localProducer == null) {
log.error("service not found, request={}", nettyRequest);
return;
}
Object result = this.invockMethod(localProducer, nettyRequest);
NettyResponse response = NettyResponse.builder()
.uniqueKey(nettyRequest.getUniqueKey())
.result(result)
.build();
ctx.writeAndFlush(response);
}
/**
* 根据 methodname 获取 Producer
*/
private Producer getLocalProducer(NettyRequest request) {
String methodName = request.getInvokeMethodName();
String name = request.getProducer().getServiceItf().getName();
List<Producer> producerList = IRegisterCenterZkImpl.getInstance().getProducersMap().get(name);
return Collections2.filter(producerList, producer -> {
assert producer != null;
Method method = producer.getMethod();
return method != null && method.getName().equals(methodName);
}).iterator().next();
}
/**
* 方法反射调用
*/
private Object invockMethod(Producer producer, NettyRequest request) {
// TODO: 2023/12/8 增加超时检测
Object serviceObject = producer.getServiceObject();
Method method = producer.getMethod();
Object result = null;
try {
// 这里是最重要的,服务端反射调用方法
result = method.invoke(serviceObject, request.getArgs());
return result;
} catch (Exception e) {
result = e;
log.error("NettyServerBizHandler invokeMethod error, provider={}, request={}", producer, request, e);
}
return result;
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("NettyServerBizHandler error, now ctx closed", cause);
// 发生异常,关闭链路
ctx.close();
}
}
消息处理器中,在 invockMethod
方法中,根据传入的 request
的 method
参数进行了方法的反射获取执行结果,然后进行返回。反射执行完成后,将结果组装成 NettyResponse
结果进行返回。
那么到这里就完成了整个远程服务的执行。
Spring 注解服务扫描#
定义 @EnableRpc
注解
@Documented
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Import({RpcImportSelector.class})
public @interface EnableRpc {
}
注解中 @Import
了 RpcImportSelector.class
对象
public class RpcImportSelector implements ImportSelector {
@Override
public String[] selectImports(AnnotationMetadata annotationMetadata) {
return new String[]{"com.simon.spring.ProducerAnnotationBean",
"com.simon.spring.ConsumerAnnotaionBean"};
}
}
RpcImportSelector
对象实现了 ImportSelector
接口, selectImports
方法中返回了处理客户端和服务端注解的类。
客户端启动类 ConsumerApplication.class
@EnableRpc
@SpringBootApplication
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
}
服务启动时,只需要加上 @EnableRpc
注解即可启动 RPC 服务
实例#
客户端#
ConsumerApplication.class
启动类
@EnableRpc
@SpringBootApplication
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
}
UserController.class
请求接口
@RestController
@RequestMapping("/user")
public class UserController {
// 需要远程调用的接口
@RpcClient(remoteAppKey = "test")
private UserService userService;
@GetMapping("/getUser/{username}")
public String getUser(@PathVariable String username) {
return userService.get(username);
}
}
配置文件
application.properties
server.port=8082
rpc.properties
# netty 配置
rpc_app_key=test
# ZK 配置
zk_server=127.0.0.1:2181
rpc_session_timeout=3000
rpc_connection_timeout=3000
rpc_channel_connect_size=3
服务端#
ProducerApplication.class
启动类
@EnableRpc
@SpringBootApplication
public class ProducerApplication {
public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class, args);
}
}
UserServiceImpl.class
接口实现类
@Slf4j
@Service
@RpcProducer
public class UserServiceImpl implements UserService {
@Override
public String get(String username) {
return "username " + username + "服务信息: " + RpcPropertiesUtil.getServerPort();
}
}
配置信息
application.properties
server.port=8081
rpc.properties
# netty 配置
rpc_app_key=test
rpc_server_port=9999
# ZK 配置
zk_server=127.0.0.1:2181
rpc_session_timeout=3000
rpc_connection_timeout=3000
rpc_channel_connect_size=3
服务启动#
分别启动服务端和客户端,然后请求 http://127.0.0.1:8082/user/getUser/simon, 可以查看结果
总结#
至此,如何开发一个自己的 RPC 框架就全部完成了,框架的内容非常粗糙,距离真正商用版本还差很多距离。但是通过参考别人的源码,可以理解一个基础的 RPC 框架的大体结构和所需要用到的知识,包括 Netty、Spring 、序列化、负载均衡策略、 zookeeper、 Java 的反射等等。但不管怎么说,有进步就是一件很快乐的事情!