Skip to main content

gRPC

Our transporter/grpc is developed upon gRPC, and implements Transporter interface. You could use it for the communication between services on gRPC protocol.

Server

Options

Network()

To set communication protocol such as tcp.

Address()

To set server's listening address.

Timeout()

To set the server-side timeout.

Logger()

To set logger.

Middleware()

To set middleware for gRPC server.

UnaryInterceptor()

To set interceptors for gRPC server.

Options()

To set some extra grpc.ServerOption

Implementation Details

NewServer()

func NewServer(opts ...ServerOption) *Server {
// grpc server default configuration
srv := &Server{
network: "tcp",
address: ":0",
timeout: 1 * time.Second,
health: health.NewServer(),
log: log.NewHelper(log.DefaultLogger),
}
// apply opts
for _, o := range opts {
o(srv)
}
// convert middleware to grpc interceptor
var ints = []grpc.UnaryServerInterceptor{
srv.unaryServerInterceptor(),
}

if len(srv.ints) > 0 {
ints = append(ints, srv.ints...)
}

// convert UnaryInterceptor to ServerOption
var grpcOpts = []grpc.ServerOption{
grpc.ChainUnaryInterceptor(ints...),
}
if len(srv.grpcOpts) > 0 {
grpcOpts = append(grpcOpts, srv.grpcOpts...)
}
// create grpc server
srv.Server = grpc.NewServer(grpcOpts...)
// create metadata server
srv.metadata = apimd.NewServer(srv.Server)
// register these internal API
grpc_health_v1.RegisterHealthServer(srv.Server, srv.health)
apimd.RegisterMetadataServer(srv.Server, srv.metadata)
reflection.Register(srv.Server)
return srv
}

unaryServerInterceptor()

func (s *Server) unaryServerInterceptor() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
// merge two ctx
ctx, cancel := ic.Merge(ctx, s.ctx)
defer cancel()
// get metadata from ctx
md, _ := grpcmd.FromIncomingContext(ctx)
// bind some information into ctx
ctx = transport.NewServerContext(ctx, &Transport{
endpoint: s.endpoint.String(),
operation: info.FullMethod,
header: headerCarrier(md),
})
// set timeout
if s.timeout > 0 {
ctx, cancel = context.WithTimeout(ctx, s.timeout)
defer cancel()
}
// middleware
h := func(ctx context.Context, req interface{}) (interface{}, error) {
return handler(ctx, req)
}
if len(s.middleware) > 0 {
h = middleware.Chain(s.middleware...)(h)
}
return h(ctx, req)
}
}

Usage

These are some basic usage of gRPC, you could refer to gRPC Docs for advanced examples.

Register gRPC Server

gs := grpc.NewServer()
app := kratos.New(
kratos.Name("kratos"),
kratos.Version("v1.0.0"),
kratos.Server(gs),
)

Set middleware in gRPC Server

grpcSrv := grpc.NewServer(
grpc.Address(":9000"),
grpc.Middleware(
logging.Server(),
),
)

Process Request in gRPC Middleware

if info, ok := transport.FromServerContext(ctx); ok {
kind = info.Kind().String()
operation = info.Operation()
}

client

Options

WithEndpoint()

To set the endpoint which the client will connect to.

WithTimeout()

To set the client-side timeout.

WithMiddleware()

To set middleware.

WithDiscovery()

To set the discovery for gRPC client.

WithUnaryInterceptor()

To set interceptors for gRPC client.

WithOptions()

To set some extra grpc.ClientOption

Implementation Details

dial()

func dial(ctx context.Context, insecure bool, opts ...ClientOption) (*grpc.ClientConn, error) {
// default options
options := clientOptions{
timeout: 500 * time.Millisecond,
}
// apply opts
for _, o := range opts {
o(&options)
}
// convert middleware to grpc interceptor
var ints = []grpc.UnaryClientInterceptor{
unaryClientInterceptor(options.middleware, options.timeout),
}
if len(options.ints) > 0 {
ints = append(ints, options.ints...)
}
var grpcOpts = []grpc.DialOption{
// client side balancer
grpc.WithBalancerName(roundrobin.Name),
grpc.WithChainUnaryInterceptor(ints...),
}
if options.discovery != nil {
// To use service discovery
grpcOpts = append(grpcOpts, grpc.WithResolvers(discovery.NewBuilder(options.discovery)))
}
if insecure {
// to disable transport security for connection
grpcOpts = append(grpcOpts, grpc.WithInsecure())
}
if len(options.grpcOpts) > 0 {
grpcOpts = append(grpcOpts, options.grpcOpts...)
}
return grpc.DialContext(ctx, options.endpoint, grpcOpts...)
}

unaryClientInterceptor()

func unaryClientInterceptor(ms []middleware.Middleware, timeout time.Duration) grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
// bind some information into ctx
ctx = transport.NewClientContext(ctx, &Transport{
endpoint: cc.Target(),
operation: method,
header: headerCarrier{},
})
if timeout > 0 {
// set the timeout
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, timeout)
defer cancel()
}
// middleware
h := func(ctx context.Context, req interface{}) (interface{}, error) {
if tr, ok := transport.FromClientContext(ctx); ok {
keys := tr.Header().Keys()
keyvals := make([]string, 0, len(keys))
for _, k := range keys {
keyvals = append(keyvals, k, tr.Header().Get(k))
}
ctx = grpcmd.AppendToOutgoingContext(ctx, keyvals...)
}
return reply, invoker(ctx, method, req, reply, cc, opts...)
}
if len(ms) > 0 {
h = middleware.Chain(ms...)(h)
}
_, err := h(ctx, req)
return err
}
}

Usage

Client Connection

    conn, err := grpc.DialInsecure(
context.Background(),
grpc.WithEndpoint("127.0.0.1:9000"),
)

Middleware

conn, err := grpc.DialInsecure(
context.Background(),
transport.WithEndpoint("127.0.0.1:9000"),
transport.WithMiddleware(
recovery.Recovery(),
),
)

Service Discovery

conn, err := grpc.DialInsecure(
context.Background(),
grpc.WithEndpoint("discovery:///helloworld"),
grpc.WithDiscovery(r),
)

References