Skip to content
This repository has been archived by the owner on Jan 21, 2019. It is now read-only.

Commit

Permalink
Improve concurrency, close database cleanly on interrupt
Browse files Browse the repository at this point in the history
Also clean up downloaded files if an error occurs after the file is downloaded.
Leaving stray files around might not be a great thing in the long term,
plus it will confuse the program when looking for an available filename
for a new item.
  • Loading branch information
mholt committed Jan 11, 2017
1 parent 8ceda42 commit 3e265c9
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 22 deletions.
23 changes: 20 additions & 3 deletions cmd/photobak/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io/ioutil"
"log"
"os"
"os/signal"
"strconv"
"time"

Expand Down Expand Up @@ -131,19 +132,35 @@ func parseEvery(every string) (time.Duration, error) {
}

// run opens the repository at repoDir, sets repo.NumWorkers from
// concurrency, and then either prunes the repository (when prune is set)
// or stores into it with keepEverything.
//
// The repository is closed exactly once, by a dedicated goroutine: either
// after the work has finished (run blocks until that close has completed
// before returning), or when os.Interrupt is received, in which case the
// process exits with status 0 right after closing.
//
// It returns an error if the repository cannot be opened; otherwise it
// returns whatever repo.Prune or repo.Store returns.
func run() error {
	repo, err := photobak.OpenRepo(repoDir)
	if err != nil {
		return fmt.Errorf("opening repo: %v", err)
	}

	// Subscribe before any work starts so an early interrupt is not
	// missed; unsubscribe once run is finished with the channel.
	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, os.Interrupt)
	defer signal.Stop(sigchan)

	waitchan := make(chan struct{}) // closed when the work has finished
	closed := make(chan struct{})   // closed once the repository is closed

	// cleanly close repository when interrupted
	// or when the function ends
	go func() {
		defer close(closed)
		select {
		case <-waitchan:
			repo.Close()
		case <-sigchan:
			log.Println("[INTERRUPT] Closing database and quitting")
			repo.Close()
			os.Exit(0)
		}
	}()

	// On return, ask the goroutine to close the repository and wait for
	// it, so the close is not cut short by the process exiting.
	defer func() {
		close(waitchan)
		<-closed
	}()

	repo.NumWorkers = concurrency

	if prune {
		return repo.Prune()
	}

	return repo.Store(keepEverything)
}

Expand Down
12 changes: 5 additions & 7 deletions googlephotos/googlephotos.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,18 +118,16 @@ func (c *Client) ListCollections() ([]photobak.Collection, error) {
// ListCollectionItems lists the items of col by calling c.listAllPhotos
// with the picasaweb.google.com album feed URL built from
// col.CollectionID(), handing it itemChan. The call is attempted up to
// three times, and failed attempts are logged. itemChan is closed when the
// method returns. The named result err holds the error from the last
// attempt.
//
// NOTE(review): the body below appears to be a merged diff view in which
// removed and added lines are interleaved (the braces do not balance);
// confirm the exact control flow against the actual file.
func (c *Client) ListCollectionItems(col photobak.Collection, itemChan chan photobak.Item) (err error) {
defer close(itemChan)
url := "https://picasaweb.google.com/data/feed/api/user/default/albumid/" + col.CollectionID()

// try a few times in case there's a network error
for i := 0; i < 3; i++ {
err = c.listAllPhotos(url, itemChan)
if err != nil {
log.Printf("listing collection items (attempt %d): %v", i+1, err)
continue
if err == nil {
break
}
break
}
if err != nil {
log.Printf("listing collection items: %v; giving up", err)
log.Printf("listing collection items (attempt %d): %v", i+1, err)
}

return
}

Expand Down
32 changes: 20 additions & 12 deletions repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,11 @@ func (r *Repository) Store(saveEverything bool) error {

// perform downloads for each account
var collWg sync.WaitGroup
throttle := make(chan struct{}, r.NumWorkers)
numCollWorkers := r.NumWorkers / 2
if numCollWorkers < 1 {
numCollWorkers = 1
}
throttle := make(chan struct{}, numCollWorkers)
for _, ac := range accounts {
listedCollections, err := ac.client.ListCollections()
if err != nil {
Expand All @@ -209,13 +213,10 @@ func (r *Repository) Store(saveEverything bool) error {
}(listedColl)
}
for i := 0; i < cap(throttle); i++ {
throttle <- struct{}{} // make sure all goroutines have finished
throttle <- struct{}{} // make sure all goroutines finish
}
}

// block until all the workers are finished
workerWg.Wait()

// block until the processCollection() goroutines have finished
// wrapping all items; this is important because the context
// channel needs to be closed once they're done, but it is not
Expand All @@ -224,6 +225,10 @@ func (r *Repository) Store(saveEverything bool) error {
collWg.Wait()

close(ctxChan)

// block until all the workers are finished
workerWg.Wait()

return nil
}

Expand Down Expand Up @@ -321,7 +326,7 @@ func (r *Repository) processCollection(listedColl Collection, ac accountClient,
// begin processing all the items for this collection
err = ac.client.ListCollectionItems(coll, itemChan)
if err != nil {
return fmt.Errorf("listing collection items: %v", err)
return fmt.Errorf("client error listing collection items, giving up: %v", err)
}

return nil
Expand Down Expand Up @@ -617,6 +622,7 @@ func (r *Repository) downloadAndSaveItem(client Client, it item, coll collection
log.Printf("[ERROR] downloading %s, attempt %d: %v; retrying", it.filePath, i+1, err)
}
if err != nil {
os.Remove(outFilePath) // no need for it, the download failed
return fmt.Errorf("repeatedly failed downloading %s: %v", it.filePath, err)
}

Expand Down Expand Up @@ -672,6 +678,7 @@ func (r *Repository) downloadAndSaveItem(client Client, it item, coll collection
if it.isNew {
sameItems, err := r.db.itemsWithChecksum(dbi.Checksum)
if err != nil {
os.Remove(outFilePath)
return fmt.Errorf("de-duplicating item '%s': %v", it.fileName, err)
}
if len(sameItems) > 0 {
Expand All @@ -682,6 +689,12 @@ func (r *Repository) downloadAndSaveItem(client Client, it item, coll collection
// hard copy of the file we just downloaded since we'll point
// to where it already exists in the repository.

// delete the physical copy we just downloaded
err = os.Remove(outFilePath)
if err != nil {
return err
}

// load any item that has this checksum, they should all point to the
// same file path; use it to set this item's file path.
sameContent, err := r.db.loadItem(sameItems[0].AcctKey, sameItems[0].ItemID)
Expand All @@ -695,19 +708,14 @@ func (r *Repository) downloadAndSaveItem(client Client, it item, coll collection
if err != nil {
return err
}

// delete the physical copy we just downloaded
err = os.Remove(outFilePath)
if err != nil {
return err
}
}
}

// we've got everything on disk that we need,
// now commit this item to the database!
err = r.db.saveItem(pa.key(), itemID, dbi)
if err != nil {
os.Remove(outFilePath) // no record of it in the database, so don't keep it on disk...
return fmt.Errorf("saving item '%s' to database: %v", it.fileName, err)
}

Expand Down

0 comments on commit 3e265c9

Please sign in to comment.