Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Conntrack latency measurement recipe #376

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 186 additions & 0 deletions lnst/RecipeCommon/Perf/Measurements/LatencyMeasurement.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
import time
import signal
import socket
import logging

from lnst.Controller.Job import Job
from lnst.Controller.RecipeResults import ResultType
from lnst.RecipeCommon.Perf.Measurements.BaseFlowMeasurement import (
    BaseFlowMeasurement,
    NetworkFlowTest,
    Flow,
)
from lnst.RecipeCommon.Perf.Measurements.MeasurementError import MeasurementError
from lnst.RecipeCommon.Perf.Measurements.Results.LatencyMeasurementResults import (
    LatencyMeasurementResults,
)
from lnst.RecipeCommon.Perf.Measurements.Results.AggregatedLatencyMeasurementResults import (
    AggregatedLatencyMeasurementResults,
)
from lnst.RecipeCommon.Perf.Results import (
    PerfInterval,
    ScalarSample,
    ParallelPerfResult,
    SequentialScalarResult,
)
from lnst.Tests.Latency import LatencyClient, LatencyServer


class LatencyMeasurement(BaseFlowMeasurement):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed at the tech meeting, this could work as a standard measurement that simply measures latency at a regular interval and is combined with an additional, "more primary" measurement in the overall PerfRecipeConfiguration — one that has a "10 second" quiet period and then executes the poisoning. That way you will get the following hierarchy:

1. cpu measurement: [....]
2. cpu measurement: [....]
3. latency measurement:   [10, 1, 1, 1, 1, 1, 100, 100, ...]
4. poisoning measurement: [0 , 0, 0, 0, 0, 100, 100, ...]

and afterwards you can postprocess these results to get:

1. cpu measurement: [....]
2. cpu measurement: [....]
3. latency measurement - start:    [10]
4. latency measurement - middle:   [1, 1, 1, 1, 1]
5. latency measurement - poisoned: [100, 100, ...]

and evaluate these each individually

def __init__(
    self,
    flows,
    recipe_conf,
    port: int,
    payload_size: int,
    samples_count: int,
    cache_poison_tool: callable = None,
):
    """Latency measurement over one or more network flows.

    :param flows: iterable of Flow objects describing generator/receiver pairs
    :param recipe_conf: recipe configuration forwarded to the base class
    :param port: port the latency client connects to
    :param payload_size: size of each latency probe payload (stored as data_size)
    :param samples_count: number of latency samples each client gathers
    :param cache_poison_tool: optional callable invoked with ``recipe_conf``
        while the measurement is running; ``None`` disables poisoning
    """
    super().__init__(recipe_conf)

    self._flows = flows
    # populated by _prepare_jobs(): one NetworkFlowTest per flow
    self._latency_flows = []

    self._port = port
    self._data_size = payload_size
    self._samples_count = samples_count

    self._cache_poison_tool = cache_poison_tool

def version(self):
    """Version identifier of this measurement implementation."""
    return "1.0"

@property
def data_size(self):
    # probe payload size (constructor's ``payload_size``)
    return self._data_size

@property
def samples_count(self):
    # number of samples each latency client is asked to gather
    return self._samples_count

@property
def cache_poison_tool_name(self):
    """Name of the configured cache poisoning tool, or None when unset.

    The original dereferenced ``__name__`` unconditionally, which raised
    AttributeError whenever no tool was configured (the constructor's
    default is ``None``).
    """
    if self._cache_poison_tool is None:
        return None
    return self._cache_poison_tool.__name__

@property
def flows(self):
    # flow definitions exactly as passed to the constructor
    return self._flows

def start(self):
    """Start all server/client job pairs and run the optional poisoning tool.

    Servers are started first with a settling delay before their clients;
    after an estimated sampling period the cache poisoning tool (if any)
    is invoked with the recipe configuration.

    :return: True (kept for interface compatibility)
    """
    logging.info("Starting LatencyMeasurement")

    # _prepare_jobs() populates self._latency_flows; the original bound its
    # (always-None) return value to an unused local
    self._prepare_jobs()

    for measurements in self._latency_flows:
        measurements.server_job.start(bg=True)
        # give the server time to come up before the client connects
        time.sleep(5)
        measurements.client_job.start(bg=True)

    time.sleep(
        0.1 * self._samples_count
    )  # should be enough for client to gather samples

    if self._cache_poison_tool is not None:
        logging.info("Cache poisoning tool is set, running...")
        self._cache_poison_tool(self.recipe_conf)
    else:
        logging.warning("No cache poisoning tool set, is this intended?")

    return True

