Skip to content
This repository has been archived by the owner on Nov 25, 2022. It is now read-only.

Commit

Permalink
refactor: add timeouts to http handler and querier
Browse files Browse the repository at this point in the history
Signed-off-by: Artur Troian <troian.ap@gmail.com>
  • Loading branch information
troian committed Mar 3, 2022
1 parent 6a7889d commit d6ca3e5
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 110 deletions.
38 changes: 11 additions & 27 deletions ceph.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,30 +84,21 @@ func (cc cephClusters) dup() cephClusters {
return res
}

type resp struct {
res []akashv1.InventoryClusterStorage
err error
}

type req struct {
resp chan resp
}

type ceph struct {
exe RemotePodCommandExecutor
ctx context.Context
cancel context.CancelFunc
reqch chan req
querier
}

func NewCeph(ctx context.Context) (Storage, error) {
ctx, cancel := context.WithCancel(ctx)

c := &ceph{
exe: NewRemotePodCommandExecutor(KubeConfigFromCtx(ctx), KubeClientFromCtx(ctx)),
ctx: ctx,
cancel: cancel,
reqch: make(chan req, 100),
exe: NewRemotePodCommandExecutor(KubeConfigFromCtx(ctx), KubeClientFromCtx(ctx)),
ctx: ctx,
cancel: cancel,
querier: newQuerier(),
}

group := ErrGroupFromCtx(ctx)
Expand All @@ -116,18 +107,6 @@ func NewCeph(ctx context.Context) (Storage, error) {
return c, nil
}

func (c *ceph) Query() ([]akashv1.InventoryClusterStorage, error) {
r := req{
resp: make(chan resp, 1),
}

c.reqch <- r

rsp := <-r.resp

return rsp.res, rsp.err
}

func (c *ceph) run() error {
events := make(chan interface{}, 1000)

Expand All @@ -154,6 +133,11 @@ func (c *ceph) run() error {
case rawEvt := <-events:
switch evt := rawEvt.(type) {
case watch.Event:
if evt.Object == nil {
log.Info("received nil event object", "event", evt.Type)
break
}

kind := reflect.TypeOf(evt.Object).String()
if idx := strings.LastIndex(kind, "."); idx > 0 {
kind = kind[idx+1:]
Expand Down Expand Up @@ -261,7 +245,7 @@ func (c *ceph) scrapeMetrics(scs cephStorageClasses, clusters map[string]string)
dfResults := make(map[string]dfResp, len(clusters))

for clusterID, ns := range clusters {
stdout, _, err := c.exe.ExecCommandInContainerWithFullOutput("rook-ceph-tools", "rook-ceph-tools", ns, "ceph", "df", "--format", "json")
stdout, _, err := c.exe.ExecCommandInContainerWithFullOutputWithTimeout("rook-ceph-tools", "rook-ceph-tools", ns, "ceph", "df", "--format", "json")
if err != nil {
return nil, err
}
Expand Down
106 changes: 41 additions & 65 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ import (
"go.uber.org/zap/zapcore"
"golang.org/x/sync/errgroup"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
Expand All @@ -38,6 +36,8 @@ import (
const (
FlagKubeConfig = "kubeconfig"
FlagKubeInCluster = "kube-incluster"
FlagApiTimeout = "api-timeout"
FlagQueryTimeout = "query-timeout"
)

type ContextKey string
Expand Down Expand Up @@ -165,6 +165,20 @@ func main() {
&cli.BoolFlag{
Name: FlagKubeInCluster,
},
&cli.DurationFlag{
Name: FlagApiTimeout,
EnvVars: []string{strings.ToUpper(FlagApiTimeout)},
Required: false,
Hidden: false,
Value: 3 * time.Second,
},
&cli.DurationFlag{
Name: FlagQueryTimeout,
EnvVars: []string{strings.ToUpper(FlagQueryTimeout)},
Required: false,
Hidden: false,
Value: 2 * time.Second,
},
}

app.Before = func(c *cli.Context) error {
Expand All @@ -173,17 +187,19 @@ func main() {
return err
}

clientset, err := kubernetes.NewForConfig(KubeConfigFromCtx(c.Context))
kubecfg := KubeConfigFromCtx(c.Context)

clientset, err := kubernetes.NewForConfig(kubecfg)
if err != nil {
return err
}

rc, err := rookclientset.NewForConfig(KubeConfigFromCtx(c.Context))
rc, err := rookclientset.NewForConfig(kubecfg)
if err != nil {
return err
}

ac, err := akashclientset.NewForConfig(KubeConfigFromCtx(c.Context))
ac, err := akashclientset.NewForConfig(kubecfg)

group, ctx := errgroup.WithContext(c.Context)
c.Context = ctx
Expand Down Expand Up @@ -216,13 +232,9 @@ func main() {

ContextSet(c, CtxKeyStorage, storage)

group.Go(func() error {
return reqListener(c)
})

srv := &http.Server{
Addr: ":8080",
Handler: newRouter(),
Handler: newRouter(LogFromCtx(c.Context).WithName("router"), c.Duration(FlagApiTimeout), c.Duration(FlagQueryTimeout)),
BaseContext: func(_ net.Listener) context.Context {
return c.Context
},
Expand Down Expand Up @@ -264,28 +276,44 @@ func main() {
_ = app.RunContext(ctx, os.Args)
}

func newRouter() *mux.Router {
func newRouter(log logr.Logger, apiTimeout, queryTimeout time.Duration) *mux.Router {
router := mux.NewRouter()

router.Use(func(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
rCtx, cancel := context.WithTimeout(r.Context(), apiTimeout)
defer cancel()

h.ServeHTTP(w, r.WithContext(rCtx))
})
})

router.HandleFunc("/inventory", func(w http.ResponseWriter, req *http.Request) {
storage := StorageFromCtx(req.Context())
inv := akashv1.Inventory{
TypeMeta: metav1.TypeMeta{
Kind: "Inventory",
APIVersion: "akash.network/v1",
},
ObjectMeta: metav1.ObjectMeta{
CreationTimestamp: metav1.NewTime(time.Now().UTC()),
},
Spec: akashv1.InventorySpec{},
Status: akashv1.InventoryStatus{
State: akashv1.InventoryStatePulled,
},
}

for _, st := range storage {
res, err := st.Query()
if err != nil {
ctx, cancel := context.WithTimeout(req.Context(), queryTimeout)
res, err := st.Query(ctx)
if err != nil && !errors.Is(err, context.Canceled) {
inv.Status.Messages = append(inv.Status.Messages, err.Error())
log.Error(err, "query failed")
}

cancel()

inv.Spec.Storage = append(inv.Spec.Storage, res...)
}

Expand Down Expand Up @@ -329,55 +357,3 @@ func loadKubeConfig(c *cli.Context) error {

return nil
}

func reqListener(c *cli.Context) error {
bus := PubSubFromCtx(c.Context)
evtch := bus.Sub("invreq")

group := ErrGroupFromCtx(c.Context)

storage := StorageFromCtx(c.Context)
ac := AkashClientFromCtx(c.Context)

log := LogFromCtx(c.Context).WithName("req-handler")
for {
select {
case <-c.Context.Done():
return c.Context.Err()
case rawEvt := <-evtch:
switch evt := rawEvt.(type) {
case watch.Event:
switch obj := evt.Object.(type) {
case *akashv1.InventoryRequest:
group.Go(func() error {
inv := akashv1.Inventory{
TypeMeta: metav1.TypeMeta{
APIVersion: "akash.network/v1",
Kind: "Inventory",
},
ObjectMeta: metav1.ObjectMeta{
Name: obj.Name,
},
}
for _, st := range storage {
res, _ := st.Query()
inv.Spec.Storage = append(inv.Spec.Storage, res...)
}

payload, _ := json.Marshal(&inv)

_, err := ac.AkashV1().
Inventories().
Patch(c.Context, obj.Name, types.ApplyPatchType, payload, metav1.ApplyOptions{Force: true, FieldManager: os.Args[0]}.ToPatchOptions())

if err != nil {
log.Error(err, "")
}

return nil
})
}
}
}
}
}
23 changes: 6 additions & 17 deletions rancher.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,25 @@ type rancher struct {
exe RemotePodCommandExecutor
ctx context.Context
cancel context.CancelFunc
reqch chan req
querier
}

type rancherStorage struct {
isRancher bool
isAkashManaged bool
allocated uint64
}

type rancherStorageClasses map[string]*rancherStorage

func NewRancher(ctx context.Context) (Storage, error) {
ctx, cancel := context.WithCancel(ctx)

r := &rancher{
exe: NewRemotePodCommandExecutor(KubeConfigFromCtx(ctx), KubeClientFromCtx(ctx)),
ctx: ctx,
cancel: cancel,
reqch: make(chan req, 100),
exe: NewRemotePodCommandExecutor(KubeConfigFromCtx(ctx), KubeClientFromCtx(ctx)),
ctx: ctx,
cancel: cancel,
querier: newQuerier(),
}

group := ErrGroupFromCtx(ctx)
Expand All @@ -46,18 +47,6 @@ func NewRancher(ctx context.Context) (Storage, error) {
return r, nil
}

func (c *rancher) Query() ([]akashv1.InventoryClusterStorage, error) {
r := req{
resp: make(chan resp, 1),
}

c.reqch <- r

rsp := <-r.resp

return rsp.res, rsp.err
}

func (c *rancher) run() error {
defer func() {
c.cancel()
Expand Down
42 changes: 41 additions & 1 deletion types.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,44 @@ import (
akashv1 "github.com/ovrclk/akash/pkg/apis/akash.network/v1"
)

type resp struct {
res []akashv1.InventoryClusterStorage
err error
}

type req struct {
resp chan resp
}

type querier struct {
reqch chan req
}

func newQuerier() querier {
return querier{
reqch: make(chan req, 100),
}
}

func (c *querier) Query(ctx context.Context) ([]akashv1.InventoryClusterStorage, error) {
r := req{
resp: make(chan resp, 1),
}

select {
case c.reqch <- r:
case <-ctx.Done():
return nil, ctx.Err()
}

select {
case rsp := <-r.resp:
return rsp.res, rsp.err
case <-ctx.Done():
return nil, ctx.Err()
}
}

type watchOption struct {
listOptions metav1.ListOptions
}
Expand All @@ -28,7 +66,7 @@ func WatchWithListOptions(val metav1.ListOptions) WatchOption {
}

type Storage interface {
Query() ([]akashv1.InventoryClusterStorage, error)
Query(ctx context.Context) ([]akashv1.InventoryClusterStorage, error)
}

type Watcher interface {
Expand All @@ -38,6 +76,8 @@ type Watcher interface {
type RemotePodCommandExecutor interface {
ExecWithOptions(options rookexec.ExecOptions) (string, string, error)
ExecCommandInContainerWithFullOutput(appLabel, containerName, namespace string, cmd ...string) (string, string, error)
// ExecCommandInContainerWithFullOutputWithTimeout uses 15s hard-coded timeout
ExecCommandInContainerWithFullOutputWithTimeout(appLabel, containerName, namespace string, cmd ...string) (string, string, error)
}

func NewRemotePodCommandExecutor(restcfg *rest.Config, clientset *kubernetes.Clientset) RemotePodCommandExecutor {
Expand Down

0 comments on commit d6ca3e5

Please sign in to comment.