diff --git a/storage/mysql/queue.go b/storage/mysql/queue.go index 8ab370a..2a0be58 100644 --- a/storage/mysql/queue.go +++ b/storage/mysql/queue.go @@ -146,20 +146,28 @@ UPDATE } func (s *MySQLStorage) RetrieveNextCommand(r *mdm.Request, skipNotNow bool) (*mdm.Command, error) { - statusWhere := "status IS NULL" - if !skipNotNow { - statusWhere = `(` + statusWhere + ` OR status = 'NotNow')` - } command := new(mdm.Command) err := s.db.QueryRowContext( - r.Context, - `SELECT command_uuid, request_type, command FROM view_queue WHERE id = ? AND active = 1 AND `+statusWhere+` LIMIT 1;`, - r.ID, + r.Context, ` +SELECT c.command_uuid, c.request_type, c.command +FROM enrollment_queue AS q + INNER JOIN commands AS c + ON q.command_uuid = c.command_uuid + LEFT JOIN command_results r + ON r.command_uuid = q.command_uuid AND r.id = q.id +WHERE q.id = ? + AND q.active = 1 + AND (r.status IS NULL OR (r.status = 'NotNow' AND NOT ?)) +ORDER BY + q.priority DESC, + q.created_at +LIMIT 1;`, + r.ID, skipNotNow, ).Scan(&command.CommandUUID, &command.Command.RequestType, &command.Raw) + if errors.Is(err, sql.ErrNoRows) { + return nil, nil + } if err != nil { - if errors.Is(err, sql.ErrNoRows) { - return nil, nil - } return nil, err } return command, nil diff --git a/storage/mysql/schema.00009.sql b/storage/mysql/schema.00009.sql new file mode 100644 index 0000000..49380e8 --- /dev/null +++ b/storage/mysql/schema.00009.sql @@ -0,0 +1 @@ +ALTER TABLE enrollment_queue ADD INDEX (priority DESC, created_at); diff --git a/storage/mysql/schema.sql b/storage/mysql/schema.sql index e870875..7630d48 100644 --- a/storage/mysql/schema.sql +++ b/storage/mysql/schema.sql @@ -202,6 +202,8 @@ CREATE TABLE enrollment_queue ( PRIMARY KEY (id, command_uuid), + INDEX (priority DESC, created_at), + FOREIGN KEY (id) REFERENCES enrollments (id) ON DELETE CASCADE ON UPDATE CASCADE,