From 793036acf9a009d78004fd9db9f1420d5e3374d3 Mon Sep 17 00:00:00 2001 From: Taekyung Heo <7621438+TaekyungHeo@users.noreply.github.com> Date: Tue, 2 Jul 2024 17:32:24 -0400 Subject: [PATCH 1/3] Configure global logging, refactor classes to use direct logging calls --- src/converter/converter.py | 24 +++--- src/converter/pytorch_converter.py | 55 +++++++------- src/converter/text_converter.py | 3 +- src/trace_link/trace_linker.py | 92 +++++++++++------------ tests/converter/test_pytorch_converter.py | 53 +++++-------- 5 files changed, 101 insertions(+), 126 deletions(-) diff --git a/src/converter/converter.py b/src/converter/converter.py index c0cd701e..606fb94c 100644 --- a/src/converter/converter.py +++ b/src/converter/converter.py @@ -2,16 +2,15 @@ import logging import sys import traceback -from logging import FileHandler from .pytorch_converter import PyTorchConverter from .text_converter import TextConverter -def get_logger(log_filename: str) -> logging.Logger: +def setup_logging(log_filename: str) -> None: formatter = logging.Formatter("%(levelname)s [%(asctime)s] %(message)s", datefmt="%m/%d/%Y %I:%M:%S %p") - file_handler = FileHandler(log_filename, mode="w") + file_handler = logging.FileHandler(log_filename, mode="w") file_handler.setLevel(logging.DEBUG) file_handler.setFormatter(formatter) @@ -19,12 +18,7 @@ def get_logger(log_filename: str) -> logging.Logger: stream_handler.setLevel(logging.WARNING) stream_handler.setFormatter(formatter) - logger = logging.getLogger(__file__) - logger.setLevel(logging.DEBUG) - logger.addHandler(file_handler) - logger.addHandler(stream_handler) - - return logger + logging.basicConfig(level=logging.DEBUG, handlers=[file_handler, stream_handler]) def main() -> None: @@ -45,26 +39,26 @@ def main() -> None: parser.add_argument("--log_filename", type=str, default="debug.log", help="Log filename") args = parser.parse_args() - logger = get_logger(args.log_filename) - logger.debug(" ".join(sys.argv)) + setup_logging(args.log_filename) + logging.debug(" ".join(sys.argv)) try: if args.input_type == "Text": - converter = TextConverter(args.input_filename, args.output_filename, args.num_npus, args.num_passes, logger) + converter = TextConverter(args.input_filename, args.output_filename, args.num_npus, args.num_passes) converter.convert() elif args.input_type == "PyTorch": - converter = PyTorchConverter(args.input_filename, args.output_filename, logger) + converter = PyTorchConverter(args.input_filename, args.output_filename) converter.convert() else: supported_types = ["Text", "PyTorch"] - logger.error( + logging.error( f"The input type '{args.input_type}' is not supported. " f"Supported types are: {', '.join(supported_types)}." ) sys.exit(1) except Exception: traceback.print_exc() - logger.debug(traceback.format_exc()) + logging.debug(traceback.format_exc()) sys.exit(1) diff --git a/src/converter/pytorch_converter.py b/src/converter/pytorch_converter.py index ebb7b544..a1f16365 100644 --- a/src/converter/pytorch_converter.py +++ b/src/converter/pytorch_converter.py @@ -31,21 +31,18 @@ class PyTorchConverter: Attributes input_filename (str): Input file name containing PyTorch execution trace. output_filename (str): Output file name for the converted Chakra trace. - logger (logging.Logger): Logger for logging information during conversion. """ - def __init__(self, input_filename: str, output_filename: str, logger: logging.Logger) -> None: + def __init__(self, input_filename: str, output_filename: str) -> None: """ Initialize the PyTorch to Chakra converter. 
It sets up necessary attributes and prepares the environment. Args: input_filename (str): Name of the input file containing PyTorch execution trace. output_filename (str): Name of the output file for the converted Chakra trace. - logger (logging.Logger): Logger for logging information during the conversion. """ self.input_filename = input_filename self.output_filename = output_filename - self.logger = logger def convert(self) -> None: """Convert PyTorch execution traces into the Chakra format.""" @@ -92,12 +89,12 @@ def load_pytorch_execution_traces(self) -> Dict: Returns Dict: The loaded PyTorch execution trace data. """ - self.logger.info("Loading PyTorch execution traces from file.") + logging.info("Loading PyTorch execution traces from file.") try: with open(self.input_filename, "r") as pytorch_et: return json.load(pytorch_et) except IOError as e: - self.logger.error(f"Error opening file {self.input_filename}: {e}") + logging.error(f"Error opening file {self.input_filename}: {e}") raise Exception(f"Could not open file {self.input_filename}") from e def _parse_and_instantiate_nodes( @@ -115,7 +112,7 @@ def _parse_and_instantiate_nodes( Tuple: A tuple containing PyTorch schema, PID, time, start timestamp, finish timestamp, and dictionary of PyTorch node objects. """ - self.logger.info("Extracting and processing node data from execution trace.") + logging.info("Extracting and processing node data from execution trace.") pytorch_schema = pytorch_et_data["schema"] pytorch_pid = pytorch_et_data["pid"] pytorch_time = pytorch_et_data["time"] @@ -155,7 +152,7 @@ def _establish_parent_child_relationships( self._update_node_type_counts(node_type_counts, pytorch_node) for node_type, count in node_type_counts.items(): - self.logger.info(f"{node_type}: {count}") + logging.info(f"{node_type}: {count}") return pytorch_node_objects @@ -244,13 +241,13 @@ def open_chakra_execution_trace(self, output_filename: str) -> IO[bytes]: Returns: IO[bytes]: File handle for the Chakra execution trace output file. """ - self.logger.info(f"Opening Chakra execution trace file: {output_filename}") + logging.info(f"Opening Chakra execution trace file: {output_filename}") try: chakra_et = open(output_filename, "wb") # noqa: SIM115 return chakra_et except IOError as e: err_msg = f"Error opening file {output_filename}: {e}" - self.logger.error(err_msg) + logging.error(err_msg) raise Exception(err_msg) from e def convert_nodes(self, pytorch_nodes: Dict[int, PyTorchNode], chakra_nodes: Dict[int, ChakraNode]) -> None: @@ -302,7 +299,7 @@ def convert_to_chakra_node( Returns: ChakraNode: The converted Chakra node. 
""" - self.logger.debug(f"Converting PyTorch node ID {pytorch_node.id} to Chakra node.") + logging.debug(f"Converting PyTorch node ID {pytorch_node.id} to Chakra node.") chakra_node = ChakraNode() chakra_node.id = pytorch_node.id chakra_node.name = pytorch_node.name @@ -483,7 +480,7 @@ def convert_ctrl_dep_to_data_dep( # noqa: C901 if node_op_type == PyTorchNodeType.GPU_OP: if last_visited_any and last_visited_any.id not in current_node.data_deps: current_node.data_deps.append(last_visited_any.id) - self.logger.debug( + logging.debug( f"GPU Node ID {current_node.id} now has a data dependency on Node ID {last_visited_any.id}" ) last_visited_any = last_visited_non_gpu @@ -492,12 +489,12 @@ def convert_ctrl_dep_to_data_dep( # noqa: C901 dep_id = pytorch_node.inter_thread_dep if dep_id not in current_node.data_deps: current_node.data_deps.append(dep_id) - self.logger.debug( + logging.debug( f"CPU Node ID {current_node.id} now has an inter-thread data dependency on Node ID {dep_id}" ) if last_visited_non_gpu and last_visited_non_gpu.id not in current_node.data_deps: current_node.data_deps.append(last_visited_non_gpu.id) - self.logger.debug( + logging.debug( f"CPU Node ID {current_node.id} now has a data dependency on non-GPU Node ID " f"{last_visited_non_gpu.id}" ) @@ -533,9 +530,9 @@ def remove_dangling_nodes(self, chakra_nodes: Dict[int, ChakraNode]) -> Dict[int del chakra_nodes[node_id] if dangling_nodes: - self.logger.info(f"Identified and removed {len(dangling_nodes)} dangling nodes:") + logging.info(f"Identified and removed {len(dangling_nodes)} dangling nodes:") for node_id in dangling_nodes: - self.logger.info(f" - Node ID {node_id}") + logging.info(f" - Node ID {node_id}") return chakra_nodes @@ -580,7 +577,7 @@ def dfs(node_id: int, path: List[int]) -> bool: """ if node_id in stack: cycle_nodes = " -> ".join([chakra_nodes[n].name for n in path + [node_id]]) - self.logger.error(f"Cyclic dependency detected: {cycle_nodes}") + logging.error(f"Cyclic dependency detected: {cycle_nodes}") return True if node_id in visited: return False @@ -615,12 +612,12 @@ def write_chakra_et( Encode and write both the metadata and individual nodes to create a complete execution trace. """ - self.logger.info("Writing Chakra execution trace.") + logging.info("Writing Chakra execution trace.") self._write_global_metadata( chakra_et, pytorch_schema, pytorch_pid, pytorch_time, pytorch_start_ts, pytorch_finish_ts ) self._encode_and_write_nodes(chakra_et, chakra_nodes) - self.logger.info("Chakra execution trace writing completed.") + logging.info("Chakra execution trace writing completed.") def _write_global_metadata( self, @@ -637,7 +634,7 @@ def _write_global_metadata( This process includes encoding metadata like schema, process ID, timestamps, and other relevant information for the Chakra execution trace. """ - self.logger.info("Encoding global metadata for Chakra execution trace.") + logging.info("Encoding global metadata for Chakra execution trace.") global_metadata = GlobalMetadata( attr=[ ChakraAttr(name="schema", string_val=pytorch_schema), @@ -656,12 +653,12 @@ def _encode_and_write_nodes(self, chakra_et: IO[bytes], chakra_nodes: Dict[int, Each node from the PyTorch execution trace is encoded and written into the Chakra format. This includes node IDs, names, types, dependencies, and other attributes. 
""" - self.logger.info("Encoding and writing nodes for Chakra execution trace.") + logging.info("Encoding and writing nodes for Chakra execution trace.") seen_nids = set() for nid in sorted(chakra_nodes.keys()): if nid in seen_nids: err_msg = f"Duplicate NID {nid} detected in Chakra nodes." - self.logger.error(err_msg) + logging.error(err_msg) raise ValueError(err_msg) seen_nids.add(nid) chakra_node = chakra_nodes[nid] @@ -676,7 +673,7 @@ def close_chakra_execution_trace(self, chakra_et: IO[bytes]) -> None: Args: chakra_et (IO[bytes]): File handle for the Chakra execution trace output file. """ - self.logger.info("Closing Chakra execution trace file.") + logging.info("Closing Chakra execution trace file.") if chakra_et and not chakra_et.closed: chakra_et.close() @@ -697,7 +694,7 @@ def simulate_execution( pytorch_nodes (Dict[int, PyTorchNode]): The PyTorch nodes to reference for additional information. parent_to_children_map (Dict[int, List[int]]): Mapping from parent node IDs to their child node IDs. """ - self.logger.info("Simulating execution of Chakra nodes based on data dependencies.") + logging.info("Simulating execution of Chakra nodes based on data dependencies.") ready_cpu_nodes = [ (node_id, chakra_nodes[node_id]) @@ -723,7 +720,7 @@ def simulate_execution( cpu_node_id, cpu_node = ready_cpu_nodes.pop(0) current_cpu_node = (cpu_node_id, current_time) issued_nodes.add(cpu_node_id) - self.logger.info( + logging.info( f"Issuing CPU Node ID {cpu_node_id} ({cpu_node.name}) at {current_time}us with duration " f"{cpu_node.duration_micros}us" ) @@ -732,7 +729,7 @@ def simulate_execution( gpu_node_id, gpu_node = ready_gpu_nodes.pop(0) current_gpu_node = (gpu_node_id, current_time) issued_nodes.add(gpu_node_id) - self.logger.info( + logging.info( f"Issuing GPU Node ID {gpu_node_id} ({gpu_node.name}) at {current_time}us with duration " f"{gpu_node.duration_micros}us" ) @@ -743,14 +740,14 @@ def simulate_execution( current_cpu_node and current_time - current_cpu_node[1] >= chakra_nodes[current_cpu_node[0]].duration_micros ): - self.logger.info(f"CPU Node ID {current_cpu_node[0]} completed at {current_time}us") + logging.info(f"CPU Node ID {current_cpu_node[0]} completed at {current_time}us") current_cpu_node = None if ( current_gpu_node and current_time - current_gpu_node[1] >= chakra_nodes[current_gpu_node[0]].duration_micros ): - self.logger.info(f"GPU Node ID {current_gpu_node[0]} completed at {current_time}us") + logging.info(f"GPU Node ID {current_gpu_node[0]} completed at {current_time}us") current_gpu_node = None for node_id in list(issued_nodes): @@ -766,4 +763,4 @@ def simulate_execution( issued_nodes.clear() - self.logger.info("Simulation of Chakra node execution completed.") + logging.info("Simulation of Chakra node execution completed.") diff --git a/src/converter/text_converter.py b/src/converter/text_converter.py index 50eccec3..1bad778f 100644 --- a/src/converter/text_converter.py +++ b/src/converter/text_converter.py @@ -52,13 +52,12 @@ def __init__(self, line: str) -> None: class TextConverter: def __init__( - self, input_filename: str, output_filename: str, num_npus: int, num_passes: int, logger: logging.Logger + self, input_filename: str, output_filename: str, num_npus: int, num_passes: int ) -> None: self.input_filename = input_filename self.output_filename = output_filename self.num_npus = num_npus self.num_passes = num_passes - self.logger = logger self.next_node_id = 0 def get_global_metadata(self): diff --git a/src/trace_link/trace_linker.py 
b/src/trace_link/trace_linker.py index 84ac5db7..5d38efa4 100644 --- a/src/trace_link/trace_linker.py +++ b/src/trace_link/trace_linker.py @@ -31,7 +31,6 @@ class TraceLinker: Attributes id_assigner (UniqueIdAssigner): Assigns unique IDs to operators. - logger (logging.Logger): Logger for the class. """ def __init__(self, log_level: str = "INFO") -> None: @@ -42,8 +41,7 @@ def __init__(self, log_level: str = "INFO") -> None: log_level (str): Logging level for the class. """ self.id_assigner = UniqueIdAssigner() - self.logger: logging.Logger = logging.getLogger(__name__) - self.logger.setLevel(log_level.upper()) + logging.getLogger().setLevel(log_level.upper()) def link(self, pytorch_et_file: str, kineto_file: str, output_file: str) -> None: """ @@ -115,13 +113,13 @@ def load_pytorch_et(self, pytorch_et_file: str) -> List[PyTorchOperator]: Returns: List[PyTorchOperator]: List of PyTorch operators. """ - self.logger.info("Starting to load PyTorch Execution Trace.") + logging.info("Starting to load PyTorch Execution Trace.") pytorch_et = load_execution_trace_file(pytorch_et_file) root_node = pytorch_et.get_nodes()[1] # Root node is usually 1-based pytorch_ops = self.extract_pytorch_ops(root_node) - self.logger.info(f"Original ops in PyTorch ET: {len(pytorch_ops)}") - self.logger.info("PyTorch Execution Trace loaded successfully.") + logging.info(f"Original ops in PyTorch ET: {len(pytorch_ops)}") + logging.info("PyTorch Execution Trace loaded successfully.") return pytorch_ops @@ -161,7 +159,7 @@ def load_kineto_trace(self, kineto_file: str) -> Dict: Returns: Dict: Dictionary containing various data structures needed for linking traces. """ - self.logger.info("Starting to load Kineto Trace.") + logging.info("Starting to load Kineto Trace.") kineto_trace_data = read_dictionary_from_json_file(kineto_file) sorted_kineto_ops = sorted( [KinetoOperator(op) for op in kineto_trace_data["traceEvents"]], @@ -174,12 +172,12 @@ def load_kineto_trace(self, kineto_file: str) -> Dict: kineto_data["sorted_kineto_cpu_ops"] = sorted(kineto_data["kineto_cpu_ops"], key=lambda op: op.timestamp) kineto_data["sorted_kineto_cpu_op_ts"] = [op.timestamp for op in kineto_data["sorted_kineto_cpu_ops"]] - self.logger.info( + logging.info( f"Processed Kineto trace with {len(kineto_data['kineto_cpu_ops'])} CPU ops, " f"{len(kineto_data['kineto_id_cuda_launch_op_map'])} CPU launcher ops, " f"and {len(kineto_data['kineto_gpu_ops'])} GPU ops." ) - self.logger.info("Kineto Trace loaded successfully.") + logging.info("Kineto Trace loaded successfully.") return kineto_data def construct_kineto_data_structures(self, kineto_ops: List[KinetoOperator]) -> Dict: @@ -195,7 +193,7 @@ def construct_kineto_data_structures(self, kineto_ops: List[KinetoOperator]) -> Returns: Dict: Dictionary containing categorized operators and timing boundaries.
""" - self.logger.info("Categorizing Kineto operators and calculating timing boundaries.") + logging.info("Categorizing Kineto operators and calculating timing boundaries.") process_start_time = sys.maxsize process_end_time = 0 thread_info = {} @@ -211,7 +209,7 @@ def construct_kineto_data_structures(self, kineto_ops: List[KinetoOperator]) -> if op.is_cpu_op(): kineto_cpu_ops.append(op) kineto_tid_cpu_ops_map.setdefault(op.tid, []).append(op) - self.logger.debug(f"Added CPU or user annotation op: {op.name}") + logging.debug(f"Added CPU or user annotation op: {op.name}") elif op.is_cuda_launch_op(): kineto_id_cuda_launch_op_map[op.external_id] = op @@ -220,11 +218,11 @@ def construct_kineto_data_structures(self, kineto_ops: List[KinetoOperator]) -> f"Duplicate correlation ID {op.correlation} found in self.kineto_id_cuda_launch_op_map." ) kineto_correlation_cuda_runtime_map[op.correlation] = op - self.logger.debug(f"Added CPU launcher op: {op.name}") + logging.debug(f"Added CPU launcher op: {op.name}") elif op.is_gpu_op(): kineto_gpu_ops.append(op) - self.logger.debug(f"Added GPU op: {op.name}") + logging.debug(f"Added GPU op: {op.name}") elif op.is_arrow_op(): assert (op.phase == "s") or (op.phase == "f") @@ -234,7 +232,7 @@ def construct_kineto_data_structures(self, kineto_ops: List[KinetoOperator]) -> "should generally be populated for 'ac2g' operators. Please verify the validity of " "the Kineto trace and the 'op' data." ) - self.logger.error(error_msg) + logging.error(error_msg) raise KeyError(error_msg) kineto_id_arrow_op_map[op.id] = op @@ -275,10 +273,10 @@ def calculate_exclusive_dur(self, kineto_tid_cpu_ops_map: Dict[int, List[KinetoO kineto_tid_cpu_ops_map (Dict[int, List[KinetoOperator]]): Map of thread IDs to their corresponding Kineto operators. """ - self.logger.info("Calculating exclusive durations for Kineto operators in parallel.") + logging.info("Calculating exclusive durations for Kineto operators in parallel.") def process_ops_for_thread(ops: List[KinetoOperator]) -> None: - self.logger.info(f"Processing {len(ops)} operators in thread.") + logging.info(f"Processing {len(ops)} operators in thread.") sorted_ops = sorted(ops, key=lambda op: (op.timestamp, op.inclusive_dur)) for i, op in enumerate(sorted_ops): exclusive_dur = op.inclusive_dur @@ -307,11 +305,11 @@ def process_ops_for_thread(ops: List[KinetoOperator]) -> None: f"(ts: {op.timestamp}, inclusive_dur: {op.inclusive_dur}, rf_id: {op.rf_id}): " f"Duration cannot be less than zero." ) - self.logger.error(error_msg) + logging.error(error_msg) raise ValueError(error_msg) op.exclusive_dur = exclusive_dur - self.logger.debug( + logging.debug( f"Node '{op.name}' (ts: {op.timestamp}, inclusive_dur: {op.inclusive_dur}, " f"rf_id: {op.rf_id}) exclusive duration: {op.exclusive_dur} microseconds." ) @@ -322,7 +320,7 @@ def process_ops_for_thread(ops: List[KinetoOperator]) -> None: for future in as_completed(futures): future.result() # Wait for all threads to complete and handle any exceptions - self.logger.info("Exclusive durations for Kineto operators calculated successfully.") + logging.info("Exclusive durations for Kineto operators calculated successfully.") @staticmethod def merge_overlapping_intervals(intervals: List[Tuple[int, int]]) -> List[Tuple[int, int]]: @@ -412,7 +410,7 @@ def enforce_inter_thread_order( Returns: Dict[int, List[KinetoOperator]]: Updated map with enforced inter-thread order. 
""" - self.logger.info("Enforcing inter-thread order in Kineto traces.") + logging.info("Enforcing inter-thread order in Kineto traces.") with ThreadPoolExecutor() as executor: futures = { @@ -426,9 +424,9 @@ def enforce_inter_thread_order( tid = futures[future] try: future.result() - self.logger.debug(f"Thread {tid} dependencies processed.") + logging.debug(f"Thread {tid} dependencies processed.") except Exception as e: - self.logger.error(f"Error processing thread {tid}: {e}") + logging.error(f"Error processing thread {tid}: {e}") return kineto_tid_cpu_ops_map @@ -444,7 +442,7 @@ def process_thread_inter_thread_order( ops_by_tid (Dict[int, List[KinetoOperator]]): Kineto operators grouped by thread ID. threshold (int): Threshold for significant gap detection in microseconds. """ - self.logger.info(f"Thread {tid}: Identifying gaps for dependency linking with threshold {threshold}us.") + logging.info(f"Thread {tid}: Identifying gaps for dependency linking with threshold {threshold}us.") sorted_ops = sorted(ops, key=lambda op: op.timestamp) last_cpu_node_rf_id = None @@ -455,7 +453,7 @@ def process_thread_inter_thread_order( ): last_cpu_node_rf_id = self.find_last_cpu_node_before_timestamp(ops_by_tid, tid, op.timestamp) if last_cpu_node_rf_id: - self.logger.debug( + logging.debug( f"Thread {tid}: Linking op '{op.name}' to CPU node before gap with rf_id " f"'{last_cpu_node_rf_id}'." ) @@ -482,7 +480,7 @@ def find_last_cpu_node_before_timestamp( Returns: Optional[int]: The ID of the last CPU node found, or None if not found. """ - self.logger.debug(f"Finding last CPU node before timestamp {timestamp} excluding thread {exclude_tid}.") + logging.debug(f"Finding last CPU node before timestamp {timestamp} excluding thread {exclude_tid}.") last_cpu_node = None last_cpu_node_rf_id = None latest_timestamp = 0 @@ -498,7 +496,7 @@ def find_last_cpu_node_before_timestamp( latest_timestamp = op.timestamp last_cpu_node_rf_id = op.rf_id if last_cpu_node: - self.logger.debug(f"Last CPU node before timestamp {timestamp} found: {last_cpu_node}") + logging.debug(f"Last CPU node before timestamp {timestamp} found: {last_cpu_node}") return last_cpu_node_rf_id def link_traces( @@ -520,7 +518,7 @@ def link_traces( This process relies on the assumption of an 'exact match' between these traces. """ - self.logger.info("Starting the process of linking PyTorch and Kineto traces.") + logging.info("Starting the process of linking PyTorch and Kineto traces.") ( kineto_cpu_ops, sorted_kineto_cpu_ops, @@ -556,7 +554,7 @@ def link_traces( pytorch_op_id_to_timestamp_map, pytorch_op_id_to_inter_thread_dep_map, ) - self.logger.info("Traces have been successfully linked.") + logging.info("Traces have been successfully linked.") return pytorch_et_plus_data def add_thread_and_process_annotations( @@ -576,7 +574,7 @@ def add_thread_and_process_annotations( start and end times, collected during the categorization process to insert appropriate annotations directly into the Kineto operators list. """ - self.logger.info("Adding process and thread annotations to Kineto operators.") + logging.info("Adding process and thread annotations to Kineto operators.") # Insert process annotation operator. This operator represents the # overall time span of the trace process. 
@@ -589,7 +587,7 @@ def add_thread_and_process_annotations( } ) kineto_cpu_ops.insert(0, process_annotation_op) - self.logger.debug( + logging.debug( "Process annotation added with start time {} and duration {}.".format( kineto_process_start_time, kineto_process_end_time - kineto_process_start_time, @@ -620,7 +618,7 @@ def add_thread_and_process_annotations( kineto_cpu_ops.insert(position, thread_annotation_op) else: kineto_cpu_ops.append(thread_annotation_op) - self.logger.debug( + logging.debug( "Thread {} annotation added with start time {} and duration {}.".format(tid, start_ts, inclusive_dur) ) @@ -640,7 +638,7 @@ def map_pytorch_to_kineto_ops( kineto_gpu_ops: List[KinetoOperator], ) -> Tuple[Dict[int, List[KinetoOperator]], Dict[int, int], Dict[int, int], Dict[int, int], Dict[int, int]]: """Map PyTorch ET nodes to corresponding Kineto operators.""" - self.logger.info("Mapping PyTorch ET nodes to Kineto operators.") + logging.info("Mapping PyTorch ET nodes to Kineto operators.") cpu_ev_idx_to_gpu_ops_map = self.group_gpu_ops_by_cpu_launchers( kineto_gpu_ops, kineto_correlation_cuda_runtime_map, sorted_kineto_cpu_ops, sorted_kineto_cpu_op_ts ) @@ -655,7 +653,7 @@ def map_pytorch_to_kineto_ops( kineto_ops_count = len(kineto_cpu_ops) if pytorch_ops_count > kineto_ops_count: # The specific comment is placed within the if block as requested. - self.logger.warning( + logging.warning( f"Number of PyTorch operators ({pytorch_ops_count}) is larger than the number of Kineto operators " f"({kineto_ops_count}). Expected PyTorch ops (CPU only) to be fewer than Kineto ops (CPU and GPU). " f"Logging this rare but possible scenario." @@ -665,7 +663,7 @@ def map_pytorch_to_kineto_ops( if (pytorch_op.rf_id is not None) and (pytorch_op.rf_id in kineto_rf_id_to_kineto_op_map): kineto_op = kineto_rf_id_to_kineto_op_map[pytorch_op.rf_id] if kineto_op is None: - self.logger.warning( + logging.warning( f"No corresponding Kineto op found for PyTorch op ID: " f"{pytorch_op.id}, Name: '{pytorch_op.name}'." ) @@ -678,7 +676,7 @@ def map_pytorch_to_kineto_ops( pytorch_op_id_to_inter_thread_dep_map[pytorch_op.id], ) = self.link_ops(pytorch_op, kineto_op, cpu_ev_idx_to_gpu_ops_map, kineto_rf_id_to_kineto_op_map) - self.logger.info("Completed mapping of PyTorch operators to Kineto operators.") + logging.info("Completed mapping of PyTorch operators to Kineto operators.") return ( pytorch_op_id_to_kineto_ops_map, pytorch_op_id_to_inclusive_dur_map, @@ -720,7 +718,7 @@ def group_gpu_ops_by_cpu_launchers( ) if not parent_cpu_op: warning_msg = f"Missing parent CPU operator for GPU op '{gpu_op.name}'. Orphaned GPU operator." - self.logger.warning(warning_msg) + logging.warning(warning_msg) continue if parent_cpu_op.ev_idx == "": @@ -728,10 +726,10 @@ def group_gpu_ops_by_cpu_launchers( f"Missing 'ev_idx' for CPU operator {parent_cpu_op.name}. " f"Cannot link GPU op {gpu_op.name} to {parent_cpu_op.name}." ) - self.logger.warning(error_msg) + logging.warning(error_msg) continue - self.logger.debug(f"group_gpu_ops_by_cpu_launchers '{parent_cpu_op.name}' -> '{gpu_op.name}'") + logging.debug(f"group_gpu_ops_by_cpu_launchers '{parent_cpu_op.name}' -> '{gpu_op.name}'") cpu_ev_idx_to_gpu_ops_map.setdefault(parent_cpu_op.ev_idx, []).append(gpu_op) @@ -774,12 +772,12 @@ def find_parent_cpu_op( "incomplete map, cuda_launch_operations, in is_cuda_launch_op. Please update the map properly to cover" " all CUDA runtime launch operators." 
) - self.logger.warning(warning_msg) + logging.warning(warning_msg) return None kineto_runtime_op = kineto_correlation_cuda_runtime_map[kineto_gpu_op.correlation] kineto_gpu_op.tid = kineto_runtime_op.tid - self.logger.debug( + logging.debug( f"Found CUDA runtime operation '{kineto_runtime_op.name}' for GPU operator '{kineto_gpu_op.name}'." ) @@ -790,7 +788,7 @@ def find_parent_cpu_op( kineto_gpu_op, sorted_kineto_cpu_ops, sorted_kineto_cpu_op_ts, kineto_runtime_op.timestamp ) if not parent_cpu_op: - self.logger.warning( + logging.warning( f"No parent CPU operator found for GPU operator '{kineto_gpu_op.name}' " f"linked to CUDA runtime operation '{kineto_runtime_op.name}' " f"(ts: {kineto_runtime_op.timestamp})." @@ -933,7 +931,7 @@ def construct_et_plus_data( Returns: Dict: The constructed ET+ data. """ - self.logger.info("Constructing ET+ data.") + logging.info("Constructing ET+ data.") with open(pytorch_et_file, "r") as file: pytorch_et_data = json.load(file) @@ -1056,10 +1054,10 @@ def dump_pytorch_execution_trace_plus(self, pytorch_et_plus_data: Dict, output_f pytorch_et_plus_data (Dict): The constructed ET+ data. output_file (str): The file path where the ET+ data will be saved. """ - self.logger.info(f"Starting to dump ET+ data to {output_file}.") + logging.info(f"Starting to dump ET+ data to {output_file}.") if pytorch_et_plus_data is None: - self.logger.error("ET+ data not constructed. Please run construct_et_plus_data first.") + logging.error("ET+ data not constructed. Please run construct_et_plus_data first.") return if "nodes" in pytorch_et_plus_data: @@ -1068,8 +1066,8 @@ def dump_pytorch_execution_trace_plus(self, pytorch_et_plus_data: Dict, output_f try: with open(output_file, "w") as file: json.dump(pytorch_et_plus_data, file, indent=4) - self.logger.info(f"ET+ data dumped to {output_file}.") + logging.info(f"ET+ data dumped to {output_file}.") except IOError as e: - self.logger.error(f"Failed to dump ET+ data to {output_file}. Error: {e}") + logging.error(f"Failed to dump ET+ data to {output_file}. Error: {e}") except Exception as e: - self.logger.error(f"An unexpected error occurred while dumping ET+ data. Error: {e}") + logging.error(f"An unexpected error occurred while dumping ET+ data. 
Error: {e}") diff --git a/tests/converter/test_pytorch_converter.py b/tests/converter/test_pytorch_converter.py index 1cc30a1c..d2dd1ff7 100644 --- a/tests/converter/test_pytorch_converter.py +++ b/tests/converter/test_pytorch_converter.py @@ -1,5 +1,4 @@ import json -import logging from typing import Dict from unittest.mock import MagicMock, mock_open, patch @@ -18,13 +17,6 @@ from chakra.src.converter.pytorch_node import PyTorchNode -@pytest.fixture -def mock_logger() -> logging.Logger: - logger = logging.getLogger("PyTorchConverter") - logger.setLevel(logging.DEBUG) - return logger - - @pytest.fixture def sample_pytorch_data() -> Dict: return { @@ -81,26 +73,23 @@ def mock_chakra_node() -> ChakraNode: return node -def test_initialization(mock_logger: logging.Logger) -> None: - converter = PyTorchConverter("input.json", "output.json", mock_logger) +def test_initialization() -> None: + converter = PyTorchConverter("input.json", "output.json") assert converter.input_filename == "input.json" assert converter.output_filename == "output.json" - assert converter.logger == mock_logger @patch("builtins.open", new_callable=mock_open) -def test_load_pytorch_execution_traces( - mock_file: MagicMock, mock_logger: logging.Logger, sample_pytorch_data: Dict -) -> None: +def test_load_pytorch_execution_traces(mock_file: MagicMock, sample_pytorch_data: Dict) -> None: mock_file.return_value.read.return_value = json.dumps(sample_pytorch_data) - converter = PyTorchConverter("input.json", "output.json", mock_logger) + converter = PyTorchConverter("input.json", "output.json") data = converter.load_pytorch_execution_traces() assert data == sample_pytorch_data mock_file.assert_called_once_with("input.json", "r") -def test_parse_and_instantiate_nodes(mock_logger: logging.Logger, sample_pytorch_data: Dict) -> None: - converter = PyTorchConverter("input.json", "output.json", mock_logger) +def test_parse_and_instantiate_nodes(sample_pytorch_data: Dict) -> None: + converter = PyTorchConverter("input.json", "output.json") ( pytorch_schema, pytorch_pid, @@ -142,8 +131,8 @@ def create_sample_graph(parent_id: int = 0, expected_child_id: int = 0) -> Dict[ @pytest.mark.parametrize("parent_id, expected_child_id", [(1, 2), (None, None)]) -def test_establish_parent_child_relationships(mock_logger: MagicMock, parent_id: int, expected_child_id: int) -> None: - converter = PyTorchConverter("input.json", "output.json", mock_logger) +def test_establish_parent_child_relationships(parent_id: int, expected_child_id: int) -> None: + converter = PyTorchConverter("input.json", "output.json") pytorch_nodes = create_sample_graph(parent_id, expected_child_id) pytorch_nodes = converter._establish_parent_child_relationships(pytorch_nodes, []) @@ -154,8 +143,8 @@ def test_establish_parent_child_relationships(mock_logger: MagicMock, parent_id: assert len(pytorch_nodes[1].children) == 0 -def test_convert_nodes(mock_logger: logging.Logger, sample_pytorch_data: Dict) -> None: - converter = PyTorchConverter("input.json", "output.json", mock_logger) +def test_convert_nodes(sample_pytorch_data: Dict) -> None: + converter = PyTorchConverter("input.json", "output.json") ( pytorch_schema, pytorch_pid, @@ -172,8 +161,8 @@ def test_convert_nodes(mock_logger: logging.Logger, sample_pytorch_data: Dict) - assert chakra_nodes[2].id == 2 -def test_convert_ctrl_dep_to_data_dep(mock_logger: logging.Logger, sample_pytorch_data: Dict) -> None: - converter = PyTorchConverter("input.json", "output.json", mock_logger) +def 
test_convert_ctrl_dep_to_data_dep(sample_pytorch_data: Dict) -> None: + converter = PyTorchConverter("input.json", "output.json") ( pytorch_schema, pytorch_pid, @@ -191,8 +180,8 @@ def test_convert_ctrl_dep_to_data_dep(mock_logger: logging.Logger, sample_pytorc @patch("builtins.open", new_callable=mock_open) -def test_write_chakra_et(mock_file: MagicMock, mock_logger: logging.Logger, sample_pytorch_data: Dict) -> None: - converter = PyTorchConverter("input.json", "output.json", mock_logger) +def test_write_chakra_et(mock_file: MagicMock, sample_pytorch_data: Dict) -> None: + converter = PyTorchConverter("input.json", "output.json") converter.chakra_et = mock_file() ( pytorch_schema, @@ -218,8 +207,8 @@ def test_write_chakra_et(mock_file: MagicMock, mock_logger: logging.Logger, samp @patch("builtins.open", new_callable=mock_open) -def test_close_chakra_execution_trace(mock_file: MagicMock, mock_logger: logging.Logger) -> None: - converter = PyTorchConverter("input.json", "output.json", mock_logger) +def test_close_chakra_execution_trace(mock_file: MagicMock) -> None: + converter = PyTorchConverter("input.json", "output.json") file_handle = mock_file() file_handle.closed = False # Simulate an open file converter.chakra_et = file_handle @@ -236,9 +225,7 @@ def test_close_chakra_execution_trace(mock_file: MagicMock, mock_logger: logging ({"name": "other_op", "is_gpu_op": False}, COMP_NODE), ], ) -def test_get_chakra_node_type_from_pytorch_node( - mock_logger: logging.Logger, pytorch_node_data: Dict, expected_type: int -) -> None: +def test_get_chakra_node_type_from_pytorch_node(pytorch_node_data: Dict, expected_type: int) -> None: # Create a mock PyTorchNode with the required attributes pytorch_node = MagicMock(spec=PyTorchNode) pytorch_node.name = pytorch_node_data["name"] @@ -257,7 +244,7 @@ def test_get_chakra_node_type_from_pytorch_node( mock_pytorch_node = PyTorchNode("1.0.2-chakra.0.0.4", mock_pytorch_node_data) pytorch_nodes = {0: mock_pytorch_node, 1: pytorch_node} - converter = PyTorchConverter("input.json", "output.json", mock_logger) + converter = PyTorchConverter("input.json", "output.json") node_type = converter.get_chakra_node_type_from_pytorch_node(pytorch_nodes, pytorch_node) assert node_type == expected_type @@ -272,7 +259,7 @@ def test_get_chakra_node_type_from_pytorch_node( ("broadcast", BROADCAST), ], ) -def test_get_collective_comm_type(mock_logger: logging.Logger, name: str, expected_comm_type: int) -> None: - converter = PyTorchConverter("input.json", "output.json", mock_logger) +def test_get_collective_comm_type(name: str, expected_comm_type: int) -> None: + converter = PyTorchConverter("input.json", "output.json") comm_type = converter.get_collective_comm_type(name) assert comm_type == expected_comm_type From fac83123b42ac6cad23bec756035b8d6b6c30e7b Mon Sep 17 00:00:00 2001 From: Taekyung Heo <7621438+TaekyungHeo@users.noreply.github.com> Date: Wed, 15 May 2024 20:03:25 -0400 Subject: [PATCH 2/3] Add tests/data/1.0.2-chakra.0.0.4/llama_pytorch24.05.tgz --- .gitattributes | 1 + tests/data/1.0.2-chakra.0.0.4/llama_pytorch24.05.tgz | 3 +++ 2 files changed, 4 insertions(+) create mode 100644 .gitattributes create mode 100644 tests/data/1.0.2-chakra.0.0.4/llama_pytorch24.05.tgz diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 00000000..3606a430 --- /dev/null +++ b/.gitattributes @@ -0,0 +1 @@ +*.tgz filter=lfs diff=lfs merge=lfs -text diff --git a/tests/data/1.0.2-chakra.0.0.4/llama_pytorch24.05.tgz 
b/tests/data/1.0.2-chakra.0.0.4/llama_pytorch24.05.tgz new file mode 100644 index 00000000..a7f62ca6 --- /dev/null +++ b/tests/data/1.0.2-chakra.0.0.4/llama_pytorch24.05.tgz @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:0090ea7a1190504d391023183f49ce254f3ec9ae0972f753b103e0c33c72875c +size 163568333 From 63176c87cf1b764afe6e0957a469f584cd9549b7 Mon Sep 17 00:00:00 2001 From: Taekyung Heo <7621438+TaekyungHeo@users.noreply.github.com> Date: Wed, 15 May 2024 20:15:46 -0400 Subject: [PATCH 3/3] Add integration_tests.yml --- .github/workflows/integration_tests.yml | 35 +++++ ci_tools/integration_tests.py | 160 +++++++++++++++++++++++ tests/ci_tools/test_integration_tests.py | 49 +++++++ 3 files changed, 244 insertions(+) create mode 100644 .github/workflows/integration_tests.yml create mode 100644 ci_tools/integration_tests.py create mode 100644 tests/ci_tools/test_integration_tests.py diff --git a/.github/workflows/integration_tests.yml b/.github/workflows/integration_tests.yml new file mode 100644 index 00000000..6bb72ebe --- /dev/null +++ b/.github/workflows/integration_tests.yml @@ -0,0 +1,35 @@ +name: Integration Tests + +on: pull_request + +jobs: + integration-tests: + runs-on: ubuntu-latest + + steps: + - name: Checkout Code + uses: actions/checkout@v2 + with: + lfs: true + + - name: Setup Python Environment + uses: actions/setup-python@v2 + with: + python-version: '3.10.14' + + - name: Install Dependencies + run: | + pip install -r requirements-dev.txt + pip install . + + - name: Install PARAM + run: | + git clone https://github.com/facebookresearch/param.git + cd param/train/compute/python/ + git checkout c83ce8429110a86549c40fec5a01acbd9fbd54a4 + pip install . + + - name: Extract and Validate + run: | + python3 ci_tools/integration_tests.py --tgz_path tests/data/1.0.2-chakra.0.0.4/llama_pytorch24.05.tgz \ + --num_ranks 8 --tolerance 0.05 --expected_times_ms 14597 14597 14968 14638 14649 14700 14677 14735 diff --git a/ci_tools/integration_tests.py b/ci_tools/integration_tests.py new file mode 100644 index 00000000..bc2e4ddb --- /dev/null +++ b/ci_tools/integration_tests.py @@ -0,0 +1,160 @@ +import argparse +import concurrent.futures +import os +import re +import subprocess +import tarfile + + +def extract_tgz(tgz_path: str, extract_to: str) -> None: + """ + Extracts a .tgz file to the specified directory. + + Args: + tgz_path (str): Path to the .tgz file. + extract_to (str): Directory to extract the files to. + """ + print(f"Extracting {tgz_path} to {extract_to}") + with tarfile.open(tgz_path, "r:gz") as tar: + tar.extractall(path=extract_to) + + +def run_command(command: str) -> None: + """ + Executes a given shell command and checks for errors. + + Args: + command (str): The shell command to execute. + + Raises: + RuntimeError: If the command fails. + """ + print(f"Running command: {command}") + os.environ["PATH"] = "/Users/theo/venv/bin/:" + os.environ.get("PATH", "") + try: + subprocess.run(command, check=True, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + except subprocess.CalledProcessError as e: + raise RuntimeError(f"Command failed: {command}") from e + + +def run_commands_in_parallel(commands: list) -> None: + """ + Executes multiple commands in parallel. + + Args: + commands (list): A list of shell commands to execute. 
+ """ + with concurrent.futures.ThreadPoolExecutor() as executor: + futures = [executor.submit(run_command, cmd) for cmd in commands] + for future in concurrent.futures.as_completed(futures): + future.result() + + +def run_trace_link(data_path: str, num_ranks: int) -> None: + """ + Prepares and runs chakra_trace_link commands in parallel for each file pair. + + Args: + data_path (str): The directory where the data files are located. + num_ranks (int): The number of file pairs to process. + """ + commands = [ + f"chakra_trace_link --pytorch-et-file {data_path}/chakra_host_et_{i}.json " + f"--kineto-file {data_path}/kineto_{i}.json " + f"--output-file {data_path}/chakra_et_plus_{i}.json" + for i in range(num_ranks) + ] + run_commands_in_parallel(commands) + + +def run_converter(data_path: str, num_ranks: int) -> None: + """ + Prepares and runs chakra_converter commands in parallel for each output of chakra_trace_link. + + Args: + data_path (str): The directory where the output files are located. + num_ranks (int): The number of output files to process. + """ + commands = [ + f"chakra_converter --input_filename {data_path}/chakra_et_plus_{i}.json " + f"--output_filename {data_path}/chakra_final_{i}.chakra " + f"--input_type PyTorch --log_filename /tmp/rank_{i}.log" + for i in range(num_ranks) + ] + run_commands_in_parallel(commands) + + +def validate_log(filename: str, expected_time_us: int, tolerance: float) -> None: + """ + Validates the log file to ensure the last operation completes within the expected time with an allowable error. + + Args: + filename (str): Path to the log file. + expected_time_us (int): Expected completion time in microseconds. + tolerance (float): Acceptable error percentage as a decimal. + + Raises: + ValueError: If the log does not contain the expected output or is outside the acceptable time range. + """ + completion_pattern = re.compile( + r"INFO \[\d{2}/\d{2}/\d{4} \d{2}:\d{2}:\d{2} PM\] GPU Node ID \d+ completed at (\d+)us" + ) + with open(filename, "r") as file: + last_time = None + for line in file: + match = completion_pattern.search(line) + if match: + last_time = int(match.group(1)) + + if last_time is None: + raise ValueError(f"No completion time found in {filename}") + + lower_bound = expected_time_us * (1 - tolerance) + upper_bound = expected_time_us * (1 + tolerance) + + if not lower_bound <= last_time <= upper_bound: + raise ValueError( + f"Completion time in {filename} is {last_time}us; expected between {lower_bound}us and {upper_bound}us." + ) + print(f"Validation successful for {filename}: {last_time}us is within the acceptable range.") + + +def parse_args(): + """ + Parses command line arguments. + """ + parser = argparse.ArgumentParser(description="Run integration tests for chakra_trace_link and chakra_converter.") + parser.add_argument("--tgz_path", type=str, required=True, help="Path to the tgz file to extract.") + parser.add_argument("--num_ranks", type=int, required=True, help="Number of ranks to process.") + parser.add_argument("--tolerance", type=float, required=True, help="Acceptable error percentage as a decimal.") + parser.add_argument( + "--expected_times_ms", type=int, nargs="+", required=True, help="List of expected times in milliseconds." + ) + return parser.parse_args() + + +def main() -> None: + """ + Main function to execute the integration test sequence. 
+ """ + args = parse_args() + extract_dir = os.path.dirname(args.tgz_path) + data_path = os.path.join(extract_dir, os.path.basename(args.tgz_path).replace(".tgz", "")) + + # Extracting files + extract_tgz(args.tgz_path, extract_dir) + + expected_times_us = [time * 1000 for time in args.expected_times_ms] + + # Run trace link and converter processes + run_trace_link(data_path, args.num_ranks) + run_converter(data_path, args.num_ranks) + + # Validate output logs + for i in range(args.num_ranks): + log_file = f"/tmp/rank_{i}.log" + validate_log(log_file, expected_times_us[i], args.tolerance) + + +if __name__ == "__main__": + main() diff --git a/tests/ci_tools/test_integration_tests.py b/tests/ci_tools/test_integration_tests.py new file mode 100644 index 00000000..ee516cd0 --- /dev/null +++ b/tests/ci_tools/test_integration_tests.py @@ -0,0 +1,49 @@ +import subprocess +from unittest.mock import MagicMock, patch + +import pytest + +from ci_tools.integration_tests import extract_tgz, run_command, validate_log + + +def test_extract_tgz(): + """Test extracting a tgz file to ensure tarfile.open and extractall are called.""" + with patch("tarfile.open", MagicMock()) as mock_tar: + mock_tar.return_value.__enter__.return_value.extractall = MagicMock() + extract_tgz("path/to/test.tgz", "path/to/extract") + mock_tar.assert_called_once_with("path/to/test.tgz", "r:gz") + mock_tar.return_value.__enter__.return_value.extractall.assert_called_once_with(path="path/to/extract") + + +def test_run_command_success(): + """Test run_command with a command that succeeds without raising an error.""" + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + run_command("echo 'Hello World'") + mock_run.assert_called_once() + + +def test_run_command_failure(): + """Test run_command with a command that fails and should raise RuntimeError.""" + with patch("subprocess.run", side_effect=subprocess.CalledProcessError(1, "cmd", "Error message")): + with pytest.raises(RuntimeError) as excinfo: + run_command("exit 1") + assert "Command failed: exit 1" in str(excinfo.value) + + +def test_validate_log_success(tmp_path): + """Test validate_log to ensure it passes when the last operation completes within the expected time.""" + log_file = tmp_path / "log.txt" + log_file.write_text("INFO [05/15/2024 08:32:04 PM] GPU Node ID 301123 completed at 1000000us") + validate_log(str(log_file), 1000000, 0.05) + + +def test_validate_log_failure(tmp_path): + """ + Test validate_log to ensure it raises a ValueError when the last operation is outside the acceptable time range. + """ + log_file = tmp_path / "log.txt" + log_file.write_text("INFO [05/15/2024 08:32:04 PM] GPU Node ID 301123 completed at 900000us") + with pytest.raises(ValueError) as excinfo: + validate_log(str(log_file), 1000000, 0.05) + assert "expected between" in str(excinfo.value)