spring cloud gateway系列教程3—Global Filters

spring cloud gateway系列教程目录

  1. spring cloud gateway系列教程1—Route Predicate
  2. spring cloud gateway系列教程2——GatewayFilter_上篇
  3. spring cloud gateway系列教程2——GatewayFilter_下篇
  4. spring cloud gateway系列教程3—Global Filters
  5. spring cloud gateway系列教程4—其他配置

Global Filters

GlobalFilter接口方法和GatewayFilter是一样的,GlobalFilter特别之处在于它的作用是全局的。

1. Combined Global Filter and GatewayFilter Ordering

当请求到来时,Filtering Web Handler处理器会添加所有GlobalFilter实例和匹配的GatewayFilter实例到过滤器链中,通过对filterbean配置注解@Order,则过滤器链会对这些过滤器实例bean进行排序。

Spring Cloud Gateway将过滤器的逻辑按请求执行点分为”pre”和”post”的一前一后处理,如果是高优先级的过滤器,则在”pre”逻辑中最先执行,在”post”逻辑中最后执行。

ExampleConfiguration.java.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Bean
@Order(-1)
public GlobalFilter a() {
return (exchange, chain) -> {
log.info("first pre filter");
return chain.filter(exchange).then(Mono.fromRunnable(() -> {
log.info("third post filter");
}));
};
}

@Bean
@Order(0)
public GlobalFilter b() {
return (exchange, chain) -> {
log.info("second pre filter");
return chain.filter(exchange).then(Mono.fromRunnable(() -> {
log.info("second post filter");
}));
};
}

@Bean
@Order(1)
public GlobalFilter c() {
return (exchange, chain) -> {
log.info("third pre filter");
return chain.filter(exchange).then(Mono.fromRunnable(() -> {
log.info("first post filter");
}));
};
}

上面例子陆续会打印的是:

1
2
3
4
5
6
first pre filter
second pre filter
third pre filter
first post filter
second post filter
third post filter

2. Forward Routing Filter

ForwardRoutingFilter会查看exchange的属性ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR的URI内容,如果url的scheme是forward,比如:forward://localendpoint,则它会使用Spirng的DispatcherHandler来处理这个请求。

源码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);

String scheme = requestUrl.getScheme();
if (isAlreadyRouted(exchange) || !"forward".equals(scheme)) {
return chain.filter(exchange);
}
setAlreadyRouted(exchange);

//TODO: translate url?

if (log.isTraceEnabled()) {
log.trace("Forwarding to URI: "+requestUrl);
}

return this.dispatcherHandler.handle(exchange);
}

3. LoadBalancerClient Filter

LoadBalancerClientFilter会查看exchange的属性ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR的URI内容,如果url的scheme是lb,比如:lb://myservice,或者是ServerWebExchangeUtils.GATEWAY_SCHEME_PREFIX_ATTR属性的内容是lb,则它会使用Spring Cloud的LoadBalancerClient来将host转化为实际的host和port,并以此替换属性ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR的内容,原来的URL则会被添加到ServerWebExchangeUtils.GATEWAY_ORIGINAL_REQUEST_URL_ATTR属性的列表中。

源码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
String schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR);
if (url == null || (!"lb".equals(url.getScheme()) && !"lb".equals(schemePrefix))) {
return chain.filter(exchange);
}
//preserve the original url
addOriginalRequestUrl(exchange, url);

log.trace("LoadBalancerClientFilter url before: " + url);

final ServiceInstance instance = loadBalancer.choose(url.getHost());

if (instance == null) {
throw new NotFoundException("Unable to find instance for " + url.getHost());
}

URI uri = exchange.getRequest().getURI();

// if the `lb:<scheme>` mechanism was used, use `<scheme>` as the default,
// if the loadbalancer doesn't provide one.
String overrideScheme = null;
if (schemePrefix != null) {
overrideScheme = url.getScheme();
}

URI requestUrl = loadBalancer.reconstructURI(new DelegatingServiceInstance(instance, overrideScheme), uri);

log.trace("LoadBalancerClientFilter url chosen: " + requestUrl);
exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);
return chain.filter(exchange);
}

application.yml.

