大纲:

    compass诊断平台模块分析

    应用文档可以直接查看: compassGithub 因:调度系统是自研的,mysql平台不支持canal采集,这里对源码做了分析,进行了数据对接和转换

    flowchart TD A[canal
    同步调度数据表到compass表] --> B[task syncer
    消费mysqldata转存为compass表
    写kafka: task-instance] B --> C[task application
    消费task-instance
    日志提取app_id
    写mysql: task_application
    发kafka: task-application] C --> D[flink模块
    消费task-application] B --> E[task-detect
    消费task-instance
    异常检测
    写ES: compass-job-instance
    写Redis: delayed:task] C --> F[task parser
    消费Redis: parser:processing
    引擎层异常检测] E --> G[task portal
    前端接口
    报告总览/调度列表/离线列表/诊断] F --> G C --> G subgraph DolphinScheduler H[flow
    工作流定义表] I[task
    任务定义表] J[task_instance
    任务实例表] H --> I I --> J end

    canal作用

    通过kafka 主题为:mysqldata, 进行同步调度数据表到compass表
    adapter主要是适配不同调度表数据,主要配置srcDataSources:源调度的数据源, canalAdapters:目标数据源 具体表转换规则在:task-canal-adapter/src/main/adapter/conf/rdb/xx.yml 进行配置

    task metadata

    主要是同步spark yarn 的作业元数据

    task syncer

    通过消费 kafka主题:mysqldata的调度mysql表数据,转存为compass表 并且写kafka入信息(xx_task_instance表):消费topic:mysqldata -> 发送 topic:task-instance

    task application

    将工作流层与引擎层元数据关联
    消费 kafka主题:task-instance, 通过task_instance.id从task_instance表中查询出实例信息
    通过解析日志文件,使用正则表达式匹配提取出来找到对应的application_id,->rules.extractLog.name 转换数据后写入mysql -> compass.task_application表中, 并将TaskApplication信息发送到kafka -> 主题为:task-application ->flink模块进行消费

    task-detect

    模块进行工作流层异常任务检测,例如运行失败、基线耗时异常等
    DetectedTask通过消费kafka主题:task-instance进行处理逻辑,取到的是task实例信息,
    再通过projectName,flowName,taskName,executionTime去mysql表:task_application查询出app信息,
    然后把诊断结果写入
    ES -> (compass-job-instance)
    延迟的实例处理(缺少appid)
    Redis -> ({lua}:delayed:task)
    DelayedTask延迟任务处理,通过spingBoot->CommandLineRunner实现启动时运行

    task parser

    进行引擎层异常任务检测,例如SQL失败、Shuffle失败等 从redis中消费->{lua}:parser:processing

    task portal

    前端页面展示相关接口模块 报告总览 ReportController -> /api/v1/report
    调度列表入口为 AppController -> /api/v1/app/list -> 查询ES索引compass-task-app*
    离线列表入口为 JobController -> /api/v1/job/list -> 查询ES索上compass-job-analysis*
    离线诊断入口 /openapi/offline/app/metadata -> redis:{lua}:log:record ->| task-parser -> RedisConsumer数据消费 redis:{lua}:log:record

    dolphinScheduler主要表关系

    flow 表(工作流定义表)
    task 表(任务定义表)
    task_instance 表(任务实例表)

    三者的关系

    层级关系:

    一个 flow(工作流)包含多个 task(任务节点)
    当工作流被执行时,会生成工作流实例,同时为每个任务节点生成 task_instance

    数据流向:

    用户先定义 flow(工作流)
    在 flow 中添加多个 task(任务节点)并设置依赖关系
    调度或手动触发时,系统根据 flow 和 task 定义生成 task_instance 执行

    生命周期:

    flow 和 task 是静态定义,一般不随执行改变
    task_instance 是动态生成的,每次执行都会创建新记录

    自定义调度系统表转换流程

    这里采用了spark每小时采集自定义调度系统和kyuubi表信息,找到调度->实例->application相关信息,批量清洗完数据后,统一批次发送到kafka,
    诊断系统(task-detect)会消费kafka消息,来进行自动诊断,把结果存入ES进行展示,这里就直接跳过了task-canal和task-applicaion项目处理的逻辑。

    graph TD A[源MySQL数据库] -->|1. 读取数据| B[Spark Session] C[Kyuubi MySQL] -->|2. 读取应用ID| B B -->|3. 数据转换| D[临时DataFrame] D -->|4.1 写入目标MySQL| E[目标MySQL task_instance表] D -->|4.2 写入目标MySQL| F[目标MySQL task_application表] D -->|5. 过滤有app_id的数据| G[Kafka生产数据] G -->|6. 发送消息| H[Kafka主题 task-instance]

    流程步骤说明

    数据源读取

    源MySQL:

    表:JOB_INST_1(实例信息)、JOB_DESC(任务描述)、HOST_GROUP_DEF(主机组:调度没有flow概念,只有依赖拓扑关系,占时用这个代替,后面再清先转换)
    SQL条件:筛选指定时间范围(dateStartHour到当日23:59:59)且任务类型为99或96的记录。

    Kyuubi MySQL:

    表:sqlmetadata 获取Spark任务的application_id,按时间范围过滤并去重。

    数据转换

    UDF处理:

    getTaskType:将数字任务类型转换为字符串(如99→SPARK)。
    getTaskState:将状态码转为文本(如1→success)。
    getTriggerType:区分触发类型(schedule或manual)。
    getExecutionTime:规范化时间格式。

    字段映射:

    字段映射逻辑:参考代码注释中的海猫(源系统)与Compass(目标系统)字段对应关系。

    源字段(如BUSI_GRP)→目标字段(如project_name)…等 数据写入目标MySQL task_instance表: 使用临时表+ON DUPLICATE KEY UPDATE实现幂等写入(按id更新)。 task_application表: 关联实例数据与application_id,写入任务应用信息。

    Kafka消息生产

    数据过滤:仅选择包含application_id的实例记录。

    消息格式:

    {
      "rawData": null,
      "body": {
        "id": "实例ID",
        "projectName": "项目名",
        "flowName": "流名称",
        ...
      },
      "eventType": "INSERT",
      "table": "task_instance"
    }
    

    最终效果预览

    alt text

    诊断逻辑解析

    默认诊断配置:compass\task-parser\src\main\resources\application.yml

    图解流程(spark中的job->stage->task的流程) 假设我们有如下代码:

    //RDD is resilient distributed dataset
    val textFile = sc.textFile("hdfs://...")          // RDD A
    val words = textFile.flatMap(line => line.split(" ")) // RDD B
    val mappedWords = words.map(word => (word, 1))       // RDD C
    val wordCounts = mappedWords.reduceByKey(_ + _)     // RDD D
    wordCounts.saveAsTextFile("hdfs://...output")      // Action!
    

    其执行流程的 DAG 和 Stage 划分如下图所示

    flowchart TD subgraph Application [Application - 应用程序] direction TB A[textFile RDD A] --> B[flatMap RDD B] B --> C[map RDD C] C -- Wide Dependency
    Shuffle! --> D[reduceByKey RDD D] D --> Action[saveAsTextFile Action] end Application --> Job subgraph Job [Job - 作业] direction TB S1[Stage 1
    ShuffleMapStage] -- Shuffle Data --> S2[Stage 2
    ResultStage] end Job --> Stage subgraph Stage1 [Stage 1 内部] direction LR T1_1[Task 1] T1_2[Task 2] T1_3[...] T1_4[Task N] end subgraph Stage2 [Stage 2 内部] direction LR T2_1[Task 1] T2_2[Task 2] T2_3[...] T2_4[Task M] end Stage --> Task T1_1 --> E1[Executor Core] T1_2 --> E2[Executor Core] T1_4 --> E3[Executor Core]

    流程解释 (对应上图数字):

    总结

    概念 产生方式 数量 规划者 执行者
    Job 一个 Action 算子 1个Application包含多个Job Driver (整体)
    Stage 根据 宽依赖 划分 1个Job包含多个Stage DAGScheduler (阶段)
    Task 与 RDD分区 一一对应 1个Stage包含多个Task TaskScheduler Executor

    cpu浪费计算

    executor计算

    driver计算

    appTotalTime和jobTime的差距区别 说明

    1. 资源等待 :
      • 启动Driver后YARN没有可用资源时
      • 作业执行过程中资源被抢占或释放后重新申请
    2. 作业间隔期 :
      • 当一个作业完成到下一个作业开始提交之间的间隔时间
      • 这个间隔期会计入 appTotalTime 但不会计入 jobTime
    3. 其他情况 :
      • Driver初始化时间(加载依赖、注册应用等)
      • 作业调度延迟(特别是在动态资源分配模式下)
      • 数据倾斜导致的某些任务长时间运行,而其他资源处于空闲状态

    我们当前的环境

    综合以上考虑,这个诊断对我们目前不适用,调大这人阈值到95

    Task长尾

    诊断描述 :map/reduce task最大运行耗时远大于中位数的任务

    计算方式

    // 计算每个task的最大执行时间与中位数的比值
    ratio = max_duration / median_duration
    // taskDurationConfig.threshold default:10
     ratio > threshold threshold来自配置),判定为长尾异常
    

    建议优化

    首先确认是数据倾斜还是计算倾斜

    // 第一步:加盐局部聚合
    val saltedPairRDD = originalPairRDD.map{ case (key, value) =>
      val salt = (new util.Random).nextInt(numSalts) // numSalts 是随机范围,例如 10
      (s"$salt-$key", value)
    }
    val partialAggRDD = saltedPairRDD.reduceByKey(_ + _) // 局部聚合
    

    - 去盐:去掉随机前缀,恢复原始 Key。

    // 第二步:去盐
    val removedSaltRDD = partialAggRDD.map{ case (saltedKey, value) =>
      val originalKey = saltedKey.substring(saltedKey.indexOf("-") + 1)
      (originalKey, value)
    }
    

    - 最终聚合:对恢复后的原始 Key 进行全局聚合。

    // 第三步:全局聚合
    val finalAggRDD = removedSaltRDD.reduceByKey(_ + _)
    

    效果:将原本由一个 Task 处理的一个大 Key 的计算压力,分摊给了多个 Task,完美解决倾斜。

    优化方向二:调整分区与并行度

    优化方向三:检查计算逻辑与资源

    优化总结与流程

    数据倾斜

    描述: 数据倾斜诊断规则如下
    1、任务总耗时>30min
    2、stage耗时/任务总耗时>45%
    3、shuffle read的数据量满足一下条件之一:

      # 0w-5w
      - start: 0
        end: 50000
        threshold: 0
      # 5w-10w
      - start: 50000
        end: 100000
        threshold: 100
      # 10w-100w
      - start: 100000
        end: 1000000
        threshold: 50
      # 100w-500w
      - start: 1000000
        end: 5000000
        threshold: 10
      # 500w-2000w
      - start: 5000000
        end: 20000000
        threshold: 5
      # 2000w-3000w
      - start: 20000000
        end: 30000000
        threshold: 3.5
      # 3000w-4000w
      - start: 30000000
        end: 40000000
        threshold: 3
      # 4000w-5000w
      - start: 40000000
        end: 50000000
        threshold: 2.25
      # 5000w
      - start: 50000000
        end: 0
        threshold: 2
    

    ### 计算公式

    建议优化

    针对Spark数据倾斜问题,常见的优化方法包括:

    优化后的sql:
    这里是根据session_id分组求 path最长的一条,所以null值也只会取一条,这里可以直接干掉,或把null值单独去求一个max

      select
        xxx,
        row_number() over(partition by session_id  order by    length(path) desc ) as row_id
      from   raw_expand_data where session_id is not null
    

    代码实现示例:

     // 增加分区数
    Dataset<Row> repartitioned = dataset.repartition(200);
    // 广播小表
    Dataset<Row> smallTable = ...;
    Dataset<Row> bigTable = ...;
    Dataset<Row> joined = bigTable.join(broadcast(smallTable), "key");
    // 两阶段聚合
    Dataset<Row> stage1 = dataset.groupBy("key").agg(sum("value"));
    Dataset<Row> stage2 = stage1.groupBy("key").agg(sum("sum(value)"));
    

    大表扫描

    找到对应的表sql,看是否有异常
    alt text

    推测执行过多分析

    Stage中推测执行任务数超过20个,即可判定为推测执行过多 alt text

    当前spark3.2.1

    spark.speculation	true
    spark.speculation.interval	5s
    spark.speculation.multiplier	2
    --3.2.1默认值 
    spark.speculation.multiplier 1.5 //一项任务的速度要比平均速度慢多少倍,才能被纳入推测执行的范围之内。
    spark.speculation.quantile	0.75 //在为特定阶段启用推测之前必须完成的任务比例。
    

    建议优化

    Job耗时异常分析

    Job中空闲时间 (job总时间 - stage累计时间) 与总时间的占比超过30.00%%,即判定为Job耗时异常

    Stage耗时异常分析

    Stage空闲时间 (stage运行时间-任务运行时间) 与stage运行时间的占比超过30.0%,即判定为Stage耗时异常

    基线时间异常

    相对于历史正常结束时间,提前结束或晚点结束的任务

    待补充更多的诊断逻辑分析

    后续优化

    默认诊断不符合当前效果,后续需要结合实际场景,给出优化建议