
Commit

feat+experimental: Added experimental and flaky human-in-the-loop int…
anirbanbasu committed Oct 6, 2024
1 parent 7fba269 commit c31d0bd
Showing 5 changed files with 191 additions and 124 deletions.
1 change: 0 additions & 1 deletion src/code_executor.py
@@ -163,5 +163,4 @@ def check_correctness(
result = constants.EXECUTOR_MESSAGE__NO_RESULTS
finally:
process.close()
# ic(input_data, expected_output, result)
return result
74 changes: 53 additions & 21 deletions src/engine.py
@@ -17,7 +17,6 @@
from typing import List
import dirtyjson as json
import sys
import asyncio
from tqdm import tqdm

from llama_index.core.llms.llm import LLM
@@ -36,9 +35,16 @@
PYDANTIC_MODEL__CODE_OUTPUT__REASONING,
)
from utils import get_terminal_size
from workflows.common import TestCase
from workflows.common import (
TestCase,
WorkflowStatusEvent,
)
from workflows.self_reflective_coder import CompetitiveCoderWorkflow

from llama_index.core.workflow.events import (
HumanResponseEvent,
)

try:
from icecream import ic
except ImportError: # Graceful fallback if IceCream isn't installed.
@@ -109,16 +115,32 @@ def get_workflow_by_name(self, workflow_name: str):
return workflow
raise ValueError(f"Workflow with name '{workflow_name}' is not supported.")

def send_human_response(self, response: str):
if (
self._running_workflow_handler is not None
and not self._running_workflow_handler.is_done()
):
# self._running_workflow_handler.ctx.send_event(
# HumanResponseEvent(response=response)
# )
self._human_response = response
# self._human_input_available_indicator.state = (
# HumanInputAvailableIndicatorTask.STATE_DONE
# )
self._running_workflow_handler.ctx.send_event(
HumanResponseEvent(response=self._human_response)
)

async def run(
self,
problem: str,
test_cases: List[TestCase] = None,
runtime_limit: float = 30.0,
workflow: str = CompetitiveCoderWorkflow.__name__,
workflow_name: str = CompetitiveCoderWorkflow.__name__,
):
# Instantiating the ReAct workflow instead may not always be enough to get the desired responses to certain questions.
chosen_workflow = self.get_workflow_by_name(workflow)
chosen_workflow = self.get_workflow_by_name(workflow_name)

# Should the timeout be None for human-in-the-loop workflows?
workflow_init_kwargs = {"llm": self.llm, "timeout": 180, "verbose": True}
workflow_run_kwargs = {}

@@ -129,15 +151,15 @@ async def run(
workflow_run_kwargs["test_cases"] = test_cases
workflow_run_kwargs["runtime_limit"] = runtime_limit
else:
raise ValueError(f"Workflow '{workflow}' is not supported.")
raise ValueError(f"Workflow '{workflow_name}' is not supported.")

self.workflow: Workflow = chosen_workflow(**workflow_init_kwargs)
workflow: Workflow = chosen_workflow(**workflow_init_kwargs)
print(
f"\nAttempting the question using the {workflow} workflow. This may take a while...",
f"\nAttempting the question using the {workflow_name} workflow. This may take a while...",
flush=True,
)

self.running_workflow_handler: WorkflowHandler = self.workflow.run(
self._running_workflow_handler: WorkflowHandler = workflow.run(
**workflow_run_kwargs
)
done: bool = False
@@ -152,18 +174,28 @@ async def run(
desc=PROJECT_NAME,
colour="yellow",
)
async for ev in self.workflow.stream_events():
total_steps = ev.total_steps
finished_steps = ev.finished_steps
print(f"\n{str(ev.msg)}", flush=True)
progress_bar.reset(total=total_steps)
progress_bar.update(finished_steps)
progress_bar.refresh()
yield done, finished_steps, total_steps, ev
async for event in self._running_workflow_handler.stream_events():
if isinstance(event, WorkflowStatusEvent):
total_steps = event.total_steps
finished_steps = event.finished_steps
print(f"\n{str(event.msg)}", flush=True)
progress_bar.reset(total=total_steps)
progress_bar.update(finished_steps)
progress_bar.refresh()
yield done, finished_steps, total_steps, event
try:
done, pending = await asyncio.wait([self.running_workflow_handler])
if done:
result = json.loads(self.running_workflow_handler.result())
# done_tasks, pending_tasks = await asyncio.wait(
# [self._running_workflow_handler],
# timeout=runtime_limit,
# )
# done = self._running_workflow_handler.is_done()
# if done_tasks and done:
# result = json.loads(self._running_workflow_handler.result())

# for task in pending_tasks:
# ic(task)
result = json.loads(await self._running_workflow_handler)
done = True
except Exception as e:
result = {
PYDANTIC_MODEL__CODE_OUTPUT__REASONING: f"\nException in running the workflow(s). Type: {type(e).__name__}. Message: '{str(e)}'",
@@ -176,4 +208,4 @@ async def run(
raise e
finally:
progress_bar.close()
yield done, finished_steps, total_steps, result
yield done, finished_steps, total_steps, result
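For reference, the streaming loop and send_human_response above follow the human-in-the-loop pattern documented for llama-index workflows: the workflow emits an InputRequiredEvent on its event stream, and the caller replies by sending a HumanResponseEvent back through the handler's context. A minimal, self-contained sketch of that pattern follows; the workflow class, step names, and prompt are illustrative and not part of this repository.

import asyncio

from llama_index.core.workflow import StartEvent, StopEvent, Workflow, step
from llama_index.core.workflow.events import HumanResponseEvent, InputRequiredEvent


class ReviewWorkflow(Workflow):
    """Toy workflow that pauses for a human decision (hypothetical example)."""

    @step
    async def ask_human(self, ev: StartEvent) -> InputRequiredEvent:
        # Returning InputRequiredEvent surfaces it on the event stream and
        # suspends progress until a HumanResponseEvent is sent back.
        return InputRequiredEvent(prefix="Approve the draft solution? ")

    @step
    async def use_answer(self, ev: HumanResponseEvent) -> StopEvent:
        return StopEvent(result=f"Human said: {ev.response}")


async def main():
    # timeout=None so the workflow does not expire while waiting on a human,
    # which is the concern raised in the timeout comment above.
    handler = ReviewWorkflow(timeout=None, verbose=False).run()
    async for event in handler.stream_events():
        if isinstance(event, InputRequiredEvent):
            answer = input(event.prefix)  # blocking stand-in for a UI callback
            handler.ctx.send_event(HumanResponseEvent(response=answer))
    print(await handler)


if __name__ == "__main__":
    asyncio.run(main())

In engine.py the same two calls are split across send_human_response (handler.ctx.send_event) and the stream loop in run, so a front end such as the Gradio app can supply the response asynchronously.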
124 changes: 69 additions & 55 deletions src/webapp.py
@@ -19,6 +19,9 @@
from engine import ChattyCoderEngine
from workflows.common import TestCase, WorkflowStatusEvent
from utils import parse_env
from llama_index.core.workflow.events import InputRequiredEvent

from workflows.self_reflective_coder import DraftSolutionResultEvent

try:
from icecream import ic
@@ -50,8 +53,6 @@
class GradioApp:
"""The main Gradio app class."""

_gr_state_user_input_text = gr.State(constants.EMPTY_STRING)

def __init__(self):
"""Default constructor for the Gradio app."""
# Load the environment variables
@@ -234,6 +235,7 @@ async def find_solution(
):
# Stream events and results
self.agent_task_pending = True
chat_history = []
generator = self.workflow_engine.run(
problem=user_question,
test_cases=test_cases,
@@ -243,22 +245,35 @@
done,
finished_steps,
total_steps,
result,
streamed_object,
) in generator:
if done:
agent_status(progress=None)
self.agent_task_pending = False
# ic(result)
# chat_history.clear()
yield (
result[PYDANTIC_MODEL__CODE_OUTPUT__REASONING],
result[PYDANTIC_MODEL__CODE_OUTPUT__PSEUDOCODE],
result[PYDANTIC_MODEL__CODE_OUTPUT__CODE],
chat_history,
)
yield [
streamed_object[PYDANTIC_MODEL__CODE_OUTPUT__REASONING],
streamed_object[PYDANTIC_MODEL__CODE_OUTPUT__PSEUDOCODE],
streamed_object[PYDANTIC_MODEL__CODE_OUTPUT__CODE],
gr.update(),
]
else:
if isinstance(result, WorkflowStatusEvent):
status_msg = result.msg
if isinstance(streamed_object, InputRequiredEvent):
# Hide the progress bar?
agent_status(progress=None)
chat_history.append(
ChatMessage(
role=MessageRole.ASSISTANT,
content=str(streamed_object.msg),
).dict()
)
yield [
gr.update(),
gr.update(),
gr.update(),
gr.update(value=chat_history),
]
elif isinstance(streamed_object, WorkflowStatusEvent):
status_msg = streamed_object.msg
status = (
str(status_msg)[:125] + ELLIPSIS
if len(str(status_msg)) > 125
@@ -267,12 +282,29 @@
agent_status(
progress=(finished_steps, total_steps), desc=status
)
chat_history.append(
ChatMessage(
role=MessageRole.ASSISTANT, content=status_msg
).dict()
)
yield (EMPTY_STRING, EMPTY_STRING, EMPTY_STRING, chat_history)
elif isinstance(streamed_object, DraftSolutionResultEvent):
yield [
streamed_object.reasoning,
streamed_object.pseudocode,
streamed_object.code,
chat_history,
]

async def hitl_response(self, response: str, chat_history):
"""
Handle the human-in-the-loop response.
Args:
response (str): The response from the human.
chat_history (List[ChatMessage]): The chat history.
"""
self.workflow_engine.send_human_response(response)
chat_history.append(ChatMessage(role=MessageRole.USER, content=response).dict())
# ic(chat_history)
return [
gr.update(value=EMPTY_STRING),
gr.update(value=chat_history),
]

def add_test_case(
self, test_cases: list[TestCase] | None, test_case_in: str, test_case_out: str
@@ -349,37 +381,6 @@ def construct_interface(self):
btn_theme_toggle = gr.Button("Toggle dark mode")
with gr.Row(elem_id="ui_main"):
with gr.Column(elem_id="ui_main_left"):
# with gr.Accordion(
# label=f"{self._llm_provider} LLM and agent orchestrator configuration",
# open=False,
# ):
# with gr.Group():
# gr.JSON(
# value=self._llm.model,
# label=f"{self._llm_provider} LLM configuration",
# show_label=True,
# )
# gr.Image(
# value=PIL.Image.open(
# BytesIO(
# self._agent_orchestrator.agent_graph.get_graph().draw_mermaid_png(
# curve_style=CurveStyle.BASIS,
# # The node styles are SVG attributes, see: https://developer.mozilla.org/en-US/docs/Web/SVG/Attribute
# node_colors=NodeStyles(
# first="fill:#9ccc2b, fill-opacity:0.35, font-family:'monospace'",
# last="fill:#cc2b2b, fill-opacity:0.25, font-family:'monospace'",
# default="fill:#2ba9cc, fill-opacity:0.35, font-family:'monospace'",
# ),
# )
# )
# ),
# label="Agent orchestrator graph",
# show_label=True,
# show_download_button=False,
# show_fullscreen_button=False,
# show_share_button=False,
# format="png",
# )
with gr.Group():
chk_show_user_input_preview = gr.Checkbox(
value=False,
@@ -400,12 +401,12 @@
with gr.Group():
chatbot_hitl = gr.Chatbot(
label="Human-in-the-loop chat",
layout="panel",
layout="bubble",
type="messages",
bubble_full_width=True,
placeholder="The AI model will initiate a chat when necessary.",
bubble_full_width=False,
# placeholder="The AI model will initiate a chat when necessary.",
)
gr.Textbox(
input_hitl_response = gr.Textbox(
label="Human message",
show_label=False,
placeholder="Enter a response to the last message from the AI model.",
@@ -452,12 +453,15 @@
line_breaks=True,
)
output_pseudocode = gr.Code(
label="Pseudocode", show_label=True
label="Pseudocode",
show_label=True,
interactive=False,
)
output_code = gr.Code(
label="Python code",
show_label=True,
language="python",
interactive=False,
)
# Button actions
btn_theme_toggle.click(
@@ -481,6 +485,7 @@
chatbot_hitl,
],
api_name="get_coding_solution",
# queue=False,
)

btn_add_test_case.click(
@@ -508,6 +513,15 @@
api_name=False,
)

input_hitl_response.submit(
self.hitl_response,
inputs=[input_hitl_response, chatbot_hitl],
outputs=[input_hitl_response, chatbot_hitl],
api_name="human_response",
# show_progress=False,
# queue=False,
)

chk_show_user_input_preview.change(
fn=lambda checked: (
(
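The new human-in-the-loop chat in webapp.py uses the standard Gradio Blocks pattern: a Textbox submit action is bound to an async handler that clears the box and appends to a messages-style Chatbot. A stripped-down, hypothetical sketch of that wiring (component and function names below are illustrative, not the app's actual ones):

import gradio as gr


async def on_human_reply(message: str, history: list | None):
    # In webapp.py, hitl_response() first forwards the message to the engine
    # via send_human_response() and then echoes it into the chat history.
    history = (history or []) + [{"role": "user", "content": message}]
    return "", history  # clear the textbox, update the chatbot


with gr.Blocks() as demo:
    chat = gr.Chatbot(type="messages", label="Human-in-the-loop chat")
    box = gr.Textbox(placeholder="Reply to the last message from the AI model.")
    box.submit(on_human_reply, inputs=[box, chat], outputs=[box, chat])

if __name__ == "__main__":
    demo.launch()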
25 changes: 1 addition & 24 deletions src/workflows/common.py
@@ -14,9 +14,7 @@

"""Stuff common to all the workflows."""

from llama_index.core.workflow import (
Event,
)
from llama_index.core.workflow import Event

from llama_index.core.bridge.pydantic import BaseModel, Field

@@ -63,24 +61,3 @@ class WorkflowStatusEvent(Event):
msg: str
total_steps: int = 0
finished_steps: int = 0


class WorkflowStatusWithPartialSolution(Event):
"""
Event to update the status of the workflow with a partial solution.
Fields:
msg (str): The message to display.
partial_solution (str): The partial solution.
partial_pseudo_code (str): The partial pseudo code.
partial_code (str): The partial code.
total_steps (int): Optional total number of steps, defaults to zero.
finished_steps (int): Optional number of steps finished, defaults to zero.
"""

msg: str
partial_solution: str
partial_pseudo_code: str
partial_code: str
total_steps: int = 0
finished_steps: int = 0
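The WorkflowStatusEvent class that remains in workflows/common.py is what engine.py consumes in its stream loop to drive the tqdm progress bar. As a rough illustration of how a workflow step can publish such events with Context.write_event_to_stream (the workflow and step names here are made up):

from llama_index.core.workflow import Context, StartEvent, StopEvent, Workflow, step

from workflows.common import WorkflowStatusEvent


class StatusReportingWorkflow(Workflow):
    """Toy workflow that reports progress while it works (hypothetical)."""

    @step
    async def do_work(self, ctx: Context, ev: StartEvent) -> StopEvent:
        total = 3
        for i in range(total):
            # These events appear in handler.stream_events() on the caller's side.
            ctx.write_event_to_stream(
                WorkflowStatusEvent(
                    msg=f"Finished step {i + 1} of {total}",
                    total_steps=total,
                    finished_steps=i + 1,
                )
            )
        return StopEvent(result="done")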