rpc项目-还需研究下


测试

image-20220221230602699

环境

技术栈:SpringBoot + JDK + Netty

依赖管理:Maven

注册中心:ZooKeeper

基本结构

image-20220222093449173

交互流程:

  • 服务端启动后,将提供的服务列表发布到注册中心,客户端向注册中心订阅服务地址。
  • 客户端通过本地代理调用服务端,Proxy将方法、参数等数据转化成网络字节流。
  • 客户端从服务列表里选一个服务地址,通过网络发送给服务端。
  • 服务端收到之后解码,得到请求信息。
  • 服务端根据解码后的请求信息调用对应的服务,将调用结果返回客户端。

模块依赖

image-20220222094745317

provider-api

这个是暴露出的api接口。

public interface HelloWordService {
    String sayHello(String name);
}

consumer

image-20220222111552181

客户端的服务提供者。只是开启SpringBoot应用。

ConsumerApplication:

@SpringBootApplication
public class ConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
}

配置信息:

server.port=9090
rpc.client.balance=fullRoundBalance

可以选择自定义负载均衡策略:

@Slf4j
//@Component
public class FirstLoadBalance implements LoadBalance {

    @Override
    public ServiceInfo chooseOne(List<ServiceInfo> services) {
        log.info("---------FirstLoadBalance-----------------");
        return services.get(0);
    }
}

但是配置中的负载均衡何时用到了?

rpc-client-spring-boot-starter会有自动配置。

用户通过浏览器访问就是经过controller:

@Controller
public class HelloWorldController {
    @RpcAutowired(version = "1.0")
    private HelloWordService helloWordService;

    @GetMapping("/hello/world")
    // import org.springframework.http.ResponseEntity;
    public ResponseEntity<String> pullServiceInfo(@RequestParam("name") String name){
        return  ResponseEntity.ok(helloWordService.sayHello(name));
    }
}

这里面用一个自定义注解RpcAutowired和一个HellowordService(前面提到暴露出来的API接口)。

rpc-client-spring-boot-starter

image-20220222112906238

RpcAutowired

这个是消费者启动之后有个后置处理器,会扫描ioc容器中的bean,如果被rpcAutowired修饰,就给属性动态赋代理对象。

public class RpcClientProcessor implements BeanFactoryPostProcessor, ApplicationContextAware {
    。。。

ApplicationContextAware 通过它Spring容器会自动把上下文环境对象调用ApplicationContextAware接口中的setApplicationContext方法。

看回RpcAutowired

自定义注解:

元注解:

@Documented 注解是否将包含在JavaDoc中

@Retention 什么时候用这个注解

@Target用在什么地方

  • ElementType.TYPE: 类、接口、注解、enum
  • ElementType.CONSTRUCTOR: 构造函数
  • ElementType.FIELD: 成员变量、对象、属性、枚举的常量
  • ElementType.LOCAL_VARIABLE: 局部变量
  • ElementType.METHOD: 方法
  • ElementType.PACKAGE: 包
  • ElementType.PARAMETER: 参数
  • ElementType.ANNOTATION_TYPE): 注解
  • ElementType.TYPE_PARAMETER:类型参数,表示这个注解可以用在 Type的声明式前,jdk1.8引入。
  • ElementType.TYPE_USE:类型的注解,表示这个注解可以用在所有使用Type的地方(如:泛型,类型转换等),jdk1.8引入。

@Inherited 是否可以继承

这个其实就是个标志位,给后置处理器用的。

