Skip to content

Commit

Permalink
Merge pull request #119 from mlcommons/unit-test
Browse files Browse the repository at this point in the history
Refactor KinetoOperator and TraceLinker for code quality and test coverage
  • Loading branch information
TaekyungHeo authored Jul 13, 2024
2 parents cc9ebfa + c2dd55a commit 695b761
Show file tree
Hide file tree
Showing 4 changed files with 493 additions and 282 deletions.
54 changes: 41 additions & 13 deletions src/trace_link/kineto_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ class KinetoOperator:
name (str): Name of the operator.
phase (Optional[str]): Execution phase of the operator.
inclusive_dur (int): Total duration of the operator, including its children.
exclusive_dur (int): Duration of the operator execution alone. Corresponds to the self time field in chrome://tracing.
exclusive_dur (int): Duration of the operator execution alone. Corresponds to the self time field in
chrome://tracing.
timestamp (int): Start time of the operator in microseconds.
external_id (int): An external identifier associated with the operator.
ev_idx (int): Event index of the operator.
Expand Down Expand Up @@ -86,14 +87,50 @@ def is_cpu_op(self) -> bool:
return True
return False

def is_cuda_launch_op(self) -> bool:
def is_cuda_runtime_op(self) -> bool:
    """
    Report whether this operator's category is "cuda_runtime".

    Returns
        bool: True when the category equals "cuda_runtime", otherwise False.
    """
    category = self.category
    return category == "cuda_runtime"

def is_cuda_driver_op(self) -> bool:
    """
    Report whether this operator's category is "cuda_driver".

    Returns
        bool: True when the category equals "cuda_driver", otherwise False.
    """
    category = self.category
    return category == "cuda_driver"

def is_ac2g_op(self) -> bool:
    """
    Report whether this operator's category is "ac2g", i.e. an arrow from CPU to GPU.

    Excerpt from https://pytorch.org/docs/stable/torch.compiler_profiling_torch_compile.html
    ```
    Every kernel on the GPU occurs after being launched by code running on the CPU. The profiler can draw
    connections (i.e. "flows") between the GPU and CPU events to show which CPU event launched a GPU kernel.
    This is particularly helpful because, with a few exceptions, GPU kernels are launched asynchronously.
    To view a flow connection, click on a GPU kernel and click "ac2g".
    ```

    Returns
        bool: True when the category equals "ac2g", otherwise False.
    """
    category = self.category
    return category == "ac2g"

def is_kernel_launch_op(self) -> bool:
"""
Determine whether the operator is a kernel-launching CUDA runtime operator.
Returns
bool: True if it's a launch operation, otherwise False.
"""
cuda_launch_categories = {"cuda_runtime", "cuda_driver"}
cuda_launch_categories = self.is_cuda_runtime_op() or self.is_cuda_driver_op()
cuda_launch_operations = {
"cuLaunchKernel",
"cuLaunchKernelEx",
Expand All @@ -105,7 +142,7 @@ def is_cuda_launch_op(self) -> bool:
"cudaMemcpyToSymbol",
"cudaLaunchCooperativeKernel",
}
return self.category in cuda_launch_categories and self.name in cuda_launch_operations
return cuda_launch_categories and self.name in cuda_launch_operations

def is_gpu_op(self) -> bool:
"""
Expand All @@ -116,12 +153,3 @@ def is_gpu_op(self) -> bool:
"""
gpu_categories = {"kernel", "gpu_memcpy"}
return self.category in gpu_categories

def is_arrow_op(self) -> bool:
    """
    Report whether this operator's category is "ac2g", i.e. an arrow from CPU to GPU.

    Returns
        bool: True when the category equals "ac2g", otherwise False.
    """
    category = self.category
    return category == "ac2g"
8 changes: 4 additions & 4 deletions src/trace_link/trace_linker.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ def construct_kineto_data_structures(self, kineto_ops: List[KinetoOperator]) ->
kineto_tid_cpu_ops_map.setdefault(op.tid, []).append(op)
logging.debug(f"Added CPU or user annotation op: {op.name}")

