Skip to content

Commit

Permalink
feat(vdp): support authentication for streaming API (#238)
Browse files Browse the repository at this point in the history
Because

- The streaming implementation has been changed.

This commit

- Re-implements the SSE plugin.
- Adds support for authentication in the streaming API.
  • Loading branch information
donch1989 authored Aug 22, 2024
1 parent c0a52d9 commit a99d723
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 171 deletions.
2 changes: 0 additions & 2 deletions config/base.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@
"artifact_private_hostport": "{{ .plugins.artifact_private_hostport }}"
},
"sse-streaming": {
"endpoint": "/v1beta/sse/{id}",
"backend_url_pattern": "/sse/{id}",
"backend_host": "{{ .plugins.pipeline_public_hostport }}"
}
}
Expand Down
11 changes: 8 additions & 3 deletions plugins/grpc-proxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,14 @@ func (r registerer) RegisterHandlers(f func(
func (r registerer) registerHandlers(ctx context.Context, extra map[string]interface{}, h http.Handler) (http.Handler, error) {

return h2c.NewHandler(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
w = NewResponseHijacker(w)
h.ServeHTTP(w, req)
w.(Trailer).WriteTrailer()
if req.Header.Get("instill-use-sse") == "true" {
// For SSE, we need to skip this plugin.
h.ServeHTTP(w, req)
} else {
w = NewResponseHijacker(w)
h.ServeHTTP(w, req)
w.(Trailer).WriteTrailer()
}
}), &http2.Server{}), nil

}
Expand Down
49 changes: 49 additions & 0 deletions plugins/multi-auth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package main
import (
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"strings"

Expand Down Expand Up @@ -54,6 +56,8 @@ func (r registerer) registerHandlers(ctx context.Context, extra map[string]inter

mgmtClient, _ := InitMgmtPublicServiceClient(context.Background(), config["grpc_server"].(string), "", "")

httpClient := http.Client{Transport: http.DefaultTransport}

return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
authorization := req.Header.Get("Authorization")

Expand Down Expand Up @@ -103,6 +107,51 @@ func (r registerer) registerHandlers(ctx context.Context, extra map[string]inter
req.Header.Set("Instill-Visitor-Uid", visitorID.String())
h.ServeHTTP(w, req)

} else if req.Header.Get("instill-use-sse") == "true" {
// Currently, KrakenD doesn’t support event-stream. To make
// authentication work, we send a request to the management API
// first for verification.
r, err := http.NewRequest("GET", "http://localhost:8080/v1beta/user", nil)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
r.Header = req.Header
r.Header.Del("instill-use-sse")

resp, err := httpClient.Do(r)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if resp.StatusCode == 401 {
writeStatusUnauthorized(req, w)
return
}
type user struct {
User struct {
UID string `json:"uid"`
} `json:"user"`
}
respBytes, err := io.ReadAll(resp.Body)
if err != nil {
writeStatusUnauthorized(req, w)
return
}
defer resp.Body.Close()

u := user{}
err = json.Unmarshal(respBytes, &u)
if err != nil {
writeStatusUnauthorized(req, w)
return
}

req.Header.Set("Instill-Auth-Type", "user")
req.Header.Set("Instill-User-Uid", u.User.UID)
req.Header.Set("instill-Use-SSE", "true")
h.ServeHTTP(w, req)

} else {
req.Header.Set("Instill-Auth-Type", "user")
req.URL.Path = "/internal" + req.URL.Path
Expand Down
4 changes: 4 additions & 0 deletions plugins/sse-streaming/go.mod
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
module sse_streaming_plugin

go 1.22.6

require golang.org/x/net v0.26.0

require golang.org/x/text v0.16.0 // indirect
4 changes: 4 additions & 0 deletions plugins/sse-streaming/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ=
golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE=
golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
72 changes: 20 additions & 52 deletions plugins/sse-streaming/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ the following configuration must be present in the krakend.json file:
"plugin/http-server": {
"name": ["sse-streaming"],
"sse-streaming": {
"endpoint": "/sse/{id}",
"backend_url_pattern": "/events-stream/{id}",
"backend_host": "http://localhost:9081"
}
}
Expand Down Expand Up @@ -53,22 +51,13 @@ func (r registerer) registerHandlers(ctx context.Context, extra map[string]inter
}

// Extract configuration values
endpoint, endpointOk := config["endpoint"].(string)
backendURLPattern, backendURLPatternOk := config["backend_url_pattern"].(string)
backendHost, backendHostOk := config["backend_host"].(string)

// Check if all required configuration values are present
if !endpointOk || !backendURLPatternOk || !backendHostOk {
if !backendHostOk {
return h, errors.New("missing required configuration values")
}

// Basic sanity checks on the configuration values
if endpoint == "" {
return h, errors.New("endpoint cannot be empty")
}
if backendURLPattern == "" {
return h, errors.New("backend_url_pattern cannot be empty")
}
if backendHost == "" {
return h, errors.New("backend_host cannot be empty")
}
Expand All @@ -80,35 +69,36 @@ func (r registerer) registerHandlers(ctx context.Context, extra map[string]inter

// Return a new HTTP handler that wraps the original handler with custom logic.
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
// TODO: Performance optimize matchStrings; critical, every request to the API gateway uses this.
matchPaths, id := matchStrings(endpoint, req.URL.Path)

if !matchPaths {
httpClient := http.Client{Transport: http.DefaultTransport}

// This is a quick solution since we only support sse for pipeline trigger endpoint
if req.Header.Get("instill-use-sse") == "true" {
proxyHandler(w, req, httpClient, backendHost)
} else {
h.ServeHTTP(w, req)
return
}

// Construct serverURL using the extracted ID
serverURL := fmt.Sprintf("http://%s%s", backendHost, strings.Replace(backendURLPattern, "{id}", id, 1))
// Call proxyHandler if the path matches
proxyHandler(w, req, serverURL)
}), nil
}

// proxyHandler forwards the request to the actual SSE server and streams the response back to the client.
func proxyHandler(w http.ResponseWriter, r *http.Request, serverURL string) {
logger.Debug("server URL", serverURL)
// Forward the request to the actual SSE server
resp, err := http.Get(serverURL)
func proxyHandler(w http.ResponseWriter, r *http.Request, httpClient http.Client, backendHost string) {

url := string(r.URL.Path)
url = strings.ReplaceAll(url, "/internal", "")
req, err := http.NewRequest("POST", fmt.Sprintf("http://%s%s", backendHost, url), r.Body)

if err != nil {
errM := "failed to connect to downstream SSE server"
logger.Critical(errM)
http.Error(w, errM, http.StatusInternalServerError)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
req.Header = r.Header
resp, err := httpClient.Do(req)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
defer resp.Body.Close()

// Set headers for the client
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
Expand All @@ -134,28 +124,6 @@ func proxyHandler(w http.ResponseWriter, r *http.Request, serverURL string) {
}
}

// matchStrings checks if the request path matches the pattern and extracts the ID.
func matchStrings(pattern, str string) (bool, string) {
patternParts := strings.Split(pattern, "/")
strParts := strings.Split(str, "/")

if len(patternParts) != len(strParts) {
return false, ""
}

var id string
for i := 0; i < len(patternParts); i++ {
if patternParts[i] != strParts[i] && patternParts[i] != "{id}" {
return false, ""
}
if patternParts[i] == "{id}" {
id = strParts[i]
}
}

return true, id
}

func main() {}

// This logger is replaced by the RegisterLogger method to load the one from KrakenD
Expand Down
114 changes: 0 additions & 114 deletions plugins/sse-streaming/main_test.go

This file was deleted.

0 comments on commit a99d723

Please sign in to comment.