During workflow execution, to reduce waiting time, it is often necessary to execute Agents in parallel. The Blades framework provides the NewParallelAgent method to construct parallel Agents.

Using NewParallelAgent requires passing a parameter of type ParallelConfig, which has the following structure:
type ParallelConfig struct { Name string Description string SubAgents []blades.Agent}Parallel Agent uses NewParallelAgent to instantiate a parallel Agent instance. When running the parallel Agent, it will directly loop through and run all Agents in SubAgents, using a concurrency-safe but fixed-buffer message queue to collect the streaming output from all Agents, and finally use yield to return the results.
Here we use an example to illustrate how to use parallel Agents in Blades.
editorAgent1, err := blades.NewAgent( "editorAgent1", blades.WithModel(model), blades.WithInstruction(`Edit the paragraph for grammar. **Paragraph:** {{.draft}} `), blades.WithOutputKey("grammar_edit"),)editorAgent2, err := blades.NewAgent( "editorAgent1", blades.WithModel(model), blades.WithInstruction(`Edit the paragraph for style. **Paragraph:** {{.draft}} `), blades.WithOutputKey("style_edit"),)parallelAgent := flow.NewParallelAgent(flow.ParallelConfig{ Name: "EditorParallelAgent", Description: "Edits the drafted paragraph in parallel for grammar and style.", SubAgents: []blades.Agent{ editorAgent1, editorAgent2, },})session := blades.NewSession()input := blades.UserMessage("Please write a short paragraph about climate change.")runner := blades.NewRunner(parallelAgent, blades.WithSession(session))stream := runner.RunStream(context.Background(), input)for message, err := range stream { if err != nil { log.Fatal(err) } // Only log completed messages if message.Status != blades.StatusCompleted { continue } log.Println(message.Author, message.Text())}package main
import ( "context" "log" "os"
"github.com/go-kratos/blades" "github.com/go-kratos/blades/contrib/openai" "github.com/go-kratos/blades/flow")
func main() { model := openai.NewModel(os.Getenv("OPENAI_MODEL"), openai.Config{ APIKey: os.Getenv("OPENAI_API_KEY"), }) editorAgent1, err := blades.NewAgent( "editorAgent1", blades.WithModel(model), blades.WithInstruction(`Edit the paragraph for grammar. **Paragraph:** {{.draft}} `), blades.WithOutputKey("grammar_edit"), ) if err != nil { log.Fatal(err) } editorAgent2, err := blades.NewAgent( "editorAgent1", blades.WithModel(model), blades.WithInstruction(`Edit the paragraph for style. **Paragraph:** {{.draft}} `), blades.WithOutputKey("style_edit"), ) if err != nil { log.Fatal(err) } parallelAgent := flow.NewParallelAgent(flow.ParallelConfig{ Name: "EditorParallelAgent", Description: "Edits the drafted paragraph in parallel for grammar and style.", SubAgents: []blades.Agent{ editorAgent1, editorAgent2, }, }) session := blades.NewSession() input := blades.UserMessage("Please write a short paragraph about climate change.") runner := blades.NewRunner(parallelAgent, blades.WithSession(session)) stream := runner.RunStream(context.Background(), input) for message, err := range stream { if err != nil { log.Fatal(err) } // Only log completed messages if message.Status != blades.StatusCompleted { continue } log.Println(message.Author, message.Text()) }}