diff --git a/aggregator.go b/aggregator.go index fadd1d4..d3deac0 100644 --- a/aggregator.go +++ b/aggregator.go @@ -48,3 +48,11 @@ type NetStat struct { BytesRecv int64 Requests int64 } + +// ParseLineError is an error type when parsing a line in the rsyncd or nginx feed +type ParseLineError struct{} + +// Error returns the error message +func (e ParseLineError) Error() string { + return "Failed to parse line" +} diff --git a/aggregator_nginx.go b/aggregator_nginx.go index 8011dcc..a2a455d 100644 --- a/aggregator_nginx.go +++ b/aggregator_nginx.go @@ -189,6 +189,7 @@ func (aggregator *NGINXProjectAggregator) Aggregate(entry NGINXLogEntry) { } } +// Send the aggregated statistics to influxdb func (aggregator *NGINXProjectAggregator) Send(writer api.WriteAPI) { t := time.Now() @@ -213,7 +214,7 @@ type NGINXLogEntry struct { Time time.Time Method string Project string - Url string + URL string Version string Status int BytesSent int64 @@ -323,11 +324,11 @@ func parseNginxLine(line string) (entry NGINXLogEntry, err error) { return entry, errors.New("invalid number of strings in request") } entry.Method = split[0] - entry.Url = split[1] + entry.URL = split[1] entry.Version = split[2] // Project is the top part of the URL path - u, err := url.Parse(entry.Url) + u, err := url.Parse(entry.URL) if err != nil { log.Fatal(err) } diff --git a/aggregator_rsyncd.go b/aggregator_rsyncd.go index 3f95212..f9faf35 100644 --- a/aggregator_rsyncd.go +++ b/aggregator_rsyncd.go @@ -17,15 +17,18 @@ import ( "github.com/nxadm/tail" ) -type RsyncdAggregator struct { +// RSYNCDAggregator is an Aggregator for rsyncd statistics +type RSYNCDAggregator struct { stat NetStat } -func NewRSYNCProjectAggregator() *RsyncdAggregator { - return &RsyncdAggregator{} +// NewRSYNCProjectAggregator returns a new RSYNCDAggregator +func NewRSYNCProjectAggregator() *RSYNCDAggregator { + return &RSYNCDAggregator{} } -func (a *RsyncdAggregator) Init(reader api.QueryAPI) (lastUpdated time.Time, err error) { +// Init initializes the aggregator with the last known value from influxdb +func (a *RSYNCDAggregator) Init(reader api.QueryAPI) (lastUpdated time.Time, err error) { // You can paste this into the influxdb data explorer /* from(bucket: "stats") @@ -103,13 +106,15 @@ func (a *RsyncdAggregator) Init(reader api.QueryAPI) (lastUpdated time.Time, err return lastUpdated, nil } -func (a *RsyncdAggregator) Aggregate(entry RsyncdLogEntry) { +// Aggregate adds a RSCYNDLogEntry into the aggregator +func (a *RSYNCDAggregator) Aggregate(entry RSCYNDLogEntry) { a.stat.BytesSent += entry.sent a.stat.BytesRecv += entry.recv a.stat.Requests++ } -func (a *RsyncdAggregator) Send(writer api.WriteAPI) { +// Send the aggregated statistics to influxdb +func (a *RSYNCDAggregator) Send(writer api.WriteAPI) { t := time.Now() p := influxdb2.NewPoint("rsyncd", map[string]string{}, map[string]interface{}{ @@ -120,13 +125,15 @@ func (a *RsyncdAggregator) Send(writer api.WriteAPI) { writer.WritePoint(p) } -type RsyncdLogEntry struct { +// RSCYNDLogEntry is a struct that represents a single line in the rsyncd log file +type RSCYNDLogEntry struct { time time.Time sent int64 recv int64 } -func TailRSYNCLogFile(logFile string, lastUpdated time.Time, channels []chan<- RsyncdLogEntry) { +// TailRSYNCLogFile tails the rsyncd log file and sends each line to the given channel +func TailRSYNCLogFile(logFile string, lastUpdated time.Time, channels []chan<- RSCYNDLogEntry) { // Find the offset of the line where the date is past lastUpdated start := time.Now() @@ -163,7 +170,7 @@ func TailRSYNCLogFile(logFile string, lastUpdated time.Time, channels []chan<- R // Parse each line as we receive it for line := range tail.Lines { - entry, err := parseRsyncdLine(line.Text) + entry, err := parseRSCYNDLine(line.Text) if err == nil { for ch := range channels { @@ -173,12 +180,6 @@ func TailRSYNCLogFile(logFile string, lastUpdated time.Time, channels []chan<- R } } -type ParseLineError struct{} - -func (e ParseLineError) Error() string { - return "Failed to parse line" -} - func parseRSYNCDate(line string) (time.Time, error) { // Split the line over whitespace parts := strings.Split(line, " ") @@ -201,7 +202,7 @@ func parseRSYNCDate(line string) (time.Time, error) { return t, nil } -func parseRsyncdLine(line string) (entry RsyncdLogEntry, err error) { +func parseRSCYNDLine(line string) (entry RSCYNDLogEntry, err error) { // 2022/04/20 20:00:10 [pid] sent XXX bytes received XXX bytes total size XXX // Split the line over whitespace diff --git a/config/configFile.go b/config/configFile.go index 376a60e..f68f7b3 100644 --- a/config/configFile.go +++ b/config/configFile.go @@ -24,7 +24,7 @@ type File struct { Projects map[string]*Project `json:"mirrors"` } -// ParseConfig reads the main mirrors.json file and checks that it matches the schema +// ReadProjectConfig reads the main mirrors.json file and checks that it matches the schema func ReadProjectConfig(cfg, schema io.Reader) (config *File, err error) { // read cfg and schema into byte arrays cfgBytes, err := io.ReadAll(cfg) @@ -101,10 +101,10 @@ func (config *File) GetProjects() []Project { return projects } -// CreateRsyncdConfig writes a rsyncd.conf file to the given writer based on the config +// CreateRSCYNDConfig writes a rsyncd.conf file to the given writer based on the config // // Consider passing a bufio.Write to this function -func (config *File) CreateRsyncdConfig(w io.Writer) error { +func (config *File) CreateRSCYNDConfig(w io.Writer) error { tmpl := `# This is a generated file. Do not edit manually. uid = nobody gid = nogroup diff --git a/config/tokens.go b/config/tokens.go index f6e1f08..c5e063b 100644 --- a/config/tokens.go +++ b/config/tokens.go @@ -11,6 +11,7 @@ type Tokens struct { Tokens []Token `toml:"tokens"` } +// ReadTokens reads the tokens.toml file into a Tokens struct func ReadTokens(r io.Reader) (tokens *Tokens, err error) { err = toml.NewDecoder(r).Decode(&tokens) if err != nil { @@ -38,6 +39,7 @@ type Token struct { Projects []string `toml:"projects"` } +// HasProject returns true if the token has permission to trigger a manual sync for the given project func (token *Token) HasProject(project string) bool { // Empty project list means all projects if len(token.Projects) == 0 { diff --git a/daily_health.go b/daily_health.go index 44ffea8..390cb42 100644 --- a/daily_health.go +++ b/daily_health.go @@ -13,6 +13,7 @@ import ( "github.com/wcharczuk/go-chart/v2/drawing" ) +// QueryDailyNginxStats gets the hourly nginx statistics from influxdb // You can paste this into the influxdb data explorer /* from(bucket: "public") @@ -41,14 +42,17 @@ func QueryDailyNginxStats() (*api.QueryTableResult, error) { return nil, errors.New("Error querying influxdb") } +// TimeSentPair is a simple product type for storing a time and the number of bytes sent type TimeSentPair struct { t time.Time sent int64 } +// PrepareDailySendStats prepares the daily send statistics for each distro +// // For each distro return a slice of (time, bytes_sent) pairs for each hour in the last 24 hours -// It should be expected that the returned slices will be of length 24 but it is not guaranteed -// It is guaranteed that the returned slices will be in chronological order +// It should be expected that the returned slices will be of length 24, but this is not guaranteed +// It is guaranteed that the returned time slices will be in chronological order func PrepareDailySendStats() (map[string][]TimeSentPair, error) { result, err := QueryDailyNginxStats() if err != nil { @@ -97,7 +101,7 @@ func PrepareDailySendStats() (map[string][]TimeSentPair, error) { return distroMap, nil } -// Create a bar chart for the bandwidth sent per hour +// CreateBarChart uses the go-chart library to create a bar chart from the given data func CreateBarChart(timeSentPairs []TimeSentPair, project string) chart.BarChart { style := chart.Style{ FillColor: drawing.ColorFromHex("#00bcd4"), diff --git a/influx.go b/influx.go index 639453a..29bda82 100644 --- a/influx.go +++ b/influx.go @@ -14,6 +14,7 @@ import ( var writer api.WriteAPI var reader api.QueryAPI +// SetupInfluxClients connects to influxdb and sets up the db clients func SetupInfluxClients(token string) { // create new client with default option for server url authenticate by token options := influxdb2.DefaultOptions() @@ -27,8 +28,7 @@ func SetupInfluxClients(token string) { reader = client.QueryAPI("COSI") } -// Gets the bytes sent for each project in the last 24 hours -// Returns a sorted list of bytes sent for each project +// QueryBytesSentByProject gets the bytes sent by each project in the last 24 hours func QueryBytesSentByProject() (map[string]int64, error) { // Map from short names to bytes sent bytesSent := make(map[string]int64) @@ -78,29 +78,30 @@ func QueryBytesSentByProject() (map[string]int64, error) { return bytesSent, nil } -// implements the sort interface +// LineChart is a type for sorting data needed to create a line chart type LineChart struct { Sent []float64 Recv []float64 Times []int64 } -func (l LineChart) Len() int { +func (l *LineChart) Len() int { return len(l.Sent) } -func (l LineChart) Swap(i, j int) { +func (l *LineChart) Swap(i, j int) { l.Sent[i], l.Sent[j] = l.Sent[j], l.Sent[i] l.Recv[i], l.Recv[j] = l.Recv[j], l.Recv[i] l.Times[i], l.Times[j] = l.Times[j], l.Times[i] } -func (l LineChart) Less(i, j int) bool { +func (l *LineChart) Less(i, j int) bool { return l.Times[i] < l.Times[j] } -// Gets the total network bytes sent and received for the last week in 1 hour blocks -func QueryWeeklyNetStats() (line LineChart, err error) { +// QueryWeeklyNetStats gets the bytes sent and received by the server in the last week +// Aggregates the data into 1 hour intervals +func QueryWeeklyNetStats() (line *LineChart, err error) { // You can paste this into the influxdb data explorer /* from(bucket: "system") @@ -114,7 +115,7 @@ func QueryWeeklyNetStats() (line LineChart, err error) { result, err := reader.Query(context.Background(), "from(bucket: \"system\") |> range(start: -7d, stop: now()) |> filter(fn: (r) => r[\"_measurement\"] == \"net\" and r[\"interface\"] == \"enp8s0\") |> filter(fn: (r) => r[\"_field\"] == \"bytes_sent\" or r[\"_field\"] == \"bytes_recv\") |> aggregateWindow(every: 1h, fn: last) |> derivative(unit: 1s, nonNegative: true) |> yield(name: \"nonnegative derivative\")") if err != nil { - return LineChart{}, err + return nil, err } sent := make([]float64, 0) @@ -154,7 +155,7 @@ func QueryWeeklyNetStats() (line LineChart, err error) { } } - line = LineChart{ + line = &LineChart{ Sent: sent, Recv: recv, Times: times, diff --git a/logging/logging.go b/logging/logging.go index b78eb5b..6c75bb8 100644 --- a/logging/logging.go +++ b/logging/logging.go @@ -126,7 +126,7 @@ func WarnF(format string, v ...interface{}) { logf(WarnT, format, v...) } -// Warning logs a message with [WARN] tag and a newline +// Warn logs a message with [WARN] tag and a newline func Warn(v ...interface{}) { logln(WarnT, v...) } diff --git a/main.go b/main.go index 6ad9819..97d9f80 100644 --- a/main.go +++ b/main.go @@ -210,11 +210,11 @@ func startNGINX(config *config.File) (chan<- NGINXLogEntry, time.Time, error) { return nginxMetrics, nginxLastUpdated, err } -func startRSYNC() (chan<- RsyncdLogEntry, time.Time, error) { +func startRSYNC() (chan<- RSCYNDLogEntry, time.Time, error) { rsyncAg := NewRSYNCProjectAggregator() - rsyncMetrics := make(chan RsyncdLogEntry) - rsyncLastUpdated, err := StartAggregator[RsyncdLogEntry](rsyncAg, rsyncMetrics, reader, writer) + rsyncMetrics := make(chan RSCYNDLogEntry) + rsyncLastUpdated, err := StartAggregator[RSCYNDLogEntry](rsyncAg, rsyncMetrics, reader, writer) return rsyncMetrics, rsyncLastUpdated, err } @@ -275,18 +275,18 @@ func main() { } // Update rsyncd.conf file based on the config file - rsyncd_conf, err := os.OpenFile("/etc/rsyncd.conf", os.O_CREATE|os.O_WRONLY, 0644) + rsyncdConf, err := os.OpenFile("/etc/rsyncd.conf", os.O_CREATE|os.O_WRONLY, 0644) if err != nil { logging.Error("Could not open rsyncd.conf: ", err.Error()) } - err = cfg.CreateRsyncdConfig(rsyncd_conf) + err = cfg.CreateRSCYNDConfig(rsyncdConf) if err != nil { logging.Error("Failed to create rsyncd.conf: ", err.Error()) } nginxChannels := make([]chan<- NGINXLogEntry, 0) nginxLastUpdated := time.Now() - rsyncChannels := make([]chan<- RsyncdLogEntry, 0) + rsyncChannels := make([]chan<- RSCYNDLogEntry, 0) rsyncLastUpdated := time.Now() if influxToken != "" { @@ -315,7 +315,7 @@ func main() { } manual := make(chan string) - scheduler := NewScheduler(cfg, context.Background()) + scheduler := NewScheduler(context.Background(), cfg) go scheduler.Start(manual) // torrent scheduler diff --git a/map.go b/map.go index 27485d9..3b88de2 100644 --- a/map.go +++ b/map.go @@ -99,6 +99,8 @@ func (c *client) write() { } } +// MapRouter adds map routes to the provided router +// Any messages sent to the broadcast channel will be forwarded to all connected clients func MapRouter(r *mux.Router, broadcast chan []byte) { r.HandleFunc("/ws", handleWebsocket) r.HandleFunc("/health", handleHealth) @@ -150,17 +152,17 @@ func entriesToMessages(entries <-chan NGINXLogEntry, messages chan<- []byte) { id := projects[entry.Project].ID // Get the location - lat_ := entry.City.Location.Latitude - long_ := entry.City.Location.Longitude + _lat := entry.City.Location.Latitude + _long := entry.City.Location.Longitude - if lat_ == 0 && long_ == 0 { + if _lat == 0 && _long == 0 { continue } // convert [-90, 90] latitude to [0, 4096] pixels - lat := int16((lat_ + 90) * 4096 / 180) + lat := int16((_lat + 90) * 4096 / 180) // convert [-180, 180] longitude to [0, 4096] pixels - long := int16((long_ + 180) * 4096 / 360) + long := int16((_long + 180) * 4096 / 360) // Create a new message msg := make([]byte, 5) diff --git a/scheduler/builder.go b/scheduler/builder.go new file mode 100644 index 0000000..8f76234 --- /dev/null +++ b/scheduler/builder.go @@ -0,0 +1,26 @@ +package scheduler + +// CalendarBuilder is a builder pattern for the Calendar struct +type CalendarBuilder[T any] struct { + tasks []T + times []uint +} + +// NewCalendarBuilder creates a new CalendarBuilder +func NewCalendarBuilder[T any]() CalendarBuilder[T] { + return CalendarBuilder[T]{ + tasks: make([]T, 0), + times: make([]uint, 0), + } +} + +// AddTask adds a task to the CalendarBuilder +func (b *CalendarBuilder[T]) AddTask(task T, timesPerDay uint) { + b.tasks = append(b.tasks, task) + b.times = append(b.times, timesPerDay) +} + +// Build builds the Calendar +func (b *CalendarBuilder[T]) Build() Calendar[T] { + return buildCalendar(b.tasks, b.times) +} diff --git a/scheduler/calander.go b/scheduler/calander.go index 784beba..294e7da 100644 --- a/scheduler/calander.go +++ b/scheduler/calander.go @@ -39,11 +39,10 @@ func (s *Calendar[T]) NextJob() (task T, dt time.Duration) { return s.tasks[s.iterator-1], dt } -// Scheduling algorithm -func BuildCalendar[T any](tasks []T, timesPerDay []uint) Calendar[T] { - total_jobs := uint(0) +func buildCalendar[T any](tasks []T, timesPerDay []uint) Calendar[T] { + totalJobs := uint(0) for _, n := range timesPerDay { - total_jobs += n + totalJobs += n } // Compute least common multiple of all sync frequencies @@ -73,10 +72,10 @@ func BuildCalendar[T any](tasks []T, timesPerDay []uint) Calendar[T] { lcm = lcm * n / a } - jobs := make([]T, total_jobs) - times := make([]float32, total_jobs) + jobs := make([]T, totalJobs) + times := make([]float32, totalJobs) - var interval float32 = 1.0 / float32(total_jobs) + var interval float32 = 1.0 / float32(totalJobs) c := 0 for i := uint(0); i < lcm; i++ { for idx, task := range tasks { @@ -85,7 +84,7 @@ func BuildCalendar[T any](tasks []T, timesPerDay []uint) Calendar[T] { // emit a job tasks[c] = task times[c] = interval * float32(c) - c += 1 + c++ } } } @@ -104,7 +103,8 @@ func (s *Calendar[T]) ForEach(f func(*T)) { } } -// Finds the first task that satisfies the predicate +// Find returns a pointer to the first task that satisfies the predicate +// If no task satisfies the predicate, nil is returned func (s *Calendar[T]) Find(f func(T) bool) *T { for i := range s.tasks { if f(s.tasks[i]) { diff --git a/sync.go b/sync.go index 09b2f11..dfb5e15 100644 --- a/sync.go +++ b/sync.go @@ -29,14 +29,14 @@ const ( TaskStatusStopped ) -// Tasks are the units of work to be preformed by the scheduler +// Task is the units of work to be preformed by the scheduler // // Each task runs in its own go-routine and the scheduler ensures that only one instance of task `Run` will be called at a time type Task interface { - Run(stdout io.Writer, stderr io.Writer, status chan<- logging.LogEntryT, context context.Context) TaskStatus + Run(context context.Context, stdout io.Writer, stderr io.Writer, status chan<- logging.LogEntryT) TaskStatus } -type sync_result_t struct { +type syncResult struct { start time.Time end time.Time status TaskStatus @@ -57,7 +57,7 @@ type SchedulerTask struct { short string queue *datarithms.CircularQueue[logging.LogEntryT] - results *datarithms.CircularQueue[sync_result_t] + results *datarithms.CircularQueue[syncResult] channel chan logging.LogEntryT stdout *bufio.Writer @@ -65,12 +65,12 @@ type SchedulerTask struct { task Task } -func NewScheduler(config *config.File, ctx context.Context) Scheduler { +// NewScheduler creates a new scheduler from a config.File +func NewScheduler(ctx context.Context, config *config.File) Scheduler { failed := false month := time.Now().UTC().Month() - tasks := make([]*SchedulerTask, 0, len(config.Projects)) - timesPerDay := make([]uint, 0, len(config.Projects)) + builer := scheduler.NewCalendarBuilder[*SchedulerTask]() for short, project := range config.Projects { var task Task @@ -78,7 +78,7 @@ func NewScheduler(config *config.File, ctx context.Context) Scheduler { switch project.SyncStyle { case "rsync": - task = NewRsyncTask(project.Rsync, short) + task = NewRSYNCTask(project.Rsync, short) syncsPerDay = project.Rsync.SyncsPerDay case "script": task = NewScriptTask(project.Script, short) @@ -88,7 +88,7 @@ func NewScheduler(config *config.File, ctx context.Context) Scheduler { } q := datarithms.NewCircularQueue[logging.LogEntryT](64) - results := datarithms.NewCircularQueue[sync_result_t](64) + results := datarithms.NewCircularQueue[syncResult](64) channel := make(chan logging.LogEntryT, 64) go func() { @@ -113,7 +113,7 @@ func NewScheduler(config *config.File, ctx context.Context) Scheduler { failed = true } - tasks = append(tasks, &SchedulerTask{ + builer.AddTask(&SchedulerTask{ running: false, short: short, queue: q, @@ -122,8 +122,7 @@ func NewScheduler(config *config.File, ctx context.Context) Scheduler { stdout: bufio.NewWriter(stdout), stderr: bufio.NewWriter(stderr), task: task, - }) - timesPerDay = append(timesPerDay, syncsPerDay) + }, syncsPerDay) } if failed { @@ -133,7 +132,7 @@ func NewScheduler(config *config.File, ctx context.Context) Scheduler { return Scheduler{ ctx: ctx, - calendar: scheduler.BuildCalendar[*SchedulerTask](tasks, timesPerDay), + calendar: builer.Build(), } } @@ -197,11 +196,11 @@ func (t *SchedulerTask) runTask(ctx context.Context) { go func() { start := time.Now() - status := t.task.Run(t.stdout, t.stderr, t.channel, ctx) + status := t.task.Run(ctx, t.stdout, t.stderr, t.channel) t.stdout.Flush() t.stderr.Flush() end := time.Now() - t.results.Push(sync_result_t{ + t.results.Push(syncResult{ start: start, end: end, status: status, diff --git a/sync_rsync.go b/sync_rsync.go index 1bc6ccd..2eddfca 100644 --- a/sync_rsync.go +++ b/sync_rsync.go @@ -37,7 +37,9 @@ func init() { rsyncErrorCodes[35] = "Timeout waiting for daemon connection" } -func RsyncErrorCodeToString(code int) string { +// RSYNCErrorCodeToString converts an rsync error code to a string +// If the error code is not known, it returns "Unknown" +func RSYNCErrorCodeToString(code int) string { if msg, ok := rsyncErrorCodes[code]; ok { return msg } @@ -45,8 +47,8 @@ func RsyncErrorCodeToString(code int) string { return "Unknown" } -// RsyncTask implements the Task interface from `scheduler` -type RsyncTask struct { +// RSYNCTask implements the Task interface from `scheduler` +type RSYNCTask struct { // Project `short` name short string args []string @@ -54,8 +56,8 @@ type RsyncTask struct { password string } -// NewRsyncTask creates a new RsyncTask from a config.Rsync -func NewRsyncTask(declaration *config.Rsync, short string) *RsyncTask { +// NewRSYNCTask creates a new RsyncTask from a config.Rsync +func NewRSYNCTask(declaration *config.Rsync, short string) *RSYNCTask { args := make([]string, 0) if declaration.User != "" { @@ -74,29 +76,28 @@ func NewRsyncTask(declaration *config.Rsync, short string) *RsyncTask { logging.Error("Failed to read password file:", err) } - return &RsyncTask{ + return &RSYNCTask{ short: short, args: args, stages: declaration.Stages, password: string(password), } - } else { - return &RsyncTask{ - short: short, - args: args, - stages: declaration.Stages, - password: "", - } + } + + return &RSYNCTask{ + short: short, + args: args, + stages: declaration.Stages, + password: "", } } // Run runs the script, blocking until it finishes -// See: Aggregator.Run -func (r *RsyncTask) Run(stdout, stderr io.Writer, status chan<- logging.LogEntryT, ctx context.Context) TaskStatus { +func (r *RSYNCTask) Run(ctx context.Context, stdout, stderr io.Writer, status chan<- logging.LogEntryT) TaskStatus { status <- logging.InfoLogEntry(fmt.Sprintf("%s: Starting rsync", r.short)) for i := 0; i < len(r.stages); i++ { - status := r.RunStage(stdout, stderr, status, ctx, i) + status := r.RunStage(ctx, stdout, stderr, status, i) if status != TaskStatusSuccess { return status } @@ -106,7 +107,7 @@ func (r *RsyncTask) Run(stdout, stderr io.Writer, status chan<- logging.LogEntry } // RunStage runs a single stage of the rsync task -func (r *RsyncTask) RunStage(stdout, stderr io.Writer, status chan<- logging.LogEntryT, ctx context.Context, stage int) TaskStatus { +func (r *RSYNCTask) RunStage(ctx context.Context, stdout, stderr io.Writer, status chan<- logging.LogEntryT, stage int) TaskStatus { // join r.args and r.stages[stage] args := make([]string, len(r.args)) copy(args, r.args) @@ -149,6 +150,6 @@ func (r *RsyncTask) RunStage(stdout, stderr io.Writer, status chan<- logging.Log return TaskStatusSuccess } - status <- logging.ErrorLogEntry(fmt.Sprintf("%s: Stage %d failed with exit code %d (%s)", r.short, stage, cmd.ProcessState.ExitCode(), RsyncErrorCodeToString(cmd.ProcessState.ExitCode()))) + status <- logging.ErrorLogEntry(fmt.Sprintf("%s: Stage %d failed with exit code %d (%s)", r.short, stage, cmd.ProcessState.ExitCode(), RSYNCErrorCodeToString(cmd.ProcessState.ExitCode()))) return TaskStatusFailure } diff --git a/sync_script.go b/sync_script.go index ea8cb92..4c5f07d 100644 --- a/sync_script.go +++ b/sync_script.go @@ -30,8 +30,7 @@ func NewScriptTask(declaration *config.Script, short string) *ScriptTask { } // Run runs the script, blocking until it finishes -// See: Aggregator.Run -func (s *ScriptTask) Run(stdout, stderr io.Writer, status chan<- logging.LogEntryT, ctx context.Context) TaskStatus { +func (s *ScriptTask) Run(ctx context.Context, stdout, stderr io.Writer, status chan<- logging.LogEntryT) TaskStatus { status <- logging.InfoLogEntry(fmt.Sprintf("%s: Starting script", s.short)) cmd := exec.Command(s.command, s.arguments...) cmd.Stdout = stdout diff --git a/webserver.go b/webserver.go index 345a2c1..2aad6f9 100644 --- a/webserver.go +++ b/webserver.go @@ -14,7 +14,7 @@ import ( var tmpls *template.Template var projects map[string]*config.Project -var projectsById []config.Project +var projectsByID []config.Project var projectsGrouped config.ProjectsGrouped var tokens *config.Tokens var dataLock = &sync.RWMutex{} @@ -40,7 +40,7 @@ func handleHome(w http.ResponseWriter, r *http.Request) { func handleMap(w http.ResponseWriter, r *http.Request) { dataLock.RLock() - err := tmpls.ExecuteTemplate(w, "map.gohtml", projectsById) + err := tmpls.ExecuteTemplate(w, "map.gohtml", projectsByID) dataLock.RUnlock() if err != nil { @@ -172,17 +172,17 @@ func handleHealth(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) } -// Reload distributions and software arrays +// WebServerLoadConfig loads a new config file to define the projects func WebServerLoadConfig(cfg *config.File, t *config.Tokens) { dataLock.Lock() - projectsById = cfg.GetProjects() + projectsByID = cfg.GetProjects() projectsGrouped = cfg.GetProjectsByPage() projects = cfg.Projects tokens = t dataLock.Unlock() } -// HandleWebserver starts the webserver and listens for incoming connections +// HandleWebServer starts the webserver and listens for incoming connections // manual is a channel that project short names are sent down to manually trigger a projects rsync // entries is a channel that contains log entries that are disabled by the mirror map func HandleWebServer(manual chan<- string, entries <-chan NGINXLogEntry) {