[Open-source self-recommendation] go-taskflow: a native, lightweight DAG scheduling framework for Go with built-in visualization and profiling tools #2823

noneback opened this issue Oct 12, 2024 · 0 comments


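Recommended project

Project URL: https://github.com/noneback/go-taskflow

Category: Go

Project title: Go-Taskflow - a static DAG task computing framework for Go

Project description:
Go-Taskflow is a static DAG (directed acyclic graph) task computing framework for Go. Inspired by the Taskflow API, it is designed for managing complex task dependencies and running tasks concurrently. The framework focuses on Go's native features and simplicity: it uses goroutines to manage concurrent tasks and offers a simple programming interface that makes complex task flows easier to manage. The project is well designed and its code is clearly written, so it suits Go beginners who want to learn the basic concepts and practice of concurrent task management. The framework also includes built-in visualization and profiling tools that let developers debug and optimize task flows more directly.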

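Highlights:

Good for Go newcomers: the code is well designed and easy to follow, which makes it a good way for beginners to learn the basics of Go concurrency and task scheduling.

Native Go concurrency: built on Go's goroutines, it simplifies concurrent task management and makes full use of CPU resources.

Built-in visualization: renders the task flow graphically, including DAG diagrams generated with Graphviz, which makes task dependencies easier to debug.

Profiling support: a flame graph tool helps developers locate performance bottlenecks and optimize task flows.

High extensibility: supports static tasks and subflows, and can easily be extended for different scenarios.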

Go-Taskflow

A static DAG (Directed Acyclic Graph) task computing framework for Go, inspired by taskflow-cpp. It builds on Go's native concurrency and simplicity and is suited to managing complex dependencies among concurrent tasks.

Features

  • High extensibility: Easily extend the framework to adapt to various specific use cases.

  • Native Go concurrency model: Leverages Go's goroutines to manage concurrent task execution effectively.

  • User-friendly programming interface: Simplifies complex task dependency management in Go.

  • Static/Subflow/Conditional tasking: Define static tasks, condition nodes, and nested subflows to enhance modularity and programmability.

  • Built-in visualization & profiling tools: Generate visual representations of tasks and profile task execution performance using integrated tools, making debugging and optimization easier.
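A condition node chooses which of its successors runs. The sketch below assumes the semantics of taskflow-cpp's condition tasks, where the returned integer is an index into the node's successors; that is consistent with the example's `cond.Precede(subflow, subflow2)`, but `conditionNode` and its out-of-range error are hypothetical and not go-taskflow's API.

```go
package main

import "fmt"

// conditionNode is a hypothetical stand-in for a condition task. pick returns
// an index into successors, and only the selected successor runs, following
// taskflow-cpp's condition-task semantics (assumed here for go-taskflow).
type conditionNode struct {
	pick       func() uint
	successors []func()
}

// run evaluates the condition, executes the chosen branch, and returns its
// index. This sketch reports an out-of-range index as an error.
func (c conditionNode) run() (uint, error) {
	i := c.pick()
	if i >= uint(len(c.successors)) {
		return i, fmt.Errorf("condition returned %d but there are only %d successors", i, len(c.successors))
	}
	c.successors[i]()
	return i, nil
}

func main() {
	var ran string
	cond := conditionNode{
		pick: func() uint { return 1 }, // fixed choice so the demo is deterministic
		successors: []func(){
			func() { ran = "sub1" },
			func() { ran = "sub2" },
		},
	}
	if _, err := cond.run(); err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(ran) // prints "sub2"
}
```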

Use Cases

  • Data Pipeline: Orchestrate data processing stages that have complex dependencies.

  • Workflow Automation: Define and run automation workflows where tasks have a clear sequence and dependency structure.

  • Parallel Tasking: Execute independent tasks concurrently to fully utilize CPU resources.
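For the parallel-tasking case, a common Go pattern for running independent tasks while capping how many execute at once is a buffered channel used as a counting semaphore. The sketch below shows that pattern with the hypothetical helpers `runParallel` and `measure`; it is not how go-taskflow's executor is implemented, only an illustration of what a concurrency limit means.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runParallel runs independent tasks concurrently while allowing at most limit
// of them to execute at the same time. The buffered channel is a counting
// semaphore. Hypothetical helper, not part of go-taskflow.
func runParallel(limit int, tasks []func()) {
	if limit < 1 {
		limit = 1
	}
	sem := make(chan struct{}, limit)
	var wg sync.WaitGroup
	for _, t := range tasks {
		wg.Add(1)
		sem <- struct{}{} // acquire a slot; blocks while limit tasks are running
		go func(t func()) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot when the task returns
			t()
		}(t)
	}
	wg.Wait()
}

// measure runs n short sleeping tasks through runParallel and reports how many
// completed and the highest number observed running at once.
func measure(limit, n int) (completed, peak int64) {
	var running int64
	tasks := make([]func(), n)
	for i := range tasks {
		tasks[i] = func() {
			cur := atomic.AddInt64(&running, 1)
			for { // raise peak to cur if cur is larger
				p := atomic.LoadInt64(&peak)
				if cur <= p || atomic.CompareAndSwapInt64(&peak, p, cur) {
					break
				}
			}
			time.Sleep(5 * time.Millisecond) // simulate work
			atomic.AddInt64(&running, -1)
			atomic.AddInt64(&completed, 1)
		}
	}
	runParallel(limit, tasks)
	return completed, peak
}

func main() {
	completed, peak := measure(2, 16)
	fmt.Println(completed, peak <= 2) // prints "16 true"
}
```

Acquiring the slot before starting each goroutine, rather than inside it, keeps the number of live goroutines bounded as well as the number of running tasks.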

Example

Install the latest version: go get -u github.com/noneback/go-taskflow

package main

import (
	"fmt"
	"log"
	"os"
	"runtime"
	"time"

	gotaskflow "github.com/noneback/go-taskflow"
)

func main() {
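	// 1. Create an executor. NumCPU()-1 is 0 on a single-core machine,
	// so clamp the worker count to at least 1.
	concurrency := runtime.NumCPU() - 1
	if concurrency < 1 {
		concurrency = 1
	}
	executor := gotaskflow.NewExecutor(uint(concurrency))
	// 2. Create the taskflow, define every task, and arrange their dependencies into a DAG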
	tf := gotaskflow.NewTaskFlow("G")
	A, B, C :=
		gotaskflow.NewTask("A", func() {
			fmt.Println("A")
		}),
		gotaskflow.NewTask("B", func() {
			fmt.Println("B")
		}),
		gotaskflow.NewTask("C", func() {
			fmt.Println("C")
		})

	A1, B1, C1 :=
		gotaskflow.NewTask("A1", func() {
			fmt.Println("A1")
		}),
		gotaskflow.NewTask("B1", func() {
			fmt.Println("B1")
		}),
		gotaskflow.NewTask("C1", func() {
			fmt.Println("C1")
		})
	A.Precede(B)
	C.Precede(B)
	A1.Precede(B)
	C.Succeed(A1)
	C.Succeed(B1)

	subflow := gotaskflow.NewSubflow("sub1", func(sf *gotaskflow.Subflow) {
		A2, B2, C2 :=
			gotaskflow.NewTask("A2", func() {
				fmt.Println("A2")
			}),
			gotaskflow.NewTask("B2", func() {
				fmt.Println("B2")
			}),
			gotaskflow.NewTask("C2", func() {
				fmt.Println("C2")
			})
		A2.Precede(B2)
		C2.Precede(B2)
		sf.Push(A2, B2, C2)
	})

	subflow2 := gotaskflow.NewSubflow("sub2", func(sf *gotaskflow.Subflow) {
		A3, B3, C3 :=
			gotaskflow.NewTask("A3", func() {
				fmt.Println("A3")
			}),
			gotaskflow.NewTask("B3", func() {
				fmt.Println("B3")
			}),
			gotaskflow.NewTask("C3", func() {
				fmt.Println("C3")
			})
		A3.Precede(B3)
		C3.Precede(B3)
		sf.Push(A3, B3, C3)
	})

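	// The condition's return value is the index of the successor to run:
	// 0 runs subflow, 1 runs subflow2.
	cond := gotaskflow.NewCondition("binary", func() uint {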
		return uint(time.Now().Second() % 2)
	})
	B.Precede(cond)
	cond.Precede(subflow, subflow2)

	// 3. Push all nodes into the taskflow
	tf.Push(A, B, C)
	tf.Push(A1, B1, C1, cond, subflow, subflow2)
	// 4. Run Taskflow via Executor
	executor.Run(tf).Wait()

	// Visualize the DAG if you need to inspect its structure.
	if err := gotaskflow.Visualize(tf, os.Stdout); err != nil {
		log.Fatal(err)
	}
	// Profile it if you need to see which task is most time-consuming
	if err := executor.Profile(os.Stdout); err != nil {
		log.Fatal(err)
	}
}

How to visualize a taskflow

if err := gotaskflow.Visualize(tf, os.Stdout); err != nil {
	log.Fatal(err)
}

Visualize writes the DAG as a raw string in DOT format; use Graphviz's dot tool to render it as an SVG.


How to profile a taskflow

if err := executor.Profile(os.Stdout); err != nil {
	log.Fatal(err)
}

Profile likewise writes a raw string in flame graph format; use flamegraph to render it as an SVG flame graph.


What's next

  • Conditional Tasking (already available; used in the example above)
  • Task Priority Schedule
  • Taskflow Loop Support