// MetaDataInterceptor get grpc server info, requestId/traceId/LogId funcMetaServerDataInterceptor()grpc.UnaryServerInterceptor { // 拦截器函数签名 // @params ctx Grpc context // @params req grpc request // @params info grpc request info // @params handler the grpc method returnfunc(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler)(resp interface{}, err error) { // do what you want to do // get metadata from grpc client md, ok := metadata.FromIncomingContext(ctx) if !ok { md = metadata.Pairs() } // Set request info for context. // define your key for _, key := range []string{"requestId"} { value := md.Get(key) // ignore it if not exists. iflen(value) >= 1 { // set value to context. you can use ctx.Value to get it from your grpc method ctx = context.WithValue(ctx, key, value[0]) } } // next return handler(ctx, req) } }
// API time elas time get grpc server info funcAPITimeInterceptor()grpc.UnaryServerInterceptor { // 拦截器签名 returnfunc(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler)(resp interface{}, err error) { // do what you want to do start := time.Now().UnixNano() // do gRPC method ret := handler(ctx, req) // do what you want after the grpc method fmt.Println(time.Now().UnixNano() - start) return ret } }
// WrappedStream wraps around the embedded grpc.ServerStream, and intercepts the Context type WrappedStream struct { grpc.ServerStream // serverStream interface Ctx *context.Context // 定义ctx,覆盖ServerStream中的context }
// Context override the context method and can config the context manually func(c WrappedStream)Context()context.Context { return *c.Ctx }
// stream method to get meta data funcMetaStreamServerInterceptor()grpc.StreamServerInterceptor { // 函数签名 returnfunc( srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler)error { // 获取当前 grpc context ctx := ss.Context() md, ok := metadata.FromIncomingContext(ctx) if !ok { md = metadata.Pairs() } // Set request info for context. // define your key for _, key := range []string{"requestId"} { value := md.Get(key) // ignore it if not exists. iflen(value) >= 1 { // set value to context. you can use ctx.Value to get it from your grpc method ctx = context.WithValue(ctx, key, value[0]) } } // set context to next return handler(srv, streaminterceptor.NewWrappedStream(ss, &ctx)) } }
// MetaStreamClientInterceptor get grpc client info, requestId/traceId/LogId for grpc stream server funcMetaStreamClientInterceptor()grpc.StreamClientInterceptor { // 函数签名 returnfunc(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption)(grpc.ClientStream, error) {
// 从context获取元数据 md, ok := metadata.FromOutgoingContext(ctx) if !ok { md = metadata.Pairs() } for _, key := range keyNames { value := ctx.Value(key) if strValue, ok := value.(string); ok && strValue != "" { md.Set(key, strValue) } } // set metadata to ctx ctx = metadata.NewOutgoingContext(ctx, md)
// StreamServerInterceptor provides a hook to intercept the execution of a streaming RPC on the server. // info contains all the information of this RPC the interceptor can operate on. And handler is the // service method implementation. It is the responsibility of the interceptor to invoke handler to // complete the RPC. func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler)error
// @params ctx: grpc context // @params req: the request params // @params info: the grpc request info // @params handler: the real grpc method func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler)(resp interface{}, err error)
客户端普通拦截器
golang在调用grpc之前执行的公共的操作,比如要把requestId塞到header中。
1 2 3 4 5 6 7 8 9 10 11
// @params method: the RPC name // @params req: the request // @params resp: the response // @params cc: the ClientConn on which the RPC was invoked // @params invoker: the invoker of grpc method // @params opts: the option func( ctx context.Context, method string, req, resp interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption, )
客户端流式拦截器
实现如下签名的函数即可
1 2 3 4 5 6
// @params desc: contains a description of the stream // @params cc: the ClientConn on which the RPC was invoked // @params method: the RPC name // @params streamer: the handler to create a ClientStream and it is the responsibility of the interceptor to call it // @params opts: the option func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption)(ClientStream, error)