Skip to content

Commit

Permalink
refactor(plugins/driver_matrix_tests): Multi-stage pipeline for Drivers
Browse files Browse the repository at this point in the history
This commit reworks the way Driver Matrix runs are submitted to Argus,
replacing the previous one-shot architecture with a multi-stage pipeline
akin to SCT's, where a driver matrix run first submits itself and then
submits results sequentially (each either a pass or a failure), finishing with
a call that makes the backend determine the status from the resulting payload.
Patch failures are now supported. The entire logic was moved from the client
to the backend, removing the need to update the client in case of large
changes to the API or the submitted data.

Fixes scylladb#289
  • Loading branch information
k0machi committed Jul 15, 2024
1 parent 151940f commit 96c74d7
Show file tree
Hide file tree
Showing 11 changed files with 518 additions and 211 deletions.
2 changes: 1 addition & 1 deletion argus/backend/plugins/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class PluginModelBase(Model):
_plugin_name = "unknown"
# Metadata
build_id = columns.Text(required=True, partition_key=True)
start_time = columns.DateTime(required=True, primary_key=True, clustering_order="DESC", default=datetime.now, custom_index=True)
start_time = columns.DateTime(required=True, primary_key=True, clustering_order="DESC", default=datetime.utcnow, custom_index=True)
id = columns.UUID(index=True, required=True)
release_id = columns.UUID(index=True)
group_id = columns.UUID(index=True)
Expand Down
39 changes: 39 additions & 0 deletions argus/backend/plugins/driver_matrix_tests/controller.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from flask import Blueprint, request

from argus.backend.error_handlers import handle_api_exception
from argus.backend.plugins.driver_matrix_tests.raw_types import DriverMatrixSubmitEnvRequest, DriverMatrixSubmitFailureRequest, DriverMatrixSubmitResultRequest
from argus.backend.service.user import api_login_required
from argus.backend.plugins.driver_matrix_tests.service import DriverMatrixService
from argus.backend.util.common import get_payload

# Flask blueprint holding the Driver Matrix API routes; every route below is
# served under the "/driver_matrix" URL prefix.
bp = Blueprint("driver_matrix_api", __name__, url_prefix="/driver_matrix")
# Any exception raised by a view of this blueprint is passed to handle_api_exception.
bp.register_error_handler(Exception, handle_api_exception)
Expand All @@ -22,3 +24,40 @@ def driver_matrix_test_report():
"status": "ok",
"response": result
}

@bp.route("/result/submit", methods=["POST"])
@api_login_required
def submit_result():
    """Accept one driver's raw xUnit result for an existing Driver Matrix run.

    The request payload is unpacked into a ``DriverMatrixSubmitResultRequest``
    and its fields are forwarded to ``DriverMatrixService.submit_driver_result``.

    Returns:
        A dict with ``"status": "ok"`` and the service result under
        ``"response"``.
    """
    submission = DriverMatrixSubmitResultRequest(**get_payload(request))

    service = DriverMatrixService()
    response = service.submit_driver_result(
        driver_name=submission.driver_name,
        driver_type=submission.driver_type,
        run_id=submission.run_id,
        raw_xml=submission.raw_xml,
    )
    return {"status": "ok", "response": response}


@bp.route("/result/fail", methods=["POST"])
@api_login_required
def submit_failure():
    """Record that a driver failed, together with the reported reason.

    The request payload is unpacked into a ``DriverMatrixSubmitFailureRequest``
    and its fields are forwarded to ``DriverMatrixService.submit_driver_failure``.

    Returns:
        A dict with ``"status": "ok"`` and the service result under
        ``"response"``.
    """
    submission = DriverMatrixSubmitFailureRequest(**get_payload(request))

    service = DriverMatrixService()
    response = service.submit_driver_failure(
        driver_name=submission.driver_name,
        driver_type=submission.driver_type,
        run_id=submission.run_id,
        failure_reason=submission.failure_reason,
    )
    return {"status": "ok", "response": response}

