Skip to content

Commit

Permalink
APIGOV-28984 Code Review Feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
Deepak Kasu committed Nov 13, 2024
1 parent 9f4f305 commit ab1d16f
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 62 deletions.
4 changes: 2 additions & 2 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ type agentData struct {
apiValidatorJobID string
configChangeHandler ConfigChangeHandler
agentResourceChangeHandler ConfigChangeHandler
customUnitMetricServerManager *customunit.CustomUnitMetricServerManager
customUnitMetricServerManager *customunit.CustomUnitHandler
agentShutdownHandler ShutdownHandler
proxyResourceHandler *handler.StreamWatchProxyHandler
isInitialized bool
Expand Down Expand Up @@ -433,7 +433,7 @@ func GetAuthProviderRegistry() oauth.ProviderRegistry {
return agent.authProviderRegistry
}

func GetCustomUnitMetricServerManager() *customunit.CustomUnitMetricServerManager {
func GetCustomUnitMetricServerManager() *customunit.CustomUnitHandler {
return agent.customUnitMetricServerManager
}

Expand Down
5 changes: 2 additions & 3 deletions pkg/agent/handler/accessrequest.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type arProvisioner interface {
AccessRequestDeprovision(accessRequest prov.AccessRequest) (status prov.RequestStatus)
}
type customUnitMetricServerManager interface {
HandleQuotaEnforcement(context.Context, context.CancelFunc, *management.AccessRequest, *management.ManagedApplication) error
HandleQuotaEnforcement(*management.AccessRequest, *management.ManagedApplication) error
}

type accessRequestHandler struct {
Expand Down Expand Up @@ -139,8 +139,7 @@ func (h *accessRequestHandler) onPending(ctx context.Context, ar *management.Acc
status, accessData := h.prov.AccessRequestProvision(req)

if status.GetStatus() == prov.Success && len(ar.Spec.AdditionalQuotas) > 0 {
ctx, cancelCtx := context.WithCancel(ctx)
err := h.customUnitMetricServerManager.HandleQuotaEnforcement(ctx, cancelCtx, ar, app)
err := h.customUnitMetricServerManager.HandleQuotaEnforcement(ar, app)

if err != nil {
// h.onError(ctx, ar, err)
Expand Down
33 changes: 16 additions & 17 deletions pkg/customunit/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"time"

"github.com/Axway/agent-sdk/pkg/agent/cache"
cu "github.com/Axway/agent-sdk/pkg/amplify/agent/customunits"
"github.com/Axway/agent-sdk/pkg/util/log"
"google.golang.org/grpc"
Expand All @@ -19,28 +18,31 @@ type customUnitClient struct {
url string
conn *grpc.ClientConn
isRunning bool
cache cache.Manager
cache agentCacheManager
stopChan chan struct{}
delay time.Duration
}

const maxRetryDelay = 5 * time.Minute
const initialRetryDelay = 30 * time.Second

type CustomUnitOption func(*customUnitClient)

type CustomUnitClientFactory func(...CustomUnitOption) (*customUnitClient, error)
type CustomUnitClientFactory func(agentCacheManager, ...CustomUnitOption) (*customUnitClient, error)

func NewCustomUnitClientFactory(url string, agentCache cache.Manager, quotaInfo *cu.QuotaInfo) CustomUnitClientFactory {
return func(opts ...CustomUnitOption) (*customUnitClient, error) {
func NewCustomUnitClientFactory(url string, quotaInfo *cu.QuotaInfo) CustomUnitClientFactory {
return func(cache agentCacheManager, opts ...CustomUnitOption) (*customUnitClient, error) {
c := &customUnitClient{
quotaInfo: quotaInfo,
url: url,
dialOpts: []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
},
cache: agentCache,
cache: cache,
logger: log.NewFieldLogger().WithPackage("customunit").WithComponent("client").WithField("metricServer", url),
stopChan: make(chan struct{}),
isRunning: true,
delay: 30 * time.Second,
isRunning: false,
delay: initialRetryDelay,
}

for _, o := range opts {
Expand All @@ -58,7 +60,7 @@ func WithGRPCDialOption(opt grpc.DialOption) CustomUnitOption {
}

func (c *customUnitClient) createConnection() error {
conn, err := grpc.DialContext(context.Background(), c.url, c.dialOpts...)
conn, err := grpc.NewClient(c.url, c.dialOpts...)
if err != nil {
return err
}
Expand All @@ -81,9 +83,6 @@ func (c *customUnitClient) StartMetricReporting(metricReportChan chan *cu.Metric
err := c.createConnection()
if err != nil {
c.ExecuteBackoff()
if c.delay > 5*time.Minute {
break
}
continue
}

Expand All @@ -93,15 +92,11 @@ func (c *customUnitClient) StartMetricReporting(metricReportChan chan *cu.Metric
if err != nil {
c.Close()
c.ExecuteBackoff()
if c.delay > 5*time.Minute {
c.logger.Debugf("ending connection retries")
break
}
continue
}
c.isRunning = true
// reset the delay
c.delay = 30 * time.Second
c.delay = initialRetryDelay
// process metrics
c.processMetrics(stream, metricReportChan)
c.logger.Debug("connection lost, retrying to connect to metric server")
Expand Down Expand Up @@ -132,6 +127,10 @@ func (c *customUnitClient) ExecuteBackoff() {
c.logger.Debugf("connection is still lost, trying again in %v.", c.delay)
time.Sleep(c.delay)
c.delay = 2 * c.delay
if c.delay >= maxRetryDelay {
c.logger.Debugf("maximum retry delay of %v reached", maxRetryDelay)
c.delay = maxRetryDelay
}
}

func (c *customUnitClient) Close() {
Expand Down
8 changes: 4 additions & 4 deletions pkg/customunit/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ func createQEConnection(fakeServer *fakeQuotaEnforcementServer, _ context.Contex
},
}
cache := cache.NewAgentCacheManager(&config.CentralConfiguration{}, false)
factory := NewCustomUnitClientFactory("bufnet", cache, quotaInfo)
return factory(WithGRPCDialOption(opt))
factory := NewCustomUnitClientFactory("bufnet", quotaInfo)
return factory(cache, WithGRPCDialOption(opt))

}

Expand Down Expand Up @@ -96,6 +96,6 @@ func createMRConnection(fakeServer *fakeCustomUnitMetricReportingServer, _ conte
}()

cache := cache.NewAgentCacheManager(&config.CentralConfiguration{}, false)
factory := NewCustomUnitClientFactory("bufnet", cache, &customunits.QuotaInfo{})
return factory(WithGRPCDialOption(opt))
factory := NewCustomUnitClientFactory("bufnet", &customunits.QuotaInfo{})
return factory(cache, WithGRPCDialOption(opt))
}
69 changes: 39 additions & 30 deletions pkg/customunit/manager.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
package customunit

import (
"context"
"encoding/json"
"errors"
"fmt"

"github.com/Axway/agent-sdk/pkg/agent/cache"
"github.com/Axway/agent-sdk/pkg/amplify/agent/customunits"
v1 "github.com/Axway/agent-sdk/pkg/apic/apiserver/models/api/v1"
"github.com/Axway/agent-sdk/pkg/apic/apiserver/models/catalog/v1"
Expand All @@ -19,9 +17,9 @@ import (
"github.com/Axway/agent-sdk/pkg/util/log"
)

type CustomUnitMetricServerManager struct {
configs []config.MetricServiceConfiguration
cache cache.Manager
type CustomUnitHandler struct {
servicesConfigs []config.MetricServiceConfiguration
cache agentCacheManager
agentType config.AgentType
logger log.FieldLogger
clients []*customUnitClient
Expand All @@ -33,9 +31,18 @@ type metricCollector interface {
AddCustomMetricDetail(models.CustomMetricDetail)
}

func NewCustomUnitMetricServerManager(configs []config.MetricServiceConfiguration, cache cache.Manager, agentType config.AgentType) *CustomUnitMetricServerManager {
return &CustomUnitMetricServerManager{
configs: configs,
type agentCacheManager interface {
GetAPIServiceWithName(string) *v1.ResourceInstance
GetAPIServiceInstanceByID(string) (*v1.ResourceInstance, error)
GetAPIServiceKeys() []string
GetAPIServiceWithAPIID(string) *v1.ResourceInstance
GetManagedApplication(string) *v1.ResourceInstance
GetManagedApplicationByName(string) *v1.ResourceInstance
}

func NewCustomUnitMetricServerManager(servicesConfigs []config.MetricServiceConfiguration, cache agentCacheManager, agentType config.AgentType) *CustomUnitHandler {
return &CustomUnitHandler{
servicesConfigs: servicesConfigs,
cache: cache,
agentType: agentType,
metricReportChan: make(chan *customunits.MetricReport, 100),
Expand All @@ -45,17 +52,17 @@ func NewCustomUnitMetricServerManager(configs []config.MetricServiceConfiguratio
}
}

func (h *CustomUnitMetricServerManager) HandleQuotaEnforcement(ctx context.Context, cancelCtx context.CancelFunc, ar *management.AccessRequest, app *management.ManagedApplication) error {
func (h *CustomUnitHandler) HandleQuotaEnforcement(ar *management.AccessRequest, app *management.ManagedApplication) error {
// Build quota info
quotaInfo, err := h.buildQuotaInfo(ctx, ar, app)
quotaInfo, err := h.buildQuotaInfo(ar, app)
if err != nil {
return fmt.Errorf("could not build quota info from access request")
}
errMessage := ""
for _, config := range h.configs {
for _, config := range h.servicesConfigs {
if config.MetricServiceEnabled() {
factory := NewCustomUnitClientFactory(config.URL, h.cache, quotaInfo)
client, _ := factory()
factory := NewCustomUnitClientFactory(config.URL, quotaInfo)
client, _ := factory(h.cache)
response, err := client.QuotaEnforcementInfo()
if err != nil {
// if error from QE and reject on fail, we return the error back to the central
Expand All @@ -72,13 +79,13 @@ func (h *CustomUnitMetricServerManager) HandleQuotaEnforcement(ctx context.Conte
return nil
}

func (h *CustomUnitMetricServerManager) buildQuotaInfo(ctx context.Context, ar *management.AccessRequest, app *management.ManagedApplication) (*customunits.QuotaInfo, error) {
func (h *CustomUnitHandler) buildQuotaInfo(ar *management.AccessRequest, app *management.ManagedApplication) (*customunits.QuotaInfo, error) {
unitRef, count := h.getQuotaInfo(ar)
if unitRef == "" {
return nil, nil
}

instance, err := h.getServiceInstance(ctx, ar)
instance, err := h.getServiceInstance(ar)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -121,7 +128,7 @@ type reference struct {
Unit string `json:"unit"`
}

func (h *CustomUnitMetricServerManager) getQuotaInfo(ar *management.AccessRequest) (string, int) {
func (h *CustomUnitHandler) getQuotaInfo(ar *management.AccessRequest) (string, int) {
index := 0
if len(ar.Spec.AdditionalQuotas) < index+1 {
return "", 0
Expand All @@ -139,7 +146,7 @@ func (h *CustomUnitMetricServerManager) getQuotaInfo(ar *management.AccessReques
return "", 0
}

func (h *CustomUnitMetricServerManager) getServiceInstance(_ context.Context, ar *management.AccessRequest) (*v1.ResourceInstance, error) {
func (h *CustomUnitHandler) getServiceInstance(ar *management.AccessRequest) (*v1.ResourceInstance, error) {
instRef := ar.GetReferenceByGVK(management.APIServiceInstanceGVK())
instID := instRef.ID
instance, err := h.cache.GetAPIServiceInstanceByID(instID)
Expand All @@ -149,32 +156,34 @@ func (h *CustomUnitMetricServerManager) getServiceInstance(_ context.Context, ar
return instance, nil
}

func (m *CustomUnitMetricServerManager) HandleMetricReporting(ctx context.Context, cancelCtx context.CancelFunc, metricCollector metricCollector) {
func (m *CustomUnitHandler) HandleMetricReporting(metricCollector metricCollector) {
if m.agentType != config.TraceabilityAgent {
return
}
go m.receiveMetrics(metricCollector)
if len(m.servicesConfigs) > 0 {
go m.receiveMetrics(metricCollector)
}
// iterate over each metric service config
for _, config := range m.configs {
for _, config := range m.servicesConfigs {
// Initialize custom units client
factory := NewCustomUnitClientFactory(config.URL, m.cache, &customunits.QuotaInfo{})
client, _ := factory()
factory := NewCustomUnitClientFactory(config.URL, &customunits.QuotaInfo{})
client, _ := factory(m.cache)
go client.StartMetricReporting(m.metricReportChan)
m.clients = append(m.clients, client)
}
}

func (c *CustomUnitMetricServerManager) receiveMetrics(metricCollector metricCollector) {
func (c *CustomUnitHandler) receiveMetrics(metricCollector metricCollector) {
for {
select {
case metricReport := <-c.metricReportChan:
if metricReport == nil {
continue
}
logger := c.logger.WithField("api", metricReport.ApiService.GetValue())
logger := c.logger.WithField("api", metricReport.ApiService.GetValue()).WithField("app", metricReport.GetManagedApp().GetValue()).WithField("planUnit", metricReport.PlanUnit.GetUnitName())
customMetricDetail, err := c.buildCustomMetricDetail(metricReport)
if err != nil {
logger.Error(err)
logger.WithError(err).Error("could not build metric data")
continue
}
// create the metric report and send it to the metric collector
Expand All @@ -190,7 +199,7 @@ func (c *CustomUnitMetricServerManager) receiveMetrics(metricCollector metricCol
}
}

func (c *CustomUnitMetricServerManager) buildCustomMetricDetail(metricReport *customunits.MetricReport) (*models.CustomMetricDetail, error) {
func (c *CustomUnitHandler) buildCustomMetricDetail(metricReport *customunits.MetricReport) (*models.CustomMetricDetail, error) {
apiServiceLookup := metricReport.GetApiService()
managedAppLookup := metricReport.GetManagedApp()
planUnitLookup := metricReport.GetPlanUnit()
Expand Down Expand Up @@ -220,7 +229,7 @@ func (c *CustomUnitMetricServerManager) buildCustomMetricDetail(metricReport *cu
}, nil
}

func (c *CustomUnitMetricServerManager) APIServiceLookup(apiServiceLookup *customunits.APIServiceLookup) (*models.APIDetails, error) {
func (c *CustomUnitHandler) APIServiceLookup(apiServiceLookup *customunits.APIServiceLookup) (*models.APIDetails, error) {
apiSvcValue := apiServiceLookup.GetValue()
apiLookupType := apiServiceLookup.GetType()
apiCustomAttr := apiServiceLookup.GetCustomAttribute()
Expand Down Expand Up @@ -267,7 +276,7 @@ func (c *CustomUnitMetricServerManager) APIServiceLookup(apiServiceLookup *custo
}, nil
}

func (c *CustomUnitMetricServerManager) ManagedApplicationLookup(appLookup *customunits.AppLookup) (*models.AppDetails, error) {
func (c *CustomUnitHandler) ManagedApplicationLookup(appLookup *customunits.AppLookup) (*models.AppDetails, error) {
appValue := appLookup.GetValue()
appLookupType := appLookup.GetType()
appCustomAttr := appLookup.GetCustomAttribute()
Expand Down Expand Up @@ -320,12 +329,12 @@ func (c *CustomUnitMetricServerManager) ManagedApplicationLookup(appLookup *cust
}, nil
}

func (c *CustomUnitMetricServerManager) PlanUnitLookup(planUnitLookup *customunits.UnitLookup) *models.Unit {
func (c *CustomUnitHandler) PlanUnitLookup(planUnitLookup *customunits.UnitLookup) *models.Unit {
return &models.Unit{
Name: planUnitLookup.GetUnitName(),
}
}

func (c *CustomUnitMetricServerManager) Stop() {
func (c *CustomUnitHandler) Stop() {
c.stopChan <- struct{}{}
}
4 changes: 1 addition & 3 deletions pkg/customunit/manager_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package customunit

import (
"context"
"testing"

agentcache "github.com/Axway/agent-sdk/pkg/agent/cache"
Expand Down Expand Up @@ -92,8 +91,7 @@ func Test_HandleQuotaEnforcementInfo(t *testing.T) {
}

manager := NewCustomUnitMetricServerManager(metricServicesConfigs, cm, config.DiscoveryAgent)
ctx, cancelCtx := context.WithCancel(context.Background())
err := manager.HandleQuotaEnforcement(ctx, cancelCtx, accessReq, managedAppForTest)
err := manager.HandleQuotaEnforcement(accessReq, managedAppForTest)

assert.Nil(t, err)
}
5 changes: 2 additions & 3 deletions pkg/transaction/metric/metricscollector.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package metric

import (
"context"
"encoding/json"
"fmt"
"os"
Expand Down Expand Up @@ -148,9 +147,9 @@ func GetMetricCollector() Collector {

if globalMetricCollector == nil && util.IsNotTest() {
globalMetricCollector = createMetricCollector()
agent.GetCustomUnitMetricServerManager().HandleMetricReporting(globalMetricCollector)

}
ctx, ctxCancel := context.WithCancel(context.Background())
agent.GetCustomUnitMetricServerManager().HandleMetricReporting(ctx, ctxCancel, globalMetricCollector)
return globalMetricCollector
}

Expand Down

0 comments on commit ab1d16f

Please sign in to comment.