Skip to content

Commit

Permalink
Update to v0.23.0 of pgmq (#15)
Browse files Browse the repository at this point in the history
  • Loading branch information
Craig Pastro authored Sep 4, 2023
1 parent 8054057 commit 4d7cfc2
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 48 deletions.
36 changes: 28 additions & 8 deletions pgmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,11 +238,21 @@ func (p *PGMQ) Archive(ctx context.Context, queue string, msgID int64) (bool, er
// table by their ids. View messages on the archive table with sql:
//
// select * from pgmq_<queue_name>_archive;
func (p *PGMQ) ArchiveBatch(ctx context.Context, queue string, msgIDs []int64) (bool, error) {
var archived bool
err := p.db.QueryRow(ctx, "select pgmq_archive($1, $2::bigint[])", queue, msgIDs).Scan(&archived)
func (p *PGMQ) ArchiveBatch(ctx context.Context, queue string, msgIDs []int64) ([]bool, error) {
rows, err := p.db.Query(ctx, "select pgmq_archive($1, $2::bigint[])", queue, msgIDs)
if err != nil {
return false, wrapPostgresError(err)
return nil, wrapPostgresError(err)
}
defer rows.Close()

var archived []bool
for rows.Next() {
var b bool
err = rows.Scan(&b)
if err != nil {
return nil, wrapPostgresError(err)
}
archived = append(archived, b)
}

return archived, nil
Expand All @@ -264,11 +274,21 @@ func (p *PGMQ) Delete(ctx context.Context, queue string, msgID int64) (bool, err
// DeleteBatch deletes a batch of messages from the queue by their ids. This
// is a permanent delete and cannot be undone. If you want to retain a log of
// the messages, use the ArchiveBatch method.
func (p *PGMQ) DeleteBatch(ctx context.Context, queue string, msgIDs []int64) (bool, error) {
var deleted bool
err := p.db.QueryRow(ctx, "select pgmq_delete($1, $2::bigint[])", queue, msgIDs).Scan(&deleted)
func (p *PGMQ) DeleteBatch(ctx context.Context, queue string, msgIDs []int64) ([]bool, error) {
rows, err := p.db.Query(ctx, "select pgmq_delete($1, $2::bigint[])", queue, msgIDs)
if err != nil {
return false, wrapPostgresError(err)
return nil, wrapPostgresError(err)
}
defer rows.Close()

var deleted []bool
for rows.Next() {
var b bool
err = rows.Scan(&b)
if err != nil {
return nil, wrapPostgresError(err)
}
deleted = append(deleted, b)
}

return deleted, nil
Expand Down
68 changes: 28 additions & 40 deletions pgmq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,11 +254,14 @@ func TestArchiveBatch(t *testing.T) {
ids, err := q.SendBatch(ctx, queue, []map[string]any{testMsg1, testMsg2})
require.NoError(t, err)

// Add a msgID that definitely does not exist to the end.
ids = append(ids, -1)

archived, err := q.ArchiveBatch(ctx, queue, ids)
require.NoError(t, err)
require.True(t, archived)
require.Equal(t, []bool{true, true, false}, archived)

// Let's just check that something landed in the archive table.
// Let's check that the two messages landed in the archive table.
stmt := fmt.Sprintf("select * from pgmq_%s_archive", queue)
tag, err := q.db.Exec(ctx, stmt)
require.NoError(t, err)
Expand All @@ -268,18 +271,6 @@ func TestArchiveBatch(t *testing.T) {
require.ErrorIs(t, err, ErrNoRows)
}

func TestArchiveBatchNotExists(t *testing.T) {
ctx := context.Background()
queue := t.Name()

err := q.CreateQueue(ctx, queue)
require.NoError(t, err)

archived, err := q.ArchiveBatch(ctx, queue, []int64{100})
require.NoError(t, err)
require.True(t, archived)
}

func TestDelete(t *testing.T) {
ctx := context.Background()
queue := t.Name()
Expand Down Expand Up @@ -320,31 +311,23 @@ func TestDeleteBatch(t *testing.T) {
ids, err := q.SendBatch(ctx, queue, []map[string]any{testMsg1, testMsg2})
require.NoError(t, err)

// Add a msgID that definitely does not exist to the end.
ids = append(ids, -1)

deleted, err := q.DeleteBatch(ctx, queue, ids)
require.NoError(t, err)
require.True(t, deleted)
require.EqualValues(t, []bool{true, true, false}, deleted)

_, err = q.Read(ctx, queue, 0)
require.ErrorIs(t, err, ErrNoRows)
}

func TestDeleteBatchNotExists(t *testing.T) {
ctx := context.Background()
queue := t.Name()

err := q.CreateQueue(ctx, queue)
require.NoError(t, err)

archived, err := q.DeleteBatch(ctx, queue, []int64{100})
require.NoError(t, err)
require.True(t, archived)
}

func TestErrorCases(t *testing.T) {
ctx := context.Background()

queue := t.Name()
testErr := errors.New("an error")
cmdTag := pgconn.NewCommandTag("")

ctrl := gomock.NewController(t)
defer ctrl.Finish()
Expand All @@ -355,70 +338,75 @@ func TestErrorCases(t *testing.T) {
mockRow := mocks.NewMockRow(ctrl)

t.Run("createQueueError", func(t *testing.T) {
mockDB.EXPECT().Exec(ctx, "select pgmq_create($1)", queue).Return(pgconn.NewCommandTag(""), testErr)
mockDB.EXPECT().Exec(ctx, "select pgmq_create($1)", queue).Return(cmdTag, testErr)
err := q.CreateQueue(ctx, queue)
require.ErrorContains(t, err, "postgres error")
})

t.Run("dropQueueError", func(t *testing.T) {
mockDB.EXPECT().Exec(ctx, "select pgmq_drop_queue($1)", queue).Return(pgconn.NewCommandTag(""), testErr)
mockDB.EXPECT().Exec(ctx, "select pgmq_drop_queue($1)", queue).Return(cmdTag, testErr)
err := q.DropQueue(ctx, queue)
require.ErrorContains(t, err, "postgres error")
})

t.Run("sendError", func(t *testing.T) {
mockRow.EXPECT().Scan(gomock.Any()).Return(testErr)
mockDB.EXPECT().QueryRow(ctx, "select * from pgmq_send($1, $2)", queue, gomock.Any()).Return(mockRow)
mockRow.EXPECT().Scan(gomock.Any()).Return(testErr)
id, err := q.Send(ctx, queue, testMsg1)
require.EqualValues(t, 0, id)
require.ErrorContains(t, err, "postgres error")
})

t.Run("sendBatchError", func(t *testing.T) {
mockDB.EXPECT().Query(ctx, "select * from pgmq_send_batch($1, $2::jsonb[])", queue, gomock.Any()).Return(nil, testErr)
ids, err := q.SendBatch(ctx, queue, []map[string]any{testMsg1})
require.Nil(t, ids)
require.ErrorContains(t, err, "postgres error")
})

t.Run("readError", func(t *testing.T) {
mockRow.EXPECT().Scan(gomock.Any()).Return(testErr)
mockDB.EXPECT().QueryRow(ctx, "select * from pgmq_read($1, $2, $3)", queue, gomock.Any(), gomock.Any()).Return(mockRow)
mockRow.EXPECT().Scan(gomock.Any()).Return(testErr)
msg, err := q.Read(ctx, queue, 0)
require.Nil(t, msg)
require.ErrorContains(t, err, "postgres error")
})

t.Run("popError", func(t *testing.T) {
mockRow.EXPECT().Scan(gomock.Any()).Return(testErr)
mockDB.EXPECT().QueryRow(ctx, "select * from pgmq_pop($1)", queue).Return(mockRow)
mockRow.EXPECT().Scan(gomock.Any()).Return(testErr)
msg, err := q.Pop(ctx, queue)
require.Nil(t, msg)
require.ErrorContains(t, err, "postgres error")
})

t.Run("archiveError", func(t *testing.T) {
mockRow.EXPECT().Scan(gomock.Any()).Return(testErr)
mockDB.EXPECT().QueryRow(ctx, "select pgmq_archive($1, $2::bigint)", queue, gomock.Any()).Return(mockRow)
mockRow.EXPECT().Scan(gomock.Any()).Return(testErr)
archived, err := q.Archive(ctx, queue, 7)
require.False(t, archived)
require.ErrorContains(t, err, "postgres error")
})

t.Run("archiveBatchError", func(t *testing.T) {
mockRow.EXPECT().Scan(gomock.Any()).Return(testErr)
mockDB.EXPECT().QueryRow(ctx, "select pgmq_archive($1, $2::bigint[])", queue, gomock.Any()).Return(mockRow)
mockDB.EXPECT().Query(ctx, "select pgmq_archive($1, $2::bigint[])", queue, gomock.Any()).Return(nil, testErr)
archived, err := q.ArchiveBatch(ctx, queue, []int64{7})
require.False(t, archived)
require.Nil(t, archived)
require.ErrorContains(t, err, "postgres error")
})

t.Run("deleteError", func(t *testing.T) {
mockRow.EXPECT().Scan(gomock.Any()).Return(testErr)
mockDB.EXPECT().QueryRow(ctx, "select pgmq_delete($1, $2::bigint)", queue, gomock.Any()).Return(mockRow)
mockRow.EXPECT().Scan(gomock.Any()).Return(testErr)
deleted, err := q.Delete(ctx, queue, 7)
require.False(t, deleted)
require.ErrorContains(t, err, "postgres error")
})

t.Run("deleteBatchError", func(t *testing.T) {
mockRow.EXPECT().Scan(gomock.Any()).Return(testErr)
mockDB.EXPECT().QueryRow(ctx, "select pgmq_delete($1, $2::bigint[])", queue, gomock.Any()).Return(mockRow)
mockDB.EXPECT().Query(ctx, "select pgmq_delete($1, $2::bigint[])", queue, gomock.Any()).Return(nil, testErr)
deleted, err := q.DeleteBatch(ctx, queue, []int64{7})
require.False(t, deleted)
require.Nil(t, deleted)
require.ErrorContains(t, err, "postgres error")
})
}

0 comments on commit 4d7cfc2

Please sign in to comment.