Skip to content

Commit

Permalink
updating how queue is used in match_messages
Browse files Browse the repository at this point in the history
  • Loading branch information
movsesyanae committed Sep 30, 2024
1 parent 3e65096 commit 2ee3f03
Showing 1 changed file with 27 additions and 31 deletions.
58 changes: 27 additions & 31 deletions pipit/trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,16 +210,14 @@ def _match_messages(self):

receive_events_names = ["MpiRecv", "MpiIrecv"]

receive_events = self.events[self.events["Name".isin(receive_events_names)]]
receive_events = self.events[self.events["Name"].isin(receive_events_names)]

# Queue is a dictionary in which each receiving process (key) will have
# a list of tuples that each contain information about an associated
# send event
queue = {process: [] for process in self.events["Process"].unique()}

# another dictionary of sending processes (key) that will have a list of
# tuple that each contain information about an associated send event
queue: dict[int, dict[int, (int, int)]] = {}
df_indices = list(send_events.index)
timestamps = list(send_events["Timestamp (ns)"])
names = list(send_events["Name"])
attrs = list(send_events["Attributes"])
processes = list(send_events["Process"])

Expand All @@ -228,51 +226,49 @@ def _match_messages(self):
for i in range(len(send_events)):
curr_df_index = df_indices[i]
curr_timestamp = timestamps[i]
curr_name = names[i]
curr_attrs = attrs[i]
curr_process = processes[i]

# Add current dataframe index, timestamp, and process to stack
if "receiver" in curr_attrs:
queue[curr_attrs["receiver"]].append(
(curr_df_index, curr_timestamp, curr_name, curr_process)
receiving_process = curr_attrs["receiver"]

# Add receiving process to queue if not already present
if receiving_process not in queue:
queue[receiving_process] = {}
# Add sending process to receiving process's queue
# if not already present
if curr_process not in queue[receiving_process]:
queue[receiving_process][curr_process] = []

# Add current dataframe index and timestamp to the correct queue
queue[receiving_process][curr_process].append(
(curr_df_index, curr_timestamp)
)

df_indices = list(receive_events.index)
timestamps = list(receive_events["Timestamp (ns)"])
names = list(receive_events["Name"])
attrs = list(receive_events["Attributes"])
processes = list(receive_events["Process"])

# Now iterate over receive events
for i in range(len(receive_events)):
curr_df_index = df_indices[i]
curr_timestamp = timestamps[i]
curr_name = names[i]
curr_attrs = attrs[i]
curr_process = processes[i]

if "sender" in curr_attrs:
send_process = None
i = 0

# We want to iterate through the queue in order
# until we find the corresponding "send" event
while send_process != curr_attrs["sender"] and i < len(
queue[curr_process]
):
send_df_index, send_timestamp, send_name, send_process = queue[
curr_process
][i]
i += 1

if send_process == curr_attrs["sender"] and i <= len(
queue[curr_process]
):
# remove matched event from queue
del queue[curr_process][i - 1]

# Fill in the lists with the matching values if event found
# send_process = None
send_process = curr_attrs["sender"]

if len(queue[curr_process][send_process]) > 0:
# Get the corresponding "send" event
send_df_index, send_timestamp = queue[curr_process][
send_process
].pop(0)

# Fill in the lists with the matching values
matching_events[send_df_index] = curr_df_index
matching_events[curr_df_index] = send_df_index

Expand Down

0 comments on commit 2ee3f03

Please sign in to comment.