Skip to content
This repository has been archived by the owner on Nov 21, 2024. It is now read-only.

Commit

Permalink
address lints
Browse files Browse the repository at this point in the history
  • Loading branch information
Alextopher committed Sep 21, 2023
1 parent 43238f5 commit 5c45739
Show file tree
Hide file tree
Showing 16 changed files with 141 additions and 97 deletions.
8 changes: 8 additions & 0 deletions aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,11 @@ type NetStat struct {
BytesRecv int64
Requests int64
}

// ParseLineError is an error type when parsing a line in the rsyncd or nginx feed
type ParseLineError struct{}

// Error returns the error message
func (e ParseLineError) Error() string {
return "Failed to parse line"
}
7 changes: 4 additions & 3 deletions aggregator_nginx.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ func (aggregator *NGINXProjectAggregator) Aggregate(entry NGINXLogEntry) {
}
}

// Send the aggregated statistics to influxdb
func (aggregator *NGINXProjectAggregator) Send(writer api.WriteAPI) {
t := time.Now()

Expand All @@ -213,7 +214,7 @@ type NGINXLogEntry struct {
Time time.Time
Method string
Project string
Url string
URL string
Version string
Status int
BytesSent int64
Expand Down Expand Up @@ -323,11 +324,11 @@ func parseNginxLine(line string) (entry NGINXLogEntry, err error) {
return entry, errors.New("invalid number of strings in request")
}
entry.Method = split[0]
entry.Url = split[1]
entry.URL = split[1]
entry.Version = split[2]

// Project is the top part of the URL path
u, err := url.Parse(entry.Url)
u, err := url.Parse(entry.URL)
if err != nil {
log.Fatal(err)
}
Expand Down
33 changes: 17 additions & 16 deletions aggregator_rsyncd.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,18 @@ import (
"github.com/nxadm/tail"
)

type RsyncdAggregator struct {
// RSYNCDAggregator is an Aggregator for rsyncd statistics
type RSYNCDAggregator struct {
stat NetStat
}

func NewRSYNCProjectAggregator() *RsyncdAggregator {
return &RsyncdAggregator{}
// NewRSYNCProjectAggregator returns a new RSYNCDAggregator
func NewRSYNCProjectAggregator() *RSYNCDAggregator {
return &RSYNCDAggregator{}
}

func (a *RsyncdAggregator) Init(reader api.QueryAPI) (lastUpdated time.Time, err error) {
// Init initializes the aggregator with the last known value from influxdb
func (a *RSYNCDAggregator) Init(reader api.QueryAPI) (lastUpdated time.Time, err error) {
// You can paste this into the influxdb data explorer
/*
from(bucket: "stats")
Expand Down Expand Up @@ -103,13 +106,15 @@ func (a *RsyncdAggregator) Init(reader api.QueryAPI) (lastUpdated time.Time, err
return lastUpdated, nil
}

func (a *RsyncdAggregator) Aggregate(entry RsyncdLogEntry) {
// Aggregate adds a RSCYNDLogEntry into the aggregator
func (a *RSYNCDAggregator) Aggregate(entry RSCYNDLogEntry) {
a.stat.BytesSent += entry.sent
a.stat.BytesRecv += entry.recv
a.stat.Requests++
}

func (a *RsyncdAggregator) Send(writer api.WriteAPI) {
// Send the aggregated statistics to influxdb
func (a *RSYNCDAggregator) Send(writer api.WriteAPI) {
t := time.Now()

p := influxdb2.NewPoint("rsyncd", map[string]string{}, map[string]interface{}{
Expand All @@ -120,13 +125,15 @@ func (a *RsyncdAggregator) Send(writer api.WriteAPI) {
writer.WritePoint(p)
}

type RsyncdLogEntry struct {
// RSCYNDLogEntry is a struct that represents a single line in the rsyncd log file
type RSCYNDLogEntry struct {
time time.Time
sent int64
recv int64
}

func TailRSYNCLogFile(logFile string, lastUpdated time.Time, channels []chan<- RsyncdLogEntry) {
// TailRSYNCLogFile tails the rsyncd log file and sends each line to the given channel
func TailRSYNCLogFile(logFile string, lastUpdated time.Time, channels []chan<- RSCYNDLogEntry) {
// Find the offset of the line where the date is past lastUpdated
start := time.Now()

Expand Down Expand Up @@ -163,7 +170,7 @@ func TailRSYNCLogFile(logFile string, lastUpdated time.Time, channels []chan<- R

// Parse each line as we receive it
for line := range tail.Lines {
entry, err := parseRsyncdLine(line.Text)
entry, err := parseRSCYNDLine(line.Text)

if err == nil {
for ch := range channels {
Expand All @@ -173,12 +180,6 @@ func TailRSYNCLogFile(logFile string, lastUpdated time.Time, channels []chan<- R
}
}

type ParseLineError struct{}

func (e ParseLineError) Error() string {
return "Failed to parse line"
}

func parseRSYNCDate(line string) (time.Time, error) {
// Split the line over whitespace
parts := strings.Split(line, " ")
Expand All @@ -201,7 +202,7 @@ func parseRSYNCDate(line string) (time.Time, error) {
return t, nil
}

func parseRsyncdLine(line string) (entry RsyncdLogEntry, err error) {
func parseRSCYNDLine(line string) (entry RSCYNDLogEntry, err error) {
// 2022/04/20 20:00:10 [pid] sent XXX bytes received XXX bytes total size XXX

// Split the line over whitespace
Expand Down
6 changes: 3 additions & 3 deletions config/configFile.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type File struct {
Projects map[string]*Project `json:"mirrors"`
}

// ParseConfig reads the main mirrors.json file and checks that it matches the schema
// ReadProjectConfig reads the main mirrors.json file and checks that it matches the schema
func ReadProjectConfig(cfg, schema io.Reader) (config *File, err error) {
// read cfg and schema into byte arrays
cfgBytes, err := io.ReadAll(cfg)
Expand Down Expand Up @@ -101,10 +101,10 @@ func (config *File) GetProjects() []Project {
return projects
}

// CreateRsyncdConfig writes a rsyncd.conf file to the given writer based on the config
// CreateRSCYNDConfig writes a rsyncd.conf file to the given writer based on the config
//
// Consider passing a bufio.Write to this function
func (config *File) CreateRsyncdConfig(w io.Writer) error {
func (config *File) CreateRSCYNDConfig(w io.Writer) error {
tmpl := `# This is a generated file. Do not edit manually.
uid = nobody
gid = nogroup
Expand Down
2 changes: 2 additions & 0 deletions config/tokens.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ type Tokens struct {
Tokens []Token `toml:"tokens"`
}

// ReadTokens reads the tokens.toml file into a Tokens struct
func ReadTokens(r io.Reader) (tokens *Tokens, err error) {
err = toml.NewDecoder(r).Decode(&tokens)
if err != nil {
Expand Down Expand Up @@ -38,6 +39,7 @@ type Token struct {
Projects []string `toml:"projects"`
}

// HasProject returns true if the token has permission to trigger a manual sync for the given project
func (token *Token) HasProject(project string) bool {
// Empty project list means all projects
if len(token.Projects) == 0 {
Expand Down
10 changes: 7 additions & 3 deletions daily_health.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/wcharczuk/go-chart/v2/drawing"
)

// QueryDailyNginxStats gets the hourly nginx statistics from influxdb
// You can paste this into the influxdb data explorer
/*
from(bucket: "public")
Expand Down Expand Up @@ -41,14 +42,17 @@ func QueryDailyNginxStats() (*api.QueryTableResult, error) {
return nil, errors.New("Error querying influxdb")
}

// TimeSentPair is a simple product type for storing a time and the number of bytes sent
type TimeSentPair struct {
t time.Time
sent int64
}

// PrepareDailySendStats prepares the daily send statistics for each distro
//
// For each distro return a slice of (time, bytes_sent) pairs for each hour in the last 24 hours
// It should be expected that the returned slices will be of length 24 but it is not guaranteed
// It is guaranteed that the returned slices will be in chronological order
// It should be expected that the returned slices will be of length 24, but this is not guaranteed
// It is guaranteed that the returned time slices will be in chronological order
func PrepareDailySendStats() (map[string][]TimeSentPair, error) {
result, err := QueryDailyNginxStats()
if err != nil {
Expand Down Expand Up @@ -97,7 +101,7 @@ func PrepareDailySendStats() (map[string][]TimeSentPair, error) {
return distroMap, nil
}

// Create a bar chart for the bandwidth sent per hour
// CreateBarChart uses the go-chart library to create a bar chart from the given data
func CreateBarChart(timeSentPairs []TimeSentPair, project string) chart.BarChart {
style := chart.Style{
FillColor: drawing.ColorFromHex("#00bcd4"),
Expand Down
21 changes: 11 additions & 10 deletions influx.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
var writer api.WriteAPI
var reader api.QueryAPI

// SetupInfluxClients connects to influxdb and sets up the db clients
func SetupInfluxClients(token string) {
// create new client with default option for server url authenticate by token
options := influxdb2.DefaultOptions()
Expand All @@ -27,8 +28,7 @@ func SetupInfluxClients(token string) {
reader = client.QueryAPI("COSI")
}

// Gets the bytes sent for each project in the last 24 hours
// Returns a sorted list of bytes sent for each project
// QueryBytesSentByProject gets the bytes sent by each project in the last 24 hours
func QueryBytesSentByProject() (map[string]int64, error) {
// Map from short names to bytes sent
bytesSent := make(map[string]int64)
Expand Down Expand Up @@ -78,29 +78,30 @@ func QueryBytesSentByProject() (map[string]int64, error) {
return bytesSent, nil
}

// implements the sort interface
// LineChart is a type for sorting data needed to create a line chart
type LineChart struct {
Sent []float64
Recv []float64
Times []int64
}

func (l LineChart) Len() int {
func (l *LineChart) Len() int {
return len(l.Sent)
}

func (l LineChart) Swap(i, j int) {
func (l *LineChart) Swap(i, j int) {
l.Sent[i], l.Sent[j] = l.Sent[j], l.Sent[i]
l.Recv[i], l.Recv[j] = l.Recv[j], l.Recv[i]
l.Times[i], l.Times[j] = l.Times[j], l.Times[i]
}

func (l LineChart) Less(i, j int) bool {
func (l *LineChart) Less(i, j int) bool {
return l.Times[i] < l.Times[j]
}

// Gets the total network bytes sent and received for the last week in 1 hour blocks
func QueryWeeklyNetStats() (line LineChart, err error) {
// QueryWeeklyNetStats gets the bytes sent and received by the server in the last week
// Aggregates the data into 1 hour intervals
func QueryWeeklyNetStats() (line *LineChart, err error) {
// You can paste this into the influxdb data explorer
/*
from(bucket: "system")
Expand All @@ -114,7 +115,7 @@ func QueryWeeklyNetStats() (line LineChart, err error) {
result, err := reader.Query(context.Background(), "from(bucket: \"system\") |> range(start: -7d, stop: now()) |> filter(fn: (r) => r[\"_measurement\"] == \"net\" and r[\"interface\"] == \"enp8s0\") |> filter(fn: (r) => r[\"_field\"] == \"bytes_sent\" or r[\"_field\"] == \"bytes_recv\") |> aggregateWindow(every: 1h, fn: last) |> derivative(unit: 1s, nonNegative: true) |> yield(name: \"nonnegative derivative\")")

if err != nil {
return LineChart{}, err
return nil, err
}

sent := make([]float64, 0)
Expand Down Expand Up @@ -154,7 +155,7 @@ func QueryWeeklyNetStats() (line LineChart, err error) {
}
}

line = LineChart{
line = &LineChart{
Sent: sent,
Recv: recv,
Times: times,
Expand Down
2 changes: 1 addition & 1 deletion logging/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func WarnF(format string, v ...interface{}) {
logf(WarnT, format, v...)
}

// Warning logs a message with [WARN] tag and a newline
// Warn logs a message with [WARN] tag and a newline
func Warn(v ...interface{}) {
logln(WarnT, v...)
}
Expand Down
14 changes: 7 additions & 7 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,11 +210,11 @@ func startNGINX(config *config.File) (chan<- NGINXLogEntry, time.Time, error) {
return nginxMetrics, nginxLastUpdated, err
}

func startRSYNC() (chan<- RsyncdLogEntry, time.Time, error) {
func startRSYNC() (chan<- RSCYNDLogEntry, time.Time, error) {
rsyncAg := NewRSYNCProjectAggregator()

rsyncMetrics := make(chan RsyncdLogEntry)
rsyncLastUpdated, err := StartAggregator[RsyncdLogEntry](rsyncAg, rsyncMetrics, reader, writer)
rsyncMetrics := make(chan RSCYNDLogEntry)
rsyncLastUpdated, err := StartAggregator[RSCYNDLogEntry](rsyncAg, rsyncMetrics, reader, writer)

return rsyncMetrics, rsyncLastUpdated, err
}
Expand Down Expand Up @@ -275,18 +275,18 @@ func main() {
}

// Update rsyncd.conf file based on the config file
rsyncd_conf, err := os.OpenFile("/etc/rsyncd.conf", os.O_CREATE|os.O_WRONLY, 0644)
rsyncdConf, err := os.OpenFile("/etc/rsyncd.conf", os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
logging.Error("Could not open rsyncd.conf: ", err.Error())
}
err = cfg.CreateRsyncdConfig(rsyncd_conf)
err = cfg.CreateRSCYNDConfig(rsyncdConf)
if err != nil {
logging.Error("Failed to create rsyncd.conf: ", err.Error())
}

nginxChannels := make([]chan<- NGINXLogEntry, 0)
nginxLastUpdated := time.Now()
rsyncChannels := make([]chan<- RsyncdLogEntry, 0)
rsyncChannels := make([]chan<- RSCYNDLogEntry, 0)
rsyncLastUpdated := time.Now()

if influxToken != "" {
Expand Down Expand Up @@ -315,7 +315,7 @@ func main() {
}

manual := make(chan string)
scheduler := NewScheduler(cfg, context.Background())
scheduler := NewScheduler(context.Background(), cfg)
go scheduler.Start(manual)

// torrent scheduler
Expand Down
12 changes: 7 additions & 5 deletions map.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ func (c *client) write() {
}
}

// MapRouter adds map routes to the provided router
// Any messages sent to the broadcast channel will be forwarded to all connected clients
func MapRouter(r *mux.Router, broadcast chan []byte) {
r.HandleFunc("/ws", handleWebsocket)
r.HandleFunc("/health", handleHealth)
Expand Down Expand Up @@ -150,17 +152,17 @@ func entriesToMessages(entries <-chan NGINXLogEntry, messages chan<- []byte) {
id := projects[entry.Project].ID

// Get the location
lat_ := entry.City.Location.Latitude
long_ := entry.City.Location.Longitude
_lat := entry.City.Location.Latitude
_long := entry.City.Location.Longitude

if lat_ == 0 && long_ == 0 {
if _lat == 0 && _long == 0 {
continue
}

// convert [-90, 90] latitude to [0, 4096] pixels
lat := int16((lat_ + 90) * 4096 / 180)
lat := int16((_lat + 90) * 4096 / 180)
// convert [-180, 180] longitude to [0, 4096] pixels
long := int16((long_ + 180) * 4096 / 360)
long := int16((_long + 180) * 4096 / 360)

// Create a new message
msg := make([]byte, 5)
Expand Down
26 changes: 26 additions & 0 deletions scheduler/builder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package scheduler

// CalendarBuilder is a builder pattern for the Calendar struct
type CalendarBuilder[T any] struct {
tasks []T
times []uint
}

// NewCalendarBuilder creates a new CalendarBuilder
func NewCalendarBuilder[T any]() CalendarBuilder[T] {
return CalendarBuilder[T]{
tasks: make([]T, 0),
times: make([]uint, 0),
}
}

// AddTask adds a task to the CalendarBuilder
func (b *CalendarBuilder[T]) AddTask(task T, timesPerDay uint) {
b.tasks = append(b.tasks, task)
b.times = append(b.times, timesPerDay)
}

// Build builds the Calendar
func (b *CalendarBuilder[T]) Build() Calendar[T] {
return buildCalendar(b.tasks, b.times)
}
Loading

0 comments on commit 5c45739

Please sign in to comment.