|
@@ -0,0 +1,227 @@
|
|
|
|
+package com.ruoyi.common.utils;
|
|
|
|
+
|
|
|
|
+import cn.hutool.core.bean.BeanUtil;
|
|
|
|
+import cn.hutool.core.collection.CollectionUtil;
|
|
|
|
+import cn.hutool.core.date.DateUtil;
|
|
|
|
+import cn.hutool.core.util.ObjectUtil;
|
|
|
|
+import cn.hutool.http.HttpUtil;
|
|
|
|
+import com.ruoyi.common.constant.EnumType;
|
|
|
|
+import com.ruoyi.common.utils.bean.webclient.*;
|
|
|
|
+import io.netty.channel.ChannelOption;
|
|
|
|
+import io.netty.handler.timeout.ReadTimeoutHandler;
|
|
|
|
+import io.netty.handler.timeout.WriteTimeoutHandler;
|
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
|
+import org.springframework.core.ParameterizedTypeReference;
|
|
|
|
+import org.springframework.http.HttpHeaders;
|
|
|
|
+import org.springframework.http.MediaType;
|
|
|
|
+import org.springframework.http.client.reactive.ClientHttpConnector;
|
|
|
|
+import org.springframework.http.client.reactive.ReactorClientHttpConnector;
|
|
|
|
+import org.springframework.http.client.reactive.ReactorResourceFactory;
|
|
|
|
+import org.springframework.web.reactive.function.client.ClientResponse;
|
|
|
|
+import org.springframework.web.reactive.function.client.WebClient;
|
|
|
|
+import reactor.core.publisher.Mono;
|
|
|
|
+import reactor.netty.http.client.HttpClient;
|
|
|
|
+import reactor.netty.resources.ConnectionProvider;
|
|
|
|
+import reactor.netty.resources.LoopResources;
|
|
|
|
+
|
|
|
|
+import java.util.Map;
|
|
|
|
+
|
|
|
|
+/**
|
|
|
|
+ * @author ChenYang
|
|
|
|
+ * 创建一个全局唯一的webClient并对外开放底层接口
|
|
|
|
+ */
|
|
|
|
+@Slf4j
|
|
|
|
+public class WebClientUtils {
|
|
|
|
+
|
|
|
|
+ private static final ReactorResourceFactory reactorResourceFactory = new ReactorResourceFactory();
|
|
|
|
+
|
|
|
|
+ private static final WebClient webClient;
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * 系统级别的配置,业务配置不应该在这里处理,
|
|
|
|
+ * 这里的所有参数为WebClient的创建的时候的配置参数,
|
|
|
|
+ * 加入到系统级别配置(而非业务配置),以便于在后续测试中通过调整参数来进行性能优化
|
|
|
|
+ */
|
|
|
|
+ private static final String RESOURCE_REMARK = "http_client";
|
|
|
|
+ private static final Integer INIT_WORK_COUNT = 50;
|
|
|
|
+ private static final Integer MAX_CONN_COUNT = 50;
|
|
|
|
+ private static final Integer DEFAULT_CONNECT_TIMEOUT = 60;
|
|
|
|
+ private static final int DEFAULT_WRITE_TIMEOUT = 10;
|
|
|
|
+ private static final int DEFAULT_READ_TIMEOUT = 10;
|
|
|
|
+ private static final long DEFAULT_RETRY_COUNT = 3L;
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * //TODO 下面是后期优化计划
|
|
|
|
+ * 缺点:是webClient一旦被创建则不可变, 无法对当前的web进行重写,当前妥协的方式可以提供方法由子类进行复制一份进行构建,
|
|
|
|
+ * 使用{@link WebClient#mutate()}方法即可,
|
|
|
|
+ * 创建一个WebClientConfig读取配置文件并注入到Spring中,
|
|
|
|
+ * 并定义一个WebClient处理器如同WebClientProcessor进行数据的处理和传输,
|
|
|
|
+ * 以便于自定义工作线程连接资源的数据,
|
|
|
|
+ * 这里可以使用initWebClient方法配置各种参数配置文件中,
|
|
|
|
+ */
|
|
|
|
+ static {
|
|
|
|
+
|
|
|
|
+ //创建网络连接资源的工厂参数初始化
|
|
|
|
+ reactorResourceFactory.setUseGlobalResources(Boolean.FALSE);
|
|
|
|
+ reactorResourceFactory.setConnectionProvider(ConnectionProvider.create(RESOURCE_REMARK, MAX_CONN_COUNT));
|
|
|
|
+ reactorResourceFactory.setLoopResources(LoopResources.create(RESOURCE_REMARK, INIT_WORK_COUNT, Boolean.TRUE));
|
|
|
|
+
|
|
|
|
+ ClientHttpConnector httpConnector =
|
|
|
|
+ new ReactorClientHttpConnector(reactorResourceFactory, httpClient ->
|
|
|
|
+ HttpClient.create().option(ChannelOption.CONNECT_TIMEOUT_MILLIS, DEFAULT_CONNECT_TIMEOUT)
|
|
|
|
+ .option(ChannelOption.TCP_NODELAY, Boolean.TRUE)
|
|
|
|
+ .doOnConnected(connection ->
|
|
|
|
+ {
|
|
|
|
+ //因为HTTP协议为短链接,所有在指定之间之内没有发生读写时间,则触发断开连接的操作,
|
|
|
|
+ // 此时默认的超时时间为10s,即从本次读写事件起10s检测不到读写事件了,则断开此次连接。
|
|
|
|
+ connection.addHandlerLast(new ReadTimeoutHandler(DEFAULT_READ_TIMEOUT));
|
|
|
|
+ connection.addHandlerLast(new WriteTimeoutHandler(DEFAULT_WRITE_TIMEOUT));
|
|
|
|
+ }));
|
|
|
|
+
|
|
|
|
+ webClient = WebClient.builder()
|
|
|
|
+ .filter((request, next) -> {
|
|
|
|
+ log.info("current request url is {}, request header info is {}, current time is {}",
|
|
|
|
+ request.url(), request.headers(), DateUtil.now());
|
|
|
|
+ Mono<ClientResponse> exchange = next.exchange(request).retry(DEFAULT_RETRY_COUNT);
|
|
|
|
+ ClientResponse clientResponse = exchange.block();
|
|
|
|
+ if(ObjectUtil.isNull(clientResponse.headers())) return exchange;
|
|
|
|
+ log.info("request header info is {}, current time is {}",
|
|
|
|
+ clientResponse.headers().asHttpHeaders(), DateUtil.now());
|
|
|
|
+ return exchange;
|
|
|
|
+ }).clientConnector(httpConnector).build();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public static <T extends BizRequest, R> BaseResponse<R> invoke(String url, HttpHeaders headerInfo,
|
|
|
|
+ BaseRequest<T> request, Class<R> responseType){
|
|
|
|
+ log.info("current request body is {}, current time is {}", JacksonUtil.toString(request.getBizRequest()), DateUtil.now());
|
|
|
|
+ BaseResponse<R> baseResponse = new BaseResponse<>();
|
|
|
|
+ if(ObjectUtil.equal(request.getHttpMethod(), EnumType.HttpMethod.GET))
|
|
|
|
+ baseResponse = invokeGet(url, headerInfo, (BizGetRequest) request.getBizRequest(), responseType);
|
|
|
|
+ if(ObjectUtil.equal(request.getHttpMethod(), EnumType.HttpMethod.POST))
|
|
|
|
+ baseResponse = invokePost(url, headerInfo, (BizPostRequest) request.getBizRequest(), responseType);
|
|
|
|
+ log.info("current request body is {}, current time is {}", JacksonUtil.toString(baseResponse), DateUtil.now());
|
|
|
|
+ return baseResponse;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * 创建GET请求,进行同步调用
|
|
|
|
+ * @param url 请求的Url全路径
|
|
|
|
+ * @param request 请求体,需要继承BaseRequest
|
|
|
|
+ * @param responseType 响应体类型
|
|
|
|
+ * @param <T> 请求体泛型
|
|
|
|
+ * @param <R> 响应业务数据
|
|
|
|
+ * @return
|
|
|
|
+ */
|
|
|
|
+ public static <T extends BizGetRequest, R> BaseResponse<R> invokeGet(String url, T request, Class<R> responseType){
|
|
|
|
+ return invokeGet(url, request, responseType);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * 创建GET请求,进行同步调用
|
|
|
|
+ * @param url 请求的Url全路径
|
|
|
|
+ * @param request 请求体,需要继承BaseRequest
|
|
|
|
+ * @param headerInfo 请求头信息
|
|
|
|
+ * @param responseType 响应体业务类型
|
|
|
|
+ * @param <T> 请求体泛型
|
|
|
|
+ * @param <R> 响应业务数据
|
|
|
|
+ * @return
|
|
|
|
+ */
|
|
|
|
+ public static <T extends BizGetRequest, R> BaseResponse<R> invokeGet(String url, HttpHeaders headerInfo, T request, Class<R> responseType){
|
|
|
|
+
|
|
|
|
+ WebClient.RequestHeadersSpec<?> headersSpec = webClient.get().uri(url, uriBuilder ->
|
|
|
|
+ uriBuilder.query(HttpUtil.toParams(BeanUtil.beanToMap(request))).build());
|
|
|
|
+ if(CollectionUtil.isNotEmpty(headerInfo)) {
|
|
|
|
+ headersSpec = headersSpec.headers(headers -> headers.addAll(headerInfo));
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ Mono<BaseResponse<R>> responseMono
|
|
|
|
+ = headersSpec.retrieve()
|
|
|
|
+ .bodyToMono(ParameterizedTypeReference.forType(JacksonUtil.constructJavaType(BaseResponse.class, responseType)));
|
|
|
|
+ return responseMono.block();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * 使用路径参数的GET请求同步调用
|
|
|
|
+ * @param url 请求的Url全路径(带上占位符)
|
|
|
|
+ * @param responseType 响应的业务参数,适合响应场景是BaseResponse<Object>的情况
|
|
|
|
+ * @param pathVariables 放在路径中的参数,有多少占位符请放置多少个
|
|
|
|
+ * @param <R> 响应的业务体
|
|
|
|
+ * @return
|
|
|
|
+ */
|
|
|
|
+ public static <R> BaseResponse<R> invokeGet(String url, Class<R> responseType, Object ... pathVariables){
|
|
|
|
+ return invokeGet(url, responseType, pathVariables);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * 使用路径参数的GET请求同步调用
|
|
|
|
+ * @param url 请求的Url全路径(带上占位符)
|
|
|
|
+ * @param headerInfo 请求头信息
|
|
|
|
+ * @param responseType 响应的业务参数,适合响应场景是BaseResponse<Object>的情况
|
|
|
|
+ * @param pathVariables 放在路径中的参数,有多少占位符请放置多少个
|
|
|
|
+ * @param <R>
|
|
|
|
+ * @return
|
|
|
|
+ */
|
|
|
|
+ public static <R> BaseResponse<R> invokeGet(String url, HttpHeaders headerInfo, Class<R> responseType, Object ... pathVariables){
|
|
|
|
+ WebClient.RequestHeadersSpec<?> headersSpec = webClient.get().uri(url, pathVariables);
|
|
|
|
+ if(CollectionUtil.isNotEmpty(headerInfo)) {
|
|
|
|
+ headersSpec = headersSpec.headers(headers -> headers.addAll(headerInfo));
|
|
|
|
+ }
|
|
|
|
+ Mono<BaseResponse<R>> responseMono
|
|
|
|
+ = headersSpec.retrieve()
|
|
|
|
+ .bodyToMono(ParameterizedTypeReference.forType(JacksonUtil.constructJavaType(BaseResponse.class, responseType)));;
|
|
|
|
+ return responseMono.block();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * 使用路径参数的GET请求同步调用
|
|
|
|
+ * @param url 请求的Url全路径(带上占位符)
|
|
|
|
+ * @param pathVariableInfo 构建路径变量映射, 键值对一一对应
|
|
|
|
+ * @param responseType 响应的业务参数,适合响应场景是BaseResponse<Object>的情况
|
|
|
|
+ * @param <R> 响应的业务体
|
|
|
|
+ * @return
|
|
|
|
+ */
|
|
|
|
+ public static <R> BaseResponse<R> invokeGet(String url, Map<String, Object> pathVariableInfo, Class<R> responseType){
|
|
|
|
+ return invokeGet(url, pathVariableInfo, responseType);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * 使用路径参数的GET请求同步调用
|
|
|
|
+ * @param url 请求的Url全路径(带上占位符)
|
|
|
|
+ * @param headerInfo 请求头信息
|
|
|
|
+ * @param responseType 响应的业务参数,适合响应场景是BaseResponse<Object>的情况
|
|
|
|
+ * @param pathVariableInfo 构建路径变量映射,有多少占位符请放置多少个
|
|
|
|
+ * @param <R>
|
|
|
|
+ * @return
|
|
|
|
+ */
|
|
|
|
+ public static <R> BaseResponse<R> invokeGet(String url, HttpHeaders headerInfo, Map<String, Object> pathVariableInfo, Class<R> responseType){
|
|
|
|
+ WebClient.RequestHeadersSpec<?> headersSpec = webClient.get().uri(url, pathVariableInfo);
|
|
|
|
+ if(CollectionUtil.isNotEmpty(headerInfo)) {
|
|
|
|
+ headersSpec = headersSpec.headers(headers -> headers.addAll(headerInfo));
|
|
|
|
+ }
|
|
|
|
+ Mono<BaseResponse<R>> responseMono
|
|
|
|
+ = headersSpec.retrieve()
|
|
|
|
+ .bodyToMono(ParameterizedTypeReference.forType(JacksonUtil.constructJavaType(BaseResponse.class, responseType)));;
|
|
|
|
+ return responseMono.block();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public static <T extends BizPostRequest, R> BaseResponse<R> invokePost(String url, T request, Class<R> responseType){
|
|
|
|
+ return invokePost(url, null, request, responseType);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public static <T extends BizPostRequest, R> BaseResponse<R> invokePost(String url, HttpHeaders headerInfo, T request, Class<R> responseType){
|
|
|
|
+ if(ObjectUtil.isNull(headerInfo.getContentType())) headerInfo.setContentType(MediaType.APPLICATION_JSON);
|
|
|
|
+ WebClient.RequestBodySpec requestBodySpec = webClient.post().uri(url);
|
|
|
|
+ if(CollectionUtil.isNotEmpty(headerInfo)) {
|
|
|
|
+ requestBodySpec = requestBodySpec.headers(headers -> headers.addAll(headerInfo));
|
|
|
|
+ }
|
|
|
|
+ Mono<BaseResponse<R>> responseMono = requestBodySpec.body(Mono.justOrEmpty(request), request.getClass()).retrieve()
|
|
|
|
+ .bodyToMono(ParameterizedTypeReference.forType(JacksonUtil.constructJavaType(BaseResponse.class, responseType)));
|
|
|
|
+ return responseMono.block();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+}
|