Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trace: construct a CCT #29

Merged
merged 43 commits into from
Nov 14, 2023
Merged
Show file tree
Hide file tree
Changes from 38 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
4c1e111
feat: initial commit for working logic of trace to cct
adityaranjan Sep 11, 2022
7d9a707
chore: moved over cct code to the reader
adityaranjan Sep 12, 2022
4df681c
style: formatting changes
adityaranjan Sep 12, 2022
9ef0464
style: minor formatting change
adityaranjan Sep 12, 2022
62d3caf
Feature: Communication Matrix (#12)
adityaranjan Sep 26, 2022
200f5ce
feature: add cct function to trace class
adityaranjan Sep 28, 2022
da2e817
Merge branch 'develop' into feat/otf2-cct
adityaranjan Sep 29, 2022
cc50cc0
Merge branch 'develop' into feat/otf2-cct
adityaranjan Dec 2, 2022
7f1b7f6
chore: made cct function consistent with recent changes
adityaranjan Dec 2, 2022
93252aa
Merge branch 'develop' into feat/otf2-cct
adityaranjan Feb 17, 2023
bb29923
chore: modify cct function to reflect recent changes
adityaranjan Feb 17, 2023
fdd5997
Merge branch 'feat/otf2-cct' of https://github.com/hpcgroup/pipit int…
adityaranjan Feb 17, 2023
96d7d67
Merge branch 'develop' into feat/otf2-cct
adityaranjan Feb 28, 2023
28d9638
chore: remove df indices as calling context ids
adityaranjan Feb 28, 2023
c8d2ce4
Merge branch 'develop' into feat/otf2-cct
adityaranjan Mar 6, 2023
e6f5ffd
chore: only store node in enter rows
adityaranjan Mar 6, 2023
9dd3366
style: minor
adityaranjan Mar 6, 2023
50fff4b
chore: remove depth column from df and restructure node class
adityaranjan Mar 10, 2023
742cc82
style: minor
adityaranjan Mar 10, 2023
2765f0c
Merge branch 'develop' into feat/otf2-cct
adityaranjan Mar 10, 2023
0240751
chore: return cct for all readers
adityaranjan Mar 10, 2023
89c9c5d
chore: revert any changes in other prs already
adityaranjan Mar 11, 2023
7074ac7
style: minor
adityaranjan Mar 11, 2023
5a4cae3
feat: make cct compatible with projections
adityaranjan Mar 11, 2023
17c3bcb
Merge branch 'develop' into feat/otf2-cct
adityaranjan Mar 11, 2023
8b30e0f
Merge branch 'develop' into feat/otf2-cct
adityaranjan Mar 29, 2023
1a33fec
Merge branch 'develop' into feat/otf2-cct
adityaranjan Apr 23, 2023
540e0de
revert graph changes
adityaranjan Apr 23, 2023
6fb5d52
minor
adityaranjan Apr 23, 2023
3fd498c
minor
adityaranjan Apr 23, 2023
83693c3
minor
adityaranjan Apr 23, 2023
04890c2
move cct function into util
adityaranjan Apr 23, 2023
8677038
minor
adityaranjan Apr 23, 2023
2440b4e
style
adityaranjan Apr 23, 2023
2393c52
minor
adityaranjan Apr 23, 2023
1025dda
Merge branch 'develop' into feat/otf2-cct
adityaranjan May 5, 2023
82f06c8
merge
Oct 22, 2023
d4db686
chore: Merge
Oct 22, 2023
32c16dc
temp
Nov 3, 2023
4acf0f8
fixed cct function to handle unmatched events
Nov 10, 2023
4f544c9
minor
Nov 10, 2023
f1d80c0
update year
bhatele Nov 14, 2023
0444c7a
Merge branch 'develop' into feat/otf2-cct
bhatele Nov 14, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion pipit/readers/hpctoolkit_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -1356,5 +1356,7 @@ def read(self) -> pipit.trace.Trace:
}
)