@bp.route("/env/submit", methods=["POST"])
@api_login_required
def submit_env():
    """Accept the raw build environment dump for an existing run.

    The request payload is unpacked into a ``DriverMatrixSubmitEnvRequest``
    and its fields are forwarded to ``DriverMatrixService.submit_env_info``.

    Returns:
        A dict with ``"status": "ok"`` and the service result under
        ``"response"``.
    """
    submission = DriverMatrixSubmitEnvRequest(**get_payload(request))

    service = DriverMatrixService()
    response = service.submit_env_info(
        run_id=submission.run_id,
        raw_env=submission.raw_env,
    )
    return {"status": "ok", "response": response}
245 changes: 243 additions & 2 deletions argus/backend/plugins/driver_matrix_tests/model.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
from dataclasses import dataclass
from datetime import datetime
from functools import reduce
import logging
from pprint import pformat
import re
from typing import Literal, TypedDict
from uuid import UUID
from xml.etree import ElementTree
from cassandra.cqlengine import columns
from argus.backend.db import ScyllaCluster
from argus.backend.models.web import ArgusRelease
Expand All @@ -12,6 +16,8 @@
from argus.backend.util.enums import TestStatus


# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)

class DriverMatrixPluginError(Exception):
    """Error type raised by the Driver Matrix plugin."""

Expand All @@ -26,6 +32,65 @@ class DriverMatrixRunSubmissionRequest():
matrix_results: list[RawMatrixTestResult]


@dataclass(frozen=True)
class DriverMatrixRunSubmissionRequestV2:
    """Immutable payload of a schema "v2" run submission request."""

    # Schema identifier of the request.
    schema_version: str
    # Id to assign to the run being created.
    run_id: str
    # Job name; stored as the run's build id on submission.
    job_name: str
    # URL of the job; stored as the run's build job URL on submission.
    job_url: str


# Driver type identifiers accepted by the type-annotated parsing helpers below.
TestTypeType = Literal['java', 'cpp', 'python', 'gocql']


class AdaptedXUnitData(TypedDict):
    """Values an xUnit adapter extracts from a driver's report."""

    # Timestamp string describing when the reported tests ran.
    timestamp: str


def python_driver_matrix_adapter(xml: ElementTree.ElementTree) -> AdaptedXUnitData:
    """Extract adapter data from a Python driver xUnit report.

    The timestamp is the ``timestamp`` attribute of the first ``testsuite``
    element found when iterating from the document root; it is ``None`` when
    that element carries no such attribute.

    Raises:
        IndexError: If the document contains no ``testsuite`` element.
    """
    first_suite = list(xml.getroot().iter("testsuite"))[0]
    return {"timestamp": first_suite.get("timestamp")}


def java_driver_matrix_adapter(xml: ElementTree.ElementTree) -> AdaptedXUnitData:
    """Extract adapter data from a Java driver xUnit report.

    This adapter does not read a timestamp from the report. It estimates the
    start time as "current UTC time minus the root element's ``time``
    attribute" (the attribute is interpreted as a duration in seconds).

    A missing or non-numeric ``time`` attribute is treated as a duration of
    zero, so the timestamp falls back to the current UTC time.

    Returns:
        A dict whose ``"timestamp"`` is a naive-UTC ISO 8601 string.
    """
    # Function-scope import: the module itself only imports ``datetime``.
    from datetime import timedelta

    raw_duration = xml.getroot().attrib.get("time")
    try:
        elapsed = timedelta(seconds=float(raw_duration))
    except (TypeError, ValueError, OverflowError):
        # TypeError: attribute absent (None). ValueError: not a number / NaN.
        # OverflowError: infinite or absurdly large duration.
        elapsed = timedelta(0)

    # Subtract directly from the naive UTC datetime. Round-tripping through
    # ``utcnow().timestamp()`` would interpret the naive value as local time
    # and skew the result by the host's UTC offset.
    started_at = datetime.utcnow() - elapsed
    return {"timestamp": started_at.isoformat()}


def cpp_driver_matrix_adapter(xml: ElementTree.ElementTree) -> AdaptedXUnitData:
    """Extract adapter data from a C++ driver xUnit report.

    The timestamp is the ``timestamp`` attribute of the document's root
    element, or ``None`` when the root has no such attribute.
    """
    root = xml.getroot()
    return {"timestamp": root.get("timestamp")}


