From a27b74532f438c93c241a1f635820e66c61821ba Mon Sep 17 00:00:00 2001 From: Facundo Olano Date: Mon, 29 Jul 2024 16:19:25 -0300 Subject: [PATCH] Introduce a Parser struct (#7) * introduce parser fields * store column names in db session * handle field preparation inside in the Parse function * move field to another file * cleanup --- db.go | 21 +++-- fields.go | 186 ++++++++++++++++++++++++++++++++++++++ main.go | 26 ++---- main_test.go | 6 +- parser.go | 246 +++++++++------------------------------------------ 5 files changed, 250 insertions(+), 235 deletions(-) create mode 100644 fields.go diff --git a/db.go b/db.go index 08ca548..bd29c20 100644 --- a/db.go +++ b/db.go @@ -20,6 +20,7 @@ type RequestCountSpec struct { type dbSession struct { db *sql.DB + columns []string insertTx *sql.Tx insertStmt *sql.Stmt } @@ -39,9 +40,11 @@ func InitDB(dbPath string, fields []*LogField) (*dbSession, error) { // TODO consider adding indexes according to expected queries - var columns string - for _, field := range fields { - columns += fmt.Sprintf("%s %s,\n", field.ColumnName, field.ColumnSpec) + var columnSpecs string + columns := make([]string, len(fields)) + for i, field := range fields { + columns[i] = field.ColumnName + columnSpecs += fmt.Sprintf("%s %s,\n", field.ColumnName, field.ColumnSpec) } sqlStmt := fmt.Sprintf(` @@ -49,10 +52,10 @@ func InitDB(dbPath string, fields []*LogField) (*dbSession, error) { id INTEGER NOT NULL PRIMARY KEY, %s created TIMESTAMP DEFAULT CURRENT_TIMESTAMP - );`, columns) + );`, columnSpecs) _, err = db.Exec(sqlStmt) - return &dbSession{db: db}, err + return &dbSession{db: db, columns: columns}, err } func (dbs *dbSession) Close() { @@ -60,7 +63,7 @@ func (dbs *dbSession) Close() { } // Prepare a transaction to insert a new batch of log entries, returning the time of the last seen log entry. -func (dbs *dbSession) PrepareForUpdate(columns []string) (*time.Time, error) { +func (dbs *dbSession) PrepareForUpdate() (*time.Time, error) { // we want to avoid processed files that were already processed in the past. but we still want to add new log entries // from the most recent files, which may have been extended since we last saw them. // Since there is no "uniqueness" in logs (even the same ip can make the same request at the same second ---I checked), @@ -87,8 +90,8 @@ func (dbs *dbSession) PrepareForUpdate(columns []string) (*time.Time, error) { } dbs.insertTx = tx - insertValuePlaceholder := strings.TrimSuffix(strings.Repeat("?,", len(columns)), ",") - insertStmt, err := dbs.insertTx.Prepare(fmt.Sprintf("INSERT INTO access_logs(%s) values(%s);", strings.Join(columns, ","), insertValuePlaceholder)) + insertValuePlaceholder := strings.TrimSuffix(strings.Repeat("?,", len(dbs.columns)), ",") + insertStmt, err := dbs.insertTx.Prepare(fmt.Sprintf("INSERT INTO access_logs(%s) values(%s);", strings.Join(dbs.columns, ","), insertValuePlaceholder)) if err != nil { return nil, err } @@ -96,7 +99,7 @@ func (dbs *dbSession) PrepareForUpdate(columns []string) (*time.Time, error) { return lastSeemTime, nil } -func (dbs *dbSession) AddLogEntry(values ...any) error { +func (dbs *dbSession) AddLogEntry(values []any) error { _, err := dbs.insertStmt.Exec(values...) return err } diff --git a/fields.go b/fields.go new file mode 100644 index 0000000..7f705e3 --- /dev/null +++ b/fields.go @@ -0,0 +1,186 @@ +package main + +import ( + "github.com/mileusna/useragent" + "net/url" + "strings" + "time" +) + +// TODO +type LogField struct { + // TODO + LogFormatVar string + // TODO + CLINames []string + // TODO + ColumnName string + // TODO + ColumnSpec string + // TODO + Parse func(string) string + // TODO + DerivedFields []string + // TODO + ParseDerivedFields func(string) map[string]string +} + +// TODO +var KNOWN_FIELDS = []LogField{ + { + LogFormatVar: "time_local", + ColumnName: "time", + ColumnSpec: "TIMESTAMP NOT NULL", + Parse: parseTime, + }, + { + LogFormatVar: "request", + ColumnName: "request_raw", + ColumnSpec: "TEXT", + DerivedFields: []string{"path", "method", "referer"}, + ParseDerivedFields: parseRequestDerivedFields, + }, + { + LogFormatVar: "http_user_agent", + ColumnName: "user_agent_raw", + ColumnSpec: "TEXT", + DerivedFields: []string{"user_agent", "os", "device", "ua_type", "ua_url"}, + ParseDerivedFields: parseUserAgentDerivedFields, + }, + { + LogFormatVar: "http_referer", + CLINames: []string{"referer", "ref", "referrer"}, + ColumnName: "referer", + ColumnSpec: "TEXT COLLATE NOCASE", + Parse: stripUrlSource, + }, + { + LogFormatVar: "remote_addr", + CLINames: []string{"ip"}, + ColumnName: "ip", + ColumnSpec: "TEXT", + }, + { + LogFormatVar: "status", + CLINames: []string{"status"}, + ColumnName: "status", + ColumnSpec: "INTEGER", + }, + { + CLINames: []string{"method"}, + ColumnName: "method", + ColumnSpec: "TEXT COLLATE NOCASE", + }, + { + CLINames: []string{"path", "url"}, + ColumnName: "path", + ColumnSpec: "TEXT", + }, + { + CLINames: []string{"user_agent", "ua", "useragent"}, + ColumnName: "user_agent", + ColumnSpec: "TEXT COLLATE NOCASE", + }, + { + CLINames: []string{"os"}, + ColumnName: "os", + ColumnSpec: "TEXT COLLATE NOCASE", + }, + { + CLINames: []string{"device"}, + ColumnName: "device", + ColumnSpec: "TEXT COLLATE NOCASE", + }, + { + CLINames: []string{"ua_url", "uaurl"}, + ColumnName: "ua_url", + ColumnSpec: "TEXT", + }, + { + CLINames: []string{"ua_type", "uatype"}, + ColumnName: "ua_type", + ColumnSpec: "TEXT COLLATE NOCASE", + }, +} + +var LOGVAR_TO_FIELD = map[string]*LogField{} +var COLUMN_NAME_TO_FIELD = map[string]*LogField{} +var CLI_NAME_TO_FIELD = map[string]*LogField{} + +func init() { + for _, field := range KNOWN_FIELDS { + COLUMN_NAME_TO_FIELD[field.ColumnName] = &field + if field.LogFormatVar != "" { + LOGVAR_TO_FIELD[field.LogFormatVar] = &field + } + for _, name := range field.CLINames { + CLI_NAME_TO_FIELD[name] = &field + } + } +} + +func stripUrlSource(value string) string { + value = strings.TrimPrefix(value, "http://") + value = strings.TrimPrefix(value, "https://") + value = strings.TrimPrefix(value, "www.") + value = strings.TrimSuffix(value, "/") + return value +} + +// FIXME error instead of panic? +func parseTime(timestamp string) string { + t, err := time.Parse(LOG_DATE_LAYOUT, timestamp) + if err != nil { + panic("can't parse log timestamp " + timestamp) + } + return t.Format(DB_DATE_LAYOUT) +} + +func parseRequestDerivedFields(request string) map[string]string { + result := make(map[string]string) + request_parts := strings.Split(request, " ") + if len(request_parts) == 3 { + // if the request line is weird, don't try to extract its fields + result["method"] = request_parts[0] + raw_path := request_parts[1] + if url, err := url.Parse(raw_path); err == nil { + result["path"] = url.Path + + // if utm source and friends in query, use them as referer + keys := []string{"ref", "referer", "referrer", "utm_source"} + query := url.Query() + for _, key := range keys { + if query.Has(key) { + result["referer"] = stripUrlSource(query.Get(key)) + break + } + } + + } else { + result["path"] = raw_path + } + } + + return result +} + +func parseUserAgentDerivedFields(ua string) map[string]string { + result := make(map[string]string) + if ua != "" { + ua := useragent.Parse(ua) + result["user_agent"] = ua.Name + result["os"] = ua.OS + result["device"] = ua.Device + result["ua_url"] = stripUrlSource(ua.URL) + if ua.Bot { + result["ua_type"] = "bot" + } else if ua.Tablet { + result["ua_type"] = "tablet" + } else if ua.Mobile { + result["ua_type"] = "mobile" + } else if ua.Desktop { + result["ua_type"] = "desktop" + } + } + return result +} diff --git a/main.go b/main.go index 442062d..3859ab0 100644 --- a/main.go +++ b/main.go @@ -56,14 +56,14 @@ func main() { logFormat = envLogFormat } - logFormatRegex, fields := ParseFormat(logFormat) + parser := NewParser(logFormat) ctx, spec := querySpecFromCLI() - dbs, err := InitDB(dbPath, fields) + dbs, err := InitDB(dbPath, parser.Fields) ctx.FatalIfErrorf(err) defer dbs.Close() - err = loadLogs(logFormatRegex, fields, logPathPattern, dbs) + err = loadLogs(parser, logPathPattern, dbs) ctx.FatalIfErrorf(err) columnNames, rowValues, err := dbs.QueryTop(spec) @@ -174,31 +174,21 @@ func parseDuration(duration string) (time.Time, error) { } // Parse the most recent nginx access.logs and insert the ones not previously seen into the DB. -func loadLogs(logFormatRegex *regexp.Regexp, fields []*LogField, logPathPattern string, dbs *dbSession) error { +func loadLogs(parser *LogParser, logPathPattern string, dbs *dbSession) error { logFiles, err := filepath.Glob(logPathPattern) if err != nil { return err } - dbColumns := make([]string, len(fields)) - for i, field := range fields { - dbColumns[i] = field.ColumnName - } - - lastSeenTime, err := dbs.PrepareForUpdate(dbColumns) + // Get the last log time to know when to stop parsing, and prepare a transaction to insert newer entries + lastSeenTime, err := dbs.PrepareForUpdate() if err != nil { return err } - err = ProcessAccessLogs(logFormatRegex, logFiles, lastSeenTime, func(logLineFields map[string]string) error { - queryValues := make([]interface{}, len(dbColumns)) - for i, field := range dbColumns { - queryValues[i] = logLineFields[field] - } - return dbs.AddLogEntry(queryValues...) - }) + err = parser.Parse(logFiles, lastSeenTime, dbs.AddLogEntry) - // Rollback or commit before returning, depending on error + // Rollback or commit before returning, depending on the error value return dbs.FinishUpdate(err) } diff --git a/main_test.go b/main_test.go index 7bfcf3a..1d6c322 100644 --- a/main_test.go +++ b/main_test.go @@ -265,12 +265,12 @@ func runCommand(t *testing.T, logs string, cliArgs []string) ([]string, [][]stri os.Args = append([]string{"ngtop"}, cliArgs...) _, spec := querySpecFromCLI() - logFormatRegex, fields := ParseFormat(DEFAULT_LOG_FORMAT) - dbs, err := InitDB(dbFile.Name(), fields) + parser := NewParser(DEFAULT_LOG_FORMAT) + dbs, err := InitDB(dbFile.Name(), parser.Fields) assertEqual(t, err, nil) defer dbs.Close() - err = loadLogs(logFormatRegex, fields, logFile.Name(), dbs) + err = loadLogs(parser, logFile.Name(), dbs) assertEqual(t, err, nil) columnNames, rowValues, err := dbs.QueryTop(spec) assertEqual(t, err, nil) diff --git a/parser.go b/parser.go index cd13e5d..5320f99 100644 --- a/parser.go +++ b/parser.go @@ -9,116 +9,9 @@ import ( "os" "path/filepath" "regexp" - "strings" "time" - - "net/url" - - "github.com/mileusna/useragent" ) -// TODO -type LogField struct { - // TODO - LogFormatVar string - // TODO - CLINames []string - // TODO - ColumnName string - // TODO - ColumnSpec string - // TODO - Parse func(string) string - // TODO - DerivedFields []string - // TODO - ParseDerivedFields func(string) map[string]string -} - -// TODO -var KNOWN_FIELDS = []LogField{ - { - LogFormatVar: "time_local", - ColumnName: "time", - ColumnSpec: "TIMESTAMP NOT NULL", - Parse: parseTime, - }, - { - LogFormatVar: "request", - ColumnName: "request_raw", - ColumnSpec: "TEXT", - DerivedFields: []string{"path", "method", "referer"}, - ParseDerivedFields: parseRequestDerivedFields, - }, - { - LogFormatVar: "http_user_agent", - ColumnName: "user_agent_raw", - ColumnSpec: "TEXT", - DerivedFields: []string{"user_agent", "os", "device", "ua_type", "ua_url"}, - ParseDerivedFields: parseUserAgentDerivedFields, - }, - { - LogFormatVar: "http_referer", - CLINames: []string{"referer", "ref", "referrer"}, - ColumnName: "referer", - ColumnSpec: "TEXT COLLATE NOCASE", - Parse: stripUrlSource, - }, - { - LogFormatVar: "remote_addr", - CLINames: []string{"ip"}, - ColumnName: "ip", - ColumnSpec: "TEXT", - }, - { - LogFormatVar: "status", - CLINames: []string{"status"}, - ColumnName: "status", - ColumnSpec: "INTEGER", - }, - { - CLINames: []string{"method"}, - ColumnName: "method", - ColumnSpec: "TEXT COLLATE NOCASE", - }, - { - CLINames: []string{"path", "url"}, - ColumnName: "path", - ColumnSpec: "TEXT", - }, - { - CLINames: []string{"user_agent", "ua", "useragent"}, - ColumnName: "user_agent", - ColumnSpec: "TEXT COLLATE NOCASE", - }, - { - CLINames: []string{"os"}, - ColumnName: "os", - ColumnSpec: "TEXT COLLATE NOCASE", - }, - { - CLINames: []string{"device"}, - ColumnName: "device", - ColumnSpec: "TEXT COLLATE NOCASE", - }, - { - CLINames: []string{"ua_url", "uaurl"}, - ColumnName: "ua_url", - ColumnSpec: "TEXT", - }, - { - CLINames: []string{"ua_type", "uatype"}, - ColumnName: "ua_type", - ColumnSpec: "TEXT COLLATE NOCASE", - }, -} - -var LOGVAR_TO_FIELD = map[string]*LogField{} -var COLUMN_NAME_TO_FIELD = map[string]*LogField{} - -// TODO revisit, may be better to do this at main instead -var CLI_NAME_TO_FIELD = map[string]*LogField{} - func init() { for _, field := range KNOWN_FIELDS { COLUMN_NAME_TO_FIELD[field.ColumnName] = &field @@ -133,13 +26,44 @@ func init() { const LOG_DATE_LAYOUT = "02/Jan/2006:15:04:05 -0700" +type LogParser struct { + formatRegex *regexp.Regexp + Fields []*LogField +} + +func NewParser(format string) *LogParser { + parser := LogParser{ + formatRegex: formatToRegex(format), + } + + // pick the subset of fields deducted from the regex, plus their derived fields + // use a map to remove duplicates + fieldSubset := make(map[string]*LogField) + for _, name := range parser.formatRegex.SubexpNames() { + if name == "" { + continue + } + fieldSubset[name] = COLUMN_NAME_TO_FIELD[name] + + for _, derived := range COLUMN_NAME_TO_FIELD[name].DerivedFields { + fieldSubset[derived] = COLUMN_NAME_TO_FIELD[derived] + } + } + + // turn the map into a valuelist + for _, field := range fieldSubset { + parser.Fields = append(parser.Fields, field) + } + + return &parser +} + // Parse the fields in the nginx access logs since the `until` time, passing them as a map into the `processFun`. // Processing is interrupted when a log older than `until` is found. -func ProcessAccessLogs( - logFormatRegex *regexp.Regexp, +func (parser LogParser) Parse( logFiles []string, until *time.Time, - processFun func(map[string]string) error, + processFun func([]any) error, ) error { var untilStr string if until != nil { @@ -166,7 +90,7 @@ func ProcessAccessLogs( scanner := bufio.NewScanner(reader) for scanner.Scan() { line := scanner.Text() - values, err := parseLogLine(logFormatRegex, line) + values, err := parseLogLine(parser.formatRegex, line) if err != nil { return err } @@ -180,7 +104,11 @@ func ProcessAccessLogs( return nil } - if err := processFun(values); err != nil { + valueList := make([]any, len(parser.Fields)) + for i, field := range parser.Fields { + valueList[i] = values[field.ColumnName] + } + if err := processFun(valueList); err != nil { return err } } @@ -192,32 +120,6 @@ func ProcessAccessLogs( return nil } -func ParseFormat(format string) (*regexp.Regexp, []*LogField) { - regex := formatToRegex(format) - - // pick the subset of fields deducted from the regex, plus their derived fields - // use a map to remove duplicates - fieldSubset := make(map[string]*LogField) - for _, name := range regex.SubexpNames() { - if name == "" { - continue - } - fieldSubset[name] = COLUMN_NAME_TO_FIELD[name] - - for _, derived := range COLUMN_NAME_TO_FIELD[name].DerivedFields { - fieldSubset[derived] = COLUMN_NAME_TO_FIELD[derived] - } - } - - // turn the map into a valuelist - fields := make([]*LogField, 0) - for _, field := range fieldSubset { - fields = append(fields, field) - } - - return regex, fields -} - // TODO func formatToRegex(format string) *regexp.Regexp { chars := []rune(format) @@ -270,7 +172,7 @@ func parseLogLine(pattern *regexp.Regexp, line string) (map[string]string, error result := make(map[string]string) for i, name := range pattern.SubexpNames() { field := COLUMN_NAME_TO_FIELD[name] - if i != 0 && name != "" && match[i] != "-" { + if name != "" && match[i] != "-" { if field.Parse != nil { result[name] = field.Parse(match[i]) } else { @@ -286,69 +188,3 @@ func parseLogLine(pattern *regexp.Regexp, line string) (map[string]string, error } return result, nil } - -func stripUrlSource(value string) string { - value = strings.TrimPrefix(value, "http://") - value = strings.TrimPrefix(value, "https://") - value = strings.TrimPrefix(value, "www.") - value = strings.TrimSuffix(value, "/") - return value -} - -// FIXME error instead of panic? -func parseTime(timestamp string) string { - t, err := time.Parse(LOG_DATE_LAYOUT, timestamp) - if err != nil { - panic("can't parse log timestamp " + timestamp) - } - return t.Format(DB_DATE_LAYOUT) -} - -func parseRequestDerivedFields(request string) map[string]string { - result := make(map[string]string) - request_parts := strings.Split(request, " ") - if len(request_parts) == 3 { - // if the request line is weird, don't try to extract its fields - result["method"] = request_parts[0] - raw_path := request_parts[1] - if url, err := url.Parse(raw_path); err == nil { - result["path"] = url.Path - - // if utm source and friends in query, use them as referer - keys := []string{"ref", "referer", "referrer", "utm_source"} - query := url.Query() - for _, key := range keys { - if query.Has(key) { - result["referer"] = stripUrlSource(query.Get(key)) - break - } - } - - } else { - result["path"] = raw_path - } - } - - return result -} - -func parseUserAgentDerivedFields(ua string) map[string]string { - result := make(map[string]string) - if ua != "" { - ua := useragent.Parse(ua) - result["user_agent"] = ua.Name - result["os"] = ua.OS - result["device"] = ua.Device - result["ua_url"] = stripUrlSource(ua.URL) - if ua.Bot { - result["ua_type"] = "bot" - } else if ua.Tablet { - result["ua_type"] = "tablet" - } else if ua.Mobile { - result["ua_type"] = "mobile" - } else if ua.Desktop { - result["ua_type"] = "desktop" - } - } - return result -}