Skip to content

Commit

Permalink
feat(ws): use controller-runtime for backend (#43)
Browse files Browse the repository at this point in the history
* feat(ws): use controller-runtime with backend

Signed-off-by: Mathew Wicks <5735406+thesuperzapper@users.noreply.github.com>

* revert healthcheck to normal results

Signed-off-by: Mathew Wicks <5735406+thesuperzapper@users.noreply.github.com>

---------

Signed-off-by: Mathew Wicks <5735406+thesuperzapper@users.noreply.github.com>
  • Loading branch information
thesuperzapper authored Sep 14, 2024
1 parent bc4e445 commit b0af8ae
Show file tree
Hide file tree
Showing 14 changed files with 278 additions and 114 deletions.
47 changes: 25 additions & 22 deletions workspaces/backend/api/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@ limitations under the License.
package api

import (
"fmt"
"github.com/kubeflow/notebooks/workspaces/backend/config"
"github.com/kubeflow/notebooks/workspaces/backend/data"
"github.com/kubeflow/notebooks/workspaces/backend/integrations"
"log/slog"
"net/http"

"github.com/julienschmidt/httprouter"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/kubeflow/notebooks/workspaces/backend/internal/config"
"github.com/kubeflow/notebooks/workspaces/backend/internal/data"
)

const (
Expand All @@ -33,33 +34,35 @@ const (
)

type App struct {
config config.EnvConfig
logger *slog.Logger
models data.Models
kubernetesClient *integrations.KubernetesClient
}
Config config.EnvConfig
logger *slog.Logger
models data.Models

func NewApp(cfg config.EnvConfig, logger *slog.Logger) (*App, error) {
k8sClient, err := integrations.NewKubernetesClient()
if err != nil {
return nil, fmt.Errorf("failed to create Kubernetes client: %w", err)
}
client.Client
Scheme *runtime.Scheme
}

// NewApp creates a new instance of the app
func NewApp(cfg config.EnvConfig, logger *slog.Logger, client client.Client, scheme *runtime.Scheme) (*App, error) {
app := &App{
config: cfg,
logger: logger,
kubernetesClient: k8sClient,
Config: cfg,
logger: logger,
models: data.NewModels(),

Client: client,
Scheme: scheme,
}
return app, nil
}

func (app *App) Routes() http.Handler {
// Routes returns the HTTP handler for the app
func (a *App) Routes() http.Handler {
router := httprouter.New()

router.NotFound = http.HandlerFunc(app.notFoundResponse)
router.MethodNotAllowed = http.HandlerFunc(app.methodNotAllowedResponse)
router.NotFound = http.HandlerFunc(a.notFoundResponse)
router.MethodNotAllowed = http.HandlerFunc(a.methodNotAllowedResponse)

router.GET(HealthCheckPath, app.HealthcheckHandler)
router.GET(HealthCheckPath, a.HealthcheckHandler)

return app.RecoverPanic(app.enableCORS(router))
return a.RecoverPanic(a.enableCORS(router))
}
32 changes: 16 additions & 16 deletions workspaces/backend/api/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,40 +33,40 @@ type ErrorResponse struct {
Message string `json:"message"`
}

func (app *App) LogError(r *http.Request, err error) {
func (a *App) LogError(r *http.Request, err error) {
var (
method = r.Method
uri = r.URL.RequestURI()
)

app.logger.Error(err.Error(), "method", method, "uri", uri)
a.logger.Error(err.Error(), "method", method, "uri", uri)
}

func (app *App) badRequestResponse(w http.ResponseWriter, r *http.Request, err error) {
func (a *App) badRequestResponse(w http.ResponseWriter, r *http.Request, err error) {
httpError := &HTTPError{
StatusCode: http.StatusBadRequest,
ErrorResponse: ErrorResponse{
Code: strconv.Itoa(http.StatusBadRequest),
Message: err.Error(),
},
}
app.errorResponse(w, r, httpError)
a.errorResponse(w, r, httpError)
}

func (app *App) errorResponse(w http.ResponseWriter, r *http.Request, error *HTTPError) {
func (a *App) errorResponse(w http.ResponseWriter, r *http.Request, error *HTTPError) {

env := Envelope{"error": error}

err := app.WriteJSON(w, error.StatusCode, env, nil)
err := a.WriteJSON(w, error.StatusCode, env, nil)

if err != nil {
app.LogError(r, err)
a.LogError(r, err)
w.WriteHeader(error.StatusCode)
}
}

func (app *App) serverErrorResponse(w http.ResponseWriter, r *http.Request, err error) {
app.LogError(r, err)
func (a *App) serverErrorResponse(w http.ResponseWriter, r *http.Request, err error) {
a.LogError(r, err)

httpError := &HTTPError{
StatusCode: http.StatusInternalServerError,
Expand All @@ -75,10 +75,10 @@ func (app *App) serverErrorResponse(w http.ResponseWriter, r *http.Request, err
Message: "the server encountered a problem and could not process your request",
},
}
app.errorResponse(w, r, httpError)
a.errorResponse(w, r, httpError)
}

func (app *App) notFoundResponse(w http.ResponseWriter, r *http.Request) {
func (a *App) notFoundResponse(w http.ResponseWriter, r *http.Request) {

httpError := &HTTPError{
StatusCode: http.StatusNotFound,
Expand All @@ -87,10 +87,10 @@ func (app *App) notFoundResponse(w http.ResponseWriter, r *http.Request) {
Message: "the requested resource could not be found",
},
}
app.errorResponse(w, r, httpError)
a.errorResponse(w, r, httpError)
}

func (app *App) methodNotAllowedResponse(w http.ResponseWriter, r *http.Request) {
func (a *App) methodNotAllowedResponse(w http.ResponseWriter, r *http.Request) {

httpError := &HTTPError{
StatusCode: http.StatusMethodNotAllowed,
Expand All @@ -99,10 +99,10 @@ func (app *App) methodNotAllowedResponse(w http.ResponseWriter, r *http.Request)
Message: fmt.Sprintf("the %s method is not supported for this resource", r.Method),
},
}
app.errorResponse(w, r, httpError)
a.errorResponse(w, r, httpError)
}

func (app *App) failedValidationResponse(w http.ResponseWriter, r *http.Request, errors map[string]string) {
func (a *App) failedValidationResponse(w http.ResponseWriter, r *http.Request, errors map[string]string) {

message, err := json.Marshal(errors)
if err != nil {
Expand All @@ -115,5 +115,5 @@ func (app *App) failedValidationResponse(w http.ResponseWriter, r *http.Request,
Message: string(message),
},
}
app.errorResponse(w, r, httpError)
a.errorResponse(w, r, httpError)
}
10 changes: 6 additions & 4 deletions workspaces/backend/api/healthcheck__handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,20 @@ package api

import (
"encoding/json"
"github.com/kubeflow/notebooks/workspaces/backend/config"
"github.com/kubeflow/notebooks/workspaces/backend/data"
"github.com/stretchr/testify/assert"
"io"
"net/http"
"net/http/httptest"
"testing"

"github.com/stretchr/testify/assert"

"github.com/kubeflow/notebooks/workspaces/backend/internal/config"
"github.com/kubeflow/notebooks/workspaces/backend/internal/data"
)

func TestHealthCheckHandler(t *testing.T) {

app := App{config: config.EnvConfig{
app := App{Config: config.EnvConfig{
Port: 4000,
}}

Expand Down
10 changes: 5 additions & 5 deletions workspaces/backend/api/healthcheck_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,18 @@ import (
"net/http"
)

func (app *App) HealthcheckHandler(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
func (a *App) HealthcheckHandler(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {

healthCheck, err := app.models.HealthCheck.HealthCheck(Version)
healthCheck, err := a.models.HealthCheck.HealthCheck(Version)
if err != nil {
app.serverErrorResponse(w, r, err)
a.serverErrorResponse(w, r, err)
return
}

err = app.WriteJSON(w, http.StatusOK, healthCheck, nil)
err = a.WriteJSON(w, http.StatusOK, healthCheck, nil)

if err != nil {
app.serverErrorResponse(w, r, err)
a.serverErrorResponse(w, r, err)
}

}
4 changes: 2 additions & 2 deletions workspaces/backend/api/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (

type Envelope map[string]any

func (app *App) WriteJSON(w http.ResponseWriter, status int, data any, headers http.Header) error {
func (a *App) WriteJSON(w http.ResponseWriter, status int, data any, headers http.Header) error {

js, err := json.MarshalIndent(data, "", "\t")

Expand All @@ -48,7 +48,7 @@ func (app *App) WriteJSON(w http.ResponseWriter, status int, data any, headers h
return nil
}

func (app *App) ReadJSON(w http.ResponseWriter, r *http.Request, dst any) error {
func (a *App) ReadJSON(w http.ResponseWriter, r *http.Request, dst any) error {

maxBytes := 1_048_576
r.Body = http.MaxBytesReader(w, r.Body, int64(maxBytes))
Expand Down
6 changes: 3 additions & 3 deletions workspaces/backend/api/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,20 @@ import (
"net/http"
)

func (app *App) RecoverPanic(next http.Handler) http.Handler {
func (a *App) RecoverPanic(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
defer func() {
if err := recover(); err != nil {
w.Header().Set("Connection", "close")
app.serverErrorResponse(w, r, fmt.Errorf("%s", err))
a.serverErrorResponse(w, r, fmt.Errorf("%s", err))
}
}()

next.ServeHTTP(w, r)
})
}

func (app *App) enableCORS(next http.Handler) http.Handler {
func (a *App) enableCORS(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// TODO(ederign) restrict CORS to a much smaller set of trusted origins.
// TODO(ederign) deal with preflight requests
Expand Down
64 changes: 45 additions & 19 deletions workspaces/backend/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@ package main

import (
"flag"
"fmt"
application "github.com/kubeflow/notebooks/workspaces/backend/api"
"github.com/kubeflow/notebooks/workspaces/backend/config"
"log/slog"
"net/http"
"os"
"strconv"
"time"

ctrl "sigs.k8s.io/controller-runtime"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"

application "github.com/kubeflow/notebooks/workspaces/backend/api"
"github.com/kubeflow/notebooks/workspaces/backend/internal/config"
"github.com/kubeflow/notebooks/workspaces/backend/internal/helper"
"github.com/kubeflow/notebooks/workspaces/backend/internal/server"
)

func main() {
Expand All @@ -34,26 +37,49 @@ func main() {

logger := slog.New(slog.NewTextHandler(os.Stdout, nil))

app, err := application.NewApp(cfg, logger)
kubeconfig, err := helper.GetKubeconfig()
if err != nil {
logger.Error(err.Error())
logger.Error("failed to get kubeconfig", "error", err)
os.Exit(1)
}

srv := &http.Server{
Addr: fmt.Sprintf(":%d", cfg.Port),
Handler: app.Routes(),
IdleTimeout: time.Minute,
ReadTimeout: 30 * time.Second,
WriteTimeout: 30 * time.Second,
ErrorLog: slog.NewLogLogger(logger.Handler(), slog.LevelError),
scheme, err := helper.BuildScheme()
if err != nil {
logger.Error("failed to build Kubernetes scheme", "error", err)
os.Exit(1)
}
mgr, err := ctrl.NewManager(kubeconfig, ctrl.Options{
Scheme: scheme,
Metrics: metricsserver.Options{
BindAddress: "0", // disable metrics serving
},
HealthProbeBindAddress: "0", // disable health probe serving
LeaderElection: false,
})
if err != nil {
logger.Error("unable to create manager", "error", err)
os.Exit(1)
}

logger.Info("starting server", "addr", srv.Addr)
app, err := application.NewApp(cfg, logger, mgr.GetClient(), mgr.GetScheme())
if err != nil {
logger.Error("failed to create app", "error", err)
os.Exit(1)
}
svr, err := server.NewServer(app, logger)
if err != nil {
logger.Error("failed to create server", "error", err)
os.Exit(1)
}
if err := svr.SetupWithManager(mgr); err != nil {
logger.Error("failed to setup server with manager", "error", err)
os.Exit(1)
}

err = srv.ListenAndServe()
logger.Error(err.Error())
os.Exit(1)
logger.Info("starting manager")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
logger.Error("problem running manager", "error", err)
os.Exit(1)
}
}

func getEnv(key, defaultVal string) string {
Expand Down
Loading

0 comments on commit b0af8ae

Please sign in to comment.