Skip to content

Commit

Permalink
updated lock/unlock for concurrent request handling, user update (#3760
Browse files Browse the repository at this point in the history
…) (#390)
  • Loading branch information
kartik-579 authored Aug 14, 2023
1 parent 1a7d725 commit 482cdbb
Showing 1 changed file with 64 additions and 0 deletions.
64 changes: 64 additions & 0 deletions pkg/user/UserService.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,15 @@ import (
"go.uber.org/zap"
"net/http"
"strings"
"sync"
"time"
)

const (
ConcurrentRequestLockError = "there is an ongoing request for this user, please try after some time"
ConcurrentRequestUnlockError = "cannot block request that is not in process"
)

type UserService interface {
CreateUser(userInfo *bean.UserInfo, token string, managerAuth func(resource, token string, object string) bool) ([]*bean.UserInfo, error)
SelfRegisterUserIfNotExists(userInfo *bean.UserInfo) ([]*bean.UserInfo, error)
Expand All @@ -63,6 +69,10 @@ type UserService interface {
}

type UserServiceImpl struct {
userReqLock sync.RWMutex
//map of userId and current lock-state of their serving ability;
//if TRUE then it means that some request is ongoing & unable to serve and FALSE then it is open to serve
userReqState map[int32]bool
userAuthRepository repository2.UserAuthRepository
logger *zap.SugaredLogger
userRepository repository2.UserRepository
Expand All @@ -78,6 +88,7 @@ func NewUserServiceImpl(userAuthRepository repository2.UserAuthRepository,
userGroupRepository repository2.RoleGroupRepository,
sessionManager2 *middleware.SessionManager, userCommonService UserCommonService, userAuditService UserAuditService) *UserServiceImpl {
serviceImpl := &UserServiceImpl{
userReqState: make(map[int32]bool),
userAuthRepository: userAuthRepository,
logger: logger,
userRepository: userRepository,
Expand All @@ -90,6 +101,36 @@ func NewUserServiceImpl(userAuthRepository repository2.UserAuthRepository,
return serviceImpl
}

func (impl *UserServiceImpl) getUserReqLockStateById(userId int32) bool {
defer impl.userReqLock.RUnlock()
impl.userReqLock.RLock()
return impl.userReqState[userId]
}

// FreeUnfreeUserReqState - free sets the userId free for serving, meaning removing the lock(removing entry). Unfree locks the user for other requests
func (impl *UserServiceImpl) lockUnlockUserReqState(userId int32, lock bool) error {
var err error
defer impl.userReqLock.Unlock()
impl.userReqLock.Lock()
if lock {
//checking again if someone changed or not
if !impl.userReqState[userId] {
//available to serve, locking
impl.userReqState[userId] = true
} else {
err = &util.ApiError{Code: "409", HttpStatusCode: http.StatusConflict, UserMessage: ConcurrentRequestLockError}
}
} else {
if impl.userReqState[userId] {
//in serving state, unlocking
delete(impl.userReqState, userId)
} else {
err = &util.ApiError{Code: "409", HttpStatusCode: http.StatusConflict, UserMessage: ConcurrentRequestUnlockError}
}
}
return err
}

func (impl UserServiceImpl) validateUserRequest(userInfo *bean.UserInfo) (bool, error) {
if len(userInfo.RoleFilters) == 1 &&
userInfo.RoleFilters[0].Team == "" && userInfo.RoleFilters[0].Environment == "" && userInfo.RoleFilters[0].Action == "" {
Expand Down Expand Up @@ -627,6 +668,23 @@ func (impl UserServiceImpl) mergeGroups(oldGroups []string, newGroups []string)
}

func (impl UserServiceImpl) UpdateUser(userInfo *bean.UserInfo, token string, managerAuth func(resource, token string, object string) bool) (*bean.UserInfo, bool, bool, []string, error) {
//checking if request for same user is being processed
isLocked := impl.getUserReqLockStateById(userInfo.Id)
if isLocked {
impl.logger.Errorw("received concurrent request for user update, UpdateUser", "userId", userInfo.Id)
return nil, false, false, nil, &util.ApiError{
Code: "409",
HttpStatusCode: http.StatusConflict,
UserMessage: ConcurrentRequestLockError,
}
} else {
//locking state for this user since it's ready to serve
err := impl.lockUnlockUserReqState(userInfo.Id, true)
if err != nil {
impl.logger.Errorw("error in locking, lockUnlockUserReqState", "userId", userInfo.Id)
return nil, false, false, nil, err
}
}
//validating if action user is not admin and trying to update user who has super admin polices, return 403
isUserSuperAdmin, err := impl.IsSuperAdmin(int(userInfo.Id))
if err != nil {
Expand Down Expand Up @@ -818,6 +876,12 @@ func (impl UserServiceImpl) UpdateUser(userInfo *bean.UserInfo, token string, ma
//loading policy for syncing orchestrator to casbin with newly added policies
casbin2.LoadPolicy()

err = impl.lockUnlockUserReqState(userInfo.Id, false)
if err != nil {
impl.logger.Errorw("error in unlocking, lockUnlockUserReqState", "userId", userInfo.Id)
return nil, false, false, nil, err
}

return userInfo, rolesChanged, groupsModified, restrictedGroups, nil
}

Expand Down

0 comments on commit 482cdbb

Please sign in to comment.