rpc项目-还需研究下
测试
- 启动 Zookeeper 服务器:bin/zkServer.cmd
- 启动 provider 模块 ProviderApplication
- 启动 consumer 模块 ConsumerApplication
- 测试:浏览器输入 http://localhost:9090/hello/world?name=hello,成功返回 您好:hello, rpc 调用成功
环境
技术栈:SpringBoot + JDK + Netty
依赖管理:Maven
注册中心:ZooKeeper
基本结构
交互流程:
- 服务端启动后,将提供的服务列表发布到注册中心,客户端向注册中心订阅服务地址。
- 客户端通过本地代理调用服务端,Proxy将方法、参数等数据转化成网络字节流。
- 客户端从服务列表里选一个服务地址,通过网络发送给服务端。
- 服务端收到之后解码,得到请求信息。
- 服务端根据解码后的请求信息调用对应的服务,将调用结果返回客户端。
模块依赖
provider-api
这个是暴露出的api接口。
public interface HelloWordService {
String sayHello(String name);
}
consumer
客户端的服务提供者。只是开启SpringBoot应用。
ConsumerApplication:
@SpringBootApplication
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
}
配置信息:
server.port=9090
rpc.client.balance=fullRoundBalance
可以选择自定义负载均衡策略:
@Slf4j
//@Component
public class FirstLoadBalance implements LoadBalance {
@Override
public ServiceInfo chooseOne(List<ServiceInfo> services) {
log.info("---------FirstLoadBalance-----------------");
return services.get(0);
}
}
但是配置中的负载均衡何时用到了?
在rpc-client-spring-boot-starter
会有自动配置。
用户通过浏览器访问就是经过controller
:
@Controller
public class HelloWorldController {
@RpcAutowired(version = "1.0")
private HelloWordService helloWordService;
@GetMapping("/hello/world")
// import org.springframework.http.ResponseEntity;
public ResponseEntity<String> pullServiceInfo(@RequestParam("name") String name){
return ResponseEntity.ok(helloWordService.sayHello(name));
}
}
这里面用一个自定义注解RpcAutowired
和一个HellowordService
(前面提到暴露出来的API接口)。
rpc-client-spring-boot-starter
RpcAutowired
这个是消费者启动之后有个后置处理器,会扫描ioc容器中的bean,如果被rpcAutowired修饰,就给属性动态赋代理对象。
public class RpcClientProcessor implements BeanFactoryPostProcessor, ApplicationContextAware {
。。。
ApplicationContextAware
通过它Spring容器会自动把上下文环境对象调用ApplicationContextAware
接口中的setApplicationContext
方法。
看回RpcAutowired
:
自定义注解:
元注解:
@Documented 注解是否将包含在JavaDoc中
@Retention 什么时候用这个注解
@Target用在什么地方
- ElementType.TYPE: 类、接口、注解、enum
- ElementType.CONSTRUCTOR: 构造函数
- ElementType.FIELD: 成员变量、对象、属性、枚举的常量
- ElementType.LOCAL_VARIABLE: 局部变量
- ElementType.METHOD: 方法
- ElementType.PACKAGE: 包
- ElementType.PARAMETER: 参数
- ElementType.ANNOTATION_TYPE): 注解
- ElementType.TYPE_PARAMETER:类型参数,表示这个注解可以用在 Type的声明式前,jdk1.8引入。
- ElementType.TYPE_USE:类型的注解,表示这个注解可以用在所有使用Type的地方(如:泛型,类型转换等),jdk1.8引入。
@Inherited 是否可以继承
这个其实就是个标志位,给后置处理器用的。
@Target({ElementType.CONSTRUCTOR, ElementType.METHOD, ElementType.PARAMETER, ElementType.FIELD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Autowired
public @interface RpcAutowired {
String version() default "1.0";
}
spring.factories
spring boot 会自动配置org.springframework.boot.autoconfigure.EnableAutoConfiguration的key下所有自动配置类。
org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.rrtv.rpc.client.config.RpcClientAutoConfiguration
接下来按这个配置类来理一遍。
RpcClientAutoConfiguration
@Configuration
public class RpcClientAutoConfiguration {
//...
}
对各个方法分别分析。
//RpcClientAutoConfiguration
@Bean
public RpcClientProperties rpcClientProperties(Environment environment) {
BindResult<RpcClientProperties> result = Binder.get(environment).bind("rpc.client", RpcClientProperties.class);
return result.get();
}
@Bean 用在方法上,告诉Spring容器,你可以从下面这个方法中拿到一个Bean。
public class RpcClientProperties {
private String balance; //负载均衡
private String serialization; //序列化
private String discoveryAddr = "127.0.0.1:2181"; //服务发现地址
private Integer timeout; //服务调用超时
}
通过Binder可以将属性绑定到对象上。
甚至是:
//RpcClientAutoConfiguration
@Bean
@ConditionalOnMissingBean
public ClientStubProxyFactory clientStubProxyFactory() {
return new ClientStubProxyFactory();
}
@ConditionalOnMissingBean是修饰bean的一个注解,表示只能由一个bean注册,如果注册相同类型的bean,就会报差错。
返回的是stub代理工厂,通过computeIfAbsent
来关联,如果clazz不在缓存里,就关联到缓存里。handler为ClientStubInvocationHandler
。
public class ClientStubProxyFactory {
private Map<Class<?>, Object> objectCache = new HashMap<>();
/**
* 获取代理对象
*
* @param clazz 接口
* @param version 服务版本
* @param <T>
* @return 代理对象
*/
public <T> T getProxy(Class<T> clazz, String version, DiscoveryService discoveryService, RpcClientProperties properties) {
return (T) objectCache.computeIfAbsent(clazz, clz ->
Proxy.newProxyInstance(clz.getClassLoader(), new Class[]{clz}, new ClientStubInvocationHandler(discoveryService, properties, clz, version))
);
}
}
handler为:
public class ClientStubInvocationHandler implements InvocationHandler {
private DiscoveryService discoveryService;
private RpcClientProperties properties;
private Class<?> clazz;
private String version;
public ClientStubInvocationHandler(DiscoveryService discoveryService, RpcClientProperties properties, Class<?> clazz, String version) {
super();
this.clazz = clazz;
this.version = version;
this.discoveryService = discoveryService;
this.properties = properties;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 1、获得服务信息,在core下面,先不管内部细节
// @Autowired DiscoveryService discoveryService
// 最终这个discoveryService通过autowire得到
ServiceInfo serviceInfo = discoveryService.discovery(ServiceUtil.serviceKey(this.clazz.getName(), this.version));
if (serviceInfo == null) {
throw new ResourceNotFoundException("404");
}
MessageProtocol<RpcRequest> messageProtocol = new MessageProtocol<>();
/*
public class MessageProtocol<T> implements Serializable {
private MessageHeader header; //消息头
private T body; //消息体
}
*/
/*
public class RpcRequest implements Serializable {
private String serviceName; //请求的服务名 + 版本
private String method; //请求调用的方法
private Class<?>[] parameterTypes; //参数类型
private Object[] parameters; //参数
}
*/
// 设置请求头
messageProtocol.setHeader(MessageHeader.build(properties.getSerialization()));
/*
+---------------------------------------------------------------+
| 魔数 2byte | 协议版本号 1byte | 序列化算法 1byte | 报文类型 1byte |
+---------------------------------------------------------------+
| 状态 1byte | 消息 ID 32byte | 数据长度 4byte |
+---------------------------------------------------------------+
*/
// 设置请求体
RpcRequest request = new RpcRequest();
request.setServiceName(ServiceUtil.serviceKey(this.clazz.getName(), this.version));
request.setMethod(method.getName());
request.setParameterTypes(method.getParameterTypes());
request.setParameters(args);
messageProtocol.setBody(request);
// 发送网络请求 拿到结果
/*
public class RpcResponse implements Serializable {
private Object data;
private String message;
}
*/
//getNetClientTransport()返回的是NettyNetClientTransport
MessageProtocol<RpcResponse> responseMessageProtocol = NetClientTransportFactory.getNetClientTransport()
.sendRequest(RequestMetadata.builder().protocol(messageProtocol).address(serviceInfo.getAddress())
.port(serviceInfo.getPort()).timeout(properties.getTimeout()).build());
if (responseMessageProtocol == null) {
log.error("请求超时");
throw new RpcException("rpc调用结果失败, 请求超时 timeout:" + properties.getTimeout());
}
if (!MsgStatus.isSuccess(responseMessageProtocol.getHeader().getStatus())) {
log.error("rpc调用结果失败, message:{}", responseMessageProtocol.getBody().getMessage());
throw new RpcException(responseMessageProtocol.getBody().getMessage());
}
return responseMessageProtocol.getBody().getData();
}
}
NettyNetClientTransport:
public class NettyNetClientTransport implements NetClientTransport {
private final Bootstrap bootstrap;
private final EventLoopGroup eventLoopGroup;
private final RpcResponseHandler handler;
public NettyNetClientTransport() {
bootstrap = new Bootstrap();
eventLoopGroup = new NioEventLoopGroup(4);
handler = new RpcResponseHandler();
bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline()
// 解码 是入站操作 将二进制解码成消息
.addLast(new RpcDecoder())
// 接收响应 入站操作
.addLast(handler)
// 编码 是出站操作 将消息编写二进制
.addLast(new RpcEncoder<>());
}
});
}
//。。。
}
RpcDecoder:
/**
*
* +---------------------------------------------------------------+
* | 魔数 2byte | 协议版本号 1byte | 序列化算法 1byte | 报文类型 1byte|
* +---------------------------------------------------------------+
* | 状态 1byte | 消息 ID 8byte | 数据长度 4byte |
* +---------------------------------------------------------------+
* | 数据内容 (长度不定) |
* +---------------------------------------------------------------+
*/
handler将消息id设为消息头:
public class RpcResponseHandler extends SimpleChannelInboundHandler<MessageProtocol<RpcResponse>> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, MessageProtocol<RpcResponse> rpcResponseMessageProtocol) throws Exception {
String requestId = rpcResponseMessageProtocol.getHeader().getRequestId();
// 收到响应 设置响应数据
LocalRpcResponseCache.fillResponse(requestId, rpcResponseMessageProtocol);
}
}
NettyNetClientTransport的sendRequest方法:
//NettyNetClientTransport
@Override
public MessageProtocol<RpcResponse> sendRequest(RequestMetadata metadata) throws Exception {
MessageProtocol<RpcRequest> protocol = metadata.getProtocol();
RpcFuture<MessageProtocol<RpcResponse>> future = new RpcFuture<>();
LocalRpcResponseCache.add(protocol.getHeader().getRequestId(), future);
// TCP 连接
ChannelFuture channelFuture = bootstrap.connect(metadata.getAddress(), metadata.getPort()).sync();
channelFuture.addListener((ChannelFutureListener) arg0 -> {
if (channelFuture.isSuccess()) {
log.info("connect rpc server {} on port {} success.", metadata.getAddress(), metadata.getPort());
} else {
log.error("connect rpc server {} on port {} failed.", metadata.getAddress(), metadata.getPort());
channelFuture.cause().printStackTrace();
eventLoopGroup.shutdownGracefully();
}
});
// 写入数据
channelFuture.channel().writeAndFlush(protocol);
return metadata.getTimeout() != null ? future.get(metadata.getTimeout(), TimeUnit.MILLISECONDS) : future.get();
}
RpcClientAutoConfiguration的负载均衡算法:
//RpcClientAutoConfiguration
@Primary
@Bean(name = "loadBalance")
@ConditionalOnMissingBean
@ConditionalOnProperty(prefix = "rpc.client", name = "balance", havingValue = "randomBalance", matchIfMissing = true)
public LoadBalance randomBalance() {
// matchIfMissing表示这个是默认加载
return new RandomBalance();
// @Override
// public ServiceInfo chooseOne(List<ServiceInfo> services) {
// return services.get(random.nextInt(services.size()));
// }
}
@Bean(name = "loadBalance")
@ConditionalOnMissingBean
@ConditionalOnProperty(prefix = "rpc.client", name = "balance", havingValue = "fullRoundBalance")
public LoadBalance loadBalance() {
return new FullRoundBalance();
/** 轮询
private int index;
@Override
public synchronized ServiceInfo chooseOne(List<ServiceInfo> services) {
// 加锁防止多线程情况下,index超出services.size()
if (index >= services.size()) {
index = 0;
}
return services.get(index++);
}
*/
}
最后剩下两个方法:
// RpcClientAutoConfiguration
@Bean
@ConditionalOnMissingBean
@ConditionalOnBean({RpcClientProperties.class, LoadBalance.class})
// ConditionalOnBean表示要包含RpcClientProperties.class, LoadBalance.class才行。
public DiscoveryService discoveryService(@Autowired RpcClientProperties properties,
@Autowired LoadBalance loadBalance) {
return new ZookeeperDiscoveryService(properties.getDiscoveryAddr(), loadBalance);
// 见下节服务发现
}
@Bean
@ConditionalOnMissingBean
public RpcClientProcessor rpcClientProcessor(@Autowired ClientStubProxyFactory clientStubProxyFactory,
@Autowired DiscoveryService discoveryService,
@Autowired RpcClientProperties properties) {
return new RpcClientProcessor(clientStubProxyFactory, discoveryService, properties);
}
bean后置处理器用于替换bean的代理。
public class RpcClientProcessor implements BeanFactoryPostProcessor, ApplicationContextAware {
private ClientStubProxyFactory clientStubProxyFactory;
private DiscoveryService discoveryService;
private RpcClientProperties properties;
private ApplicationContext applicationContext;
public RpcClientProcessor(ClientStubProxyFactory clientStubProxyFactory, DiscoveryService discoveryService, RpcClientProperties properties) {
this.clientStubProxyFactory = clientStubProxyFactory;
this.discoveryService = discoveryService;
this.properties = properties;
}
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
for (String beanDefinitionName : beanFactory.getBeanDefinitionNames()) {
BeanDefinition beanDefinition = beanFactory.getBeanDefinition(beanDefinitionName);
String beanClassName = beanDefinition.getBeanClassName();
if (beanClassName != null) {
Class<?> clazz = ClassUtils.resolveClassName(beanClassName, this.getClass().getClassLoader());
ReflectionUtils.doWithFields(clazz, field -> {
RpcAutowired rpcAutowired = AnnotationUtils.getAnnotation(field, RpcAutowired.class);
if (rpcAutowired != null) {
Object bean = applicationContext.getBean(clazz);
field.setAccessible(true);
// 修改为代理对象
ReflectionUtils.setField(field, bean, clientStubProxyFactory.getProxy(field.getType(), rpcAutowired.version(), discoveryService, properties));
}
});
}
}
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
}
服务发现
Service Provider在启动时向 Zookeeper 注册本机提供的 服务名称、端口号和地址;Consumer启动的时候先查询Zookeeper获取到服务Provider列表,然后通过负载均衡算法(随机、RoundRobin、一致性Hash) 选择一台机器去调用服务。
在ClientStubInvocationHandler里面有服务发现:
// 1、获得服务信息,在core下面
// @Autowired DiscoveryService discoveryService
// 最终这个discoveryService通过autowire得到
ServiceInfo serviceInfo = discoveryService.discovery(ServiceUtil.serviceKey(this.clazz.getName(), this.version));
从上节的rpcclientautoconfiguration有对应的bean:
new ZookeeperDiscoveryService(properties.getDiscoveryAddr(), loadBalance);
源码:
public class ZookeeperDiscoveryService implements DiscoveryService {
public static final int BASE_SLEEP_TIME_MS = 1000;
public static final int MAX_RETRIES = 3;
public static final String ZK_BASE_PATH = "/demo_rpc";
private ServiceDiscovery<ServiceInfo> serviceDiscovery;
/*
public class ServiceInfo implements Serializable {
private String appName; // 应用名称
private String serviceName; //服务名称
private String version; //版本
private String address; //地址
private Integer port; //端口
}
*/
private LoadBalance loadBalance;
public ZookeeperDiscoveryService(String registryAddr, LoadBalance loadBalance) {
this.loadBalance = loadBalance;
try {
// Curator是一个zookeeper客户端框架
CuratorFramework client = CuratorFrameworkFactory.newClient(registryAddr, new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_RETRIES));
client.start();
JsonInstanceSerializer<ServiceInfo> serializer = new JsonInstanceSerializer<>(ServiceInfo.class);
// import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
this.serviceDiscovery = ServiceDiscoveryBuilder.builder(ServiceInfo.class)
.client(client)
.serializer(serializer)
.basePath(ZK_BASE_PATH)
.build();
this.serviceDiscovery.start();
} catch (Exception e) {
log.error("serviceDiscovery start error :{}", e);
}
}
/**
* 服务发现
* @param serviceName
* @return
* @throws Exception
*/
@Override
public ServiceInfo discovery(String serviceName) throws Exception {
Collection<ServiceInstance<ServiceInfo>> serviceInstances = serviceDiscovery.queryForInstances(serviceName);
return CollectionUtils.isEmpty(serviceInstances) ? null
: loadBalance.chooseOne(serviceInstances.stream().map(ServiceInstance::getPayload).collect(Collectors.toList()));
}
}
服务发现里面可以使用自定义的负载均衡算法。
----以下是服务端----
provider
@SpringBootApplication
@ComponentScan("com.rrtv.rpc")
public class ProviderApplication {
public static void main(String[] args) {
SpringApplication.run(ProviderApplication.class, args);
}
}
@RpcService(interfaceType = HelloWordService.class, version = "1.0")
public class HelloWordServiceImpl implements HelloWordService {
@Override
public String sayHello(String name) {
return String.format("您好:%s, rpc 调用成功", name);
}
}
RpcService又是一个自定义注解。
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Service
public @interface RpcService {
Class<?> interfaceType() default Object.class; //暴露服务接口类型
String version() default "1.0"; //服务版本
}
RpcServerAutoConfiguration
@Configuration
@EnableConfigurationProperties(RpcServerProperties.class)
public class RpcServerAutoConfiguration {
@Autowired
private RpcServerProperties properties;
@Bean
@ConditionalOnMissingBean
public RegistryService registryService() {
return new ZookeeperRegistryService(properties.getRegistryAddr());
}
@Bean
@ConditionalOnMissingBean(RpcServer.class)
RpcServer RpcServer() {
return new NettyRpcServer();
}
@Bean
@ConditionalOnMissingBean(RpcServerProvider.class)
RpcServerProvider rpcServerProvider(@Autowired RegistryService registryService,
@Autowired RpcServer rpcServer,
@Autowired RpcServerProperties rpcServerProperties){
return new RpcServerProvider(registryService, rpcServer, rpcServerProperties);
}
}
自动配置里面三个点:注册服务,netty server,rpcServerProvider。
rpcServer的handler
线程池处理。
public class RpcRequestHandler extends SimpleChannelInboundHandler<MessageProtocol<RpcRequest>> {
private final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 10, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10000));
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, MessageProtocol<RpcRequest> rpcRequestMessageProtocol) throws Exception {
// 多线程处理每个请求
threadPoolExecutor.submit(() -> {
MessageProtocol<RpcResponse> resProtocol = new MessageProtocol<>();
RpcResponse response = new RpcResponse();
MessageHeader header = rpcRequestMessageProtocol.getHeader();
// 设置头部消息类型为响应
header.setMsgType(MsgType.RESPONSE.getType());
try {
Object result = handle(rpcRequestMessageProtocol.getBody());
response.setData(result);
header.setStatus(MsgStatus.SUCCESS.getCode());
resProtocol.setHeader(header);
resProtocol.setBody(response);
} catch (Throwable throwable) {
header.setStatus(MsgStatus.FAIL.getCode());
response.setMessage(throwable.toString());
log.error("process request {} error", header.getRequestId(), throwable);
}
// 把数据写回去
channelHandlerContext.writeAndFlush(resProtocol);
});
}
/**
* 反射调用获取数据
*
* @param request
* @return
*/
private Object handle(RpcRequest request) {
try {
Object bean = LocalServerCache.get(request.getServiceName());
if (bean == null) {
throw new RuntimeException(String.format("service not exist: %s !", request.getServiceName()));
}
// 反射调用
Method method = bean.getClass().getMethod(request.getMethod(), request.getParameterTypes());
return method.invoke(bean, request.getParameters());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
粘包问题
这个是每个request connect一下,应该无这个问题。
TCP采用的 Nagle(批量发送,主要用于解决频繁发送小数据包而带来的网络拥塞问题) 算法对此作出了优化。
解决的方法:消息长度+消息内容。