From b8e48709001ffa43f6e472d95e45746c295e8f72 Mon Sep 17 00:00:00 2001 From: Nico Carl Date: Sat, 26 Nov 2022 12:55:07 +0100 Subject: [PATCH] feat: Add resource for subscriptions (#244) Co-authored-by: Enzo Cappa Co-authored-by: Cyril Gaudin --- postgresql/config.go | 4 +- postgresql/provider.go | 1 + .../resource_postgresql_subscription.go | 336 ++++++++++++++++++ .../resource_postgresql_subscription_test.go | 318 +++++++++++++++++ .../docs/r/postgresql_subscription.markdown | 34 ++ website/postgresql.erb | 3 + 6 files changed, 695 insertions(+), 1 deletion(-) create mode 100644 postgresql/resource_postgresql_subscription.go create mode 100644 postgresql/resource_postgresql_subscription_test.go create mode 100644 website/docs/r/postgresql_subscription.markdown diff --git a/postgresql/config.go b/postgresql/config.go index 8fcc9791..4e713555 100644 --- a/postgresql/config.go +++ b/postgresql/config.go @@ -104,6 +104,7 @@ var ( // publication is Supported featurePublication: semver.MustParseRange(">=10.0.0"), + // We do not support CREATE FUNCTION for Postgresql < 8.4 featureFunction: semver.MustParseRange(">=8.4.0"), // CREATE SERVER support @@ -277,7 +278,8 @@ func (c *Client) Connect() (*DBConnection, error) { db, err = postgres.Open(context.Background(), dsn) } if err != nil { - return nil, fmt.Errorf("Error connecting to PostgreSQL server %s (scheme: %s): %w", c.config.Host, c.config.Scheme, err) + errString := strings.Replace(err.Error(), c.config.Password, "XXXX", 2) + return nil, fmt.Errorf("Error connecting to PostgreSQL server %s (scheme: %s): %s", c.config.Host, c.config.Scheme, errString) } // We don't want to retain connection diff --git a/postgresql/provider.go b/postgresql/provider.go index dd3825e6..0f3e83eb 100644 --- a/postgresql/provider.go +++ b/postgresql/provider.go @@ -171,6 +171,7 @@ func Provider() *schema.Provider { "postgresql_grant_role": resourcePostgreSQLGrantRole(), "postgresql_replication_slot": resourcePostgreSQLReplicationSlot(), "postgresql_publication": resourcePostgreSQLPublication(), + "postgresql_subscription": resourcePostgreSQLSubscription(), "postgresql_physical_replication_slot": resourcePostgreSQLPhysicalReplicationSlot(), "postgresql_schema": resourcePostgreSQLSchema(), "postgresql_role": resourcePostgreSQLRole(), diff --git a/postgresql/resource_postgresql_subscription.go b/postgresql/resource_postgresql_subscription.go new file mode 100644 index 00000000..62bfac3e --- /dev/null +++ b/postgresql/resource_postgresql_subscription.go @@ -0,0 +1,336 @@ +package postgresql + +import ( + "database/sql" + "fmt" + "log" + "strings" + + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation" + "github.com/lib/pq" +) + +func resourcePostgreSQLSubscription() *schema.Resource { + return &schema.Resource{ + Create: PGResourceFunc(resourcePostgreSQLSubscriptionCreate), + Read: PGResourceFunc(resourcePostgreSQLSubscriptionRead), + Delete: PGResourceFunc(resourcePostgreSQLSubscriptionDelete), + Exists: PGResourceExistsFunc(resourcePostgreSQLSubscriptionExists), + Importer: &schema.ResourceImporter{StateContext: schema.ImportStatePassthroughContext}, + + Schema: map[string]*schema.Schema{ + "name": { + Type: schema.TypeString, + Required: true, + ForceNew: true, + Description: "The name of the subscription", + ValidateFunc: validation.StringIsNotEmpty, + }, + "database": { + Type: schema.TypeString, + Optional: true, + Computed: true, + ForceNew: true, + Description: "Sets the database to add the subscription for", + }, + "conninfo": { + Type: schema.TypeString, + Required: true, + ForceNew: true, + Sensitive: true, + Description: "The connection string to the publisher. It should follow the keyword/value format (https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING)", + ValidateFunc: validation.StringIsNotEmpty, + }, + "publications": { + Type: schema.TypeSet, + Required: true, + ForceNew: true, + Elem: &schema.Schema{Type: schema.TypeString}, + Description: "Names of the publications on the publisher to subscribe to", + }, + "create_slot": { + Type: schema.TypeBool, + Optional: true, + ForceNew: true, + Default: true, + Description: "Specifies whether the command should create the replication slot on the publisher", + }, + "slot_name": { + Type: schema.TypeString, + Optional: true, + ForceNew: true, + Description: "Name of the replication slot to use. The default behavior is to use the name of the subscription for the slot name", + ValidateFunc: validation.StringIsNotEmpty, + }, + }, + } +} + +func resourcePostgreSQLSubscriptionCreate(db *DBConnection, d *schema.ResourceData) error { + subName := d.Get("name").(string) + databaseName := getDatabaseForSubscription(d, db.client.databaseName) + + publications, err := getPublicationsForSubscription(d) + if err != nil { + return fmt.Errorf("could not get publications: %w", err) + } + connInfo, err := getConnInfoForSubscription(d) + if err != nil { + return fmt.Errorf("could not get conninfo: %w", err) + } + + optionalParams := getOptionalParameters(d) + + // Creating of a subscription can not be done in an transaction + client := db.client.config.NewClient(databaseName) + conn, err := client.Connect() + if err != nil { + return fmt.Errorf("could not establish database connection: %w", err) + } + + sql := fmt.Sprintf("CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s %s;", + pq.QuoteIdentifier(subName), + pq.QuoteLiteral(connInfo), + publications, + optionalParams, + ) + if _, err := conn.Exec(sql); err != nil { + return fmt.Errorf("could not execute sql: %w", err) + } + + d.SetId(generateSubscriptionID(d, databaseName)) + + return resourcePostgreSQLSubscriptionReadImpl(db, d) +} + +func resourcePostgreSQLSubscriptionRead(db *DBConnection, d *schema.ResourceData) error { + return resourcePostgreSQLSubscriptionReadImpl(db, d) +} + +func resourcePostgreSQLSubscriptionReadImpl(db *DBConnection, d *schema.ResourceData) error { + databaseName, subName, err := getDBSubscriptionName(d, db.client) + if err != nil { + return fmt.Errorf("could not get subscription name: %w", err) + } + + txn, err := startTransaction(db.client, databaseName) + if err != nil { + return fmt.Errorf("could not start transaction: %w", err) + } + defer deferredRollback(txn) + + var publications []string + var connInfo string + var slotName string + + var subExists bool + queryExists := "SELECT TRUE FROM pg_catalog.pg_stat_subscription WHERE subname = $1" + err = txn.QueryRow(queryExists, pqQuoteLiteral(subName)).Scan(&subExists) + if err != nil { + return fmt.Errorf("Failed to check subscription: %w", err) + } + + if !subExists { + log.Printf("[WARN] PostgreSQL Subscription (%s) not found for database %s", subName, databaseName) + d.SetId("") + return nil + } + + // pg_subscription requires superuser permissions, it is okay to fail here + query := "SELECT subconninfo, subpublications, subslotname FROM pg_catalog.pg_subscription WHERE subname = $1" + err = txn.QueryRow(query, pqQuoteLiteral(subName)).Scan(&connInfo, pq.Array(&publications), &slotName) + + if err != nil { + // we already checked that the subscription exists + connInfo, err := getConnInfoForSubscription(d) + if err != nil { + return fmt.Errorf("could not get conninfo: %w", err) + } + d.Set("conninfo", connInfo) + + setPublications, ok := d.GetOk("publications") + if !ok { + return fmt.Errorf("Attribute publications is not set") + } + publications := setPublications.(*schema.Set).List() + d.Set("publications", publications) + } else { + d.Set("conninfo", connInfo) + d.Set("publications", publications) + } + d.Set("name", subName) + d.Set("database", databaseName) + d.SetId(generateSubscriptionID(d, databaseName)) + + createSlot, okCreate := d.GetOkExists("create_slot") //nolint:staticcheck + if okCreate { + d.Set("create_slot", createSlot.(bool)) + } + _, okSlotName := d.GetOk("slot_name") + if okSlotName { + d.Set("slot_name", slotName) + } + + return nil +} + +func resourcePostgreSQLSubscriptionDelete(db *DBConnection, d *schema.ResourceData) error { + subName := d.Get("name").(string) + createSlot := d.Get("create_slot").(bool) + + databaseName := getDatabaseForSubscription(d, db.client.databaseName) + + // Dropping a subscription can not be done in a transaction + client := db.client.config.NewClient(databaseName) + conn, err := client.Connect() + if err != nil { + return fmt.Errorf("could not establish database connection: %w", err) + } + + // disable subscription and unset the slot before dropping in order to keep the replication slot + if !createSlot { + sql := fmt.Sprintf("ALTER SUBSCRIPTION %s DISABLE", pq.QuoteIdentifier(subName)) + if _, err := conn.Exec(sql); err != nil { + return fmt.Errorf("could not execute sql: %w", err) + } + sql = fmt.Sprintf("ALTER SUBSCRIPTION %s SET (slot_name = NONE)", pq.QuoteIdentifier(subName)) + if _, err := conn.Exec(sql); err != nil { + return fmt.Errorf("could not execute sql: %w", err) + } + } + + sql := fmt.Sprintf("DROP SUBSCRIPTION %s", pq.QuoteIdentifier(subName)) + + if _, err := conn.Exec(sql); err != nil { + return fmt.Errorf("could not execute sql: %w", err) + } + + d.SetId("") + + return nil +} + +func resourcePostgreSQLSubscriptionExists(db *DBConnection, d *schema.ResourceData) (bool, error) { + var subName string + + database, subName, err := getDBSubscriptionName(d, db.client) + if err != nil { + return false, err + } + + // Check if the database exists + exists, err := dbExists(db, database) + if err != nil || !exists { + return false, err + } + + txn, err := startTransaction(db.client, database) + if err != nil { + return false, err + } + defer deferredRollback(txn) + + query := "SELECT subname from pg_catalog.pg_stat_subscription WHERE subname = $1" + err = txn.QueryRow(query, pqQuoteLiteral(subName)).Scan(&subName) + + switch { + case err == sql.ErrNoRows: + return false, nil + case err != nil: + return false, err + } + + return true, nil +} + +func getPublicationsForSubscription(d *schema.ResourceData) (string, error) { + var publicationsString string + setPublications, ok := d.GetOk("publications") + + if !ok { + return publicationsString, fmt.Errorf("Attribute publications is not set") + } + + publications := setPublications.(*schema.Set).List() + var plist []string + if elem, ok := isUniqueArr(publications); !ok { + return publicationsString, fmt.Errorf("'%s' is duplicated for attribute publications", elem.(string)) + } + for _, p := range publications { + plist = append(plist, pq.QuoteIdentifier(p.(string))) + } + + return strings.Join(plist, ", "), nil +} + +func getConnInfoForSubscription(d *schema.ResourceData) (string, error) { + var connInfo string + setConnInfo, ok := d.GetOk("conninfo") + if !ok { + return connInfo, fmt.Errorf("Attribute conninfo is not set") + } + return setConnInfo.(string), nil +} + +func generateSubscriptionID(d *schema.ResourceData, databaseName string) string { + return strings.Join([]string{databaseName, d.Get("name").(string)}, ".") +} + +func getDatabaseForSubscription(d *schema.ResourceData, databaseName string) string { + if v, ok := d.GetOk("database"); ok { + databaseName = v.(string) + } + + return databaseName +} + +// getDBSubscriptionName returns database and subscription name. If we are importing this +// resource, they will be parsed from the resource ID (it will return an error if parsing failed) +// otherwise they will be simply get from the state. +func getDBSubscriptionName(d *schema.ResourceData, client *Client) (string, string, error) { + database := getDatabaseForSubscription(d, client.databaseName) + subName := d.Get("name").(string) + + // When importing, we have to parse the ID to find subscription and database names. + if subName == "" { + parsed := strings.Split(d.Id(), ".") + if len(parsed) != 2 { + return "", "", fmt.Errorf("Subscription ID %s has not the expected format 'database.subscriptionName': %v", d.Id(), parsed) + } + database = parsed[0] + subName = parsed[1] + } + + return database, subName, nil +} + +// slotName and createSlot require recreation of the subscription, only return WITH ... +func getOptionalParameters(d *schema.ResourceData) string { + parameterSQLTemplate := "WITH (%s)" + returnValue := "" + + createSlot, okCreate := d.GetOkExists("create_slot") //nolint:staticcheck + slotName, okName := d.GetOk("slot_name") + + if !okCreate && !okName { + // use default behavior, no WITH statement + return "" + } + + var params []string + if okCreate { + params = append(params, fmt.Sprintf("%s = %t", "create_slot", createSlot.(bool))) + } + if okName { + params = append(params, fmt.Sprintf("%s = %s", "slot_name", pq.QuoteLiteral(slotName.(string)))) + } + + returnValue = fmt.Sprintf(parameterSQLTemplate, strings.Join(params, ", ")) + return returnValue +} + +func getSubscriptionNameFromID(ID string) string { + splitted := strings.Split(ID, ".") + return splitted[0] +} diff --git a/postgresql/resource_postgresql_subscription_test.go b/postgresql/resource_postgresql_subscription_test.go new file mode 100644 index 00000000..f4e4d085 --- /dev/null +++ b/postgresql/resource_postgresql_subscription_test.go @@ -0,0 +1,318 @@ +package postgresql + +import ( + "database/sql" + "fmt" + "testing" + "time" + + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource" + "github.com/hashicorp/terraform-plugin-sdk/v2/terraform" +) + +func testAccCheckPostgresqlSubscriptionDestroy(s *terraform.State) error { + client := testAccProvider.Meta().(*Client) + + for _, rs := range s.RootModule().Resources { + if rs.Type != "postgresql_subscription" { + continue + } + + databaseName, ok := rs.Primary.Attributes["database"] + if !ok { + return fmt.Errorf("No Attribute for database is set") + } + txn, err := startTransaction(client, databaseName) + if err != nil { + return err + } + defer deferredRollback(txn) + + exists, err := checkSubscriptionExists(txn, getSubscriptionNameFromID(rs.Primary.ID)) + + if err != nil { + return fmt.Errorf("Error checking subscription %s", err) + } + + if exists { + return fmt.Errorf("Subscription still exists after destroy") + } + + streams, err := checkSubscriptionStreams(txn, getSubscriptionNameFromID(rs.Primary.ID)) + + if err != nil { + return fmt.Errorf("Error checking subscription %s", err) + } + + if streams { + return fmt.Errorf("Subscription still streams after destroy") + } + } + + return nil +} + +func checkSubscriptionExists(txn *sql.Tx, subName string) (bool, error) { + var _rez bool + err := txn.QueryRow("SELECT TRUE from pg_catalog.pg_subscription WHERE subname=$1", subName).Scan(&_rez) + + switch { + case err == sql.ErrNoRows: + return false, nil + case err != nil: + return false, fmt.Errorf("Error reading info about subscription: %s", err) + } + + return true, nil +} + +func checkSubscriptionStreams(txn *sql.Tx, subName string) (bool, error) { + var _rez bool + err := txn.QueryRow("SELECT TRUE from pg_catalog.pg_stat_replication WHERE application_name=$1 and state='streaming'", subName).Scan(&_rez) + + switch { + case err == sql.ErrNoRows: + return false, nil + case err != nil: + return false, fmt.Errorf("Error reading info about subscription: %s", err) + } + + return true, nil +} + +func testAccCheckPostgresqlSubscriptionExists(n string) resource.TestCheckFunc { + return func(s *terraform.State) error { + rs, ok := s.RootModule().Resources[n] + if !ok { + return fmt.Errorf("Resource not found: %s", n) + } + + if rs.Primary.ID == "" { + return fmt.Errorf("No ID is set") + } + + databaseName, ok := rs.Primary.Attributes["database"] + if !ok { + return fmt.Errorf("No Attribute for database is set") + } + + subName, ok := rs.Primary.Attributes["name"] + if !ok { + return fmt.Errorf("No Attribute for subscription name is set") + } + + client := testAccProvider.Meta().(*Client) + txn, err := startTransaction(client, databaseName) + + if err != nil { + return err + } + defer deferredRollback(txn) + + exists, err := checkSubscriptionExists(txn, subName) + + if err != nil { + return fmt.Errorf("Error checking subscription %s", err) + } + + if !exists { + return fmt.Errorf("Subscription not found") + } + + streams, err := checkSubscriptionStreams(txn, subName) + if err != nil { + return fmt.Errorf("Error checking subscription %s", err) + } + if !streams { + return fmt.Errorf("Subscription not streaming") + } + + return nil + } +} + +func getConnInfo(t *testing.T, dbName string) string { + dbConfig := getTestConfig(t) + + return fmt.Sprintf( + `host=%s port=%d dbname=%s user=%s password=%s`, + dbConfig.Host, + 5432, + dbName, + dbConfig.Username, + dbConfig.Password, + ) +} + +// The database seems to take a few second to cleanup everything +func coolDown() { + time.Sleep(5 * time.Second) +} + +func TestAccPostgresqlSubscription_Basic(t *testing.T) { + skipIfNotAcc(t) + + dbSuffixPub, teardownPub := setupTestDatabase(t, true, true) + dbSuffixSub, teardownSub := setupTestDatabase(t, true, true) + + defer teardownPub() + defer teardownSub() + testTables := []string{"test_schema.test_table_1"} + createTestTables(t, dbSuffixPub, testTables, "") + createTestTables(t, dbSuffixSub, testTables, "") + + dbNamePub, _ := getTestDBNames(dbSuffixPub) + dbNameSub, _ := getTestDBNames(dbSuffixSub) + + conninfo := getConnInfo(t, dbNamePub) + + subName := "subscription" + testAccPostgresqlSubscriptionDatabaseConfig := fmt.Sprintf(` + resource "postgresql_publication" "test_pub" { + name = "test_publication" + database = "%s" + tables = ["test_schema.test_table_1"] + } + resource "postgresql_replication_slot" "test_replication_slot" { + name = "%s" + database = "%s" + plugin = "pgoutput" + } + resource "postgresql_subscription" "test_sub" { + name = postgresql_replication_slot.test_replication_slot.name + database = "%s" + conninfo = "%s" + publications = [ postgresql_publication.test_pub.name ] + create_slot = false + } + `, dbNamePub, subName, dbNamePub, dbNameSub, conninfo) + + resource.Test(t, resource.TestCase{ + PreCheck: func() { + testAccPreCheck(t) + testSuperuserPreCheck(t) + }, + Providers: testAccProviders, + CheckDestroy: testAccCheckPostgresqlSubscriptionDestroy, + Steps: []resource.TestStep{ + { + Config: testAccPostgresqlSubscriptionDatabaseConfig, + Check: resource.ComposeTestCheckFunc( + testAccCheckPostgresqlSubscriptionExists( + "postgresql_subscription.test_sub"), + resource.TestCheckResourceAttr( + "postgresql_subscription.test_sub", + "name", + subName), + resource.TestCheckResourceAttr( + "postgresql_subscription.test_sub", + "database", + dbNameSub), + resource.TestCheckResourceAttr( + "postgresql_subscription.test_sub", + "conninfo", + conninfo), + resource.TestCheckResourceAttr( + "postgresql_subscription.test_sub", + fmt.Sprintf("%s.#", "publications"), + "1"), + resource.TestCheckResourceAttr( + "postgresql_subscription.test_sub", + fmt.Sprintf("%s.0", "publications"), + "test_publication"), + resource.TestCheckResourceAttr( + "postgresql_subscription.test_sub", + "create_slot", + "false"), + ), + }, + }, + }, + ) + coolDown() +} + +func TestAccPostgresqlSubscription_CustomSlotName(t *testing.T) { + skipIfNotAcc(t) + + dbSuffixPub, teardownPub := setupTestDatabase(t, true, true) + dbSuffixSub, teardownSub := setupTestDatabase(t, true, true) + + defer teardownPub() + defer teardownSub() + + dbNamePub, _ := getTestDBNames(dbSuffixPub) + dbNameSub, _ := getTestDBNames(dbSuffixSub) + + conninfo := getConnInfo(t, dbNamePub) + + subName := "subscription" + testAccPostgresqlSubscriptionDatabaseConfig := fmt.Sprintf(` + resource "postgresql_publication" "test_pub" { + name = "test_publication" + database = "%s" + } + resource "postgresql_replication_slot" "test_replication_slot" { + name = "custom_slot_name" + plugin = "pgoutput" + database = "%s" + } + resource "postgresql_subscription" "test_sub" { + name = "%s" + database = "%s" + conninfo = "%s" + publications = [ postgresql_publication.test_pub.name ] + create_slot = false + slot_name = "custom_slot_name" + + depends_on = [ postgresql_replication_slot.test_replication_slot ] + } + `, dbNamePub, dbNamePub, subName, dbNameSub, conninfo) + resource.Test(t, resource.TestCase{ + PreCheck: func() { + testAccPreCheck(t) + testSuperuserPreCheck(t) + }, + Providers: testAccProviders, + CheckDestroy: testAccCheckPostgresqlSubscriptionDestroy, + Steps: []resource.TestStep{ + { + Config: testAccPostgresqlSubscriptionDatabaseConfig, + Check: resource.ComposeTestCheckFunc( + testAccCheckPostgresqlSubscriptionExists( + "postgresql_subscription.test_sub"), + resource.TestCheckResourceAttr( + "postgresql_subscription.test_sub", + "name", + subName), + resource.TestCheckResourceAttr( + "postgresql_subscription.test_sub", + "database", + dbNameSub), + resource.TestCheckResourceAttr( + "postgresql_subscription.test_sub", + "conninfo", + conninfo), + resource.TestCheckResourceAttr( + "postgresql_subscription.test_sub", + fmt.Sprintf("%s.#", "publications"), + "1"), + resource.TestCheckResourceAttr( + "postgresql_subscription.test_sub", + fmt.Sprintf("%s.0", "publications"), + "test_publication"), + resource.TestCheckResourceAttr( + "postgresql_subscription.test_sub", + "create_slot", + "false"), + resource.TestCheckResourceAttr( + "postgresql_subscription.test_sub", + "slot_name", + "custom_slot_name"), + ), + }, + }, + }, + ) + coolDown() +} diff --git a/website/docs/r/postgresql_subscription.markdown b/website/docs/r/postgresql_subscription.markdown new file mode 100644 index 00000000..7a063a60 --- /dev/null +++ b/website/docs/r/postgresql_subscription.markdown @@ -0,0 +1,34 @@ +--- +layout: "postgresql" +page_title: "PostgreSQL: postgresql_susbcription" +sidebar_current: "docs-postgresql-resource-postgresql_subscription" +description: |- +Creates and manages a subscription in a PostgreSQL server database. +--- + +# postgresql_subscription + +The `postgresql_subscription` resource creates and manages a publication on a PostgreSQL +server. + +## Usage + +```hcl +resource "postgresql_subscription" "subscription" { + name = "subscription" + conninfo = "host=localhost port=5432 dbname=mydb user=postgres password=postgres" + publications = ["publication"] +} +``` + +## Argument Reference + +- `name` - (Required) The name of the publication. +- `conninfo` - (Required) The connection string to the publisher. It should follow the [keyword/value format](https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING) +- `publications` - (Required) Names of the publications on the publisher to subscribe to +- `database` - (Optional) Which database to create the subscription on. Defaults to provider database. +- `create_slot` - (Optional) Specifies whether the command should create the replication slot on the publisher. Default behavior is true +- `slot_name` - (Optional) Name of the replication slot to use. The default behavior is to use the name of the subscription for the slot name + +## Postgres documentation +- https://www.postgresql.org/docs/current/sql-createsubscription.html \ No newline at end of file diff --git a/website/postgresql.erb b/website/postgresql.erb index 8421842f..ef8ef1bc 100644 --- a/website/postgresql.erb +++ b/website/postgresql.erb @@ -34,6 +34,9 @@ > postgresql_publication + > + postgresql_subscription + > postgresql_role