Skip to content

스케줄러의 전반적인 이해

ChoYG edited this page Jan 12, 2017 · 1 revision

APM의 스케줄러가 담당하는 영역

결론적으로, APM의 스케줄러에서 담당하는 영역은 딱 두가지다.

  • 주기성
  • 오버런 감지

주기성은 말 그대로 일정한 시간을 주기로, 반복적으로 태스크를 돌리는 것을 말한다. 기존의 비선점형 RTOS에서 옵션으로 주기를 정해주면 그 주기마다 실행시켜주는 것과 비슷하다. 하지만 여기서는 이게 사실상 거의 전부라는것.
예를들어 50Hz 마다(0.02초)실행하도록 설정해주면, 그때마다 지정한 함수를 반복적으로 실행시켜준다.

오버런 감지는 지정한 시간을 초과하는 경우를 감지할 수 있다는 말인데, 만약 실행시키도록 한 함수가 시간을 너무 많이 사용하여 지정한 시간을 초과하는 타임아웃이 되었다면 이를 감지해준다는 말이다.

이 두가지 이외에 스케줄러에서는 부가적으로 debug용 옵션이 있는데, _debug 라는 AP_Scheduler 의 private 맴버변수로 디버그 모드를 사용할 지 말지 결정한다. private 이기 때문에 어디선가 설정해주는 맴버함수가 존재하겠지만, 귀찮기 때문에굳이 지금 단계에서 확인해 볼 필요는 없다고 판단했기 때문에 자세히 확인해보지는 않을 것이다.
_debug 옵션을 사용하였을 때 이용할 수 있는 것은 성능체크다. 말 그대로 이 테스크가 시간을 얼마나 잡아먹고, 뭐 그런걸 확인할 수 있다는 소리다.

주기성

    for (uint8_t i=0; i<_num_tasks; i++) {
        uint16_t dt = _tick_counter - _last_run[i];
        uint16_t interval_ticks = _loop_rate_hz / _tasks[i].rate_hz;
        if (interval_ticks < 1) {
            interval_ticks = 1;
        }
        if (dt >= interval_ticks) {

            if (_task_time_allowed <= time_available) {
                // run it
                _task_time_allowed = _tasks[i].max_time_micros;
                // All but the parts related to the periodicity have been removed.
                
                _tasks[i].function();

                // record the tick counter when we ran. This drives
                // when we next run the event
                _last_run[i] = _tick_counter;

            }
        }
    }

주기적으로 함수를 실행하기 위한 부분, 주기성과 관련된 부분만 뽑아보면 위와 같다.
일단 주기성과 관련없는 부분을 전부 제거하니 코드가 확실히 간단해 졌다.

앞선 문서에서 _tick_counter가 뭐하는 녀석인지 자세히 확인하지 않았는데, 전체 loop가 한번 도는것을 확인하기 위한 장치이다. 다른 문서에서 소개했듯이 APM은 아두이노에서 시작했다. 따라서 반복적으로 실행되는 loop문이 있는데 사실상 main문이다. 그 loop문 또한 일정 주기로 돌고 있는 듯 하다.

#if APM_BUILD_TYPE(APM_BUILD_ArduCopter)
#define SCHEDULER_DEFAULT_LOOP_RATE 400
#else
#define SCHEDULER_DEFAULT_LOOP_RATE  50
#endif

AP_scheduler.cpp 의 일부분으로 기본 loop의 rate(단위는 Hz인듯)을 define 해둔 것으로 보인다.
우리는 Arducopter로 빌드 할 것이기 때문에 400Hz로 loop가 돌 것임을 짐작할 수 있다.

        uint16_t dt = _tick_counter - _last_run[i];
        uint16_t interval_ticks = _loop_rate_hz / _tasks[i].rate_hz;

dt는 측정된 시간간격을 의미하면 된다고 보면 된다. 여기서의 단위는 sec나 millisec가 아니고 loop가 한번 돌때마다 1씩 더해지는 값이다. 전체 loop문(앞서 말한 사실상의 main문)이 400Hz주기로 실행된다면 2.5ms(400의 역수)마다 _tick_counter가 1씩 더해진다. _last_run[i]은 변수명 그대로 이전에 실행된 테스크가 끝났을 때의 _tick_counter이다.

interval_ticks는 목표하는 주기로 보면 된다. 예를 들어 설명하면 _loop_rate_hz는 400일 것이며 _tasks[i].rate_hz는 함수를 등록할 때 같이 작성한 함수가 실행되는 초당 횟수(Hz)다. _tasks[i].rate_hz 값이 50Hz로 설정 되어 있었다면, 위 식에 의해 interval_ticks은 400/50인 8이 된다. 이는 _tick_counter가 8또는 그 이상이 되었을 때 지정한 함수를 실행하겠다는 의미다.

        if (dt >= interval_ticks) {

            if (_task_time_allowed <= time_available) {
                // run it
                _task_time_allowed = _tasks[i].max_time_micros;
                // All but the parts related to the periodicity have been removed.
                
                _tasks[i].function();

                // record the tick counter when we ran. This drives
                // when we next run the event
                _last_run[i] = _tick_counter;

            }
        }

