Skip to main content

gRPC

transporter/grpc 中基于谷歌的 grpc 框架实现了Transporter,用以注册 grpc 到 kratos.Server() 中。

Server

配置

Network(network string) ServerOption

配置服务端的 network 协议,如 tcp

Address(addr string) ServerOption

配置服务端监听的地址

Timeout(timeout time.Duration) ServerOption

配置服务端的超时设置

Logger(logger log.Logger) ServerOption

配置服务端使用的日志组件

Middleware(m ...middleware.Middleware) ServerOption

配置服务端的 kratos 中间件

TLSConfig(c *tls.Config) ServerOption

配置服务端的 TLS 配置

UnaryInterceptor(in ...grpc.UnaryServerInterceptor) ServerOption

配置服务端使用的 grpc 单元拦截器

StreamInterceptor(in ...grpc.StreamServerInterceptor) ServerOption

配置服务端使用的 grpc 流媒体拦截器

Options(opts ...grpc.ServerOption) ServerOption

配置一些额外的 grpc.ServerOption

主要的实现细节

NewServer()

func NewServer(opts ...ServerOption) *Server {
// grpc server 默认配置
srv := &Server{
network: "tcp",
address: ":0",
timeout: 1 * time.Second,
health: health.NewServer(),
log: log.NewHelper(log.GetLogger()),
}
// 递归 opts
for _, o := range opts {
o(srv)
}
// kratos middleware 转换成 grpc 拦截器,并处理一些细节
unaryInts := []grpc.UnaryServerInterceptor{
srv.unaryServerInterceptor(),
}
streamInts := []grpc.StreamServerInterceptor{
srv.streamServerInterceptor(),
}

if len(srv.unaryInts) > 0 {
unaryInts = append(unaryInts, srv.unaryInts...)
}
if len(srv.streamInts) > 0 {
streamInts = append(streamInts, srv.streamInts...)
}

// 将 UnaryInterceptor 和 StreamInterceptor 转换成 ServerOption
var grpcOpts = []grpc.ServerOption{
grpc.ChainUnaryInterceptor(unaryInts...),
grpc.ChainStreamInterceptor(streamInts...),
}
// 将 TLS 配置转化成 ServerOption
if srv.tlsConf != nil {
grpcOpts = append(grpcOpts, grpc.Creds(credentials.NewTLS(srv.tlsConf)))
}
// 追加通过 Options(opts ...grpc.ServerOption) 添加的options
if len(srv.grpcOpts) > 0 {
grpcOpts = append(grpcOpts, srv.grpcOpts...)
}
// 创建 grpc server
srv.Server = grpc.NewServer(grpcOpts...)
// 创建 metadata server
srv.metadata = apimd.NewServer(srv.Server)
// 配置 lis 和 endpoint
srv.err = srv.listenAndEndpoint()
// 内部注册
grpc_health_v1.RegisterHealthServer(srv.Server, srv.health)
apimd.RegisterMetadataServer(srv.Server, srv.metadata)
reflection.Register(srv.Server)
return srv
}

unaryServerInterceptor()

func (s *Server) unaryServerInterceptor() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
// 把两个 ctx 合并成一个
ctx, cancel := ic.Merge(ctx, s.ctx)
defer cancel()
// 从 ctx 中取出 metadata
md, _ := grpcmd.FromIncomingContext(ctx)
// 把一些信息绑定到 ctx 上
replyHeader := grpcmd.MD{}
ctx = transport.NewServerContext(ctx, &Transport{
endpoint: s.endpoint.String(),
operation: info.FullMethod,
reqHeader: headerCarrier(md),
replyHeader: headerCarrier(replyHeader),
})
// ctx 超时设置
if s.timeout > 0 {
ctx, cancel = context.WithTimeout(ctx, s.timeout)
defer cancel()
}
// 中间件处理
h := func(ctx context.Context, req interface{}) (interface{}, error) {
return handler(ctx, req)
}
if len(s.middleware) > 0 {
h = middleware.Chain(s.middleware...)(h)
}
// 执行中间件 handler
reply, err := h(ctx, req)
if len(replyHeader) > 0 {
_ = grpc.SetHeader(ctx, replyHeader)
}
return reply, err
}
}

使用方式

简单列举了一些 kratos 中 grpc 的用法,其他 grpc 用法可以到 grpc 仓库中查看。

注册 grpc server

gs := grpc.NewServer()
app := kratos.New(
kratos.Name("kratos"),
kratos.Version("v1.0.0"),
kratos.Server(gs),
)

grpc server 中使用 kratos middleware

grpcSrv := grpc.NewServer(
grpc.Address(":9000"),
grpc.Middleware(
logging.Server(),
),
)

