Commit

Adds async inference code for parallel inference and argument for --batch-size

TikZSZ committed Jul 3, 2024
1 parent 5c1a5e9 commit 3a35360
Showing 3 changed files with 90 additions and 36 deletions.
26 changes: 20 additions & 6 deletions berkeley-function-call-leaderboard/model_handler/handler.py
@@ -1,6 +1,6 @@
from model_handler.model_style import ModelStyle
import json, os

import aiofiles

class BaseHandler:
model_name: str
@@ -24,20 +24,23 @@ def decode_execute(self, result):
# This method takes raw model output and converts it to standard execute checker input.
pass

def write(self, result, file_to_open):
# This method is used to write the result to the file.
## make the write function async
async def write(self, result, file_to_open):
# Ensure the result directories exist
if not os.path.exists("./result"):
os.mkdir("./result")
if not os.path.exists("./result/" + self.model_name):
os.mkdir("./result/" + self.model_name)
with open(

# Use aiofiles to write asynchronously
async with aiofiles.open(
"./result/"
+ self.model_name
+ "/"
+ file_to_open.replace(".json", "_result.json"),
"a+",
mode='a+'
) as f:
f.write(json.dumps(result) + "\n")
await f.write(json.dumps(result) + "\n")

def load_result(self, test_category):
# This method is used to load the result from the file.
@@ -48,3 +51,14 @@ def load_result(self, test_category):
for line in f:
result_list.append(json.loads(line))
return result_list

# open the result file and sort it on idx
def sort_results(self, file_to_open):
path = "./result/" + self.model_name + "/" + file_to_open.replace(".json", "_result.json")
with open(path, mode='r') as f:
lines = f.readlines()
results = [json.loads(line) for line in lines]
sorted_results = sorted(results, key=lambda x: x['idx'])
with open(path, mode='w') as f:
for result in sorted_results:
f.write(json.dumps(result) + "\n")
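
A minimal usage sketch (not part of the commit) of how the new async write and sort_results are meant to combine: result lines are appended concurrently, so they can land in the file out of order, and sort_results restores ordering by the "idx" field once every write has finished. The write_all helper and the handler/results objects below are hypothetical; only the write and sort_results interfaces come from the diff above.

import asyncio

async def write_all(handler, file_to_open, results):
    # Append each result line asynchronously; completion order is not guaranteed.
    await asyncio.gather(
        *(handler.write({"idx": i, "result": r}, file_to_open) for i, r in enumerate(results))
    )
    # Re-sort the result file by "idx" after all writes complete.
    handler.sort_results(file_to_open)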
99 changes: 69 additions & 30 deletions berkeley-function-call-leaderboard/openfunctions_evaluation.py
@@ -3,7 +3,9 @@
from model_handler.handler_map import handler_map
from model_handler.model_style import ModelStyle
from model_handler.constant import USE_COHERE_OPTIMIZATION

import aiohttp
import asyncio
from functools import wraps,partial

def get_args():
parser = argparse.ArgumentParser()
@@ -18,6 +20,7 @@ def get_args():
parser.add_argument("--max-tokens", type=int, default=1200)
parser.add_argument("--num-gpus", default=1, type=int)
parser.add_argument("--timeout", default=60, type=int)
parser.add_argument('--batch-size', type=int, default=1, help='Batch size for processing (default: 1)')

args = parser.parse_args()
return args
@@ -39,9 +42,21 @@ def get_args():
"sql": "gorilla_openfunctions_v1_test_sql.json",
}

def make_async(func):
@wraps(func)
async def run(*args, loop=None, executor=None, **kwargs):
if loop is None:
loop = asyncio.get_running_loop()
pfunc = partial(func, *args, **kwargs)
return await loop.run_in_executor(executor, pfunc)
return run

## automatically wraps the handler to make handler.inference async
def build_handler(model_name, temperature, top_p, max_tokens):
handler = handler_map[model_name](model_name, temperature, top_p, max_tokens)

if not asyncio.iscoroutinefunction(handler.inference):
handler.inference = make_async(handler.inference)
return handler


@@ -55,19 +70,37 @@ def load_file(test_category):
return test_cate, files_to_open


if __name__ == "__main__":
async def fetch_and_process(session, index, test_case, handler, test_category, file_to_open):
user_question, functions = test_case["question"], test_case["function"]
if isinstance(functions, (dict, str)):
functions = [functions]

result, metadata = await handler.inference(user_question, functions, test_category)

result_to_write = {
"idx": index,
"result": result,
"input_token_count": metadata["input_tokens"],
"output_token_count": metadata["output_tokens"],
"latency": metadata["latency"],
}
await handler.write(result_to_write, file_to_open)

async def main():
args = get_args()
if USE_COHERE_OPTIMIZATION and "command-r-plus" in args.model:
args.model = args.model + "-optimized"

handler = build_handler(args.model, args.temperature, args.top_p, args.max_tokens)

if handler.model_style == ModelStyle.OSSMODEL:
result = handler.inference(
result = await handler.inference(
question_file="eval_data_total.json",
test_category=args.test_category,
num_gpus=args.num_gpus,
)
for res in result[0]:
handler.write(res, "result.json")
await handler.write(res, "result.json")
else:
test_cate, files_to_open = load_file(args.test_category)
for test_category, file_to_open in zip(test_cate, files_to_open):
@@ -76,35 +109,41 @@ def load_file(test_category):
with open("./data/" + file_to_open) as f:
for line in f:
test_cases.append(json.loads(line))
num_existing_result = 0 # if the result file already exists, skip the test cases that have been tested.
if os.path.exists(

num_existing_result = 0
result_file_path = (
"./result/"
+ args.model.replace("/", "_")
+ "/"
+ file_to_open.replace(".json", "_result.json")
):
with open(
"./result/"
+ args.model.replace("/", "_")
+ "/"
+ file_to_open.replace(".json", "_result.json")
) as f:
)
if os.path.exists(result_file_path):
with open(result_file_path) as f:
for line in f:
num_existing_result += 1
for index, test_case in enumerate(tqdm(test_cases)):
if index < num_existing_result:
continue
user_question, functions = test_case["question"], test_case["function"]
if type(functions) is dict or type(functions) is str:
functions = [functions]
result, metadata = handler.inference(
user_question, functions, test_category
)
result_to_write = {
"idx": index,
"result": result,
"input_token_count": metadata["input_tokens"],
"output_token_count": metadata["output_tokens"],
"latency": metadata["latency"],
}
handler.write(result_to_write, file_to_open)

async with aiohttp.ClientSession() as session:
batch_size = args.batch_size # Number of iterations to run at a time
tasks = []
# Create a tqdm progress bar for the entire dataset
progress_bar = tqdm(total=len(test_cases), desc="Processing test cases")

for start_index in range(0, len(test_cases), batch_size):
end_index = min(start_index + batch_size, len(test_cases))
for index in range(start_index, end_index):
if index < num_existing_result:
progress_bar.update(1) # Update for skipped items
continue
test_case = test_cases[index]
task = asyncio.create_task(fetch_and_process(session, index, test_case, handler, test_category, file_to_open))
task.add_done_callback(lambda _: progress_bar.update(1)) # Update progress when task is done
tasks.append(task)
await asyncio.gather(*tasks)
tasks.clear()
progress_bar.close()
## sort results since async entries could be out of order
handler.sort_results(file_to_open)


if __name__ == "__main__":
asyncio.run(main())
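
The core mechanism above is make_async: a blocking inference call is pushed onto the event loop's thread-pool executor, so asyncio.gather can overlap a whole batch of calls instead of running them one after another. A self-contained sketch of the same pattern (slow_inference here is a stand-in for a real handler.inference, not code from this commit):

import asyncio
import time
from functools import wraps, partial

def make_async(func):
    @wraps(func)
    async def run(*args, loop=None, executor=None, **kwargs):
        if loop is None:
            loop = asyncio.get_running_loop()
        # Hand the blocking call to a thread pool so the event loop stays free.
        return await loop.run_in_executor(executor, partial(func, *args, **kwargs))
    return run

def slow_inference(prompt):
    time.sleep(1)  # stand-in for a blocking model/API call
    return f"answer to {prompt}"

async def demo():
    infer = make_async(slow_inference)
    # Three blocking calls overlap in the thread pool instead of running serially.
    print(await asyncio.gather(*(infer(f"q{i}") for i in range(3))))

asyncio.run(demo())

With the new flag, a run can then be launched as, for example, python openfunctions_evaluation.py --model <model-name> --test-category all --batch-size 8; only --batch-size is added by this commit, and the other flags are assumed from the existing argparse setup (args.model and args.test_category are used elsewhere in the script).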
1 change: 1 addition & 0 deletions berkeley-function-call-leaderboard/requirements.txt
@@ -9,3 +9,4 @@ anthropic~=0.29.0
openai
numpy
cohere~=5.2.5
aiofiles