주기적으로 실행되는 부분은 익숙하다면 상당히 익숙한 방식으로, 수시로 앞서 말한 dt값이 목표값에 도달했는지 확인한다.

  • dt 값이 interval_ticks(목표값)에 도달했는지 확인
  • _tasks[i].function() 실행
  • _last_run[i](마지막으로 실행된 시간으로 보면 된다.)을 현재 _tick_counter 값으로 갱신

_tasks[i].function() 는 처음 실행시킬 함수를 등록할 때, 등록된 함수를 호출하도록 되어있다. 어떤식으로 등록이 되어있는지 확인해보기 위해 이전에 보았던 예제에서 define되어있는 부분을 좀 찾아보았는데,

    .function = Functor<void>::bind<std::remove_reference<decltype(*&schedtest)>::type, &SchedTest::ins_update>(&schedtest),\
    .name = "ins_update",    .rate_hz = 50,\
    .max_time_micros = 1000}

function에 대입되는 값이 좀,,, 여러모로 보기 그렇지만 ins_update라는 함수값일 것이다.
(C++개발자들이 죽어나는 이유)
(STL이 하기 싫어지는 이유)

오버런 감지

    uint32_t run_started_usec = AP_HAL::micros();
    uint32_t now = run_started_usec;
    
    for (uint8_t i=0; i<_num_tasks; i++) {
        uint16_t dt = _tick_counter - _last_run[i];
        uint16_t interval_ticks = _loop_rate_hz / _tasks[i].rate_hz;
        if (interval_ticks < 1) {
            interval_ticks = 1;
        }
        if (dt >= interval_ticks) {
            // this task is due to run. Do we have enough time to run it?
            _task_time_allowed = _tasks[i].max_time_micros;

            if (_task_time_allowed <= time_available) {
                // run it
                _task_time_started = now;

                _tasks[i].function();

                // record the tick counter when we ran. This drives
                // when we next run the event
                _last_run[i] = _tick_counter;

                // work out how long the event actually took
                now = AP_HAL::micros();
                uint32_t time_taken = now - _task_time_started;

                if (time_taken > _task_time_allowed) {
                    // the event overran!
                    if (_debug > 4) {
                        ::printf("Scheduler overrun task[%u-%s] (%u/%u)\n",
                                 (unsigned)i,
                                 _tasks[i].name,
                                 (unsigned)time_taken,
                                 (unsigned)_task_time_allowed);
                    }
                }
                if (time_taken >= time_available) {
                    goto update_spare_ticks;
                }
                time_available -= time_taken;
            }
        }
    }

    // update number of spare microseconds
    _spare_micros += time_available;

update_spare_ticks:
    _spare_ticks++;
    if (_spare_ticks == 32) {
        _spare_ticks /= 2;
        _spare_micros /= 2;
    }

주기성과 관련된 코드에 오버런 체크 부분까지 포함하는 코드이다.
뭐 실제로 오버런(타임아웃)이 발생해도 실패 핸들러를 실행하거나 하는 행동은 하지 않고, 그냥 그런게 발생했다고 알리는 정도다. 말 그대로 '체크'만 한다.

                // work out how long the event actually took
                now = AP_HAL::micros();
                uint32_t time_taken = now - _task_time_started;

now는 현제시간, _task_time_started는 테스크의 시작시간이다. 즉, 테스크가 얼만큼 시간을 사용했는지를 나타낸다.

                if (time_taken > _task_time_allowed) {
                    // the event overran!
                    if (_debug > 4) {
                        ::printf("Scheduler overrun task[%u-%s] (%u/%u)\n",
                                 (unsigned)i,
                                 _tasks[i].name,
                                 (unsigned)time_taken,
                                 (unsigned)_task_time_allowed);
                    }
                }
                if (time_taken >= time_available) {
                    goto update_spare_ticks;
                }
                time_available -= time_taken;

_task_time_allowed는 스케줄러에 실행할 함수를 등록시 같이 쓰는 '최대 실행가능 시간'이라고 보면 되겠다.
time_available은 주기성을 보장하기 위해 계산된 '남은 실행가능 시간'으로 보면 되겠다.

  • 최대 실행가능 시간보다 더 시간을 잡아먹으면, _debug 옵션에 따라 출력을 하게 된다.
  • 남은 실행가능 시간보다 더 시간을 잡아먹으면 update_spare_ticks위치로 이동한다.

남은 실행가능 시간보다 시간을 더 잡아먹을 경우, 결론적으로 주기성이 깨지게 된다. 이를 보상하기 위한 과정이 update_spare_ticks 위치에 작성되어 있다. goto문을 이용해 올바르지 못한 상황을 예외처리 해 놨음을 볼 수 있다.