middleware 中处理 grpc 请求

if info, ok := transport.FromServerContext(ctx); ok {
kind = info.Kind().String()
operation = info.Operation()
}

Client

配置

WithEndpoint(endpoint string) ClientOption

配置客户端使用的对端连接地址,如果不使用服务发现则为ip:port,如果使用服务发现则格式为discovery://\<authority>/\<serviceName>

WithTimeout(timeout time.Duration) ClientOption

配置客户端的请求默认超时时间,如果有链路超时优先使用链路超时时间

WithMiddleware(m ...middleware.Middleware) ClientOption

配置客户端使用的 kratos 中间件

WithDiscovery(d registry.Discovery) ClientOption

配置客户端使用的服务发现

WithTLSConfig(c *tls.Config) ClientOption

配置客户端使用的 TLS 配置

WithUnaryInterceptor(in ...grpc.UnaryClientInterceptor) ClientOption

配置客户端使用的 grpc 原生拦截器

WithOptions(opts ...grpc.DialOption) ClientOption

配置一些额外的 grpc.ClientOption

WithHealthCheck(healthCheck bool) ClientOption

配置是否开启健康检查

WithNodeFilter(filters ...selector.NodeFilter) ClientOption

配置过滤某些不希望被请求的节点

主要的实现细节

dial()

func dial(ctx context.Context, insecure bool, opts ...ClientOption) (*grpc.ClientConn, error) {
// 默认配置
options := clientOptions{
timeout: 2000 * time.Millisecond,
balancerName: wrr.Name,
logger: log.GetLogger(),
}
// 遍历 opts
for _, o := range opts {
o(&options)
}
// 将 kratos 中间件转化成 grpc 拦截器
ints := []grpc.UnaryClientInterceptor{
unaryClientInterceptor(options.middleware, options.timeout, options.filters),
}
if len(options.ints) > 0 {
ints = append(ints, options.ints...)
}
// 负载均衡
grpcOpts := []grpc.DialOption{
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, options.balancerName)),
grpc.WithChainUnaryInterceptor(ints...),
}
if options.discovery != nil {
// 如果存在服务发现配置,就配置 grpc 的 Resolvers
grpcOpts = append(grpcOpts,
grpc.WithResolvers(
discovery.NewBuilder(
options.discovery,
discovery.WithInsecure(insecure),
discovery.WithLogger(options.logger),
)))
}
if insecure {
// 跳过证书验证
grpcOpts = append(grpcOpts, grpc.WithTransportCredentials(grpcinsecure.NewCredentials()))
}
// TLS 配置
if options.tlsConf != nil {
grpcOpts = append(grpcOpts, grpc.WithTransportCredentials(credentials.NewTLS(options.tlsConf)))
}
if len(options.grpcOpts) > 0 {
grpcOpts = append(grpcOpts, options.grpcOpts...)
}
return grpc.DialContext(ctx, options.endpoint, grpcOpts...)
}

unaryClientInterceptor()

func unaryClientInterceptor(ms []middleware.Middleware, timeout time.Duration) grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
// 把一些信息绑定到 ctx 上
ctx = transport.NewClientContext(ctx, &Transport{
endpoint: cc.Target(),
operation: method,
reqHeader: headerCarrier{},
filters: filters,
})
if timeout > 0 {
// timeout 如果大于 0,就重新设置一下 ctx 的超时时间
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, timeout)
defer cancel()
}
// 中间件处理
h := func(ctx context.Context, req interface{}) (interface{}, error) {
if tr, ok := transport.FromClientContext(ctx); ok {
header := tr.RequestHeader()
keys := header.Keys()
keyvals := make([]string, 0, len(keys))
for _, k := range keys {
keyvals = append(keyvals, k, header.Get(k))
}
ctx = grpcmd.AppendToOutgoingContext(ctx, keyvals...)
}
return reply, invoker(ctx, method, req, reply, cc, opts...)
}
if len(ms) > 0 {
h = middleware.Chain(ms...)(h)
}
_, err := h(ctx, req)
return err
}
}

使用方式

创建客户端连接

conn, err := grpc.DialInsecure(
context.Background(),
grpc.WithEndpoint("127.0.0.1:9000"),
)

使用中间件

conn, err := grpc.DialInsecure(
context.Background(),
grpc.WithEndpoint("127.0.0.1:9000"),
grpc.WithTimeout(3600 * time.Second),
grpc.WithMiddleware(
recovery.Recovery(),
validate.Validator(),
),
)

使用服务发现

conn, err := grpc.DialInsecure(
context.Background(),
grpc.WithEndpoint("discovery:///helloworld"),
grpc.WithDiscovery(r),
)

References