def _prepare_jobs(self):
    """Create one NetworkFlowTest (server job + client job) per flow."""
    self._latency_flows.extend(
        NetworkFlowTest(flow, self._prepare_server(flow), self._prepare_client(flow))
        for flow in self._flows
    )

def _prepare_client(self, flow: Flow) -> Job:
    """Build a LatencyClient job on the flow's generator host."""
    client = LatencyClient(
        host=flow.receiver_bind,
        port=self._port,
        duration=flow.duration,
        samples_count=self._samples_count,
        data_size=self._data_size,
    )
    return flow.generator.prepare_job(client)

def _prepare_server(self, flow: Flow) -> Job:
    """Build a LatencyServer job on the flow's receiver host.

    NOTE(review): no port is passed to the server — presumably LatencyServer
    defaults it; verify the default matches ``self._port``.
    """
    server = LatencyServer(
        host=flow.receiver_bind,
        samples_count=self._samples_count,
        data_size=self._data_size,
    )
    return flow.receiver.prepare_job(server)

def finish(self):
    """Stop every running latency job by sending SIGINT (client first)."""
    for flow_test in self._latency_flows:
        for job in (flow_test.client_job, flow_test.server_job):
            job.kill(signal.SIGINT)

def collect_results(self) -> list[LatencyMeasurementResults]:
    """Collect per-flow latency samples from the finished client jobs.

    :return: one LatencyMeasurementResults per flow; usually a single
        element, more when flows run in parallel
    """
    results = []
    # each list element represents measurement results for one flow
    # so, most of the time there will be only one element
    # but in case of parallel flows, there will be more

    for measurements in self._latency_flows:
        flow_results = LatencyMeasurementResults(
            measurement=self,
            flow=measurements.flow,
        )

        flow_results.latency = SequentialScalarResult()

        samples = []
        prev_duration = 0
        # client_job.result: sequence of (latency, timestamp) pairs, as the
        # loop below unpacks it
        # NOTE(review): this unpacks the first pair's LATENCY into
        # prev_timestamp — should this be ``_, prev_timestamp = ...``?
        # confirm against LatencyClient's result format
        prev_timestamp, _ = measurements.client_job.result[0]
        for latency, timestamp in measurements.client_job.result:
            # NOTE(review): each sample's timestamp is reconstructed as the
            # previous timestamp plus the previous latency, mixing the
            # nanosecond latency unit into the timestamp axis — verify the
            # units are consistent
            samples.append(
                ScalarSample(latency, "nanoseconds", prev_timestamp + prev_duration)
            )
            prev_duration = latency
            prev_timestamp = timestamp

        flow_results.latency.extend(samples)

        results.append(flow_results)

    return results

@classmethod
def _report_flow_results(cls, recipe, flow_results):
    """Register a passing recipe result carrying the flow's description."""
    # single-entry join is equivalent to the description string itself
    description = "\n".join([flow_results.describe()])

    recipe.add_result(
        ResultType.PASS,
        description,
        data=dict(flow_results=flow_results),
    )

def _aggregate_flows(self, old_flow, new_flow):
    """Merge two per-flow results into an aggregated result.

    :param old_flow: previously aggregated results, or None on the first run
    :param new_flow: freshly collected results for the same flow
    :raises MeasurementError: when the two results belong to different flows
    """
    # MeasurementError was never imported by this module in the original,
    # so this raise was a NameError; it is now imported at the top of the file
    if old_flow is not None and old_flow.flow is not new_flow.flow:
        raise MeasurementError("Aggregating incompatible Flows")

    new_result = AggregatedLatencyMeasurementResults(
        measurement=self, flow=new_flow.flow
    )

    new_result.add_results(old_flow)
    new_result.add_results(new_flow)
    return new_result

