simon

simon

如何開發一個自己的 RPC 框架(三)

如何開發一個自己的 RPC 框架 (三)#

2023 年 12 月 13 日 上午 9:37

之前的兩篇文章中,已經介紹了實現一個 RPC 框架所需要的客戶端和註冊中心邏輯,那麼這一章,將主要介紹如何實現 RPC 框架中的服務端 (服務提供方) 和完善框架的其他補充邏輯。

RPC 服務端實現#

服務註解 (RpcProducer)#

和客戶端調用標識類似,服務提供方也需要設置服務標識,用來服務的註冊和相關配置信息拓展

@RpcProducer 服務提供方註解

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
public @interface RpcProducer {

    /**
     * 服務接口:用於註冊在服務註冊中心,服務調用端獲取後緩存再本地用於發起服務調用
     */
    Class<?> serviceItf() default Object.class;

		/**
     * 服務分組組名:可用於分組灰度發布,配置不同分組,可以讓調用都路由到配置了相同分組的路由上
     */
    String groupName() default "default";

    /**
     * 超時時間:控制服務端超時時間 ms
     */
    long timeout() default 3000L;

    /**
     * 服務提供者權重:配置該機器在集群中的權重,用於某些負載均衡算法
     */
    int weight() default 1;

    /**
     * 服務端線程數:限制服務端改服務線程數,服務端限流
     */
    int workThreads() default 10;
}

服務提供方節點註冊#

為了實現所有被 @RpcProducer 註解的實現類都可以被註冊到註冊中心,在 Spring 中,需要實現

  • ApplicationListener: onApplicationEvent 監聽應用程序事件
  • ApplicationContextAware: setApplicationContext 設置當前上下文
  • DisposableBean:destroy 方法,bean 被銷毀時執行

這三個接口,通過 setApplicationContext 設置當前上下文,通過 onApplicationEvent 監聽服務實現類然後提交到註冊中心

理清楚上面流程後,便可以創建實現類 ProducerAnnotationBean

//ProducerAnnotationBean.class
@Slf4j
public class ProducerAnnotationBean implements ApplicationListener<ApplicationContextEvent>, ApplicationContextAware, DisposableBean {

    private ApplicationContext applicationContext;

    @Override
    public void destroy() throws Exception {
        /* 銷毀 */
        log.debug("AnnotationServicesPublisher bean destroy");
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
       1️⃣ this.applicationContext = applicationContext;
    }

    @Override
    public void onApplicationEvent(ApplicationContextEvent event) {2️⃣}
}

1️⃣:設置當前上下文

2️⃣:監聽 Spring Bean 的動作

onApplicationEvent 的實現如下:

//ProducerAnnotationBean.class
@Override
    public void onApplicationEvent(ApplicationContextEvent event) {
        1️⃣ if (event.getApplicationContext() != applicationContext) {
            log.debug("Received a event from another application context {}, ignoring it", event.getApplicationContext());
        }

        // 刷新事件
        if (event instanceof ContextRefreshedEvent) {
            Map<String, Object> annotation = applicationContext.getBeansWithAnnotation(RpcProducer.class);
            if (MapUtils.isEmpty(annotation)) {
                log.info("no simple rpc exist");

            } else {
                annotation.forEach((beanName, bean) -> {
                    log.info("simple rpc beanName: {}, bean: {}", beanName, bean);
                   2️⃣ if (bean.getClass().isAnnotationPresent(RpcProducer.class)) {
                        // 獲取對象
                    3️⃣    List<Producer> producerList = this.buildProviderService(bean);
                        // 1. 啟動服務
                    4️⃣    NettyService.getInstance().startService(RpcPropertiesUtil.getServerPort());
                        // 2. 服務註冊
                     5️⃣   IRegisterCenterZkImpl.getInstance().register(producerList);
                    }
                });
            }
        } else if (event instanceof ContextClosedEvent) {
            // 銷毀所有的服務
           6️⃣ IRegisterCenterZkImpl.getInstance().destroy(null);
            log.info("simple rpc closed");
        }
    }

1️⃣:判斷上下文和當前 Spring 上下文是否一致

2️⃣:判斷當前實例對象是否被 RpcProducer 註解

3️⃣:根據當前 bean 構造 Producer 對象

實現代碼如下:

//ProducerAnnotationBean.class
public List<Producer> buildProviderService(Object bean) {
        RpcProducer annotation = bean.getClass().getAnnotation(RpcProducer.class);
        Class<?> serviceItf = annotation.serviceItf();
        if (serviceItf == Object.class) {
            serviceItf = bean.getClass().getInterfaces()[0];
        }
			
        NettyHandlerServer.PRODUCER_BEAN_MAP.put(serviceItf.getName(), bean);
			
				// 獲取註解信息
        String groupName = annotation.groupName();
        long timeout = annotation.timeout();
        int weight = annotation.weight();
        int workThreads = annotation.workThreads();

        Class<?> finalServiceItf = serviceItf;
        return Arrays.stream(bean.getClass().getDeclaredMethods())
								// 過濾掉 Object.class 的 equals、notify 等方法
                .filter(method -> !method.getDeclaringClass().equals(Object.class))
                .map(method -> {
                    Producer producer = Producer.builder()
                            .serviceItf(finalServiceItf)
                            .serviceObject(bean)
                            .ip(IpUtil.getLocalIP())
                            .port(RpcPropertiesUtil.getServerPort())
                            .timeout(timeout)
                            .appKey(RpcPropertiesUtil.getAppKey())
                            .groupName(groupName)
                            .weight(weight)
                            .workerThreads(workThreads)
                            .method(method)
                            .build();
                    return producer;
                }).collect(Collectors.toList());
    }

4️⃣:啟動 Netty Server 服務,這一段比較重要,稍後講解

5️⃣:向註冊中心註冊服務提供方的節點信息

6️⃣:服務關閉時,清除所有的節點註冊信息

完成上述步驟後,所有被 RpcProducer 註解的 class 就都將被註冊到註冊中心中。

服務提供方啟動 Netty 服務#

上一步的步驟 4️⃣中,在向服務中心註冊節點時,同時根據註解的端口啟動了 Netty 服務,那麼這個 Netty 服務是如何啟動的,然後啟動的目的是什么呢?

在第一章中我們有講到,客戶端的接口在執行反射操作時,會連接服務端的 Netty 服務,然後發送 NettyRequest 請求。那么服務端接收到 NettyRequest 請求後會做那麼事情呢?服務端又是如何根據 NettyRequest映射到具體的方法上,同時將方法執行的結果進行返回的呢?這一節我們一起弄清楚這一點。

Mermaid Loading...

新建一個 NettyService.class

//NettyService.class
@Slf4j
public class NettyService {
    private static final NettyService instance = new NettyService();

    private EventLoopGroup bossGroup;

    private EventLoopGroup workGroup;

    public static NettyService getInstance() {
        return instance;
    }

    public void startService(int port) {
        synchronized (NettyService.class) {
            if (bossGroup != null || workGroup != null) {
                log.debug("netty server is already start");
                return;
            }

            bossGroup = new NioEventLoopGroup(1);
            workGroup = new NioEventLoopGroup();
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            // 解碼器
                            socketChannel.pipeline().addLast(new NettyDecoderHandler(NettyRequest.class));
                            // 編碼器
                            socketChannel.pipeline().addLast(new NettyEncoderHandler());
                            // 服務處理
                            1️⃣ socketChannel.pipeline().addLast(new NettyHandlerServer());
                        }
                    });

            try {
                serverBootstrap.bind(port).sync().channel();
                log.info("NettyServer start {} start now!!!", IpUtil.getLocalIP() + UrlConstants.COLON + port);

            } catch (Exception e) {
                log.error("NettyServer startServer error", e);
            }
        }
    }
}

1️⃣:設置消息處理器

上述代碼都比較常規,主要是使用 Netty 根據傳入的端口進行了服務的啟動,其中最主要的代碼是

socketChannel.pipeline().addLast(new NettyHandlerServer()); 設置了消息的處理器。

//NettyHandlerServer.class
public class NettyHandlerServer extends SimpleChannelInboundHandler<NettyRequest> {

