import functools
from collections import OrderedDict
from typing import Any, Callable, Dict, List, Mapping, Sequence, Tuple, Union, cast
import torch
from ignite.engine import Engine, EventEnum, Events
from ignite.handlers import Timer
[docs]class BasicTimeProfiler:
"""
BasicTimeProfiler can be used to profile the handlers,
events, data loading and data processing times.
Examples:
.. code-block:: python
from ignite.contrib.handlers import BasicTimeProfiler
trainer = Engine(train_updater)
# Create an object of the profiler and attach an engine to it
profiler = BasicTimeProfiler()
profiler.attach(trainer)
@trainer.on(Events.EPOCH_COMPLETED)
def log_intermediate_results():
profiler.print_results(profiler.get_results())
trainer.run(dataloader, max_epochs=3)
profiler.write_results('path_to_dir/time_profiling.csv')
"""
events_to_ignore = [
Events.EXCEPTION_RAISED,
Events.TERMINATE,
Events.TERMINATE_SINGLE_EPOCH,
Events.DATALOADER_STOP_ITERATION,
]
def __init__(self) -> None:
self._dataflow_timer = Timer()
self._processing_timer = Timer()
self._event_handlers_timer = Timer()
self.dataflow_times = torch.zeros(1)
self.processing_times = torch.zeros(1)
self.event_handlers_times = {} # type: Dict[EventEnum, torch.Tensor]
self._events = [
Events.EPOCH_STARTED,
Events.EPOCH_COMPLETED,
Events.ITERATION_STARTED,
Events.ITERATION_COMPLETED,
Events.GET_BATCH_STARTED,
Events.GET_BATCH_COMPLETED,
Events.COMPLETED,
]
self._fmethods = [
self._as_first_epoch_started,
self._as_first_epoch_completed,
self._as_first_iter_started,
self._as_first_iter_completed,
self._as_first_get_batch_started,
self._as_first_get_batch_completed,
self._as_first_completed,
]
self._lmethods = [
self._as_last_epoch_started,
self._as_last_epoch_completed,
self._as_last_iter_started,
self._as_last_iter_completed,
self._as_last_get_batch_started,
self._as_last_get_batch_completed,
self._as_last_completed,
]
def _reset(self, num_epochs: int, total_num_iters: int) -> None:
self.dataflow_times = torch.zeros(total_num_iters)
self.processing_times = torch.zeros(total_num_iters)
self.event_handlers_times = {
Events.STARTED: torch.zeros(1),
Events.COMPLETED: torch.zeros(1),
Events.EPOCH_STARTED: torch.zeros(num_epochs),
Events.EPOCH_COMPLETED: torch.zeros(num_epochs),
Events.ITERATION_STARTED: torch.zeros(total_num_iters),
Events.ITERATION_COMPLETED: torch.zeros(total_num_iters),
Events.GET_BATCH_COMPLETED: torch.zeros(total_num_iters),
Events.GET_BATCH_STARTED: torch.zeros(total_num_iters),
}
def _as_first_started(self, engine: Engine) -> None:
if hasattr(engine.state.dataloader, "__len__"):
num_iters_per_epoch = len(engine.state.dataloader) # type: ignore[arg-type]
else:
if engine.state.epoch_length is None:
raise ValueError(
"As epoch_length is not set, we can not use BasicTimeProfiler in this case."
"Please, set trainer.run(..., epoch_length=epoch_length) in order to fix this."
)
num_iters_per_epoch = engine.state.epoch_length
self.max_epochs = cast(int, engine.state.max_epochs)
self.total_num_iters = self.max_epochs * num_iters_per_epoch
self._reset(self.max_epochs, self.total_num_iters)
self.event_handlers_names = {
e: [
h.__qualname__ if hasattr(h, "__qualname__") else h.__class__.__name__
for (h, _, _) in engine._event_handlers[e]
if "BasicTimeProfiler." not in repr(h) # avoid adding internal handlers into output
]
for e in Events
if e not in self.events_to_ignore
}
# Setup all other handlers:
engine._event_handlers[Events.STARTED].append((self._as_last_started, (engine,), {}))
for e, m in zip(self._events, self._fmethods):
engine._event_handlers[e].insert(0, (m, (engine,), {}))
for e, m in zip(self._events, self._lmethods):
engine._event_handlers[e].append((m, (engine,), {}))
# Let's go
self._event_handlers_timer.reset()
def _as_last_started(self, engine: Engine) -> None:
self.event_handlers_times[Events.STARTED][0] = self._event_handlers_timer.value()
def _as_first_epoch_started(self, engine: Engine) -> None:
self._event_handlers_timer.reset()
def _as_last_epoch_started(self, engine: Engine) -> None:
t = self._event_handlers_timer.value()
e = engine.state.epoch - 1
self.event_handlers_times[Events.EPOCH_STARTED][e] = t
def _as_first_get_batch_started(self, engine: Engine) -> None:
self._event_handlers_timer.reset()
self._dataflow_timer.reset()
def _as_last_get_batch_started(self, engine: Engine) -> None:
t = self._event_handlers_timer.value()
i = engine.state.iteration - 1
self.event_handlers_times[Events.GET_BATCH_STARTED][i] = t
def _as_first_get_batch_completed(self, engine: Engine) -> None:
self._event_handlers_timer.reset()
def _as_last_get_batch_completed(self, engine: Engine) -> None:
t = self._event_handlers_timer.value()
i = engine.state.iteration - 1
self.event_handlers_times[Events.GET_BATCH_COMPLETED][i] = t
d = self._dataflow_timer.value()
self.dataflow_times[i] = d
self._dataflow_timer.reset()
def _as_first_iter_started(self, engine: Engine) -> None:
self._event_handlers_timer.reset()
def _as_last_iter_started(self, engine: Engine) -> None:
t = self._event_handlers_timer.value()
i = engine.state.iteration - 1
self.event_handlers_times[Events.ITERATION_STARTED][i] = t
self._processing_timer.reset()
def _as_first_iter_completed(self, engine: Engine) -> None:
t = self._processing_timer.value()
i = engine.state.iteration - 1
self.processing_times[i] = t
self._event_handlers_timer.reset()
def _as_last_iter_completed(self, engine: Engine) -> None:
t = self._event_handlers_timer.value()
i = engine.state.iteration - 1
self.event_handlers_times[Events.ITERATION_COMPLETED][i] = t
def _as_first_epoch_completed(self, engine: Engine) -> None:
self._event_handlers_timer.reset()
def _as_last_epoch_completed(self, engine: Engine) -> None:
t = self._event_handlers_timer.value()
e = engine.state.epoch - 1
self.event_handlers_times[Events.EPOCH_COMPLETED][e] = t
def _as_first_completed(self, engine: Engine) -> None:
self._event_handlers_timer.reset()
def _as_last_completed(self, engine: Engine) -> None:
self.event_handlers_times[Events.COMPLETED][0] = self._event_handlers_timer.value()
# Remove added handlers:
engine.remove_event_handler(self._as_last_started, Events.STARTED)
for e, m in zip(self._events, self._fmethods):
engine.remove_event_handler(m, e)
for e, m in zip(self._events, self._lmethods):
engine.remove_event_handler(m, e)
def attach(self, engine: Engine) -> None:
if not isinstance(engine, Engine):
raise TypeError(f"Argument engine should be ignite.engine.Engine, but given {type(engine)}")
if not engine.has_event_handler(self._as_first_started):
engine._event_handlers[Events.STARTED].insert(0, (self._as_first_started, (engine,), {}))
@staticmethod
def _compute_basic_stats(data: torch.Tensor) -> Dict[str, Union[str, float, Tuple[Union[float], Union[float]]]]:
# compute on non-zero data:
data = data[data > 0]
out = [
("total", torch.sum(data).item() if len(data) > 0 else "not yet triggered")
] # type: List[Tuple[str, Union[str, float, Tuple[Union[float], Union[float]]]]]
if len(data) > 1:
out += [
("min/index", (torch.min(data).item(), torch.argmin(data).item())),
("max/index", (torch.max(data).item(), torch.argmax(data).item())),
("mean", torch.mean(data).item()),
("std", torch.std(data).item()),
]
return OrderedDict(out)
[docs] def get_results(self) -> Dict[str, Dict[str, Any]]:
"""
Method to fetch the aggregated profiler results after the engine is run
.. code-block:: python
results = profiler.get_results()
"""
total_eh_time = sum(
[(self.event_handlers_times[e]).sum() for e in Events if e not in self.events_to_ignore]
) # type: Union[int, torch.Tensor]
event_handlers_stats = dict(
[
(str(e.name).replace(".", "_"), self._compute_basic_stats(self.event_handlers_times[e]))
for e in Events
if e not in self.events_to_ignore
]
+ [("total_time", total_eh_time)] # type: ignore[list-item]
)
return OrderedDict(
[
("processing_stats", self._compute_basic_stats(self.processing_times)),
("dataflow_stats", self._compute_basic_stats(self.dataflow_times)),
("event_handlers_stats", event_handlers_stats,),
(
"event_handlers_names",
{str(e.name).replace(".", "_") + "_names": v for e, v in self.event_handlers_names.items()},
),
]
)
[docs] def write_results(self, output_path: str) -> None:
"""
Method to store the unaggregated profiling results to a csv file
.. code-block:: python
profiler.write_results('path_to_dir/awesome_filename.csv')
Example output:
.. code-block:: text
-----------------------------------------------------------------
epoch iteration processing_stats dataflow_stats Event_STARTED ...
1.0 1.0 0.00003 0.252387 0.125676
1.0 2.0 0.00029 0.252342 0.125123
"""
try:
import pandas as pd
except ImportError:
print("Need pandas to write results as files")
return
iters_per_epoch = self.total_num_iters // self.max_epochs
epochs = torch.arange(self.max_epochs, dtype=torch.float32).repeat_interleave(iters_per_epoch) + 1
iterations = torch.arange(self.total_num_iters, dtype=torch.float32) + 1
processing_stats = self.processing_times
dataflow_stats = self.dataflow_times
event_started = self.event_handlers_times[Events.STARTED].repeat_interleave(self.total_num_iters)
event_completed = self.event_handlers_times[Events.COMPLETED].repeat_interleave(self.total_num_iters)
event_epoch_started = self.event_handlers_times[Events.EPOCH_STARTED].repeat_interleave(iters_per_epoch)
event_epoch_completed = self.event_handlers_times[Events.EPOCH_COMPLETED].repeat_interleave(iters_per_epoch)
event_iter_started = self.event_handlers_times[Events.ITERATION_STARTED]
event_iter_completed = self.event_handlers_times[Events.ITERATION_COMPLETED]
event_batch_started = self.event_handlers_times[Events.GET_BATCH_STARTED]
event_batch_completed = self.event_handlers_times[Events.GET_BATCH_COMPLETED]
results_dump = torch.stack(
[
epochs,
iterations,
processing_stats,
dataflow_stats,
event_started,
event_completed,
event_epoch_started,
event_epoch_completed,
event_iter_started,
event_iter_completed,
event_batch_started,
event_batch_completed,
],
dim=1,
).numpy()
results_df = pd.DataFrame(
data=results_dump,
columns=[
"epoch",
"iteration",
"processing_stats",
"dataflow_stats",
"Event_STARTED",
"Event_COMPLETED",
"Event_EPOCH_STARTED",
"Event_EPOCH_COMPLETED",
"Event_ITERATION_STARTED",
"Event_ITERATION_COMPLETED",
"Event_GET_BATCH_STARTED",
"Event_GET_BATCH_COMPLETED",
],
)
results_df.to_csv(output_path, index=False)
[docs] @staticmethod
def print_results(results: Dict) -> str:
"""
Method to print the aggregated results from the profiler
.. code-block:: python
profiler.print_results(results)
Example output:
.. code-block:: text
----------------------------------------------------
| Time profiling stats (in seconds): |
----------------------------------------------------
total | min/index | max/index | mean | std
Processing function:
157.46292 | 0.01452/1501 | 0.26905/0 | 0.07730 | 0.01258
Dataflow:
6.11384 | 0.00008/1935 | 0.28461/1551 | 0.00300 | 0.02693
Event handlers:
2.82721
- Events.STARTED: []
0.00000
- Events.EPOCH_STARTED: []
0.00006 | 0.00000/0 | 0.00000/17 | 0.00000 | 0.00000
- Events.ITERATION_STARTED: ['PiecewiseLinear']
0.03482 | 0.00001/188 | 0.00018/679 | 0.00002 | 0.00001
- Events.ITERATION_COMPLETED: ['TerminateOnNan']
0.20037 | 0.00006/866 | 0.00089/1943 | 0.00010 | 0.00003
- Events.EPOCH_COMPLETED: ['empty_cuda_cache', 'training.<locals>.log_elapsed_time', ]
2.57860 | 0.11529/0 | 0.14977/13 | 0.12893 | 0.00790
- Events.COMPLETED: []
not yet triggered
"""
def to_str(v: Union[str, tuple]) -> str:
if isinstance(v, str):
return v
elif isinstance(v, tuple):
return f"{v[0]:.5f}/{v[1]}"
return f"{v:.5f}"
def odict_to_str(d: Mapping) -> str:
out = " | ".join([to_str(v) for v in d.values()])
return out
others = {
k: odict_to_str(v) if isinstance(v, OrderedDict) else v for k, v in results["event_handlers_stats"].items()
}
others.update(results["event_handlers_names"])
output_message = """
----------------------------------------------------
| Time profiling stats (in seconds): |
----------------------------------------------------
total | min/index | max/index | mean | std
Processing function:
{processing_stats}
Dataflow:
{dataflow_stats}
Event handlers:
{total_time:.5f}
- Events.STARTED: {STARTED_names}
{STARTED}
- Events.EPOCH_STARTED: {EPOCH_STARTED_names}
{EPOCH_STARTED}
- Events.ITERATION_STARTED: {ITERATION_STARTED_names}
{ITERATION_STARTED}
- Events.ITERATION_COMPLETED: {ITERATION_COMPLETED_names}
{ITERATION_COMPLETED}
- Events.EPOCH_COMPLETED: {EPOCH_COMPLETED_names}
{EPOCH_COMPLETED}
- Events.COMPLETED: {COMPLETED_names}
{COMPLETED}
""".format(
processing_stats=odict_to_str(results["processing_stats"]),
dataflow_stats=odict_to_str(results["dataflow_stats"]),
**others,
)
print(output_message)
return output_message
[docs]class HandlersTimeProfiler:
"""
HandlersTimeProfiler can be used to profile the handlers,
data loading and data processing times. Custom events are also
profiled by this profiler
Examples:
.. code-block:: python
from ignite.contrib.handlers import HandlersTimeProfiler
trainer = Engine(train_updater)
# Create an object of the profiler and attach an engine to it
profiler = HandlersTimeProfiler()
profiler.attach(trainer)
@trainer.on(Events.EPOCH_COMPLETED)
def log_intermediate_results():
profiler.print_results(profiler.get_results())
trainer.run(dataloader, max_epochs=3)
profiler.write_results('path_to_dir/time_profiling.csv')
"""
EVENT_FILTER_THESHOLD_TIME = 0.0001
def __init__(self) -> None:
self._dataflow_timer = Timer()
self._processing_timer = Timer()
self._event_handlers_timer = Timer()
self.dataflow_times = [] # type: List[float]
self.processing_times = [] # type: List[float]
self.event_handlers_times = {} # type: Dict[EventEnum, Dict[str, List[float]]]
@staticmethod
def _get_callable_name(handler: Callable) -> str:
# get name of the callable handler
return getattr(handler, "__qualname__", handler.__class__.__name__)
def _create_wrapped_handler(self, handler: Callable, event: EventEnum) -> Callable:
@functools.wraps(handler)
def _timeit_handler(*args: Any, **kwargs: Any) -> None:
self._event_handlers_timer.reset()
handler(*args, **kwargs)
t = self._event_handlers_timer.value()
hname = self._get_callable_name(handler)
# filter profiled time if the handler was attached to event with event filter
if not hasattr(handler, "_parent") or t >= self.EVENT_FILTER_THESHOLD_TIME:
self.event_handlers_times[event][hname].append(t)
# required to revert back to original handler after profiling
setattr(_timeit_handler, "_profiler_original", handler)
return _timeit_handler
def _timeit_processing(self) -> None:
# handler used for profiling processing times
t = self._processing_timer.value()
self.processing_times.append(t)
def _timeit_dataflow(self) -> None:
# handler used for profiling dataflow times
t = self._dataflow_timer.value()
self.dataflow_times.append(t)
def _reset(self, event_handlers_names: Mapping[EventEnum, List[str]]) -> None:
# reset the variables used for profiling
self.dataflow_times = []
self.processing_times = []
self.event_handlers_times = {e: {h: [] for h in event_handlers_names[e]} for e in event_handlers_names}
@staticmethod
def _is_internal_handler(handler: Callable) -> bool:
# checks whether the handler is internal
return any(n in repr(handler) for n in ["HandlersTimeProfiler.", "Timer."])
def _detach_profiler_handlers(self, engine: Engine) -> None:
# reverts handlers to original handlers
for e in engine._event_handlers:
for i, (func, args, kwargs) in enumerate(engine._event_handlers[e]):
if hasattr(func, "_profiler_original"):
engine._event_handlers[e][i] = (func._profiler_original, args, kwargs)
def _as_first_started(self, engine: Engine) -> None:
# wraps original handlers for profiling
self.event_handlers_names = {
e: [
self._get_callable_name(h)
for (h, _, _) in engine._event_handlers[e]
if not self._is_internal_handler(h)
]
for e in engine._allowed_events
}
self._reset(self.event_handlers_names)
for e in engine._allowed_events:
for i, (func, args, kwargs) in enumerate(engine._event_handlers[e]):
if not self._is_internal_handler(func):
engine._event_handlers[e][i] = (self._create_wrapped_handler(func, e), args, kwargs)
# processing timer
engine.add_event_handler(Events.ITERATION_STARTED, self._processing_timer.reset)
engine._event_handlers[Events.ITERATION_COMPLETED].insert(0, (self._timeit_processing, (), {}))
# dataflow timer
engine.add_event_handler(Events.GET_BATCH_STARTED, self._dataflow_timer.reset)
engine._event_handlers[Events.GET_BATCH_COMPLETED].insert(0, (self._timeit_dataflow, (), {}))
# revert back the wrapped handlers with original handlers at the end
engine.add_event_handler(Events.COMPLETED, self._detach_profiler_handlers)
def attach(self, engine: Engine) -> None:
if not isinstance(engine, Engine):
raise TypeError(f"Argument engine should be ignite.engine.Engine, but given {type(engine)}")
if not engine.has_event_handler(self._as_first_started):
engine._event_handlers[Events.STARTED].insert(0, (self._as_first_started, (engine,), {}))
[docs] def get_results(self) -> List[List[Union[str, float]]]:
"""
Method to fetch the aggregated profiler results after the engine is run
.. code-block:: python
results = profiler.get_results()
"""
total_eh_time = sum(
[
sum(self.event_handlers_times[e][h])
for e in self.event_handlers_times
for h in self.event_handlers_times[e]
]
)
total_eh_time = round(float(total_eh_time), 5,)
def compute_basic_stats(
times: Union[Sequence, torch.Tensor]
) -> List[Union[str, float, Tuple[Union[str, float], Union[str, float]]]]:
data = torch.as_tensor(times, dtype=torch.float32)
# compute on non-zero data:
data = data[data > 0]
total = round(torch.sum(data).item(), 5) if len(data) > 0 else "not triggered" # type: Union[str, float]
min_index = ("None", "None") # type: Tuple[Union[str, float], Union[str, float]]
max_index = ("None", "None") # type: Tuple[Union[str, float], Union[str, float]]
mean = "None" # type: Union[str, float]
std = "None" # type: Union[str, float]
if len(data) > 0:
min_index = (round(torch.min(data).item(), 5), torch.argmin(data).item())
max_index = (round(torch.max(data).item(), 5), torch.argmax(data).item())
mean = round(torch.mean(data).item(), 5)
if len(data) > 1:
std = round(torch.std(data).item(), 5)
return [total, min_index, max_index, mean, std]
event_handler_stats = [
[
h,
getattr(e, "name", str(e)),
*compute_basic_stats(torch.tensor(self.event_handlers_times[e][h], dtype=torch.float32)),
]
for e in self.event_handlers_times
for h in self.event_handlers_times[e]
]
event_handler_stats.append(["Total", "", total_eh_time, "", "", "", ""])
event_handler_stats.append(["Processing", "None", *compute_basic_stats(self.processing_times)])
event_handler_stats.append(["Dataflow", "None", *compute_basic_stats(self.dataflow_times)])
return event_handler_stats
[docs] def write_results(self, output_path: str) -> None:
"""
Method to store the unaggregated profiling results to a csv file
.. code-block:: python
profiler.write_results('path_to_dir/awesome_filename.csv')
Example output:
.. code-block:: text
-----------------------------------------------------------------
# processing_stats dataflow_stats training.<locals>.log_elapsed_time (EPOCH_COMPLETED) ...
1 0.00003 0.252387 0.125676
2 0.00029 0.252342 0.125123
"""
try:
import pandas as pd
except ImportError:
print("Need pandas to write results as files")
return
processing_stats = torch.tensor(self.processing_times, dtype=torch.float32)
dataflow_stats = torch.tensor(self.dataflow_times, dtype=torch.float32)
cols = [processing_stats, dataflow_stats]
headers = ["processing_stats", "dataflow_stats"]
for e in self.event_handlers_times:
for h in self.event_handlers_times[e]:
headers.append(f"{h} ({getattr(e, 'name', str(e))})")
cols.append(torch.tensor(self.event_handlers_times[e][h], dtype=torch.float32))
# Determine maximum length
max_len = max([x.numel() for x in cols])
count_col = torch.arange(max_len, dtype=torch.float32) + 1
cols.insert(0, count_col)
headers.insert(0, "#")
# pad all tensors to have same length
cols = [torch.nn.functional.pad(x, pad=(0, max_len - x.numel()), mode="constant", value=0) for x in cols]
results_dump = torch.stack(cols, dim=1,).numpy()
results_df = pd.DataFrame(data=results_dump, columns=headers,)
results_df.to_csv(output_path, index=False)
[docs] @staticmethod
def print_results(results: List[List[Union[str, float]]]) -> None:
"""
Method to print the aggregated results from the profiler
.. code-block:: python
profiler.print_results(results)
Example output:
.. code-block:: text
----------------------------------------- ----------------------- -------------- ...
Handler Event Name Total(s)
----------------------------------------- ----------------------- --------------
run.<locals>.log_training_results EPOCH_COMPLETED 19.43245
run.<locals>.log_validation_results EPOCH_COMPLETED 2.55271
run.<locals>.log_time EPOCH_COMPLETED 0.00049
run.<locals>.log_intermediate_results EPOCH_COMPLETED 0.00106
run.<locals>.log_training_loss ITERATION_COMPLETED 0.059
run.<locals>.log_time COMPLETED not triggered
----------------------------------------- ----------------------- --------------
Total 22.04571
----------------------------------------- ----------------------- --------------
Processing took total 11.29543s [min/index: 0.00393s/1875, max/index: 0.00784s/0,
mean: 0.00602s, std: 0.00034s]
Dataflow took total 16.24365s [min/index: 0.00533s/1874, max/index: 0.01129s/937,
mean: 0.00866s, std: 0.00113s]
"""
# adopted implementation of torch.autograd.profiler.build_table
handler_column_width = max([len(item[0]) for item in results]) + 4 # type: ignore[arg-type]
event_column_width = max([len(item[1]) for item in results]) + 4 # type: ignore[arg-type]
DEFAULT_COLUMN_WIDTH = 14
headers = [
"Handler",
"Event Name",
"Total(s)",
"Min(s)/IDX",
"Max(s)/IDX",
"Mean(s)",
"Std(s)",
]
# Have to use a list because nonlocal is Py3 only...
SPACING_SIZE = 2
row_format_lst = [""]
header_sep_lst = [""]
line_length_lst = [-SPACING_SIZE]
def add_column(padding: int, text_dir: str = ">") -> None:
row_format_lst[0] += "{: " + text_dir + str(padding) + "}" + (" " * SPACING_SIZE)
header_sep_lst[0] += "-" * padding + (" " * SPACING_SIZE)
line_length_lst[0] += padding + SPACING_SIZE
add_column(handler_column_width, text_dir="<")
add_column(event_column_width, text_dir="<")
for _ in headers[2:]:
add_column(DEFAULT_COLUMN_WIDTH)
row_format = row_format_lst[0]
header_sep = header_sep_lst[0]
result = []
def append(s: str) -> None:
result.append(s)
result.append("\n")
result.append("\n")
append(header_sep)
append(row_format.format(*headers))
append(header_sep)
for row in results[:-3]:
# format min/idx and max/idx
row[3] = "{}/{}".format(*row[3]) # type: ignore[misc]
row[4] = "{}/{}".format(*row[4]) # type: ignore[misc]
append(row_format.format(*row))
append(header_sep)
# print total handlers time row
append(row_format.format(*results[-3]))
append(header_sep)
summary_format = "{} took total {}s [min/index: {}, max/index: {}, mean: {}s, std: {}s]"
for row in results[-2:]:
row[3] = "{}s/{}".format(*row[3]) # type: ignore[misc]
row[4] = "{}s/{}".format(*row[4]) # type: ignore[misc]
del row[1]
append(summary_format.format(*row))
print("".join(result))