simon

simon

自分の RPC フレームワークを開発する方法 (三)

自分の RPC フレームワークを開発する方法(3)#

2023 年 12 月 13 日 9:37 AM

前の 2 つの記事では、RPC フレームワークを実現するために必要なクライアントとレジストリセンターのロジックについて紹介しました。この章では、RPC フレームワークのサーバー(サービス提供者)を実装し、フレームワークの他の補完ロジックを充実させる方法について主に説明します。

RPC サーバーの実装#

サービスアノテーション(RpcProducer)#

クライアント呼び出しの識別子と同様に、サービス提供者もサービス識別子を設定する必要があります。これはサービスの登録と関連する設定情報の拡張に使用されます。

@RpcProducer サービス提供者アノテーション

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
public @interface RpcProducer {

    /**
     * サービスインターフェース: サービスレジストリセンターに登録され、サービス呼び出し側が取得後にローカルにキャッシュしてサービス呼び出しを開始するために使用されます。
     */
    Class<?> serviceItf() default Object.class;

    /**
     * サービスグループ名: グループのグレースフルリリースに使用でき、異なるグループを設定することで、呼び出しを同じグループに設定されたルーティングにルーティングできます。
     */
    String groupName() default "default";

    /**
     * タイムアウト時間: サーバーのタイムアウト時間を制御します(ms)。
     */
    long timeout() default 3000L;

    /**
     * サービス提供者の重み: クラスター内のこのマシンの重みを設定し、特定の負荷分散アルゴリズムに使用されます。
     */
    int weight() default 1;

    /**
     * サーバースレッド数: サーバーのサービススレッド数を制限し、サーバーのスロットリングを行います。
     */
    int workThreads() default 10;
}

サービス提供者ノードの登録#

すべての @RpcProducer アノテーションが付けられた実装クラスがレジストリセンターに登録できるようにするために、Spring では以下を実装する必要があります。

  • ApplicationListener: onApplicationEvent アプリケーションイベントをリッスンします。
  • ApplicationContextAware: setApplicationContext 現在のコンテキストを設定します。
  • DisposableBean:destroy メソッド、bean が破棄されるときに実行されます。

これらの 3 つのインターフェースを通じて、setApplicationContext で現在のコンテキストを設定し、onApplicationEvent でサービス実装クラスをリッスンしてレジストリセンターに提出します。

上記のプロセスを整理した後、実装クラス ProducerAnnotationBean を作成できます。

//ProducerAnnotationBean.class
@Slf4j
public class ProducerAnnotationBean implements ApplicationListener<ApplicationContextEvent>, ApplicationContextAware, DisposableBean {

    private ApplicationContext applicationContext;

    @Override
    public void destroy() throws Exception {
        /* 破棄 */
        log.debug("AnnotationServicesPublisher bean destroy");
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        1️⃣ this.applicationContext = applicationContext;
    }

    @Override
    public void onApplicationEvent(ApplicationContextEvent event) {2️⃣}
}

1️⃣:現在のコンテキストを設定します。

2️⃣:Spring Bean のアクションをリッスンします。

onApplicationEvent の実装は以下の通りです。

//ProducerAnnotationBean.class
@Override
    public void onApplicationEvent(ApplicationContextEvent event) {
        1️⃣ if (event.getApplicationContext() != applicationContext) {
            log.debug("別のアプリケーションコンテキストからのイベントを受信しました {}, 無視します", event.getApplicationContext());
        }

        // リフレッシュイベント
        if (event instanceof ContextRefreshedEvent) {
            Map<String, Object> annotation = applicationContext.getBeansWithAnnotation(RpcProducer.class);
            if (MapUtils.isEmpty(annotation)) {
                log.info("シンプルRPCは存在しません");

            } else {
                annotation.forEach((beanName, bean) -> {
                    log.info("シンプルRPC beanName: {}, bean: {}", beanName, bean);
                    2️⃣ if (bean.getClass().isAnnotationPresent(RpcProducer.class)) {
                        // オブジェクトを取得
                    3️⃣    List<Producer> producerList = this.buildProviderService(bean);
                        // 1. サービスを起動
                    4️⃣    NettyService.getInstance().startService(RpcPropertiesUtil.getServerPort());
                        // 2. サービス登録
                     5️⃣   IRegisterCenterZkImpl.getInstance().register(producerList);
                    }
                });
            }
        } else if (event instanceof ContextClosedEvent) {
            // すべてのサービスを破棄
           6️⃣ IRegisterCenterZkImpl.getInstance().destroy(null);
            log.info("シンプルRPCが閉じられました");
        }
    }

1️⃣:コンテキストと現在の Spring コンテキストが一致しているかを判断します。

2️⃣:現在のインスタンスオブジェクトが RpcProducer アノテーションが付けられているかを判断します。

3️⃣:現在の bean に基づいて Producer オブジェクトを構築します。

