Skip to content

Commit

Permalink
better way to control room start & end tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
jibon57 committed Nov 22, 2023
1 parent eda7df8 commit 4241c40
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 20 deletions.
9 changes: 6 additions & 3 deletions pkg/config/constants.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package config

import "time"

const (
RECORDER_BOT = "RECORDER_BOT"
RTMP_BOT = "RTMP_BOT"
MAX_PRELOADED_WHITEBOARD_FILE_SIZE int64 = 5 * 1000000 // limit to 5MB
RECORDER_BOT = "RECORDER_BOT"
RTMP_BOT = "RTMP_BOT"
MAX_PRELOADED_WHITEBOARD_FILE_SIZE int64 = 5 * 1000000 // limit to 5MB
WAIT_BEFORE_TRIGGER_ON_AFTER_ROOM_ENDED = 5 * time.Second
)
14 changes: 12 additions & 2 deletions pkg/models/analytics.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,14 @@ type AnalyticsModel struct {
rc *redis.Client
ctx context.Context
data *plugnmeet.AnalyticsDataMsg
rs *RoomService
}

func NewAnalyticsModel() *AnalyticsModel {
return &AnalyticsModel{
rc: config.AppCnf.RDS,
ctx: context.Background(),
rs: NewRoomService(),
}
}

Expand Down Expand Up @@ -141,7 +143,7 @@ func (m *AnalyticsModel) insertEventData(key string) {
}
}

func (m *AnalyticsModel) PrepareToExportAnalytics(sid, meta string) {
func (m *AnalyticsModel) PrepareToExportAnalytics(roomId, sid, meta string) {
if config.AppCnf.AnalyticsSettings == nil || !config.AppCnf.AnalyticsSettings.Enabled {
return
}
Expand All @@ -159,9 +161,17 @@ func (m *AnalyticsModel) PrepareToExportAnalytics(sid, meta string) {
return
}

// let's wait few seconds so that all other process will finish
// let's wait a few seconds so that all other processes will finish
time.Sleep(waitBeforeProcessDuration)

// we'll check if the room is still active or not.
// this may happen when we closed the room & re-created it instantly
exist, err := m.rs.ManageActiveRoomsWithMetadata(roomId, "get", "")
if err == nil && exist != nil {
log.Infoln("this room:", roomId, "still active, so we won't process to export analytics")
return
}

if _, err := os.Stat(*config.AppCnf.AnalyticsSettings.FilesStorePath); os.IsNotExist(err) {
err = os.MkdirAll(*config.AppCnf.AnalyticsSettings.FilesStorePath, os.ModePerm)
if err != nil {
Expand Down
12 changes: 9 additions & 3 deletions pkg/models/room_auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,14 @@ func NewRoomAuthModel() *RoomAuthModel {
}

func (am *RoomAuthModel) CreateRoom(r *plugnmeet.CreateRoomReq) (bool, string, *livekit.Room) {
roomDbInfo, _ := am.rm.GetRoomInfo(r.RoomId, "", 1)
exist, err := am.rs.ManageActiveRoomsWithMetadata(r.GetRoomId(), "get", "")
if err == nil && exist != nil {
// maybe this room was ended just now, so we'll wait until clean up done
log.Infoln("this room:", r.GetRoomId(), "still active, we'll wait for:", config.WAIT_BEFORE_TRIGGER_ON_AFTER_ROOM_ENDED, "before recreating it again.")
time.Sleep(config.WAIT_BEFORE_TRIGGER_ON_AFTER_ROOM_ENDED)
}

roomDbInfo, _ := am.rm.GetRoomInfo(r.RoomId, "", 1)
if roomDbInfo.Id > 0 {
rf, err := am.rs.LoadRoomInfo(r.RoomId)
if err != nil && err.Error() != "requested room does not exist" {
Expand All @@ -42,8 +48,8 @@ func (am *RoomAuthModel) CreateRoom(r *plugnmeet.CreateRoomReq) (bool, string, *
return true, "room already exists", rf
}

// we'll allow to create room again & use the same DB row
// we can just update the DB row. No need to create new one
// we'll allow creating room again & use the same DB row
// we can just update the DB row. No need to create a new one
}

// we'll set default values otherwise client got confused if data is missing
Expand Down
4 changes: 2 additions & 2 deletions pkg/models/room_duration.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,9 @@ func (m *RoomDurationModel) GetRoomsWithDurationMap() map[string]RoomDurationInf
out := make(map[string]RoomDurationInfo)
for _, key := range roomsKey {
var val RoomDurationInfo
err := m.rc.HGetAll(m.ctx, key).Scan(&val)
err = m.rc.HGetAll(m.ctx, key).Scan(&val)
if err != nil {
fmt.Println(err)
log.Errorln(err)
continue
}
rId := strings.Replace(key, roomWithDurationInfoKey+":", "", 1)
Expand Down
4 changes: 1 addition & 3 deletions pkg/models/room_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,7 @@ func (r *RoomService) ManageRoomWithUsersMetadata(roomId, userId, task, metadata
}

func (r *RoomService) OnAfterRoomClosed(roomId string) {
// completely remove room active users list
// completely remove a room active users list
_, err := r.ManageActiveUsersList(roomId, "", "delList", 0)
if err != nil {
log.Errorln(err)
Expand All @@ -579,8 +579,6 @@ func (r *RoomService) OnAfterRoomClosed(roomId string) {
log.Errorln(err)
}

// we'll wait a little bit before we clean up
time.Sleep(5 * time.Second)
// remove this room from an active room list
_, err = r.ManageActiveRoomsWithMetadata(roomId, "del", "")
if err != nil {
Expand Down
14 changes: 7 additions & 7 deletions pkg/models/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (w *webhookEvent) roomFinished() {
go w.sendToWebhookNotifier(event)

if event.Room.Sid != "" {
// we will only update table if the SID is not empty
// we will only update the table if the SID is not empty
room := &RoomInfo{
Sid: event.Room.Sid,
IsRunning: 0,
Expand All @@ -127,14 +127,14 @@ func (w *webhookEvent) roomFinished() {
}
}

// now we'll perform few service related tasks
// now we'll perform a few service related tasks
go func() {
// let's wait few seconds so that any pending task will finish
time.Sleep(5 * time.Second)
// let's wait a few seconds so that any pending task will finish
time.Sleep(config.WAIT_BEFORE_TRIGGER_ON_AFTER_ROOM_ENDED)
w.roomService.OnAfterRoomClosed(event.Room.Name)
}()

//we'll send message to recorder to stop
//we'll send a message to the recorder to stop
_ = w.recorderModel.SendMsgToRecorder(&plugnmeet.RecordingReq{
Task: plugnmeet.RecordingTasks_STOP,
Sid: w.event.Room.Sid,
Expand Down Expand Up @@ -193,8 +193,8 @@ func (w *webhookEvent) roomFinished() {
sm.OnAfterRoomEnded(event.Room.Name, event.Room.Sid)
}()

// finally create analytics file
go w.analyticsModel.PrepareToExportAnalytics(event.Room.Sid, event.Room.Metadata)
// finally, create the analytics file
go w.analyticsModel.PrepareToExportAnalytics(event.Room.Name, event.Room.Sid, event.Room.Metadata)

// let's delete webhook queue
go w.notifier.DeleteWebhookQueuedNotifier(event.Room.Name)
Expand Down

0 comments on commit 4241c40

Please sign in to comment.