diff --git a/dss_datamover/unittests/conftest.py b/dss_datamover/unittests/conftest.py old mode 100755 new mode 100644 index bfce7a6..5039e61 --- a/dss_datamover/unittests/conftest.py +++ b/dss_datamover/unittests/conftest.py @@ -33,10 +33,29 @@ """ from utils import config +from logger import MultiprocessingLogger +from master_application import Master +from multiprocessing import Queue, Value, Lock +from socket_communication import ClientSocket, ServerSocket import json import os +import socket import pytest +from enum import Enum + + +class Status(Enum): + NORMAL = 0 + CONNECTIONERROR = 1 + CONNECTIONREFUSEERROR = 2 + SOCKETTIMEOUT = 3 + SOCKETERROR = 4 + EXCEPTION = 5 + MISALIGNEDBUFSIZE = 6 + WRONGBUFSIZE = 7 + LISTENING = 8 + CLOSED = 9 @pytest.fixture(scope="session") @@ -59,6 +78,36 @@ def get_config_dict(get_config_object): return get_config_object.get_config() +@pytest.fixture(scope="session") +def get_system_config_object(): + config_obj = config.Config({}, config_filepath="/etc/dss/datamover/config.json") + return config_obj + + +@pytest.fixture(scope="session") +def get_system_config_dict(get_system_config_object): + return get_system_config_object.get_config() + + +@pytest.fixture +def get_multiprocessing_logger(tmpdir): + logger_status = Value('i', 0) # 0=NOT-STARTED, 1=RUNNING, 2=STOPPED + logger_queue = Queue() + logger_lock = Value('i', 0) + logging_path = tmpdir + logging_level = "INFO" + + logger = MultiprocessingLogger(logger_queue, logger_lock, logger_status) + logger.config(logging_path, __file__, logging_level) + logger.create_logger_handle() + logger.start() + + yield logger + + # teardown logger + logger.stop() + + @pytest.fixture def clear_datamover_cache(get_pytest_configs): cache_files = get_pytest_configs["cache"] @@ -71,20 +120,139 @@ class MockSocket(): """ Dummy Object for an actual socket, should simulate all basic functions of a socket object """ - # TODO: finish building out MockSocket class - def __init__(self): - self.host = "xxx.xxxx.xxxx.xxxx" - self.port = "xxxx" + def __init__(self, family=0, type=0, proto=0, fileno=0): + self.timeout = 0 + self.status = Status.NORMAL + self.data = '' + self.data_index = 0 # indicates the starting pos of the sending data when calling recv + self.max_bufsize = 10 # maximum length of return data when calling recv + + def connect(self, address): + if self.status == Status.CONNECTIONERROR: + raise ConnectionError + elif self.status == Status.CONNECTIONREFUSEERROR: + raise ConnectionRefusedError + elif self.status == Status.SOCKETERROR: + raise socket.error + elif self.status == Status.SOCKETTIMEOUT: + raise socket.timeout + else: + return + + def recv(self, bufsize): + if self.status == Status.CONNECTIONERROR: + raise ConnectionError + elif self.status == Status.SOCKETTIMEOUT: + raise socket.timeout + elif self.status == Status.EXCEPTION: + raise Exception + elif self.status == Status.MISALIGNEDBUFSIZE: + ret = self.data[self.data_index: self.data_index + bufsize + 1] + return ret + else: + ret = '' + if not self.data: + return ret + if self.data_index >= len(self.data): + raise Exception + if bufsize > self.max_bufsize: + bufsize = self.max_bufsize + if bufsize >= len(self.data) - self.data_index: + ret = self.data[self.data_index:] + self.data_index = len(self.data) + else: + ret = self.data[self.data_index: self.data_index + bufsize] + self.data_index += bufsize + return ret.encode("utf8", "ignore") + + def send(self, data, flags=None): + return self.sendall(data, flags) - def connect(self): - return True + def sendall(self, data, flags=None): + self.data = '' + self.data_index = 0 + if self.status == Status.CONNECTIONERROR: + raise ConnectionError + elif self.status == Status.CONNECTIONREFUSEERROR: + raise ConnectionRefusedError + elif self.status == Status.SOCKETERROR: + raise socket.error + elif self.status == Status.SOCKETTIMEOUT: + raise socket.timeout + else: + self.data = data + return - def recv(self): + def setsockopt(self, param1, param2, param3): pass - def sendall(self): + def settimeout(self, new_timeout): pass + def close(self): + if self.status == Status.LISTENING or self.status == Status.NORMAL: + self.status = Status.CLOSED + else: + raise Exception + + def listen(self, backlog): + self.status = Status.LISTENING + + def bind(self, address): + if self.status == Status.NORMAL: + return + else: + raise Exception + + def get_default_ip(self): + default_ip = "" + pytest_config_filepath = os.path.dirname(__file__) + "/pytest_config.json" + with open(pytest_config_filepath) as f: + pytest_configs = json.load(f) + default_ip = pytest_configs['default_ip'] + return default_ip + + def accept(self): + return self, (self.get_default_ip(), 1234) + + +@pytest.fixture +def get_header_length(mocker, get_config_dict): + return get_config_dict.get("socket_options", {}).get("response_header_length", 10) + + +class MockLogger(): + + def __init__(self): + self.logs = {'error': [], 'info': [], 'warn': [], 'excep': []} + + def error(self, msg): + self.logs['error'].append(msg) + + def info(self, msg): + self.logs['info'].append(msg) + + def warn(self, msg): + self.logs['warn'].append(msg) + + def excep(self, msg): + self.logs['excep'].append(msg) + + def get_last(self, type): + ret = '' + if len(self.logs[type]) > 0: + ret = self.logs[type][-1] + return ret + + def clear(self): + for key in self.logs: + self.logs[key].clear() + + +@pytest.fixture +def get_mock_logger(): + return MockLogger() + @pytest.fixture def get_mock_clientsocket(mocker): @@ -94,5 +262,83 @@ def get_mock_clientsocket(mocker): @pytest.fixture def get_mock_serversocket(mocker): - mock_serversocket = mocker.patch('socket_communication.ClientSocket', spec=True) + mock_serversocket = mocker.patch('socket_communication.ServerSocket', spec=True) return mock_serversocket + + +@pytest.fixture +def get_master_dryrun(get_system_config_dict, get_pytest_configs): + def instantiate_master_object(operation): + get_system_config_dict["config"] = get_pytest_configs["config"] + get_system_config_dict["dest_path"] = get_pytest_configs["dest_path"] + get_system_config_dict["dryrun"] = True + master = Master(operation, get_system_config_dict) + print("instantiated master obj") + master.start() + print("successfully started master obj") + return master + return instantiate_master_object + + +@pytest.fixture +def get_master(get_system_config_dict, get_pytest_configs): + def instantiate_master_object(operation): + get_system_config_dict["config"] = get_pytest_configs["config"] + get_system_config_dict["dest_path"] = get_pytest_configs["dest_path"] + master = Master(operation, get_system_config_dict) + print("instantiated master obj") + master.start() + print("successfully started master obj") + return master + return instantiate_master_object + + +@pytest.fixture +def shutdown_master_without_nfscluster(): + def _method(master): + print("shutting down master") + master.stop_logging() + print("stopping logging") + master.stop_monitor() + print("stopping monitoring") + return _method + + +@pytest.fixture +def shutdown_master(): + def _method(master): + print("shutting down master") + master.nfs_cluster_obj.umount_all() + print("unmounting nfs cluster") + master.stop_logging() + print("stopping logging") + master.stop_monitor() + print("stopping monitoring") + return _method + + +class MockMinio(): + def __init__(self): + self.data = {} + + def list(self, key=''): + if not key: + return list(self.data.items()) + return self.data[key].items() if isinstance(self.data[key], dict) else [(key, self.data[key])] + + def get(self, key): + if key in self.data: + return self.data[key] + return None + + def put(self, key, value): + if key in self.data: + self.data[key] = value + return True + return False + + def delete(self, key): + if key in self.data: + self.data.pop(key) + return True + return False diff --git a/dss_datamover/unittests/pytest_config.json b/dss_datamover/unittests/pytest_config.json index a8e1f7f..5b471df 100755 --- a/dss_datamover/unittests/pytest_config.json +++ b/dss_datamover/unittests/pytest_config.json @@ -1,6 +1,66 @@ { - "config": "/etc/dss/datamover/config.json", + "config": "/etc/dss/datamover/standard_config.json", "dest_path": "/tmp/xyz", "cache": ["/var/log/dss/prefix_index_data.json", "/var/log/dss/dm_resume_prefix_dir_keys.txt"], - "fs_config": {"server_as_prefix": "yes"} -} \ No newline at end of file + "default_ip": "1.2.3.4", + "clients_hosts_or_ip_addresses": ["1.2.3.4", "5.6.7.8"], + "master": { + "host_or_ip_address": "1.2.3.4", + "workers": 1, + "max_index_size": 1000, + "size": "1GB" + }, + "client": { + "workers": 25, + "max_index_size": 1000, + "user_id": "ansible", + "password": "ansible" + }, + "message": { + "port_index": 6000, + "port_status": 6001 + }, + "socket_options": { + "connection_retry_delay": 2, + "max_connection_time_threshold": 300, + "response_header_length": 10, + "recv_timeout": 60 + }, + "fs_config": { + "mounted": false, + "nfs": { + "202.0.0.103": ["/dir1", "/dir2"] + }, + "nfsport": 2049, + "server_as_prefix": "yes" + }, + "s3_storage": { + "minio": {"url": "1.2.3.4:9000", "access_key": "minio", "secret_key": "minio123"}, + "bucket": "bucket", + "client_lib": "dss_client" + }, + "operations": ["PUT", "GET", "DEL", "LIST"], + "logging": { + "path": "/var/log/dss", + "level": "INFO", + "max_log_file_size": 1048576, + "log_file_backup_count": 5 + }, + "dss_targets": { + "subsystem_nqn": { + "1": ["vm1"], + "2": ["vm2"], + "3": ["vm3"], + "4": ["vm4"] + }, + "installation_path": "/usr/dss/nkv-target" + }, + "environment": { + "gcc": { + "version": "5.1", + "source": "/usr/local/bin/setenv-for-gcc510.sh", + "required": true + } + }, + "compaction": "yes" +} diff --git a/dss_datamover/unittests/test_socket.py b/dss_datamover/unittests/test_socket.py index 05941a2..3b797cc 100755 --- a/dss_datamover/unittests/test_socket.py +++ b/dss_datamover/unittests/test_socket.py @@ -33,8 +33,243 @@ """ import pytest +import socket +import os +import json +import re +from socket_communication import ServerSocket, ClientSocket +from conftest import MockSocket, Status +import time -@pytest.mark.usefixtures("get_mock_clientsocket") +def prepare_send_data(msg, header_len, wrong_size=False): + if wrong_size: + msg_len = (str(len(msg) + 3)).zfill(header_len) + else: + msg_len = (str(len(msg))).zfill(header_len) + ret = msg_len + msg + return ret + + +@pytest.mark.usefixtures("get_pytest_configs", "get_config_dict", "get_mock_logger", "get_header_length") class TestSocketCommunication(): """ Unit tests for both ClientSocket and ServerSocket objects""" + def test_client_socket_connect(self, mocker, get_config_dict, get_mock_logger, get_pytest_configs): + mocker.patch('socket.socket', MockSocket) + client_socket = ClientSocket(get_config_dict, get_mock_logger) + # positive case + ret = client_socket.connect(get_pytest_configs["default_ip"], 1234) + assert ret is None + # Invalid IP + with pytest.raises(ConnectionError, match=r"Invalid IP Address given - .*"): + client_socket.connect('*.@.^.!', 1234) + + def test_client_socket_send_json(self, mocker, get_config_dict, get_mock_logger, get_pytest_configs): + mocker.patch('socket.socket', MockSocket) + client_socket = ClientSocket(get_config_dict, get_mock_logger) + client_socket.connect(get_pytest_configs["default_ip"], 1234) + # positive case + ret = client_socket.send_json(r'{}') + assert ret + ret = client_socket.send_json(get_config_dict) + assert ret + # empty message + ret = client_socket.send_json(r'') + assert not ret + # socket timeout + client_socket.socket.status = Status.SOCKETTIMEOUT + with pytest.raises(socket.timeout): + ret = client_socket.send_json({'k1': 'v1'}) + assert not ret + # Bad Message + client_socket.socket.status = Status.NORMAL + ret = client_socket.send_json('Not Json') + assert ret + assert re.match(r'ClientSocket: BAD MSG - .*', get_mock_logger.get_last('error')) + + def test_client_socket_recv_json(self, mocker, get_config_dict, get_mock_logger, get_header_length, get_pytest_configs): + mocker.patch('socket.socket', MockSocket) + client_socket = ClientSocket(get_config_dict, get_mock_logger) + json_str = json.dumps(get_config_dict) + short_str = 'short msg' + client_socket.connect(get_pytest_configs["default_ip"], 1234) + # positive + client_socket.socket.sendall(prepare_send_data(json_str, get_header_length)) + ret = client_socket.recv_json("JSON") + assert ret == get_config_dict + client_socket.socket.sendall(prepare_send_data(short_str, get_header_length)) + ret = client_socket.recv_json("STRING") + assert ret == short_str + # empty message + msg = '' + client_socket.socket.sendall(msg) + ret = client_socket.recv_json("JSON") + assert ret == {} + assert re.match(r'ClientSocket: .*', get_mock_logger.get_last('error')) + # wrong header size + json_str = json.dumps(get_config_dict) + msg = prepare_send_data(json_str, get_header_length + 1) + client_socket.socket.sendall(msg) + ret = client_socket.recv_json("JSON") + assert ret == {} + assert re.match(r'ClientSocket: .*', get_mock_logger.get_last('error')) + # wrong body size + msg = prepare_send_data(json_str, get_header_length, True) + client_socket.socket.sendall(msg) + ret = client_socket.recv_json("JSON") + assert ret == {} + assert re.match(r'ClientSocket: .*', get_mock_logger.get_last('error')) + # wrong recv size + msg = prepare_send_data(json_str, get_header_length) + client_socket.socket.status = Status.MISALIGNEDBUFSIZE + client_socket.socket.sendall(msg) + ret = client_socket.recv_json("JSON") + assert ret == {} + assert re.match(r'ClientSocket: .*', get_mock_logger.get_last('error')) + # socket timeout + client_socket.socket.status = Status.NORMAL + msg = prepare_send_data(json_str, get_header_length) + client_socket.socket.sendall(msg) + client_socket.socket.status = Status.SOCKETTIMEOUT + ret = client_socket.recv_json("JSON") + assert ret == {} + assert re.match(r'ClientSocket: .*', get_mock_logger.get_last('error')) + # Bad JSON data + client_socket.socket.status = Status.NORMAL + bad_json_str = 'abcdefghijk!@#$0987654321' + msg = prepare_send_data(bad_json_str, get_header_length) + client_socket.socket.sendall(msg) + ret = client_socket.recv_json("JSON") + assert ret == {} + assert re.match(r'ClientSocket: Bad JSON data - .*', get_mock_logger.get_last('error')) + + def test_client_socket_close(self, mocker, get_config_dict, get_mock_logger, get_pytest_configs): + mocker.patch('socket.socket', MockSocket) + client_socket = ClientSocket(get_config_dict, get_mock_logger) + client_socket.connect(get_pytest_configs["default_ip"], 1234) + # positive + client_socket.close() + assert client_socket.socket.status == Status.CLOSED + # negative + client_socket.socket.status = Status.EXCEPTION + client_socket.close() + assert re.match(r'Close socket.*', get_mock_logger.get_last('excep')) + + def test_server_socket_bind(self, mocker, get_config_dict, get_mock_logger, get_pytest_configs): + mocker.patch('socket.socket', MockSocket) + server_socket = ServerSocket(get_config_dict, get_mock_logger) + # positive + ret = server_socket.bind(get_pytest_configs["default_ip"], 1234) + assert ret is None + assert server_socket.socket.status == Status.LISTENING + assert server_socket.client_socket is None + assert re.match(r'Client is listening for message on .*', get_mock_logger.get_last('info')) + # invalid IP + with pytest.raises(ConnectionError, match=r'Invalid IP Address - .*'): + server_socket.bind('1.2.3.256', 1234) + assert re.match(r'Wrong ip_address - .*', get_mock_logger.get_last('error')) + + def test_server_socket_accept(self, mocker, get_config_dict, get_mock_logger, get_pytest_configs): + mocker.patch('socket.socket', MockSocket) + server_socket = ServerSocket(get_config_dict, get_mock_logger) + server_socket.bind(get_pytest_configs["default_ip"], 1234) + # positive + server_socket.accept() + assert server_socket.client_socket is not None + assert re.match(r'Connected to .*', get_mock_logger.get_last('info')) + + def test_server_socket_send_json(self, mocker, get_config_dict, get_mock_logger, get_pytest_configs): + mocker.patch('socket.socket', MockSocket) + server_socket = ServerSocket(get_config_dict, get_mock_logger) + # send before bind + ret = server_socket.send_json({}) + assert not ret + # positive + server_socket.bind(get_pytest_configs["default_ip"], 1234) + server_socket.accept() + ret = server_socket.send_json(r'{}') + assert ret + ret = server_socket.send_json(get_config_dict) + assert ret + # empty message + ret = server_socket.send_json(r'') + assert not ret + # connection error + server_socket.socket.status = Status.CONNECTIONERROR + ret = server_socket.send_json({'k1': 'v1'}) + assert not ret + assert re.match(r'ConnectionError-.*', get_mock_logger.get_last('error')) + # bad json + server_socket.socket.status = Status.NORMAL + ret = server_socket.send_json('This is Not Json', format="JSON") + assert ret + assert re.match(r'ServerSocket: BAD MSG - .*', get_mock_logger.get_last('error')) + + def test_server_socket_recv_json(self, mocker, get_config_dict, get_mock_logger, get_header_length, get_pytest_configs): + mocker.patch('socket.socket', MockSocket) + server_socket = ServerSocket(get_config_dict, get_mock_logger) + # positive + json_str = json.dumps(get_config_dict) + short_str = 'short msg' + server_socket.bind(get_pytest_configs["default_ip"], 1234) + server_socket.accept() + server_socket.client_socket.sendall(prepare_send_data(json_str, get_header_length)) + ret = server_socket.recv_json("JSON") + assert ret == get_config_dict + server_socket.client_socket.sendall(prepare_send_data(short_str, get_header_length)) + ret = server_socket.recv_json("STRING") + assert ret == short_str + # empty message + msg = '' + server_socket.client_socket.sendall(msg) + ret = server_socket.recv_json("JSON") + assert ret == {} + assert re.match(r'ServerSocket: .*', get_mock_logger.get_last('error')) + # wrong header size + msg = prepare_send_data(json_str, get_header_length + 1) + server_socket.client_socket.sendall(msg) + ret = server_socket.recv_json("JSON") + assert ret == {} + assert re.match(r'ServerSocket: .*', get_mock_logger.get_last('error')) + # wrong body size + msg = prepare_send_data(json_str, get_header_length, wrong_size=True) + server_socket.client_socket.sendall(msg) + ret = server_socket.recv_json("JSON") + assert ret == {} + assert re.match(r'ServerSocket: .*', get_mock_logger.get_last('error')) + # wrong recv size + msg = prepare_send_data(json_str, get_header_length) + server_socket.client_socket.status = Status.MISALIGNEDBUFSIZE + server_socket.client_socket.sendall(msg) + ret = server_socket.recv_json("JSON") + assert ret == {} + assert re.match(r'ServerSocket: .*', get_mock_logger.get_last('error')) + # socket timeout + msg = prepare_send_data(json_str, get_header_length) + server_socket.client_socket.sendall(msg) + server_socket.client_socket.status = Status.SOCKETTIMEOUT + ret = server_socket.recv_json("JSON") + assert ret == {} + assert re.match(r'ServerSocket: .*', get_mock_logger.get_last('error')) + # Bad JSON + server_socket.client_socket.status = Status.NORMAL + bad_json_str = 'abcdefghijk!@#$0987654321' + msg = prepare_send_data(bad_json_str, get_header_length) + server_socket.client_socket.sendall(msg) + ret = server_socket.recv_json("JSON") + assert ret == {} + assert re.match(r'ServerSocket: Bad JSON data - .*', get_mock_logger.get_last('error')) + + def test_server_socket_close(self, mocker, get_config_dict, get_mock_logger, get_pytest_configs): + mocker.patch('socket.socket', MockSocket) + server_socket = ServerSocket(get_config_dict, get_mock_logger) + server_socket.bind(get_pytest_configs["default_ip"], 1234) + server_socket.accept() + # positive + server_socket.close() + assert server_socket.socket.status == Status.CLOSED + assert server_socket.client_socket.status == Status.CLOSED + # close error + server_socket.socket.status = Status.EXCEPTION + server_socket.close() + assert re.match(r'Closing Socket.*', get_mock_logger.get_last('error'))