桔妹导读:HDSF 作为分布式文件系统,常常涉及 DataNode、NameNode、Client 之间的配合、相互调用才能完成完整的流程。为了降低节点之间的耦合性,HDFS 将节点间的调用抽象成不同的接口,其接口主要分为两类:HadoopRPC 接口和基于 TCP 或 HTTP 的流式接口。流式接口主要用于数据传输,HadoopRPC 接口主要用于方法调用。HadoopRPC 框架设计巧妙,本文将结合 hadoop2.7 源码,对 HadoopRPC 做初步剖析。

0.  目录

1. RPC工作原理

2. HadoopRPC架构设计

  • RPC Client解读
  • RPC Server解读

3. 关于并发时的优化

  • 参数配置
  • CallQueue与FairCallQueue
    • 优先级
    • 优先级确定
    • 优先级权重

4. 从一个命令解析

5. 小结

1.   RPC工作原理

RPC(Remote Procedure Call)即远程过程调用,是一种通过网络从远程计算机程序上请求服务的协议。RPC允许本地程序像调用本地方法一样调用远程计算机上的应用程序,其使用常见的网络传输协议(如TCP或UDP)传递RPC请求以及相应信息,使得分布式程序的开发更加容易。

RPC采用客户端/服务器模式,请求程序就是客户端,服务提供程序就是服务器。RPC框架工作原理如图1所示,工作流程依次见图中标号①~⑩,其结构主要包含以下部分:

图1 RPC框架工作原理示例图

  • client functions请求程序,会像调用本地方法一样调用客户端stub程序(如图中①),然后接受stub程序的响应信息(如图中⑩)
  • client stub客户端stub程序,表现得就像本地程序一样,但底层却会调用请求和参数序列化并通过通信模块发送给服务器(如图中②);客户端stub程序也会等待服务器的响应信息(如图中⑨),将响应信息反序列化并返回给请求程序(如图中⑩)
  • sockets网络通信模块,用于传输RPC请求和响应(如图中的③⑧),可以基于TCP或UDP协议
  • server stub服务端stub程序,会接收客户端发送的请求和参数(如图中④)并反序列化,根据调用信息触发对应的服务程序(如图中⑤),然后将服务程序的响应信息(如图⑥),并序列化并发回给客户端(如图中⑦)
  • server functions服务程序,会接收服务端stub程序的调用请求(如图中⑤),执行对应的逻辑并返回执行结果(如图中⑥)

那么要实现RPC框架,基本上要解决三大问题:

  • 函数/方法识别sever functions如何识别client functions请求及参数,并执行函数调用。java 中可利用反射可达到预期目标。
  • 序列化及反序列化如何将请求及参数序列化成网络传输的字节类型,反之还原请求及参数。已有主流的序列化框架如 protobuf、avro 等。
  • 网络通信java 提供网络编程支持如 NIO。

主流的 RPC 框架,除 HadoopRPC 外,还有 gRPC、Thrift、Hessian 等,以及 Dubbo 和 SpringCloud 中的 RPC 模块,在此不再赘述。下文将解读 HDFS 中 HadoopRPC 的实现。

2.    HadoopRPC架构设计

HadoopRPC 实现了图 1 中所示的结构,其实现主要在 org.apache.hadoop.ipc 包下,主要由三个类组成:RPC 类、Client 类和Server 类。HadoopRPC 实现了基于 TCP/IP/Sockets 的网络通信功能。客户端可以通过 Client 类将序列化的请求发送到远程服务器,服务器会通过 Server 类接收客户端的请求。

客户端 Client 在收到请求后,会将请求序列化,然后调用 Client.call() 方法发送请求到到远程服务器。为使 RPC 机制更加健壮,HadoopRPC 允许配置不同的序列化框架如 protobuf。Client 将序列化的请求 rpcRequest 封装成 Writable 类型用于网络传输。具体解析见下节—— RPC Client 解读。

服务端 Server 采用 java NIO 提供的基于 Reactor 设计模式。Sever 接收到一个 RPC Writable 类型请求后,会调用 Server.call() 方法响应这个请求,并返回 Writable 类型作为响应结果。具体解析见下节—— RPC Server 解读。

RPC 类提供一个统一的接口,在客户端可以获取 RPC 协议代理对象,在服务端可以调用 build() 构造 Server 类,并调用 start() 启动 Server 对象监听并响应 RPC 请求。同时,RPC 类提供 setProtocolEngine() 为客户端或服务端适配当前使用的序列化引擎。RPC 的主要两大接口如下:

public static ProtocolProxy getProxy/waitForProxy(…):构造一个客户端代理对象(该对象实现了某个协议),用于向服务器发送RPC请求。
public static Server RPC.Builder(Configuration).build():为某个协议(实际上是Java接口)实例构造一个服务器对象,用于处理客户端发送的请求

