Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Core: autopilot-manager: Migrate to use new settings #2975

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions core/services/ardupilot_manager/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# This file is used to define general configurations for the app
from collections import namedtuple
from pathlib import Path

import appdirs

SERVICE_NAME = "autopilot-manager"


SETTINGS_PATH = Path(appdirs.user_config_dir(SERVICE_NAME))
LOG_PATH = Path.joinpath(SETTINGS_PATH, "logs")
FIRMWARE_PATH = Path.joinpath(SETTINGS_PATH, "firmware")
USER_FIRMWARE_PATH = Path("/usr/blueos/userdata/firmware")

BLUE_OS_FILES_PATH = Path.joinpath(Path.home(), "blueos-files")
DEFAULTS_PATH = Path.joinpath(BLUE_OS_FILES_PATH, "ardupilot-manager/default")

@staticmethod
def create_app_folders() -> None:
"""Create the necessary folders for proper app function."""
for folder in Settings.app_folders:
try:
Path.mkdir(folder, parents=True, exist_ok=True)
except FileExistsError:
logger.warning(f"Found file {folder} where a folder should be. Removing file and creating folder.")
Path.unlink(folder)
Path.mkdir(folder)

StaticFile = namedtuple("StaticFile", "parent filename url")
DEFAULT_RESOURCES = [
StaticFile(
DEFAULTS_PATH, "ardupilot_navigator", "https://firmware.ardupilot.org/Sub/stable-4.5.0/navigator/ardusub",
),
StaticFile(
DEFAULTS_PATH, "ardupilot_navigator", "https://firmware.ardupilot.org/Sub/stable-4.5.0/navigator64/ardusub",
),
StaticFile(
DEFAULTS_PATH, "ardupilot_pixhawk1", "https://firmware.ardupilot.org/Sub/stable-4.5.0/Pixhawk1/ardusub.apj"
),
StaticFile(
DEFAULTS_PATH, "ardupilot_pixhawk4", "https://firmware.ardupilot.org/Sub/stable-4.5.0/Pixhawk4/ardusub.apj"
),
]


__all__ = [
"SERVICE_NAME",
"DEFAULT_RESOURCES",
"SETTINGS_PATH",
"LOG_PATH",
"FIRMWARE_PATH",
"USER_FIRMWARE_PATH",
"BLUE_OS_FILES_PATH",
"DEFAULTS_PATH",
]
121 changes: 12 additions & 109 deletions core/services/ardupilot_manager/settings.py
Original file line number Diff line number Diff line change
@@ -1,116 +1,19 @@
import json
from copy import deepcopy
from pathlib import Path
from typing import Any, Dict, Optional, Union, cast
from typing import Any, Dict, Optional

import appdirs
from loguru import logger
from commonwealth.settings.bases.pydantic_base import PydanticSettings

from typedefs import SITLFrame

SERVICE_NAME = "ardupilot-manager"
class SettingsV1(PydanticSettings):
sitl_frame: Optional[str] = None
prefered_router: Optional[str] = None


class Settings:
app_name = SERVICE_NAME
settings_path = Path(appdirs.user_config_dir(app_name))
settings_file = Path.joinpath(settings_path, "settings.json")
firmware_folder = Path.joinpath(settings_path, "firmware")
user_firmware_folder = Path("/usr/blueos/userdata/firmware")
log_path = Path.joinpath(settings_path, "logs")
app_folders = [settings_path, firmware_folder, log_path, user_firmware_folder]

blueos_files_folder = Path.joinpath(Path.home(), "blueos-files")
defaults_folder = Path.joinpath(blueos_files_folder, "ardupilot-manager/default")
sitl_frame = SITLFrame.UNDEFINED
preferred_router: Optional[str] = None

def __init__(self) -> None:
self.root: Dict[str, Union[int, Dict[str, Any]]] = {"version": 0, "content": {}}

