diff --git a/internal/redis_ts/client.go b/internal/redis_ts/client.go index 15fb439..56ca9eb 100644 --- a/internal/redis_ts/client.go +++ b/internal/redis_ts/client.go @@ -35,7 +35,7 @@ func NewFailoverClient(failoverOpt *redis.FailoverOptions) *Client { func add(key *string, labels []*prompb.Label, metric *string, timestamp *int64, value *float64) redis.Cmder { args := make([]interface{}, 0, len(labels)+3) args = append(args, "TS.ADD", *key) - args = append(args, strconv.FormatInt(*timestamp/1000, 10)) + args = append(args, strconv.FormatInt(*timestamp, 10)) args = append(args, strconv.FormatFloat(*value, 'f', 6, 64)) args = append(args, "LABELS") hasNameLabel := false @@ -137,7 +137,7 @@ func (c *Client) Read(req *prompb.ReadRequest) (returnVal *prompb.ReadResponse, if err != nil { return nil, err } - cmd := c.rangeByLabels(labelMatchers, q.StartTimestampMs/1000, q.EndTimestampMs/1000) + cmd := c.rangeByLabels(labelMatchers, q.StartTimestampMs, q.EndTimestampMs) err = pipe.Process(cmd) if err != nil { return nil, err @@ -173,7 +173,7 @@ func (c *Client) Read(req *prompb.ReadRequest) (returnVal *prompb.ReadResponse, if err != nil { return nil, err } - tsSamples = append(tsSamples, prompb.Sample{Timestamp: parsedSample[0].(int64) * 1000, Value: value}) + tsSamples = append(tsSamples, prompb.Sample{Timestamp: parsedSample[0].(int64), Value: value}) } thisSeries := &prompb.TimeSeries{ Labels: tsLabels, diff --git a/internal/redis_ts/client_test.go b/internal/redis_ts/client_test.go index e790787..1fd68ae 100644 --- a/internal/redis_ts/client_test.go +++ b/internal/redis_ts/client_test.go @@ -3,9 +3,9 @@ package redis_ts import ( "github.com/prometheus/prometheus/prompb" "testing" + "time" "github.com/go-redis/redis" - "github.com/prometheus/common/model" "github.com/stretchr/testify/assert" ) @@ -21,10 +21,12 @@ var redisClient = redis.NewClient(&redis.Options{ }) func TestWriteSingleSample(t *testing.T) { - now := model.Now() + now := time.Now() answerToLifeTheUniverse := 42.1 - samples := []*prompb.TimeSeries{ + redisClient.Del("test_series{label_1=value_1,label_2=value_2}") + + insertedSamples := []*prompb.TimeSeries{ { Labels: []*prompb.Label{ { @@ -42,7 +44,15 @@ func TestWriteSingleSample(t *testing.T) { }, Samples: []prompb.Sample{ { - Timestamp: now.Unix() * 1000, + Timestamp: now.UnixNano() / 1000, + Value: answerToLifeTheUniverse, + }, + { + Timestamp: now.UnixNano()/1000 + 1, + Value: answerToLifeTheUniverse, + }, + { + Timestamp: now.UnixNano()/1000 + 2, Value: answerToLifeTheUniverse, }, }, @@ -50,18 +60,37 @@ func TestWriteSingleSample(t *testing.T) { } var redisTsClient = NewClient(redisAddress, redisAuth) - err := redisTsClient.Write(samples) + err := redisTsClient.Write(insertedSamples) assert.Nil(t, err, "Write of samples failed") keys := redisClient.Keys("test_series{label_1=value_1,label_2=value_2}").Val() assert.Len(t, keys, 1) - labelsMatchers := []interface{}{"label_1=value_1"} - cmd := redisTsClient.rangeByLabels(labelsMatchers, 0, now.Unix()+5) - err = redisTsClient.Process(cmd) - assert.Nil(t, err, "rangeByLabels failed to process") - ranges, err := cmd.Result() - assert.Nil(t, err, "rangeByLabels failed") - assert.Len(t, ranges, 1) + + request := prompb.ReadRequest{ + Queries: []*prompb.Query{ + { + StartTimestampMs: 0, + EndTimestampMs: int64(now.Add(time.Second*5).UnixNano() / 1000), + Matchers: []*prompb.LabelMatcher{ + { + Type: prompb.LabelMatcher_EQ, + Name: "label_1", + Value: "value_1", + }, + { + Type: prompb.LabelMatcher_EQ, + Name: "label_2", + Value: "value_2", + }, + }, + }, + }, + } + result, err := redisTsClient.Read(&request) + assert.Nil(t, err, "failed to process query") + assert.Len(t, result.Results, 1) + assert.Len(t, result.Results[0].Timeseries, 1) + assert.Equal(t, insertedSamples, result.Results[0].Timeseries) } func TestNewFailoverClient(t *testing.T) {