diff --git a/e2e/tests/k8s_api.py b/e2e/tests/k8s_api.py index 12e45f4b0..276ddfa25 100644 --- a/e2e/tests/k8s_api.py +++ b/e2e/tests/k8s_api.py @@ -20,6 +20,7 @@ def __init__(self): self.config = config.load_kube_config() self.k8s_client = client.ApiClient() + self.rbac_api = client.RbacAuthorizationV1Api() self.core_v1 = client.CoreV1Api() self.apps_v1 = client.AppsV1Api() diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index 43dd467b5..5182851b4 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -129,7 +129,8 @@ def setUpClass(cls): "infrastructure-roles.yaml", "infrastructure-roles-new.yaml", "custom-team-membership.yaml", - "e2e-storage-class.yaml"]: + "e2e-storage-class.yaml", + "fes.crd.yaml"]: result = k8s.create_with_kubectl("manifests/" + filename) print("stdout: {}, stderr: {}".format(result.stdout, result.stderr)) @@ -199,6 +200,7 @@ def test_additional_owner_roles(self): self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "postgres", owner_query)), 3, "Not all additional users found in database", 10, 5) + @timeout_decorator.timeout(TEST_TIMEOUT_SEC) def test_additional_pod_capabilities(self): ''' @@ -1203,7 +1205,7 @@ def check_version_14(): version = p["server_version"][0:2] return version - self.evantuallyEqual(check_version_14, "14", "Version was not upgrade to 14") + self.eventuallyEqual(check_version_14, "14", "Version was not upgrade to 14") @timeout_decorator.timeout(TEST_TIMEOUT_SEC) def test_persistent_volume_claim_retention_policy(self): @@ -1989,6 +1991,123 @@ def test_standby_cluster(self): "acid.zalan.do", "v1", "default", "postgresqls", "acid-standby-cluster") time.sleep(5) + @timeout_decorator.timeout(TEST_TIMEOUT_SEC) + def test_stream_resources(self): + ''' + Create and delete fabric event streaming resources. + ''' + k8s = self.k8s + self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, + "Operator does not get in sync") + leader = k8s.get_cluster_leader_pod() + + # patch ClusterRole with CRUD privileges on FES resources + cluster_role = k8s.api.rbac_api.read_cluster_role("postgres-operator") + fes_cluster_role_rule = client.V1PolicyRule( + api_groups=["zalando.org"], + resources=["fabriceventstreams"], + verbs=["create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"] + ) + cluster_role.rules.append(fes_cluster_role_rule) + k8s.api.rbac_api.patch_cluster_role("postgres-operator", cluster_role) + + # create a table in one of the database of acid-minimal-cluster + create_stream_table = """ + CREATE TABLE test_table (id int, payload jsonb); + """ + self.query_database(leader.metadata.name, "foo", create_stream_table) + + # update the manifest with the streams section + patch_streaming_config = { + "spec": { + "patroni": { + "slots": { + "manual_slot": { + "type": "physical" + } + } + }, + "streams": [ + { + "applicationId": "test-app", + "batchSize": 100, + "database": "foo", + "enableRecovery": True, + "tables": { + "test_table": { + "eventType": "test-event", + "idColumn": "id", + "payloadColumn": "payload", + "recoveryEventType": "test-event-dlq" + } + } + } + ] + } + } + k8s.api.custom_objects_api.patch_namespaced_custom_object( + 'acid.zalan.do', 'v1', 'default', 'postgresqls', 'acid-minimal-cluster', patch_streaming_config) + self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") + + # check if publication, slot, and fes resource are created + get_publication_query = """ + SELECT * FROM pg_publication WHERE pubname = 'fes_foo_test_app'; + """ + get_slot_query = """ + SELECT * FROM pg_replication_slots WHERE slot_name = 'fes_foo_test_app'; + """ + self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_publication_query)), 1, + "Publication is not created", 10, 5) + self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_slot_query)), 1, + "Replication slot is not created", 10, 5) + self.eventuallyEqual(lambda: len(k8s.api.custom_objects_api.list_namespaced_custom_object( + "zalando.org", "v1", "default", "fabriceventstreams", label_selector="cluster-name=acid-minimal-cluster")["items"]), 1, + "Could not find Fabric Event Stream resource", 10, 5) + + # grant create and ownership of test_table to foo_user, reset search path to default + grant_permission_foo_user = """ + GRANT CREATE ON DATABASE foo TO foo_user; + ALTER TABLE test_table OWNER TO foo_user; + ALTER ROLE foo_user RESET search_path; + """ + self.query_database(leader.metadata.name, "foo", grant_permission_foo_user) + # non-postgres user creates a publication + create_nonstream_publication = """ + CREATE PUBLICATION mypublication FOR TABLE test_table; + """ + self.query_database_with_user(leader.metadata.name, "foo", create_nonstream_publication, "foo_user") + + # remove the streams section from the manifest + patch_streaming_config_removal = { + "spec": { + "streams": [] + } + } + k8s.api.custom_objects_api.patch_namespaced_custom_object( + 'acid.zalan.do', 'v1', 'default', 'postgresqls', 'acid-minimal-cluster', patch_streaming_config_removal) + self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") + + # check if publication, slot, and fes resource are removed + self.eventuallyEqual(lambda: len(k8s.api.custom_objects_api.list_namespaced_custom_object( + "zalando.org", "v1", "default", "fabriceventstreams", label_selector="cluster-name=acid-minimal-cluster")["items"]), 0, + 'Could not delete Fabric Event Stream resource', 10, 5) + self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_publication_query)), 0, + "Publication is not deleted", 10, 5) + self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_slot_query)), 0, + "Replication slot is not deleted", 10, 5) + + # check the manual_slot and mypublication should not get deleted + get_manual_slot_query = """ + SELECT * FROM pg_replication_slots WHERE slot_name = 'manual_slot'; + """ + get_nonstream_publication_query = """ + SELECT * FROM pg_publication WHERE pubname = 'mypublication'; + """ + self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "postgres", get_manual_slot_query)), 1, + "Slot defined in patroni config is deleted", 10, 5) + self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_nonstream_publication_query)), 1, + "Publication defined not in stream section is deleted", 10, 5) + @timeout_decorator.timeout(TEST_TIMEOUT_SEC) def test_taint_based_eviction(self): ''' @@ -2115,7 +2234,7 @@ def test_zz_cluster_deletion(self): self.eventuallyEqual(lambda: k8s.count_statefulsets_with_label(cluster_label), 0, "Statefulset not deleted") self.eventuallyEqual(lambda: k8s.count_deployments_with_label(cluster_label), 0, "Deployments not deleted") self.eventuallyEqual(lambda: k8s.count_pdbs_with_label(cluster_label), 0, "Pod disruption budget not deleted") - self.eventuallyEqual(lambda: k8s.count_secrets_with_label(cluster_label), 7, "Secrets were deleted although disabled in config") + self.eventuallyEqual(lambda: k8s.count_secrets_with_label(cluster_label), 8, "Secrets were deleted although disabled in config") self.eventuallyEqual(lambda: k8s.count_pvcs_with_label(cluster_label), 3, "PVCs were deleted although disabled in config") except timeout_decorator.TimeoutError: diff --git a/manifests/fes.crd.yaml b/manifests/fes.crd.yaml new file mode 100644 index 000000000..70a8c9555 --- /dev/null +++ b/manifests/fes.crd.yaml @@ -0,0 +1,23 @@ +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: fabriceventstreams.zalando.org +spec: + group: zalando.org + names: + kind: FabricEventStream + listKind: FabricEventStreamList + plural: fabriceventstreams + singular: fabriceventstream + shortNames: + - fes + categories: + - all + scope: Namespaced + versions: + - name: v1 + served: true + storage: true + schema: + openAPIV3Schema: + type: object diff --git a/pkg/apis/zalando.org/v1/fabriceventstream.go b/pkg/apis/zalando.org/v1/fabriceventstream.go index 609f3c9bc..41bb5e80c 100644 --- a/pkg/apis/zalando.org/v1/fabriceventstream.go +++ b/pkg/apis/zalando.org/v1/fabriceventstream.go @@ -1,6 +1,7 @@ package v1 import ( + acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -89,3 +90,8 @@ type DBAuth struct { UserKey string `json:"userKey,omitempty"` PasswordKey string `json:"passwordKey,omitempty"` } + +type Slot struct { + Slot map[string]string `json:"slot"` + Publication map[string]acidv1.StreamTable `json:"publication"` +} diff --git a/pkg/cluster/database.go b/pkg/cluster/database.go index cc203eef5..433e4438e 100644 --- a/pkg/cluster/database.go +++ b/pkg/cluster/database.go @@ -49,9 +49,12 @@ const ( getPublicationsSQL = `SELECT p.pubname, string_agg(pt.schemaname || '.' || pt.tablename, ', ' ORDER BY pt.schemaname, pt.tablename) FROM pg_publication p LEFT JOIN pg_publication_tables pt ON pt.pubname = p.pubname + WHERE p.pubowner = 'postgres'::regrole + AND p.pubname LIKE 'fes_%' GROUP BY p.pubname;` createPublicationSQL = `CREATE PUBLICATION "%s" FOR TABLE %s WITH (publish = 'insert, update');` alterPublicationSQL = `ALTER PUBLICATION "%s" SET TABLE %s;` + dropPublicationSQL = `DROP PUBLICATION "%s";` globalDefaultPrivilegesSQL = `SET ROLE TO "%s"; ALTER DEFAULT PRIVILEGES GRANT USAGE ON SCHEMAS TO "%s","%s"; @@ -628,6 +631,14 @@ func (c *Cluster) getPublications() (publications map[string]string, err error) return dbPublications, err } +func (c *Cluster) executeDropPublication(pubName string) error { + c.logger.Infof("dropping publication %q", pubName) + if _, err := c.pgDb.Exec(fmt.Sprintf(dropPublicationSQL, pubName)); err != nil { + return fmt.Errorf("could not execute drop publication: %v", err) + } + return nil +} + // executeCreatePublication creates new publication for given tables // The caller is responsible for opening and closing the database connection. func (c *Cluster) executeCreatePublication(pubName, tableList string) error { diff --git a/pkg/cluster/streams.go b/pkg/cluster/streams.go index ec4221b4b..c76523f4a 100644 --- a/pkg/cluster/streams.go +++ b/pkg/cluster/streams.go @@ -43,6 +43,16 @@ func (c *Cluster) updateStreams(newEventStreams *zalandov1.FabricEventStream) er return nil } +func (c *Cluster) deleteStream(stream *zalandov1.FabricEventStream) error { + c.setProcessName("deleting event stream") + + err := c.KubeClient.FabricEventStreams(stream.Namespace).Delete(context.TODO(), stream.Name, metav1.DeleteOptions{}) + if err != nil { + return fmt.Errorf("could not delete event stream %q: %v", stream.Name, err) + } + return nil +} + func (c *Cluster) deleteStreams() error { c.setProcessName("deleting event streams") @@ -61,7 +71,7 @@ func (c *Cluster) deleteStreams() error { return fmt.Errorf("could not list of FabricEventStreams: %v", err) } for _, stream := range streams.Items { - err = c.KubeClient.FabricEventStreams(stream.Namespace).Delete(context.TODO(), stream.Name, metav1.DeleteOptions{}) + err := c.deleteStream(&stream) if err != nil { errors = append(errors, fmt.Sprintf("could not delete event stream %q: %v", stream.Name, err)) } @@ -85,9 +95,10 @@ func gatherApplicationIds(streams []acidv1.Stream) []string { return appIds } -func (c *Cluster) syncPublication(publication, dbName string, tables map[string]acidv1.StreamTable) error { +func (c *Cluster) syncPublication(dbName string, databaseSlotsList map[string]zalandov1.Slot, slotsToSync *map[string]map[string]string) error { createPublications := make(map[string]string) alterPublications := make(map[string]string) + deletePublications := []string{} defer func() { if err := c.closeDbConn(); err != nil { @@ -97,7 +108,7 @@ func (c *Cluster) syncPublication(publication, dbName string, tables map[string] // check for existing publications if err := c.initDbConnWithName(dbName); err != nil { - return fmt.Errorf("could not init database connection") + return fmt.Errorf("could not init database connection: %v", err) } currentPublications, err := c.getPublications() @@ -105,24 +116,35 @@ func (c *Cluster) syncPublication(publication, dbName string, tables map[string] return fmt.Errorf("could not get current publications: %v", err) } - tableNames := make([]string, len(tables)) - i := 0 - for t := range tables { - tableName, schemaName := getTableSchema(t) - tableNames[i] = fmt.Sprintf("%s.%s", schemaName, tableName) - i++ + for slotName, slotAndPublication := range databaseSlotsList { + tables := slotAndPublication.Publication + tableNames := make([]string, len(tables)) + i := 0 + for t := range tables { + tableName, schemaName := getTableSchema(t) + tableNames[i] = fmt.Sprintf("%s.%s", schemaName, tableName) + i++ + } + sort.Strings(tableNames) + tableList := strings.Join(tableNames, ", ") + + currentTables, exists := currentPublications[slotName] + if !exists { + createPublications[slotName] = tableList + } else if currentTables != tableList { + alterPublications[slotName] = tableList + } + (*slotsToSync)[slotName] = slotAndPublication.Slot } - sort.Strings(tableNames) - tableList := strings.Join(tableNames, ", ") - currentTables, exists := currentPublications[publication] - if !exists { - createPublications[publication] = tableList - } else if currentTables != tableList { - alterPublications[publication] = tableList + // check if there is any deletion + for slotName, _ := range currentPublications { + if _, exists := databaseSlotsList[slotName]; !exists { + deletePublications = append(deletePublications, slotName) + } } - if len(createPublications)+len(alterPublications) == 0 { + if len(createPublications)+len(alterPublications)+len(deletePublications) == 0 { return nil } @@ -136,6 +158,12 @@ func (c *Cluster) syncPublication(publication, dbName string, tables map[string] return fmt.Errorf("update of publication %q failed: %v", publicationName, err) } } + for _, publicationName := range deletePublications { + (*slotsToSync)[publicationName] = nil + if err = c.executeDropPublication(publicationName); err != nil { + return fmt.Errorf("deletion of publication %q failed: %v", publicationName, err) + } + } return nil } @@ -279,56 +307,73 @@ func (c *Cluster) syncStreams() error { return nil } - slots := make(map[string]map[string]string) + databaseSlots := make(map[string]map[string]zalandov1.Slot) slotsToSync := make(map[string]map[string]string) - publications := make(map[string]map[string]acidv1.StreamTable) requiredPatroniConfig := c.Spec.Patroni if len(requiredPatroniConfig.Slots) > 0 { - slots = requiredPatroniConfig.Slots + for slotName, slotConfig := range requiredPatroniConfig.Slots { + slotsToSync[slotName] = slotConfig + } + } + + if err := c.initDbConn(); err != nil { + return fmt.Errorf("could not init database connection") + } + defer func() { + if err := c.closeDbConn(); err != nil { + c.logger.Errorf("could not close database connection: %v", err) + } + }() + listDatabases, err := c.getDatabases() + if err != nil { + return fmt.Errorf("could not get list of databases: %v", err) + } + // get database name with empty list of slot, except template0 and template1 + for dbName, _ := range listDatabases { + if dbName != "template0" && dbName != "template1" { + databaseSlots[dbName] = map[string]zalandov1.Slot{} + } } - // gather list of required slots and publications + // gather list of required slots and publications, group by database for _, stream := range c.Spec.Streams { + if _, exists := databaseSlots[stream.Database]; !exists { + c.logger.Warningf("database %q does not exist in the cluster", stream.Database) + continue + } slot := map[string]string{ "database": stream.Database, "plugin": constants.EventStreamSourcePluginType, "type": "logical", } slotName := getSlotName(stream.Database, stream.ApplicationId) - if _, exists := slots[slotName]; !exists { - slots[slotName] = slot - publications[slotName] = stream.Tables + if _, exists := databaseSlots[stream.Database][slotName]; !exists { + databaseSlots[stream.Database][slotName] = zalandov1.Slot{ + Slot: slot, + Publication: stream.Tables, + } } else { - streamTables := publications[slotName] + slotAndPublication := databaseSlots[stream.Database][slotName] + streamTables := slotAndPublication.Publication for tableName, table := range stream.Tables { if _, exists := streamTables[tableName]; !exists { streamTables[tableName] = table } } - publications[slotName] = streamTables + slotAndPublication.Publication = streamTables + databaseSlots[stream.Database][slotName] = slotAndPublication } } - // create publications to each created slot + // sync publication in a database c.logger.Debug("syncing database publications") - for publication, tables := range publications { - // but first check for existing publications - dbName := slots[publication]["database"] - err = c.syncPublication(publication, dbName, tables) + for dbName, databaseSlotsList := range databaseSlots { + err := c.syncPublication(dbName, databaseSlotsList, &slotsToSync) if err != nil { - c.logger.Warningf("could not sync publication %q in database %q: %v", publication, dbName, err) + c.logger.Warningf("could not sync publications in database %q: %v", dbName, err) continue } - slotsToSync[publication] = slots[publication] - } - - // no slots to sync = no streams defined or publications created - if len(slotsToSync) > 0 { - requiredPatroniConfig.Slots = slotsToSync - } else { - // try to delete existing stream resources - return c.deleteStreams() } c.logger.Debug("syncing logical replication slots") @@ -338,6 +383,7 @@ func (c *Cluster) syncStreams() error { } // sync logical replication slots in Patroni config + requiredPatroniConfig.Slots = slotsToSync configPatched, _, _, err := c.syncPatroniConfig(pods, requiredPatroniConfig, nil) if err != nil { c.logger.Warningf("Patroni config updated? %v - errors during config sync: %v", configPatched, err) @@ -398,6 +444,18 @@ func (c *Cluster) createOrUpdateStreams() error { } } + // check if there is any deletion + for _, stream := range streams.Items { + if !util.SliceContains(appIds, stream.Spec.ApplicationId) { + c.logger.Infof("event streams with applicationId %s do not exist in the manifest, delete it", stream.Spec.ApplicationId) + err := c.deleteStream(&stream) + if err != nil { + return fmt.Errorf("failed deleting event streams with applicationId %s: %v", stream.Spec.ApplicationId, err) + } + c.logger.Infof("event streams %q have been successfully deleted", stream.Name) + } + } + return nil } diff --git a/pkg/cluster/streams_test.go b/pkg/cluster/streams_test.go index 63c38311b..5045a66fe 100644 --- a/pkg/cluster/streams_test.go +++ b/pkg/cluster/streams_test.go @@ -466,7 +466,7 @@ func TestUpdateFabricEventStream(t *testing.T) { assert.NoError(t, err) cluster.Postgresql.Spec = pgUpdated.Spec - cluster.syncStreams() + cluster.createOrUpdateStreams() streamList, err := cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions) if len(streamList.Items) > 0 || err != nil {