@staticmethod
def create_app_folders() -> None:
"""Create the necessary folders for proper app function."""
for folder in Settings.app_folders:
try:
Path.mkdir(folder, parents=True, exist_ok=True)
except FileExistsError:
logger.warning(f"Found file {folder} where a folder should be. Removing file and creating folder.")
Path.unlink(folder)
Path.mkdir(folder)

def create_settings_file(self) -> None:
"""Create settings file."""
try:
if not Path.is_file(self.settings_file):
with open(self.settings_file, "w+", encoding="utf-8") as file:
logger.info(f"Creating settings file: {self.settings_file}")
json.dump(self.root, file, sort_keys=True, indent=4)

except OSError as error:
logger.error(
f"Could not create settings files: {error}\n No settings will be loaded or saved during this session."
)

@property
def content(self) -> Dict[str, Any]:
return cast(Dict[str, Any], self.root["content"])

@property
def version(self) -> int:
return cast(int, self.root["version"])

def settings_exist(self) -> bool:
"""Check if settings file exist

Returns:
bool: True if it exist
"""
return Path.is_file(self.settings_file)

def load(self) -> bool:
"""Load settings from file

Returns:
bool: False if failed
"""
if not self.settings_exist():
logger.error(f"User settings does not exist on {self.settings_file}.")
return False

data = None
try:
with open(self.settings_file, encoding="utf-8") as file:
data = json.load(file)
if data["version"] != self.root["version"]:
logger.error("User settings does not match with our supported version.")
return False

self.root = data
except Exception as error:
logger.error(f"Failed to fetch data from file ({self.settings_file}): {error}")
logger.debug(data)

return True

def save(self, content: Dict[str, Any]) -> None:
"""Save content to file

Args:
content (list): Configuration list
"""
# We don't want to write in disk if there is nothing different to write
if self.root["content"] == content:
logger.info("No new data. Not updating settings file.")
def migrate(self, data: Dict[str, Any]) -> None:
if data["VERSION"] == SettingsV1.STATIC_VERSION:
return

self.root["content"] = deepcopy(content)

try:
Path.mkdir(self.settings_path, exist_ok=True)
if data["VERSION"] < SettingsV1.STATIC_VERSION:
super().migrate(data)

with open(self.settings_file, "w+", encoding="utf-8") as file:
logger.info(f"Updating settings file: {self.settings_file}")
json.dump(self.root, file, sort_keys=True, indent=4)
except Exception as error:
logger.warning(f"Could not save settings to disk: {error}")
data["VERSION"] = SettingsV1.STATIC_VERSION
data["animal"] = self.animal
data["first_variable"] = self.first_variable
29 changes: 2 additions & 27 deletions core/services/ardupilot_manager/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,41 +5,16 @@
import sys
import time
import urllib.request
from collections import namedtuple
from warnings import warn

import setuptools

from settings import Settings
from config import DEFAULT_RESOURCES

ssl._create_default_https_context = ssl._create_unverified_context


autopilot_settings = Settings()
defaults_folder = autopilot_settings.defaults_folder

StaticFile = namedtuple("StaticFile", "parent filename url")

static_files = [
StaticFile(
defaults_folder,
"ardupilot_navigator",
"https://firmware.ardupilot.org/Sub/stable-4.5.0/navigator/ardusub",
),
StaticFile(
defaults_folder,
"ardupilot_navigator",
"https://firmware.ardupilot.org/Sub/stable-4.5.0/navigator64/ardusub",
),
StaticFile(
defaults_folder, "ardupilot_pixhawk1", "https://firmware.ardupilot.org/Sub/stable-4.5.0/Pixhawk1/ardusub.apj"
),
StaticFile(
defaults_folder, "ardupilot_pixhawk4", "https://firmware.ardupilot.org/Sub/stable-4.5.0/Pixhawk4/ardusub.apj"
),
]

for file in static_files:
for file in DEFAULT_RESOURCES:
path = pathlib.Path.joinpath(file.parent, file.filename)
path.parent.mkdir(parents=True, exist_ok=True)

Expand Down
Loading