Skip to content

Commit

Permalink
Comment out Kafka push steps
Browse files Browse the repository at this point in the history
  • Loading branch information
ayushaga14 committed Dec 30, 2024
1 parent afe3a65 commit e5a5e0b
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 49 deletions.
17 changes: 1 addition & 16 deletions .github/workflows/staging.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,6 @@ jobs:
aws-secret-access-key: ${{secrets.AWS_SECRET_ACCESS_KEY}}
aws-region: ap-south-1

- name: Deploy polaris site to S3 bucket
run: aws s3 sync ./apps/dashboard/web/polaris_web/web/dist s3://dashboard-on-cdn/polaris_web/${{steps.docker_tag.outputs.IMAGE_TAG}}/dist --delete

- run: mvn package -Dakto-image-tag=${{ github.event.inputs.Tag }} -Dakto-build-time=$(eval "date +%s") -Dakto-release-version=${{steps.docker_tag.outputs.IMAGE_TAG}}
- name: DockerHub login
env:
Expand All @@ -74,22 +71,10 @@ jobs:
echo $IMAGE_TAG >> $GITHUB_STEP_SUMMARY
docker buildx create --use
# Build a docker container and push it to DockerHub
cd apps/dashboard
docker buildx build --platform linux/arm64/v8,linux/amd64 -t $ECR_REGISTRY/$ECR_REPOSITORY-dashboard:$IMAGE_TAG $IMAGE_TAG_DASHBOARD . --push
cd ../testing
docker buildx build --platform linux/arm64/v8,linux/amd64 -t $ECR_REGISTRY/akto-api-testing:$IMAGE_TAG $IMAGE_TAG_TESTING . --push
cd ../testing-cli
docker buildx build --platform linux/arm64/v8,linux/amd64 -t $ECR_REGISTRY/akto-api-testing-cli:$IMAGE_TAG $IMAGE_TAG_TESTING_CLI . --push
cd ../billing
docker buildx build --platform linux/arm64/v8,linux/amd64 -t $ECR_REGISTRY/akto-billing:$IMAGE_TAG . --push
cd ../internal
docker buildx build --platform linux/arm64/v8,linux/amd64 -t $ECR_REGISTRY/akto-internal:$IMAGE_TAG . --push
cd ../threat-detection
cd apps/threat-detection
docker buildx build --platform linux/arm64/v8,linux/amd64 -t $ECR_REGISTRY/akto-threat-detection:$IMAGE_TAG . --push
cd ../threat-detection-backend
docker buildx build --platform linux/arm64/v8,linux/amd64 -t $ECR_REGISTRY/akto-threat-detection-backend:$IMAGE_TAG . --push
cd ../source-code-analyser
docker buildx build --platform linux/arm64/v8,linux/amd64 -t $ECR_REGISTRY/source-code-analyser:$IMAGE_TAG . --push
- name: Set up JDK 11
uses: actions/setup-java@v1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ public static void main(String[] args) throws Exception {
new FlushSampleDataTask(
sessionFactory, internalKafka, KafkaTopic.ThreatDetection.MALICIOUS_EVENTS)
.run();
new SendMaliciousEventsToBackend(
sessionFactory, internalKafka, KafkaTopic.ThreatDetection.ALERTS)
.run();
// new SendMaliciousEventsToBackend(
// sessionFactory, internalKafka, KafkaTopic.ThreatDetection.ALERTS)
// .run();
new CleanupTask(sessionFactory).run();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ private boolean validateFilterForRequest(
}

private void processRecord(ConsumerRecord<String, String> record) {
System.out.println("Kafka record: " + record.value());
System.out.println("Kafka record: found ");
HttpResponseParams responseParam = HttpCallParser.parseKafkaMessage(record.value());
Context.accountId.set(Integer.parseInt(responseParam.getAccountId()));
Map<String, FilterConfig> filters = this.getFilters();
Expand Down Expand Up @@ -211,17 +211,17 @@ private void processRecord(ConsumerRecord<String, String> record) {
.setFilterId(apiFilter.getId())
.build();

try {
maliciousMessages.add(
MessageEnvelope.generateEnvelope(
responseParam.getAccountId(), actor, maliciousReq));
} catch (InvalidProtocolBufferException e) {
return;
}
// try {
// maliciousMessages.add(
// MessageEnvelope.generateEnvelope(
// responseParam.getAccountId(), actor, maliciousReq));
// } catch (InvalidProtocolBufferException e) {
// return;
// }

if (!isAggFilter) {
generateAndPushMaliciousEventRequest(
apiFilter, actor, responseParam, maliciousReq, EventType.EVENT_TYPE_SINGLE);
// generateAndPushMaliciousEventRequest(
// apiFilter, actor, responseParam, maliciousReq, EventType.EVENT_TYPE_SINGLE);
return;
}

Expand All @@ -232,33 +232,33 @@ private void processRecord(ConsumerRecord<String, String> record) {

if (result.shouldNotify()) {
System.out.print("Notifying for aggregation rule: " + rule);
generateAndPushMaliciousEventRequest(
apiFilter,
actor,
responseParam,
maliciousReq,
EventType.EVENT_TYPE_AGGREGATED);
}
// generateAndPushMaliciousEventRequest(
// apiFilter,
// actor,
// responseParam,
// maliciousReq,
// EventType.EVENT_TYPE_AGGREGATED);
}
}
});
}
}

// Should we push all the messages in one go
// or call kafka.send for each HttpRequestParams
try {
maliciousMessages.forEach(
sample -> {
sample
.marshal()
.ifPresent(
data -> {
internalKafka.send(data, KafkaTopic.ThreatDetection.MALICIOUS_EVENTS);
});
});
} catch (Exception e) {
e.printStackTrace();
}
// try {
// maliciousMessages.forEach(
// sample -> {
// sample
// .marshal()
// .ifPresent(
// data -> {
// internalKafka.send(data, KafkaTopic.ThreatDetection.MALICIOUS_EVENTS);
// });
// });
// } catch (Exception e) {
// e.printStackTrace();
// }
}

private void generateAndPushMaliciousEventRequest(
Expand Down

0 comments on commit e5a5e0b

Please sign in to comment.