From f7de704930fe65bdeb52607833831d9874d3f2e8 Mon Sep 17 00:00:00 2001 From: cliang li Date: Fri, 1 Sep 2023 22:46:30 -0700 Subject: [PATCH 1/3] Add mocks and unittests for Client/Server socket communication, add mocks and unittests for indexing, add mock for minio. --- dss_datamover/unittests/conftest.py | 379 ++++++++++++++++++++- dss_datamover/unittests/pytest_config.json | 66 +++- dss_datamover/unittests/test_indexing.py | 105 ++++++ dss_datamover/unittests/test_socket.py | 236 ++++++++++++- 4 files changed, 773 insertions(+), 13 deletions(-) create mode 100644 dss_datamover/unittests/test_indexing.py diff --git a/dss_datamover/unittests/conftest.py b/dss_datamover/unittests/conftest.py index bfce7a6..bdff9b8 100755 --- a/dss_datamover/unittests/conftest.py +++ b/dss_datamover/unittests/conftest.py @@ -33,10 +33,29 @@ """ from utils import config +from logger import MultiprocessingLogger +from master_application import Master +from multiprocessing import Queue, Value, Lock +from socket_communication import ClientSocket, ServerSocket import json import os +import socket import pytest +from enum import Enum + + +class Status(Enum): + NORMAL = 0 + CONNECTIONERROR = 1 + CONNECTIONREFUSEERROR = 2 + SOCKETTIMEOUT = 3 + SOCKETERROR = 4 + EXCEPTION = 5 + MISALIGNEDBUFSIZE = 6 + WRONGBUFSIZE = 7 + LISTENING = 8 + CLOSED = 9 @pytest.fixture(scope="session") @@ -59,6 +78,36 @@ def get_config_dict(get_config_object): return get_config_object.get_config() +@pytest.fixture(scope="session") +def get_system_config_object(): + config_obj = config.Config({}, config_filepath="/etc/dss/datamover/config.json") + return config_obj + + +@pytest.fixture(scope="session") +def get_system_config_dict(get_system_config_object): + return get_system_config_object.get_config() + + +@pytest.fixture +def get_multiprocessing_logger(tmpdir): + logger_status = Value('i', 0) # 0=NOT-STARTED, 1=RUNNING, 2=STOPPED + logger_queue = Queue() + logger_lock = Value('i', 0) + logging_path = tmpdir + logging_level = "INFO" + + logger = MultiprocessingLogger(logger_queue, logger_lock, logger_status) + logger.config(logging_path, __file__, logging_level) + logger.create_logger_handle() + logger.start() + + yield logger + + # teardown logger + logger.stop() + + @pytest.fixture def clear_datamover_cache(get_pytest_configs): cache_files = get_pytest_configs["cache"] @@ -71,20 +120,145 @@ class MockSocket(): """ Dummy Object for an actual socket, should simulate all basic functions of a socket object """ - # TODO: finish building out MockSocket class - def __init__(self): - self.host = "xxx.xxxx.xxxx.xxxx" - self.port = "xxxx" + def __init__(self, family=0, type=0, proto=0, fileno=0): + self.timeout = 0 + self.status = Status.NORMAL + self.data = '' + self.data_index = 0 # indicates the starting pos of the sending data when calling recv + self.max_bufsize = 10 # maximum length of return data when calling recv - def connect(self): - return True + def connect(self, address): + if self.status == Status.CONNECTIONERROR: + raise ConnectionError + elif self.status == Status.CONNECTIONREFUSEERROR: + raise ConnectionRefusedError + elif self.status == Status.SOCKETERROR: + raise socket.error + elif self.status == Status.SOCKETTIMEOUT: + raise socket.timeout + else: + return + + def recv(self, bufsize): + if self.status == Status.CONNECTIONERROR: + raise ConnectionError + elif self.status == Status.SOCKETTIMEOUT: + raise socket.timeout + elif self.status == Status.EXCEPTION: + raise Exception + elif self.status == Status.MISALIGNEDBUFSIZE: + ret = self.data[self.data_index: self.data_index + bufsize + 1] + return ret + else: + ret = '' + if not self.data: + return ret + if self.data_index >= len(self.data): + raise Exception + if bufsize > self.max_bufsize: + bufsize = self.max_bufsize + if bufsize >= len(self.data) - self.data_index: + ret = self.data[self.data_index:] + self.data_index = len(self.data) + else: + ret = self.data[self.data_index: self.data_index + bufsize] + self.data_index += bufsize + return ret.encode("utf8", "ignore") - def recv(self): + def send(self, data, flags=None): + return self.sendall(data, flags) + + def sendall(self, data, flags=None): + self.data = '' + self.data_index = 0 + if self.status == Status.CONNECTIONERROR: + raise ConnectionError + elif self.status == Status.CONNECTIONREFUSEERROR: + raise ConnectionRefusedError + elif self.status == Status.SOCKETERROR: + raise socket.error + elif self.status == Status.SOCKETTIMEOUT: + raise socket.timeout + else: + self.data = data + return + + def setsockopt(self, param1, param2, param3): pass - def sendall(self): + def settimeout(self, new_timeout): pass + def close(self): + if self.status == Status.LISTENING or self.status == Status.NORMAL: + self.status = Status.CLOSED + else: + raise Exception + + def listen(self, backlog): + self.status = Status.LISTENING + + def bind(self, address): + if self.status == Status.NORMAL: + return + else: + raise Exception + + def get_default_ip(self): + default_ip = "" + pytest_config_filepath = os.path.dirname(__file__) + "/pytest_config.json" + with open(pytest_config_filepath) as f: + pytest_configs = json.load(f) + default_ip = pytest_configs['default_ip'] + return default_ip + + def accept(self): + return self, (self.get_default_ip(), 1234) + + +@pytest.fixture +def get_header_length(mocker, get_config_dict): + return get_config_dict.get("socket_options", {}).get("response_header_length", 10) + + +class MockLogger(): + + def __init__(self): + self.logs = {'error': [], 'info': [], 'warn': [], 'excep': [], 'fatal': [], 'debug': []} + + def error(self, msg): + self.logs['error'].append(msg) + + def info(self, msg): + self.logs['info'].append(msg) + + def warn(self, msg): + self.logs['warn'].append(msg) + + def excep(self, msg): + self.logs['excep'].append(msg) + + def fatal(self, msg): + self.logs['fatal'].append(msg) + + def debug(self, msg): + self.logs['debug'].append(msg) + + def get_last(self, type): + ret = '' + if len(self.logs[type]) > 0: + ret = self.logs[type][-1] + return ret + + def clear(self): + for key in self.logs: + self.logs[key].clear() + + +@pytest.fixture +def get_mock_logger(): + return MockLogger() + @pytest.fixture def get_mock_clientsocket(mocker): @@ -94,5 +268,192 @@ def get_mock_clientsocket(mocker): @pytest.fixture def get_mock_serversocket(mocker): - mock_serversocket = mocker.patch('socket_communication.ClientSocket', spec=True) + mock_serversocket = mocker.patch('socket_communication.ServerSocket', spec=True) return mock_serversocket + + +@pytest.fixture +def get_master_dryrun(get_system_config_dict, get_pytest_configs): + def instantiate_master_object(operation): + get_system_config_dict["config"] = get_pytest_configs["config"] + get_system_config_dict["dest_path"] = get_pytest_configs["dest_path"] + get_system_config_dict["dryrun"] = True + master = Master(operation, get_system_config_dict) + print("instantiated master obj") + master.start() + print("successfully started master obj") + return master + return instantiate_master_object + + +@pytest.fixture +def get_master(get_system_config_dict, get_pytest_configs): + def instantiate_master_object(operation): + get_system_config_dict["config"] = get_pytest_configs["config"] + get_system_config_dict["dest_path"] = get_pytest_configs["dest_path"] + master = Master(operation, get_system_config_dict) + print("instantiated master obj") + master.start() + print("successfully started master obj") + return master + return instantiate_master_object + + +@pytest.fixture +def get_master_for_indexing(get_system_config_dict, get_pytest_configs): + get_system_config_dict["config"] = get_pytest_configs["config"] + get_system_config_dict["dest_path"] = get_pytest_configs["dest_path"] + master = Master("PUT", get_system_config_dict) + return master + + +@pytest.fixture +def shutdown_master_without_nfscluster(): + def _method(master): + print("shutting down master") + master.stop_logging() + print("stopping logging") + master.stop_monitor() + print("stopping monitoring") + return _method + + +@pytest.fixture +def shutdown_master(): + def _method(master): + print("shutting down master") + master.nfs_cluster_obj.umount_all() + print("unmounting nfs cluster") + master.stop_logging() + print("stopping logging") + master.stop_monitor() + print("stopping monitoring") + return _method + + +class MockMinio(): + def __init__(self): + self.data = {} + + def list(self, key=''): + if not key: + return list(self.data.items()) + return self.data[key].items() if isinstance(self.data[key], dict) else [(key, self.data[key])] + + def get(self, key): + if key in self.data: + return self.data[key] + return None + + def put(self, key, value): + if key in self.data: + self.data[key] = value + return True + return False + + def delete(self, key): + if key in self.data: + self.data.pop(key) + return True + return False + + +class MockNFSCluster(): + def __init__(self, config={}, user_id="ansible", password="password", logger=None): + self.local_mounts = {} + self.config = {'cluster1': ['/data/pics', '/data/docs', '/mnt/size/5GB', '/mnt/size/10GB'], + 'cluster2': ['/data/test/put', '/bird', '/mnt/block/4KB', '/mnt/size/10MB']} + self.mounted = False + + def mount_all(self): + for ip, nfs_shares in self.config.items(): + self.local_mounts[ip] = nfs_shares + self.mounted = True + + def umount_all(self): + self.local_mounts.clear() + self.mounted = False + + def get_mounts(self): + return self.local_mounts + + def mount_based_on_prefix(self, prefix): + cluster_ip = "" + mounted_nfs_share = "" + ret = 0 + for ip, nfs_share in self.config.items(): + if prefix.startswith(ip): + cluster_ip = ip + if not cluster_ip: + ret = 1 + else: + for nfs_share in self.config[cluster_ip]: + nfs_share_prefix = cluster_ip + nfs_share + if nfs_share_prefix.startswith(prefix): + self.mount(cluster_ip, nfs_share) + mounted_nfs_share = nfs_share + break + return cluster_ip, mounted_nfs_share, ret, "" + + def mount(self, cluster_ip, nfs_share): + self.local_mounts[cluster_ip] = nfs_share + self.mounted = True + + +@pytest.fixture +def get_mock_nfs_cluster(): + return MockNFSCluster() + + +@pytest.fixture +def get_indexing_kwargs(): + return {"data": "", "task_queue": Queue(), "logger": MockLogger(), "index_data_queue": Queue(), + "nfs_cluster": "", "nfs_share": "", "max_index_size": 2, "indexing_started_flag": Value('i', 0), + "index_data_count": Value('i', 0), "index_msg_count": Value('i', 0), "index_data_queue_size": 10000, "progress_of_indexing": {}, + "prefix_index_data": {}, "standalone": False, "params": {"s3config": "dss_client", "dryrun": False, "resume_flag": False}} + + +class MockDirEntry(): + + class EntryStat(): + def __init__(self, size): + self.st_size = size + + def __init__(self, name, size): + self.stat_ = self.EntryStat(size) + self.path = "" + self.name = name + + def stat(self): + return self.stat_ + + def is_dir(self): + return False + + +class MockOSDirOperations(): + + def mock_os_scan_dir(dir): + return [ + MockDirEntry("file0", 0), MockDirEntry("file1", 100), MockDirEntry("file2", 100), + MockDirEntry("file3", 100), MockDirEntry("file4", 100) + ] + + def mock_os_access(dir, flag): + return True + + def mock_os_access_failure(dir, flag): + return False + + def mock_iterate_dir(**kwargs): + file_set = ['file1', 'file2'] + yield {"dir": "/data", "files": file_set, "size": 200} + + def mock_iterate_dir_no_dir(**kwargs): + yield {"files": ['file1'], "size": 200} + + def mock_iterate_dir_no_files(**kwargs): + yield {"dir": "/data", "size": 200} + + def mock_oterate_dir_no_size(**kwargs): + yield {"dir": "/data", "size": 0} diff --git a/dss_datamover/unittests/pytest_config.json b/dss_datamover/unittests/pytest_config.json index a8e1f7f..5b471df 100755 --- a/dss_datamover/unittests/pytest_config.json +++ b/dss_datamover/unittests/pytest_config.json @@ -1,6 +1,66 @@ { - "config": "/etc/dss/datamover/config.json", + "config": "/etc/dss/datamover/standard_config.json", "dest_path": "/tmp/xyz", "cache": ["/var/log/dss/prefix_index_data.json", "/var/log/dss/dm_resume_prefix_dir_keys.txt"], - "fs_config": {"server_as_prefix": "yes"} -} \ No newline at end of file + "default_ip": "1.2.3.4", + "clients_hosts_or_ip_addresses": ["1.2.3.4", "5.6.7.8"], + "master": { + "host_or_ip_address": "1.2.3.4", + "workers": 1, + "max_index_size": 1000, + "size": "1GB" + }, + "client": { + "workers": 25, + "max_index_size": 1000, + "user_id": "ansible", + "password": "ansible" + }, + "message": { + "port_index": 6000, + "port_status": 6001 + }, + "socket_options": { + "connection_retry_delay": 2, + "max_connection_time_threshold": 300, + "response_header_length": 10, + "recv_timeout": 60 + }, + "fs_config": { + "mounted": false, + "nfs": { + "202.0.0.103": ["/dir1", "/dir2"] + }, + "nfsport": 2049, + "server_as_prefix": "yes" + }, + "s3_storage": { + "minio": {"url": "1.2.3.4:9000", "access_key": "minio", "secret_key": "minio123"}, + "bucket": "bucket", + "client_lib": "dss_client" + }, + "operations": ["PUT", "GET", "DEL", "LIST"], + "logging": { + "path": "/var/log/dss", + "level": "INFO", + "max_log_file_size": 1048576, + "log_file_backup_count": 5 + }, + "dss_targets": { + "subsystem_nqn": { + "1": ["vm1"], + "2": ["vm2"], + "3": ["vm3"], + "4": ["vm4"] + }, + "installation_path": "/usr/dss/nkv-target" + }, + "environment": { + "gcc": { + "version": "5.1", + "source": "/usr/local/bin/setenv-for-gcc510.sh", + "required": true + } + }, + "compaction": "yes" +} diff --git a/dss_datamover/unittests/test_indexing.py b/dss_datamover/unittests/test_indexing.py new file mode 100644 index 0000000..7dc36af --- /dev/null +++ b/dss_datamover/unittests/test_indexing.py @@ -0,0 +1,105 @@ +import pytest +import re +from conftest import MockNFSCluster, MockOSDirOperations +from task import iterate_dir, indexing + + +def validate_task_queue(task_queue, nfs_config, prefix=""): + queue_size = task_queue.qsize() + nfs_size = 0 + if not prefix: + for ip_address in nfs_config.keys(): + nfs_size += len(nfs_config[ip_address]) + assert queue_size == nfs_size + for i in range(queue_size): + task = task_queue.get() + assert task.params['nfs_cluster'] in nfs_config + assert task.params['nfs_share'] in nfs_config[task.params['nfs_cluster']] + else: + for i in range(queue_size): + task = task_queue.get() + path = task.params['nfs_cluster'] + task.params['nfs_share'] + assert path.startswith(prefix) + + +@pytest.mark.usefixtures( + "get_master_for_indexing", + "get_mock_logger", + "get_indexing_kwargs" +) +class TestIndexing: + + def test_start_indexing(self, mocker, get_master_for_indexing, get_mock_logger): + mocker.patch('master_application.NFSCluster', MockNFSCluster) + master = get_master_for_indexing + master.logger = get_mock_logger + # positive no prefix + master.start_indexing() + validate_task_queue(master.task_queue, master.nfs_cluster_obj.config) + # positive with prefix + master.nfs_cluster_obj.umount_all() + master.prefix = 'cluster1/mnt/size/' + master.start_indexing() + validate_task_queue(master.task_queue, master.nfs_cluster_obj.config, master.prefix) + # invalid prefix + master.nfs_cluster_obj.umount_all() + master.prefixes.clear() + master.prefix = '/cluster1/mnt/' + master.start_indexing() + assert re.match(r'Bad prefix.*', get_mock_logger.get_last('fatal')) + # mount failure + master.nfs_cluster_obj.umount_all() + master.prefixes.clear() + master.prefix = 'notexist/data/' + master.start_indexing() + assert master.indexing_started_flag.value == -1 + assert re.match(r'Mounting failed.*', get_mock_logger.get_last('fatal')) + + def test_iterate_dir(self, mocker): + mocker.patch('os.scandir', MockOSDirOperations.mock_os_scan_dir) + for result in iterate_dir(data="", logger=None, max_index_size=2, resume_flag=False): + assert len(result["files"]) <= 2 + assert result["size"] == 200 + + def test_indexing(self, mocker, get_indexing_kwargs): + mocker.patch('os.access', MockOSDirOperations.mock_os_access) + mocker.patch('task.iterate_dir', MockOSDirOperations.mock_iterate_dir) + # positive case + indexing(**get_indexing_kwargs) + assert get_indexing_kwargs["index_msg_count"].value == 1 + assert get_indexing_kwargs["task_queue"].qsize() == 0 + assert get_indexing_kwargs["index_data_count"].value == 2 + # positive case standalone + get_indexing_kwargs["standalone"] = True + indexing(**get_indexing_kwargs) + assert get_indexing_kwargs["index_msg_count"].value == 1 + assert get_indexing_kwargs["task_queue"].qsize() == 1 + assert get_indexing_kwargs["index_data_count"].value == 4 + # negative case access fail + mocker.patch('os.access', MockOSDirOperations.mock_os_access_failure) + indexing(**get_indexing_kwargs) + assert get_indexing_kwargs["index_msg_count"].value == 1 + assert get_indexing_kwargs["task_queue"].qsize() == 1 + assert get_indexing_kwargs["index_data_count"].value == 4 + assert re.match(r'Read permission.*', get_indexing_kwargs["logger"].get_last("error")) + # negative case no 'dir' + mocker.patch('os.access', MockOSDirOperations.mock_os_access) + mocker.patch('task.iterate_dir', MockOSDirOperations.mock_iterate_dir_no_dir) + indexing(**get_indexing_kwargs) + assert get_indexing_kwargs["index_msg_count"].value == 1 + assert get_indexing_kwargs["task_queue"].qsize() == 1 + assert get_indexing_kwargs["index_data_count"].value == 4 + assert re.match(r'.*iterate_dir.*', get_indexing_kwargs["logger"].get_last("error")) + # positive case no 'files' + mocker.patch('task.iterate_dir', MockOSDirOperations.mock_iterate_dir_no_files) + indexing(**get_indexing_kwargs) + assert get_indexing_kwargs["index_msg_count"].value == 1 + assert get_indexing_kwargs["task_queue"].qsize() == 2 + assert get_indexing_kwargs["index_data_count"].value == 4 + assert get_indexing_kwargs["progress_of_indexing"]["/data"] == 'Pending' + # negative case no 'size' + mocker.patch('task.iterate_dir', MockOSDirOperations.mock_oterate_dir_no_size) + indexing(**get_indexing_kwargs) + assert get_indexing_kwargs["index_msg_count"].value == 1 + assert get_indexing_kwargs["task_queue"].qsize() == 2 + assert get_indexing_kwargs["index_data_count"].value == 4 diff --git a/dss_datamover/unittests/test_socket.py b/dss_datamover/unittests/test_socket.py index 05941a2..e9a5494 100755 --- a/dss_datamover/unittests/test_socket.py +++ b/dss_datamover/unittests/test_socket.py @@ -33,8 +33,242 @@ """ import pytest +import socket +import os +import json +import re +from socket_communication import ServerSocket, ClientSocket +from conftest import MockSocket, Status +import time -@pytest.mark.usefixtures("get_mock_clientsocket") +def prepare_send_data(msg, header_len, wrong_size=False): + if wrong_size: + msg_len = (str(len(msg) + 3)).zfill(header_len) + else: + msg_len = (str(len(msg))).zfill(header_len) + ret = msg_len + msg + return ret + + +@pytest.mark.usefixtures("get_pytest_configs", "get_config_dict", "get_mock_logger", "get_header_length") class TestSocketCommunication(): """ Unit tests for both ClientSocket and ServerSocket objects""" + def test_client_socket_connect(self, mocker, get_config_dict, get_mock_logger, get_pytest_configs): + mocker.patch('socket.socket', MockSocket) + client_socket = ClientSocket(get_config_dict, get_mock_logger) + # positive case + ret = client_socket.connect(get_pytest_configs["default_ip"], 1234) + assert ret is None + # Invalid IP + with pytest.raises(ConnectionError, match=r"Invalid IP Address given - .*"): + client_socket.connect('*.@.^.!', 1234) + + def test_client_socket_send_json(self, mocker, get_config_dict, get_mock_logger, get_pytest_configs): + mocker.patch('socket.socket', MockSocket) + client_socket = ClientSocket(get_config_dict, get_mock_logger) + client_socket.connect(get_pytest_configs["default_ip"], 1234) + # positive case + ret = client_socket.send_json(r'{}') + assert ret + ret = client_socket.send_json(get_config_dict) + assert ret + # empty message + ret = client_socket.send_json(r'') + assert not ret + # socket timeout + client_socket.socket.status = Status.SOCKETTIMEOUT + with pytest.raises(socket.timeout): + ret = client_socket.send_json({'k1': 'v1'}) + assert not ret + # Bad Message + client_socket.socket.status = Status.NORMAL + ret = client_socket.send_json('Not Json') + assert ret + assert re.match(r'ClientSocket: BAD MSG - .*', get_mock_logger.get_last('error')) + + def test_client_socket_recv_json(self, mocker, get_config_dict, get_mock_logger, get_header_length, get_pytest_configs): + mocker.patch('socket.socket', MockSocket) + client_socket = ClientSocket(get_config_dict, get_mock_logger) + json_str = json.dumps(get_config_dict) + short_str = 'short msg' + client_socket.connect(get_pytest_configs["default_ip"], 1234) + # positive + client_socket.socket.sendall(prepare_send_data(json_str, get_header_length)) + ret = client_socket.recv_json("JSON") + assert ret == get_config_dict + client_socket.socket.sendall(prepare_send_data(short_str, get_header_length)) + ret = client_socket.recv_json("STRING") + assert ret == short_str + # empty message + msg = '' + client_socket.socket.sendall(msg) + ret = client_socket.recv_json("JSON") + assert ret == {} + assert re.match(r'ClientSocket: .*', get_mock_logger.get_last('error')) + # wrong header size + json_str = json.dumps(get_config_dict) + msg = prepare_send_data(json_str, get_header_length + 1) + client_socket.socket.sendall(msg) + ret = client_socket.recv_json("JSON") + assert ret == {} + assert re.match(r'ClientSocket: .*', get_mock_logger.get_last('error')) + # wrong body size + msg = prepare_send_data(json_str, get_header_length, True) + client_socket.socket.sendall(msg) + ret = client_socket.recv_json("JSON") + assert ret == {} + assert re.match(r'ClientSocket: .*', get_mock_logger.get_last('error')) + # wrong recv size + msg = prepare_send_data(json_str, get_header_length) + client_socket.socket.status = Status.MISALIGNEDBUFSIZE + client_socket.socket.sendall(msg) + ret = client_socket.recv_json("JSON") + assert ret == {} + assert re.match(r'ClientSocket: .*', get_mock_logger.get_last('error')) + # socket timeout + client_socket.socket.status = Status.NORMAL + msg = prepare_send_data(json_str, get_header_length) + client_socket.socket.sendall(msg) + client_socket.socket.status = Status.SOCKETTIMEOUT + ret = client_socket.recv_json("JSON") + assert ret == {} + assert re.match(r'ClientSocket: .*', get_mock_logger.get_last('error')) + # Bad JSON data + client_socket.socket.status = Status.NORMAL + bad_json_str = 'abcdefghijk!@#$0987654321' + msg = prepare_send_data(bad_json_str, get_header_length) + client_socket.socket.sendall(msg) + ret = client_socket.recv_json("JSON") + assert ret == {} + assert re.match(r'ClientSocket: Bad JSON data - .*', get_mock_logger.get_last('error')) + + def test_client_socket_close(self, mocker, get_config_dict, get_mock_logger, get_pytest_configs): + mocker.patch('socket.socket', MockSocket) + client_socket = ClientSocket(get_config_dict, get_mock_logger) + client_socket.connect(get_pytest_configs["default_ip"], 1234) + # positive + client_socket.close() + assert client_socket.socket.status == Status.CLOSED + # negative + client_socket.socket.status = Status.EXCEPTION + client_socket.close() + assert re.match(r'Close socket.*', get_mock_logger.get_last('excep')) + + def test_server_socket_bind(self, mocker, get_config_dict, get_mock_logger, get_pytest_configs): + mocker.patch('socket.socket', MockSocket) + server_socket = ServerSocket(get_config_dict, get_mock_logger) + # positive + ret = server_socket.bind(get_pytest_configs["default_ip"], 1234) + assert ret is None + assert server_socket.socket.status == Status.LISTENING + assert server_socket.client_socket is None + assert re.match(r'Client is listening for message on .*', get_mock_logger.get_last('info')) + # invalid IP + with pytest.raises(ConnectionError, match=r'Invalid IP Address - .*'): + server_socket.bind('1.2.3.256', 1234) + assert re.match(r'Wrong ip_address - .*', get_mock_logger.get_last('error')) + + def test_server_socket_accept(self, mocker, get_config_dict, get_mock_logger, get_pytest_configs): + mocker.patch('socket.socket', MockSocket) + server_socket = ServerSocket(get_config_dict, get_mock_logger) + server_socket.bind(get_pytest_configs["default_ip"], 1234) + # positive + server_socket.accept() + assert server_socket.client_socket is not None + assert re.match(r'Connected to .*', get_mock_logger.get_last('info')) + + def test_server_socket_send_json(self, mocker, get_config_dict, get_mock_logger, get_pytest_configs): + mocker.patch('socket.socket', MockSocket) + server_socket = ServerSocket(get_config_dict, get_mock_logger) + # send before bind + ret = server_socket.send_json({}) + assert not ret + # positive + server_socket.bind(get_pytest_configs["default_ip"], 1234) + server_socket.accept() + ret = server_socket.send_json(r'{}') + assert ret + ret = server_socket.send_json(get_config_dict) + assert ret + # empty message + ret = server_socket.send_json(r'') + assert not ret + # connection error + server_socket.socket.status = Status.CONNECTIONERROR + ret = server_socket.send_json({'k1': 'v1'}) + assert not ret + assert re.match(r'ConnectionError-.*', get_mock_logger.get_last('error')) + # bad json + server_socket.socket.status = Status.NORMAL + ret = server_socket.send_json('This is Not Json', format="JSON") + assert ret + assert re.match(r'ServerSocket: BAD MSG - .*', get_mock_logger.get_last('error')) + + def test_server_socket_recv_json(self, mocker, get_config_dict, get_mock_logger, get_header_length, get_pytest_configs): + mocker.patch('socket.socket', MockSocket) + server_socket = ServerSocket(get_config_dict, get_mock_logger) + # positive + json_str = json.dumps(get_config_dict) + short_str = 'short msg' + server_socket.bind(get_pytest_configs["default_ip"], 1234) + server_socket.accept() + server_socket.client_socket.sendall(prepare_send_data(json_str, get_header_length)) + ret = server_socket.recv_json("JSON") + assert ret == get_config_dict + server_socket.client_socket.sendall(prepare_send_data(short_str, get_header_length)) + ret = server_socket.recv_json("STRING") + assert ret == short_str + # empty message + msg = '' + server_socket.client_socket.sendall(msg) + ret = server_socket.recv_json("JSON") + assert ret == {} + # wrong header size + msg = prepare_send_data(json_str, get_header_length + 1) + server_socket.client_socket.sendall(msg) + ret = server_socket.recv_json("JSON") + assert ret == {} + assert re.match(r'ServerSocket: .*', get_mock_logger.get_last('error')) + # wrong body size + msg = prepare_send_data(json_str, get_header_length, wrong_size=True) + server_socket.client_socket.sendall(msg) + ret = server_socket.recv_json("JSON") + assert ret == {} + assert re.match(r'ServerSocket: .*', get_mock_logger.get_last('error')) + # wrong recv size + msg = prepare_send_data(json_str, get_header_length) + server_socket.client_socket.status = Status.MISALIGNEDBUFSIZE + server_socket.client_socket.sendall(msg) + ret = server_socket.recv_json("JSON") + assert ret == {} + assert re.match(r'ServerSocket: .*', get_mock_logger.get_last('error')) + # socket timeout + msg = prepare_send_data(json_str, get_header_length) + server_socket.client_socket.sendall(msg) + server_socket.client_socket.status = Status.SOCKETTIMEOUT + ret = server_socket.recv_json("JSON") + assert ret == {} + assert re.match(r'ServerSocket: .*', get_mock_logger.get_last('error')) + # Bad JSON + server_socket.client_socket.status = Status.NORMAL + bad_json_str = 'abcdefghijk!@#$0987654321' + msg = prepare_send_data(bad_json_str, get_header_length) + server_socket.client_socket.sendall(msg) + ret = server_socket.recv_json("JSON") + assert ret == {} + assert re.match(r'ServerSocket: Bad JSON data - .*', get_mock_logger.get_last('error')) + + def test_server_socket_close(self, mocker, get_config_dict, get_mock_logger, get_pytest_configs): + mocker.patch('socket.socket', MockSocket) + server_socket = ServerSocket(get_config_dict, get_mock_logger) + server_socket.bind(get_pytest_configs["default_ip"], 1234) + server_socket.accept() + # positive + server_socket.close() + assert server_socket.socket.status == Status.CLOSED + assert server_socket.client_socket.status == Status.CLOSED + # close error + server_socket.socket.status = Status.EXCEPTION + server_socket.close() + assert re.match(r'Closing Socket.*', get_mock_logger.get_last('error')) From bc8428be6d079939084e1c83a2b4aa515453b44c Mon Sep 17 00:00:00 2001 From: cliang li Date: Fri, 1 Sep 2023 23:09:04 -0700 Subject: [PATCH 2/3] removed the get_system_config_object and get_system_config_dict fixtures. Redundant since we have get_config_object. --- dss_datamover/unittests/conftest.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/dss_datamover/unittests/conftest.py b/dss_datamover/unittests/conftest.py index bdff9b8..41ebdd1 100755 --- a/dss_datamover/unittests/conftest.py +++ b/dss_datamover/unittests/conftest.py @@ -78,12 +78,6 @@ def get_config_dict(get_config_object): return get_config_object.get_config() -@pytest.fixture(scope="session") -def get_system_config_object(): - config_obj = config.Config({}, config_filepath="/etc/dss/datamover/config.json") - return config_obj - - @pytest.fixture(scope="session") def get_system_config_dict(get_system_config_object): return get_system_config_object.get_config() From 648a587b8cb7964163cb42794875e50df232338f Mon Sep 17 00:00:00 2001 From: cliang li Date: Fri, 1 Sep 2023 23:32:01 -0700 Subject: [PATCH 3/3] Replace get_system_config with get_pytest_configs --- dss_datamover/unittests/conftest.py | 31 +++++++++------------- dss_datamover/unittests/pytest_config.json | 3 ++- 2 files changed, 15 insertions(+), 19 deletions(-) diff --git a/dss_datamover/unittests/conftest.py b/dss_datamover/unittests/conftest.py index 41ebdd1..909a7c2 100755 --- a/dss_datamover/unittests/conftest.py +++ b/dss_datamover/unittests/conftest.py @@ -78,11 +78,6 @@ def get_config_dict(get_config_object): return get_config_object.get_config() -@pytest.fixture(scope="session") -def get_system_config_dict(get_system_config_object): - return get_system_config_object.get_config() - - @pytest.fixture def get_multiprocessing_logger(tmpdir): logger_status = Value('i', 0) # 0=NOT-STARTED, 1=RUNNING, 2=STOPPED @@ -267,12 +262,12 @@ def get_mock_serversocket(mocker): @pytest.fixture -def get_master_dryrun(get_system_config_dict, get_pytest_configs): +def get_master_dryrun(get_config_dict, get_pytest_configs): def instantiate_master_object(operation): - get_system_config_dict["config"] = get_pytest_configs["config"] - get_system_config_dict["dest_path"] = get_pytest_configs["dest_path"] - get_system_config_dict["dryrun"] = True - master = Master(operation, get_system_config_dict) + get_config_dict["config"] = get_pytest_configs["config"] + get_config_dict["dest_path"] = get_pytest_configs["dest_path"] + get_config_dict["dryrun"] = True + master = Master(operation, get_config_dict) print("instantiated master obj") master.start() print("successfully started master obj") @@ -281,11 +276,11 @@ def instantiate_master_object(operation): @pytest.fixture -def get_master(get_system_config_dict, get_pytest_configs): +def get_master(get_config_dict, get_pytest_configs): def instantiate_master_object(operation): - get_system_config_dict["config"] = get_pytest_configs["config"] - get_system_config_dict["dest_path"] = get_pytest_configs["dest_path"] - master = Master(operation, get_system_config_dict) + get_config_dict["config"] = get_pytest_configs["config"] + get_config_dict["dest_path"] = get_pytest_configs["dest_path"] + master = Master(operation, get_config_dict) print("instantiated master obj") master.start() print("successfully started master obj") @@ -294,10 +289,10 @@ def instantiate_master_object(operation): @pytest.fixture -def get_master_for_indexing(get_system_config_dict, get_pytest_configs): - get_system_config_dict["config"] = get_pytest_configs["config"] - get_system_config_dict["dest_path"] = get_pytest_configs["dest_path"] - master = Master("PUT", get_system_config_dict) +def get_master_for_indexing(get_config_dict, get_pytest_configs): + get_config_dict["config"] = get_pytest_configs["config"] + get_config_dict["dest_path"] = get_pytest_configs["dest_path"] + master = Master("PUT", get_config_dict) return master diff --git a/dss_datamover/unittests/pytest_config.json b/dss_datamover/unittests/pytest_config.json index 5b471df..768c8b1 100755 --- a/dss_datamover/unittests/pytest_config.json +++ b/dss_datamover/unittests/pytest_config.json @@ -62,5 +62,6 @@ "required": true } }, - "compaction": "yes" + "compaction": "yes", + "ip_address_family": "IPV4" }