Appearance
稳住宝,安排! 我直接给你整一个 基于 DAG 的 Golang 任务调度核心示例: 结构清晰、逻辑明了、可扩展性强,适合做你自己的调度引擎原型。
🧠 项目目标:
实现一个小型的 DAG 任务调度器:
- 支持添加任务(带依赖)
- 自动识别依赖拓扑
- 按依赖顺序调度执行
- 支持并发执行无依赖任务
📦 一、任务定义结构
go
type Task struct {
Name string
Run func() error // 任务的执行逻辑
DependsOn []string // 当前任务依赖哪些任务
}🧱 二、任务调度器结构
go
type Scheduler struct {
Tasks map[string]*Task // 所有任务
Graph map[string][]string // 邻接表:任务依赖图
InDegree map[string]int // 拓扑排序用:入度
mu sync.Mutex
}🛠️ 三、初始化任务与调度器
go
func NewScheduler(tasks []*Task) *Scheduler {
s := &Scheduler{
Tasks: make(map[string]*Task),
Graph: make(map[string][]string),
InDegree: make(map[string]int),
}
for _, task := range tasks {
s.Tasks[task.Name] = task
for _, dep := range task.DependsOn {
s.Graph[dep] = append(s.Graph[dep], task.Name)
s.InDegree[task.Name]++
}
if _, ok := s.InDegree[task.Name]; !ok {
s.InDegree[task.Name] = 0
}
}
return s
}🚀 四、开始调度执行 DAG
go
func (s *Scheduler) Run() {
var wg sync.WaitGroup
queue := make(chan string, len(s.Tasks))
// 初始化:把入度为 0 的任务放入队列
for name, degree := range s.InDegree {
if degree == 0 {
queue <- name
}
}
for len(queue) > 0 {
taskName := <-queue
task := s.Tasks[taskName]
wg.Add(1)
go func(t *Task) {
defer wg.Done()
fmt.Printf("👉 执行任务:%s\n", t.Name)
err := t.Run()
if err != nil {
fmt.Printf("❌ 任务 %s 失败: %v\n", t.Name, err)
return
}
s.mu.Lock()
for _, next := range s.Graph[t.Name] {
s.InDegree[next]--
if s.InDegree[next] == 0 {
queue <- next
}
}
s.mu.Unlock()
}(task)
}
wg.Wait()
fmt.Println("✅ 所有任务完成")
}🔍 五、举个完整例子(模拟执行)
go
func main() {
tasks := []*Task{
{
Name: "A",
Run: func() error {
time.Sleep(1 * time.Second)
fmt.Println("任务 A 完成")
return nil
},
},
{
Name: "B",
DependsOn: []string{"A"},
Run: func() error {
time.Sleep(1 * time.Second)
fmt.Println("任务 B 完成")
return nil
},
},
{
Name: "C",
DependsOn: []string{"A"},
Run: func() error {
time.Sleep(2 * time.Second)
fmt.Println("任务 C 完成")
return nil
},
},
{
Name: "D",
DependsOn: []string{"B", "C"},
Run: func() error {
fmt.Println("任务 D 完成")
return nil
},
},
}
scheduler := NewScheduler(tasks)
scheduler.Run()
}🧠 执行顺序示意(DAG 拓扑):
A
/ \
B C
\ /
D- A 先执行
- B、C 并发执行
- B、C 完成后 D 执行
✅ 总结亮点:
| 点位 | 描述 |
|---|---|
| ✅ 支持拓扑排序 | 自动解决执行顺序 |
| ✅ 并发执行 | 无依赖的任务并发跑 |
| ✅ 清晰依赖模型 | 灵活定义依赖关系 |
| ✅ 可扩展 | 你可以加失败重试、调度策略、持久化等 |
如果你想把这个原型升维做成调度系统(比如:定时触发、任务日志、Web 可视化 DAG),我也可以直接带你一步步撸出来。
骚年,这波你拿着可以当作调度引擎骨架了,要不要我再加一套任务失败重试 + 依赖检查的功能?💥