forked from ConduitIO/conduit-connector-kafka
-
Notifications
You must be signed in to change notification settings - Fork 0
/
consumer.go
255 lines (227 loc) · 7.3 KB
/
consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
// Copyright © 2022 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//go:generate mockgen -destination mock/consumer.go -package mock -mock_names=Consumer=Consumer . Consumer
package kafka
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"sync"
sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/google/uuid"
"github.com/segmentio/kafka-go"
)
// Consumer represents a Kafka consumer in a simplified form,
// with just the functionality which is needed for this plugin.
// A Consumer's offset is being managed by the broker.
type Consumer interface {
	// StartFrom instructs the consumer to connect to a broker and a topic, using the provided consumer group ID.
	// The group ID is significant for this consumer's offsets.
	// By using the same group ID after a restart, we make sure that the consumer continues from where it left off.
	// Returns: An error, if the consumer could not be set to read from the given position, nil otherwise.
	StartFrom(config Config, position []byte) error

	// Get returns a message from the configured topic. Waits until a messages is available
	// or until it errors out.
	// Returns: a message (if available), the message's position and an error (if there was one).
	Get(ctx context.Context) (*kafka.Message, []byte, error)

	// Ack acknowledges the message at the given position, committing its offset to the broker.
	// Acks are expected in the same order in which messages were read.
	Ack(position sdk.Position) error

	// Close this consumer and the associated resources (e.g. connections to the broker)
	Close() error
}
// position describes a consumer's read progress: the consumer group under
// which offsets are tracked, plus the broker coordinates (topic, partition,
// offset) of a message.
type position struct {
	GroupID   string `json:"groupID"`
	Topic     string `json:"topic"`
	Partition int    `json:"partition"`
	Offset    int64  `json:"offset"`
}

// json serializes this position into its JSON representation.
func (p position) json() ([]byte, error) {
	// b: renamed from `bytes` to stop shadowing the imported `bytes` package.
	b, err := json.Marshal(p)
	if err != nil {
		return nil, fmt.Errorf("couldn't transform position into json: %w", err)
	}
	return b, nil
}

