Skip to content

Commit

Permalink
Merge pull request #87
Browse files Browse the repository at this point in the history
Fix #86
  • Loading branch information
chenzhiguo authored Oct 14, 2024
2 parents 62a3e21 + abf1d34 commit e0d3429
Show file tree
Hide file tree
Showing 10 changed files with 71 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public RejectUnitException(Throwable cause) {
/**
* RejectEscapeException
*/
public static class RejectEscapeException extends RejectUnitException {
public static class RejectEscapeException extends RejectException {

public RejectEscapeException() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
*/
package com.jd.live.agent.governance.invoke;

import com.jd.live.agent.bootstrap.exception.RejectException.*;
import com.jd.live.agent.core.Constants;
import com.jd.live.agent.core.instance.GatewayRole;
import com.jd.live.agent.governance.context.RequestContext;
import com.jd.live.agent.governance.context.bag.Carrier;
import com.jd.live.agent.governance.event.TrafficEvent;
import com.jd.live.agent.governance.event.TrafficEvent.*;
import com.jd.live.agent.governance.event.TrafficEvent.ActionType;
import com.jd.live.agent.governance.event.TrafficEvent.ComponentType;
import com.jd.live.agent.governance.event.TrafficEvent.Direction;
import com.jd.live.agent.governance.event.TrafficEvent.TrafficEventBuilder;
import com.jd.live.agent.governance.invoke.metadata.LiveDomainMetadata;
import com.jd.live.agent.governance.invoke.metadata.parser.LaneMetadataParser.HttpInboundLaneMetadataParser;
import com.jd.live.agent.governance.invoke.metadata.parser.LiveMetadataParser;
Expand Down Expand Up @@ -125,21 +127,6 @@ public void onForward() {
* @param throwable the exception that caused the failure.
*/
public void onFailure(Throwable throwable) {
if (throwable instanceof RejectUnreadyException) {
publish(context.getTrafficPublisher(), TrafficEvent.builder().actionType(ActionType.REJECT).rejectType(RejectType.REJECT_UNREADY).requests(1));
} else if (throwable instanceof RejectUnitException) {
publish(context.getTrafficPublisher(), TrafficEvent.builder().actionType(ActionType.REJECT).rejectType(RejectType.REJECT_UNIT_UNAVAILABLE).requests(1));
} else if (throwable instanceof RejectCellException) {
publish(context.getTrafficPublisher(), TrafficEvent.builder().actionType(ActionType.REJECT).rejectType(RejectType.REJECT_CELL_UNAVAILABLE).requests(1));
} else if (throwable instanceof RejectEscapeException) {
publish(context.getTrafficPublisher(), TrafficEvent.builder().actionType(ActionType.REJECT).rejectType(RejectType.REJECT_ESCAPE).requests(1));
} else if (throwable instanceof RejectLimitException) {
publish(context.getTrafficPublisher(), TrafficEvent.builder().actionType(ActionType.REJECT).rejectType(RejectType.REJECT_LIMIT).requests(1));
} else if (throwable instanceof RejectPermissionException) {
publish(context.getTrafficPublisher(), TrafficEvent.builder().actionType(ActionType.REJECT).rejectType(RejectType.REJECT_PERMISSION_DENIED).requests(1));
} else if (throwable instanceof RejectAuthException) {
publish(context.getTrafficPublisher(), TrafficEvent.builder().actionType(ActionType.REJECT).rejectType(RejectType.REJECT_UNAUTHORIZED).requests(1));
}
if (listeners != null) {
listeners.forEach(listener -> listener.onFailure(this, throwable));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package com.jd.live.agent.governance.invoke;

import com.jd.live.agent.bootstrap.exception.RejectException;
import com.jd.live.agent.bootstrap.exception.RejectException.*;
import com.jd.live.agent.core.event.Publisher;
import com.jd.live.agent.core.instance.Location;
import com.jd.live.agent.core.util.URI;
Expand Down Expand Up @@ -248,6 +250,29 @@ public void degrade(FaultType type, String reason, DegradeConfig config) {
request.degrade(type, reason, config);
}

/**
* Handles a reject event by publishing a traffic event with the appropriate reject type.
*
* @param exception the reject exception that occurred.
*/
public void onReject(RejectException exception) {
if (exception instanceof RejectUnreadyException) {
publish(context.getTrafficPublisher(), TrafficEvent.builder().actionType(TrafficEvent.ActionType.REJECT).rejectType(TrafficEvent.RejectType.REJECT_UNREADY).requests(1));
} else if (exception instanceof RejectUnitException) {
publish(context.getTrafficPublisher(), TrafficEvent.builder().actionType(TrafficEvent.ActionType.REJECT).rejectType(TrafficEvent.RejectType.REJECT_UNIT_UNAVAILABLE).requests(1));
} else if (exception instanceof RejectCellException) {
publish(context.getTrafficPublisher(), TrafficEvent.builder().actionType(TrafficEvent.ActionType.REJECT).rejectType(TrafficEvent.RejectType.REJECT_CELL_UNAVAILABLE).requests(1));
} else if (exception instanceof RejectEscapeException) {
publish(context.getTrafficPublisher(), TrafficEvent.builder().actionType(TrafficEvent.ActionType.REJECT).rejectType(TrafficEvent.RejectType.REJECT_ESCAPE).requests(1));
} else if (exception instanceof RejectLimitException) {
publish(context.getTrafficPublisher(), TrafficEvent.builder().actionType(TrafficEvent.ActionType.REJECT).rejectType(TrafficEvent.RejectType.REJECT_LIMIT).requests(1));
} else if (exception instanceof RejectPermissionException) {
publish(context.getTrafficPublisher(), TrafficEvent.builder().actionType(TrafficEvent.ActionType.REJECT).rejectType(TrafficEvent.RejectType.REJECT_PERMISSION_DENIED).requests(1));
} else if (exception instanceof RejectAuthException) {
publish(context.getTrafficPublisher(), TrafficEvent.builder().actionType(TrafficEvent.ActionType.REJECT).rejectType(TrafficEvent.RejectType.REJECT_UNAUTHORIZED).requests(1));
}
}

/**
* Publishes a live event to a specified publisher using a configured live event builder.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ default <R extends InboundRequest> void inbound(InboundInvocation<R> invocation)
chain.filter(invocation);
invocation.onForward();
} catch (RejectException e) {
invocation.onFailure(e);
invocation.onReject(e);
throw e;
}
}
Expand All @@ -285,7 +285,7 @@ default <R extends InboundRequest> void inbound(InboundInvocation<R> invocation)
* @throws RejectException if the request is rejected during filtering.
*/
default <R extends OutboundRequest, E extends Endpoint> E route(OutboundInvocation<R> invocation, List<E> instances) {
return route(invocation, instances, (RouteFilter[]) null, true);
return route(invocation, instances, (RouteFilter[]) null);
}

/**
Expand All @@ -304,16 +304,14 @@ default <R extends OutboundRequest, E extends Endpoint> E route(OutboundInvocati
* @throws RejectException if the request is rejected during filtering.
*/
default <R extends OutboundRequest,
E extends Endpoint, P> E route(OutboundInvocation<R> invocation,
List<P> instances,
Function<P, E> converter) {
E extends Endpoint, P> E route(OutboundInvocation<R> invocation, List<P> instances, Function<P, E> converter) {
List<E> endpoints = instances == null ? new ArrayList<>() : new ArrayList<>(instances.size());
if (instances != null) {
for (P instance : instances) {
endpoints.add(converter.apply(instance));
}
}
return route(invocation, endpoints, null, true);
return route(invocation, endpoints, (RouteFilter[]) null);
}

/**
Expand All @@ -328,7 +326,7 @@ E extends Endpoint, P> E route(OutboundInvocation<R> invocation,
* @throws RejectException if the request is rejected during filtering.
*/
default <R extends OutboundRequest, E extends Endpoint> E route(OutboundInvocation<R> invocation) {
return route(invocation, null, null, true);
return route(invocation, null, (RouteFilter[]) null);
}

/**
Expand All @@ -344,18 +342,14 @@ default <R extends OutboundRequest, E extends Endpoint> E route(OutboundInvocati
* @param instances A list of initial {@link Endpoint} instances to be considered for the request.
* @param filters A collection of {@link RouteFilter} instances to apply to the endpoints. If
* {@code null} or empty, the default set of route filters is used.
* @param notifyListener Whether to notify the listeners upon success or failure.
* @return An {@link Endpoint} instance that has been filtered according to the
* specified (or default) filters and is deemed suitable for the outbound request.
* @throws RejectNoProviderException if no provider is found for the invocation.
* @throws RejectException if the request is rejected during filtering.
*/
@SuppressWarnings("unchecked")
default <R extends OutboundRequest,
E extends Endpoint> E route(OutboundInvocation<R> invocation,
List<E> instances,
RouteFilter[] filters,
boolean notifyListener) {
E extends Endpoint> E route(OutboundInvocation<R> invocation, List<E> instances, RouteFilter[] filters) {
if (instances != null && !instances.isEmpty()) {
invocation.setInstances(instances);
}
Expand All @@ -365,17 +359,13 @@ E extends Endpoint> E route(OutboundInvocation<R> invocation,
List<? extends Endpoint> endpoints = invocation.getEndpoints();
Endpoint endpoint = endpoints != null && !endpoints.isEmpty() ? endpoints.get(0) : null;
if (endpoint != null || !invocation.getRequest().isInstanceSensitive()) {
if (notifyListener) {
invocation.onSuccess(endpoint, null);
}
invocation.onForward(endpoint);
return (E) endpoint;
} else {
throw new RejectNoProviderException("There is no provider for invocation " + invocation.getRequest().getService());
}
} catch (RejectException e) {
if (notifyListener) {
invocation.onFailure(null, e);
}
invocation.onReject(e);
throw e;
}
}
Expand Down Expand Up @@ -549,29 +539,24 @@ public <R extends InboundRequest> void inbound(InboundInvocation<R> invocation)

@Override
public <R extends OutboundRequest, E extends Endpoint> E route(OutboundInvocation<R> invocation, List<E> instances) {
return delegate.route(invocation, instances, (RouteFilter[]) null, true);
return delegate.route(invocation, instances, (RouteFilter[]) null);
}

@Override
public <R extends OutboundRequest,
E extends Endpoint, P> E route(OutboundInvocation<R> invocation,
List<P> instances,
Function<P, E> converter) {
E extends Endpoint, P> E route(OutboundInvocation<R> invocation, List<P> instances, Function<P, E> converter) {
return InvocationContext.super.route(invocation, instances, converter);
}

@Override
public <R extends OutboundRequest, E extends Endpoint> E route(OutboundInvocation<R> invocation) {
return delegate.route(invocation, null, (RouteFilter[]) null, true);
return delegate.route(invocation, null, (RouteFilter[]) null);
}

@Override
public <R extends OutboundRequest,
E extends Endpoint> E route(OutboundInvocation<R> invocation,
List<E> instances,
RouteFilter[] filters,
boolean notifyListener) {
return delegate.route(invocation, instances, filters, true);
E extends Endpoint> E route(OutboundInvocation<R> invocation, List<E> instances, RouteFilter[] filters) {
return delegate.route(invocation, instances, filters);
}

@Override
Expand Down Expand Up @@ -601,11 +586,8 @@ public HttpForwardContext(InvocationContext delegate) {

@Override
public <R extends OutboundRequest,
E extends Endpoint> E route(OutboundInvocation<R> invocation,
List<E> instances,
RouteFilter[] filters,
boolean notifyListener) {
E result = super.route(invocation, instances, filters, notifyListener);
E extends Endpoint> E route(OutboundInvocation<R> invocation, List<E> instances, RouteFilter[] filters) {
E result = super.route(invocation, instances, filters);
if (invocation.getRequest() instanceof HttpOutboundRequest) {
HttpOutboundRequest request = (HttpOutboundRequest) invocation.getRequest();
RouteTarget target = invocation.getRouteTarget();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
*/
package com.jd.live.agent.governance.invoke;

import com.jd.live.agent.bootstrap.exception.RejectException.*;
import com.jd.live.agent.governance.event.TrafficEvent;
import com.jd.live.agent.governance.event.TrafficEvent.*;
import com.jd.live.agent.governance.event.TrafficEvent.ActionType;
import com.jd.live.agent.governance.event.TrafficEvent.ComponentType;
import com.jd.live.agent.governance.event.TrafficEvent.Direction;
import com.jd.live.agent.governance.event.TrafficEvent.TrafficEventBuilder;
import com.jd.live.agent.governance.instance.Endpoint;
import com.jd.live.agent.governance.invoke.metadata.parser.LiveMetadataParser.OutboundLiveMetadataParser;
import com.jd.live.agent.governance.invoke.metadata.parser.LiveMetadataParser.RpcOutboundLiveMetadataParser;
Expand Down Expand Up @@ -143,7 +145,9 @@ public boolean onElect(Endpoint endpoint) {
* @param endpoint the endpoint to which the invocation is forwarded.
*/
public void onForward(Endpoint endpoint) {
request.addAttempt(endpoint.getId());
if (endpoint != null) {
request.addAttempt(endpoint.getId());
}
publish(context.getTrafficPublisher(), TrafficEvent.builder().actionType(ActionType.FORWARD).requests(1));
if (listeners != null) {
listeners.forEach(listener -> listener.onForward(endpoint, this));
Expand All @@ -170,19 +174,6 @@ public void onSuccess(Endpoint endpoint, ServiceResponse response) {
* @param throwable the exception that caused the failure.
*/
public void onFailure(Endpoint endpoint, Throwable throwable) {
if (throwable == null) {
return;
} else if (throwable instanceof RejectUnitException) {
publish(context.getTrafficPublisher(), TrafficEvent.builder().actionType(ActionType.REJECT).rejectType(RejectType.REJECT_UNIT_UNAVAILABLE).requests(1));
} else if (throwable instanceof RejectCellException) {
publish(context.getTrafficPublisher(), TrafficEvent.builder().actionType(ActionType.REJECT).rejectType(RejectType.REJECT_CELL_UNAVAILABLE).requests(1));
} else if (throwable instanceof RejectNoProviderException) {
publish(context.getTrafficPublisher(), TrafficEvent.builder().actionType(ActionType.REJECT).rejectType(RejectType.REJECT_NO_PROVIDER).requests(1));
} else if (throwable instanceof RejectCircuitBreakException) {
publish(context.getTrafficPublisher(), TrafficEvent.builder().actionType(ActionType.REJECT).rejectType(RejectType.REJECT_CIRCUIT_BREAK).requests(1));
} else if (throwable instanceof RejectLimitException) {
publish(context.getTrafficPublisher(), TrafficEvent.builder().actionType(ActionType.REJECT).rejectType(RejectType.REJECT_LIMIT).requests(1));
}
if (listeners != null) {
listeners.forEach(listener -> listener.onFailure(endpoint, this, throwable));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@
import com.jd.live.agent.bootstrap.logger.Logger;
import com.jd.live.agent.bootstrap.logger.LoggerFactory;
import com.jd.live.agent.core.instance.AppStatus;
import com.jd.live.agent.governance.exception.ServiceError;
import com.jd.live.agent.governance.instance.Endpoint;
import com.jd.live.agent.governance.invoke.InvocationContext;
import com.jd.live.agent.governance.invoke.OutboundInvocation;
import com.jd.live.agent.governance.policy.service.cluster.ClusterPolicy;
import com.jd.live.agent.governance.request.ServiceRequest.OutboundRequest;
import com.jd.live.agent.governance.exception.ServiceError;
import com.jd.live.agent.governance.response.ServiceResponse.OutboundResponse;

import java.util.List;
Expand Down Expand Up @@ -91,7 +91,7 @@ E extends Endpoint> CompletionStage<O> invoke(LiveCluster<R, O, E> cluster, Outb
if (t == null) {
E endpoint = null;
try {
endpoint = context.route(invocation, v, null, false);
endpoint = context.route(invocation, v);
E instance = endpoint;
onStart(cluster, request, endpoint);
context.outbound(cluster, invocation, endpoint).whenComplete((o, r) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ public <T extends OutboundRequest> void filter(OutboundInvocation<T> invocation,
if (backend == null) {
return null;
} else if (invocation.onElect(backend)) {
invocation.onForward(backend);
return Collections.singletonList(backend);
}
backends = backends == endpoints ? new ArrayList<>(endpoints) : backends;
Expand Down Expand Up @@ -95,7 +94,6 @@ private List<? extends Endpoint> preferSticky(RouteTarget target, OutboundInvoca
endpoint = iterator.next();
if (id.equals(endpoint.getId())) {
if (invocation.onElect(endpoint)) {
invocation.onForward(endpoint);
return Collections.singletonList(endpoint);
}
iterator.remove();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@ private static class StickyListener implements OutboundListener {

@Override
public void onForward(Endpoint endpoint, OutboundInvocation<?> invocation) {
invocation.getRequest().setStickyId(endpoint.getId());
if (endpoint != null) {
invocation.getRequest().setStickyId(endpoint.getId());
}
}

}
Expand Down
Loading

0 comments on commit e0d3429

Please sign in to comment.