一、背景与痛点
在2017年上半年以前,TalkingData的App Analytics和Game Analytics两个产品,流式框架使用的是自研的td-etl-framework。该框架降低了开发流式任务的复杂度,对于不同的任务只需要实现一个changer链即可,并且支持水平扩展,性能尚可,曾经可以满足业务需求。
但是到了2016年底和2017年上半年,发现这个框架存在以下重要局限:
1. 性能隐患:App
Analytics-etl-adaptor和Game Analytics-etl-adaptor这两个模块相继在节假日出现了严重的性能问题(Full-GC),导致指标计算延迟;
2. 框架的容错机制不足:依赖于保存在Kafka或ZK上的offset,最多只能达到at-least-once,而需要依赖其他服务与存储才能实现exactly-once,并且会产生异常导致重启丢数;
3. 框架的表达能力不足: 不能完整的表达DAG图,对于复杂的流式处理问题需要若干依赖该框架的若干个服务组合在一起才能解决问题;
TalkingData这两款产品主要为各类移动端App和游戏提供数据分析服务,随着近几年业务量不断扩大,需要选择一个性能更强、功能更完善的流式引擎来逐步升级我们的流式服务。调研从2016年底开始,主要是从Flink、Heron、Spark streaming中作选择。
最终,我们选择了Flink,主要基于以下几点考虑:
1. Flink的容错机制完善,支持Exactly-once;
2. Flink已经集成了较丰富的streaming
operator,自定义operator也较为方便,并且可以直接调用API完成stream的split和join,可以完整的表达DAG图;
3. Flink自主实现内存管理而不完全依赖于JVM,可以在一定程度上避免当前的etl-framework的部分服务的Full-GC问题;
4. Flink的window机制可以解决GA中类似于单日游戏时长\游戏次数分布等时间段内某个指标的分布类问题;
5. Flink的理念在当时的流式框架中最为超前: 将批当作流的特例,最终实现批流统一;
二、演进路线
2.1 standalone-cluster (1.1.3->1.1.5->1.3.2)
我们最开始是以standalone cluster的模式部署。从2017年上半年开始,我们逐步把Game Analytics中一些小流量的etl-job迁移到Flink,到4月份时,已经将产品接收各版本SDK数据的etl-job完全迁移至Flink,并整合成了一个job。形成了如下的数据流和stream graph:
图1. Game Analytics-etl-adaptor迁移至Flink后的数据流图
图2. Game Analytics-etl的stream graph
在上面的数据流图中,flink-job通过Dubbo来调用etl-service,从而将访问外部存储的逻辑都抽象到了etl-service中,flink-job则不需考虑复杂的访存逻辑以及在job中自建Cache,这样既完成了服务的共用,又减轻了job自身的GC压力。
此外我们自构建了一个monitor服务,因为当时的1.1.3版本的Flink可提供的监控metric少,而且由于其Kafka-connector使用的是Kafka08的低阶API,Kafka的消费offset并没有提交的ZK上,因此我们需要构建一个monitor来监控Flink的job的活性、瞬时速度、消费淤积等metric,并接入公司owl完成监控告警。
这时候,Flink的standalone
cluster已经承接了来自Game Analytics的所有流量,日均处理消息约10亿条,总吞吐量达到12TB每日。到了暑假的时候,日均日志量上升到了18亿条每天,吞吐量达到了约20TB每日,TPS峰值为3万。
在这个过程中,我们又遇到了Flink的job消费不均衡、在standalone cluster上job的deploy不均衡等问题,而造成线上消费淤积,以及集群无故自动重启而自动重启后job无法成功重启。(我们将在第三章中详细介绍这些问题中的典型表现及当时的解决方案。)
经过一个暑假后,我们认为Flink经受了考验,因此开始将App Analytics的etl-job也迁移到Flink上。形成了如下的数据流图:
图3. App Analytics-etl-adaptor的标准SDK处理工作迁移到Flink后的数据流图
图4. App Analytics-etl-flink job的stream graph
2017年3月开始有大量用户开始迁移至统一的JSON SDK,新版SDK的Kafka
topic的峰值流量从年中的8K/s 上涨至了年底的 3W/s。此时,整个Flink standalone cluster上一共部署了两款产品的4个job,日均吞吐量达到了35TB。
这时遇到了两个非常严重的问题:
1) 同一个standalone
cluster中的job相互抢占资源,而standalone
cluster的模式仅仅只能通过task slot在task
manager的堆内内存上做到资源隔离。同时由于前文提到过的Flink在standalone cluster中deploy job的方式本来就会造成资源分配不均衡,从而会导致App Analytics线流量大时而引起Game Analytics线淤积的问题;
2) 我们的source
operator的并行度等同于所消费Kafka topic的partition数量,而中间做etl的operator的并行度往往会远大于Kafka的partition数量。因此最后的job graph不可能完全被链成一条operator chain,operator之间的数据传输必须通过Flink的network buffer的申请和释放,而1.1.x 版本的network buffer在数据量大的时候很容易在其申请和释放时造成死锁,而导致Flink明明有许多消息要处理,但是大部分线程处于waiting的状态导致业务的大量延迟。
这些问题逼迫着我们不得不将两款产品的job拆分到两个standalone cluster中,并对Flink做一次较大的版本升级,从1.1.3(中间过度到1.1.5)升级成1.3.2。最终升级至1.3.2在18年的Q1完成,1.3.2版本引入了增量式的checkpoint提交并且在性能和稳定性上比1.1.x版本做了巨大的改进。升级之后,Flink集群基本稳定,尽管还有消费不均匀等问题,但是基本可以在业务量增加时通过扩容机器来解决。
2.2 Flink on yarn (1.7.1)
因为standalone cluster的资源隔离做的并不优秀,而且还有deploy job不均衡等问题,加上社区上使用Flink on yarn已经非常成熟,因此我们在18年的Q4就开始计划将Flink的standalone cluster迁移至Flink on yarn上,并且Flink在最近的版本中对于batch的提升较多,我们还规划逐步使用Flink来逐步替换现在的批处理引擎。
图5. Flink on yarn cluster规划
如图5,未来的Flink on
yarn cluster将可以完成流式计算和批处理计算,集群的使用者可以通过一个构建service来完成stream/batch job的构建、优化和提交,job提交后,根据使用者所在的业务团队及服务客户的业务量分发到不同的yarn队列中,此外,集群需要一个完善的监控系统,采集用户的提交记录、各个队列的流量及负载、各个job的运行时指标等等,并接入公司的OWL。
从19年的Q1开始,我们将App Analytics的部分stream job迁移到了Flink on yarn 1.7中,又在19年Q2前完成了App Analytics所有处理统一JSON SDK的流任务迁移。当前的Flink on yarn集群的峰值处理的消息量达到30W/s,日均日志吞吐量达约到50亿条,约60TB。在Flink迁移到on
yarn之后,因为版本的升级性能有所提升,且job之间的资源隔离确实优于standalone cluster。迁移后我们使用Prometheus+Grafana的监控方案,监控更方便和直观。
我们将在后续将Game Analytics的Flink job和日志导出的job也迁移至该on yarn集群,预计可以节约1/4的机器资源。
三、重点问题的描述与解决
在Flink实践的过程中,我们一路上遇到了不少坑,我们挑出其中几个重点坑做简要讲解。
1. 少用静态变量及job
cancel时合理释放资源
在我们实现Flink的operator的function时,一般都可以继承AbstractRichFunction,其已提供生命周期方法open()/close(),所以operator依赖的资源的初始化和释放应该通过重写这些方法执行。当我们初始化一些资源,如spring context、dubbo config时,应该尽可能使用单例对象持有这些资源且(在一个TaskManager中)只初始化1次,同样的,我们在close方法中应当(在一个TaskManager中)只释放一次。
static的变量应该慎重使用,否则很容易引起job cancel而相应的资源没有释放进而导致job重启遇到问题。规避static变量来初始化可以使用org.apache.flink.configuration.Configuration(1.3)或者org.apache.flink.api.java.utils.ParameterTool(1.7)来保存我们的资源配置,然后通过ExecutionEnvironment来存放(Job提交时)和获取这些配置(Job运行时)。
示例代码:
Flink 1.3
设置及注册配置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Configuration parameters = new Configuration();
parameters.setString("zkConnects", zkConnects);
parameters.setBoolean("debug", debug);
env.getConfig().setGlobalJobParameters(parameters);
获取配置(在operator的open方法中)
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
ExecutionConfig.GlobalJobParameters globalParams = getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
Configuration globConf = (Configuration) globalParams;
debug = globConf.getBoolean("debug", false);
String zks = globConf.getString("zkConnects", "");
//.. do more ..
}
Flink 1.7
设置及注册配置
ParameterTool parameters = ParameterTool.fromArgs(args);
// set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(parameters);
获取配置
public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
ParameterTool parameters = (ParameterTool)
getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
parameters.getRequired("input");
// .. do more ..
2. NetworkBuffer及operator chain
如前文所述,当Flink的job 的上下游Task(的subTask)分布在不同的TaskManager节点上时(也就是上下游operator没有chained在一起,且相对应的subTask分布在了不同的TaskManager节点上),就需要在operator的数据传递时申请和释放network buffer并通过网络I/O传递数据。
其过程简述如下:上游的operator产生的结果会通过RecordWriter序列化,然后申请BufferPool中的Buffer并将序列化后的结果写入Buffer,此后Buffer会被加入ResultPartition的ResultSubPartition中。ResultSubPartition中的Buffer会通过Netty传输至下一级的operator的InputGate的InputChannel中,同样的,Buffer进入InputChannel前同样需要到下一级operator所在的TaskManager的BufferPool申请,RecordReader读取Buffer并将其中的数据反序列化。BufferPool是有限的,在BufferPool为空时RecordWriter/RecordReader所在的线程会在申请Buffer的过程中wait一段时间,具体原理可以参考:[1], [2]。
简要截图如下:
图6. Flink的网络栈,
其中RP为ResultPartition、RS为ResultSubPartition、IG为InputGate、IC为inputChannel。
在使用Flink 1.1.x和1.3.x版本时,如果我们的network buffer的数量配置的不充足且数据的吞吐量变大的时候,就会遇到如下现象:
图7. 上游operator阻塞在获取network buffer的requestBuffer()方法中
图8. 下游的operator阻塞在等待新数据输入
图9. 下游的operator阻塞在等待新数据输入
我们的工作线程(RecordWriter和RecordReader所在的线程)的大部分时间都花在了向BufferPool申请Buffer上,这时候CPU的使用率会剧烈的抖动,使得Job的消费速度下降,在1.1.x版本中甚至会阻塞很长的一段时间,触发整个job的背压,从而造成较严重的业务延迟。
这时候,我们就需要通过上下游operator的并行度来计算ResultPartition和InputGate中所需要的buffer的个数,以配置充足的taskmanager.network.numberOfBuffers。
图10. 不同的network
buffer对CPU使用率的影响
当配置了充足的network buffer数时,CPU抖动可以减少,Job消费速度有所提高。
在Flink 1.5之后,在其network
stack中引入了基于信用度的流量传输控制(credit-based flow control)机制[2],该机制大限度的避免了在向BufferPool申请Buffer的阻塞现象,我们初步测试1.7的network stack的性能确实比1.3要高。
但这毕竟还不是最优的情况,因为如果借助network buffer来完成上下游的operator的数据传递不可以避免的要经过序列化/反序列化的过程,而且信用度的信息传递有一定的延迟性和开销,而这个过程可以通过将上下游的operator链成一条operator chain而避免。
因此我们在构建我们流任务的执行图时,应该尽可能多的让operator都chain在一起,在Kafka资源允许的情况下可以扩大Kafka的partition而使得source
operator和后继的operator 链在一起,但也不能一味扩大Kafka topic的partition,应根据业务量和机器资源做好取舍。更详细的关于operator的training和task
slot的调优可以参考: [4]。
3. Flink中所选用序列化器的建议
在上一节中我们知道,Flink的分布在不同节点上的Task的数据传输必须经过序列化/反序列化,因此序列化/反序列化也是影响Flink性能的一个重要因素。Flink自有一套类型体系,即Flink有自己的类型描述类(TypeInformation)。Flink希望能够掌握尽可能多的进出operator的数据类型信息,并使用TypeInformation来描述,这样做主要有以下2个原因:
1. 类型信息知道的越多,Flink可以选取更好的序列化方式,并使得Flink对内存的使用更加高效;
2. TypeInformation内部封装了自己的序列化器,可通过createSerializer()获取,这样可以让用户不再操心序列化框架的使用(例如如何将他们自定义的类型注册到序列化框架中,尽管用户的定制化和注册可以提高性能)。
总体上来说,Flink推荐我们在operator间传递的数据是POJOs类型,对于POJOs类型,Flink默认会使用Flink自身的PojoSerializer进行序列化,而对于Flink无法自己描述或推断的数据类型,Flink会将其识别为GenericType,并使用Kryo进行序列化。Flink在处理POJOs时更高效,此外POJOs类型会使得stream的grouping/joining/aggregating等操作变得简单,因为可以使用如:dataSet.keyBy(“username”) 这样的方式直接操作数据流中的数据字段。
除此之外,我们还可以做进一步的优化:
1) 显示调用returns方法,从而触发Flink的Type Hint:
dataStream.flatMap(new MyOperator()).returns(MyClass.class)
returns方法最终会调用TypeExtractor.createTypeInfo(typeClass) ,用以构建我们自定义的类型的TypeInformation。createTypeInfo方法在构建TypeInformation时,如果我们的类型满足POJOs的规则或Flink中其他的基本类型的规则,会尽可能的将我们的类型“翻译”成Flink熟知的类型如POJOs类型或其他基本类型,便于Flink自行使用更高效的序列化方式。
//org.apache.flink.api.java.typeutils.PojoTypeInfo
@Override
@PublicEvolving
@SuppressWarnings("unchecked")
public TypeSerializer<T> createSerializer(ExecutionConfig config) {
if (config.isForceKryoEnabled()) {
return new KryoSerializer<>(getTypeClass(), config);
}
if (config.isForceAvroEnabled()) {
return AvroUtils.getAvroUtils().createAvroSerializer(getTypeClass());
}
return createPojoSerializer(config);
}
对于Flink无法“翻译”的类型,则返回GenericTypeInfo,并使用Kryo序列化:
//org.apache.flink.api.java.typeutils.TypeExtractor
@SuppressWarnings({ "unchecked", "rawtypes" })
private <OUT,IN1,IN2> TypeInformation<OUT> privateGetForClass(Class<OUT> clazz, ArrayList<Type> typeHierarchy,
ParameterizedType parameterizedType, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
checkNotNull(clazz);
// 尝试将 clazz转换为 PrimitiveArrayTypeInfo, BasicArrayTypeInfo, ObjectArrayTypeInfo
// BasicTypeInfo, PojoTypeInfo 等,具体源码已省略
//...
//如果上述尝试不成功 , 则return a generic type
return new GenericTypeInfo<OUT>(clazz);
}
2) 注册subtypes: 通过StreamExecutionEnvironment或ExecutionEnvironment的实例的registerType(clazz)方法注册我们的数据类及其子类、其字段的类型。如果Flink对类型知道的越多,性能会更好;
3) 如果还想做进一步的优化,Flink还允许用户注册自己定制的序列化器,手动创建自己类型的TypeInformation,具体可以参考Flink官网:[3];
在我们的实践中,最初为了扩展性,在operator之间传递的数据为JsonNode,但是我们发现性能达不到预期,因此将JsonNode改成了符合POJOs规范的类型,在1.1.x的Flink版本上直接获得了超过30%的性能提升。在我们调用了Flink的Type Hint和env.getConfig().enableForceAvro()后,性能得到进一步提升。这些方法一直沿用到了1.3.x版本。
在升级至1.7.x时,如果使用env.getConfig().enableForceAvro()这个配置,我们的代码会引起校验空字段的异常。因此我们取消了这个配置,并尝试使用Kyro进行序列化,并且注册我们的类型的所有子类到Flink的ExecutionEnvironment中,目前看性能尚可,并优于旧版本使用Avro的性能。但是最佳实践还需要经过比较和压测KryoSerializer\AvroUtils.getAvroUtils().createAvroSerializer\PojoSerializer才能总结出来,大家还是应该根据自己的业务场景和数据类型来合理挑选适合自己的serializer。
4. Standalone模式下job的deploy与资源隔离共享
结合我们之前的使用经验,Flink的standalone
cluster在发布具体的job时,会有一定的随机性。举个例子,如果当前集群总共有2台8核的机器用以部署TaskManager,每台机器上一个TaskManager实例,每个TaskManager的TaskSlot为8,而我们的job的并行度为12,那么就有可能会出现下图的现象:
第一个TaskManager的slot全被占满,而第二个TaskManager只使用了一半的资源!资源严重不平衡,随着job处理的流量加大,一定会造成TM1上的task消费速度慢,而TM2上的task消费速度远高于TM1的task的情况。假设业务量的增长迫使我们不得不扩大job的并行度为24,并且扩容2台性能更高的机器(12核),在新的机器上,我们分别部署slot数为12的TaskManager。经过扩容后,集群的TaskSlot的占用可能会形成下图:
新扩容的配置高的机器并没有去承担更多的Task,老机器的负担仍然比较严重,资源本质上还是不均匀!
除了standalone cluster模式下job的发布策略造成不均衡的情况外,还有资源隔离差的问题。因为我们在一个cluster中往往会部署不止一个job,而这些job在每台机器上都共用JVM,自然会造成资源的竞争。起初,我们为了解决这些问题,采用了如下的解决方法:
1. 将TaskManager的粒度变小,即一台机器部署多个实例,每个实例持有的slot数较少;
2. 将大的业务job隔离到不同的集群上。
这些解决方法增加了实例数和集群数,进而增加了维护成本。因此我们决定要迁移到on
yarn上,目前看Flink on yarn的资源分配和资源隔离确实比standalone模式要优秀一些。
四、总结与展望
Flink在2016年时仅为星星之火,而只用短短两年的时间就成长为了当前最为炙手可热的流处理平台,而且大有统一批与流之势。经过两年的实践,Flink已经证明了它能够承接TalkingData的App Analytics和Game Analytics两个产品的流处理需求。接下来我们会将更复杂的业务和批处理迁移到Flink上,完成集群部署和技术栈的统一,最终实现图5 中Flink on yarn cluster 的规划,以更少的成本来支撑更大的业务量。
参考资料:
[1]
https://cwiki.apache.org/confluence/display/FLINK/Data+exchange+between+tasks
[2] https://flink.apache.org/2019/06/05/flink-network-stack.html
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/types_serialization.html#type-hints-in-the-java-api
[4] https://mp.weixin.qq.com/s/XROoLEu38e46PlBAcepaTg
作者简介:
肖强:TalkingData资深工程师,TalkingData统计分析产品App Analytics和Game Analytics技术负责人。硕士毕业于北京航空航天大学,主要从事大数据平台开发,对流式计算和分布式存储有一定研究。