elif op.is_cuda_launch_op():
elif op.is_kernel_launch_op():
kineto_id_cuda_launch_op_map[op.external_id] = op
if op.correlation in kineto_correlation_cuda_runtime_map:
raise ValueError(
Expand All @@ -224,7 +224,7 @@ def construct_kineto_data_structures(self, kineto_ops: List[KinetoOperator]) ->
kineto_gpu_ops.append(op)
logging.debug(f"Added GPU op: {op.name}")

elif op.is_arrow_op():
elif op.is_ac2g_op(): # arrow from CPU to GPU
assert (op.phase == "s") or (op.phase == "f")
if op.id is None:
error_msg = (
Expand Down Expand Up @@ -795,8 +795,8 @@ def find_parent_cpu_op(
"kernel operator. It can be a case where CUDA runtime operators are not properly identified and added "
"to the map, kineto_correlation_cuda_runtime_map. Please manually check if the corresponding CUDA "
"runtime operator with the correlation is dropped by mistake. It is likely that it is because of "
"incomplete map, cuda_launch_operations, in is_cuda_launch_op. Please update the map properly to cover"
" all CUDA runtime launch operators."
"incomplete map, cuda_launch_operations, in is_kernel_launch_op. Please update the map properly to "
"cover all CUDA runtime launch operators."
)
logging.warning(warning_msg)
return None
Expand Down
127 changes: 124 additions & 3 deletions tests/trace_link/test_kineto_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,103 @@ def test_repr_method(sample_operator_data):
assert repr(operator) == expected_repr


@pytest.mark.parametrize(
    "category, expected",
    [
        ("cpu_op", True),
        ("user_annotation", True),
        ("ProfilerStep", False),
        ("cuda_runtime", False),
        ("cuda_driver", False),
    ],
)
def test_is_cpu_op(category, expected):
    """Check is_cpu_op() against each parametrized category/expectation pair."""
    # Only "cat" varies between cases; every other field is a fixed placeholder.
    event_args = {"External id": "123", "Ev Idx": "456", "stream": 7, "Record function id": 12, "correlation": 99}
    raw_event = dict(
        cat=category,
        name="someOperation",
        ph="X",
        dur=100,
        ts=1590000000,
        tid=1234,
        args=event_args,
    )
    assert KinetoOperator(raw_event).is_cpu_op() == expected


@pytest.mark.parametrize(
    "category, expected",
    [
        ("cuda_runtime", True),
        ("kernel", False),
        ("cuda_driver", False),
        ("cpu_op", False),
    ],
)
def test_is_cuda_runtime_op(category, expected):
    """Check is_cuda_runtime_op() against each parametrized category/expectation pair."""
    # Only "cat" varies between cases; every other field is a fixed placeholder.
    event_args = {"External id": "123", "Ev Idx": "456", "stream": 7, "Record function id": 12, "correlation": 99}
    raw_event = dict(
        cat=category,
        name="someOperation",
        ph="X",
        dur=100,
        ts=1590000000,
        tid=1234,
        args=event_args,
    )
    assert KinetoOperator(raw_event).is_cuda_runtime_op() == expected


@pytest.mark.parametrize(
    "category, expected",
    [
        ("cuda_driver", True),
        ("kernel", False),
        ("cuda_runtime", False),
        ("cpu_op", False),
    ],
)
def test_is_cuda_driver_op(category, expected):
    """Check is_cuda_driver_op() against each parametrized category/expectation pair."""
    # Only "cat" varies between cases; every other field is a fixed placeholder.
    event_args = {"External id": "123", "Ev Idx": "456", "stream": 7, "Record function id": 12, "correlation": 99}
    raw_event = dict(
        cat=category,
        name="someOperation",
        ph="X",
        dur=100,
        ts=1590000000,
        tid=1234,
        args=event_args,
    )
    assert KinetoOperator(raw_event).is_cuda_driver_op() == expected


@pytest.mark.parametrize(
    "category, expected",
    [
        ("ac2g", True),
        ("kernel", False),
        ("cuda_runtime", False),
        ("cpu_op", False),
    ],
)
def test_is_ac2g_op(category, expected):
    """Check is_ac2g_op() against each parametrized category/expectation pair."""
    # Only "cat" varies between cases; every other field is a fixed placeholder.
    event_args = {"External id": "123", "Ev Idx": "456", "stream": 7, "Record function id": 12, "correlation": 99}
    raw_event = dict(
        cat=category,
        name="someOperation",
        ph="X",
        dur=100,
        ts=1590000000,
        tid=1234,
        args=event_args,
    )
    assert KinetoOperator(raw_event).is_ac2g_op() == expected


@pytest.mark.parametrize(
"category, name, expected",
[
Expand All @@ -71,8 +168,8 @@ def test_repr_method(sample_operator_data):
("some_other_category", "cudaLaunchKernel", False),
],
)
def test_is_cuda_launch_op(category, name, expected):
"""Test the is_cuda_launch_op method with various inputs."""
def test_is_kernel_launch_op(category, name, expected):
"""Test the is_kernel_launch_op method with various inputs."""
operator_data = {
"cat": category,
"name": name,
Expand All @@ -83,4 +180,28 @@ def test_is_cuda_launch_op(category, name, expected):
"args": {"External id": "123", "Ev Idx": "456", "stream": 7, "Record function id": 12, "correlation": 99},
}
operator = KinetoOperator(operator_data)
assert operator.is_cuda_launch_op() == expected
assert operator.is_kernel_launch_op() == expected


@pytest.mark.parametrize(
    "category, expected",
    [
        ("kernel", True),
        ("gpu_memcpy", True),
        ("cuda_runtime", False),
        ("cpu_op", False),
    ],
)
def test_is_gpu_op(category, expected):
    """Check is_gpu_op() against each parametrized category/expectation pair."""
    # Only "cat" varies between cases; every other field is a fixed placeholder.
    event_args = {"External id": "123", "Ev Idx": "456", "stream": 7, "Record function id": 12, "correlation": 99}
    raw_event = dict(
        cat=category,
        name="someOperation",
        ph="X",
        dur=100,
        ts=1590000000,
        tid=1234,
        args=event_args,
    )
    assert KinetoOperator(raw_event).is_gpu_op() == expected
Loading

0 comments on commit 695b761

Please sign in to comment.