多点 DMALL x StarRocks:实现存储引擎的收敛,保障高查询并发及低延迟要求
本文发表于: &{ new Date(1639497600000).toLocaleDateString() }
作者:任伟,多点生活大数据部门资深研发工程师
多点 DMALL 成立于2015年,是一站式全渠道数字零售解决方案服务商。数字化解构重构零售产业,提供端到端的商业 SaaS 解决方案。目前,多点 DMALL 已与120多家连锁零售商、品牌商等达成合作,覆盖四个国家和地区15000家门店,模式受到广泛验证。
多点大数据部门使用 StarRocks 逐步替代了 Impala、Impala on Kudu、Apache Kylin 等存储引擎,实现了存储引擎的收敛,简化了实时数据处理链路,同时也能保障较高的查询并发以及较低的响应延迟要求。
背景介绍
多点大数据部门为内部业务研发团队、数据分析师、外部用户以及合作伙伴,提供了基础的大数据产品、平台服务,帮助零售企业解决了从基本的数据汇总管理、统一的数据计算应用、到各种场景下对数据的多模式使用的需求,可覆盖零售企业绝大部分数据诉求。
技术层面,多点大数据部门基于 Hadoop 开源技术栈,并进行了部分二次开发后构建起了以下的一个技术架构全景图。从下到上分为基础设施层、数据源层、数据集成层、离线/实时计算层、集市层、分析存储层、数据服务/应用层,数据开发、数据模型中心与运维管理层对各层提供支持。
基础设施层:包括超大带宽的专线网络;公有云、私有云、机房托管的混合云部署;
数据源层:包括企业 OLTP 数据库、业务数据、日志数据、三方接入数据;
数据集成层:DataBus 是多点自研数据同步平台,解决企业内各业务线之间、跨企业组织之间以及跨行业的数据汇聚、融合等问题,将不同系统的数据相互打通,实现数据自由流动;
离线计算层:利用 Hive / Spark 高可扩展的批处理能力承担离线数仓的 ETL 和数据模型加工;
实时计算层:利用 Flink / Spark Streaming 完成实时数据的 ETL(包括维度扩充,多流 Join,实时汇总)等;
离线/实时集市层:使用数仓分层模型构建 ODS(原始数据层)、DWD(数据明细层)、DWS(汇总层)、DIM(维度层)、DWT(主题层)、ADS(应用层),并根据公司业务拆分不同的数据域;
分析存储层:主要依赖 Druid、ClickHouse、Impala on Kudu、Apache Kylin、Elasticsearch、HBase、MySQL、StarRocks 提供 OLAP 查询能力;
数据服务/应用层:该层通过提供 BI 分析产品、数据服务接口、营销、报表类产品,向内部运营人员、外部客户、合作伙伴提供数据分析决策能力。
原有架构痛点
上述架构解决了多点绝大部分数据诉求,在整个架构中,无论是基于 Hive、Spark 的离线计算,基于 Flink、Spark Streaming 的实时计算;基于 HDFS、Kafka 的存储;基于数仓分层模型建设等方案都已基本成熟。但是在 OLAP 领域,无论是多点还是业界仍然处于百家争鸣,各有所长的状态。纵观多点在 OLAP 引擎的探索实践中,遇到了各种各样的问题,总结起来如下:
技术成本
由于上层业务场景复杂,各个场景的技术难点、核心点均不一样。多点生活在整个技术架构升级的过程中先后引入了 HBase、Elasticsearch、Druid、ClickHouse、Impala on Kudu、Apache Kylin 等 OLAP 引擎。但是随着技术栈增多,技术曲线陡峭,没有充足的资源进行多技术栈的维护,造成了比较高的技术成本。
开发成本
多点的数据分析场景大致可以分为 离线 T+1 更新分析场景、实时更新分析场景、固定维度分析场景。
- 离线 T+1 更新的分析场景
例如多点的精细化用户运营平台,其核心的功能是基于用户、消费、行为、设备等属性,提供多维度筛选条件,并通过自定义条件实现用户分层,便于进行精细化用户运营。
针对数据更新为 T+1 的分析场景,原主要使用的分析引擎为 ClickHouse。利用 ClickHouse 构建“大宽表”模型,将事实表与维度表提前进行关联,对外提供单表聚合的 SQL 查询,以及通过构建 DWT 主题宽表,提供 Adhoc 查询;该场景面临的问题是:虽然 ClickHouse 单表查询强悍,但是 Join 能力不强, 需要提前进行关联,将多表关联成单表,会存在额外的开发成本。
- 实时更新分析场景
实时更新场景主要是实时监控经营的各项指标,如当前时间段内的 GMV、下单数量、妥投数量、指标达成、对比、环比等指标。为客户的经营决策提供更具备时效性的参考依据。
针对数据为实时(秒级)更新的场景,原主要使用 Impala on Kudu 引擎,采用 Lambda 架构,基于相同的主键,将流式的预计算的结果数据、批计算的结果数据,基于相同的主键进行 merge。
上述方案中的 Flink AGG 部分,该程序的功能包括窗口内的预计算、多流 Join 等操作。当业务需求变更或者上游数据结构变动的时候,需要升级 Flink AGG 程序,以及离线 ETL 的任务,类似于“烟囱式”的迭代开发,开发效率低下。资源消耗层面,在 Flink 里面做预计算,时间窗口的选取以及内存占用之间也需要平衡。
- 固定维度分析场景
固定维度的分析场景主要针对固化的、标准的业务场景进行分析,多维分析可以对以多维形式组织起来的数据进行上卷、下钻、切片、切块、旋转等各种分析操作,以便剖析数据,使分析者、决策者能从多个角度、多个侧面观察数据仓库中的数据,从而深入了解包含在数据中的信息和内涵。
针对分析维度固定的分析场景,按照业务上常用的分析指标以及维度,此前使用 Apache Kylin 进行 cube 预计算。但是使用 Apache Kylin 也会遇到如下问题:
- 由于多点业务场景涉及的维度比较多,各种类目、营运组织的组合,会导致 cube 膨胀,占用比较多的存储资源;
- 当数据重跑以及新增维度,指标的时候。针对已经在线上运行的 cube 模型,为了保障数据重跑时候服务依然可用,需要新增 cube 模型,并行提供支持,造成存储重复;
- 由于目前使用的 Apache Kylin v3.1.2 是使用 HBase 作为后端存储,row key 顺序设计以及分区键的选择会严重的影响查询性能,对开发不友好。
运维成本
多点作为一站式全渠道数字零售解决方案服务商,可以满足客户不同的接入部署需求。多点大数据产品系统的接入可以大致分为 SaaS 化接入、私有云以及本地化部署。针对私有云、本地化部署的客户,OLAP 引擎易部署、易维护、极简的架构尤其重要,像 HBase、Impala on Kudu、Apache Kylin 等强依赖 Hadoop 生态的 OLAP 引擎,会增加部署的复杂性;ClickHouse 集群不能自动感知集群拓扑变化,也不能自动 balance 数据,会增加缩容、扩容等的维护成本。
选择 StarRocks 的原因
多点大数据部门从2021年年初开始,在调研市面上常用的存储引擎时发现了 StarRocks。StarRocks 架构设计融合了 MPP 数据库,以及分布式系统的设计思想,具备架构精简,支持全面向量化引擎、智能查询优化、高效更新、智能物化视图、标准 SQL、流批一体、高可用易扩展等特性,天然的解决了上述的问题。
使用 StarRocks 的特性解决当前痛点
- 引擎收敛
原有系统的多维分析,高并发查询,预计算,实时分析,Adhoc 查询等场景下使用了多套系统,基本上可以使用一套 StarRocks 解决。多点大数据平台、产品逐步形成以 StarRocks 为主,其他 OLAP 引擎为辅的存储架构,解决维护多套引擎的技术成本问题。
- 使用星型、星座模型替代“大宽表”模型
StarRocks 支持 Broadcast Join、Colocate Join 等分布式 Join 的特性,可以在查询性能可接受的范围内,使用星型、星座模型替代“大宽表”模型,节约提前关联的开发成本,同时针对事实表中历史数据变更,需要重新“跑数”的场景,可以只重跑(OverWrite)部分表的数据,提高整体的“跑数”效率。
- 简化 Lambda 架构中的预聚合部分
StarRocks 支持明细、聚合、更新模型,可以基于 StarRocks 自带预聚合的特性,优化掉现有 Lambda 架构的中的预聚合部分。StarRocks 直接拉取/订阅 Hive 或者 Kafka 中的数据,在 StarRocks 中进行聚合运算;StarRocks 的数据模型是 Aggregate 模型,通过 MAX、SUM、MIN、BITMAP_UNION 等聚合函数在 StarRocks 中进行预聚合。
- 模型持续迭代
针对已在线上运行的模型,如果有需求上的变更,比如增加、删除、变更字段,可以使用 StarRocks 简单 SQL 命令动态地修改表的定义,在表结构变更的过程中,线上的服务不受任何的影响。
- 明细、汇总一体化
在实际的业务场景中,通常存在两种场景并存的分析需求:对固定维度的聚合分析 和对原始明细数据的查询。在这种情况下,StarRocks 支持对原表构建物化视图,数据更新的时候,物化视图跟随原表一起进行更新,保证数据的一致性。当用户查询时,并不感知物化视图的存在,不必显式的指定物化视图的名称,查询优化器可以根据查询条件自动判断是否可以路由到相应的物化视图上。
- 外表能力
StarRocks 支持以外部表的形式,接入其他数据源包括 MySQL、HDFS、Elasticsearch、Hive 等。比如可以使用 StarRocks 建立 Elasticsearch 的外表,为Elasticsearch 提供 SQL 查询的能力。
基于多点报表业务真实场景的性能测试
- 单表聚合查询
在现有的数据 T+1 更新的汇总业务场景中,选取了多点报表业务中的“单品销售分析”场景进行测试,单表单天数据亿级别,上百个维度和分析指标,属于典型的基于“大宽表”的 Adhoc 查询场景。在相同情况(机器配置、数据量、SQL)下进行 ClickHouse 对比 StarRocks 的性能测试:
横坐标:分区(天)数-并发数;纵坐标:响应时长(ms)从查询响应时长来看,单表的聚合查询,ClickHouse 与 StarRocks 的查询响应时长相差不多。
- 多表关联查询
在现有的数据 T+1 更新多表关联的汇总分析业务场景中,选取了现在多点报表业务中的“门店销售分析”场景进行测试,事实表单天数据亿级别,多个维表数据量在十万级别,属于典型的高维分析场景。在相同情况(机器配置、数据量、SQL)下进行 ClickHouse 对比 StarRocks 的性能测试:
横坐标:分区(天)数-并发数;纵坐标:响应时长(ms)
从查询响应时长来看,多表关联聚合查询,StarRocks 的性能要优于 ClickHouse。
- 实时更新读写查询
在现有的数据准实时更新(边写边读)的汇总查询业务场景中,选取了“实时销售分析”场景进行测试,订单数据实时更新,单天数据量亿级别。属于典型的“实时更新,实时查询”场景。在相同情况(机器配置、数据量、SQL)下进行 Impala on Kudu 对比 StarRocks 的性能测试:
横坐标:分区(天)数-并发数;纵坐标:响应时长(ms)。
从查询响应时长来看,在边读边写的情况下,聚合查询的 SQL,StarRocks 的性能要优于 Impala on Kudu。
实践经验
多点目前已经在高维业务指标报表、Adhoc 分析、实时全链路监控等场景中引入了 StarRocks,在使用中总结出以下经验:
集群拆分
由于 StarRocks 极简的架构设计,易于运维部署。我们根据一定的规则,搭建了多套集群,避免业务之间的相互影响。
按照数据更新频率进行拆分
例如数据是 T+1 更新,且单表数据量在百亿级别以上的场景(例如高维业务指标报表、Adhoc 分析),我们构建了离线分析集群。通过提高 StarRocks 的查询并发(parallel_fragment_exec_instance_num)、单节点内存限制(exec_mem_limit)等对复杂查询友好的参数,提高集群的查询性能;
针对数据是准实时更新,写多读多的场景(实时报表、实时全链路监控),我们构建了实时分析集群,通过 调整 StarRocks 的 compaction(cumulative_compaction_num_threads_per_disk、base_compaction_num_threads_per_disk)等对写入友好的参数,加快数据版本合并。
按照业务域进行拆分
多点客户的接入方式不同,且各种 SLA 要求也不同,会按照不同的需求搭建不同的 StarRocks 集群,尽量满足多种客户需求。
调优手段
针对在线服务、系统,为了提高系统整体的查询性能,可以从不同的维度进行优化:
- 优化表结构定义
1、模型选择
StarRocks 的模型包括明细模型、聚合模型、更新模型。如果需要对原始的数据(例如订单流水,原始操作记录等)来进行分析,可以选择明细模型;如果业务方进行的查询为汇总类查询,比如 SUM、COUNT、MAX 等类型的查询,可以选择聚合模型,提前进行预聚合,查询的时候直接获取结果;如果数据需要频繁的进行状态更新(比如订单的状态变更),可以选择更新模型。
2、分区 (parition) 和分桶 (bucket)
StarRocks 可以对表进行分区和分桶,分区在逻辑上把表划分成了多个子表,可以按照时间进行分区;分桶可以按照不同的策略将数据划分为不同的 tablet,分布在不同的 BE 节点上。按照目前多点大数据集群的机器配置(64C+256G+12TB SSD),通常将一个 tablet 保持在200MB~1GB的大小,会有比较好的性能。
3、稀疏索引、bloomfilter、Bitmap Index
为了提高查询的性能,可以对 StarRocks 的表结构额外构建索引。稀疏索引:可以将查询中常见的过滤字段放在 schema 的前面, 区分度越大,频次越高的查询字段越往前放;同时对区分度比较大的列构建 bloomfilter;对区分度不大的列构建 Bitmap Index。
4、物化视图
针对实际查询场景中经常用到的查询 SQL,可以对原始表构建物化视图,其本质为原始表 (base table) 的一个物化索引,通过物化视图提前进行索引排序、指标预计算,查询的时候自动路由到物化视图进行查询。
5、使用 BITMAP / HyperLogLog 数据类型进行去重
在交易场景中进行会计算交易次数,使用常规的方式(COUNT DISTRINCT order_id)去重,其缺点是需要消耗极大的计算和存储资源,对大规模数据集和查询延迟敏感的去重场景支持不够友好。通过定义 BITMAP 的数据类型,可以减少传统 COUNT DISTINCT 去重的执行需要的内存空间、执行时长;而对于像流量统计场景中针对 UV 的计算,在允许有部分统计偏差的前提下,可以定义 HyperLogLog 的数据类型,提高去重效率。
- 优化查询 SQL
1、小表 Join 可以对使用 Broadcast Join
当大表与小表进行 Join 的时候,可以使用 Broadcast Join(StarRocks 针对小表的默认 Join 方式),小表向大表广播的方式进行 Join。该方式可以用于事实表与维度表进行关联查询;
2、大表 Join 可以使用 Colocation Join
当大表与大表进行 Join 的时候,为了加速查询,相关表可以采用共同的分桶列(colocate_with)进行分桶。当分桶列相同,相关表进行 Join 操作时,可以直接在本地进行 Join,再将结果数据进行合并,避免数据在中间计算的时候就在集群中的传输。
3、并行度调整
当机器资源比较充裕时,可以将增加执行并行度( parallel_fragment_exec_instance_num),让更多的执行实例同时处理一组数据扫描,从而提升查询效率。但是并行度设置为较大的数值会消耗更多的机器资源,如 CPU、内存、磁盘 IO,影响整体的 QPS。需要根据实际上的查询场景来设置并行度,一般建议占用机器核数的50%。
4、CBO 优化器
针对复杂 Ad-hoc 场景,可以开启 StarRocks 的基于成本(Cost-based Optimizer,CBO)的查询规划器,在众多查询计划空间中快速找到最优计划,提高查询优化器。
工具集成
为了与目前多点的大数据平台进行打通,对 StartRocks 进行了一些集成封装。
- 数据集成
通过封装 StarRocks 的 Broker Load 以及 Stream Load 接口,与多点的大数据平台打通,实现通过配置的方式将数据从 Hive 批量同步到 StarRocks,或者订阅 MQ 将实时数据同步到 StarRocks。
- 监控预警
通过集成 Prometheus 与 Grafana,与监控平台打通。对多个 StarRocks 集群的运行情况进行监控,当集群的某些指标超过一定阈值的时候进行报警。
总结与展望
多点从2021年上半年开始调研引入 StarRocks,当前已有四个集群在稳定运行提供线上服务,逐步替代了 Impala、Impala on Kudu、Apache Kylin 等存储引擎,实现了存储引擎的收敛,简化了实时数据处理链路,同时也能保障较高的查询并发以及较低的响应延迟要求。目前公司也在越来越多的业务中尝试使用 StarRocks。
在引擎引入以及切换的过程中,得到了 StarRocks 社区的大力支持。后续公司在有余力的情况下会参与 StarRocks 的社区共建,共同打造性能强悍的国产新一代 MPP 数据库。