def gocql_driver_matrix_adapter(xml: ElementTree.ElementTree) -> AdaptedXUnitData:
    """Extract adapter data from a gocql driver xUnit report.

    The timestamp is taken from the first ``testsuite`` element found when
    iterating from the document root; it is ``None`` when that element has no
    ``timestamp`` attribute.

    Raises:
        IndexError: If the document contains no ``testsuite`` element.
    """
    suites = list(xml.getroot().iter("testsuite"))
    leading_suite = suites[0]
    timestamp = leading_suite.get("timestamp")
    return {"timestamp": timestamp}


def generic_adapter(xml: ElementTree.ElementTree) -> AdaptedXUnitData:
    """Fallback adapter that does not inspect the report.

    The ``xml`` argument is ignored; the timestamp is the current UTC time
    rendered as a naive ISO 8601 string.
    """
    current_utc = datetime.utcnow()
    return {"timestamp": current_utc.isoformat()}

class DriverTestRun(PluginModelBase):
_plugin_name = "driver-matrix-tests"
__table_name__ = "driver_test_run"
Expand All @@ -35,6 +100,14 @@ class DriverTestRun(PluginModelBase):

_no_upstream = ["rust"]

# Maps a driver type to the adapter function that extracts report data from
# that driver's xUnit XML.
_TEST_ADAPTERS = {
"java": java_driver_matrix_adapter,
"cpp": cpp_driver_matrix_adapter,
"python": python_driver_matrix_adapter,
"gocql": gocql_driver_matrix_adapter,
}