    /**
     * service name to bean map
     */
    public static final Map<String, Object> PRODUCER_BEAN_MAP = Maps.newConcurrentMap();

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, NettyRequest nettyRequest) throws Exception {
        if (!ctx.channel().isWritable()) {
            log.error("channel closed!");
            return;
        }

        Producer localProducer = this.getLocalProducer(nettyRequest);
        if (localProducer == null) {
            log.error("service not found, request={}", nettyRequest);
            return;
        }

        Object result = this.invockMethod(localProducer, nettyRequest);
        NettyResponse response = NettyResponse.builder()
                .uniqueKey(nettyRequest.getUniqueKey())
                .result(result)
                .build();
        ctx.writeAndFlush(response);
    }

    /**
     * 根據 methodname 獲取 Producer
     */
    private Producer getLocalProducer(NettyRequest request) {
        String methodName = request.getInvokeMethodName();
        String name = request.getProducer().getServiceItf().getName();
        List<Producer> producerList = IRegisterCenterZkImpl.getInstance().getProducersMap().get(name);
        return Collections2.filter(producerList, producer -> {
            assert producer != null;
            Method method = producer.getMethod();
            return method != null && method.getName().equals(methodName);
        }).iterator().next();
    }

    /**
     * 方法反射調用
     */
    private Object invockMethod(Producer producer, NettyRequest request) {
        // TODO: 2023/12/8 增加超時檢測
        Object serviceObject = producer.getServiceObject();
        Method method = producer.getMethod();
        Object result = null;

        try {
            // 這裡是最重要的,服務端反射調用方法
            result = method.invoke(serviceObject, request.getArgs());
            return result;
        } catch (Exception e) {
            result = e;
            log.error("NettyServerBizHandler invokeMethod error, provider={}, request={}", producer, request, e);
        }

        return result;
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("NettyServerBizHandler error, now ctx closed", cause);
        // 發生異常,關閉鏈路
        ctx.close();
    }
}

消息處理器中,在 invockMethod 方法中,根據傳入的 requestmethod 參數進行了方法的反射獲取執行結果,然後進行返回。反射執行完成後,將結果組裝成 NettyResponse 結果進行返回。

那麼到這裡就完成了整個遠程服務的執行。

Spring 註解服務掃描#

定義 @EnableRpc 註解

@Documented
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Import({RpcImportSelector.class})
public @interface EnableRpc {
}

註解中 @ImportRpcImportSelector.class 對象

public class RpcImportSelector implements ImportSelector {

    @Override
    public String[] selectImports(AnnotationMetadata annotationMetadata) {
        return new String[]{"com.simon.spring.ProducerAnnotationBean",
                "com.simon.spring.ConsumerAnnotaionBean"};
    }

}

RpcImportSelector 對象實現了 ImportSelector 接口, selectImports 方法中返回了處理客戶端和服務端註解的類。

客戶端啟動類 ConsumerApplication.class

@EnableRpc
@SpringBootApplication
public class ConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
}

服務啟動時,只需要加上 @EnableRpc 註解即可啟動 RPC 服務

實例#

客戶端#

ConsumerApplication.class 啟動類

@EnableRpc
@SpringBootApplication
public class ConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
}

UserController.class 請求接口

@RestController
@RequestMapping("/user")
public class UserController {
		
		// 需要遠程調用的接口
    @RpcClient(remoteAppKey = "test")
    private UserService userService;

    @GetMapping("/getUser/{username}")
    public String getUser(@PathVariable String username) {
        return userService.get(username);
    }
}

配置文件

application.properties

server.port=8082

rpc.properties

# netty 配置
rpc_app_key=test
# ZK 配置
zk_server=127.0.0.1:2181
rpc_session_timeout=3000
rpc_connection_timeout=3000
rpc_channel_connect_size=3

服務端#

ProducerApplication.class 啟動類

@EnableRpc
@SpringBootApplication
public class ProducerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }
}

UserServiceImpl.class 接口實現類

@Slf4j
@Service
@RpcProducer
public class UserServiceImpl implements UserService {
    @Override
    public String get(String username) {
        return "username " + username + "服務信息: " + RpcPropertiesUtil.getServerPort();
    }
}

配置信息

application.properties

server.port=8081

rpc.properties

# netty 配置
rpc_app_key=test
rpc_server_port=9999
# ZK 配置
zk_server=127.0.0.1:2181
rpc_session_timeout=3000
rpc_connection_timeout=3000
rpc_channel_connect_size=3

服務啟動#

分別啟動服務端和客戶端,然後請求 http://127.0.0.1:8082/user/getUser/simon, 可以查看結果

1

總結#

至此,如何開發一個自己的 RPC 框架就全部完成了,框架的內容非常粗糙,距離真正商用版本還差很多距離。但是通過參考別人的源碼,可以理解一個基礎的 RPC 框架的大體結構和所需要用到的知識,包括 Netty、Spring 、序列化、負載均衡策略、 zookeeper、 Java 的反射等等。但不管怎麼說,有進步就是一件很快樂的事情!

載入中......
此文章數據所有權由區塊鏈加密技術和智能合約保障僅歸創作者所有。