Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: user update api concurrent request handling #3760

Merged
merged 2 commits into from
Aug 14, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions pkg/user/UserService.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,15 @@ import (
"go.uber.org/zap"
"net/http"
"strings"
"sync"
"time"
)

const (
ConcurrentRequestLockError = "there is an ongoing request for this user, please try after some time"
ConcurrentRequestUnlockError = "cannot block request that is not in process"
)

type UserService interface {
CreateUser(userInfo *bean.UserInfo, token string, managerAuth func(resource, token string, object string) bool) ([]*bean.UserInfo, error)
SelfRegisterUserIfNotExists(userInfo *bean.UserInfo) ([]*bean.UserInfo, error)
Expand All @@ -62,6 +68,10 @@ type UserService interface {
}

type UserServiceImpl struct {
userReqLock sync.RWMutex
//map of userId and current lock-state of their serving ability;
//if TRUE then it means that some request is ongoing & unable to serve and FALSE then it is open to serve
userReqState map[int32]bool
userAuthRepository repository2.UserAuthRepository
logger *zap.SugaredLogger
userRepository repository2.UserRepository
Expand All @@ -77,6 +87,7 @@ func NewUserServiceImpl(userAuthRepository repository2.UserAuthRepository,
userGroupRepository repository2.RoleGroupRepository,
sessionManager2 *middleware.SessionManager, userCommonService UserCommonService, userAuditService UserAuditService) *UserServiceImpl {
serviceImpl := &UserServiceImpl{
userReqState: make(map[int32]bool),
userAuthRepository: userAuthRepository,
logger: logger,
userRepository: userRepository,
Expand All @@ -89,6 +100,36 @@ func NewUserServiceImpl(userAuthRepository repository2.UserAuthRepository,
return serviceImpl
}

func (impl *UserServiceImpl) getUserReqLockStateById(userId int32) bool {
defer impl.userReqLock.RUnlock()
impl.userReqLock.RLock()
return impl.userReqState[userId]
}

// FreeUnfreeUserReqState - free sets the userId free for serving, meaning removing the lock(removing entry). Unfree locks the user for other requests
func (impl *UserServiceImpl) lockUnlockUserReqState(userId int32, lock bool) error {
var err error
defer impl.userReqLock.Unlock()
impl.userReqLock.Lock()
if lock {
//checking again if someone changed or not
if !impl.userReqState[userId] {
//available to serve, locking
impl.userReqState[userId] = true
} else {
err = &util.ApiError{Code: "409", HttpStatusCode: http.StatusConflict, UserMessage: ConcurrentRequestLockError}
}
} else {
if impl.userReqState[userId] {
//in serving state, unlocking
delete(impl.userReqState, userId)
} else {
err = &util.ApiError{Code: "409", HttpStatusCode: http.StatusConflict, UserMessage: ConcurrentRequestUnlockError}
}
}
return err
}

func (impl *UserServiceImpl) validateUserRequest(userInfo *bean.UserInfo) (bool, error) {
if len(userInfo.RoleFilters) == 1 &&
userInfo.RoleFilters[0].Team == "" && userInfo.RoleFilters[0].Environment == "" && userInfo.RoleFilters[0].Action == "" {
Expand Down Expand Up @@ -617,6 +658,23 @@ func (impl *UserServiceImpl) mergeGroups(oldGroups []string, newGroups []string)
}

func (impl *UserServiceImpl) UpdateUser(userInfo *bean.UserInfo, token string, managerAuth func(resource, token string, object string) bool) (*bean.UserInfo, bool, bool, []string, error) {
//checking if request for same user is being processed
isLocked := impl.getUserReqLockStateById(userInfo.Id)
if isLocked {
impl.logger.Errorw("received concurrent request for user update, UpdateUser", "userId", userInfo.Id)
return nil, false, false, nil, &util.ApiError{
Code: "409",
HttpStatusCode: http.StatusConflict,
UserMessage: ConcurrentRequestLockError,
}
} else {
//locking state for this user since it's ready to serve
err := impl.lockUnlockUserReqState(userInfo.Id, true)
if err != nil {
impl.logger.Errorw("error in locking, lockUnlockUserReqState", "userId", userInfo.Id)
return nil, false, false, nil, err
}
}
//validating if action user is not admin and trying to update user who has super admin polices, return 403
isUserSuperAdmin, err := impl.IsSuperAdmin(int(userInfo.Id))
if err != nil {
Expand Down Expand Up @@ -802,6 +860,12 @@ func (impl *UserServiceImpl) UpdateUser(userInfo *bean.UserInfo, token string, m
//loading policy for syncing orchestrator to casbin with newly added policies
casbin2.LoadPolicy()

err = impl.lockUnlockUserReqState(userInfo.Id, false)
if err != nil {
impl.logger.Errorw("error in unlocking, lockUnlockUserReqState", "userId", userInfo.Id)
return nil, false, false, nil, err
}

return userInfo, rolesChanged, groupsModified, restrictedGroups, nil
}

Expand Down
Loading