From 2246c98b6bcde04fb6295bb68dc2ce9ff792f85d Mon Sep 17 00:00:00 2001 From: daizhenyu <1449308021@qq.com> Date: Fri, 20 Sep 2024 16:23:51 +0800 Subject: [PATCH] router plugin: HttpAsyncClient xDS router Signed-off-by: daizhenyu <1449308021@qq.com> --- .../common/constants/RouterConstant.java | 7 +- .../HttpAsyncRequestProducerDecorator.java | 98 +++++++++++++++ .../HttpAsyncClient4xInterceptor.java | 115 +++++++++++++++--- .../interceptor/HttpClient4xInterceptor.java | 4 +- .../HttpUrlConnectionConnectInterceptor.java | 3 +- .../interceptor/OkHttp3ClientInterceptor.java | 4 +- ...HttpClientInterceptorChainInterceptor.java | 4 +- 7 files changed, 213 insertions(+), 22 deletions(-) create mode 100644 sermant-plugins/sermant-router/spring-router-plugin/src/main/java/io/sermant/router/spring/entity/HttpAsyncRequestProducerDecorator.java diff --git a/sermant-plugins/sermant-router/router-common/src/main/java/io/sermant/router/common/constants/RouterConstant.java b/sermant-plugins/sermant-router/router-common/src/main/java/io/sermant/router/common/constants/RouterConstant.java index 1cc8f7d021..561292a1e2 100644 --- a/sermant-plugins/sermant-router/router-common/src/main/java/io/sermant/router/common/constants/RouterConstant.java +++ b/sermant-plugins/sermant-router/router-common/src/main/java/io/sermant/router/common/constants/RouterConstant.java @@ -183,11 +183,16 @@ public class RouterConstant { LANE_KEY_PREFIX); /** - * 全局级兼容的key + * a globally compatible key */ public static final List GLOBAL_COMPATIBILITY_KEY_LIST = Arrays.asList(GLOBAL_ROUTER_KEY, GLOBAL_TAG_KEY, GLOBAL_LANE_KEY); + /** + * point + */ + public static final String ESCAPED_POINT = "\\."; + private RouterConstant() { } } \ No newline at end of file diff --git a/sermant-plugins/sermant-router/spring-router-plugin/src/main/java/io/sermant/router/spring/entity/HttpAsyncRequestProducerDecorator.java b/sermant-plugins/sermant-router/spring-router-plugin/src/main/java/io/sermant/router/spring/entity/HttpAsyncRequestProducerDecorator.java new file mode 100644 index 0000000000..e39c96158f --- /dev/null +++ b/sermant-plugins/sermant-router/spring-router-plugin/src/main/java/io/sermant/router/spring/entity/HttpAsyncRequestProducerDecorator.java @@ -0,0 +1,98 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.router.spring.entity; + +import org.apache.http.HttpException; +import org.apache.http.HttpHost; +import org.apache.http.HttpRequest; +import org.apache.http.nio.ContentEncoder; +import org.apache.http.nio.IOControl; +import org.apache.http.nio.protocol.HttpAsyncRequestProducer; +import org.apache.http.protocol.HttpContext; + +import java.io.IOException; +import java.util.function.Function; + +/** + * HttpAsyncRequestProducer modifier, handling http request + * + * @author zhouss + * @since 2024-09-20 + */ +public class HttpAsyncRequestProducerDecorator implements HttpAsyncRequestProducer { + private final HttpAsyncRequestProducer httpAsyncRequestProducer; + + private final Function requestDecorateFunc; + + private final Function hostDecorateFunc; + + /** + * Constructor + * + * @param httpAsyncRequestProducer Original Producer + * @param requestDecorateFunc Request a decorator + * @param hostDecorateFunc Address decorator + */ + public HttpAsyncRequestProducerDecorator( + HttpAsyncRequestProducer httpAsyncRequestProducer, + Function requestDecorateFunc, + Function hostDecorateFunc) { + this.httpAsyncRequestProducer = httpAsyncRequestProducer; + this.requestDecorateFunc = requestDecorateFunc; + this.hostDecorateFunc = hostDecorateFunc; + } + + @Override + public HttpHost getTarget() { + return hostDecorateFunc.apply(httpAsyncRequestProducer.getTarget()); + } + + @Override + public HttpRequest generateRequest() throws IOException, HttpException { + return requestDecorateFunc.apply(httpAsyncRequestProducer.generateRequest()); + } + + @Override + public void produceContent(ContentEncoder encoder, IOControl ioControl) throws IOException { + httpAsyncRequestProducer.produceContent(encoder, ioControl); + } + + @Override + public void requestCompleted(HttpContext context) { + httpAsyncRequestProducer.requestCompleted(context); + } + + @Override + public void failed(Exception ex) { + httpAsyncRequestProducer.failed(ex); + } + + @Override + public boolean isRepeatable() { + return httpAsyncRequestProducer.isRepeatable(); + } + + @Override + public void resetRequest() throws IOException { + httpAsyncRequestProducer.resetRequest(); + } + + @Override + public void close() throws IOException { + httpAsyncRequestProducer.close(); + } +} diff --git a/sermant-plugins/sermant-router/spring-router-plugin/src/main/java/io/sermant/router/spring/interceptor/HttpAsyncClient4xInterceptor.java b/sermant-plugins/sermant-router/spring-router-plugin/src/main/java/io/sermant/router/spring/interceptor/HttpAsyncClient4xInterceptor.java index bd8495d69f..2a23590146 100644 --- a/sermant-plugins/sermant-router/spring-router-plugin/src/main/java/io/sermant/router/spring/interceptor/HttpAsyncClient4xInterceptor.java +++ b/sermant-plugins/sermant-router/spring-router-plugin/src/main/java/io/sermant/router/spring/interceptor/HttpAsyncClient4xInterceptor.java @@ -16,19 +16,37 @@ package io.sermant.router.spring.interceptor; +import io.sermant.core.common.LoggerFactory; import io.sermant.core.plugin.agent.entity.ExecuteContext; +import io.sermant.core.plugin.config.PluginConfigManager; +import io.sermant.core.service.xds.entity.ServiceInstance; import io.sermant.core.utils.LogUtils; import io.sermant.core.utils.StringUtils; +import io.sermant.router.common.config.RouterConfig; +import io.sermant.router.common.constants.RouterConstant; import io.sermant.router.common.request.RequestData; import io.sermant.router.common.utils.FlowContextUtils; import io.sermant.router.common.utils.ThreadLocalUtils; +import io.sermant.router.spring.entity.HttpAsyncRequestProducerDecorator; +import io.sermant.router.spring.utils.BaseHttpRouterUtils; +import org.apache.http.Header; +import org.apache.http.HttpHost; import org.apache.http.HttpRequest; +import org.apache.http.client.methods.HttpRequestBase; +import org.apache.http.client.utils.URIUtils; import org.apache.http.nio.protocol.HttpAsyncRequestProducer; import org.apache.http.protocol.HttpContext; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.function.Function; +import java.util.logging.Level; +import java.util.logging.Logger; /** * HTTP interception only for version 4. x @@ -37,8 +55,12 @@ * @since 2022-10-31 */ public class HttpAsyncClient4xInterceptor extends MarkInterceptor { + private static final Logger LOGGER = LoggerFactory.getLogger(); + private static final int HTTPCONTEXT_INDEX = 2; + private RouterConfig routerConfig = PluginConfigManager.getPluginConfig(RouterConfig.class); + /** * Pre trigger point * @@ -50,19 +72,22 @@ public class HttpAsyncClient4xInterceptor extends MarkInterceptor { public ExecuteContext doBefore(ExecuteContext context) throws Exception { LogUtils.printHttpRequestBeforePoint(context); Object httpAsyncRequestProducerArgument = context.getArguments()[0]; - if (httpAsyncRequestProducerArgument instanceof HttpAsyncRequestProducer) { - HttpAsyncRequestProducer httpAsyncRequestProducer - = (HttpAsyncRequestProducer) httpAsyncRequestProducerArgument; - HttpRequest httpRequest = httpAsyncRequestProducer.generateRequest(); - Object argument = context.getArguments()[HTTPCONTEXT_INDEX]; - if (argument instanceof HttpContext) { - HttpContext httpContext = (HttpContext) argument; - if (StringUtils.isBlank(FlowContextUtils.getTagName())) { - return context; - } - parseTags(httpContext, httpRequest); - } + if (!(httpAsyncRequestProducerArgument instanceof HttpAsyncRequestProducer)) { + return context; + } + HttpAsyncRequestProducer httpAsyncRequestProducer + = (HttpAsyncRequestProducer) httpAsyncRequestProducerArgument; + HttpRequest httpRequest = httpAsyncRequestProducer.generateRequest(); + handleXdsRouterAndUpdateHttpRequest(httpRequest, context); + Object argument = context.getArguments()[HTTPCONTEXT_INDEX]; + if (!(argument instanceof HttpContext)) { + return context; } + HttpContext httpContext = (HttpContext) argument; + if (StringUtils.isBlank(FlowContextUtils.getTagName())) { + return context; + } + parseTags(httpContext, httpRequest); return context; } @@ -78,11 +103,10 @@ private void parseTags(HttpContext httpContext, HttpRequest httpRequest) { } /** - * Post-Interception Point Note: This method does not remove thread variables from the post-interception point, - * but obtains thread variables at the NopInstanceFilterInterceptor pre-interceptor point for traffic routing, - * Remove thread variables from the NopInstanceFilterInterceptor interceptor end. - * Note: httpasyncclient uses the future.get() logic that must have a synchronous thread, otherwise the thread - * variable cannot be removed + * Post-Interception Point Note: This method does not remove thread variables from the post-interception point, but + * obtains thread variables at the NopInstanceFilterInterceptor pre-interceptor point for traffic routing, Remove + * thread variables from the NopInstanceFilterInterceptor interceptor end. Note: httpasyncclient uses the + * future.get() logic that must have a synchronous thread, otherwise the thread variable cannot be removed * * @param context Execution context * @return Execution context @@ -100,4 +124,61 @@ public ExecuteContext onThrow(ExecuteContext context) { LogUtils.printHttpRequestOnThrowPoint(context); return context; } + + private Map getHeaders(HttpRequest httpRequest) { + Map headerMap = new HashMap<>(); + for (Header header : httpRequest.getAllHeaders()) { + headerMap.putIfAbsent(header.getName(), header.getValue()); + } + return headerMap; + } + + private void handleXdsRouterAndUpdateHttpRequest(HttpRequest httpRequest, ExecuteContext context) { + if (!routerConfig.isEnabledXdsRoute()) { + return; + } + URI uri = URI.create(httpRequest.getRequestLine().getUri()); + String host = uri.getHost(); + if (!BaseHttpRouterUtils.isXdsRouteRequired(host)) { + return; + } + + // use xds route to find a service instance, and modify url by it + Optional serviceInstanceOptional = BaseHttpRouterUtils + .chooseServiceInstanceByXds(host.split(RouterConstant.ESCAPED_POINT)[0], uri.getPath(), + getHeaders(httpRequest)); + if (!serviceInstanceOptional.isPresent()) { + return; + } + ServiceInstance instance = serviceInstanceOptional.get(); + try { + context.getArguments()[0] = rebuildProducer(context, + new URI(BaseHttpRouterUtils.rebuildUrlByXdsServiceInstance(uri, instance))); + } catch (URISyntaxException e) { + LOGGER.log(Level.WARNING, "Create uri using xds service instance failed.", e.getMessage()); + } + } + + private HttpAsyncRequestProducer rebuildProducer(ExecuteContext context, URI newUri) { + return new HttpAsyncRequestProducerDecorator((HttpAsyncRequestProducer) context.getArguments()[0], + buildRequestDecorator(newUri), buildHostDecorator(newUri)); + } + + private Function buildHostDecorator(URI newUri) { + return httpHost -> rebuildHttpHost(newUri); + } + + private Function buildRequestDecorator(URI newUri) { + return httpRequest -> updateRequestUri(newUri, httpRequest); + } + + private HttpHost rebuildHttpHost(URI newUri) { + return URIUtils.extractHost(newUri); + } + + private HttpRequest updateRequestUri(URI newUri, HttpRequest httpUriRequest) { + HttpRequestBase httpRequest = (HttpRequestBase) httpUriRequest; + httpRequest.setURI(newUri); + return httpRequest; + } } diff --git a/sermant-plugins/sermant-router/spring-router-plugin/src/main/java/io/sermant/router/spring/interceptor/HttpClient4xInterceptor.java b/sermant-plugins/sermant-router/spring-router-plugin/src/main/java/io/sermant/router/spring/interceptor/HttpClient4xInterceptor.java index 1b0d303e15..9fd717d474 100644 --- a/sermant-plugins/sermant-router/spring-router-plugin/src/main/java/io/sermant/router/spring/interceptor/HttpClient4xInterceptor.java +++ b/sermant-plugins/sermant-router/spring-router-plugin/src/main/java/io/sermant/router/spring/interceptor/HttpClient4xInterceptor.java @@ -23,6 +23,7 @@ import io.sermant.core.utils.LogUtils; import io.sermant.core.utils.StringUtils; import io.sermant.router.common.config.RouterConfig; +import io.sermant.router.common.constants.RouterConstant; import io.sermant.router.common.request.RequestData; import io.sermant.router.common.utils.CollectionUtils; import io.sermant.router.common.utils.FlowContextUtils; @@ -136,7 +137,8 @@ private void handleXdsRouterAndUpdateHttpRequest(Object[] arguments) { // use xds route to find a service instance, and modify url by it Optional serviceInstanceOptional = BaseHttpRouterUtils - .chooseServiceInstanceByXds(host.split("\\.")[0], uri.getPath(), getHeaders(httpRequest)); + .chooseServiceInstanceByXds(host.split(RouterConstant.ESCAPED_POINT)[0], uri.getPath(), + getHeaders(httpRequest)); if (!serviceInstanceOptional.isPresent()) { return; } diff --git a/sermant-plugins/sermant-router/spring-router-plugin/src/main/java/io/sermant/router/spring/interceptor/HttpUrlConnectionConnectInterceptor.java b/sermant-plugins/sermant-router/spring-router-plugin/src/main/java/io/sermant/router/spring/interceptor/HttpUrlConnectionConnectInterceptor.java index e590162981..6cd7decb9f 100644 --- a/sermant-plugins/sermant-router/spring-router-plugin/src/main/java/io/sermant/router/spring/interceptor/HttpUrlConnectionConnectInterceptor.java +++ b/sermant-plugins/sermant-router/spring-router-plugin/src/main/java/io/sermant/router/spring/interceptor/HttpUrlConnectionConnectInterceptor.java @@ -25,6 +25,7 @@ import io.sermant.core.utils.ReflectUtils; import io.sermant.core.utils.StringUtils; import io.sermant.router.common.config.RouterConfig; +import io.sermant.router.common.constants.RouterConstant; import io.sermant.router.common.request.RequestData; import io.sermant.router.common.utils.CollectionUtils; import io.sermant.router.common.utils.FlowContextUtils; @@ -113,7 +114,7 @@ private void handleXdsRouterAndUpdateHttpRequest(HttpURLConnection connection) { // use xds route to find a service instance, and modify url by it Optional serviceInstanceOptional = BaseHttpRouterUtils - .chooseServiceInstanceByXds(host.split("\\.")[0], url.getPath(), + .chooseServiceInstanceByXds(host.split(RouterConstant.ESCAPED_POINT)[0], url.getPath(), BaseHttpRouterUtils.processHeaders(headers)); if (!serviceInstanceOptional.isPresent()) { return; diff --git a/sermant-plugins/sermant-router/spring-router-plugin/src/main/java/io/sermant/router/spring/interceptor/OkHttp3ClientInterceptor.java b/sermant-plugins/sermant-router/spring-router-plugin/src/main/java/io/sermant/router/spring/interceptor/OkHttp3ClientInterceptor.java index 37c9879a35..c85ae717e0 100644 --- a/sermant-plugins/sermant-router/spring-router-plugin/src/main/java/io/sermant/router/spring/interceptor/OkHttp3ClientInterceptor.java +++ b/sermant-plugins/sermant-router/spring-router-plugin/src/main/java/io/sermant/router/spring/interceptor/OkHttp3ClientInterceptor.java @@ -23,6 +23,7 @@ import io.sermant.core.utils.ReflectUtils; import io.sermant.core.utils.StringUtils; import io.sermant.router.common.config.RouterConfig; +import io.sermant.router.common.constants.RouterConstant; import io.sermant.router.common.request.RequestData; import io.sermant.router.common.utils.FlowContextUtils; import io.sermant.router.common.utils.ThreadLocalUtils; @@ -137,7 +138,8 @@ private void handleXdsRouterAndUpdateHttpRequest(Object obj, Request request) { // use xds route to find a service instance, and modify url by it Optional serviceInstanceOptional = BaseHttpRouterUtils - .chooseServiceInstanceByXds(host.split("\\.")[0], uri.getPath(), getHeaders(headers)); + .chooseServiceInstanceByXds(host.split(RouterConstant.ESCAPED_POINT)[0], uri.getPath(), + getHeaders(headers)); if (!serviceInstanceOptional.isPresent()) { return; } diff --git a/sermant-plugins/sermant-router/spring-router-plugin/src/main/java/io/sermant/router/spring/interceptor/OkHttpClientInterceptorChainInterceptor.java b/sermant-plugins/sermant-router/spring-router-plugin/src/main/java/io/sermant/router/spring/interceptor/OkHttpClientInterceptorChainInterceptor.java index d2dded7768..1f3bb51d88 100644 --- a/sermant-plugins/sermant-router/spring-router-plugin/src/main/java/io/sermant/router/spring/interceptor/OkHttpClientInterceptorChainInterceptor.java +++ b/sermant-plugins/sermant-router/spring-router-plugin/src/main/java/io/sermant/router/spring/interceptor/OkHttpClientInterceptorChainInterceptor.java @@ -26,6 +26,7 @@ import io.sermant.core.plugin.config.PluginConfigManager; import io.sermant.core.service.xds.entity.ServiceInstance; import io.sermant.router.common.config.RouterConfig; +import io.sermant.router.common.constants.RouterConstant; import io.sermant.router.spring.utils.BaseHttpRouterUtils; import java.io.IOException; @@ -112,7 +113,8 @@ private void handleXdsRouterAndUpdateHttpRequest(Object[] arguments) { // use xds route to find a service instance, and modify url by it Optional serviceInstanceOptional = BaseHttpRouterUtils - .chooseServiceInstanceByXds(host.split("\\.")[0], uri.getPath(), getHeaders(request)); + .chooseServiceInstanceByXds(host.split(RouterConstant.ESCAPED_POINT)[0], uri.getPath(), + getHeaders(request)); if (!serviceInstanceOptional.isPresent()) { return; }