RPC(Remote Procedure Call,远程过程调用)让调用远程服务像调用本地方法一样透明。gRPC、Dubbo、Thrift 这些框架背后的核心机制并不神秘,本文从零拆解一个 RPC 框架的完整实现路径。
RPC 的本质
一次 RPC 调用在网络上经历了什么:
sequenceDiagram
participant C as 调用方 Client
participant S as 服务方 Server
C->>C: 1. 调用本地 Stub 方法
C->>C: 2. Stub 将方法名+参数序列化为字节流
C->>S: 3. 通过网络发送请求
S->>S: 4. 反序列化 找到对应方法
S->>S: 5. 执行本地实现
S->>S: 6. 序列化返回值
S->>C: 7. 通过网络发送响应
C->>C: 9. 反序列化 返回结果给调用方
一个完整的 RPC 框架需要解决五个问题:
- 网络通信:如何建立连接、传输数据
- 协议设计:如何定义请求/响应的格式
- 序列化:如何将对象转换为字节流
- 动态代理:如何让调用方感知不到网络的存在
- 服务注册与发现:如何找到服务在哪台机器上
第一步:定义协议
协议是 RPC 框架的基础。需要定义请求和响应的数据格式,以及如何在字节流中区分消息边界(解决 TCP 粘包问题)。
消息格式
┌─────────────────────────────────────────────────────┐
│ Magic (4B) │ Version (1B) │ MsgType (1B) │ Seq (8B) │
├─────────────────────────────────────────────────────┤
│ Body Length (4B) │
├─────────────────────────────────────────────────────┤
│ Body (variable) │
└─────────────────────────────────────────────────────┘
- Magic:魔数,用于快速识别是否是本协议的数据包,例如
0xCAFEBABE - Version:协议版本,支持后续升级
- MsgType:消息类型,区分请求(0x01)和响应(0x02)
- Seq:请求序列号,用于将响应与请求对应(异步调用时关键)
- Body Length:消息体长度,解决 TCP 粘包
public class RpcProtocol {
public static final int MAGIC = 0xCAFEBABE;
public static final int HEADER_LENGTH = 18; // 4+1+1+8+4
// 请求消息体
public static class Request {
private String serviceInterface; // 接口全限定名
private String methodName;
private Class<?>[] paramTypes;
private Object[] params;
private String version; // 接口版本,支持多版本共存
}
// 响应消息体
public static class Response {
private Object result;
private String errorMsg; // 非 null 表示调用出错
private int code; // 200 成功,其他失败
}
}
编解码器
基于 Netty 实现 LengthFieldBasedFrame 解码,处理 TCP 粘包:
// 解码:读 Header,再按 Body Length 读 Body
public class RpcDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
if (in.readableBytes() < RpcProtocol.HEADER_LENGTH) return;
in.markReaderIndex();
int magic = in.readInt();
if (magic != RpcProtocol.MAGIC) {
ctx.close();
return;
}
byte version = in.readByte();
byte msgType = in.readByte();
long seq = in.readLong();
int bodyLen = in.readInt();
if (in.readableBytes() < bodyLen) {
in.resetReaderIndex(); // 数据不够,等下次
return;
}
byte[] body = new byte[bodyLen];
in.readBytes(body);
out.add(new RpcMessage(msgType, seq, body));
}
}
第二步:序列化
序列化将对象转换为字节流用于网络传输。常见方案的对比:
| 方案 | 性能 | 体积 | 跨语言 | 可读性 |
|---|---|---|---|---|
| JDK 原生 | 慢 | 大 | 否 | 否 |
| JSON (Jackson) | 中 | 中 | 是 | 是 |
| Hessian2 | 中 | 小 | 部分 | 否 |
| Protobuf | 快 | 最小 | 是 | 否 |
| Kryo | 最快 | 小 | 否 | 否 |
设计一个可扩展的序列化接口,允许在运行时切换:
public interface Serializer {
byte[] serialize(Object obj);
<T> T deserialize(byte[] bytes, Class<T> clazz);
}
// 用 SPI 机制加载,协议头里带一个 serializerType 字段标识用哪种
public class SerializerFactory {
private static final Map<Byte, Serializer> SERIALIZERS = new HashMap<>();
static {
SERIALIZERS.put((byte) 0x01, new JsonSerializer());
SERIALIZERS.put((byte) 0x02, new KryoSerializer());
SERIALIZERS.put((byte) 0x03, new ProtobufSerializer());
}
public static Serializer get(byte type) {
return SERIALIZERS.getOrDefault(type, new JsonSerializer());
}
}
Kryo 不是线程安全的,需要用 ThreadLocal 或对象池:
public class KryoSerializer implements Serializer {
private static final ThreadLocal<Kryo> KRYO_LOCAL = ThreadLocal.withInitial(() -> {
Kryo kryo = new Kryo();
kryo.setRegistrationRequired(false); // 无需预注册类
return kryo;
});
@Override
public byte[] serialize(Object obj) {
Output output = new Output(1024, -1);
KRYO_LOCAL.get().writeClassAndObject(output, obj);
return output.toBytes();
}
@Override
public <T> T deserialize(byte[] bytes, Class<T> clazz) {
Input input = new Input(bytes);
return clazz.cast(KRYO_LOCAL.get().readClassAndObject(input));
}
}
第三步:网络通信
生产级 RPC 框架都基于 Netty 的 NIO 实现异步非阻塞通信。
Server 端
public class RpcServer {
private final int port;
private final ServiceRegistry registry;
public void start() throws InterruptedException {
EventLoopGroup boss = new NioEventLoopGroup(1);
EventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap()
.group(boss, worker)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childOption(ChannelOption.TCP_NODELAY, true) // 禁用 Nagle,降低延迟
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline()
.addLast(new IdleStateHandler(60, 0, 0)) // 心跳检测
.addLast(new RpcDecoder())
.addLast(new RpcEncoder())
.addLast(new RpcServerHandler(registry)); // 业务处理
}
});
b.bind(port).sync().channel().closeFuture().sync();
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
Server Handler:反射调用本地方法
public class RpcServerHandler extends SimpleChannelInboundHandler<RpcMessage> {
private final ServiceRegistry registry;
// 业务线程池,不阻塞 IO 线程
private final ExecutorService executor = new ThreadPoolExecutor(
16, 200, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000));
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcMessage msg) {
executor.submit(() -> {
RpcProtocol.Response response = new RpcProtocol.Response();
try {
RpcProtocol.Request request = deserialize(msg.getBody());
// 从注册中心找到实现类
Object service = registry.getService(
request.getServiceInterface(), request.getVersion());
// 反射调用
Method method = service.getClass().getMethod(
request.getMethodName(), request.getParamTypes());
Object result = method.invoke(service, request.getParams());
response.setResult(result);
response.setCode(200);
} catch (Exception e) {
response.setCode(500);
response.setErrorMsg(e.getMessage());
}
ctx.writeAndFlush(buildResponse(msg.getSeq(), response));
});
}
}
第四步:动态代理
这是 RPC 的"魔法"所在——让调用方感知不到网络的存在。
public class RpcClientProxy implements InvocationHandler {
private final String host;
private final int port;
private final RpcClient client; // 管理连接池
@SuppressWarnings("unchecked")
public <T> T getProxy(Class<T> serviceInterface) {
return (T) Proxy.newProxyInstance(
serviceInterface.getClassLoader(),
new Class<?>[]{serviceInterface},
this
);
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 构造 RPC 请求
RpcProtocol.Request request = new RpcProtocol.Request();
request.setServiceInterface(method.getDeclaringClass().getName());
request.setMethodName(method.getName());
request.setParamTypes(method.getParameterTypes());
request.setParams(args);
// 发送请求,等待响应(同步调用)
RpcProtocol.Response response = client.sendRequest(request);
if (response.getCode() != 200) {
throw new RpcException(response.getErrorMsg());
}
return response.getResult();
}
}
// 使用方:和调用本地方法完全一样
UserService userService = proxy.getProxy(UserService.class);
User user = userService.getById(123); // 实际走网络
异步调用与 Future
同步调用会阻塞调用线程。生产框架通常提供异步接口:
// 使用 CompletableFuture 实现异步
public CompletableFuture<Object> sendAsync(RpcProtocol.Request request) {
long seq = SEQ_GENERATOR.incrementAndGet();
CompletableFuture<Object> future = new CompletableFuture<>();
// 存入等待 Map,响应到来时通过 seq 找到对应 Future
PENDING_FUTURES.put(seq, future);
// 发送请求(不等待)
channel.writeAndFlush(buildRequest(seq, request));
// 设置超时
future.orTimeout(3, TimeUnit.SECONDS)
.exceptionally(e -> { PENDING_FUTURES.remove(seq); return null; });
return future;
}
// 响应到来时
public void onResponse(long seq, RpcProtocol.Response response) {
CompletableFuture<Object> future = PENDING_FUTURES.remove(seq);
if (future != null) {
if (response.getCode() == 200) {
future.complete(response.getResult());
} else {
future.completeExceptionally(new RpcException(response.getErrorMsg()));
}
}
}
第五步:服务注册与发现
硬编码 IP 地址不可扩展。需要一个注册中心让服务提供方和消费方解耦。
基于 ZooKeeper 的注册中心
ZooKeeper 节点结构:
/rpc
/com.example.UserService ← 接口名
/providers ← 服务提供方列表
/192.168.1.10:8080 ← 临时节点(Provider 下线自动删除)
/192.168.1.11:8080
/consumers ← 服务消费方列表(可选)
/192.168.2.1
public class ZkServiceRegistry implements ServiceRegistry {
private final CuratorFramework client;
// Provider 启动时注册(临时节点,断开连接自动删除)
@Override
public void register(String serviceName, String address) throws Exception {
String path = "/rpc/" + serviceName + "/providers/" + address;
client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL) // 临时节点
.forPath(path);
}
// Consumer 发现服务,并监听变更
@Override
public List<String> discover(String serviceName) throws Exception {
String path = "/rpc/" + serviceName + "/providers";
List<String> addresses = client.getChildren().forPath(path);
// 监听节点变化,缓存更新
client.getChildren().usingWatcher((CuratorWatcher) event -> {
serviceCache.put(serviceName, client.getChildren().forPath(path));
}).forPath(path);
return addresses;
}
}
第六步:负载均衡
当同一服务有多个实例时,需要选择一个发送请求。
public interface LoadBalancer {
String select(List<String> addresses, RpcProtocol.Request request);
}
// 随机
public class RandomLoadBalancer implements LoadBalancer {
private final Random random = new Random();
@Override
public String select(List<String> addresses, RpcProtocol.Request request) {
return addresses.get(random.nextInt(addresses.size()));
}
}
// 轮询
public class RoundRobinLoadBalancer implements LoadBalancer {
private final AtomicInteger counter = new AtomicInteger(0);
@Override
public String select(List<String> addresses, RpcProtocol.Request request) {
return addresses.get(counter.getAndIncrement() % addresses.size());
}
}
// 一致性哈希(相同参数总路由到同一节点,适合有状态服务)
public class ConsistentHashLoadBalancer implements LoadBalancer {
private final TreeMap<Integer, String> ring = new TreeMap<>();
// 每个节点在哈希环上放 150 个虚拟节点,避免数据倾斜
private static final int VIRTUAL_NODES = 150;
public void addNode(String address) {
for (int i = 0; i < VIRTUAL_NODES; i++) {
ring.put(hash(address + "#" + i), address);
}
}
@Override
public String select(List<String> addresses, RpcProtocol.Request request) {
// 以第一个参数的哈希值作为路由 key
int key = hash(String.valueOf(request.getParams()[0]));
Map.Entry<Integer, String> entry = ring.ceilingEntry(key);
return entry != null ? entry.getValue() : ring.firstEntry().getValue();
}
}
第七步:超时与重试
网络是不可靠的,必须处理超时和失败。
public Object invokeWithRetry(RpcProtocol.Request request, int maxRetry, long timeoutMs) {
int attempt = 0;
Exception lastException = null;
while (attempt < maxRetry) {
try {
// 每次重试可以换一个节点(failover)
String address = loadBalancer.select(discover(request.getServiceInterface()), request);
CompletableFuture<Object> future = getClient(address).sendAsync(request);
return future.get(timeoutMs, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
lastException = e;
attempt++;
// 注意:只有幂等操作(查询、有唯一键保护的写)才能重试
} catch (Exception e) {
throw new RpcException("RPC call failed", e);
}
}
throw new RpcException("RPC call failed after " + maxRetry + " retries", lastException);
}
重试有一个重要约束:只有幂等操作才能重试。非幂等写操作(如扣减库存)重试可能导致重复执行。可以通过在接口上加注解声明幂等性:
@Idempotent // 框架识别此注解,允许重试
User getById(long id);
@NonIdempotent // 禁止重试
boolean deductStock(long itemId, int quantity);
连接管理
每次调用都建立新 TCP 连接代价极高,需要连接池。
public class ConnectionPool {
// 每个地址维护一个连接池
private final Map<String, ArrayBlockingQueue<Channel>> pools = new ConcurrentHashMap<>();
private static final int POOL_SIZE = 10;
public Channel acquire(String address) throws InterruptedException {
ArrayBlockingQueue<Channel> pool = pools.computeIfAbsent(
address, k -> createPool(k, POOL_SIZE));
Channel ch = pool.poll(100, TimeUnit.MILLISECONDS);
if (ch == null || !ch.isActive()) {
ch = createChannel(address); // 池里没有可用连接,新建
}
return ch;
}
public void release(String address, Channel ch) {
if (ch.isActive()) {
pools.get(address).offer(ch); // 归还到池
} else {
ch.close(); // 连接已断,直接关闭
}
}
}
心跳保活
长连接需要心跳防止被防火墙或 NAT 断开:
// Client 端:定期发送心跳
public class HeartbeatHandler extends ChannelInboundHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
if (evt instanceof IdleStateEvent) {
IdleStateEvent e = (IdleStateEvent) evt;
if (e.state() == IdleState.WRITER_IDLE) {
// 超过 30 秒没发数据,发送心跳包
ctx.writeAndFlush(HEARTBEAT_REQUEST);
} else if (e.state() == IdleState.ALL_IDLE) {
// 超过 60 秒没有任何交互,关闭连接
ctx.close();
}
}
}
}
整体架构
把以上模块组合起来,一个完整 RPC 框架的架构如下:
graph TD
subgraph Consumer[Consumer]
Call[userService.getById 透明调用]
Proxy[Dynamic Proxy JDK/Cglib\n序列化请求 服务发现 负载均衡 发送 等待响应]
ConnPool[Connection Pool + Netty Client]
end
subgraph Provider[Provider]
NettyServer[Netty Server Boss + Worker EventLoopGroup]
Handler[RpcServerHandler 业务线程池 反射调用 序列化响应]
end
Registry[ZooKeeper / Etcd / Nacos\n/rpc/com.example.UserService/providers/...]
Call --> Proxy
Proxy --> ConnPool
ConnPool -->|TCP| NettyServer
NettyServer --> Handler
Provider -->|注册| Registry
Registry -->|发现| Consumer
与 gRPC 的对比
手写框架和 gRPC 的核心差异:
| 特性 | 手写框架 | gRPC |
|---|---|---|
| 传输协议 | TCP + 自定义协议 | HTTP/2 |
| 序列化 | 可插拔(JSON/Kryo/Protobuf) | Protobuf(IDL 强约束) |
| 流式调用 | 需要自行实现 | 原生支持(双向流) |
| 跨语言 | 较复杂 | 天然支持(IDL 生成多语言代码) |
| 服务发现 | 需集成注册中心 | 需集成注册中心 |
| 接口定义 | Java 接口 | .proto 文件 |
HTTP/2 的多路复用让 gRPC 天然解决了 Head-of-Line Blocking 问题,这是自定义 TCP 协议需要额外处理的。
总结
一个 RPC 框架的核心复杂度集中在几个地方:
- 协议设计决定了扩展性和兼容性,魔数 + 版本号 + 长度字段是最小必要集
- 动态代理是透明调用的关键,JDK Proxy 只能代理接口,Cglib 可以代理类
- 异步化是性能的核心,seq + CompletableFuture 的组合是标准做法
- 连接池 + 心跳是生产可用的必要条件,裸连接在高并发下会成为瓶颈
- 幂等性判断是重试安全的前提,不能无脑重试所有失败
理解了这些,再去读 Dubbo 或 gRPC 的源码,会发现它们不过是在这个骨架上做了更多的工程化打磨。