어찌보면 주기성과 관련되어 있는 부분이지만, 오버런(타임아웃) 상황에 어떻게 대처하는지 알지 못하는 상황에서 보면 뭔 헛소린지 모르게 되기 때문에 이쪽에서 다루었다.

    // update number of spare microseconds
    _spare_micros += time_available;

update_spare_ticks:
    _spare_ticks++;
    if (_spare_ticks == 32) {
        _spare_ticks /= 2;
        _spare_micros /= 2;
    }

update_spare_ticks위치에는 차를 보상하기 위한 값인 _spare_ticks을 1씩 더하고, 조건문을 이용해 뭔가를 하는데 솔직히 구체적으로 어떤 결과를 얻기 위해 위와 같은 계산을 하는지는 모르겠다(왜 2로 나누는것인지?).

_debug 옵션을 이용하는 성능체크

위 소스코드에서 이제 debug 관련 부분을 추가하면 다음과 같이 run 매써드가 완성된다.

void AP_Scheduler::run(uint32_t time_available)
{
    uint32_t run_started_usec = AP_HAL::micros();
    uint32_t now = run_started_usec;

    if (_debug > 3 && _perf_counters == nullptr) {
        _perf_counters = new AP_HAL::Util::perf_counter_t[_num_tasks];
        if (_perf_counters != nullptr) {
            for (uint8_t i=0; i<_num_tasks; i++) {
                _perf_counters[i] = hal.util->perf_alloc(AP_HAL::Util::PC_ELAPSED, _tasks[i].name);
            }
        }
    }
    
    for (uint8_t i=0; i<_num_tasks; i++) {
        uint16_t dt = _tick_counter - _last_run[i];
        uint16_t interval_ticks = _loop_rate_hz / _tasks[i].rate_hz;
        if (interval_ticks < 1) {
            interval_ticks = 1;
        }
        if (dt >= interval_ticks) {
            // this task is due to run. Do we have enough time to run it?
            _task_time_allowed = _tasks[i].max_time_micros;

            if (dt >= interval_ticks*2) {
                // we've slipped a whole run of this task!
                if (_debug > 1) {
                    ::printf("Scheduler slip task[%u-%s] (%u/%u/%u)\n",
                             (unsigned)i,
                             _tasks[i].name,
                             (unsigned)dt,
                             (unsigned)interval_ticks,
                             (unsigned)_task_time_allowed);
                }
            }

            if (_task_time_allowed <= time_available) {
                // run it
                _task_time_started = now;
                current_task = i;
                if (_debug > 3 && _perf_counters && _perf_counters[i]) {
                    hal.util->perf_begin(_perf_counters[i]);
                }
                _tasks[i].function();
                if (_debug > 3 && _perf_counters && _perf_counters[i]) {
                    hal.util->perf_end(_perf_counters[i]);
                }
                current_task = -1;

                // record the tick counter when we ran. This drives
                // when we next run the event
                _last_run[i] = _tick_counter;

                // work out how long the event actually took
                now = AP_HAL::micros();
                uint32_t time_taken = now - _task_time_started;

                if (time_taken > _task_time_allowed) {
                    // the event overran!
                    if (_debug > 4) {
                        ::printf("Scheduler overrun task[%u-%s] (%u/%u)\n",
                                 (unsigned)i,
                                 _tasks[i].name,
                                 (unsigned)time_taken,
                                 (unsigned)_task_time_allowed);
                    }
                }
                if (time_taken >= time_available) {
                    goto update_spare_ticks;
                }
                time_available -= time_taken;
            }
        }
    }

    // update number of spare microseconds
    _spare_micros += time_available;

update_spare_ticks:
    _spare_ticks++;
    if (_spare_ticks == 32) {
        _spare_ticks /= 2;
        _spare_micros /= 2;
    }
}

부분적으로 _debug 를 이용한 조건문들이 보일탠데, 이것들이 디버그(성능체크)를 위한 부분이다.
이를 위한 도구들은 APM의 Util.h에 정의되어 있는데, 다음과 같다.
```cpp
    /*
      performance counter calls - wrapper around original PX4 interface
     */
    enum perf_counter_type {
        PC_COUNT,        /**< count the number of times an event occurs */
        PC_ELAPSED,      /**< measure the time elapsed performing an event */
        PC_INTERVAL      /**< measure the interval between instances of an event */
    };
    typedef void *perf_counter_t;
    virtual perf_counter_t perf_alloc(perf_counter_type t, const char *name) { return NULL; }
    virtual void perf_begin(perf_counter_t h) {}
    virtual void perf_end(perf_counter_t h) {}
    virtual void perf_count(perf_counter_t h) {}

내가 세로 추가한 테스크가 예상 이상의 시간을 잡아먹는지 확인하기 위한 용도 정도로 이용할 수 있겠다. 필요하면 그때 좀더 살펴보는 것으로,,,