Databricks在ACM Sigmod2022发表了关于Photon的论文,关注大数据或者Apache Spark的同学可能都对于Photon有所耳闻,它是Databricks的闭源商业化版本的Spark内核引擎的核心部分,Databricks一直把它作为商业化版本的核心卖点之一宣传,但是这些商业宣传对于技术细节大都浮光掠影,没有太多信息,现在论文发出来了,我们终于可以通过论文了解到更多细节。本篇文章希望通过阅读论文,介绍Photon的工作机制和原理,以及基于此的一些认识和浅见。

背景

开源大数据领域一直是JVM语言的天下,从早期的MapReduce/HDFS, HBase,Hive,到Spark,Flink,Presto,再到近两年的Iceberg,Hudi等项目,无不是基于JVM语言进行核心模块的开发,最多是在接口层提供对于其他语言的支持。JVM语言有易用,安全等优点,同时也有执行效率低(相对地)的缺点,这对于计算引擎类的对计算效率要求高的项目尤其难以容忍,大数据方向对于JVM语言执行效率的改造就我所见,有两拨大的浪潮:第一波是对JVM内存管理低效问题的改造,比如Spark/Flink/Presto等项目对于引擎处理的格式化数据,不使用JVM管理的Java对象,而是转化为binary数据,通过项目自己设计的binary内存管理机制进行管理,典型的示例是开源Spark的Project Tungsten项目,大家有兴趣深入了解的可以读下我在15年写的一篇相关的文章:脱离JVM? Hadoop生态圈的挣扎与演化。第二波浪潮就是当前计算引擎Native化,比如针对Spark,国内有阿里云EMR的Native CodeGen和 Intel的OAP Gazelle项目,国外有Databricks的Photon和一个学术研究项目Flare,以及针对Presto的Facebook主导的Velox项目等等。此外,随着Apache Arrow项目的持续发展,也逐渐衍生出了基于Arrow列式内存存储的计算生态,比如Gandiva和DataFusion子项目。

和国内云数仓主要由公有云厂商掌握不同,国外的的云数仓方向,第三方独立厂商占有了很大一部分市场,Snowflake和DataBricks就是典型代表。这两年从行业新闻也能看出,Databricks公司引入了大量高阶SQL引擎人才,不仅希望统治数据预处理的市场(主要包括ETL,特征工程等),也更进一步和SnowFlake和云厂商等争夺数据分析(BI报表,交互式探查,数据服务等)的市场。DataBricks在公有云上从Photon引擎,DeltaLake存储格式,本地缓存到Auto Data Clustering等技术,包装成了一个完整的LakeHouse湖仓一体技术方案。Photon是DataBricks的LakeHouse方案的核心,非常值得我们研究和分析。这篇论文的作者是Photon团队,第一作者Alexander Behm是Photon团队的Leader,之前是在Cloudera负责Impala的内核开发,有非常丰富的C++ SQL Engine的开发经验。

Photon论文解读

为什么用C++语言?

数据湖和数据仓库的割裂带了的数据冗余和组件架构的复杂性,通过湖仓一体的方案解决这个问题要求查询引擎提供和专业的数仓一样高效的查询效率,而当前基于JVM的Spark内核实现已经出现诸多瓶颈,这主要体现在:

CPU已经成为SQL执行效率的瓶颈,这是因为:
Local NVMe SDD的广泛采用(DataBricks在公有云上机器规格大都包含NVMe SDD存储,会被用作云存储文件的本地缓存)。
LakeHouse架构下数据的组织分布优化使得DataSkipping的效率大大提升。
许多新的业务场景是重CPU的查询语句,比如String/JSON格式数据的处理。

在优化方面,低垂的果子都已经被摘完了,想要进一步优化,比如提升算子的执行效率,基于当前的JVM内核引擎越来越难做,JVM语言对于底层控制力太弱,比如Memory Pipeline和SIMD指令集的使用用户完全是无法控制的。
JVM GC的影响,在超过64GB时,对性能有较大的负面影响。
Spark Java内核的Runtime CodeGen受限于Java Compiler有诸多限制,比如Method Size等,使得线上查询经常fallback到火山模型执行,效率很低。
所以Photon基于C++语言实现了一个完全兼容Spark的SQL Engine,它的整个设计除了提升性能外包含两个主要目标:

要很好的支持针对原始数据的处理,即针对那些未经数仓规范化处理的数据,也能够较为高效的处理,论文中具体说的是针对这种数据要提供"good performance",而针对Parquet/ORC这种规范化治理后的数据要提供更进一步的"excellent performance"。
要完全和Spark兼容,包括API,UDF等等,对用户完全透明,DataBricks Runtime作为商业化产品肯定要做到这一点,这个在很多方面还是有很多挑战的,这部分后面会详细介绍。

CodeGen vs. Vectorization

火山模型(Volcano Iterator Model)是SQL引擎最常见的实现方式,特别容易抽象和实现,可以通过表达式和算子的组合灵活表达SQL逻辑,但是火山模型缺点也十分明显,整个pipeline过多的虚函数调用使得CPU大量中断,只有很少的时间是真正花在SQL的计算逻辑上。Runtime CodeGen和Vectorized Execution都是SQL Engine计算模型领域常见的优化技术,在各个不同的项目中也都有非常成功的案例,比如开源Spark的WholeStageCodeGen,ClickHouse的Vectorized算子实现。这两者的区别,简单可以理解为,对于Vectorized Execution,实现的算子每次调用处理一批数据,而不是火山模型中的一条数据,从而减少每条数据分担的虚函数调用代价,同时在算子实现内部尽量充分利用SIMD的指令高效并发计算。而对于CodeGen,在运行时把算子的Pipeline都通过CodeGen技术生成在一个for-loop中,虽然还是每次处理一条数据,但是完全避免了不必要的虚函数调用。在Photon中是继续采用CodeGen还是转为Vectorization,他们其实在原型阶段两种方式都尝试了,比对后最终选择了后者,主要的原因如下:

容易开发和维护。这个其实很好理解,CodeGen的代码都是Runtime生成的,整个开发的逻辑多绕了一层,可读性很差,debug非常不方便,stacktrace信息也很难获取,所以整个代码开发和维护的效率很低。
整个Engine可观测性更好,因为Vectorization模型的每个算子依然是独立的,通过Pipeline串起来,所以可以非常方便准确观察到每个算子之前和之后的数据及状态,从而更方便分析、定位各种问题。而CodeGen把整个Pipeline的算子都生成到同一个loop代码块中,有一个算子的实现有问题,也很难比较直观观察到。
对于像Spark AQE这样的在Runtime动态调整执行计划的功能,显然和Volcano+Vectorization模型更加匹配,和CodeGen打通的代价太高了。
CodeGen确实在比如复杂表达式等场景下表现更好,Photon通过实现一些特定的算子可以支持比较常见的表达式组合从而能在大部分复杂表达式场景下达到和CodeGen类型的性能。
CodeGen VS Vectorization其实也是学术界研究了多年的问题,对此问题感兴趣的同学特别推荐看看这篇论文:Everything You Always Wanted to Know About Compiled and Vectorized Queries But Were Afraid to Ask,这篇论文在其他条件相同的情况下仔细对比了CodeGen和Vectorization,详情参考论文,结论如下:

codegen适合计算密集型场景,数据可以都在cpu register里面,更少的指令加速效果明显。
向量化则适合在访存密集型场景,避免memory stall,例如aggregation和hash join。
SIMD理论上可以数倍加速,但是实际TPC-H评测,一些请求大部分受限于memory bound,所以效果并不十分明显。
两种实现都可以很好的在多核机器上并行化执行。

Row vs. Column-Oriented Execution

