从零实现一个 RPC 框架

RPC(Remote Procedure Call,远程过程调用)让调用远程服务像调用本地方法一样透明。gRPC、Dubbo、Thrift 这些框架背后的核心机制并不神秘,本文从零拆解一个 RPC 框架的完整实现路径。

RPC 的本质

一次 RPC 调用在网络上经历了什么:

sequenceDiagram
    participant C as 调用方 Client
    participant S as 服务方 Server

    C->>C: 1. 调用本地 Stub 方法
    C->>C: 2. Stub 将方法名+参数序列化为字节流
    C->>S: 3. 通过网络发送请求
    S->>S: 4. 反序列化 找到对应方法
    S->>S: 5. 执行本地实现
    S->>S: 6. 序列化返回值
    S->>C: 7. 通过网络发送响应
    C->>C: 9. 反序列化 返回结果给调用方

一个完整的 RPC 框架需要解决五个问题:

  1. 网络通信:如何建立连接、传输数据
  2. 协议设计:如何定义请求/响应的格式
  3. 序列化:如何将对象转换为字节流
  4. 动态代理:如何让调用方感知不到网络的存在
  5. 服务注册与发现:如何找到服务在哪台机器上

第一步:定义协议

协议是 RPC 框架的基础。需要定义请求和响应的数据格式,以及如何在字节流中区分消息边界(解决 TCP 粘包问题)。

消息格式

┌─────────────────────────────────────────────────────┐
│  Magic (4B) │ Version (1B) │ MsgType (1B) │ Seq (8B) │
├─────────────────────────────────────────────────────┤
│              Body Length (4B)                        │
├─────────────────────────────────────────────────────┤
│              Body (variable)                         │
└─────────────────────────────────────────────────────┘
  • Magic:魔数,用于快速识别是否是本协议的数据包,例如 0xCAFEBABE
  • Version:协议版本,支持后续升级
  • MsgType:消息类型,区分请求(0x01)和响应(0x02)
  • Seq:请求序列号,用于将响应与请求对应(异步调用时关键)
  • Body Length:消息体长度,解决 TCP 粘包
public class RpcProtocol {
    public static final int MAGIC = 0xCAFEBABE;
    public static final int HEADER_LENGTH = 18; // 4+1+1+8+4

    // 请求消息体
    public static class Request {
        private String serviceInterface; // 接口全限定名
        private String methodName;
        private Class<?>[] paramTypes;
        private Object[] params;
        private String version;          // 接口版本,支持多版本共存
    }

    // 响应消息体
    public static class Response {
        private Object result;
        private String errorMsg;         // 非 null 表示调用出错
        private int code;                // 200 成功,其他失败
    }
}

编解码器

基于 Netty 实现 LengthFieldBasedFrame 解码,处理 TCP 粘包:

// 解码:读 Header,再按 Body Length 读 Body
public class RpcDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        if (in.readableBytes() < RpcProtocol.HEADER_LENGTH) return;

        in.markReaderIndex();
        int magic = in.readInt();
        if (magic != RpcProtocol.MAGIC) {
            ctx.close();
            return;
        }
        byte version = in.readByte();
        byte msgType = in.readByte();
        long seq = in.readLong();
        int bodyLen = in.readInt();

        if (in.readableBytes() < bodyLen) {
            in.resetReaderIndex(); // 数据不够,等下次
            return;
        }
        byte[] body = new byte[bodyLen];
        in.readBytes(body);
        out.add(new RpcMessage(msgType, seq, body));
    }
}

第二步:序列化

序列化将对象转换为字节流用于网络传输。常见方案的对比:

方案 性能 体积 跨语言 可读性
JDK 原生
JSON (Jackson)
Hessian2 部分
Protobuf 最小
Kryo 最快

设计一个可扩展的序列化接口,允许在运行时切换:

public interface Serializer {
    byte[] serialize(Object obj);
    <T> T deserialize(byte[] bytes, Class<T> clazz);
}

// 用 SPI 机制加载,协议头里带一个 serializerType 字段标识用哪种
public class SerializerFactory {
    private static final Map<Byte, Serializer> SERIALIZERS = new HashMap<>();
    static {
        SERIALIZERS.put((byte) 0x01, new JsonSerializer());
        SERIALIZERS.put((byte) 0x02, new KryoSerializer());
        SERIALIZERS.put((byte) 0x03, new ProtobufSerializer());
    }
    public static Serializer get(byte type) {
        return SERIALIZERS.getOrDefault(type, new JsonSerializer());
    }
}

