From 7453084c1779b850da12c739b5b30920233e3c4f Mon Sep 17 00:00:00 2001 From: Vivek Singh Chauhan Date: Fri, 25 Oct 2024 08:15:29 -0700 Subject: [PATCH] APIGOV-29054 - Using Anypoint Monitoring Archive API for API metrics (#279) * APIGOV-29054 - Using Anypoint Monitoring Archive API to capture API metrics - Remove deprecated Anypoint Analytics API calls to fetch traffic events * APIGOV-29054 - updates * APIGOV-29054 - use time from metric report for metric event * APIGOV-29054 - Updates to query monitoring datasource - added config to optionally disable monitoring archive API to fetch metrics and use monitoring query to fetch API metrics * APIGOV-29054 - update * APIGOV-29054 - update * APIGOV-29054 - fix for publishing metric batch * APIGOV-29054 - fixes * APIGOV-29054 - fix to use current time if instance create time is more than 24h * APIGOV-29054 - update --- README_discovery.md | 2 +- README_traceability.md | 2 +- build/mulesoft_discovery_agent.yml | 45 ++-- build/mulesoft_traceability_agent.yml | 1 + deployment/discovery-deployment.yaml | 4 +- deployment/docker-compose.yml | 4 +- deployment/traceability-deployment.yaml | 4 +- pkg/anypoint/client.go | 191 ++++++++++++-- pkg/anypoint/client_test.go | 131 ++++----- pkg/anypoint/mocks.go | 4 +- pkg/anypoint/monitoring.go | 143 ++++++++++ pkg/anypoint/testdata/apis.json | 8 + pkg/anypoint/testdata/assets.json | 3 + pkg/anypoint/testdata/boot-data.json | 10 + pkg/anypoint/testdata/monitoring-archive.txt | 1 + pkg/anypoint/testdata/org-333-envs.json | 10 + pkg/anypoint/testdata/org-444-envs.json | 10 + pkg/anypoint/testdata/policies.json | 5 + pkg/anypoint/testdata/query-response.json | 30 +++ pkg/anypoint/testdata/summary-datafiles.json | 7 + pkg/anypoint/testdata/user.json | 25 ++ pkg/anypoint/types.go | 35 --- pkg/cmd/traceability/root.go | 13 + pkg/common/common.go | 36 ++- pkg/config/config.go | 58 ++-- pkg/config/config_test.go | 2 +- pkg/traceability/agent.go | 132 ++++++++-- pkg/traceability/agent_test.go | 81 ++++-- pkg/traceability/credential.go | 63 +++++ pkg/traceability/eventmapper.go | 194 -------------- pkg/traceability/eventmapper_test.go | 263 ------------------- pkg/traceability/eventprocessor.go | 73 ----- pkg/traceability/eventprocessor_test.go | 196 -------------- pkg/traceability/file | 1 - pkg/traceability/muleemitter.go | 144 ++++++---- pkg/traceability/muleemitter_test.go | 131 ++++++++- 36 files changed, 1039 insertions(+), 1023 deletions(-) create mode 100644 pkg/anypoint/monitoring.go create mode 100644 pkg/anypoint/testdata/apis.json create mode 100644 pkg/anypoint/testdata/assets.json create mode 100644 pkg/anypoint/testdata/boot-data.json create mode 100644 pkg/anypoint/testdata/monitoring-archive.txt create mode 100644 pkg/anypoint/testdata/org-333-envs.json create mode 100644 pkg/anypoint/testdata/org-444-envs.json create mode 100644 pkg/anypoint/testdata/policies.json create mode 100644 pkg/anypoint/testdata/query-response.json create mode 100644 pkg/anypoint/testdata/summary-datafiles.json create mode 100644 pkg/anypoint/testdata/user.json create mode 100644 pkg/traceability/credential.go delete mode 100644 pkg/traceability/eventmapper.go delete mode 100644 pkg/traceability/eventmapper_test.go delete mode 100644 pkg/traceability/eventprocessor.go delete mode 100644 pkg/traceability/eventprocessor_test.go delete mode 100644 pkg/traceability/file diff --git a/README_discovery.md b/README_discovery.md index e67d4da8..03049577 100644 --- a/README_discovery.md +++ b/README_discovery.md @@ -30,7 +30,7 @@ Along with all [common agent variables](https://docs.axway.com/bundle/amplify-ce | MULESOFT_DISCOVERORIGINALRAML | mulesoft.discoverOriginalRAML | Set to true if the agent should discover the Assets that were created in RAML as RAML | _false_ | | MULESOFT_ENVIRONMENT | mulesoft.environment | The Mulesoft Anypoint Exchange the agent connects to, e.g. Sandbox. | | | MULESOFT_ORGNAME | mulesoft.orgName | The Mulesoft Anypoint Business Unit the agent connects to | | -| MULESOFT_POLLINTERVAL | mulesoft.pollInterval | The frequency in which Mulesoft API Manager is polled for new endpoints. | _30s_ | +| MULESOFT_POLLINTERVAL | mulesoft.pollInterval | The frequency in which Mulesoft API Manager is polled for new endpoints. | _60s_ | | MULESOFT_PROXYURL | mulesoft.proxyUrl | The url for the proxy for API Manager (e.g. ). If empty, no proxy is defined. | Internally, this value defaults to empty | | MULESOFT_SSL_CIPHERSUITES | mulesoft.ssl.cipherSuites | An array of strings. It is a list of supported cipher suites for TLS versions up to TLS 1.2. If CipherSuites is nil, a default list of secure cipher suites is used, with a preference order based on hardware performance. | [See](https://docs.axway.com/bundle/amplify-central/page/docs/connect_manage_environ/connected_agent_common_reference/agent_security/index.html) for default cipher suite setting | | MULESOFT_SSL_INSECURESKIPVERIFY | mulesoft.ssl.insecureSkipVerify | InsecureSkipVerify controls whether a client verifies the server's certificate chain and host name. If InsecureSkipVerify is true, TLS accepts any certificate presented by the server and any host name in that certificate. In this mode, TLS is susceptible to man-in-the-middle attacks. | Internally defaulted to false | diff --git a/README_traceability.md b/README_traceability.md index d0537743..0b541894 100644 --- a/README_traceability.md +++ b/README_traceability.md @@ -27,7 +27,7 @@ Along with all [common agent variables](https://docs.axway.com/bundle/amplify-ce | MULESOFT_CACHEPATH | mulesoft.cachePath | Path entry to store stateful cache between agent invocations | _/tmp_ | | MULESOFT_ENVIRONMENT | mulesoft.environment | The MuleSoft Anypoint Exchange the agent connects to, e.g. Sandbox. | | | MULESOFT_ORGNAME | mulesoft.orgName | The MuleSoft Anypoint Business Unit the agent connects to | | -| MULESOFT_POLLINTERVAL | mulesoft.pollInterval | The frequency in which MuleSoft API Manager is polled for new endpoints. | _30s_ | +| MULESOFT_POLLINTERVAL | mulesoft.pollInterval | The frequency in which MuleSoft API Manager is polled for new endpoints. | _60s_ | | MULESOFT_PROXYURL | mulesoft.proxyUrl | The url for the proxy for API Manager (e.g. ). If empty, no proxy is defined. | Internally, this value defaults to empty | | MULESOFT_SSL_INSECURESKIPVERIFY | mulesoft.ssl.insecureSkipVerify | InsecureSkipVerify controls whether a client verifies the server's certificate chain and host name. If InsecureSkipVerify is true, TLS accepts any certificate presented by the server and any host name in that certificate. In this mode, TLS is susceptible to man-in-the-middle attacks. | Internally defaulted to false | | MULESOFT_SSL_CIPHERSUITES | mulesoft.ssl.cipherSuites | An array of strings. It is a list of supported cipher suites for TLS versions up to TLS 1.2. If CipherSuites is nil, a default list of secure cipher suites is used, with a preference order based on hardware performance. | [See](https://docs.axway.com/bundle/amplify-central/page/docs/connect_manage_environ/connected_agent_common_reference/agent_security/index.html) for default cipher suite setting | diff --git a/build/mulesoft_discovery_agent.yml b/build/mulesoft_discovery_agent.yml index fe7403cc..f3ce3c08 100644 --- a/build/mulesoft_discovery_agent.yml +++ b/build/mulesoft_discovery_agent.yml @@ -1,35 +1,28 @@ central: - agentName: ${CENTRAL_AGENTNAME:""} - organizationID: "" - environment: - mode: publishToEnvironmentAndCatalog - url: https://apicentral.axway.com - platformURL: https://platform.axway.com - pollInterval: 5s - grpc: - enabled: false + organizationID: + team: + environment: auth: - clientID: - privateKey: - publicKey: - realm: Broker - timeout: 30s - url: https://login.axway.com/auth - # Uncomment the following line if you are using a proxy to access the Amplify platform (apicentral.axway.com / login.axway.com / platform.axway.com) on port 443 - # Configure the appropriate value (PROXY_USER / PROXY_PASSWORD are optional values) - #proxyurl: ://:@: + clientId: + privateKey: /keys/private_key.pem + publicKey: /keys/public_key.pem + + # Uncomment the following line if you are using a proxy to access the AMPLIFY platform (apicentral.axway.com / login.axway.com / platform.axway.com) on port 443 + # Configure the appropriate value (PROXY_USER / PROXY_PASSWORD are optional values) + #proxyurl: ://:@: log: - level: debug + level: info format: json output: stdout path: logs + mulesoft: - anypointExchangeUrl: https://anypoint.mulesoft.com - environment: "${MULESOFT_ENVIRONMENT}" - pollInterval: 20s - orgName: "${MULESOFT_ORGNAME}" + anypointExchangeUrl: + environment: + pollInterval: 60s + orgName: # Comma-separated list of tags. Used to filter the APIs that have a particular tag in their configuration. # Default value: empty. Meaning that no matching is performed. #discoveryTags: tags1, tags2 @@ -38,8 +31,6 @@ mulesoft: # Default value: empty. Meaning that no API is ignored #discoveryIgnoreTags: tags1, tags2 auth: - username: - password: - clientID: - clientSecret: + clientID: + clientSecret: diff --git a/build/mulesoft_traceability_agent.yml b/build/mulesoft_traceability_agent.yml index f93ad909..eb5ac41b 100644 --- a/build/mulesoft_traceability_agent.yml +++ b/build/mulesoft_traceability_agent.yml @@ -46,6 +46,7 @@ mulesoft_traceability_agent: orgName: "${MULESOFT_ORGNAME}" cachePath: "${MULESOFT_CACHEPATH:/tmp}" pollInterval: ${MULESOFT_POLLINTERVAL:20s} + useMonitoringAPI: "${MULESOFT_USEMONITORINGAPI}" auth: username: "${MULESOFT_AUTH_USERNAME}" password: "${MULESOFT_AUTH_PASSWORD}" diff --git a/deployment/discovery-deployment.yaml b/deployment/discovery-deployment.yaml index 52a22470..bd7e8b1d 100644 --- a/deployment/discovery-deployment.yaml +++ b/deployment/discovery-deployment.yaml @@ -26,7 +26,7 @@ spec: - name: CENTRAL_AUTH_PUBLICKEY value: /keys/publicKey - name: CENTRAL_POLLINTERVAL - value: 20s + value: 60s - name: CENTRAL_ENVIRONMENT value: - name: CENTRAL_TEAM @@ -60,7 +60,7 @@ spec: - name: MULESOFT_ORGNAME value: - name: MULESOFT_POLLINTERVAL - value: 20s + value: 60s - name: MULESOFT_DISCOVERORIGINALRAML value: false - name: STATUS_PORT diff --git a/deployment/docker-compose.yml b/deployment/docker-compose.yml index 759a66c5..262756ca 100644 --- a/deployment/docker-compose.yml +++ b/deployment/docker-compose.yml @@ -20,7 +20,7 @@ services: MULESOFT_ANYPOINTEXCHANGEURL: https://anypoint.mulesoft.com MULESOFT_ENVIRONMENT: Sandbox MULESOFT_ORGID: - MULESOFT_POLLINTERVAL: 20s + MULESOFT_POLLINTERVAL: 60s MULESOFT_AUTH_USERNAME: MULESOFT_AUTH_PASSWORD: MULESOFT_DISCOVERORIGINALRAML: false @@ -46,7 +46,7 @@ services: MULESOFT_ANYPOINTEXCHANGEURL: https://anypoint.mulesoft.com MULESOFT_ENVIRONMENT: MULESOFT_ORGNAME: - MULESOFT_POLLINTERVAL: 20s + MULESOFT_POLLINTERVAL: 60s MULESOFT_AUTH_USERNAME: MULESOFT_AUTH_PASSWORD: MULESOFT_CACHEPATH: "/tmp" diff --git a/deployment/traceability-deployment.yaml b/deployment/traceability-deployment.yaml index d1adc0ea..2579c22c 100644 --- a/deployment/traceability-deployment.yaml +++ b/deployment/traceability-deployment.yaml @@ -26,7 +26,7 @@ spec: - name: CENTRAL_AUTH_PUBLICKEY value: /keys/publicKey - name: CENTRAL_POLLINTERVAL - value: 20s + value: 60s - name: CENTRAL_ENVIRONMENT value: - name: CENTRAL_TEAM @@ -60,7 +60,7 @@ spec: - name: MULESOFT_ORGNAME value: - name: MULESOFT_POLLINTERVAL - value: 20s + value: 60s - name: STATUS_PORT value: "8989" - name: TRACEABILITY_REDACTION_PATH_SHOW diff --git a/pkg/anypoint/client.go b/pkg/anypoint/client.go index e12b1b32..d437f95c 100644 --- a/pkg/anypoint/client.go +++ b/pkg/anypoint/client.go @@ -21,7 +21,16 @@ import ( "github.com/Axway/agents-mulesoft/pkg/config" ) -const HealthCheckEndpoint = "mulesoft" +const ( + HealthCheckEndpoint = "mulesoft" + monitoringURITemplate = "%s/monitoring/archive/api/v1/organizations/%s/environments/%s/apis/%s/summary/%d/%02d/%02d" + metricSummaryURITemplate = "%s/monitoring/archive/api/v1/organizations/%s/environments/%s/apis/%s/summary/%d/%02d/%02d/%s" + queryTemplate = `SELECT sum("request_size.count") as request_count, max("response_time.max") as response_max, min("response_time.min") as response_min +FROM "rp_general"."api_summary_metric" +WHERE ("api_id" = '%s' AND "api_version_id" = '%s') AND +time >= %dms and time <= %dms +GROUP BY "client_id", "status_code"` +) // Page describes the page query parameter type Page struct { @@ -52,7 +61,9 @@ type Client interface { } type AnalyticsClient interface { - GetAnalyticsWindow(string, string) ([]AnalyticsEvent, error) + GetMonitoringBootstrap() (*MonitoringBootInfo, error) + GetMonitoringMetrics(dataSourceName string, dataSourceID int, apiID, apiVersionID string, startDate, endTime time.Time) ([]APIMonitoringMetric, error) + GetMonitoringArchive(apiID string, startDate time.Time) ([]APIMonitoringMetric, error) OnConfigChange(mulesoftConfig *config.MulesoftConfig) GetClientApplication(appID string) (*Application, error) GetAPI(apiID string) (*API, error) @@ -68,14 +79,15 @@ type ListAssetClient interface { // AnypointClient is the client for interacting with Mulesoft Anypoint. type AnypointClient struct { - baseURL string - clientID string - clientSecret string - lifetime time.Duration - apiClient coreapi.Client - auth Auth - environment *Environment - orgName string + baseURL string + monitoringBaseURL string + clientID string + clientSecret string + lifetime time.Duration + apiClient coreapi.Client + auth Auth + environment *Environment + orgName string } type ClientOptions func(*AnypointClient) @@ -102,6 +114,7 @@ func (c *AnypointClient) OnConfigChange(mulesoftConfig *config.MulesoftConfig) { } c.baseURL = mulesoftConfig.AnypointExchangeURL + c.monitoringBaseURL = mulesoftConfig.AnypointMonitoringURL c.clientID = mulesoftConfig.ClientID c.clientSecret = mulesoftConfig.ClientSecret c.orgName = mulesoftConfig.OrgName @@ -229,7 +242,7 @@ func (c *AnypointClient) getCurrentUser(token string) (*User, error) { // this sets the User.Organization.ID as the Org ID of the Business Unit specified in Config for _, value := range user.User.MemberOfOrganizations { - if value.ID == c.orgName { + if value.Name == c.orgName { user.User.Organization.ID = value.ID user.User.Organization.Name = value.Name } @@ -384,28 +397,154 @@ func (c *AnypointClient) GetExchangeFileContent(link, packaging, mainFile string return fileContent, wasConverted, err } -// GetAnalyticsWindow lists the managed assets in Mulesoft: https://docs.qax.mulesoft.com/api-manager/2.x/analytics-event-api -func (c *AnypointClient) GetAnalyticsWindow(startDate, endDate string) ([]AnalyticsEvent, error) { - query := map[string]string{ - "format": "json", - "startDate": startDate, - "endDate": endDate, - "fields": "Application Name.Application.Browser.City.Client IP.Continent.Country.Hardware Platform.Message ID.OS Family.OS Major Version.OS Minor Version.OS Version.Postal Code.Request Outcome.Request Size.Resource Path.Response Size.Response Time.Status Code.Timezone.User Agent Name.User Agent Version.Verb.Violated Policy Name", +func (c *AnypointClient) GetMonitoringBootstrap() (*MonitoringBootInfo, error) { + headers := map[string]string{ + "Authorization": c.getAuthString(c.auth.GetToken()), + } + + url := fmt.Sprintf("%s/monitoring/api/visualizer/api/bootdata", c.baseURL) + bootInfo := &MonitoringBootInfo{} + request := coreapi.Request{ + Method: coreapi.GET, + URL: url, + Headers: headers, } + + err := c.invokeJSON(request, &bootInfo) + if err != nil { + return nil, err + } + + return bootInfo, err +} + +// GetMonitoringMetrics returns monitoring data from InfluxDb +func (c *AnypointClient) GetMonitoringMetrics(dataSourceName string, dataSourceID int, apiID, apiVersionID string, startDate, endTime time.Time) ([]APIMonitoringMetric, error) { headers := map[string]string{ "Authorization": c.getAuthString(c.auth.GetToken()), } - url := fmt.Sprintf("%s/analytics/1.0/%s/environments/%s/events", c.baseURL, c.auth.GetOrgID(), c.environment.ID) - events := make([]AnalyticsEvent, 0) + query := fmt.Sprintf(queryTemplate, apiID, apiVersionID, startDate.UnixMilli(), endTime.UnixMilli()) + url := fmt.Sprintf("%s/monitoring/api/visualizer/api/datasources/proxy/%d/query", c.baseURL, dataSourceID) request := coreapi.Request{ - Method: coreapi.GET, - URL: url, - Headers: headers, - QueryParams: query, + Method: coreapi.GET, + URL: url, + Headers: headers, + QueryParams: map[string]string{ + "db": dataSourceName, + "q": query, + "epoch": "ms", + }, + } + metricResponse := &MetricResponse{} + err := c.invokeJSON(request, metricResponse) + if err != nil { + return nil, err + } + // convert metricResponse + metrics := make([]APIMonitoringMetric, 0) + for _, mr := range metricResponse.Results { + for _, ms := range mr.Series { + m := APIMonitoringMetric{ + Time: ms.Time, + Events: []APISummaryMetricEvent{ + { + ClientID: ms.Tags.ClientID, + StatusCode: ms.Tags.StatusCode, + RequestSizeCount: int(ms.Count), + ResponseSizeMax: int(ms.ResponseMax), + ResponseSizeMin: int(ms.ResponseMin), + }, + }, + } + metrics = append(metrics, m) + } + } + return metrics, err +} + +// GetMonitoringArchive returns archived monitoring data Mulesoft: +// https://anypoint.mulesoft.com/exchange/portals/anypoint-platform/f1e97bc6-315a-4490-82a7-23abe036327a.anypoint-platform/anypoint-monitoring-archive-api/minor/1.0/pages/home/ +func (c *AnypointClient) GetMonitoringArchive(apiID string, startDate time.Time) ([]APIMonitoringMetric, error) { + headers := map[string]string{ + "Authorization": c.getAuthString(c.auth.GetToken()), + } + year := startDate.Year() + month := int(startDate.Month()) + day := startDate.Day() + + url := fmt.Sprintf(monitoringURITemplate, c.monitoringBaseURL, c.auth.GetOrgID(), c.environment.ID, apiID, year, month, day) + dataFiles := &DataFileResources{} + request := coreapi.Request{ + Method: coreapi.GET, + URL: url, + Headers: headers, + } + + err := c.invokeJSON(request, &dataFiles) + if err != nil && !strings.Contains(err.Error(), "404") { + return nil, err + } + + metrics := make([]APIMonitoringMetric, 0) + for _, dataFile := range dataFiles.Resources { + apiMetric, err := c.getMonitoringArchiveFile(apiID, year, month, day, dataFile.ID) + if err != nil { + logrus.WithField("apiID", apiID). + WithField("fileName", dataFile.ID). + WithError(err). + Warn("failed to read monitoring archive") + } + if len(apiMetric) > 0 { + metrics = append(metrics, apiMetric...) + } + } + + return metrics, err +} + +func (c *AnypointClient) getMonitoringArchiveFile(apiID string, year, month, day int, fileName string) ([]APIMonitoringMetric, error) { + headers := map[string]string{ + "Authorization": c.getAuthString(c.auth.GetToken()), + } + + url := fmt.Sprintf(metricSummaryURITemplate, c.monitoringBaseURL, c.auth.GetOrgID(), c.environment.ID, apiID, year, month, day, fileName) + request := coreapi.Request{ + Method: coreapi.GET, + URL: url, + Headers: headers, + } + + body, _, err := c.invoke(request) + if err != nil && !strings.Contains(err.Error(), "404") { + return nil, err + } + + return c.parseMetricSummaries(body) +} + +func (c *AnypointClient) parseMetricSummaries(metricDataStream []byte) ([]APIMonitoringMetric, error) { + metrics := make([]APIMonitoringMetric, 0) + d := json.NewDecoder(strings.NewReader(string(metricDataStream))) + for { + metricData := &MetricData{} + err := d.Decode(&metricData) + + if err != nil { + // io.EOF is expected at end of stream. + if err != io.EOF { + return metrics, nil + } + break + } + metricTime := time.Unix(metricData.Time, 0) + metric := APIMonitoringMetric{ + Time: metricTime, + Events: metricData.Events, + } + metrics = append(metrics, metric) } - err := c.invokeJSON(request, &events) - return events, err + return metrics, nil } func (c *AnypointClient) GetSLATiers(apiID string, tierName string) (*Tiers, error) { diff --git a/pkg/anypoint/client_test.go b/pkg/anypoint/client_test.go index e0762b58..95c6c328 100644 --- a/pkg/anypoint/client_test.go +++ b/pkg/anypoint/client_test.go @@ -1,6 +1,8 @@ package anypoint import ( + "io" + "os" "testing" "time" @@ -13,17 +15,26 @@ import ( "github.com/stretchr/testify/assert" ) +func readTestDataFile(t *testing.T, fileName string) []byte { + file, _ := os.Open(fileName) + inputData, err := io.ReadAll(file) + assert.Nil(t, err) + + return inputData +} + func TestClient(t *testing.T) { cfg := &config.MulesoftConfig{ - AnypointExchangeURL: "", - CachePath: "/tmp", - Environment: "Sandbox", - OrgName: "BusinessOrg1", - PollInterval: 10, - ProxyURL: "", - SessionLifetime: 60, - ClientID: "1", - ClientSecret: "2", + AnypointExchangeURL: "", + AnypointMonitoringURL: "", + CachePath: "/tmp", + Environment: "Sandbox", + OrgName: "BusinessOrg1", + PollInterval: 10, + ProxyURL: "", + SessionLifetime: 60, + ClientID: "1", + ClientSecret: "2", } mcb := &MockClientBase{} mcb.Reqs = map[string]*api.Response{ @@ -34,91 +45,32 @@ func TestClient(t *testing.T) { }, "/accounts/api/me": { Code: 200, - Body: []byte(`{ - "user": { - "identityType": "idtype", - "id": "123", - "username": "name", - "firstName": "first", - "lastName": "last", - "email": "email", - "organization": { - "id": "333", - "name": "org1", - "domain": "abc.com" - }, - "memberOfOrganizations": [{ - "id": "333", - "name": "org1" - }, - { - "id": "444", - "name": "BusinessOrg1" - } - ] - - } - }`), + Body: readTestDataFile(t, "./testdata/user.json"), }, "/accounts/api/organizations/444/environments": { Code: 200, - Body: []byte(`{ - "data": [{ - "id": "111", - "name": "Sandbox", - "organizationId": "444", - "type": "fake", - "clientId": "abc123" - }], - "total": 1 - }`), + Body: readTestDataFile(t, "./testdata/org-444-envs.json"), }, "/accounts/api/organizations/333/environments": { Code: 200, - Body: []byte(`{ - "data": [{ - "id": "111", - "name": "name", - "organizationId": "333", - "type": "fake", - "clientId": "abc123" - }], - "total": 1 - }`), + Body: readTestDataFile(t, "./testdata/org-333-envs.json"), }, "/apimanager/api/v1/organizations/444/environments/111/apis": { Code: 200, - Body: []byte(`{ - "assets": [ - { - "apis": [] - } - ], - "total": 1 - }`), + Body: readTestDataFile(t, "./testdata/apis.json"), }, "/apimanager/api/v1/organizations/444/environments/111/apis/10/policies": { Code: 200, - Body: []byte(`[ - { - "id": 0 - } - ]`), + Body: readTestDataFile(t, "./testdata/policies.json"), }, "/exchange/api/v2/assets/1/2/3": { Code: 200, - Body: []byte(`{ - "assetId": "petstore" - }`), + Body: readTestDataFile(t, "./testdata/assets.json"), }, "/icon": { Code: 200, Body: []byte(`content`), }, - "/analytics/1.0/444/environments/111/events": { - Code: 200, - Body: []byte(`[{}]`), - }, "https://123.com": { Code: 500, Body: []byte(`{}`), @@ -127,6 +79,22 @@ func TestClient(t *testing.T) { Code: 200, Body: []byte(`[]`), }, + "/monitoring/archive/api/v1/organizations/444/environments/111/apis/222/summary/2024/01/01": { + Code: 200, + Body: readTestDataFile(t, "./testdata/summary-datafiles.json"), + }, + "/monitoring/archive/api/v1/organizations/444/environments/111/apis/222/summary/2024/01/01/444-111-222.log": { + Code: 200, + Body: readTestDataFile(t, "./testdata/monitoring-archive.txt"), + }, + "/monitoring/api/visualizer/api/bootdata": { + Code: 200, + Body: readTestDataFile(t, "./testdata/boot-data.json"), + }, + "/monitoring/api/visualizer/api/datasources/proxy/1234/query": { + Code: 200, + Body: readTestDataFile(t, "./testdata/query-response.json"), + }, } client := NewClient(cfg, SetClient(mcb)) @@ -160,7 +128,7 @@ func TestClient(t *testing.T) { logrus.Info(token, user, duration, err) assert.Equal(t, "abc123", token) assert.Equal(t, "123", user.ID) - assert.Equal(t, "333", user.Organization.ID) + assert.Equal(t, "444", user.Organization.ID) assert.Equal(t, time.Hour, duration) assert.Equal(t, nil, err) env, err := client.GetEnvironmentByName("/env1") @@ -183,7 +151,18 @@ func TestClient(t *testing.T) { logrus.Info(i, contentType) assert.NotEmpty(t, i) assert.Empty(t, contentType) - events, err := client.GetAnalyticsWindow("2021-05-19T14:30:20-07:00", "2021-05-19T14:30:22-07:00") + + startTime, _ := time.Parse(time.RFC3339, "2024-01-01T14:30:20-07:00") + + events, err := client.GetMonitoringArchive("222", startTime) + assert.Nil(t, err) + assert.Equal(t, 1, len(events)) + + bootInfo, err := client.GetMonitoringBootstrap() + assert.Nil(t, err) + assert.NotNil(t, bootInfo) + + events, err = client.GetMonitoringMetrics(bootInfo.Settings.DataSource.InfluxDB.Database, bootInfo.Settings.DataSource.InfluxDB.ID, "222", "222", startTime, startTime) assert.Nil(t, err) assert.Equal(t, 1, len(events)) diff --git a/pkg/anypoint/mocks.go b/pkg/anypoint/mocks.go index 3682cdfe..f80dcc56 100644 --- a/pkg/anypoint/mocks.go +++ b/pkg/anypoint/mocks.go @@ -98,10 +98,10 @@ func (m *MockAnypointClient) GetExchangeFileContent(_, _, _ string, shouldConver return result.([]byte), shouldConvert, args.Error(2) } -func (m *MockAnypointClient) GetAnalyticsWindow() ([]AnalyticsEvent, error) { +func (m *MockAnypointClient) GetMonitoringArchive(apiID string, startDate time.Time) ([]APIMonitoringMetric, error) { args := m.Called() result := args.Get(0) - return result.([]AnalyticsEvent), args.Error(1) + return result.([]APIMonitoringMetric), args.Error(1) } func (m *MockAnypointClient) CreateClientApplication(apiID string, body *AppRequestBody) (*Application, error) { diff --git a/pkg/anypoint/monitoring.go b/pkg/anypoint/monitoring.go new file mode 100644 index 00000000..1e75d0df --- /dev/null +++ b/pkg/anypoint/monitoring.go @@ -0,0 +1,143 @@ +package anypoint + +import ( + "encoding/json" + "time" +) + +// Monitoring Archive API metrics data definitions +type APIMonitoringMetric struct { + Time time.Time + Events []APISummaryMetricEvent +} + +type DataFile struct { + ID string `json:"id"` + Time time.Time `json:"time"` + Size int `json:"size"` +} + +type DataFileResources struct { + Resources []DataFile `json:"resources"` +} + +type APISummaryMetricEvent struct { + APIName string `json:"api_name"` + APIVersion string `json:"api_version"` + APIVersionID string `json:"api_version_id"` + ClientID string `json:"client_id"` + Method string `json:"method"` + StatusCode string `json:"status_code"` + ResponseSizeCount int `json:"response_size.count"` + ResponseSizeMax int `json:"response_size.max"` + ResponseSizeMin int `json:"response_size.min"` + ResponseSizeSos int `json:"response_size.sos"` + ResponseSizeSum int `json:"response_size.sum"` + ResponseTimeCount int `json:"response_time.count"` + ResponseTimeMax int `json:"response_time.max"` + ResponseTimeMin int `json:"response_time.min"` + ResponseTimeSos int `json:"response_time.sos"` + ResponseTimeSum int `json:"response_time.sum"` + RequestSizeCount int `json:"request_size.count"` + RequestSizeMax int `json:"request_size.max"` + RequestSizeMin int `json:"request_size.min"` + RequestSizeSos int `json:"request_size.sos"` + RequestSizeSum int `json:"request_size.sum"` + RequestDisposition string `json:"request_disposition"` +} + +type MetricData struct { + Format string `json:"format"` + Time int64 `json:"time"` + Type string `json:"type"` + Metadata map[string]interface{} `json:"metadata"` + Commons map[string]interface{} `json:"commons"` + Events []APISummaryMetricEvent +} + +// Influx DB based metric data definitions +type MonitoringBootInfo struct { + Settings MonitoringBootSetting `json:"Settings"` +} + +type MonitoringBootSetting struct { + DataSource MonitoringDataSource `json:"datasources"` +} + +type MonitoringDataSource struct { + InfluxDB InfluxDB `json:"influxdb"` +} + +type InfluxDB struct { + ID int `json:"id"` + Database string `json:"database"` +} + +type MetricResponse struct { + Results []*MetricResult `json:"results"` +} + +type MetricResult struct { + StatementID int `json:"statement_id"` + Series []*MetricSeries `json:"series"` +} + +type MetricTag struct { + ClientID string `json:"client_id"` + StatusCode string `json:"status_code"` +} + +type MetricSeries struct { + Name string `json:"name"` + Tags *MetricTag `json:"tags"` + Columns []string `json:"columns"` + Values [][]float64 `json:"values"` + Time time.Time `json:"-"` + Count int64 `json:"-"` + ResponseMax int64 `json:"-"` + ResponseMin int64 `json:"-"` +} + +func (ms *MetricSeries) UnmarshalJSON(data []byte) error { + type alias MetricSeries + v := &struct{ *alias }{ + alias: (*alias)(ms), + } + + err := json.Unmarshal(data, v) + if err != nil { + return err + } + + tm := ms.getValue("time") + ms.Time = time.Unix(tm, 0) + ms.Count = ms.getValue("request_count") + ms.ResponseMax = ms.getValue("response_max") + ms.ResponseMin = ms.getValue("response_min") + + return nil +} + +func (ms *MetricSeries) getValue(columnName string) int64 { + return ms.getMetricSeriesIndexValue(ms.getMetricSeriesColumnIndex(columnName)) +} + +func (ms *MetricSeries) getMetricSeriesIndexValue(index int) int64 { + if len(ms.Values) > 0 { + val := ms.Values[0] + if index >= 0 && index < len(val) { + return int64(val[index]) + } + } + + return 0 +} + +func (ms *MetricSeries) getMetricSeriesColumnIndex(column string) int { + for n := 0; n < len(ms.Columns); n++ { + if ms.Columns[n] == column { + return n + } + } + return -1 +} diff --git a/pkg/anypoint/testdata/apis.json b/pkg/anypoint/testdata/apis.json new file mode 100644 index 00000000..f4310109 --- /dev/null +++ b/pkg/anypoint/testdata/apis.json @@ -0,0 +1,8 @@ +{ + "assets": [ + { + "apis": [] + } + ], + "total": 1 +} \ No newline at end of file diff --git a/pkg/anypoint/testdata/assets.json b/pkg/anypoint/testdata/assets.json new file mode 100644 index 00000000..555fe8ea --- /dev/null +++ b/pkg/anypoint/testdata/assets.json @@ -0,0 +1,3 @@ +{ + "assetId": "petstore" +} \ No newline at end of file diff --git a/pkg/anypoint/testdata/boot-data.json b/pkg/anypoint/testdata/boot-data.json new file mode 100644 index 00000000..d8826788 --- /dev/null +++ b/pkg/anypoint/testdata/boot-data.json @@ -0,0 +1,10 @@ +{ + "Settings": { + "datasources": { + "influxdb": { + "database": "\"db\"", + "id": 1234 + } + } + } +} \ No newline at end of file diff --git a/pkg/anypoint/testdata/monitoring-archive.txt b/pkg/anypoint/testdata/monitoring-archive.txt new file mode 100644 index 00000000..0f5ab9f2 --- /dev/null +++ b/pkg/anypoint/testdata/monitoring-archive.txt @@ -0,0 +1 @@ +{"format":"v2","time":1585082947062,"type":"api_summary_metric","commons":{"deployment_type":"RTF","api_id":"204393","cluster_id":"rtf","env_id":"env","public_ip":"127.0.0.1","org_id":"org","worker_id":"worker-1"},"events":[{"response_size.max":2,"request_size.min":6,"status_code":"200","method":"POST","response_time.max":4,"api_version_id":"223337","response_size.count":1,"response_size.sum":2,"response_time.min":4,"request_size.count":1,"api_version":"v1:223337","request_size.sos":36,"client_id":"eb30101d7394407ea86f0643e1c63331","response_time.count":1,"response_time.sum":4,"request_size.max":6,"request_disposition":"processed","response_time.sos":16,"api_name":"groupId:6046b96d-c9aa-4cb2-9b30-90a54fc01a7b:assetId:policy_sla_rate_limit","response_size.min":2,"request_size.sum":6,"response_size.sos":4}],"metadata":{"batch_id":0,"aggregated":true,"limited":false,"producer_name":"analytics-metrics-collector-mule3","producer_version":"2.2.2-SNAPSHOT"}} \ No newline at end of file diff --git a/pkg/anypoint/testdata/org-333-envs.json b/pkg/anypoint/testdata/org-333-envs.json new file mode 100644 index 00000000..9adbc07f --- /dev/null +++ b/pkg/anypoint/testdata/org-333-envs.json @@ -0,0 +1,10 @@ +{ + "data": [{ + "id": "111", + "name": "name", + "organizationId": "333", + "type": "fake", + "clientId": "abc123" + }], + "total": 1 +} \ No newline at end of file diff --git a/pkg/anypoint/testdata/org-444-envs.json b/pkg/anypoint/testdata/org-444-envs.json new file mode 100644 index 00000000..ee07467a --- /dev/null +++ b/pkg/anypoint/testdata/org-444-envs.json @@ -0,0 +1,10 @@ +{ + "data": [{ + "id": "111", + "name": "Sandbox", + "organizationId": "444", + "type": "fake", + "clientId": "abc123" + }], + "total": 1 +} \ No newline at end of file diff --git a/pkg/anypoint/testdata/policies.json b/pkg/anypoint/testdata/policies.json new file mode 100644 index 00000000..db98f51a --- /dev/null +++ b/pkg/anypoint/testdata/policies.json @@ -0,0 +1,5 @@ +[ + { + "id": 0 + } +] \ No newline at end of file diff --git a/pkg/anypoint/testdata/query-response.json b/pkg/anypoint/testdata/query-response.json new file mode 100644 index 00000000..e90f2203 --- /dev/null +++ b/pkg/anypoint/testdata/query-response.json @@ -0,0 +1,30 @@ +{ + "results": [ + { + "statement_id": 0, + "series": [ + { + "name": "api_summary_metric", + "tags": { + "client_id": "", + "status_code": "200" + }, + "columns": [ + "time", + "request_count", + "response_max", + "response_min" + ], + "values": [ + [ + 1729589392, + 4, + 3243, + 74 + ] + ] + } + ] + } + ] + } \ No newline at end of file diff --git a/pkg/anypoint/testdata/summary-datafiles.json b/pkg/anypoint/testdata/summary-datafiles.json new file mode 100644 index 00000000..8425fa80 --- /dev/null +++ b/pkg/anypoint/testdata/summary-datafiles.json @@ -0,0 +1,7 @@ +{ + "resources": [ + { + "id": "444-111-222.log" + } + ] +} \ No newline at end of file diff --git a/pkg/anypoint/testdata/user.json b/pkg/anypoint/testdata/user.json new file mode 100644 index 00000000..c3b96754 --- /dev/null +++ b/pkg/anypoint/testdata/user.json @@ -0,0 +1,25 @@ +{ + "user": { + "identityType": "idtype", + "id": "123", + "username": "name", + "firstName": "first", + "lastName": "last", + "email": "email", + "organization": { + "id": "333", + "name": "org1", + "domain": "abc.com" + }, + "memberOfOrganizations": [{ + "id": "333", + "name": "org1" + }, + { + "id": "444", + "name": "BusinessOrg1" + } + ] + + } +} \ No newline at end of file diff --git a/pkg/anypoint/types.go b/pkg/anypoint/types.go index d21ea045..b0dc62ef 100644 --- a/pkg/anypoint/types.go +++ b/pkg/anypoint/types.go @@ -184,41 +184,6 @@ type ExchangeFile struct { SHA1 string `json:"sha1"` } -// AnalyticsEvent - -type AnalyticsEvent struct { - APIID string `json:"API ID"` - APIName string `json:"API Name"` - APIVersionID string `json:"API Version ID"` - APIVersionName string `json:"API Version Name"` - ApplicationName string `json:"Application Name"` - Application string `json:"Application"` - Browser string `json:"Browser"` - City string `json:"City"` - ClientIP string `json:"Client IP"` - Continent string `json:"Continent"` - Country string `json:"Country"` - HardwarePlatform string `json:"Hardware Platform"` - MessageID string `json:"Message ID"` - OSFamily string `json:"OS Family"` - OSMajorVersion string `json:"OS Major Version"` - OSMinorVersion string `json:"OS Minor Version"` - OSVersion string `json:"OS Version"` - PostalCode string `json:"Postal Code"` - RequestOutcome string `json:"Request Outcome"` - RequestSize int `json:"Request Size"` - ResourcePath string `json:"Resource Path"` - ResponseSize int `json:"Response Size"` - ResponseTime int `json:"Response Time"` - StatusCode int `json:"Status Code"` - Timestamp time.Time `json:"Timestamp"` - Timezone string `json:"Timezone"` - UserAgentName string `json:"User Agent Name"` - UserAgentVersion string `json:"User Agent Version"` - Verb string `json:"Verb"` - ViolatedPolicyName string `json:"Violated Policy Name"` - AssetVersion string `json:"AssetVersion"` -} - type Application struct { APIEndpoints bool `json:"apiEndpoints,omitempty"` ClientID string `json:"clientId"` diff --git a/pkg/cmd/traceability/root.go b/pkg/cmd/traceability/root.go index 07ca2bd1..94a1022d 100644 --- a/pkg/cmd/traceability/root.go +++ b/pkg/cmd/traceability/root.go @@ -6,6 +6,7 @@ import ( "github.com/Axway/agent-sdk/pkg/cmd/service" corecfg "github.com/Axway/agent-sdk/pkg/config" + management "github.com/Axway/agent-sdk/pkg/apic/apiserver/models/management/v1alpha1" libcmd "github.com/elastic/beats/v7/libbeat/cmd" "github.com/elastic/beats/v7/libbeat/cmd/instance" @@ -54,6 +55,18 @@ func run() error { // Callback that agent will call to initialize the config. CentralConfig is parsed by Agent SDK // and passed to the callback allowing the agent code to access the central config func initConfig(centralConfig corecfg.CentralConfig) (interface{}, error) { + err := centralConfig.SetWatchResourceFilters([]corecfg.ResourceFilter{ + { + Group: management.CredentialGVK().Group, + Kind: management.CredentialGVK().Kind, + Name: "*", + IsCachedResource: true, + }, + }) + if err != nil { + return nil, err + } + agentConfig := &config.AgentConfig{ CentralConfig: centralConfig, MulesoftConfig: config.NewMulesoftConfig(RootCmd.GetProperties()), diff --git a/pkg/common/common.go b/pkg/common/common.go index 5b590116..aab6d69c 100644 --- a/pkg/common/common.go +++ b/pkg/common/common.go @@ -1,6 +1,11 @@ package common -import "fmt" +import ( + "fmt" + "time" + + v1 "github.com/Axway/agent-sdk/pkg/apic/apiserver/models/api/v1" +) const ( AccessCode = "accessCode" @@ -75,3 +80,32 @@ type PolicyDetail struct { IsSLABased bool APIId string } + +type EventType int + +const ( + // Initialize - + Initialize EventType = iota + 1 + // Metric - + Metric + // Completed - + Completed +) + +type MetricEvent struct { + Type EventType + Metric Metrics +} + +type Metrics struct { + StartTime time.Time + EndTime time.Time + APIID string + Instance *v1.ResourceInstance + ClientID string + StatusCode string + Count int64 + Max int64 + Min int64 + Avg float64 +} diff --git a/pkg/config/config.go b/pkg/config/config.go index f7e89fad..9759db4c 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -26,6 +26,7 @@ var config *AgentConfig const ( pathAnypointExchangeURL = "mulesoft.anypointExchangeUrl" + pathAnypointMonitoringURL = "mulesoft.anypointMonitoringUrl" pathEnvironment = "mulesoft.environment" pathOrgName = "mulesoft.orgName" pathDiscoveryTags = "mulesoft.discoveryTags" @@ -42,6 +43,7 @@ const ( pathProxyURL = "mulesoft.proxyUrl" pathCachePath = "mulesoft.cachePath" pathDiscoverOriginalRaml = "mulesoft.discoverOriginalRaml" + pathUseMonitoringAPI = "mulesoft.useMonitoringAPI" ) const ( @@ -72,19 +74,21 @@ type AgentConfig struct { // MulesoftConfig - represents the config for the Mulesoft gateway type MulesoftConfig struct { corecfg.IConfigValidator - AnypointExchangeURL string `config:"anypointExchangeUrl"` - CachePath string `config:"cachePath"` - DiscoveryIgnoreTags string `config:"discoveryIgnoreTags"` - DiscoveryTags string `config:"discoveryTags"` - Environment string `config:"environment"` - OrgName string `config:"orgname"` - PollInterval time.Duration `config:"pollInterval"` - ProxyURL string `config:"proxyUrl"` - SessionLifetime time.Duration `config:"auth.lifetime"` - TLS corecfg.TLSConfig `config:"ssl"` - ClientID string `config:"auth.clientID"` - ClientSecret string `config:"auth.clientSecret"` - DiscoverOriginalRaml bool `config:"discoverOriginalRaml"` + AnypointExchangeURL string `config:"anypointExchangeUrl"` + AnypointMonitoringURL string `config:"anypointMonitoringUrl"` + CachePath string `config:"cachePath"` + DiscoveryIgnoreTags string `config:"discoveryIgnoreTags"` + DiscoveryTags string `config:"discoveryTags"` + Environment string `config:"environment"` + OrgName string `config:"orgname"` + PollInterval time.Duration `config:"pollInterval"` + ProxyURL string `config:"proxyUrl"` + SessionLifetime time.Duration `config:"auth.lifetime"` + TLS corecfg.TLSConfig `config:"ssl"` + ClientID string `config:"auth.clientID"` + ClientSecret string `config:"auth.clientSecret"` + DiscoverOriginalRaml bool `config:"discoverOriginalRaml"` + UseMonitoringAPI bool `config:"useMonitoringAPI"` } // ValidateCfg - Validates the gateway config @@ -119,6 +123,7 @@ func (c *MulesoftConfig) ValidateCfg() (err error) { // AddConfigProperties - Adds the command properties needed for Mulesoft func AddConfigProperties(rootProps props) { rootProps.AddStringProperty(pathAnypointExchangeURL, "https://anypoint.mulesoft.com", "Mulesoft Anypoint Exchange URL.") + rootProps.AddStringProperty(pathAnypointMonitoringURL, "https://monitoring.anypoint.mulesoft.com", "Mulesoft Anypoint Monitoring URL.") rootProps.AddStringProperty(pathEnvironment, "", "Mulesoft Anypoint environment.") rootProps.AddStringProperty(pathOrgName, "", "Mulesoft Anypoint Business Group.") rootProps.AddStringProperty(pathAuthClientID, "", "Mulesoft client id.") @@ -127,7 +132,7 @@ func AddConfigProperties(rootProps props) { rootProps.AddStringProperty(pathDiscoveryTags, "", "APIs containing any of these tags are selected for discovery.") rootProps.AddStringProperty(pathDiscoveryIgnoreTags, "", "APIs containing any of these tags are ignored. Takes precedence over "+pathDiscoveryIgnoreTags+".") rootProps.AddStringProperty(pathCachePath, "/tmp", "Mulesoft Cache Path") - rootProps.AddDurationProperty(pathPollInterval, 20*time.Second, "The interval at which Mulesoft is checked for updates.", properties.WithLowerLimit(20*time.Second)) + rootProps.AddDurationProperty(pathPollInterval, time.Minute, "The interval at which Mulesoft is checked for updates.", properties.WithLowerLimit(30*time.Second)) rootProps.AddStringProperty(pathProxyURL, "", "Proxy URL") // ssl properties and command flags @@ -137,22 +142,24 @@ func AddConfigProperties(rootProps props) { rootProps.AddStringProperty(pathSSLMinVersion, corecfg.TLSDefaultMinVersionString(), "Minimum acceptable SSL/TLS protocol version.") rootProps.AddStringProperty(pathSSLMaxVersion, "0", "Maximum acceptable SSL/TLS protocol version.") rootProps.AddBoolProperty(pathDiscoverOriginalRaml, false, "If RAML API specs are discovered as RAML and not converted to OAS") + rootProps.AddBoolProperty(pathUseMonitoringAPI, true, "Flag to setup traceability agent to use Anypoint Monitoring Archive API") } // NewMulesoftConfig - parse the props and create an Mulesoft Configuration structure func NewMulesoftConfig(rootProps props) *MulesoftConfig { return &MulesoftConfig{ - AnypointExchangeURL: rootProps.StringPropertyValue(pathAnypointExchangeURL), - CachePath: rootProps.StringPropertyValue(pathCachePath), - DiscoveryIgnoreTags: rootProps.StringPropertyValue(pathDiscoveryIgnoreTags), - DiscoveryTags: rootProps.StringPropertyValue(pathDiscoveryTags), - Environment: rootProps.StringPropertyValue(pathEnvironment), - OrgName: rootProps.StringPropertyValue(pathOrgName), - PollInterval: rootProps.DurationPropertyValue(pathPollInterval), - ProxyURL: rootProps.StringPropertyValue(pathProxyURL), - SessionLifetime: rootProps.DurationPropertyValue(pathAuthLifetime), - ClientID: rootProps.StringPropertyValue(pathAuthClientID), - ClientSecret: rootProps.StringPropertyValue(pathAuthClientSecret), + AnypointExchangeURL: rootProps.StringPropertyValue(pathAnypointExchangeURL), + AnypointMonitoringURL: rootProps.StringPropertyValue(pathAnypointMonitoringURL), + CachePath: rootProps.StringPropertyValue(pathCachePath), + DiscoveryIgnoreTags: rootProps.StringPropertyValue(pathDiscoveryIgnoreTags), + DiscoveryTags: rootProps.StringPropertyValue(pathDiscoveryTags), + Environment: rootProps.StringPropertyValue(pathEnvironment), + OrgName: rootProps.StringPropertyValue(pathOrgName), + PollInterval: rootProps.DurationPropertyValue(pathPollInterval), + ProxyURL: rootProps.StringPropertyValue(pathProxyURL), + SessionLifetime: rootProps.DurationPropertyValue(pathAuthLifetime), + ClientID: rootProps.StringPropertyValue(pathAuthClientID), + ClientSecret: rootProps.StringPropertyValue(pathAuthClientSecret), TLS: &corecfg.TLSConfiguration{ NextProtos: rootProps.StringSlicePropertyValue(pathSSLNextProtos), InsecureSkipVerify: rootProps.BoolPropertyValue(pathSSLInsecureSkipVerify), @@ -161,5 +168,6 @@ func NewMulesoftConfig(rootProps props) *MulesoftConfig { MaxVersion: corecfg.TLSVersionAsValue(rootProps.StringPropertyValue(pathSSLMaxVersion)), }, DiscoverOriginalRaml: rootProps.BoolPropertyValue(pathDiscoverOriginalRaml), + UseMonitoringAPI: rootProps.BoolPropertyValue(pathUseMonitoringAPI), } } diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 52717fff..e5281ccb 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -134,7 +134,7 @@ func TestKongProperties(t *testing.T) { assert.Equal(t, corecfg.NewCipherArray(corecfg.TLSDefaultCipherSuitesStringSlice()), cfg.TLS.GetCipherSuites()) assert.Equal(t, corecfg.TLSVersionAsValue(corecfg.TLSDefaultMinVersionString()), cfg.TLS.GetMinVersion()) assert.Equal(t, corecfg.TLSVersionAsValue("0"), cfg.TLS.GetMaxVersion()) - assert.Equal(t, 20*time.Second, cfg.PollInterval) + assert.Equal(t, time.Minute, cfg.PollInterval) assert.Equal(t, "", cfg.ProxyURL) assert.Equal(t, "/tmp", cfg.CachePath) assert.Equal(t, false, cfg.DiscoverOriginalRaml) diff --git a/pkg/traceability/agent.go b/pkg/traceability/agent.go index 44a86f85..01dcf0e7 100644 --- a/pkg/traceability/agent.go +++ b/pkg/traceability/agent.go @@ -5,55 +5,78 @@ import ( "os/signal" "syscall" + "github.com/Axway/agent-sdk/pkg/agent" coreagent "github.com/Axway/agent-sdk/pkg/agent" - "github.com/Axway/agent-sdk/pkg/transaction" + v1 "github.com/Axway/agent-sdk/pkg/apic/apiserver/models/api/v1" + management "github.com/Axway/agent-sdk/pkg/apic/apiserver/models/management/v1alpha1" + "github.com/Axway/agent-sdk/pkg/apic/definitions" + cache "github.com/Axway/agent-sdk/pkg/cache" + "github.com/Axway/agent-sdk/pkg/transaction/metric" + "github.com/Axway/agent-sdk/pkg/transaction/models" + coreutil "github.com/Axway/agent-sdk/pkg/util" hc "github.com/Axway/agent-sdk/pkg/util/healthcheck" "github.com/Axway/agents-mulesoft/pkg/anypoint" + cmn "github.com/Axway/agents-mulesoft/pkg/common" "github.com/Axway/agents-mulesoft/pkg/config" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" ) +type metricCollector interface { + InitializeBatch() + AddAPIMetricDetail(detail metric.MetricDetail) + Publish() +} + +func getMetricCollector() metricCollector { + return metric.GetMetricCollector() +} + // Agent - mulesoft Beater configuration. Implements the beat.Beater interface. type Agent struct { - client beat.Client - doneCh chan struct{} - eventChannel chan string - eventProcessor Processor - mule Emitter + client beat.Client + doneCh chan struct{} + eventChannel chan cmn.MetricEvent + mule Emitter + collector metricCollector + credentialCache cache.Cache + publishMetrics bool } // NewBeater creates an instance of mulesoft_traceability_agent. func NewBeater(_ *beat.Beat, _ *common.Config) (beat.Beater, error) { - eventChannel := make(chan string) + eventChannel := make(chan cmn.MetricEvent) agentConfig := config.GetConfig() pollInterval := agentConfig.MulesoftConfig.PollInterval var err error - generator := transaction.NewEventGenerator() client := anypoint.NewClient(agentConfig.MulesoftConfig) - mapper := NewEventMapper(client, agentConfig.CentralConfig) - processor := NewEventProcessor(agentConfig, generator, mapper) - emitter := NewMuleEventEmitter(agentConfig.MulesoftConfig.CachePath, eventChannel, client) + emitter := NewMuleEventEmitter(agentConfig.MulesoftConfig, eventChannel, client, agent.GetCacheManager()) emitterJob, err := NewMuleEventEmitterJob(emitter, pollInterval, traceabilityHealthCheck, hc.GetStatus, hc.RegisterHealthcheck) if err != nil { return nil, err } - return newAgent(processor, emitterJob, eventChannel) + credentialCache := cache.New() + credentialHandler := NewCredentialHandler(credentialCache, agent.GetCacheManager()) + agent.RegisterResourceEventHandler(management.CredentialGVK().Kind, credentialHandler) + + return newAgent(emitterJob, eventChannel, getMetricCollector(), credentialCache) } func newAgent( - processor Processor, emitter Emitter, - eventChannel chan string, + eventChannel chan cmn.MetricEvent, + collector metricCollector, + credentialCache cache.Cache, ) (*Agent, error) { a := &Agent{ - doneCh: make(chan struct{}), - eventChannel: eventChannel, - eventProcessor: processor, - mule: emitter, + doneCh: make(chan struct{}), + eventChannel: eventChannel, + mule: emitter, + collector: collector, + credentialCache: credentialCache, } return a, nil @@ -82,10 +105,79 @@ func (a *Agent) Run(b *beat.Beat) error { case <-gracefulStop: return a.client.Close() case event := <-a.eventChannel: - eventsToPublish := a.eventProcessor.ProcessRaw([]byte(event)) - a.client.PublishAll(eventsToPublish) + a.processEvent(event) + } + } +} + +func (a *Agent) processEvent(me cmn.MetricEvent) { + switch me.Type { + case cmn.Initialize: + a.collector.InitializeBatch() + a.publishMetrics = false + case cmn.Metric: + a.processMetricEvent(me.Metric) + case cmn.Completed: + if a.publishMetrics { + a.collector.Publish() + } + } +} + +func (a *Agent) processMetricEvent(m cmn.Metrics) { + if m.Instance == nil { + return + } + + a.collector.AddAPIMetricDetail(metric.MetricDetail{ + APIDetails: a.getAPIDetails(m), + AppDetails: a.getAppDetails(m), + StatusCode: m.StatusCode, + Count: m.Count, + Response: metric.ResponseMetrics{ + Max: m.Max, + Min: m.Min, + }, + Observation: metric.ObservationDetails{ + Start: m.StartTime.UnixMilli(), + End: m.EndTime.UnixMilli(), + }, + }) + a.publishMetrics = true +} + +func (a *Agent) getAPIDetails(m cmn.Metrics) models.APIDetails { + apisRef := m.Instance.GetReferenceByGVK(management.APIServiceGVK()) + externalAPIID, _ := coreutil.GetAgentDetailsValue(m.Instance, definitions.AttrExternalAPIID) + stage, _ := coreutil.GetAgentDetailsValue(m.Instance, definitions.AttrExternalAPIStage) + return models.APIDetails{ + ID: externalAPIID, + Name: apisRef.Name, + Revision: 1, + APIServiceInstance: m.Instance.Name, + Stage: stage, + } +} + +func (a *Agent) getAppDetails(me cmn.Metrics) models.AppDetails { + appDetails := models.AppDetails{} + if item, err := a.credentialCache.Get(me.ClientID); err == nil && item != nil { + ri, ok := item.(*v1.ResourceInstance) + if ok && ri != nil { + appRef := ri.GetReferenceByGVK(management.ManagedApplicationGVK()) + app := agent.GetCacheManager().GetManagedApplicationByName(appRef.Name) + if app != nil { + managedApp := &management.ManagedApplication{} + managedApp.FromInstance(app) + appDetails = models.AppDetails{ + ID: managedApp.Metadata.ID, + Name: managedApp.Name, + ConsumerOrgID: managedApp.Marketplace.Resource.Owner.Organization.ID, + } + } } } + return appDetails } // onConfigChange apply configuration changes diff --git a/pkg/traceability/agent_test.go b/pkg/traceability/agent_test.go index 5a1d48f8..9975f210 100644 --- a/pkg/traceability/agent_test.go +++ b/pkg/traceability/agent_test.go @@ -4,9 +4,14 @@ import ( "testing" "time" + management "github.com/Axway/agent-sdk/pkg/apic/apiserver/models/management/v1alpha1" + cache "github.com/Axway/agent-sdk/pkg/cache" corecfg "github.com/Axway/agent-sdk/pkg/config" + "github.com/Axway/agent-sdk/pkg/transaction/metric" + "github.com/Axway/agent-sdk/pkg/util" "github.com/Axway/agents-mulesoft/pkg/anypoint" + "github.com/Axway/agents-mulesoft/pkg/common" "github.com/Axway/agents-mulesoft/pkg/config" "github.com/elastic/beats/v7/libbeat/beat" @@ -14,20 +19,59 @@ import ( "github.com/stretchr/testify/assert" ) -func TestAgent_Run(t *testing.T) { - processorChannel := make(chan bool) - eventChannel := make(chan string) +type mockMetricCollector struct { + channel chan bool + details []metric.MetricDetail +} - processor := &mockProcessor{ - channel: processorChannel, +func (m *mockMetricCollector) InitializeBatch() { +} + +func (m *mockMetricCollector) AddAPIMetricDetail(detail metric.MetricDetail) { + if m.details == nil { + m.details = make([]metric.MetricDetail, 0) } + m.details = append(m.details, detail) +} + +func (m *mockMetricCollector) Publish() { + m.channel <- true +} +func TestAgent_Run(t *testing.T) { + processorChannel := make(chan bool) + eventChannel := make(chan common.MetricEvent) + + event := anypoint.APIMonitoringMetric{ + Time: time.Now().Add(10 * time.Second), + Events: []anypoint.APISummaryMetricEvent{ + { + APIName: "test", + ClientID: "test", + StatusCode: "200", + RequestSizeCount: 1, + ResponseTimeMax: 2, + ResponseTimeMin: 1, + }, + }, + } client := &mockAnalyticsClient{ - events: []anypoint.AnalyticsEvent{event}, + events: []anypoint.APIMonitoringMetric{event}, err: nil, } - emitter := NewMuleEventEmitter("/tmp", eventChannel, client) - traceAgent, err := newAgent(processor, emitter, eventChannel) + instanceCache := &mockInstaceCache{} + svcInst := management.NewAPIServiceInstance("api", "env") + util.SetAgentDetailsKey(svcInst, common.AttrAPIID, "1234") + util.SetAgentDetailsKey(svcInst, common.AttrAssetID, "1234") + svcInst.Metadata.ID = "1234" + ri, _ := svcInst.AsInstance() + instanceCache.AddAPIServiceInstance(ri) + emitter := NewMuleEventEmitter(&config.MulesoftConfig{CachePath: "/tmp", UseMonitoringAPI: true}, eventChannel, client, instanceCache) + collector := &mockMetricCollector{ + channel: processorChannel, + } + credCache := cache.New() + traceAgent, err := newAgent(emitter, eventChannel, collector, credCache) assert.Nil(t, err) assert.NotNil(t, traceAgent) @@ -57,12 +101,20 @@ func TestAgent_Run(t *testing.T) { } type mockAnalyticsClient struct { - events []anypoint.AnalyticsEvent + events []anypoint.APIMonitoringMetric app *anypoint.Application err error } -func (m mockAnalyticsClient) GetAnalyticsWindow(_, _ string) ([]anypoint.AnalyticsEvent, error) { +func (m mockAnalyticsClient) GetMonitoringBootstrap() (*anypoint.MonitoringBootInfo, error) { + return nil, m.err +} + +func (m mockAnalyticsClient) GetMonitoringMetrics(dataSourceName string, dataSourceID int, apiID, apiVersionID string, startDate, endTime time.Time) ([]anypoint.APIMonitoringMetric, error) { + return m.events, m.err +} + +func (m mockAnalyticsClient) GetMonitoringArchive(apiID string, startDate time.Time) ([]anypoint.APIMonitoringMetric, error) { return m.events, m.err } @@ -76,12 +128,3 @@ func (m mockAnalyticsClient) OnConfigChange(_ *config.MulesoftConfig) { func (m mockAnalyticsClient) GetAPI(_ string) (*anypoint.API, error) { return nil, nil } - -type mockProcessor struct { - channel chan bool -} - -func (m mockProcessor) ProcessRaw(_ []byte) []beat.Event { - m.channel <- true - return []beat.Event{} -} diff --git a/pkg/traceability/credential.go b/pkg/traceability/credential.go new file mode 100644 index 00000000..3ee3ba35 --- /dev/null +++ b/pkg/traceability/credential.go @@ -0,0 +1,63 @@ +package traceability + +import ( + "context" + + agentCache "github.com/Axway/agent-sdk/pkg/agent/cache" + "github.com/Axway/agent-sdk/pkg/agent/handler" + v1 "github.com/Axway/agent-sdk/pkg/apic/apiserver/models/api/v1" + mv1 "github.com/Axway/agent-sdk/pkg/apic/apiserver/models/management/v1alpha1" + "github.com/Axway/agent-sdk/pkg/cache" + "github.com/Axway/agent-sdk/pkg/util" + "github.com/Axway/agent-sdk/pkg/watchmanager/proto" +) + +const ( + xAgentDetailClientID = "clientId" +) + +type credentialHandler struct { + credentialCache cache.Cache +} + +// NewCredentialHandler creates a Handler for Credential and initializes credential cache with +// items from agent watch resource cache +func NewCredentialHandler(credentialCache cache.Cache, agentCacheManager agentCache.Manager) handler.Handler { + h := &credentialHandler{ + credentialCache: credentialCache, + } + + h.initCredentialCache(agentCacheManager) + return h +} + +// initCredentialCache - initializes credential cache with items from agent watch resource cache +func (h *credentialHandler) initCredentialCache(agentCacheManager agentCache.Manager) { + keys := agentCacheManager.GetWatchResourceCacheKeys(mv1.CredentialGVK().Group, mv1.CredentialGVK().Kind) + for _, key := range keys { + credential := agentCacheManager.GetWatchResourceByKey(key) + clientID, _ := util.GetAgentDetailsValue(credential, xAgentDetailClientID) + if clientID != "" { + h.credentialCache.Set(clientID, credential) + } + } +} + +// Handle processes grpc events triggered for Credential +func (h *credentialHandler) Handle(ctx context.Context, _ *proto.EventMeta, resource *v1.ResourceInstance) error { + action := handler.GetActionFromContext(ctx) + if resource.Kind != mv1.CredentialGVK().Kind { + return nil + } + + clientID, _ := util.GetAgentDetailsValue(resource, xAgentDetailClientID) + if clientID != "" { + if action == proto.Event_DELETED { + h.credentialCache.Delete(clientID) + } else { + h.credentialCache.Set(clientID, resource) + } + } + + return nil +} diff --git a/pkg/traceability/eventmapper.go b/pkg/traceability/eventmapper.go deleted file mode 100644 index 36a1e9af..00000000 --- a/pkg/traceability/eventmapper.go +++ /dev/null @@ -1,194 +0,0 @@ -package traceability - -import ( - "encoding/json" - "fmt" - "net/http" - "strconv" - - "github.com/Axway/agents-mulesoft/pkg/anypoint" - "github.com/google/uuid" - - "github.com/Axway/agent-sdk/pkg/config" - "github.com/Axway/agent-sdk/pkg/transaction" - transutil "github.com/Axway/agent-sdk/pkg/transaction/util" - "github.com/Axway/agent-sdk/pkg/util/log" -) - -const Inbound = "Inbound" -const Outbound = "Outbound" -const Client = "Client" -const MuleProxy = "Mule.APIProxy" -const Backend = "Backend" - -type Mapper interface { - ProcessMapping(event anypoint.AnalyticsEvent) ([]*transaction.LogEvent, error) -} - -func NewEventMapper(client anypoint.AnalyticsClient, centralCfg config.CentralConfig) *EventMapper { - return &EventMapper{ - client: client, - centralCfg: centralCfg, - } -} - -// EventMapper - -type EventMapper struct { - client anypoint.AnalyticsClient - centralCfg config.CentralConfig -} - -func (em *EventMapper) ProcessMapping(event anypoint.AnalyticsEvent) ([]*transaction.LogEvent, error) { - centralCfg := em.centralCfg - - eventTime := event.Timestamp.UnixNano() / 1000000 - txID := uuid.New().String() - txEventID := event.MessageID - leg0ID := FormatLeg0(txEventID) - leg1ID := FormatLeg1(txEventID) - - transSummaryLogEvent, err := em.createSummaryEvent(eventTime, txID, event, centralCfg.GetTeamID()) - if err != nil { - return nil, err - } - - transOutboundLogEventLeg, err := em.createTransactionEvent(eventTime, txID, event, leg0ID, "", Outbound) - if err != nil { - return nil, err - } - - transInboundLogEventLeg, err := em.createTransactionEvent(eventTime, txID, event, leg1ID, leg0ID, Inbound) - if err != nil { - return nil, err - } - - return []*transaction.LogEvent{ - transSummaryLogEvent, - transOutboundLogEventLeg, - transInboundLogEventLeg, - }, nil -} - -func (em *EventMapper) createTransactionEvent( - eventTime int64, - txID string, - txDetails anypoint.AnalyticsEvent, - eventID, - parentEventID, - direction string, -) (*transaction.LogEvent, error) { - - req := map[string]string{ - "User-AgentName": txDetails.UserAgentName + txDetails.UserAgentVersion, - "Request-ID": txDetails.MessageID, - "Forwarded-For": txDetails.ClientIP, - "Violated-Policies": txDetails.ViolatedPolicyName, - } - res := map[string]string{ - "Request-Outcome": txDetails.RequestOutcome, - "Response-Time": strconv.Itoa(txDetails.ResponseTime), - } - - httpProtocolDetails, err := transaction.NewHTTPProtocolBuilder(). - SetByteLength(txDetails.RequestSize, txDetails.ResponseSize). - SetHeaders(buildHeaders(req), buildHeaders(res)). - SetHost(txDetails.ClientIP). - SetMethod(txDetails.Verb). - SetStatus(txDetails.StatusCode, http.StatusText(txDetails.StatusCode)). - SetURI(txDetails.ResourcePath). - Build() - - if err != nil { - return nil, err - } - - builder := transaction.NewTransactionEventBuilder(). - SetDirection(direction). - SetID(eventID). - SetParentID(parentEventID). - SetProtocolDetail(httpProtocolDetails). - SetStatus(getTransactionEventStatus(txDetails.StatusCode)). - SetTimestamp(eventTime). - SetTransactionID(txID) - - if direction == Outbound { - builder. - SetSource(Client). - SetDestination(MuleProxy) - } else { - builder. - SetSource(MuleProxy). - SetDestination(Backend + txDetails.APIName) - } - - return builder.Build() -} - -func (em *EventMapper) createSummaryEvent( - eventTime int64, - txID string, - event anypoint.AnalyticsEvent, - teamID string, -) (*transaction.LogEvent, error) { - host := event.ClientIP - method := event.Verb - name := FormatAPIName(event.APIName, event.APIVersionName) - statusCode := event.StatusCode - uri := event.ResourcePath - - builder := transaction.NewTransactionSummaryBuilder(). - SetDuration(event.ResponseTime). - SetEntryPoint("http", method, uri, host). - SetProxyWithStage(transutil.FormatProxyID(event.APIID), name, event.AssetVersion, 1). - SetStatus(getTransactionSummaryStatus(statusCode), strconv.Itoa(statusCode)). - SetTeam(teamID). - SetTransactionID(txID). - SetTimestamp(eventTime) - - if event.ApplicationName != "" { - builder.SetApplication(transutil.FormatApplicationID(event.Application), event.ApplicationName) - } - - return builder.Build() -} - -func getTransactionSummaryStatus(statusCode int) transaction.TxSummaryStatus { - transSummaryStatus := transaction.TxSummaryStatusUnknown - if statusCode >= http.StatusOK && statusCode < http.StatusBadRequest { - transSummaryStatus = transaction.TxSummaryStatusSuccess - } else if statusCode >= http.StatusBadRequest && statusCode < http.StatusInternalServerError { - transSummaryStatus = transaction.TxSummaryStatusFailure - } else if statusCode >= http.StatusInternalServerError && statusCode < http.StatusNetworkAuthenticationRequired { - transSummaryStatus = transaction.TxSummaryStatusException - } - return transSummaryStatus -} - -func buildHeaders(headers map[string]string) string { - jsonHeader, err := json.Marshal(headers) - if err != nil { - log.Error(err.Error()) - return "" - } - return string(jsonHeader) -} - -func getTransactionEventStatus(code int) transaction.TxEventStatus { - if code >= 400 { - return transaction.TxEventStatusFail - } - return transaction.TxEventStatusPass -} - -func FormatLeg0(id string) string { - return fmt.Sprintf("%s-leg0", id) -} - -func FormatLeg1(id string) string { - return fmt.Sprintf("%s-leg1", id) -} - -// FormatAPIName formats the name for the api that generated the event -func FormatAPIName(apiName, apiVersionName string) string { - return fmt.Sprintf("%s-%s", apiName, apiVersionName) -} diff --git a/pkg/traceability/eventmapper_test.go b/pkg/traceability/eventmapper_test.go deleted file mode 100644 index d6bd60c2..00000000 --- a/pkg/traceability/eventmapper_test.go +++ /dev/null @@ -1,263 +0,0 @@ -package traceability - -import ( - "os" - "strings" - "testing" - "time" - - "github.com/Axway/agents-mulesoft/pkg/config" - "github.com/Axway/agents-mulesoft/pkg/discovery" - "github.com/google/uuid" - - "github.com/Axway/agent-sdk/pkg/agent" - corecfg "github.com/Axway/agent-sdk/pkg/config" - "github.com/Axway/agent-sdk/pkg/traceability/redaction" - "github.com/Axway/agent-sdk/pkg/transaction" - - "github.com/stretchr/testify/assert" - - transutil "github.com/Axway/agent-sdk/pkg/transaction/util" - "github.com/Axway/agents-mulesoft/pkg/anypoint" -) - -var agentConfig *config.AgentConfig - -var event = anypoint.AnalyticsEvent{ - Application: "43210", - APIID: "211799904", - APIName: "petstore-3", - APIVersionID: "16810512", - APIVersionName: "v1", - ApplicationName: "foo", - Browser: "Chrome", - City: "Phoenix", - ClientIP: "1.2.3.4", - Continent: "North America", - Country: "United States", - HardwarePlatform: "", - MessageID: "e2029ea0-a873-11eb-875c-064449f4dd2c", - OSFamily: "", - OSMajorVersion: "", - OSMinorVersion: "", - OSVersion: "", - PostalCode: "", - RequestOutcome: "PROCESSED", - RequestSize: 0, - ResourcePath: "/pets", - ResponseSize: 20, - ResponseTime: 60, - StatusCode: 200, - Timestamp: time.Now(), - Timezone: "", - UserAgentName: "Mozilla", - UserAgentVersion: "5.0", - Verb: "GET", - ViolatedPolicyName: "", -} - -var app = &anypoint.Application{ - APIEndpoints: false, - ClientID: "21", - ClientSecret: "23", - Description: "app", - ID: 1, - Name: "foo", -} - -func setupConfig() { - os.Setenv("CENTRAL_AUTH_PRIVATEKEY_DATA", "12345") - os.Setenv("CENTRAL_AUTH_PUBLICKEY_DATA", "12345") - cfg := corecfg.NewTestCentralConfig(corecfg.TraceabilityAgent) - centralCfg := cfg.(*corecfg.CentralConfiguration) - centralCfg.APICDeployment = APICDeployment - centralCfg.TenantID = TenantID - centralCfg.Environment = Environment - centralCfg.EnvironmentID = EnvID - agentConfig = &config.AgentConfig{ - CentralConfig: centralCfg, - MulesoftConfig: &config.MulesoftConfig{ - PollInterval: 1 * time.Second, - }, - } - agentConfig.CentralConfig.SetEnvironmentID(EnvID) - agentConfig.CentralConfig.SetTeamID(TeamID) - config.SetConfig(agentConfig) - agent.Initialize(agentConfig.CentralConfig) -} - -func setupForTest() { - cfg := redaction.Config{ - Path: redaction.Path{ - Allowed: []redaction.Show{ - redaction.Show{ - KeyMatch: ".*", - }, - }, - }, - Args: redaction.Filter{ - Allowed: []redaction.Show{ - redaction.Show{ - KeyMatch: ".*", - }, - }, - }, - RequestHeaders: redaction.Filter{ - Allowed: []redaction.Show{ - redaction.Show{ - KeyMatch: ".*", - }, - }, - }, - ResponseHeaders: redaction.Filter{ - Allowed: []redaction.Show{ - redaction.Show{ - KeyMatch: ".*", - }, - }, - }, - MaskingCharacters: ".*", - JMSProperties: redaction.Filter{ - Allowed: []redaction.Show{ - redaction.Show{ - KeyMatch: ".*", - }, - }, - }, - } - redaction.SetupGlobalRedaction(cfg) - setupConfig() -} - -func TestEventMapper_processMapping(t *testing.T) { - setupForTest() - client := &mockAnalyticsClient{ - app: app, - } - mapper := NewEventMapper(client, agentConfig.CentralConfig) - - item, err := mapper.ProcessMapping(event) - assert.Nil(t, err) - assert.Equal(t, transutil.FormatApplicationID(event.Application), item[0].TransactionSummary.Application.ID) - assert.Equal(t, event.ApplicationName, item[0].TransactionSummary.Application.Name) - assert.Equal(t, 3, len(item)) - assert.NotNil(t, item[1].TransactionEvent.Protocol) - for i := 0; i < 2; i++ { - rqstHeader := item[i+1].TransactionEvent.Protocol.(*transaction.Protocol).RequestHeaders - respHeader := item[i+1].TransactionEvent.Protocol.(*transaction.Protocol).ResponseHeaders - assert.Contains(t, rqstHeader, "User-AgentName") - assert.Contains(t, rqstHeader, "Request-ID") - assert.Contains(t, rqstHeader, "Forwarded-For") - assert.Contains(t, rqstHeader, "Violated-Policies") - assert.Contains(t, respHeader, "Request-Outcome") - assert.Contains(t, respHeader, "Response-Time") - } - - // expect the application name and id to be empty when the event has no app. - ev := event - ev.Application = "" - ev.ApplicationName = "" - item, err = mapper.ProcessMapping(ev) - assert.Nil(t, err) - assert.Nil(t, item[0].TransactionSummary.Application) -} - -func Test_getTransactionEventStatus(t *testing.T) { - setupForTest() - status := getTransactionEventStatus(100) - assert.Equal(t, transaction.TxEventStatusPass, status) - - status = getTransactionEventStatus(200) - assert.Equal(t, transaction.TxEventStatusPass, status) - - status = getTransactionEventStatus(300) - assert.Equal(t, transaction.TxEventStatusPass, status) - - status = getTransactionEventStatus(400) - assert.Equal(t, transaction.TxEventStatusFail, status) - - status = getTransactionEventStatus(500) - assert.Equal(t, transaction.TxEventStatusFail, status) - - status = getTransactionEventStatus(600) - assert.Equal(t, transaction.TxEventStatusFail, status) -} - -func Test_getTransactionSummaryStatus(t *testing.T) { - setupForTest() - status := getTransactionSummaryStatus(200) - assert.Equal(t, transaction.TxSummaryStatusSuccess, status) - - status = getTransactionSummaryStatus(300) - assert.Equal(t, transaction.TxSummaryStatusSuccess, status) - - status = getTransactionSummaryStatus(400) - assert.Equal(t, transaction.TxSummaryStatusFailure, status) - - status = getTransactionSummaryStatus(500) - assert.Equal(t, transaction.TxSummaryStatusException, status) - - status = getTransactionSummaryStatus(600) - assert.Equal(t, transaction.TxSummaryStatusUnknown, status) - - status = getTransactionSummaryStatus(100) - assert.Equal(t, transaction.TxSummaryStatusUnknown, status) -} - -func Test_buildHeaders(t *testing.T) { - setupForTest() - h := map[string]string{ - "Authorization": "abc123", - "User-Agent": "MulesoftTraceability", - } - res := buildHeaders(h) - assert.Equal(t, "{\"Authorization\":\"abc123\",\"User-Agent\":\"MulesoftTraceability\"}", res) -} - -func Test_APIServiceNameAndTransactionProxyNameAreEqual(t *testing.T) { - setupForTest() - redaction.SetupGlobalRedaction(redaction.DefaultConfig()) - - sd := &discovery.ServiceDetail{ - APIName: "petstore-3", - APISpec: []byte(`{"openapi":"3.0.1","servers":[{"url":"google.com"}],"paths":{},"info":{"title":"petstore3"}}`), - APIUpdateSeverity: "", - AuthPolicy: "pass-through", - Description: "petstore api", - Documentation: nil, - ID: "211797097", - Image: "", - ImageContentType: "", - ResourceType: "oas3", - AgentDetails: map[string]string{ - "API ID": "16810512", - }, - Stage: "Sandbox", - State: "", - Status: "", - SubscriptionName: "", - Tags: nil, - Title: "petstore-3", - URL: "", - Version: "1.0.0", - } - body, err := discovery.BuildServiceBody(sd) - assert.Nil(t, err) - apiServiceName := body.NameToPush - - client := &mockAnalyticsClient{ - app: app, - err: nil, - } - em := &EventMapper{client: client} - - le, err := em.createSummaryEvent(100, uuid.New().String(), event, "123") - assert.Nil(t, err) - transactionProxyName := le.TransactionSummary.Proxy.Name - transactionProxyID := le.TransactionSummary.Proxy.ID - assert.Contains(t, transactionProxyName, apiServiceName) - - assert.True(t, strings.Contains(transactionProxyID, event.APIID)) - assert.Equal(t, event.ApplicationName, le.TransactionSummary.Application.Name) - assert.Equal(t, transutil.FormatApplicationID(event.Application), le.TransactionSummary.Application.ID) -} diff --git a/pkg/traceability/eventprocessor.go b/pkg/traceability/eventprocessor.go deleted file mode 100644 index b45bd5b4..00000000 --- a/pkg/traceability/eventprocessor.go +++ /dev/null @@ -1,73 +0,0 @@ -package traceability - -import ( - "encoding/json" - "time" - - "github.com/Axway/agents-mulesoft/pkg/anypoint" - - "github.com/Axway/agent-sdk/pkg/transaction" - "github.com/Axway/agent-sdk/pkg/util/log" - - "github.com/Axway/agents-mulesoft/pkg/config" - "github.com/elastic/beats/v7/libbeat/beat" -) - -type Processor interface { - ProcessRaw(rawEvent []byte) []beat.Event -} - -// EventProcessor - represents the processor for received event for Amplify Central -// The event processing can be done either when the beat input receives the log entry or before the beat transport -// publishes the event to transport. -// When processing the received log entry on input, the log entry is mapped to structure expected for Amplify Central Observer -// and then beat.Event is published to beat output that produces the event over the configured transport. -// When processing the log entry on output, the log entry is published to output as beat.Event. The output transport invokes -// the Process(events []publisher.Event) method which is set as output event processor. The Process() method processes the received -// log entry and performs the mapping to structure expected for Amplify Central Observer. The method returns the converted Events to -// transport publisher which then produces the events over the transport. -type EventProcessor struct { - cfg *config.AgentConfig - eventGenerator transaction.EventGenerator - eventMapper Mapper -} - -func NewEventProcessor( - gateway *config.AgentConfig, - eventGenerator transaction.EventGenerator, - mapper Mapper, -) *EventProcessor { - ep := &EventProcessor{ - cfg: gateway, - eventGenerator: eventGenerator, - eventMapper: mapper, - } - return ep -} - -// ProcessRaw - process the received log entry and returns the event to be published to Amplifyingestion service -func (ep *EventProcessor) ProcessRaw(rawEvent []byte) []beat.Event { - var gatewayTrafficLogEntry anypoint.AnalyticsEvent - err := json.Unmarshal(rawEvent, &gatewayTrafficLogEntry) - if err != nil { - log.Error(err.Error()) - return nil - } - // Map the log entry to log event structure expected by AmplifyCentral Observer - logEvents, err := ep.eventMapper.ProcessMapping(gatewayTrafficLogEntry) - if err != nil { - log.Error(err.Error()) - return nil - } - events := make([]beat.Event, 0) - for _, logEvent := range logEvents { - // Generates the beat.Event with attributes by Amplify ingestion service - event, err := ep.eventGenerator.CreateEvent(*logEvent, time.Now(), nil, nil, nil) - if err != nil { - log.Error(err.Error()) - } else { - events = append(events, event) - } - } - return events -} diff --git a/pkg/traceability/eventprocessor_test.go b/pkg/traceability/eventprocessor_test.go deleted file mode 100644 index 62510c4b..00000000 --- a/pkg/traceability/eventprocessor_test.go +++ /dev/null @@ -1,196 +0,0 @@ -package traceability - -import ( - "encoding/json" - "fmt" - "testing" - "time" - - "github.com/Axway/agents-mulesoft/pkg/anypoint" - - "github.com/stretchr/testify/assert" - - "github.com/Axway/agent-sdk/pkg/transaction" - "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/common" - - transutil "github.com/Axway/agent-sdk/pkg/transaction/util" -) - -const ( - TenantID = "332211" - APICDeployment = "prod" - Environment = "mule" - EnvID = "envid00" - TeamID = "678" -) - -func TestEventProcessor_ProcessRaw(t *testing.T) { - setupForTest() - client := &mockAnalyticsClient{ - app: app, - } - mapper := NewEventMapper(client, agentConfig.CentralConfig) - processor := NewEventProcessor(agentConfig, &eventGeneratorMock{}, mapper) - - bts, err := json.Marshal(&event) - assert.Nil(t, err) - evts := processor.ProcessRaw(bts) - - summaryRaw := evts[0] - summaryEvent := &transaction.LogEvent{} - summaryMsg := summaryRaw.Fields["message"].(string) - err = json.Unmarshal([]byte(summaryMsg), summaryEvent) - assert.Nil(t, err) - // TransactionSummary assertions - assertLegCommonFields(t, event, summaryEvent, transaction.TypeTransactionSummary) - assert.Nil(t, summaryEvent.TransactionEvent) - assert.Equal(t, "Success", summaryEvent.TransactionSummary.Status) - assert.Equal(t, "200", summaryEvent.TransactionSummary.StatusDetail) - assert.Equal(t, 60, summaryEvent.TransactionSummary.Duration) - assert.Equal(t, TeamID, summaryEvent.TransactionSummary.Team.ID) - assert.Equal(t, transutil.FormatProxyID(event.APIID), summaryEvent.TransactionSummary.Proxy.ID) - assert.Equal(t, 1, summaryEvent.TransactionSummary.Proxy.Revision) - assert.Equal(t, FormatAPIName(event.APIName, event.APIVersionName), summaryEvent.TransactionSummary.Proxy.Name) - assert.Nil(t, summaryEvent.TransactionSummary.Runtime) - assert.Equal(t, "http", summaryEvent.TransactionSummary.EntryPoint.Type) - assert.Equal(t, event.Verb, summaryEvent.TransactionSummary.EntryPoint.Method) - assert.Equal(t, event.ResourcePath, summaryEvent.TransactionSummary.EntryPoint.Path) - assert.Equal(t, event.ClientIP, summaryEvent.TransactionSummary.EntryPoint.Host) - - leg0Raw := evts[1] - leg0Event := &transaction.LogEvent{} - leg0Msg := leg0Raw.Fields["message"].(string) - err = json.Unmarshal([]byte(leg0Msg), leg0Event) - assert.Nil(t, err) - assertLegCommonFields(t, event, leg0Event, transaction.TypeTransactionEvent) - assert.Equal(t, FormatLeg0(event.MessageID), leg0Event.TransactionEvent.ID) - assertLegTransactionEvent(t, event, leg0Event, Outbound, "") - - leg1Raw := evts[2] - leg1Event := &transaction.LogEvent{} - leg1Msg := leg1Raw.Fields["message"].(string) - err = json.Unmarshal([]byte(leg1Msg), leg1Event) - assert.Nil(t, err) - assertLegCommonFields(t, event, leg1Event, transaction.TypeTransactionEvent) - assert.Equal(t, FormatLeg1(event.MessageID), leg1Event.TransactionEvent.ID) - assertLegTransactionEvent(t, event, leg1Event, Inbound, FormatLeg0(event.MessageID)) -} - -func TestEventProcessor_ProcessRaw_Errors(t *testing.T) { - setupForTest() - // returns nil when the EventMapper throws an error - processor := NewEventProcessor(agentConfig, &eventGeneratorMock{}, &eventMapperErr{}) - bts, err := json.Marshal(&event) - assert.Nil(t, err) - evts := processor.ProcessRaw(bts) - assert.Nil(t, evts) - - // returns an empty array when the EventGenerator throws an error - client := &mockAnalyticsClient{ - app: app, - } - mapper := NewEventMapper(client, agentConfig.CentralConfig) - processor = NewEventProcessor(agentConfig, &eventGenMockErr{}, mapper) - bts, err = json.Marshal(&event) - assert.Nil(t, err) - evts = processor.ProcessRaw(bts) - assert.Equal(t, 0, len(evts)) - - // return nil when given bad json - processor = NewEventProcessor(agentConfig, &eventGeneratorMock{}, mapper) - evts = processor.ProcessRaw([]byte("nope")) - assert.Nil(t, evts) -} - -func assertLegCommonFields(t *testing.T, muleEvent anypoint.AnalyticsEvent, logEvent *transaction.LogEvent, logType string) { - assert.Equal(t, "1.0", logEvent.Version) - assert.Equal(t, "", logEvent.Environment) - assert.Equal(t, APICDeployment, logEvent.APICDeployment) - assert.Equal(t, EnvID, logEvent.EnvironmentID) - assert.Equal(t, TenantID, logEvent.TenantID) - assert.Equal(t, TenantID, logEvent.TrcbltPartitionID) - assert.Equal(t, logType, logEvent.Type) - assert.Equal(t, "", logEvent.TargetPath) - assert.Equal(t, "", logEvent.ResourcePath) -} - -func assertLegTransactionEvent(t *testing.T, muleEvent anypoint.AnalyticsEvent, logEvent *transaction.LogEvent, direction, parent string) { - source := "" - destination := "" - if direction == Outbound { - source = Client - destination = MuleProxy - } else { - source = MuleProxy - destination = Backend + muleEvent.APIName - } - assert.Nil(t, logEvent.TransactionSummary) - assert.Equal(t, parent, logEvent.TransactionEvent.ParentID) - assert.Equal(t, source, logEvent.TransactionEvent.Source) - assert.Equal(t, destination, logEvent.TransactionEvent.Destination) - assert.Equal(t, 0, logEvent.TransactionEvent.Duration) - assert.Equal(t, direction, logEvent.TransactionEvent.Direction) - assert.Equal(t, "Pass", logEvent.TransactionEvent.Status) -} - -// eventGeneratorMock - mock event generator -type eventGeneratorMock struct { - shouldUseTrafficForAggregation bool -} - -func (c *eventGeneratorMock) CreateEvents(transaction.LogEvent, []transaction.LogEvent, time.Time, common.MapStr, common.MapStr, interface{}) (events []beat.Event, err error) { - return nil, nil -} - -// CreateEvent - Creates a new mocked event for tests -func (c *eventGeneratorMock) CreateEvent( - logEvent transaction.LogEvent, - eventTime time.Time, - metaData common.MapStr, - _ common.MapStr, - privateData interface{}, -) (event beat.Event, err error) { - serializedLogEvent, _ := json.Marshal(logEvent) - eventData := make(map[string]interface{}) - eventData["message"] = string(serializedLogEvent) - event = beat.Event{ - Timestamp: eventTime, - Meta: metaData, - Private: privateData, - Fields: eventData, - } - return -} - -func (c *eventGeneratorMock) SetUseTrafficForAggregation(useTrafficForAggregation bool) { - c.shouldUseTrafficForAggregation = useTrafficForAggregation -} - -type eventGenMockErr struct { - shouldUseTrafficForAggregation bool -} - -func (c *eventGenMockErr) CreateEvents(transaction.LogEvent, []transaction.LogEvent, time.Time, common.MapStr, common.MapStr, interface{}) (events []beat.Event, err error) { - return nil, nil -} - -func (c *eventGenMockErr) CreateEvent( - _ transaction.LogEvent, - _ time.Time, - _ common.MapStr, - _ common.MapStr, - _ interface{}, -) (event beat.Event, err error) { - return beat.Event{}, fmt.Errorf("create event error") -} - -func (c *eventGenMockErr) SetUseTrafficForAggregation(useTrafficForAggregation bool) { - c.shouldUseTrafficForAggregation = useTrafficForAggregation -} - -type eventMapperErr struct{} - -func (em *eventMapperErr) ProcessMapping(_ anypoint.AnalyticsEvent) ([]*transaction.LogEvent, error) { - return nil, fmt.Errorf("event mapping error") -} diff --git a/pkg/traceability/file b/pkg/traceability/file deleted file mode 100644 index 56a6051c..00000000 --- a/pkg/traceability/file +++ /dev/null @@ -1 +0,0 @@ -1 \ No newline at end of file diff --git a/pkg/traceability/muleemitter.go b/pkg/traceability/muleemitter.go index 3e364a8a..5b8c746a 100644 --- a/pkg/traceability/muleemitter.go +++ b/pkg/traceability/muleemitter.go @@ -1,20 +1,21 @@ package traceability import ( - "encoding/json" "fmt" "time" + v1 "github.com/Axway/agent-sdk/pkg/apic/apiserver/models/api/v1" "github.com/Axway/agent-sdk/pkg/cache" + "github.com/Axway/agent-sdk/pkg/util" "github.com/Axway/agent-sdk/pkg/jobs" - "github.com/Axway/agent-sdk/pkg/util/log" "github.com/sirupsen/logrus" hc "github.com/Axway/agent-sdk/pkg/util/healthcheck" "github.com/Axway/agents-mulesoft/pkg/anypoint" + "github.com/Axway/agents-mulesoft/pkg/common" "github.com/Axway/agents-mulesoft/pkg/config" ) @@ -23,6 +24,11 @@ const ( CacheKeyTimeStamp = "LAST_RUN" ) +type instanceCache interface { + GetAPIServiceInstanceKeys() []string + GetAPIServiceInstanceByID(id string) (*v1.ResourceInstance, error) +} + type Emitter interface { Start() error OnConfigChange(gatewayCfg *config.AgentConfig) @@ -32,10 +38,12 @@ type healthChecker func(name, endpoint string, check hc.CheckStatus) (string, er // MuleEventEmitter - Gathers analytics data for publishing to Central. type MuleEventEmitter struct { - client anypoint.AnalyticsClient - eventChannel chan string - cache cache.Cache - cachePath string + client anypoint.AnalyticsClient + eventChannel chan common.MetricEvent + cache cache.Cache + cachePath string + instanceCache instanceCache + useMonitoringAPI bool } // MuleEventEmitterJob wraps an Emitter and implements the Job interface so that it can be executed by the sdk. @@ -48,65 +56,113 @@ type MuleEventEmitterJob struct { } // NewMuleEventEmitter - Creates a client to poll for events. -func NewMuleEventEmitter(cachePath string, eventChannel chan string, client anypoint.AnalyticsClient) *MuleEventEmitter { +func NewMuleEventEmitter(config *config.MulesoftConfig, eventChannel chan common.MetricEvent, client anypoint.AnalyticsClient, instanceCache instanceCache) *MuleEventEmitter { me := &MuleEventEmitter{ - eventChannel: eventChannel, - client: client, + eventChannel: eventChannel, + client: client, + instanceCache: instanceCache, + useMonitoringAPI: config.UseMonitoringAPI, } - me.cachePath = formatCachePath(cachePath) + me.cachePath = formatCachePath(config.CachePath) me.cache = cache.Load(me.cachePath) return me } // Start retrieves analytics data from anypoint and sends them on the event channel for processing. func (me *MuleEventEmitter) Start() error { - strStartTime, strEndTime := me.getLastRun() - events, err := me.client.GetAnalyticsWindow(strStartTime, strEndTime) - - if err != nil { - logrus.WithError(err).Error("failed to get analytics data") - return err + var bootInfo *anypoint.MonitoringBootInfo + if !me.useMonitoringAPI { + bi, err := me.client.GetMonitoringBootstrap() + if err != nil { + return err + } + bootInfo = bi } - var lastTime time.Time - lastTime, err = time.Parse(time.RFC3339, strStartTime) - if err != nil { - logrus.WithFields(logrus.Fields{"strStartTime": strStartTime}).Warn("Unable to Parse Last Time") - } - for _, event := range events { - // Results are not sorted. We want the most recent time to bubble up - if event.Timestamp.After(lastTime) { - lastTime = event.Timestamp + // Initialize Metric Batch + me.eventChannel <- common.MetricEvent{Type: common.Initialize} + + // Publish metrics, event receiver takes care if no metrics needs to be published + defer func() { + me.eventChannel <- common.MetricEvent{Type: common.Completed} + }() + + // change the cache to store startTime per API + instanceKeys := me.instanceCache.GetAPIServiceInstanceKeys() + reportEndTime := time.Now() + for _, instanceID := range instanceKeys { + instance, _ := me.instanceCache.GetAPIServiceInstanceByID(instanceID) + apiID, _ := util.GetAgentDetailsValue(instance, common.AttrAssetID) + apiVersionID, _ := util.GetAgentDetailsValue(instance, common.AttrAPIID) + if apiID == "" { + continue + } + lastAPIReportTime := me.getLastRun(apiID, instance) + metrics, err := me.getMetrics(bootInfo, apiID, apiVersionID, lastAPIReportTime, reportEndTime) + endTime := lastAPIReportTime + for _, metric := range metrics { + // Report only latest entries, ignore old entries + if metric.Time.After(lastAPIReportTime) { + for _, event := range metric.Events { + m := common.MetricEvent{ + Type: common.Metric, + Metric: common.Metrics{ + StartTime: lastAPIReportTime, + EndTime: metric.Time, + APIID: apiID, + Instance: instance, + StatusCode: event.StatusCode, + Count: int64(event.RequestSizeCount), + Max: int64(event.ResponseTimeMax), + Min: int64(event.ResponseTimeMin), + }, + } + me.eventChannel <- m + } + } + // Results are not sorted. We want the most recent time to bubble up for next run cycle + if metric.Time.After(endTime) { + endTime = metric.Time + } } - j, err := json.Marshal(event) + me.saveLastRun(apiID, endTime) if err != nil { - log.Warnf("failed to marshal event: %s", err.Error()) + logrus.WithError(err).Error("failed to get analytics data") + return err } - me.eventChannel <- string(j) } - // Add 1 second to the last time stamp if we found records from this pull. - // This will prevent duplicate records from being retrieved - if len(events) > 0 { - lastTime = lastTime.Add(time.Second * 1) - } - me.saveLastRun(lastTime.Format(time.RFC3339)) return nil } -func (me *MuleEventEmitter) getLastRun() (string, string) { - tStamp, _ := me.cache.Get(CacheKeyTimeStamp) - now := time.Now() - tNow := now.Format(time.RFC3339Nano) - if tStamp == nil { - tStamp = tNow - me.saveLastRun(tNow) + +func (me *MuleEventEmitter) getMetrics(bootInfo *anypoint.MonitoringBootInfo, apiID, apiVersionID string, startTime, endTime time.Time) ([]anypoint.APIMonitoringMetric, error) { + if me.useMonitoringAPI { + return me.client.GetMonitoringArchive(apiID, startTime) + } + + return me.client.GetMonitoringMetrics(bootInfo.Settings.DataSource.InfluxDB.Database, bootInfo.Settings.DataSource.InfluxDB.ID, apiID, apiVersionID, startTime, endTime) +} + +func (me *MuleEventEmitter) getLastRun(apiID string, instance *v1.ResourceInstance) time.Time { + tStamp, _ := me.cache.Get(CacheKeyTimeStamp + "-" + apiID) + // use instance.Metadata.Audit.CreateTimestamp instead of Now() + tStart := time.Time(instance.Metadata.Audit.CreateTimestamp) + if tStamp != nil { + tStart, _ = time.Parse(time.RFC3339Nano, tStamp.(string)) + } else { + // if instance create time is more than a day, use current time to query + if time.Since(tStart) > 24*time.Hour { + tStart = time.Now() + } + me.saveLastRun(apiID, tStart) } - return tStamp.(string), tNow + return tStart } -func (me *MuleEventEmitter) saveLastRun(lastTime string) { - me.cache.Set(CacheKeyTimeStamp, lastTime) +func (me *MuleEventEmitter) saveLastRun(apiID string, lastTime time.Time) { + tm := lastTime.Format(time.RFC3339Nano) + me.cache.Set(CacheKeyTimeStamp+"-"+apiID, tm) me.cache.Save(me.cachePath) } diff --git a/pkg/traceability/muleemitter_test.go b/pkg/traceability/muleemitter_test.go index 16b53bf4..c91f5038 100644 --- a/pkg/traceability/muleemitter_test.go +++ b/pkg/traceability/muleemitter_test.go @@ -2,11 +2,17 @@ package traceability import ( "fmt" + "sync" "testing" "time" "github.com/Axway/agents-mulesoft/pkg/anypoint" + "github.com/Axway/agents-mulesoft/pkg/common" + v1 "github.com/Axway/agent-sdk/pkg/apic/apiserver/models/api/v1" + management "github.com/Axway/agent-sdk/pkg/apic/apiserver/models/management/v1alpha1" + "github.com/Axway/agent-sdk/pkg/cache" + "github.com/Axway/agent-sdk/pkg/util" hc "github.com/Axway/agent-sdk/pkg/util/healthcheck" corecfg "github.com/Axway/agent-sdk/pkg/config" @@ -14,29 +20,130 @@ import ( "github.com/stretchr/testify/assert" ) +type mockInstaceCache struct { + instances cache.Cache +} + +func (c *mockInstaceCache) AddAPIServiceInstance(ri *v1.ResourceInstance) { + if c.instances == nil { + c.instances = cache.New() + } + c.instances.Set(ri.Metadata.ID, ri) +} + +func (c *mockInstaceCache) GetAPIServiceInstanceKeys() []string { + if c.instances == nil { + c.instances = cache.New() + } + return c.instances.GetKeys() +} + +func (c *mockInstaceCache) GetAPIServiceInstanceByID(id string) (*v1.ResourceInstance, error) { + item, err := c.instances.Get(id) + if err != nil { + return nil, err + } + ri, ok := item.(*v1.ResourceInstance) + if ok { + return ri, nil + } + return nil, fmt.Errorf("error") +} + +type mockEventReceiver struct { + metricBatchInitialized bool + metricBatchPublish bool + metricReceived bool + receivedMetric common.Metrics + wg sync.WaitGroup +} + +func (r *mockEventReceiver) init() { + r.wg.Add(1) +} + +func (r *mockEventReceiver) wait() { + r.wg.Wait() +} + +func (r *mockEventReceiver) receiveEvents(eventChannel chan common.MetricEvent) { + for { + select { + case event := <-eventChannel: + switch event.Type { + case common.Initialize: + r.metricBatchInitialized = true + case common.Metric: + r.metricReceived = true + r.receivedMetric = event.Metric + case common.Completed: + r.metricBatchPublish = true + r.wg.Done() + return + } + } + } +} + func Test_MuleEventEmitter(t *testing.T) { - eventCh := make(chan string) + eventCh := make(chan common.MetricEvent) + event := anypoint.APIMonitoringMetric{ + Time: time.Now().Add(10 * time.Second), + Events: []anypoint.APISummaryMetricEvent{ + { + APIName: "test", + ClientID: "test", + StatusCode: "200", + RequestSizeCount: 1, + ResponseTimeMax: 2, + ResponseTimeMin: 1, + }, + }, + } client := &mockAnalyticsClient{ - events: []anypoint.AnalyticsEvent{event}, + events: []anypoint.APIMonitoringMetric{event}, err: nil, } - emitter := NewMuleEventEmitter("/tmp", eventCh, client) + instanceCache := &mockInstaceCache{} + svcInst := management.NewAPIServiceInstance("api", "env") + util.SetAgentDetailsKey(svcInst, common.AttrAPIID, "1234") + util.SetAgentDetailsKey(svcInst, common.AttrAssetID, "1234") + svcInst.Metadata.ID = "1234" + ri, _ := svcInst.AsInstance() + instanceCache.AddAPIServiceInstance(ri) + + emitter := NewMuleEventEmitter(&config.MulesoftConfig{CachePath: "/tmp", UseMonitoringAPI: true}, eventCh, client, instanceCache) assert.NotNil(t, emitter) + eventReceiver := &mockEventReceiver{} go emitter.Start() + eventReceiver.init() + go eventReceiver.receiveEvents(eventCh) - e := <-eventCh - assert.NotEmpty(t, e) + eventReceiver.wait() + assert.True(t, eventReceiver.metricBatchInitialized) + assert.True(t, eventReceiver.metricReceived) + assert.True(t, eventReceiver.metricBatchPublish) // Should throw an error when the client returns an error + eventCh = make(chan common.MetricEvent) client = &mockAnalyticsClient{ - events: []anypoint.AnalyticsEvent{}, + events: []anypoint.APIMonitoringMetric{}, err: fmt.Errorf("failed"), } - emitter = NewMuleEventEmitter("/tmp", eventCh, client) - err := emitter.Start() - assert.Equal(t, client.err, err) + emitter = NewMuleEventEmitter(&config.MulesoftConfig{CachePath: "/tmp", UseMonitoringAPI: true}, eventCh, client, instanceCache) + eventReceiver = &mockEventReceiver{} + eventReceiver.init() + go func() { + err := emitter.Start() + assert.Equal(t, client.err, err) + }() + go eventReceiver.receiveEvents(eventCh) + + eventReceiver.wait() + assert.True(t, eventReceiver.metricBatchInitialized) + assert.True(t, eventReceiver.metricBatchPublish) } func TestMuleEventEmitterJob(t *testing.T) { @@ -48,12 +155,12 @@ func TestMuleEventEmitterJob(t *testing.T) { }, } - eventCh := make(chan string) + eventCh := make(chan common.MetricEvent) client := &mockAnalyticsClient{ - events: []anypoint.AnalyticsEvent{event}, + events: []anypoint.APIMonitoringMetric{}, err: nil, } - emitter := NewMuleEventEmitter("/tmp", eventCh, client) + emitter := NewMuleEventEmitter(&config.MulesoftConfig{CachePath: "/tmp", UseMonitoringAPI: true}, eventCh, client, &mockInstaceCache{}) job, err := NewMuleEventEmitterJob(emitter, pollInterval, mockHealthCheck, getStatusSuccess, mockRegisterHC) assert.Nil(t, err)