Skip to main content

One post tagged with "源码分析"

View All Tags

· One min read
shenqidebaozi

链路追踪的前世今生#

分布式跟踪(也称为分布式请求跟踪)是一种用于分析和监控应用程序的方法,尤其是使用微服务架构构建的应用程序。分布式跟踪有助于精确定位故障发生的位置以及导致性能差的原因。

起源#

链路追踪(Distributed Tracing) 一词最早出现于谷歌发布的论文 《Dapper, a Large-Scale Distributed Systems Tracing Infrastructure》 中,这篇论文对于实现链路追踪,对于后来出现的 Jaeger、Zipkin 等开源分布式追踪项目设计理念仍有很深的影响。

微服务架构是一个分布式的架构,会有很多个不同的服务。不同的服务之前相互调用,如果出现了错误由于一个请求经过了 N 个服务。随着业务的增加越来越多的服务之间的调用,如果没有一个工具去记录调用链,解决问题的时候就会像下面图片里小猫咪玩的毛线球一样,毫无头绪,无从下手 image.png 所以需要有一个工具能够清楚的了解一个请求经过了哪些服务,顺序是如何,从而能够轻易的定位问题。 image.png

百家争艳#

从谷歌发布 Dapper 后,分布式链路追踪工具越来越多,以下简单列举了一些常用的链路追踪系统

  • Skywalking
  • 阿里 鹰眼
  • 大众点评 CAT
  • Twitter Zipkin
  • Naver pinpoint
  • Uber Jaeger

争锋相对?#

随着链路追踪工具越来越多,开源领域主要分为两派,一派是以 CNCF技术委员 会为主的 OpenTracing 的规范,例如 jaeger zipkin 都是遵循了OpenTracing 的规范。而另一派则是谷歌作为发起者的 OpenCensus,而且谷歌本身还是最早提出链路追踪概念的公司,后期连微软也加入了 OpenCensus 截屏2021-05-29 下午9.56.57.png

OpenTelemetry 诞生#

OpenTelemetric 是一组 API、SDK、模组和集成,专为创建和管理‎‎遥测数据‎‎(如追踪、指标和日志)而设

微软加入 OpenCensus 后,直接打破了之前平衡的局面,间接的导致了 OpenTelemetry 的诞生 谷歌和微软下定决心结束江湖之乱,首要的问题是如何整合两个两个社区已有的项目,OpenTelemetry 主要的理念就是,兼容 OpenCensusOpenTracing ,可以让使用者无需改动或者很小的改动就可以接入 OpenTelemetry

Kratos 的链路追踪实践#

Kratos 一套轻量级 Go 微服务框架,包含大量微服务相关框架及工具。

tracing 中间件#

kratos 框架提供的自带中间件中有一个名为 tracing 中间件,它基于 Opentelemetry 实现了kratos 框架的链路追踪功能,中间件的代码可以从 middleware/tracing 中看到。

实现原理#

kratos 的链路追踪中间件由三个文件组成 carrie.go,tracer.go,tracing.go。client和 server 的实现原理基本相同,本文以 server 实现进行原理解析。

  1. 首先当请求进入时,tracing 中间件会被调用,首先调用了 tracer.go 中的 NewTracer 方法
// Server returns a new server middleware for OpenTelemetry.func Server(opts ...Option) middleware.Middleware {        // 调用 tracer.go 中的 NewTracer 传入了一个 SpanKindServer 和配置项    tracer := NewTracer(trace.SpanKindServer, opts...)        // ... 省略代码}
  1. tracer.go 中的 NewTracer 方法被调用后会返回一个 Tracer,实现如下