开源的Spark尽管有ColumnarBatch和InternalRow两种数据处理的单元,但是实际上在引擎的算子层面主要是面向InternalRow处理的,即一次处理一条数据,ColumnarBatch主要是用在Parquet等列式存储的TableScan层,还是需要经过ColumnarBatch到InternalRow的Pivot转换才能喂到计算的算子中。Photon采用了Vectorization计算模型,很自然的也就从Row-Oriented Execution切换到了Column-Oriented Execution,即算子处理的单元是一个Batch,Batch内部是采用列式的方式存储的。相比于Row-Oriented Execution,Column-Oriented Execution的主要好处是数据按照列组织有更多机会可以使用SIMD指令集提升查询效率,在一个紧密的loop中循环也更容易利用上CPU cache和pipeline机制。如果原始文件的存储格式是Parquet这样的列存格式,从数据读取到算子执行,可以全部基于Column-Oriented Execution模型,避免开源Spark从ColumnarBatch到InternalRow的转换代价。更进一步,针对低基数字段,Photon还可以使用字典编码大幅减少ColumnarBatch的内存占用。

Photon做了哪些事情?

Photon实际上最主要的工作是用C++实现了Spark所有的Vectorized版本的物理算子及表达式,在逻辑执行计划以上的API、Analyzer、Resolver、Optimizer等部分都没有任何变化,主要是在逻辑执行计划到物理执行计划的部分,从原来的翻译成火山模型的算子Pipeline或者WholeStageCodeGen变成使用C++ Vectorized版本的物理算子。当然,这其中除了算子本身,还有不少的衔接部分的工作,比如任务调度,内存管理,UDF支持等等。

Photon实现了一个类似Arrow的Column Batch存储结构,用于存储Photon算子的输入和输出数据,Photon算子和表达式的实现部分直接使用SIMD intrinsics,但是主要还是以来编译器进行向量化指令优化,我理解这里的原因可能更多的是为了保证代码的可读性,而且C++的代码结合Hint可以比较确定的进行向量化指令优化。

Photon如何与Spark兼容

Catalyst optimizer增加了一个规则用于将物理执行计划中的原生的Node转换为Photon Node,如果该算子Photon还不支持,则添加一个tansition node位于两者之间,用于将数据从ColumnBatch转化为Row。Scan Node会使用原生的Java Node,通过Adpator Node衔接Photon Node,Scan Node会读取数据在offheap内存上列式存储,Scan Node和Photon Node之间是通过Adaptor Node在JVM和C++之间通过JNI传递引用地址实现Zero-Copy数据传递。Task的调度分发和Spark一样,在Task内部执行时,JVM内的task通过PB格式信息传递到Photon的C++库,反序列化后解析执行,对于Shuffle节点,由于定制优化的shuffle encode逻辑,photon的shuffle节点和原生的shuffle节点并不兼容。

由于Photon加上其对应的JVM executor属于同一个内存等资源的分配粒度,所以Photon hack了Executor的内存管理器,简单来说Photon的算子和Java的原生算子一样,可以申请向内存管理器申请内存资源,如果不足,由内存管理器决定是否需要spill其他Photon的算子和Java的原生算子的内存,从而保证了Executor的内存管理器对于JVM和Photon全局统一管理。

photon在任务并发方面和原生Spark完全一致,基于InputSplit级别的数据切分,每个task处理一个InputSplit的数据,task在executor进程中以线程粒度并发执行,和比较新的Morsel-Driven Parallelism模式相比,photon没有过多的考虑NUMA-aware和Cache locality。

Databricks的Lakehouse架构

Databricks的湖仓一体架构主要由三个部分组成:

数据湖存储,美国的公有云市场云存储已经基本一统天下,所以Databricks Runtime主要使用S3、GCS,ADLS等云存储系统作为数据胡存储,完全的存储和计算分离的模式,用户的数据存储在自己的账号下。
弹性计算,在公有云上,Cluster模式部署管理,VM粒度弹性扩缩容,Photon是计算层DataBricks Runtime和开源Spark的主要区别。
Automatic Data Management,主要负责表数据的存储组织优化。由于论文的重点是Photon,所以并没有详细介绍Automatic Data Management,但是这块其实是Databricks湖仓一体非常核心的部分,从论文和Databricks的一些文档中透露出的信息来看:
支持数据文件的Optimization,包括小文件合并,排序等,也支持多字段的Z-Order、Hibert曲线的排序算法。
支持BloomFilter索引。
支持智能的本地缓存管理,而且不仅是文件级别,还支持带有过滤条件的本地缓存,类似于在本地缓存过滤预计算后的结果。

