diff --git a/admin-ui/src/App.vue b/admin-ui/src/App.vue index 1eac6675f..084a72869 100644 --- a/admin-ui/src/App.vue +++ b/admin-ui/src/App.vue @@ -336,7 +336,7 @@ diff --git a/admin-ui/src/components/widgets/accounts.vue b/admin-ui/src/components/widgets/accounts.vue index baf542283..a0119908f 100644 --- a/admin-ui/src/components/widgets/accounts.vue +++ b/admin-ui/src/components/widgets/accounts.vue @@ -19,7 +19,7 @@
Created in last {{ data.period }}Created this {{ data.period }} {{ countsForPeriod[data.period] }} diff --git a/cmd/services_registry/main.go b/cmd/services_registry/main.go index 5e4718541..79476b0ae 100644 --- a/cmd/services_registry/main.go +++ b/cmd/services_registry/main.go @@ -129,7 +129,7 @@ func main() { } log.Info("Pub/Sub setted up") - server := services.NewServicesServer(log, db, ps) + server := services.NewServicesServer(log, db, ps, rbmq) iserver := instances.NewInstancesServiceServer(log, db, rbmq) for _, driver := range drivers { diff --git a/pkg/billing/billing_test.go b/pkg/billing/billing_test.go index 0a515f363..cc06d50ac 100644 --- a/pkg/billing/billing_test.go +++ b/pkg/billing/billing_test.go @@ -134,7 +134,7 @@ func TestGenerateTransactions(t *testing.T) { accountController := nograph.NewAccountsController(log, db) recordsController := nograph.NewRecordsController(log, db) nsConroller := nograph.NewNamespacesController(log, db) - srvConroller := nograph.NewServicesController(log, db) + srvConroller := nograph.NewServicesController(log, db, nil) currencyController := nograph.NewCurrencyController(log, db) ctx := context.Background() diff --git a/pkg/graph/account_delete_test.go b/pkg/graph/account_delete_test.go index 584cb4474..a5674fd33 100644 --- a/pkg/graph/account_delete_test.go +++ b/pkg/graph/account_delete_test.go @@ -45,10 +45,10 @@ func TestDeleteAccount(t *testing.T) { ac := NewAccountsController(log, db) nsc := NewNamespacesController(log, db) - instc := NewInstancesController(log, db) - igc := NewInstancesGroupsController(log, db) + instc := NewInstancesController(log, db, nil) + igc := NewInstancesGroupsController(log, db, nil) spc := NewServicesProvidersController(log, db) - srvc := NewServicesController(log, db) + srvc := NewServicesController(log, db, nil) acc, err := ac.Create(ctx, accounts.Account{Title: "test_user"}) if err != nil { @@ -115,7 +115,7 @@ func TestDeleteAccountCredentials(t *testing.T) { ac := NewAccountsController(log, db) nsc := NewNamespacesController(log, db) spc := NewServicesProvidersController(log, db) - srvc := NewServicesController(log, db) + srvc := NewServicesController(log, db, nil) acc, err := ac.Create(ctx, accounts.Account{Title: "test_user"}) if err != nil { diff --git a/pkg/graph/graphs.go b/pkg/graph/graphs.go index 10c079617..027cc9152 100644 --- a/pkg/graph/graphs.go +++ b/pkg/graph/graphs.go @@ -477,12 +477,12 @@ func ListAccounts[T Accessible]( values := val.GetStructValue().AsMap() if val, ok := values["from"]; ok { from := val.(float64) - insert += fmt.Sprintf(` FILTER node.data["%s"] >= %f`, key, from) + insert += fmt.Sprintf(` FILTER node.data["%s"] >= %f`, split[1], from) } if val, ok := values["to"]; ok { to := val.(float64) - insert += fmt.Sprintf(` FILTER node.data["%s"] <= %f`, key, to) + insert += fmt.Sprintf(` FILTER node.data["%s"] <= %f`, split[1], to) } } else if split[1] == "whmcs_id" { insert += fmt.Sprintf(` FILTER node.data["%s"] == %d`, split[1], int(val.GetNumberValue())) diff --git a/pkg/graph/instances.go b/pkg/graph/instances.go index c93fc633b..1966b488f 100644 --- a/pkg/graph/instances.go +++ b/pkg/graph/instances.go @@ -24,9 +24,11 @@ import ( "time" "github.com/arangodb/go-driver" + "github.com/rabbitmq/amqp091-go" "github.com/slntopp/nocloud/pkg/nocloud" "github.com/wI2L/jsondiff" "go.uber.org/zap" + "google.golang.org/protobuf/proto" bpb "github.com/slntopp/nocloud-proto/billing" elpb "github.com/slntopp/nocloud-proto/events_logging" @@ -57,16 +59,36 @@ type InstancesController struct { db driver.Database ig2inst driver.Collection + channel *amqp091.Channel } -func NewInstancesController(log *zap.Logger, db driver.Database) *InstancesController { +func NewInstancesController(log *zap.Logger, db driver.Database, conn *amqp091.Connection) *InstancesController { ctx := context.TODO() graph := GraphGetEnsure(log, ctx, db, schema.PERMISSIONS_GRAPH.Name) col := GetEnsureCollection(log, ctx, db, schema.INSTANCES_COL) ig2inst := GraphGetEdgeEnsure(log, ctx, graph, schema.IG2INST, schema.INSTANCES_GROUPS_COL, schema.INSTANCES_COL) - return &InstancesController{log: log.Named("InstancesController"), col: col, graph: graph, db: db, ig2inst: ig2inst} + channel, err := conn.Channel() + if err != nil { + log.Fatal("Failed to init channel", zap.Error(err)) + } + + err = channel.ExchangeDeclare( + "hooks", + "topic", + true, + false, + false, + false, + nil, + ) + + if err != nil { + log.Fatal("Failed to init exchange", zap.Error(err)) + } + + return &InstancesController{log: log.Named("InstancesController"), col: col, graph: graph, db: db, ig2inst: ig2inst, channel: channel} } func (ctrl *InstancesController) Create(ctx context.Context, group driver.DocumentID, sp string, i *pb.Instance) error { @@ -122,6 +144,26 @@ func (ctrl *InstancesController) Create(ctx context.Context, group driver.Docume return err } + c := pb.Context{ + Instance: i.GetUuid(), + Sp: sp, + Event: "create", + } + body, err := proto.Marshal(&c) + if err == nil { + err = ctrl.channel.PublishWithContext(ctx, "hooks", "ansible_hooks", false, false, amqp091.Publishing{ + ContentType: "text/plain", + DeliveryMode: amqp091.Persistent, + Body: body, + }) + + if err != nil { + log.Error("Failed to publish", zap.Error(err)) + } + } else { + log.Error("Failed to parse", zap.Error(err)) + } + return nil } @@ -256,6 +298,26 @@ func (ctrl *InstancesController) Update(ctx context.Context, sp string, inst, ol nocloud.Log(log, event) + c := pb.Context{ + Instance: inst.GetUuid(), + Sp: sp, + Event: "update", + } + body, err := proto.Marshal(&c) + if err == nil { + err = ctrl.channel.PublishWithContext(ctx, "hooks", "ansible_hooks", false, false, amqp091.Publishing{ + ContentType: "text/plain", + DeliveryMode: amqp091.Persistent, + Body: body, + }) + + if err != nil { + log.Error("Failed to publish", zap.Error(err)) + } + } else { + log.Error("Failed to parse", zap.Error(err)) + } + return nil } @@ -290,6 +352,31 @@ func (ctrl *InstancesController) Delete(ctx context.Context, group string, i *pb return err } + sp, err := ctrl.getSp(ctx, i.GetUuid()) + + if err == nil { + c := pb.Context{ + Instance: i.GetUuid(), + Sp: sp, + Event: "create", + } + body, err := proto.Marshal(&c) + if err == nil { + err = ctrl.channel.PublishWithContext(ctx, "hooks", "ansible_hooks", false, false, amqp091.Publishing{ + ContentType: "text/plain", + DeliveryMode: amqp091.Persistent, + Body: body, + }) + + if err != nil { + log.Error("Failed to publish", zap.Error(err)) + } + } else { + log.Error("Failed to parse", zap.Error(err)) + } + } else { + log.Error("Failed to get sp", zap.Error(err)) + } return nil } @@ -436,6 +523,8 @@ func (ctrl *InstancesController) ValidateBillingPlan(ctx context.Context, spUuid } func (ctrl *InstancesController) SetStatus(ctx context.Context, inst *pb.Instance, status spb.NoCloudStatus) (err error) { + log := ctrl.log.Named("SetStatus") + mask := &pb.Instance{ Status: status, } @@ -443,7 +532,38 @@ func (ctrl *InstancesController) SetStatus(ctx context.Context, inst *pb.Instanc mask.Deleted = time.Now().Unix() } _, err = ctrl.col.UpdateDocument(ctx, inst.Uuid, mask) - return err + if err != nil { + log.Error("Failed to update", zap.Error(err)) + return err + } + + sp, err := ctrl.getSp(ctx, inst.GetUuid()) + + if err == nil { + c := pb.Context{ + Instance: inst.GetUuid(), + Sp: sp, + Event: status.String(), + } + body, err := proto.Marshal(&c) + if err == nil { + err = ctrl.channel.PublishWithContext(ctx, "hooks", "ansible_hooks", false, false, amqp091.Publishing{ + ContentType: "text/plain", + DeliveryMode: amqp091.Persistent, + Body: body, + }) + + if err != nil { + log.Error("Failed to publish", zap.Error(err)) + } + } else { + log.Error("Failed to parse", zap.Error(err)) + } + } else { + log.Error("Failed to get sp", zap.Error(err)) + } + + return nil } func (ctrl *InstancesController) SetState(ctx context.Context, inst *pb.Instance, state stpb.NoCloudState) (err error) { @@ -498,3 +618,47 @@ func (ctrl *InstancesController) GetEdge(ctx context.Context, inboundNode string return edgeId, nil } + +const getSp = ` +LET inboundNode = DOCUMENT(@node) + +LET ig = LAST( + FOR ig IN 1 Inbound inboundNode + GRAPH @permissions + FILTER IS_SAME_COLLECTION(@ig, ig) + RETURN ig +) + +LET sp = LAST( + FOR sp IN 1 OUTBOUND ig + GRAPH @permissions + FILTER IS_SAME_COLLECTION(@sp, sp) + RETURN sp +) + +return sp._key +` + +func (ctrl *InstancesController) getSp(ctx context.Context, uuid string) (string, error) { + log := ctrl.log.Named("GetSp") + c, err := ctrl.db.Query(ctx, getSp, map[string]interface{}{ + "node": driver.NewDocumentID(schema.INSTANCES_COL, uuid), + "permissions": schema.PERMISSIONS_GRAPH.Name, + "ig": schema.INSTANCES_GROUPS_COL, + "sp": schema.SERVICES_PROVIDERS_COL, + }) + + if err != nil { + log.Error("Error while querying", zap.Error(err)) + return "", err + } + defer c.Close() + var sp string + _, err = c.ReadDocument(ctx, &sp) + if err != nil { + log.Error("Error while reading document", zap.Error(err)) + return "", err + } + + return sp, nil +} diff --git a/pkg/graph/instances_groups.go b/pkg/graph/instances_groups.go index 683e07d99..43ce38c63 100644 --- a/pkg/graph/instances_groups.go +++ b/pkg/graph/instances_groups.go @@ -20,6 +20,7 @@ import ( "reflect" "github.com/arangodb/go-driver" + "github.com/rabbitmq/amqp091-go" "go.uber.org/zap" "google.golang.org/protobuf/proto" @@ -53,7 +54,7 @@ type InstancesGroupsController struct { ig2sp driver.Collection } -func NewInstancesGroupsController(log *zap.Logger, db driver.Database) *InstancesGroupsController { +func NewInstancesGroupsController(log *zap.Logger, db driver.Database, conn *amqp091.Connection) *InstancesGroupsController { log.Debug("New InstancesGroups Controller Creating") ctx := context.TODO() @@ -70,7 +71,7 @@ func NewInstancesGroupsController(log *zap.Logger, db driver.Database) *Instance ig2sp := GraphGetEdgeEnsure(log, ctx, graph, schema.IG2SP, schema.INSTANCES_GROUPS_COL, schema.SERVICES_PROVIDERS_COL) return &InstancesGroupsController{ - log: log.Named("InstancesGroupsController"), inst_ctrl: NewInstancesController(log, db), + log: log.Named("InstancesGroupsController"), inst_ctrl: NewInstancesController(log, db, conn), col: col, graph: graph, db: db, serv2ig: serv2ig, ig2sp: ig2sp, diff --git a/pkg/graph/services.go b/pkg/graph/services.go index 73d07e087..129de2b79 100644 --- a/pkg/graph/services.go +++ b/pkg/graph/services.go @@ -20,6 +20,7 @@ import ( "fmt" "github.com/arangodb/go-driver" + "github.com/rabbitmq/amqp091-go" "github.com/slntopp/nocloud-proto/access" hasher "github.com/slntopp/nocloud-proto/hasher" spb "github.com/slntopp/nocloud-proto/services" @@ -54,7 +55,7 @@ type ServicesController struct { db driver.Database } -func NewServicesController(log *zap.Logger, db driver.Database) ServicesController { +func NewServicesController(log *zap.Logger, db driver.Database, conn *amqp091.Connection) ServicesController { log.Debug("New Services Controller creating") ctx := context.TODO() @@ -66,7 +67,7 @@ func NewServicesController(log *zap.Logger, db driver.Database) ServicesControll }) GraphGetEdgeEnsure(log, ctx, graph, schema.NS2SERV, schema.NAMESPACES_COL, schema.SERVICES_COL) - return ServicesController{log: log, col: col, ig_ctrl: NewInstancesGroupsController(log, db), db: db} + return ServicesController{log: log, col: col, ig_ctrl: NewInstancesGroupsController(log, db, conn), db: db} } func (ctrl *ServicesController) IGController() *InstancesGroupsController { diff --git a/pkg/graph/services_test.go b/pkg/graph/services_test.go index 5decd781e..a0e1fa95a 100644 --- a/pkg/graph/services_test.go +++ b/pkg/graph/services_test.go @@ -33,7 +33,7 @@ func init() { db = connectdb.MakeDBConnection(log, arangodbHost, arangodbCred) log.Info("DB connection established") - ctrl = graph.NewServicesController(log, db) + ctrl = graph.NewServicesController(log, db, nil) } func TestCreate(t *testing.T) { diff --git a/pkg/instances/server.go b/pkg/instances/server.go index 4e02cb932..c159daf72 100644 --- a/pkg/instances/server.go +++ b/pkg/instances/server.go @@ -56,7 +56,7 @@ type InstancesServer struct { func NewInstancesServiceServer(logger *zap.Logger, db driver.Database, rbmq *amqp.Connection) *InstancesServer { log := logger.Named("instances") log.Debug("New Instances Server Creating") - ig_ctrl := graph.NewInstancesGroupsController(logger, db) + ig_ctrl := graph.NewInstancesGroupsController(logger, db, rbmq) log.Debug("Setting up StatesPubSub") s := s.NewStatesPubSub(log, &db, rbmq) diff --git a/pkg/services/services.go b/pkg/services/services.go index 6ece3ec62..980ab4d10 100644 --- a/pkg/services/services.go +++ b/pkg/services/services.go @@ -24,6 +24,7 @@ import ( "github.com/arangodb/go-driver" "github.com/cskr/pubsub" + "github.com/rabbitmq/amqp091-go" "github.com/slntopp/nocloud-proto/access" bpb "github.com/slntopp/nocloud-proto/billing" driverpb "github.com/slntopp/nocloud-proto/drivers/instance/vanilla" @@ -60,12 +61,12 @@ type ServicesServer struct { log *zap.Logger } -func NewServicesServer(_log *zap.Logger, db driver.Database, ps *pubsub.PubSub) *ServicesServer { +func NewServicesServer(_log *zap.Logger, db driver.Database, ps *pubsub.PubSub, conn *amqp091.Connection) *ServicesServer { log := _log.Named("ServicesServer") log.Debug("New Services Server Creating") return &ServicesServer{ - log: log, db: db, ctrl: graph.NewServicesController(log, db), + log: log, db: db, ctrl: graph.NewServicesController(log, db, conn), sp_ctrl: graph.NewServicesProvidersController(log, db), ns_ctrl: graph.NewNamespacesController(log, db), drivers: make(map[string]driverpb.DriverServiceClient),