Skip to content

Commit

Permalink
fix(pubsub/pstest): make invalid filter return error instead of panic (
Browse files Browse the repository at this point in the history
  • Loading branch information
hongalex authored Nov 7, 2024
1 parent 3b06513 commit 45e1ce7
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 8 deletions.
15 changes: 8 additions & 7 deletions pubsub/pstest/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,14 @@ func (s *GServer) CreateSubscription(_ context.Context, ps *pb.Subscription) (*p
}

sub := newSubscription(top, &s.mu, s.now, deadLetterTopic, ps)
if ps.Filter != "" {
filter, err := parseFilter(ps.Filter)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "bad filter: %v", err)
}
sub.filter = &filter
}

top.subs[ps.Name] = sub
s.subs[ps.Name] = sub
sub.start(&s.wg)
Expand Down Expand Up @@ -905,13 +913,6 @@ func newSubscription(t *topic, mu *sync.Mutex, timeNowFunc func() time.Time, dea
done: make(chan struct{}),
timeNowFunc: timeNowFunc,
}
if ps.Filter != "" {
filter, err := parseFilter(ps.Filter)
if err != nil {
panic(fmt.Sprintf("pstest: bad filter: %v", err))
}
sub.filter = &filter
}
return sub
}

Expand Down
31 changes: 30 additions & 1 deletion pubsub/pstest/fake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1115,12 +1115,28 @@ func TestUpdateRetryPolicy(t *testing.T) {
}
}

func TestUpdateFilter(t *testing.T) {
func TestSubscriptionFilter(t *testing.T) {
ctx := context.Background()
pclient, sclient, _, cleanup := newFake(ctx, t)
defer cleanup()

top := mustCreateTopic(ctx, t, pclient, &pb.Topic{Name: "projects/P/topics/T"})

// Creating a subscription with invalid filter should return an error.
_, err := sclient.CreateSubscription(ctx, &pb.Subscription{
Name: "projects/p/subscriptions/s",
Topic: top.Name,
AckDeadlineSeconds: 30,
EnableMessageOrdering: true,
Filter: "bad filter",
})
if err == nil {
t.Fatal("expected bad filter error, got nil")
}
if st := status.Convert(err); st.Code() != codes.InvalidArgument {
t.Fatalf("got err status: %v, want: %v", st.Code(), codes.InvalidArgument)
}

sub := mustCreateSubscription(ctx, t, sclient, &pb.Subscription{
AckDeadlineSeconds: minAckDeadlineSecs,
Name: "projects/P/subscriptions/S",
Expand All @@ -1143,6 +1159,19 @@ func TestUpdateFilter(t *testing.T) {
if got, want := updated.Filter, update.Filter; got != want {
t.Fatalf("got %v, want %v", got, want)
}

// Updating a subscription with bad filter should return an error.
update.Filter = "bad filter"
updated, err = sclient.UpdateSubscription(ctx, &pb.UpdateSubscriptionRequest{
Subscription: update,
UpdateMask: &field_mask.FieldMask{Paths: []string{"filter"}},
})
if err == nil {
t.Fatal("expected bad filter error, got nil")
}
if st := status.Convert(err); st.Code() != codes.InvalidArgument {
t.Fatalf("got err status: %v, want: %v", st.Code(), codes.InvalidArgument)
}
}

func TestUpdateEnableExactlyOnceDelivery(t *testing.T) {
Expand Down

0 comments on commit 45e1ce7

Please sign in to comment.