def __repr__(self):
    """Debug representation: class name plus the recipe configuration."""
    return f"{self.__class__.__name__}({self.recipe_conf!r})"
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from .LatencyMeasurementResults import LatencyMeasurementResults

from lnst.RecipeCommon.Perf.Measurements.MeasurementError import MeasurementError
from lnst.RecipeCommon.Perf.Results import ParallelScalarResult


class AggregatedLatencyMeasurementResults(LatencyMeasurementResults):
    """Latency results aggregated across repeated measurement runs."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # parallel container: one entry per aggregated run, replacing the
        # sequential container set up by the parent constructor
        self._latency_samples = ParallelScalarResult()

    def add_results(self, results):
        """Fold another result set into this aggregate; no-op for None.

        :raises MeasurementError: for any non-latency result type
        """
        if results is None:
            return
        # check the aggregated subtype first — it is also an instance of the base
        if isinstance(results, AggregatedLatencyMeasurementResults):
            self.latency.extend(results.latency)
        elif isinstance(results, LatencyMeasurementResults):
            self.latency.append(results.latency)
        else:
            raise MeasurementError("Adding incorrect results.")

Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
from lnst.RecipeCommon.Perf.Results import SequentialScalarResult
from lnst.RecipeCommon.Perf.Measurements.Results.FlowMeasurementResults import (
FlowMeasurementResults,
)
from lnst.RecipeCommon.Perf.Measurements.MeasurementError import MeasurementError
from .BaseMeasurementResults import BaseMeasurementResults


class LatencyMeasurementResults(BaseMeasurementResults):
    """Latency samples collected for a single flow.

    The first and last samples are treated as "uncached" (cold-path)
    measurements; the middle samples as "cached".
    """

    def __init__(self, flow, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._latency_samples = (
            SequentialScalarResult()
        )  # samples are ALWAYS sequential
        self._flow = flow

    @property
    def flow(self):
        # flow these samples belong to
        return self._flow

    @property
    def latency(self) -> SequentialScalarResult:
        return self._latency_samples

    @latency.setter
    def latency(self, value: SequentialScalarResult):
        self._latency_samples = value

    @property
    def cached_latency(self):
        # every sample except the first and the last
        return self.latency.samples_slice(slice(1, -1))

    @property
    def uncached_latency(self):
        # the first and the last sample merged together
        first = self.latency.samples_slice(slice(None, 1))
        last = self.latency.samples_slice(slice(-1, None))
        merged = first.merge_with(last)

        return merged

    @property
    def metrics(self) -> list[str]:
        return ["cached_latency", "uncached_latency"]

    def add_results(self, results):
        """Append another result set's samples; no-op for None.

        :raises MeasurementError: for any non-latency result type
        """
        if results is None:
            return
        if isinstance(results, LatencyMeasurementResults):
            self.latency.append(results.latency)
        else:
            raise MeasurementError("Adding incorrect results.")

    def time_slice(self, start, end):
        """Return a NEW results object restricted to samples in [start, end].

        Fixes two defects in the original: ``self.measurement`` was passed
        positionally into the ``flow`` parameter (swapping the two fields),
        and the method returned ``self`` instead of the sliced copy.
        """
        result_copy = LatencyMeasurementResults(
            measurement=self.measurement, flow=self.flow
        )

        result_copy.latency = self.latency.time_slice(start, end)

        return result_copy

    def describe(self) -> str:
        """Human-readable summary: cached/uncached averages and their ratio."""
        uncached_average = self.uncached_latency.average
        cached_average = self.cached_latency.average

        desc = []
        desc.append(str(self.flow))
        desc.append(
            "Generator <-> receiver cached latency (average): {latency:>10.2f} {unit}.".format(
                latency=cached_average, unit=self.latency.unit
            )
        )
        desc.append(
            "Generator <-> receiver uncached latency (average): {latency:>10.2f} {unit}.".format(
                latency=uncached_average, unit=self.latency.unit
            )
        )
        # NOTE(review): a zero cached average would raise ZeroDivisionError
        # here — confirm latency samples are always positive
        desc.append(
            "Uncached average / cached average ratio: {ratio:.4f}".format(
                ratio=uncached_average / cached_average,
            )
        )

        return "\n".join(desc)
Loading
Loading