Skip to content

Commit

Permalink
feat: IO.TaskState (#4097)
Browse files Browse the repository at this point in the history
Adds `IO.getTaskState` which returns the state of a `Task` in the Lean
runtime's task manager. The `TaskState` inductive has 3 constructors:
`waiting`, `running`, and `finished`. The `waiting` constructor
encompasses the waiting and queued states within the C task object
documentation, because the task object does not provide a low cost way
to distinguish these different forms of waiting. Furthermore, it seems
unlikely for consumers to wish to distinguish between these internal
states. The `running` constructor encompasses both the running and
promised states in C docs. While not ideal, the C implementation does
not provide a way to distinguish between a running `Task` and a waiting
`Promise.result` (they both have null closures).

(cherry picked from commit 25e94f9)
  • Loading branch information
tydeu authored and kim-em committed May 12, 2024
1 parent a4d7e81 commit 890ae12
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 8 deletions.
38 changes: 37 additions & 1 deletion src/Init/System/IO.lean
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,44 @@ def sleep (ms : UInt32) : BaseIO Unit :=
/-- Request cooperative cancellation of the task. The task must explicitly call `IO.checkCanceled` to react to the cancellation. -/
@[extern "lean_io_cancel"] opaque cancel : @& Task α → BaseIO Unit

/-- The current state of a `Task` in the Lean runtime's task manager. -/
inductive TaskState
/--
The `Task` is waiting to be run.
It can be waiting for dependencies to complete or
sitting in the task manager queue waiting for a thread to run on.
-/
| waiting
/--
The `Task` is actively running on a thread or,
in the case of a `Promise`, waiting for a call to `IO.Promise.resolve`.
-/
| running
/--
The `Task` has finished running and its result is available.
Calling `Task.get` or `IO.wait` on the task will not block.
-/
| finished
deriving Inhabited, Repr, DecidableEq, Ord

instance : LT TaskState := ltOfOrd
instance : LE TaskState := leOfOrd
instance : Min TaskState := minOfLe
instance : Max TaskState := maxOfLe

protected def TaskState.toString : TaskState → String
| .waiting => "waiting"
| .running => "running"
| .finished => "finished"

instance : ToString TaskState := ⟨TaskState.toString⟩

/-- Returns current state of the `Task` in the Lean runtime's task manager. -/
@[extern "lean_io_get_task_state"] opaque getTaskState : @& Task α → BaseIO TaskState

/-- Check if the task has finished execution, at which point calling `Task.get` will return immediately. -/
@[extern "lean_io_has_finished"] opaque hasFinished : @& Task α → BaseIO Bool
@[inline] def hasFinished (task : Task α) : BaseIO Bool := do
return (← getTaskState task) matches .finished

/-- Wait for the task to finish, then return its result. -/
@[extern "lean_io_wait"] opaque wait (t : Task α) : BaseIO α :=
Expand Down
4 changes: 2 additions & 2 deletions src/include/lean/lean.h
Original file line number Diff line number Diff line change
Expand Up @@ -1110,8 +1110,8 @@ static inline lean_obj_res lean_task_get_own(lean_obj_arg t) {
LEAN_EXPORT bool lean_io_check_canceled_core(void);
/* primitive for implementing `IO.cancel : Task a -> IO Unit` */
LEAN_EXPORT void lean_io_cancel_core(b_lean_obj_arg t);
/* primitive for implementing `IO.hasFinished : Task a -> IO Unit` */
LEAN_EXPORT bool lean_io_has_finished_core(b_lean_obj_arg t);
/* primitive for implementing `IO.getTaskState : Task a -> IO TaskState` */
LEAN_EXPORT uint8_t lean_io_get_task_state_core(b_lean_obj_arg t);
/* primitive for implementing `IO.waitAny : List (Task a) -> IO (Task a)` */
LEAN_EXPORT b_lean_obj_res lean_io_wait_any_core(b_lean_obj_arg task_list);

Expand Down
4 changes: 2 additions & 2 deletions src/runtime/io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1066,8 +1066,8 @@ extern "C" LEAN_EXPORT obj_res lean_io_cancel(b_obj_arg t, obj_arg) {
return io_result_mk_ok(box(0));
}

extern "C" LEAN_EXPORT obj_res lean_io_has_finished(b_obj_arg t, obj_arg) {
return io_result_mk_ok(box(lean_io_has_finished_core(t)));
extern "C" LEAN_EXPORT obj_res lean_io_get_task_state(b_obj_arg t, obj_arg) {
return io_result_mk_ok(box(lean_io_get_task_state_core(t)));
}

extern "C" LEAN_EXPORT obj_res lean_io_wait(obj_arg t, obj_arg) {
Expand Down
13 changes: 11 additions & 2 deletions src/runtime/object.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1030,8 +1030,17 @@ extern "C" LEAN_EXPORT void lean_io_cancel_core(b_obj_arg t) {
g_task_manager->cancel(lean_to_task(t));
}

extern "C" LEAN_EXPORT bool lean_io_has_finished_core(b_obj_arg t) {
return lean_to_task(t)->m_value != nullptr;
extern "C" LEAN_EXPORT uint8_t lean_io_get_task_state_core(b_obj_arg t) {
lean_task_object * o = lean_to_task(t);
if (o->m_imp) {
if (o->m_imp->m_closure) {
return 0; // waiting (waiting/queued)
} else {
return 1; // running (running/promised)
}
} else {
return 2; // finished
}
}

extern "C" LEAN_EXPORT b_obj_res lean_io_wait_any_core(b_obj_arg task_list) {
Expand Down
2 changes: 1 addition & 1 deletion src/runtime/object.h
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ inline b_obj_res task_get(b_obj_arg t) { return lean_task_get(t); }

inline bool io_check_canceled_core() { return lean_io_check_canceled_core(); }
inline void io_cancel_core(b_obj_arg t) { return lean_io_cancel_core(t); }
inline bool io_has_finished_core(b_obj_arg t) { return lean_io_has_finished_core(t); }
inline bool io_get_task_state_core(b_obj_arg t) { return lean_io_get_task_state_core(t); }
inline b_obj_res io_wait_any_core(b_obj_arg task_list) { return lean_io_wait_any_core(task_list); }

// =======================================
Expand Down
25 changes: 25 additions & 0 deletions tests/lean/run/taskState.lean
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
def assertBEq [BEq α] [ToString α] (caption : String) (actual expected : α) : IO Unit := do
unless actual == expected do
throw <| IO.userError <|
s!"{caption}: expected '{expected}', got '{actual}'"

def test : IO Unit := do
let p1 : IO.Promise Unit ← IO.Promise.new -- resolving queues the task
let p2 : IO.Promise Unit ← IO.Promise.new -- resolved once task is running
let p3 : IO.Promise Unit ← IO.Promise.new -- resolving finishes the task
let t ← BaseIO.mapTask (fun () => do p2.resolve (); IO.wait p3.result) p1.result
assertBEq "p1" (← IO.getTaskState p1.result) .running
assertBEq "p2" (← IO.getTaskState p2.result) .running
assertBEq "p3" (← IO.getTaskState p3.result) .running
assertBEq "t" (← IO.getTaskState t) .waiting
p1.resolve ()
assertBEq "p1" (← IO.getTaskState p1.result) .finished
IO.wait p2.result
assertBEq "p2" (← IO.getTaskState p2.result) .finished
assertBEq "t" (← IO.getTaskState t) .running
p3.resolve ()
assertBEq "p3" (← IO.getTaskState p3.result) .finished
IO.wait t
assertBEq "t" (← IO.getTaskState t) .finished

#eval test

0 comments on commit 890ae12

Please sign in to comment.