Skip to content

Commit

Permalink
groundwork for logical tests
Browse files Browse the repository at this point in the history
  • Loading branch information
vishalkrishnads committed Aug 12, 2024
1 parent f1cfd68 commit 88aa161
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 14 deletions.
34 changes: 20 additions & 14 deletions integrity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,25 @@ func (flog *ParquetFlog) Deref() Flog {
// - Download parquet files from the store created by Parseable for the minute
// - Compare the sent logs with the ones loaded from the downloaded parquet
func TestIntegrity(t *testing.T) {

flogs := createAndIngest(t)
parquetFiles := downloadParquetFiles(NewGlob.Stream, NewGlob.MinIoConfig)
actualFlogs := loadFlogsFromParquetFiles(parquetFiles)

rowCount := len(actualFlogs)

for i, expectedFlog := range flogs {
// The rows in parquet written by Parseable will be latest first, so we
// compare the first of ours with the last of what we got from Parseable's
// store.
actualFlog := actualFlogs[rowCount-i-1].Deref()
require.Equal(t, actualFlog, expectedFlog)
}

DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream)
}

func createAndIngest(t *testing.T) []Flog {
CreateStream(t, NewGlob.QueryClient, NewGlob.Stream)
iterations := 2
flogsPerIteration := 100
Expand Down Expand Up @@ -127,20 +146,7 @@ func TestIntegrity(t *testing.T) {
// XXX: We don't need to sleep for the entire minute, just until the next minute boundary.
}

parquetFiles := downloadParquetFiles(NewGlob.Stream, NewGlob.MinIoConfig)
actualFlogs := loadFlogsFromParquetFiles(parquetFiles)

rowCount := len(actualFlogs)

for i, expectedFlog := range flogs {
// The rows in parquet written by Parseable will be latest first, so we
// compare the first of ours with the last of what we got from Parseable's
// store.
actualFlog := actualFlogs[rowCount-i-1].Deref()
require.Equal(t, actualFlog, expectedFlog)
}

DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream)
return flogs
}

func ingestFlogs(flogs []Flog, stream string) error {
Expand Down
41 changes: 41 additions & 0 deletions quest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ const (
events_count = "5"
)

type StreamHotTier struct {
Size string `json:"size"`
UsedSize *string `json:"used_size,omitempty"`
AvailableSize *string `json:"available_size,omitempty"`
OldestDateTimeEntry *string `json:"oldest_date_time_entry,omitempty"`
}

func TestSmokeListLogStream(t *testing.T) {
CreateStream(t, NewGlob.QueryClient, NewGlob.Stream)
req, err := NewGlob.QueryClient.NewRequest("GET", "logstream", nil)
Expand Down Expand Up @@ -402,6 +409,40 @@ func TestSmokeGetRetention(t *testing.T) {
DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream)
}

func TestHotTierGetsLogs(t *testing.T) {
// create stream, put hot tier, ingest data for a duration, wait for 2-3 mins to see if all data is available in hot tier
}

func TestHotTierGetsLogsAfter(t *testing.T) {
logs := createAndIngest(t)

activateHotTier(t)
time.Sleep(60 * 2 * time.Second) // wait for 2 minutes to allow hot tier to sync

// fetch the logs from hot tier
req, _ := NewGlob.QueryClient.NewRequest("GET", "logstream/"+NewGlob.Stream+"/hottier", nil)
response, err := NewGlob.QueryClient.Do(req)
require.NoErrorf(t, err, "Fetching hot tier stream failed: %s", err)

// ascertain that they are in expected schema. prolly will be, just to be sure
body, err := readJsonBody[StreamHotTier](response.Body)
require.NoErrorf(t, err, "Hot tier response not in correct schema: %s", err)

// ascertain that the ingested all the ingested logs are present in hot tier
require.Equalf(t, len(logs), "%d", body.Size, "Total no. of ingested logs is %d but hot tier contains %d logs", len(logs), body.Size)

disableHotTier(t)
DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream)
}

func TestHotTierLogCount(t *testing.T) {
// create stream, ingest data, query get count, set hot tier, wait for 2-3 mins, query again get count, both counts should match
}

func TestOldestHotTierEntry(t *testing.T) {
// create stream, ingest data for a duration, call GET /logstream/{logstream}/info - to get the first_event_at field then set hot tier, wait for 2-3 mins, call GET /hottier - to get oldest entry in hot tier then match both
}

// This test calls all the User API endpoints
// in a sequence to check if they work as expected.
func TestSmoke_AllUsersAPI(t *testing.T) {
Expand Down
16 changes: 16 additions & 0 deletions test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,3 +561,19 @@ func checkAPIAccess(t *testing.T, client HTTPClient, stream string, role string)
require.Equalf(t, 403, response.StatusCode, "Server returned http code: %s and response: %s", response.Status, readAsString(response.Body))
}
}

func activateHotTier(t *testing.T) {
req, _ := NewGlob.QueryClient.NewRequest("PUT", "logstream/"+NewGlob.Stream+"/hottier", nil)
response, err := NewGlob.QueryClient.Do(req)
body := readAsString(response.Body)
require.Equalf(t, 200, response.StatusCode, "Server returned http code: %s and response: %s", response.Status, body)
require.NoErrorf(t, err, "Activating hot tier failed: %s", err)
}

func disableHotTier(t *testing.T) {
req, _ := NewGlob.QueryClient.NewRequest("PUT", "logstream/"+NewGlob.Stream+"/hottier", nil)
response, err := NewGlob.QueryClient.Do(req)
body := readAsString(response.Body)
require.Equalf(t, 200, response.StatusCode, "Server returned http code: %s and response: %s", response.Status, body)
require.NoErrorf(t, err, "Disabling hot tier failed: %s", err)
}

0 comments on commit 88aa161

Please sign in to comment.