-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add Wazuh API modules
- Loading branch information
Showing
7 changed files
with
283 additions
and
0 deletions.
There are no files selected for viewing
124 changes: 124 additions & 0 deletions
124
src/wazuh_qa_framework/wazuh_components/api/wazuh_api.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,124 @@ | ||
""" | ||
Module to wrapp the main Wazuh API calls. This module contains the following: | ||
- WazuhAPI: | ||
- get_token | ||
- set_token_expiration | ||
- get_api_info | ||
- list_agents | ||
- restart_agent | ||
""" | ||
from base64 import b64encode | ||
from http import HTTPStatus | ||
import requests | ||
|
||
from wazuh_qa_framework.wazuh_components.api.wazuh_api_request import WazuhAPIRequest | ||
from wazuh_qa_framework.generic_modules.request.request import GetRequest | ||
from wazuh_qa_framework.generic_modules.exceptions.exceptions import ConnectionError, RuntimeError | ||
|
||
|
||
DEFAULT_USER = 'wazuh' | ||
DEFAULT_PASSOWRD = 'wazuh' | ||
DEFAULT_PORT = 55000 | ||
DEFAULT_ADDRESS = 'localhost' | ||
DEFAULT_PROTOCOL = 'https' | ||
DEFAULT_TOKEN_EXPIRATION = 900 | ||
|
||
|
||
class WazuhAPI: | ||
"""Class to manage the Wazuh API via requests. | ||
Args: | ||
user (str): Wazuh API user. | ||
password (str): Wazuh API password. | ||
port (int): Wazuh API port connection. | ||
address (str): Wazuh API address. | ||
protocol (str): Wazuh API protocol. | ||
auto_auth (boolean): True for getting the API auth token automatically, False otherwise. | ||
token_expiration (int): Number of seconds to set to the token expiration. | ||
Attributes: | ||
user (str): Wazuh API user. | ||
password (str): Wazuh API password. | ||
port (int): Wazuh API port connection. | ||
address (str): Wazuh API address. | ||
protocol (str): Wazuh API protocol. | ||
token (str): Wazuh API auth token. | ||
token_expiration (int): Number of seconds to set to the token expiration. | ||
""" | ||
def __init__(self, user=DEFAULT_USER, password=DEFAULT_PASSOWRD, port=DEFAULT_PORT, address=DEFAULT_ADDRESS, | ||
protocol=DEFAULT_PROTOCOL, auto_auth=True, token_expiration=DEFAULT_TOKEN_EXPIRATION): | ||
self.user = user | ||
self.password = password | ||
self.port = port | ||
self.address = address | ||
self.protocol = protocol | ||
self.url = f"{protocol}://{address}:{port}" | ||
self.token_expiration = token_expiration | ||
self.token = self.get_token() if auto_auth else None | ||
|
||
if token_expiration != DEFAULT_TOKEN_EXPIRATION: | ||
self.set_token_expiration(token_expiration) | ||
self.token = self.get_token() | ||
|
||
def get_token(self): | ||
"""Get the auth API token. | ||
Returns: | ||
str: API auth token. | ||
Raises: | ||
exceptions.RuntimeError: If there are any error when obtaining the login token. | ||
exceptions.RuntimeError: Cannot establish connection with API. | ||
""" | ||
basic_auth = f"{self.user}:{self.password}".encode() | ||
auth_header = {'Content-Type': 'application/json', 'Authorization': f'Basic {b64encode(basic_auth).decode()}'} | ||
|
||
try: | ||
response = GetRequest(f"{self.url}/security/user/authenticate?raw=true", headers=auth_header).send() | ||
|
||
if response.status_code == HTTPStatus.OK: | ||
return response.text | ||
|
||
raise RuntimeError(f"Error obtaining login token: {response.json()}") | ||
|
||
except requests.exceptions.ConnectionError as exception: | ||
raise ConnectionError(f"Cannot establish connection with {self.url}") from exception | ||
|
||
def set_token_expiration(self, num_seconds): | ||
"""Set the Wazuh API token expiration. | ||
Returns: | ||
WazuhAPIResponse: Operation result (response). | ||
""" | ||
response = WazuhAPIRequest(method='PUT', endpoint='/security/config', | ||
payload={'auth_token_exp_timeout': num_seconds}).send(self) | ||
return response | ||
|
||
@WazuhAPIRequest(method='GET', endpoint='/') | ||
def get_api_info(self, response): | ||
"""Get the Wazuh API info. | ||
Returns: | ||
dict: Wazuh API info. | ||
""" | ||
return response.data | ||
|
||
@WazuhAPIRequest(method='GET', endpoint='/agents') | ||
def list_agents(self, response): | ||
"""List the wazuh agents. | ||
Returns: | ||
dict: Wazuh API info. | ||
""" | ||
return response.data | ||
|
||
def restart_agent(self, agent_id): | ||
"""Restart a wazuh-agent. | ||
Returns: | ||
dict: Wazuh API info. | ||
""" | ||
response = WazuhAPIRequest(method='PUT', endpoint=f"/agents/{agent_id}/restart").send(self) | ||
|
||
return response |
105 changes: 105 additions & 0 deletions
105
src/wazuh_qa_framework/wazuh_components/api/wazuh_api_request.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
""" | ||
Module to wrapp the Wazuh API requests. Normally, the developers should not use this class but WazuhAPI one. This class | ||
is used by WazuhAPI to make and send the API requests. | ||
This module contains the following: | ||
- WazuhAPIRequest: | ||
- send | ||
""" | ||
import json | ||
import requests | ||
|
||
from wazuh_qa_framework.generic_modules.request.request import Request | ||
from wazuh_qa_framework.generic_modules.exceptions.exceptions import ConnectionError | ||
from wazuh_qa_framework.wazuh_components.api.wazuh_api_response import WazuhAPIResponse | ||
|
||
|
||
class WazuhAPIRequest: | ||
"""Wrapper class to manage requests to the Wazuh API. | ||
Args: | ||
endpoint (str): Target API endpoint. | ||
method (str): Request method (GET, POST, PUT, DELETE). | ||
payload (dict): Request data. | ||
headers (dict): Request headers. | ||
verify (boolean): False for ignore making insecure requests, False otherwise. | ||
Attributes: | ||
endpoint (str): Target API endpoint. | ||
method (str): Request method (GET, POST, PUT, DELETE). | ||
payload (dict): Request data. | ||
headers (dict): Request headers. | ||
verify (boolean): False for ignore making insecure requests, False otherwise. | ||
""" | ||
def __init__(self, endpoint, method, payload=None, headers=None, verify=False): | ||
self.endpoint = endpoint | ||
self.method = method.upper() | ||
self.payload = payload | ||
self.headers = headers | ||
self.verify = verify | ||
|
||
def __get_request_parameters(self, wazuh_api_object): | ||
"""Build the request parameters. | ||
Args: | ||
wazuh_api_object (WazuhAPI): Wazuh API object. | ||
""" | ||
# Get the token if we have not got it before. | ||
if wazuh_api_object.token is None: | ||
wazuh_api_object.token = wazuh_api_object.get_token() | ||
|
||
self.headers = {} if self.headers is None else self.headers | ||
self.headers.update({ | ||
'Content-Type': 'application/json', | ||
'Authorization': f'Bearer {wazuh_api_object.token}' | ||
}) | ||
|
||
request_args = { | ||
'method': self.method, | ||
'url': f"{wazuh_api_object.url}{self.endpoint}", | ||
'headers': self.headers, | ||
'verify': self.verify | ||
} | ||
|
||
if self.payload is not None: | ||
request_args['payload'] = self.payload | ||
|
||
return request_args | ||
|
||
def __call__(self, func): | ||
"""Perform directly the Wazuh API call and add the response object to the function parameters. Useful to run | ||
the request using only a python decorator. | ||
Args: | ||
func (function): Function object. | ||
""" | ||
def wrapper(obj, *args, **kwargs): | ||
kwargs['response'] = self.send(obj) | ||
|
||
return func(obj, *args, **kwargs) | ||
|
||
return wrapper | ||
|
||
def __str__(self): | ||
"""Overwrite the print object representation""" | ||
return json.dumps(self.__dict__) | ||
|
||
def send(self, wazuh_api_object): | ||
"""Send the API request. | ||
Args: | ||
wazuh_api_object (WazuhAPI): Wazuh API object. | ||
Returns: | ||
WazuhAPIResponse: Wazuh API response object. | ||
Raises: | ||
exceptions.RuntimeError: Cannot establish connection with the API. | ||
""" | ||
request_parameters = self.__get_request_parameters(wazuh_api_object) | ||
|
||
try: | ||
return WazuhAPIResponse(Request(**request_parameters).send()) | ||
except requests.exceptions.ConnectionError as exception: | ||
raise ConnectionError(f"Cannot establish connection with {wazuh_api_object.url}") from exception |
54 changes: 54 additions & 0 deletions
54
src/wazuh_qa_framework/wazuh_components/api/wazuh_api_response.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
""" | ||
Module to wrapp the Wazuh API responses. This modules contains the following: | ||
- WazuhAPIResponse | ||
""" | ||
from http import HTTPStatus | ||
from json import JSONDecodeError | ||
|
||
|
||
class WazuhAPIResponse: | ||
"""Wrapper class to handle Wazuh API responses. | ||
Args: | ||
request_response (requests.Response): Response object. | ||
Attributes: | ||
request_response (requests.Response): Response object. | ||
status_code (int): Response status code. | ||
error (int): Wazuh API response error. | ||
data (dict|str): Wazuh API response data in dict format if possible else string. | ||
""" | ||
def __init__(self, request_response): | ||
self.request_response = request_response | ||
self.status_code = request_response.status_code | ||
self.error = 0 | ||
self.data = self.__get_data() | ||
|
||
def __get_data(self): | ||
"""Set and get the custom class object data | ||
Returns: | ||
(dict|str): Wazuh API response data in dict format if possible else string. | ||
""" | ||
if self.status_code == HTTPStatus.METHOD_NOT_ALLOWED or self.status_code == HTTPStatus.UNAUTHORIZED: | ||
self.error = 1 | ||
return self.request_response.json()['title'] | ||
|
||
if self.status_code == HTTPStatus.OK: | ||
try: | ||
data_container = self.request_response.json() | ||
|
||
if 'data' in data_container: | ||
self.error = data_container['error'] if 'error' in data_container else 0 | ||
return data_container['data'] | ||
else: | ||
self.error = 0 | ||
return data_container | ||
|
||
except JSONDecodeError: | ||
return self.request_response.text | ||
|
||
def __str__(self): | ||
"""Overwrite the print object representation.""" | ||
return '{' + f"'status_code': {self.status_code}, 'data': '{self.data}', error: {self.error}" + '}' |
Empty file.
Empty file.
Empty file.
Empty file.