Net Core gRPC - 06. 拦截器

Net Core gRPC 系列

1. gRPC拦截器

拦截器(Interceptor)是一个 gRPC 概念,允许应用与传入或传出的 gRPC 调用进行交互。它们提供了一种扩充请求处理管道的方法。
拦截器针对通道或服务进行配置,并针对每个 gRPC 调用自动执行。由于拦截器对用户的应用程序逻辑是透明的,因此它们非常适合处理常见的横切需求(例如日志记录、监视、身份验证和参数验证)。可以通过创建继承自 Interceptor 类型的类,为 gRPC 服务器和客户端实现拦截器。

2. 客户端拦截器

gRPC 客户端拦截器截获传出的 RPC 调用。它们提供对发送的请求、传入的响应以及客户端调用上下文的访问权限。

| 方法名 | 解释 |
| --- | --- |
| BlockingUnaryCall | 截获一元 RPC 阻塞调用 |
| AsyncUnaryCall | 截获一元 RPC 异步调用 |
| AsyncClientStreamingCall | 截获客户端流式处理 RPC 异步调用 |
| AsyncServerStreamingCall | 截获服务器流式处理 RPC 异步调用 |
| AsyncDuplexStreamingCall | 截获双向流式处理 RPC 异步调用 |

自定义默认拦截器:记录日志与错误处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
/// <summary>
/// Basic client-side gRPC interceptor. For every outgoing call it optionally writes an
/// informational log entry (controlled by configuration), and for calls that expose an
/// awaitable response it logs a failure and rethrows it wrapped in an
/// <see cref="InvalidOperationException"/>.
/// </summary>
public class ClientGrpcInterceptor : Interceptor
{
    private readonly IColaLogs _colaLog;
    private readonly IConfiguration _config;

    /// <summary>
    /// Creates the interceptor.
    /// </summary>
    /// <param name="colaLog">Logger used for call and error logging.</param>
    /// <param name="config">Configuration read on each call to decide whether calls are logged.</param>
    public ClientGrpcInterceptor(
        IColaLogs colaLog,
        IConfiguration config)
    {
        _colaLog = colaLog;
        _config = config;
    }

    /// <summary>
    /// Intercepts an asynchronous unary RPC call.
    /// </summary>
    /// <param name="request">The request message.</param>
    /// <param name="context">The client call context.</param>
    /// <param name="continuation">The next step of the call pipeline.</param>
    /// <typeparam name="TRequest">Request message type.</typeparam>
    /// <typeparam name="TResponse">Response message type.</typeparam>
    /// <returns>A call object whose response task is wrapped by <c>HandleResponse</c>.</returns>
    public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(
        TRequest request,
        ClientInterceptorContext<TRequest, TResponse> context,
        AsyncUnaryCallContinuation<TRequest, TResponse> continuation)
    {
        ClientInterceptorLog($" gRpc调用类型: 一元 RPC 异步调用. \r\n gRpc调用方法: {context.Method.Name}.", context);
        var call = continuation(request, context);
        return new AsyncUnaryCall<TResponse>(
            HandleResponse(call.ResponseAsync),
            call.ResponseHeadersAsync,
            call.GetStatus,
            call.GetTrailers,
            call.Dispose);
    }

    /// <summary>
    /// Intercepts a blocking unary RPC call.
    /// </summary>
    /// <param name="request">The request message.</param>
    /// <param name="context">The client call context.</param>
    /// <param name="continuation">The next step of the call pipeline.</param>
    /// <typeparam name="TRequest">Request message type.</typeparam>
    /// <typeparam name="TResponse">Response message type.</typeparam>
    /// <returns>The response returned by <paramref name="continuation"/>, unchanged.</returns>
    public override TResponse BlockingUnaryCall<TRequest, TResponse>(
        TRequest request,
        ClientInterceptorContext<TRequest, TResponse> context,
        BlockingUnaryCallContinuation<TRequest, TResponse> continuation)
    {
        ClientInterceptorLog($" gRpc调用类型: 一元 RPC 阻塞调用. \r\n gRpc调用方法: {context.Method.Name}.", context);
        return continuation(request, context);
    }

