simon

simon

如何开发一个自己的 RPC 框架(三)

如何开发一个自己的 RPC 框架 (三)#

December 13, 2023 9:37 AM

之前的两篇文章中,已经介绍了实现一个 RPC 框架所需要的客户端和注册中心逻辑,那么这一章,将主要介绍如何实现 RPC 框架中的服务端 (服务提供方) 和完善框架的其他补充逻辑。

RPC 服务端实现#

服务注解 (RpcProducer)#

和客户端调用标识类似,服务提供方也需要设置服务标识,用来服务的注册和相关配置信息拓展

@RpcProducer 服务提供方注解

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
public @interface RpcProducer {

    /**
     * 服务接口:用于注册在服务注册中心,服务调用端获取后缓存再本地用于发起服务调用
     */
    Class<?> serviceItf() default Object.class;

		/**
     * 服务分组组名:可用于分组灰度发布,配置不同分组,可以让调用都路由到配置了相同分组的路由上
     */
    String groupName() default "default";

    /**
     * 超时时间:控制服务端超时时间 ms
     */
    long timeout() default 3000L;

    /**
     * 服务提供者权重:配置该机器在集群中的权重,用于某些负载均衡算法
     */
    int weight() default 1;

    /**
     * 服务端线程数:限制服务端改服务线程数,服务端限流
     */
    int workThreads() default 10;
}

服务提供方节点注册#

为了实现所有被 @RpcProducer 注解的实现类都可以被注册到注册中心,在 Spring 中,需要实现

  • ApplicationListener: onApplicationEvent 监听应用程序事件
  • ApplicationContextAware: setApplicationContext 设置当前上下文
  • DisposableBean:destroy 方法,bean 被销毁时执行

这三个接口,通过 setApplicationContext 设置当前上下文,通过 onApplicationEvent 监听服务实现类然后提交到注册中心

理清楚上面流程后,便可以创建实现类 ProducerAnnotationBean

//ProducerAnnotationBean.class
@Slf4j
public class ProducerAnnotationBean implements ApplicationListener<ApplicationContextEvent>, ApplicationContextAware, DisposableBean {

    private ApplicationContext applicationContext;

    @Override
    public void destroy() throws Exception {
        /* 销毁 */
        log.debug("AnnotationServicesPublisher bean destroy");
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
       1️⃣ this.applicationContext = applicationContext;
    }

    @Override
    public void onApplicationEvent(ApplicationContextEvent event) {2️⃣}
}

1️⃣:设置当前上下文

2️⃣:监听 Spring Bean 的动作

onApplicationEvent 的实现如下:

//ProducerAnnotationBean.class
@Override
    public void onApplicationEvent(ApplicationContextEvent event) {
        1️⃣ if (event.getApplicationContext() != applicationContext) {
            log.debug("Received a event from another application context {}, ignoring it", event.getApplicationContext());
        }

        // 刷新事件
        if (event instanceof ContextRefreshedEvent) {
            Map<String, Object> annotation = applicationContext.getBeansWithAnnotation(RpcProducer.class);
            if (MapUtils.isEmpty(annotation)) {
                log.info("no simple rpc exist");

            } else {
                annotation.forEach((beanName, bean) -> {
                    log.info("simple rpc beanName: {}, bean: {}", beanName, bean);
                   2️⃣ if (bean.getClass().isAnnotationPresent(RpcProducer.class)) {
                        // 获取对象
                    3️⃣    List<Producer> producerList = this.buildProviderService(bean);
                        // 1. 启动服务
                    4️⃣    NettyService.getInstance().startService(RpcPropertiesUtil.getServerPort());
                        // 2. 服务注册
                     5️⃣   IRegisterCenterZkImpl.getInstance().register(producerList);
                    }
                });
            }
        } else if (event instanceof ContextClosedEvent) {
            // 销毁所有的服务
           6️⃣ IRegisterCenterZkImpl.getInstance().destroy(null);
            log.info("simple rpc closed");
        }
    }

1️⃣:判断上下文和当前 Spring 上下文是否一致

2️⃣:判断当前实例对象是否被 RpcProducer 注解

3️⃣:根据当前 bean 构造 Producer 对象

实现代码如下:

//ProducerAnnotationBean.class
public List<Producer> buildProviderService(Object bean) {
        RpcProducer annotation = bean.getClass().getAnnotation(RpcProducer.class);
        Class<?> serviceItf = annotation.serviceItf();
        if (serviceItf == Object.class) {
            serviceItf = bean.getClass().getInterfaces()[0];
        }
			
        NettyHandlerServer.PRODUCER_BEAN_MAP.put(serviceItf.getName(), bean);
			
				// 获取注解信息
        String groupName = annotation.groupName();
        long timeout = annotation.timeout();
        int weight = annotation.weight();
        int workThreads = annotation.workThreads();

        Class<?> finalServiceItf = serviceItf;
        return Arrays.stream(bean.getClass().getDeclaredMethods())
								// 过滤掉 Object.class 的 equals、notify 等方法
                .filter(method -> !method.getDeclaringClass().equals(Object.class))
                .map(method -> {
                    Producer producer = Producer.builder()
                            .serviceItf(finalServiceItf)
                            .serviceObject(bean)
                            .ip(IpUtil.getLocalIP())
                            .port(RpcPropertiesUtil.getServerPort())
                            .timeout(timeout)
                            .appKey(RpcPropertiesUtil.getAppKey())
                            .groupName(groupName)
                            .weight(weight)
                            .workerThreads(workThreads)
                            .method(method)
                            .build();
                    return producer;
                }).collect(Collectors.toList());
    }

4️⃣:启动 Netty Server 服务,这一段比较重要,稍后讲解

5️⃣:向注册中心注册服务提供方的节点信息

6️⃣:服务关闭时,清除所有的节点注册信息

完成上述步骤后,所有被 RpcProducer 注解的 class 就都将被注册到注册中心中。

服务提供方启动 Netty 服务#

上一步的步骤 4️⃣中,在向服务中心注册节点时,同时根据注解的端口启动了 Netty 服务,那么这个 Netty 服务是如何启动的,然后启动的目的是什么呢?

在第一章中我们有讲到,客户端的接口在执行反射操作时,会连接服务端的 Netty 服务,然后发送 NettyRequest 请求。那么服务端接收到 NettyRequest 请求后会做那么事情呢?服务端又是如何根据 NettyRequest映射到具体的方法上,同时将方法执行的结果进行返回的呢?这一节我们一起弄清楚这一点。

Mermaid Loading...

新建一个 NettyService.class

//NettyService.class
@Slf4j
public class NettyService {
    private static final NettyService instance = new NettyService();

    private EventLoopGroup bossGroup;

    private EventLoopGroup workGroup;

    public static NettyService getInstance() {
        return instance;
    }

    public void startService(int port) {
        synchronized (NettyService.class) {
            if (bossGroup != null || workGroup != null) {
                log.debug("netty server is already start");
                return;
            }

            bossGroup = new NioEventLoopGroup(1);
            workGroup = new NioEventLoopGroup();
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            // 解码器
                            socketChannel.pipeline().addLast(new NettyDecoderHandler(NettyRequest.class));
                            // 编码器
                            socketChannel.pipeline().addLast(new NettyEncoderHandler());
                            // 服务处理
                            1️⃣ socketChannel.pipeline().addLast(new NettyHandlerServer());
                        }
                    });

            try {
                serverBootstrap.bind(port).sync().channel();
                log.info("NettyServer start {} start now!!!", IpUtil.getLocalIP() + UrlConstants.COLON + port);

            } catch (Exception e) {
                log.error("NettyServer startServer error", e);
            }
        }
    }
}

1️⃣:设置消息处理器

上述代码都比较常规,主要是使用 Netty 根据传入的端口进行了服务的启动,其中最主要的代码是

socketChannel.pipeline().addLast(new NettyHandlerServer()); 设置了消息的处理器。

//NettyHandlerServer.class
public class NettyHandlerServer extends SimpleChannelInboundHandler<NettyRequest> {