@Target({ElementType.CONSTRUCTOR, ElementType.METHOD, ElementType.PARAMETER, ElementType.FIELD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Autowired
public @interface RpcAutowired {
    String version() default "1.0";
}

spring.factories

spring boot 会自动配置org.springframework.boot.autoconfigure.EnableAutoConfiguration的key下所有自动配置类。

org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.rrtv.rpc.client.config.RpcClientAutoConfiguration

接下来按这个配置类来理一遍。

RpcClientAutoConfiguration

@Configuration
public class RpcClientAutoConfiguration {
    //...
}

对各个方法分别分析。

//RpcClientAutoConfiguration
@Bean
public RpcClientProperties rpcClientProperties(Environment environment) {
    BindResult<RpcClientProperties> result = Binder.get(environment).bind("rpc.client", RpcClientProperties.class);
    return result.get();
}

@Bean 用在方法上,告诉Spring容器,你可以从下面这个方法中拿到一个Bean。

public class RpcClientProperties {
    private String balance; //负载均衡
    private String serialization; //序列化
    private String discoveryAddr = "127.0.0.1:2181"; //服务发现地址
    private Integer timeout; //服务调用超时
}

通过Binder可以将属性绑定到对象上。

甚至是

image-20220222144626108

//RpcClientAutoConfiguration
@Bean
@ConditionalOnMissingBean
public ClientStubProxyFactory clientStubProxyFactory() {
    return new ClientStubProxyFactory();
}

@ConditionalOnMissingBean是修饰bean的一个注解,表示只能由一个bean注册,如果注册相同类型的bean,就会报差错。

返回的是stub代理工厂,通过computeIfAbsent来关联,如果clazz不在缓存里,就关联到缓存里。handler为ClientStubInvocationHandler

public class ClientStubProxyFactory {
    private Map<Class<?>, Object> objectCache = new HashMap<>();

    /**
     * 获取代理对象
     *
     * @param clazz   接口
     * @param version 服务版本
     * @param <T>
     * @return 代理对象
     */
    public <T> T getProxy(Class<T> clazz, String version, DiscoveryService discoveryService, RpcClientProperties properties) {
        return (T) objectCache.computeIfAbsent(clazz, clz ->
                Proxy.newProxyInstance(clz.getClassLoader(), new Class[]{clz}, new ClientStubInvocationHandler(discoveryService, properties, clz, version))
        );
    }
}

handler为:

public class ClientStubInvocationHandler implements InvocationHandler {
    private DiscoveryService discoveryService;
    private RpcClientProperties properties;
    private Class<?> clazz;
    private String version;

    public ClientStubInvocationHandler(DiscoveryService discoveryService, RpcClientProperties properties, Class<?> clazz, String version) {
        super();
        this.clazz = clazz;
        this.version = version;
        this.discoveryService = discoveryService;
        this.properties = properties;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // 1、获得服务信息,在core下面,先不管内部细节
        // @Autowired DiscoveryService discoveryService 
        // 最终这个discoveryService通过autowire得到
        ServiceInfo serviceInfo = discoveryService.discovery(ServiceUtil.serviceKey(this.clazz.getName(), this.version));
        if (serviceInfo == null) {
            throw new ResourceNotFoundException("404");
        }

        MessageProtocol<RpcRequest> messageProtocol = new MessageProtocol<>();
        /*
        public class MessageProtocol<T> implements Serializable {
            private MessageHeader header; //消息头
            private T body; //消息体
        }
        */
        
        /*
        public class RpcRequest implements Serializable {
            private String serviceName; //请求的服务名 + 版本
            private String method; //请求调用的方法
            private Class<?>[] parameterTypes; //参数类型
            private Object[] parameters; //参数
        }
        */
        
        // 设置请求头
        messageProtocol.setHeader(MessageHeader.build(properties.getSerialization()));
        /*
        +---------------------------------------------------------------+
        | 魔数 2byte | 协议版本号 1byte | 序列化算法 1byte | 报文类型 1byte  |
        +---------------------------------------------------------------+
        | 状态 1byte |        消息 ID 32byte     |      数据长度 4byte     |
        +---------------------------------------------------------------+
        */
        
        
        // 设置请求体
        RpcRequest request = new RpcRequest();
        request.setServiceName(ServiceUtil.serviceKey(this.clazz.getName(), this.version));
        request.setMethod(method.getName());
        request.setParameterTypes(method.getParameterTypes());
        request.setParameters(args);
        messageProtocol.setBody(request);

        // 发送网络请求 拿到结果
        /*
        public class RpcResponse implements Serializable {
            private Object data;
            private String message;
        }
        */
        
        //getNetClientTransport()返回的是NettyNetClientTransport
        MessageProtocol<RpcResponse> responseMessageProtocol = NetClientTransportFactory.getNetClientTransport()
            .sendRequest(RequestMetadata.builder().protocol(messageProtocol).address(serviceInfo.getAddress())
                        .port(serviceInfo.getPort()).timeout(properties.getTimeout()).build());

        if (responseMessageProtocol == null) {
            log.error("请求超时");
            throw new RpcException("rpc调用结果失败, 请求超时 timeout:" + properties.getTimeout());
        }

        if (!MsgStatus.isSuccess(responseMessageProtocol.getHeader().getStatus())) {
            log.error("rpc调用结果失败, message:{}", responseMessageProtocol.getBody().getMessage());
            throw new RpcException(responseMessageProtocol.getBody().getMessage());
        }
        return responseMessageProtocol.getBody().getData();
    }
}

NettyNetClientTransport:

public class NettyNetClientTransport implements NetClientTransport {

    private final Bootstrap bootstrap;
    private final EventLoopGroup eventLoopGroup;
    private final RpcResponseHandler handler;


    public NettyNetClientTransport() {
        bootstrap = new Bootstrap();
        eventLoopGroup = new NioEventLoopGroup(4);
        handler = new RpcResponseHandler();
        bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline()
                                // 解码 是入站操作 将二进制解码成消息
                                .addLast(new RpcDecoder())
                                // 接收响应 入站操作
                                .addLast(handler)
                                // 编码 是出站操作 将消息编写二进制
                                .addLast(new RpcEncoder<>());
                    }
                });
    }
    //。。。
}

