From fbf82565e1cf8ca564d791e173a28e306d5ddc63 Mon Sep 17 00:00:00 2001 From: Willard Nilges Date: Sun, 3 Nov 2024 11:25:30 -0500 Subject: [PATCH] lint and move test class --- src/meshapi/tests/test_replay_join_records.py | 20 +++++++---- src/meshapi/util/join_records.py | 34 ++----------------- 2 files changed, 17 insertions(+), 37 deletions(-) diff --git a/src/meshapi/tests/test_replay_join_records.py b/src/meshapi/tests/test_replay_join_records.py index ec9b9dba..3a908b1b 100644 --- a/src/meshapi/tests/test_replay_join_records.py +++ b/src/meshapi/tests/test_replay_join_records.py @@ -5,12 +5,20 @@ from django.test import TestCase from meshapi.models.install import Install -from meshapi.util.join_records import ( - JOIN_RECORD_BASE_NAME, - JoinRecord, - MockJoinRecordProcessor, - s3_content_to_join_record, -) +from meshapi.util.join_records import JOIN_RECORD_BASE_NAME, JoinRecord, s3_content_to_join_record + + +class MockJoinRecordProcessor: + def __init__(self, data: dict[str, JoinRecord]) -> None: + self.bucket_name: str = "mock_bucket" + # Store join record by S3 key and value. + self.bucket: dict[str, JoinRecord] = data + + def upload(self, join_record: JoinRecord, key: str) -> None: + self.bucket[key] = join_record + + def get_all(self) -> list[JoinRecord]: + return list(self.bucket.values()) # Integration test to ensure that we can fetch JoinRecords from an S3 bucket, diff --git a/src/meshapi/util/join_records.py b/src/meshapi/util/join_records.py index ef661a60..d1fd1d3d 100644 --- a/src/meshapi/util/join_records.py +++ b/src/meshapi/util/join_records.py @@ -3,7 +3,6 @@ import logging import os import tempfile -from abc import ABC, abstractmethod from dataclasses import dataclass, fields from typing import Optional @@ -41,27 +40,13 @@ def s3_content_to_join_record(object_key: str, content: str) -> JoinRecord: return join_record -class JoinRecordProcessorInterface(ABC): - @abstractmethod - def __init__(self) -> None: - pass - - @abstractmethod - def upload(self, join_record: JoinRecord, key: str) -> None: - pass - - @abstractmethod - def get_all(self) -> list[JoinRecord]: - pass - - -class JoinRecordProcessor(JoinRecordProcessorInterface): +class JoinRecordProcessor: def __init__(self) -> None: self.s3_client = boto3.client( "s3", endpoint_url=JOIN_RECORD_ENDPOINT, - #aws_access_key_id=JOIN_RECORD_ACCESS_KEY, - #aws_secret_access_key=JOIN_RECORD_SECRET_KEY, + # aws_access_key_id=JOIN_RECORD_ACCESS_KEY, + # aws_secret_access_key=JOIN_RECORD_SECRET_KEY, config=Config(signature_version="s3v4"), # Ensure S3 signature v4 is used ) @@ -91,16 +76,3 @@ def get_all(self) -> list[JoinRecord]: print("Bucket is empty or does not exist.") return join_records - - -class MockJoinRecordProcessor(JoinRecordProcessorInterface): - def __init__(self, data: dict[str, JoinRecord]) -> None: - self.bucket_name: str = "mock_bucket" - # Store join record by S3 key and value. - self.bucket: dict[str, JoinRecord] = data - - def upload(self, join_record: JoinRecord, key: str) -> None: - self.bucket[key] = join_record - - def get_all(self) -> list[JoinRecord]: - return list(self.bucket.values())