Skip to content

Commit

Permalink
add streaming namespace policy offload threshold (#381)
Browse files Browse the repository at this point in the history
Implement pulsar namespace offload threshold:
https://pulsar.apache.org/admin-rest-api/#operation/Namespaces_setOffloadThreshold

Co-authored-by: ext-devgangavkar <142803490+ext-devgangavkar@users.noreply.github.com>
  • Loading branch information
pgier and ext-devgangavkar authored Mar 26, 2024
1 parent 6ebea76 commit 538380d
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 1 deletion.
1 change: 1 addition & 0 deletions docs/resources/streaming_namespace.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ Optional:
- `inactive_topic_policies` (Attributes) (see [below for nested schema](#nestedatt--policies--inactive_topic_policies))
- `is_allow_auto_update_schema` (Boolean)
- `message_ttl_in_seconds` (Number)
- `offload_threshold` (Number)
- `retention_policies` (Attributes) (see [below for nested schema](#nestedatt--policies--retention_policies))
- `schema_auto_update_compatibility_strategy` (String)
- `schema_compatibility_strategy` (String)
Expand Down
8 changes: 7 additions & 1 deletion internal/provider/types_streaming_namespace_policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ const (
policyInactiveTopicDeleteMode = "delete_mode"

policySubscriptionExpirationTimeMinutes = "subscription_expiration_time_minutes"
policyOffloadThreshold = "offload_threshold"
)

type PulsarNamespacePolicies struct {
Expand All @@ -65,6 +66,7 @@ type PulsarNamespacePolicies struct {
RetentionPolicies *PulsarNamespaceRetentionPolicies `tfsdk:"retention_policies" json:"retention_policies,omitempty"`
InactiveTopicPolicies *PulsarNamespaceInactiveTopicPolicies `tfsdk:"inactive_topic_policies" json:"inactive_topic_policies,omitempty"`
SubscriptionExpirationTimeMinutes *int64 `tfsdk:"subscription_expiration_time_minutes" json:"subscription_expiration_time_minutes,omitempty"`
OffloadThreshold *int64 `tfsdk:"offload_threshold" json:"offload_threshold,omitempty"`
}

type PulsarNamespaceRetentionPolicies struct {
Expand Down Expand Up @@ -199,6 +201,7 @@ var (
},
},
policySubscriptionExpirationTimeMinutes: int64PulsarNamespacePolicyAttribute,
policyOffloadThreshold: int64PulsarNamespacePolicyAttribute,
},
}
)
Expand Down Expand Up @@ -316,6 +319,10 @@ func setNamespacePolicies(ctx context.Context, client *pulsaradmin.ClientWithRes
resp, err := client.NamespacesSetSchemaValidationEnforced(ctx, tenant, namespace, *policies.SchemaValidationEnforced, requestEditors...)
diags.Append(HTTPResponseDiagWarn(resp, err, pulsarNamespacePolicyError(policySchemaValidationEnforced))...)
}
if policies.OffloadThreshold != nil {
resp, err := client.NamespacesSetOffloadThreshold(ctx, tenant, namespace, *policies.OffloadThreshold, requestEditors...)
diags.Append(HTTPResponseDiagWarn(resp, err, pulsarNamespacePolicyError(policyOffloadThreshold))...)
}

// Nested objects
if policies.AutoTopicCreationOverride != nil {
Expand All @@ -334,7 +341,6 @@ func setNamespacePolicies(ctx context.Context, client *pulsaradmin.ClientWithRes
resp, err := client.NamespacesSetRetention(ctx, tenant, namespace, *policies.RetentionPolicies, requestEditors...)
diags.Append(HTTPResponseDiagWarn(resp, err, pulsarNamespacePolicyError(policyRetentionPolicies))...)
}

if policies.InactiveTopicPolicies != nil {
resp, err := client.NamespacesSetInactiveTopicPolicies(ctx, tenant, namespace, *policies.InactiveTopicPolicies, requestEditors...)
diags.Append(HTTPResponseDiagWarn(resp, err, pulsarNamespacePolicyError(policyInactiveTopicPolicies))...)
Expand Down

0 comments on commit 538380d

Please sign in to comment.