Skip to content

Commit

Permalink
add sofarpc stickid and review RequestContext.
Browse files Browse the repository at this point in the history
  • Loading branch information
hexiaofeng committed May 8, 2024
1 parent 48d8683 commit 6370e29
Show file tree
Hide file tree
Showing 21 changed files with 129 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,10 @@ public static void remove() {
*
* @param consumer The {@link Consumer} to process each {@link Cargo} contained in the {@link Carrier}.
*/
public static void traverse(Consumer<Cargo> consumer) {
public static void cargos(Consumer<Cargo> consumer) {
Carrier carrier = CARRIER.get();
if (carrier != null) {
carrier.traverse(consumer);
carrier.cargos(consumer);
}
}

Expand All @@ -119,10 +119,10 @@ public static void traverse(Consumer<Cargo> consumer) {
*
* @param consumer The {@link BiConsumer} to process each key-value pair contained in the {@link Carrier}.
*/
public static void traverse(BiConsumer<String, String> consumer) {
public static void cargos(BiConsumer<String, String> consumer) {
Carrier carrier = CARRIER.get();
if (carrier != null) {
carrier.traverse(consumer);
carrier.cargos(consumer);
}
}

Expand All @@ -137,6 +137,16 @@ public static Cargo getCargo(String key) {
return carrier == null ? null : carrier.getCargo(key);
}

/**
* Checks if the current {@link Carrier} instance contains any {@link Cargo}.
*
* @return {@code true} if the current {@link Carrier} contains {@link Cargo}, {@code false} otherwise.
*/
public static boolean hasCargo() {
Carrier carrier = CARRIER.get();
return carrier != null && carrier.getCargos() != null && !carrier.getCargos().isEmpty();
}

/**
* Retrieves a specific attribute by key from the current {@link Carrier} instance.
*
Expand All @@ -162,13 +172,15 @@ public static <T> T removeAttribute(String key) {
}

/**
* Checks if the current {@link Carrier} instance contains any {@link Cargo}.
* Sets or replaces an attribute with the specified key and value.
*
* @return {@code true} if the current {@link Carrier} contains {@link Cargo}, {@code false} otherwise.
* @param key The key of the attribute.
* @param value The value of the attribute.
*/
public static boolean hasCargo() {
Carrier carrier = CARRIER.get();
return carrier != null && carrier.getCargos() != null && !carrier.getCargos().isEmpty();
public static void setAttribute(String key, Object value) {
if (key != null && value != null) {
getOrCreate().setAttribute(key, value);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ default <T> void addCargo(CargoRequire require, Iterable<T> targets, Function<T,
*
* @param consumer A consumer to process each cargo.
*/
default void traverse(Consumer<Cargo> consumer) {
default void cargos(Consumer<Cargo> consumer) {
if (consumer != null) {
Collection<Cargo> cargos = getCargos();
if (cargos != null) {
Expand All @@ -228,7 +228,7 @@ default void traverse(Consumer<Cargo> consumer) {
*
* @param consumer A bi-consumer to process each key-value pair.
*/
default void traverse(BiConsumer<String, String> consumer) {
default void cargos(BiConsumer<String, String> consumer) {
if (consumer != null) {
Collection<Cargo> cargos = getCargos();
if (cargos != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@
*/
package com.jd.live.agent.plugin.router.sofarpc.request;

import com.alipay.sofa.rpc.client.ProviderHelper;
import com.alipay.sofa.rpc.client.ProviderInfo;
import com.alipay.sofa.rpc.common.RpcConstants;
import com.alipay.sofa.rpc.context.RpcInternalContext;
import com.alipay.sofa.rpc.core.request.SofaRequest;
import com.jd.live.agent.core.util.cache.LazyObject;
import com.jd.live.agent.governance.request.AbstractRpcRequest.AbstractRpcInboundRequest;
import com.jd.live.agent.governance.request.AbstractRpcRequest.AbstractRpcOutboundRequest;

Expand All @@ -26,8 +31,32 @@
*/
public interface SofaRpcRequest {

/**
* Represents an inbound RPC request for the SOFA framework, encapsulating the necessary
* details for processing the request on the server side.
* <p>
* This class extends {@link AbstractRpcInboundRequest} with a specific focus on SOFA RPC requests,
* providing a structure to handle incoming service method invocations. It captures essential
* details such as the service interface name, method name, arguments, and any additional
* properties attached with the request. This information facilitates the execution of the
* corresponding service method on the server.
* </p>
*
* @see AbstractRpcInboundRequest for the base class functionality.
*/
class SofaRpcInboundRequest extends AbstractRpcInboundRequest<SofaRequest> implements SofaRpcRequest {

/**
* Constructs a new {@code SofaRpcInboundRequest} with the specified SOFA request details.
* <p>
* Initializes the request with comprehensive details about the service method to be executed,
* including the service interface name, the unique name of the target service, the method name,
* method arguments, and any additional properties (attachments) that may accompany the request.
* This constructor parses the unique service name to extract the service group if specified.
* </p>
*
* @param request the {@link SofaRequest} containing the details of the service method invocation.
*/
public SofaRpcInboundRequest(SofaRequest request) {
super(request);
this.service = request.getInterfaceName();
Expand All @@ -40,8 +69,39 @@ public SofaRpcInboundRequest(SofaRequest request) {
}
}

/**
* Represents an outbound RPC request specifically designed for the SOFA framework.
* <p>
* This class encapsulates the details required to execute a remote procedure call using the SOFA framework,
* including service identification, method invocation details, and any additional attachments that may be necessary
* for the call. It extends the generic {@link AbstractRpcOutboundRequest} class, providing SOFA-specific
* implementation details.
* </p>
*
* @see AbstractRpcOutboundRequest for more information on the base class functionality.
*/
class SofaRpcOutboundRequest extends AbstractRpcOutboundRequest<SofaRequest> implements SofaRpcRequest {

/**
* Lazily initialized identifier used for sticky connections, typically based on the target service's IP and port.
* <p>
* This ID is determined by inspecting the current RPC context for a specific attachment that indicates the target
* service's location. If present, this information is used to construct a unique identifier for the request, which
* can be used to optimize connection reuse.
* </p>
*/
private final LazyObject<String> stickyId = new LazyObject<>(SofaRpcOutboundRequest::getStickyIdFromContext);

/**
* Constructs a new {@code SofaRpcOutboundRequest} with the specified SOFA request details.
* <p>
* This constructor initializes the request with comprehensive details about the service method to be invoked,
* including the service interface name, method name, method arguments, and any additional properties attached
* to the request.
* </p>
*
* @param request the {@link SofaRequest} containing the details of the SOFA service method to be invoked.
*/
public SofaRpcOutboundRequest(SofaRequest request) {
super(request);
this.service = request.getInterfaceName();
Expand All @@ -52,5 +112,32 @@ public SofaRpcOutboundRequest(SofaRequest request) {
this.arguments = request.getMethodArgs();
this.attachments = request.getRequestProps();
}

@Override
public String getStickyId() {
return stickyId.get();
}

/**
* Attempts to extract a sticky ID from the current RPC context.
* <p>
* This method inspects the current {@link RpcInternalContext} for an attachment indicating the target
* service's IP and port. If found, it constructs and returns a unique identifier based on this information.
* </p>
*
* @return a unique identifier for the target service, or {@code null} if it cannot be determined.
*/
private static String getStickyIdFromContext() {
RpcInternalContext context = RpcInternalContext.peekContext();
String targetIP = (String) context.getAttachment(RpcConstants.HIDDEN_KEY_PINPOINT);
if (targetIP != null && !targetIP.isEmpty()) {
try {
ProviderInfo providerInfo = ProviderHelper.toProviderInfo(targetIP);
return providerInfo.getHost() + ":" + providerInfo.getPort();
} catch (Throwable ignore) {
}
}
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public void onEnter(ExecutableContext ctx) {

private void attachTag(RpcInvocation invocation) {
Carrier carrier = RequestContext.getOrCreate();
carrier.traverse(tag -> invocation.setAttachment(tag.getKey(), tag.getValue()));
carrier.cargos(tag -> invocation.setAttachment(tag.getKey(), tag.getValue()));
carrier.addCargo(require, RpcContext.getContext().getAttachments(), Label::parseValue);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public void onEnter(ExecutableContext ctx) {

private void attachTag(RpcInvocation invocation) {
Carrier carrier = RequestContext.getOrCreate();
carrier.traverse(tag -> invocation.setAttachment(tag.getKey(), tag.getValue()));
carrier.cargos(tag -> invocation.setAttachment(tag.getKey(), tag.getValue()));
carrier.addCargo(require, RpcContext.getContext().getObjectAttachments(), Label::parseValue);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public void onEnter(ExecutableContext ctx) {

private void attachTag(RpcInvocation invocation) {
Carrier carrier = RequestContext.getOrCreate();
carrier.traverse(tag -> invocation.setAttachment(tag.getKey(), tag.getValue()));
carrier.cargos(tag -> invocation.setAttachment(tag.getKey(), tag.getValue()));
carrier.addCargo(require, RpcContext.getClientAttachment().getObjectAttachments(), Label::parseValue);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public void onEnter(ExecutableContext ctx) {
}

private void attachTag(Metadata metadata) {
RequestContext.traverse(tag -> metadata.put(Metadata.Key.of(tag.getKey(), Metadata.ASCII_STRING_MARSHALLER), tag.getValue()));
RequestContext.cargos(tag -> metadata.put(Metadata.Key.of(tag.getKey(), Metadata.ASCII_STRING_MARSHALLER), tag.getValue()));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,6 @@ public void onEnter(ExecutableContext ctx) {
}

private void attachTag(HttpMethod method) {
RequestContext.traverse(tag -> method.setRequestHeader(tag.getKey(), tag.getValue()));
RequestContext.cargos(tag -> method.setRequestHeader(tag.getKey(), tag.getValue()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ public void onEnter(ExecutableContext ctx) {
}

private void attachTag(HttpRequestBase request) {
RequestContext.traverse((k, v) -> request.addHeader(k, v));
RequestContext.cargos((k, v) -> request.addHeader(k, v));
}

private void attachTag(HttpRequest request) {
RequestContext.traverse((k, v) -> request.addHeader(k, v));
RequestContext.cargos((k, v) -> request.addHeader(k, v));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public void onEnter(ExecutableContext ctx) {
* @param header The HTTP request header object to which the tags will be attached.
*/
private void attachTag(Object header) {
RequestContext.traverse((key, value) -> addHeader(header, key, value));
RequestContext.cargos((key, value) -> addHeader(header, key, value));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public void onEnter(ExecutableContext ctx) {
* @param header The {@link MessageHeader} to which the tag will be attached.
*/
private void attachTag(MessageHeader header) {
RequestContext.traverse(header::add);
RequestContext.cargos(header::add);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,6 @@ public void onEnter(ExecutableContext ctx) {

private void attachTag(ProducerRecord<?, ?> record) {
Headers headers = record.headers();
RequestContext.traverse((k, v) -> headers.add(k, v == null ? null : k.getBytes(StandardCharsets.UTF_8)));
RequestContext.cargos((k, v) -> headers.add(k, v == null ? null : k.getBytes(StandardCharsets.UTF_8)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public void onSuccess(ExecutableContext ctx) {

private HttpClient attachTag(HttpClient httpClient) {
if (RequestContext.hasCargo()) {
return httpClient.headers((headers) -> RequestContext.traverse(tag -> headers.set(tag.getKey(), tag.getValue())));
return httpClient.headers((headers) -> RequestContext.cargos(tag -> headers.set(tag.getKey(), tag.getValue())));
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public void onEnter(ExecutableContext ctx) {
}

private void attachTag(Builder builder) {
RequestContext.traverse(builder::addHeader);
RequestContext.cargos(builder::addHeader);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public void onEnter(ExecutableContext ctx) {
}

private void attachTag(Request.Builder builder) {
RequestContext.traverse(builder::addHeader);
RequestContext.cargos(builder::addHeader);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public void onEnter(ExecutableContext ctx) {
private void attachTag(SendMessageRequestHeader header) {
String properties = header.getProperties();
StringBuilder builder = new StringBuilder();
RequestContext.traverse(tag -> append(builder, tag));
RequestContext.cargos(tag -> append(builder, tag));
if (builder.length() > 0) {
if (properties != null && !properties.isEmpty()) {
builder.append(properties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public void onEnter(ExecutableContext ctx) {
private void attachTag(SendMessageRequestHeader header) {
String properties = header.getProperties();
StringBuilder builder = new StringBuilder();
RequestContext.traverse(tag -> append(builder, tag));
RequestContext.cargos(tag -> append(builder, tag));
if (builder.length() > 0) {
if (properties != null && !properties.isEmpty()) {
builder.append(properties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public void onEnter(ExecutableContext ctx) {

private void attachTag(SofaRequest request) {
Carrier carrier = RequestContext.getOrCreate();
carrier.traverse(tag -> request.addRequestProp(tag.getKey(), tag.getValue()));
carrier.cargos(tag -> request.addRequestProp(tag.getKey(), tag.getValue()));
if (RpcInvokeContext.isBaggageEnable()) {
carrier.addCargo(require, RpcInvokeContext.getContext().getAllRequestBaggage(), Label::parseValue);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public void onEnter(ExecutableContext ctx) {
}

private void attachTag(HttpHeaders headers) {
RequestContext.traverse(tag -> headers.addAll(tag.getKey(), tag.getValues()));
RequestContext.cargos(tag -> headers.addAll(tag.getKey(), tag.getValues()));
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public void onEnter(ExecutableContext ctx) {
}

private void attachTag(HttpHeaders headers) {
RequestContext.traverse(tag -> headers.addAll(tag.getKey(), tag.getValues()));
RequestContext.cargos(tag -> headers.addAll(tag.getKey(), tag.getValues()));
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public void onSuccess(ExecutableContext ctx) {
ServerWebExchange exchange = (ServerWebExchange) ctx.getArguments()[0];
HttpHeaders headers = exchange.getResponse().getHeaders();
Mono<Void> mono = (Mono<Void>) mc.getResult();
mono = mono.doFirst(() -> RequestContext.traverse(tag -> headers.addAll(tag.getKey(), tag.getValues())));
mono = mono.doFirst(() -> RequestContext.cargos(tag -> headers.addAll(tag.getKey(), tag.getValues())));
mc.setResult(mono);
}
}

0 comments on commit 6370e29

Please sign in to comment.