# cct is needed to create trace in hpctoolkit,
# so always return it as part of the trace
self.trace_df = trace_df
return pipit.trace.Trace(None, trace_df)
return pipit.trace.Trace(None, trace_df, self.cct)
9 changes: 7 additions & 2 deletions pipit/readers/nsight_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@
class NsightReader:
"""Reader for Nsight trace files"""

def __init__(self, file_name) -> None:
def __init__(self, file_name, create_cct=True) -> None:
self.file_name = file_name
self.df = None
self.create_cct = create_cct

def read(self):
"""
Expand Down Expand Up @@ -103,4 +104,8 @@ def read(self):
# Applying the column list to the dataframe to rearrange
self.df = self.df.loc[:, cols]

return pipit.trace.Trace(None, self.df)
trace = pipit.trace.Trace(None, self.df)
if self.create_cct:
trace.create_cct()

return trace
9 changes: 7 additions & 2 deletions pipit/readers/otf2_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@
class OTF2Reader:
"""Reader for OTF2 trace files"""

def __init__(self, dir_name, num_processes=None):
def __init__(self, dir_name, num_processes=None, create_cct=True):
self.dir_name = dir_name # directory of otf2 file being read
self.file_name = self.dir_name + "/traces.otf2"
self.create_cct = create_cct

num_cpus = mp.cpu_count()
if num_processes is None or num_processes < 1 or num_processes > num_cpus:
Expand Down Expand Up @@ -516,4 +517,8 @@ def read(self):

self.events = self.read_events() # events

return pipit.trace.Trace(self.definitions, self.events)
trace = pipit.trace.Trace(self.definitions, self.events)
if self.create_cct:
trace.create_cct()

return trace
11 changes: 8 additions & 3 deletions pipit/readers/projections_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
#
# SPDX-License-Identifier: MIT


import os
import gzip
import pipit.trace
Expand Down Expand Up @@ -225,7 +224,7 @@ def read_sts_file(self):


class ProjectionsReader:
def __init__(self, projections_directory: str, num_processes=None) -> None:
def __init__(self, projections_directory: str, num_processes=None, create_cct=True) -> None:
if not os.path.isdir(projections_directory):
raise ValueError("Not a valid directory.")

Expand Down Expand Up @@ -270,6 +269,8 @@ def __init__(self, projections_directory: str, num_processes=None) -> None:
else:
self.num_processes = num_processes

self.create_cct = create_cct

# Returns an empty dict, used for reading log file into dataframe
@staticmethod
def _create_empty_dict() -> dict:
Expand Down Expand Up @@ -317,7 +318,11 @@ def read(self):
["Timestamp (ns)", "Event Type", "Name", "Process", "Attributes"]
]

return pipit.trace.Trace(None, trace_df)
trace = pipit.trace.Trace(None, trace_df)
if self.create_cct:
trace.create_cct()

return trace

def _read_log_file(self, rank_size) -> pd.DataFrame:
# has information needed in sts file
Expand Down
15 changes: 11 additions & 4 deletions pipit/trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,20 @@

import numpy as np
import pandas as pd
from pipit.util.cct import create_cct


class Trace:
"""A trace dataset is read into an object of this type, which includes one
or more dataframes.
"""
A trace dataset is read into an object of this type, which
includes one or more dataframes and a calling context tree.
"""

def __init__(self, definitions, events):
def __init__(self, definitions, events, cct=None):
"""Create a new Trace object."""
self.definitions = definitions
self.events = events
self.cct = cct

