您的浏览器禁用了JavaScript(一种计算机语言,用以实现您与网页的交互),请解除该禁用,或者联系我们。[DataFunSummit2023:数据湖架构峰会]:基于 Hudi Spark SQL 调度的近实时计算探索 - 发现报告

基于 Hudi Spark SQL 调度的近实时计算探索

AI智能总结
查看更多
基于 Hudi   Spark SQL   调度的近实时计算探索

基于Hudi+ SparkSQL +调度的近实时计算探索 演讲人:苏承祥—涂鸦智能—数据平台开发 1.什么是近实时计算2.Spark SQL如何对Hudi表进行增量读、快照读3.涂鸦智能在Hudi+ SparkSQL+调度的近实时计算落地实践4.近实时计算中宽表建设5.未来展望 什么是近实时计算 涂鸦数据计算简介 时延分类 通常任务被调度的频率越高,成本也就越高。 随着数据湖技术的快速发展迭代,利用数据湖技术,数仓新增了一种近实时的时延计算,该技术能够在分钟级别的时延下对数仓数据进行增量计算。 近实时计算 5-30分钟的延迟 周期&常驻 折中的成本方案 Spark SQL如何对Hudi表进行增量读、快照读 HudiCall命令 Call命令是在0.11.0版本中新增的,Call命令无Spark版本限制,用户可以使用SparkSQL执行Call命令来进行表的一系列运维操作。 比如:run_compaction、run_clustering、run_clean、create_savepoint、show_commits Call增量读命令 copy_to_table copy_to_temp_view 创建临时视图并使用 # read snapshot data fromhuditablecallcopy_to_temp_view(table=>'hudi_table',view_name=>'table_view',query_type=>'snapshot',as_of_instant=>'20230311000000000')select*fromtable_view # read incremental data fromhuditablecallcopy_to_temp_view(table=>'hudi_table',view_name=>'table_view',query_type=>'incremental',begin_instance_time=>'20230311000000000',end_instance_time=>'20230311001000000')select*fromtable_view # readread_optimizeddata fromhuditablecallcopy_to_temp_view(table=>'hudi_table',view_name=>'table_view',query_type=>'read_optimized')select*fromtable_view 涂鸦智能在Hudi+ SparkSQL +调度的近实时计算落地实践 涂鸦近实时计算架构图 ods_real层 调度系统时间分类 •调度时间:调度系统根据任务配置的执行时间生成,不会随着任务的延迟执行而变化 •业务时间:调度系统根据调度时间进行日期加减,一般默认为调度时间减1天 •执行时间:任务实际开始执行的时间 调度时间增量计算 copy_to_temp_view(table=>'T',view_name=>'view',query_type=>'incremental',begin_instance_time=>'i',end_instance_time=>'i+15')select*fromview dw层计算方式 整条链路很轻量级,全部为增量计算。 近实时计算中宽表建设 宽表建设方案 实时方案 流表+流表:Regular Join、Interval Join 流表+维表:Temporal Join 弊端:窗口过大导致内存打爆、窗口过小导致数据丢失、缓存维表导致数据不一致 离线方案 两张表的全量数据直接Join 弊端:在两张大表做Join时任务易失败且执行时间过久导致下游计算结果延迟 部分列更新payload org.apache.hudi.common.model.PartialUpdateAvroPayload 宽表更新之主键一致 宽表更新之主键一致 //创建宽表 createexternaltable if not existsbi_dw.dim_user(user_idstring comment'用户id',user_attrstring comment'用户基本属性',user_ext_attrstring comment'用户扩展属性1',user_ext_attr2string comment'用户扩展属性2',)comment'用户宽表'partitionedby(dtstring comment'按天分区')stored asparquetlocation'/tmp/bi/bi_dw/dim_user';//获取用户基本属性表当天最新数据with user as(selectuser_id,user_attrfrombi_ods.ods_userwheredt='${yyyymmdd}'),//获取用户扩展属性表当天最新数据user_extas(selectuser_id,user_ext_attr,user_ext_attr2frombi_ods.ods_user_extwheredt='${yyyymmdd}')//更新用户宽表最新数据insertoverwritetablebi_dw.dim_userPARTITION(dt='${yyyymmdd}')selectuser.user_id,user.user_attr,user_ext.user_ext_attr,user_ext.user_ext_attr2fromuserleft joinuser_extonuser.user_id=user_ext.user_id 每次更新都需要读取两张表的全量数据并进行insert overwrite操作,执行成本过高,执行时间久 宽表更新之主键一致 --新建用户近实时宽表CREATE TABLE IF NOT EXISTSbi_dw_real.dim_user_rt(user_idstring comment'用户id',user_attrstring comment'用户基本属性',user_ext_attrstring comment'用户扩展属性1',user_ext_attr2string comment'用户扩展属性2',gmt_modifiedbigintCOMMENT'修改时间戳' --获取用户基本属性表最近的增量数据callcopy_to_temp_view(table=>'bi_ods_real.ods_user_rt',view_name= >'user_view', query_type= >'incremental',begin_instance_time= >'${taskBeginTime} ',end_instance_time= >'${next15minuteTime} '); --获取用户扩展属性表最近的增量数据callcopy_to_temp_view(table=>'bi_ods_real.ods_user_ext_rt',view_name= >'user_ext_view’, query_type= >'incremental',begin_instance_time= >'${taskBeginTime} ',end_instance_time= >'${next15minuteTime}');--将两张表的增量数据插入宽表 )usinghuditblproperties(type ='mor',primaryKey='user_id',preCombineField='gmt_modified’,hoodie.compaction.payload.class='org.apache.hudi.common.model.PartialUpdateAvroPayload',hoodie.datasource.write.payload.class='org.apache.hudi.common.model.PartialUpdateAvroPayload' insert intobi_dw_real.dim_user_rtselectuser_id,user_attr,null asuser_ext_attr,null asuser_ext_attr2,gmt_modifiedfromuser_viewunion allselectuser_id,null asuser_attr,user_ext_attr,user_ext_attr2,gmt_modifiedfromuser_ext_view )COMMENT'用户属性宽表'; 宽表更新之非一致主键 宽表更新之非一致主键 --获取用户基本表最近15分钟的增量数据 callcopy_to_temp_view(table=>'bi_ods_real.ods_user_rt',view_name= >'user_view’,query_type= >'incremental', begin_instance_time= >'${taskBeginTime}',end_instance_time= >'${next15minuteTime}');--获取用户扩展表的快照数据(要保证新增的数据能关联到user_ext的扩展属性)callcopy_to_temp_view(table=>'bi_ods_real.ods_user_ext_rt',view_name= >'user_ext_view',query_type= >'snapshot',as_of_instant= >'${next15minuteTime}');--首先将用户主表数据插入insert into tablebi_dw_real.dim_user_rtselectuser_id,user_ext_id,user_attr,nullasuser_ext_attr,nullasuser_gmt_modified,gmt_create,gmt_modifiedfromuser_view;--增量更新用户扩展信息表insert into tablebi_dw_real.dim_user_rtselectuser_id,user_ext_id,user_attr,user_ext_view.user_extasuser_ext_attr,user_ext_view.gmt_modifiedasuser_gmt_modified,gmt_create,gmt_modifiedfrombi_dw_real.dim_user_rtuserleft joinuser_ext_viewonuser_ext_view.user_ext_id=user.user_ext_idand (user.user_gmt_modifiedis null oruser.user_gmt_modified<user_ext_view.gmt_modified) --新建用户近实时宽表 CREATE TABLE IF NOT EXISTSbi_dw_real.dim_user_rt(user_idstring comment'用户id',user_ext_idstring comment'用户扩展表id',user_attrstring comment'用户基本属性',user_ext_attrstring comment'用户扩展属性',user_gmt_modifiedbigintCOMMENT'用户扩展修改时间戳',gmt_createbigintCOMMENT'创建时间戳',gmt_modifiedbigintCOMMENT'修改时间戳’)usinghuditblproperties(type ='mor'