From ab1d16f4ae75cfdeea56e2a4893b675a08fec00f Mon Sep 17 00:00:00 2001 From: Deepak Kasu Date: Wed, 13 Nov 2024 09:29:19 -0700 Subject: [PATCH] APIGOV-28984 Code Review Feedback --- pkg/agent/agent.go | 4 +- pkg/agent/handler/accessrequest.go | 5 +- pkg/customunit/client.go | 33 +++++------ pkg/customunit/client_test.go | 8 +-- pkg/customunit/manager.go | 69 ++++++++++++---------- pkg/customunit/manager_test.go | 4 +- pkg/transaction/metric/metricscollector.go | 5 +- 7 files changed, 66 insertions(+), 62 deletions(-) diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index 98f9cdf7b..5b316a56e 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -68,7 +68,7 @@ type agentData struct { apiValidatorJobID string configChangeHandler ConfigChangeHandler agentResourceChangeHandler ConfigChangeHandler - customUnitMetricServerManager *customunit.CustomUnitMetricServerManager + customUnitMetricServerManager *customunit.CustomUnitHandler agentShutdownHandler ShutdownHandler proxyResourceHandler *handler.StreamWatchProxyHandler isInitialized bool @@ -433,7 +433,7 @@ func GetAuthProviderRegistry() oauth.ProviderRegistry { return agent.authProviderRegistry } -func GetCustomUnitMetricServerManager() *customunit.CustomUnitMetricServerManager { +func GetCustomUnitMetricServerManager() *customunit.CustomUnitHandler { return agent.customUnitMetricServerManager } diff --git a/pkg/agent/handler/accessrequest.go b/pkg/agent/handler/accessrequest.go index 6eb95577c..1b3b681b9 100644 --- a/pkg/agent/handler/accessrequest.go +++ b/pkg/agent/handler/accessrequest.go @@ -24,7 +24,7 @@ type arProvisioner interface { AccessRequestDeprovision(accessRequest prov.AccessRequest) (status prov.RequestStatus) } type customUnitMetricServerManager interface { - HandleQuotaEnforcement(context.Context, context.CancelFunc, *management.AccessRequest, *management.ManagedApplication) error + HandleQuotaEnforcement(*management.AccessRequest, *management.ManagedApplication) error } type accessRequestHandler struct { @@ -139,8 +139,7 @@ func (h *accessRequestHandler) onPending(ctx context.Context, ar *management.Acc status, accessData := h.prov.AccessRequestProvision(req) if status.GetStatus() == prov.Success && len(ar.Spec.AdditionalQuotas) > 0 { - ctx, cancelCtx := context.WithCancel(ctx) - err := h.customUnitMetricServerManager.HandleQuotaEnforcement(ctx, cancelCtx, ar, app) + err := h.customUnitMetricServerManager.HandleQuotaEnforcement(ar, app) if err != nil { // h.onError(ctx, ar, err) diff --git a/pkg/customunit/client.go b/pkg/customunit/client.go index a28cb0203..14e5d7a17 100644 --- a/pkg/customunit/client.go +++ b/pkg/customunit/client.go @@ -4,7 +4,6 @@ import ( "context" "time" - "github.com/Axway/agent-sdk/pkg/agent/cache" cu "github.com/Axway/agent-sdk/pkg/amplify/agent/customunits" "github.com/Axway/agent-sdk/pkg/util/log" "google.golang.org/grpc" @@ -19,28 +18,31 @@ type customUnitClient struct { url string conn *grpc.ClientConn isRunning bool - cache cache.Manager + cache agentCacheManager stopChan chan struct{} delay time.Duration } +const maxRetryDelay = 5 * time.Minute +const initialRetryDelay = 30 * time.Second + type CustomUnitOption func(*customUnitClient) -type CustomUnitClientFactory func(...CustomUnitOption) (*customUnitClient, error) +type CustomUnitClientFactory func(agentCacheManager, ...CustomUnitOption) (*customUnitClient, error) -func NewCustomUnitClientFactory(url string, agentCache cache.Manager, quotaInfo *cu.QuotaInfo) CustomUnitClientFactory { - return func(opts ...CustomUnitOption) (*customUnitClient, error) { +func NewCustomUnitClientFactory(url string, quotaInfo *cu.QuotaInfo) CustomUnitClientFactory { + return func(cache agentCacheManager, opts ...CustomUnitOption) (*customUnitClient, error) { c := &customUnitClient{ quotaInfo: quotaInfo, url: url, dialOpts: []grpc.DialOption{ grpc.WithTransportCredentials(insecure.NewCredentials()), }, - cache: agentCache, + cache: cache, logger: log.NewFieldLogger().WithPackage("customunit").WithComponent("client").WithField("metricServer", url), stopChan: make(chan struct{}), - isRunning: true, - delay: 30 * time.Second, + isRunning: false, + delay: initialRetryDelay, } for _, o := range opts { @@ -58,7 +60,7 @@ func WithGRPCDialOption(opt grpc.DialOption) CustomUnitOption { } func (c *customUnitClient) createConnection() error { - conn, err := grpc.DialContext(context.Background(), c.url, c.dialOpts...) + conn, err := grpc.NewClient(c.url, c.dialOpts...) if err != nil { return err } @@ -81,9 +83,6 @@ func (c *customUnitClient) StartMetricReporting(metricReportChan chan *cu.Metric err := c.createConnection() if err != nil { c.ExecuteBackoff() - if c.delay > 5*time.Minute { - break - } continue } @@ -93,15 +92,11 @@ func (c *customUnitClient) StartMetricReporting(metricReportChan chan *cu.Metric if err != nil { c.Close() c.ExecuteBackoff() - if c.delay > 5*time.Minute { - c.logger.Debugf("ending connection retries") - break - } continue } c.isRunning = true // reset the delay - c.delay = 30 * time.Second + c.delay = initialRetryDelay // process metrics c.processMetrics(stream, metricReportChan) c.logger.Debug("connection lost, retrying to connect to metric server") @@ -132,6 +127,10 @@ func (c *customUnitClient) ExecuteBackoff() { c.logger.Debugf("connection is still lost, trying again in %v.", c.delay) time.Sleep(c.delay) c.delay = 2 * c.delay + if c.delay >= maxRetryDelay { + c.logger.Debugf("maximum retry delay of %v reached", maxRetryDelay) + c.delay = maxRetryDelay + } } func (c *customUnitClient) Close() { diff --git a/pkg/customunit/client_test.go b/pkg/customunit/client_test.go index e914d0d66..8f91ab326 100644 --- a/pkg/customunit/client_test.go +++ b/pkg/customunit/client_test.go @@ -63,8 +63,8 @@ func createQEConnection(fakeServer *fakeQuotaEnforcementServer, _ context.Contex }, } cache := cache.NewAgentCacheManager(&config.CentralConfiguration{}, false) - factory := NewCustomUnitClientFactory("bufnet", cache, quotaInfo) - return factory(WithGRPCDialOption(opt)) + factory := NewCustomUnitClientFactory("bufnet", quotaInfo) + return factory(cache, WithGRPCDialOption(opt)) } @@ -96,6 +96,6 @@ func createMRConnection(fakeServer *fakeCustomUnitMetricReportingServer, _ conte }() cache := cache.NewAgentCacheManager(&config.CentralConfiguration{}, false) - factory := NewCustomUnitClientFactory("bufnet", cache, &customunits.QuotaInfo{}) - return factory(WithGRPCDialOption(opt)) + factory := NewCustomUnitClientFactory("bufnet", &customunits.QuotaInfo{}) + return factory(cache, WithGRPCDialOption(opt)) } diff --git a/pkg/customunit/manager.go b/pkg/customunit/manager.go index 64e68b1df..a7c49ba1b 100644 --- a/pkg/customunit/manager.go +++ b/pkg/customunit/manager.go @@ -1,12 +1,10 @@ package customunit import ( - "context" "encoding/json" "errors" "fmt" - "github.com/Axway/agent-sdk/pkg/agent/cache" "github.com/Axway/agent-sdk/pkg/amplify/agent/customunits" v1 "github.com/Axway/agent-sdk/pkg/apic/apiserver/models/api/v1" "github.com/Axway/agent-sdk/pkg/apic/apiserver/models/catalog/v1" @@ -19,9 +17,9 @@ import ( "github.com/Axway/agent-sdk/pkg/util/log" ) -type CustomUnitMetricServerManager struct { - configs []config.MetricServiceConfiguration - cache cache.Manager +type CustomUnitHandler struct { + servicesConfigs []config.MetricServiceConfiguration + cache agentCacheManager agentType config.AgentType logger log.FieldLogger clients []*customUnitClient @@ -33,9 +31,18 @@ type metricCollector interface { AddCustomMetricDetail(models.CustomMetricDetail) } -func NewCustomUnitMetricServerManager(configs []config.MetricServiceConfiguration, cache cache.Manager, agentType config.AgentType) *CustomUnitMetricServerManager { - return &CustomUnitMetricServerManager{ - configs: configs, +type agentCacheManager interface { + GetAPIServiceWithName(string) *v1.ResourceInstance + GetAPIServiceInstanceByID(string) (*v1.ResourceInstance, error) + GetAPIServiceKeys() []string + GetAPIServiceWithAPIID(string) *v1.ResourceInstance + GetManagedApplication(string) *v1.ResourceInstance + GetManagedApplicationByName(string) *v1.ResourceInstance +} + +func NewCustomUnitMetricServerManager(servicesConfigs []config.MetricServiceConfiguration, cache agentCacheManager, agentType config.AgentType) *CustomUnitHandler { + return &CustomUnitHandler{ + servicesConfigs: servicesConfigs, cache: cache, agentType: agentType, metricReportChan: make(chan *customunits.MetricReport, 100), @@ -45,17 +52,17 @@ func NewCustomUnitMetricServerManager(configs []config.MetricServiceConfiguratio } } -func (h *CustomUnitMetricServerManager) HandleQuotaEnforcement(ctx context.Context, cancelCtx context.CancelFunc, ar *management.AccessRequest, app *management.ManagedApplication) error { +func (h *CustomUnitHandler) HandleQuotaEnforcement(ar *management.AccessRequest, app *management.ManagedApplication) error { // Build quota info - quotaInfo, err := h.buildQuotaInfo(ctx, ar, app) + quotaInfo, err := h.buildQuotaInfo(ar, app) if err != nil { return fmt.Errorf("could not build quota info from access request") } errMessage := "" - for _, config := range h.configs { + for _, config := range h.servicesConfigs { if config.MetricServiceEnabled() { - factory := NewCustomUnitClientFactory(config.URL, h.cache, quotaInfo) - client, _ := factory() + factory := NewCustomUnitClientFactory(config.URL, quotaInfo) + client, _ := factory(h.cache) response, err := client.QuotaEnforcementInfo() if err != nil { // if error from QE and reject on fail, we return the error back to the central @@ -72,13 +79,13 @@ func (h *CustomUnitMetricServerManager) HandleQuotaEnforcement(ctx context.Conte return nil } -func (h *CustomUnitMetricServerManager) buildQuotaInfo(ctx context.Context, ar *management.AccessRequest, app *management.ManagedApplication) (*customunits.QuotaInfo, error) { +func (h *CustomUnitHandler) buildQuotaInfo(ar *management.AccessRequest, app *management.ManagedApplication) (*customunits.QuotaInfo, error) { unitRef, count := h.getQuotaInfo(ar) if unitRef == "" { return nil, nil } - instance, err := h.getServiceInstance(ctx, ar) + instance, err := h.getServiceInstance(ar) if err != nil { return nil, err } @@ -121,7 +128,7 @@ type reference struct { Unit string `json:"unit"` } -func (h *CustomUnitMetricServerManager) getQuotaInfo(ar *management.AccessRequest) (string, int) { +func (h *CustomUnitHandler) getQuotaInfo(ar *management.AccessRequest) (string, int) { index := 0 if len(ar.Spec.AdditionalQuotas) < index+1 { return "", 0 @@ -139,7 +146,7 @@ func (h *CustomUnitMetricServerManager) getQuotaInfo(ar *management.AccessReques return "", 0 } -func (h *CustomUnitMetricServerManager) getServiceInstance(_ context.Context, ar *management.AccessRequest) (*v1.ResourceInstance, error) { +func (h *CustomUnitHandler) getServiceInstance(ar *management.AccessRequest) (*v1.ResourceInstance, error) { instRef := ar.GetReferenceByGVK(management.APIServiceInstanceGVK()) instID := instRef.ID instance, err := h.cache.GetAPIServiceInstanceByID(instID) @@ -149,32 +156,34 @@ func (h *CustomUnitMetricServerManager) getServiceInstance(_ context.Context, ar return instance, nil } -func (m *CustomUnitMetricServerManager) HandleMetricReporting(ctx context.Context, cancelCtx context.CancelFunc, metricCollector metricCollector) { +func (m *CustomUnitHandler) HandleMetricReporting(metricCollector metricCollector) { if m.agentType != config.TraceabilityAgent { return } - go m.receiveMetrics(metricCollector) + if len(m.servicesConfigs) > 0 { + go m.receiveMetrics(metricCollector) + } // iterate over each metric service config - for _, config := range m.configs { + for _, config := range m.servicesConfigs { // Initialize custom units client - factory := NewCustomUnitClientFactory(config.URL, m.cache, &customunits.QuotaInfo{}) - client, _ := factory() + factory := NewCustomUnitClientFactory(config.URL, &customunits.QuotaInfo{}) + client, _ := factory(m.cache) go client.StartMetricReporting(m.metricReportChan) m.clients = append(m.clients, client) } } -func (c *CustomUnitMetricServerManager) receiveMetrics(metricCollector metricCollector) { +func (c *CustomUnitHandler) receiveMetrics(metricCollector metricCollector) { for { select { case metricReport := <-c.metricReportChan: if metricReport == nil { continue } - logger := c.logger.WithField("api", metricReport.ApiService.GetValue()) + logger := c.logger.WithField("api", metricReport.ApiService.GetValue()).WithField("app", metricReport.GetManagedApp().GetValue()).WithField("planUnit", metricReport.PlanUnit.GetUnitName()) customMetricDetail, err := c.buildCustomMetricDetail(metricReport) if err != nil { - logger.Error(err) + logger.WithError(err).Error("could not build metric data") continue } // create the metric report and send it to the metric collector @@ -190,7 +199,7 @@ func (c *CustomUnitMetricServerManager) receiveMetrics(metricCollector metricCol } } -func (c *CustomUnitMetricServerManager) buildCustomMetricDetail(metricReport *customunits.MetricReport) (*models.CustomMetricDetail, error) { +func (c *CustomUnitHandler) buildCustomMetricDetail(metricReport *customunits.MetricReport) (*models.CustomMetricDetail, error) { apiServiceLookup := metricReport.GetApiService() managedAppLookup := metricReport.GetManagedApp() planUnitLookup := metricReport.GetPlanUnit() @@ -220,7 +229,7 @@ func (c *CustomUnitMetricServerManager) buildCustomMetricDetail(metricReport *cu }, nil } -func (c *CustomUnitMetricServerManager) APIServiceLookup(apiServiceLookup *customunits.APIServiceLookup) (*models.APIDetails, error) { +func (c *CustomUnitHandler) APIServiceLookup(apiServiceLookup *customunits.APIServiceLookup) (*models.APIDetails, error) { apiSvcValue := apiServiceLookup.GetValue() apiLookupType := apiServiceLookup.GetType() apiCustomAttr := apiServiceLookup.GetCustomAttribute() @@ -267,7 +276,7 @@ func (c *CustomUnitMetricServerManager) APIServiceLookup(apiServiceLookup *custo }, nil } -func (c *CustomUnitMetricServerManager) ManagedApplicationLookup(appLookup *customunits.AppLookup) (*models.AppDetails, error) { +func (c *CustomUnitHandler) ManagedApplicationLookup(appLookup *customunits.AppLookup) (*models.AppDetails, error) { appValue := appLookup.GetValue() appLookupType := appLookup.GetType() appCustomAttr := appLookup.GetCustomAttribute() @@ -320,12 +329,12 @@ func (c *CustomUnitMetricServerManager) ManagedApplicationLookup(appLookup *cust }, nil } -func (c *CustomUnitMetricServerManager) PlanUnitLookup(planUnitLookup *customunits.UnitLookup) *models.Unit { +func (c *CustomUnitHandler) PlanUnitLookup(planUnitLookup *customunits.UnitLookup) *models.Unit { return &models.Unit{ Name: planUnitLookup.GetUnitName(), } } -func (c *CustomUnitMetricServerManager) Stop() { +func (c *CustomUnitHandler) Stop() { c.stopChan <- struct{}{} } diff --git a/pkg/customunit/manager_test.go b/pkg/customunit/manager_test.go index 537bf6630..4ccd0e350 100644 --- a/pkg/customunit/manager_test.go +++ b/pkg/customunit/manager_test.go @@ -1,7 +1,6 @@ package customunit import ( - "context" "testing" agentcache "github.com/Axway/agent-sdk/pkg/agent/cache" @@ -92,8 +91,7 @@ func Test_HandleQuotaEnforcementInfo(t *testing.T) { } manager := NewCustomUnitMetricServerManager(metricServicesConfigs, cm, config.DiscoveryAgent) - ctx, cancelCtx := context.WithCancel(context.Background()) - err := manager.HandleQuotaEnforcement(ctx, cancelCtx, accessReq, managedAppForTest) + err := manager.HandleQuotaEnforcement(accessReq, managedAppForTest) assert.Nil(t, err) } diff --git a/pkg/transaction/metric/metricscollector.go b/pkg/transaction/metric/metricscollector.go index c3fadbe5c..fdedd35d3 100644 --- a/pkg/transaction/metric/metricscollector.go +++ b/pkg/transaction/metric/metricscollector.go @@ -1,7 +1,6 @@ package metric import ( - "context" "encoding/json" "fmt" "os" @@ -148,9 +147,9 @@ func GetMetricCollector() Collector { if globalMetricCollector == nil && util.IsNotTest() { globalMetricCollector = createMetricCollector() + agent.GetCustomUnitMetricServerManager().HandleMetricReporting(globalMetricCollector) + } - ctx, ctxCancel := context.WithCancel(context.Background()) - agent.GetCustomUnitMetricServerManager().HandleMetricReporting(ctx, ctxCancel, globalMetricCollector) return globalMetricCollector }