The Parallel Agent is a core component of the Blades framework for running tasks concurrently. It starts multiple tasks at the same time and provides a flexible mechanism for merging their results. This pattern is particularly suitable for scenarios where parallel processing improves efficiency.
The structure of the Parallel Agent is as follows:
```go
type Parallel struct {
	merger  ParallelMerger
	runners []blades.Runnable
}
```

The Parallel struct contains two fields: the list of tasks to execute (`runners`) and the result merging function (`merger`, of type `ParallelMerger`).
- `runners` — type `[]blades.Runnable`
- `merger` — type `func(ctx context.Context, outputs []*blades.Message) (*blades.Message, error)`

```go
tasks := []blades.Runnable{
	// task 1
	flow.NewSequential(...),
	// task 2
	flow.NewSequential(...),
	// task 3
	flow.NewSequential(...),
}
parallel := flow.NewParallel(tasks)
result, err := parallel.Run(context.Background(), prompt)
```

```go
merger := func(ctx context.Context, outputs []*blades.Message) (*blades.Message, error) {
	// default merge logic
	result := blades.NewMessage(blades.RoleAssistant)
	// ... merge logic ...
	return result, nil
}
```
```go
parallel := flow.NewParallel(tasks, flow.WithParallelMerger(merger))
```

```go
// Define tasks to be executed in parallel
tasks := []blades.Runnable{
	// Task 1: Get weather information
	flow.NewSequential(
		// Weather query task implementation
	),
	// Task 2: Get news information
	flow.NewSequential(
		// News query task implementation
	),
	// Task 3: Get stock information
	flow.NewSequential(
		// Stock query task implementation
	),
}

// Custom result merger function
merger := func(ctx context.Context, outputs []*blades.Message) (*blades.Message, error) {
	result := blades.NewMessage(blades.RoleAssistant)
	result.AddText("Comprehensive information report:")

	for i, output := range outputs {
		switch i {
		case 0:
			result.AddText("\n[Weather Information]")
		case 1:
			result.AddText("\n[News Information]")
		case 2:
			result.AddText("\n[Stock Information]")
		}
		result.AddText(output.Text())
	}

	return result, nil
}

// create parallel agent
parallel := flow.NewParallel(tasks, flow.WithParallelMerger(merger))

// execute parallel tasks
result, err := parallel.Run(ctx, prompt)
if err != nil {
	log.Printf("Parallel execution error: %v", err)
	return
}

log.Printf("Parallel execution completed, merged result: %s", result.Text())
```