那么,如何使用HadoopRPC呢?只需按如下4个步骤:

1. 定义RPC协议

RPC协议是客户端和服务器端之间的通信接口,它定义了服务器端对外提供的服务接口。如ClientProtocol定义了HDFS客户端与NameNode的通信接口, ClientDatanodeProtocol定义了HDFS客户端与DataNode的通信接口等。

2. 实现RPC协议

对接口的实现,将会调用Server端的接口的实现。

3. 构造并启动RPC Server

构造Server并监听请求。可使用静态类Builder构造一个RPC Server,并调用函数start()启动该Server,如:

RPC.Server server = new RPC.Builder(conf).setProtocol(MyProxyProtocol.class)
.setInstance(new MyProxy())
.setBindAddress(HOST)
.setNumHandlers(2)
.setPort(PORT)
.build();
server.start();

4. 构造RPC Client并发送请求

构造客户端代理对象,当有请求时客户端将通过动态代理,调用代理方法进行后续实现,如:

MyProxyProtocol proxy = RPC.getProxy(MyProxyProtocol.class,        MyProxyProtocol.versionID,        new InetSocketAddress(HOST, PORT), conf);XXX result = proxy.fun(args);
RPC Client解读

在 IPC(Inter-Process Communication)发生之前,客户端需要通过 RPC 提供的 getProxy 或 waitForProxy 获得代理对象,以 getProxy 的具体实现为例。RPC.getProxy 直接调用了 RPC.getProtocolProxy 方法,getProtocolProxy 方法如下:

public static ProtocolProxy getProtocolProxy(…) throws IOException {

return getProtocolEngine(protocol, conf).getProxy(…);
}

RPC 类提供了 getProtocolEngine 类方法用于适配 RPC 框架当前使用的序列化引擎,hadoop 本身实现了 Protobuf 和 Writable 序列化的RpcEngine 。以WritableRPCEngine 为例,getProxy(…) 实现如下:

public ProtocolProxy getProxy(…) throws IOException {

// 这里调用到原生的代理
T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(),
new Class[] { protocol }, new WritableRpcEngine.Invoker(protocol, addr, ticket, conf,
factory, rpcTimeout, fallbackToSimpleAuth));
return new ProtocolProxy(protocol, proxy, true);
}

上述使用动态代理模式,Proxy 实例化时 newProxyInstance 传进去的 InvocationHandler 的实现类是 WritableRpcEngine 的内部类 Invoker。 当 proxy 调用方法时,会代理到 WritableRpcEngine.Invoker 中的 invoke 方法,其代码如下:

private static class Invoker implements RpcInvocationHandler {
….

// 构造器
public Invoker(…) throws IOException {

this.client = CLIENTS.getClient(conf, factory);

}

// 执行的invoke方法
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

ObjectWritable value;
try {
value = (ObjectWritable)
client.call(RPC.RpcKind.RPC_WRITABLE, new WritableRpcEngine.Invocation(method, args), remoteId, fallbackToSimpleAuth);
} finally {
if (traceScope != null) traceScope.close();
}

return value.get();
}

}

在 invoke 方法中,调用了 Client 类的 call 方法,并得到 RPC 请求的返回结果。其中 new WritableRpcEngine.Invocation(method, args) 实现了 Writable 接口,这里的作用是将 method 和 args 进行序列化成 Writable 传输类型。Client 类中的 call 方法如下:

public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest, ConnectionId remoteId, int serviceClass, AtomicBoolean fallbackToSimpleAuth) throws IOException {
final Call call = createCall(rpcKind, rpcRequest);
Connection connection = getConnection(remoteId, call, serviceClass, fallbackToSimpleAuth);
try {
// 将远程调用信息发送给server端
connection.sendRpcRequest(call); // send the rpc request
} catch (RejectedExecutionException e) {
throw new IOException(“connection has been closed”, e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException(e);
}

synchronized (call) {
// 判断call是否完成,等待server端notify
while (!call.done) {
try {
// 当前线程blocking住,
// 等待Connection线程中receiveRpcResponse调用call.notify
call.wait(); // wait for the result
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new InterruptedIOException(“Call interrupted”);
}
}

if (call.error != null) {
if (call.error instanceof RemoteException) {
call.error.fillInStackTrace();
throw call.error;
} else { // local exception
InetSocketAddress address = connection.getRemoteAddress();
throw NetUtils.wrapException(address.getHostName(),
address.getPort(),
NetUtils.getHostname(),
0,
call.error);
}
} else {
// 得到server结果
return call.getRpcResponse();
}
}
}

以上代码展现了 call() 方法作为代理方法的整个流程。从整体来讲,客户端发送请求和接收请求在两个独立的线程中完成,发送线程调用 Client.call() 线程,而接收响应则是 call() 启动的 Connection 线程( getConnection 方法中,由于篇幅原因不再展示)。

那么二者如何同步 Server 的响应信息呢?内部类 Call 对象在此起到巧妙地同步作用。当线程1调用 Client.call() 方法发送 RPC 请求到 Server,会在请求对应的 Call 对象上调用 Call.wait() 方法等待 Server 响应信息;当线程2接收到 Server 响应信息后,将响应信息保存在 Call.rpcResponse 字段中,然后调用 Call.notify() 唤醒线程1。线程1被唤醒从 Call 中取出响应信息并返回。整个流程如图2所示,分析如下。

  • 在 call 方法中先将远程调用信息封装成一个 Client.Call 对象(保存了完成标志、返回信息、异常信息等),然后得到 connection 对象用于管理 Client 与 Server 的 Socket 连接。
  • getConnection 方法中通过 setupIOstreams 建立与 Server 的 socket 连接,启动 Connection 线程,监听 socket 读取 server 响应。
  • call() 方法发送 RCP 请求。
  • call() 方法调用 Call.wait() 在 Call 对象上等待 Server 响应信息。
  • Connection 线程收到响应信息设置 Call 对象返回信息字段,并调用 Call.notify() 唤醒 call() 方法线程读取 Call 对象返回值。

图2 RPC Client工作流程

RPC Server 解读

Server部分主要负责读取请求并将其反序列化,然后处理请求并将响应序列化,最后返回响应。为了提高性能,Server 采用 NIO Reactor 设计模式。服务器只有在指定 IO 事件发生时才会执行对应业务逻辑,避免 IO 上无谓的阻塞。首先看一下 Server 类的内部结构,如图3所示,其中有4个内部类主要线程类:Listener、Reader、Hander、Resonder。

图3 Server类内部结构关系

Server将各个部分的处理如请求读取、处理逻辑等开辟各自的线程。整个 Server 处理流程如图4所示。

图4 RPC Server处理流程

    Server 处理流程解读如下:

  1. 整个 Server 只有一个 Listener 线程,Listener 对象中的 Selector 对象 acceptorSelector 负责监听来自客户端的 Socket 连接请求。acceptorSelector 在ServerSocketChannel 上注册 OP_ACCEPT 事件,等待客户端 Client.call() 中的 getConnection 触发该事件唤醒 Listener 线程,创建新的 SocketChannel 并创建 readers 线程池;Listener 会在 reader 线程池中选取一个线程,并在 Reader 的 readerSelector 上注册 OP_READ 事件。
  2. readerSelector 监听 OP_READ 事件,当客户端发送 RPC 请求,触发 readerSelector 唤醒 Reader 线程;Reader 线程从 SocketChannel 中读取数据封装成 Call 对象,然后放入共享队列 callQueue。
  3. 最初,handlers 线程池都在 callQueue 上阻塞(BlockingQueue.take()),当有 Call 对象加入,其中一个 Handler 线程被唤醒。根据 Call 对象上的信息,调用 Server.call() 方法(类似 Client.call() ),反序列化并执行 RPC 请求对应的本地函数,最后将响应返回写入 SocketChannel。
  4. Responder 线程起着缓冲作用。当有大量响应或网络不佳时,Handler 不能将完整的响应返回客户端,会在 Responder 的 respondSelector 上注册 OP_WRITE 事件,当监听到写条件时,会唤醒 Responder 返回响应。

整个 HadoopRPC 工作流程如图5所示。其中,动态代理与反射执行目标方法贯穿整个 Client 与 Server,Server 整体又采用 NIO Reactor 模式,使得整个 HadoopRPC 更加健壮。

图5 HadoopRPC整体工作流程

3.  关于并发时的优化

参与配置

Server 端仅存在一个 Listener 线程和 Responder 线程,而 Reader 线程和 Handler 线程却有多个,那个如何配置 Reader 与 Handler 线程个数呢?HadoopRPC 对外提供参数配置,使用常见的配置方式即在 etc/hadoop 下配置 xml 属性:

  • ipc.server.read.threadpool.size:Reader线程数,默认1
  • dfs.namenode.service.handler.count:Handler线程数,默认10
  • ipc.server.handler.queue.size:每个 Handler 处理的最大 Call 队列长度,默认100。结合 Handler 线程数,则默认可处理的 callQueue 最大长度为 10*1000=1000
CallQueue 与 FairCallQueue

共享队列 CallQueue 以先进先出(FIFO)方式提供请求,如果 99% 的请求来自一个用户,则 99% 的时间将会为一个用户服务。因此,恶意用户便可以通过每秒发出许多请求来影响 NameNode 性能。为了防止某个用户的 cleint 的大量请求导致 NameNode 无法响应,HadoopRPC 引入 FairCallQueue 来替代共享队列 CallQueue,请求多的用户将会被请求降级处理。CallQueue 和 FairCallQueue 对比图如图6、图7所示。

图6 CallQueue示例图

图7 FairCallQueue示例图

启用 FairCallQueue,同样是在配置文件中修改 Queue 的实现 callqueue.impl。其中,FairCallQueue 引入了优先级机制,具体分析如下。

优先级

共享队列 callQueue 导致 RPC 拥塞,主要原因是将 Call 对象放在一起处理。FairCallQueue 首先改进的是划分出优先级关系,每个优先级对应一个队列,比如 Queue0,Queue1,Queue2 …,然后定义一个规则,数字越小的,优先级越高。

优先级确定

如何确定哪些请求该放到哪些优先级队列中呢?比较智能的做法是根据用户的请求频率确定优先级。频率越高,分到优先级越低的队列。比如,在相同时限内,A用户请求50次,B用户请求5次,则B用户将放入优先级较高的队列。这就涉及到在一定时限内统计用户请求频率,FairCallQueue 进入了一种频率衰减算法,前面时段内的计数结果通过衰减因子在下一轮的计算中,占比逐步衰减,这种做法比完全清零统计要平滑得多。相关代码如下:

/**
* The decay RPC scheduler counts incoming requests in a map, then
* decays the counts at a fixed time interval. The scheduler is optimized
* for large periods (on the order of seconds), as it offloads work to the
* decay sweep.
*/
public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean {…}

从注释可知,衰减调度将对请求进行间隔几秒钟的计数统计,用于平滑计数。

优先级权重

为了防止低优先级队列“饥饿”,用轮询的方式从各个队列中取出一定的批次请求,再针对各个队列设置一个理论比重。FairCallQueue 采用加权轮询算法,相关代码及注释如下:

/**
* Determines which queue to start reading from, occasionally drawing from
* low-priority queues in order to prevent starvation. Given the pull pattern
* [9, 4, 1] for 3 queues:
*
* The cycle is (a minimum of) 9+4+1=14 reads.
* Queue 0 is read (at least) 9 times
* Queue 1 is read (at least) 4 times
* Queue 2 is read (at least) 1 time
* Repeat
*
* There may be more reads than the minimum due to race conditions. This is
* allowed by design for performance reasons.
*/
public class WeightedRoundRobinMultiplexer implements RpcMultiplexer {…}

从注释可知,若 Q0、Q1、Q2 的比重为 9:4:1,理想情况下在 15 次请求中,Q0 队列处理 9 次请求,Q1 队列处理 4 次请求,Q2 队列处理 1 次请求。

4.  从一个命令解析

接下来将从常见的一条命令解读 HadoopRPC 在 HDFS 中的应用:

hadoop fs -mkdir /user/test

首先看一下 hadoop 目录结构:

hadoop
├── bin 脚本命令核心
├── etc 配置
├── include C头文件等
├── lib 依赖
├── libexec shell配置
├── logs 日志
├── sbin 启停服务
├── share 编译打包文件

其中 hadoop 即为 bin 目录下的 hadoop 脚本,找到相关脚本:

case $COMMAND in

#core commands
*)
# the core commands
if [ “$COMMAND” = “fs” ] ; then
CLASS=org.apache.hadoop.fs.FsShell

export CLASSPATH=$CLASSPATH
exec “$JAVA” $JAVA_HEAP_MAX $HADOOP_OPTS $CLASS “$@”

由脚本可知,最终执行了 java -OPT xxx org.apache.hadoop.fs.FsShell -mkdir /user/test ,转换为最熟悉的 java 类调用。

进入 org.apache.hadoop.fs.FsShell 类的 main 方法中,调用 ToolRunner.run(),并由FsShell.run() 根据参数“-mkdir”解析出对应的 Command 对象。最后由ClientProtocol.mkdirs() 发送RPC请求,向NameNode请求创建文件夹。相关代码如下:}

rpcProxy.mkdirs() 过程则 HadoopRPC 完成。

4.   小结

HadoopRPC 是优秀的、高性能的 RPC 框架,不管是设计模式,还是其他细节技巧都值得开发者学习。

本文作者

王 洪 兵

滴滴出行 | 软件开发工程师

2018年毕业加入滴滴,任职于大数据架构部,对调度系统、大数据底层原理有一定的研究。热爱技术,也热爱旅行

Comments are closed.