Spring Cloud Gateway: How the Core Global Filters Execute a Route, Step by Step

Environment: Spring Boot 2.7.10 + Spring Cloud Gateway 3.1.6

1 RouteToRequestUrlFilter

This filter builds the target address to call from the uri configured on the route. Consider the following route configuration:


spring:
  cloud:
    gateway:
      enabled: true
      # Global timeout settings
      httpclient:
        connect-timeout: 10000
        response-timeout: 5000
      discovery:
        locator:
          enabled: true
          lowerCaseServiceId: true
      # Default filters apply to every route, so the filter returned by
      # StripPrefixGatewayFilterFactory#apply always runs in the filter chain described below.
      # With the routes below it rewrites the request to http://localhost:8088/demos
      # and stores the result in the exchange:
      # ServerWebExchange#getAttributes().put(GATEWAY_REQUEST_URL_ATTR, newRequest.getURI())
      default-filters:
      - StripPrefix=1
      routes:
      - id: R001
        uri: http://localhost:8787
        predicates:
        - Path=/api-1/**,/api-2/**
        metadata:
          akf: "dbc"
          # Per-route timeout settings
          connect-timeout: 10000
          response-timeout: 5000
      - id: st001
        uri: lb://storage-service
        predicates:
        - Path=/api-x/**
      - id: o001
        uri: lb://order-service
        predicates:
        - Path=/api-a/**, /api-b/**
        metadata:
          akf: "dbc"
          # Per-route timeout settings
          connect-timeout: 10000
          response-timeout: 5000


Request: http://localhost:8088/api-1/demos

After rewriting: http://localhost:8787/demos

Finally, the filter stores the rewritten URL in the exchange attributes:


ServerWebExchange#getAttributes().put(GATEWAY_REQUEST_URL_ATTR, mergedUrl);


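Note: RouteToRequestUrlFilter runs only after the filter returned by StripPrefixGatewayFilterFactory#apply (configured above) has run.

Summary:

Request:

http://localhost:9090/api-a/orders, route URI: lb://order-service

Rewriting the address

After StripPrefix: http://localhost:9090/orders

Merging the address

The address from the previous step is merged with the route URI into: lb://order-service/orders

The merged address is stored in the exchange: exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, mergedUrl);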
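The two steps summarized above (strip the prefix, then merge with the route URI) can be tried outside the gateway. The following is a minimal sketch using only java.net.URI; the class RouteUrlSketch and its methods are hypothetical names for illustration and are not the gateway's own implementation.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.stream.Collectors;

final class RouteUrlSketch {

    // Drops the first `parts` path segments, e.g. "/api-1/demos" becomes "/demos" for parts = 1.
    static String stripPrefix(String path, int parts) {
        String rest = Arrays.stream(path.split("/"))
                .filter(segment -> !segment.isEmpty())
                .skip(parts)
                .collect(Collectors.joining("/"));
        return "/" + rest;
    }

    // Takes scheme, host and port from the route URI, and the stripped path
    // plus the query string from the incoming request.
    static String merge(String requestUrl, String routeUri, int parts) {
        URI request = URI.create(requestUrl);
        URI route = URI.create(routeUri);
        String path = stripPrefix(request.getPath(), parts);
        try {
            return new URI(route.getScheme(), null, route.getHost(), route.getPort(),
                    path, request.getQuery(), null).toString();
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(merge("http://localhost:8088/api-1/demos", "http://localhost:8787", 1));
        System.out.println(merge("http://localhost:9090/api-a/orders", "lb://order-service", 1));
    }
}
```

With the routes configured earlier, the first call yields http://localhost:8787/demos and the second yields lb://order-service/orders, which is the value the next filter picks up.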

2 ReactiveLoadBalancerClientFilter

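If the URL uses the lb scheme (for example lb://order-service), this filter uses the Spring Cloud ReactorLoadBalancer to resolve the name (order-service in this example) to an actual host and port, and replaces the URI stored in the same attribute.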


public class ReactiveLoadBalancerClientFilter implements GlobalFilter, Ordered {

    private final LoadBalancerClientFactory clientFactory;
    private final GatewayLoadBalancerProperties properties;
    private final LoadBalancerProperties loadBalancerProperties;

    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        // Read from the exchange, e.g. lb://order-service/orders
        URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
        String schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR);
        if (url == null || (!"lb".equals(url.getScheme()) && !"lb".equals(schemePrefix))) {
            return chain.filter(exchange);
        }
        // preserve the original url
        addOriginalRequestUrl(exchange, url);
        // Read it again
        URI requestUri = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
        // The service name: order-service
        String serviceId = requestUri.getHost();
        // clientFactory.getInstances looks up, in the NamedContextFactory.contexts map, the
        // AnnotationConfigApplicationContext keyed by order-service, then looks for
        // LoadBalancerLifecycle beans in that context; by default it returns {}
        // ------------------------------------------------------------
        /**
         * The ApplicationContext of each service holds 13 beans, including:
         * org.springframework.context.annotation.internalConfigurationAnnotationProcessor
         * org.springframework.context.annotation.internalAutowiredAnnotationProcessor
         * org.springframework.context.annotation.internalCommonAnnotationProcessor
         * org.springframework.context.event.internalEventListenerProcessor
         * org.springframework.context.event.internalEventListenerFactory
         * propertyPlaceholderAutoConfiguration
         * loadBalancerClientConfiguration
         * propertySourcesPlaceholderConfigurer
         * LoadBalancerClientConfiguration$ReactiveSupportConfiguration
         * discoveryClientServiceInstanceListSupplier
         * LoadBalancerClientConfiguration$BlockingSupportConfiguration
         * reactorServiceInstanceLoadBalancer
         */
        // The resulting set is empty here
        Set<LoadBalancerLifecycle> supportedLifecycleProcessors = LoadBalancerLifecycleValidator
                .getSupportedLifecycleProcessors(clientFactory.getInstances(serviceId, LoadBalancerLifecycle.class),
                        RequestDataContext.class, ResponseData.class, ServiceInstance.class);
        DefaultRequest<RequestDataContext> lbRequest = new DefaultRequest<>(new RequestDataContext(
                new RequestData(exchange.getRequest()), getHint(serviceId, loadBalancerProperties.getHint())));
        // choose performs the load-balanced lookup of the target service (order-service)
        return choose(lbRequest, serviceId, supportedLifecycleProcessors).doOnNext(response -> {
            if (!response.hasServer()) {
                supportedLifecycleProcessors.forEach(lifecycle -> lifecycle
                        .onComplete(new CompletionContext<>(CompletionContext.Status.DISCARD, lbRequest, response)));
                throw NotFoundException.create(properties.isUse404(), "Unable to find instance for " + url.getHost());
            }
            ServiceInstance retrievedInstance = response.getServer();
            URI uri = exchange.getRequest().getURI();
            // if the `lb:<scheme>` mechanism was used, use `<scheme>` as the default,
            // if the loadbalancer doesn't provide one.
            String overrideScheme = retrievedInstance.isSecure() ? "https" : "http";
            if (schemePrefix != null) {
                overrideScheme = url.getScheme();
            }
            DelegatingServiceInstance serviceInstance = new DelegatingServiceInstance(retrievedInstance, overrideScheme);
            URI requestUrl = reconstructURI(serviceInstance, uri);
            exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);
            exchange.getAttributes().put(GATEWAY_LOADBALANCER_RESPONSE_ATTR, response);
            supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStartRequest(lbRequest, response));
        }).then(chain.filter(exchange))
                .doOnError(throwable -> supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete(
                        new CompletionContext<ResponseData, ServiceInstance, RequestDataContext>(
                                CompletionContext.Status.FAILED, throwable, lbRequest,
                                exchange.getAttribute(GATEWAY_LOADBALANCER_RESPONSE_ATTR)))))
                .doOnSuccess(aVoid -> supportedLifecycleProcessors.forEach(
                        lifecycle -> lifecycle.onComplete(
                                new CompletionContext<ResponseData, ServiceInstance, RequestDataContext>(
                                        CompletionContext.Status.SUCCESS, lbRequest,
                                        exchange.getAttribute(GATEWAY_LOADBALANCER_RESPONSE_ATTR),
                                        new ResponseData(exchange.getResponse(), new RequestData(exchange.getRequest()))))));
    }

    protected URI reconstructURI(ServiceInstance serviceInstance, URI original) {
        return LoadBalancerUriTools.reconstructURI(serviceInstance, original);
    }

    private Mono<Response<ServiceInstance>> choose(Request<RequestDataContext> lbRequest, String serviceId,
            Set<LoadBalancerLifecycle> supportedLifecycleProcessors) {
        // Look up the ReactorServiceInstanceLoadBalancer in the ApplicationContext of order-service
        ReactorLoadBalancer<ServiceInstance> loadBalancer = this.clientFactory.getInstance(serviceId,
                ReactorServiceInstanceLoadBalancer.class);
        if (loadBalancer == null) {
            throw new NotFoundException("No loadbalancer available for " + serviceId);
        }
        supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStart(lbRequest));
        // Pick a service instance
        return loadBalancer.choose(lbRequest);
    }

    private String getHint(String serviceId, Map<String, String> hints) {
        String defaultHint = hints.getOrDefault("default", "default");
        String hintPropertyValue = hints.get(serviceId);
        return hintPropertyValue != null ? hintPropertyValue : defaultHint;
    }
}

// Round-robin algorithm
public class RoundRobinLoadBalancer implements ReactorServiceInstanceLoadBalancer {

    final AtomicInteger position;
    ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;

    public Mono<Response<ServiceInstance>> choose(Request request) {
        // The ServiceInstanceListSupplier comes from the ClientFactoryObjectProvider shown below
        ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider
                .getIfAvailable(NoopServiceInstanceListSupplier::new);
        return supplier.get(request).next().map(serviceInstances -> processInstanceResponse(supplier, serviceInstances));
    }

    private Response<ServiceInstance> processInstanceResponse(ServiceInstanceListSupplier supplier,
            List<ServiceInstance> serviceInstances) {
        Response<ServiceInstance> serviceInstanceResponse = getInstanceResponse(serviceInstances);
        if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) {
            ((SelectedInstanceCallback) supplier).selectedServiceInstance(serviceInstanceResponse.getServer());
        }
        return serviceInstanceResponse;
    }

    private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances) {
        if (instances.isEmpty()) {
            return new EmptyResponse();
        }
        // TODO: enforce order?
        int pos = Math.abs(this.position.incrementAndGet());
        ServiceInstance instance = instances.get(pos % instances.size());
        return new DefaultResponse(instance);
    }
}

class ClientFactoryObjectProvider<T> implements ObjectProvider<T> {

    private final NamedContextFactory<?> clientFactory;
    // type = ServiceInstanceListSupplier
    private final Class<T> type;
    // name = order-service
    private final String name;
    // Resolved lazily on first use
    private ObjectProvider<T> provider;

    private ObjectProvider<T> delegate() {
        if (this.provider == null) {
            // Look up the ServiceInstanceListSupplier in the ApplicationContext of order-service.
            // What is eventually returned here is a DiscoveryClientServiceInstanceListSupplier
            this.provider = this.clientFactory.getProvider(this.name, this.type);
        }
        return this.provider;
    }
}

public class LoadBalancerClientConfiguration {

    @Configuration(proxyBeanMethods = false)
    @ConditionalOnReactiveDiscoveryEnabled
    @Order(REACTIVE_SERVICE_INSTANCE_SUPPLIER_ORDER)
    public static class ReactiveSupportConfiguration {

        @Bean
        @ConditionalOnBean(ReactiveDiscoveryClient.class)
        @ConditionalOnMissingBean
        @ConditionalOnProperty(value = "spring.cloud.loadbalancer.configurations", havingValue = "default", matchIfMissing = true)
        public ServiceInstanceListSupplier discoveryClientServiceInstanceListSupplier(
                ConfigurableApplicationContext context) {
            // What is eventually built here is a DiscoveryClientServiceInstanceListSupplier
            return ServiceInstanceListSupplier.builder().withDiscoveryClient().withCaching().build(context);
        }
    }
}

public final class ServiceInstanceListSupplierBuilder {

    public ServiceInstanceListSupplierBuilder withDiscoveryClient() {
        this.baseCreator = context -> {
            // First look for a ReactiveDiscoveryClient in the ApplicationContext of order-service.
            // If you have not defined one there, the lookup falls back to the parent context;
            // with Nacos this returns a NacosReactiveDiscoveryClient
            ReactiveDiscoveryClient discoveryClient = context.getBean(ReactiveDiscoveryClient.class);
            return new DiscoveryClientServiceInstanceListSupplier(discoveryClient, context.getEnvironment());
        };
        return this;
    }
}
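The selection step in RoundRobinLoadBalancer#getInstanceResponse can be tried in isolation. The sketch below is a simplified stand-in: RoundRobinSketch is a hypothetical class, instances are plain strings, and the seed is passed in by the caller. It also deliberately differs from the quoted source in one place: it masks the sign bit instead of calling Math.abs, because Math.abs(Integer.MIN_VALUE) is still negative once the counter overflows.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class RoundRobinSketch {

    private final AtomicInteger position;

    // The seed is an assumption of this sketch; it only fixes where the rotation starts.
    RoundRobinSketch(int seed) {
        this.position = new AtomicInteger(seed);
    }

    // Returns the next instance in rotation, or null when no instance is available
    // (the quoted source returns an EmptyResponse in that case).
    String choose(List<String> instances) {
        if (instances.isEmpty()) {
            return null;
        }
        // Clearing the sign bit keeps the value non-negative even after integer overflow.
        int pos = position.incrementAndGet() & Integer.MAX_VALUE;
        return instances.get(pos % instances.size());
    }

    public static void main(String[] args) {
        RoundRobinSketch lb = new RoundRobinSketch(0);
        List<String> instances = List.of("localhost:9091", "localhost:9092", "localhost:9093");
        for (int i = 0; i < 4; i++) {
            System.out.println(lb.choose(instances));
        }
    }
}
```

Each call advances a shared counter, so concurrent requests spread across the instance list without any locking.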


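Summary:

Getting the address

Read the address that the previous step stored in the exchange:

URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);

Getting the LoadBalancerLifecycle beans

Look up the LoadBalancerLifecycle beans configured in the AnnotationConfigApplicationContext that belongs to the current service (order-service). These lifecycle callbacks let you observe the load-balancing process. LoadBalancerLifecycle is a generic type with three type parameters; the types checked here are, in order, RequestDataContext.class, ResponseData.class, and ServiceInstance.class.

Getting the ReactorServiceInstanceLoadBalancer

Look up the ReactorServiceInstanceLoadBalancer configured in the AnnotationConfigApplicationContext that belongs to the current service (order-service). Every service gets the default configuration class LoadBalancerClientConfiguration, which defines a RoundRobinLoadBalancer by default. To customize a particular service, provide a bean of type LoadBalancerClientSpecification that names the serviceId and a configuration class; that configuration class can then define its own ReactorServiceInstanceLoadBalancer bean.

Choosing an instance

With the ReactorServiceInstanceLoadBalancer obtained in the previous step, the filter picks one service instance.

Rebuilding the URI

Once a ServiceInstance has been chosen, the URL can be rebuilt. The current URL is http://localhost:9090/orders; after rebuilding it becomes, for example, http://localhost:9093/orders, where the host and port are those of the chosen instance. This URL is stored in the exchange: exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);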
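To make the first and last steps of this summary concrete, here is a minimal sketch of the lb check and of the URI rebuild, written with java.net.URI only. ReconstructUriSketch and both method names are hypothetical; the real filter delegates the rebuild to LoadBalancerUriTools.reconstructURI, which this sketch only approximates by swapping scheme, host, and port while keeping the path and query.

```java
import java.net.URI;
import java.net.URISyntaxException;

final class ReconstructUriSketch {

    // Mirrors the guard at the top of the filter: load balancing applies only when the
    // stored URL (or the scheme prefix attribute) uses the "lb" scheme.
    static boolean isLoadBalanced(String gatewayRequestUrl, String schemePrefix) {
        if (gatewayRequestUrl == null) {
            return false;
        }
        String scheme = URI.create(gatewayRequestUrl).getScheme();
        return "lb".equals(scheme) || "lb".equals(schemePrefix);
    }

    // Replaces scheme, host and port with those of the chosen instance and keeps
    // the path and query of the original request.
    static String reconstruct(String originalRequestUrl, String host, int port, boolean secure) {
        URI original = URI.create(originalRequestUrl);
        try {
            return new URI(secure ? "https" : "http", null, host, port,
                    original.getPath(), original.getQuery(), null).toString();
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(isLoadBalanced("lb://order-service/orders", null));
        System.out.println(reconstruct("http://localhost:9090/orders", "localhost", 9093, false));
    }
}
```

Note that only the authority part changes; the path that StripPrefix produced earlier is carried over untouched.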

3 NettyRoutingFilter


public class NettyRoutingFilter implements GlobalFilter {

    private final HttpClient httpClient;

    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        // Read the resolved target address from the exchange
        URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);
        // ...
        // Read the route from the exchange
        Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);
        // getHttpClient returns the client configured for this route
        Flux<HttpClientResponse> responseFlux = getHttpClient(route, exchange).headers(headers -> {
            // ...
        }).request(method).uri(url).send((req, nettyOutbound) -> {
            // Send the request over the network
            return nettyOutbound.send(request.getBody().map(this::getByteBuf));
        }).responseConnection((res, connection) -> {
            exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res);
            // Store the established Connection in the exchange; NettyWriteResponseFilter
            // later reads the response data from it
            exchange.getAttributes().put(CLIENT_RESPONSE_CONN_ATTR, connection);
            ServerHttpResponse response = exchange.getResponse();
            HttpHeaders headers = new HttpHeaders();
            res.responseHeaders().forEach(entry -> headers.add(entry.getKey(), entry.getValue()));
            String contentTypeValue = headers.getFirst(HttpHeaders.CONTENT_TYPE);
            if (StringUtils.hasLength(contentTypeValue)) {
                exchange.getAttributes().put(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR, contentTypeValue);
            }
            setResponseStatus(res, response);
            HttpHeaders filteredResponseHeaders = HttpHeadersFilter.filter(getHeadersFilters(), headers, exchange,
                    Type.RESPONSE);
            if (!filteredResponseHeaders.containsKey(HttpHeaders.TRANSFER_ENCODING)
                    && filteredResponseHeaders.containsKey(HttpHeaders.CONTENT_LENGTH)) {
                response.getHeaders().remove(HttpHeaders.TRANSFER_ENCODING);
            }
            exchange.getAttributes().put(CLIENT_RESPONSE_HEADER_NAMES, filteredResponseHeaders.keySet());
            response.getHeaders().putAll(filteredResponseHeaders);
            return Mono.just(res);
        });
        // Read the response-timeout value from the route metadata
        Duration responseTimeout = getResponseTimeout(route);
        if (responseTimeout != null) {
            responseFlux = responseFlux
                    // Apply the timeout
                    .timeout(responseTimeout,
                            Mono.error(new TimeoutException("Response took longer than timeout: " + responseTimeout)))
                    .onErrorMap(TimeoutException.class,
                            th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), th));
        }
        return responseFlux.then(chain.filter(exchange));
    }

    protected HttpClient getHttpClient(Route route, ServerWebExchange exchange) {
        // Read the configured connect timeout (connect-timeout) from the route metadata
        Object connectTimeoutAttr = route.getMetadata().get(CONNECT_TIMEOUT_ATTR);
        if (connectTimeoutAttr != null) {
            Integer connectTimeout = getInteger(connectTimeoutAttr);
            // Set Netty's connect timeout
            // io.netty.channel.ChannelOption
            return this.httpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout);
        }
        return httpClient;
    }
}


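Summary:

Getting the URL

Read the URL that the previous step stored in the exchange:

URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);

Marking the request as routed

Mark the current exchange as already routed:

setAlreadyRouted(exchange);

which sets the following attribute:

exchange.getAttributes().put(GATEWAY_ALREADY_ROUTED_ATTR, true);

Getting the route

Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);

Get the current Route. It is mainly used to read the settings supplied when the route was configured, such as the timeouts in the configuration above. The route is stored in the exchange by RoutePredicateHandlerMapping#getHandlerInternal.

Building the HttpClient

Use the Route obtained in the previous step to configure the HttpClient, for example its timeouts, together with the basic HTTP settings. Once the connection is established, the Connection object is stored in the exchange so that the next filter can read the response data from it.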
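How the per-route timeouts come out of the route metadata can be sketched with a plain map. In the sketch below, RouteTimeoutSketch and its methods are hypothetical; the keys connect-timeout and response-timeout are the ones used in the YAML above, and the fallback to a global default when the route sets nothing is an assumption of this sketch rather than a quotation of the gateway source.

```java
import java.time.Duration;
import java.util.Map;

final class RouteTimeoutSketch {

    // Metadata values may arrive as numbers or as strings, so parse via toString().
    static Integer connectTimeoutMillis(Map<String, Object> metadata) {
        Object value = metadata.get("connect-timeout");
        return value == null ? null : Integer.valueOf(value.toString());
    }

    // Returns the per-route response timeout, or the given global default when the
    // route does not define one (assumed fallback behaviour).
    static Duration responseTimeout(Map<String, Object> metadata, Duration globalDefault) {
        Object value = metadata.get("response-timeout");
        return value == null ? globalDefault : Duration.ofMillis(Long.parseLong(value.toString()));
    }

    public static void main(String[] args) {
        Map<String, Object> metadata = Map.of("akf", "dbc", "connect-timeout", 10000, "response-timeout", 5000);
        System.out.println(connectTimeoutMillis(metadata));
        System.out.println(responseTimeout(metadata, Duration.ofSeconds(30)));
    }
}
```

Parsing through toString() is a design choice that tolerates both `connect-timeout: 10000` and `connect-timeout: "10000"` in the configuration.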

4 NettyWriteResponseFilter

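This filter writes back the response of the HTTP request that NettyRoutingFilter issued (NettyRoutingFilter sets the request parameters and headers and establishes the connection). NettyRoutingFilter stores the established Connection in the ServerWebExchange attributes, and this filter reads it from there.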


public class NettyWriteResponseFilter implements GlobalFilter, Ordered {

    private final List<MediaType> streamingMediaTypes;

    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        // NOTICE: nothing in "pre" filter stage as CLIENT_RESPONSE_CONN_ATTR is not added
        // until the NettyRoutingFilter is run
        // @formatter:off
        return chain.filter(exchange)
                .doOnError(throwable -> cleanup(exchange))
                .then(Mono.defer(() -> {
                    Connection connection = exchange.getAttribute(CLIENT_RESPONSE_CONN_ATTR);
                    if (connection == null) {
                        return Mono.empty();
                    }
                    ServerHttpResponse response = exchange.getResponse();
                    // TODO: needed?
                    final Flux<DataBuffer> body = connection
                            .inbound()
                            .receive()
                            .retain()
                            .map(byteBuf -> wrap(byteBuf, response));
                    MediaType contentType = null;
                    try {
                        contentType = response.getHeaders().getContentType();
                    }
                    catch (Exception e) {
                        // An invalid Content-Type header leaves contentType as null
                    }
                    // Write the response differently depending on the Content-Type
                    return (isStreamingMediaType(contentType)
                            ? response.writeAndFlushWith(body.map(Flux::just))
                            : response.writeWith(body));
                })).doOnCancel(() -> cleanup(exchange));
        // @formatter:on
    }

    protected DataBuffer wrap(ByteBuf byteBuf, ServerHttpResponse response) {
        DataBufferFactory bufferFactory = response.bufferFactory();
        if (bufferFactory instanceof NettyDataBufferFactory) {
            NettyDataBufferFactory factory = (NettyDataBufferFactory) bufferFactory;
            return factory.wrap(byteBuf);
        }
        // MockServerHttpResponse creates these
        else if (bufferFactory instanceof DefaultDataBufferFactory) {
            DataBuffer buffer = ((DefaultDataBufferFactory) bufferFactory).allocateBuffer(byteBuf.readableBytes());
            buffer.write(byteBuf.nioBuffer());
            byteBuf.release();
            return buffer;
        }
        throw new IllegalArgumentException("Unkown DataBufferFactory type " + bufferFactory.getClass());
    }

    private void cleanup(ServerWebExchange exchange) {
        Connection connection = exchange.getAttribute(CLIENT_RESPONSE_CONN_ATTR);
        if (connection != null && connection.channel().isActive() && !connection.isPersistent()) {
            connection.dispose();
        }
    }

    private boolean isStreamingMediaType(@Nullable MediaType contentType) {
        return (contentType != null && this.streamingMediaTypes.stream().anyMatch(contentType::isCompatibleWith));
    }
}


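Summary:

Getting the Connection

Read the Connection that the previous step stored in the exchange:

Connection connection = exchange.getAttribute(CLIENT_RESPONSE_CONN_ATTR);

Writing the response body

Write out the data returned by the downstream microservice: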


final Flux<DataBuffer> body = connection
        .inbound()
        .receive()
        .retain()
        .map(byteBuf -> wrap(byteBuf, response));


That is the flow the Gateway follows when it handles a routed request.
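The order in which these filters act can be simulated with a small synchronous chain. This is only a sketch: FilterOrderSketch and Step are hypothetical, the real chain is reactive, and the order values (-1, 1, 10000, 10150, Integer.MAX_VALUE) are what I understand the 3.1.x sources to use and should be checked against your version. A filter marked as acting after the chain does its work once the rest of the chain has completed, which is how NettyWriteResponseFilter behaves.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class FilterOrderSketch {

    // A filter name, its order value, and whether it does its work after the rest of the chain.
    static final class Step {
        final String name;
        final int order;
        final boolean actsAfterChain;

        Step(String name, int order, boolean actsAfterChain) {
            this.name = name;
            this.order = order;
            this.actsAfterChain = actsAfterChain;
        }
    }

    // Assumed order values; verify them against the gateway version in use.
    static List<Step> gatewayDefaults() {
        List<Step> steps = new ArrayList<>();
        steps.add(new Step("NettyRoutingFilter", Integer.MAX_VALUE, false));
        steps.add(new Step("ReactiveLoadBalancerClientFilter", 10150, false));
        steps.add(new Step("NettyWriteResponseFilter", -1, true));
        steps.add(new Step("RouteToRequestUrlFilter", 10000, false));
        steps.add(new Step("StripPrefix", 1, false));
        return steps;
    }

    // Sorts by order (lowest first) and records the sequence in which each filter acts.
    static List<String> run(List<Step> steps) {
        List<Step> sorted = new ArrayList<>(steps);
        sorted.sort(Comparator.comparingInt(step -> step.order));
        List<String> log = new ArrayList<>();
        invoke(sorted, 0, log);
        return log;
    }

    private static void invoke(List<Step> sorted, int index, List<String> log) {
        if (index == sorted.size()) {
            return;
        }
        Step step = sorted.get(index);
        if (step.actsAfterChain) {
            invoke(sorted, index + 1, log); // comparable to chain.filter(exchange).then(...)
            log.add(step.name);
        } else {
            log.add(step.name);
            invoke(sorted, index + 1, log);
        }
    }

    public static void main(String[] args) {
        run(gatewayDefaults()).forEach(System.out::println);
    }
}
```

Under these assumptions the sequence is StripPrefix, RouteToRequestUrlFilter, ReactiveLoadBalancerClientFilter, NettyRoutingFilter, NettyWriteResponseFilter: the write-response filter is entered first but acts last.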

That's all!
