Skip to content

Commit

Permalink
Fix error.
Browse files Browse the repository at this point in the history
  • Loading branch information
hexiaofeng committed May 21, 2024
1 parent fb28aaf commit be2e215
Show file tree
Hide file tree
Showing 7 changed files with 98 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ public enum EventType {
*/
AGENT_SERVICE_STOP,

/**
* Indicates that all agent's services have started.
*/
AGENT_SERVICES_START,

/**
* Indicates a successful enhancement or modification performed by the agent.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,11 @@ public ServiceManager(List<AgentService> services, Publisher<AgentEvent> publish
@Override
public CompletableFuture<Void> start() {
return execute(AgentService::start, s -> new AgentEvent(EventType.AGENT_SERVICE_START,
"service " + s.getClass().getSimpleName() + " is started."));
"service " + s.getClass().getSimpleName() + " is started.")).whenComplete((v, t) -> {
if (t == null) {
publisher.offer(new Event<>(new AgentEvent(EventType.AGENT_SERVICES_START, "all services are started.")));
}
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ public List<Service> update(String name,
if (old.getName().equals(name)) {
oldService = old;
} else {
result.add(oldService);
result.add(old);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import com.jd.live.agent.bootstrap.classloader.ResourcerType;
import com.jd.live.agent.bootstrap.logger.Logger;
import com.jd.live.agent.bootstrap.logger.LoggerFactory;
import com.jd.live.agent.core.event.AgentEvent;
import com.jd.live.agent.core.event.AgentEvent.EventType;
import com.jd.live.agent.core.event.Event;
import com.jd.live.agent.core.event.Publisher;
import com.jd.live.agent.core.extension.ExtensionInitializer;
Expand Down Expand Up @@ -49,7 +51,6 @@
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/**
Expand All @@ -67,7 +68,11 @@ public class PolicyManager implements PolicySupervisor, InjectSourceSupplier, Ex

@Getter
@Inject(Publisher.POLICY_SUBSCRIBER)
private Publisher<PolicySubscriber> publisher;
private Publisher<PolicySubscriber> policyPublisher;

@Getter
@Inject(Publisher.SYSTEM)
private Publisher<AgentEvent> systemPublisher;

@Getter
@Inject(Application.COMPONENT_APPLICATION)
Expand Down Expand Up @@ -132,10 +137,6 @@ public class PolicyManager implements PolicySupervisor, InjectSourceSupplier, Ex
@InjectLoader(ResourcerType.CORE_IMPL)
private List<RouteFilter> routeFilters;

private final AtomicInteger counter = new AtomicInteger(0);

private final CompletableFuture<Void> future = new CompletableFuture<>();

@Override
public PolicySupplier getPolicySupplier() {
return this;
Expand Down Expand Up @@ -210,21 +211,26 @@ public List<PolicySubscriber> getSubscribers() {

@Override
public void initialize() {
ServiceConfig serviceConfig = governanceConfig == null ? null : governanceConfig.getServiceConfig();
Set<String> warmups = serviceConfig == null ? null : serviceConfig.getWarmups();
warmups = warmups == null ? new HashSet<>() : warmups;
AppService service = application.getService();
String name = service == null || service.getName() == null ? null : service.getName();
String namespace = service == null ? null : service.getNamespace();
if (name != null) {
warmups.add(name);
}
if (!warmups.isEmpty()) {
counter.set(warmups.size());
warmups.forEach(o -> subscribe(new PolicySubscriber(o, namespace, PolicyType.SERVICE_POLICY)));
} else {
future.complete(null);
}
systemPublisher.addHandler(events -> {
// subscribe after all services are started.
for (Event<AgentEvent> event : events) {
AgentEvent agentEvent = event.getData();
if (agentEvent.getType() == EventType.AGENT_SERVICES_START) {
ServiceConfig serviceConfig = governanceConfig == null ? null : governanceConfig.getServiceConfig();
Set<String> warmups = serviceConfig == null ? null : serviceConfig.getWarmups();
warmups = warmups == null ? new HashSet<>() : warmups;
AppService service = application.getService();
String namespace = service == null ? null : service.getNamespace();
String name = service == null || service.getName() == null ? null : service.getName();
if (name != null) {
warmups.add(name);
}
if (!warmups.isEmpty()) {
warmups.forEach(o -> subscribe(new PolicySubscriber(o, namespace, PolicyType.SERVICE_POLICY)));
}
}
}
});
}

protected void subscribe(PolicySubscriber subscriber) {
Expand All @@ -236,11 +242,8 @@ protected void subscribe(PolicySubscriber subscriber) {
} else {
logger.error("failed to sync service policy " + subscriber.getName());
}
if (!future.isDone() && counter.decrementAndGet() == 0) {
future.complete(null);
}
});
publisher.offer(new Event<>(subscriber));
policyPublisher.offer(new Event<>(subscriber));
} else {
exist.trigger(subscriber.getFuture());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@
*/
package com.jd.live.agent.governance.policy.service.live;

import com.jd.live.agent.core.parser.json.DeserializeConverter;
import com.jd.live.agent.core.util.cache.Cache;
import com.jd.live.agent.core.util.cache.MapCache;
import com.jd.live.agent.core.util.map.ListBuilder;
import com.jd.live.agent.governance.policy.PolicyInherit.PolicyInheritWithId;
import com.jd.live.agent.governance.policy.service.annotation.Provider;
import com.jd.live.agent.governance.policy.service.live.converter.CellPolicyDeserializer;
import com.jd.live.agent.governance.policy.service.live.converter.UnitPolicyDeserializer;
import lombok.Getter;
import lombok.Setter;

Expand Down Expand Up @@ -58,6 +61,7 @@ public class ServiceLivePolicy implements LiveStrategy, Cloneable, PolicyInherit
*/
@Getter
@Setter
@DeserializeConverter(UnitPolicyDeserializer.class)
private UnitPolicy unitPolicy;

/**
Expand All @@ -79,6 +83,7 @@ public class ServiceLivePolicy implements LiveStrategy, Cloneable, PolicyInherit
*/
@Getter
@Setter
@DeserializeConverter(CellPolicyDeserializer.class)
private CellPolicy cellPolicy;

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright © ${year} ${owner} (${email})
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jd.live.agent.governance.policy.service.live.converter;

import com.jd.live.agent.core.parser.json.JsonConverter;
import com.jd.live.agent.governance.policy.service.live.CellPolicy;

public class CellPolicyDeserializer implements JsonConverter<String, CellPolicy> {

@Override
public CellPolicy convert(String source) {
return source == null || source.isEmpty() ? null : CellPolicy.valueOf(source.toUpperCase());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright © ${year} ${owner} (${email})
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jd.live.agent.governance.policy.service.live.converter;

import com.jd.live.agent.core.parser.json.JsonConverter;
import com.jd.live.agent.governance.policy.service.live.UnitPolicy;

public class UnitPolicyDeserializer implements JsonConverter<String, UnitPolicy> {

@Override
public UnitPolicy convert(String source) {
return source == null || source.isEmpty() ? null : UnitPolicy.valueOf(source.toUpperCase());
}
}

0 comments on commit be2e215

Please sign in to comment.