diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml new file mode 100644 index 0000000..76d2c40 --- /dev/null +++ b/.github/workflows/main.yml @@ -0,0 +1,42 @@ +name: main + +on: + push: + branches: [ main ] + pull_request: + branches: [ main ] + +jobs: + + build: + name: Build + runs-on: ubuntu-latest + steps: + + - name: Set up Go 1.x + uses: actions/setup-go@v2 + with: + go-version: ^1.15 + id: go + + - name: Check out code into the Go module directory + uses: actions/checkout@v2 + + - name: Get dependencies + run: | + go get -v -t -d ./... + if [ -f Gopkg.toml ]; then + curl https://raw.githubusercontent.com/golang/dep/main/install.sh | sh + dep ensure + fi + - name: Build + run: go build -v . + + - name: Test + run: | + go test -v . + env: + GITHUB_AUTH_TOKEN: ${{ secrets.TOKEN}} + GITHUB_EMAIL: ${{ secrets.EMAIL}} + + diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml new file mode 100644 index 0000000..131afd6 --- /dev/null +++ b/.github/workflows/release.yml @@ -0,0 +1,24 @@ +on: release +name: Release +jobs: + goreleaser: + runs-on: ubuntu-latest + steps: + - + name: Checkout + uses: actions/checkout@v2 + with: + fetch-depth: 0 + - + name: Set up Go + uses: actions/setup-go@v2 + with: + go-version: 1.15 + - + name: Run GoReleaser + uses: goreleaser/goreleaser-action@v2 + with: + version: latest + args: release --rm-dist + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} \ No newline at end of file diff --git a/README.md b/README.md index 93d9381..4b81de7 100644 --- a/README.md +++ b/README.md @@ -1 +1,46 @@ -# sqs-pub \ No newline at end of file +# SQS Messages replayer + +## Overview + +This is a simple tool that allows replaying messages from the AWS SQS DLQ. While the primary function of this tool is to replay messages present in the DLQ, it can also be used to push a message from one given queue to another regardless if the source queue is a DLQ or not. + +## Usage + +Below is the print of the following command: `./sqs-pub -help`, which describe the usage of this tool + + +``` +USAGE + sqs-pub [-from queue1 - to queue2 -filter text1,text2,...] + +FLAGS + -delete true delete messages from source after successfuly pushed to destination queue + -filters 10104211111292 comma separted text that can be used a message body filter + -from vf-cm-dev-marketplace-emea-bi-deadletter-queue sqs queue from where messages will be sourced from + -to vf-cm-dev-marketplace-emea-bi-orders sqs queue from where messages will be sourced from + +``` + +## Flags +1. `delete`: by default this is set to true and the messages will be deleted from the soruce queue once successfully published to the destination queue. +2. `filters`: a comma separated list of keys that can be used to identify a given message is meant to be processed or not. This is a very simple filtering system, in subsequent releases timestamp or messageId can be used to filter out messages. +3. `from`: The source queue name (exp, DLQ) +4. `to`: The destionation queue name. +5. `dryrun`: By the default this flag is to false. If it is set to true then it will run the replay process but withouth sending the actual messages to the destination queue and reports will be generated as it has been a real dry. This flag is useful to get an picture of what is this tool attempt to do when run it for real. + +## How to run + +1. Set the AWS Credentials in the terminal + +``` +export AWS_ACCESS_KEY_ID="xxxxxxxV2M3L" +export AWS_SECRET_ACCESS_KEY="xxxxxxxxDK" +export AWS_SESSION_TOKEN="IQoJxxxxxxxxx" +``` + +2. Run the binary as follow +``` +./sqs-pub -from=queue-name-source -to=queue-name-destination -filters=text1,text2 + +## License +[MIT](LICENSE) diff --git a/failed.log b/failed.log deleted file mode 100644 index e69de29..0000000 diff --git a/filtered.log b/filtered.log deleted file mode 100644 index 2babef5..0000000 --- a/filtered.log +++ /dev/null @@ -1,144 +0,0 @@ -1f737271-6ba1-40b2-8ac9-a0c2f315d55a{ - "event_id": "4798ca4c-5192-4022-bd15-ca8a6da96ce4", - "order_id": "2869505b-7316-47a1-ae9e-6491951d2ffa", - "order_number": "10104211111292", - "state": "fulfilled", - "store_id": "1588", - "timestamp": 1605094700418, - "delivery_details": { - "delivery_tracking_number": "00340434461283103862", - "delivery_carrier_name": "DHL_DE", - "return_tracking_number": "744647704564", - "return_carrier_name": "DHL_DE" - }, - "items": [{ - "item_id": "eaa8a6b8-459d-4ce8-9fdb-b5e79bdbaf8f", - "ean": "0193391111155", - "price": 117.52, - "currency": "EUR", - "article_number": "TI112M00F-N110012000", - "article_location": null, - "return_reason_code": 2 - }] -} - -1f737271-6ba1-40b2-8ac9-a0c2f315d55a{ - "event_id": "4798ca4c-5192-4022-bd15-ca8a6da96ce4", - "order_id": "2869505b-7316-47a1-ae9e-6491951d2ffa", - "order_number": "10104211111292", - "state": "fulfilled", - "store_id": "1588", - "timestamp": 1605094700418, - "delivery_details": { - "delivery_tracking_number": "00340434461283103862", - "delivery_carrier_name": "DHL_DE", - "return_tracking_number": "744647704564", - "return_carrier_name": "DHL_DE" - }, - "items": [{ - "item_id": "eaa8a6b8-459d-4ce8-9fdb-b5e79bdbaf8f", - "ean": "0193391111155", - "price": 117.52, - "currency": "EUR", - "article_number": "TI112M00F-N110012000", - "article_location": null, - "return_reason_code": 2 - }] -} - -4e847d2b-45ee-43fd-8cc3-108ade5b2a1c{ - "event_id": "4798ca4c-5192-4022-bd15-ca8a6da96ce4", - "order_id": "2869505b-7316-47a1-ae9e-6491951d2ffa", - "order_number": "10104211111292", - "state": "fulfilled", - "store_id": "1588", - "timestamp": 1605094700418, - "delivery_details": { - "delivery_tracking_number": "00340434461283103862", - "delivery_carrier_name": "DHL_DE", - "return_tracking_number": "744647704564", - "return_carrier_name": "DHL_DE" - }, - "items": [{ - "item_id": "eaa8a6b8-459d-4ce8-9fdb-b5e79bdbaf8f", - "ean": "0193391111155", - "price": 117.52, - "currency": "EUR", - "article_number": "TI112M00F-N110012000", - "article_location": null, - "return_reason_code": 2 - }] -} - -1f737271-6ba1-40b2-8ac9-a0c2f315d55a{ - "event_id": "4798ca4c-5192-4022-bd15-ca8a6da96ce4", - "order_id": "2869505b-7316-47a1-ae9e-6491951d2ffa", - "order_number": "10104211111292", - "state": "fulfilled", - "store_id": "1588", - "timestamp": 1605094700418, - "delivery_details": { - "delivery_tracking_number": "00340434461283103862", - "delivery_carrier_name": "DHL_DE", - "return_tracking_number": "744647704564", - "return_carrier_name": "DHL_DE" - }, - "items": [{ - "item_id": "eaa8a6b8-459d-4ce8-9fdb-b5e79bdbaf8f", - "ean": "0193391111155", - "price": 117.52, - "currency": "EUR", - "article_number": "TI112M00F-N110012000", - "article_location": null, - "return_reason_code": 2 - }] -} - -4e847d2b-45ee-43fd-8cc3-108ade5b2a1c{ - "event_id": "4798ca4c-5192-4022-bd15-ca8a6da96ce4", - "order_id": "2869505b-7316-47a1-ae9e-6491951d2ffa", - "order_number": "10104211111292", - "state": "fulfilled", - "store_id": "1588", - "timestamp": 1605094700418, - "delivery_details": { - "delivery_tracking_number": "00340434461283103862", - "delivery_carrier_name": "DHL_DE", - "return_tracking_number": "744647704564", - "return_carrier_name": "DHL_DE" - }, - "items": [{ - "item_id": "eaa8a6b8-459d-4ce8-9fdb-b5e79bdbaf8f", - "ean": "0193391111155", - "price": 117.52, - "currency": "EUR", - "article_number": "TI112M00F-N110012000", - "article_location": null, - "return_reason_code": 2 - }] -} - -1f737271-6ba1-40b2-8ac9-a0c2f315d55a{ - "event_id": "4798ca4c-5192-4022-bd15-ca8a6da96ce4", - "order_id": "2869505b-7316-47a1-ae9e-6491951d2ffa", - "order_number": "10104211111292", - "state": "fulfilled", - "store_id": "1588", - "timestamp": 1605094700418, - "delivery_details": { - "delivery_tracking_number": "00340434461283103862", - "delivery_carrier_name": "DHL_DE", - "return_tracking_number": "744647704564", - "return_carrier_name": "DHL_DE" - }, - "items": [{ - "item_id": "eaa8a6b8-459d-4ce8-9fdb-b5e79bdbaf8f", - "ean": "0193391111155", - "price": 117.52, - "currency": "EUR", - "article_number": "TI112M00F-N110012000", - "article_location": null, - "return_reason_code": 2 - }] -} - diff --git a/main.go b/main.go index 7d346fa..bb93beb 100644 --- a/main.go +++ b/main.go @@ -15,10 +15,11 @@ var ( ) func init() { - rootFlagSet.StringVar(&replayer.cfg.from, "from", "vf-cm-dev-marketplace-emea-bi-deadletter-queue", "sqs queue from where messages will be sourced from") - rootFlagSet.StringVar(&replayer.cfg.to, "to", "vf-cm-dev-marketplace-emea-bi-orders", "sqs queue from where messages will be sourced from") + rootFlagSet.StringVar(&replayer.cfg.from, "from", "queue-name-source", "sqs queue from where messages will be sourced from") + rootFlagSet.StringVar(&replayer.cfg.to, "to", "queue-name-destination", "sqs queue from where messages will be sourced from") rootFlagSet.StringVar(&replayer.cfg.filters, "filters", "10104211111292", "comma separted text that can be used a message body filter") rootFlagSet.BoolVar(&replayer.cfg.deleteFromSource, "delete", true, "delete messages from source after successfuly pushed to destination queue") + rootFlagSet.BoolVar(&replayer.cfg.dryrun, "dryrun", false, "a flag to run the replay in dry run mode.") } func main() { diff --git a/processed.log b/processed.log deleted file mode 100644 index e69de29..0000000 diff --git a/pub.go b/sqs.go similarity index 58% rename from pub.go rename to sqs.go index 1ac7c44..2734e71 100644 --- a/pub.go +++ b/sqs.go @@ -5,6 +5,7 @@ import ( "fmt" "log" "os" + "strconv" "strings" "github.com/aws/aws-sdk-go/aws" @@ -21,6 +22,7 @@ type SQSReplayConfig struct { from string to string deleteFromSource bool + dryrun bool filters string } @@ -36,68 +38,91 @@ func (s *SQSReplayer) replay(ctx context.Context, args []string) error { // Fetch the urls for the given queues fromQueue, toQueue := s.fetchQueueUrl(s.cfg.from, s.cfg.to) - // Create report files - preport := createReportFile("processed.log") - failedReport := createReportFile("failed.log") - filtered := createReportFile("filtered.log") + processedBody := []string{} + failedBody := []string{} + filteredBody := []string{} - defer preport.Close() - defer failedReport.Close() - defer filtered.Close() + messages := map[string]*[]string{"processed": &processedBody, "failed": &failedBody, "filtered": &filteredBody} // Fetch all the messages from the source queue and publish them to the destination queue - processed := 0 - failureToReadFromQueue := 0 - for { + numOfMessags := s.fetchNumberOfMessages(fromQueue) + + for i := 0; i < numOfMessags; i++ { msgResult, err := s.read(fromQueue) if err != nil { log.Printf("failed to read from the queue: %v", err) - failureToReadFromQueue++ - } - - if len(msgResult.Messages) == 0 || failureToReadFromQueue > 20 { - log.Println("\n no more messages in the queue to replay") - break } for _, msg := range msgResult.Messages { if !s.filter(msg.Body) { - log.Printf("Processing message: %d with id %s\n", processed, *msg.MessageId) + log.Printf("Processing message: %d with id %s\n", len(processedBody), *msg.MessageId) if err := s.send(toQueue, *msg.Body); err != nil { - log.Printf("failed to send message: %s", *msg.Body) - failedReport.WriteString(*msg.MessageId) - failedReport.WriteString(*msg.Body) - failedReport.WriteString("\n\n") + log.Printf("failed to send message: %s", *msg.Body) + failedBody = append(failedBody, *msg.Body) } else { - preport.WriteString(*msg.MessageId) - preport.WriteString(*msg.Body) - preport.WriteString("\n\n") + processedBody = append(processedBody, *msg.Body) if s.cfg.deleteFromSource { if err := s.delete(fromQueue, *msg.ReceiptHandle); err != nil { log.Printf("failed to delete message with id %s", *msg.MessageId) } } } - processed++ } else { log.Printf("Message with message id filtered: %s", *msg.MessageId) - filtered.WriteString(*msg.MessageId) - filtered.WriteString(*msg.Body) - filtered.WriteString("\n\n") + filteredBody = append(filteredBody, *msg.Body) } } } - log.Printf("number of messages processed %d", processed) - log.Printf("number of messages failed to be processed %d", failureToReadFromQueue) + + log.Printf("number of messages processed %d", len(processedBody)) + log.Printf("number of messages filtered %d", len(filteredBody)) + log.Printf("number of messages failed %d", len(failedBody)) + + // generating the reports + generateReport(messages) return nil } +func generateReport(messages map[string]*[]string) { + for n, msg := range messages { + if len(*msg) > 0 { + pfile := createReportFile(fmt.Sprintf("%s.log", n)) + defer pfile.Close() + for _, body := range *msg { + pfile.WriteString(body) + pfile.WriteString("\n") + pfile.WriteString("-----------------------------------------------------\n") + pfile.WriteString("\n\n") + } + log.Printf("%s.log report generated", n) + } + } +} + +func (s *SQSReplayer) fetchNumberOfMessages(queue string) int { + svc := sqs.New(s.sess) + numOfMessags := "ApproximateNumberOfMessages" + result, err := svc.GetQueueAttributes(&sqs.GetQueueAttributesInput{QueueUrl: &queue, AttributeNames: []*string{&numOfMessags}}) + + if err != nil { + log.Fatal(err) + } + + num, err := strconv.Atoi(*result.Attributes[numOfMessags]) + + if err != nil { + log.Fatalf("failed to retrieve the number of messages present in the source queue, %v", err) + } + log.Printf("found %d messages in the source queue %s", num, queue) + return num +} + func (s *SQSReplayer) filter(body *string) bool { fmt.Printf("filters %v", s.cfg.filters) if len(s.cfg.filters) > 0 { @@ -151,25 +176,31 @@ func (s *SQSReplayer) read(queue string) (*sqs.ReceiveMessageOutput, error) { QueueUrl: &queue, MaxNumberOfMessages: aws.Int64(5), }) + } func (s *SQSReplayer) send(queue string, body string) error { - svc := sqs.New(s.sess) - _, err := svc.SendMessage(&sqs.SendMessageInput{ - - MessageBody: aws.String(body), - QueueUrl: &queue, - }) - return err + if !s.cfg.dryrun { + svc := sqs.New(s.sess) + _, err := svc.SendMessage(&sqs.SendMessageInput{ + + MessageBody: aws.String(body), + QueueUrl: &queue, + }) + return err + } + return nil } func (s *SQSReplayer) delete(queueURL string, receiptHandle string) error { - - svc := sqs.New(s.sess) - - _, err := svc.DeleteMessage(&sqs.DeleteMessageInput{ - QueueUrl: &queueURL, - ReceiptHandle: &receiptHandle, - }) - return err + if !s.cfg.dryrun { + svc := sqs.New(s.sess) + + _, err := svc.DeleteMessage(&sqs.DeleteMessageInput{ + QueueUrl: &queueURL, + ReceiptHandle: &receiptHandle, + }) + return err + } + return nil }