Feature/real time usage update #807

Closed · wants to merge 25 commits into develop from feature/real_time_usage_update

Changes from all commits (25 commits)
2e30937
update usage in dashboard realtime for runtime and testing
notshivansh Jan 2, 2024
e287829
Changes for refreshing usage data
aktoboy Dec 14, 2023
cb60fea
Added more logs and try catch around flushUsageDataForOrg
aktoboy Dec 14, 2023
0d627f9
Removed sync check
aktoboy Dec 14, 2023
c13a16d
fix fn call
notshivansh Jan 2, 2024
3a21b10
update entitlements check to use local data
notshivansh Jan 3, 2024
a335b95
remove standar grace period, save usage metrics once per sync cycle a…
notshivansh Jan 3, 2024
bd80eb0
add support to update local metrics in real time
notshivansh Jan 8, 2024
6a6611c
Merge branch 'develop' into feature/real_time_usage_update
notshivansh Jan 8, 2024
fd1b055
refractor some code and update usage metrics for deleted test runs
notshivansh Jan 8, 2024
3285df8
fix diff algo
notshivansh Jan 9, 2024
3d591fe
fix bugs and make code uniform
notshivansh Jan 9, 2024
97c14f9
Merge branch 'develop' into feature/real_time_usage_update
notshivansh Jan 9, 2024
7782960
update metric calc according to new runtime
notshivansh Jan 9, 2024
4155e51
clean some code
notshivansh Jan 9, 2024
9c761dd
Merge branch 'develop' into feature/real_time_usage_update
notshivansh Jan 11, 2024
c5c27bd
update util to check only latest endpoints
notshivansh Jan 11, 2024
9f5d563
use current time for all har uploads
notshivansh Jan 12, 2024
0df5b9d
Merge branch 'develop' into feature/real_time_usage_update
notshivansh Jan 19, 2024
6a04987
fix comments
notshivansh Jan 19, 2024
689ae7c
Merge branch 'develop' into feature/real_time_usage_update
notshivansh Jan 19, 2024
d36d95c
Merge branch 'develop' into feature/real_time_usage_update
notshivansh Feb 12, 2024
5556f65
fix refractor
notshivansh Feb 12, 2024
c03e3fe
fix pipeline fn
notshivansh Feb 12, 2024
e566078
add more logs
notshivansh Feb 12, 2024
14 changes: 2 additions & 12 deletions apps/api-runtime/src/main/java/com/akto/runtime/Main.java
@@ -7,7 +7,6 @@
import java.util.concurrent.TimeUnit;

import com.akto.DaoInit;
import com.akto.billing.UsageMetricUtils;
import com.akto.dao.*;
import com.akto.dao.context.Context;
import com.akto.dto.APIConfig;
@@ -20,6 +19,7 @@
import com.akto.parsers.HttpCallParser;
import com.akto.dto.HttpResponseParams;
import com.akto.util.AccountTask;
import com.akto.utils.EndpointUtil;
import com.google.gson.Gson;
import com.mongodb.ConnectionString;
import com.mongodb.client.model.Filters;
@@ -283,17 +283,6 @@ public void run() {
continue;
}

if (UsageMetricUtils.checkActiveEndpointOverage(accountIdInt)) {
int now = Context.now();
int lastSent = logSentMap.getOrDefault(accountIdInt, 0);
if (now - lastSent > LoggerMaker.LOG_SAVE_INTERVAL) {
logSentMap.put(accountIdInt, now);
loggerMaker.infoAndAddToDb("Active endpoint overage detected for account " + accountIdInt
+ ". Ingestion stopped " + now, LogDb.RUNTIME);
}
continue;
}

if (!httpCallParserMap.containsKey(accountId)) {
HttpCallParser parser = new HttpCallParser(
apiConfig.getUserIdentifier(), apiConfig.getThreshold(), apiConfig.getSync_threshold_count(),
@@ -321,6 +310,7 @@ public void run() {
}
}
}
EndpointUtil.calcAndDeleteEndpoints();
} catch (Exception e) {
loggerMaker.errorAndAddToDb(e.toString(), LogDb.RUNTIME);
}
156 changes: 156 additions & 0 deletions apps/api-runtime/src/main/java/com/akto/utils/EndpointUtil.java
@@ -0,0 +1,156 @@
package com.akto.utils;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.bson.conversions.Bson;

import com.akto.billing.UsageMetricCalculator;
import com.akto.billing.UsageMetricHandler;
import com.akto.dao.ApiCollectionsDao;
import com.akto.dao.MCollection;
import com.akto.dao.SingleTypeInfoDao;
import com.akto.dao.context.Context;
import com.akto.dto.ApiCollection;
import com.akto.dto.ApiCollectionUsers;
import com.akto.dto.ApiCollectionUsers.CollectionType;
import com.akto.dto.ApiInfo.ApiInfoKey;
import com.akto.dto.billing.FeatureAccess;
import com.akto.dto.type.SingleTypeInfo;
import com.akto.dto.usage.MetricTypes;
import com.akto.log.LoggerMaker;
import com.akto.log.LoggerMaker.LogDb;
import com.akto.util.Constants;
import com.mongodb.BasicDBObject;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Updates;

public class EndpointUtil {

private static final LoggerMaker loggerMaker = new LoggerMaker(EndpointUtil.class, LogDb.RUNTIME);

public static void calcAndDeleteEndpoints() {
// check if overage happened and delete over the limit data
int accountId = Context.accountId.get();
FeatureAccess featureAccess = UsageMetricHandler.calcAndFetchFeatureAccess(MetricTypes.ACTIVE_ENDPOINTS, accountId);

if (featureAccess.checkOverageAfterGrace()) {

loggerMaker.infoAndAddToDb("overage detected while processing endpoints, running overage function");
int usageLimit = featureAccess.getUsageLimit();
int measureEpoch = featureAccess.getMeasureEpoch();

/*
* delete all data related to endpoints after the
* specified limit for the current measureEpoch.
*/
deleteEndpoints(usageLimit, measureEpoch);
}
}

final static int delta = 60 * 20; // 20 minutes

private static void deleteEndpoints(int skip, int timestamp) {

Bson filters = Filters.and(
Filters.gt(SingleTypeInfo._TIMESTAMP, timestamp),
UsageMetricCalculator.excludeDemos(SingleTypeInfo._API_COLLECTION_ID));

boolean hasMore = false;

do {
hasMore = false;

/*
we query up to 100 endpoints at a time
Using the delta epoch to bring the latest traffic only.
*/

int now = Context.now();
int deltaEpoch = now - delta;
List<ApiInfoKey> apis = SingleTypeInfoDao.instance.getEndpointsAfterOverage(filters, skip, deltaEpoch);

// This contains all collections related to endpoints
Map<CollectionType, MCollection<?>[]> collectionsMap = ApiCollectionUsers.COLLECTIONS_WITH_API_COLLECTION_ID;

for (Map.Entry<CollectionType, MCollection<?>[]> collections : collectionsMap.entrySet()) {
deleteInManyCollections(collections.getValue(), createFilters(collections.getKey(), apis));
}

// we need to update the api collection with the new list of urls
Map<Integer, List<String>> urls = new HashMap<>();

for (ApiInfoKey api : apis) {
List<String> urlList = urls.get(api.getApiCollectionId());
if (urlList == null) {
urlList = new ArrayList<>();
}
urlList.add(api.getUrl() + " " + api.getMethod().toString());
urls.put(api.getApiCollectionId(), urlList);
}

for (Map.Entry<Integer, List<String>> entry : urls.entrySet()) {
ApiCollectionsDao.instance.updateOne(Filters.eq(Constants.ID, entry.getKey()),
Updates.pullAll(ApiCollection._URLS, entry.getValue()));
}

if (apis != null && !apis.isEmpty()) {
hasMore = true;
}

} while (hasMore);
}

private static void deleteInManyCollections(MCollection<?>[] collections, Bson filter) {
for (MCollection<?> collection : collections) {
collection.deleteAll(filter);
}
}

private static String getFilterPrefix(CollectionType type) {
String prefix = "";
switch (type) {
case Id_ApiCollectionId:
prefix = "_id.";
break;

case Id_ApiInfoKey_ApiCollectionId:
prefix = "_id.apiInfoKey.";
break;

case ApiCollectionId:
default:
break;
}
return prefix;
}

private static Bson createApiFilters(CollectionType type, ApiInfoKey api) {

String prefix = getFilterPrefix(type);

return Filters.and(
Filters.eq(prefix + SingleTypeInfo._URL, api.getUrl()),
Filters.eq(prefix + SingleTypeInfo._METHOD, api.getMethod().toString()),
Filters.in(SingleTypeInfo._COLLECTION_IDS, api.getApiCollectionId()));

}

private static Bson createFilters(CollectionType type, List<ApiInfoKey> apiList) {
Set<ApiInfoKey> apiSet = new HashSet<>(apiList);
List<Bson> apiFilters = new ArrayList<>();
if (apiSet != null && !apiSet.isEmpty()) {
for (ApiInfoKey api : apiSet) {
apiFilters.add(createApiFilters(type, api));
}
return Filters.or(apiFilters);
}

return Filters.nor(new BasicDBObject());
}

}
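
One subtlety in createFilters above: when the API list is empty, it falls back to Filters.nor(new BasicDBObject()). An empty document matches every record, so its $nor matches none, and the subsequent deleteAll becomes a no-op instead of wiping the collection. A standalone sketch of that idiom against a hypothetical collection follows; the connection string, database, and collection names are placeholders, not part of this PR.

import org.bson.Document;
import org.bson.conversions.Bson;

import com.mongodb.BasicDBObject;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Filters;

// Sketch only: "demo" / "single_type_info" are placeholder names.
public class MatchNothingFilterSketch {
    public static void main(String[] args) {
        try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
            MongoCollection<Document> coll =
                    client.getDatabase("demo").getCollection("single_type_info");

            // An empty document matches every record ...
            Bson matchAll = new BasicDBObject();
            // ... so NOR of it matches no record; deleting with it is a safe no-op.
            Bson matchNone = Filters.nor(matchAll);

            long deleted = coll.deleteMany(matchNone).getDeletedCount(); // expected: 0
            System.out.println("deleted: " + deleted);
        }
    }
}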
65 changes: 57 additions & 8 deletions apps/billing/src/main/java/com/akto/action/usage/UsageAction.java
@@ -1,40 +1,43 @@
package com.akto.action.usage;

import java.util.Set;
import java.util.function.Consumer;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import javax.servlet.http.HttpServletRequest;

import com.akto.util.UsageCalculator;
import com.akto.util.tasks.OrganizationTask;
import com.akto.util.UsageUtils;
import org.apache.struts2.interceptor.ServletRequestAware;

import com.akto.dao.context.Context;
import com.akto.dao.billing.OrganizationsDao;
import com.akto.dao.usage.UsageMetricsDao;
import com.akto.dao.usage.UsageMetricInfoDao;
import com.akto.dto.billing.Organization;
import com.akto.dto.test_editor.YamlTemplate;
import com.akto.dto.usage.MetricTypes;
import com.akto.dto.usage.UsageMetric;
import com.akto.dto.usage.UsageMetricInfo;
import com.akto.dto.usage.metadata.ActiveAccounts;
import com.akto.listener.InitializerListener;
import com.akto.log.LoggerMaker;
import com.akto.log.LoggerMaker.LogDb;
import com.google.gson.Gson;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Updates;
import com.opensymphony.xwork2.Action;
import static com.opensymphony.xwork2.Action.SUCCESS;
import com.opensymphony.xwork2.ActionSupport;

public class UsageAction implements ServletRequestAware {
public class UsageAction extends ActionSupport implements ServletRequestAware {
private UsageMetric usageMetric;
private HttpServletRequest request;

private static final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();

private static final LoggerMaker loggerMaker = new LoggerMaker(UsageAction.class);
private int usageLowerBound;
private int usageUpperBound;

private String organizationId;

public String ingestUsage() {
try {
String organizationId = usageMetric.getOrganizationId();
@@ -84,6 +87,44 @@ public String ingestUsage() {
return SUCCESS.toUpperCase();
}

public String flushUsageDataForOrg(){

if(organizationId == null || organizationId.isEmpty()){
addActionError("Organization id not provided");
return Action.ERROR.toUpperCase();
}

try {
Organization organization = OrganizationsDao.instance.findOne(Filters.eq(Organization.ID, organizationId));
if (organization == null) {
String message = String.format("Organization %s does not exist", organizationId);
addActionError(message);
loggerMaker.errorAndAddToDb(message, LogDb.BILLING);
return Action.ERROR.toUpperCase();
}
int now = Context.now();
/*
* since we just recorded and sent the data from dashboard,
* we need to set the limits to check for the current epoch,
* even though it would be a non-standard epoch.
*/
usageLowerBound = now - UsageUtils.USAGE_UPPER_BOUND_DL;
usageUpperBound = usageLowerBound + UsageUtils.USAGE_UPPER_BOUND_DL;
executorService.schedule(new Runnable() {
public void run() {
InitializerListener.aggregateAndSinkUsageData(organization, usageLowerBound, usageUpperBound);
loggerMaker.infoAndAddToDb(String.format("Flushed usage data for organization %s", organizationId), LogDb.BILLING);
}
}, 0, TimeUnit.SECONDS);
return SUCCESS.toUpperCase();
} catch (Exception e) {
String commonMessage = "Error while flushing usage data for organization";
loggerMaker.errorAndAddToDb(e, String.format( commonMessage + " %s. Error: %s", organizationId, e.getMessage()), LogDb.BILLING);
addActionError(commonMessage);
return Action.ERROR.toUpperCase();
}
}

public void setUsageMetric(UsageMetric usageMetric) {
this.usageMetric = usageMetric;
}
@@ -108,4 +149,12 @@ public int getUsageUpperBound() {
public void setUsageUpperBound(int usageUpperBound) {
this.usageUpperBound = usageUpperBound;
}

public String getOrganizationId() {
return organizationId;
}

public void setOrganizationId(String organizationId) {
this.organizationId = organizationId;
}
}
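
A detail worth tracing in flushUsageDataForOrg: with usageLowerBound = now - UsageUtils.USAGE_UPPER_BOUND_DL and usageUpperBound = usageLowerBound + UsageUtils.USAGE_UPPER_BOUND_DL, the upper bound lands exactly on the request time, so the scheduled task aggregates the single reporting window that ends at the flush request rather than at a standard epoch boundary. A minimal sketch of that arithmetic, using a placeholder value for the constant (its real value lives in UsageUtils and is not shown in this diff):

public class FlushWindowSketch {
    // Placeholder: the actual UsageUtils.USAGE_UPPER_BOUND_DL value is not part of this diff.
    static final int USAGE_UPPER_BOUND_DL = 24 * 60 * 60;

    public static void main(String[] args) {
        int now = (int) (System.currentTimeMillis() / 1000L);

        int usageLowerBound = now - USAGE_UPPER_BOUND_DL;
        int usageUpperBound = usageLowerBound + USAGE_UPPER_BOUND_DL;

        // The window ends at the request time, i.e. a non-standard epoch.
        System.out.println("window = [" + usageLowerBound + ", " + usageUpperBound + "]"
                + ", upper bound equals now: " + (usageUpperBound == now));
    }
}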
apps/billing/src/main/java/com/akto/listener/InitializerListener.java (path inferred from the aggregateAndSinkUsageData references above; the original file header was not captured)
@@ -129,8 +129,7 @@ public void run() {
OrganizationTask.instance.executeTask(new Consumer<Organization>() {
@Override
public void accept(Organization o) {
UsageCalculator.instance.aggregateUsageForOrg(o, finalUsageLowerBound, finalUsageUpperBound);
UsageCalculator.instance.sendOrgUsageDataToAllSinks(o);
aggregateAndSinkUsageData(o, finalUsageLowerBound, finalUsageUpperBound);
}
}, "usage-reporting-scheduler");

@@ -145,4 +144,8 @@ public void accept(Organization o) {
}, 0, 1, UsageUtils.USAGE_CRON_PERIOD);
}

public static void aggregateAndSinkUsageData(Organization organization, int usageLowerBound, int usageUpperBound) {
UsageCalculator.instance.aggregateUsageForOrg(organization, usageLowerBound, usageUpperBound);
UsageCalculator.instance.sendOrgUsageDataToAllSinks(organization);
}
}
12 changes: 12 additions & 0 deletions apps/billing/src/main/resources/struts.xml
@@ -92,6 +92,18 @@
</result>
</action>


<action name="api/flushUsageDataForOrg" class="com.akto.action.usage.UsageAction" method="flushUsageDataForOrg">
<interceptor-ref name="json"/>
<interceptor-ref name="defaultStack" />
<result name="SUCCESS" type="json"/>
<result name="ERROR" type="json">
<param name="statusCode">422</param>
<param name="ignoreHierarchy">false</param>
<param name="includeProperties">^actionErrors.*</param>
</result>
</action>

</package>

</struts>
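
The new struts mapping exposes the flush as api/flushUsageDataForOrg on the billing service, with the json interceptor binding the request body onto the action's setters and the ERROR result surfacing actionErrors with a 422 status. A minimal caller sketch follows; the host, port, organization id, and any auth headers are assumptions, not something this PR defines.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Sketch only: host/port and the organization id are placeholders.
public class FlushUsageCallSketch {
    public static void main(String[] args) throws Exception {
        // The json interceptor maps "organizationId" onto UsageAction.setOrganizationId.
        String body = "{\"organizationId\": \"<org-id>\"}";

        HttpRequest request = HttpRequest.newBuilder(
                        URI.create("http://localhost:9000/api/flushUsageDataForOrg"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());

        // 200 on success; 422 with an actionErrors payload when the org id is missing or unknown.
        System.out.println(response.statusCode() + " " + response.body());
    }
}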