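Spring Cloud Gateway: Dynamic Routing
- Approach 1
The obvious way to configure routes is to add them to the gateway project's configuration file, and this does work.
However, whenever a new project is added, the gateway project has to be modified and then restarted.
- Approach 2
Building on approach 1, consider what we actually want: when new projects are added later, the gateway project should not need to change. In other words, route configuration should be modified and refreshed dynamically.
This points to Nacos, whose configuration-center module provides exactly this capability, so here we try to implement dynamic routing with Nacos and Gateway.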
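To make "dynamic" concrete, here is a minimal, framework-free sketch: a route table held in an atomic reference and replaced whenever a new configuration is published. The names `RouteTableDemo`, `publish`, and `uriOf` are hypothetical and only illustrate the idea; in the real setup the refresh is handled by Nacos and Spring Cloud Gateway.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical illustration: a route table that is replaced at runtime, without a restart. */
final class RouteTableDemo {
    // Route ID -> target URI; the whole map is swapped atomically on each publish.
    private final AtomicReference<Map<String, String>> routes =
            new AtomicReference<>(new LinkedHashMap<>());

    /** Simulates the configuration center pushing a new set of routes. */
    void publish(Map<String, String> newRoutes) {
        routes.set(new LinkedHashMap<>(newRoutes));
    }

    /** Returns the target URI of a route, or null when the route is not configured. */
    String uriOf(String routeId) {
        return routes.get().get(routeId);
    }

    public static void main(String[] args) {
        RouteTableDemo table = new RouteTableDemo();

        Map<String, String> v1 = new LinkedHashMap<>();
        v1.put("rhx-client", "lb://rhx-client");
        table.publish(v1);
        System.out.println(table.uriOf("rhx-manage")); // not configured yet

        Map<String, String> v2 = new LinkedHashMap<>(v1);
        v2.put("rhx-manage", "lb://rhx-manage");
        table.publish(v2); // the new route is visible to the next lookup
        System.out.println(table.uriOf("rhx-manage"));
    }
}
```

Readers always see either the old or the new table, never a half-updated one, which is the property a hot reload needs.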
Refer to the steps documented below:
Step 1
- Add the required dependencies to pom.xml
<dependencies>
    <!-- Gateway -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-gateway</artifactId>
    </dependency>
    <!-- Nacos service discovery -->
    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
    </dependency>
    <!-- Nacos configuration -->
    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
    </dependency>
    <!-- Spring Cloud load balancer; without it the gateway returns 503 -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-loadbalancer</artifactId>
    </dependency>
    <!-- Caffeine, the cache implementation used by Spring Cloud LoadBalancer -->
    <dependency>
        <groupId>com.github.ben-manes.caffeine</groupId>
        <artifactId>caffeine</artifactId>
    </dependency>
    <!-- Spring Boot monitoring; the core of automatic configuration reloading -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <!-- Required in order to use bootstrap.yml -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-bootstrap</artifactId>
    </dependency>
</dependencies>
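Notes:
spring-cloud-starter-gateway cannot coexist with spring-boot-starter-web (Gateway is built on Spring WebFlux, not Spring MVC).
If spring-cloud-starter-loadbalancer is not included, requests to routed endpoints fail with 503.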
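An `lb://` URI names a service rather than a host, so something has to turn the service name into a concrete instance. The sketch below shows that step as a simple round-robin choice. It is an illustration only, not Spring Cloud LoadBalancer's implementation, and `RoundRobinDemo` and `choose` are hypothetical names. When no instance can be chosen, there is nothing to forward to, which is the situation a 503 reports.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of client-side round-robin selection for an lb:// target. */
final class RoundRobinDemo {
    private final AtomicInteger position = new AtomicInteger();

    /** Picks the next instance in turn; empty when no instance is available. */
    Optional<String> choose(List<String> instances) {
        if (instances.isEmpty()) {
            return Optional.empty();
        }
        // floorMod keeps the index valid even after the counter overflows
        int index = Math.floorMod(position.getAndIncrement(), instances.size());
        return Optional.of(instances.get(index));
    }

    public static void main(String[] args) {
        RoundRobinDemo balancer = new RoundRobinDemo();
        List<String> instances = Arrays.asList("10.0.0.1:8080", "10.0.0.2:8080");
        for (int i = 0; i < 3; i++) {
            System.out.println(balancer.choose(instances).orElse("none"));
        }
        // No instance available: the caller would answer 503 Service Unavailable
        System.out.println(balancer.choose(Collections.emptyList()).isPresent());
    }
}
```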
Step 2
- Add the following configuration to bootstrap.yml
server:
  port: 9094
spring:
  application:
    name: rhx-gateway
  profiles:
    active: '@profile.name@'
# Log level configuration
logging:
  level:
    root: info
    com.rhx.gateway: debug
- Add the following configuration to bootstrap-dev.yml
spring:
  # Nacos configuration
  cloud:
    nacos:
      discovery:
        server-addr: xxxx:8848
        username: nacos
        password: nacos
        group: dev
      config:
        server-addr: xxxx:8848
        # Configuration file format
        file-extension: yaml
        username: nacos
        password: nacos
        # The namespace must be specified; otherwise the configuration file cannot be resolved
        namespace: 547b6b16-f571-4ad4-9c0f-ac3bee137cf5
        group: DEV
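Notes:
- For dynamic routing, the configuration file to use is bootstrap.yml, not application.yml.
- If you add an application.yml that only sets spring.profiles.active, the project reports an error like this at runtime: currentServerAddr:xxxx:8848, err : Connect timed out.
- The configuration must specify the namespace; otherwise the configuration file cannot be resolved.
- The configuration file loaded by default is named rhx-gateway-dev.yaml (rhx-gateway is the application name, dev is the current value of '@profile.name@', and yaml is the configured file format).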
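The naming rule in the last note can be written out as a one-line function. This is only a sketch of the rule as described above; `DataIdDemo` and `defaultDataId` are hypothetical names, not part of the Nacos client.

```java
/** Hypothetical helper that mirrors the naming rule described in the note above. */
final class DataIdDemo {
    /** Builds "<application name>-<active profile>.<file extension>". */
    static String defaultDataId(String applicationName, String activeProfile, String fileExtension) {
        return applicationName + "-" + activeProfile + "." + fileExtension;
    }

    public static void main(String[] args) {
        // prints "rhx-gateway-dev.yaml"
        System.out.println(defaultDataId("rhx-gateway", "dev", "yaml"));
    }
}
```

This is the Data ID that has to exist in Nacos, in the configured namespace and group, for the gateway to pick up its routes.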
Step 3
- Configure CORS
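import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.cors.CorsConfiguration;
// Use the reactive variants; the servlet ones do not work with Gateway
import org.springframework.web.cors.reactive.CorsWebFilter;
import org.springframework.web.cors.reactive.UrlBasedCorsConfigurationSource;
import org.springframework.web.util.pattern.PathPatternParser;
/**
 * CORS configuration for the gateway
 */
@Configuration
public class CorsConfig {
    @Bean
    public CorsWebFilter corsWebFilter() {
        // Allow any origin, header, and method
        CorsConfiguration corsConfiguration = new CorsConfiguration();
        corsConfiguration.addAllowedOrigin("*");
        corsConfiguration.addAllowedHeader("*");
        corsConfiguration.addAllowedMethod("*");
        // Apply the configuration to every path
        UrlBasedCorsConfigurationSource source = new UrlBasedCorsConfigurationSource(new PathPatternParser());
        source.registerCorsConfiguration("/**", corsConfiguration);
        return new CorsWebFilter(source);
    }
}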
- Fix the duplicated Origin header in CORS responses (Spring Cloud Gateway 2.x)
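import java.util.ArrayList;
import java.util.Collections;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.cloud.gateway.filter.NettyWriteResponseFilter;
import org.springframework.core.Ordered;
import org.springframework.http.HttpHeaders;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
/**
 * Works around the Spring Cloud Gateway 2.x bug that duplicates the Origin header in CORS responses
 */
@Component
public class CorsResponseHeaderFilter implements GlobalFilter, Ordered {
    @Override
    public int getOrder() {
        // Place this filter after NettyWriteResponseFilter,
        // so the response headers are processed once the response body has been handled
        return NettyWriteResponseFilter.WRITE_RESPONSE_FILTER_ORDER + 1;
    }
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        return chain.filter(exchange).then(Mono.fromRunnable(() -> {
            // The chain has already run at this point, so only adjust the headers here
            // and do not call chain.filter(exchange) a second time
            exchange.getResponse().getHeaders().entrySet().stream()
                    .filter(kv -> (kv.getValue() != null && kv.getValue().size() > 1))
                    .filter(kv -> (kv.getKey().equals(HttpHeaders.ACCESS_CONTROL_ALLOW_ORIGIN)
                            || kv.getKey().equals(HttpHeaders.ACCESS_CONTROL_ALLOW_CREDENTIALS)))
                    // Keep only the first value of each duplicated header
                    .forEach(kv -> kv.setValue(new ArrayList<>(Collections.singletonList(kv.getValue().get(0)))));
        }));
    }
}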
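The header clean-up performed by this filter can be tried in isolation on a plain map. The sketch below is framework-free and uses a hypothetical class name, `CorsHeaderDedupDemo`; it only demonstrates the "keep the first value" rule for the two CORS headers named above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch: collapse duplicated CORS response headers to their first value. */
final class CorsHeaderDedupDemo {
    private static final Set<String> TARGETS = new HashSet<>(Arrays.asList(
            "Access-Control-Allow-Origin", "Access-Control-Allow-Credentials"));

    /** Mutates the given header map in place; other headers are left untouched. */
    static void keepFirstValue(Map<String, List<String>> headers) {
        for (Map.Entry<String, List<String>> entry : headers.entrySet()) {
            List<String> values = entry.getValue();
            if (values != null && values.size() > 1 && TARGETS.contains(entry.getKey())) {
                entry.setValue(new ArrayList<>(values.subList(0, 1)));
            }
        }
    }

    public static void main(String[] args) {
        Map<String, List<String>> headers = new LinkedHashMap<>();
        headers.put("Access-Control-Allow-Origin", new ArrayList<>(Arrays.asList("*", "*")));
        headers.put("Vary", new ArrayList<>(Arrays.asList("Origin", "Accept")));
        keepFirstValue(headers);
        System.out.println(headers);
    }
}
```

Multi-valued headers that are legitimately repeated, such as `Vary` in this example, are deliberately not touched.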
Step 4
- Add the access log class
import java.time.LocalDateTime;
import lombok.Data;
import org.springframework.cloud.gateway.route.Route;
import org.springframework.http.HttpStatus;
import org.springframework.util.MultiValueMap;
/**
 * Access log entry for the gateway
 */
@Data
public class AccessLog {
    /**
     * Trace ID
     */
    private String traceId;
    /**
     * User ID
     */
    private Long userId;
    /**
     * User type
     */
    private Integer userType;
    /**
     * Route
     *
     * Similar to applicationName in ApiAccessLogCreateReqDTO
     */
    private Route route;
    /**
     * Protocol scheme
     */
    private String schema;
    /**
     * Request method name
     */
    private String requestMethod;
    /**
     * Request URL
     */
    private String requestUrl;
    /**
     * Query parameters
     */
    private MultiValueMap<String, String> queryParams;
    /**
     * Request body
     */
    private String requestBody;
    /**
     * Request headers
     */
    private MultiValueMap<String, String> requestHeaders;
    /**
     * User IP
     */
    private String userIp;
    /**
     * Response body
     *
     * Similar to resultCode + resultMsg in ApiAccessLogCreateReqDTO
     */
    private String responseBody;
    /**
     * Response headers
     */
    private MultiValueMap<String, String> responseHeaders;
    /**
     * Response status
     */
    private HttpStatus httpStatus;
    /**
     * Request start time
     */
    private LocalDateTime startTime;
    /**
     * Request end time
     */
    private LocalDateTime endTime;
    /**
     * Execution time, in milliseconds
     */
    private Integer duration;
}
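The `duration` field is derived from `startTime` and `endTime`. A minimal sketch of that calculation using only `java.time` follows; `DurationDemo` and `durationMillis` are hypothetical names, and the actual filter may compute the value with a different utility.

```java
import java.time.Duration;
import java.time.LocalDateTime;

/** Hypothetical sketch: derive the duration field from the start and end timestamps. */
final class DurationDemo {
    /** Elapsed time between start and end, in whole milliseconds. */
    static int durationMillis(LocalDateTime start, LocalDateTime end) {
        return (int) Duration.between(start, end).toMillis();
    }

    public static void main(String[] args) {
        LocalDateTime start = LocalDateTime.of(2024, 1, 1, 12, 0, 0);
        LocalDateTime end = start.plusSeconds(1).plusNanos(250_000_000L);
        System.out.println(durationMillis(start, end) + " ms"); // prints "1250 ms"
    }
}
```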
Step 5
- Define the logging filter
/**
 * Access log filter for the gateway
 */
// Note: MapUtil, JSONUtil, LocalDateTimeUtil, ArrayUtil, NetUtil and NORM_DATETIME_MS_FORMATTER are Hutool utilities
@Slf4j
@Component
public class AccessLogFilter implements GlobalFilter, Ordered {
    private final List<HttpMessageReader<?>> messageReaders = HandlerStrategies.withDefaults().messageReaders();
    /**
     * Write the access log
     *
     * @param gatewayLog gateway access log
     */
    private void writeAccessLog(AccessLog gatewayLog) {
        // Option 1: write to the logger and collect the output with ELK
        // log.info("[writeAccessLog][log content: {}]", JsonUtils.toJsonString(gatewayLog));
        // Option 2: print to the console, which makes troubleshooting easier
        Map<String, Object> values = MapUtil.newHashMap(15, true); // built by hand to keep the order; 15 avoids resizing
        values.put("routeId", gatewayLog.getRoute() != null ? gatewayLog.getRoute().getId() : null);
        values.put("schema", gatewayLog.getSchema());
        values.put("requestUrl", gatewayLog.getRequestUrl());
        values.put("queryParams", gatewayLog.getQueryParams().toSingleValueMap());
        values.put("requestBody", JSONUtil.isTypeJSON(gatewayLog.getRequestBody()) ? // makes the body easier to read
                JSONUtil.parse(gatewayLog.getRequestBody()) : gatewayLog.getRequestBody());
        values.put("requestHeaders", JSONUtil.toJsonStr(gatewayLog.getRequestHeaders().toSingleValueMap()));
        values.put("userIp", gatewayLog.getUserIp());
        /* values.put("responseBody", JSONUtil.isTypeJSON(gatewayLog.getResponseBody()) ? // makes the body easier to read
                JSONUtil.parse(gatewayLog.getResponseBody()) : gatewayLog.getResponseBody());*/
        values.put("responseHeaders", gatewayLog.getResponseHeaders() != null ?
                JSONUtil.toJsonStr(gatewayLog.getResponseHeaders().toSingleValueMap()) : null);
        values.put("httpStatus", gatewayLog.getHttpStatus());
        values.put("startTime", LocalDateTimeUtil.format(gatewayLog.getStartTime(), NORM_DATETIME_MS_FORMATTER));
        values.put("endTime", LocalDateTimeUtil.format(gatewayLog.getEndTime(), NORM_DATETIME_MS_FORMATTER));
        values.put("duration", gatewayLog.getDuration() != null ? gatewayLog.getDuration() + " ms" : null);
        log.info("[writeAccessLog][gateway log: {}]", JSONUtil.toJsonPrettyStr(values));
    }
    @Override
    public int getOrder() {
        return Ordered.HIGHEST_PRECEDENCE;
    }
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        // Copy the parameters that are directly available on the request into the gateway log
        ServerHttpRequest request = exchange.getRequest();
        // TODO traceId
        AccessLog gatewayLog = new AccessLog();
        gatewayLog.setRoute(getGatewayRoute(exchange));
        gatewayLog.setSchema(request.getURI().getScheme());
        gatewayLog.setRequestMethod(request.getMethodValue());
        gatewayLog.setRequestUrl(request.getURI().getRawPath());
        gatewayLog.setQueryParams(request.getQueryParams());
        gatewayLog.setRequestHeaders(request.getHeaders());
        gatewayLog.setStartTime(LocalDateTime.now());
        gatewayLog.setUserIp(getClientIP(exchange));
        // Continue down the filter chain
        MediaType mediaType = request.getHeaders().getContentType();
        if (MediaType.APPLICATION_FORM_URLENCODED.isCompatibleWith(mediaType)
                || MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) { // JSON and form submissions
            return filterWithRequestBody(exchange, chain, gatewayLog);
        }
        return filterWithoutRequestBody(exchange, chain, gatewayLog);
    }
    /**
     * Get the client IP
     *
     * Based on the getClientIP method of {@link ServletUtil}
     *
     * @param exchange the current exchange
     * @param otherHeaderNames additional header names to check
     * @return the client IP
     */
    public static String getClientIP(ServerWebExchange exchange, String... otherHeaderNames) {
        String[] headers = { "X-Forwarded-For", "X-Real-IP", "Proxy-Client-IP", "WL-Proxy-Client-IP", "HTTP_CLIENT_IP", "HTTP_X_FORWARDED_FOR" };
        if (ArrayUtil.isNotEmpty(otherHeaderNames)) {
            headers = ArrayUtil.addAll(headers, otherHeaderNames);
        }
        // Option 1: read it from the headers
        String ip;
        for (String header : headers) {
            ip = exchange.getRequest().getHeaders().getFirst(header);
            if (!NetUtil.isUnknown(ip)) {
                return NetUtil.getMultistageReverseProxyIp(ip);
            }
        }
        // Option 2: read it from the remote address
        if (exchange.getRequest().getRemoteAddress() == null) {
            return null;
        }
        ip = exchange.getRequest().getRemoteAddress().getHostString();
        return NetUtil.getMultistageReverseProxyIp(ip);
    }
    /**
     * Get the Route matched by the request
     *
     * @param exchange the current exchange
     * @return the matched route
     */
    public static Route getGatewayRoute(ServerWebExchange exchange) {
        return exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
    }
    private Mono<Void> filterWithoutRequestBody(ServerWebExchange exchange, GatewayFilterChain chain, AccessLog accessLog) {
        // Wrap the response so that the response details can be recorded
        ServerHttpResponseDecorator decoratedResponse = recordResponseLog(exchange, accessLog);
        return chain.filter(exchange.mutate().response(decoratedResponse).build())
                .then(Mono.fromRunnable(() -> writeAccessLog(accessLog))); // write the log
    }
    /**
     * Based on the implementation of {@link ModifyRequestBodyGatewayFilterFactory}
     *
     * The main difference is that modifiedBody is used to read the request body
     */
    private Mono<Void> filterWithRequestBody(ServerWebExchange exchange, GatewayFilterChain chain, AccessLog gatewayLog) {
        // Store the request body in the gateway log as it is read
        ServerRequest serverRequest = ServerRequest.create(exchange, messageReaders);
        Mono<String> modifiedBody = serverRequest.bodyToMono(String.class).flatMap(body -> {
            gatewayLog.setRequestBody(body);
            return Mono.just(body);
        });
        // Create the BodyInserter
        BodyInserter<Mono<String>, ReactiveHttpOutputMessage> bodyInserter = BodyInserters.fromPublisher(modifiedBody, String.class);
        // Create the CachedBodyOutputMessage
        HttpHeaders headers = new HttpHeaders();
        headers.putAll(exchange.getRequest().getHeaders());
        // the new content type will be computed by bodyInserter
        // and then set in the request decorator
        headers.remove(HttpHeaders.CONTENT_LENGTH); // remove the stale length
        CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange, headers);
        // Write the request body into the CachedBodyOutputMessage through the BodyInserter
        return bodyInserter.insert(outputMessage, new BodyInserterContext()).then(Mono.defer(() -> {
            // Wrap the request so that the cached request body can be replayed
            ServerHttpRequest decoratedRequest = requestDecorate(exchange, headers, outputMessage);
            // Wrap the response so that the response details can be recorded
            ServerHttpResponseDecorator decoratedResponse = recordResponseLog(exchange, gatewayLog);
            // Continue with the decorated request and response
            return chain.filter(exchange.mutate().request(decoratedRequest).response(decoratedResponse).build())
                    .then(Mono.fromRunnable(() -> writeAccessLog(gatewayLog))); // write the log
        }));
    }
    /**
     * Record the response log
     * Uses DataBufferFactory to handle response bodies that arrive in several chunks.
     */
    private ServerHttpResponseDecorator recordResponseLog(ServerWebExchange exchange, AccessLog gatewayLog) {
        ServerHttpResponse response = exchange.getResponse();
        return new ServerHttpResponseDecorator(response) {
            @Override
            public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
                if (body instanceof Flux) {
                    /* DataBufferFactory bufferFactory = response.bufferFactory();*/
                    // Compute the execution time
                    gatewayLog.setEndTime(LocalDateTime.now());
                    gatewayLog.setDuration((int) (LocalDateTimeUtil.between(gatewayLog.getStartTime(),
                            gatewayLog.getEndTime()).toMillis()));
                    // Set the remaining fields
                    gatewayLog.setResponseHeaders(response.getHeaders());
                    gatewayLog.setHttpStatus(response.getStatusCode());
                    // Check the response type and log the body if it is JSON
                    /* String originalResponseContentType = exchange.getAttribute(ServerWebExchangeUtils.ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR);
                    if (StringUtils.isNotBlank(originalResponseContentType)
                            && originalResponseContentType.contains("application/json")) {
                        Flux<? extends DataBuffer> fluxBody = Flux.from(body);
                        return super.writeWith(fluxBody.buffer().map(dataBuffers -> {
                            // Store the response body in the gateway log
                            byte[] content = readContent(dataBuffers);
                            String responseResult = new String(content, StandardCharsets.UTF_8);
                            gatewayLog.setResponseBody(responseResult);
                            // Write the response
                            return bufferFactory.wrap(content);
                        }));
                    }*/
                }
                // if body is not a flux. never got there.
                return super.writeWith(body);
            }
        };
    }
    // ========== Methods based on ModifyRequestBodyGatewayFilterFactory ==========
    /**
     * Request decorator that recomputes the headers and serves the cached body
     *
     * @param exchange the current exchange
     * @param headers request headers
     * @param outputMessage cached body
     * @return the request decorator
     */
    private ServerHttpRequestDecorator requestDecorate(ServerWebExchange exchange, HttpHeaders headers, CachedBodyOutputMessage outputMessage) {
        return new ServerHttpRequestDecorator(exchange.getRequest()) {
            @Override
            public HttpHeaders getHeaders() {
                long contentLength = headers.getContentLength();
                HttpHeaders httpHeaders = new HttpHeaders();
                httpHeaders.putAll(super.getHeaders());
                if (contentLength > 0) {
                    httpHeaders.setContentLength(contentLength);
                } else {
                    // TODO: this causes a 'HTTP/1.1 411 Length Required' on httpbin.org
                    httpHeaders.set(HttpHeaders.TRANSFER_ENCODING, "chunked");
                }
                return httpHeaders;
            }
            @Override
            public Flux<DataBuffer> getBody() {
                return outputMessage.getBody();
            }
        };
    }
    private byte[] readContent(List<? extends DataBuffer> dataBuffers) {
        // Join the buffers to handle response bodies that arrive in several chunks
        DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory();
        DataBuffer join = dataBufferFactory.join(dataBuffers);
        byte[] content = new byte[join.readableByteCount()];
        join.read(content);
        // Release the buffer memory
        DataBufferUtils.release(join);
        return content;
    }
}
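The client IP lookup in `getClientIP` follows a simple rule: check the proxy headers in order, skip empty or "unknown" values, take the first usable address from a comma-separated list, and fall back to the remote address. The sketch below restates that rule without any framework. It is a simplified stand-in, not the implementation of the `NetUtil` helpers used above; `ClientIpDemo`, `firstKnownIp`, and `resolve` are hypothetical names, and the header lookup here is case-sensitive for brevity.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of resolving a client IP behind one or more reverse proxies. */
final class ClientIpDemo {
    private static final String[] HEADER_NAMES = { "X-Forwarded-For", "X-Real-IP" };

    /** Returns the first entry of a comma-separated list that is neither blank nor "unknown". */
    static String firstKnownIp(String headerValue) {
        if (headerValue == null) {
            return null;
        }
        for (String part : headerValue.split(",")) {
            String ip = part.trim();
            if (!ip.isEmpty() && !"unknown".equalsIgnoreCase(ip)) {
                return ip;
            }
        }
        return null;
    }

    /** Checks the headers in order, then falls back to the socket's remote address. */
    static String resolve(Map<String, String> headers, String remoteAddress) {
        for (String name : HEADER_NAMES) {
            String ip = firstKnownIp(headers.get(name));
            if (ip != null) {
                return ip;
            }
        }
        return remoteAddress;
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        headers.put("X-Forwarded-For", "unknown, 203.0.113.7, 10.0.0.1");
        System.out.println(resolve(headers, "10.0.0.2")); // prints "203.0.113.7"
    }
}
```

These headers are set by clients and proxies, so the resolved value is suitable for logging but should not be trusted for access control.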
Step 6
- Define the global exception handler
/**
 * Global exception handler for the gateway
 */
@Component
@Order(-1) // ensures a higher priority than Spring Cloud Gateway's default ErrorWebExceptionHandler implementation
@Slf4j
public class GlobalExceptionHandler implements ErrorWebExceptionHandler {
    @Override
    public Mono<Void> handle(ServerWebExchange exchange, Throwable ex) {
        // If the response is already committed, just propagate the exception
        ServerHttpResponse response = exchange.getResponse();
        if (response.isCommitted()) {
            return Mono.error(ex);
        }
        // Convert the exception to the common result type R
        R<?> result;
        if (ex instanceof ResponseStatusException) {
            result = responseStatusExceptionHandler(exchange, (ResponseStatusException) ex);
        } else {
            result = defaultExceptionHandler(exchange, ex);
        }
        // Return the result to the front end
        return writeJSON(exchange, result);
    }
    /**
     * Handle the ResponseStatusException thrown by Spring Cloud Gateway by default
     */
    private R<?> responseStatusExceptionHandler(ServerWebExchange exchange, ResponseStatusException ex) {
        ServerHttpRequest request = exchange.getRequest();
        log.error("[responseStatusExceptionHandler][uri({}/{}) exception occurred]", request.getURI(), request.getMethod(), ex);
        return R.fail(ex.getRawStatusCode(), ex.getReason());
    }
    /**
     * Handle system exceptions; the fallback for everything else
     */
    @ExceptionHandler(value = Exception.class)
    public R<?> defaultExceptionHandler(ServerWebExchange exchange, Throwable ex) {
        ServerHttpRequest request = exchange.getRequest();
        log.error("[defaultExceptionHandler][uri({}/{}) exception occurred]", request.getURI(), request.getMethod(), ex);
        return R.fail(ExcepCode.SYSTEM_ERROR.code, ExcepCode.SYSTEM_ERROR.message);
    }
    /**
     * Write JSON data back to the front end
     *
     * @param exchange the current exchange
     * @param object the object to serialize as the response body
     * @return a Mono that completes when the response has been written
     */
    public static Mono<Void> writeJSON(ServerWebExchange exchange, Object object) {
        // Set the header
        ServerHttpResponse response = exchange.getResponse();
        response.getHeaders().setContentType(MediaType.APPLICATION_JSON);
        // Set the body
        return response.writeWith(Mono.fromSupplier(() -> {
            DataBufferFactory bufferFactory = response.bufferFactory();
            try {
                return bufferFactory.wrap(JSON.toJSONBytes(object));
            } catch (Exception ex) {
                ServerHttpRequest request = exchange.getRequest();
                log.error("[writeJSON][uri({}/{}) exception occurred]", request.getURI(), request.getMethod(), ex);
                return bufferFactory.wrap(new byte[0]);
            }
        }));
    }
}
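The decision logic of this handler is small: a status-carrying exception keeps its own code and reason, and everything else collapses into one generic system error. The framework-free sketch below shows just that mapping. `Result` and `StatusException` are hypothetical stand-ins for `R` and `ResponseStatusException`, and the code 500 with the message "System error" is an assumption, since the values of `ExcepCode.SYSTEM_ERROR` are not shown here.

```java
/** Hypothetical sketch of the exception-to-result mapping used by the handler above. */
final class ErrorMappingDemo {
    /** Stand-in for the R result wrapper. */
    static final class Result {
        final int code;
        final String message;

        Result(int code, String message) {
            this.code = code;
            this.message = message;
        }
    }

    /** Stand-in for ResponseStatusException: an exception that carries an HTTP status. */
    static final class StatusException extends RuntimeException {
        final int status;

        StatusException(int status, String reason) {
            super(reason);
            this.status = status;
        }
    }

    /** Status exceptions keep their code and reason; anything else becomes a generic error. */
    static Result toResult(Throwable ex) {
        if (ex instanceof StatusException) {
            return new Result(((StatusException) ex).status, ex.getMessage());
        }
        // Assumed values; the real ones come from ExcepCode.SYSTEM_ERROR
        return new Result(500, "System error");
    }

    public static void main(String[] args) {
        Result notFound = toResult(new StatusException(404, "Not Found"));
        System.out.println(notFound.code + " " + notFound.message);
        Result fallback = toResult(new IllegalStateException("boom"));
        System.out.println(fallback.code + " " + fallback.message);
    }
}
```

Keeping internal exception messages out of the fallback result avoids leaking implementation details to the caller; the full stack trace still goes to the log.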
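Step 7
- In the Nacos configuration list, create the rhx-gateway-dev.yaml file inside the namespace.
- Data ID: rhx-gateway-dev.yaml.
- Group: DEV.
- Configuration content:
spring:
  cloud:
    gateway:
      routes:
        - id: rhx-client
          uri: lb://rhx-client
          predicates:
            - Path=/client/**
        - id: rhx-manage
          uri: lb://rhx-manage
          predicates:
            - Path=/manage/**
- This only configures route dispatching. When a new project is added later, you only need to edit the configuration in rhx-gateway-dev.yaml and publish it; the gateway project itself does not need to change.
- Add route interception and validation according to the needs of your own project.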
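To see how the two `Path` predicates above divide incoming requests, here is a simplified, framework-free matcher. It only understands a trailing `/**` and is not Spring's path-pattern implementation; `PathRouteDemo`, `matches`, and `route` are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch: first-match routing on simplified Path patterns. */
final class PathRouteDemo {
    /** Supports exact patterns and patterns ending in "/**" (the prefix plus anything below it). */
    static boolean matches(String pattern, String path) {
        if (pattern.endsWith("/**")) {
            String prefix = pattern.substring(0, pattern.length() - 3);
            return path.equals(prefix) || path.startsWith(prefix + "/");
        }
        return pattern.equals(path);
    }

    /** Returns the URI of the first route whose pattern matches, or null when none does. */
    static String route(Map<String, String> patternToUri, String path) {
        for (Map.Entry<String, String> entry : patternToUri.entrySet()) {
            if (matches(entry.getKey(), path)) {
                return entry.getValue();
            }
        }
        return null;
    }

    public static void main(String[] args) {
        Map<String, String> routes = new LinkedHashMap<>();
        routes.put("/client/**", "lb://rhx-client");
        routes.put("/manage/**", "lb://rhx-manage");
        System.out.println(route(routes, "/client/orders/1")); // prints "lb://rhx-client"
        System.out.println(route(routes, "/manage/users"));    // prints "lb://rhx-manage"
        System.out.println(route(routes, "/clients/1"));       // prints "null"
    }
}
```

Adding a project then amounts to adding one more pattern-to-URI entry, which is what publishing a new route in Nacos does for the real gateway.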