Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(server): refactor activityStream invocations - batch #1 - user #3845

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions packages/server/modules/activitystream/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,15 @@ import { Knex } from 'knex'
import {
onServerAccessRequestCreatedFactory,
onServerAccessRequestFinalizedFactory,
onServerInviteCreatedFactory,
onUserCreatedFactory
onServerInviteCreatedFactory
} from '@/modules/activitystream/services/eventListener'
import { isProjectResourceTarget } from '@/modules/serverinvites/helpers/core'
import { publish } from '@/modules/shared/utils/subscriptions'
import { isStreamAccessRequest } from '@/modules/accessrequests/repositories'
import { ServerInvitesEvents } from '@/modules/serverinvites/domain/events'
import { ProjectEvents } from '@/modules/core/domain/projects/events'
import { UserEvents } from '@/modules/core/domain/users/events'
import { AccessRequestEvents } from '@/modules/accessrequests/domain/events'
import { reportUserActivityFactory } from '@/modules/activitystream/services/userActivity'

let scheduledTask: ReturnType<ScheduleExecution> | null = null
let quitEventListeners: Optional<() => void> = undefined
Expand All @@ -53,12 +52,13 @@ const initializeEventListeners = ({
eventBus: EventBus
db: Knex
}) => {
const saveActivity = saveActivityFactory({ db })
const reportUserActivity = reportUserActivityFactory({
eventListen: eventBus.listen,
saveActivity
})
const quitCbs = [
eventBus.listen(
UserEvents.Created,
// this activity will always go in the main DB
onUserCreatedFactory({ saveActivity: saveActivityFactory({ db }) })
),
reportUserActivity(),
eventBus.listen(AccessRequestEvents.Created, async (payload) => {
if (!isStreamAccessRequest(payload.payload.request)) return
return await onServerAccessRequestCreatedFactory({
Expand Down
20 changes: 1 addition & 19 deletions packages/server/modules/activitystream/services/eventListener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,9 @@ import {
import {
AddStreamAccessRequestDeclinedActivity,
AddStreamAccessRequestedActivity,
AddStreamInviteSentOutActivity,
SaveActivity
AddStreamInviteSentOutActivity
} from '@/modules/activitystream/domain/operations'
import { GetStream } from '@/modules/core/domain/streams/operations'
import { UserEvents } from '@/modules/core/domain/users/events'
import {
ServerInvitesEvents,
ServerInvitesEventsPayloads
Expand All @@ -22,22 +20,6 @@ import {
} from '@/modules/serverinvites/helpers/core'
import { EventPayload } from '@/modules/shared/services/eventBus'

export const onUserCreatedFactory =
({ saveActivity }: { saveActivity: SaveActivity }) =>
async (payload: EventPayload<typeof UserEvents.Created>) => {
const { user } = payload.payload

await saveActivity({
streamId: null,
resourceType: 'user',
resourceId: user.id,
actionType: 'user_create',
userId: user.id,
info: { user },
message: 'User created'
})
}

export const onServerAccessRequestCreatedFactory =
({
addStreamAccessRequestedActivity
Expand Down
55 changes: 54 additions & 1 deletion packages/server/modules/activitystream/services/userActivity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,26 @@ import { UserUpdateInput } from '@/modules/core/graph/generated/graphql'
import { UserRecord } from '@/modules/core/helpers/types'
import { ActionTypes, ResourceTypes } from '@/modules/activitystream/helpers/types'
import { SaveActivity } from '@/modules/activitystream/domain/operations'
import { EventBusListen, EventPayload } from '@/modules/shared/services/eventBus'
import { UserEvents } from '@/modules/core/domain/users/events'

export const addUserUpdatedActivityFactory =
const addUserCreatedActivityFactory =
({ saveActivity }: { saveActivity: SaveActivity }) =>
async (payload: EventPayload<typeof UserEvents.Created>) => {
const { user } = payload.payload

await saveActivity({
streamId: null,
resourceType: 'user',
resourceId: user.id,
actionType: 'user_create',
userId: user.id,
info: { user },
message: 'User created'
})
}

const addUserUpdatedActivityFactory =
({ saveActivity }: { saveActivity: SaveActivity }) =>
async (params: {
oldUser: UserRecord
Expand All @@ -22,3 +40,38 @@ export const addUserUpdatedActivityFactory =
message: 'User updated'
})
}

const addUserDeletedActivityFactory =
(deps: { saveActivity: SaveActivity }) =>
async (params: { targetUserId: string; invokerUserId: string }) => {
const { targetUserId, invokerUserId } = params

await deps.saveActivity({
streamId: null,
resourceType: 'user',
resourceId: targetUserId,
actionType: ActionTypes.User.Delete,
userId: invokerUserId,
info: {},
message: 'User deleted'
})
}

export const reportUserActivityFactory =
(deps: { eventListen: EventBusListen; saveActivity: SaveActivity }) => () => {
const addUserDeletedActivity = addUserDeletedActivityFactory(deps)
const addUserUpdatedActivity = addUserUpdatedActivityFactory(deps)
const addUserCreatedActivity = addUserCreatedActivityFactory(deps)

const quitters = [
deps.eventListen(UserEvents.Deleted, async ({ payload }) => {
await addUserDeletedActivity(payload)
}),
deps.eventListen(UserEvents.Created, addUserCreatedActivity),
deps.eventListen(UserEvents.Updated, async ({ payload }) => {
await addUserUpdatedActivity(payload)
})
]

return () => quitters.forEach((q) => q())
}
14 changes: 13 additions & 1 deletion packages/server/modules/core/domain/users/events.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import { User, UserSignUpContext } from '@/modules/core/domain/users/types'
import { UserUpdateInput } from '@/modules/core/graph/generated/graphql'
import { Optional } from '@speckle/shared'

export const userEventsNamespace = 'users' as const

export const UserEvents = {
Created: `${userEventsNamespace}.created`
Created: `${userEventsNamespace}.created`,
Deleted: `${userEventsNamespace}.deleted`,
Updated: `${userEventsNamespace}.updated`
} as const

export type UserEventsPayloads = {
Expand All @@ -15,4 +18,13 @@ export type UserEventsPayloads = {
*/
signUpCtx: Optional<UserSignUpContext>
}
[UserEvents.Deleted]: {
targetUserId: string
invokerUserId: string
}
[UserEvents.Updated]: {
oldUser: User
update: UserUpdateInput
updaterId: string
}
}
2 changes: 1 addition & 1 deletion packages/server/modules/core/domain/users/operations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ export type ValidateUserPassword = (params: {
password: string
}) => Promise<boolean>

export type DeleteUser = (id: string) => Promise<boolean>
export type DeleteUser = (id: string, invokerId?: string) => Promise<boolean>

export type ChangeUserRole = (params: { userId: string; role: string }) => Promise<void>

Expand Down
25 changes: 6 additions & 19 deletions packages/server/modules/core/graph/resolvers/users.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { ActionTypes } from '@/modules/activitystream/helpers/types'
import { validateScopes } from '@/modules/shared'
import zxcvbn from 'zxcvbn'
import { Roles, Scopes } from '@speckle/shared'
Expand Down Expand Up @@ -27,13 +26,11 @@ import {
} from '@/modules/serverinvites/repositories/serverInvites'
import db from '@/db/knex'
import { BadRequestError } from '@/modules/shared/errors'
import { saveActivityFactory } from '@/modules/activitystream/repositories'
import {
updateUserAndNotifyFactory,
deleteUserFactory,
changeUserRoleFactory
} from '@/modules/core/services/users/management'
import { addUserUpdatedActivityFactory } from '@/modules/activitystream/services/userActivity'
import {
deleteStreamFactory,
getUserDeletableStreamsFactory
Expand All @@ -42,16 +39,15 @@ import { dbLogger } from '@/logging/logging'
import { getAdminUsersListCollectionFactory } from '@/modules/core/services/users/legacyAdminUsersList'
import { Resolvers } from '@/modules/core/graph/generated/graphql'
import { getServerInfoFactory } from '@/modules/core/repositories/server'
import { getEventBus } from '@/modules/shared/services/eventBus'

const getUser = legacyGetUserFactory({ db })
const getUserByEmail = legacyGetUserByEmailFactory({ db })

const updateUserAndNotify = updateUserAndNotifyFactory({
getUser: getUserFactory({ db }),
updateUser: updateUserFactory({ db }),
addUserUpdatedActivity: addUserUpdatedActivityFactory({
saveActivity: saveActivityFactory({ db })
})
emitEvent: getEventBus().emit
})

const getServerInfo = getServerInfoFactory({ db })
Expand All @@ -61,7 +57,8 @@ const deleteUser = deleteUserFactory({
isLastAdminUser: isLastAdminUserFactory({ db }),
getUserDeletableStreams: getUserDeletableStreamsFactory({ db }),
deleteAllUserInvites: deleteAllUserInvitesFactory({ db }),
deleteUserRecord: deleteUserRecordFactory({ db })
deleteUserRecord: deleteUserRecordFactory({ db }),
emitEvent: getEventBus().emit
})
const getUserRole = getUserRoleFactory({ db })
const changeUserRole = changeUserRoleFactory({
Expand Down Expand Up @@ -226,7 +223,7 @@ export = {
const user = await getUserByEmail({ email: args.userConfirmation.email })
if (!user) return false

await deleteUser(user.id)
await deleteUser(user.id, context.userId)
return true
},

Expand All @@ -243,17 +240,7 @@ export = {
await throwForNotHavingServerRole(context, Roles.Server.Guest)
await validateScopes(context.scopes, Scopes.Profile.Delete)

await deleteUser(context.userId!)

await saveActivityFactory({ db })({
streamId: null,
resourceType: 'user',
resourceId: context.userId!,
actionType: ActionTypes.User.Delete,
userId: context.userId!,
info: {},
message: 'User deleted'
})
await deleteUser(context.userId!, context.userId!)

return true
},
Expand Down
27 changes: 19 additions & 8 deletions packages/server/modules/core/services/users/management.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { addUserUpdatedActivityFactory } from '@/modules/activitystream/services/userActivity'
import {
ChangeUserPassword,
ChangeUserRole,
Expand Down Expand Up @@ -56,7 +55,7 @@ export const updateUserAndNotifyFactory =
(deps: {
getUser: GetUser
updateUser: UpdateUser
addUserUpdatedActivity: ReturnType<typeof addUserUpdatedActivityFactory>
emitEvent: EventBusEmit
}): UpdateUserAndNotify =>
async (userId: string, update: UserUpdateInput) => {
const existingUser = await deps.getUser(userId)
Expand All @@ -83,10 +82,13 @@ export const updateUserAndNotifyFactory =
throw new UserUpdateError("Couldn't update user")
}

await deps.addUserUpdatedActivity({
oldUser: existingUser,
update,
updaterId: userId
await deps.emitEvent({
eventName: UserEvents.Updated,
payload: {
oldUser: existingUser,
update,
updaterId: userId
}
})

return newUser
Expand Down Expand Up @@ -264,8 +266,9 @@ export const deleteUserFactory =
getUserDeletableStreams: GetUserDeletableStreams
deleteAllUserInvites: DeleteAllUserInvites
deleteUserRecord: DeleteUserRecord
emitEvent: EventBusEmit
}): DeleteUser =>
async (id) => {
async (id, invokerId) => {
deps.logger.info('Deleting user ' + id)
const isLastAdmin = await deps.isLastAdminUser(id)
if (isLastAdmin) {
Expand All @@ -281,7 +284,15 @@ export const deleteUserFactory =
// THIS REALLY SHOULD BE A REACTION TO THE USER DELETED EVENT EMITTED HER
await deps.deleteAllUserInvites(id)

return await deps.deleteUserRecord(id)
const deleted = await deps.deleteUserRecord(id)
if (deleted) {
await deps.emitEvent({
eventName: UserEvents.Deleted,
payload: { targetUserId: id, invokerUserId: invokerId || id }
})
}

return deleted
}

export const changeUserRoleFactory =
Expand Down
8 changes: 3 additions & 5 deletions packages/server/modules/core/tests/users.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ import {
} from '@/modules/core/services/users/management'
import { validateAndCreateUserEmailFactory } from '@/modules/core/services/userEmails'
import { finalizeInvitedServerRegistrationFactory } from '@/modules/serverinvites/services/processing'
import { addUserUpdatedActivityFactory } from '@/modules/activitystream/services/userActivity'
import { dbLogger } from '@/logging/logging'
import {
storeApiTokenFactory,
Expand Down Expand Up @@ -214,9 +213,7 @@ const getUserByEmail = legacyGetUserByEmailFactory({ db })
const updateUser = updateUserAndNotifyFactory({
getUser: getUserFactory({ db }),
updateUser: updateUserFactory({ db }),
addUserUpdatedActivity: addUserUpdatedActivityFactory({
saveActivity: saveActivityFactory({ db })
})
emitEvent: getEventBus().emit
})
const updateUserPassword = changePasswordFactory({
getUser: getUserFactory({ db }),
Expand All @@ -231,7 +228,8 @@ const deleteUser = deleteUserFactory({
isLastAdminUser: isLastAdminUserFactory({ db }),
getUserDeletableStreams: getUserDeletableStreamsFactory({ db }),
deleteAllUserInvites: deleteAllUserInvitesFactory({ db }),
deleteUserRecord: deleteUserRecordFactory({ db })
deleteUserRecord: deleteUserRecordFactory({ db }),
emitEvent: getEventBus().emit
})
const changeUserRole = changeUserRoleFactory({
getServerInfo,
Expand Down
1 change: 1 addition & 0 deletions packages/server/modules/shared/services/eventBus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ export function initializeEventBus() {
export type EventBus = ReturnType<typeof initializeEventBus>
export type EventBusPayloads = EventTypes
export type EventBusEmit = EventBus['emit']
export type EventBusListen = EventBus['listen']
export type EmitArg = Parameters<EventBusEmit>[0]

let eventBus: EventBus
Expand Down