Skip to content

Commit

Permalink
fetch new messages only for incremental copy
Browse files Browse the repository at this point in the history
  • Loading branch information
creativeprojects committed Sep 26, 2022
1 parent 7a2c420 commit a528e97
Show file tree
Hide file tree
Showing 14 changed files with 189 additions and 44 deletions.
14 changes: 13 additions & 1 deletion lib/lib.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package lib

import "strings"
import (
"strings"
"time"
)

func VerifyDelimiter(name, existingDelimiter, expectedDelimiter string) string {
if existingDelimiter == "" || expectedDelimiter == "" {
Expand All @@ -14,3 +17,12 @@ func VerifyDelimiter(name, existingDelimiter, expectedDelimiter string) string {
name = strings.ReplaceAll(name, existingDelimiter, expectedDelimiter)
return name
}

// SafePadding subtract about one day to the date to make sure we don't miss a message
func SafePadding(since time.Time) time.Time {
if since.IsZero() {
return since
}
// removes a day
return since.Add(-25 * time.Hour)
}
35 changes: 35 additions & 0 deletions lib/logger.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,46 @@
package lib

import "testing"

type Logger interface {
Print(a ...any)
Println(a ...any)
Printf(format string, a ...any)
}

type NoLog struct{}

func (l *NoLog) Print(a ...any) {}
func (l *NoLog) Println(a ...any) {}
func (l *NoLog) Printf(format string, a ...any) {}

type TestLogger struct {
t *testing.T
prefix string
}

func NewTestLogger(t *testing.T, prefix string) *TestLogger {
return &TestLogger{
t: t,
prefix: prefix,
}
}

func (l *TestLogger) Print(a ...any) {
if l.prefix == "" {
l.t.Log(a...)
} else {
l.t.Log(append([]any{l.prefix + ":"}, a...)...)
}
}

func (l *TestLogger) Println(a ...any) {
l.Print(a...)
}

func (l *TestLogger) Printf(format string, a ...any) {
if l.prefix != "" {
format = l.prefix + ": " + format
}
l.t.Logf(format, a...)
}
16 changes: 16 additions & 0 deletions mailbox/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,19 @@ func FindHistoryEntryFromSourceID(history *History, sourceMessageID MessageID) *
}
return nil
}

func FindLastAction(history *History) time.Time {
last := time.Time{}
if history == nil {
return last
}
if len(history.Actions) == 0 {
return last
}
for _, action := range history.Actions {
if action.Date.After(last) {
last = action.Date
}
}
return last
}
6 changes: 4 additions & 2 deletions storage/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package storage
import (
"context"
"io"
"time"

"github.com/creativeprojects/imap/mailbox"
)
Expand All @@ -22,8 +23,9 @@ type Backend interface {
// SelectMailbox opens the current mailbox for fetching messages
SelectMailbox(info mailbox.Info) (*mailbox.Status, error)
PutMessage(info mailbox.Info, props mailbox.MessageProperties, body io.Reader) (mailbox.MessageID, error)
// FetchMessages needs a mailbox to be selected first
FetchMessages(ctx context.Context, messages chan *mailbox.Message) error
// FetchMessages needs a mailbox to be selected first.
// Use the zero Time to fetch all messages.
FetchMessages(ctx context.Context, since time.Time, messages chan *mailbox.Message) error
// UnselectMailbox after fetching messages
UnselectMailbox() error
AddToHistory(info mailbox.Info, actions ...mailbox.HistoryAction) error
Expand Down
15 changes: 8 additions & 7 deletions storage/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/creativeprojects/imap/cfg"
"github.com/creativeprojects/imap/lib"
"github.com/creativeprojects/imap/mailbox"
"github.com/creativeprojects/imap/storage/local"
"github.com/creativeprojects/imap/storage/mdir"
Expand Down Expand Up @@ -67,7 +68,7 @@ func TestImapBackend(t *testing.T) {
Password: "password",
NoTLS: true,
CacheDir: t.TempDir(),
DebugLogger: &testLogger{t},
DebugLogger: lib.NewTestLogger(t, "client"),
})
assert.NoError(t, err)

Expand All @@ -87,7 +88,7 @@ func TestMaildirBackend(t *testing.T) {
return
}
root := t.TempDir()
backend, err := mdir.NewWithLogger(root, &testLogger{t})
backend, err := mdir.NewWithLogger(root, lib.NewTestLogger(t, "client"))
require.NoError(t, err)

