如何開發一個自己的 RPC 框架 (三)#
2023 年 12 月 13 日 上午 9:37
之前的兩篇文章中,已經介紹了實現一個 RPC 框架所需要的客戶端和註冊中心邏輯,那麼這一章,將主要介紹如何實現 RPC 框架中的服務端 (服務提供方) 和完善框架的其他補充邏輯。
RPC 服務端實現#
服務註解 (RpcProducer)#
和客戶端調用標識類似,服務提供方也需要設置服務標識,用來服務的註冊和相關配置信息拓展
@RpcProducer
服務提供方註解
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
public @interface RpcProducer {
/**
* 服務接口:用於註冊在服務註冊中心,服務調用端獲取後緩存再本地用於發起服務調用
*/
Class<?> serviceItf() default Object.class;
/**
* 服務分組組名:可用於分組灰度發布,配置不同分組,可以讓調用都路由到配置了相同分組的路由上
*/
String groupName() default "default";
/**
* 超時時間:控制服務端超時時間 ms
*/
long timeout() default 3000L;
/**
* 服務提供者權重:配置該機器在集群中的權重,用於某些負載均衡算法
*/
int weight() default 1;
/**
* 服務端線程數:限制服務端改服務線程數,服務端限流
*/
int workThreads() default 10;
}
服務提供方節點註冊#
為了實現所有被 @RpcProducer
註解的實現類都可以被註冊到註冊中心,在 Spring 中,需要實現
- ApplicationListener:
onApplicationEvent
監聽應用程序事件 - ApplicationContextAware:
setApplicationContext
設置當前上下文 - DisposableBean:
destroy
方法,bean 被銷毀時執行
這三個接口,通過 setApplicationContext
設置當前上下文,通過 onApplicationEvent
監聽服務實現類然後提交到註冊中心
理清楚上面流程後,便可以創建實現類 ProducerAnnotationBean
//ProducerAnnotationBean.class
@Slf4j
public class ProducerAnnotationBean implements ApplicationListener<ApplicationContextEvent>, ApplicationContextAware, DisposableBean {
private ApplicationContext applicationContext;
@Override
public void destroy() throws Exception {
/* 銷毀 */
log.debug("AnnotationServicesPublisher bean destroy");
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
1️⃣ this.applicationContext = applicationContext;
}
@Override
public void onApplicationEvent(ApplicationContextEvent event) {2️⃣}
}
1️⃣:設置當前上下文
2️⃣:監聽 Spring Bean 的動作
onApplicationEvent
的實現如下:
//ProducerAnnotationBean.class
@Override
public void onApplicationEvent(ApplicationContextEvent event) {
1️⃣ if (event.getApplicationContext() != applicationContext) {
log.debug("Received a event from another application context {}, ignoring it", event.getApplicationContext());
}
// 刷新事件
if (event instanceof ContextRefreshedEvent) {
Map<String, Object> annotation = applicationContext.getBeansWithAnnotation(RpcProducer.class);
if (MapUtils.isEmpty(annotation)) {
log.info("no simple rpc exist");
} else {
annotation.forEach((beanName, bean) -> {
log.info("simple rpc beanName: {}, bean: {}", beanName, bean);
2️⃣ if (bean.getClass().isAnnotationPresent(RpcProducer.class)) {
// 獲取對象
3️⃣ List<Producer> producerList = this.buildProviderService(bean);
// 1. 啟動服務
4️⃣ NettyService.getInstance().startService(RpcPropertiesUtil.getServerPort());
// 2. 服務註冊
5️⃣ IRegisterCenterZkImpl.getInstance().register(producerList);
}
});
}
} else if (event instanceof ContextClosedEvent) {
// 銷毀所有的服務
6️⃣ IRegisterCenterZkImpl.getInstance().destroy(null);
log.info("simple rpc closed");
}
}
1️⃣:判斷上下文和當前 Spring 上下文是否一致
2️⃣:判斷當前實例對象是否被 RpcProducer
註解
3️⃣:根據當前 bean
構造 Producer
對象
實現代碼如下:
//ProducerAnnotationBean.class
public List<Producer> buildProviderService(Object bean) {
RpcProducer annotation = bean.getClass().getAnnotation(RpcProducer.class);
Class<?> serviceItf = annotation.serviceItf();
if (serviceItf == Object.class) {
serviceItf = bean.getClass().getInterfaces()[0];
}
NettyHandlerServer.PRODUCER_BEAN_MAP.put(serviceItf.getName(), bean);
// 獲取註解信息
String groupName = annotation.groupName();
long timeout = annotation.timeout();
int weight = annotation.weight();
int workThreads = annotation.workThreads();
Class<?> finalServiceItf = serviceItf;
return Arrays.stream(bean.getClass().getDeclaredMethods())
// 過濾掉 Object.class 的 equals、notify 等方法
.filter(method -> !method.getDeclaringClass().equals(Object.class))
.map(method -> {
Producer producer = Producer.builder()
.serviceItf(finalServiceItf)
.serviceObject(bean)
.ip(IpUtil.getLocalIP())
.port(RpcPropertiesUtil.getServerPort())
.timeout(timeout)
.appKey(RpcPropertiesUtil.getAppKey())
.groupName(groupName)
.weight(weight)
.workerThreads(workThreads)
.method(method)
.build();
return producer;
}).collect(Collectors.toList());
}
4️⃣:啟動 Netty Server 服務,這一段比較重要,稍後講解
5️⃣:向註冊中心註冊服務提供方的節點信息
6️⃣:服務關閉時,清除所有的節點註冊信息
完成上述步驟後,所有被 RpcProducer
註解的 class
就都將被註冊到註冊中心中。
服務提供方啟動 Netty 服務#
上一步的步驟 4️⃣中,在向服務中心註冊節點時,同時根據註解的端口啟動了 Netty
服務,那麼這個 Netty
服務是如何啟動的,然後啟動的目的是什么呢?
在第一章中我們有講到,客戶端的接口在執行反射操作時,會連接服務端的 Netty
服務,然後發送 NettyRequest
請求。那么服務端接收到 NettyRequest
請求後會做那麼事情呢?服務端又是如何根據 NettyRequest
映射到具體的方法上,同時將方法執行的結果進行返回的呢?這一節我們一起弄清楚這一點。
新建一個 NettyService.class
//NettyService.class
@Slf4j
public class NettyService {
private static final NettyService instance = new NettyService();
private EventLoopGroup bossGroup;
private EventLoopGroup workGroup;
public static NettyService getInstance() {
return instance;
}
public void startService(int port) {
synchronized (NettyService.class) {
if (bossGroup != null || workGroup != null) {
log.debug("netty server is already start");
return;
}
bossGroup = new NioEventLoopGroup(1);
workGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 解碼器
socketChannel.pipeline().addLast(new NettyDecoderHandler(NettyRequest.class));
// 編碼器
socketChannel.pipeline().addLast(new NettyEncoderHandler());
// 服務處理
1️⃣ socketChannel.pipeline().addLast(new NettyHandlerServer());
}
});
try {
serverBootstrap.bind(port).sync().channel();
log.info("NettyServer start {} start now!!!", IpUtil.getLocalIP() + UrlConstants.COLON + port);
} catch (Exception e) {
log.error("NettyServer startServer error", e);
}
}
}
}
1️⃣:設置消息處理器
上述代碼都比較常規,主要是使用 Netty
根據傳入的端口進行了服務的啟動,其中最主要的代碼是
socketChannel.pipeline().addLast(new NettyHandlerServer());
設置了消息的處理器。
//NettyHandlerServer.class
public class NettyHandlerServer extends SimpleChannelInboundHandler<NettyRequest> {
/**
* service name to bean map
*/
public static final Map<String, Object> PRODUCER_BEAN_MAP = Maps.newConcurrentMap();
@Override
protected void channelRead0(ChannelHandlerContext ctx, NettyRequest nettyRequest) throws Exception {
if (!ctx.channel().isWritable()) {
log.error("channel closed!");
return;
}
Producer localProducer = this.getLocalProducer(nettyRequest);
if (localProducer == null) {
log.error("service not found, request={}", nettyRequest);
return;
}
Object result = this.invockMethod(localProducer, nettyRequest);
NettyResponse response = NettyResponse.builder()
.uniqueKey(nettyRequest.getUniqueKey())
.result(result)
.build();
ctx.writeAndFlush(response);
}
/**
* 根據 methodname 獲取 Producer
*/
private Producer getLocalProducer(NettyRequest request) {
String methodName = request.getInvokeMethodName();
String name = request.getProducer().getServiceItf().getName();
List<Producer> producerList = IRegisterCenterZkImpl.getInstance().getProducersMap().get(name);
return Collections2.filter(producerList, producer -> {
assert producer != null;
Method method = producer.getMethod();
return method != null && method.getName().equals(methodName);
}).iterator().next();
}
/**
* 方法反射調用
*/
private Object invockMethod(Producer producer, NettyRequest request) {
// TODO: 2023/12/8 增加超時檢測
Object serviceObject = producer.getServiceObject();
Method method = producer.getMethod();
Object result = null;
try {
// 這裡是最重要的,服務端反射調用方法
result = method.invoke(serviceObject, request.getArgs());
return result;
} catch (Exception e) {
result = e;
log.error("NettyServerBizHandler invokeMethod error, provider={}, request={}", producer, request, e);
}
return result;
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("NettyServerBizHandler error, now ctx closed", cause);
// 發生異常,關閉鏈路
ctx.close();
}
}
消息處理器中,在 invockMethod
方法中,根據傳入的 request
的 method
參數進行了方法的反射獲取執行結果,然後進行返回。反射執行完成後,將結果組裝成 NettyResponse
結果進行返回。
那麼到這裡就完成了整個遠程服務的執行。
Spring 註解服務掃描#
定義 @EnableRpc
註解
@Documented
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Import({RpcImportSelector.class})
public @interface EnableRpc {
}
註解中 @Import
了 RpcImportSelector.class
對象
public class RpcImportSelector implements ImportSelector {
@Override
public String[] selectImports(AnnotationMetadata annotationMetadata) {
return new String[]{"com.simon.spring.ProducerAnnotationBean",
"com.simon.spring.ConsumerAnnotaionBean"};
}
}
RpcImportSelector
對象實現了 ImportSelector
接口, selectImports
方法中返回了處理客戶端和服務端註解的類。
客戶端啟動類 ConsumerApplication.class
@EnableRpc
@SpringBootApplication
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
}
服務啟動時,只需要加上 @EnableRpc
註解即可啟動 RPC 服務
實例#
客戶端#
ConsumerApplication.class
啟動類
@EnableRpc
@SpringBootApplication
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
}
UserController.class
請求接口
@RestController
@RequestMapping("/user")
public class UserController {
// 需要遠程調用的接口
@RpcClient(remoteAppKey = "test")
private UserService userService;
@GetMapping("/getUser/{username}")
public String getUser(@PathVariable String username) {
return userService.get(username);
}
}
配置文件
application.properties
server.port=8082
rpc.properties
# netty 配置
rpc_app_key=test
# ZK 配置
zk_server=127.0.0.1:2181
rpc_session_timeout=3000
rpc_connection_timeout=3000
rpc_channel_connect_size=3
服務端#
ProducerApplication.class
啟動類
@EnableRpc
@SpringBootApplication
public class ProducerApplication {
public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class, args);
}
}
UserServiceImpl.class
接口實現類
@Slf4j
@Service
@RpcProducer
public class UserServiceImpl implements UserService {
@Override
public String get(String username) {
return "username " + username + "服務信息: " + RpcPropertiesUtil.getServerPort();
}
}
配置信息
application.properties
server.port=8081
rpc.properties
# netty 配置
rpc_app_key=test
rpc_server_port=9999
# ZK 配置
zk_server=127.0.0.1:2181
rpc_session_timeout=3000
rpc_connection_timeout=3000
rpc_channel_connect_size=3
服務啟動#
分別啟動服務端和客戶端,然後請求 http://127.0.0.1:8082/user/getUser/simon, 可以查看結果
總結#
至此,如何開發一個自己的 RPC 框架就全部完成了,框架的內容非常粗糙,距離真正商用版本還差很多距離。但是通過參考別人的源碼,可以理解一個基礎的 RPC 框架的大體結構和所需要用到的知識,包括 Netty、Spring 、序列化、負載均衡策略、 zookeeper、 Java 的反射等等。但不管怎麼說,有進步就是一件很快樂的事情!