Skip to content

Commit

Permalink
Concurrent restore
Browse files Browse the repository at this point in the history
  • Loading branch information
justinrlee committed Jan 8, 2018
1 parent baa4349 commit bf1d8fc
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 86 deletions.
19 changes: 1 addition & 18 deletions cmd/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ to quickly create a Cobra application.`,
secretChan := make(chan Secret)

// fmt.Println("Calling go GetSecrets")
go cluster.GetSecrets(secrets.Array, cipherkey, secretChan, 10)
go cluster.GetSecrets(secrets.Array, cipherkey, secretChan, concurrency)

// fmt.Println("Starting to receive")
for i := 0; i < len(secrets.Array); i++ {
Expand All @@ -73,23 +73,6 @@ to quickly create a Cobra application.`,
secretSlice = append(secretSlice, s)
}

// fmt.Println(secretSlice)

// func (c *Cluster) GetSecrets(secrets []string, cipherKey string, secretChan chan<- Secret, qsize int) {
// // Get all secrets, add them to the files array
// for _, secretID := range secrets.Array {
// fmt.Printf("Getting secret '%s'\n", secretID)
// // secretValue, err := cluster.Get("/secrets/v1/secret/default/" + secretPath)
// secretJSON, returnCode, err := cluster.Call("GET", "/secrets/v1/secret/default/"+secretID, nil)
// if err != nil || returnCode != http.StatusOK {
// fmt.Println("TODO: error handling here")
// panic(err)
// }

// e := encrypt(string(secretJSON), cipherkey)
// secretSlice = append(secretSlice, Secret{ID: secretID, EncryptedJSON: e})
// }

fmt.Println("Writing to tar at " + destfile)
writeTar(secretSlice, destfile)

Expand Down
51 changes: 14 additions & 37 deletions cmd/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package cmd

