手写rpc框架3


参考视频: 自己动手实现RPC框架,本内容为视频学习笔记。

三篇内容:

  • 理论篇:rpc核心原理、现有框架对比、相关技术

  • 实战篇:代码实现、使用案例

  • 总结篇

上节:手写rpc框架2

实战篇

Client

image-20220128011814120

首先是客户端的配置类:指定网络传输、编码、解码和连接选择各自使用的实现类,以及与每个服务端建立的连接数和服务端地址列表。

/**
 * Client-side configuration: the implementation classes to plug in and the
 * servers to connect to. Every setting has a default value.
 */
@Data
public class RpcClientConfig {
    /** Transport implementation used to talk to a server. */
    private Class<? extends TransportClient> transportClass = HTTPTransportClient.class;

    /** Encoder implementation. */
    private Class<? extends Encoder> encoderClass = JSONEncoder.class;

    /** Decoder implementation. */
    private Class<? extends Decoder> decoderClass = JSONDecoder.class;

    /** Strategy used to pick a connection for a call. */
    private Class<? extends TransportSelector> selectorClass = RandomTransportSelector.class;

    /** Connection count handed to the selector when it is initialised. */
    private int connectCount = 1;

    /** Server endpoints handed to the selector when it is initialised. */
    private List<Peer> servers = Arrays.asList(new Peer("127.0.0.1", 3000));
}

server 可能有多个,因此需要一个选择 server 连接的接口,提供初始化、选择、释放和关闭四个方法。


/**
 * Decides which server connection a call should use.
 *
 * @author hqingLau
 **/
public interface TransportSelector {
    /**
     * Initialises the selector.
     *
     * @param peers endpoint information of the servers that can be connected to
     * @param count how many connections the client establishes with a server
     * @param clazz implementation class of the client
     */
    void init(List<Peer> peers,
              int count,
              Class<? extends  TransportClient> clazz);
    /**
     * Selects a transport to interact with a server.
     *
     * @return the network client
     */
    TransportClient select();

    /**
     * Releases a client that is no longer in use.
     *
     * @param client the client to release
     */
    void release(TransportClient client);

    /**
     * Closes the selector.
     * NOTE(review): presumably this also closes the clients the selector still
     * holds — confirm against each implementation.
     */
    void close();
}

随机选择的实现类:从已建立的连接中随机取出一个,用完后再放回列表:


/**
 * {@link TransportSelector} that hands out a randomly chosen connection.
 *
 * <p>Connections live in a pool: {@link #select()} removes one from the pool
 * and {@link #release(TransportClient)} puts it back. Every method that
 * touches the pool is synchronized on this instance.
 */
@Slf4j
public class RandomTransportSelector implements TransportSelector {
    /** Connected clients that are currently not handed out. */
    private final List<TransportClient> clients = new ArrayList<>();

    /** Shared generator, so a new {@link Random} is not allocated per call. */
    private final Random random = new Random();

    public RandomTransportSelector() {
    }

    /**
     * Initialises the selector by connecting to every peer.
     *
     * @param peers endpoint information of the servers that can be connected to
     * @param count how many connections to open per server; values below 1
     *              are treated as 1
     * @param clazz implementation class of the client
     */
    @Override
    public synchronized void init(List<Peer> peers, int count, Class<? extends TransportClient> clazz) {
        // Always open at least one connection per server.
        count = Math.max(count, 1);

        for (Peer peer : peers) {
            for (int i = 0; i < count; i++) {
                TransportClient client = ReflectUtils.newInstance(clazz);
                client.connect(peer);
                clients.add(client);
            }
            log.info("connect server: {}", peer);
        }
    }

    /**
     * Removes a random client from the pool and returns it. The caller is
     * expected to give it back through {@link #release(TransportClient)}.
     *
     * @return the network client
     * @throws IllegalStateException if the pool is empty, i.e. the selector was
     *                               not initialised, was closed, or every
     *                               client is currently handed out
     */
    @Override
    public synchronized TransportClient select() {
        // Random.nextInt(0) would fail with an unhelpful "bound must be positive".
        if (clients.isEmpty()) {
            throw new IllegalStateException("no idle transport client available");
        }
        return clients.remove(random.nextInt(clients.size()));
    }

    /**
     * Releases a client that is no longer in use by putting it back into the pool.
     *
     * @param client the client to return
     */
    @Override
    public synchronized void release(TransportClient client) {
        clients.add(client);
    }

    /**
     * Closes every client currently in the pool and empties the pool.
     * Clients that are handed out at this moment are not affected.
     */
    @Override
    public synchronized void close() {
        for (TransportClient client : clients) {
            client.close();
        }
        clients.clear();
    }
}

RpcClient:

/**
 * Entry point of the RPC client. It instantiates the encoder, decoder and
 * selector named in the configuration, initialises the selector, and creates
 * proxies for service interfaces.
 */
public class RpcClient {
    private final RpcClientConfig config;
    private final Encoder encoder;
    private final Decoder decoder;
    private final TransportSelector selector;

    /** Creates a client with the default {@link RpcClientConfig}. */
    public RpcClient() {
        this(new RpcClientConfig());
    }

    /**
     * Creates a client from the given configuration and initialises the
     * selector with the configured servers, connection count and transport class.
     *
     * @param config client configuration
     */
    public RpcClient(RpcClientConfig config) {
        this.config = config;
        this.encoder = ReflectUtils.newInstance(config.getEncoderClass());
        this.decoder = ReflectUtils.newInstance(config.getDecoderClass());
        this.selector = ReflectUtils.newInstance(config.getSelectorClass());

        this.selector.init(this.config.getServers(),
                this.config.getConnectCount(),
                this.config.getTransportClass());
    }