defer backend.Close()
Expand All @@ -97,7 +98,7 @@ func TestMaildirBackend(t *testing.T) {

func TestStoreBackend(t *testing.T) {
dir := t.TempDir()
backend, err := local.NewBoltStoreWithLogger(filepath.Join(dir, "store.db"), &testLogger{t})
backend, err := local.NewBoltStoreWithLogger(filepath.Join(dir, "store.db"), lib.NewTestLogger(t, "client"))
require.NoError(t, err)

defer backend.Close()
Expand All @@ -109,7 +110,7 @@ func TestStoreBackend(t *testing.T) {
}

func TestMemoryBackend(t *testing.T) {
backend := mem.NewWithLogger(&testLogger{t})
backend := mem.NewWithLogger(lib.NewTestLogger(t, "client"))

defer backend.Close()

Expand All @@ -130,7 +131,7 @@ func TestBackendFromConfig(t *testing.T) {
for name, account := range config.Accounts {
switch account.Type {
case cfg.LOCAL:
backend, err := local.NewBoltStoreWithLogger(account.File, &testLogger{t})
backend, err := local.NewBoltStoreWithLogger(account.File, lib.NewTestLogger(t, "client"))
require.NoError(t, err)
defer backend.Close()

Expand All @@ -146,7 +147,7 @@ func TestBackendFromConfig(t *testing.T) {
t.Log("maildir is not supported on Windows")
continue
}
backend, err := mdir.NewWithLogger(account.Root, &testLogger{t})
backend, err := mdir.NewWithLogger(account.Root, lib.NewTestLogger(t, "client"))
require.NoError(t, err)
defer backend.Close()

Expand All @@ -161,7 +162,7 @@ func TestBackendFromConfig(t *testing.T) {
Password: account.Password,
SkipTLSVerification: account.SkipTLSVerification,
CacheDir: t.TempDir(),
DebugLogger: &testLogger{t},
DebugLogger: lib.NewTestLogger(t, "client"),
})
require.NoError(t, err)
defer backend.Close()
Expand Down
2 changes: 1 addition & 1 deletion storage/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func CopyMessages(ctx context.Context, backendSource, backendDest Backend, mbox
receiver := make(chan *mailbox.Message, 10)
done := make(chan error, 1)
go func() {
done <- backendSource.FetchMessages(ctx, receiver)
done <- backendSource.FetchMessages(ctx, mailbox.FindLastAction(history), receiver)
}()

for msg := range receiver {
Expand Down
11 changes: 9 additions & 2 deletions storage/local/bolt_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ func (s *BoltStore) PutMessage(info mailbox.Info, props mailbox.MessagePropertie
if err != nil {
return fmt.Errorf("cannot save message body: %w", err)
}
s.log.Printf("Message saved: mailbox=%q uid=%d size=%d flags=%+v", name, uid, read, props.Flags)
s.log.Printf("Message saved: mailbox=%q uid=%d size=%d flags=%+v date=%q", name, uid, read, props.Flags, props.InternalDate)

props := &msgProps{
Flags: props.Flags,
Expand All @@ -286,14 +286,17 @@ func (s *BoltStore) PutMessage(info mailbox.Info, props mailbox.MessagePropertie
return messageID, err
}

func (s *BoltStore) FetchMessages(ctx context.Context, messages chan *mailbox.Message) error {
func (s *BoltStore) FetchMessages(ctx context.Context, since time.Time, messages chan *mailbox.Message) error {
defer close(messages)

if s.selected == "" {
return lib.ErrNotSelected
}
name := s.selected

// removes a day
since = lib.SafePadding(since)

err := s.db.View(func(tx *bolt.Tx) error {
var err error

Expand Down Expand Up @@ -321,6 +324,10 @@ func (s *BoltStore) FetchMessages(ctx context.Context, messages chan *mailbox.Me
return err
}
}
if !since.IsZero() && properties.Date.Before(since) {
// skip this message
return nil
}
// uncompress data
reader, err := zlib.NewReader(bytes.NewReader(value))
if err != nil {
Expand Down
15 changes: 0 additions & 15 deletions storage/logger_test.go

This file was deleted.

11 changes: 9 additions & 2 deletions storage/mdir/maildir.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (m *Maildir) PutMessage(info mailbox.Info, props mailbox.MessageProperties,
}
return mailbox.EmptyMessageID, fmt.Errorf("message body size advertised as %d bytes but read %d bytes from buffer", props.Size, copied)
}
m.log.Printf("Message saved: mailbox=%q key=%q size=%d", name, key, copied)
m.log.Printf("Message saved: mailbox=%q key=%q size=%d flags=%v date=%q", name, key, copied, props.Flags, props.InternalDate)

filename, err := mbox.Filename(key)
if err == nil {
Expand Down Expand Up @@ -171,13 +171,16 @@ func (m *Maildir) createFromStream(mbox maildir.Dir, flags []string, body io.Rea
return key, copied, nil
}

func (m *Maildir) FetchMessages(ctx context.Context, messages chan *mailbox.Message) error {
func (m *Maildir) FetchMessages(ctx context.Context, since time.Time, messages chan *mailbox.Message) error {
defer close(messages)

if m.selected == "" {
return lib.ErrNotSelected
}

// removes a day
since = lib.SafePadding(since)

name := m.selected
mbox := maildir.Dir(filepath.Join(m.root, name))
keys, err := mbox.Keys()
Expand All @@ -201,6 +204,10 @@ func (m *Maildir) FetchMessages(ctx context.Context, messages chan *mailbox.Mess
if err != nil {
return fmt.Errorf("cannot stat %q: %w", filename, err)
}
if !since.IsZero() && info.ModTime().Before(since) {
// skip this message
continue
}
file, err := mbox.Open(key)
if err != nil {
return fmt.Errorf("cannot open key %q: %w", key, err)
Expand Down
9 changes: 8 additions & 1 deletion storage/mem/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,17 +127,24 @@ func (m *Backend) PutMessage(info mailbox.Info, props mailbox.MessageProperties,
return mailbox.NewMessageIDFromUint(uid), nil
}

func (m *Backend) FetchMessages(ctx context.Context, messages chan *mailbox.Message) error {
func (m *Backend) FetchMessages(ctx context.Context, since time.Time, messages chan *mailbox.Message) error {
defer close(messages)

if m.selected == "" {
return lib.ErrNotSelected
}

// removes a day
since = lib.SafePadding(since)

for uid, msg := range m.data[m.selected].messages {
if ctx.Err() != nil {
return ctx.Err()
}
if !since.IsZero() && msg.date.Before(since) {
// skip this message
continue
}
limitReader := limitio.NewReader(bytes.NewReader(msg.content))
limitReader.SetRateLimit(1024*1024, 1024) // limit 1MiB/s

Expand Down
3 changes: 2 additions & 1 deletion storage/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/sha256"
"fmt"
"io"
"time"

"github.com/creativeprojects/imap/mailbox"
)
Expand All @@ -15,7 +16,7 @@ func LoadMessageProperties(ctx context.Context, backend Backend, mbox mailbox.In
receiver := make(chan *mailbox.Message, 10)
done := make(chan error, 1)
go func() {
done <- backend.FetchMessages(ctx, receiver)
done <- backend.FetchMessages(ctx, time.Time{}, receiver)
}()

for msg := range receiver {
Expand Down
31 changes: 26 additions & 5 deletions storage/remote/imap.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"os"
"path/filepath"
"sync"
"time"

"github.com/creativeprojects/imap/lib"
"github.com/creativeprojects/imap/mailbox"
Expand Down Expand Up @@ -212,20 +213,40 @@ func (i *Imap) PutMessage(info mailbox.Info, props mailbox.MessageProperties, bo
name, read, flags, err,
)
}
i.log.Printf("Message saved: mailbox=%q uid=%v size=%d bytes flags=%v", name, uid, read, flags)
i.log.Printf("Message saved: mailbox=%q uid=%v size=%d flags=%v date=%q", name, uid, read, flags, props.InternalDate)

return mailbox.NewMessageIDFromUint(uid), nil
}

func (i *Imap) FetchMessages(ctx context.Context, messages chan *mailbox.Message) error {
func (i *Imap) FetchMessages(ctx context.Context, since time.Time, messages chan *mailbox.Message) error {
defer close(messages)

if i.selected == nil {
return lib.ErrNotSelected
}

seqset := new(imap.SeqSet)
seqset.AddRange(1, i.selected.Messages)
var seqset *imap.SeqSet

if !since.IsZero() {
// removes a day
since = lib.SafePadding(since)
i.log.Printf("searching for emails after %s", since)
seqNums, err := i.client.Search(&imap.SearchCriteria{Since: since})
if err != nil {
i.log.Printf("error filtering emails by date: %s", err)
}
if len(seqNums) == 0 {
// no message
return nil
}
seqset = new(imap.SeqSet)
seqset.AddNum(seqNums...)
}
if seqset == nil {
// download all messages
seqset = new(imap.SeqSet)
seqset.AddRange(1, i.selected.Messages)
}

section := &imap.BodySectionName{Peek: true}
items := []imap.FetchItem{section.FetchItem(), imap.FetchFlags, imap.FetchUid, imap.FetchInternalDate}
Expand All @@ -243,7 +264,7 @@ func (i *Imap) FetchMessages(ctx context.Context, messages chan *mailbox.Message
go func() {
defer wg.Done()
for msg := range receiver {
i.log.Printf("Received IMAP message seq=%d", msg.SeqNum)
i.log.Printf("Received IMAP message seq=%d flags=%+v date=%q", msg.SeqNum, msg.Flags, msg.InternalDate)
// receive all the messages as they get in
message := &mailbox.Message{
MessageProperties: mailbox.MessageProperties{
Expand Down
Loading

0 comments on commit a528e97

Please sign in to comment.