Kryo 不是线程安全的,需要用 ThreadLocal 或对象池:

public class KryoSerializer implements Serializer {
    private static final ThreadLocal<Kryo> KRYO_LOCAL = ThreadLocal.withInitial(() -> {
        Kryo kryo = new Kryo();
        kryo.setRegistrationRequired(false); // 无需预注册类
        return kryo;
    });

    @Override
    public byte[] serialize(Object obj) {
        Output output = new Output(1024, -1);
        KRYO_LOCAL.get().writeClassAndObject(output, obj);
        return output.toBytes();
    }

    @Override
    public <T> T deserialize(byte[] bytes, Class<T> clazz) {
        Input input = new Input(bytes);
        return clazz.cast(KRYO_LOCAL.get().readClassAndObject(input));
    }
}

第三步:网络通信

生产级 RPC 框架都基于 Netty 的 NIO 实现异步非阻塞通信。

Server 端

public class RpcServer {
    private final int port;
    private final ServiceRegistry registry;

    public void start() throws InterruptedException {
        EventLoopGroup boss = new NioEventLoopGroup(1);
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap()
                .group(boss, worker)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .childOption(ChannelOption.TCP_NODELAY, true)  // 禁用 Nagle,降低延迟
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ch.pipeline()
                            .addLast(new IdleStateHandler(60, 0, 0)) // 心跳检测
                            .addLast(new RpcDecoder())
                            .addLast(new RpcEncoder())
                            .addLast(new RpcServerHandler(registry)); // 业务处理
                    }
                });
            b.bind(port).sync().channel().closeFuture().sync();
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}

Server Handler:反射调用本地方法

public class RpcServerHandler extends SimpleChannelInboundHandler<RpcMessage> {
    private final ServiceRegistry registry;
    // 业务线程池,不阻塞 IO 线程
    private final ExecutorService executor = new ThreadPoolExecutor(
        16, 200, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000));

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcMessage msg) {
        executor.submit(() -> {
            RpcProtocol.Response response = new RpcProtocol.Response();
            try {
                RpcProtocol.Request request = deserialize(msg.getBody());
                // 从注册中心找到实现类
                Object service = registry.getService(
                    request.getServiceInterface(), request.getVersion());
                // 反射调用
                Method method = service.getClass().getMethod(
                    request.getMethodName(), request.getParamTypes());
                Object result = method.invoke(service, request.getParams());
                response.setResult(result);
                response.setCode(200);
            } catch (Exception e) {
                response.setCode(500);
                response.setErrorMsg(e.getMessage());
            }
            ctx.writeAndFlush(buildResponse(msg.getSeq(), response));
        });
    }
}

第四步:动态代理

这是 RPC 的"魔法"所在——让调用方感知不到网络的存在。

public class RpcClientProxy implements InvocationHandler {
    private final String host;
    private final int port;
    private final RpcClient client; // 管理连接池

    @SuppressWarnings("unchecked")
    public <T> T getProxy(Class<T> serviceInterface) {
        return (T) Proxy.newProxyInstance(
            serviceInterface.getClassLoader(),
            new Class<?>[]{serviceInterface},
            this
        );
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // 构造 RPC 请求
        RpcProtocol.Request request = new RpcProtocol.Request();
        request.setServiceInterface(method.getDeclaringClass().getName());
        request.setMethodName(method.getName());
        request.setParamTypes(method.getParameterTypes());
        request.setParams(args);

        // 发送请求,等待响应(同步调用)
        RpcProtocol.Response response = client.sendRequest(request);

        if (response.getCode() != 200) {
            throw new RpcException(response.getErrorMsg());
        }
        return response.getResult();
    }
}

// 使用方:和调用本地方法完全一样
UserService userService = proxy.getProxy(UserService.class);
User user = userService.getById(123); // 实际走网络

异步调用与 Future

同步调用会阻塞调用线程。生产框架通常提供异步接口:

