diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 2c0f23d..1dbcd44 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -15,7 +15,7 @@ jobs: - name: Set up Go uses: actions/setup-go@v2 with: - go-version: 1.16 + go-version: 1.19 - name: Run GoReleaser uses: goreleaser/goreleaser-action@v3 diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index 1877ee1..2f51f9f 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -17,7 +17,7 @@ jobs: - name: Set up Go uses: actions/setup-go@v2 with: - go-version: 1.16 + go-version: 1.19 - name: Run GoReleaser uses: goreleaser/goreleaser-action@v3 diff --git a/README.md b/README.md index c4519f9..578a920 100644 --- a/README.md +++ b/README.md @@ -131,6 +131,17 @@ job "foo" { } ``` +To redirect the output of a job to a separate file, `stdout` and/or `stderr` can be specified: + +```hcl +job "foo" { + command = "/usr/local/bin/foo" + args = ["bar"] + stdout = "/tmp/foo.log" + stderr = "/tmp/foo-errors.log" +} +``` + You can configure a Job to watch files and to send a signal to the managed process if that file changes. This can be used, for example, to send a `SIGHUP` to a process to reload its configuration file when it changes. ```hcl @@ -401,6 +412,30 @@ Flags: -h, --help help for mittnitectl Use "mittnitectl [command] --help" for more information about a command. +``` +```shell +$ mittnitectl job --help +This command can be used to control a managed job. + +Usage: + mittnitectl job [command] + +Available Commands: + list List jobs + logs Get logs from job + restart Restart a job + start Start a job + status Show job status + stop Stop a job + +Flags: + -h, --help help for job + +Global Flags: + --api-address string write mittnites process id to this file (default "unix:///var/run/mittnite.sock") + +Use "mittnitectl job [command] --help" for more information about a command. +``` ### job diff --git a/cmd/job.go b/cmd/job.go index d2bad59..0536425 100644 --- a/cmd/job.go +++ b/cmd/job.go @@ -1,44 +1,15 @@ package cmd import ( - "github.com/mittwald/mittnite/pkg/cli" - log "github.com/sirupsen/logrus" "github.com/spf13/cobra" ) func init() { ctlCommand.AddCommand(jobCommand) - jobCommand.SetHelpTemplate(`{{.Long}} - -Usage: - {{.UseLine}} - -Arguments: - name: the name of the job - action: possible values are "start", "restart", "stop" and "status" - -Flags: - {{.LocalFlags.FlagUsages | trimTrailingWhitespaces}} - -Global Flags: - {{.InheritedFlags.FlagUsages | trimTrailingWhitespaces}} -`) } var jobCommand = &cobra.Command{ - Use: "job ", - Args: cobra.ExactArgs(2), - ArgAliases: []string{"name", "action"}, - Short: "Control a job via command line", - Long: "This command can be used to control a job managed by mittnite.", - Run: func(cmd *cobra.Command, args []string) { - job := args[0] - action := args[1] - apiClient := cli.NewApiClient(apiAddress) - - resp := apiClient.CallAction(job, action) - if err := resp.Print(); err != nil { - log.Errorf("failed to print output: %s", err.Error()) - } - }, + Use: "job", + Short: "Control a job via command line", + Long: "This command can be used to control a managed job.", } diff --git a/cmd/job_actions.go b/cmd/job_actions.go new file mode 100644 index 0000000..6afc9be --- /dev/null +++ b/cmd/job_actions.go @@ -0,0 +1,34 @@ +package cmd + +import ( + "fmt" + "github.com/mittwald/mittnite/pkg/cli" + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" +) + +func init() { + jobCommand.AddCommand(buildJobActionCommand("start", "Start a job", "This command can be used to start a managed job.")) + jobCommand.AddCommand(buildJobActionCommand("restart", "Restart a job", "This command can be used to restart a managed job.")) + jobCommand.AddCommand(buildJobActionCommand("stop", "Stop a job", "This command can be used to stop a managed job.")) + jobCommand.AddCommand(buildJobActionCommand("status", "Show job status", "This command can be used to show the status of a managed job.")) +} + +func buildJobActionCommand(action string, shortDesc, longDesc string) *cobra.Command { + return &cobra.Command{ + Use: fmt.Sprintf("%s ", action), + Args: cobra.ExactArgs(1), + ArgAliases: []string{"job"}, + Short: shortDesc, + Long: longDesc, + Run: func(cmd *cobra.Command, args []string) { + job := args[0] + apiClient := cli.NewApiClient(apiAddress) + + resp := apiClient.CallAction(job, action) + if err := resp.Print(); err != nil { + log.Errorf("failed to print output: %s", err.Error()) + } + }, + } +} diff --git a/cmd/job_list.go b/cmd/job_list.go new file mode 100644 index 0000000..d80ce2d --- /dev/null +++ b/cmd/job_list.go @@ -0,0 +1,25 @@ +package cmd + +import ( + "github.com/mittwald/mittnite/pkg/cli" + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" +) + +func init() { + jobCommand.AddCommand(jobListCommand) +} + +var jobListCommand = &cobra.Command{ + Use: "list", + Short: "List jobs", + Long: "This command can be used to list all managed jobs.", + Run: func(cmd *cobra.Command, args []string) { + apiClient := cli.NewApiClient(apiAddress) + + resp := apiClient.JobList() + if err := resp.Print(); err != nil { + log.Errorf("failed to print output: %s", err.Error()) + } + }, +} diff --git a/cmd/job_logs.go b/cmd/job_logs.go new file mode 100644 index 0000000..850a2f8 --- /dev/null +++ b/cmd/job_logs.go @@ -0,0 +1,36 @@ +package cmd + +import ( + "github.com/mittwald/mittnite/pkg/cli" + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" +) + +var ( + follow bool + tailLen int +) + +func init() { + jobLogsCommand.PersistentFlags().BoolVarP(&follow, "follow", "f", false, "output appended data as the file grows") + jobLogsCommand.PersistentFlags().IntVarP(&tailLen, "tail", "", -1, "output last n lines") + jobCommand.AddCommand(jobLogsCommand) +} + +var jobLogsCommand = &cobra.Command{ + Use: "logs ", + Short: "Get logs from job", + Long: "This command can be used to get the logs of a managed job.", + Run: func(cmd *cobra.Command, args []string) { + job := args[0] + apiClient := cli.NewApiClient(apiAddress) + + if tailLen < -1 { + tailLen = -1 + } + resp := apiClient.JobLogs(job, follow, tailLen) + if err := resp.Print(); err != nil { + log.Errorf("failed to print output: %s", err.Error()) + } + }, +} diff --git a/go.mod b/go.mod index dc53cb7..0b79b7b 100644 --- a/go.mod +++ b/go.mod @@ -1,14 +1,14 @@ module github.com/mittwald/mittnite +go 1.19 + require ( github.com/Masterminds/sprig/v3 v3.2.2 github.com/go-redis/redis v6.15.6+incompatible github.com/go-sql-driver/mysql v1.5.0 - github.com/google/go-cmp v0.5.4 // indirect github.com/gorilla/mux v1.8.0 + github.com/gorilla/websocket v1.5.0 github.com/hashicorp/hcl v1.0.0 - github.com/onsi/ginkgo v1.15.2 // indirect - github.com/onsi/gomega v1.11.0 // indirect github.com/pkg/errors v0.9.1 github.com/sirupsen/logrus v1.8.1 github.com/spf13/cobra v1.1.3 @@ -18,4 +18,36 @@ require ( go.mongodb.org/mongo-driver v1.5.1 ) -go 1.16 +require ( + github.com/Masterminds/goutils v1.1.1 // indirect + github.com/Masterminds/semver/v3 v3.1.1 // indirect + github.com/aws/aws-sdk-go v1.34.28 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/go-stack/stack v1.8.0 // indirect + github.com/golang/snappy v0.0.1 // indirect + github.com/google/go-cmp v0.5.4 // indirect + github.com/google/uuid v1.1.1 // indirect + github.com/huandu/xstrings v1.3.1 // indirect + github.com/imdario/mergo v0.3.11 // indirect + github.com/inconshreveable/mousetrap v1.0.0 // indirect + github.com/jmespath/go-jmespath v0.4.0 // indirect + github.com/klauspost/compress v1.9.5 // indirect + github.com/mitchellh/copystructure v1.0.0 // indirect + github.com/mitchellh/reflectwalk v1.0.0 // indirect + github.com/onsi/ginkgo v1.15.2 // indirect + github.com/onsi/gomega v1.11.0 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/shopspring/decimal v1.2.0 // indirect + github.com/spf13/cast v1.3.1 // indirect + github.com/spf13/pflag v1.0.5 // indirect + github.com/xdg-go/pbkdf2 v1.0.0 // indirect + github.com/xdg-go/scram v1.0.2 // indirect + github.com/xdg-go/stringprep v1.0.2 // indirect + github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect + golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 // indirect + golang.org/x/net v0.4.0 // indirect + golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 // indirect + golang.org/x/sys v0.3.0 // indirect + golang.org/x/text v0.5.0 // indirect + gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect +) diff --git a/go.sum b/go.sum index 5aa6d03..aea367a 100644 --- a/go.sum +++ b/go.sum @@ -123,6 +123,8 @@ github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORR github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs= github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= @@ -344,8 +346,9 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb h1:eBmm0M9fYhWpKZLjQUUKka/LtIxf46G4fxeEz5KJr9U= golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.4.0 h1:Q5QPcMlvfxFTAPV0+07Xz/MpK9NTXu2VDUuy0FeMfaU= +golang.org/x/net v0.4.0/go.mod h1:MBQ8lrhLObU/6UmLb4fmbmk5OcyYmqtbGd/9yIeKjEE= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -382,14 +385,16 @@ golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210112080510-489259a85091 h1:DMyOG0U+gKfu8JZzg2UQe9MeaC1X+xQWlAKcRnjxjCw= golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.3.0 h1:w8ZOecv6NaNa/zC8944JTU3vz4u6Lagfk4RPQxv92NQ= +golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.3.5 h1:i6eZZ+zk0SOf0xgBpEpPD18qWcJda6q1sxt3S0kzyUQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.5.0 h1:OLmvp0KP+FVG99Ct/qFiL/Fhk4zp4QQnZ7b2U+5piUM= +golang.org/x/text v0.5.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/internal/config/types.go b/internal/config/types.go index 006146b..1c00989 100644 --- a/internal/config/types.go +++ b/internal/config/types.go @@ -94,6 +94,8 @@ type BaseJobConfig struct { CanFail bool `hcl:"canFail"` Controllable bool `hcl:"controllable" json:"controllable"` WorkingDirectory string `hcl:"workingDirectory" json:"workingDirectory,omitempty"` + Stdout string `hcl:"stdout" json:"stdout,omitempty"` + Stderr string `hcl:"stderr" json:"stderr,omitempty"` } type Laziness struct { diff --git a/pkg/cli/api_address_builder.go b/pkg/cli/api_address_builder.go new file mode 100644 index 0000000..b4b2c21 --- /dev/null +++ b/pkg/cli/api_address_builder.go @@ -0,0 +1,52 @@ +package cli + +import ( + "context" + "github.com/gorilla/websocket" + "net" + "net/http" + "net/url" +) + +func (api *ApiClient) buildHTTPClientAndURL() (*http.Client, *url.URL, error) { + u, err := url.Parse(api.apiAddress) + if err != nil { + return nil, nil, err + } + if u.Scheme != "unix" { + return &http.Client{}, u, nil + } + + socketPath := u.Path + u.Scheme = "http" + u.Host = "unix" + return &http.Client{ + Transport: &http.Transport{ + DialContext: func(_ context.Context, _, _ string) (net.Conn, error) { + return net.Dial("unix", socketPath) + }, + }, + }, u, nil +} + +func (api *ApiClient) buildWebsocketURL() (*websocket.Dialer, *url.URL, error) { + u, err := url.Parse(api.apiAddress) + if err != nil { + return nil, nil, err + } + if u.Scheme != "unix" { + u.Scheme = "ws" + return websocket.DefaultDialer, u, nil + } + socketPath := u.Path + + dialer := &websocket.Dialer{ + NetDial: func(network, addr string) (net.Conn, error) { + return net.Dial("unix", socketPath) + }, + } + + u.Scheme = "ws" + u.Host = "unix" + return dialer, u, nil +} diff --git a/pkg/cli/api_client.go b/pkg/cli/api_client.go index f011188..9babef3 100644 --- a/pkg/cli/api_client.go +++ b/pkg/cli/api_client.go @@ -3,9 +3,8 @@ package cli import ( "context" "fmt" - "net" + "github.com/gorilla/websocket" "net/http" - "net/url" ) const ( @@ -13,6 +12,7 @@ const ( ApiActionJobRestart = "restart" ApiActionJobStop = "stop" ApiActionJobStatus = "status" + ApiActionJobLogs = "logs" ) type ApiClient struct { @@ -25,7 +25,7 @@ func NewApiClient(apiAddress string) *ApiClient { } } -func (api *ApiClient) CallAction(job, action string) *ApiResponse { +func (api *ApiClient) CallAction(job, action string) ApiResponse { switch action { case ApiActionJobStart: return api.JobStart(job) @@ -36,7 +36,7 @@ func (api *ApiClient) CallAction(job, action string) *ApiResponse { case ApiActionJobStatus: return api.JobStatus(job) default: - return &ApiResponse{ + return &CommonApiResponse{ StatusCode: http.StatusBadRequest, Body: "", Error: fmt.Errorf("unknown action %s", action), @@ -44,53 +44,84 @@ func (api *ApiClient) CallAction(job, action string) *ApiResponse { } } -func (api *ApiClient) JobStart(job string) *ApiResponse { - client, addr, err := api.buildHttpClientAndAddress() +func (api *ApiClient) JobStart(job string) ApiResponse { + client, url, err := api.buildHTTPClientAndURL() if err != nil { - return &ApiResponse{Error: err} + return &CommonApiResponse{Error: err} } - return NewApiResponse(client.Post(fmt.Sprintf("%s/v1/job/%s/start", addr, job), "application/json", nil)) + + url.Path = fmt.Sprintf("/v1/job/%s/start", job) + return NewApiResponse(client.Post(url.String(), "application/json", nil)) } -func (api *ApiClient) JobRestart(job string) *ApiResponse { - client, addr, err := api.buildHttpClientAndAddress() +func (api *ApiClient) JobRestart(job string) ApiResponse { + client, url, err := api.buildHTTPClientAndURL() if err != nil { - return &ApiResponse{Error: err} + return &CommonApiResponse{Error: err} } - return NewApiResponse(client.Post(fmt.Sprintf("%s/v1/job/%s/restart", addr, job), "application/json", nil)) + url.Path = fmt.Sprintf("/v1/job/%s/restart", job) + return NewApiResponse(client.Post(url.String(), "application/json", nil)) } -func (api *ApiClient) JobStop(job string) *ApiResponse { - client, addr, err := api.buildHttpClientAndAddress() +func (api *ApiClient) JobStop(job string) ApiResponse { + client, url, err := api.buildHTTPClientAndURL() if err != nil { - return &ApiResponse{Error: err} + return &CommonApiResponse{Error: err} } - return NewApiResponse(client.Post(fmt.Sprintf("%s/v1/job/%s/stop", addr, job), "application/json", nil)) + + url.Path = fmt.Sprintf("/v1/job/%s/stop", job) + return NewApiResponse(client.Post(url.String(), "application/json", nil)) } -func (api *ApiClient) JobStatus(job string) *ApiResponse { - client, addr, err := api.buildHttpClientAndAddress() +func (api *ApiClient) JobStatus(job string) ApiResponse { + client, url, err := api.buildHTTPClientAndURL() if err != nil { - return &ApiResponse{Error: err} + return &CommonApiResponse{Error: err} } - return NewApiResponse(client.Get(fmt.Sprintf("%s/v1/job/%s/status", addr, job))) + url.Path = fmt.Sprintf("/v1/job/%s/status", job) + return NewApiResponse(client.Get(url.String())) } -func (api *ApiClient) buildHttpClientAndAddress() (*http.Client, string, error) { - u, err := url.Parse(api.apiAddress) +func (api *ApiClient) JobList() ApiResponse { + client, url, err := api.buildHTTPClientAndURL() if err != nil { - return nil, "", err + return &CommonApiResponse{Error: err} } - if u.Scheme != "unix" { - return &http.Client{}, api.apiAddress, nil + + url.Path = "/v1/jobs" + return NewApiResponse(client.Get(url.String())) +} + +func (api *ApiClient) JobLogs(job string, follow bool, tailLen int) ApiResponse { + dialer, url, err := api.buildWebsocketURL() + if err != nil { + return &CommonApiResponse{Error: err} } - return &http.Client{ - Transport: &http.Transport{ - DialContext: func(_ context.Context, _, _ string) (net.Conn, error) { - return net.Dial("unix", u.Path) - }, - }, - }, "http://unix", nil + qryValues := url.Query() + qryValues.Add("taillen", fmt.Sprintf("%d", tailLen)) + if follow { + qryValues.Add("follow", "true") + } + + url.RawQuery = qryValues.Encode() + url.Path = fmt.Sprintf("/v1/job/%s/logs", job) + + handler := func(ctx context.Context, conn *websocket.Conn, msgChan chan []byte, errChan chan error) { + for { + select { + default: + _, msg, err := conn.ReadMessage() + if err != nil { + errChan <- err + return + } + msgChan <- msg + case <-ctx.Done(): + return + } + } + } + return NewStreamingApiResponse(url, dialer, handler) } diff --git a/pkg/cli/api_response.go b/pkg/cli/api_response.go index b5b9c9a..d894850 100644 --- a/pkg/cli/api_response.go +++ b/pkg/cli/api_response.go @@ -10,15 +10,21 @@ import ( "net/http" ) -type ApiResponse struct { +type ApiResponse interface { + Print() error +} + +var _ ApiResponse = &CommonApiResponse{} + +type CommonApiResponse struct { StatusCode int `json:"statusCode"` Body string `json:"body"` Error error `json:"error"` contentType string } -func NewApiResponse(resp *http.Response, err error) *ApiResponse { - apiRes := &ApiResponse{ +func NewApiResponse(resp *http.Response, err error) ApiResponse { + apiRes := &CommonApiResponse{ Error: err, } if resp == nil { @@ -36,7 +42,7 @@ func NewApiResponse(resp *http.Response, err error) *ApiResponse { return apiRes } -func (resp *ApiResponse) Print() error { +func (resp *CommonApiResponse) Print() error { var out string if resp.Error != nil { fmt.Println(resp.Error.Error()) diff --git a/pkg/cli/api_response_streaming.go b/pkg/cli/api_response_streaming.go new file mode 100644 index 0000000..1415001 --- /dev/null +++ b/pkg/cli/api_response_streaming.go @@ -0,0 +1,64 @@ +package cli + +import ( + "context" + "fmt" + "github.com/gorilla/websocket" + "net/url" +) + +var _ ApiResponse = &StreamingApiResponse{} + +type StreamingApiResponseHandler func(ctx context.Context, conn *websocket.Conn, msg chan []byte, err chan error) + +type StreamingApiResponse struct { + url *url.URL + streamContext context.Context + cancel context.CancelFunc + messageChan chan []byte + errorChan chan error + streamingFunc StreamingApiResponseHandler + dialer *websocket.Dialer +} + +func NewStreamingApiResponse(url *url.URL, dialer *websocket.Dialer, streamingFunc StreamingApiResponseHandler) ApiResponse { + ctx, cancel := context.WithCancel(context.Background()) + return &StreamingApiResponse{ + url: url, + streamContext: ctx, + cancel: cancel, + messageChan: make(chan []byte), + errorChan: make(chan error), + streamingFunc: streamingFunc, + dialer: dialer, + } +} + +func (resp *StreamingApiResponse) Print() error { + conn, _, err := resp.dialer.Dial(resp.url.String(), nil) + if err != nil { + return err + } + defer func() { + resp.cancel() + close(resp.messageChan) + close(resp.errorChan) + conn.Close() + }() + + go resp.streamingFunc(resp.streamContext, conn, resp.messageChan, resp.errorChan) + + for { + select { + case msg := <-resp.messageChan: + fmt.Println(string(msg)) + case err := <-resp.errorChan: + if websocket.IsCloseError(err, websocket.CloseNormalClosure) { + return nil + } + return err + case <-resp.streamContext.Done(): + return nil + } + } +} diff --git a/pkg/proc/api.go b/pkg/proc/api.go index 41cf2e9..bbd760a 100644 --- a/pkg/proc/api.go +++ b/pkg/proc/api.go @@ -16,8 +16,8 @@ const ( contextKeyJob = "job" ) -func (api *Api) RegisterHandler(path string, methods []string, handler func(http.ResponseWriter, *http.Request)) { - api.router. +func (api *Api) RegisterHandler(router *mux.Router, path string, methods []string, handler func(http.ResponseWriter, *http.Request)) { + router. Path(path). HandlerFunc(handler). Methods(methods...) diff --git a/pkg/proc/basejob.go b/pkg/proc/basejob.go index b986e8c..ae17709 100644 --- a/pkg/proc/basejob.go +++ b/pkg/proc/basejob.go @@ -1,11 +1,15 @@ package proc import ( + "bufio" + "container/list" "context" "errors" "fmt" + "io" "os" "os/exec" + "path" "syscall" "time" @@ -49,14 +53,36 @@ func (job *baseJob) GetName() string { return job.Config.Name } +func (job *baseJob) StreamStdOut(ctx context.Context, outChan chan []byte, errChan chan error, follow bool, tailLen int) { + if len(job.Config.Stdout) == 0 { + return + } + job.readStdFile(ctx, job.Config.Stdout, outChan, errChan, follow, tailLen) +} + +func (job *baseJob) StreamStdErr(ctx context.Context, outChan chan []byte, errChan chan error, follow bool, tailLen int) { + if len(job.Config.Stderr) == 0 { + return + } + job.readStdFile(ctx, job.Config.Stderr, outChan, errChan, follow, tailLen) +} + +func (job *baseJob) StreamStdOutAndStdErr(ctx context.Context, outChan chan []byte, errChan chan error, follow bool, tailLen int) { + job.StreamStdOut(ctx, outChan, errChan, follow, tailLen) + if job.Config.Stdout != job.Config.Stderr { + job.StreamStdErr(ctx, outChan, errChan, follow, tailLen) + } +} + func (job *baseJob) startOnce(ctx context.Context, process chan<- *os.Process) error { l := log.WithField("job.name", job.Config.Name) + defer job.closeStdFiles() job.cmd = exec.Command(job.Config.Command, job.Config.Args...) - job.cmd.Stdout = os.Stdout - job.cmd.Stderr = os.Stderr job.cmd.Env = os.Environ() job.cmd.Dir = job.Config.WorkingDirectory + job.cmd.Stdout = job.stdout + job.cmd.Stderr = job.stderr if job.Config.Env != nil { job.cmd.Env = append(job.cmd.Env, job.Config.Env...) @@ -114,3 +140,86 @@ func (job *baseJob) startOnce(ctx context.Context, process chan<- *os.Process) e } } } + +func (job *baseJob) closeStdFiles() { + hasStdout := len(job.Config.Stdout) > 0 + hasStderr := len(job.Config.Stderr) > 0 && job.Config.Stderr != job.Config.Stdout + if hasStdout { + job.stdout.Close() + } + + if hasStderr { + job.stderr.Close() + } +} + +func (job *baseJob) readStdFile(ctx context.Context, filePath string, outChan chan []byte, errChan chan error, follow bool, tailLen int) { + stdFile, err := os.OpenFile(filePath, os.O_RDONLY, 0o666) + if err != nil { + errChan <- err + return + } + defer stdFile.Close() + seekTail(tailLen, stdFile, outChan) + + read := func() { + scanner := bufio.NewScanner(stdFile) + for scanner.Scan() { + line := scanner.Bytes() + outChan <- line + } + if err := scanner.Err(); err != nil { + errChan <- err + return + } + } + + for { + select { + default: + read() + if !follow { + errChan <- io.EOF + return + } + case <-ctx.Done(): + return + } + } +} + +func seekTail(lines int, stdFile *os.File, outChan chan []byte) { + if lines < 0 { + return + } + + if lines == 0 { + _, _ = stdFile.Seek(0, io.SeekEnd) + return + } + + scanner := bufio.NewScanner(stdFile) + tailBuffer := list.New() + for scanner.Scan() { + line := scanner.Bytes() + if tailBuffer.Len() >= lines { + tailBuffer.Remove(tailBuffer.Front()) + } + tailBuffer.PushBack(line) + } + for tailBuffer.Len() > 0 { + item := tailBuffer.Front() + line, ok := item.Value.([]byte) + if ok { + outChan <- line + } + tailBuffer.Remove(item) + } +} + +func prepareStdFile(filePath string) (*os.File, error) { + if err := os.MkdirAll(path.Dir(filePath), 0o755); err != nil { + return nil, err + } + return os.OpenFile(filePath, os.O_RDWR|os.O_CREATE|os.O_APPEND|os.O_SYNC, 0o666) +} diff --git a/pkg/proc/runner.go b/pkg/proc/runner.go index 335f07f..2ef7d8c 100644 --- a/pkg/proc/runner.go +++ b/pkg/proc/runner.go @@ -2,6 +2,7 @@ package proc import ( "context" + "fmt" "sync" "time" @@ -137,13 +138,13 @@ func (r *Runner) exec() { var job Job // init non-lazy jobs if r.IgnitionConfig.Jobs[j].Laziness == nil { - job = NewCommonJob(&r.IgnitionConfig.Jobs[j]) + job, err = NewCommonJob(&r.IgnitionConfig.Jobs[j]) } else { job, err = NewLazyJob(&r.IgnitionConfig.Jobs[j]) - if err != nil { - r.errChan <- err - return - } + } + if err != nil { + r.errChan <- err + return } r.addAndStartJob(job) } @@ -206,11 +207,11 @@ func (r *Runner) findCommonJobByName(name string) *CommonJob { return nil } -func (r *Runner) findCommonIgnitionJobByName(name string) *CommonJob { +func (r *Runner) findCommonIgnitionJobByName(name string) (*CommonJob, error) { for i, ignJob := range r.IgnitionConfig.Jobs { if ignJob.Name == name && ignJob.Laziness == nil { return NewCommonJob(&r.IgnitionConfig.Jobs[i]) } } - return nil + return nil, fmt.Errorf("can't find ignition config for job %q", name) } diff --git a/pkg/proc/runner_api_v1.go b/pkg/proc/runner_api_v1.go index 1d68847..898e76a 100644 --- a/pkg/proc/runner_api_v1.go +++ b/pkg/proc/runner_api_v1.go @@ -3,9 +3,16 @@ package proc import ( "context" "encoding/json" + "errors" "fmt" "github.com/gorilla/mux" + "github.com/gorilla/websocket" + log "github.com/sirupsen/logrus" + "io" "net/http" + "strconv" + "strings" + "time" ) func (r *Runner) startApiV1() error { @@ -13,33 +20,44 @@ func (r *Runner) startApiV1() error { return nil } - r.api.RegisterMiddlewareFuncs(func(next http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - jobName, ok := mux.Vars(req)["job"] - if !ok { - http.Error(w, "job parameter is missing", http.StatusBadRequest) - return - } + jobRouter := r.api.router.PathPrefix("/v1/job").Subrouter() + jobRouter.Use(r.apiV1JobMiddleware) + r.api.RegisterHandler(jobRouter, "/{job}/start", []string{http.MethodPost}, r.apiV1StartJob) + r.api.RegisterHandler(jobRouter, "/{job}/restart", []string{http.MethodPost}, r.apiV1RestartJob) + r.api.RegisterHandler(jobRouter, "/{job}/stop", []string{http.MethodPost}, r.apiV1StopJob) + r.api.RegisterHandler(jobRouter, "/{job}/status", []string{http.MethodGet}, r.apiV1JobStatus) + r.api.RegisterHandler(jobRouter, "/{job}/logs", []string{http.MethodGet}, r.apiV1JobLogs) - job := r.findCommonJobByName(jobName) - if job == nil { - job = r.findCommonIgnitionJobByName(jobName) - } - if r.jobExistsAndIsControllable(job) { - r.addJobIfNotExists(job) - next.ServeHTTP(w, req.WithContext(context.WithValue(req.Context(), contextKeyJob, job))) + r.api.RegisterHandler(r.api.router, "/v1/jobs", []string{http.MethodGet}, r.apiV1JobList) + + return r.api.Start() +} + +func (r *Runner) apiV1JobMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + jobName, ok := mux.Vars(req)["job"] + if !ok { + http.Error(w, "job parameter is missing", http.StatusBadRequest) + return + } + + job := r.findCommonJobByName(jobName) + if job == nil { + var err error + job, err = r.findCommonIgnitionJobByName(jobName) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) return } + } + if r.jobExistsAndIsControllable(job) { + r.addJobIfNotExists(job) + next.ServeHTTP(w, req.WithContext(context.WithValue(req.Context(), contextKeyJob, job))) + return + } - http.Error(w, fmt.Sprintf("job %q not found or is not controllable", jobName), http.StatusNotFound) - }) + http.Error(w, fmt.Sprintf("job %q not found or is not controllable", jobName), http.StatusNotFound) }) - - r.api.RegisterHandler("/v1/job/{job}/start", []string{http.MethodPost}, r.apiV1StartJob) - r.api.RegisterHandler("/v1/job/{job}/restart", []string{http.MethodPost}, r.apiV1RestartJob) - r.api.RegisterHandler("/v1/job/{job}/stop", []string{http.MethodPost}, r.apiV1StopJob) - r.api.RegisterHandler("/v1/job/{job}/status", []string{http.MethodGet}, r.apiV1JobStatus) - return r.api.Start() } func (r *Runner) apiV1StartJob(writer http.ResponseWriter, req *http.Request) { @@ -78,3 +96,85 @@ func (r *Runner) apiV1JobStatus(writer http.ResponseWriter, req *http.Request) { writer.Write(out) writer.WriteHeader(http.StatusOK) } + +func (r *Runner) apiV1JobList(writer http.ResponseWriter, _ *http.Request) { + var jobs []string + for _, job := range r.jobs { + jobs = append(jobs, job.GetName()) + } + out, err := json.Marshal(jobs) + if err != nil { + http.Error(writer, "failed to get job list", http.StatusInternalServerError) + return + } + writer.Header().Set("Content-Type", "application/json") + writer.Write(out) + writer.WriteHeader(http.StatusOK) +} + +func (r *Runner) apiV1JobLogs(writer http.ResponseWriter, req *http.Request) { + conn, err := r.api.upgrader.Upgrade(writer, req, nil) + if err != nil { + http.Error(writer, "failed to upgrade connection", http.StatusInternalServerError) + return + } + defer conn.Close() + + job := req.Context().Value(contextKeyJob).(*CommonJob) + if len(job.Config.Stdout) == 0 && len(job.Config.Stderr) == 0 { + _ = conn.WriteMessage(websocket.TextMessage, []byte("neither stdout, nor stderr is defined for this job")) + return + } + + streamCtx, cancel := context.WithCancel(context.Background()) + outChan := make(chan []byte) + errChan := make(chan error) + defer func() { + cancel() + close(outChan) + close(errChan) + }() + + // handle client disconnects + go func() { + _, _, err := conn.ReadMessage() + if err != nil { + cancel() + } + }() + + follow := strings.ToLower(req.FormValue("follow")) == "true" + tailLen, err := strconv.Atoi(req.FormValue("taillen")) + if err != nil { + tailLen = -1 + } + + go job.StreamStdOutAndStdErr(streamCtx, outChan, errChan, follow, tailLen) + + for { + select { + case logLine := <-outChan: + if err := conn.WriteMessage(websocket.TextMessage, logLine); err != nil { + break + } + + case err = <-errChan: + if errors.Is(err, io.EOF) { + err = conn.WriteControl( + websocket.CloseMessage, + websocket.FormatCloseMessage(websocket.CloseNormalClosure, "EOF"), + time.Now().Add(time.Second), + ) + if err == nil { + return + } + } + log.WithField("job.name", job.Config.Name). + Error(fmt.Sprintf("error during logs streaming: %s", err.Error())) + break + + case <-streamCtx.Done(): + return + } + } +} diff --git a/pkg/proc/types.go b/pkg/proc/types.go index 35125b0..0193bdf 100644 --- a/pkg/proc/types.go +++ b/pkg/proc/types.go @@ -3,6 +3,7 @@ package proc import ( "context" "github.com/gorilla/mux" + "github.com/gorilla/websocket" "net/http" "os" "os/exec" @@ -32,12 +33,17 @@ type Api struct { listenAddr string srv *http.Server router *mux.Router + upgrader websocket.Upgrader } func NewApi(listenAddress string) *Api { return &Api{ router: mux.NewRouter(), listenAddr: listenAddress, + upgrader: websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + }, } } @@ -47,6 +53,8 @@ type baseJob struct { cmd *exec.Cmd restart bool stop bool + stdout *os.File + stderr *os.File } type BootJob struct { @@ -90,20 +98,63 @@ type Job interface { GetName() string } -func NewCommonJob(c *config.JobConfig) *CommonJob { +func newBaseJob(c *config.BaseJobConfig) (*baseJob, error) { + job := &baseJob{ + Config: c, + cmd: nil, + restart: false, + stop: false, + stdout: os.Stdout, + stderr: os.Stderr, + } + if len(c.Stdout) == 0 { + return job, nil + } + + stdout, err := prepareStdFile(c.Stdout) + if err != nil { + return nil, err + } + job.stdout = stdout + + if len(c.Stderr) == 0 { + return job, nil + } + + if c.Stderr == c.Stdout { + job.stderr = job.stdout + return job, nil + } + + stderr, err := prepareStdFile(c.Stderr) + if err != nil { + return nil, err + } + job.stderr = stderr + + return job, nil +} + +func NewCommonJob(c *config.JobConfig) (*CommonJob, error) { + job, err := newBaseJob(&c.BaseJobConfig) + if err != nil { + return nil, err + } j := CommonJob{ - baseJob: baseJob{ - Config: &c.BaseJobConfig, - }, - Config: c, + baseJob: *job, + Config: c, } - return &j + return &j, nil } func NewLazyJob(c *config.JobConfig) (*LazyJob, error) { + commonJob, err := NewCommonJob(c) + if err != nil { + return nil, err + } j := LazyJob{ - CommonJob: *NewCommonJob(c), + CommonJob: *commonJob, } if c.Laziness.SpinUpTimeout != "" {