_artifact_fnames = {
"cpp": r"TEST-(?P<driver_name>[\w]*)-(?P<version>[\d\.-]*)",
"gocql": r"xunit\.(?P<driver_name>[\w]*)\.(?P<proto>v\d)\.(?P<version>[v\d\.]*)",
Expand Down Expand Up @@ -78,6 +151,173 @@ def parse_driver_name(cls, raw_file_name: str) -> str:

@classmethod
def submit_run(cls, request_data: dict) -> 'DriverTestRun':
    """Create a Driver Matrix run from a submission request.

    Requests whose ``schema_version`` is ``"v2"`` create an empty run in the
    ``CREATED`` state. Any other schema version is delegated to the legacy
    ``submit_matrix_run`` path.

    Args:
        request_data: Raw request payload; must contain ``schema_version``.

    Returns:
        The saved run (or whatever ``submit_matrix_run`` returns for legacy
        requests).

    Raises:
        KeyError: If ``schema_version`` is missing from ``request_data``.
    """
    if request_data["schema_version"] != "v2":
        return cls.submit_matrix_run(request_data)

    req = DriverMatrixRunSubmissionRequestV2(**request_data)

    run = cls()
    run.id = req.run_id
    run.build_id = req.job_name
    run.build_job_url = req.job_url
    run.start_time = datetime.utcnow()
    run.assign_categories()
    try:
        run.assignee = run.get_scheduled_assignee()
    except Exception:  # pylint: disable=broad-except
        # Best effort: a failed assignee lookup must not block run creation,
        # but leave a trace so the failure is not completely invisible.
        LOGGER.warning("Unable to determine scheduled assignee for run %s", req.run_id, exc_info=True)
        run.assignee = None

    run.status = TestStatus.CREATED.value
    run.save()
    return run

@classmethod
def submit_driver_result(cls, run_id: UUID, driver_name: str, driver_type: TestTypeType, xml_data: str):
    """Attach one driver's parsed xUnit results to an existing run.

    Args:
        run_id: Id of the run to update.
        driver_name: Name the result was submitted under; passed to
            ``parse_result_xml`` as the collection name.
        driver_type: Driver type, passed through to ``parse_result_xml``.
        xml_data: Raw xUnit XML document.

    Returns:
        The saved run.
    """
    run: DriverTestRun = cls.get(id=run_id)

    collection = run.parse_result_xml(driver_name, xml_data, driver_type)
    run.test_collection.append(collection)

    # Status is written as the enum's ``.value`` (see the assignment below),
    # so compare against ``.value`` as well instead of the enum member.
    if run.status == TestStatus.CREATED.value:
        run.status = TestStatus.RUNNING.value

    run.save()
    return run


@classmethod
def submit_driver_failure(cls, run_id: UUID, driver_name: str, driver_type: TestTypeType, fail_message: str):
    """Record a failed driver as a single-failure collection on a run.

    The collection gets ``failures = 1``, ``tests_total = 1`` and the given
    failure message. Its ``driver`` is whatever ``get_driver_info`` extracts
    from ``driver_name`` (``None`` when the name does not match).

    Args:
        run_id: Id of the run to update.
        driver_name: Name the failure was submitted under; stored as the
            collection name.
        driver_type: Driver type, used to pick the name-parsing pattern.
        fail_message: Reason for the failure.

    Returns:
        The saved run.
    """
    run: DriverTestRun = cls.get(id=run_id)

    collection = TestCollection()
    collection.failures = 1
    collection.failure_message = fail_message
    collection.name = driver_name
    driver_info = run.get_driver_info(driver_name, driver_type)
    collection.driver = driver_info.get("driver_name")
    collection.tests_total = 1
    run.test_collection.append(collection)

    # Status is written as the enum's ``.value`` (see the assignment below),
    # so compare against ``.value`` as well instead of the enum member.
    if run.status == TestStatus.CREATED.value:
        run.status = TestStatus.RUNNING.value

    run.save()
    return run

@classmethod
def submit_env_info(cls, run_id: UUID, env_data: str):
    """Store the build environment of a run.

    ``env_data`` is parsed with ``parse_build_environment``; every parsed
    pair is appended to the run's ``environment_info`` and the
    ``"scylla-version"`` entry (if any) becomes the run's ``scylla_version``.

    Args:
        run_id: Id of the run to update.
        env_data: Raw environment dump.

    Returns:
        The saved run.
    """
    run: DriverTestRun = cls.get(id=run_id)
    parsed_env = run.parse_build_environment(env_data)

    for name in parsed_env:
        entry = EnvironmentInfo()
        entry.key = name
        entry.value = parsed_env[name]
        run.environment_info.append(entry)

    run.scylla_version = parsed_env.get("scylla-version")

    run.save()
    return run

def parse_build_environment(self, raw_env: str) -> dict[str, str]:
    """Parse a ``key: value``-per-line environment dump into a dict.

    Blank (or whitespace-only) lines are ignored. Each remaining line is
    split on the first ``": "`` only, so values may themselves contain that
    sequence. Lines without the separator are skipped with a warning instead
    of aborting the whole submission.

    Args:
        raw_env: Newline-separated environment dump.

    Returns:
        Mapping of keys to whitespace-stripped values. A later duplicate key
        overwrites an earlier one.
    """
    result = {}
    for line in raw_env.split("\n"):
        # strip() also discards a lone "\r" left over from CRLF input.
        if not line.strip():
            continue
        LOGGER.debug("ENV: %s", line)
        key, separator, val = line.partition(": ")
        if not separator:
            LOGGER.warning("Skipping malformed environment line: %r", line)
            continue
        result[key] = val.strip()

    return result

def get_test_cases(self, cases: list[ElementTree.Element]) -> list[TestCase]:
    """Convert ``testcase`` XML elements into ``TestCase`` objects.

    A case without child elements is reported as ``"passed"`` with an empty
    message. Otherwise the tag of its first child becomes the status and the
    message is built from that child's ``message`` and ``type`` attributes.

    NOTE(review): the first child is used whatever its tag is, so a leading
    non-outcome child would become the status - confirm the inputs never
    contain one.

    Raises:
        KeyError: If a case element has no ``name`` attribute.
    """
    parsed_cases = []
    for element in cases:
        outcome = next(iter(element.findall("./*")), None)
        if outcome is None:
            status, message = "passed", ""
        else:
            status = outcome.tag
            reason = outcome.attrib.get("message", "no-message")
            kind = outcome.attrib.get("type", "no-type")
            message = f"{reason} ({kind})"

        case = TestCase()
        case.name = element.attrib["name"]
        case.status = status
        case.time = float(element.attrib.get("time", 0.0))
        case.classname = element.attrib.get("classname", "")
        case.message = message
        parsed_cases.append(case)

    return parsed_cases

def get_driver_info(self, xml_name: str, test_type: TestTypeType) -> dict[str, str]:
    """Extract driver metadata from a result file name.

    ``"cpp"`` names are matched against a ``TEST-<driver>-<version>`` pattern;
    every other type uses ``<name>.<driver>.<proto>.<version>``. The match is
    anchored at the start of the name only.

    Returns:
        The named groups of the match, or an empty dict when the name does
        not match.
    """
    cpp_pattern = r"TEST-(?P<driver_name>[\w]*)-(?P<version>[\d\.]*)(\.xml)?"
    default_pattern = r"(?P<name>[\w]*)\.(?P<driver_name>[\w]*)\.(?P<proto>v\d)\.(?P<version>[\d\.]*)(\.xml)?"
    pattern = cpp_pattern if test_type == "cpp" else default_pattern

    found = re.match(pattern, xml_name)
    if found is None:
        return {}
    return found.groupdict()

def get_passed_count(self, suite_attribs: dict[str, str]) -> int:
    """Return the number of passed tests described by a suite's attributes.

    A non-empty ``passed`` attribute is used as is. Otherwise the count is
    derived as ``tests - errors - skipped - failures``, with absent
    attributes counting as zero.
    """
    explicit = suite_attribs.get("passed")
    if explicit:
        return int(explicit)

    total = int(suite_attribs.get("tests", 0))
    not_passed = sum(
        int(suite_attribs.get(attribute, 0))
        for attribute in ("errors", "skipped", "failures")
    )
    return total - not_passed

def parse_result_xml(self, name: str, xml_data: str, test_type: TestTypeType) -> TestCollection:
    """Parse a driver's raw xUnit XML into a ``TestCollection``.

    The adapter registered for ``test_type`` (or ``generic_adapter``) supplies
    the collection timestamp. Every ``testsuite`` element in the document
    becomes a ``TestSuite``, and the collection totals are the sums over
    all suites.

    Args:
        name: Name the result was submitted under; stored as the collection
            name and parsed for the driver name.
        xml_data: Raw xUnit XML document.
        test_type: Driver type used to select the adapter and name pattern.

    Returns:
        The populated collection (not yet attached to the run).

    Raises:
        xml.etree.ElementTree.ParseError: If ``xml_data`` is not valid XML.
        KeyError: If a suite element has no ``name`` attribute.
        ValueError: If the timestamp is not an ISO format that
            ``datetime.fromisoformat`` accepts.
    """
    xml: ElementTree.ElementTree = ElementTree.ElementTree(ElementTree.fromstring(xml_data))
    LOGGER.debug("%s", pformat(xml))
    testsuites = xml.getroot()
    adapted_data = self._TEST_ADAPTERS.get(test_type, generic_adapter)(xml)

    # Adapters read the timestamp with ``attrib.get``, so it can be None (or
    # empty) when the report omits it; fall back to "now" like the generic
    # adapter does instead of crashing on the string operations below.
    raw_timestamp = adapted_data["timestamp"] or datetime.utcnow().isoformat()
    # Drop a trailing "Z" before parsing, keeping the parsed value naive.
    # NOTE(review): presumably also needed because older Python versions
    # reject the suffix in ``fromisoformat`` - confirm.
    if raw_timestamp.endswith("Z"):
        raw_timestamp = raw_timestamp[:-1]

    driver_info = self.get_driver_info(name, test_type)
    test_collection = TestCollection()
    test_collection.timestamp = datetime.fromisoformat(raw_timestamp)
    test_collection.name = name
    test_collection.driver = driver_info.get("driver_name")
    test_collection.tests_total = 0
    test_collection.failures = 0
    test_collection.errors = 0
    test_collection.disabled = 0
    test_collection.skipped = 0
    test_collection.passed = 0
    test_collection.time = 0.0
    test_collection.suites = []

    for xml_suite in testsuites.iter("testsuite"):
        suite = TestSuite()
        suite.name = xml_suite.attrib["name"]
        suite.tests_total = int(xml_suite.attrib.get("tests", 0))
        suite.failures = int(xml_suite.attrib.get("failures", 0))
        suite.disabled = 0
        suite.passed = self.get_passed_count(xml_suite.attrib)
        suite.skipped = int(xml_suite.attrib.get("skipped", 0))
        suite.errors = int(xml_suite.attrib.get("errors", 0))
        # Default a missing duration to 0.0, as is done for test cases.
        suite.time = float(xml_suite.attrib.get("time", 0.0))
        suite.cases = self.get_test_cases(xml_suite.findall("testcase"))

        test_collection.suites.append(suite)
        test_collection.tests_total += suite.tests_total
        test_collection.failures += suite.failures
        test_collection.errors += suite.errors
        test_collection.disabled += suite.disabled
        test_collection.skipped += suite.skipped
        test_collection.passed += suite.passed
        test_collection.time += suite.time

    return test_collection

@classmethod
def submit_matrix_run(cls, request_data):
# Legacy method
req = DriverMatrixRunSubmissionRequest(**request_data)
run = cls()
run.id = req.run_id # pylint: disable=invalid-name
Expand Down Expand Up @@ -153,14 +393,14 @@ def _determine_run_status(self):
if len(driver_types) <= 1 and not any(driver for driver in self._no_upstream if driver in driver_types):
return TestStatus.FAILED

failure_count = reduce(lambda acc, val: acc + (val.failures + val.errors), self.test_collection, 0)
failure_count = reduce(lambda acc, val: acc + (val.failures or 0 + val.errors or 0), self.test_collection, 0)
if failure_count > 0:
return TestStatus.FAILED

return TestStatus.PASSED

def change_status(self, new_status: TestStatus):
    """Set the run's status to ``new_status``.

    The superseded "method is obsolete" ``raise`` that preceded the
    assignment made it unreachable and is dropped here.
    """
    self.status = new_status

def get_events(self) -> list:
    """Return the run's events; this plugin always reports an empty list."""
    return list()
Expand All @@ -170,6 +410,7 @@ def submit_product_version(self, version: str):

def finish_run(self, payload: dict = None):
    """Finalize the run.

    Stamps ``end_time`` with the current UTC time, then stores the ``value``
    of the status computed by ``_determine_run_status``. ``payload`` is
    accepted but not used.
    """
    self.end_time = datetime.utcnow()
    final_status = self._determine_run_status()
    self.status = final_status.value

def submit_logs(self, logs: list[dict]):
    """Accept submitted logs; this plugin ignores them."""
    return None
27 changes: 27 additions & 0 deletions argus/backend/plugins/driver_matrix_tests/raw_types.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from dataclasses import dataclass
from typing import TypedDict
from uuid import UUID


class RawMatrixTestCase(TypedDict):
Expand Down Expand Up @@ -33,3 +35,28 @@ class RawMatrixTestResult(TypedDict):
time: float
timestamp: str
suites: list[RawMatrixTestSuite]


@dataclass(frozen=True)
class DriverMatrixSubmitResultRequest:
    """Immutable payload of a request submitting one driver's raw xUnit XML."""

    # Schema identifier of the request.
    schema_version: str
    # Id of the run the result belongs to.
    run_id: UUID
    # Driver type of the submitted result.
    driver_type: str
    # Name the result is submitted under.
    driver_name: str
    # Raw xUnit XML document.
    raw_xml: str


@dataclass(frozen=True)
class DriverMatrixSubmitFailureRequest:
    """Immutable payload of a request reporting that a driver failed."""

    # Schema identifier of the request.
    schema_version: str
    # Id of the run the failure belongs to.
    run_id: UUID
    # Driver type of the failed driver.
    driver_type: str
    # Name the failure is submitted under.
    driver_name: str
    # Reason for the failure.
    failure_reason: str


@dataclass(frozen=True)
class DriverMatrixSubmitEnvRequest:
    """Immutable payload of a request submitting a run's build environment."""

    # Schema identifier of the request.
    schema_version: str
    # Id of the run the environment belongs to.
    run_id: UUID
    # Raw environment dump.
    raw_env: str
Loading

0 comments on commit 96c74d7

Please sign in to comment.