diff --git a/pkg/user/UserService.go b/pkg/user/UserService.go index 73a7f795ebf..49b89149eb0 100644 --- a/pkg/user/UserService.go +++ b/pkg/user/UserService.go @@ -36,9 +36,15 @@ import ( "go.uber.org/zap" "net/http" "strings" + "sync" "time" ) +const ( + ConcurrentRequestLockError = "there is an ongoing request for this user, please try after some time" + ConcurrentRequestUnlockError = "cannot block request that is not in process" +) + type UserService interface { CreateUser(userInfo *bean.UserInfo, token string, managerAuth func(resource, token string, object string) bool) ([]*bean.UserInfo, error) SelfRegisterUserIfNotExists(userInfo *bean.UserInfo) ([]*bean.UserInfo, error) @@ -62,6 +68,10 @@ type UserService interface { } type UserServiceImpl struct { + userReqLock sync.RWMutex + //map of userId and current lock-state of their serving ability; + //if TRUE then it means that some request is ongoing & unable to serve and FALSE then it is open to serve + userReqState map[int32]bool userAuthRepository repository2.UserAuthRepository logger *zap.SugaredLogger userRepository repository2.UserRepository @@ -77,6 +87,7 @@ func NewUserServiceImpl(userAuthRepository repository2.UserAuthRepository, userGroupRepository repository2.RoleGroupRepository, sessionManager2 *middleware.SessionManager, userCommonService UserCommonService, userAuditService UserAuditService) *UserServiceImpl { serviceImpl := &UserServiceImpl{ + userReqState: make(map[int32]bool), userAuthRepository: userAuthRepository, logger: logger, userRepository: userRepository, @@ -89,6 +100,36 @@ func NewUserServiceImpl(userAuthRepository repository2.UserAuthRepository, return serviceImpl } +func (impl *UserServiceImpl) getUserReqLockStateById(userId int32) bool { + defer impl.userReqLock.RUnlock() + impl.userReqLock.RLock() + return impl.userReqState[userId] +} + +// FreeUnfreeUserReqState - free sets the userId free for serving, meaning removing the lock(removing entry). Unfree locks the user for other requests +func (impl *UserServiceImpl) lockUnlockUserReqState(userId int32, lock bool) error { + var err error + defer impl.userReqLock.Unlock() + impl.userReqLock.Lock() + if lock { + //checking again if someone changed or not + if !impl.userReqState[userId] { + //available to serve, locking + impl.userReqState[userId] = true + } else { + err = &util.ApiError{Code: "409", HttpStatusCode: http.StatusConflict, UserMessage: ConcurrentRequestLockError} + } + } else { + if impl.userReqState[userId] { + //in serving state, unlocking + delete(impl.userReqState, userId) + } else { + err = &util.ApiError{Code: "409", HttpStatusCode: http.StatusConflict, UserMessage: ConcurrentRequestUnlockError} + } + } + return err +} + func (impl *UserServiceImpl) validateUserRequest(userInfo *bean.UserInfo) (bool, error) { if len(userInfo.RoleFilters) == 1 && userInfo.RoleFilters[0].Team == "" && userInfo.RoleFilters[0].Environment == "" && userInfo.RoleFilters[0].Action == "" { @@ -617,6 +658,23 @@ func (impl *UserServiceImpl) mergeGroups(oldGroups []string, newGroups []string) } func (impl *UserServiceImpl) UpdateUser(userInfo *bean.UserInfo, token string, managerAuth func(resource, token string, object string) bool) (*bean.UserInfo, bool, bool, []string, error) { + //checking if request for same user is being processed + isLocked := impl.getUserReqLockStateById(userInfo.Id) + if isLocked { + impl.logger.Errorw("received concurrent request for user update, UpdateUser", "userId", userInfo.Id) + return nil, false, false, nil, &util.ApiError{ + Code: "409", + HttpStatusCode: http.StatusConflict, + UserMessage: ConcurrentRequestLockError, + } + } else { + //locking state for this user since it's ready to serve + err := impl.lockUnlockUserReqState(userInfo.Id, true) + if err != nil { + impl.logger.Errorw("error in locking, lockUnlockUserReqState", "userId", userInfo.Id) + return nil, false, false, nil, err + } + } //validating if action user is not admin and trying to update user who has super admin polices, return 403 isUserSuperAdmin, err := impl.IsSuperAdmin(int(userInfo.Id)) if err != nil { @@ -802,6 +860,12 @@ func (impl *UserServiceImpl) UpdateUser(userInfo *bean.UserInfo, token string, m //loading policy for syncing orchestrator to casbin with newly added policies casbin2.LoadPolicy() + err = impl.lockUnlockUserReqState(userInfo.Id, false) + if err != nil { + impl.logger.Errorw("error in unlocking, lockUnlockUserReqState", "userId", userInfo.Id) + return nil, false, false, nil, err + } + return userInfo, rolesChanged, groupsModified, restrictedGroups, nil }