実装コードは以下の通りです。

//ProducerAnnotationBean.class
public List<Producer> buildProviderService(Object bean) {
        RpcProducer annotation = bean.getClass().getAnnotation(RpcProducer.class);
        Class<?> serviceItf = annotation.serviceItf();
        if (serviceItf == Object.class) {
            serviceItf = bean.getClass().getInterfaces()[0];
        }
			
        NettyHandlerServer.PRODUCER_BEAN_MAP.put(serviceItf.getName(), bean);
			
				// アノテーション情報を取得
        String groupName = annotation.groupName();
        long timeout = annotation.timeout();
        int weight = annotation.weight();
        int workThreads = annotation.workThreads();

        Class<?> finalServiceItf = serviceItf;
        return Arrays.stream(bean.getClass().getDeclaredMethods())
								// Object.class の equals、notify などのメソッドをフィルタリング
                .filter(method -> !method.getDeclaringClass().equals(Object.class))
                .map(method -> {
                    Producer producer = Producer.builder()
                            .serviceItf(finalServiceItf)
                            .serviceObject(bean)
                            .ip(IpUtil.getLocalIP())
                            .port(RpcPropertiesUtil.getServerPort())
                            .timeout(timeout)
                            .appKey(RpcPropertiesUtil.getAppKey())
                            .groupName(groupName)
                            .weight(weight)
                            .workerThreads(workThreads)
                            .method(method)
                            .build();
                    return producer;
                }).collect(Collectors.toList());
    }

4️⃣:Netty サーバーサービスを起動します。この部分は非常に重要で、後で説明します。

5️⃣:レジストリセンターにサービス提供者のノード情報を登録します。

6️⃣:サービスが閉じられるとき、すべてのノード登録情報をクリアします。

上記の手順を完了すると、すべての RpcProducer アノテーションが付けられた class がレジストリセンターに登録されます。

サービス提供者が Netty サービスを起動する#

前のステップの 4️⃣で、サービスセンターにノードを登録する際に、アノテーションのポートに基づいて Netty サービスを起動しました。この Netty サービスはどのように起動され、起動の目的は何でしょうか?

第 1 章で述べたように、クライアントのインターフェースは反射操作を実行する際に、サーバーの Netty サービスに接続し、NettyRequest リクエストを送信します。サーバーは NettyRequest リクエストを受信した後、何をするのでしょうか?サーバーはどのように NettyRequest を具体的なメソッドにマッピングし、メソッドの実行結果を返すのでしょうか?このセクションで一緒に明らかにしていきましょう。

Mermaid Loading...

新しい NettyService.class を作成します。

//NettyService.class
@Slf4j
public class NettyService {
    private static final NettyService instance = new NettyService();

    private EventLoopGroup bossGroup;

    private EventLoopGroup workGroup;

    public static NettyService getInstance() {
        return instance;
    }

    public void startService(int port) {
        synchronized (NettyService.class) {
            if (bossGroup != null || workGroup != null) {
                log.debug("nettyサーバーはすでに起動しています");
                return;
            }

            bossGroup = new NioEventLoopGroup(1);
            workGroup = new NioEventLoopGroup();
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            // デコーダ
                            socketChannel.pipeline().addLast(new NettyDecoderHandler(NettyRequest.class));
                            // エンコーダ
                            socketChannel.pipeline().addLast(new NettyEncoderHandler());
                            // サービス処理
                            1️⃣ socketChannel.pipeline().addLast(new NettyHandlerServer());
                        }
                    });

            try {
                serverBootstrap.bind(port).sync().channel();
                log.info("NettyServerが {} で起動しました!!!", IpUtil.getLocalIP() + UrlConstants.COLON + port);

            } catch (Exception e) {
                log.error("NettyServer起動エラー", e);
            }
        }
    }
}

1️⃣:メッセージハンドラを設定します。

上記のコードは比較的一般的で、主に Netty を使用して渡されたポートに基づいてサービスを起動しています。最も重要なコードは

socketChannel.pipeline().addLast(new NettyHandlerServer()); メッセージハンドラを設定しました。

//NettyHandlerServer.class
public class NettyHandlerServer extends SimpleChannelInboundHandler<NettyRequest> {

