Fix parse until last seen bug (#21)
* improve logging around parse until dates

* add failing test

* simplify last seen db query and fix parsing interruption bug
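In short, the bug being fixed: access log files keep their oldest entries at the top, so when a previously ingested file had new lines appended, the parser would hit an already-seen entry within the first few lines and return early, never reaching the new entries at the bottom of the file. Below is a minimal, self-contained sketch of the fixed loop shape; the helper name, line format, and cutoff value are illustrative rather than the actual ngtop code:

package main

import (
	"bufio"
	"fmt"
	"strings"
)

// scanFile mimics the fixed scan loop: entries older than the cutoff are
// skipped instead of aborting the scan, and the caller is told the file was
// already (partially) seen so it can stop before moving on to older files.
func scanFile(scanner *bufio.Scanner, until string, insert func(string)) (alreadySeen bool) {
	for scanner.Scan() {
		line := scanner.Text()
		timestamp := strings.Fields(line)[0] // illustrative: timestamp as the first field

		if until != "" && timestamp < until {
			// the pre-fix code returned here ("already caught up"), which also
			// dropped any new entries appended further down the same file
			alreadySeen = true
			continue
		}
		insert(line)
	}
	return alreadySeen
}

func main() {
	logs := "2024-07-24T00:00:28 GET /feed\n" + // ingested on a previous run
		"2024-07-24T00:00:30 GET /feed\n" + // ingested on a previous run
		"2024-07-24T00:00:56 GET /blog\n" // appended since then
	scanner := bufio.NewScanner(strings.NewReader(logs))
	seen := scanFile(scanner, "2024-07-24T00:00:31", func(line string) {
		fmt.Println("insert:", line)
	})
	fmt.Println("already seen, skip older files:", seen)
}

TestUpdatedLog below exercises exactly this scenario: ingest a log file, append two more entries, then re-run the loader and expect all four requests to be counted.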
facundoolano committed Aug 16, 2024
1 parent 095964b commit e3081af
Showing 3 changed files with 51 additions and 15 deletions.
main_test.go: 40 changes (39 additions & 1 deletion)
@@ -276,7 +276,45 @@ func TestMultipleLogFiles(t *testing.T) {
 	// include gzipped value
 }
 
-//
+func TestUpdatedLog(t *testing.T) {
+	// TODO refactor runCommand to reduce duplication here
+	logFile, err := os.CreateTemp("", "access.log")
+	assertEqual(t, err, nil)
+	defer os.Remove(logFile.Name())
+	dbFile, err := os.CreateTemp("", "ngtop.db")
+	assertEqual(t, err, nil)
+	defer os.Remove(dbFile.Name())
+
+	parser := ngtop.NewParser(DEFAULT_LOG_FORMAT)
+	dbs, err := ngtop.InitDB(dbFile.Name(), parser.Fields)
+	assertEqual(t, err, nil)
+	defer dbs.Close()
+
+	os.Args = []string{"ngtop"}
+	_, spec := querySpecFromCLI()
+
+	previousOffset, err := logFile.Write([]byte(`xx.xx.xx.xx - - [24/Jul/2024:00:00:28 +0000] "GET /feed HTTP/1.1" 301 169 "-" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36"
+xx.xx.xx.xx - - [24/Jul/2024:00:00:30 +0000] "GET /feed HTTP/1.1" 301 169 "-" "feedi/0.1.0 (+https://github.com/facundoolano/feedi)"`))
+	assertEqual(t, err, nil)
+
+	err = loadLogs(parser, logFile.Name(), dbs)
+	assertEqual(t, err, nil)
+	_, rows, err := dbs.QueryTop(spec)
+	assertEqual(t, err, nil)
+	assertEqual(t, rows[0][0], "2")
+
+	// append more logs to the file
+	_, err = logFile.WriteAt([]byte(`xx.xx.xx.xx - - [24/Jul/2024:00:00:56 +0000] "GET /blog/deconstructing-the-role-playing-videogame/ HTTP/1.1" 200 14224 "-" "feedi/0.1.0 (+https://github.com/facundoolano/feedi)"
+xx.xx.xx.xx - - [24/Jul/2024:00:01:18 +0000] "GET /feed.xml HTTP/1.1" 200 9641 "https://olano.dev/feed.xml" "FreshRSS/1.24.0 (Linux; https://freshrss.org)"`), int64(previousOffset))
+	assertEqual(t, err, nil)
+
+	// run again and expect to see the new requests
+	err = loadLogs(parser, logFile.Name(), dbs)
+	assertEqual(t, err, nil)
+	_, rows, err = dbs.QueryTop(spec)
+	assertEqual(t, err, nil)
+	assertEqual(t, rows[0][0], "4")
+}
 
 // ------ HELPERS --------
 
ngtop/db.go: 11 changes (0 additions & 11 deletions)
@@ -64,21 +64,10 @@ func (dbs *DBSession) Close() {

 // Prepare a transaction to insert a new batch of log entries, returning the time of the last seen log entry.
 func (dbs *DBSession) PrepareForUpdate() (*time.Time, error) {
-	// we want to avoid reprocessing files that were already processed in the past, but we still want to add new log entries
-	// from the most recent files, which may have been extended since we last saw them.
-	// Since there is no "uniqueness" in logs (even the same ip can make the same request at the same second; I checked),
-	// I remove the entries with the highest timestamp, and load everything up to and including that timestamp, but nothing older.
-	// The assumption is that any previous processing finished completely rather than being interrupted.
-
 	var lastSeenTimeStr string
 	var lastSeemTime *time.Time
 	// this query error is acceptable in case of db not exists or empty
 	if err := dbs.db.QueryRow("SELECT max(time) FROM access_logs").Scan(&lastSeenTimeStr); err == nil {
-		_, err := dbs.db.Exec("DELETE FROM access_logs WHERE time = ?", lastSeenTimeStr)
-		if err != nil {
-			return nil, err
-		}
-
 		t, _ := time.Parse(DB_DATE_LAYOUT, lastSeenTimeStr)
 		lastSeemTime = &t
 	}
ngtop/parser.go: 15 changes (12 additions & 3 deletions)
@@ -70,7 +70,7 @@ func (parser LogParser) Parse(

 	for _, path := range logFiles {
 
-		log.Printf("parsing %s", path)
+		log.Printf("parsing %s until %s", path, until)
 		file, err := os.Open(path)
 		if err != nil {
 			return err
@@ -86,6 +86,7 @@
 		}
 
 		scanner := bufio.NewScanner(reader)
+		alreadySeenFile := false
 		for scanner.Scan() {
 			line := scanner.Text()
 			values, err := parseLogLine(parser.formatRegex, line)
@@ -100,8 +101,11 @@
 			}
 
 			if untilStr != "" && values["time"] < untilStr {
-				// already caught up, no need to continue processing
-				return nil
+				// if this file contains entries older than untilStr, it means we already parsed part of it before.
+				// Since the file has its oldest entries at the beginning, we need to keep parsing until the end to get
+				// all the updates, but we flag it as already seen so we can skip the remaining, older files afterwards.
+				alreadySeenFile = true
+				continue
 			}
 
 			valueList := make([]any, len(parser.Fields))
@@ -115,6 +119,11 @@
 		if err := scanner.Err(); err != nil {
 			return err
 		}
+
+		if alreadySeenFile {
+			log.Printf("%s contains dates older than %s, skipping older files", path, untilStr)
+			return nil
+		}
 	}
 
 	return nil