RpcDecoder:

/**
 *
 *  +---------------------------------------------------------------+
 *  | 魔数 2byte | 协议版本号 1byte | 序列化算法 1byte | 报文类型 1byte|
 *  +---------------------------------------------------------------+
 *  | 状态 1byte |        消息 ID 8byte     |      数据长度 4byte     |
 *  +---------------------------------------------------------------+
 *  |                   数据内容 (长度不定)                         |
 *  +---------------------------------------------------------------+
 */

handler将消息id设为消息头:

public class RpcResponseHandler extends SimpleChannelInboundHandler<MessageProtocol<RpcResponse>> {

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, MessageProtocol<RpcResponse> rpcResponseMessageProtocol) throws Exception {
        String requestId = rpcResponseMessageProtocol.getHeader().getRequestId();
        // 收到响应 设置响应数据
        LocalRpcResponseCache.fillResponse(requestId, rpcResponseMessageProtocol);
    }
}

NettyNetClientTransport的sendRequest方法:

//NettyNetClientTransport
@Override
public MessageProtocol<RpcResponse> sendRequest(RequestMetadata metadata) throws Exception {
    MessageProtocol<RpcRequest> protocol = metadata.getProtocol();
    RpcFuture<MessageProtocol<RpcResponse>> future = new RpcFuture<>();
    LocalRpcResponseCache.add(protocol.getHeader().getRequestId(), future);

    // TCP 连接
    ChannelFuture channelFuture = bootstrap.connect(metadata.getAddress(), metadata.getPort()).sync();
    channelFuture.addListener((ChannelFutureListener) arg0 -> {
        if (channelFuture.isSuccess()) {
            log.info("connect rpc server {} on port {} success.", metadata.getAddress(), metadata.getPort());
        } else {
            log.error("connect rpc server {} on port {} failed.", metadata.getAddress(), metadata.getPort());
            channelFuture.cause().printStackTrace();
            eventLoopGroup.shutdownGracefully();
        }
    });
    // 写入数据
    channelFuture.channel().writeAndFlush(protocol);
    return metadata.getTimeout() != null ? future.get(metadata.getTimeout(), TimeUnit.MILLISECONDS) : future.get();
}

RpcClientAutoConfiguration的负载均衡算法:

//RpcClientAutoConfiguration
@Primary
@Bean(name = "loadBalance")
@ConditionalOnMissingBean
@ConditionalOnProperty(prefix = "rpc.client", name = "balance", havingValue = "randomBalance", matchIfMissing = true)
public LoadBalance randomBalance() {
    // matchIfMissing表示这个是默认加载
    return new RandomBalance();
    // @Override
    // public ServiceInfo chooseOne(List<ServiceInfo> services) {
    //    return services.get(random.nextInt(services.size()));
    // }
}

@Bean(name = "loadBalance")
@ConditionalOnMissingBean
@ConditionalOnProperty(prefix = "rpc.client", name = "balance", havingValue = "fullRoundBalance")
public LoadBalance loadBalance() {
    return new FullRoundBalance();
    /** 轮询
    private int index;

    @Override
    public synchronized ServiceInfo chooseOne(List<ServiceInfo> services) {
        // 加锁防止多线程情况下,index超出services.size()
        if (index >= services.size()) {
            index = 0;
        }
        return services.get(index++);
    }
    */
}

最后剩下两个方法:

// RpcClientAutoConfiguration
@Bean
@ConditionalOnMissingBean
@ConditionalOnBean({RpcClientProperties.class, LoadBalance.class})
// ConditionalOnBean表示要包含RpcClientProperties.class, LoadBalance.class才行。
public DiscoveryService discoveryService(@Autowired RpcClientProperties properties,
                                         @Autowired LoadBalance loadBalance) {
    return new ZookeeperDiscoveryService(properties.getDiscoveryAddr(), loadBalance);
    // 见下节服务发现
}

@Bean
@ConditionalOnMissingBean
public RpcClientProcessor rpcClientProcessor(@Autowired ClientStubProxyFactory clientStubProxyFactory,
                                             @Autowired DiscoveryService discoveryService,
                                             @Autowired RpcClientProperties properties) {
    return new RpcClientProcessor(clientStubProxyFactory, discoveryService, properties);
}

bean后置处理器用于替换bean的代理。

public class RpcClientProcessor implements BeanFactoryPostProcessor, ApplicationContextAware {

    private ClientStubProxyFactory clientStubProxyFactory;
    private DiscoveryService discoveryService;
    private RpcClientProperties properties;
    private ApplicationContext applicationContext;

    public RpcClientProcessor(ClientStubProxyFactory clientStubProxyFactory, DiscoveryService discoveryService, RpcClientProperties properties) {
        this.clientStubProxyFactory = clientStubProxyFactory;
        this.discoveryService = discoveryService;
        this.properties = properties;
    }

    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
        for (String beanDefinitionName : beanFactory.getBeanDefinitionNames()) {
            BeanDefinition beanDefinition = beanFactory.getBeanDefinition(beanDefinitionName);
            String beanClassName = beanDefinition.getBeanClassName();
            if (beanClassName != null) {
                Class<?> clazz = ClassUtils.resolveClassName(beanClassName, this.getClass().getClassLoader());
                ReflectionUtils.doWithFields(clazz, field -> {
                    RpcAutowired rpcAutowired = AnnotationUtils.getAnnotation(field, RpcAutowired.class);
                    if (rpcAutowired != null) {
                        Object bean = applicationContext.getBean(clazz);
                        field.setAccessible(true);
                        // 修改为代理对象
                        ReflectionUtils.setField(field, bean, clientStubProxyFactory.getProxy(field.getType(), rpcAutowired.version(), discoveryService, properties));
                    }
                });
            }
        }
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
}

服务发现

Service Provider在启动时向 Zookeeper 注册本机提供的 服务名称端口号地址;Consumer启动的时候先查询Zookeeper获取到服务Provider列表,然后通过负载均衡算法(随机、RoundRobin、一致性Hash) 选择一台机器去调用服务。

在ClientStubInvocationHandler里面有服务发现:

// 1、获得服务信息,在core下面
// @Autowired DiscoveryService discoveryService 
// 最终这个discoveryService通过autowire得到
ServiceInfo serviceInfo = discoveryService.discovery(ServiceUtil.serviceKey(this.clazz.getName(), this.version));

从上节的rpcclientautoconfiguration有对应的bean:

new ZookeeperDiscoveryService(properties.getDiscoveryAddr(), loadBalance);

源码:

public class ZookeeperDiscoveryService implements DiscoveryService {

    public static final int BASE_SLEEP_TIME_MS = 1000;
    public static final int MAX_RETRIES = 3;
    public static final String ZK_BASE_PATH = "/demo_rpc";

    private ServiceDiscovery<ServiceInfo> serviceDiscovery;
    /*
    public class ServiceInfo implements Serializable {
        private String appName; // 应用名称
        private String serviceName; //服务名称
        private String version; //版本
        private String address; //地址
        private Integer port; //端口
    }
    */
    private LoadBalance loadBalance;

    public ZookeeperDiscoveryService(String registryAddr, LoadBalance loadBalance) {
        this.loadBalance = loadBalance;
        try {
            // Curator是一个zookeeper客户端框架
            CuratorFramework client = CuratorFrameworkFactory.newClient(registryAddr, new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_RETRIES));
            client.start();
            JsonInstanceSerializer<ServiceInfo> serializer = new JsonInstanceSerializer<>(ServiceInfo.class);
            // import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
            this.serviceDiscovery = ServiceDiscoveryBuilder.builder(ServiceInfo.class)
                    .client(client)
                    .serializer(serializer)
                    .basePath(ZK_BASE_PATH)
                    .build();
            this.serviceDiscovery.start();
        } catch (Exception e) {
            log.error("serviceDiscovery start error :{}", e);
        }
    }


    /**
     *  服务发现
     * @param serviceName
     * @return
     * @throws Exception
     */
    @Override
    public ServiceInfo discovery(String serviceName) throws Exception {
        Collection<ServiceInstance<ServiceInfo>> serviceInstances = serviceDiscovery.queryForInstances(serviceName);
        return CollectionUtils.isEmpty(serviceInstances) ? null
                : loadBalance.chooseOne(serviceInstances.stream().map(ServiceInstance::getPayload).collect(Collectors.toList()));
    }

}