    /// <summary>
    /// Intercepts an asynchronous client-streaming RPC call.
    /// </summary>
    /// <param name="context">The client call context.</param>
    /// <param name="continuation">The next step of the call pipeline.</param>
    /// <typeparam name="TRequest">Request message type.</typeparam>
    /// <typeparam name="TResponse">Response message type.</typeparam>
    /// <returns>A call object whose response task is wrapped by <c>HandleResponse</c>.</returns>
    public override AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(
        ClientInterceptorContext<TRequest, TResponse> context,
        AsyncClientStreamingCallContinuation<TRequest, TResponse> continuation)
    {
        ClientInterceptorLog($" gRpc调用类型: 客户端流式处理 RPC 异步调用. \r\n gRpc调用方法: {context.Method.Name}.", context);
        var call = continuation(context);
        return new AsyncClientStreamingCall<TRequest, TResponse>(
            call.RequestStream, // passed through as-is; only the awaited response is wrapped
            HandleResponse(call.ResponseAsync),
            call.ResponseHeadersAsync,
            call.GetStatus,
            call.GetTrailers,
            call.Dispose);
    }

    /// <summary>
    /// Intercepts an asynchronous server-streaming RPC call.
    /// </summary>
    /// <param name="request">The request message.</param>
    /// <param name="context">The client call context.</param>
    /// <param name="continuation">The next step of the call pipeline.</param>
    /// <typeparam name="TRequest">Request message type.</typeparam>
    /// <typeparam name="TResponse">Response message type.</typeparam>
    /// <returns>A call object exposing the same streams and callbacks as the inner call.</returns>
    public override AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRequest, TResponse>(
        TRequest request,
        ClientInterceptorContext<TRequest, TResponse> context,
        AsyncServerStreamingCallContinuation<TRequest, TResponse> continuation)
    {
        ClientInterceptorLog($" gRpc调用类型: 服务器流式处理 RPC 异步调用. \r\n gRpc调用方法: {context.Method.Name}.", context);
        var call = continuation(request, context);
        return new AsyncServerStreamingCall<TResponse>(
            call.ResponseStream, // passed through as-is; stream reads are not wrapped
            call.ResponseHeadersAsync,
            call.GetStatus,
            call.GetTrailers,
            call.Dispose);
    }

    /// <summary>
    /// Intercepts an asynchronous duplex-streaming RPC call.
    /// </summary>
    /// <param name="context">The client call context.</param>
    /// <param name="continuation">The next step of the call pipeline.</param>
    /// <typeparam name="TRequest">Request message type.</typeparam>
    /// <typeparam name="TResponse">Response message type.</typeparam>
    /// <returns>A call object exposing the same streams and callbacks as the inner call.</returns>
    public override AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>(
        ClientInterceptorContext<TRequest, TResponse> context,
        AsyncDuplexStreamingCallContinuation<TRequest, TResponse> continuation)
    {
        ClientInterceptorLog($" gRpc调用类型: 双向流式处理 RPC 异步调用. \r\n gRpc调用方法: {context.Method.Name}.", context);
        var call = continuation(context);
        return new AsyncDuplexStreamingCall<TRequest, TResponse>(
            call.RequestStream,  // both streams are passed through as-is
            call.ResponseStream,
            call.ResponseHeadersAsync,
            call.GetStatus,
            call.GetTrailers,
            call.Dispose);
    }

    /// <summary>
    /// Awaits the inner response task; on failure logs the exception and rethrows it
    /// wrapped in an <see cref="InvalidOperationException"/> (the original is kept as
    /// the inner exception).
    /// </summary>
    /// <param name="inner">The response task produced by the inner call.</param>
    /// <typeparam name="TResponse">Response message type.</typeparam>
    /// <returns>The response produced by <paramref name="inner"/>.</returns>
    /// <exception cref="InvalidOperationException">The inner task faulted.</exception>
    private async Task<TResponse> HandleResponse<TResponse>(Task<TResponse> inner)
    {
        try
        {
            return await inner;
        }
        catch (Exception ex)
        {
            _colaLog.Error(ex);
            throw new InvalidOperationException("Custom error", ex);
        }
    }

