transporter/grpc 中基于谷歌的 grpc 框架实现了Transporter,用以注册 grpc 到 kratos.Server() 中。
Network(network string) ServerOption 配置服务端的 network 协议,如 tcp
Address(addr string) ServerOption配置服务端监听的地址
Timeout(timeout time.Duration) ServerOption配置服务端的超时设置
Logger(logger log.Logger) ServerOption配置服务端使用的日志组件
Middleware(m ...middleware.Middleware) ServerOption配置服务端的 kratos 中间件
TLSConfig(c *tls.Config) ServerOption配置服务端的 TLS 配置
UnaryInterceptor(in ...grpc.UnaryServerInterceptor) ServerOption配置服务端使用的 grpc 单元拦截器
StreamInterceptor(in ...grpc.StreamServerInterceptor) ServerOption配置服务端使用的 grpc 流媒体拦截器
Options(opts ...grpc.ServerOption) ServerOption配置一些额外的 grpc.ServerOption
NewServer()func NewServer(opts ...ServerOption) *Server { // grpc server 默认配置 srv := &Server{ network: "tcp", address: ":0", timeout: 1 * time.Second, health: health.NewServer(), log: log.NewHelper(log.GetLogger()), } // 递归 opts for _, o := range opts { o(srv) } // kratos middleware 转换成 grpc 拦截器,并处理一些细节 unaryInts := []grpc.UnaryServerInterceptor{ srv.unaryServerInterceptor(), } streamInts := []grpc.StreamServerInterceptor{ srv.streamServerInterceptor(), }
if len(srv.unaryInts) > 0 { unaryInts = append(unaryInts, srv.unaryInts...) } if len(srv.streamInts) > 0 { streamInts = append(streamInts, srv.streamInts...) }
// 将 UnaryInterceptor 和 StreamInterceptor 转换成 ServerOption var grpcOpts = []grpc.ServerOption{ grpc.ChainUnaryInterceptor(unaryInts...), grpc.ChainStreamInterceptor(streamInts...), } // 将 TLS 配置转化成 ServerOption if srv.tlsConf != nil { grpcOpts = append(grpcOpts, grpc.Creds(credentials.NewTLS(srv.tlsConf))) } // 追加通过 Options(opts ...grpc.ServerOption) 添加的options if len(srv.grpcOpts) > 0 { grpcOpts = append(grpcOpts, srv.grpcOpts...) } // 创建 grpc server srv.Server = grpc.NewServer(grpcOpts...) // 创建 metadata server srv.metadata = apimd.NewServer(srv.Server) // 配置 lis 和 endpoint srv.err = srv.listenAndEndpoint() // 内部注册 grpc_health_v1.RegisterHealthServer(srv.Server, srv.health) apimd.RegisterMetadataServer(srv.Server, srv.metadata) reflection.Register(srv.Server) return srv}unaryServerInterceptor()func (s *Server) unaryServerInterceptor() grpc.UnaryServerInterceptor { return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { // 把两个 ctx 合并成一个 ctx, cancel := ic.Merge(ctx, s.ctx) defer cancel() // 从 ctx 中取出 metadata md, _ := grpcmd.FromIncomingContext(ctx) // 把一些信息绑定到 ctx 上 replyHeader := grpcmd.MD{} ctx = transport.NewServerContext(ctx, &Transport{ endpoint: s.endpoint.String(), operation: info.FullMethod, reqHeader: headerCarrier(md), replyHeader: headerCarrier(replyHeader), }) // ctx 超时设置 if s.timeout > 0 { ctx, cancel = context.WithTimeout(ctx, s.timeout) defer cancel() } // 中间件处理 h := func(ctx context.Context, req interface{}) (interface{}, error) { return handler(ctx, req) } if len(s.middleware) > 0 { h = middleware.Chain(s.middleware...)(h) } // 执行中间件 handler reply, err := h(ctx, req) if len(replyHeader) > 0 { _ = grpc.SetHeader(ctx, replyHeader) } return reply, err }}简单列举了一些 kratos 中 grpc 的用法,其他 grpc 用法可以到 grpc 仓库中查看。
gs := grpc.NewServer()app := kratos.New( kratos.Name("kratos"), kratos.Version("v1.0.0"), kratos.Server(gs),)grpcSrv := grpc.NewServer( grpc.Address(":9000"), grpc.Middleware( logging.Server(), ),)if info, ok := transport.FromServerContext(ctx); ok { kind = info.Kind().String() operation = info.Operation()}WithEndpoint(endpoint string) ClientOption配置客户端使用的对端连接地址,如果不使用服务发现则为ip:port,如果使用服务发现则格式为discovery://<authority>/<serviceName>
WithTimeout(timeout time.Duration) ClientOption配置客户端的请求默认超时时间,如果有链路超时优先使用链路超时时间
WithMiddleware(m ...middleware.Middleware) ClientOption配置客户端使用的 kratos 中间件
WithDiscovery(d registry.Discovery) ClientOption配置客户端使用的服务发现
WithTLSConfig(c *tls.Config) ClientOption配置客户端使用的 TLS 配置
WithUnaryInterceptor(in ...grpc.UnaryClientInterceptor) ClientOption配置客户端使用的 grpc 原生拦截器
WithOptions(opts ...grpc.DialOption) ClientOption配置一些额外的 grpc.ClientOption
WithHealthCheck(healthCheck bool) ClientOption配置是否开启健康检查
WithNodeFilter(filters ...selector.NodeFilter) ClientOption配置过滤某些不希望被请求的节点
dial()func dial(ctx context.Context, insecure bool, opts ...ClientOption) (*grpc.ClientConn, error) { // 默认配置 options := clientOptions{ timeout: 2000 * time.Millisecond, balancerName: wrr.Name, logger: log.GetLogger(), } // 遍历 opts for _, o := range opts { o(&options) } // 将 kratos 中间件转化成 grpc 拦截器 ints := []grpc.UnaryClientInterceptor{ unaryClientInterceptor(options.middleware, options.timeout, options.filters), } if len(options.ints) > 0 { ints = append(ints, options.ints...) } // 负载均衡 grpcOpts := []grpc.DialOption{ grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, options.balancerName)), grpc.WithChainUnaryInterceptor(ints...), } if options.discovery != nil { // 如果存在服务发现配置,就配置 grpc 的 Resolvers grpcOpts = append(grpcOpts, grpc.WithResolvers( discovery.NewBuilder( options.discovery, discovery.WithInsecure(insecure), discovery.WithLogger(options.logger), ))) } if insecure { // 跳过证书验证 grpcOpts = append(grpcOpts, grpc.WithTransportCredentials(grpcinsecure.NewCredentials())) } // TLS 配置 if options.tlsConf != nil { grpcOpts = append(grpcOpts, grpc.WithTransportCredentials(credentials.NewTLS(options.tlsConf))) } if len(options.grpcOpts) > 0 { grpcOpts = append(grpcOpts, options.grpcOpts...) } return grpc.DialContext(ctx, options.endpoint, grpcOpts...)}unaryClientInterceptor()func unaryClientInterceptor(ms []middleware.Middleware, timeout time.Duration) grpc.UnaryClientInterceptor { return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { // 把一些信息绑定到 ctx 上 ctx = transport.NewClientContext(ctx, &Transport{ endpoint: cc.Target(), operation: method, reqHeader: headerCarrier{}, filters: filters, }) if timeout > 0 { // timeout 如果大于 0,就重新设置一下 ctx 的超时时间 var cancel context.CancelFunc ctx, cancel = context.WithTimeout(ctx, timeout) defer cancel() } // 中间件处理 h := func(ctx context.Context, req interface{}) (interface{}, error) { if tr, ok := transport.FromClientContext(ctx); ok { header := tr.RequestHeader() keys := header.Keys() keyvals := make([]string, 0, len(keys)) for _, k := range keys { keyvals = append(keyvals, k, header.Get(k)) } ctx = grpcmd.AppendToOutgoingContext(ctx, keyvals...) } return reply, invoker(ctx, method, req, reply, cc, opts...) } if len(ms) > 0 { h = middleware.Chain(ms...)(h) } _, err := h(ctx, req) return err }}conn, err := grpc.DialInsecure( context.Background(), grpc.WithEndpoint("127.0.0.1:9000"), )conn, err := grpc.DialInsecure( context.Background(), grpc.WithEndpoint("127.0.0.1:9000"), grpc.WithTimeout(3600 * time.Second), grpc.WithMiddleware( recovery.Recovery(), validate.Validator(), ),)conn, err := grpc.DialInsecure( context.Background(), grpc.WithEndpoint("discovery:///helloworld"), grpc.WithDiscovery(r),)