From 9dd0dabc8bba555cc4b33e2e6f5d29800cdc5ba2 Mon Sep 17 00:00:00 2001 From: Jakob Date: Tue, 16 Apr 2024 12:00:22 +0200 Subject: [PATCH] Add ISM Policy (#524) * ci/opensearch: add ism settings Signed-off-by: Jakob Hahn * plugins/ism: add base Signed-off-by: Jakob Hahn * plugins/ism: add policies Signed-off-by: Jakob Hahn * plugins/ism: add 'Add' function Signed-off-by: Jakob Hahn * plugins/ism: add change function Signed-off-by: Jakob Hahn * plugins/ism: add explain func Signed-off-by: Jakob Hahn * plugins/ism: add remove func Signed-off-by: Jakob Hahn * plugins/ism: add retry func Signed-off-by: Jakob Hahn * plugins/ism: add api tests Signed-off-by: Jakob Hahn * add changelog Signed-off-by: Jakob Hahn * makefile: adjust test-integ to work for unrealesed opensearch Signed-off-by: Jakob Hahn * opensearchapi: adjust scroll test to match all indices for scroll Signed-off-by: Jakob Hahn * opensearchapi: nodes stats add missing IO usage field Signed-off-by: Jakob Hahn --------- Signed-off-by: Jakob Hahn --- .ci/opensearch/docker-compose.yml | 1 + .../workflows/test-integration-unreleased.yml | 2 +- CHANGELOG.md | 3 +- Makefile | 6 +- opensearchapi/api_nodes-stats.go | 3 + opensearchapi/api_scroll_test.go | 14 +- plugins/ism/api.go | 69 +++++ plugins/ism/api_add.go | 81 ++++++ plugins/ism/api_change.go | 88 ++++++ plugins/ism/api_explain-params.go | 27 ++ plugins/ism/api_explain.go | 159 +++++++++++ plugins/ism/api_policies-delete.go | 55 ++++ plugins/ism/api_policies-get.go | 57 ++++ plugins/ism/api_policies-put-params.go | 29 ++ plugins/ism/api_policies-put.go | 61 ++++ plugins/ism/api_policies.go | 262 ++++++++++++++++++ plugins/ism/api_policies_test.go | 181 ++++++++++++ plugins/ism/api_remove.go | 68 +++++ plugins/ism/api_retry.go | 86 ++++++ plugins/ism/api_test.go | 260 +++++++++++++++++ plugins/ism/inspect.go | 14 + plugins/ism/internal/test/helper.go | 83 ++++++ 22 files changed, 1599 insertions(+), 10 deletions(-) create mode 100644 plugins/ism/api.go create mode 100644 plugins/ism/api_add.go create mode 100644 plugins/ism/api_change.go create mode 100644 plugins/ism/api_explain-params.go create mode 100644 plugins/ism/api_explain.go create mode 100644 plugins/ism/api_policies-delete.go create mode 100644 plugins/ism/api_policies-get.go create mode 100644 plugins/ism/api_policies-put-params.go create mode 100644 plugins/ism/api_policies-put.go create mode 100644 plugins/ism/api_policies.go create mode 100644 plugins/ism/api_policies_test.go create mode 100644 plugins/ism/api_remove.go create mode 100644 plugins/ism/api_retry.go create mode 100644 plugins/ism/api_test.go create mode 100644 plugins/ism/inspect.go create mode 100644 plugins/ism/internal/test/helper.go diff --git a/.ci/opensearch/docker-compose.yml b/.ci/opensearch/docker-compose.yml index 1754359d5..1f1b64db3 100755 --- a/.ci/opensearch/docker-compose.yml +++ b/.ci/opensearch/docker-compose.yml @@ -17,6 +17,7 @@ services: - bootstrap.memory_lock=true - path.repo=/usr/share/opensearch/mnt - OPENSEARCH_INITIAL_ADMIN_PASSWORD=myStrongPassword123! + - plugins.index_state_management.job_interval=1 ports: - "9200:9200" user: opensearch diff --git a/.github/workflows/test-integration-unreleased.yml b/.github/workflows/test-integration-unreleased.yml index a589e2418..7925cf872 100644 --- a/.github/workflows/test-integration-unreleased.yml +++ b/.github/workflows/test-integration-unreleased.yml @@ -65,7 +65,7 @@ jobs: - name: Integration test working-directory: go-client - run: make test-integ race=true + run: make test-integ race=true unreleased=true env: OPENSEARCH_GO_SKIP_JSON_COMPARE: true diff --git a/CHANGELOG.md b/CHANGELOG.md index 8d0d35cf8..91addb79d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,7 +14,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Adds security plugin ([#507](https://github.com/opensearch-project/opensearch-go/pull/507)) - Adds security settings to container for security testing ([#507](https://github.com/opensearch-project/opensearch-go/pull/507)) - Adds cluster.get-certs to copy admin certs out of the container ([#507](https://github.com/opensearch-project/opensearch-go/pull/507)) -- Add the `Fields` field containing stored fields to the `DocumentGetResp` struct (#526)[https://github.com/opensearch-project/opensearch-go/pull/526] +- Adds the `Fields` field containing stored fields to the `DocumentGetResp` struct (#526)[https://github.com/opensearch-project/opensearch-go/pull/526] +- Adds ism plugin ([#524](https://github.com/opensearch-project/opensearch-go/pull/524)) ### Changed - Uses docker compose v2 instead of v1 ([#506](https://github.com/opensearch-project/opensearch-go/pull/506)) diff --git a/Makefile b/Makefile index c804f4e40..665053417 100644 --- a/Makefile +++ b/Makefile @@ -22,13 +22,17 @@ test: test-unit test-integ: ## Run integration tests @printf "\033[2m→ Running integration tests...\033[0m\n" $(eval testintegtags += "integration") + $(eval testintegpath = "./...") ifdef multinode $(eval testintegtags += "multinode") endif ifdef race $(eval testintegargs += "-race") endif - $(eval testintegargs += "-cover" "-tags=$(testintegtags)" "-timeout=1h" "./..." "-args" "-test.gocoverdir=$(PWD)/tmp/integration") +ifdef unreleased + $(eval testintegpath = "./opensearchapi/...") +endif + $(eval testintegargs += "-cover" "-tags=$(testintegtags)" "-timeout=1h" "$(testintegpath)" "-args" "-test.gocoverdir=$(PWD)/tmp/integration") @mkdir -p $(PWD)/tmp/integration @echo "go test -v" $(testintegargs); \ go test -v $(testintegargs); diff --git a/opensearchapi/api_nodes-stats.go b/opensearchapi/api_nodes-stats.go index 4e2e3e5a7..063bbf175 100644 --- a/opensearchapi/api_nodes-stats.go +++ b/opensearchapi/api_nodes-stats.go @@ -691,6 +691,9 @@ type NodesStatsResourceUsageStats struct { Timestamp int64 `json:"timestamp"` CPUUtilizationPercent string `json:"cpu_utilization_percent"` MemoryUtilizationPercent string `json:"memory_utilization_percent"` + IOUsageStats struct { + MaxIOUtilizationPercent string `json:"max_io_utilization_percent"` + } `json:"io_usage_stats"` } // NodesStatsSegmentReplicationBackpressure is a sub type of NodesStats containing information about segment replication backpressure diff --git a/opensearchapi/api_scroll_test.go b/opensearchapi/api_scroll_test.go index f755cf277..58e46e35b 100644 --- a/opensearchapi/api_scroll_test.go +++ b/opensearchapi/api_scroll_test.go @@ -22,19 +22,19 @@ import ( func TestScrollClient(t *testing.T) { client, err := ostest.NewClient() - require.Nil(t, err) + require.NoError(t, err) failingClient, err := osapitest.CreateFailingClient() - require.Nil(t, err) + require.NoError(t, err) search, err := client.Search( nil, &opensearchapi.SearchReq{ - Indices: []string{"test*"}, + Indices: []string{"*"}, Params: opensearchapi.SearchParams{Scroll: 5 * time.Minute}, }, ) - require.Nil(t, err) - require.NotNil(t, search.ScrollID) + require.NoError(t, err) + require.NotNil(t, search.ScrollID, "ScrollID is nil") type scrollTests struct { Name string @@ -86,11 +86,11 @@ func TestScrollClient(t *testing.T) { t.Run(testCase.Name, func(t *testing.T) { res, err := testCase.Results() if testCase.Name == "inspect" { - assert.NotNil(t, err) + assert.Error(t, err) assert.NotNil(t, res) osapitest.VerifyInspect(t, res.Inspect()) } else { - require.Nil(t, err) + require.NoError(t, err) require.NotNil(t, res) assert.NotNil(t, res.Inspect().Response) ostest.CompareRawJSONwithParsedJSON(t, res, res.Inspect().Response) diff --git a/plugins/ism/api.go b/plugins/ism/api.go new file mode 100644 index 000000000..53e23a7c2 --- /dev/null +++ b/plugins/ism/api.go @@ -0,0 +1,69 @@ +// SPDX-License-Identifier: Apache-2.0 +// +// The OpenSearch Contributors require contributions made to +// this file be licensed under the Apache-2.0 license or a +// compatible open source license. + +package ism + +import ( + "context" + "fmt" + + "github.com/opensearch-project/opensearch-go/v3" +) + +// Config represents the client configuration +type Config struct { + Client opensearch.Config +} + +// Client represents the ism Client summarizing all API calls +type Client struct { + Client *opensearch.Client + Policies policiesClient +} + +// clientInit inits the Client with all sub clients +func clientInit(rootClient *opensearch.Client) *Client { + client := &Client{ + Client: rootClient, + } + client.Policies = policiesClient{apiClient: client} + return client +} + +// NewClient returns a ism client +func NewClient(config Config) (*Client, error) { + rootClient, err := opensearch.NewClient(config.Client) + if err != nil { + return nil, err + } + + return clientInit(rootClient), nil +} + +// do calls the opensearch.Client.Do() and checks the response for response errors +func (c *Client) do(ctx context.Context, req opensearch.Request, dataPointer any) (*opensearch.Response, error) { + resp, err := c.Client.Do(ctx, req, dataPointer) + if err != nil { + return nil, err + } + + if resp.IsError() { + if dataPointer != nil { + return resp, opensearch.ParseError(resp) + } else { + return resp, fmt.Errorf("status: %s", resp.Status()) + } + } + + return resp, nil +} + +// FailedIndex contains information about fieled actions +type FailedIndex struct { + IndexName string `json:"index_name"` + IndexUUID string `json:"index_uuid"` + Reason string `json:"reason"` +} diff --git a/plugins/ism/api_add.go b/plugins/ism/api_add.go new file mode 100644 index 000000000..4df0e2e5b --- /dev/null +++ b/plugins/ism/api_add.go @@ -0,0 +1,81 @@ +// SPDX-License-Identifier: Apache-2.0 +// +// The OpenSearch Contributors require contributions made to +// this file be licensed under the Apache-2.0 license or a +// compatible open source license. + +package ism + +import ( + "bytes" + "context" + "encoding/json" + "net/http" + "strings" + + "github.com/opensearch-project/opensearch-go/v3" +) + +// Add executes a add policy request with the required AddReq +func (c Client) Add(ctx context.Context, req AddReq) (AddResp, error) { + var ( + data AddResp + err error + ) + if data.response, err = c.do(ctx, req, &data); err != nil { + return data, err + } + + return data, nil +} + +// AddReq represents possible options for the add policy request +type AddReq struct { + Indices []string + Body AddBody + + Header http.Header +} + +// GetRequest returns the *http.Request that gets executed by the client +func (r AddReq) GetRequest() (*http.Request, error) { + body, err := json.Marshal(r.Body) + if err != nil { + return nil, err + } + + indices := strings.Join(r.Indices, ",") + var path strings.Builder + path.Grow(len("/_plugins/_ism/add/") + len(indices)) + path.WriteString("/_plugins/_ism/add") + if len(r.Indices) > 0 { + path.WriteString("/") + path.WriteString(indices) + } + + return opensearch.BuildRequest( + http.MethodPost, + path.String(), + bytes.NewReader(body), + make(map[string]string), + r.Header, + ) +} + +// AddResp represents the returned struct of the add policy response +type AddResp struct { + UpdatedIndices int `json:"updated_indices"` + Failures bool `json:"failures"` + FailedIndices []FailedIndex `json:"failed_indices"` + response *opensearch.Response +} + +// Inspect returns the Inspect type containing the raw *opensearch.Reponse +func (r AddResp) Inspect() Inspect { + return Inspect{Response: r.response} +} + +// AddBody represents the request body for the add policy request +type AddBody struct { + PolicyID string `json:"policy_id"` +} diff --git a/plugins/ism/api_change.go b/plugins/ism/api_change.go new file mode 100644 index 000000000..bb06477f2 --- /dev/null +++ b/plugins/ism/api_change.go @@ -0,0 +1,88 @@ +// SPDX-License-Identifier: Apache-2.0 +// +// The OpenSearch Contributors require contributions made to +// this file be licensed under the Apache-2.0 license or a +// compatible open source license. + +package ism + +import ( + "bytes" + "context" + "encoding/json" + "net/http" + "strings" + + "github.com/opensearch-project/opensearch-go/v3" +) + +// Change executes a change policy request with the required ChangeReq +func (c Client) Change(ctx context.Context, req ChangeReq) (ChangeResp, error) { + var ( + data ChangeResp + err error + ) + if data.response, err = c.do(ctx, req, &data); err != nil { + return data, err + } + + return data, nil +} + +// ChangeReq represents possible options for the change policy request +type ChangeReq struct { + Indices []string + Body ChangeBody + + Header http.Header +} + +// GetRequest returns the *http.Request that gets executed by the client +func (r ChangeReq) GetRequest() (*http.Request, error) { + body, err := json.Marshal(r.Body) + if err != nil { + return nil, err + } + + indices := strings.Join(r.Indices, ",") + var path strings.Builder + path.Grow(len("/_plugins/_ism/change_policy/") + len(indices)) + path.WriteString("/_plugins/_ism/change_policy") + if len(r.Indices) > 0 { + path.WriteString("/") + path.WriteString(indices) + } + + return opensearch.BuildRequest( + http.MethodPost, + path.String(), + bytes.NewReader(body), + make(map[string]string), + r.Header, + ) +} + +// ChangeResp represents the returned struct of the change policy response +type ChangeResp struct { + UpdatedIndices int `json:"updated_indices"` + Failures bool `json:"failures"` + FailedIndices []FailedIndex `json:"failed_indices"` + response *opensearch.Response +} + +// Inspect returns the Inspect type containing the raw *opensearch.Reponse +func (r ChangeResp) Inspect() Inspect { + return Inspect{Response: r.response} +} + +// ChangeBody represents the request body for the change policy request +type ChangeBody struct { + PolicyID string `json:"policy_id"` + State string `json:"state"` + Include []ChangeBodyInclude `json:"include,omitempty"` +} + +// ChangeBodyInclude is a sub type of ChangeBody containing the state information +type ChangeBodyInclude struct { + State string `json:"state"` +} diff --git a/plugins/ism/api_explain-params.go b/plugins/ism/api_explain-params.go new file mode 100644 index 000000000..2b915948e --- /dev/null +++ b/plugins/ism/api_explain-params.go @@ -0,0 +1,27 @@ +// SPDX-License-Identifier: Apache-2.0 +// +// The OpenSearch Contributors require contributions made to +// this file be licensed under the Apache-2.0 license or a +// compatible open source license. + +package ism + +// ExplainParams represents possible parameters for the ExplainReq +type ExplainParams struct { + ShowPolicy bool + ValidateAction bool +} + +func (r ExplainParams) get() map[string]string { + params := make(map[string]string) + + if r.ShowPolicy { + params["show_policy"] = "true" + } + + if r.ValidateAction { + params["validate_action"] = "true" + } + + return params +} diff --git a/plugins/ism/api_explain.go b/plugins/ism/api_explain.go new file mode 100644 index 000000000..51c40b61c --- /dev/null +++ b/plugins/ism/api_explain.go @@ -0,0 +1,159 @@ +// SPDX-License-Identifier: Apache-2.0 +// +// The OpenSearch Contributors require contributions made to +// this file be licensed under the Apache-2.0 license or a +// compatible open source license. + +package ism + +import ( + "context" + "encoding/json" + "net/http" + "strings" + + "github.com/opensearch-project/opensearch-go/v3" +) + +// Explain executes a explain policy request with the optional ExplainReq +func (c Client) Explain(ctx context.Context, req *ExplainReq) (ExplainResp, error) { + if req == nil { + req = &ExplainReq{} + } + + var ( + data ExplainResp + err error + ) + if data.response, err = c.do(ctx, req, &data); err != nil { + return data, err + } + + return data, nil +} + +// ExplainReq represents possible options for the explain policy request +type ExplainReq struct { + Indices []string + + Params ExplainParams + Header http.Header +} + +// GetRequest returns the *http.Request that gets executed by the client +func (r ExplainReq) GetRequest() (*http.Request, error) { + indices := strings.Join(r.Indices, ",") + var path strings.Builder + path.Grow(len("/_plugins/_ism/explain/") + len(indices)) + path.WriteString("/_plugins/_ism/explain") + if len(r.Indices) > 0 { + path.WriteString("/") + path.WriteString(indices) + } + + return opensearch.BuildRequest( + http.MethodGet, + path.String(), + nil, + r.Params.get(), + r.Header, + ) +} + +// ExplainResp represents the returned struct of the explain policy response +type ExplainResp struct { + Indices map[string]ExplainItem + TotalManagedIndices int `json:"total_managed_indices"` + response *opensearch.Response +} + +// Inspect returns the Inspect type containing the raw *opensearch.Reponse +func (r ExplainResp) Inspect() Inspect { + return Inspect{Response: r.response} +} + +// ExplainItem is a sub type of ExplainResp containing information about the policy attached to the index +type ExplainItem struct { + PluginPolicyID *string `json:"index.plugins.index_state_management.policy_id"` + OpenDistroPolicyID *string `json:"index.opendistro.index_state_management.policy_id"` + Index string `json:"index,omitempty"` + IndexUUID string `json:"index_uuid,omitempty"` + PolicyID string `json:"policy_id,omitempty"` + PolicySeqNo int `json:"policy_seq_no,omitempty"` + PolicyPrimaryTerm int `json:"policy_primary_term,omitempty"` + RolledOver bool `json:"rolled_over,omitempty"` + RolledOverIndexName string `json:"rolled_over_index_name,omitempty"` + IndexCreationDate int64 `json:"index_creation_date,omitempty"` + State *struct { + Name string `json:"name"` + StartTime int64 `json:"start_time"` + } `json:"state,omitempty"` + Action *struct { + Name string `json:"name"` + StartTime int64 `json:"start_time"` + Index int `json:"index"` + Failed bool `json:"failed"` + ConsumedRetries int `json:"consumed_retries"` + LastRetryTime int64 `json:"last_retry_time"` + } `json:"action,omitempty"` + Step *struct { + Name string `json:"name"` + StartTime int64 `json:"start_time"` + StepStatus string `json:"step_status"` + } `json:"step,omitempty"` + RetryInfo *struct { + Failed bool `json:"failed"` + ConsumedRetries int `json:"consumed_retries"` + } `json:"retry_info,omitempty"` + Info *struct { + Message string `json:"message"` + } `json:"info,omitempty"` + Enabled *bool `json:"enabled"` + Policy *PolicyBody `json:"policy,omitempty"` + Validate *struct { + Message string `json:"validation_message"` + Status string `json:"validation_status"` + } `json:"validate,omitempty"` +} + +// UnmarshalJSON is a custom unmarshal function for ExplainResp as the default Unmarshal can not handle it correctly +func (r *ExplainResp) UnmarshalJSON(b []byte) error { + var dummy struct { + index map[string]json.RawMessage + } + if err := json.Unmarshal(b, &dummy.index); err != nil { + return err + } + if r.Indices == nil { + r.Indices = make(map[string]ExplainItem) + } + for key, value := range dummy.index { + if key == "total_managed_indices" { + var intDummy int + if err := json.Unmarshal(value, &intDummy); err != nil { + return err + } + r.TotalManagedIndices = intDummy + continue + } + var itemDummy ExplainItem + if err := json.Unmarshal(value, &itemDummy); err != nil { + return err + } + r.Indices[key] = itemDummy + } + return nil +} + +// MarshalJSON is a custom marshal function for ExplainResp as the default Unmarshal can not handle it correctly +func (r *ExplainResp) MarshalJSON() ([]byte, error) { + var dummy struct { + index map[string]any + } + dummy.index = make(map[string]any) + for key, value := range r.Indices { + dummy.index[key] = value + } + dummy.index["total_managed_indices"] = r.TotalManagedIndices + return json.Marshal(dummy.index) +} diff --git a/plugins/ism/api_policies-delete.go b/plugins/ism/api_policies-delete.go new file mode 100644 index 000000000..b0460cd87 --- /dev/null +++ b/plugins/ism/api_policies-delete.go @@ -0,0 +1,55 @@ +// SPDX-License-Identifier: Apache-2.0 +// +// The OpenSearch Contributors require contributions made to +// this file be licensed under the Apache-2.0 license or a +// compatible open source license. + +package ism + +import ( + "fmt" + "net/http" + + "github.com/opensearch-project/opensearch-go/v3" +) + +// PoliciesDeleteReq represents possible options for the policies get request +type PoliciesDeleteReq struct { + Policy string + + Header http.Header +} + +// GetRequest returns the *http.Request that gets executed by the client +func (r PoliciesDeleteReq) GetRequest() (*http.Request, error) { + return opensearch.BuildRequest( + http.MethodDelete, + fmt.Sprintf("/_plugins/_ism/policies/%s", r.Policy), + nil, + make(map[string]string), + r.Header, + ) +} + +// PoliciesDeleteResp represents the returned struct of the policies get response +type PoliciesDeleteResp struct { + Index string `json:"_index"` + Type string `json:"_type"` // Deprecated with opensearch 2.0 + ID string `json:"_id"` + Version int `json:"_version"` + Result string `json:"result"` + ForcedRefresh bool `json:"forced_refresh"` + Shards struct { + Total int `json:"total"` + Successful int `json:"successful"` + Failed int `json:"failed"` + } `json:"_shards"` + SeqNo int `json:"_seq_no"` + PrimaryTerm int `json:"_primary_term"` + response *opensearch.Response +} + +// Inspect returns the Inspect type containing the raw *opensearch.Reponse +func (r PoliciesDeleteResp) Inspect() Inspect { + return Inspect{Response: r.response} +} diff --git a/plugins/ism/api_policies-get.go b/plugins/ism/api_policies-get.go new file mode 100644 index 000000000..0c4b8f86d --- /dev/null +++ b/plugins/ism/api_policies-get.go @@ -0,0 +1,57 @@ +// SPDX-License-Identifier: Apache-2.0 +// +// The OpenSearch Contributors require contributions made to +// this file be licensed under the Apache-2.0 license or a +// compatible open source license. + +package ism + +import ( + "net/http" + "strings" + + "github.com/opensearch-project/opensearch-go/v3" +) + +// PoliciesGetReq represents possible options for the policies get request +type PoliciesGetReq struct { + Policy string + + Header http.Header +} + +// GetRequest returns the *http.Request that gets executed by the client +func (r PoliciesGetReq) GetRequest() (*http.Request, error) { + var path strings.Builder + path.Grow(len("/_plugins/_ism/policies/") + len(r.Policy)) + path.WriteString("/_plugins/_ism/policies") + if len(r.Policy) > 0 { + path.WriteString("/") + path.WriteString(r.Policy) + } + + return opensearch.BuildRequest( + http.MethodGet, + path.String(), + nil, + make(map[string]string), + r.Header, + ) +} + +// PoliciesGetResp represents the returned struct of the policies get response +type PoliciesGetResp struct { + Policies []Policy `json:"policies,omitempty"` + TotalPolicies *int `json:"total_policies,omitempty"` + ID *string `json:"_id,omitempty"` + SeqNo *int `json:"_seq_no,omitempty"` + PrimaryTerm *int `json:"_primary_term,omitempty"` + Version *int `json:"_version,omitempty"` + Policy *PolicyBody `json:"policy,omitempty"` + response *opensearch.Response +} + +// Inspect returns the Inspect type containing the raw *opensearch.Reponse +func (r PoliciesGetResp) Inspect() Inspect { + return Inspect{Response: r.response} +} diff --git a/plugins/ism/api_policies-put-params.go b/plugins/ism/api_policies-put-params.go new file mode 100644 index 000000000..415fdde37 --- /dev/null +++ b/plugins/ism/api_policies-put-params.go @@ -0,0 +1,29 @@ +// SPDX-License-Identifier: Apache-2.0 +// +// The OpenSearch Contributors require contributions made to +// this file be licensed under the Apache-2.0 license or a +// compatible open source license. + +package ism + +import "strconv" + +// PoliciesPutParams represents possible parameters for the PoliciesPutReq +type PoliciesPutParams struct { + IfSeqNo *int + IfPrimaryTerm *int +} + +func (r PoliciesPutParams) get() map[string]string { + params := make(map[string]string) + + if r.IfSeqNo != nil { + params["if_seq_no"] = strconv.FormatInt(int64(*r.IfSeqNo), 10) + } + + if r.IfPrimaryTerm != nil { + params["if_primary_term"] = strconv.FormatInt(int64(*r.IfPrimaryTerm), 10) + } + + return params +} diff --git a/plugins/ism/api_policies-put.go b/plugins/ism/api_policies-put.go new file mode 100644 index 000000000..ee37be1f0 --- /dev/null +++ b/plugins/ism/api_policies-put.go @@ -0,0 +1,61 @@ +// SPDX-License-Identifier: Apache-2.0 +// +// The OpenSearch Contributors require contributions made to +// this file be licensed under the Apache-2.0 license or a +// compatible open source license. + +package ism + +import ( + "bytes" + "encoding/json" + "fmt" + "net/http" + + "github.com/opensearch-project/opensearch-go/v3" +) + +// PoliciesPutReq represents possible options for the policies get request +type PoliciesPutReq struct { + Policy string + Body PoliciesPutBody + + Params PoliciesPutParams + Header http.Header +} + +// GetRequest returns the *http.Request that gets executed by the client +func (r PoliciesPutReq) GetRequest() (*http.Request, error) { + body, err := json.Marshal(r.Body) + if err != nil { + return nil, err + } + + return opensearch.BuildRequest( + http.MethodPut, + fmt.Sprintf("/_plugins/_ism/policies/%s", r.Policy), + bytes.NewReader(body), + r.Params.get(), + r.Header, + ) +} + +// PoliciesPutResp represents the returned struct of the policies get response +type PoliciesPutResp struct { + ID string `json:"_id"` + SeqNo int `json:"_seq_no"` + PrimaryTerm int `json:"_primary_term"` + Version int `json:"_version"` + Policy struct { + Policy PolicyBody `json:"policy"` + } `json:"policy"` + response *opensearch.Response +} + +// Inspect returns the Inspect type containing the raw *opensearch.Reponse +func (r PoliciesPutResp) Inspect() Inspect { + return Inspect{Response: r.response} +} + +// PoliciesPutBody represents the request body for the policies put request +type PoliciesPutBody Policy diff --git a/plugins/ism/api_policies.go b/plugins/ism/api_policies.go new file mode 100644 index 000000000..8c78f09a7 --- /dev/null +++ b/plugins/ism/api_policies.go @@ -0,0 +1,262 @@ +// SPDX-License-Identifier: Apache-2.0 +// +// The OpenSearch Contributors require contributions made to +// this file be licensed under the Apache-2.0 license or a +// compatible open source license. + +package ism + +import ( + "context" + "encoding/json" +) + +type policiesClient struct { + apiClient *Client +} + +// Get executes a get policies request with the optional PoliciesGetReq +func (c policiesClient) Get(ctx context.Context, req *PoliciesGetReq) (PoliciesGetResp, error) { + if req == nil { + req = &PoliciesGetReq{} + } + + var ( + data PoliciesGetResp + err error + ) + if data.response, err = c.apiClient.do(ctx, req, &data); err != nil { + return data, err + } + + return data, nil +} + +// Put executes a put policies request with the required PoliciesPutReq +func (c policiesClient) Put(ctx context.Context, req PoliciesPutReq) (PoliciesPutResp, error) { + var ( + data PoliciesPutResp + err error + ) + if data.response, err = c.apiClient.do(ctx, req, &data); err != nil { + return data, err + } + + return data, nil +} + +// Delete executes a delete policies request with the required PoliciesDeleteReq +func (c policiesClient) Delete(ctx context.Context, req PoliciesDeleteReq) (PoliciesDeleteResp, error) { + var ( + data PoliciesDeleteResp + err error + ) + if data.response, err = c.apiClient.do(ctx, req, &data); err != nil { + return data, err + } + + return data, nil +} + +// Policy is a sub type of PoliciesGetResp represeting information about an action group +type Policy struct { + ID string `json:"_id,omitempty"` + SeqNo *int `json:"_seq_no,omitempty"` + PrimaryTerm *int `json:"_primary_term,omitempty"` + Policy PolicyBody `json:"policy"` +} + +// PolicyBody is a sub type of Policy containing information about the policy +type PolicyBody struct { + PolicyID string `json:"policy_id,omitempty"` + Description string `json:"description,omitempty"` + LastUpdatedTime int64 `json:"last_updated_time,omitempty"` + SchemaVersion int `json:"schema_version,omitempty"` + ErrorNotification *PolicyErrorNotification `json:"error_notification,omitempty"` + DefaultState string `json:"default_state"` + States []PolicyState `json:"states"` + Template []Template `json:"ism_template,omitempty"` +} + +// PolicyErrorNotification is a sub type of PolicyBody containing information about error notification +type PolicyErrorNotification struct { + Channel string `json:"channel,omitempty"` + Destination NotificationDestination `json:"destination,omitempty"` + MessageTemplate NotificationMessageTemplate `json:"message_template"` +} + +// NotificationDestination is a sub type of PolicyErrorNotification containing information about notification destinations +type NotificationDestination struct { + Chime *NotificationDestinationURL `json:"chime,omitempty"` + Slack *NotificationDestinationURL `json:"slack,omitempty"` + CustomWebhook *NotificationDestinationCustomWebhook `json:"custom_webhook,omitempty"` +} + +// NotificationDestinationURL is sub type of NotificationDestination containing the url of the notification destination +type NotificationDestinationURL struct { + URL string `json:"url"` +} + +// NotificationDestinationCustomWebhook is a sub type of NotificationDestination containing parameters for the custom webhook destination +type NotificationDestinationCustomWebhook struct { + URL string `json:"url,omitempty"` + HeaderParams map[string]string `json:"header_params,omitempty"` + Host string `json:"host,omitempty"` + Password string `json:"password,omitempty"` + Path string `json:"path,omitempty"` + Port int `json:"port,omitempty"` + QueryParams map[string]string `json:"query_params,omitempty"` + Scheme string `json:"scheme,omitempty"` + Username string `json:"username,omitempty"` +} + +// NotificationMessageTemplate is a sub type of PolicyErrorNotification containing a pattern or string for the error message +type NotificationMessageTemplate struct { + Source string `json:"source"` + Lang string `json:"lang,omitempty"` +} + +// PolicyState uis a sub type of PolicyBody containing information about the policy state +type PolicyState struct { + Name string `json:"name"` + Actions []PolicyStateAction `json:"actions,omitempty"` + Transitions *[]PolicyStateTransition `json:"transitions,omitempty"` +} + +// PolicyStateAction is a sub type of PolicyState containing all type of policy actions +type PolicyStateAction struct { + Timeout string `json:"timeout,omitempty"` + Retry *PolicyStateRetry `json:"retry,omitempty"` + ForceMerge *PolicyStateForeMerge `json:"force_merge,omitempty"` + ReadOnly *PolicyStateReadOnly `json:"read_only,omitempty"` + ReadWrite *PolicyStateReadWrite `json:"read_write,omitempty"` + ReplicaCount *PolicyStateReplicaCount `json:"replica_count,omitempty"` + Shrink *PolicyStateShrink `json:"shrink,omitempty"` + Close *PolicyStateClose `json:"close,omitempty"` + Open *PolicyStateOpen `json:"open,omitempty"` + Delete *PolicyStateDelete `json:"delete,omitempty"` + Rollover *PolicyStateRollover `json:"rollover,omitempty"` + Notification *PolicyStateNotification `json:"notification,omitempty"` + Snapshot *PolicyStateSnapshot `json:"snapshot,omitempty"` + IndexPriority *PolicyStateIndexPriority `json:"index_priority,omitempty"` + Allocation *PolicyStateAllocation `json:"allocation,omitempty"` + Rollup *PolicyStateRollup `json:"rollup,omitempty"` +} + +// Template is a sub type of PolicyBody containing information about the ims template +type Template struct { + IndexPatterns []string `json:"index_patterns,omitempty"` + Priority int `json:"priority"` + LastUpdatedTime int64 `json:"last_updated_time,omitempty"` +} + +// PolicyStateRetry represents the retry action +type PolicyStateRetry struct { + Count int `json:"count"` + Backoff string `json:"backoff,omitempty"` + Delay string `json:"delay,omitempty"` +} + +// PolicyStateForeMerge represents the force_merge action +type PolicyStateForeMerge struct { + MaxNumSegments int `json:"max_num_segments"` + WaitForCompletion *bool `json:"wait_for_completion,omitempty"` + TaskExecutionTimeout string `json:"task_execution_timeout,omitempty"` +} + +// PolicyStateReadOnly represents the read_only action +type PolicyStateReadOnly struct{} + +// PolicyStateReadWrite represents the read_write action +type PolicyStateReadWrite struct{} + +// PolicyStateReplicaCount represents the replica_count action +type PolicyStateReplicaCount struct { + NumberOfReplicas int `json:"number_of_replicas"` +} + +// PolicyStateShrink represents the Shrink action +type PolicyStateShrink struct { + NumNewShards int `json:"num_new_shards,omitempty"` + MaxShardSize string `json:"max_shard_size,omitempty"` + PercentageOfSourceShards float32 `json:"percentage_of_source_shards,omitempty"` + TargetIndexNameTemplate *struct { + Source string `json:"source"` + Lang string `json:"lang,omitempty"` + } `json:"target_index_name_template,omitempty"` + Aliases []map[string]any `json:"aliases,omitempty"` + SwitchAliases bool `json:"switch_aliases,omitempty"` + ForceUnsafe bool `json:"force_unsafe,omitempty"` +} + +// PolicyStateClose represents the close action +type PolicyStateClose struct{} + +// PolicyStateOpen represents the open action +type PolicyStateOpen struct{} + +// PolicyStateDelete represents the delete action +type PolicyStateDelete struct{} + +// PolicyStateRollover represents the rollover action +type PolicyStateRollover struct { + MinSize string `json:"min_size,omitempty"` + MinPrimaryShardSize string `json:"min_primary_shard_size,omitempty"` + MinDocCount int `json:"min_doc_count,omitempty"` + MinIndexAge string `json:"min_index_age,omitempty"` + CopyAlias bool `json:"copy_alias,omitempty"` +} + +// PolicyStateNotification represents the notification action +type PolicyStateNotification struct { + Destination NotificationDestination `json:"destination"` + MessageTemplate NotificationMessageTemplate `json:"message_template"` +} + +// PolicyStateSnapshot represents the snapshot action +type PolicyStateSnapshot struct { + Repository string `json:"repository"` + Snapshot string `json:"snapshot"` +} + +// PolicyStateIndexPriority represents the index_priority action +type PolicyStateIndexPriority struct { + Priority int `json:"priority"` +} + +// PolicyStateAllocation represents the allocation action +type PolicyStateAllocation struct { + Require string `json:"require,omitempty"` + Include string `json:"include,omitempty"` + Exclude string `json:"exclude,omitempty"` + WaitFor string `json:"wait_for,omitempty"` +} + +// PolicyStateRollup represents the rollup action +type PolicyStateRollup struct { + ISMRollup struct { + Description string `json:"description,omitempty"` + TargetIndex string `json:"target_index"` + PageSize string `json:"page_size"` + Dimensions []json.RawMessage `json:"dimensions"` + Metrics []struct { + SourceField string `json:"source_field"` + Metrics []map[string]struct{} `json:"metrics"` + } `json:"metrics"` + } `json:"ism_rollup"` +} + +// PolicyStateTransition is a sub type of PolicyState containing information about transitios to other states +type PolicyStateTransition struct { + StateName string `json:"state_name"` + Conditions []struct { + MinIndexAge string `json:"min_index_age,omitempty"` + MinRolloverAge string `json:"min_rollover_age,omitempty"` + MinDocCount int `json:"min_doc_count,omitempty"` + MinSize string `json:"min_size,omitempty"` + Cron *struct { + Expression string `json:"expression"` + Timezone string `json:"timezone"` + } `json:"cron,omitempty"` + } `json:"conditions"` +} diff --git a/plugins/ism/api_policies_test.go b/plugins/ism/api_policies_test.go new file mode 100644 index 000000000..ce4051454 --- /dev/null +++ b/plugins/ism/api_policies_test.go @@ -0,0 +1,181 @@ +// SPDX-License-Identifier: Apache-2.0 +// +// The OpenSearch Contributors require contributions made to +// this file be licensed under the Apache-2.0 license or a +// compatible open source license. +// +//go:build integration + +package ism_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/opensearch-project/opensearch-go/v3" + ostest "github.com/opensearch-project/opensearch-go/v3/internal/test" + "github.com/opensearch-project/opensearch-go/v3/plugins/ism" + osismtest "github.com/opensearch-project/opensearch-go/v3/plugins/ism/internal/test" +) + +func TestPoliciesClient(t *testing.T) { + client, err := osismtest.NewClient() + require.Nil(t, err) + + failingClient, err := osismtest.CreateFailingClient() + require.Nil(t, err) + + var putResp ism.PoliciesPutResp + + type policiesTests struct { + Name string + Results func() (osismtest.Response, error) + } + + testCases := []struct { + Name string + Tests []policiesTests + }{ + { + Name: "Put", + Tests: []policiesTests{ + { + Name: "Create", + Results: func() (osismtest.Response, error) { + putResp, err = client.Policies.Put( + nil, + ism.PoliciesPutReq{ + Policy: "test", + Body: ism.PoliciesPutBody{ + Policy: ism.PolicyBody{ + Description: "test", + ErrorNotification: &ism.PolicyErrorNotification{ + Destination: ism.NotificationDestination{ + CustomWebhook: &ism.NotificationDestinationCustomWebhook{ + Host: "exmaple.com", + Scheme: "https", + Path: "/test", + Username: "test", + Password: "test", + HeaderParams: map[string]string{"test": "2"}, + QueryParams: map[string]string{"test": "2"}, + Port: 443, + URL: "example.com", + }, + }, + MessageTemplate: ism.NotificationMessageTemplate{ + Source: "The index {{ctx.index}} failed during policy execution.", + }, + }, + DefaultState: "test", + States: []ism.PolicyState{ + ism.PolicyState{ + Name: "test", + Actions: []ism.PolicyStateAction{ + ism.PolicyStateAction{ + Delete: &ism.PolicyStateDelete{}, + }, + }, + }, + }, + Template: []ism.Template{ + ism.Template{ + IndexPatterns: []string{"*test*"}, + Priority: 20, + }, + }, + }, + }, + }, + ) + return putResp, err + }, + }, + { + Name: "Update", + Results: func() (osismtest.Response, error) { + putResp.Policy.Policy.ErrorNotification.Destination.CustomWebhook = nil + putResp.Policy.Policy.ErrorNotification.Destination.Slack = &ism.NotificationDestinationURL{URL: "https://example.com"} + return client.Policies.Put( + nil, + ism.PoliciesPutReq{ + Policy: "test", + Params: ism.PoliciesPutParams{IfSeqNo: opensearch.ToPointer(putResp.SeqNo), IfPrimaryTerm: opensearch.ToPointer(putResp.PrimaryTerm)}, + Body: ism.PoliciesPutBody{ + Policy: putResp.Policy.Policy, + }, + }, + ) + }, + }, + { + Name: "inspect", + Results: func() (osismtest.Response, error) { + return failingClient.Policies.Put(nil, ism.PoliciesPutReq{}) + }, + }, + }, + }, + { + Name: "Get", + Tests: []policiesTests{ + { + Name: "without request", + Results: func() (osismtest.Response, error) { + return client.Policies.Get(nil, nil) + }, + }, + { + Name: "with request", + Results: func() (osismtest.Response, error) { + return client.Policies.Get(nil, &ism.PoliciesGetReq{Policy: "test"}) + }, + }, + { + Name: "inspect", + Results: func() (osismtest.Response, error) { + return failingClient.Policies.Get(nil, nil) + }, + }, + }, + }, + { + Name: "Delete", + Tests: []policiesTests{ + { + Name: "with request", + Results: func() (osismtest.Response, error) { + return client.Policies.Delete(nil, ism.PoliciesDeleteReq{Policy: "test"}) + }, + }, + { + Name: "inspect", + Results: func() (osismtest.Response, error) { + return failingClient.Policies.Delete(nil, ism.PoliciesDeleteReq{}) + }, + }, + }, + }, + } + for _, value := range testCases { + t.Run(value.Name, func(t *testing.T) { + for _, testCase := range value.Tests { + t.Run(testCase.Name, func(t *testing.T) { + res, err := testCase.Results() + if testCase.Name == "inspect" { + assert.NotNil(t, err) + assert.NotNil(t, res) + osismtest.VerifyInspect(t, res.Inspect()) + } else { + require.NoError(t, err) + require.NotNil(t, res) + assert.NotNil(t, res.Inspect().Response) + ostest.CompareRawJSONwithParsedJSON(t, res, res.Inspect().Response) + } + }) + } + }) + } +} diff --git a/plugins/ism/api_remove.go b/plugins/ism/api_remove.go new file mode 100644 index 000000000..96d29b62b --- /dev/null +++ b/plugins/ism/api_remove.go @@ -0,0 +1,68 @@ +// SPDX-License-Identifier: Apache-2.0 +// +// The OpenSearch Contributors require contributions made to +// this file be licensed under the Apache-2.0 license or a +// compatible open source license. + +package ism + +import ( + "context" + "net/http" + "strings" + + "github.com/opensearch-project/opensearch-go/v3" +) + +// Remove executes a remove policy request with the required RemoveReq +func (c Client) Remove(ctx context.Context, req RemoveReq) (RemoveResp, error) { + var ( + data RemoveResp + err error + ) + if data.response, err = c.do(ctx, req, &data); err != nil { + return data, err + } + + return data, nil +} + +// RemoveReq represents possible options for the remove policy request +type RemoveReq struct { + Indices []string + + Header http.Header +} + +// GetRequest returns the *http.Request that gets executed by the client +func (r RemoveReq) GetRequest() (*http.Request, error) { + indices := strings.Join(r.Indices, ",") + var path strings.Builder + path.Grow(len("/_plugins/_ism/remove/") + len(indices)) + path.WriteString("/_plugins/_ism/remove") + if len(r.Indices) > 0 { + path.WriteString("/") + path.WriteString(indices) + } + + return opensearch.BuildRequest( + http.MethodPost, + path.String(), + nil, + make(map[string]string), + r.Header, + ) +} + +// RemoveResp represents the returned struct of the remove policy response +type RemoveResp struct { + UpdatedIndices int `json:"updated_indices"` + Failures bool `json:"failures"` + FailedIndices []FailedIndex `json:"failed_indices"` + response *opensearch.Response +} + +// Inspect returns the Inspect type containing the raw *opensearch.Reponse +func (r RemoveResp) Inspect() Inspect { + return Inspect{Response: r.response} +} diff --git a/plugins/ism/api_retry.go b/plugins/ism/api_retry.go new file mode 100644 index 000000000..9e527bfdd --- /dev/null +++ b/plugins/ism/api_retry.go @@ -0,0 +1,86 @@ +// SPDX-License-Identifier: Apache-2.0 +// +// The OpenSearch Contributors require contributions made to +// this file be licensed under the Apache-2.0 license or a +// compatible open source license. + +package ism + +import ( + "bytes" + "context" + "encoding/json" + "io" + "net/http" + "strings" + + "github.com/opensearch-project/opensearch-go/v3" +) + +// Retry executes a retry policy request with the required RetryReq +func (c Client) Retry(ctx context.Context, req RetryReq) (RetryResp, error) { + var ( + data RetryResp + err error + ) + if data.response, err = c.do(ctx, req, &data); err != nil { + return data, err + } + + return data, nil +} + +// RetryReq represents possible options for the retry policy request +type RetryReq struct { + Indices []string + Body *RetryBody + + Header http.Header +} + +// GetRequest returns the *http.Request that gets executed by the client +func (r RetryReq) GetRequest() (*http.Request, error) { + var reqBody io.Reader + if r.Body != nil { + body, err := json.Marshal(r.Body) + if err != nil { + return nil, err + } + reqBody = bytes.NewReader(body) + } + + indices := strings.Join(r.Indices, ",") + var path strings.Builder + path.Grow(len("/_plugins/_ism/retry/") + len(indices)) + path.WriteString("/_plugins/_ism/retry") + if len(r.Indices) > 0 { + path.WriteString("/") + path.WriteString(indices) + } + + return opensearch.BuildRequest( + http.MethodPost, + path.String(), + reqBody, + make(map[string]string), + r.Header, + ) +} + +// RetryResp represents the returned struct of the retry policy response +type RetryResp struct { + UpdatedIndices int `json:"updated_indices"` + Failures bool `json:"failures"` + FailedIndices []FailedIndex `json:"failed_indices"` + response *opensearch.Response +} + +// Inspect returns the Inspect type containing the raw *opensearch.Reponse +func (r RetryResp) Inspect() Inspect { + return Inspect{Response: r.response} +} + +// RetryBody represents the request body for the retry policy request +type RetryBody struct { + State string `json:"state"` +} diff --git a/plugins/ism/api_test.go b/plugins/ism/api_test.go new file mode 100644 index 000000000..c16d375f9 --- /dev/null +++ b/plugins/ism/api_test.go @@ -0,0 +1,260 @@ +// SPDX-License-Identifier: Apache-2.0 +// +// The OpenSearch Contributors require contributions made to +// this file be licensed under the Apache-2.0 license or a +// compatible open source license. +// +//go:build integration + +package ism_test + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + ostest "github.com/opensearch-project/opensearch-go/v3/internal/test" + "github.com/opensearch-project/opensearch-go/v3/opensearchapi" + "github.com/opensearch-project/opensearch-go/v3/plugins/ism" + osismtest "github.com/opensearch-project/opensearch-go/v3/plugins/ism/internal/test" +) + +func TestClient(t *testing.T) { + t.Parallel() + client, err := osismtest.NewClient() + require.Nil(t, err) + + osClient, err := ostest.NewClient() + require.Nil(t, err) + + failingClient, err := osismtest.CreateFailingClient() + require.Nil(t, err) + + testPolicy := "testPolicy" + testIndex := []string{"test_policy"} + + t.Cleanup(func() { client.Policies.Delete(nil, ism.PoliciesDeleteReq{Policy: testPolicy}) }) + _, err = client.Policies.Put( + nil, + ism.PoliciesPutReq{ + Policy: testPolicy, + Body: ism.PoliciesPutBody{ + Policy: ism.PolicyBody{ + Description: "test", + ErrorNotification: &ism.PolicyErrorNotification{ + Destination: ism.NotificationDestination{ + CustomWebhook: &ism.NotificationDestinationCustomWebhook{ + Host: "exmaple.com", + Scheme: "https", + Path: "/test", + Username: "test", + Password: "test", + HeaderParams: map[string]string{"test": "2"}, + QueryParams: map[string]string{"test": "2"}, + Port: 443, + URL: "example.com", + }, + }, + MessageTemplate: ism.NotificationMessageTemplate{ + Source: "The index {{ctx.index}} failed during policy execution.", + }, + }, + DefaultState: "test", + States: []ism.PolicyState{ + ism.PolicyState{ + Name: "test", + Actions: []ism.PolicyStateAction{ + ism.PolicyStateAction{ + Delete: &ism.PolicyStateDelete{}, + }, + }, + }, + }, + Template: []ism.Template{ + ism.Template{ + IndexPatterns: []string{"test"}, + Priority: 20, + }, + }, + }, + }, + }, + ) + require.Nil(t, err) + + t.Cleanup(func() { osClient.Indices.Delete(nil, opensearchapi.IndicesDeleteReq{Indices: testIndex}) }) + _, err = osClient.Indices.Create(nil, opensearchapi.IndicesCreateReq{Index: testIndex[0]}) + require.Nil(t, err) + + type clientTests struct { + Name string + Results func() (osismtest.Response, error) + } + + waitFor := func() error { + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + for range ticker.C { + resp, err := client.Explain(nil, &ism.ExplainReq{Indices: testIndex}) + if err != nil { + return err + } + if resp.Indices[testIndex[0]].Info != nil && resp.Indices[testIndex[0]].Info.Message != "" { + return nil + } + } + return nil + } + testCases := []struct { + Name string + Tests []clientTests + }{ + { + Name: "Add", + Tests: []clientTests{ + { + Name: "okay", + Results: func() (osismtest.Response, error) { + return client.Add(nil, ism.AddReq{Indices: testIndex, Body: ism.AddBody{PolicyID: testPolicy}}) + }, + }, + { + Name: "failure", + Results: func() (osismtest.Response, error) { + return client.Add(nil, ism.AddReq{Indices: testIndex, Body: ism.AddBody{PolicyID: testPolicy}}) + }, + }, + { + Name: "inspect", + Results: func() (osismtest.Response, error) { + return failingClient.Add(nil, ism.AddReq{}) + }, + }, + }, + }, + { + Name: "Explain", + Tests: []clientTests{ + { + Name: "without body", + Results: func() (osismtest.Response, error) { + return client.Explain(nil, &ism.ExplainReq{Indices: testIndex}) + }, + }, + { + Name: "inspect", + Results: func() (osismtest.Response, error) { + return failingClient.Explain(nil, &ism.ExplainReq{}) + }, + }, + }, + }, + { + Name: "Change", + Tests: []clientTests{ + { + Name: "with request", + Results: func() (osismtest.Response, error) { + return client.Change(nil, ism.ChangeReq{Indices: testIndex, Body: ism.ChangeBody{PolicyID: testPolicy, State: "delete"}}) + }, + }, + { + Name: "inspect", + Results: func() (osismtest.Response, error) { + return failingClient.Change(nil, ism.ChangeReq{}) + }, + }, + }, + }, + { + Name: "Retry", + Tests: []clientTests{ + { + Name: "without body", + Results: func() (osismtest.Response, error) { + return client.Retry(nil, ism.RetryReq{Indices: testIndex}) + }, + }, + { + Name: "with body", + Results: func() (osismtest.Response, error) { + return client.Retry(nil, ism.RetryReq{Indices: testIndex, Body: &ism.RetryBody{State: "test"}}) + }, + }, + { + Name: "inspect", + Results: func() (osismtest.Response, error) { + return failingClient.Retry(nil, ism.RetryReq{}) + }, + }, + }, + }, + { + Name: "Remove", + Tests: []clientTests{ + { + Name: "with request", + Results: func() (osismtest.Response, error) { + return client.Remove(nil, ism.RemoveReq{Indices: testIndex}) + }, + }, + { + Name: "inspect", + Results: func() (osismtest.Response, error) { + return failingClient.Remove(nil, ism.RemoveReq{}) + }, + }, + }, + }, + } + for _, value := range testCases { + t.Run(value.Name, func(t *testing.T) { + for _, testCase := range value.Tests { + t.Run(testCase.Name, func(t *testing.T) { + res, err := testCase.Results() + if testCase.Name == "inspect" { + assert.NotNil(t, err) + assert.NotNil(t, res) + osismtest.VerifyInspect(t, res.Inspect()) + } else { + require.NoError(t, err) + require.NotNil(t, res) + assert.NotNil(t, res.Inspect().Response) + if value.Name != "Explain" { + ostest.CompareRawJSONwithParsedJSON(t, res, res.Inspect().Response) + } + if value.Name == "Add" && testCase.Name == "failure" { + err = waitFor() + assert.NoError(t, err) + } + } + }) + } + }) + } + t.Run("ValidateResponse", func(t *testing.T) { + t.Run("Explain", func(t *testing.T) { + resp, err := client.Explain(nil, &ism.ExplainReq{Indices: testIndex}) + assert.Nil(t, err) + assert.NotNil(t, resp) + ostest.CompareRawJSONwithParsedJSON(t, &resp, resp.Inspect().Response) + }) + t.Run("Explain with validate_action", func(t *testing.T) { + ostest.SkipIfBelowVersion(t, osClient, 2, 3, "Explain with validate_action") + resp, err := client.Explain(nil, &ism.ExplainReq{Indices: testIndex, Params: ism.ExplainParams{ShowPolicy: true, ValidateAction: true}}) + assert.Nil(t, err) + assert.NotNil(t, resp) + ostest.CompareRawJSONwithParsedJSON(t, &resp, resp.Inspect().Response) + }) + t.Run("Explain with show_policy", func(t *testing.T) { + ostest.SkipIfBelowVersion(t, osClient, 1, 2, "Explain with validate_action") + resp, err := client.Explain(nil, &ism.ExplainReq{Indices: testIndex, Params: ism.ExplainParams{ShowPolicy: true}}) + assert.Nil(t, err) + assert.NotNil(t, resp) + ostest.CompareRawJSONwithParsedJSON(t, &resp, resp.Inspect().Response) + }) + }) + +} diff --git a/plugins/ism/inspect.go b/plugins/ism/inspect.go new file mode 100644 index 000000000..2c27702b1 --- /dev/null +++ b/plugins/ism/inspect.go @@ -0,0 +1,14 @@ +// SPDX-License-Identifier: Apache-2.0 +// +// The OpenSearch Contributors require contributions made to +// this file be licensed under the Apache-2.0 license or a +// compatible open source license. + +package ism + +import "github.com/opensearch-project/opensearch-go/v3" + +// Inspect represents the struct returned by Inspect() func, its main use is to return the opensearch.Response to the user +type Inspect struct { + Response *opensearch.Response +} diff --git a/plugins/ism/internal/test/helper.go b/plugins/ism/internal/test/helper.go new file mode 100644 index 000000000..909d40804 --- /dev/null +++ b/plugins/ism/internal/test/helper.go @@ -0,0 +1,83 @@ +// SPDX-License-Identifier: Apache-2.0 +// +// The OpenSearch Contributors require contributions made to +// this file be licensed under the Apache-2.0 license or a +// compatible open source license. + +package osismtest + +import ( + "crypto/tls" + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/opensearch-project/opensearch-go/v3" + ostest "github.com/opensearch-project/opensearch-go/v3/internal/test" + "github.com/opensearch-project/opensearch-go/v3/plugins/ism" +) + +// Response is a dummy interface to run tests with Inspect() +type Response interface { + Inspect() ism.Inspect +} + +// NewClient returns an opensearchapi.Client that is adjusted for the wanted test case +func NewClient() (*ism.Client, error) { + config, err := ClientConfig() + if err != nil { + return nil, err + } + if config == nil { + return ism.NewClient(ism.Config{}) + } + return ism.NewClient(*config) +} + +// ClientConfig returns an opensearchapi.Config for secure opensearch +func ClientConfig() (*ism.Config, error) { + if ostest.IsSecure() { + password, err := ostest.GetPassword() + if err != nil { + return nil, err + } + + return &ism.Config{ + Client: opensearch.Config{ + Username: "admin", + Password: password, + Addresses: []string{"https://localhost:9200"}, + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + }, + }, + }, nil + } + //nolint:nilnil // easier to test with nil rather then doing complex error handling for tests + return nil, nil +} + +// CreateFailingClient returns an ism.Client that always return 400 with an empty object as body +func CreateFailingClient() (*ism.Client, error) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Body != nil { + defer r.Body.Close() + } + w.WriteHeader(http.StatusBadRequest) + io.Copy(w, strings.NewReader(`{"status": "error", "reason": "Test Failing Client Response"}`)) + })) + + return ism.NewClient(ism.Config{Client: opensearch.Config{Addresses: []string{ts.URL}}}) +} + +// VerifyInspect validates the returned ism.Inspect type +func VerifyInspect(t *testing.T, inspect ism.Inspect) { + t.Helper() + assert.NotEmpty(t, inspect) + assert.Equal(t, http.StatusBadRequest, inspect.Response.StatusCode) + assert.NotEmpty(t, inspect.Response.Body) +}