    /**
     * Returns a dynamic proxy for the given interface whose calls are handled
     * by a {@link RemoteInvoker}.
     *
     * @param clazz the service interface
     * @param <T>   type of the service interface
     * @return a proxy implementing {@code clazz}
     */
    public <T> T getProxy(Class<T> clazz) {
        Object proxy = Proxy.newProxyInstance(
                getClass().getClassLoader(),
                new Class<?>[]{clazz},
                new RemoteInvoker(clazz, encoder, decoder, selector)
        );
        // Checked cast instead of an unchecked (T) cast; the proxy implements clazz.
        return clazz.cast(proxy);
    }
}

通过 getProxy 可以获取接口的代理对象:客户端只需要知道接口及其方法,具体的实现和运行都交给服务端。客户端调用的是一个动态代理对象,代理内部的 handler(RemoteInvoker)会把调用转换成请求发送给 server,再把 server 返回的结果作为方法的返回值。

RemoteInvoker:

/**
 * Invocation handler behind the client-side proxies: every method call on a
 * proxy is turned into a request, sent to a server and answered with the
 * data of the response.
 *
 * @author hqingLau
 **/
@Slf4j
public class RemoteInvoker implements InvocationHandler {
    private final Encoder encoder;
    private final Decoder decoder;
    private final TransportSelector selector;
    private final Class<?> clazz;

    /**
     * @param clazz    the service interface the proxy stands for
     * @param encoder  encodes requests to bytes
     * @param decoder  decodes response bytes
     * @param selector supplies the transport client used for a call
     */
    RemoteInvoker(Class<?> clazz, Encoder encoder,
                  Decoder decoder,
                  TransportSelector selector) {
        this.decoder = decoder;
        this.encoder = encoder;
        this.selector = selector;
        this.clazz = clazz;
    }

    /**
     * Builds a request from the invoked method and its arguments, sends it and
     * returns the data of the response.
     *
     * @throws IllegalStateException if there is no response or its code is not 0
     */
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        Request request = new Request();
        request.setServiceDescriptor(ServiceDescriptor.from(clazz, method));
        request.setParameters(args);

        Response resp = invokeRemote(request);
        if (resp == null || resp.getCode() != 0) {
            throw new IllegalStateException("fail to invoke remote: " + resp);
        }
        return resp.getData();
    }

    /**
     * Sends the request over a client taken from the selector and decodes the
     * answer. An {@link IOException} is logged and converted into a response
     * with code 1; the client is always given back to the selector.
     *
     * @param request the request to send
     * @return the decoded response, or an error response on I/O failure
     */
    private Response invokeRemote(Request request) {
        Response resp;
        TransportClient client = null;
        try {
            client = selector.select();
            byte[] outBytes = encoder.encode(request);

            // try-with-resources: the response stream is closed once it has been read.
            try (InputStream received = client.write(new ByteArrayInputStream(outBytes))) {
                // Read until end of stream. available() only reports how many bytes
                // can be read without blocking, not the size of the whole response.
                byte[] inBytes = IOUtils.toByteArray(received);
                resp = decoder.decode(inBytes, Response.class);
            }
        } catch (IOException e) {
            log.warn(e.getMessage(), e);
            resp = new Response();
            resp.setCode(1);
            resp.setMessage("RpcClient got error: " +
                    e.getClass() +
                    ": " +
                    e.getMessage()
            );
        } finally {
            if (client != null) {
                selector.release(client);
            }
        }
        return resp;
    }
}

测试

image-20220128012649880

测试代码:

// Service interface shared by the client and the server
public interface CalcService {
    /** Adds two integers; the implementation in this example returns {@code a + b}. */
    int add(int a,int b);
    /** Subtracts two integers; the implementation in this example returns {@code a - b}. */
    int minus(int a,int b);
}

// Concrete implementation of the service
public class CalServiceImpl implements CalcService {
    /** Returns the sum of {@code a} and {@code b}. */
    @Override
    public int add(int a, int b) {
        return Integer.sum(a, b);
    }

    /** Returns {@code a} minus {@code b}. */
    @Override
    public int minus(int a, int b) {
        final int difference = a - b;
        return difference;
    }
}

// Server side
public class Server {
    /** Registers the calculator service on a default RpcServer and starts it. */
    public static void main(String[] args) {
        final RpcServer rpcServer = new RpcServer();
        final CalcService calcService = new CalServiceImpl();

        rpcServer.register(CalcService.class, calcService);
        rpcServer.start();
    }
}


// Client side
public class Client {
    /** Calls both service methods through a proxy and prints the two results. */
    public static void main(String[] args) {
        final CalcService calc = new RpcClient().getProxy(CalcService.class);

        // Both remote calls are made before anything is printed.
        final int sum = calc.add(1, 2);
        final int difference = calc.minus(10, 3);

        System.out.println(sum);
        System.out.println(difference);
    }
}

默认输出的日志信息比较多,可以添加 logback 配置文件,把日志级别设置为 info:

<?xml version="1.0" encoding="UTF-8" ?>
<configuration>
    <!-- Log line format -->
    <property name="log.pattern"
              value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread]  %-5level %logger{50} - %msg%n"/>

    <!-- Console appender: defines what is written to the console -->
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>${log.pattern}</pattern>
        </encoder>
    </appender>

    <!-- Root logger at info level, writing to the console appender -->
    <root level="info">
        <appender-ref ref="STDOUT"/>
    </root>
</configuration>

总结

明天梳理一下。

参考文献

司马极客视频