服务发现里面可以使用自定义的负载均衡算法。

----以下是服务端----

provider

@SpringBootApplication
@ComponentScan("com.rrtv.rpc")
public class ProviderApplication {
    public static void main(String[] args) {
        SpringApplication.run(ProviderApplication.class, args);
    }
}


@RpcService(interfaceType = HelloWordService.class, version = "1.0")
public class HelloWordServiceImpl implements HelloWordService {
    @Override
    public String sayHello(String name) {
        return String.format("您好:%s, rpc 调用成功", name);
    }
}

RpcService又是一个自定义注解。

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Service
public @interface RpcService {
    Class<?> interfaceType() default Object.class; //暴露服务接口类型
    String version() default "1.0"; //服务版本
}

RpcServerAutoConfiguration

@Configuration
@EnableConfigurationProperties(RpcServerProperties.class)
public class RpcServerAutoConfiguration {

    @Autowired
    private RpcServerProperties properties;

    @Bean
    @ConditionalOnMissingBean
    public RegistryService registryService() {
        return new ZookeeperRegistryService(properties.getRegistryAddr());
    }

    @Bean
    @ConditionalOnMissingBean(RpcServer.class)
    RpcServer RpcServer() {
        return new NettyRpcServer();
    }

    @Bean
    @ConditionalOnMissingBean(RpcServerProvider.class)
    RpcServerProvider rpcServerProvider(@Autowired RegistryService registryService,
                                        @Autowired RpcServer rpcServer,
                                        @Autowired RpcServerProperties rpcServerProperties){
        return new RpcServerProvider(registryService, rpcServer, rpcServerProperties);
    }
}

自动配置里面三个点:注册服务,netty server,rpcServerProvider。

rpcServer的handler

线程池处理。

public class RpcRequestHandler extends SimpleChannelInboundHandler<MessageProtocol<RpcRequest>> {

    private final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 10, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10000));

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, MessageProtocol<RpcRequest> rpcRequestMessageProtocol) throws Exception {
        // 多线程处理每个请求
        threadPoolExecutor.submit(() -> {
            MessageProtocol<RpcResponse> resProtocol = new MessageProtocol<>();
            RpcResponse response = new RpcResponse();
            MessageHeader header = rpcRequestMessageProtocol.getHeader();
            // 设置头部消息类型为响应
            header.setMsgType(MsgType.RESPONSE.getType());
            try {
                Object result = handle(rpcRequestMessageProtocol.getBody());
                response.setData(result);
                header.setStatus(MsgStatus.SUCCESS.getCode());
                resProtocol.setHeader(header);
                resProtocol.setBody(response);
            } catch (Throwable throwable) {
                header.setStatus(MsgStatus.FAIL.getCode());
                response.setMessage(throwable.toString());
                log.error("process request {} error", header.getRequestId(), throwable);
            }
            // 把数据写回去
            channelHandlerContext.writeAndFlush(resProtocol);
        });

    }

    /**
     * 反射调用获取数据
     *
     * @param request
     * @return
     */
    private Object handle(RpcRequest request) {
        try {
            Object bean = LocalServerCache.get(request.getServiceName());
            if (bean == null) {
                throw new RuntimeException(String.format("service not exist: %s !", request.getServiceName()));
            }
            // 反射调用
            Method method = bean.getClass().getMethod(request.getMethod(), request.getParameterTypes());
            return method.invoke(bean, request.getParameters());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

}

粘包问题

这个是每个request connect一下,应该无这个问题。

TCP采用的 Nagle(批量发送,主要用于解决频繁发送小数据包而带来的网络拥塞问题) 算法对此作出了优化。

image-20220222163503439

解决的方法:消息长度+消息内容。

参考链接

手撸rpc

通过 Netty、ZooKeeper 手撸一个 RPC 服务