Skip to content

Commit

Permalink
rewrite sofarpc with retry
Browse files Browse the repository at this point in the history
  • Loading branch information
hexiaofeng committed May 9, 2024
1 parent 5412c29 commit bec3617
Show file tree
Hide file tree
Showing 30 changed files with 667 additions and 548 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,16 @@ public static <T extends ModifierDesc> Junction<T> isPrivate() {
return ModifierMatcher.of(Mode.PRIVATE);
}

/**
* Creates a matcher that matches elements that are abstract.
*
* @param <T> The type of the element.
* @return A matcher that matches private elements.
*/
public static <T extends ModifierDesc> Junction<T> isAbstract() {
return ModifierMatcher.of(Mode.ABSTRACT);
}

/**
* Creates a matcher that matches methods with the specified parameter types.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.jd.live.agent.governance.rule.tag.TagCondition;

import java.util.List;
import java.util.function.Predicate;

/**
* Represents an endpoint in a distributed system, providing methods to access its properties and match against tag conditions.
Expand Down Expand Up @@ -312,22 +311,6 @@ default boolean isAccessible() {
return state == null || state.isAccessible();
}

/**
* Retrieves a predicate used for filtering endpoints.
* <p>
* This default implementation returns {@code null}, indicating that there is no
* predicate associated by default. Implementing classes should override this method
* to provide an actual predicate that can be used to filter endpoints based on
* specific criteria.
* </p>
*
* @return {@code null} by default. Implementing classes should override this
* method to return a non-null {@code Predicate<Endpoint>} instance.
*/
default Predicate<Endpoint> getPredicate() {
return null;
}