func NewTracer(kind trace.SpanKind, opts ...Option) *Tracer {    options := options{}    for _, o := range opts {        o(&options)    }    // 判断是否存在 otel 追踪提供者配置,如果存在则设置    if options.TracerProvider != nil {        otel.SetTracerProvider(options.TracerProvider)    }    /*    判断是否存在 Propagators 设置,如果存在设置则覆盖,不存在则设置一个默认的TextMapPropagator    注意如果没有设置默认的TextMapPropagator,链路信息则无法正确的传递    */    if options.Propagators != nil {        otel.SetTextMapPropagator(options.Propagators)    } else {    otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.Baggage{}, propagation.TraceContext{}))    }

    var name string    // 判断当前中间件的类型,是 server 还是 client    if kind == trace.SpanKindServer {        name = "server"    } else if kind == trace.SpanKindClient {        name = "client"    } else {        panic(fmt.Sprintf("unsupported span kind: %v", kind))    }    // 调用 otel包的 Tracer 方法 传入 name 用来创建一个 tracer 实例    tracer := otel.Tracer(name)    return &Tracer{tracer: tracer, kind: kind}}
  1. 判断当前请求类型,处理需要采集的数据,并调用 tracer.go 中的 Start 方法
var (    component string    operation string    carrier   propagation.TextMapCarrier)// 判断请求类型if info, ok := http.FromServerContext(ctx); ok {    // HTTP    component = "HTTP"    // 取出请求的地址    operation = info.Request.RequestURI    // 调用 otel/propagation包中的 HeaderCarrier,会处理 http.Header 以用来满足TextMapCarrier interface    // TextMapCarrier 是一个文本映射载体,用于承载信息    carrier = propagation.HeaderCarrier(info.Request.Header)    // otel.GetTextMapPropagator().Extract() 方法用于将文本映射载体,读取到上下文中    ctx = otel.GetTextMapPropagator().Extract(ctx, propagation.HeaderCarrier(info.Request.Header))} else if info, ok := grpc.FromServerContext(ctx); ok {    // Grpc    component = "gRPC"    operation = info.FullMethod    //    // 调用 grpc/metadata包中metadata.FromIncomingContext(ctx)传入 ctx,转换 grpc 的元数据    if md, ok := metadata.FromIncomingContext(ctx); ok {        // 调用carrier.go 中的 MetadataCarrier 将 MD 转换 成文本映射载体        carrier = MetadataCarrier(md)    }}// 调用 tracer.Start 方法ctx, span := tracer.Start(ctx, component, operation, carrier)// ... 省略代码}
  1. 调用 tracing.go 中的 Start 方法
func (t *Tracer) Start(ctx context.Context, component string, operation string, carrier propagation.TextMapCarrier) (context.Context, trace.Span) {    // 判断当前中间件如果是 server则将 carrier 注入到上下文中    if t.kind == trace.SpanKindServer {        ctx = otel.GetTextMapPropagator().Extract(ctx, carrier)    }    // 调用otel/tracer 包中的 start 方法,用来创建一个 span    ctx, span := t.tracer.Start(ctx,        // tracing.go 中声明的请求路由作为 spanName        operation,        // 设置 span 的属性,设置了一个 component,component的值为请求类型        trace.WithAttributes(attribute.String("component", component)),        // 设置 span种类        trace.WithSpanKind(t.kind),    )    // 判断如果当前中间件是 client 则将 carrier 注入到请求里面    if t.kind == trace.SpanKindClient {        otel.GetTextMapPropagator().Inject(ctx, carrier)    }    return ctx, span}
  1. defer 声明了一个闭包方法
// 这个地方要注意,需要使用闭包,因为 defer 的参数是实时计算的如果异常发生,err 会一直为 nil// https://github.com/go-kratos/kratos/issues/927defer func() { tracer.End(ctx, span, err) }()
  1. 中间件继续执行
// tracing.go 69行reply, err = handler(ctx, req)
  1. 中间件调用结束 defer 中的闭包被调用后执行了 tracer.go 中的 End 方法
func (t *Tracer) End(ctx context.Context, span trace.Span, err error) {    // 判断是否有异常发生,如果有则设置一些异常信息    if err != nil {        // 记录异常        span.RecordError(err)        // 设置span 属性        span.SetAttributes(            // 设置事件为异常            attribute.String("event", "error"),            // 设置 message 为 err.Error().            attribute.String("message", err.Error()),        )        //设置了 span 的状态        span.SetStatus(codes.Error, err.Error())    } else {        // 如果没有发生异常,span 状态则为 ok        span.SetStatus(codes.Ok, "OK")    }    // 中止 span    span.End()}

如何使用#

tracing 中间件的使用示例可以从 kratos/examples/traces ,该示例简单的实现了跨服务间的链路追踪,以下代码片段包含部分示例代码。

// https://github.com/go-kratos/kratos/blob/7f835db398c9d0332e69b81bad4c652b4b45ae2e/examples/traces/app/message/main.go#L38// 首先调用otel 库方法,得到一个 TracerProviderfunc tracerProvider(url string) (*tracesdk.TracerProvider, error) {    // examples/traces 中使用的是 jaeger,其他方式可以查看 opentelemetry 官方示例    exp, err := jaeger.NewRawExporter(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(url)))    if err != nil {        return nil, err    }    tp := tracesdk.NewTracerProvider(        tracesdk.WithSampler(tracesdk.AlwaysSample()),        // 设置 Batcher,注册jaeger导出程序        tracesdk.WithBatcher(exp),        // 记录一些默认信息        tracesdk.WithResource(resource.NewWithAttributes(            semconv.ServiceNameKey.String(pb.User_ServiceDesc.ServiceName),            attribute.String("environment", "development"),            attribute.Int64("ID", 1),        )),    )    return tp, nil}

在 grpc/server 中使用#

// https://github.com/go-kratos/kratos/blob/main/examples/traces/app/message/main.gogrpcSrv := grpc.NewServer(    grpc.Address(":9000"),    grpc.Middleware(        // Configuring tracing Middleware        tracing.Server(            tracing.WithTracerProvider(tp),        ),    ),)

在 grpc/client 中使用#

// https://github.com/go-kratos/kratos/blob/149fc0195eb62ee1fbc2728adb92e1bcd1a12c4e/examples/traces/app/user/main.go#L63conn, err := grpc.DialInsecure(ctx,    grpc.WithEndpoint("127.0.0.1:9000"),    grpc.WithMiddleware(        tracing.Client(            tracing.WithTracerProvider(s.tracer),            tracing.WithPropagators(                propagation.NewCompositeTextMapPropagator(propagation.Baggage{}, propagation.TraceContext{}),            ),        )    ),    grpc.WithTimeout(2*time.Second),)

在 http/server 中使用#

// https://github.com/go-kratos/kratos/blob/main/examples/traces/app/user/main.gohttpSrv := http.NewServer(http.Address(":8000"))httpSrv.HandlePrefix("/", pb.NewUserHandler(s,    http.Middleware(        // Configuring tracing middleware        tracing.Server(            tracing.WithTracerProvider(tp),            tracing.WithPropagators(                propagation.NewCompositeTextMapPropagator(propagation.Baggage{}, propagation.TraceContext{}),            ),        ),    ),)

在 http/client 中使用#

http.NewClient(ctx, http.WithMiddleware(    tracing.Client(        tracing.WithTracerProvider(s.tracer),    ),))

如何实现一个其他场景的 tracing#

我们可以借鉴 kratostracing 中间件的代码来实现例如数据库的 tracing,如下面的代码片段,作者借鉴了tracing 中间件,实现了 qmgo 库操作 MongoDB 数据库的 tracing

func mongoTracer(ctx context.Context,tp trace.TracerProvider, command interface{}) {    var (        commandName string        failure     string        nanos       int64        reply       bson.Raw        queryId     int64        eventName   string    )    otel.SetTracerProvider(tp)    reply = bson.Raw{}    switch value := command.(type) {    case *event.CommandStartedEvent:        commandName = value.CommandName        reply = value.Command        queryId = value.RequestID        eventName = "CommandStartedEvent"    case *event.CommandSucceededEvent:        commandName = value.CommandName        nanos = value.DurationNanos        queryId = value.RequestID        eventName = "CommandSucceededEvent"    case *event.CommandFailedEvent:        commandName = value.CommandName        failure = value.Failure        nanos = value.DurationNanos        queryId = value.RequestID        eventName = "CommandFailedEvent"    }    duration, _ := time.ParseDuration(strconv.FormatInt(nanos, 10) + "ns")    tracer := otel.Tracer("mongodb")    kind := trace.SpanKindServer    ctx, span := tracer.Start(ctx,        commandName,        trace.WithAttributes(            attribute.String("event", eventName),            attribute.String("command", commandName),            attribute.String("query", reply.String()),            attribute.Int64("queryId", queryId),            attribute.String("ms", duration.String()),        ),        trace.WithSpanKind(kind),    )    if failure != "" {        span.RecordError(errors.New(failure))    }    span.End()}

参考文献#

· One min read
shenqidebaozi

0X01 通过 layout 探索 kratos 运行原理(kratos v2.0.0-beta4)#

创建项目#

首先需要安装好对应的依赖环境,以及工具:

  • go
  • protoc
  • protoc-gen-go
  # 创建项目模板kratos new helloworld
cd helloworld# 拉取项目依赖go mod download# 生成proto模板kratos proto add api/helloworld/helloworld.proto# 生成proto源码kratos proto client api/helloworld/helloworld.proto# 生成server模板kratos proto server api/helloworld/helloworld.proto -t internal/service

执行命令后,会在当前目录下生成一个 service 工程,工程骨架如下,具体的工程骨架说明可以访问 layout image.png

运行项目#

# 生成所有proto源码、wire等等go generate ./...
# 编译成可执行文件go build -o ./bin/ ./...
# 运行项目./bin/helloworld -conf ./configs

看到如下输出则证明项目启动正常

level=INFO module=app service_id=7114ad8a-b3bf-11eb-a1b9-f0189850d2cb service_name=  version=level=INFO module=transport/grpc msg=[gRPC] server listening on: [::]:9000level=INFO module=transport/http msg=[HTTP] server listening on: [::]:8000 

测试接口

curl 'http://127.0.0.1:8000/helloworld/krtaos'
输出:{  "message": "Hello kratos"}

应用是如何跑起来的?#

image.png 通过上面的图例👆,我们可以直观观察到应用的调用链,简化来说如下图流程所示👇

未命名文件(2).png

1. 注入依赖并调用 newApp() 方法#

// helloword/cmd/main.gofunc main() {    flag.Parse()    logger := log.NewStdLogger(os.Stdout)
    // 调用 go-kratos/kratos/v2/config,创建 config 实例,并指定了来源和配置解析方法    c := config.New(    config.WithSource(        file.NewSource(flagconf),    ),    config.WithDecoder(func(kv *config.KeyValue, v map[string]interface{}) error {        return yaml.Unmarshal(kv.Value, v)    }),    )    if err := c.Load(); err != nil {        panic(err)    }
    // 将配置扫描到,通过 proto 声明的 conf struct 上    var bc conf.Bootstrap    if err := c.Scan(&bc); err != nil {        panic(err)    }
    // 通过 wire 将依赖注入,并调用 newApp 方法    app, cleanup, err := initApp(bc.Server, bc.Data, logger)    if err != nil {        panic(err)    }    // 省略代码...}

2. 创建 kratos 实例#

项目 main.go 的 newApp() 方法中,调用了 go-kratos/kratos/v2/app.go 中的 kratos.New() 方法

// helloword/cmd/main.gofunc newApp(logger log.Logger, hs *http.Server, gs *grpc.Server) *kratos.App {    return kratos.New(        // 配置应用           kratos.Name(Name),        kratos.Version(Version),        kratos.Metadata(map[string]string{}),        kratos.Logger(logger),        // kratos.Server() 传入的 http/grpc 服务会通过 buildInstance() 转换成registry.ServiceInstance struct*        kratos.Server(            hs,            gs,        ),    )}

该方法会返回一个 App struct,包含 Run()Stop() 方法

// go-kratos/kratos/v2/app.gotype App struct {    opts     options //配置    ctx      context.Context // 上下文    cancel   func() // context 的取消方法    instance *registry.ServiceInstance //通过 kratos.Server()声明的实例,并通过 buildInstance() 转换后的 *registry.ServiceInstance struct    log      *log.Helper // 日志}
// Run executes all OnStart hooks registered with the application's Lifecycle.func (a *App) Run() error {    // 省略代码...}
// Stop gracefully stops the application.func (a *App) Stop() error {    // 省略代码...}

3. 调用 Run() 方法#

项目在 main 方法中调用了 kratos.App structRun() 方法.

// helloword/cmd/main.go// 省略代码...// 启动 Kratosif err := app.Run(); err != nil {    panic(err)}

Run() 方法的实现细节

// go-kratos/kratos/v2/app.gofunc (a *App) Run() error {    a.log.Infow(        "service_id", a.opts.id,        "service_name", a.opts.name,        "version", a.opts.version,    )    g, ctx := errgroup.WithContext(a.ctx)        // 遍历通过 kratos.Server() 声明的服务实例    for _, srv := range a.opts.servers {        srv := srv                // 执行两个goroutine, 用于处理服务启动和退出        g.Go(func() error {            <-ctx.Done() // 阻塞,等待调用 cancel 方法            return srv.Stop() // 协程退出后,调用实例的停止方法        })        g.Go(func() error {            return srv.Start() // 调用实例的运行方法        })    }        // 判断是否调用 kratos.Registrar() 配置了注册发现中心    if a.opts.registrar != nil {        // 将实例注册到注册中心        if err := a.opts.registrar.Register(a.opts.ctx, a.instance); err != nil             return err        }    }        // 监听进程退出信号    c := make(chan os.Signal, 1)    signal.Notify(c, a.opts.sigs...)                // 处理进程退出和 context 退出    g.Go(func() error {        for {            select {            case <-ctx.Done():                return ctx.Err()            case <-c:                        // 调用 kratos.App 的停止方法                a.Stop()            }        }    })    if err := g.Wait(); err != nil && !errors.Is(err, context.Canceled) {        return err    }    return nil}

4. 应用退出#

Kratos 实例在启动时,监听了系统的进程退出信号,当收到退出信号时,kratos 会调用 App structStop() 方法

// go-kratos/kratos/v2/app.gofunc (a *App) Stop() error {    // 判断是否有注册中心配置    if a.opts.registrar != nil {        // 在注册中心中将实例注销        if err := a.opts.registrar.Deregister(a.opts.ctx, a.instance); err != nil {            return err        }    }    // 控制 goroutine 的退出,当调用 a.cancel()时,Run()方法中 监听的 <-ctx.Done() 收到消息后,没有阻塞后,方法会调用 server 的 Stop()方法,停止服务    if a.cancel != nil {        a.cancel()    }    return nil}