// 使用 CompletableFuture 实现异步
public CompletableFuture<Object> sendAsync(RpcProtocol.Request request) {
    long seq = SEQ_GENERATOR.incrementAndGet();
    CompletableFuture<Object> future = new CompletableFuture<>();
    // 存入等待 Map,响应到来时通过 seq 找到对应 Future
    PENDING_FUTURES.put(seq, future);

    // 发送请求(不等待)
    channel.writeAndFlush(buildRequest(seq, request));

    // 设置超时
    future.orTimeout(3, TimeUnit.SECONDS)
          .exceptionally(e -> { PENDING_FUTURES.remove(seq); return null; });
    return future;
}

// 响应到来时
public void onResponse(long seq, RpcProtocol.Response response) {
    CompletableFuture<Object> future = PENDING_FUTURES.remove(seq);
    if (future != null) {
        if (response.getCode() == 200) {
            future.complete(response.getResult());
        } else {
            future.completeExceptionally(new RpcException(response.getErrorMsg()));
        }
    }
}

第五步:服务注册与发现

硬编码 IP 地址不可扩展。需要一个注册中心让服务提供方和消费方解耦。

基于 ZooKeeper 的注册中心

ZooKeeper 节点结构:

/rpc
  /com.example.UserService          ← 接口名
    /providers                      ← 服务提供方列表
      /192.168.1.10:8080            ← 临时节点(Provider 下线自动删除)
      /192.168.1.11:8080
    /consumers                      ← 服务消费方列表(可选)
      /192.168.2.1
public class ZkServiceRegistry implements ServiceRegistry {
    private final CuratorFramework client;

    // Provider 启动时注册(临时节点,断开连接自动删除)
    @Override
    public void register(String serviceName, String address) throws Exception {
        String path = "/rpc/" + serviceName + "/providers/" + address;
        client.create()
              .creatingParentsIfNeeded()
              .withMode(CreateMode.EPHEMERAL)  // 临时节点
              .forPath(path);
    }

    // Consumer 发现服务,并监听变更
    @Override
    public List<String> discover(String serviceName) throws Exception {
        String path = "/rpc/" + serviceName + "/providers";
        List<String> addresses = client.getChildren().forPath(path);

        // 监听节点变化,缓存更新
        client.getChildren().usingWatcher((CuratorWatcher) event -> {
            serviceCache.put(serviceName, client.getChildren().forPath(path));
        }).forPath(path);

        return addresses;
    }
}

第六步:负载均衡

当同一服务有多个实例时,需要选择一个发送请求。

public interface LoadBalancer {
    String select(List<String> addresses, RpcProtocol.Request request);
}

// 随机
public class RandomLoadBalancer implements LoadBalancer {
    private final Random random = new Random();
    @Override
    public String select(List<String> addresses, RpcProtocol.Request request) {
        return addresses.get(random.nextInt(addresses.size()));
    }
}

// 轮询
public class RoundRobinLoadBalancer implements LoadBalancer {
    private final AtomicInteger counter = new AtomicInteger(0);
    @Override
    public String select(List<String> addresses, RpcProtocol.Request request) {
        return addresses.get(counter.getAndIncrement() % addresses.size());
    }
}

// 一致性哈希(相同参数总路由到同一节点,适合有状态服务)
public class ConsistentHashLoadBalancer implements LoadBalancer {
    private final TreeMap<Integer, String> ring = new TreeMap<>();
    // 每个节点在哈希环上放 150 个虚拟节点,避免数据倾斜
    private static final int VIRTUAL_NODES = 150;

    public void addNode(String address) {
        for (int i = 0; i < VIRTUAL_NODES; i++) {
            ring.put(hash(address + "#" + i), address);
        }
    }

    @Override
    public String select(List<String> addresses, RpcProtocol.Request request) {
        // 以第一个参数的哈希值作为路由 key
        int key = hash(String.valueOf(request.getParams()[0]));
        Map.Entry<Integer, String> entry = ring.ceilingEntry(key);
        return entry != null ? entry.getValue() : ring.firstEntry().getValue();
    }
}

第七步:超时与重试

网络是不可靠的,必须处理超时和失败。

public Object invokeWithRetry(RpcProtocol.Request request, int maxRetry, long timeoutMs) {
    int attempt = 0;
    Exception lastException = null;

    while (attempt < maxRetry) {
        try {
            // 每次重试可以换一个节点(failover)
            String address = loadBalancer.select(discover(request.getServiceInterface()), request);
            CompletableFuture<Object> future = getClient(address).sendAsync(request);
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            lastException = e;
            attempt++;
            // 注意:只有幂等操作(查询、有唯一键保护的写)才能重试
        } catch (Exception e) {
            throw new RpcException("RPC call failed", e);
        }
    }
    throw new RpcException("RPC call failed after " + maxRetry + " retries", lastException);
}