    /// <summary>
    /// Writes <paramref name="logInfo"/> to the info log when interceptor logging is
    /// switched on in configuration. A missing configuration section is treated as
    /// "logging disabled" instead of failing the call.
    /// </summary>
    /// <param name="logInfo">The message to log.</param>
    /// <param name="context">The client call context; only used to infer the generic arguments.</param>
    /// <typeparam name="TRequest">Request message type.</typeparam>
    /// <typeparam name="TResponse">Response message type.</typeparam>
    private void ClientInterceptorLog<TRequest, TResponse>(string logInfo, ClientInterceptorContext<TRequest, TResponse> context)
        where TRequest : class
        where TResponse : class
    {
        // Get<T>() yields null when the section is absent, so guard before reading the flag.
        var options = _config.GetSection(SystemConstant.CONSTANT_COLAGRPCCLIENT_SECTION).Get<GrpcClientOption>();
        if (options?.InterceptorLog == true)
        {
            _colaLog.Info(logInfo);
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// Option 1: typed client registered in, and resolved from, dependency injection.
builder.Services.AddGrpcClient<Greeter.GreeterClient>(o =>
    {
        o.Address = new Uri("https://localhost:5005");
    })
    // Let the client factory hand us its own IServiceProvider instead of building a
    // throw-away container inside the options callback.
    .AddInterceptor(sp => new ClientGrpcInterceptor(sp.GetRequiredService<IColaLogs>(), config))
    .ConfigureChannel(options =>
    {
        options.CreateGrpcClientChannelOptions(config);
    });

// Build the provider once and reuse it below; each BuildServiceProvider() call
// creates a separate container with its own singleton instances.
var provider = builder.Services.BuildServiceProvider();
var client = provider.GetRequiredService<Greeter.GreeterClient>();

// Option 2: IPC (inter-process) call over a manually created channel.
var channel = new ColaGrpcHelper(new ColaWindowsGrpc()).CreateChannel("https://localhost:5005", config);
var invoker = channel.Intercept(
    new ClientGrpcInterceptor(provider.GetRequiredService<IColaLogs>(), config));
// Named differently from the DI client above so both options can live in one scope.
var ipcClient = new Greeter.GreeterClient(invoker);

3. 服务端拦截器

gRPC 服务端拦截器截获传入的 RPC 请求。它们提供对传入的请求、传出的响应以及服务端调用上下文的访问权限。

| 方法名 | 解释 |
| --- | --- |
| UnaryServerHandler | 截获一元 RPC |
| ClientStreamingServerHandler | 截获客户端流式处理 RPC |
| ServerStreamingServerHandler | 截获服务器流式处理 RPC |
| DuplexStreamingServerHandler | 截获双向流式处理 RPC |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
/// <summary>
/// Basic server-side gRPC interceptor. For every incoming call it optionally writes an
/// informational log entry (controlled by configuration), runs the handler, and logs
/// any exception before rethrowing it unchanged.
/// </summary>
public class ServerGrpcInterceptor : Interceptor
{
    private readonly IColaLogs _colaLog;
    private readonly IConfiguration _config;

    /// <summary>
    /// Creates the interceptor.
    /// </summary>
    /// <param name="colaLog">Logger used for call and error logging.</param>
    /// <param name="config">Configuration read on each call to decide whether calls are logged.</param>
    public ServerGrpcInterceptor(
        IColaLogs colaLog,
        IConfiguration config)
    {
        _colaLog = colaLog;
        _config = config;
    }

    /// <summary>
    /// Intercepts a unary RPC.
    /// </summary>
    /// <param name="request">The request message.</param>
    /// <param name="context">The server call context.</param>
    /// <param name="continuation">The next handler in the pipeline.</param>
    /// <typeparam name="TRequest">Request message type.</typeparam>
    /// <typeparam name="TResponse">Response message type.</typeparam>
    /// <returns>The response produced by <paramref name="continuation"/>.</returns>
    public override async Task<TResponse> UnaryServerHandler<TRequest, TResponse>(
        TRequest request,
        ServerCallContext context,
        UnaryServerMethod<TRequest, TResponse> continuation)
    {
        ServerInterceptorLog($" gRpc类型: 一元 RPC 异步方法. \r\n gRpc方法: {context.Method}.", context);
        try
        {
            return await continuation(request, context);
        }
        catch (Exception ex)
        {
            _colaLog.Error(ex);
            throw; // rethrow unchanged so the original exception reaches the caller
        }
    }

    /// <summary>
    /// Intercepts a client-streaming RPC.
    /// </summary>
    /// <param name="requestStream">The stream of incoming request messages.</param>
    /// <param name="context">The server call context.</param>
    /// <param name="continuation">The next handler in the pipeline.</param>
    /// <typeparam name="TRequest">Request message type.</typeparam>
    /// <typeparam name="TResponse">Response message type.</typeparam>
    /// <returns>The response produced by <paramref name="continuation"/>.</returns>
    public override async Task<TResponse> ClientStreamingServerHandler<TRequest, TResponse>(
        IAsyncStreamReader<TRequest> requestStream,
        ServerCallContext context,
        ClientStreamingServerMethod<TRequest, TResponse> continuation)
    {
        ServerInterceptorLog($" gRpc类型: 客户端流 RPC 异步方法. \r\n gRpc方法: {context.Method}.", context);
        try
        {
            return await continuation(requestStream, context);
        }
        catch (Exception ex)
        {
            _colaLog.Error(ex);
            throw;
        }
    }

    /// <summary>
    /// Intercepts a server-streaming RPC.
    /// </summary>
    /// <param name="request">The request message.</param>
    /// <param name="responseStream">The stream used to write response messages.</param>
    /// <param name="context">The server call context.</param>
    /// <param name="continuation">The next handler in the pipeline.</param>
    /// <typeparam name="TRequest">Request message type.</typeparam>
    /// <typeparam name="TResponse">Response message type.</typeparam>
    public override async Task ServerStreamingServerHandler<TRequest, TResponse>(
        TRequest request,
        IServerStreamWriter<TResponse> responseStream,
        ServerCallContext context,
        ServerStreamingServerMethod<TRequest, TResponse> continuation)
    {
        ServerInterceptorLog($" gRpc类型: 服务器流 RPC 异步方法. \r\n gRpc方法: {context.Method}.", context);
        try
        {
            await continuation(request, responseStream, context);
        }
        catch (Exception ex)
        {
            _colaLog.Error(ex);
            throw;
        }
    }

    /// <summary>
    /// Intercepts a duplex-streaming RPC.
    /// </summary>
    /// <param name="requestStream">The stream of incoming request messages.</param>
    /// <param name="responseStream">The stream used to write response messages.</param>
    /// <param name="context">The server call context.</param>
    /// <param name="continuation">The next handler in the pipeline.</param>
    /// <typeparam name="TRequest">Request message type.</typeparam>
    /// <typeparam name="TResponse">Response message type.</typeparam>
    public override async Task DuplexStreamingServerHandler<TRequest, TResponse>(
        IAsyncStreamReader<TRequest> requestStream,
        IServerStreamWriter<TResponse> responseStream,
        ServerCallContext context,
        DuplexStreamingServerMethod<TRequest, TResponse> continuation)
    {
        ServerInterceptorLog($" gRpc类型: 双向流 RPC 异步方法. \r\n gRpc方法: {context.Method}.", context);
        try
        {
            await continuation(requestStream, responseStream, context);
        }
        catch (Exception ex)
        {
            _colaLog.Error(ex);
            throw;
        }
    }

    /// <summary>
    /// Writes <paramref name="logInfo"/> to the info log when interceptor logging is
    /// switched on in configuration. A missing configuration section is treated as
    /// "logging disabled" instead of failing the call.
    /// </summary>
    /// <param name="logInfo">The message to log.</param>
    /// <param name="context">The server call context; not read by this method.</param>
    private void ServerInterceptorLog(string logInfo, ServerCallContext context)
    {
        // NOTE(review): this reads the *client* section/option type on the server side —
        // confirm that sharing the client's InterceptorLog switch is intended.
        // Get<T>() yields null when the section is absent, so guard before reading the flag.
        var options = _config.GetSection(SystemConstant.CONSTANT_COLAGRPCCLIENT_SECTION).Get<GrpcClientOption>();
        if (options?.InterceptorLog == true)
        {
            _colaLog.Info(logInfo);
        }
    }
}
1
2
3
4
5
6
7
8
// AddColaGrpc already adds the basic server interceptor by default.
// When a service needs its own additional interceptor, implement it and register the
// service together with that interceptor type, as done below.
// When no extra interceptor is needed, register the service alone instead:
//     colaGrpc.AddColaSingleServerGrpc<GreeterService>(config);
var colaGrpc = builder.Services.AddColaGrpc(config);
colaGrpc.AddColaSingleServerGrpc<GreeterService, ServerGrpcInterceptor>(config);

完整代码可以在 GitHub 上查看。

WebApplication1Test 整合了gRPC,ConsoleApp1Test有具体调用示例