在 workflow 执行过程中,为减少等待时间常常需要并行执行Agent,Blades框架提供了 NewParallelAgent 方法构建并行Agent。

使用 NewParallelAgent 需要传递参数类型 ParallelConfig ,该类型结构如下:
type ParallelConfig struct { Name string Description string SubAgents []blades.Agent}并行Agent 使用 NewParallelAgent 实例化一个并行Agent实例,在运行 并行Agent 时会直接循环运行 SubAgents 内的所有Agent,并使用并发安全但缓冲固定的的消息队列收集所有Agent的流式输出,最后使用 yield 返回结果。
在此处我们使用一个示例说明在 Blades 中该如何使用 并行Agent
editorAgent1, err := blades.NewAgent( "editorAgent1", blades.WithModel(model), blades.WithInstruction(`Edit the paragraph for grammar. **Paragraph:** {{.draft}} `), blades.WithOutputKey("grammar_edit"),)editorAgent2, err := blades.NewAgent( "editorAgent1", blades.WithModel(model), blades.WithInstruction(`Edit the paragraph for style. **Paragraph:** {{.draft}} `), blades.WithOutputKey("style_edit"),)parallelAgent := flow.NewParallelAgent(flow.ParallelConfig{ Name: "EditorParallelAgent", Description: "Edits the drafted paragraph in parallel for grammar and style.", SubAgents: []blades.Agent{ editorAgent1, editorAgent2, },})session := blades.NewSession()input := blades.UserMessage("Please write a short paragraph about climate change.")runner := blades.NewRunner(parallelAgent, blades.WithSession(session))stream := runner.RunStream(context.Background(), input)for message, err := range stream { if err != nil { log.Fatal(err) } // Only log completed messages if message.Status != blades.StatusCompleted { continue } log.Println(message.Author, message.Text())}package main
import ( "context" "log" "os"
"github.com/go-kratos/blades" "github.com/go-kratos/blades/contrib/openai" "github.com/go-kratos/blades/flow")
func main() { model := openai.NewModel(os.Getenv("OPENAI_MODEL"), openai.Config{ APIKey: os.Getenv("OPENAI_API_KEY"), }) editorAgent1, err := blades.NewAgent( "editorAgent1", blades.WithModel(model), blades.WithInstruction(`Edit the paragraph for grammar. **Paragraph:** {{.draft}} `), blades.WithOutputKey("grammar_edit"), ) if err != nil { log.Fatal(err) } editorAgent2, err := blades.NewAgent( "editorAgent1", blades.WithModel(model), blades.WithInstruction(`Edit the paragraph for style. **Paragraph:** {{.draft}} `), blades.WithOutputKey("style_edit"), ) if err != nil { log.Fatal(err) } parallelAgent := flow.NewParallelAgent(flow.ParallelConfig{ Name: "EditorParallelAgent", Description: "Edits the drafted paragraph in parallel for grammar and style.", SubAgents: []blades.Agent{ editorAgent1, editorAgent2, }, }) session := blades.NewSession() input := blades.UserMessage("Please write a short paragraph about climate change.") runner := blades.NewRunner(parallelAgent, blades.WithSession(session)) stream := runner.RunStream(context.Background(), input) for message, err := range stream { if err != nil { log.Fatal(err) } // Only log completed messages if message.Status != blades.StatusCompleted { continue } log.Println(message.Author, message.Text()) }}