1
2
3
4
5
6
7
8
spring:
cloud:
gateway:
routes:
- id: myRoute
uri: lb://service
predicates:
- Path=/service/**

默认情况下,如果LoadBalancer找不到服务实例,则会返回HTTP状态码503,你也可以通过修改spring.cloud.gateway.loadbalancer.use404=true配置修改为返回状态码404

4. Netty Routing Filter

如果ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR属性中的url的scheme是httphttps,则Netty Routing Filter才会执行,并使用Netty作为http请求客户端对下游进行代理请求。请求的响应会放在exchange的ServerWebExchangeUtils.CLIENT_RESPONSE_ATTR属性中,以便后面的filter做进一步的处理。

5. Netty Write Response Filter

如果NettyWriteResponseFilter发现exchange的ServerWebExchangeUtils.CLIENT_RESPONSE_ATTR属性中存在Netty的HttpClientResponse类型实例,在所有过滤器都执行完毕后,它会将响应写回到gateway客户端的响应中。

源码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
// NOTICE: nothing in "pre" filter stage as CLIENT_RESPONSE_ATTR is not added
// until the WebHandler is run
return chain.filter(exchange).then(Mono.defer(() -> {
HttpClientResponse clientResponse = exchange.getAttribute(CLIENT_RESPONSE_ATTR);

if (clientResponse == null) {
return Mono.empty();
}
log.trace("NettyWriteResponseFilter start");
ServerHttpResponse response = exchange.getResponse();

NettyDataBufferFactory factory = (NettyDataBufferFactory) response.bufferFactory();
//TODO: what if it's not netty

final Flux<NettyDataBuffer> body = clientResponse.receive()
.retain() //TODO: needed?
.map(factory::wrap);

MediaType contentType = response.getHeaders().getContentType();
return (isStreamingMediaType(contentType) ?
response.writeAndFlushWith(body.map(Flux::just)) : response.writeWith(body));
}));
}

//TODO: use framework if possible
//TODO: port to WebClientWriteResponseFilter
private boolean isStreamingMediaType(@Nullable MediaType contentType) {
return (contentType != null && this.streamingMediaTypes.stream()
.anyMatch(contentType::isCompatibleWith));
}

6. RouteToRequestUrl Filter

如果exchange的ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR属性存放了Route对象,则RouteToRequestUrlFilter会根据基于请求的URI创建新的URI,新的URI会更新到ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR属性中。

如果URI有scheme前缀,比如:lb:ws://serviceidlbscheme截取出来放到ServerWebExchangeUtils.GATEWAY_SCHEME_PREFIX_ATTR属性中,方便后面的filter使用。

源码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);
if (route == null) {
return chain.filter(exchange);
}
log.trace("RouteToRequestUrlFilter start");
URI uri = exchange.getRequest().getURI();
boolean encoded = containsEncodedParts(uri);
URI routeUri = route.getUri();

if (hasAnotherScheme(routeUri)) {
// this is a special url, save scheme to special attribute
// replace routeUri with schemeSpecificPart
exchange.getAttributes().put(GATEWAY_SCHEME_PREFIX_ATTR, routeUri.getScheme());
routeUri = URI.create(routeUri.getSchemeSpecificPart());
}

URI requestUrl = UriComponentsBuilder.fromUri(uri)
.uri(routeUri)
.build(encoded)
.toUri();
exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);
return chain.filter(exchange);
}

/* for testing */ static boolean hasAnotherScheme(URI uri) {
return schemePattern.matcher(uri.getSchemeSpecificPart()).matches() && uri.getHost() == null
&& uri.getRawPath() == null;
}

7. Websocket Routing Filter

如果请求URL的scheme是wswss的话,那么Websocket Routing Filter就会使用Spring Web Socket底层来处理对下游的请求转发。

如果Websocket也使用了负载均衡,则需要这样配置:lb:ws://serviceid.

源码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
changeSchemeIfIsWebSocketUpgrade(exchange);

URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);
String scheme = requestUrl.getScheme();

if (isAlreadyRouted(exchange) || (!"ws".equals(scheme) && !"wss".equals(scheme))) {
return chain.filter(exchange);
}
setAlreadyRouted(exchange);


HttpHeaders headers = exchange.getRequest().getHeaders();
HttpHeaders filtered = filterRequest(getHeadersFilters(),
exchange);

List<String> protocols = headers.get(SEC_WEBSOCKET_PROTOCOL);
if (protocols != null) {
protocols = headers.get(SEC_WEBSOCKET_PROTOCOL).stream()
.flatMap(header -> Arrays.stream(commaDelimitedListToStringArray(header)))
.map(String::trim)
.collect(Collectors.toList());
}

return this.webSocketService.handleRequest(exchange,
new ProxyWebSocketHandler(requestUrl, this.webSocketClient,
filtered, protocols));
}

8. Making An Exchange As Routed

上面一些像ForwardRoutingFilter、Websocket Routing Filter的源码中,都可以清楚看到gateway通过设置gatewayAlreadyRouted标识这个请求是否已经路由转发出去了,无需其他filter重复路由,这样就可以避免重复错误的路由操作,保证了路由的实现灵活性。

ServerWebExchangeUtils.isAlreadyRouted检查是否已被路由,ServerWebExchangeUtils.setAlreadyRouted标记已被路由状态。

这一章介绍了Spring Cloud Gateway官方的Global Filters使用场景,下一章讲gateway其他配置的使用

如果想查看其他spring cloud gateway的案例和使用,可以点击查看