Compass Diagnostic Platform Module Analysis

The application documentation is available directly at: compassGithub
Background: our scheduling system is developed in-house and the MySQL platform does not support Canal collection, so we analyzed the Compass source code and built our own data integration and conversion.

```mermaid
flowchart TD
    A["canal<br/>syncs scheduler tables into Compass tables"] --> B["task-syncer<br/>consumes the MySQL data into Compass tables<br/>writes Kafka: task-instance"]
    B --> C["task-application<br/>consumes task-instance<br/>extracts app_id from logs<br/>writes MySQL: task_application<br/>sends Kafka: task-application"]
    C --> D["Flink module<br/>consumes task-application"]
    B --> E["task-detect<br/>consumes task-instance<br/>anomaly detection<br/>writes ES: compass-job-instance<br/>writes Redis: delayed:task"]
    C --> F["task-parser<br/>consumes Redis: parser:processing<br/>engine-level anomaly detection"]
    E --> G["task-portal<br/>front-end API<br/>report overview / scheduler list / offline list / diagnosis"]
    F --> G
    C --> G
    subgraph DolphinScheduler
        H["flow<br/>workflow definition table"]
        I["task<br/>task definition table"]
        J["task_instance<br/>task instance table"]
        H --> I
        I --> J
    end
```

Roles of the Compass service modules

Main table relationships in DolphinScheduler

flow table (workflow definitions)
task table (task definitions)
task_instance table (task instances)

How the three relate

Hierarchy:

- A flow (workflow) contains multiple task nodes.
- When the workflow runs, a workflow instance is created, along with a task_instance for each task node.

Data flow:

- The user first defines a flow (workflow).
- Multiple tasks (task nodes) are added to the flow with dependencies between them.
- When triggered by the scheduler or manually, the system generates task_instance records from the flow and task definitions and executes them.

Lifecycle:

- flow and task are static definitions and normally do not change across runs.
- task_instance records are generated dynamically; each execution creates new records.

Custom scheduler table conversion flow

A Spark job runs hourly to collect data from the custom scheduler and the Kyuubi tables, resolves the schedule -> instance -> application chain, cleans the data in batches, and sends each batch to Kafka.
The diagnostic service (task-detect) consumes the Kafka messages, runs the automatic diagnosis, and stores the results in ES for display. This bypasses the processing normally done by the task-canal and task-application projects.

```mermaid
graph TD
    A[Source MySQL database] -->|1. Read data| B[Spark session]
    C[Kyuubi MySQL] -->|2. Read application IDs| B
    B -->|3. Transform data| D[Temporary DataFrame]
    D -->|4.1 Write to target MySQL| E[Target MySQL task_instance table]
    D -->|4.2 Write to target MySQL| F[Target MySQL task_application table]
    D -->|5. Filter records with app_id| G[Kafka producer data]
    G -->|6. Send messages| H[Kafka topic task-instance]
```

Flow step description

Data source reads

Source MySQL:

- Tables: JOB_INST_1 (instance information), JOB_DESC (task descriptions), HOST_GROUP_DEF (host groups: the scheduler has no flow concept, only a dependency topology, so host groups stand in for flows for now and will be cleaned and converted later).
- SQL filter: records in the given time range (dateStartHour to 23:59:59 of the same day) whose task type is 99 or 96.

Kyuubi MySQL:

- Table: sqlmetadata, used to obtain the application_id of Spark tasks, filtered by the time range and deduplicated.

Data transformation

UDF processing (a sketch follows this list):

- getTaskType: converts the numeric task type to a string (e.g. 99 -> SPARK).
- getTaskState: converts the state code to text (e.g. 1 -> success).
- getTriggerType: distinguishes the trigger type (schedule or manual).
- getExecutionTime: normalizes the time format.
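The UDF implementations are not shown in this document; the snippet below is only a rough sketch of how such mappings might be registered in Spark. Only the examples given above (99 -> SPARK, 1 -> success, schedule/manual) are grounded in the text; the 96 mapping and the getTriggerType input are assumptions.

```scala
import org.apache.spark.sql.SparkSession

// Rough sketch of the mapping UDFs; values beyond the documented examples are illustrative.
val spark = SparkSession.builder().appName("scheduler-sync-sketch").getOrCreate()

spark.udf.register("getTaskType", (code: Int) => code match {
  case 99 | 96 => "SPARK" // assumption: both collected task types map to SPARK
  case _       => "OTHER"
})
spark.udf.register("getTaskState", (code: Int) => if (code == 1) "success" else "fail")
spark.udf.register("getTriggerType", (isScheduled: Boolean) => if (isScheduled) "schedule" else "manual")
```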

Field mapping:

- Mapping logic: see the code comments for the field correspondence between the source system (海猫) and Compass (the target system).
- Source fields (e.g. BUSI_GRP) map to target fields (e.g. project_name), and so on.

Data writes (a hedged sketch follows):

- Target MySQL task_instance table: written through a temporary table plus ON DUPLICATE KEY UPDATE so the write is idempotent (rows are updated by id).
- task_application table: instance data is joined with its application_id and written as task-application records.
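As a hedged sketch of the idempotent write described above: the real job stages through a temporary table, while the snippet below upserts directly with ON DUPLICATE KEY UPDATE for brevity. The column list, helper name, and connection handling are illustrative assumptions, not the actual job's code.

```scala
import java.sql.DriverManager
import org.apache.spark.sql.{DataFrame, Row}

// Upsert keyed on id so re-running the hourly batch updates rows instead of duplicating them.
def upsertTaskInstances(df: DataFrame, jdbcUrl: String, user: String, password: String): Unit = {
  df.foreachPartition { (rows: Iterator[Row]) =>
    val conn = DriverManager.getConnection(jdbcUrl, user, password)
    val stmt = conn.prepareStatement(
      """INSERT INTO task_instance (id, project_name, flow_name, task_name, task_state)
        |VALUES (?, ?, ?, ?, ?)
        |ON DUPLICATE KEY UPDATE
        |  project_name = VALUES(project_name),
        |  flow_name    = VALUES(flow_name),
        |  task_name    = VALUES(task_name),
        |  task_state   = VALUES(task_state)""".stripMargin)
    try {
      rows.foreach { r =>
        stmt.setLong(1, r.getAs[Long]("id"))
        stmt.setString(2, r.getAs[String]("project_name"))
        stmt.setString(3, r.getAs[String]("flow_name"))
        stmt.setString(4, r.getAs[String]("task_name"))
        stmt.setString(5, r.getAs[String]("task_state"))
        stmt.addBatch()
      }
      stmt.executeBatch()
    } finally {
      stmt.close()
      conn.close()
    }
  }
}
```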

Kafka message production

Data filtering: only instance records that contain an application_id are selected.

Message format:

{
  "rawData": null,
  "body": {
    "id": "instance ID",
    "projectName": "project name",
    "flowName": "flow name",
    ...
  },
  "eventType": "INSERT",
  "table": "task_instance"
}
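A minimal sketch of producing one message in the shape above. The broker address, the message key, and the hand-built JSON string are assumptions; the real job serializes the cleaned DataFrame rows in batch.

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Publish one task-instance event to the task-instance topic (values are illustrative).
val props = new Properties()
props.put("bootstrap.servers", "kafka-broker:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](props)
val body =
  """{"rawData":null,"body":{"id":"12345","projectName":"demo_project","flowName":"demo_flow"},"eventType":"INSERT","table":"task_instance"}"""
producer.send(new ProducerRecord[String, String]("task-instance", "12345", body))
producer.flush()
producer.close()
```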

Final result preview

(screenshot)

Diagnosis logic analysis

Default diagnosis configuration: compass\task-parser\src\main\resources\application.yml

Illustrated flow (Spark's job -> stage -> task breakdown). Suppose we have the following code:

//RDD is resilient distributed dataset
val textFile = sc.textFile("hdfs://...")          // RDD A
val words = textFile.flatMap(line => line.split(" ")) // RDD B
val mappedWords = words.map(word => (word, 1))       // RDD C
val wordCounts = mappedWords.reduceByKey(_ + _)     // RDD D
wordCounts.saveAsTextFile("hdfs://...output")      // Action!

The DAG and stage split of its execution are shown below.

```mermaid
flowchart TD
    subgraph Application [Application]
        direction TB
        A[textFile RDD A] --> B[flatMap RDD B]
        B --> C[map RDD C]
        C -- "Wide dependency<br/>Shuffle!" --> D[reduceByKey RDD D]
        D --> Action[saveAsTextFile Action]
    end
    Application --> Job
    subgraph Job [Job]
        direction TB
        S1["Stage 1<br/>ShuffleMapStage"] -- Shuffle data --> S2["Stage 2<br/>ResultStage"]
    end
    Job --> Stage
    subgraph Stage1 [Inside Stage 1]
        direction LR
        T1_1[Task 1]
        T1_2[Task 2]
        T1_3[...]
        T1_4[Task N]
    end
    subgraph Stage2 [Inside Stage 2]
        direction LR
        T2_1[Task 1]
        T2_2[Task 2]
        T2_3[...]
        T2_4[Task M]
    end
    Stage --> Task
    T1_1 --> E1[Executor core]
    T1_2 --> E2[Executor core]
    T1_4 --> E3[Executor core]
```

Flow explanation (for the figure above):

Summary

| Concept | How it is created | Cardinality | Planned by | Executed by |
| --- | --- | --- | --- | --- |
| Job | One action operator triggers one job | One application contains multiple jobs | Driver (overall) | |
| Stage | Split at wide (shuffle) dependencies | One job contains multiple stages | DAGScheduler (per stage) | |
| Task | One task per RDD partition | One stage contains multiple tasks | TaskScheduler | Executor |

Diagnosis logic case analysis

CPU waste calculation

Executor calculation

Driver calculation

Explanation of the gap between appTotalTime and jobTime

  1. Resource waits:
    • YARN has no resources available after the driver starts.
    • Resources are preempted during execution, or re-requested after being released.
  2. Gaps between jobs:
    • The interval between one job finishing and the next job being submitted.
    • This interval counts toward appTotalTime but not jobTime.
  3. Other cases:
    • Driver initialization time (loading dependencies, registering the application, etc.).
    • Job scheduling delays (especially with dynamic resource allocation).
    • Data skew keeps some tasks running for a long time while other resources sit idle.
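As a toy illustration of that gap (the field names and the ratio form below are assumptions, not Compass's exact formula):

```scala
// Illustrative only: the share of application wall-clock time not covered by job run time.
case class JobSpan(startMs: Long, endMs: Long)

def jobTimeGapRatio(appStartMs: Long, appEndMs: Long, jobs: Seq[JobSpan]): Double = {
  val appTotalTime = (appEndMs - appStartMs).toDouble
  val jobTime = jobs.map(j => j.endMs - j.startMs).sum.toDouble
  (appTotalTime - jobTime) / appTotalTime // driver init, resource waits, gaps between jobs, etc.
}

// Example: a 10-minute application whose jobs only ran for 6 minutes -> gap ratio 0.4
val gap = jobTimeGapRatio(0L, 600000L, Seq(JobSpan(60000L, 240000L), JobSpan(300000L, 480000L)))
```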
Our current environment

Taking all of the above into account, this diagnostic does not apply to us for now, so the threshold was raised to 95.

Task long tail

Diagnosis description: map/reduce tasks whose maximum run time is far larger than the median.

Calculation

// Compute the ratio of the slowest task's duration to the median task duration
ratio = max_duration / median_duration
// taskDurationConfig.threshold, default: 10
// If ratio > threshold (threshold comes from the config), the stage is flagged as a long-tail anomaly
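A minimal sketch of that check, assuming the task durations come from the event log (the median here is simply the middle element, which is close enough for illustration):

```scala
// Flag a stage as long-tail when its slowest task is `threshold` times slower than the median task.
def isLongTail(taskDurationsMs: Seq[Long], threshold: Double = 10.0): Boolean = {
  require(taskDurationsMs.nonEmpty)
  val sorted = taskDurationsMs.sorted
  val median = sorted(sorted.length / 2).toDouble
  sorted.last / math.max(median, 1.0) > threshold
}

// e.g. most tasks take about 1 s but one takes 20 s -> ratio 20 > 10 -> long tail
val longTail = isLongTail(Seq(900L, 950L, 1000L, 1100L, 20000L))
```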

Suggested optimizations

First confirm whether this is data skew or computation skew.
// Step 1: salt the keys and aggregate locally
val saltedPairRDD = originalPairRDD.map { case (key, value) =>
  val salt = scala.util.Random.nextInt(numSalts) // numSalts is the salt range, e.g. 10
  (s"$salt-$key", value)
}
val partialAggRDD = saltedPairRDD.reduceByKey(_ + _) // local aggregation

- De-salt: strip the random prefix to restore the original key.

// Step 2: remove the salt
val removedSaltRDD = partialAggRDD.map { case (saltedKey, value) =>
  val originalKey = saltedKey.substring(saltedKey.indexOf("-") + 1)
  (originalKey, value)
}

- Final aggregation: aggregate globally on the restored original keys.

// Step 3: global aggregation
val finalAggRDD = removedSaltRDD.reduceByKey(_ + _)

Effect: the computation for one hot key, previously handled by a single task, is spread across multiple tasks, which resolves the skew.

Optimization direction 2: adjust partitioning and parallelism
Optimization direction 3: review the computation logic and resources
Optimization summary and workflow

Long-tail case 1

One task here runs for 20.03 min while the median is only 0.90 s.
(screenshot) The same pattern is clearly visible in the Spark UI. (screenshot)

...  
WHERE   dt = '20250910' and hour >= '13' and hour < '14' 
...  
-- In this SQL, hour is not a partition column; the minute-level partition column is hm, so the query scans every partition under dt=20250910. The tasks that finish within a few seconds are reading files from partitions outside hours 13 and 14 but get no input rows, which is wasted, idle work.

Container log:
25/09/10 15:01:40 INFO FileScanRDD: Reading File path: hdfs://xxcluster01/hive_warehouse/xx.db/tab/dt=20250910/hm=0529/bc_27_merge_1757453497469_0.zlib, range: 268435456-378628974, partition values: [20250910,0529]  

Solutions:
1: Confirmed with the business team that hm and hour are equivalent here, so the filter can simply be switched to hm. If they were not equivalent, other optimizations would be needed.
2: Repartition the source data to avoid skew in the underlying files. No partition count is specified here (minimal-change principle), so the global setting of 500 applies (spark.sql.shuffle.partitions=500). Teams can choose a sensible value for their own data volume, e.g. /*+ REPARTITION(200) */.

SELECT   /*+ REPARTITION */
 col1,col2.... from tab

3: Explicitly cache the table when it is queried repeatedly, e.g.:

CACHE TABLE details_data OPTIONS ('storageLevel' = 'DISK_ONLY') AS SELECT * FROM details_data_view;

Optimization result

Long-tail case 2

-- Execution plan produced by the predicate `hdid is not null and hdid != ''`
PushedFilters: [In(mtr_src_type, [三方,官方]), IsNotNull(hdid), Not(EqualTo(hdid,))]

-- Execution plan produced by `coalesce(hdid, '') <> ''`
PushedFilters: [In(mtr_src_type, [三方,官方])]

-- The difference is clear: the ORC reader has no idea what coalesce means, so it hands back every hdid value and leaves the filtering to Spark, which performs far worse.

Long-tail case 3

1: Here job[17].stage[18].task[2423] runs for 48.22 s while the median is 0.08 s.
(screenshot)

2: job[26].stage[34].task[4911] runs for 1.98 min while the median is 1.25 s.

-- The live_prod_name filter is clearly not being pushed down here (no PushedFilters for it)
(104) Scan orc pub_dw.pub_dwv_live_view_btype_view_dr_di
Output [8]: [live_prod_name#9455, aid#9459L, uid#9466L, suid#9467, view_prod_name#9468, view_dr#9476, bste_act_type#9479, dt#9490]
Batched: true
Location: InMemoryFileIndex [hdfs://yycluster02/hive_warehouse/pub_dw.db/pub_dwv_live_view_btype_view_dr_di/dt=2025-09-18]
PartitionFilters: [isnotnull(dt#9490), (dt#9490 = 2025-09-18)]
PushedFilters: [IsNotNull(bste_act_type), EqualTo(bste_act_type,0)]
ReadSchema: struct<live_prod_name:string,aid:bigint,uid:bigint,suid:string,view_prod_name:string,view_dr:int,bste_act_type:int>

-- Optimized SQL

-- and a.live_prod_name in ('YY',if(a.dt < '2022-04-06','bdgame','bdsdk'),'sdk_voiceroom')
  AND (
    -- replaces the dynamic live_prod_name IN logic
    a.live_prod_name = 'YY'
    OR a.live_prod_name = 'sdk_voiceroom'
    OR (
      a.dt < '2022-04-06' AND a.live_prod_name = 'bdgame'
    )
    OR (
      a.dt >= '2022-04-06' AND a.live_prod_name = 'bdsdk'
    )
  )

-- Execution plan after the rewrite: live_prod_name is now successfully pushed down
(107) Scan orc pub_dw.pub_dwv_live_view_btype_view_dr_di
Output [8]: [live_prod_name#9467, aid#9471L, uid#9478L, suid#9479, view_prod_name#9480, view_dr#9488, bste_act_type#9491, dt#9502]
Batched: true
Location: InMemoryFileIndex [hdfs://yycluster02/hive_warehouse/pub_dw.db/pub_dwv_live_view_btype_view_dr_di/dt=2025-09-18]
PartitionFilters: [isnotnull(dt#9502), (dt#9502 = 2025-09-18)]
PushedFilters: [IsNotNull(bste_act_type), Or(Or(EqualTo(live_prod_name,YY),EqualTo(live_prod_name,sdk_voiceroom)),Or(EqualTo(live_prod_name,bdgame),EqualTo(live_prod_name,bdsdk))), EqualTo(bste_act_type,0)]

ReadSchema: struct<live_prod_name:string,aid:bigint,uid:bigint,suid:string,view_prod_name:string,view_dr:int,bste_act_type:int>

and (include('SDK_PROD', a.view_prod_name) = 1 or a.view_prod_name = 'bdbaizhan')
This condition uses a UDF, so it cannot be pushed down either; the execution plan shows 68,360,440 rows (the Filter's output rows) going through ColumnarToRow. Only the business team knows what the UDF does, so for better performance they should first filter with view_prod_name in (xx, xx) and only then apply the UDF-based conversion and filtering.
Note on UDFs: a UDF is a black box to Spark. UDFs exist so users can extend Spark, but the engine cannot see inside them, so it cannot apply optimizations such as predicate pushdown, code generation, or vectorization to them.
Alternatives: to work around these limitations, consider the approach sketched below.
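Following that suggestion, a hedged Spark SQL sketch: apply a pushdown-friendly IN predicate first, then run the UDF only on the surviving rows. The SparkSession (`spark`), the IN value list, and the registered `include` UDF are assumptions carried over from the example above.

```scala
// Step 1: a plain IN predicate the ORC reader can push down (value list is illustrative).
val preFiltered = spark.sql(
  """SELECT *
    |FROM pub_dw.pub_dwv_live_view_btype_view_dr_di a
    |WHERE a.dt = '2025-09-18'
    |  AND a.view_prod_name IN ('sdk_prod_a', 'sdk_prod_b', 'bdbaizhan')
    |""".stripMargin)

// Step 2: apply the black-box UDF only to the rows that survived the pushdown filter.
preFiltered
  .where("include('SDK_PROD', view_prod_name) = 1 OR view_prod_name = 'bdbaizhan'")
  .createOrReplaceTempView("view_prod_filtered")
```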

Long-tail case 4

46.3 M  138.8 M  hdfs://xx/hive_warehouse/xx.db/livevip_dws_entity_eqmt_mtr_mall_stat_180d_di/dt=2025-09-18/part-00000-c407b8f4-c9be-462f-90dd-2e32ba9a6df1-c000
46.3 M  138.9 M  hdfs://xx/hive_warehouse/xx.db/livevip_dws_entity_eqmt_mtr_mall_stat_180d_di/dt=2025-09-18/part-00001-c407b8f4-c9be-462f-90dd-2e32ba9a6df1-c000

10.12.68.133 (fast node), Spark task 2 write log:
57:10-52:06=5:04
25/09/19 03:52:06 INFO WriterImpl: ORC writer created for path: hdfs://xx/hive_warehouse/xx.db/livevip_dws_entity_eqmt_mtr_mall_stat_180d_di/.hive-staging_hive_2025-09-19_03-28-47_267_4485193128334077227-1/-ext-10000/_temporary/0/_temporary/attempt_202509190351587536033476281976156_0068_m_000002_2214/part-00002-c407b8f4-c9be-462f-90dd-2e32ba9a6df1-c000 with stripeSize: 67108864 options: Compress: SNAPPY buffer: 65536
25/09/19 03:57:10 INFO FileOutputCommitter: Saved output of task 'attempt_202509190351587536033476281976156_0068_m_000002_2214' to hdfs://xx/hive_warehouse/xx.db/livevip_dws_entity_eqmt_mtr_mall_stat_180d_di/.hive-staging_hive_2025-09-19_03-28-47_267_4485193128334077227-1/-ext-10000
25/09/19 03:57:10 INFO SparkHadoopMapRedUtil: attempt_202509190351587536033476281976156_0068_m_000002_2214: Committed
25/09/19 03:57:10 INFO Executor: Finished task 2.0 in stage 68.0 (TID 2214). 19524 bytes result sent to driver

10.12.68.18 (slow node), Spark task 1 write log:
58:59 - 52:15 = 6:44
25/09/19 03:52:15 INFO WriterImpl: ORC writer created for path: hdfs://xx/hive_warehouse/x.db/livevip_dws_entity_eqmt_mtr_mall_stat_180d_di/.hive-staging_hive_2025-09-19_03-28-47_267_4485193128334077227-1/-ext-10000/_temporary/0/_temporary/attempt_202509190351584893227535942616307_0068_m_000001_2212/part-00001-c407b8f4-c9be-462f-90dd-2e32ba9a6df1-c000 with stripeSize: 67108864 options: Compress: SNAPPY buffer: 65536
25/09/19 03:58:59 INFO FileOutputCommitter: Saved output of task 'attempt_202509190351584893227535942616307_0068_m_000001_2212' to hdfs://xx/hive_warehouse/xx.db/livevip_dws_entity_eqmt_mtr_mall_stat_180d_di/.hive-staging_hive_2025-09-19_03-28-47_267_4485193128334077227-1/-ext-10000
25/09/19 03:58:59 INFO SparkHadoopMapRedUtil: attempt_202509190351584893227535942616307_0068_m_000001_2212: Committed
25/09/19 03:58:59 INFO Executor: Finished task 1.0 in stage 68.0 (TID 2212). 19524 bytes result sent to driver

The slow file:
hdfs fsck hdfs://xx/hive_warehouse/xx.db/livevip_dws_entity_eqmt_mtr_mall_stat_180d_di/dt=2025-09-18/part-00001-c407b8f4-c9be-462f-90dd-2e32ba9a6df1-c000 -files -blocks -locations
Connecting to namenode via http://fs-hiido-xx-yynn2.hiido.host.int.yy.com:53070/fsck?ugi=hdfs&files=1&blocks=1&locations=1&path=%2Fhive_warehouse%2Fxx.db%2Flivevip_dws_entity_eqmt_mtr_mall_stat_180d_di%2Fdt%3D2025-09-18%2Fpart-00001-c407b8f4-c9be-462f-90dd-2e32ba9a6df1-c000
FSCK started by hdfs (auth:KERBEROS_SSL) from /10.12.68.182 for path /hive_warehouse/xx.db/livevip_dws_entity_eqmt_mtr_mall_stat_180d_di/dt=2025-09-18/part-00001-c407b8f4-c9be-462f-90dd-2e32ba9a6df1-c000 at Fri Sep 19 10:52:55 CST 2025
/hive_warehouse/xx.db/livevip_dws_entity_eqmt_mtr_mall_stat_180d_di/dt=2025-09-18/part-00001-c407b8f4-c9be-462f-90dd-2e32ba9a6df1-c000 48551864 bytes, replicated: replication=3, 1 block(s):  OK
0. BP-1159253446-10.21.118.29-1568116770575:blk_5688236938_4994280941 len=48551864 Live_repl=3  [DatanodeInfoWithStorage[10.12.64.3:1019,DS-13c2dcdc-a665-4729-9e21-cffa544b5906,DISK], DatanodeInfoWithStorage[10.12.64.74:1019,DS-a4d7b2a7-ddda-4ddc-9ae0-ca9639836a63,DISK], DatanodeInfoWithStorage[10.12.68.18:1019,DS-2a82ab06-9482-4363-9360-d67681206333,DISK]]

ip:10.12.68.18
2025-09-19 03:52:39,146 INFO  datanode.DataNode (DataXceiver.java:writeBlock(738)) - Receiving BP-1159253446-10.21.118.29-1568116770575:blk_5688236938_4994280941 src: /10.12.68.18:37166 dest: /10.12.68.18:1019
2025-09-19 03:58:59,645 INFO  DataNode.clienttrace (BlockReceiver.java:finalizeBlock(1533)) - src: /10.12.68.18:37166, dest: /10.12.68.18:1019, bytes: 48551864, op: HDFS_WRITE, cliID: DFSClient_attempt_202509190351584893227535942616307_0068_m_000001_2212_1350099200_47, offset: 0, srvID: 7141b89d-984a-4f49-a211-2a29c33c603c, blockid: BP-1159253446-10.21.118.29-1568116770575:blk_5688236938_4994280941, duration(ns): 380461993891
2025-09-19 03:58:59,645 INFO  datanode.DataNode (BlockReceiver.java:run(1506)) - PacketResponder: BP-1159253446-10.21.118.29-1568116770575:blk_5688236938_4994280941, type=HAS_DOWNSTREAM_IN_PIPELINE, downstreams=2:[10.12.64.74:1019, 10.12.64.3:1019] terminating

ip:10.12.64.74
2025-09-19 03:52:39,156 INFO  datanode.DataNode (DataXceiver.java:writeBlock(738)) - Receiving BP-1159253446-10.21.118.29-1568116770575:blk_5688236938_4994280941 src: /10.12.68.18:39452 dest: /10.12.64.74:1019
2025-09-19 03:58:59,628 INFO  DataNode.clienttrace (BlockReceiver.java:finalizeBlock(1533)) - src: /10.12.68.18:39452, dest: /10.12.64.74:1019, bytes: 48551864, op: HDFS_WRITE, cliID: DFSClient_attempt_202509190351584893227535942616307_0068_m_000001_2212_1350099200_47, offset: 0, srvID: 71b081c8-3cd9-4bf8-be76-12961de006da, blockid: BP-1159253446-10.21.118.29-1568116770575:blk_5688236938_4994280941, duration(ns): 380462834937
2025-09-19 03:58:59,628 INFO  datanode.DataNode (BlockReceiver.java:run(1506)) - PacketResponder: BP-1159253446-10.21.118.29-1568116770575:blk_5688236938_4994280941, type=HAS_DOWNSTREAM_IN_PIPELINE, downstreams=1:[10.12.64.3:1019] terminating

ip:10.12.64.3
2025-09-19 03:52:39,171 INFO  datanode.DataNode (DataXceiver.java:writeBlock(738)) - Receiving BP-1159253446-10.21.118.29-1568116770575:blk_5688236938_4994280941 src: /10.12.64.74:28970 dest: /10.12.64.3:1019
2025-09-19 03:58:59,642 INFO  DataNode.clienttrace (BlockReceiver.java:finalizeBlock(1533)) - src: /10.12.64.74:28970, dest: /10.12.64.3:1019, bytes: 48551864, op: HDFS_WRITE, cliID: DFSClient_attempt_202509190351584893227535942616307_0068_m_000001_2212_1350099200_47, offset: 0, srvID: 9f591b64-46d5-40fe-8b65-177a13a2aba8, blockid: BP-1159253446-10.21.118.29-1568116770575:blk_5688236938_4994280941, duration(ns): 380465797665
2025-09-19 03:58:59,642 INFO  datanode.DataNode (BlockReceiver.java:run(1506)) - PacketResponder: BP-1159253446-10.21.118.29-1568116770575:blk_5688236938_4994280941, type=LAST_IN_PIPELINE terminating

part-00001-c407b8f4-c9be-462f-90dd-2e32ba9a6df1-c000 is received at 2025-09-19 03:52:39.xx and finalized at 2025-09-19 03:58:59.xx on every node in the pipeline. Since pipeline setup and block finalization happen at essentially the same time on all three datanodes, the problem must be on the client side, which is the first datanode in the write pipeline, 10.12.68.18. That server's monitoring shows several disks at 100% IO utilization at exactly this time, which is what slowed the file write. Because the machine co-hosts DataNode, NodeManager and shuffle services, we cannot tell which service drove the IO to 100%; the pending fix is to isolate disk IO with cgroups.

Data skew

Description: the data skew rule fires when all of the following hold:
1. Total task run time > 30 min.
2. Stage duration / total run time > 45%.
3. The shuffle read volume matches one of the following buckets:

  # 0-50k
  - start: 0
    end: 50000
    threshold: 0
  # 50k-100k
  - start: 50000
    end: 100000
    threshold: 100
  # 100k-1M
  - start: 100000
    end: 1000000
    threshold: 50
  # 1M-5M
  - start: 1000000
    end: 5000000
    threshold: 10
  # 5M-20M
  - start: 5000000
    end: 20000000
    threshold: 5
  # 20M-30M
  - start: 20000000
    end: 30000000
    threshold: 3.5
  # 30M-40M
  - start: 30000000
    end: 40000000
    threshold: 3
  # 40M-50M
  - start: 40000000
    end: 50000000
    threshold: 2.25
  # 50M+
  - start: 50000000
    end: 0
    threshold: 2

Calculation formula
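The formula screenshot is not reproduced here. As a rough sketch of how the buckets above could be applied (assuming the bucket is chosen by the stage's median shuffle-read record count, the metric is max records divided by the median, and a threshold of 0 means the bucket is never flagged; all three are assumptions about the exact implementation in task-parser):

```scala
// Rough sketch only; Compass's real formula lives in task-parser.
case class Bucket(start: Long, end: Long, threshold: Double)

val buckets = Seq(
  Bucket(0L, 50000L, 0), Bucket(50000L, 100000L, 100), Bucket(100000L, 1000000L, 50),
  Bucket(1000000L, 5000000L, 10), Bucket(5000000L, 20000000L, 5), Bucket(20000000L, 30000000L, 3.5),
  Bucket(30000000L, 40000000L, 3), Bucket(40000000L, 50000000L, 2.25), Bucket(50000000L, 0L, 2))

def isDataSkew(maxRecords: Long, medianRecords: Long): Boolean = {
  val ratio = maxRecords.toDouble / math.max(medianRecords, 1L)
  buckets
    .find(b => medianRecords >= b.start && (b.end == 0L || medianRecords < b.end))
    .exists(b => b.threshold > 0 && ratio > b.threshold)
}

// e.g. median 6M falls in the 5M-20M bucket (threshold 5); ratio 40M / 6M ≈ 6.7 -> flagged
val skewed = isDataSkew(maxRecords = 40000000L, medianRecords = 6000000L)
```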

Suggested optimizations

Common ways to address Spark data skew include:

Optimized SQL:
This query groups by session_id and keeps the row with the longest path per session, so the NULL group would only ever contribute one row anyway; it can simply be filtered out (as below), or the NULL rows can be handled separately with their own max.

  select
    xxx,
    row_number() over(partition by session_id  order by    length(path) desc ) as row_id
  from   raw_expand_data where session_id is not null

Code example:

 // Increase the number of partitions
Dataset<Row> repartitioned = dataset.repartition(200);
// Broadcast the small table
Dataset<Row> smallTable = ...;
Dataset<Row> bigTable = ...;
Dataset<Row> joined = bigTable.join(broadcast(smallTable), "key");
// Two-stage aggregation: aggregate by (key, random salt) first, then by key
Dataset<Row> salted = dataset.withColumn("salt", floor(rand().multiply(10)));
Dataset<Row> stage1 = salted.groupBy("key", "salt").agg(sum("value").alias("partialSum"));
Dataset<Row> stage2 = stage1.groupBy("key").agg(sum("partialSum").alias("totalValue"));

Large table scan

Find the SQL that reads the table and check whether anything is abnormal.
(screenshot)

Excessive speculative execution analysis

A stage with more than 20 speculative tasks is flagged as excessive speculation. (screenshot)

Current Spark version: 3.2.1, with these settings:

spark.speculation	true
spark.speculation.interval	5s
spark.speculation.multiplier	2
-- 3.2.1 defaults:
spark.speculation.multiplier 1.5 // how many times slower than the median a task must be before it is eligible for speculation
spark.speculation.quantile	0.75 // fraction of tasks that must finish before speculation is enabled for a stage

Suggested optimizations

Stage duration anomaly analysis

A stage is flagged as abnormal when its idle time (stage run time minus task run time) exceeds 30.0% of the stage run time.
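A one-line sketch of that rule (how the task time is aggregated is simplified here; the values are illustrative):

```scala
// Idle share of a stage: (stage wall-clock time - time spent in tasks) / stage wall-clock time.
def stageIdleRatio(stageDurationMs: Long, taskTimeMs: Long): Double =
  (stageDurationMs - taskTimeMs).toDouble / stageDurationMs

// e.g. a 10-minute stage whose tasks only account for 6.5 minutes -> 0.35 > 0.30 -> abnormal
val stageAbnormal = stageIdleRatio(stageDurationMs = 600000L, taskTimeMs = 390000L) > 0.30
```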

Case analysis

Baseline time anomaly

Tasks that finish earlier or later than their historical normal end time.

HDFS stall analysis

For each task in a stage, compute the processing rate (bytes read divided by duration). When the ratio of the median rate to the minimum rate exceeds 10.00, the stage is flagged as an HDFS stall.
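A sketch of that check with illustrative field names (bytes read and duration per task):

```scala
// Per-task processing rate = bytes read / duration; flag the stage when the median rate
// is more than `threshold` times the slowest task's rate.
case class TaskIo(bytesRead: Long, durationMs: Long)

def isHdfsStuck(tasks: Seq[TaskIo], threshold: Double = 10.0): Boolean = {
  require(tasks.nonEmpty)
  val rates = tasks.map(t => t.bytesRead.toDouble / math.max(t.durationMs, 1L)).sorted
  rates(rates.length / 2) / math.max(rates.head, 1e-9) > threshold
}

// e.g. two tasks read ~1 GB in ~10 s while one needs 200 s -> median/min ≈ 22 -> flagged
val stuck = isHdfsStuck(Seq(TaskIo(1200000000L, 10000L), TaskIo(1000000000L, 9000L), TaskIo(1000000000L, 200000L)))
```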

Spark logs: some tasks share an executor, so their log lines end up in the same file. (screenshot)

task 1
25/09/09 11:06:41 INFO FileScanRDD: Reading File path: hdfs://yycluster01/hive_warehouse/hiidodw.db/yy_lpfplayerfirstaccess_original/dt=20250909/hm=1029/bc_124_merge_1757385248862_0.zlib, range: 0-121518116, partition values: [20250909,1029]
25/09/09 11:06:41 INFO FileScanRDD: Reading File path: hdfs://yycluster01/hive_warehouse/hiidodw.db/yy_lpfplayerfirstaccess_original/dt=20250909/hm=1044/bc_124_merge_1757386148166_0.zlib, range: 0-121518116, partition values: [20250909,1044]
task 2
25/09/09 11:06:41 INFO FileScanRDD: Reading File path: hdfs://yycluster01/hive_warehouse/hiidodw.db/yy_lpfplayerfirstaccess_original/dt=20250909/hm=1029/bc_124_merge_1757385248862_0.zlib, range: 0-121518116, partition values: [20250909,1029]
25/09/09 11:06:41 INFO FileScanRDD: Reading File path: hdfs://yycluster01/hive_warehouse/hiidodw.db/yy_lpfplayerfirstaccess_original/dt=20250909/hm=1044/bc_124_merge_1757386148166_0.zlib, range: 0-121518116, partition values: [20250909,1044]
task 3
25/09/09 11:06:49 INFO FileScanRDD: Reading File path: hdfs://yycluster01/hive_warehouse/hiidodw.db/yy_lpfplayerfirstaccess_original/dt=20250909/hm=1014/bc_124_merge_1757384348136_0.zlib, range: 0-112774706, partition values: [20250909,1014]
25/09/09 11:06:49 INFO FileScanRDD: Reading File path: hdfs://yycluster01/hive_warehouse/hiidodw.db/yy_lpfplayerfirstaccess_original/dt=20250909/hm=1059/bc_124_merge_1757387049011_0.zlib, range: 0-109471027, partition values: [20250909,1059]
task 4
25/09/09 11:06:49 INFO FileScanRDD: Reading File path: hdfs://yycluster01/hive_warehouse/hiidodw.db/yy_lpfplayerfirstaccess_original/dt=20250909/hm=1014/bc_124_merge_1757384348136_0.zlib, range: 0-112774706, partition values: [20250909,1014]
25/09/09 11:06:49 INFO FileScanRDD: Reading File path: hdfs://yycluster01/hive_warehouse/hiidodw.db/yy_lpfplayerfirstaccess_original/dt=20250909/hm=1059/bc_124_merge_1757387049011_0.zlib, range: 0-109471027, partition values: [20250909,1059]
task 5
25/09/09 11:06:50 INFO FileScanRDD: Reading File path: hdfs://yycluster01/hive_warehouse/hiidodw.db/yy_lpfplayerfirstaccess_original/dt=20250909/hm=1044/bc_124_merge_1757386148166_0.zlib, range: 121518116-123307681, partition values: [20250909,1044]

Cause analysis

The real cause:

minPartitionNum = leafNodeDefaultParallelism -> spark.default.parallelism -> (executors * cores) -> 4
totalBytes = 469295249 + 4194304 * 4 = 486072465
bytesPerCore = 486072465 / 4 = 121518116.25

val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
val minPartitionNum = sparkSession.sessionState.conf.filesMinPartitionNum
  .getOrElse(sparkSession.leafNodeDefaultParallelism)
val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
val bytesPerCore = totalBytes / minPartitionNum
Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))

defaultMaxSplitBytes is 256 MB, so maxSplitBytes = Math.min(256MB, Math.max(4096, 121518116)) = 121518116

- Which is exactly what the Spark log shows: range: 0-121518116

**Optimization plan**
Example:
```sql
select c1,c2... /* various column transformations */ from tab where dt=xx ..
group by c1,c2...;
-- After optimization: repartition the source table first. /*+ REPARTITION(n) */ is an explicit optimizer hint that tells Spark to repartition the data into n partitions.
-- Caution: DISTRIBUTE BY CAST(rand() * 24 AS INT) is a logical expression that Spark's optimizer may rewrite or optimize away, so prefer the hint.
create temporary table tmp_tab as
select /*+ REPARTITION(24) */
* from tab where dt=xx ..;
select c1,c2... /* various column transformations */ from tmp_tab where dt=xx ..
group by c1,c2...;
```
Result

Job duration anomaly analysis

A job is flagged as abnormal when its idle time (job total time minus cumulative stage time) exceeds 30.00% of the total time.

Anomaly ranking statistics

Spark parsing code:

```scala
package com.aengine.spark.app.compass
import com.aengine.spark.utils.ResourcesUtils
import org.apache.commons.lang.StringUtils
import org.apache.spark.sql.{DataFrame, SparkSession, functions}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{ArrayType, BooleanType, FloatType, IntegerType, LongType, StringType, StructField, StructType}
import java.text.SimpleDateFormat
import java.util.{Calendar, Locale}

/**
 * Reads the ES JSON data, parses it into detection results, and ranks them by max value (TOP_N).
 * Created by liangrui on 2025-09-04.
 *
 * The ES index is first dumped to JSON and uploaded to HDFS, e.g.:
 * es_date=2025-09-07
 * elasticdump   --input=http://xx:2949/compass-detector-app-$es_date --output=/data/es/compass-detector-app-$es_date.json  --type=data
 * hdfs dfs -put  /data/es/compass-detector-app-$es_date.json /data/es/
 *
 */
object ReadEsJosnFile {
  //JSON parsing rules: which field carries the metric for each appCategory
  //dataSkew  -> ratio
  //stageDurationAbnormal  -> ratio
  //jobDurationAbnormal  -> ratio
  //taskDurationAbnormal  -> ratio
  //hdfsStuck  -> ratio
  //speculativeTask  -> speculativeCount
  //largeTableScan   -> rows


  def main(args: Array[String]): Unit = {

    var Array(argDate, other) = (args ++ Array(null, null)).slice(0, 2)
    val dateFormatter = new SimpleDateFormat("yyyy-MM-dd", Locale.US)
    val calendar = Calendar.getInstance
    //calendar.add(Calendar.HOUR_OF_DAY, -1) // subtract one hour
    val currentDate = dateFormatter.format(calendar.getTime)
    var es_index_file = ""
    if (StringUtils.isBlank(argDate)) {
      es_index_file = s"/data/es/compass-detector-app-$currentDate.json"
      argDate = currentDate
    } else {
      es_index_file = s"/data/es/compass-detector-app-$argDate.json"
    }
    println("es_index:" + es_index_file)
    val spark = SparkSession.builder()
      .appName("com.aengine.spark.app.compass.SparkReadES")
      .getOrCreate()
    val df = spark.read.json(es_index_file)
    df.printSchema()
    df.show()
    val newDF = df.select("_source.applicationId", "_source.taskName", "_source.executionTime", "_source.dataList", "_source.projectName", "_source.flowName")
      .filter(col("dataList").isNotNull && functions.size(col("dataList")) > 0)

    val df_parsed = newDF.select(
      col("applicationId"),
      col("taskName"),
      col("executionTime"),
      col("projectName"),
      col("flowName"),
      explode(col("dataList")).alias("dataListItem"))

    // Extract the fields inside dataList
    // First inspect the schema of dataListItem to check whether the tables field exists
    val dataListItemColumns = df_parsed.select(col("dataListItem")).schema.head.dataType.asInstanceOf[org.apache.spark.sql.types.StructType].fieldNames.toSet

    val expandedDF = if (dataListItemColumns.contains("tables")) {
      // If the tables field exists, extract it directly
      df_parsed.select(
        col("applicationId"),
        col("taskName"),
        col("executionTime"),
        col("projectName"),
        col("flowName"),
        col("dataListItem.abnormal").alias("abnormal"),
        col("dataListItem.appCategory").alias("appCategory"),
        col("dataListItem.data").alias("data"),
        col("dataListItem.tables").alias("tables")
      )
    } else {
      // If the tables field does not exist, add a null string column as a placeholder
      df_parsed.select(
        col("applicationId"),
        col("taskName"),
        col("executionTime"),
        col("projectName"),
        col("flowName"),
        col("dataListItem.abnormal").alias("abnormal"),
        col("dataListItem.appCategory").alias("appCategory"),
        col("dataListItem.data").alias("data"),
        lit(null).cast("string").alias("tables")
      )
    }
    println("expandedDF print====")
    expandedDF.printSchema()
    expandedDF.show()
    // Define the schema of the data field
    val dataSkewSchema = StructType(Seq(
      StructField("abnormal", BooleanType, true),
      // StructField("attemptNumber", LongType, true),
      //StructField("duration", LongType, true),
      // StructField("jobId", LongType, true),
      // StructField("maxShuffleReadBytes", LongType, true),
      //StructField("maxShuffleReadRecords", LongType, true),
      //StructField("medianRecords", LongType, true),
      StructField("ratio", FloatType, true),
      //StructField("stageId", FloatType, true),
      StructField("threshold", FloatType, true)
    ))
    val dataSchema = ArrayType(dataSkewSchema) // defined as an array type
    // parsing logic for the ratio column
    val RatioDF = expandedDF.
      filter(
        """
          |abnormal=true
          | and appCategory
          | in('dataSkew','stageDurationAbnormal','jobDurationAbnormal','taskDurationAbnormal','hdfsStuck') """
          .stripMargin).
      withColumn("data_array", from_json(col("data"), dataSchema,
        Map("mode" -> "PERMISSIVE", "allowNumericLeadingZeros" -> "true")))
    val RatioResultDF = getRatioDF(spark, RatioDF, "ratio")

    // parsing logic for the rows column
    val rowsStructSchema = StructType(Seq(
      StructField("abnormal", BooleanType, true),
      StructField("rows", LongType, true),
      StructField("threshold", FloatType, true)
    ))
    val rowsSchema = ArrayType(rowsStructSchema) // defined as an array type
    val RowsDF = expandedDF.filter("abnormal=true and appCategory = 'largeTableScan'").
      withColumn("data_array", from_json(col("data"), rowsSchema,
        Map("mode" -> "PERMISSIVE", "allowNumericLeadingZeros" -> "true")))
    val RowsResultDF = getRatioDF(spark, RowsDF, "rows")

    // parsing logic for the speculativeCount column
    val speculativeCountStructSchema = StructType(Seq(
      StructField("abnormal", BooleanType, true),
      StructField("speculativeCount", IntegerType, true),
      StructField("threshold", FloatType, true)
    ))
    val speculativeCountSchema = ArrayType(speculativeCountStructSchema) // defined as an array type
    val speculativeCountDF = expandedDF.filter("abnormal=true and appCategory = 'speculativeTask'").
      withColumn("data_array", from_json(col("data"), speculativeCountSchema,
        Map("mode" -> "PERMISSIVE", "allowNumericLeadingZeros" -> "true")))
    val speculativeCountResultDF = getRatioDF(spark, speculativeCountDF, "speculativeCount")

    //union
    val ResultDF = RatioResultDF.union(RowsResultDF).union(speculativeCountResultDF).
      withColumn("es_index", lit(argDate)).
      withColumn("create_date", lit(currentDate))
    ResultDF.show(false)

    //write results to MySQL
    val jdbc_conf = ResourcesUtils.getProperties("jdbc.properties")
    val targetJdbcUrl = jdbc_conf.getProperty("compass_url")
    val targetJdbcUsername = jdbc_conf.getProperty("compass_user")
    val targetJdbcPassword = jdbc_conf.getProperty("compass_password")
    //delete
    deleteMysql(targetJdbcUrl, targetJdbcUsername, targetJdbcPassword, argDate)
    ResultDF.write
      .format("jdbc")
      .option("url", targetJdbcUrl)
      .option("driver", "com.mysql.jdbc.Driver")
      .option("dbtable", "detect_top")
      .option("user", targetJdbcUsername)
      .option("password", targetJdbcPassword)
      .mode("append")
      .save()

    spark.stop()
  }


  /**
   * Extracts the metric column, takes its max per task, and ranks to produce the top list.
   *
   * @param spark
   * @param RatioDF
   * @return
   */
  def getRatioDF(spark: SparkSession, RatioDF: DataFrame, columnName: String): DataFrame = {
    println("finalDF print====")
    RatioDF.printSchema()
    RatioDF.show(10, false)
    RatioDF.select(
      col("applicationId"),
      col("taskName"),
      col("executionTime"),
      col("projectName"),
      col("flowName"),
      col("abnormal"),
      col("appCategory"),
      explode(col("data_array")).alias("data_element")
    ).select(
      col("applicationId"),
      col("taskName"),
      col("executionTime"),
      col("projectName"),
      col("flowName"),
      col("abnormal"),
      col("appCategory"),
      col("data_element.abnormal").alias("data_abnormal"), // extract the fields inside the struct
      //col(s"data_element.$columnName").alias("data_col"),
      col(s"data_element.$columnName").cast("float").alias("data_col"),
      col("data_element.threshold").alias("threshold"),
      // col("data_element.jobId").alias("jobId"),
      //col("data_element.stageId").alias("stageId")
    ).filter("data_abnormal=true").createOrReplaceTempView("tmp_result")

    spark.sql(
      s"""
         |with ranked_data as (
         |  select
         |    projectName as project_name,
         |    flowName as flow_name,
         |    taskName as task_name,
         |    applicationId as application_id,
         |    executionTime as execution_time,
         |    appCategory as app_category,
         |    ROUND(max(data_col),2) as max_detect,
         |    ROUND(sum(data_col),2) as sum_detect,
         |    ROW_NUMBER() OVER (PARTITION BY appCategory ORDER BY ROUND(max(data_col),2) DESC) as rn
         |  from tmp_result
         |  group by
         |    projectName,flowName,taskName,applicationId,executionTime,appCategory
         |)
         |select 
         |  project_name,
         |  flow_name,
         |  task_name,
         |  application_id,
         |  execution_time,
         |  app_category,
         |  max_detect,
         |  sum_detect
         |from ranked_data
         |where rn <= 100
         |order by app_category, max_detect desc
         |""".stripMargin)
  }

  def deleteMysql(jdbcUrl: String, jdbcUsername: String, jdbcPassword: String, es_index: String): Unit = {
    import java.sql.{Connection, DriverManager, PreparedStatement}
    var connection: Connection = null
    var preparedStatement: PreparedStatement = null

    try {
      // Register the driver and open the connection
      Class.forName("com.mysql.jdbc.Driver")
      //Class.forName("com.mysql.cj.jdbc.Driver")
      println(s"Connecting to database: $jdbcUrl")
      connection = DriverManager.getConnection(jdbcUrl, jdbcUsername, jdbcPassword)
      println("Database connection established successfully.")

      // Build a parameterized DELETE statement
      val sql = "DELETE FROM detect_top WHERE es_index = ?"
      println(s"Preparing SQL: $sql with parameter: $es_index")
      preparedStatement = connection.prepareStatement(sql)

      // Bind the parameter
      preparedStatement.setString(1, es_index)

      // Execute the delete (auto-commit, no explicit transaction)
      val affectedRows = preparedStatement.executeUpdate()
      println(s"Delete operation completed. Affected rows: $affectedRows")

      if (affectedRows > 0) {
        println(s"Successfully deleted $affectedRows rows from detect_top where es_index = '$es_index'")
      } else {
        println(s"No rows found with es_index = '$es_index' in detect_top table")
      }
    } catch {
      case e: Exception =>
        println(s"Error occurred during delete operation: ${e.getMessage}")
        e.printStackTrace()
    } finally {
      // Close resources
      if (preparedStatement != null) preparedStatement.close()
      if (connection != null) connection.close()
    }
  }
}
```