diff --git a/lib/lib.go b/lib/lib.go index 2448260..fcfff50 100644 --- a/lib/lib.go +++ b/lib/lib.go @@ -1,6 +1,9 @@ package lib -import "strings" +import ( + "strings" + "time" +) func VerifyDelimiter(name, existingDelimiter, expectedDelimiter string) string { if existingDelimiter == "" || expectedDelimiter == "" { @@ -14,3 +17,12 @@ func VerifyDelimiter(name, existingDelimiter, expectedDelimiter string) string { name = strings.ReplaceAll(name, existingDelimiter, expectedDelimiter) return name } + +// SafePadding subtract about one day to the date to make sure we don't miss a message +func SafePadding(since time.Time) time.Time { + if since.IsZero() { + return since + } + // removes a day + return since.Add(-25 * time.Hour) +} diff --git a/lib/logger.go b/lib/logger.go index f10ff22..6f2080d 100644 --- a/lib/logger.go +++ b/lib/logger.go @@ -1,11 +1,46 @@ package lib +import "testing" + type Logger interface { Print(a ...any) + Println(a ...any) Printf(format string, a ...any) } type NoLog struct{} func (l *NoLog) Print(a ...any) {} +func (l *NoLog) Println(a ...any) {} func (l *NoLog) Printf(format string, a ...any) {} + +type TestLogger struct { + t *testing.T + prefix string +} + +func NewTestLogger(t *testing.T, prefix string) *TestLogger { + return &TestLogger{ + t: t, + prefix: prefix, + } +} + +func (l *TestLogger) Print(a ...any) { + if l.prefix == "" { + l.t.Log(a...) + } else { + l.t.Log(append([]any{l.prefix + ":"}, a...)...) + } +} + +func (l *TestLogger) Println(a ...any) { + l.Print(a...) +} + +func (l *TestLogger) Printf(format string, a ...any) { + if l.prefix != "" { + format = l.prefix + ": " + format + } + l.t.Logf(format, a...) +} diff --git a/mailbox/history.go b/mailbox/history.go index a1071e5..8630180 100644 --- a/mailbox/history.go +++ b/mailbox/history.go @@ -90,3 +90,19 @@ func FindHistoryEntryFromSourceID(history *History, sourceMessageID MessageID) * } return nil } + +func FindLastAction(history *History) time.Time { + last := time.Time{} + if history == nil { + return last + } + if len(history.Actions) == 0 { + return last + } + for _, action := range history.Actions { + if action.Date.After(last) { + last = action.Date + } + } + return last +} diff --git a/storage/backend.go b/storage/backend.go index c49a490..d9bddf4 100644 --- a/storage/backend.go +++ b/storage/backend.go @@ -3,6 +3,7 @@ package storage import ( "context" "io" + "time" "github.com/creativeprojects/imap/mailbox" ) @@ -22,8 +23,9 @@ type Backend interface { // SelectMailbox opens the current mailbox for fetching messages SelectMailbox(info mailbox.Info) (*mailbox.Status, error) PutMessage(info mailbox.Info, props mailbox.MessageProperties, body io.Reader) (mailbox.MessageID, error) - // FetchMessages needs a mailbox to be selected first - FetchMessages(ctx context.Context, messages chan *mailbox.Message) error + // FetchMessages needs a mailbox to be selected first. + // Use the zero Time to fetch all messages. + FetchMessages(ctx context.Context, since time.Time, messages chan *mailbox.Message) error // UnselectMailbox after fetching messages UnselectMailbox() error AddToHistory(info mailbox.Info, actions ...mailbox.HistoryAction) error diff --git a/storage/backend_test.go b/storage/backend_test.go index 111b0f1..6141993 100644 --- a/storage/backend_test.go +++ b/storage/backend_test.go @@ -10,6 +10,7 @@ import ( "time" "github.com/creativeprojects/imap/cfg" + "github.com/creativeprojects/imap/lib" "github.com/creativeprojects/imap/mailbox" "github.com/creativeprojects/imap/storage/local" "github.com/creativeprojects/imap/storage/mdir" @@ -67,7 +68,7 @@ func TestImapBackend(t *testing.T) { Password: "password", NoTLS: true, CacheDir: t.TempDir(), - DebugLogger: &testLogger{t}, + DebugLogger: lib.NewTestLogger(t, "client"), }) assert.NoError(t, err) @@ -87,7 +88,7 @@ func TestMaildirBackend(t *testing.T) { return } root := t.TempDir() - backend, err := mdir.NewWithLogger(root, &testLogger{t}) + backend, err := mdir.NewWithLogger(root, lib.NewTestLogger(t, "client")) require.NoError(t, err) defer backend.Close() @@ -97,7 +98,7 @@ func TestMaildirBackend(t *testing.T) { func TestStoreBackend(t *testing.T) { dir := t.TempDir() - backend, err := local.NewBoltStoreWithLogger(filepath.Join(dir, "store.db"), &testLogger{t}) + backend, err := local.NewBoltStoreWithLogger(filepath.Join(dir, "store.db"), lib.NewTestLogger(t, "client")) require.NoError(t, err) defer backend.Close() @@ -109,7 +110,7 @@ func TestStoreBackend(t *testing.T) { } func TestMemoryBackend(t *testing.T) { - backend := mem.NewWithLogger(&testLogger{t}) + backend := mem.NewWithLogger(lib.NewTestLogger(t, "client")) defer backend.Close() @@ -130,7 +131,7 @@ func TestBackendFromConfig(t *testing.T) { for name, account := range config.Accounts { switch account.Type { case cfg.LOCAL: - backend, err := local.NewBoltStoreWithLogger(account.File, &testLogger{t}) + backend, err := local.NewBoltStoreWithLogger(account.File, lib.NewTestLogger(t, "client")) require.NoError(t, err) defer backend.Close() @@ -146,7 +147,7 @@ func TestBackendFromConfig(t *testing.T) { t.Log("maildir is not supported on Windows") continue } - backend, err := mdir.NewWithLogger(account.Root, &testLogger{t}) + backend, err := mdir.NewWithLogger(account.Root, lib.NewTestLogger(t, "client")) require.NoError(t, err) defer backend.Close() @@ -161,7 +162,7 @@ func TestBackendFromConfig(t *testing.T) { Password: account.Password, SkipTLSVerification: account.SkipTLSVerification, CacheDir: t.TempDir(), - DebugLogger: &testLogger{t}, + DebugLogger: lib.NewTestLogger(t, "client"), }) require.NoError(t, err) defer backend.Close() diff --git a/storage/copy.go b/storage/copy.go index 1e6af1b..fca765e 100644 --- a/storage/copy.go +++ b/storage/copy.go @@ -19,7 +19,7 @@ func CopyMessages(ctx context.Context, backendSource, backendDest Backend, mbox receiver := make(chan *mailbox.Message, 10) done := make(chan error, 1) go func() { - done <- backendSource.FetchMessages(ctx, receiver) + done <- backendSource.FetchMessages(ctx, mailbox.FindLastAction(history), receiver) }() for msg := range receiver { diff --git a/storage/local/bolt_store.go b/storage/local/bolt_store.go index 930e081..0b6e840 100644 --- a/storage/local/bolt_store.go +++ b/storage/local/bolt_store.go @@ -267,7 +267,7 @@ func (s *BoltStore) PutMessage(info mailbox.Info, props mailbox.MessagePropertie if err != nil { return fmt.Errorf("cannot save message body: %w", err) } - s.log.Printf("Message saved: mailbox=%q uid=%d size=%d flags=%+v", name, uid, read, props.Flags) + s.log.Printf("Message saved: mailbox=%q uid=%d size=%d flags=%+v date=%q", name, uid, read, props.Flags, props.InternalDate) props := &msgProps{ Flags: props.Flags, @@ -286,7 +286,7 @@ func (s *BoltStore) PutMessage(info mailbox.Info, props mailbox.MessagePropertie return messageID, err } -func (s *BoltStore) FetchMessages(ctx context.Context, messages chan *mailbox.Message) error { +func (s *BoltStore) FetchMessages(ctx context.Context, since time.Time, messages chan *mailbox.Message) error { defer close(messages) if s.selected == "" { @@ -294,6 +294,9 @@ func (s *BoltStore) FetchMessages(ctx context.Context, messages chan *mailbox.Me } name := s.selected + // removes a day + since = lib.SafePadding(since) + err := s.db.View(func(tx *bolt.Tx) error { var err error @@ -321,6 +324,10 @@ func (s *BoltStore) FetchMessages(ctx context.Context, messages chan *mailbox.Me return err } } + if !since.IsZero() && properties.Date.Before(since) { + // skip this message + return nil + } // uncompress data reader, err := zlib.NewReader(bytes.NewReader(value)) if err != nil { diff --git a/storage/logger_test.go b/storage/logger_test.go deleted file mode 100644 index 423dfed..0000000 --- a/storage/logger_test.go +++ /dev/null @@ -1,15 +0,0 @@ -package storage - -import "testing" - -type testLogger struct { - t *testing.T -} - -func (l *testLogger) Print(a ...any) { - l.t.Log(a...) -} - -func (l *testLogger) Printf(format string, a ...any) { - l.t.Logf(format, a...) -} diff --git a/storage/mdir/maildir.go b/storage/mdir/maildir.go index 94aa45b..a7ef30d 100644 --- a/storage/mdir/maildir.go +++ b/storage/mdir/maildir.go @@ -139,7 +139,7 @@ func (m *Maildir) PutMessage(info mailbox.Info, props mailbox.MessageProperties, } return mailbox.EmptyMessageID, fmt.Errorf("message body size advertised as %d bytes but read %d bytes from buffer", props.Size, copied) } - m.log.Printf("Message saved: mailbox=%q key=%q size=%d", name, key, copied) + m.log.Printf("Message saved: mailbox=%q key=%q size=%d flags=%v date=%q", name, key, copied, props.Flags, props.InternalDate) filename, err := mbox.Filename(key) if err == nil { @@ -171,13 +171,16 @@ func (m *Maildir) createFromStream(mbox maildir.Dir, flags []string, body io.Rea return key, copied, nil } -func (m *Maildir) FetchMessages(ctx context.Context, messages chan *mailbox.Message) error { +func (m *Maildir) FetchMessages(ctx context.Context, since time.Time, messages chan *mailbox.Message) error { defer close(messages) if m.selected == "" { return lib.ErrNotSelected } + // removes a day + since = lib.SafePadding(since) + name := m.selected mbox := maildir.Dir(filepath.Join(m.root, name)) keys, err := mbox.Keys() @@ -201,6 +204,10 @@ func (m *Maildir) FetchMessages(ctx context.Context, messages chan *mailbox.Mess if err != nil { return fmt.Errorf("cannot stat %q: %w", filename, err) } + if !since.IsZero() && info.ModTime().Before(since) { + // skip this message + continue + } file, err := mbox.Open(key) if err != nil { return fmt.Errorf("cannot open key %q: %w", key, err) diff --git a/storage/mem/memory.go b/storage/mem/memory.go index 990afec..f9a9095 100644 --- a/storage/mem/memory.go +++ b/storage/mem/memory.go @@ -127,17 +127,24 @@ func (m *Backend) PutMessage(info mailbox.Info, props mailbox.MessageProperties, return mailbox.NewMessageIDFromUint(uid), nil } -func (m *Backend) FetchMessages(ctx context.Context, messages chan *mailbox.Message) error { +func (m *Backend) FetchMessages(ctx context.Context, since time.Time, messages chan *mailbox.Message) error { defer close(messages) if m.selected == "" { return lib.ErrNotSelected } + // removes a day + since = lib.SafePadding(since) + for uid, msg := range m.data[m.selected].messages { if ctx.Err() != nil { return ctx.Err() } + if !since.IsZero() && msg.date.Before(since) { + // skip this message + continue + } limitReader := limitio.NewReader(bytes.NewReader(msg.content)) limitReader.SetRateLimit(1024*1024, 1024) // limit 1MiB/s diff --git a/storage/messages.go b/storage/messages.go index e093b61..e5005fb 100644 --- a/storage/messages.go +++ b/storage/messages.go @@ -5,6 +5,7 @@ import ( "crypto/sha256" "fmt" "io" + "time" "github.com/creativeprojects/imap/mailbox" ) @@ -15,7 +16,7 @@ func LoadMessageProperties(ctx context.Context, backend Backend, mbox mailbox.In receiver := make(chan *mailbox.Message, 10) done := make(chan error, 1) go func() { - done <- backend.FetchMessages(ctx, receiver) + done <- backend.FetchMessages(ctx, time.Time{}, receiver) }() for msg := range receiver { diff --git a/storage/remote/imap.go b/storage/remote/imap.go index 1b6b76f..75f1db1 100644 --- a/storage/remote/imap.go +++ b/storage/remote/imap.go @@ -10,6 +10,7 @@ import ( "os" "path/filepath" "sync" + "time" "github.com/creativeprojects/imap/lib" "github.com/creativeprojects/imap/mailbox" @@ -212,20 +213,40 @@ func (i *Imap) PutMessage(info mailbox.Info, props mailbox.MessageProperties, bo name, read, flags, err, ) } - i.log.Printf("Message saved: mailbox=%q uid=%v size=%d bytes flags=%v", name, uid, read, flags) + i.log.Printf("Message saved: mailbox=%q uid=%v size=%d flags=%v date=%q", name, uid, read, flags, props.InternalDate) return mailbox.NewMessageIDFromUint(uid), nil } -func (i *Imap) FetchMessages(ctx context.Context, messages chan *mailbox.Message) error { +func (i *Imap) FetchMessages(ctx context.Context, since time.Time, messages chan *mailbox.Message) error { defer close(messages) if i.selected == nil { return lib.ErrNotSelected } - seqset := new(imap.SeqSet) - seqset.AddRange(1, i.selected.Messages) + var seqset *imap.SeqSet + + if !since.IsZero() { + // removes a day + since = lib.SafePadding(since) + i.log.Printf("searching for emails after %s", since) + seqNums, err := i.client.Search(&imap.SearchCriteria{Since: since}) + if err != nil { + i.log.Printf("error filtering emails by date: %s", err) + } + if len(seqNums) == 0 { + // no message + return nil + } + seqset = new(imap.SeqSet) + seqset.AddNum(seqNums...) + } + if seqset == nil { + // download all messages + seqset = new(imap.SeqSet) + seqset.AddRange(1, i.selected.Messages) + } section := &imap.BodySectionName{Peek: true} items := []imap.FetchItem{section.FetchItem(), imap.FetchFlags, imap.FetchUid, imap.FetchInternalDate} @@ -243,7 +264,7 @@ func (i *Imap) FetchMessages(ctx context.Context, messages chan *mailbox.Message go func() { defer wg.Done() for msg := range receiver { - i.log.Printf("Received IMAP message seq=%d", msg.SeqNum) + i.log.Printf("Received IMAP message seq=%d flags=%+v date=%q", msg.SeqNum, msg.Flags, msg.InternalDate) // receive all the messages as they get in message := &mailbox.Message{ MessageProperties: mailbox.MessageProperties{ diff --git a/storage/remote/imap_test.go b/storage/remote/imap_test.go index 1d5ecaa..7a44813 100644 --- a/storage/remote/imap_test.go +++ b/storage/remote/imap_test.go @@ -5,6 +5,7 @@ import ( "testing" "time" + "github.com/creativeprojects/imap/lib" "github.com/creativeprojects/imap/storage/test" compress "github.com/emersion/go-imap-compress" "github.com/emersion/go-imap/backend/memory" @@ -20,6 +21,7 @@ func TestImapBackend(t *testing.T) { // Create a new server server := server.New(be) + server.ErrorLog = lib.NewTestLogger(t, "server") // Since we will use this server for testing only, we can allow plain text // authentication over non-encrypted connections server.AllowInsecureAuth = true @@ -39,11 +41,12 @@ func TestImapBackend(t *testing.T) { time.Sleep(100 * time.Millisecond) backend, err := NewImap(Config{ - ServerURL: listener.Addr().String(), - Username: "username", - Password: "password", - NoTLS: true, - CacheDir: t.TempDir(), + ServerURL: listener.Addr().String(), + Username: "username", + Password: "password", + NoTLS: true, + CacheDir: t.TempDir(), + DebugLogger: lib.NewTestLogger(t, "client"), }) assert.NoError(t, err) diff --git a/storage/test/backend.go b/storage/test/backend.go index 6701d73..e54f201 100644 --- a/storage/test/backend.go +++ b/storage/test/backend.go @@ -164,7 +164,7 @@ func RunTestsOnBackend(t *testing.T, backend storage.Backend) { receiver := make(chan *mailbox.Message, 10) done := make(chan error, 1) go func() { - done <- backend.FetchMessages(context.Background(), receiver) + done <- backend.FetchMessages(context.Background(), time.Time{}, receiver) }() count := 0 @@ -238,7 +238,7 @@ func RunTestsOnBackend(t *testing.T, backend storage.Backend) { receiver := make(chan *mailbox.Message, 10) done := make(chan error, 1) go func() { - done <- backend.FetchMessages(context.Background(), receiver) + done <- backend.FetchMessages(context.Background(), time.Time{}, receiver) }() wg := sync.WaitGroup{} @@ -341,6 +341,54 @@ func RunTestsOnBackend(t *testing.T, backend storage.Backend) { assert.NoError(t, err) }) + t.Run("AddsNewerMessage", func(t *testing.T) { + info := mailbox.Info{ + Delimiter: backend.Delimiter(), + Name: "Work", + } + props := mailbox.MessageProperties{ + Flags: sampleMessageFlags, + InternalDate: sampleMessageDate.Add(24 * time.Hour), + Size: uint32(len(sampleMessage)), + } + body := bytes.NewBufferString(sampleMessage) + _, err := backend.PutMessage(info, props, body) + assert.NoError(t, err) + + // Verify the mailbox has 4 messages + status, err := backend.SelectMailbox(info) + assert.NoError(t, err) + assert.Equal(t, uint32(4), status.Messages) + + // Verify we download the last message only + receiver := make(chan *mailbox.Message, 10) + done := make(chan error, 1) + go func() { + done <- backend.FetchMessages(context.Background(), sampleMessageDate.Add(26*time.Hour), receiver) + }() + + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + count := 0 + for msg := range receiver { + count++ + msg.Body.Close() + } + // only 1 message should have been downloaded + assert.Equal(t, 1, count) + }() + // wait until all the messages arrived + err = <-done + assert.NoError(t, err) + + wg.Wait() + + err = backend.UnselectMailbox() + assert.NoError(t, err) + }) + t.Run("StoreOneAction", func(t *testing.T) { info := mailbox.Info{ Delimiter: backend.Delimiter(),