import (
"encoding/json"
"fmt"
"os"

Expand Down Expand Up @@ -45,45 +44,23 @@ to quickly create a Cobra application.`,
os.Exit(1)
}

secrets := readTar(sourcefile)
for _, item := range secrets {
plaintext := decrypt(item.EncryptedJSON, cipherkey)
secrets := readTar(sourcefile) // secrets []Secret

// Prior to restore, will try to unmarshal. If unable to unmarshal, we have invalid cipherkey.
var t struct {
Value string `json:"value"`
}
err := json.Unmarshal([]byte(plaintext), &t)
if err != nil || t.Value == "" {
fmt.Println("Unable to decrypt. You likely have an invalid cipherkey.")
os.Exit(1)
}
rchan := make(chan int) // Used to wait till done

fmt.Printf("Processing [%s] ...\n", item.ID)
secretPath := "/secrets/v1/secret/default/" + item.ID
// Populate connection pool
pool := make(chan int, concurrency)
for i:= 0; i < concurrency; i++ {
pool <- 0
}

for _, secret := range secrets {
go cluster.PushSecret(secret, cipherkey, pool, rchan)
}

resp, code, err := cluster.Call("PUT", secretPath, []byte(plaintext))
if code == 201 {
fmt.Println("Secret [" + item.ID + "] successfully created.")
} else if code == 409 {
presp, pcode, perr := cluster.Call("PATCH", secretPath, []byte(plaintext))
if pcode == 204 {
fmt.Println("Secret [" + item.ID + "] successfully updated.")
} else if perr != nil {
fmt.Println("Error:")
fmt.Println(perr)
} else {
fmt.Println(string(presp))
fmt.Println(pcode)
}
} else if err != nil {
fmt.Println("Error:")
fmt.Println(err)
} else {
fmt.Println("Something else happened:")
fmt.Printf("HTTP %s: %s\n", code, string(resp))
fmt.Println(err)
}
// Wait for all secrets to be processed before quitting
for i := 0; i < len(secrets); i++ {
<- rchan
}
},
}
Expand Down
12 changes: 7 additions & 5 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ var password string
var cipherkey string
var destfile string
var sourcefile string
var concurrency int

// rootCmd represents the base command when called without any subcommands
var rootCmd = &cobra.Command{
Expand All @@ -55,11 +56,12 @@ func init() {
// Cobra supports persistent flags, which, if defined here,
// will be global for your application.
rootCmd.PersistentFlags().StringVar(&hostname, "hostname", "", "Hostname of cluster")
rootCmd.PersistentFlags().StringVar(&username, "username", "", "username for cluster")
rootCmd.PersistentFlags().StringVar(&password, "password", "", "password for cluster")
rootCmd.PersistentFlags().StringVar(&cipherkey, "cipherkey", "", "cipherkey for encryption/decryption")
rootCmd.PersistentFlags().StringVar(&destfile, "destfile", "secrets.tar", "Filename to write tar of secrets")
rootCmd.PersistentFlags().StringVar(&sourcefile, "sourcefile", "secrets.tar", "Filename to read tar of secrets")
rootCmd.PersistentFlags().StringVarP(&username, "username", "u", "", "username for cluster")
rootCmd.PersistentFlags().StringVarP(&password, "password", "p", "", "password for cluster")
rootCmd.PersistentFlags().StringVarP(&cipherkey, "cipherkey", "k", "", "cipherkey for encryption/decryption")
rootCmd.PersistentFlags().StringVarP(&destfile, "destfile", "d", "secrets.tar", "Filename to write tar of secrets")
rootCmd.PersistentFlags().StringVarP(&sourcefile, "sourcefile", "s", "secrets.tar", "Filename to read tar of secrets")
rootCmd.PersistentFlags().IntVarP(&concurrency, "concurrency", "c", 10, "Number of concurrent queries")
// rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", "config file (default is $HOME/.dcos-secrets-backup.yaml)")

// Cobra also supports local flags, which will only run
Expand Down
80 changes: 58 additions & 22 deletions cmd/utility-cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"io/ioutil"
"net/http"
"os"
)

type User struct {
Expand Down Expand Up @@ -119,42 +120,77 @@ func (c *Cluster) Call(verb string, path string, buf []byte) (body []byte, retur
}

// Get secret
func (c *Cluster) GetSecret(secretID string, cipherKey string, secretChan chan<- Secret, queue chan int) {
<- queue // Wait for there to be an open spot in the queue
func (c *Cluster) GetSecret(secretID string, cipherKey string, pool chan int, secretChan chan<- Secret) {
<- pool // Wait for there to be an open spot in the queue
defer func() {
pool <- 0
}()

fmt.Printf("Getting secret '%s'\n", secretID)
secretJSON, returnCode, err := c.Call("GET", "/secrets/v1/secret/default/"+secretID, nil)
if err != nil || returnCode != http.StatusOK {
fmt.Println("Unable to retrieve secret '%s'\n. [%d]: %s", secretID, returnCode, err.Error)
secretChan <- Secret{ID: "", EncryptedJSON: ""}
queue <- 0 // Release queue spot
} else {
e := encrypt(string(secretJSON), cipherKey)
secretChan <- Secret{ID: secretID, EncryptedJSON: e}
queue <- 0 // Release queue spot
}
}

func (c *Cluster) GetSecrets(secrets []string, cipherKey string, secretChan chan Secret, qsize int) {
queue := make(chan int, qsize)
for i:= 0; i < qsize; i++ {
func (c *Cluster) GetSecrets(secrets []string, cipherKey string, secretChan chan Secret, psize int) {
pool := make(chan int, psize)
for i:= 0; i < psize; i++ {
// fmt.Println("Writing 0 to queue")
queue <- 0
pool <- 0
}
// Spins off a bunch of goroutines to get secrets and add them to secretChan. Should be rate limited by qsize
// Spins off a bunch of goroutines to get secrets and add them to secretChan. Should be rate limited by psize
for _, secretID := range secrets {
go c.GetSecret(secretID, cipherKey, secretChan, queue)
go c.GetSecret(secretID, cipherKey, pool, secretChan)
}
}

// for _, secretID := range secrets.Array {
// fmt.Printf("Getting secret '%s'\n", secretID)
// // secretValue, err := cluster.Get("/secrets/v1/secret/default/" + secretPath)
// secretJSON, returnCode, err := cluster.Call("GET", "/secrets/v1/secret/default/"+secretID, nil)
// if err != nil || returnCode != http.StatusOK {
// fmt.Println("TODO: error handling here")
// panic(err)
// }

// e := encrypt(string(secretJSON), cipherkey)
// secretSlice = append(secretSlice, Secret{ID: secretID, EncryptedJSON: e})
// }
// Will attempt to PUT; if it gets a 409 back (i.e., a 'conflict'), will then attempt a PATCH
func (c * Cluster) PushSecret(secret Secret, cipherKey string, pool chan int, rchan chan<- int) {

// We don't really need to throttle decryption / unmarshalling
plaintext := decrypt(secret.EncryptedJSON, cipherkey)

var t struct {
Value string `json:"value"`
}
err := json.Unmarshal([]byte(plaintext), &t)
if err != nil || t.Value == "" {
fmt.Printf("Unable to decrypt [%s]. You likely have an invalid cipherkey.\n", secret.ID)
os.Exit(1)
}

fmt.Printf("Queueing secret [%s] ...\n", secret.ID)
secretPath := "/secrets/v1/secret/default/" + secret.ID

<- pool // throttle
defer func() {
pool <- 0
rchan <- 0
}()

resp, code, err := c.Call("PUT", secretPath, []byte(plaintext))
if code == 201 {
fmt.Println("Secret [" + secret.ID + "] successfully created.")
} else if code == 409 {
// fmt.Printf("[%s] already exists, updating ...\n", secret.ID)
presp, pcode, perr := c.Call("PATCH", secretPath, []byte(plaintext))
if pcode == 204 {
fmt.Println("Secret [" + secret.ID + "] successfully updated.")
} else if perr != nil {
fmt.Printf("Error when attempting to update [%s]: %s\n", secret.ID, perr.Error())
} else {
fmt.Printf("Error when attempting to update [%s]. [%s]: %s\n", secret.ID, pcode, string(presp))
}
} else if err != nil {
fmt.Printf("Error when attempting to create [%s]: %s\n", secret.ID, err.Error())
} else {
fmt.Printf("Error when attempting to create [%s]. [%s]: %s\n", secret.ID, code, string(resp))
}


}
8 changes: 4 additions & 4 deletions cmd/utility-file.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ func writeTar(files []Secret, filename string) {
}
}

func readTar(filename string) (files []Secret) {
files = []Secret{}
func readTar(filename string) (secrets []Secret) {
secrets = []Secret{}
f, err := os.Open(filename)
if err != nil {
panic(err)
Expand All @@ -158,9 +158,9 @@ func readTar(filename string) (files []Secret) {
buf := new(bytes.Buffer)
buf.ReadFrom(tr)
s := buf.String()
files = append(files, Secret{ID: hdr.Name, EncryptedJSON: s})
secrets = append(secrets, Secret{ID: hdr.Name, EncryptedJSON: s})
}
return files
return secrets
}

// func createDirFor(path string) {
Expand Down

0 comments on commit bf1d8fc

Please sign in to comment.