# list of numeric columns which we can calculate inc/exc metrics with
self.numeric_cols = list(
Expand All @@ -26,6 +29,11 @@ def __init__(self, definitions, events):
self.inc_metrics = []
self.exc_metrics = []

def create_cct(self):
# adds a column of cct nodes to the events dataframe
# and stores the graph object in self.cct
self.cct = create_cct(self.events)

@staticmethod
def from_otf2(dirname, num_processes=None):
"""Read an OTF2 trace into a new Trace object."""
Expand Down Expand Up @@ -354,7 +362,6 @@ def comm_matrix(self, output="size"):
Communication Matrix for Peer-to-Peer (P2P) MPI messages

Arguments:

1) output -
string to choose whether the communication volume should be measured
by bytes transferred between two processes or the number of messages
Expand Down
4 changes: 4 additions & 0 deletions pipit/util/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Copyright 2022-2023 Parallel Software and Systems Group, University of
# Maryland. See the top-level LICENSE file for details.
#
# SPDX-License-Identifier: MIT
111 changes: 111 additions & 0 deletions pipit/util/cct.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
# Copyright 2022-2023 Parallel Software and Systems Group, University of
# Maryland. See the top-level LICENSE file for details.
#
# SPDX-License-Identifier: MIT

from pipit.graph import Graph, Node


def create_cct(events):
"""
Generic function to iterate through the events dataframe and create a CCT.
Uses pipit's graph data structure for this. Returns a CCT
and creates a new column in the Events DataFrame that stores
a reference to each row's corresponding node in the CCT.
"""

# CCT and list of nodes in DataFrame
graph = Graph()
graph_nodes = [None for i in range(len(events))]

# determines whether a node exists or not
callpath_to_node = dict()

node_id = 0 # each node has a unique id

# Filter the DataFrame to only Enter/Leave
enter_leave_df = events.loc[events["Event Type"].isin(["Enter", "Leave"])]

# list of processes and/or threads to iterate over
if "Thread" in events.columns:
exec_locations = set(zip(events["Process"], events["Thread"]))
has_thread = True
else:
exec_locations = set(events["Process"])
has_thread = False

for curr_loc in exec_locations:
# only filter by thread if the trace has a thread column
if has_thread:
curr_process, curr_thread = curr_loc
filtered_df = enter_leave_df.loc[
(enter_leave_df["Process"] == curr_process)
& (enter_leave_df["Thread"] == curr_thread)
]
else:
filtered_df = enter_leave_df.loc[(enter_leave_df["Process"] == curr_loc)]

curr_depth, callpath = 0, ""

"""
Iterating over lists instead of
DataFrame columns is more efficient
"""
df_indices = filtered_df.index.to_list()
function_names = filtered_df["Name"].to_list()
event_types = filtered_df["Event Type"].to_list()

# stacks used to iterate through the trace and add nodes to the cct
functions_stack, nodes_stack = [], []

# iterating over the events of the current thread's trace
for i in range(len(filtered_df)):
curr_df_index, evt_type, function_name = (
df_indices[i],
event_types[i],
function_names[i],
)

# encounter a new function through its entry point.
if evt_type == "Enter":
# add the function to the stack and get the call path
functions_stack.append(function_name)
callpath = "->".join(functions_stack)

# get the parent node of the function if it exists
parent_node = None if curr_depth == 0 else nodes_stack[-1]

if callpath in callpath_to_node:
# don't create new node if callpath is in map
curr_node = callpath_to_node[callpath]
else:
# create new node if callpath isn't in map
curr_node = Node(node_id, parent_node, curr_depth)
callpath_to_node[callpath] = curr_node
node_id += 1

# add node as root or child of its
# parent depending on current depth
graph.add_root(
curr_node
) if curr_depth == 0 else parent_node.add_child(curr_node)

# Update nodes stack, column, and current depth
nodes_stack.append(curr_node)
graph_nodes[curr_df_index] = curr_node
curr_depth += 1
else:
"""
Pop node from top of stack once you
encounter the Leave event for a function
"""
nodes_stack.pop()

# Update functions stack and current depth
functions_stack.pop()
curr_depth -= 1

# Update the Trace with the generated cct
events["Graph_Node"] = graph_nodes

return graph
Loading