simon

How to Develop Your Own RPC Framework (Part Three)

December 13, 2023 9:37 AM

In the previous two articles, we covered the client and the registry center logic needed for an RPC framework. This article covers the server side (the service provider) and fills in the remaining supporting logic of the framework.

RPC Server Implementation#

Service Annotation (RpcProducer)#

Just as the client marks its remote references with an annotation, the service provider needs an annotation that identifies a class as a service to be registered and carries its related configuration.

@RpcProducer Service Provider Annotation

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
public @interface RpcProducer {

    /**
     * Service interface: used for registration in the service registry, the service caller retrieves and caches it locally for initiating service calls.
     */
    Class<?> serviceItf() default Object.class;

    /**
     * Service group name: can be used for grouped gray releases; calls are routed to providers configured with the same group.
     */
    String groupName() default "default";

    /**
     * Timeout: controls the server timeout in ms.
     */
    long timeout() default 3000L;

    /**
     * Service provider weight: configures the weight of this machine in the cluster, used for certain load balancing algorithms.
     */
    int weight() default 1;

    /**
     * Server thread count: limits the number of service threads for the server, server flow control.
     */
    int workThreads() default 10;
}
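
To make the role of these attributes more concrete, here is a minimal sketch of how they can be read at runtime via reflection. The annotation is re-declared locally in trimmed form so that the snippet compiles on its own; GreetingService, GreetingServiceImpl and resolveServiceItf are hypothetical names used only for illustration and are not part of the framework.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

public class RpcProducerReadSketch {

    // Trimmed local copy of the annotation so that this sketch compiles on its own
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.TYPE})
    @interface RpcProducer {
        Class<?> serviceItf() default Object.class;
        String groupName() default "default";
        long timeout() default 3000L;
    }

    // Hypothetical service interface, for illustration only
    interface GreetingService {
        String greet(String name);
    }

    // Hypothetical implementation that overrides only the timeout
    @RpcProducer(timeout = 500L)
    static class GreetingServiceImpl implements GreetingService {
        @Override
        public String greet(String name) {
            return "hello " + name;
        }
    }

    // Resolve the interface to register: the explicit serviceItf if one is set,
    // otherwise the first interface implemented by the class
    static Class<?> resolveServiceItf(Class<?> implClass) {
        RpcProducer annotation = implClass.getAnnotation(RpcProducer.class);
        if (annotation == null) {
            throw new IllegalArgumentException(implClass.getName() + " is not annotated with @RpcProducer");
        }
        if (annotation.serviceItf() != Object.class) {
            return annotation.serviceItf();
        }
        Class<?>[] interfaces = implClass.getInterfaces();
        if (interfaces.length == 0) {
            throw new IllegalStateException(implClass.getName() + " implements no interface");
        }
        return interfaces[0];
    }

    public static void main(String[] args) {
        RpcProducer annotation = GreetingServiceImpl.class.getAnnotation(RpcProducer.class);
        System.out.println(resolveServiceItf(GreetingServiceImpl.class).getSimpleName());
        System.out.println(annotation.groupName());
        System.out.println(annotation.timeout());
    }
}
```

Attributes that are not set on the class (here groupName) fall back to the defaults declared in the annotation, which is why a bare @RpcProducer is enough for simple services.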

Service Provider Node Registration#

To ensure that all implementation classes annotated with @RpcProducer can be registered to the registry center, in Spring, it is necessary to implement:

  • ApplicationListener: onApplicationEvent to listen for application events.
  • ApplicationContextAware: setApplicationContext to set the current context.
  • DisposableBean: destroy method, executed when the bean is destroyed.

Together, these interfaces let the class store the current context via setApplicationContext, find the service implementation classes when onApplicationEvent fires, and then submit them to the registry center.

After clarifying the above process, you can create the implementation class ProducerAnnotationBean.

//ProducerAnnotationBean.class
@Slf4j
public class ProducerAnnotationBean implements ApplicationListener<ApplicationContextEvent>, ApplicationContextAware, DisposableBean {

    private ApplicationContext applicationContext;

    @Override
    public void destroy() throws Exception {
        /* Destroy */
        log.debug("AnnotationServicesPublisher bean destroy");
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        1️⃣ this.applicationContext = applicationContext;
    }

    @Override
    public void onApplicationEvent(ApplicationContextEvent event) {2️⃣}
}

1️⃣: Set the current context.

2️⃣: Listen for Spring application context events (refresh and close).

The implementation of onApplicationEvent is as follows:

//ProducerAnnotationBean.class
@Override
    public void onApplicationEvent(ApplicationContextEvent event) {
        1️⃣ if (event.getApplicationContext() != applicationContext) {
            log.debug("Received an event from another application context {}, ignoring it", event.getApplicationContext());
            // Actually ignore the event instead of falling through to the handling below
            return;
        }

        // Refresh event
        if (event instanceof ContextRefreshedEvent) {
            Map<String, Object> annotation = applicationContext.getBeansWithAnnotation(RpcProducer.class);
            if (MapUtils.isEmpty(annotation)) {
                log.info("no simple rpc exist");

            } else {
                annotation.forEach((beanName, bean) -> {
                    log.info("simple rpc beanName: {}, bean: {}", beanName, bean);
                    2️⃣ if (bean.getClass().isAnnotationPresent(RpcProducer.class)) {
                        // Get object
                    3️⃣    List<Producer> producerList = this.buildProviderService(bean);
                        // 1. Start service
                    4️⃣    NettyService.getInstance().startService(RpcPropertiesUtil.getServerPort());
                        // 2. Service registration
                     5️⃣   IRegisterCenterZkImpl.getInstance().register(producerList);
                    }
                });
            }
        } else if (event instanceof ContextClosedEvent) {
            // Destroy all services
           6️⃣ IRegisterCenterZkImpl.getInstance().destroy(null);
            log.info("simple rpc closed");
        }
    }

1️⃣: Check if the context is consistent with the current Spring context.

2️⃣: Check if the current instance object is annotated with RpcProducer.

3️⃣: Construct Producer objects based on the current bean.

The implementation code is as follows:

//ProducerAnnotationBean.class
public List<Producer> buildProviderService(Object bean) {
        RpcProducer annotation = bean.getClass().getAnnotation(RpcProducer.class);
        Class<?> serviceItf = annotation.serviceItf();
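        if (serviceItf == Object.class) {
            // No interface configured: fall back to the first interface the bean implements
            Class<?>[] interfaces = bean.getClass().getInterfaces();
            if (interfaces.length == 0) {
                throw new IllegalStateException(bean.getClass().getName() + " must implement an interface or set serviceItf");
            }
            serviceItf = interfaces[0];
        }

        NettyHandlerServer.PRODUCER_BEAN_MAP.put(serviceItf.getName(), bean);

        // Get annotation information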
        String groupName = annotation.groupName();
        long timeout = annotation.timeout();
        int weight = annotation.weight();
        int workThreads = annotation.workThreads();

        Class<?> finalServiceItf = serviceItf;
        return Arrays.stream(bean.getClass().getDeclaredMethods())
                // getDeclaredMethods() only returns methods declared on the bean's own class,
                // so this filter is a defensive check against Object's equals, notify, etc.
                .filter(method -> !method.getDeclaringClass().equals(Object.class))
                .map(method -> {
                    Producer producer = Producer.builder()
                            .serviceItf(finalServiceItf)
                            .serviceObject(bean)
                            .ip(IpUtil.getLocalIP())
                            .port(RpcPropertiesUtil.getServerPort())
                            .timeout(timeout)
                            .appKey(RpcPropertiesUtil.getAppKey())
                            .groupName(groupName)
                            .weight(weight)
                            .workerThreads(workThreads)
                            .method(method)
                            .build();
                    return producer;
                }).collect(Collectors.toList());
    }

4️⃣: Start the Netty Server service; this part is quite important and will be explained later.

5️⃣: Register the service provider's node information with the registry center.

6️⃣: When the service is closed, clear all node registration information.

After completing the above steps, all classes annotated with RpcProducer will be registered with the registry center.
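
The data that ends up in the registry can be pictured as one entry per service method, grouped under the service interface name. The sketch below imitates that shape with an in-memory map instead of ZooKeeper. InMemoryRegistry, ProducerInfo, EchoService and buildProducers are hypothetical stand-ins written for this example; they are not the IRegisterCenterZkImpl or Producer classes used in this series.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

public class InMemoryRegistrySketch {

    // Hypothetical, trimmed stand-in for Producer: one entry per service method
    static final class ProducerInfo {
        final String serviceName;
        final Method method;
        final Object serviceObject;
        final String ip;
        final int port;

        ProducerInfo(String serviceName, Method method, Object serviceObject, String ip, int port) {
            this.serviceName = serviceName;
            this.method = method;
            this.serviceObject = serviceObject;
            this.ip = ip;
            this.port = port;
        }
    }

    // Hypothetical in-memory replacement for the ZooKeeper-backed registry
    static final class InMemoryRegistry {
        private final Map<String, List<ProducerInfo>> producers = new ConcurrentHashMap<>();

        void register(List<ProducerInfo> producerList) {
            for (ProducerInfo producer : producerList) {
                producers.computeIfAbsent(producer.serviceName, key -> new CopyOnWriteArrayList<>())
                        .add(producer);
            }
        }

        List<ProducerInfo> lookup(String serviceName) {
            return producers.getOrDefault(serviceName, Collections.<ProducerInfo>emptyList());
        }

        // Mirrors the ContextClosedEvent branch: drop every registration
        void destroyAll() {
            producers.clear();
        }
    }

    // Hypothetical service used to populate the registry
    interface EchoService {
        String echo(String text);
    }

    static class EchoServiceImpl implements EchoService {
        @Override
        public String echo(String text) {
            return text;
        }
    }

    // Build one ProducerInfo per method declared on the implementation class
    static List<ProducerInfo> buildProducers(Object bean, Class<?> serviceItf, String ip, int port) {
        List<ProducerInfo> result = new ArrayList<>();
        for (Method method : bean.getClass().getDeclaredMethods()) {
            if (method.isSynthetic()) {
                continue; // skip compiler- or tool-generated methods
            }
            result.add(new ProducerInfo(serviceItf.getName(), method, bean, ip, port));
        }
        return result;
    }

    public static void main(String[] args) {
        InMemoryRegistry registry = new InMemoryRegistry();
        registry.register(buildProducers(new EchoServiceImpl(), EchoService.class, "127.0.0.1", 9999));
        System.out.println(registry.lookup(EchoService.class.getName()).size());
        registry.destroyAll();
        System.out.println(registry.lookup(EchoService.class.getName()).size());
    }
}
```

Keying the map by interface name is what later allows the server to find the right method from nothing more than the interface and method names carried in a request.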

Service Provider Starts Netty Service#

In step 4️⃣ above, just before the nodes are registered with the registry center, the Netty service is started on the configured port (RpcPropertiesUtil.getServerPort()). So how is this Netty service started, and what is its purpose?

In the first article, we mentioned that when the client calls a remote interface, it connects to the server's Netty service and sends a NettyRequest. So what does the server do when it receives the NettyRequest? How does it map the request to a specific method and return the result of executing it? This section answers these questions.

Create a new NettyService.class.

//NettyService.class
@Slf4j
public class NettyService {
    private static final NettyService instance = new NettyService();

    private EventLoopGroup bossGroup;

    private EventLoopGroup workGroup;

    public static NettyService getInstance() {
        return instance;
    }

    public void startService(int port) {
        synchronized (NettyService.class) {
            if (bossGroup != null || workGroup != null) {
                log.debug("netty server is already started");
                return;
            }

            bossGroup = new NioEventLoopGroup(1);
            workGroup = new NioEventLoopGroup();
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            // Decoder
                            socketChannel.pipeline().addLast(new NettyDecoderHandler(NettyRequest.class));
                            // Encoder
                            socketChannel.pipeline().addLast(new NettyEncoderHandler());
                            // Service handler
                            1️⃣ socketChannel.pipeline().addLast(new NettyHandlerServer());
                        }
                    });

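            try {
                serverBootstrap.bind(port).sync();
                log.info("NettyServer started at {}", IpUtil.getLocalIP() + UrlConstants.COLON + port);
            } catch (Exception e) {
                log.error("NettyServer startService error", e);
                // Release the event loops and reset the fields so that a later call can retry
                bossGroup.shutdownGracefully();
                workGroup.shutdownGracefully();
                bossGroup = null;
                workGroup = null;
            }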
        }
    }
}

1️⃣: Set message handler.

The code above is fairly conventional: it uses Netty to start a server on the given port. The most important line is the following, which sets the message handler:

socketChannel.pipeline().addLast(new NettyHandlerServer());

//NettyHandlerServer.class
@Slf4j
public class NettyHandlerServer extends SimpleChannelInboundHandler<NettyRequest> {

    /**
     * service name to bean map
     */
    public static final Map<String, Object> PRODUCER_BEAN_MAP = Maps.newConcurrentMap();

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, NettyRequest nettyRequest) throws Exception {
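        // isWritable() returns false when the outbound buffer is full, not only when the channel is closed
        if (!ctx.channel().isWritable()) {
            log.error("channel is not writable, dropping request={}", nettyRequest);
            return;
        }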

        Producer localProducer = this.getLocalProducer(nettyRequest);
        if (localProducer == null) {
            log.error("service not found, request={}", nettyRequest);
            return;
        }

        Object result = this.invockMethod(localProducer, nettyRequest);
        NettyResponse response = NettyResponse.builder()
                .uniqueKey(nettyRequest.getUniqueKey())
                .result(result)
                .build();
        ctx.writeAndFlush(response);
    }

    /**
     * Get Producer by service interface name and method name.
     * Returns null when no producer matches, so the caller's null check can take effect.
     * Note: matching is by method name only, so overloaded methods are not distinguished.
     */
    private Producer getLocalProducer(NettyRequest request) {
        String methodName = request.getInvokeMethodName();
        String name = request.getProducer().getServiceItf().getName();
        List<Producer> producerList = IRegisterCenterZkImpl.getInstance().getProducersMap().get(name);
        if (producerList == null) {
            return null;
        }
        return producerList.stream()
                .filter(producer -> producer != null && producer.getMethod() != null
                        && producer.getMethod().getName().equals(methodName))
                .findFirst()
                .orElse(null);
    }

    /**
     * Method reflection call
     */
    private Object invockMethod(Producer producer, NettyRequest request) {
        // TODO: 2023/12/8 Add timeout detection
        Object serviceObject = producer.getServiceObject();
        Method method = producer.getMethod();
        Object result = null;

        try {
            // This is the most important part, the server reflects and calls the method
            result = method.invoke(serviceObject, request.getArgs());
            return result;
        } catch (Exception e) {
            result = e;
            log.error("NettyServerBizHandler invokeMethod error, provider={}, request={}", producer, request, e);
        }

        return result;
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("NettyServerBizHandler error, now ctx closed", cause);
        // An exception occurred, close the link
        ctx.close();
    }
}

In the message handler, getLocalProducer first finds the Producer that matches the interface and method name carried in the request. invockMethod then calls that Producer's method via reflection, passing the arguments from the request. Once the call returns, the result (or the exception, if the call failed) is wrapped in a NettyResponse with the same uniqueKey as the request and written back to the client.
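
Stripped of Netty and the registry, the lookup-and-invoke flow can be sketched in plain Java as follows. Request, Response, BEANS and dispatch are hypothetical stand-ins written for this example (the real NettyRequest and NettyResponse carry more fields), and unwrapping InvocationTargetException is a choice made in this sketch rather than what the handler above does.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ReflectiveDispatchSketch {

    // Hypothetical, trimmed stand-in for NettyRequest
    static final class Request {
        final String uniqueKey;
        final String serviceName;
        final String methodName;
        final Object[] args;

        Request(String uniqueKey, String serviceName, String methodName, Object... args) {
            this.uniqueKey = uniqueKey;
            this.serviceName = serviceName;
            this.methodName = methodName;
            this.args = args;
        }
    }

    // Hypothetical, trimmed stand-in for NettyResponse
    static final class Response {
        final String uniqueKey;
        final Object result;

        Response(String uniqueKey, Object result) {
            this.uniqueKey = uniqueKey;
            this.result = result;
        }
    }

    // Service interface name -> implementation bean, in the spirit of PRODUCER_BEAN_MAP
    static final Map<String, Object> BEANS = new ConcurrentHashMap<>();

    interface UserService {
        String get(String username);
    }

    static class UserServiceImpl implements UserService {
        @Override
        public String get(String username) {
            if (username.isEmpty()) {
                throw new IllegalArgumentException("username is empty");
            }
            return "username " + username;
        }
    }

    // Find the bean and method named in the request, invoke it, and wrap the outcome
    static Response dispatch(Request request) {
        Object bean = BEANS.get(request.serviceName);
        if (bean == null) {
            return new Response(request.uniqueKey,
                    new IllegalStateException("service not found: " + request.serviceName));
        }
        for (Method method : bean.getClass().getMethods()) {
            // Matches on name and argument count only; same-arity overloads are not distinguished
            if (method.getName().equals(request.methodName)
                    && method.getParameterCount() == request.args.length) {
                try {
                    return new Response(request.uniqueKey, method.invoke(bean, request.args));
                } catch (InvocationTargetException e) {
                    // The service method itself threw: hand its exception back as the result
                    return new Response(request.uniqueKey, e.getCause());
                } catch (IllegalAccessException | IllegalArgumentException e) {
                    return new Response(request.uniqueKey, e);
                }
            }
        }
        return new Response(request.uniqueKey, new NoSuchMethodException(request.methodName));
    }

    public static void main(String[] args) {
        BEANS.put(UserService.class.getName(), new UserServiceImpl());
        Response ok = dispatch(new Request("req-1", UserService.class.getName(), "get", "simon"));
        System.out.println(ok.uniqueKey + " -> " + ok.result);
        Response failed = dispatch(new Request("req-2", UserService.class.getName(), "get", ""));
        System.out.println(failed.uniqueKey + " -> " + failed.result);
    }
}
```

Echoing the request's uniqueKey in every response, including error responses, is what lets the client match a reply to the call that is waiting for it.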

At this point, the entire remote service execution is complete.
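
The TODO in invockMethod leaves timeout detection open. One possible approach, shown here only as a sketch, is to run the reflective call on a bounded worker pool and wait for it with a deadline. The pool size of 10 and the helper name invokeWithTimeout are assumptions made for this example (the pool size echoes the workThreads default, and the deadline would come from the annotation's timeout); none of this is part of the framework code above.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimeoutInvokeSketch {

    // Bounded pool of daemon threads; the size is an assumption echoing workThreads' default
    private static final ExecutorService WORKERS = Executors.newFixedThreadPool(10, runnable -> {
        Thread thread = new Thread(runnable, "rpc-worker");
        thread.setDaemon(true);
        return thread;
    });

    // Run the call on the pool and wait at most timeoutMs for it.
    // Like invockMethod, a failure is returned as the result instead of being thrown.
    static Object invokeWithTimeout(Callable<Object> call, long timeoutMs) {
        Future<Object> future = WORKERS.submit(call);
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Give up on the call and interrupt the worker if it is still running
            future.cancel(true);
            return e;
        } catch (ExecutionException e) {
            // The call itself failed: return its underlying exception
            return e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return e;
        }
    }

    public static void main(String[] args) {
        System.out.println(invokeWithTimeout(() -> "ok", 1000L));
        Object slow = invokeWithTimeout(() -> {
            Thread.sleep(2000L);
            return "late";
        }, 100L);
        System.out.println(slow instanceof TimeoutException);
    }
}
```

In a real handler, the wait itself should not happen on a Netty event loop thread, since blocking there stalls every other channel served by that thread; handing the whole request to the worker pool and writing the response from there would avoid this.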

Spring Annotation Service Scanning#

Define @EnableRpc annotation.

@Documented
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Import({RpcImportSelector.class})
public @interface EnableRpc {
}

The annotation uses @Import to pull in RpcImportSelector.

public class RpcImportSelector implements ImportSelector {

    @Override
    public String[] selectImports(AnnotationMetadata annotationMetadata) {
        return new String[]{"com.simon.spring.ProducerAnnotationBean",
                "com.simon.spring.ConsumerAnnotaionBean"};
    }

}

The RpcImportSelector object implements the ImportSelector interface, and the selectImports method returns the classes that handle client and server annotations.

Client startup class ConsumerApplication.class.

@EnableRpc
@SpringBootApplication
public class ConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
}

To start the service, simply add the @EnableRpc annotation to enable the RPC service.

Example#

Client#

ConsumerApplication.class startup class.

@EnableRpc
@SpringBootApplication
public class ConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
}

UserController.class request interface.

@RestController
@RequestMapping("/user")
public class UserController {

    // The interface that needs to be called remotely
    @RpcClient(remoteAppKey = "test")
    private UserService userService;

    @GetMapping("/getUser/{username}")
    public String getUser(@PathVariable String username) {
        return userService.get(username);
    }
}

Configuration file

application.properties

server.port=8082

rpc.properties

# netty configuration
rpc_app_key=test
# ZK configuration
zk_server=127.0.0.1:2181
rpc_session_timeout=3000
rpc_connection_timeout=3000
rpc_channel_connect_size=3

Server#

ProducerApplication.class startup class.

@EnableRpc
@SpringBootApplication
public class ProducerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }
}

UserServiceImpl.class interface implementation class.

@Slf4j
@Service
@RpcProducer
public class UserServiceImpl implements UserService {
    @Override
    public String get(String username) {
        return "username " + username + " service information: " + RpcPropertiesUtil.getServerPort();
    }
}

Configuration information

application.properties

server.port=8081

rpc.properties

# netty configuration
rpc_app_key=test
rpc_server_port=9999
# ZK configuration
zk_server=127.0.0.1:2181
rpc_session_timeout=3000
rpc_connection_timeout=3000
rpc_channel_connect_size=3
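
The implementation of RpcPropertiesUtil is not shown in this article. As a rough idea of how such keys could be read, the sketch below parses the same key names with java.util.Properties. RpcPropertiesSketch and its methods are hypothetical, and the content is parsed from a string here so that the example is self-contained; a real implementation would more likely load rpc.properties from the classpath.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

public class RpcPropertiesSketch {

    private final Properties properties;

    private RpcPropertiesSketch(Properties properties) {
        this.properties = properties;
    }

    // Parse properties-file content; lines starting with '#' are treated as comments
    static RpcPropertiesSketch parse(String content) {
        Properties properties = new Properties();
        try {
            properties.load(new StringReader(content));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new RpcPropertiesSketch(properties);
    }

    String getAppKey() {
        return required("rpc_app_key");
    }

    String getZkServer() {
        return required("zk_server");
    }

    int getServerPort() {
        return Integer.parseInt(required("rpc_server_port"));
    }

    int getSessionTimeout() {
        return Integer.parseInt(required("rpc_session_timeout"));
    }

    // Fail fast with a readable message when a key is missing or blank
    private String required(String key) {
        String value = properties.getProperty(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("missing property: " + key);
        }
        return value.trim();
    }

    public static void main(String[] args) {
        RpcPropertiesSketch config = parse(String.join("\n",
                "# netty configuration",
                "rpc_app_key=test",
                "rpc_server_port=9999",
                "# ZK configuration",
                "zk_server=127.0.0.1:2181",
                "rpc_session_timeout=3000"));
        System.out.println(config.getAppKey() + " @ " + config.getServerPort());
        System.out.println(config.getZkServer());
    }
}
```

Note that the client's rpc.properties has no rpc_server_port entry, so in this sketch getServerPort would fail on the client side; only the service provider needs that key.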

Service Startup#

Start the server and client separately, then request http://127.0.0.1:8082/user/getUser/simon to view the result.

Summary#

This completes the series on developing your own RPC framework. The framework is still very rough and far from production-grade. Still, by studying other people's source code, you can understand the overall structure of a basic RPC framework and the knowledge it requires, including Netty, Spring, serialization, load balancing strategies, ZooKeeper, and Java reflection. Either way, making progress is a joyful thing!