// parsePosition deserializes the given JSON bytes into a position.
// An empty (or nil) input yields the zero position, i.e. a fresh start.
func parsePosition(raw []byte) (position, error) {
	// raw: renamed from `bytes` to stop shadowing the imported `bytes` package.
	pos := position{}
	if len(raw) == 0 {
		return pos, nil
	}
	if err := json.Unmarshal(raw, &pos); err != nil {
		return position{}, err
	}
	return pos, nil
}
// segmentConsumer is an implementation of Consumer backed by a
// segmentio/kafka-go Reader.
type segmentConsumer struct {
	// reader is nil until StartFrom has been called successfully.
	reader *kafka.Reader
	// unackMessages represents all messages which have been read but not acknowledged.
	// They are ordered in the way they were read.
	unackMessages []*kafka.Message
	// umm is used to guard access to unackMessages
	umm sync.Mutex
}
// NewConsumer creates a new Kafka consumer. The consumer needs to be started
// (using the StartFrom method) before actually being used.
func NewConsumer() (Consumer, error) {
	consumer := &segmentConsumer{}
	return consumer, nil
}
// StartFrom connects this consumer to a broker and topic, reusing the consumer
// group ID found in the given position (if any) so that reading continues
// where it left off. Returns an error if the config is incomplete, the
// position cannot be parsed, or the reader cannot be created.
func (c *segmentConsumer) StartFrom(config Config, positionBytes []byte) error {
	// todo if we can assume that a new Config instance will always be created by calling Parse(),
	// and that the instance will not be mutated, then we can leave it out these checks.
	if len(config.Servers) == 0 {
		return ErrServersMissing
	}
	if config.Topic == "" {
		return ErrTopicMissing
	}
	// pos: renamed from `position`, which shadowed the `position` type.
	pos, err := parsePosition(positionBytes)
	if err != nil {
		return fmt.Errorf("couldn't parse position: %w", err)
	}
	err = c.newReader(config, pos.GroupID)
	if err != nil {
		return fmt.Errorf("couldn't create reader: %w", err)
	}
	return nil
}
// newReader builds a kafka.Reader from the given config and consumer group ID
// and assigns it to this consumer. An empty groupID is replaced with a freshly
// generated UUID, so the broker tracks offsets under a new, unique group.
func (c *segmentConsumer) newReader(cfg Config, groupID string) error {
	if groupID == "" {
		groupID = uuid.NewString()
	}
	startOffset := kafka.LastOffset
	if cfg.ReadFromBeginning {
		startOffset = kafka.FirstOffset
	}
	readerCfg := kafka.ReaderConfig{
		Brokers:               cfg.Servers,
		Topic:                 cfg.Topic,
		GroupID:               groupID,
		StartOffset:           startOffset,
		WatchPartitionChanges: true,
	}
	// TLS config
	if cfg.useTLS() {
		if err := c.withTLS(&readerCfg, cfg); err != nil {
			return fmt.Errorf("failed to set up TLS: %w", err)
		}
	}
	// SASL
	if cfg.saslEnabled() {
		if err := c.withSASL(&readerCfg, cfg); err != nil {
			return fmt.Errorf("couldn't configure SASL: %w", err)
		}
	}
	c.reader = kafka.NewReader(readerCfg)
	return nil
}
// withTLS configures the reader's dialer to use TLS, based on the certificates
// and the skip-verify setting from the given config.
func (c *segmentConsumer) withTLS(readerCfg *kafka.ReaderConfig, cfg Config) error {
	tlsCfg, err := newTLSConfig(cfg.ClientCert, cfg.ClientKey, cfg.CACert, cfg.InsecureSkipVerify)
	if err != nil {
		return fmt.Errorf("invalid TLS config: %w", err)
	}
	dialer := readerCfg.Dialer
	if dialer == nil {
		dialer = &kafka.Dialer{}
		readerCfg.Dialer = dialer
	}
	dialer.DualStack = true
	dialer.TLS = tlsCfg
	return nil
}
// withSASL configures the reader's dialer with the SASL mechanism from the
// given config. Returns an error if the config has no SASL parameters or the
// mechanism cannot be constructed.
func (c *segmentConsumer) withSASL(readerCfg *kafka.ReaderConfig, cfg Config) error {
	// Validate before touching readerCfg, so the error path leaves it
	// unmodified (previously a Dialer could be created and then abandoned).
	if !cfg.saslEnabled() {
		return errors.New("input config has no SASL parameters")
	}
	mechanism, err := newSASLMechanism(cfg.SASLMechanism, cfg.SASLUsername, cfg.SASLPassword)
	if err != nil {
		return fmt.Errorf("couldn't configure SASL mechanism: %w", err)
	}
	if readerCfg.Dialer == nil {
		readerCfg.Dialer = &kafka.Dialer{}
	}
	readerCfg.Dialer.SASLMechanism = mechanism
	return nil
}
// Get fetches the next message from the configured topic, blocking until one
// is available or ctx is done. The message is recorded as unacknowledged and
// returned together with its serialized position.
func (c *segmentConsumer) Get(ctx context.Context) (*kafka.Message, []byte, error) {
	message, err := c.reader.FetchMessage(ctx)
	if err != nil {
		return nil, nil, fmt.Errorf("couldn't read message: %w", err)
	}
	pos, err := c.positionOf(&message)
	if err != nil {
		return nil, nil, fmt.Errorf("couldn't get message's position: %w", err)
	}
	// Track the message until it's acknowledged; access guarded by umm.
	c.umm.Lock()
	c.unackMessages = append(c.unackMessages, &message)
	c.umm.Unlock()
	return &message, pos, nil
}
// positionOf serializes the broker coordinates of the given message (consumer
// group, topic, partition, offset) into a JSON position.
func (c *segmentConsumer) positionOf(m *kafka.Message) ([]byte, error) {
	return position{
		GroupID:   c.readerID(),
		Topic:     m.Topic,
		Partition: m.Partition,
		Offset:    m.Offset,
	}.json()
}
// Ack commits the offset of the oldest unacknowledged message and drops it
// from the unacknowledged list. The given position must match that message's
// position, since acks are expected in the order the reads were done.
func (c *segmentConsumer) Ack(position sdk.Position) error {
	c.umm.Lock()
	defer c.umm.Unlock()
	if err := c.canAck(position); err != nil {
		return fmt.Errorf("ack not possible: %w", err)
	}
	oldest := c.unackMessages[0]
	if err := c.reader.CommitMessages(context.Background(), *oldest); err != nil {
		return fmt.Errorf("couldn't commit messages: %w", err)
	}
	// Remove the acknowledged message from the head of the queue.
	c.unackMessages = c.unackMessages[1:]
	return nil
}
// canAck checks whether the given position may be acknowledged: there must be
// at least one unacknowledged message, and the position must match the oldest
// one (acks have to arrive in the same order reads were done).
// The caller is expected to hold the umm mutex.
func (c *segmentConsumer) canAck(position sdk.Position) error {
	if len(c.unackMessages) == 0 {
		return fmt.Errorf("requested ack for %q but no unacknowledged messages found", position)
	}
	pos, err := c.positionOf(c.unackMessages[0])
	if err != nil {
		return fmt.Errorf("failed to get position of first unacknowledged message: %w", err)
	}
	// We're going to yell at Conduit for not keeping its promise:
	// acks should be requested in the same order reads were done.
	if !bytes.Equal(pos, position) {
		return fmt.Errorf("ack is out-of-order, requested ack for %q, but first unack. message is %q", position, pos)
	}
	// Fix: return an explicit nil instead of `err`, which is always nil here.
	return nil
}
// Close shuts down the underlying reader and its resources. Closing a consumer
// which was never started is a no-op.
func (c *segmentConsumer) Close() error {
	if c.reader == nil {
		return nil
	}
	// Closing the reader also makes the loops in the reader goroutines stop.
	if err := c.reader.Close(); err != nil {
		return fmt.Errorf("couldn't close reader: %w", err)
	}
	return nil
}
// readerID returns the consumer group ID the underlying reader was configured with.
func (c *segmentConsumer) readerID() string {
	cfg := c.reader.Config()
	return cfg.GroupID
}