重试有一个重要约束:只有幂等操作才能重试。非幂等写操作(如扣减库存)重试可能导致重复执行。可以通过在接口上加注解声明幂等性:

@Idempotent  // 框架识别此注解,允许重试
User getById(long id);

@NonIdempotent  // 禁止重试
boolean deductStock(long itemId, int quantity);

连接管理

每次调用都建立新 TCP 连接代价极高,需要连接池。

public class ConnectionPool {
    // 每个地址维护一个连接池
    private final Map<String, ArrayBlockingQueue<Channel>> pools = new ConcurrentHashMap<>();
    private static final int POOL_SIZE = 10;

    public Channel acquire(String address) throws InterruptedException {
        ArrayBlockingQueue<Channel> pool = pools.computeIfAbsent(
            address, k -> createPool(k, POOL_SIZE));

        Channel ch = pool.poll(100, TimeUnit.MILLISECONDS);
        if (ch == null || !ch.isActive()) {
            ch = createChannel(address); // 池里没有可用连接,新建
        }
        return ch;
    }

    public void release(String address, Channel ch) {
        if (ch.isActive()) {
            pools.get(address).offer(ch); // 归还到池
        } else {
            ch.close(); // 连接已断,直接关闭
        }
    }
}

心跳保活

长连接需要心跳防止被防火墙或 NAT 断开:

// Client 端:定期发送心跳
public class HeartbeatHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent e = (IdleStateEvent) evt;
            if (e.state() == IdleState.WRITER_IDLE) {
                // 超过 30 秒没发数据,发送心跳包
                ctx.writeAndFlush(HEARTBEAT_REQUEST);
            } else if (e.state() == IdleState.ALL_IDLE) {
                // 超过 60 秒没有任何交互,关闭连接
                ctx.close();
            }
        }
    }
}

整体架构

把以上模块组合起来,一个完整 RPC 框架的架构如下:

graph TD
    subgraph Consumer[Consumer]
        Call[userService.getById 透明调用]
        Proxy[Dynamic Proxy JDK/Cglib\n序列化请求 服务发现 负载均衡 发送 等待响应]
        ConnPool[Connection Pool + Netty Client]
    end
    subgraph Provider[Provider]
        NettyServer[Netty Server Boss + Worker EventLoopGroup]
        Handler[RpcServerHandler 业务线程池 反射调用 序列化响应]
    end
    Registry[ZooKeeper / Etcd / Nacos\n/rpc/com.example.UserService/providers/...]

    Call --> Proxy
    Proxy --> ConnPool
    ConnPool -->|TCP| NettyServer
    NettyServer --> Handler
    Provider -->|注册| Registry
    Registry -->|发现| Consumer

与 gRPC 的对比

手写框架和 gRPC 的核心差异:

特性 手写框架 gRPC
传输协议 TCP + 自定义协议 HTTP/2
序列化 可插拔(JSON/Kryo/Protobuf) Protobuf(IDL 强约束)
流式调用 需要自行实现 原生支持(双向流)
跨语言 较复杂 天然支持(IDL 生成多语言代码)
服务发现 需集成注册中心 需集成注册中心
接口定义 Java 接口 .proto 文件

HTTP/2 的多路复用让 gRPC 天然解决了 Head-of-Line Blocking 问题,这是自定义 TCP 协议需要额外处理的。

总结

一个 RPC 框架的核心复杂度集中在几个地方:

  • 协议设计决定了扩展性和兼容性,魔数 + 版本号 + 长度字段是最小必要集
  • 动态代理是透明调用的关键,JDK Proxy 只能代理接口,Cglib 可以代理类
  • 异步化是性能的核心,seq + CompletableFuture 的组合是标准做法
  • 连接池 + 心跳是生产可用的必要条件,裸连接在高并发下会成为瓶颈
  • 幂等性判断是重试安全的前提,不能无脑重试所有失败

理解了这些,再去读 Dubbo 或 gRPC 的源码,会发现它们不过是在这个骨架上做了更多的工程化打磨。