    /**
     * service name to bean map
     */
    public static final Map<String, Object> PRODUCER_BEAN_MAP = Maps.newConcurrentMap();

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, NettyRequest nettyRequest) throws Exception {
        if (!ctx.channel().isWritable()) {
            log.error("channel closed!");
            return;
        }

        Producer localProducer = this.getLocalProducer(nettyRequest);
        if (localProducer == null) {
            log.error("service not found, request={}", nettyRequest);
            return;
        }

        Object result = this.invockMethod(localProducer, nettyRequest);
        NettyResponse response = NettyResponse.builder()
                .uniqueKey(nettyRequest.getUniqueKey())
                .result(result)
                .build();
        ctx.writeAndFlush(response);
    }

    /**
     * 根据 methodname 获取 Producer
     */
    private Producer getLocalProducer(NettyRequest request) {
        String methodName = request.getInvokeMethodName();
        String name = request.getProducer().getServiceItf().getName();
        List<Producer> producerList = IRegisterCenterZkImpl.getInstance().getProducersMap().get(name);
        return Collections2.filter(producerList, producer -> {
            assert producer != null;
            Method method = producer.getMethod();
            return method != null && method.getName().equals(methodName);
        }).iterator().next();
    }

    /**
     * 方法反射调用
     */
    private Object invockMethod(Producer producer, NettyRequest request) {
        // TODO: 2023/12/8 增加超时检测
        Object serviceObject = producer.getServiceObject();
        Method method = producer.getMethod();
        Object result = null;

        try {
            // 这里是最重要的,服务端反射调用方法
            result = method.invoke(serviceObject, request.getArgs());
            return result;
        } catch (Exception e) {
            result = e;
            log.error("NettyServerBizHandler invokeMethod error, provider={}, request={}", producer, request, e);
        }

        return result;
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("NettyServerBizHandler error, now ctx closed", cause);
        // 发生异常,关闭链路
        ctx.close();
    }
}

消息处理器中,在 invockMethod 方法中,根据传入的 requestmethod 参数进行了方法的反射获取执行结果,然后进行返回。反射执行完成后,将结果组装成 NettyResponse 结果进行返回。

那么到这里就完成了整个远程服务的执行。

Spring 注解服务扫描#

定义 @EnableRpc 注解

@Documented
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Import({RpcImportSelector.class})
public @interface EnableRpc {
}

注解中 @ImportRpcImportSelector.class 对象

public class RpcImportSelector implements ImportSelector {

    @Override
    public String[] selectImports(AnnotationMetadata annotationMetadata) {
        return new String[]{"com.simon.spring.ProducerAnnotationBean",
                "com.simon.spring.ConsumerAnnotaionBean"};
    }

}

RpcImportSelector 对象实现了 ImportSelector 接口, selectImports 方法中返回了处理客户端和服务端注解的类。

客户端启动类 ConsumerApplication.class

@EnableRpc
@SpringBootApplication
public class ConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
}

服务启动时,只需要加上 @EnableRpc 注解即可启动 RPC 服务

实例#

客户端#

ConsumerApplication.class 启动类

@EnableRpc
@SpringBootApplication
public class ConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
}

UserController.class 请求接口

@RestController
@RequestMapping("/user")
public class UserController {
		
		// 需要远程调用的接口
    @RpcClient(remoteAppKey = "test")
    private UserService userService;

    @GetMapping("/getUser/{username}")
    public String getUser(@PathVariable String username) {
        return userService.get(username);
    }
}

配置文件

application.properties

server.port=8082

rpc.properties

# netty 配置
rpc_app_key=test
# ZK 配置
zk_server=127.0.0.1:2181
rpc_session_timeout=3000
rpc_connection_timeout=3000
rpc_channel_connect_size=3

服务端#

ProducerApplication.class 启动类

@EnableRpc
@SpringBootApplication
public class ProducerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }
}

UserServiceImpl.class 接口实现类

@Slf4j
@Service
@RpcProducer
public class UserServiceImpl implements UserService {
    @Override
    public String get(String username) {
        return "username " + username + "服务信息: " + RpcPropertiesUtil.getServerPort();
    }
}

配置信息

application.properties

server.port=8081

rpc.properties

# netty 配置
rpc_app_key=test
rpc_server_port=9999
# ZK 配置
zk_server=127.0.0.1:2181
rpc_session_timeout=3000
rpc_connection_timeout=3000
rpc_channel_connect_size=3

服务启动#

分别启动服务端和客户端,然后请求 http://127.0.0.1:8082/user/getUser/simon, 可以查看结果

1

总结#

至此,如何开发一个自己的 RPC 框架就全部完成了,框架的内容非常粗糙,距离真正商用版本还差很多距离。但是通过参考别人的源码,可以理解一个基础的 RPC 框架的大体结构和所需要用到的知识,包括 Netty、Spring 、序列化、负载均衡策略、 zookeeper、 Java 的反射等等。但不管怎么说,有进步就是一件很快乐的事情!

加载中...
此文章数据所有权由区块链加密技术和智能合约保障仅归创作者所有。