自分の RPC フレームワークを開発する方法(3)#
2023 年 12 月 13 日 9:37 AM
前の 2 つの記事では、RPC フレームワークを実現するために必要なクライアントとレジストリセンターのロジックについて紹介しました。この章では、RPC フレームワークのサーバー(サービス提供者)を実装し、フレームワークの他の補完ロジックを充実させる方法について主に説明します。
RPC サーバーの実装#
サービスアノテーション(RpcProducer)#
クライアント呼び出しの識別子と同様に、サービス提供者もサービス識別子を設定する必要があります。これはサービスの登録と関連する設定情報の拡張に使用されます。
@RpcProducer
サービス提供者アノテーション
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
public @interface RpcProducer {
/**
* サービスインターフェース: サービスレジストリセンターに登録され、サービス呼び出し側が取得後にローカルにキャッシュしてサービス呼び出しを開始するために使用されます。
*/
Class<?> serviceItf() default Object.class;
/**
* サービスグループ名: グループのグレースフルリリースに使用でき、異なるグループを設定することで、呼び出しを同じグループに設定されたルーティングにルーティングできます。
*/
String groupName() default "default";
/**
* タイムアウト時間: サーバーのタイムアウト時間を制御します(ms)。
*/
long timeout() default 3000L;
/**
* サービス提供者の重み: クラスター内のこのマシンの重みを設定し、特定の負荷分散アルゴリズムに使用されます。
*/
int weight() default 1;
/**
* サーバースレッド数: サーバーのサービススレッド数を制限し、サーバーのスロットリングを行います。
*/
int workThreads() default 10;
}
サービス提供者ノードの登録#
すべての @RpcProducer
アノテーションが付けられた実装クラスがレジストリセンターに登録できるようにするために、Spring では以下を実装する必要があります。
- ApplicationListener:
onApplicationEvent
アプリケーションイベントをリッスンします。 - ApplicationContextAware:
setApplicationContext
現在のコンテキストを設定します。 - DisposableBean:
destroy
メソッド、bean が破棄されるときに実行されます。
これらの 3 つのインターフェースを通じて、setApplicationContext
で現在のコンテキストを設定し、onApplicationEvent
でサービス実装クラスをリッスンしてレジストリセンターに提出します。
上記のプロセスを整理した後、実装クラス ProducerAnnotationBean
を作成できます。
//ProducerAnnotationBean.class
@Slf4j
public class ProducerAnnotationBean implements ApplicationListener<ApplicationContextEvent>, ApplicationContextAware, DisposableBean {
private ApplicationContext applicationContext;
@Override
public void destroy() throws Exception {
/* 破棄 */
log.debug("AnnotationServicesPublisher bean destroy");
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
1️⃣ this.applicationContext = applicationContext;
}
@Override
public void onApplicationEvent(ApplicationContextEvent event) {2️⃣}
}
1️⃣:現在のコンテキストを設定します。
2️⃣:Spring Bean のアクションをリッスンします。
onApplicationEvent
の実装は以下の通りです。
//ProducerAnnotationBean.class
@Override
public void onApplicationEvent(ApplicationContextEvent event) {
1️⃣ if (event.getApplicationContext() != applicationContext) {
log.debug("別のアプリケーションコンテキストからのイベントを受信しました {}, 無視します", event.getApplicationContext());
}
// リフレッシュイベント
if (event instanceof ContextRefreshedEvent) {
Map<String, Object> annotation = applicationContext.getBeansWithAnnotation(RpcProducer.class);
if (MapUtils.isEmpty(annotation)) {
log.info("シンプルRPCは存在しません");
} else {
annotation.forEach((beanName, bean) -> {
log.info("シンプルRPC beanName: {}, bean: {}", beanName, bean);
2️⃣ if (bean.getClass().isAnnotationPresent(RpcProducer.class)) {
// オブジェクトを取得
3️⃣ List<Producer> producerList = this.buildProviderService(bean);
// 1. サービスを起動
4️⃣ NettyService.getInstance().startService(RpcPropertiesUtil.getServerPort());
// 2. サービス登録
5️⃣ IRegisterCenterZkImpl.getInstance().register(producerList);
}
});
}
} else if (event instanceof ContextClosedEvent) {
// すべてのサービスを破棄
6️⃣ IRegisterCenterZkImpl.getInstance().destroy(null);
log.info("シンプルRPCが閉じられました");
}
}
1️⃣:コンテキストと現在の Spring コンテキストが一致しているかを判断します。
2️⃣:現在のインスタンスオブジェクトが RpcProducer
アノテーションが付けられているかを判断します。
3️⃣:現在の bean
に基づいて Producer
オブジェクトを構築します。
実装コードは以下の通りです。
//ProducerAnnotationBean.class
public List<Producer> buildProviderService(Object bean) {
RpcProducer annotation = bean.getClass().getAnnotation(RpcProducer.class);
Class<?> serviceItf = annotation.serviceItf();
if (serviceItf == Object.class) {
serviceItf = bean.getClass().getInterfaces()[0];
}
NettyHandlerServer.PRODUCER_BEAN_MAP.put(serviceItf.getName(), bean);
// アノテーション情報を取得
String groupName = annotation.groupName();
long timeout = annotation.timeout();
int weight = annotation.weight();
int workThreads = annotation.workThreads();
Class<?> finalServiceItf = serviceItf;
return Arrays.stream(bean.getClass().getDeclaredMethods())
// Object.class の equals、notify などのメソッドをフィルタリング
.filter(method -> !method.getDeclaringClass().equals(Object.class))
.map(method -> {
Producer producer = Producer.builder()
.serviceItf(finalServiceItf)
.serviceObject(bean)
.ip(IpUtil.getLocalIP())
.port(RpcPropertiesUtil.getServerPort())
.timeout(timeout)
.appKey(RpcPropertiesUtil.getAppKey())
.groupName(groupName)
.weight(weight)
.workerThreads(workThreads)
.method(method)
.build();
return producer;
}).collect(Collectors.toList());
}
4️⃣:Netty サーバーサービスを起動します。この部分は非常に重要で、後で説明します。
5️⃣:レジストリセンターにサービス提供者のノード情報を登録します。
6️⃣:サービスが閉じられるとき、すべてのノード登録情報をクリアします。
上記の手順を完了すると、すべての RpcProducer
アノテーションが付けられた class
がレジストリセンターに登録されます。
サービス提供者が Netty サービスを起動する#
前のステップの 4️⃣で、サービスセンターにノードを登録する際に、アノテーションのポートに基づいて Netty
サービスを起動しました。この Netty
サービスはどのように起動され、起動の目的は何でしょうか?
第 1 章で述べたように、クライアントのインターフェースは反射操作を実行する際に、サーバーの Netty
サービスに接続し、NettyRequest
リクエストを送信します。サーバーは NettyRequest
リクエストを受信した後、何をするのでしょうか?サーバーはどのように NettyRequest
を具体的なメソッドにマッピングし、メソッドの実行結果を返すのでしょうか?このセクションで一緒に明らかにしていきましょう。
新しい NettyService.class
を作成します。
//NettyService.class
@Slf4j
public class NettyService {
private static final NettyService instance = new NettyService();
private EventLoopGroup bossGroup;
private EventLoopGroup workGroup;
public static NettyService getInstance() {
return instance;
}
public void startService(int port) {
synchronized (NettyService.class) {
if (bossGroup != null || workGroup != null) {
log.debug("nettyサーバーはすでに起動しています");
return;
}
bossGroup = new NioEventLoopGroup(1);
workGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// デコーダ
socketChannel.pipeline().addLast(new NettyDecoderHandler(NettyRequest.class));
// エンコーダ
socketChannel.pipeline().addLast(new NettyEncoderHandler());
// サービス処理
1️⃣ socketChannel.pipeline().addLast(new NettyHandlerServer());
}
});
try {
serverBootstrap.bind(port).sync().channel();
log.info("NettyServerが {} で起動しました!!!", IpUtil.getLocalIP() + UrlConstants.COLON + port);
} catch (Exception e) {
log.error("NettyServer起動エラー", e);
}
}
}
}
1️⃣:メッセージハンドラを設定します。
上記のコードは比較的一般的で、主に Netty
を使用して渡されたポートに基づいてサービスを起動しています。最も重要なコードは
socketChannel.pipeline().addLast(new NettyHandlerServer());
メッセージハンドラを設定しました。
//NettyHandlerServer.class
public class NettyHandlerServer extends SimpleChannelInboundHandler<NettyRequest> {
/**
* サービス名からbeanへのマップ
*/
public static final Map<String, Object> PRODUCER_BEAN_MAP = Maps.newConcurrentMap();
@Override
protected void channelRead0(ChannelHandlerContext ctx, NettyRequest nettyRequest) throws Exception {
if (!ctx.channel().isWritable()) {
log.error("チャネルが閉じられました!");
return;
}
Producer localProducer = this.getLocalProducer(nettyRequest);
if (localProducer == null) {
log.error("サービスが見つかりません, request={}", nettyRequest);
return;
}
Object result = this.invockMethod(localProducer, nettyRequest);
NettyResponse response = NettyResponse.builder()
.uniqueKey(nettyRequest.getUniqueKey())
.result(result)
.build();
ctx.writeAndFlush(response);
}
/**
* methodnameに基づいてProducerを取得
*/
private Producer getLocalProducer(NettyRequest request) {
String methodName = request.getInvokeMethodName();
String name = request.getProducer().getServiceItf().getName();
List<Producer> producerList = IRegisterCenterZkImpl.getInstance().getProducersMap().get(name);
return Collections2.filter(producerList, producer -> {
assert producer != null;
Method method = producer.getMethod();
return method != null && method.getName().equals(methodName);
}).iterator().next();
}
/**
* メソッド反射呼び出し
*/
private Object invockMethod(Producer producer, NettyRequest request) {
// TODO: 2023/12/8 タイムアウト検出を追加
Object serviceObject = producer.getServiceObject();
Method method = producer.getMethod();
Object result = null;
try {
// ここが最も重要で、サーバーが反射を通じてメソッドを呼び出します
result = method.invoke(serviceObject, request.getArgs());
return result;
} catch (Exception e) {
result = e;
log.error("NettyServerBizHandler invokeMethodエラー, provider={}, request={}", producer, request, e);
}
return result;
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("NettyServerBizHandlerエラー, 現在ctxが閉じられました", cause);
// 例外が発生した場合、リンクを閉じます
ctx.close();
}
}
メッセージハンドラの中で、invockMethod
メソッドでは、渡された request
の method
パラメータに基づいてメソッドの反射を取得し、実行結果を返します。反射実行が完了した後、結果を NettyResponse
結果に組み立てて返します。
これで、リモートサービスの実行が完了しました。
Spring アノテーションサービススキャン#
@EnableRpc
アノテーションを定義します。
@Documented
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Import({RpcImportSelector.class})
public @interface EnableRpc {
}
アノテーション内で @Import
された RpcImportSelector.class
オブジェクト
public class RpcImportSelector implements ImportSelector {
@Override
public String[] selectImports(AnnotationMetadata annotationMetadata) {
return new String[]{"com.simon.spring.ProducerAnnotationBean",
"com.simon.spring.ConsumerAnnotaionBean"};
}
}
RpcImportSelector
オブジェクトは ImportSelector
インターフェースを実装し、 selectImports
メソッド内でクライアントとサーバーのアノテーションを処理するクラスを返します。
クライアント起動クラス ConsumerApplication.class
@EnableRpc
@SpringBootApplication
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
}
サービスを起動する際は、@EnableRpc
アノテーションを追加するだけで RPC サービスを起動できます。
インスタンス#
クライアント#
ConsumerApplication.class
起動クラス
@EnableRpc
@SpringBootApplication
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
}
UserController.class
リクエストインターフェース
@RestController
@RequestMapping("/user")
public class UserController {
// リモート呼び出しが必要なインターフェース
@RpcClient(remoteAppKey = "test")
private UserService userService;
@GetMapping("/getUser/{username}")
public String getUser(@PathVariable String username) {
return userService.get(username);
}
}
設定ファイル
application.properties
server.port=8082
rpc.properties
# netty設定
rpc_app_key=test
# ZK設定
zk_server=127.0.0.1:2181
rpc_session_timeout=3000
rpc_connection_timeout=3000
rpc_channel_connect_size=3
サーバー#
ProducerApplication.class
起動クラス
@EnableRpc
@SpringBootApplication
public class ProducerApplication {
public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class, args);
}
}
UserServiceImpl.class
インターフェース実装クラス
@Slf4j
@Service
@RpcProducer
public class UserServiceImpl implements UserService {
@Override
public String get(String username) {
return "username " + username + "サービス情報: " + RpcPropertiesUtil.getServerPort();
}
}
設定情報
application.properties
server.port=8081
rpc.properties
# netty設定
rpc_app_key=test
rpc_server_port=9999
# ZK設定
zk_server=127.0.0.1:2181
rpc_session_timeout=3000
rpc_connection_timeout=3000
rpc_channel_connect_size=3
サービス起動#
サーバーとクライアントをそれぞれ起動し、http://127.0.0.1:8082/user/getUser/simon にリクエストを送信すると、結果を確認できます。
まとめ#
これで、自分の RPC フレームワークを開発する方法がすべて完了しました。フレームワークの内容は非常に粗いもので、実際の商用バージョンにはまだ多くの距離があります。しかし、他の人のソースコードを参考にすることで、基本的な RPC フレームワークの全体構造と必要な知識(Netty、Spring、シリアル化、負荷分散戦略、Zookeeper、Java の反射など)を理解することができます。どんな形であれ、進歩があることはとても嬉しいことです!