Awesome
________________ ______
__ __ \_ ____/____________ ___________ /_
_ / / / / __ __ ___/ __ `/__ __ \_ __ \
/ /_/ // /_/ / _ / / /_/ /__ /_/ / / / /
\____/ \____/ /_/ \__,_/ _ .___//_/ /_/
/_/
OGraph: A simple way to build a pipeline with Go
<p align="left"> <a href="https://github.com/symphony09/ograph"><img src="https://badgen.net/badge/langs/Golang?list=1" alt="languages"></a> <a href="https://github.com/symphony09/ograph"><img src="https://badgen.net/badge/os/MacOS,Linux,Windows/cyan?list=1" alt="os"></a> </p>OGraph 是一个用 Go
实现的图流程执行框架。
你可以通过构建Pipeline
(流水线),来控制依赖元素依次顺序执行、非依赖元素并发执行的调度功能。
此外,OGraph 还提供了丰富的重试,超时限制,执行追踪等开箱即用的特征。
同类项目对比
OGraph 受启发于另一个 C++
项目 CGraph。但 OGraph 并不等于 Go 版本的 CGraph。
功能对比
和 CGraph 一样,OGraph 也提供基本的构图和调度执行能力,但有以下几点关键不同:
-
用 Go 实现,使用协程而非线程进行调度,更轻量灵活
-
支持通过 Wrapper 来自定义循环、执行条件判断、错误处理等逻辑,并可以随意组合
-
支持导出图结构,再在别处导入执行(符合限制的情况下)
-
灵活的虚节点设置,用以简化依赖关系
性能对比
经过 Benchmark 测试,OGraph 和 CGraph 的性能在同一水平。如果在 io 密集场景下,OGraph 更有优势。
OGraph 性能测试参考
限制 8 核,三个场景(并发32节点,串行32节点,复杂情况模拟6节点)分别执行 100 w 次
cd test
go test -bench='(Concurrent_32|Serial_32|Complex_6)$' -benchtime=1000000x -benchmem -cpu=8
输出结果
goos: linux
goarch: amd64
pkg: github.com/symphony09/ograph/test
cpu: AMD Ryzen 5 5600G with Radeon Graphics
BenchmarkConcurrent_32-8 1000000 9669 ns/op 2212 B/op 64 allocs/op
BenchmarkSerial_32-8 1000000 1761 ns/op 712 B/op 15 allocs/op
BenchmarkComplex_6-8 1000000 3118 ns/op 1152 B/op 26 allocs/op
PASS
ok github.com/symphony09/ograph/test 14.553s
快速开始
第一步:声明一个 Node 接口实现
type Person struct {
ograph.BaseNode
}
func (person *Person) Run(ctx context.Context, state ogcore.State) error {
fmt.Printf("Hello, i am %s.\n", person.Name())
return nil
}
上面代码中 Person 组合了 BaseNode,并覆写了 Node 接口方法 Run。
第二步:构建一个 Pipeline 并运行
func TestHello(t *testing.T) {
pipeline := ograph.NewPipeline()
zhangSan := ograph.NewElement("ZhangSan").UseNode(&Person{})
liSi := ograph.NewElement("LiSi").UseNode(&Person{})
pipeline.Register(zhangSan).
Register(liSi, ograph.Rely(zhangSan))
if err := pipeline.Run(context.TODO(), nil); err != nil {
t.Error(err)
}
}
上面代码在 pipeline 中注册了两个 Person 节点(zhangSan、liSi),并指定 liSi 依赖于 zhangSan。
输出结果
Hello, i am ZhangSan.
Hello, i am LiSi.
更多示例
更多示例代码,请参考 example 目录下代码。
示例文件名 | 示例说明 |
---|---|
e01_hello_test.go | 演示基本流程 |
e02_state_test.go | 演示如何在节点间分享状态数据 |
e03_factory_test.go | 演示如何用工厂模式创建节点 |
e04_param_test.go | 演示如何设置节点参数 |
e05_wrapper_test.go | 演示如何使用 wrapper 增强节点功能 |
e06_cluster_test.go | 演示如何使用 cluster 灵活调度多个节点 |
e07_global_test.go | 演示如何全局注册工厂函数 |
e08_virtual_test.go | 演示如何使用虚拟节点简化依赖关系 |
e09_interrupter_test.go | 演示如何在pipeline 运行过程中插入中断 |
e10_compose_test.go | 演示怎么组合嵌套pipeline |
e11_advance_test.go | 一些进阶用法,包含图校验、导出,池预热等 |
开箱即用
ograph 提供了一些比较通用的节点实现:
名称 | 类型 | 作用 | 文档 |
---|---|---|---|
CMD | 普通节点 | 命令行执行 | 链接 |
HttpReq | 普通节点 | HTTP请求 | 链接 |
Choose | 簇 | 在多个节点中选择一个执行 | 施工中 |
Parallel | 簇 | 并发执行多个节点 | 链接 |
Queue | 簇 | 队列顺序执行多个节点 | 链接 |
Race | 簇 | 多个节点竞争执行 | 施工中 |
Async | 包装器 | 异步执行被包装节点 | 链接 |
Condition | 包装器 | 按条件判断是否执行被包装节点 | 施工中 |
Debug | 包装器 | 调试被包装节点 | 链接 |
Delay | 包装器 | 延迟执行被包装节点 | 链接 |
Loop | 包装器 | 循环执行被包装节点 | 链接 |
Retry | 包装器 | 重试失败节点 | 链接 |
Silent | 包装器 | 抑制节点报错失败 | 链接 |
Timeout | 包装器 | 节点超时控制 | 链接 |
Trace | 包装器 | 追踪被包装节点的执行过程 | 链接 |
Q&A
导出导入图的限制是什么?
所有节点需要是以工厂方式创建,导入图的 pipeline 需要已注册节点对应的工厂。
为什么提供多种节点创建方式(UseNode,UseFactory,UseFn)?
对于简单场景直接注册单例和运行函数比较方便,但要考虑 pipeline 并发执行问题和图导入导出时,就需要使用工厂方式。
State 存取是并发安全的吗?
默认使用的 state 是并发安全的,但如果是使用了自定义实现则无法保证并发安全。
怎么达到最佳性能,有最佳实践吗?
由于协程轻量灵活,一般不用做调整优化,如果节点初始化比较慢可以考虑预热 worker 池。