Skip to content

Commit

Permalink
Introduce a Parser struct (#7)
Browse files Browse the repository at this point in the history
* introduce parser fields

* store column names in db session

* handle field preparation inside the Parse function

* move field to another file

* cleanup
  • Loading branch information
facundoolano authored Jul 29, 2024
1 parent 13c6c7d commit a27b745
Show file tree
Hide file tree
Showing 5 changed files with 250 additions and 235 deletions.
21 changes: 12 additions & 9 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type RequestCountSpec struct {

type dbSession struct {
db *sql.DB
columns []string
insertTx *sql.Tx
insertStmt *sql.Stmt
}
Expand All @@ -39,28 +40,30 @@ func InitDB(dbPath string, fields []*LogField) (*dbSession, error) {

// TODO consider adding indexes according to expected queries

var columns string
for _, field := range fields {
columns += fmt.Sprintf("%s %s,\n", field.ColumnName, field.ColumnSpec)
var columnSpecs string
columns := make([]string, len(fields))
for i, field := range fields {
columns[i] = field.ColumnName
columnSpecs += fmt.Sprintf("%s %s,\n", field.ColumnName, field.ColumnSpec)
}

sqlStmt := fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS access_logs (
id INTEGER NOT NULL PRIMARY KEY,
%s
created TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);`, columns)
);`, columnSpecs)

_, err = db.Exec(sqlStmt)
return &dbSession{db: db}, err
return &dbSession{db: db, columns: columns}, err
}

// Close releases the underlying database connection.
// NOTE(review): the error from db.Close() is silently discarded — confirm this is intentional.
func (dbs *dbSession) Close() {
	dbs.db.Close()
}

// Prepare a transaction to insert a new batch of log entries, returning the time of the last seen log entry.
func (dbs *dbSession) PrepareForUpdate(columns []string) (*time.Time, error) {
func (dbs *dbSession) PrepareForUpdate() (*time.Time, error) {
// we want to avoid processed files that were already processed in the past. but we still want to add new log entries
// from the most recent files, which may have been extended since we last saw them.
// Since there is no "uniqueness" in logs (even the same ip can make the same request at the same second ---I checked),
Expand All @@ -87,16 +90,16 @@ func (dbs *dbSession) PrepareForUpdate(columns []string) (*time.Time, error) {
}
dbs.insertTx = tx

insertValuePlaceholder := strings.TrimSuffix(strings.Repeat("?,", len(columns)), ",")
insertStmt, err := dbs.insertTx.Prepare(fmt.Sprintf("INSERT INTO access_logs(%s) values(%s);", strings.Join(columns, ","), insertValuePlaceholder))
insertValuePlaceholder := strings.TrimSuffix(strings.Repeat("?,", len(dbs.columns)), ",")
insertStmt, err := dbs.insertTx.Prepare(fmt.Sprintf("INSERT INTO access_logs(%s) values(%s);", strings.Join(dbs.columns, ","), insertValuePlaceholder))
if err != nil {
return nil, err
}
dbs.insertStmt = insertStmt
return lastSeemTime, nil
}

func (dbs *dbSession) AddLogEntry(values ...any) error {
func (dbs *dbSession) AddLogEntry(values []any) error {
_, err := dbs.insertStmt.Exec(values...)
return err
}
Expand Down
186 changes: 186 additions & 0 deletions fields.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
package main

import (
"github.com/mileusna/useragent"
"net/url"
"strings"
"time"
)

// LogField describes a single value that can be extracted from an access log
// line, stored as a column in the database, and referred to from the CLI.
type LogField struct {
	// Name of the log format variable this field maps to, e.g. "time_local".
	LogFormatVar string
	// Alternative names accepted on the command line for this field.
	CLINames []string
	// Name of the database column where the value is stored.
	ColumnName string
	// SQL type/constraint spec used when creating the column, e.g. "TEXT COLLATE NOCASE".
	ColumnSpec string
	// Optional transformation applied to the raw log value before storing it.
	Parse func(string) string
	// Column names of additional fields computed from this field's raw value.
	DerivedFields []string
	// Optional parser that maps the raw value to derived field name -> value.
	ParseDerivedFields func(string) map[string]string
}

// KNOWN_FIELDS lists every field the program knows how to extract from logs,
// store in the database and expose to the CLI. It is indexed by the lookup
// maps populated in init().
var KNOWN_FIELDS = []LogField{
	{
		LogFormatVar: "time_local",
		ColumnName:   "time",
		ColumnSpec:   "TIMESTAMP NOT NULL",
		Parse:        parseTime,
	},
	{
		LogFormatVar:       "request",
		ColumnName:         "request_raw",
		ColumnSpec:         "TEXT",
		DerivedFields:      []string{"path", "method", "referer"},
		ParseDerivedFields: parseRequestDerivedFields,
	},
	{
		LogFormatVar:       "http_user_agent",
		ColumnName:         "user_agent_raw",
		ColumnSpec:         "TEXT",
		DerivedFields:      []string{"user_agent", "os", "device", "ua_type", "ua_url"},
		ParseDerivedFields: parseUserAgentDerivedFields,
	},
	{
		LogFormatVar: "http_referer",
		CLINames:     []string{"referer", "ref", "referrer"},
		ColumnName:   "referer",
		ColumnSpec:   "TEXT COLLATE NOCASE",
		Parse:        stripUrlSource,
	},
	{
		LogFormatVar: "remote_addr",
		CLINames:     []string{"ip"},
		ColumnName:   "ip",
		ColumnSpec:   "TEXT",
	},
	{
		LogFormatVar: "status",
		CLINames:     []string{"status"},
		ColumnName:   "status",
		ColumnSpec:   "INTEGER",
	},
	// The fields below have no LogFormatVar: they are derived from the raw
	// request and user agent fields rather than parsed directly from the log.
	{
		CLINames:   []string{"method"},
		ColumnName: "method",
		ColumnSpec: "TEXT COLLATE NOCASE",
	},
	{
		CLINames:   []string{"path", "url"},
		ColumnName: "path",
		ColumnSpec: "TEXT",
	},
	{
		CLINames:   []string{"user_agent", "ua", "useragent"},
		ColumnName: "user_agent",
		ColumnSpec: "TEXT COLLATE NOCASE",
	},
	{
		CLINames:   []string{"os"},
		ColumnName: "os",
		ColumnSpec: "TEXT COLLATE NOCASE",
	},
	{
		CLINames:   []string{"device"},
		ColumnName: "device",
		ColumnSpec: "TEXT COLLATE NOCASE",
	},
	{
		CLINames:   []string{"ua_url", "uaurl"},
		ColumnName: "ua_url",
		ColumnSpec: "TEXT",
	},
	{
		CLINames:   []string{"ua_type", "uatype"},
		ColumnName: "ua_type",
		ColumnSpec: "TEXT COLLATE NOCASE",
	},
}

// Lookup indexes over KNOWN_FIELDS, populated by init().
var LOGVAR_TO_FIELD = map[string]*LogField{}      // log format variable -> field
var COLUMN_NAME_TO_FIELD = map[string]*LogField{} // db column name -> field
var CLI_NAME_TO_FIELD = map[string]*LogField{}    // CLI flag name -> field

// init indexes KNOWN_FIELDS by log format variable, column name and CLI name
// so fields can be looked up cheaply at parse and query time.
func init() {
	for i := range KNOWN_FIELDS {
		// Point into the slice rather than at the range variable: with Go
		// versions before 1.22, &field aliases a single per-loop variable and
		// every map entry would end up pointing at the LAST field. Even on
		// 1.22+, &field would point at a per-iteration copy instead of the
		// canonical KNOWN_FIELDS element.
		field := &KNOWN_FIELDS[i]
		COLUMN_NAME_TO_FIELD[field.ColumnName] = field
		if field.LogFormatVar != "" {
			LOGVAR_TO_FIELD[field.LogFormatVar] = field
		}
		for _, name := range field.CLINames {
			CLI_NAME_TO_FIELD[name] = field
		}
	}
}

// stripUrlSource normalizes a URL-ish value for grouping purposes: it drops
// the scheme, a leading "www." and a trailing slash.
func stripUrlSource(value string) string {
	for _, prefix := range []string{"http://", "https://", "www."} {
		value = strings.TrimPrefix(value, prefix)
	}
	return strings.TrimSuffix(value, "/")
}

// parseTime re-formats a log timestamp into the database date layout.
// FIXME error instead of panic?
func parseTime(timestamp string) string {
	if parsed, err := time.Parse(LOG_DATE_LAYOUT, timestamp); err == nil {
		return parsed.Format(DB_DATE_LAYOUT)
	}
	panic("can't parse log timestamp " + timestamp)
}

// parseRequestDerivedFields extracts the method, path and — when a referrer
// hint is present in the query string — referer values from a raw request
// line such as "GET /path?utm_source=x HTTP/1.1". A malformed request line
// yields an empty map.
func parseRequestDerivedFields(request string) map[string]string {
	result := make(map[string]string)
	// MixedCaps locals per Go convention; avoid shadowing the net/url package.
	requestParts := strings.Split(request, " ")
	if len(requestParts) == 3 {
		// if the request line is weird, don't try to extract its fields
		result["method"] = requestParts[0]
		rawPath := requestParts[1]
		if parsed, err := url.Parse(rawPath); err == nil {
			result["path"] = parsed.Path

			// if utm source and friends in query, use them as referer
			keys := []string{"ref", "referer", "referrer", "utm_source"}
			query := parsed.Query()
			for _, key := range keys {
				if query.Has(key) {
					result["referer"] = stripUrlSource(query.Get(key))
					break
				}
			}

		} else {
			result["path"] = rawPath
		}
	}

	return result
}

// parseUserAgentDerivedFields extracts browser name, OS, device, device/bot
// type and agent URL from a raw user agent string. An empty input yields an
// empty map.
func parseUserAgentDerivedFields(ua string) map[string]string {
	result := make(map[string]string)
	if ua != "" {
		// use a distinct name for the parsed struct instead of shadowing the
		// raw string parameter
		parsed := useragent.Parse(ua)
		result["user_agent"] = parsed.Name
		result["os"] = parsed.OS
		result["device"] = parsed.Device
		result["ua_url"] = stripUrlSource(parsed.URL)
		// checked in priority order: bot wins over tablet/mobile/desktop
		if parsed.Bot {
			result["ua_type"] = "bot"
		} else if parsed.Tablet {
			result["ua_type"] = "tablet"
		} else if parsed.Mobile {
			result["ua_type"] = "mobile"
		} else if parsed.Desktop {
			result["ua_type"] = "desktop"
		}
	}
	return result
}
26 changes: 8 additions & 18 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,14 @@ func main() {
logFormat = envLogFormat
}

logFormatRegex, fields := ParseFormat(logFormat)
parser := NewParser(logFormat)

ctx, spec := querySpecFromCLI()
dbs, err := InitDB(dbPath, fields)
dbs, err := InitDB(dbPath, parser.Fields)
ctx.FatalIfErrorf(err)
defer dbs.Close()

err = loadLogs(logFormatRegex, fields, logPathPattern, dbs)
err = loadLogs(parser, logPathPattern, dbs)
ctx.FatalIfErrorf(err)

columnNames, rowValues, err := dbs.QueryTop(spec)
Expand Down Expand Up @@ -174,31 +174,21 @@ func parseDuration(duration string) (time.Time, error) {
}

// Parse the most recent nginx access.logs and insert the ones not previously seen into the DB.
func loadLogs(logFormatRegex *regexp.Regexp, fields []*LogField, logPathPattern string, dbs *dbSession) error {
func loadLogs(parser *LogParser, logPathPattern string, dbs *dbSession) error {
logFiles, err := filepath.Glob(logPathPattern)
if err != nil {
return err
}

dbColumns := make([]string, len(fields))
for i, field := range fields {
dbColumns[i] = field.ColumnName
}

lastSeenTime, err := dbs.PrepareForUpdate(dbColumns)
// Get the last log time to know when to stop parsing, and prepare a transaction to insert newer entries
lastSeenTime, err := dbs.PrepareForUpdate()
if err != nil {
return err
}

err = ProcessAccessLogs(logFormatRegex, logFiles, lastSeenTime, func(logLineFields map[string]string) error {
queryValues := make([]interface{}, len(dbColumns))
for i, field := range dbColumns {
queryValues[i] = logLineFields[field]
}
return dbs.AddLogEntry(queryValues...)
})
err = parser.Parse(logFiles, lastSeenTime, dbs.AddLogEntry)

// Rollback or commit before returning, depending on error
// Rollback or commit before returning, depending on the error value
return dbs.FinishUpdate(err)
}

Expand Down
6 changes: 3 additions & 3 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,12 +265,12 @@ func runCommand(t *testing.T, logs string, cliArgs []string) ([]string, [][]stri
os.Args = append([]string{"ngtop"}, cliArgs...)
_, spec := querySpecFromCLI()

logFormatRegex, fields := ParseFormat(DEFAULT_LOG_FORMAT)
dbs, err := InitDB(dbFile.Name(), fields)
parser := NewParser(DEFAULT_LOG_FORMAT)
dbs, err := InitDB(dbFile.Name(), parser.Fields)
assertEqual(t, err, nil)
defer dbs.Close()

err = loadLogs(logFormatRegex, fields, logFile.Name(), dbs)
err = loadLogs(parser, logFile.Name(), dbs)
assertEqual(t, err, nil)
columnNames, rowValues, err := dbs.QueryTop(spec)
assertEqual(t, err, nil)
Expand Down
Loading

0 comments on commit a27b745

Please sign in to comment.