Skip to content

Commit

Permalink
Merge pull request #1627 from daizhenyu/develop-xds-router-plugin
Browse files Browse the repository at this point in the history
router plugin: HttpAsyncClient xDS router
  • Loading branch information
Sherlockhan authored Sep 24, 2024
2 parents cecf011 + 2246c98 commit cb6d6a4
Show file tree
Hide file tree
Showing 7 changed files with 213 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -183,11 +183,16 @@ public class RouterConstant {
LANE_KEY_PREFIX);

/**
* 全局级兼容的key
* a globally compatible key
*/
public static final List<String> GLOBAL_COMPATIBILITY_KEY_LIST = Arrays.asList(GLOBAL_ROUTER_KEY, GLOBAL_TAG_KEY,
GLOBAL_LANE_KEY);

/**
* point
*/
public static final String ESCAPED_POINT = "\\.";

private RouterConstant() {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.sermant.router.spring.entity;

import org.apache.http.HttpException;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequest;
import org.apache.http.nio.ContentEncoder;
import org.apache.http.nio.IOControl;
import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
import org.apache.http.protocol.HttpContext;

import java.io.IOException;
import java.util.function.Function;

/**
* HttpAsyncRequestProducer modifier, handling http request
*
* @author zhouss
* @since 2024-09-20
*/
public class HttpAsyncRequestProducerDecorator implements HttpAsyncRequestProducer {
private final HttpAsyncRequestProducer httpAsyncRequestProducer;

private final Function<HttpRequest, HttpRequest> requestDecorateFunc;

private final Function<HttpHost, HttpHost> hostDecorateFunc;

/**
* Constructor
*
* @param httpAsyncRequestProducer Original Producer
* @param requestDecorateFunc Request a decorator
* @param hostDecorateFunc Address decorator
*/
public HttpAsyncRequestProducerDecorator(
HttpAsyncRequestProducer httpAsyncRequestProducer,
Function<HttpRequest, HttpRequest> requestDecorateFunc,
Function<HttpHost, HttpHost> hostDecorateFunc) {
this.httpAsyncRequestProducer = httpAsyncRequestProducer;
this.requestDecorateFunc = requestDecorateFunc;
this.hostDecorateFunc = hostDecorateFunc;
}

@Override
public HttpHost getTarget() {
return hostDecorateFunc.apply(httpAsyncRequestProducer.getTarget());
}

@Override
public HttpRequest generateRequest() throws IOException, HttpException {
return requestDecorateFunc.apply(httpAsyncRequestProducer.generateRequest());
}

@Override
public void produceContent(ContentEncoder encoder, IOControl ioControl) throws IOException {
httpAsyncRequestProducer.produceContent(encoder, ioControl);
}

@Override
public void requestCompleted(HttpContext context) {
httpAsyncRequestProducer.requestCompleted(context);
}

@Override
public void failed(Exception ex) {
httpAsyncRequestProducer.failed(ex);
}

@Override
public boolean isRepeatable() {
return httpAsyncRequestProducer.isRepeatable();
}

@Override
public void resetRequest() throws IOException {
httpAsyncRequestProducer.resetRequest();
}

@Override
public void close() throws IOException {
httpAsyncRequestProducer.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,37 @@

package io.sermant.router.spring.interceptor;

import io.sermant.core.common.LoggerFactory;
import io.sermant.core.plugin.agent.entity.ExecuteContext;
import io.sermant.core.plugin.config.PluginConfigManager;
import io.sermant.core.service.xds.entity.ServiceInstance;
import io.sermant.core.utils.LogUtils;
import io.sermant.core.utils.StringUtils;
import io.sermant.router.common.config.RouterConfig;
import io.sermant.router.common.constants.RouterConstant;
import io.sermant.router.common.request.RequestData;
import io.sermant.router.common.utils.FlowContextUtils;
import io.sermant.router.common.utils.ThreadLocalUtils;
import io.sermant.router.spring.entity.HttpAsyncRequestProducerDecorator;
import io.sermant.router.spring.utils.BaseHttpRouterUtils;

import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequest;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.client.utils.URIUtils;
import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
import org.apache.http.protocol.HttpContext;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* HTTP interception only for version 4. x
Expand All @@ -37,8 +55,12 @@
* @since 2022-10-31
*/
public class HttpAsyncClient4xInterceptor extends MarkInterceptor {
private static final Logger LOGGER = LoggerFactory.getLogger();

private static final int HTTPCONTEXT_INDEX = 2;

private RouterConfig routerConfig = PluginConfigManager.getPluginConfig(RouterConfig.class);

/**
* Pre trigger point
*
Expand All @@ -50,19 +72,22 @@ public class HttpAsyncClient4xInterceptor extends MarkInterceptor {
public ExecuteContext doBefore(ExecuteContext context) throws Exception {
LogUtils.printHttpRequestBeforePoint(context);
Object httpAsyncRequestProducerArgument = context.getArguments()[0];
if (httpAsyncRequestProducerArgument instanceof HttpAsyncRequestProducer) {
HttpAsyncRequestProducer httpAsyncRequestProducer
= (HttpAsyncRequestProducer) httpAsyncRequestProducerArgument;
HttpRequest httpRequest = httpAsyncRequestProducer.generateRequest();
Object argument = context.getArguments()[HTTPCONTEXT_INDEX];
if (argument instanceof HttpContext) {
HttpContext httpContext = (HttpContext) argument;
if (StringUtils.isBlank(FlowContextUtils.getTagName())) {
return context;
}
parseTags(httpContext, httpRequest);
}
if (!(httpAsyncRequestProducerArgument instanceof HttpAsyncRequestProducer)) {
return context;
}
HttpAsyncRequestProducer httpAsyncRequestProducer
= (HttpAsyncRequestProducer) httpAsyncRequestProducerArgument;
HttpRequest httpRequest = httpAsyncRequestProducer.generateRequest();
handleXdsRouterAndUpdateHttpRequest(httpRequest, context);
Object argument = context.getArguments()[HTTPCONTEXT_INDEX];
if (!(argument instanceof HttpContext)) {
return context;
}
HttpContext httpContext = (HttpContext) argument;
if (StringUtils.isBlank(FlowContextUtils.getTagName())) {
return context;
}
parseTags(httpContext, httpRequest);
return context;
}

Expand All @@ -78,11 +103,10 @@ private void parseTags(HttpContext httpContext, HttpRequest httpRequest) {
}

/**
* Post-Interception Point Note: This method does not remove thread variables from the post-interception point,
* but obtains thread variables at the NopInstanceFilterInterceptor pre-interceptor point for traffic routing,
* Remove thread variables from the NopInstanceFilterInterceptor interceptor end.
* Note: httpasyncclient uses the future.get() logic that must have a synchronous thread, otherwise the thread
* variable cannot be removed
* Post-Interception Point Note: This method does not remove thread variables from the post-interception point, but
* obtains thread variables at the NopInstanceFilterInterceptor pre-interceptor point for traffic routing, Remove
* thread variables from the NopInstanceFilterInterceptor interceptor end. Note: httpasyncclient uses the
* future.get() logic that must have a synchronous thread, otherwise the thread variable cannot be removed
*
* @param context Execution context
* @return Execution context
Expand All @@ -100,4 +124,61 @@ public ExecuteContext onThrow(ExecuteContext context) {
LogUtils.printHttpRequestOnThrowPoint(context);
return context;
}

private Map<String, String> getHeaders(HttpRequest httpRequest) {
Map<String, String> headerMap = new HashMap<>();
for (Header header : httpRequest.getAllHeaders()) {
headerMap.putIfAbsent(header.getName(), header.getValue());
}
return headerMap;
}

private void handleXdsRouterAndUpdateHttpRequest(HttpRequest httpRequest, ExecuteContext context) {
if (!routerConfig.isEnabledXdsRoute()) {
return;
}
URI uri = URI.create(httpRequest.getRequestLine().getUri());
String host = uri.getHost();
if (!BaseHttpRouterUtils.isXdsRouteRequired(host)) {
return;
}

// use xds route to find a service instance, and modify url by it
Optional<ServiceInstance> serviceInstanceOptional = BaseHttpRouterUtils
.chooseServiceInstanceByXds(host.split(RouterConstant.ESCAPED_POINT)[0], uri.getPath(),
getHeaders(httpRequest));
if (!serviceInstanceOptional.isPresent()) {
return;
}
ServiceInstance instance = serviceInstanceOptional.get();
try {
context.getArguments()[0] = rebuildProducer(context,
new URI(BaseHttpRouterUtils.rebuildUrlByXdsServiceInstance(uri, instance)));
} catch (URISyntaxException e) {
LOGGER.log(Level.WARNING, "Create uri using xds service instance failed.", e.getMessage());
}
}

private HttpAsyncRequestProducer rebuildProducer(ExecuteContext context, URI newUri) {
return new HttpAsyncRequestProducerDecorator((HttpAsyncRequestProducer) context.getArguments()[0],
buildRequestDecorator(newUri), buildHostDecorator(newUri));
}

private Function<HttpHost, HttpHost> buildHostDecorator(URI newUri) {
return httpHost -> rebuildHttpHost(newUri);
}

private Function<HttpRequest, HttpRequest> buildRequestDecorator(URI newUri) {
return httpRequest -> updateRequestUri(newUri, httpRequest);
}

private HttpHost rebuildHttpHost(URI newUri) {
return URIUtils.extractHost(newUri);
}

private HttpRequest updateRequestUri(URI newUri, HttpRequest httpUriRequest) {
HttpRequestBase httpRequest = (HttpRequestBase) httpUriRequest;
httpRequest.setURI(newUri);
return httpRequest;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.sermant.core.utils.LogUtils;
import io.sermant.core.utils.StringUtils;
import io.sermant.router.common.config.RouterConfig;
import io.sermant.router.common.constants.RouterConstant;
import io.sermant.router.common.request.RequestData;
import io.sermant.router.common.utils.CollectionUtils;
import io.sermant.router.common.utils.FlowContextUtils;
Expand Down Expand Up @@ -136,7 +137,8 @@ private void handleXdsRouterAndUpdateHttpRequest(Object[] arguments) {

// use xds route to find a service instance, and modify url by it
Optional<ServiceInstance> serviceInstanceOptional = BaseHttpRouterUtils
.chooseServiceInstanceByXds(host.split("\\.")[0], uri.getPath(), getHeaders(httpRequest));
.chooseServiceInstanceByXds(host.split(RouterConstant.ESCAPED_POINT)[0], uri.getPath(),
getHeaders(httpRequest));
if (!serviceInstanceOptional.isPresent()) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.sermant.core.utils.ReflectUtils;
import io.sermant.core.utils.StringUtils;
import io.sermant.router.common.config.RouterConfig;
import io.sermant.router.common.constants.RouterConstant;
import io.sermant.router.common.request.RequestData;
import io.sermant.router.common.utils.CollectionUtils;
import io.sermant.router.common.utils.FlowContextUtils;
Expand Down Expand Up @@ -113,7 +114,7 @@ private void handleXdsRouterAndUpdateHttpRequest(HttpURLConnection connection) {

// use xds route to find a service instance, and modify url by it
Optional<ServiceInstance> serviceInstanceOptional = BaseHttpRouterUtils
.chooseServiceInstanceByXds(host.split("\\.")[0], url.getPath(),
.chooseServiceInstanceByXds(host.split(RouterConstant.ESCAPED_POINT)[0], url.getPath(),
BaseHttpRouterUtils.processHeaders(headers));
if (!serviceInstanceOptional.isPresent()) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.sermant.core.utils.ReflectUtils;
import io.sermant.core.utils.StringUtils;
import io.sermant.router.common.config.RouterConfig;
import io.sermant.router.common.constants.RouterConstant;
import io.sermant.router.common.request.RequestData;
import io.sermant.router.common.utils.FlowContextUtils;
import io.sermant.router.common.utils.ThreadLocalUtils;
Expand Down Expand Up @@ -137,7 +138,8 @@ private void handleXdsRouterAndUpdateHttpRequest(Object obj, Request request) {

// use xds route to find a service instance, and modify url by it
Optional<ServiceInstance> serviceInstanceOptional = BaseHttpRouterUtils
.chooseServiceInstanceByXds(host.split("\\.")[0], uri.getPath(), getHeaders(headers));
.chooseServiceInstanceByXds(host.split(RouterConstant.ESCAPED_POINT)[0], uri.getPath(),
getHeaders(headers));
if (!serviceInstanceOptional.isPresent()) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.sermant.core.plugin.config.PluginConfigManager;
import io.sermant.core.service.xds.entity.ServiceInstance;
import io.sermant.router.common.config.RouterConfig;
import io.sermant.router.common.constants.RouterConstant;
import io.sermant.router.spring.utils.BaseHttpRouterUtils;

import java.io.IOException;
Expand Down Expand Up @@ -112,7 +113,8 @@ private void handleXdsRouterAndUpdateHttpRequest(Object[] arguments) {

// use xds route to find a service instance, and modify url by it
Optional<ServiceInstance> serviceInstanceOptional = BaseHttpRouterUtils
.chooseServiceInstanceByXds(host.split("\\.")[0], uri.getPath(), getHeaders(request));
.chooseServiceInstanceByXds(host.split(RouterConstant.ESCAPED_POINT)[0], uri.getPath(),
getHeaders(request));
if (!serviceInstanceOptional.isPresent()) {
return;
}
Expand Down

0 comments on commit cb6d6a4

Please sign in to comment.