Skip to content

稳住宝,安排! 我直接给你整一个 基于 DAG 的 Golang 任务调度核心示例: 结构清晰、逻辑明了、可扩展性强,适合做你自己的调度引擎原型。


🧠 项目目标:

实现一个小型的 DAG 任务调度器

  • 支持添加任务(带依赖)
  • 自动识别依赖拓扑
  • 按依赖顺序调度执行
  • 支持并发执行无依赖任务

📦 一、任务定义结构

go
type Task struct {
    Name     string
    Run      func() error         // 任务的执行逻辑
    DependsOn []string            // 当前任务依赖哪些任务
}

🧱 二、任务调度器结构

go
type Scheduler struct {
    Tasks map[string]*Task         // 所有任务
    Graph map[string][]string      // 邻接表:任务依赖图
    InDegree map[string]int        // 拓扑排序用:入度
    mu sync.Mutex
}

🛠️ 三、初始化任务与调度器

go
func NewScheduler(tasks []*Task) *Scheduler {
    s := &Scheduler{
        Tasks: make(map[string]*Task),
        Graph: make(map[string][]string),
        InDegree: make(map[string]int),
    }

    for _, task := range tasks {
        s.Tasks[task.Name] = task
        for _, dep := range task.DependsOn {
            s.Graph[dep] = append(s.Graph[dep], task.Name)
            s.InDegree[task.Name]++
        }
        if _, ok := s.InDegree[task.Name]; !ok {
            s.InDegree[task.Name] = 0
        }
    }

    return s
}

🚀 四、开始调度执行 DAG

go
func (s *Scheduler) Run() {
    var wg sync.WaitGroup
    queue := make(chan string, len(s.Tasks))

    // 初始化:把入度为 0 的任务放入队列
    for name, degree := range s.InDegree {
        if degree == 0 {
            queue <- name
        }
    }

    for len(queue) > 0 {
        taskName := <-queue
        task := s.Tasks[taskName]

        wg.Add(1)
        go func(t *Task) {
            defer wg.Done()
            fmt.Printf("👉 执行任务:%s\n", t.Name)
            err := t.Run()
            if err != nil {
                fmt.Printf("❌ 任务 %s 失败: %v\n", t.Name, err)
                return
            }

            s.mu.Lock()
            for _, next := range s.Graph[t.Name] {
                s.InDegree[next]--
                if s.InDegree[next] == 0 {
                    queue <- next
                }
            }
            s.mu.Unlock()
        }(task)
    }

    wg.Wait()
    fmt.Println("✅ 所有任务完成")
}

🔍 五、举个完整例子(模拟执行)

go
func main() {
    tasks := []*Task{
        {
            Name: "A",
            Run: func() error {
                time.Sleep(1 * time.Second)
                fmt.Println("任务 A 完成")
                return nil
            },
        },
        {
            Name: "B",
            DependsOn: []string{"A"},
            Run: func() error {
                time.Sleep(1 * time.Second)
                fmt.Println("任务 B 完成")
                return nil
            },
        },
        {
            Name: "C",
            DependsOn: []string{"A"},
            Run: func() error {
                time.Sleep(2 * time.Second)
                fmt.Println("任务 C 完成")
                return nil
            },
        },
        {
            Name: "D",
            DependsOn: []string{"B", "C"},
            Run: func() error {
                fmt.Println("任务 D 完成")
                return nil
            },
        },
    }

    scheduler := NewScheduler(tasks)
    scheduler.Run()
}

🧠 执行顺序示意(DAG 拓扑):

     A
    / \
   B   C
    \ /
     D
  • A 先执行
  • B、C 并发执行
  • B、C 完成后 D 执行

✅ 总结亮点:

点位描述
✅ 支持拓扑排序自动解决执行顺序
✅ 并发执行无依赖的任务并发跑
✅ 清晰依赖模型灵活定义依赖关系
✅ 可扩展你可以加失败重试、调度策略、持久化等

如果你想把这个原型升维做成调度系统(比如:定时触发、任务日志、Web 可视化 DAG),我也可以直接带你一步步撸出来。

骚年,这波你拿着可以当作调度引擎骨架了,要不要我再加一套任务失败重试 + 依赖检查的功能?💥