Skip to content

Commit

Permalink
Merge pull request #9 from orkunkaraduman/develop
Browse files Browse the repository at this point in the history
v0.5.0
  • Loading branch information
orkunkaraduman authored Aug 28, 2023
2 parents a83f053 + fdf2fa9 commit f7a5c1c
Show file tree
Hide file tree
Showing 18 changed files with 422 additions and 289 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@

/vendor

/testcmd
/LOCALNOTES
28 changes: 9 additions & 19 deletions apps/httpapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"time"

"github.com/goinsane/logng"
"github.com/goinsane/xcontext"
"github.com/valyala/tcplisten"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
Expand All @@ -29,21 +28,19 @@ type HttpApp struct {
TLSConfig *tls.Config
Handler *cdn.Handler

ctx xcontext.CancelableContext
wg sync.WaitGroup
wg sync.WaitGroup

listener net.Listener
httpSrv *http.Server

connCount int32
}

func (a *HttpApp) Start(ctx xcontext.CancelableContext) {
func (a *HttpApp) Start(ctx context.Context, cancel context.CancelFunc) {
var err error

a.ctx = xcontext.WithCancelable2(context.WithValue(context.Background(), "logger", a.Logger))

logger := a.Logger
ctx = context.WithValue(ctx, "logger", a.Logger)

if a.ListenBacklog > 0 {
a.listener, err = (&tcplisten.Config{
Expand All @@ -57,7 +54,7 @@ func (a *HttpApp) Start(ctx xcontext.CancelableContext) {
}
if err != nil {
logger.Errorf("listen error: %w", err)
ctx.Cancel()
cancel()
return
}
logger.Infof("listening %q.", a.Listen)
Expand All @@ -79,9 +76,7 @@ func (a *HttpApp) Start(ctx xcontext.CancelableContext) {
a.httpSrv = &http.Server{
Handler: httpHandler,
TLSConfig: a.TLSConfig.Clone(),
ReadTimeout: 30 * time.Second,
ReadHeaderTimeout: 5 * time.Second,
WriteTimeout: 30 * time.Second,
IdleTimeout: 65 * time.Second,
MaxHeaderBytes: 1 << 20,
ConnState: func(conn net.Conn, state http.ConnState) {
Expand Down Expand Up @@ -115,21 +110,22 @@ func (a *HttpApp) Start(ctx xcontext.CancelableContext) {
}
}

func (a *HttpApp) Run(ctx xcontext.CancelableContext) {
func (a *HttpApp) Run(ctx context.Context, cancel context.CancelFunc) {
logger := a.Logger
ctx = context.WithValue(ctx, "logger", a.Logger)

logger.Info("started.")

if a.httpSrv.TLSConfig == nil {
if e := a.httpSrv.Serve(a.listener); e != nil && e != http.ErrServerClosed {
logger.Errorf("http serve error: %w", e)
ctx.Cancel()
cancel()
return
}
} else {
if e := a.httpSrv.ServeTLS(a.listener, "", ""); e != nil && e != http.ErrServerClosed {
logger.Errorf("https serve error: %w", e)
ctx.Cancel()
cancel()
return
}
}
Expand All @@ -149,8 +145,6 @@ func (a *HttpApp) Terminate(ctx context.Context) {
func (a *HttpApp) Stop() {
logger := a.Logger

a.ctx.Cancel()

if a.listener != nil {
_ = a.listener.Close()
}
Expand All @@ -160,7 +154,6 @@ func (a *HttpApp) Stop() {
}

func (a *HttpApp) httpHandler(w http.ResponseWriter, req *http.Request) {
ctx := a.ctx
logger := a.Logger

defer func() {
Expand All @@ -171,9 +164,6 @@ func (a *HttpApp) httpHandler(w http.ResponseWriter, req *http.Request) {

a.wg.Add(1)
defer a.wg.Done()
if ctx.Err() != nil {
return
}

a.Handler.ServeHTTP(w, req.WithContext(ctx))
a.Handler.ServeHTTP(w, req.WithContext(context.WithValue(req.Context(), "logger", logger)))
}
30 changes: 10 additions & 20 deletions apps/mgmtapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"time"

"github.com/goinsane/logng"
"github.com/goinsane/xcontext"
"github.com/prometheus/client_golang/prometheus/promhttp"

"github.com/orkunkaraduman/oscdn/store"
Expand All @@ -23,54 +22,51 @@ type MgmtApp struct {
Listen string
Store *store.Store

ctx xcontext.CancelableContext
wg sync.WaitGroup
wg sync.WaitGroup

listener net.Listener
httpServeMux *http.ServeMux
httpSrv *http.Server
}

func (a *MgmtApp) Start(ctx xcontext.CancelableContext) {
func (a *MgmtApp) Start(ctx context.Context, cancel context.CancelFunc) {
var err error

a.ctx = xcontext.WithCancelable2(context.WithValue(context.Background(), "logger", a.Logger))

logger := a.Logger
ctx = context.WithValue(ctx, "logger", a.Logger)

a.listener, err = net.Listen("tcp4", a.Listen)
if err != nil {
logger.Errorf("listen error: %w", err)
ctx.Cancel()
cancel()
return
}
logger.Infof("listening %q.", a.Listen)

a.httpServeMux = new(http.ServeMux)
a.httpServeMux.Handle("/debug/", mgmtDebugMux)
a.httpServeMux.Handle("/metrics/", promhttp.Handler())
a.httpServeMux.Handle("/cdn/", http.StripPrefix("/cdn", http.HandlerFunc(a.cdnHandler)))
a.httpServeMux.Handle("/mgmt/", http.StripPrefix("/mgmt", http.HandlerFunc(a.mgmtHandler)))

a.httpSrv = &http.Server{
Handler: http.HandlerFunc(a.httpHandler),
TLSConfig: nil,
ReadTimeout: 30 * time.Second,
ReadHeaderTimeout: 5 * time.Second,
WriteTimeout: 30 * time.Second,
IdleTimeout: 65 * time.Second,
MaxHeaderBytes: 1 << 20,
ErrorLog: log.New(io.Discard, "", log.LstdFlags),
}
}

func (a *MgmtApp) Run(ctx xcontext.CancelableContext) {
func (a *MgmtApp) Run(ctx context.Context, cancel context.CancelFunc) {
logger := a.Logger
ctx = context.WithValue(ctx, "logger", a.Logger)

logger.Info("started.")

if e := a.httpSrv.Serve(a.listener); e != nil && e != http.ErrServerClosed {
logger.Errorf("http serve error: %w", e)
ctx.Cancel()
cancel()
return
}
}
Expand All @@ -89,8 +85,6 @@ func (a *MgmtApp) Terminate(ctx context.Context) {
func (a *MgmtApp) Stop() {
logger := a.Logger

a.ctx.Cancel()

if a.listener != nil {
_ = a.listener.Close()
}
Expand All @@ -100,7 +94,6 @@ func (a *MgmtApp) Stop() {
}

func (a *MgmtApp) httpHandler(w http.ResponseWriter, req *http.Request) {
ctx := a.ctx
logger := a.Logger

defer func() {
Expand All @@ -111,14 +104,11 @@ func (a *MgmtApp) httpHandler(w http.ResponseWriter, req *http.Request) {

a.wg.Add(1)
defer a.wg.Done()
if ctx.Err() != nil {
return
}

a.httpServeMux.ServeHTTP(w, req.WithContext(ctx))
a.httpServeMux.ServeHTTP(w, req.WithContext(context.WithValue(req.Context(), "logger", logger)))
}

func (a *MgmtApp) cdnHandler(w http.ResponseWriter, req *http.Request) {
func (a *MgmtApp) mgmtHandler(w http.ResponseWriter, req *http.Request) {
var err error

ctx := req.Context()
Expand Down
13 changes: 7 additions & 6 deletions cdn/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ type HostConfig struct {
Scheme string
Host string
}
HttpsRedirect bool
HttpsRedirectPort int
DomainOverride bool
IgnoreQuery bool
UploadBurst int64
UploadRate int64
HttpsRedirect bool
HttpsRedirectPort int
HostOverride bool
IgnoreQuery bool
CompressionMaxSize int64
UploadBurst int64
UploadRate int64
}
113 changes: 22 additions & 91 deletions cdn/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,13 @@ package cdn

import (
"context"
"errors"
"fmt"
"net/http"
"net/url"
"strconv"
"strings"
"time"

"github.com/goinsane/logng"
"github.com/goinsane/xcontext"

"github.com/orkunkaraduman/oscdn/httputil"
"github.com/orkunkaraduman/oscdn/ioutil"
Expand All @@ -38,11 +35,12 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
req.URL.Host = strings.TrimSuffix(req.Host, ":443")
}

domain, _, _ := httputil.SplitHostPort(req.URL.Host)
remoteIP, _, _ := httputil.SplitHostPort(req.RemoteAddr)
remoteIP, _, _ := httputil.SplitHost(req.RemoteAddr)
realIP := httputil.GetRealIP(req)

logger = logger.WithFieldKeyVals("requestScheme", req.URL.Scheme, "requestHost", req.URL.Host,
"requestURI", req.RequestURI, "remoteAddr", req.RemoteAddr, "remoteIP", remoteIP)
"requestMethod", req.Method, "requestURI", req.RequestURI, "remoteAddr", req.RemoteAddr, "remoteIP", remoteIP,
"realIP", realIP)
ctx = context.WithValue(ctx, "logger", logger)

err = ctx.Err()
Expand All @@ -55,90 +53,20 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if h.ServerHeader != "" {
w.Header().Set("Server", h.ServerHeader)
}
w.Header().Set("Accept-Ranges", "bytes")

if (req.URL.Scheme != "http" && req.URL.Scheme != "https") ||
req.URL.Opaque != "" ||
req.URL.User != nil ||
req.URL.Host == "" ||
req.URL.Fragment != "" {
err = errors.New("invalid cdn url")
logger.V(1).Error(err)
http.Error(w, "invalid url", http.StatusBadRequest)
return
writer := &_Writer{
ResponseWriter: w,
Request: req,
HostConfig: h.GetHostConfig(req.URL.Scheme, req.URL.Host),
}

switch req.Method {
case http.MethodHead:
case http.MethodGet:
default:
err = fmt.Errorf("method %s not allowed", req.Method)
logger.V(1).Error(err)
http.Error(w, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed)
if !writer.Prepare(ctx) {
return
}

contentRange, err := getContentRange(req.Header)
getResult, err := h.Store.Get(ctx, writer.StoreURL.String(), writer.StoreHost, writer.ContentRange)
if err != nil {
err = fmt.Errorf("invalid content range: %w", err)
logger.V(1).Error(err)
http.Error(w, "invalid content range", http.StatusBadRequest)
return
}

var hostConfig *HostConfig
if h.GetHostConfig != nil {
hostConfig = h.GetHostConfig(req.URL.Scheme, req.URL.Host)
if hostConfig == nil {
err = errors.New("not allowed host")
logger.V(1).Error(err)
http.Error(w, "not allowed host", http.StatusForbidden)
return
}
}

storeURL := &url.URL{
Scheme: req.URL.Scheme,
Host: req.URL.Host,
Path: req.URL.Path,
RawQuery: req.URL.RawQuery,
}
storeHost := ""

if hostConfig != nil {
_, originPort, _ := httputil.SplitHostPort(hostConfig.Origin.Host)

if req.URL.Scheme == "http" && hostConfig.HttpsRedirect {
storeURL.Scheme = "https"
storeURL.Host = domain
if hostConfig.HttpsRedirectPort > 0 && hostConfig.HttpsRedirectPort != 443 {
storeURL.Host = fmt.Sprintf("%s:%d", domain, hostConfig.HttpsRedirectPort)
}
w.Header().Set("Location", storeURL.String())
http.Error(w, http.StatusText(http.StatusFound), http.StatusFound)
return
}

storeURL.Scheme = hostConfig.Origin.Scheme
storeURL.Host = hostConfig.Origin.Host

if hostConfig.DomainOverride {
storeHost = domain
if originPort > 0 {
storeHost = fmt.Sprintf("%s:%d", domain, originPort)
}
}

if hostConfig.IgnoreQuery {
storeURL.RawQuery = ""
}
}

getResult, err := h.Store.Get(ctx, storeURL.String(), storeHost, contentRange)
if err != nil {
if xcontext.IsContextError(err) {
logger.V(1).Error(err)
return
}
switch err.(type) {
case *store.RequestError:
http.Error(w, "origin not responding", http.StatusBadGateway)
Expand Down Expand Up @@ -167,8 +95,14 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
}
}
w.Header().Set("Expires", getResult.ExpiresAt.Format(time.RFC1123))
if getResult.StatusCode != http.StatusOK || getResult.ContentRange == nil {
w.Header().Set("Content-Length", strconv.FormatInt(getResult.Size, 10))

if getResult.ContentRange == nil {
if writer.HostConfig.CompressionMaxSize > 0 && getResult.Size <= writer.HostConfig.CompressionMaxSize {
writer.SetContentEncoder(ctx)
}
if writer.ContentEncoding == "" {
w.Header().Set("Content-Length", strconv.FormatInt(getResult.Size, 10))
}
w.WriteHeader(getResult.StatusCode)
} else {
contentLength := getResult.ContentRange.End - getResult.ContentRange.Start
Expand All @@ -182,13 +116,10 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
case http.MethodHead:
err = nil
case http.MethodGet:
var uploadBurst int64
var uploadRate int64
if hostConfig != nil {
uploadBurst = hostConfig.UploadBurst
uploadRate = hostConfig.UploadRate
_, err = ioutil.CopyRate(writer, getResult, writer.HostConfig.UploadBurst, writer.HostConfig.UploadRate)
if err == nil {
err = writer.Close()
}
_, err = ioutil.CopyRate(w, getResult, uploadBurst, uploadRate)
}
if err != nil {
err = fmt.Errorf("content upload error: %w", err)
Expand Down
Loading

0 comments on commit f7a5c1c

Please sign in to comment.