并行智能体是Blades框架中用于实现并发执行逻辑的核心组件,它可以同时启动多个任务并发执行,并提供灵活的结果合并机制。这种模式非常适合需要并行处理以提高效率的场景。
并行智能体的结构如下:
type Parallel struct { merger ParallelMerger runners []blades.Runnable}Parallel结构体包含两个参数:可执行任务列表 (runners) 和结果合并函数 (ParallelMerger)。
[]blades.Runnablefunc(ctx context.Context, outputs []*blades.Message) (*blades.Message, error)tasks := []blades.Runnable{ // task 1 flow.NewSequential(...), // task 2 flow.NewSequential(...), // task 3 flow.NewSequential(...),}parallel := flow.NewParallel(tasks)result, err := parallel.Run(context.Background(), prompt)erger := func(ctx context.Context, outputs []*blades.Message) (*blades.Message, error) { // default merge logic result := blades.NewMessage(blades.RoleAssistant) // ... merge logic ... return result, nil}
parallel := flow.NewParallel(tasks, flow.WithParallelMerger(merger))// Define tasks to be executed in paralleltasks := []blades.Runnable{ // Task 1: Get weather information flow.NewSequential( // Weather query task implementation ), // Task 2: Get news information flow.NewSequential( // News query task implementation ), // Task 3: Get stock information flow.NewSequential( // Stock query task implementation ),}
// Custom result merger functionmerger := func(ctx context.Context, outputs []*blades.Message) (*blades.Message, error) { result := blades.NewMessage(blades.RoleAssistant) result.AddText("Comprehensive information report:")
for i, output := range outputs { switch i { case 0: result.AddText("\n[Weather Information]") case 1: result.AddText("\n[News Information]") case 2: result.AddText("\n[Stock Information]") } result.AddText(output.Text()) }
return result, nil}
// create parallel agentparallel := flow.NewParallel(tasks, flow.WithParallelMerger(merger))
// execute parallel tasksresult, err := parallel.Run(ctx, prompt)if err != nil { log.Printf("Parallel execution error: %v", err) return}
log.Printf("Parallel execution completed, merged result: %s", result.Text())