
Source Code Analysis

2 posts with the tag “Source Code Analysis”

Kratos Learning Notes - Distributed Tracing with OpenTelemetry

Distributed tracing (also known as distributed request tracing) is a method used to analyze and monitor applications, especially those built using microservices architecture. Distributed tracing helps pinpoint where failures occur and what causes poor performance.

The concept of distributed tracing was popularized by Google's paper “Dapper, a Large-Scale Distributed Systems Tracing Infrastructure”. The paper strongly influenced the design of later open-source distributed tracing projects such as Jaeger and Zipkin.

A microservice architecture is a distributed system made up of many services that call one another. A single request may pass through N services, and the number of calls grows as the business grows. When an error occurs and there is no tool that records the call chain, finding its source is like untangling a ball of yarn a kitten has played with: it is hard to know where to start. What is needed is a tool that shows clearly which services a request passed through, and in what order, so that problems are easy to locate.

After Google released Dapper, more and more distributed tracing tools emerged. Below are some commonly used distributed tracing systems:

  • Skywalking
  • Alibaba Eagle Eye
  • Dianping CAT
  • Twitter Zipkin
  • Naver Pinpoint
  • Uber Jaeger

As tracing tools multiplied, the open-source community split into two main camps. One followed the OpenTracing specification under the CNCF; Jaeger and Zipkin, for example, adhere to OpenTracing. The other was OpenCensus, initiated by Google (the company behind Dapper) and later joined by Microsoft.

OpenTelemetry is a set of APIs, SDKs, modules, and integrations designed for creating and managing telemetry data (such as traces, metrics, and logs).

Microsoft's arrival in OpenCensus upset the existing balance and indirectly led to the birth of OpenTelemetry, as Google and Microsoft set out to end the fragmentation in the community. The main challenge was integrating the existing projects of both communities, so a core idea of OpenTelemetry is compatibility with both OpenCensus and OpenTracing, letting users adopt OpenTelemetry with little or no modification.

Kratos is a lightweight Go microservice framework that bundles a large set of microservice-related components and tools.

Among the built-in middleware provided by the Kratos framework, there is a tracing middleware that implements distributed tracing functionality based on OpenTelemetry. The middleware code can be found at middleware/tracing.

The Kratos tracing middleware consists of three files: carrier.go, tracer.go, and tracing.go. The client and server implementations work in much the same way, so this article walks through the server side.

  1. When the server middleware is created with Server() in tracing.go, it first calls the NewTracer method in tracer.go. The tracer is built once at this point and reused for every request.
// Server returns a new server middleware for OpenTelemetry.
func Server(opts ...Option) middleware.Middleware {
	// Call NewTracer in tracer.go, passing a SpanKindServer and configuration options
	tracer := NewTracer(trace.SpanKindServer, opts...)
	// ... Code omitted
}
  2. NewTracer in tracer.go returns a *Tracer. Its implementation is as follows:
func NewTracer(kind trace.SpanKind, opts ...Option) *Tracer {
	options := options{}
	for _, o := range opts {
		o(&options)
	}
	// Check if an otel tracing provider configuration exists; if so, set it
	if options.TracerProvider != nil {
		otel.SetTracerProvider(options.TracerProvider)
	}
	/*
		Check if Propagators are set; if so, set them, otherwise set a default TextMapPropagator.
		Note: If the default TextMapPropagator is not set, trace information cannot be correctly propagated.
	*/
	if options.Propagators != nil {
		otel.SetTextMapPropagator(options.Propagators)
	} else {
		otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.Baggage{}, propagation.TraceContext{}))
	}
	var name string
	// Determine the type of middleware: server or client
	if kind == trace.SpanKindServer {
		name = "server"
	} else if kind == trace.SpanKindClient {
		name = "client"
	} else {
		panic(fmt.Sprintf("unsupported span kind: %v", kind))
	}
	// Call the Tracer method of the otel package, passing the name to create a tracer instance
	tracer := otel.Tracer(name)
	return &Tracer{tracer: tracer, kind: kind}
}
  3. For each request, the middleware in tracing.go determines the request type (HTTP or gRPC), collects the data to record, and calls the Start method in tracer.go.
var (
	component string
	operation string
	carrier   propagation.TextMapCarrier
)
// Determine the request type
if info, ok := http.FromServerContext(ctx); ok {
	// HTTP
	component = "HTTP"
	// Extract the request URI
	operation = info.Request.RequestURI
	// propagation.HeaderCarrier (otel/propagation) adapts http.Header to the TextMapCarrier interface
	// A TextMapCarrier is a text-map carrier used to transport key/value pairs
	carrier = propagation.HeaderCarrier(info.Request.Header)
	// otel.GetTextMapPropagator().Extract() reads the trace information in the carrier into the context
	ctx = otel.GetTextMapPropagator().Extract(ctx, propagation.HeaderCarrier(info.Request.Header))
} else if info, ok := grpc.FromServerContext(ctx); ok {
	// gRPC
	component = "gRPC"
	operation = info.FullMethod
	// metadata.FromIncomingContext (grpc/metadata) reads the incoming gRPC metadata from ctx
	if md, ok := metadata.FromIncomingContext(ctx); ok {
		// MetadataCarrier in carrier.go adapts metadata.MD to a TextMapCarrier
		carrier = MetadataCarrier(md)
	}
}
// Call the tracer.Start method
ctx, span := tracer.Start(ctx, component, operation, carrier)
// ... Code omitted
  4. The Start method in tracer.go then creates the span.
func (t *Tracer) Start(ctx context.Context, component string, operation string, carrier propagation.TextMapCarrier) (context.Context, trace.Span) {
	// On the server side, extract the trace information from the carrier into the context
	if t.kind == trace.SpanKindServer {
		ctx = otel.GetTextMapPropagator().Extract(ctx, carrier)
	}
	// Call Start on the otel tracer to create a span
	ctx, span := t.tracer.Start(ctx,
		// The request route (operation) computed in tracing.go is used as the span name
		operation,
		// Set a "component" attribute whose value is the request type
		trace.WithAttributes(attribute.String("component", component)),
		// Set the span kind
		trace.WithSpanKind(t.kind),
	)
	// On the client side, inject the trace information from the context into the carrier (the outgoing request)
	if t.kind == trace.SpanKindClient {
		otel.GetTextMapPropagator().Inject(ctx, carrier)
	}
	return ctx, span
}
  5. A deferred closure is registered to end the span.
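// Note: a closure is required here. defer evaluates its arguments when the defer
// statement executes, so defer tracer.End(ctx, span, err) would capture err while
// it is still nil; the closure reads err only when it runs, after the handler returns.
// https://github.com/go-kratos/kratos/issues/927
defer func() { tracer.End(ctx, span, err) }()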
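The difference the comment describes can be reproduced in a few lines. In this sketch, `endSpan` is a hypothetical stand-in for `tracer.End`; `direct` hands `err` to the deferred call as an argument, while `viaClosure` reads `err` inside a closure, which is the pattern the middleware uses.

```go
package main

import (
	"errors"
	"fmt"
)

// endSpan stands in for tracer.End; it just records the error it was given.
func endSpan(record *error, err error) { *record = err }

// direct passes err as a deferred argument: the value is captured when the
// defer statement runs, i.e. while err is still nil.
func direct(record *error) (err error) {
	defer endSpan(record, err)
	err = errors.New("handler failed")
	return err
}

// viaClosure reads err only when the deferred closure executes at return time.
func viaClosure(record *error) (err error) {
	defer func() { endSpan(record, err) }()
	err = errors.New("handler failed")
	return err
}

func main() {
	var a, b error
	direct(&a)
	viaClosure(&b)
	fmt.Println(a, b) // prints: <nil> handler failed
}
```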
  6. The middleware then calls the next handler.
// tracing.go line 69
reply, err = handler(ctx, req)
  7. When the handler returns, the deferred closure runs and calls the End method in tracer.go.
func (t *Tracer) End(ctx context.Context, span trace.Span, err error) {
	// Check if an error occurred; if so, set some error information
	if err != nil {
		// Record the error
		span.RecordError(err)
		// Set span attributes
		span.SetAttributes(
			// Set event to error
			attribute.String("event", "error"),
			// Set message to err.Error()
			attribute.String("message", err.Error()),
		)
		// Set the span status
		span.SetStatus(codes.Error, err.Error())
	} else {
		// If no error occurred, set the span status to ok
		span.SetStatus(codes.Ok, "OK")
	}
	// End the span
	span.End()
}

Usage examples for the tracing middleware are in kratos/examples/traces, which implements simple cross-service distributed tracing. The snippets below are excerpts from that example.

// https://github.com/go-kratos/kratos/blob/7f835db398c9d0332e69b81bad4c652b4b45ae2e/examples/traces/app/message/main.go#L38
// First, call the otel library method to get a TracerProvider
func tracerProvider(url string) (*tracesdk.TracerProvider, error) {
	// examples/traces uses jaeger; for other methods, refer to the official OpenTelemetry examples
	exp, err := jaeger.NewRawExporter(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(url)))
	if err != nil {
		return nil, err
	}
	tp := tracesdk.NewTracerProvider(
		tracesdk.WithSampler(tracesdk.AlwaysSample()),
		// Set Batcher, register the jaeger exporter
		tracesdk.WithBatcher(exp),
		// Record some default information
		tracesdk.WithResource(resource.NewWithAttributes(
			semconv.ServiceNameKey.String(pb.User_ServiceDesc.ServiceName),
			attribute.String("environment", "development"),
			attribute.Int64("ID", 1),
		)),
	)
	return tp, nil
}
// https://github.com/go-kratos/kratos/blob/main/examples/traces/app/message/main.go
grpcSrv := grpc.NewServer(
	grpc.Address(":9000"),
	grpc.Middleware(
		// Configuring tracing middleware
		tracing.Server(
			tracing.WithTracerProvider(tp),
		),
	),
)
// https://github.com/go-kratos/kratos/blob/149fc0195eb62ee1fbc2728adb92e1bcd1a12c4e/examples/traces/app/user/main.go#L63
conn, err := grpc.DialInsecure(ctx,
	grpc.WithEndpoint("127.0.0.1:9000"),
	grpc.WithMiddleware(
		tracing.Client(
			tracing.WithTracerProvider(s.tracer),
			tracing.WithPropagators(
				propagation.NewCompositeTextMapPropagator(propagation.Baggage{}, propagation.TraceContext{}),
			),
		), // trailing comma required: the argument list continues on the next line
	),
	grpc.WithTimeout(2*time.Second),
)
// https://github.com/go-kratos/kratos/blob/main/examples/traces/app/user/main.go
httpSrv := http.NewServer(http.Address(":8000"))
httpSrv.HandlePrefix("/", pb.NewUserHandler(s,
	http.Middleware(
		// Configuring tracing middleware
		tracing.Server(
			tracing.WithTracerProvider(tp),
			tracing.WithPropagators(
				propagation.NewCompositeTextMapPropagator(propagation.Baggage{}, propagation.TraceContext{}),
			),
		),
	),
)) // closes both pb.NewUserHandler and httpSrv.HandlePrefix
http.NewClient(ctx, http.WithMiddleware(
	tracing.Client(
		tracing.WithTracerProvider(s.tracer),
	),
))

How to Implement Tracing for Other Scenarios


We can learn from the code of Kratos’ tracing middleware to implement tracing for other scenarios, such as database tracing. The following code snippet shows how the author implemented tracing for the qmgo library operating on MongoDB by referencing the tracing middleware.

import (
	"context"
	"errors"
	"time"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/event"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/codes"
	"go.opentelemetry.io/otel/trace"
)

// mongoTracer records one span per command-monitor event.
func mongoTracer(ctx context.Context, tp trace.TracerProvider, command interface{}) {
	var (
		commandName string
		failure     string
		nanos       int64
		reply       bson.Raw
		queryID     int64
		eventName   string
	)
	switch value := command.(type) {
	case *event.CommandStartedEvent:
		commandName = value.CommandName
		reply = value.Command
		queryID = value.RequestID
		eventName = "CommandStartedEvent"
	case *event.CommandSucceededEvent:
		commandName = value.CommandName
		nanos = value.DurationNanos
		queryID = value.RequestID
		eventName = "CommandSucceededEvent"
	case *event.CommandFailedEvent:
		commandName = value.CommandName
		failure = value.Failure
		nanos = value.DurationNanos
		queryID = value.RequestID
		eventName = "CommandFailedEvent"
	}
	// DurationNanos is already in nanoseconds, so it converts directly to a time.Duration
	duration := time.Duration(nanos)
	// Take the tracer from tp directly instead of replacing the global provider on every event
	tracer := tp.Tracer("mongodb")
	// A database call is an outgoing request, so SpanKindClient describes it better than SpanKindServer
	_, span := tracer.Start(ctx,
		commandName,
		trace.WithAttributes(
			attribute.String("event", eventName),
			attribute.String("command", commandName),
			attribute.String("query", reply.String()),
			attribute.Int64("queryId", queryID),
			attribute.String("ms", duration.String()),
		),
		trace.WithSpanKind(trace.SpanKindClient),
	)
	if failure != "" {
		span.RecordError(errors.New(failure))
		span.SetStatus(codes.Error, failure)
	}
	span.End()
}

Kratos Study Notes - A Simple Analysis of How the Application Runs via Layout

0x01 Exploring How Kratos Runs, Using Layout (kratos v2.0.0-beta4)


First, install the necessary dependencies and tools:

  • go
  • protoc
  • protoc-gen-go
# Create project template
kratos new helloworld
cd helloworld
# Pull project dependencies
go mod download
# Generate proto template
kratos proto add api/helloworld/v1/helloworld.proto
# Generate proto source code
kratos proto client api/helloworld/v1/helloworld.proto
# Generate server template
kratos proto server api/helloworld/v1/helloworld.proto -t internal/service

After running these commands, a service project is generated in the current directory. For a detailed description of the project structure, see the layout project.

# Generate all proto source code, wire, etc.
go generate ./...
# Compile into executable
go build -o ./bin/ ./...
# Run project
./bin/helloworld -conf ./configs

Output like the following indicates that the project started normally:

level=INFO module=app service_id=7114ad8a-b3bf-11eb-a1b9-f0189850d2cb service_name= version=
level=INFO module=transport/grpc msg=[gRPC] server listening on: [::]:9000
level=INFO module=transport/http msg=[HTTP] server listening on: [::]:8000

Test the API:

curl 'http://127.0.0.1:8000/helloworld/kratos'
Output:
{
  "message": "Hello kratos"
}

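Following the code from the entry point reveals the application's call chain. In simplified form: main() loads the configuration and calls initApp(), which wire generates to inject dependencies and call newApp(); newApp() builds a kratos.App with kratos.New(); main() then calls App.Run(), which starts every server and calls App.Stop() when an exit signal arrives.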

1. Inject Dependencies and Call newApp() Method

helloworld/cmd/main.go
func main() {
	flag.Parse()
	logger := log.NewStdLogger(os.Stdout)
	// Use go-kratos/kratos/v2/config to create a config instance, specifying the source and how to decode it
	c := config.New(
		config.WithSource(
			file.NewSource(flagconf),
		),
		config.WithDecoder(func(kv *config.KeyValue, v map[string]interface{}) error {
			return yaml.Unmarshal(kv.Value, v)
		}),
	)
	if err := c.Load(); err != nil {
		panic(err)
	}
	// Scan the configuration into the conf struct declared via proto
	var bc conf.Bootstrap
	if err := c.Scan(&bc); err != nil {
		panic(err)
	}
	// Inject dependencies via wire; initApp calls newApp
	app, cleanup, err := initApp(bc.Server, bc.Data, logger)
	if err != nil {
		panic(err)
	}
	// Code omitted...
}

The newApp() function in the project's main.go calls kratos.New() from go-kratos/kratos/v2/app.go.

helloworld/cmd/main.go
func newApp(logger log.Logger, hs *http.Server, gs *grpc.Server) *kratos.App {
	return kratos.New(
		// Configure the application
		kratos.Name(Name),
		kratos.Version(Version),
		kratos.Metadata(map[string]string{}),
		kratos.Logger(logger),
		// The http/grpc servers passed via kratos.Server() are converted into a *registry.ServiceInstance by buildInstance()
		kratos.Server(
			hs,
			gs,
		),
	)
}
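Calls such as `kratos.Name(Name)` and `kratos.Server(hs, gs)` follow Go's functional-options pattern: each option is a function that mutates an options struct, and the constructor applies them in order over defaults. The sketch below shows the pattern in isolation; `appOptions`, `Option`, `Name`, `Version`, `Server`, `App`, and `New` are hypothetical stand-ins rather than Kratos' own definitions, and servers are plain strings instead of transport servers.

```go
package main

import "fmt"

// appOptions holds the settings an App is built from (hypothetical fields).
type appOptions struct {
	name     string
	version  string
	metadata map[string]string
	servers  []string
}

// Option mutates appOptions; each helper below returns one.
type Option func(*appOptions)

func Name(n string) Option    { return func(o *appOptions) { o.name = n } }
func Version(v string) Option { return func(o *appOptions) { o.version = v } }

// Server replaces the server list (strings stand in for transport servers).
func Server(srvs ...string) Option { return func(o *appOptions) { o.servers = srvs } }

// App is a hypothetical stand-in for kratos.App.
type App struct{ opts appOptions }

// New starts from defaults and applies each option in order.
func New(opts ...Option) *App {
	o := appOptions{metadata: map[string]string{}}
	for _, opt := range opts {
		opt(&o)
	}
	return &App{opts: o}
}

func main() {
	app := New(Name("helloworld"), Version("v1.0.0"), Server("http", "grpc"))
	fmt.Printf("%+v\n", app.opts)
}
```

Because options are applied in order, a later option overwrites an earlier one for the same field, and new options can be added without changing the constructor's signature.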

kratos.New() returns an *App, which provides the Run() and Stop() methods.

go-kratos/kratos/v2/app.go
type App struct {
	opts     options                   // Configuration
	ctx      context.Context           // Context
	cancel   func()                    // Context cancellation method
	instance *registry.ServiceInstance // Instance declared via kratos.Server(), converted via buildInstance() to *registry.ServiceInstance
	log      *log.Helper               // Log
}

// Run executes all OnStart hooks registered with the application's Lifecycle.
func (a *App) Run() error {
	// Code omitted...
}

// Stop gracefully stops the application.
func (a *App) Stop() error {
	// Code omitted...
}

main() then calls Run() on the returned kratos.App.

helloworld/cmd/main.go
// Code omitted...
// Start Kratos
if err := app.Run(); err != nil {
	panic(err)
}

Implementation details of the Run() method:

go-kratos/kratos/v2/app.go
func (a *App) Run() error {
	a.log.Infow(
		"service_id", a.opts.id,
		"service_name", a.opts.name,
		"version", a.opts.version,
	)
	g, ctx := errgroup.WithContext(a.ctx)
	// Iterate over the servers declared via kratos.Server()
	for _, srv := range a.opts.servers {
		srv := srv
		// Start two goroutines per server: one stops it, one starts it
		g.Go(func() error {
			<-ctx.Done()      // Block until ctx is cancelled (by a.cancel or a failing goroutine)
			return srv.Stop() // Then call the server's Stop method
		})
		g.Go(func() error {
			return srv.Start() // Call the server's Start method
		})
	}
	// If kratos.Registrar() was configured, register the instance with the service registry
	if a.opts.registrar != nil {
		if err := a.opts.registrar.Register(a.opts.ctx, a.instance); err != nil {
			return err
		}
	}
	// Listen for process exit signals
	c := make(chan os.Signal, 1)
	signal.Notify(c, a.opts.sigs...)
	// Handle process exit signals and context cancellation
	g.Go(func() error {
		for {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-c:
				// Call kratos.App's Stop method
				a.Stop()
			}
		}
	})
	if err := g.Wait(); err != nil && !errors.Is(err, context.Canceled) {
		return err
	}
	return nil
}

When starting, the Kratos instance listens for system process exit signals. Upon receiving an exit signal, kratos calls the Stop() method of the App struct.

go-kratos/kratos/v2/app.go
func (a *App) Stop() error {
	// Check if registry configuration exists
	if a.opts.registrar != nil {
		// Deregister instance from registry
		if err := a.opts.registrar.Deregister(a.opts.ctx, a.instance); err != nil {
			return err
		}
	}
	// Control goroutine exit. When a.cancel() is called, the <-ctx.Done() listener in the Run() method
	// receives the message, unblocks, and then calls the server's Stop() method to stop the service.
	if a.cancel != nil {
		a.cancel()
	}
	return nil
}