diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..ba0c789 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,7 @@ +.dockerignore +.gitignore +.idea/ +.github/ +docker-compose.yml +Dockerfile +README.md diff --git a/.hadolint.yaml b/.hadolint.yaml new file mode 100644 index 0000000..cece24b --- /dev/null +++ b/.hadolint.yaml @@ -0,0 +1,2 @@ +ignored: + - DL3007 diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..9edd29b --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,30 @@ +fail_fast: false + +repos: + - repo: https://github.com/pre-commit/pre-commit-hooks + rev: v4.3.0 + hooks: + - id: trailing-whitespace + - id: end-of-file-fixer + - id: mixed-line-ending + args: [ '--fix=lf' ] + - id: check-yaml + - id: check-json + - id: check-added-large-files + - id: pretty-format-json + args: [ '--autofix', '--no-sort-keys' ] + + - repo: https://github.com/golangci/golangci-lint + rev: v1.46.2 + hooks: + - id: golangci-lint + + - repo: https://github.com/hadolint/hadolint + rev: v2.10.0 + hooks: + - id: hadolint + + - repo: https://github.com/segmentio/golines + rev: v0.10.0 + hooks: + - id: golines diff --git a/Dockerfile b/Dockerfile index ec85243..1c58f0e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -2,20 +2,20 @@ ARG GO_VERSION=1.18 FROM golang:${GO_VERSION}-alpine as builder -WORKDIR /go/src/ +WORKDIR /build + +COPY go.mod go.sum ./ +RUN go mod download COPY . . ENV CGO_ENABLED=0 -ENV GO_OSARCH="linux/amd64" - -RUN go build -o /go/bin/binary main.go +RUN go build -ldflags "-s -w" -o ./app . -FROM gcr.io/distroless/base +FROM gcr.io/distroless/base:latest -COPY --from=builder /go/bin/binary /go/bin/binary +COPY --from=builder /build/app /app -ENV OUTPUT_DIRECTORY=/tmp/output -VOLUME /tmp/output +ENV OUTPUT_DIRECTORY=/tmp/records -CMD ["/go/bin/binary"] +CMD ["/app"] diff --git a/README.md b/README.md index d204362..4e907e4 100644 --- a/README.md +++ b/README.md @@ -5,14 +5,14 @@ sites, and it will successfully detect file type and download content. In an endless loop with a break on delay -(to avoid creating a heavy load on the target server), +(to avoid creating a heavy load on the target server), the application will wait for a `OK 200` response, after which it will copy the response body into the generated file. ## Features * File type detection (without losing bytes) -* Waiting for broadcast availability +* Waiting for broadcast availability * Delay for avoiding creating a heavy load on the target server * Automatic generation of human-readable file names * Unlimited number of files (Limited by disk space) @@ -20,17 +20,19 @@ body into the generated file. ## Configuration -You can use `dumper.yml` file or **environment variables** for +You can use `dumper.yml` file or **environment variables** for configuration. -| yml | env | default | description | -|------------------|------------------|------------|--------------------------------------------------------------| -| source_url | SOURCE_URL | | Address from where to download information | -| file_prefix | FILE_PREFIX | | Prefix for files | -| file_date_format | FILE_DATE_FORMAT | 02_01_2006 | Date format for files | -| output_directory | OUTPUT_DIRECTORY | . | Directory where to save files | -| timeout | TIMEOUT | 10 | Delay between sending requests if response status is not 200 | -| log_level | LOG_LEVEL | info | Log level | +| yml | env | default | description | +|------------------|------------------|------------|-------------------------------------------------------------| +| source_url | SOURCE_URL | | Address from where to download information | +| file_prefix | FILE_PREFIX | | Prefix for files | +| schedule | SCHEDULE | | Schedule for downloading information in cron format | +| duration | DURATION | | Duration of the dump | +| delay | DELAY | 5s | Delay between request in case of interruption of the stream | +| file_date_format | FILE_DATE_FORMAT | 02_01_2006 | Date format for files | +| output_directory | OUTPUT_DIRECTORY | . | Directory where to save files | +| log_level | LOG_LEVEL | info | Log level | ## Run with [docker compose](https://docs.docker.com/compose/) diff --git a/copier/file_builder.go b/copier/file_builder.go index c028f63..40e55ac 100644 --- a/copier/file_builder.go +++ b/copier/file_builder.go @@ -67,5 +67,10 @@ func (f *DatedFileBuilder) GetFileName(ext string) string { } func (f *DatedFileBuilder) GetOutput(ext string) (io.WriteCloser, error) { - return os.Create(f.GetFileName(ext)) + filename := f.GetFileName(ext) + return os.OpenFile( + filename, + os.O_CREATE|os.O_WRONLY, + fs.ModePerm, + ) } diff --git a/copier/runner.go b/copier/runner.go new file mode 100644 index 0000000..9cb5d09 --- /dev/null +++ b/copier/runner.go @@ -0,0 +1,83 @@ +package copier + +import ( + "fmt" + "github.com/robfig/cron/v3" + "github.com/rs/zerolog/log" + "time" +) + +const ( + readableTimeLayout = "02.01.2006 15:04:05" +) + +type Runner struct { + copier *StreamCopier +} + +func NewRunner(copier *StreamCopier) *Runner { + return &Runner{ + copier: copier, + } +} + +func (r *Runner) ScheduleRecording( + cronString string, + duration time.Duration, + url string, + outputFunc GetOutputFunc, + delay time.Duration, +) error { + c := cron.New() + entryID, err := c.AddFunc(cronString, func() { r.record(url, outputFunc, duration, delay) }) + if err != nil { + return err + } + + c.Start() + + entry := getEntryByID(c, entryID) + if entry == nil { + return fmt.Errorf("entry with id %v not found", entryID) + } + + r.copier.logger.Info(). + Str("url", url). + Str("delay", delay.String()). + Str("next_call", entry.Next.Format(readableTimeLayout)). + Str("duration", duration.String()). + Msg("Starting listening") + + return nil +} + +func (r *Runner) record( + url string, + outputFunc GetOutputFunc, + duration time.Duration, + delay time.Duration, +) { + start := time.Now() + finish := start.Add(duration) + for { + log.Debug().Msg("RUN") + if time.Now().After(finish) { + return + } + if err := r.copier.CopyStream(url, outputFunc); err != nil && err != ErrStreamClosed { + r.copier.logger.Error().Err(err).Msg("Error copying stream") + return + } + time.Sleep(delay) + } +} + +func getEntryByID(c *cron.Cron, id cron.EntryID) *cron.Entry { + for _, entry := range c.Entries() { + if entry.ID == id { + return &entry + } + } + + return nil +} diff --git a/copier/stream_copier.go b/copier/stream_copier.go index 7d760d0..b3ae75b 100644 --- a/copier/stream_copier.go +++ b/copier/stream_copier.go @@ -9,7 +9,6 @@ import ( "io" "net/http" "os" - "time" ) const ( @@ -58,6 +57,8 @@ func (d *StreamCopier) CopyStream(url string, getOutput GetOutputFunc) error { return ErrStreamClosed } + log.Info().Msg("recording started") + buf := bufio.NewReader(resp.Body) fileHeader, err := buf.Peek(fileHeaderSize) if err != nil && err != io.EOF { @@ -83,18 +84,6 @@ func (d *StreamCopier) CopyStream(url string, getOutput GetOutputFunc) error { bytesCopied, err := io.Copy(output, buf) log.Debug().Int64("bytes_copied", bytesCopied).Msg("copied bytes") + log.Info().Msg("recording finished") return err } - -func (d *StreamCopier) ListenAndCopy( - url string, - getOutput GetOutputFunc, - delay time.Duration, -) error { - for { - if err := d.CopyStream(url, getOutput); err != nil && err != ErrStreamClosed { - return err - } - time.Sleep(delay) - } -} diff --git a/copier/stream_copier_test.go b/copier/stream_copier_test.go index 89b2db2..505350f 100644 --- a/copier/stream_copier_test.go +++ b/copier/stream_copier_test.go @@ -5,6 +5,7 @@ import ( "github.com/rs/zerolog" "github.com/stretchr/testify/assert" "io" + "log" "net/http" "net/http/httptest" "testing" @@ -20,41 +21,95 @@ func (c *closableBuffer) Close() error { return nil } -func TestStreamCopier_CopyStream_Success(t *testing.T) { +func TestStreamCopier(t *testing.T) { t.Parallel() - serverOutput := []byte("Hello World!") + testCases := []struct { + name string + handler http.HandlerFunc + err error + output []byte + }{ + { + name: "success", + handler: handlerSuccess, + output: []byte("Hello World!"), + err: nil, + }, + { + name: "not found", + handler: handlerNotFound, + output: []byte{}, + err: ErrStreamClosed, + }, + { + name: "with interrupt", + handler: getHandlerWithInterrupt(), + output: []byte("12"), + err: nil, + }, + } - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - _, err := w.Write(serverOutput) - assert.NoError(t, err) - })) - defer server.Close() + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() - copier := NewStreamCopier(http.DefaultClient, testLogger) + server := httptest.NewServer(tc.handler) + defer server.Close() - output := new(closableBuffer) + copier := NewStreamCopier(http.DefaultClient, testLogger) + output := new(closableBuffer) - err := copier.CopyStream(server.URL, func(_ string) (io.WriteCloser, error) { - return output, nil - }) - assert.NoError(t, err) - assert.Equal(t, serverOutput, output.Bytes()) + err := copier.CopyStream(server.URL, func(_ string) (io.WriteCloser, error) { + return output, nil + }) + assert.Equal(t, tc.err, err) + }) + } } -func TestStreamCopier_CopyStream_WithStreamClosed(t *testing.T) { - t.Parallel() - - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusNotFound) - })) - defer server.Close() +func handlerSuccess(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + _, err := w.Write([]byte("Hello World!")) + if err != nil { + log.Println(err) + } +} - copier := NewStreamCopier(http.DefaultClient, testLogger) +func handlerNotFound(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusNotFound) +} - err := copier.CopyStream(server.URL, func(_ string) (io.WriteCloser, error) { - return new(closableBuffer), nil - }) +func getHandlerWithInterrupt() http.HandlerFunc { + responses := []struct { + status int + body []byte + }{ + { + status: http.StatusOK, + body: []byte("1"), + }, + { + status: http.StatusNotFound, + }, + { + status: http.StatusOK, + body: []byte("2"), + }, + } + return func(w http.ResponseWriter, r *http.Request) { + flusher, ok := w.(http.Flusher) + if !ok { + http.NotFound(w, r) + return + } - assert.ErrorIs(t, err, ErrStreamClosed) + for _, response := range responses { + w.WriteHeader(response.status) + _, err := w.Write(response.body) + log.Println(err) + flusher.Flush() + } + } } diff --git a/go.mod b/go.mod index f107bd2..b45ca31 100644 --- a/go.mod +++ b/go.mod @@ -6,13 +6,14 @@ require ( github.com/gabriel-vasile/mimetype v1.4.0 github.com/google/uuid v1.3.0 github.com/ilyakaznacheev/cleanenv v1.2.6 + github.com/robfig/cron/v3 v3.0.1 github.com/rs/zerolog v1.27.0 github.com/stretchr/testify v1.7.2 ) require ( github.com/BurntSushi/toml v1.1.0 // indirect - github.com/davecgh/go-spew v1.1.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect github.com/joho/godotenv v1.4.0 // indirect github.com/mattn/go-colorable v0.1.12 // indirect github.com/mattn/go-isatty v0.0.14 // indirect diff --git a/go.sum b/go.sum index ee6f3db..4b78230 100644 --- a/go.sum +++ b/go.sum @@ -2,8 +2,9 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/BurntSushi/toml v1.1.0 h1:ksErzDEI1khOiGPgpwuI7x2ebx/uXQNw7xJpn9Eq1+I= github.com/BurntSushi/toml v1.1.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= github.com/coreos/go-systemd/v22 v22.3.3-0.20220203105225-a9a7ef127534/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= -github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/gabriel-vasile/mimetype v1.4.0 h1:Cn9dkdYsMIu56tGho+fqzh7XmvY2YyGU0FnbhiOsEro= github.com/gabriel-vasile/mimetype v1.4.0/go.mod h1:fA8fi6KUiG7MgQQ+mEWotXoEOvmxRtOJlERCzSmRvr8= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= @@ -21,6 +22,8 @@ github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27k github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rs/xid v1.3.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= github.com/rs/zerolog v1.27.0 h1:1T7qCieN22GVc8S4Q2yuexzBb1EqjbgjSH9RohbMjKs= github.com/rs/zerolog v1.27.0/go.mod h1:7frBqO0oezxmnO7GF86FY++uy8I0Tk/If5ni1G9Qc0U= diff --git a/main.go b/main.go index af00928..a44b648 100644 --- a/main.go +++ b/main.go @@ -1,9 +1,11 @@ package main import ( + "context" syslog "log" "net/http" "os" + "os/signal" "time" "github.com/F0rzend/radiot_dumper/copier" @@ -17,12 +19,14 @@ const ( ) type Config struct { - SourceURL string `yaml:"source_url" env:"SOURCE_URL"` - FilePrefix string `yaml:"file_prefix" env:"FILE_PREFIX"` - FileDateFormat string `yaml:"file_date_format" env:"FILE_DATE_FORMAT" env-default:"02_01_2006"` + SourceURL string `yaml:"source_url" env:"SOURCE_URL"` + FilePrefix string `yaml:"file_prefix" env:"FILE_PREFIX"` + Schedule string `yaml:"schedule" env:"SCHEDULE"` + Duration string `yaml:"duration" env:"DURATION"` OutputDirectory string `yaml:"output_directory" env:"OUTPUT_DIRECTORY"` - Delay string `yaml:"delay" env:"DELAY" env-default:"10s"` - LogLevel string `yaml:"log_level" env:"LOG_LEVEL" env-default:"info"` + FileDateFormat string `yaml:"file_date_format" env:"FILE_DATE_FORMAT" env-default:"02_01_2006"` + Delay string `yaml:"delay" env:"DELAY" env-default:"5s"` + LogLevel string `yaml:"log_level" env:"LOG_LEVEL" env-default:"info"` } func Run() error { @@ -43,6 +47,10 @@ func Run() error { if err != nil { return err } + duration, err := time.ParseDuration(cfg.Duration) + if err != nil { + return err + } datedFileBuilder := copier.NewDatedFileBuilder( cfg.OutputDirectory, @@ -65,12 +73,22 @@ func Run() error { logger, ) - logger.Info().Str("url", cfg.SourceURL).Dur("delay", delay).Msg("Starting dumping") - return streamCopier.ListenAndCopy( + runner := copier.NewRunner(streamCopier) + + ctx, _ := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill) + + if err = runner.ScheduleRecording( + cfg.Schedule, + duration, cfg.SourceURL, datedFileBuilder.GetOutput, delay, - ) + ); err != nil { + logger.Error().Err(err).Msg("Error scheduling recording") + } + + <-ctx.Done() + return nil } func main() {