diff --git a/integrity_test.go b/integrity_test.go index 5fc6da1..7890dd1 100644 --- a/integrity_test.go +++ b/integrity_test.go @@ -84,6 +84,25 @@ func (flog *ParquetFlog) Deref() Flog { // - Download parquet files from the store created by Parseable for the minute // - Compare the sent logs with the ones loaded from the downloaded parquet func TestIntegrity(t *testing.T) { + + flogs := createAndIngest(t) + parquetFiles := downloadParquetFiles(NewGlob.Stream, NewGlob.MinIoConfig) + actualFlogs := loadFlogsFromParquetFiles(parquetFiles) + + rowCount := len(actualFlogs) + + for i, expectedFlog := range flogs { + // The rows in parquet written by Parseable will be latest first, so we + // compare the first of ours with the last of what we got from Parseable's + // store. + actualFlog := actualFlogs[rowCount-i-1].Deref() + require.Equal(t, actualFlog, expectedFlog) + } + + DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream) +} + +func createAndIngest(t *testing.T) []Flog { CreateStream(t, NewGlob.QueryClient, NewGlob.Stream) iterations := 2 flogsPerIteration := 100 @@ -127,20 +146,7 @@ func TestIntegrity(t *testing.T) { // XXX: We don't need to sleep for the entire minute, just until the next minute boundary. } - parquetFiles := downloadParquetFiles(NewGlob.Stream, NewGlob.MinIoConfig) - actualFlogs := loadFlogsFromParquetFiles(parquetFiles) - - rowCount := len(actualFlogs) - - for i, expectedFlog := range flogs { - // The rows in parquet written by Parseable will be latest first, so we - // compare the first of ours with the last of what we got from Parseable's - // store. - actualFlog := actualFlogs[rowCount-i-1].Deref() - require.Equal(t, actualFlog, expectedFlog) - } - - DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream) + return flogs } func ingestFlogs(flogs []Flog, stream string) error { diff --git a/quest_test.go b/quest_test.go index 1ec7047..1af5f8c 100644 --- a/quest_test.go +++ b/quest_test.go @@ -34,6 +34,13 @@ const ( events_count = "5" ) +type StreamHotTier struct { + Size string `json:"size"` + UsedSize *string `json:"used_size,omitempty"` + AvailableSize *string `json:"available_size,omitempty"` + OldestDateTimeEntry *string `json:"oldest_date_time_entry,omitempty"` +} + func TestSmokeListLogStream(t *testing.T) { CreateStream(t, NewGlob.QueryClient, NewGlob.Stream) req, err := NewGlob.QueryClient.NewRequest("GET", "logstream", nil) @@ -402,6 +409,40 @@ func TestSmokeGetRetention(t *testing.T) { DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream) } +func TestHotTierGetsLogs(t *testing.T) { + // create stream, put hot tier, ingest data for a duration, wait for 2-3 mins to see if all data is available in hot tier +} + +func TestHotTierGetsLogsAfter(t *testing.T) { + logs := createAndIngest(t) + + activateHotTier(t) + time.Sleep(60 * 2 * time.Second) // wait for 2 minutes to allow hot tier to sync + + // fetch the logs from hot tier + req, _ := NewGlob.QueryClient.NewRequest("GET", "logstream/"+NewGlob.Stream+"/hottier", nil) + response, err := NewGlob.QueryClient.Do(req) + require.NoErrorf(t, err, "Fetching hot tier stream failed: %s", err) + + // ascertain that they are in expected schema. prolly will be, just to be sure + body, err := readJsonBody[StreamHotTier](response.Body) + require.NoErrorf(t, err, "Hot tier response not in correct schema: %s", err) + + // ascertain that the ingested all the ingested logs are present in hot tier + require.Equalf(t, len(logs), "%d", body.Size, "Total no. of ingested logs is %d but hot tier contains %d logs", len(logs), body.Size) + + disableHotTier(t) + DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream) +} + +func TestHotTierLogCount(t *testing.T) { + // create stream, ingest data, query get count, set hot tier, wait for 2-3 mins, query again get count, both counts should match +} + +func TestOldestHotTierEntry(t *testing.T) { + // create stream, ingest data for a duration, call GET /logstream/{logstream}/info - to get the first_event_at field then set hot tier, wait for 2-3 mins, call GET /hottier - to get oldest entry in hot tier then match both +} + // This test calls all the User API endpoints // in a sequence to check if they work as expected. func TestSmoke_AllUsersAPI(t *testing.T) { diff --git a/test_utils.go b/test_utils.go index 7c7bb70..12f39f3 100644 --- a/test_utils.go +++ b/test_utils.go @@ -561,3 +561,19 @@ func checkAPIAccess(t *testing.T, client HTTPClient, stream string, role string) require.Equalf(t, 403, response.StatusCode, "Server returned http code: %s and response: %s", response.Status, readAsString(response.Body)) } } + +func activateHotTier(t *testing.T) { + req, _ := NewGlob.QueryClient.NewRequest("PUT", "logstream/"+NewGlob.Stream+"/hottier", nil) + response, err := NewGlob.QueryClient.Do(req) + body := readAsString(response.Body) + require.Equalf(t, 200, response.StatusCode, "Server returned http code: %s and response: %s", response.Status, body) + require.NoErrorf(t, err, "Activating hot tier failed: %s", err) +} + +func disableHotTier(t *testing.T) { + req, _ := NewGlob.QueryClient.NewRequest("PUT", "logstream/"+NewGlob.Stream+"/hottier", nil) + response, err := NewGlob.QueryClient.Do(req) + body := readAsString(response.Body) + require.Equalf(t, 200, response.StatusCode, "Server returned http code: %s and response: %s", response.Status, body) + require.NoErrorf(t, err, "Disabling hot tier failed: %s", err) +}