diff --git a/README.md b/README.md
index 7e77dcc6..4ea7a112 100644
--- a/README.md
+++ b/README.md
@@ -77,7 +77,7 @@ docker-compose up -d
 3. Apply the topic configs in [`examples/local-cluster/topics`](/examples/local-cluster/topics):
 
 ```
-topicctl apply --skip-confirm examples/local-cluster/topics/*yaml
+topicctl apply --skip-confirm examples/local-cluster/topics/*.yaml
 ```
 
 4. Send some test messages to the `topic-default` topic:
@@ -205,13 +205,62 @@ subcommands interactively.
 topicctl reset-offsets [topic] [group] [flags]
 ```
 
-The `reset-offsets` subcommand allows resetting the offsets for a consumer group in a topic. There are 2 main approaches for setting the offsets:
-
-1. Use a combination of `--partitions`, `--offset`, `--to-earliest` and `--to-latest` flags. `--partitions` flag specifies a list of partitions to be reset e.g. `1,2,3 ...`. If not used, the command defaults to resetting consumer group offsets for ALL of the partitions. `--offset` flag indicates the specific value that all desired consumer group partitions will be set to. If not set, it will default to -2. Finally, `--to-earliest` flag resets offsets of consumer group members to earliest offsets of partitions while `--to-latest` resets offsets of consumer group members to latest offsets of partitions. However, only one of the `--to-earliest`, `--to-latest` and `--offset` flags can be used at a time. This approach is easy to use but lacks the ability for detailed offset configuration.
-
-2. Use `--partition-offset-map` flag to specify a detailed offset configuration for individual partitions. For example, `1=5,2=10,7=12,...` means that the consumer group offset for partition 1 must be set to 5, partition 2 to offset 10, partition 7 to offset 12 and so on. This approach provides greater flexibility and fine-grained control for this operation. Note that `--partition-offset-map` flag is standalone and cannot be coupled with any of the previous flags.
-
-
+The `reset-offsets` subcommand allows resetting the offsets
+for a consumer group in a topic.
+There are a few typical approaches for setting the offsets:
+
+1. Use `--delete` alongside `--before-earliest`:
+   This will unblock consumers which are stuck on offsets
+   which are no longer in range,
+   without affecting healthy consumers.
+   Typically this follows an outage or sustained slow consumption (see the example below).
+2. Use one of the partition selectors:
+   `--before-earliest`, `--after-latest`, or `--partitions`,
+   and combine it with one of the offset operators:
+   `--delete`, `--offset`, `--to-earliest` or `--to-latest`.
+   Aside from `--to-latest`, this is a legacy approach that is largely
+   superseded by approach 1.
+3. Use `--partition-offset-map` to pass specific offsets per partition.
+   For example, `1=5,2=10` means that the consumer group offset
+   for partition 1 must be set to 5, and partition 2 to offset 10.
+   This is mainly used for replays of specific traffic,
+   such as when a deploy has mishandled or corrupted state,
+   and the prior release must be rerun
+   starting at a specific offset per partition.
+   This is the most flexible approach for offset setting.
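+
+For example, approach 1 might look like the following
+(the topic and group names here are placeholders):
+
+```
+topicctl reset-offsets my-topic my-group --delete --before-earliest
+```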
+
+Note that the `--partition-offset-map` flag is standalone
+and cannot be coupled with other flags.
+
+##### Partition selection flags
+
+At most one of the following may be selected:
+
+* `--partitions` specifies a comma-separated list of partition IDs.
+* `--before-earliest` selects partitions whose group offset is older
+  than the oldest still-retained offset.
+* `--after-latest` selects partitions whose group offset is newer
+  than the newest offset that has been published to the topic.
+
+If none of these are specified,
+the command defaults to selecting ALL of the partitions.
+
+##### Offset selection flags
+
+At most one of the following may be selected:
+
+* `--delete` removes stored group offsets.
+  This will generally have the same effect as `--to-earliest` or `--to-latest`,
+  depending on the consumer group configuration.
+  However, `--delete` is more reliable and convenient,
+  since `--to-earliest` in particular involves a race with message retention
+  that may require numerous attempts.
+* `--offset` indicates the specific value that all selected
+  consumer group partitions will be set to.
+* `--to-earliest` resets group offsets to the oldest still-retained offset per partition.
+* `--to-latest` resets group offsets to the newest offset per partition.
+
+If none of these are specified, `--to-earliest` will be the default.
 
 #### tail
 
diff --git a/cmd/topicctl/subcmd/reset.go b/cmd/topicctl/subcmd/reset.go
index 4fa4981e..4b49f8c0 100644
--- a/cmd/topicctl/subcmd/reset.go
+++ b/cmd/topicctl/subcmd/reset.go
@@ -6,7 +6,6 @@ import (
 	"fmt"
 	"strconv"
 
-	"github.com/segmentio/topicctl/pkg/admin"
 	"github.com/segmentio/topicctl/pkg/apply"
 	"github.com/segmentio/topicctl/pkg/cli"
 	"github.com/segmentio/topicctl/pkg/groups"
@@ -26,6 +25,8 @@ type resetOffsetsCmdConfig struct {
 	offset             int64
 	partitions         []int
 	partitionOffsetMap map[string]int64
+	beforeEarliest     bool
+	afterLatest        bool
 	toEarliest         bool
 	toLatest           bool
 	delete             bool
@@ -36,65 +37,99 @@ type resetOffsetsCmdConfig struct {
 var resetOffsetsConfig resetOffsetsCmdConfig
 
 func init() {
-	resetOffsetsCmd.Flags().Int64Var(
-		&resetOffsetsConfig.offset,
+	cfg := &resetOffsetsConfig
+	cmd := resetOffsetsCmd
+	flags := cmd.Flags()
+
+	flags.Int64Var(
+		&cfg.offset,
 		"offset",
 		-2,
 		"Desired offset for the target partitions",
 	)
-	resetOffsetsCmd.Flags().IntSliceVar(
-		&resetOffsetsConfig.partitions,
+	flags.IntSliceVar(
+		&cfg.partitions,
 		"partitions",
 		[]int{},
 		"List of partitions to reset e.g. 1,2,3,.. (defaults to all)",
 	)
-	resetOffsetsCmd.Flags().StringToInt64Var(
-		&resetOffsetsConfig.partitionOffsetMap,
+	flags.StringToInt64Var(
+		&cfg.partitionOffsetMap,
 		"partition-offset-map",
 		map[string]int64{},
 		"Map of partition IDs to their corresponding desired offsets e.g. 
1=5,2=10,3=12,...", ) - resetOffsetsCmd.Flags().BoolVar( - &resetOffsetsConfig.toEarliest, + flags.BoolVar( + &cfg.beforeEarliest, + "before-earliest", + false, + "Apply only to offsets below the partition minimum", + ) + flags.BoolVar( + &cfg.afterLatest, + "after-latest", + false, + "Apply only to offsets above the partition maximum", + ) + flags.BoolVar( + &cfg.toEarliest, "to-earliest", false, - "Resets offsets of consumer group members to earliest offsets of partitions") - resetOffsetsCmd.Flags().BoolVar( - &resetOffsetsConfig.toLatest, + "Resets offsets of consumer group members to earliest offsets of partitions", + ) + flags.BoolVar( + &cfg.toLatest, "to-latest", false, - "Resets offsets of consumer group members to latest offsets of partitions") - resetOffsetsCmd.Flags().BoolVar( - &resetOffsetsConfig.delete, + "Resets offsets of consumer group members to latest offsets of partitions", + ) + flags.BoolVar( + &cfg.delete, "delete", false, - "Deletes offsets for the given consumer group") + "Deletes offsets for the given consumer group", + ) - addSharedFlags(resetOffsetsCmd, &resetOffsetsConfig.shared) - RootCmd.AddCommand(resetOffsetsCmd) + addSharedFlags(cmd, &resetOffsetsConfig.shared) + RootCmd.AddCommand(cmd) } func resetOffsetsPreRun(cmd *cobra.Command, args []string) error { - resetOffsetSpec := "You must choose only one of the following " + - "reset-offset specifications: --delete, --to-earliest, --to-latest, " + - "--offset, or --partition-offset-map." - offsetMapSpec := "--partition-offset-map option cannot be used with --partitions." + const ( + resetOffsetSpec = "You must choose only one of the following " + + "reset-offset specifications: --delete, --to-earliest, --to-latest, " + + "--offset, or --partition-offset-map" + offsetMapSpec = "--partition-offset-map option cannot be used with " + + "--partitions, --before-earliest, or --after-latest" + rangeSpec = "--before-earliest cannot be combined with --after-latest" + ) cfg := resetOffsetsConfig + hasMap := len(cfg.partitionOffsetMap) > 0 + hasSlice := len(cfg.partitions) > 0 + numOffsetSpecs := numTrue( cfg.toEarliest, cfg.toLatest, cfg.delete, cmd.Flags().Changed("offset"), - len(cfg.partitionOffsetMap) > 0, + hasMap, ) if numOffsetSpecs > 1 { return errors.New(resetOffsetSpec) } - if len(cfg.partitionOffsetMap) > 0 && len(cfg.partitions) > 0 { + if cfg.beforeEarliest && cfg.afterLatest { + return errors.New(rangeSpec) + } + + if hasMap && hasSlice { + return errors.New(offsetMapSpec) + } + + if numTrue(hasMap, cfg.beforeEarliest, cfg.afterLatest) > 1 { return errors.New(offsetMapSpec) } @@ -105,6 +140,9 @@ func resetOffsetsRun(cmd *cobra.Command, args []string) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + topic := args[0] + group := args[1] + cfg := resetOffsetsConfig adminClient, err := cfg.shared.getAdminClient(ctx, nil, true) @@ -116,30 +154,24 @@ func resetOffsetsRun(cmd *cobra.Command, args []string) error { connector := adminClient.GetConnector() - topic := args[0] - group := args[1] + getLagsInput := groups.GetMemberLagsInput{ + GroupID: group, + Topic: topic, - topicInfo, err := adminClient.GetTopic(ctx, topic, false) - if err != nil { - return err + // We need partition-accurate range bounds, + // but don't care about consumer-group message timings. 
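+		// (With FullRange, the fetched bounds start at each partition's true
+		// oldest offset rather than at the group's committed offsets.)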
+ FullRange: true, } - partitionIDsMap := make(map[int]struct{}, len(topicInfo.Partitions)) - for _, partitionInfo := range topicInfo.Partitions { - partitionIDsMap[partitionInfo.ID] = struct{}{} + partitionLags, err := groups.GetMemberLags(ctx, connector, &getLagsInput) + if err != nil { + return err } - var strategy string - - switch { - case resetOffsetsConfig.toLatest: - strategy = groups.LatestResetOffsetsStrategy - case resetOffsetsConfig.toEarliest: - strategy = groups.EarliestResetOffsetsStrategy - } + infoByPartition := sliceToMapKeyFunc(partitionLags, func(v *groups.MemberPartitionLag) int { return v.Partition }) // If explicit per-partition offsets were specified, set them now. - partitionOffsets, err := parsePartitionOffsetMap(partitionIDsMap, cfg.partitionOffsetMap) + partitionOffsets, err := parsePartitionOffsetMap(infoByPartition, cfg.partitionOffsetMap) if err != nil { return err } @@ -149,47 +181,67 @@ func resetOffsetsRun(cmd *cobra.Command, args []string) error { // these will only take effect of per-partition offsets were not specified. partitions := cfg.partitions if len(partitions) == 0 && len(partitionOffsets) == 0 { - convert := func(info admin.PartitionInfo) int { return info.ID } - partitions = convertSlice(topicInfo.Partitions, convert) + partitions = mapKeys(infoByPartition) } - for _, partition := range partitions { - _, ok := partitionIDsMap[partition] - if !ok { + // Re-append applicable partitions back over the same slice. + n := len(partitions) + partitions = partitions[:0] + + for _, partition := range partitions[:n] { + info := infoByPartition[partition] + if info == nil { format := "Partition %d not found in topic %s" return fmt.Errorf(format, partition, topic) } - if strategy == "" { - partitionOffsets[partition] = cfg.offset - return nil + // Skip partitions with in-range group offsets. + switch { + case cfg.beforeEarliest && info.MemberOffset >= info.OldestOffset: + continue + case cfg.afterLatest && info.MemberOffset <= info.NewestOffset: + continue } - input := groups.GetEarliestOrLatestOffsetInput{ - Connector: connector, - Strategy: strategy, - Topic: topic, - Partition: partition, - } + partitions = append(partitions, partition) - offset, err := groups.GetEarliestOrLatestOffset(ctx, &input) - if err != nil { - return err + offset := cfg.offset + switch { + case cfg.delete: + continue // storing an offset is not applicable when deleting. 
+ case cfg.toEarliest: + offset = info.OldestOffset + case cfg.toLatest: + offset = info.NewestOffset } partitionOffsets[partition] = offset } - log.Infof( - "This will reset the offsets for the following partitions "+ - "in topic %s for group %s:\n%s", - topic, - group, - groups.FormatPartitionOffsets(partitionOffsets), - ) + if cfg.delete { - log.Info("Please ensure that all other consumers are stopped, " + - "otherwise the reset might be overridden.") + } + + message := "This will reset the offsets for the following partitions " + + "in topic %s for group %s:\n%s" + formatTable := func() string { return groups.FormatPartitionOffsets(partitionOffsets) } + + if cfg.delete { + message = "This will delete the offsets for the following partitions " + + "in topic %s for group %s:\n%s" + formatTable = func() string { return groups.FormatPartitions(partitions) } + } + + log.Infof(message, topic, group, formatTable()) + + // Stopping consumers is typically only relevant to resets, + // since deleting offsets is usually just for unblocking stuck partitions: + // if the group offset for a partition is being actively updated, + // then it's not stuck. + if !cfg.delete { + log.Info("Please ensure that all other consumers are stopped, " + + "otherwise the reset might be overridden.") + } ok, _ := apply.Confirm("OK to continue?", false) if !ok { @@ -198,7 +250,7 @@ func resetOffsetsRun(cmd *cobra.Command, args []string) error { cliRunner := cli.NewCLIRunner(adminClient, log.Infof, !noSpinner) - if resetOffsetsConfig.delete { + if cfg.delete { input := groups.DeleteOffsetsInput{ GroupID: group, Topic: topic, @@ -228,6 +280,32 @@ func numTrue(bools ...bool) int { return n } +func mapKeys[K comparable, V any](m map[K]V) []K { + s := make([]K, 0, len(m)) + + for k := range m { + s = append(s, k) + } + + return s +} + +func sliceToMapKeyFunc[K comparable, V any](s []V, fn func(*V) K) map[K]*V { + return sliceToMapFunc(s, func(v *V) (K, *V) { return fn(v), v }) +} + +func sliceToMapFunc[K comparable, V1, V2 any](s []V1, fn func(*V1) (K, V2)) map[K]V2 { + m := make(map[K]V2, len(s)) + + for i := range s { + v1 := &s[i] + k, v2 := fn(v1) + m[k] = v2 + } + + return m +} + func convertSlice[T1, T2 any](input []T1, fn func(T1) T2) []T2 { out := make([]T2, len(input)) @@ -238,7 +316,7 @@ func convertSlice[T1, T2 any](input []T1, fn func(T1) T2) []T2 { return out } -func parsePartitionOffsetMap(partitionIDsMap map[int]struct{}, input map[string]int64) (map[int]int64, error) { +func parsePartitionOffsetMap[T any](partitionIDsMap map[int]T, input map[string]int64) (map[int]int64, error) { out := make(map[int]int64, len(input)) for partition, offset := range input { diff --git a/pkg/cli/cli.go b/pkg/cli/cli.go index 0684208e..2135d878 100644 --- a/pkg/cli/cli.go +++ b/pkg/cli/cli.go @@ -372,14 +372,13 @@ func (c *CLIRunner) GetMemberLags( return fmt.Errorf("Error fetching topic info: %+v", err) } - memberLags, err := groups.GetMemberLags( - ctx, - c.adminClient.GetConnector(), - topic, - groupID, - ) - c.stopSpinner() + getLagsInput := groups.GetMemberLagsInput{ + GroupID: groupID, + Topic: topic, + } + memberLags, err := groups.GetMemberLags(ctx, c.adminClient.GetConnector(), &getLagsInput) + c.stopSpinner() if err != nil { return err } diff --git a/pkg/groups/format.go b/pkg/groups/format.go index c1ea801c..28780560 100644 --- a/pkg/groups/format.go +++ b/pkg/groups/format.go @@ -3,7 +3,9 @@ package groups import ( "bytes" "fmt" + "slices" "sort" + "strconv" "strings" "time" @@ -270,12 +272,39 @@ func 
FormatMemberLags(memberLags []MemberPartitionLag, full bool) string { return string(bytes.TrimRight(buf.Bytes(), "\n")) } +// FormatPartitions generates a pretty table that shows a list of partitions. +func FormatPartitions(partitions []int) string { + var buf bytes.Buffer + + table := tablewriter.NewWriter(&buf) + table.SetHeader([]string{"Partition"}) + table.SetAutoWrapText(false) + table.SetColumnAlignment([]int{tablewriter.ALIGN_RIGHT}) + + table.SetBorders(tablewriter.Border{ + Left: false, + Top: true, + Right: false, + Bottom: true, + }) + + slices.Sort(partitions) + + for _, partition := range partitions { + table.Append([]string{strconv.Itoa(partition)}) + } + + table.Render() + + return string(bytes.TrimRight(buf.Bytes(), "\n")) +} + // FormatPartitionOffsets generates a pretty table that shows the proposed offsets for each // partition in a reset. func FormatPartitionOffsets(partitionOffsets map[int]int64) string { - buf := &bytes.Buffer{} + var buf bytes.Buffer - table := tablewriter.NewWriter(buf) + table := tablewriter.NewWriter(&buf) table.SetHeader( []string{ "Partition", diff --git a/pkg/groups/groups.go b/pkg/groups/groups.go index fe7ae7fc..b05f19c7 100644 --- a/pkg/groups/groups.go +++ b/pkg/groups/groups.go @@ -74,11 +74,7 @@ func GetGroups( } // GetGroupDetails returns the details (membership, etc.) for a single consumer group. -func GetGroupDetails( - ctx context.Context, - connector *admin.Connector, - groupID string, -) (*GroupDetails, error) { +func GetGroupDetails(ctx context.Context, connector *admin.Connector, groupID string) (*GroupDetails, error) { req := kafka.DescribeGroupsRequest{ GroupIDs: []string{groupID}, } @@ -135,15 +131,21 @@ func GetGroupDetails( return &groupDetails, nil } -// GetMemberLags returns the lag for each partition being consumed by the argument group in the -// argument topic. -func GetMemberLags( - ctx context.Context, - connector *admin.Connector, - topic string, - groupID string, -) ([]MemberPartitionLag, error) { - groupDetails, err := GetGroupDetails(ctx, connector, groupID) +// GetMemberLagsInput configures a call to [GetMemberLags]. +type GetMemberLagsInput struct { + GroupID string + Topic string + + // FullRange will make fetched partition ranges accurate + // from the partition's perspective, ignoring consumer group: + // this has the downside of potentially making MemberTime inaccurate. + FullRange bool +} + +// GetMemberLags returns the lag for each partition on the given topic, +// being consumed by the given group in the argument topic. 
+func GetMemberLags(ctx context.Context, connector *admin.Connector, input *GetMemberLagsInput) ([]MemberPartitionLag, error) { + groupDetails, err := GetGroupDetails(ctx, connector, input.GroupID) if err != nil { return nil, err } @@ -152,40 +154,49 @@ func GetMemberLags( return nil, errors.New("Group state is dead; check that group ID is valid") } - partitionMembers := groupDetails.PartitionMembers(topic) + partitionMembers := groupDetails.PartitionMembers(input.Topic) - offsets, err := connector.KafkaClient.ConsumerOffsets( - ctx, kafka.TopicAndGroup{ - Topic: topic, - GroupId: groupID, - }, - ) + offsetInput := kafka.TopicAndGroup{ + GroupId: input.GroupID, + Topic: input.Topic, + } + + offsets, err := connector.KafkaClient.ConsumerOffsets(ctx, offsetInput) if err != nil { return nil, err } - bounds, err := messages.GetAllPartitionBounds(ctx, connector, topic, offsets) + boundsOffsetsInput := offsets + if input.FullRange { + boundsOffsetsInput = nil + } + + bounds, err := messages.GetAllPartitionBounds(ctx, connector, input.Topic, boundsOffsetsInput) if err != nil { return nil, err } - partitionLags := []MemberPartitionLag{} + partitionLags := make([]MemberPartitionLag, len(bounds)) - for _, bound := range bounds { - partitionLag := MemberPartitionLag{ - Topic: topic, + for i, bound := range bounds { + lag := &partitionLags[i] + *lag = MemberPartitionLag{ + Topic: input.Topic, Partition: bound.Partition, MemberID: partitionMembers[bound.Partition].MemberID, - MemberOffset: offsets[bound.Partition], + OldestOffset: bound.FirstOffset, NewestOffset: bound.LastOffset, + MemberOffset: offsets[bound.Partition], + OldestTime: bound.FirstTime, NewestTime: bound.LastTime, } - if bound.FirstOffset == offsets[bound.Partition] { - partitionLag.MemberTime = bound.FirstTime + switch lag.MemberOffset { + case bound.LastOffset: + lag.MemberTime = bound.LastTime + case bound.FirstOffset: + lag.MemberTime = bound.FirstTime } - - partitionLags = append(partitionLags, partitionLag) } return partitionLags, nil @@ -260,7 +271,6 @@ func ResetOffsets(ctx context.Context, connector *admin.Connector, input *ResetO // GetEarliestOrLatestOffsetInput configures a call to [GetEarliestOrLatestOffset]. type GetEarliestOrLatestOffsetInput struct { - Connector *admin.Connector Strategy string Topic string Partition int @@ -268,12 +278,12 @@ type GetEarliestOrLatestOffsetInput struct { // GetEarliestorLatestOffset gets earliest/latest offset // for a given topic partition for resetting offsets of consumer group. 
-func GetEarliestOrLatestOffset(ctx context.Context, input *GetEarliestOrLatestOffsetInput) (int64, error) { +func GetEarliestOrLatestOffset(ctx context.Context, connector *admin.Connector, input *GetEarliestOrLatestOffsetInput) (int64, error) { if !isValidOffsetStrategy(input.Strategy) { return 0, errors.New("Invalid reset offset strategy provided.") } - partitionBound, err := messages.GetPartitionBounds(ctx, input.Connector, input.Topic, input.Partition, 0) + partitionBound, err := messages.GetPartitionBounds(ctx, connector, input.Topic, input.Partition, 0) if err != nil { return 0, err } diff --git a/pkg/groups/groups_test.go b/pkg/groups/groups_test.go index 6dab4c00..4bca1277 100644 --- a/pkg/groups/groups_test.go +++ b/pkg/groups/groups_test.go @@ -285,7 +285,12 @@ func TestGetLags(t *testing.T) { require.NoError(t, err) } - lags, err := GetMemberLags(ctx, connector, topicName, groupID) + getLagsInput := GetMemberLagsInput{ + GroupID: groupID, + Topic: topicName, + } + + lags, err := GetMemberLags(ctx, connector, &getLagsInput) require.NoError(t, err) require.Equal(t, 2, len(lags)) @@ -332,18 +337,17 @@ func TestGetEarliestOrLatestOffset(t *testing.T) { for _, partition := range groupPartitions { input := GetEarliestOrLatestOffsetInput{ - Connector: connector, Topic: topicName, Partition: partition, } input.Strategy = LatestResetOffsetsStrategy - offset, err := GetEarliestOrLatestOffset(ctx, &input) + offset, err := GetEarliestOrLatestOffset(ctx, connector, &input) require.NoError(t, err) assert.Equal(t, int64(4), offset) input.Strategy = EarliestResetOffsetsStrategy - offset, err = GetEarliestOrLatestOffset(ctx, &input) + offset, err = GetEarliestOrLatestOffset(ctx, connector, &input) require.NoError(t, err) assert.Equal(t, int64(0), offset) } @@ -390,7 +394,12 @@ func TestResetOffsets(t *testing.T) { err = ResetOffsets(ctx, connector, &input) require.NoError(t, err) - lags, err := GetMemberLags(ctx, connector, topicName, groupID) + getLagsInput := GetMemberLagsInput{ + GroupID: groupID, + Topic: topicName, + } + + lags, err := GetMemberLags(ctx, connector, &getLagsInput) require.NoError(t, err) require.Equal(t, 2, len(lags)) @@ -399,24 +408,22 @@ func TestResetOffsets(t *testing.T) { // latest offset of partition 0 getInput := GetEarliestOrLatestOffsetInput{ - Connector: connector, Topic: topicName, Strategy: LatestResetOffsetsStrategy, Partition: 0, } - latestOffset, err := GetEarliestOrLatestOffset(ctx, &getInput) + latestOffset, err := GetEarliestOrLatestOffset(ctx, connector, &getInput) require.NoError(t, err) // earliest offset of partition 1 getInput = GetEarliestOrLatestOffsetInput{ - Connector: connector, Topic: topicName, Strategy: EarliestResetOffsetsStrategy, Partition: 1, } - earliestOffset, err := GetEarliestOrLatestOffset(ctx, &getInput) + earliestOffset, err := GetEarliestOrLatestOffset(ctx, connector, &getInput) require.NoError(t, err) resetInput := ResetOffsetsInput{ @@ -431,7 +438,7 @@ func TestResetOffsets(t *testing.T) { err = ResetOffsets(ctx, connector, &resetInput) require.NoError(t, err) - lags, err = GetMemberLags(ctx, connector, topicName, groupID) + lags, err = GetMemberLags(ctx, connector, &getLagsInput) require.NoError(t, err) require.Equal(t, 2, len(lags)) diff --git a/pkg/groups/types.go b/pkg/groups/types.go index 989bddc9..8073be6d 100644 --- a/pkg/groups/types.go +++ b/pkg/groups/types.go @@ -83,9 +83,11 @@ type MemberPartitionLag struct { Topic string Partition int MemberID string + OldestOffset int64 NewestOffset int64 - NewestTime time.Time 
MemberOffset int64 + OldestTime time.Time + NewestTime time.Time MemberTime time.Time } diff --git a/pkg/messages/bounds.go b/pkg/messages/bounds.go index 8ad973f6..26396322 100644 --- a/pkg/messages/bounds.go +++ b/pkg/messages/bounds.go @@ -1,9 +1,10 @@ package messages import ( + "cmp" "context" "fmt" - "sort" + "slices" "time" "github.com/segmentio/kafka-go" @@ -119,8 +120,8 @@ func GetAllPartitionBounds( } } - sort.Slice(allBounds, func(a, b int) bool { - return allBounds[a].Partition < allBounds[b].Partition + slices.SortFunc(allBounds, func(a, b Bounds) int { + return cmp.Compare(a.Partition, b.Partition) }) return allBounds, nil
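
As a rough sketch of how the reworked lag API above might be called (not part of the diff itself): the connector setup is assumed to exist elsewhere, and the topic and group names are placeholders.

```go
package example

import (
	"context"
	"fmt"

	"github.com/segmentio/topicctl/pkg/admin"
	"github.com/segmentio/topicctl/pkg/groups"
)

// printLags fetches per-partition lag for a consumer group. FullRange asks for
// each partition's true oldest/newest bounds rather than bounds anchored at the
// group's committed offsets.
func printLags(ctx context.Context, connector *admin.Connector) error {
	input := groups.GetMemberLagsInput{
		GroupID:   "my-group", // placeholder consumer group
		Topic:     "my-topic", // placeholder topic
		FullRange: true,
	}

	lags, err := groups.GetMemberLags(ctx, connector, &input)
	if err != nil {
		return err
	}

	for _, lag := range lags {
		fmt.Printf(
			"partition %d: group offset %d (oldest %d, newest %d)\n",
			lag.Partition, lag.MemberOffset, lag.OldestOffset, lag.NewestOffset,
		)
	}

	return nil
}
```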