    /**
     * サービス名からbeanへのマップ
     */
    public static final Map<String, Object> PRODUCER_BEAN_MAP = Maps.newConcurrentMap();

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, NettyRequest nettyRequest) throws Exception {
        if (!ctx.channel().isWritable()) {
            log.error("チャネルが閉じられました!");
            return;
        }

        Producer localProducer = this.getLocalProducer(nettyRequest);
        if (localProducer == null) {
            log.error("サービスが見つかりません, request={}", nettyRequest);
            return;
        }

        Object result = this.invockMethod(localProducer, nettyRequest);
        NettyResponse response = NettyResponse.builder()
                .uniqueKey(nettyRequest.getUniqueKey())
                .result(result)
                .build();
        ctx.writeAndFlush(response);
    }

    /**
     * methodnameに基づいてProducerを取得
     */
    private Producer getLocalProducer(NettyRequest request) {
        String methodName = request.getInvokeMethodName();
        String name = request.getProducer().getServiceItf().getName();
        List<Producer> producerList = IRegisterCenterZkImpl.getInstance().getProducersMap().get(name);
        return Collections2.filter(producerList, producer -> {
            assert producer != null;
            Method method = producer.getMethod();
            return method != null && method.getName().equals(methodName);
        }).iterator().next();
    }

    /**
     * メソッド反射呼び出し
     */
    private Object invockMethod(Producer producer, NettyRequest request) {
        // TODO: 2023/12/8 タイムアウト検出を追加
        Object serviceObject = producer.getServiceObject();
        Method method = producer.getMethod();
        Object result = null;

        try {
            // ここが最も重要で、サーバーが反射を通じてメソッドを呼び出します
            result = method.invoke(serviceObject, request.getArgs());
            return result;
        } catch (Exception e) {
            result = e;
            log.error("NettyServerBizHandler invokeMethodエラー, provider={}, request={}", producer, request, e);
        }

        return result;
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("NettyServerBizHandlerエラー, 現在ctxが閉じられました", cause);
        // 例外が発生した場合、リンクを閉じます
        ctx.close();
    }
}

メッセージハンドラの中で、invockMethod メソッドでは、渡された requestmethod パラメータに基づいてメソッドの反射を取得し、実行結果を返します。反射実行が完了した後、結果を NettyResponse 結果に組み立てて返します。

これで、リモートサービスの実行が完了しました。

Spring アノテーションサービススキャン#

@EnableRpc アノテーションを定義します。

@Documented
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Import({RpcImportSelector.class})
public @interface EnableRpc {
}

アノテーション内で @Import された RpcImportSelector.class オブジェクト

public class RpcImportSelector implements ImportSelector {

    @Override
    public String[] selectImports(AnnotationMetadata annotationMetadata) {
        return new String[]{"com.simon.spring.ProducerAnnotationBean",
                "com.simon.spring.ConsumerAnnotaionBean"};
    }

}

RpcImportSelector オブジェクトは ImportSelector インターフェースを実装し、 selectImports メソッド内でクライアントとサーバーのアノテーションを処理するクラスを返します。

クライアント起動クラス ConsumerApplication.class

@EnableRpc
@SpringBootApplication
public class ConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
}

サービスを起動する際は、@EnableRpc アノテーションを追加するだけで RPC サービスを起動できます。

インスタンス#

クライアント#

ConsumerApplication.class 起動クラス

@EnableRpc
@SpringBootApplication
public class ConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
}

UserController.class リクエストインターフェース

@RestController
@RequestMapping("/user")
public class UserController {
		
		// リモート呼び出しが必要なインターフェース
    @RpcClient(remoteAppKey = "test")
    private UserService userService;

    @GetMapping("/getUser/{username}")
    public String getUser(@PathVariable String username) {
        return userService.get(username);
    }
}

設定ファイル

application.properties

server.port=8082

rpc.properties

# netty設定
rpc_app_key=test
# ZK設定
zk_server=127.0.0.1:2181
rpc_session_timeout=3000
rpc_connection_timeout=3000
rpc_channel_connect_size=3

サーバー#

ProducerApplication.class 起動クラス

@EnableRpc
@SpringBootApplication
public class ProducerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }
}

UserServiceImpl.class インターフェース実装クラス

@Slf4j
@Service
@RpcProducer
public class UserServiceImpl implements UserService {
    @Override
    public String get(String username) {
        return "username " + username + "サービス情報: " + RpcPropertiesUtil.getServerPort();
    }
}

設定情報

application.properties

server.port=8081

rpc.properties

# netty設定
rpc_app_key=test
rpc_server_port=9999
# ZK設定
zk_server=127.0.0.1:2181
rpc_session_timeout=3000
rpc_connection_timeout=3000
rpc_channel_connect_size=3

サービス起動#

サーバーとクライアントをそれぞれ起動し、http://127.0.0.1:8082/user/getUser/simon にリクエストを送信すると、結果を確認できます。

1

まとめ#

これで、自分の RPC フレームワークを開発する方法がすべて完了しました。フレームワークの内容は非常に粗いもので、実際の商用バージョンにはまだ多くの距離があります。しかし、他の人のソースコードを参考にすることで、基本的な RPC フレームワークの全体構造と必要な知識(Netty、Spring、シリアル化、負荷分散戦略、Zookeeper、Java の反射など)を理解することができます。どんな形であれ、進歩があることはとても嬉しいことです!

読み込み中...
文章は、創作者によって署名され、ブロックチェーンに安全に保存されています。