如何更好支持原始数据

湖仓一体意味着Photon不仅要处理规范化后的干净的仓内的数据,还需要处理凌乱原始的湖里的数据,这种湖数据的特点包括:不规范的数据格式,比如int类型的数据但是使用String类型存储;缺少statistic信息;未合理组织排序的文件;用placeholder代替Null值等等。针对于这种类型数据,由于Photon采用了Vectorization模型而不是CodeGen,所以可以比较灵活地在运行时收集数据的信息,并根据具体数据情况动态调整具体的执行计划,Photon可以通过Adaptive Execution来在很多场景下大幅提升性能。

比如String类型数据一般使用UTF-8编码,如果实际数据字符都在ASCII字符集中,那么Photon会实际使用ASCII字符集编码,从而减少内存占用,字符操作效率也会有提升。
Photon会运行时收集并设置Batch是否包含Null值及Inactive值,Photon的每个算子都会根据这两个变量选择更优的执行分支。
Adaptive的shuffle encode策略,根据数据的实际情况选择合适的encode方式,从而减少shuffle的数据传输。

Photon的性能测试结果

HashJoin的性能提升了3倍,Upper()表达式(使用SIMD ASCII编码)的性能提升了3倍,聚合性能提升5倍,写Parquet文件性能提升2倍,TPC-H benchmark(sf=3000)性能提升了4倍。论文中还提到Databricks的实际客户在使用Photon版本后性能平均提升3倍以上,整体的效果还是非常显著的。为什么可以做到这样?我们以TPC-H benchmark为例详细分析下,论文里提到测试用的是8台i3.2xlarge机器,AWS的i3.2xlarge机型有8核64G内存以及2TB NVMe SSD,测试是一轮warmup加上三轮正式测试,TPC-H sf=3000的数据量使用Parquet格式存储应该在2TB左右,我猜测第一轮warmup会把数据缓存到本地SSD,且文件的存储组织已经经过优化,文件扫描的磁盘IO不太会成为瓶颈,在计算层算子的执行效率才会成为影响性能的关键因素。论文中具体分析各个查询的性能提升时也充分说明了这一点,主要的性能提升来自于Photon在Join、Aggregation、表达式计算方面充分利用SIMD指令集重构设计的数据结构(如hashmap)和实现方式。

总结

Photon加上DletaLake构成了DataBricks湖仓一体的核心,之前TPC-DS的测试结果也从某种程度上证明了,开放的计算引擎和开放的数据格式在性能上确实可以达到专门的OLAP数仓水平,Photon项目无疑是比较成功的,这给我们的启示包括:

Vectorization vs. CodeGen,从开发难易,代码可维护性,性能等方面综合考虑,Vectorization似乎是一个更好的选择,尤其是用Native语言实现的情况下。
Photon能够大幅提升性能的前提是CPU计算成为瓶颈,而不是磁盘IO,这个需要在湖仓一体架构下对数据自动的进行存储组织优化,使得SQL查询只扫描必要数据文件,且高效的本地缓存必不可少。
从0开始开发一个Photon类似的项目肯定是一项巨大的工程,且需要一流的内核工程团队,对于普通的互联网公司的大数据平台,想要通过Native化实现降本增效,利用已有的开源项目可能是一个更加现实的做法。具体来说,比如Presto可以通过Velox项目提升性能,Spark可以考虑通过集成Apache Arrow的计算子模块,但是新的语言、工具、技术栈肯定也都会给大数据平台的部署、运维、开发、troubleshooting带来新的挑战。此外,如果想要充分发挥Native向量化引擎的能力,磁盘IO不能成为查询的最主要的瓶颈,可能需要考虑通过Iceberg/Hudi/DeltaLake这类的表存储格式,对于数据存储组织进行优化,并结合索引和本地缓存等能力。如果单纯只是将Spark内核进行向量化升级,可能很难取得论文中的性能提升效果。

参考
https://www-cs.stanford.edu/~matei/papers/2022/sigmod_photon.pdf
https://www.vldb.org/pvldb/vol11/p2209-kersten.pdf
https://15721.courses.cs.cmu.edu