/**
* Evaluates the predicate associated with this endpoint, if any, to determine
* if this endpoint satisfies the conditions defined by the predicate.
Expand All @@ -341,8 +324,7 @@ default Predicate<Endpoint> getPredicate() {
* test passes for this endpoint; {@code false} otherwise.
*/
default boolean predicate() {
Predicate<Endpoint> predicate = getPredicate();
return predicate == null || predicate.test(this);
return true;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,8 +284,14 @@ protected O routing(R request, List<? extends Endpoint> instances) {
* This method must be implemented by concrete subclasses to define how the route filters are applied.
*
* @param invocation the OutboundInvocation to which the filters will be applied
* @return A list of endpoints that have been determined as suitable targets for the invocation after
* applying the route filters.
*/
protected abstract void routing(O invocation);
protected List<? extends Endpoint> routing(O invocation) {
RouteFilterChain.Chain chain = new RouteFilterChain.Chain(routeFilters);
chain.filter(invocation);
return invocation.getEndpoints();
}

/**
* Creates a new OutboundInvocation for the given outbound request.
Expand All @@ -309,6 +315,42 @@ protected O createOutlet(R request, List<? extends Endpoint> instances) {
result.setInstances(instances);
return result;
}

/**
* Executes an RPC call with retry logic based on the invocation's associated retry policy.
* <p>
* This method first determines if a retry policy is defined and enabled for the given invocation. If a retry policy
* is applicable, it attempts to execute the supplied RPC call using a retrier that adheres to the retry policy's
* parameters, such as retry type and timeout. If the retry policy specifies a timeout, this method also adjusts
* the request context's deadline attribute accordingly.
* </p>
* <p>
* If no retry policy is enabled or if the retrier cannot be obtained, this method directly executes the RPC call
* once without any retry logic.
* </p>
*
* @param invocation The invocation context of the RPC call, which may contain service metadata including the retry policy.
* @param supplier A supplier that executes the RPC call and returns a response.
* @return The response from the RPC call, either from the initial attempt or after retrying according to the retry policy.
*/
protected Response invokeWithRetry(O invocation, Supplier<Response> supplier) {
ServicePolicy servicePolicy = invocation == null ? null : invocation.getServiceMetadata().getServicePolicy();
RetryPolicy retryPolicy = servicePolicy == null ? null : servicePolicy.getRetryPolicy();
if (retryPolicy != null && retryPolicy.isEnabled()) {
RetrierFactory retrierFactory = context.getOrDefaultRetrierFactory(retryPolicy.getType());
Retrier retrier = retrierFactory == null ? null : retrierFactory.get(retryPolicy);
if (retrier != null) {
Long timeout = retryPolicy.getTimeout();
if (timeout != null && timeout > 0) {
RequestContext.getOrCreate().setAttribute(Carrier.ATTRIBUTE_DEADLINE, System.currentTimeMillis() + timeout);
} else {
RequestContext.removeAttribute(Carrier.ATTRIBUTE_DEADLINE);
}
return retrier.execute(supplier);
}
}
return supplier.get();
}
}

/**
Expand Down Expand Up @@ -408,11 +450,6 @@ protected HttpOutboundInvocation<T> createOutlet(T request, List<? extends Endpo
result.setInstances(instances);
return result;
}

@Override
protected void routing(HttpOutboundInvocation<T> invocation) {
new RouteFilterChain.Chain(routeFilters).filter(invocation);
}
}

/**
Expand Down Expand Up @@ -507,9 +544,13 @@ protected OutboundInvocation<? extends O> routing(I request) {
* Applies the route filters to the given OutboundInvocation as part of the routing process.
*
* @param invocation The OutboundInvocation to which the route filters will be applied.
* @return A list of endpoints that have been determined as suitable targets for the invocation after applying the
* route filters.
*/
protected void routing(OutboundInvocation<? extends O> invocation) {
new RouteFilterChain.Chain(routeFilters).filter(invocation);
protected List<? extends Endpoint> routing(OutboundInvocation<? extends O> invocation) {
RouteFilterChain.Chain chain = new RouteFilterChain.Chain(routeFilters);
chain.filter(invocation);
return invocation.getEndpoints();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.jd.live.agent.governance.invoke.loadbalance.randomweight.RandomWeightLoadBalancer;
import com.jd.live.agent.governance.policy.service.ServicePolicy;
import com.jd.live.agent.governance.policy.service.loadbalance.LoadBalancePolicy;
import com.jd.live.agent.governance.policy.service.loadbalance.StickyType;
import com.jd.live.agent.governance.request.Request;
import com.jd.live.agent.governance.request.ServiceRequest;

Expand All @@ -58,6 +59,10 @@ public class LoadBalanceFilter implements RouteFilter {
@Override
public <T extends ServiceRequest.OutboundRequest> void filter(OutboundInvocation<T> invocation, RouteFilterChain chain) {
RouteTarget target = invocation.getRouteTarget();
ServicePolicy servicePolicy = invocation.getServiceMetadata().getServicePolicy();
LoadBalancePolicy loadBalancePolicy = servicePolicy == null ? null : servicePolicy.getLoadBalancePolicy();
StickyType stickyType = loadBalancePolicy == null ? null : loadBalancePolicy.getStickyType();
stickyType = stickyType == null ? StickyType.NONE : stickyType;
if (!target.isEmpty()) {
List<? extends Endpoint> candidates = preferSticky(target);
if (candidates != null && !candidates.isEmpty()) {
Expand All @@ -80,15 +85,16 @@ public <T extends ServiceRequest.OutboundRequest> void filter(OutboundInvocation
return null;
});
}
if (stickyType != StickyType.NONE) {
candidates = target.getEndpoints();
if (candidates != null && !candidates.isEmpty()) {
invocation.getRequest().setStickyId(candidates.get(0).getId());
}
}
}

T request = invocation.getRequest();
if (target.isEmpty()) {
RuntimeException exception = request.createNoAvailableEndpointException();
if (exception != null) {
throw exception;
}
} else {
if (!target.isEmpty()) {
Endpoint endpoint = target.getEndpoints().get(0);
request.addAttempt(endpoint.getId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import lombok.Getter;
import lombok.Setter;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/**
Expand Down Expand Up @@ -73,7 +71,7 @@ public class RetryPolicy extends PolicyId implements PolicyInheritWithId<RetryPo
/**
* Collection of retry status codes. This parameter specifies which status codes should be considered retryable.
*/
private Set<String> retryStatuses = new HashSet<>(Arrays.asList("500", "502", "503"));
private Set<String> retryStatuses;

/**
* A collection of retryable exception class names.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ public AbstractServiceRequest(T request) {
this.request = request;
}

@Override
public void addAttempt(String attempt) {
if (attempt != null) {
if (attempts == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,34 +58,6 @@ public interface ServiceRequest extends Request {
*/
String getPath();

/**
* Retrieves a set of identifiers that represent the attempts made for this request.
*
* @return A Set of String identifiers for the attempts.
*/
Set<String> getAttempts();

/**
* Adds an attempt identifier to this service request.
* <p>
* This method is used to track retries or other types of attempts related to processing the service request.
* If the {@code attempt} parameter is not {@code null}, it will be added to the set of attempt identifiers.
* If no attempts have previously been added, a new set will be initialized.
* </p>
*
* @param attempt The identifier of the attempt to add.
*/
void addAttempt(String attempt);

/**
* Retrieves the sticky session ID associated with the request, if any.
*
* @return The sticky session ID as a String, or {@code null} if not applicable.
*/
default String getStickyId() {
return null;
}

/**
* Rejects the request with the given fault type and reason.
*
Expand Down Expand Up @@ -130,7 +102,26 @@ interface InboundRequest extends ServiceRequest {
* @author Zhiguo.Chen
* @since 1.0.0
*/
interface OutboundRequest extends ServiceRequest {
interface OutboundRequest extends ServiceRequest, StickyRequest {

/**
* Retrieves a set of identifiers that represent the attempts made for this request.
*
* @return A Set of String identifiers for the attempts.
*/
Set<String> getAttempts();

/**
* Adds an attempt identifier to this service request.
* <p>
* This method is used to track retries or other types of attempts related to processing the service request.
* If the {@code attempt} parameter is not {@code null}, it will be added to the set of attempt identifiers.
* If no attempts have previously been added, a new set will be initialized.
* </p>
*
* @param attempt The identifier of the attempt to add.
*/
void addAttempt(String attempt);

/**
* Retrieves the configured timeout value.
Expand Down Expand Up @@ -162,16 +153,6 @@ default long getTimeout() {
default void setTimeout(long timeout) {
}

/**
* Provides a default exception to be thrown or handled when there is not any available endpoint.
*
* @return a {@link Throwable} representing the exception to be thrown or handled when
* there is not any available endpoint. By default, a {@link RuntimeException} is returned.
*/
default RuntimeException createNoAvailableEndpointException() {
return null;
}

}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright © ${year} ${owner} (${email})
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.jd.live.agent.governance.request;

/**
* The StickyRequest interface defines the methods that must be implemented by
* any class that wishes to support "sticky" sessions in a distributed system.
* Sticky sessions are used to ensure that requests from the same client are
* consistently routed to the same service instance, typically to maintain session
* state or to provide a consistent context for stateful interactions.
*/
public interface StickyRequest {

/**
* Retrieves the sticky session ID associated with the request, if any.
* Implementations should return the current session ID that is used to
* stick the request to a particular service instance. If sticky sessions
* are not used or the ID is not set, the method should return {@code null}.
*
* @return The sticky session ID as a String, or {@code null} if not applicable.
*/
default String getStickyId() {
return null;
}

/**
* Sets the sticky session ID for the request. Implementations should use this
* method to assign a specific session ID that will be used to route subsequent
* requests to the same service instance. If sticky sessions are not supported,
* this method may be left empty.
*
* @param stickyId The sticky session ID to be set.
*/
default void setStickyId(String stickyId) {

}
}
Loading

0 comments on commit bec3617

Please sign in to comment.