Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added keyword for ssh_timeout and improved argument passing for ssh. #483

Merged
merged 10 commits into from
Apr 10, 2024
4 changes: 1 addition & 3 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,4 @@ updates:
schedule:
interval: "daily"
open-pull-requests-limit: 10
versioning-strategy: "widen"
target:
versions: [">=3.0.0"]
versioning-strategy: "auto"
2 changes: 2 additions & 0 deletions bibigrid.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
cloud: openstack # name of clouds.yaml cloud-specification key (which is value to top level key clouds)

# -- BEGIN: GENERAL CLUSTER INFORMATION --
# sshTimeout: 5 # Number of ssh connection attempts with 2^attempt seconds in between (2^sshTimeout-1 is the max time before returning with an error)

## sshPublicKeyFiles listed here will be added to access the cluster. A temporary key is created by bibigrid itself.
#sshPublicKeyFiles:
# - [public key one]
Expand Down
35 changes: 17 additions & 18 deletions bibigrid/core/actions/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ def __init__(self, providers, configurations, config_path, log, debug=False, clu
self.ssh_user = configurations[0].get("sshUser") or "ubuntu"
self.ssh_add_public_key_commands = ssh_handler.get_add_ssh_public_key_commands(
configurations[0].get("sshPublicKeyFiles"))
self.ssh_timeout = configurations[0].get("sshTimeout", 4)
self.config_path = config_path
self.master_ip = None
self.log.debug("Cluster-ID: %s", self.cluster_id)
Expand Down Expand Up @@ -151,8 +152,7 @@ def generate_security_groups(self):
for cidr in tmp_configuration['subnet_cidrs']:
rules.append(
{"direction": "ingress", "ethertype": "IPv4", "protocol": "tcp", "port_range_min": None,
"port_range_max": None, "remote_ip_prefix": cidr,
"remote_group_id": None})
"port_range_max": None, "remote_ip_prefix": cidr, "remote_group_id": None})
provider.append_rules_to_security_group(default_security_group_id, rules)
configuration["security_groups"] = [self.default_security_group_name] # store in configuration
# when running a multi-cloud setup create an additional wireguard group
Expand Down Expand Up @@ -232,17 +232,17 @@ def initialize_instances(self):
Setup all servers
"""
for configuration in self.configurations:
ssh_data = {"floating_ip": configuration["floating_ip"], "private_key": KEY_FOLDER + self.key_name,
"username": self.ssh_user, "commands": None, "filepaths": None,
"gateway": configuration.get("gateway", {}), "timeout": self.ssh_timeout}
if configuration.get("masterInstance"):
self.master_ip = configuration["floating_ip"]
ssh_handler.ansible_preparation(floating_ip=configuration["floating_ip"],
private_key=KEY_FOLDER + self.key_name, username=self.ssh_user,
commands=self.ssh_add_public_key_commands, log=self.log,
gateway=configuration.get("gateway", {}))
ssh_data["commands"] = self.ssh_add_public_key_commands + ssh_handler.ANSIBLE_SETUP
ssh_data["filepaths"] = [(ssh_data["private_key"], ssh_handler.PRIVATE_KEY_FILE)]
ssh_handler.execute_ssh(ssh_data, self.log)
elif configuration.get("vpnInstance"):
ssh_handler.execute_ssh(floating_ip=configuration["floating_ip"],
private_key=KEY_FOLDER + self.key_name, username=self.ssh_user,
commands=ssh_handler.VPN_SETUP, log=self.log,
gateway=configuration.get("gateway", {}))
ssh_data["commands"] = ssh_handler.VPN_SETUP
ssh_handler.execute_ssh(ssh_data, self.log)

def prepare_volumes(self, provider, mounts):
"""
Expand Down Expand Up @@ -316,9 +316,10 @@ def upload_data(self):
else:
commands = [ssh_handler.get_ac_command(self.providers, AC_NAME.format(
cluster_id=self.cluster_id))] + ssh_handler.ANSIBLE_START
ssh_handler.execute_ssh(floating_ip=self.master_ip, private_key=KEY_FOLDER + self.key_name,
username=self.ssh_user, filepaths=FILEPATHS, commands=commands, log=self.log,
gateway=self.configurations[0].get("gateway", {}))
ssh_data = {"floating_ip": self.master_ip, "private_key": KEY_FOLDER + self.key_name,
"username": self.ssh_user, "commands": commands, "filepaths": FILEPATHS,
"gateway": self.configurations[0].get("gateway", {}), "timeout": self.ssh_timeout}
ssh_handler.execute_ssh(ssh_data=ssh_data, log=self.log)

def start_start_instance_threads(self):
"""
Expand Down Expand Up @@ -354,8 +355,7 @@ def extended_network_configuration(self):
f"{configuration_b['subnet_cidrs']})")
# add provider_b network as allowed network
for cidr in configuration_b["subnet_cidrs"]:
allowed_addresses.append(
{'ip_address': cidr, 'mac_address': configuration_a["mac_addr"]})
allowed_addresses.append({'ip_address': cidr, 'mac_address': configuration_a["mac_addr"]})
# configure security group rules
provider_a.append_rules_to_security_group(self.wireguard_security_group_name, [
{"direction": "ingress", "ethertype": "IPv4", "protocol": "udp", "port_range_min": 51820,
Expand Down Expand Up @@ -443,9 +443,8 @@ def log_cluster_start_info(self):
port = int(sympy.sympify(gateway["portFunction"]).subs(dict(octets)))
ssh_ip = gateway["ip"]
self.log.log(42, f"Cluster {self.cluster_id} with master {self.master_ip} up and running!")
self.log.log(42,
f"SSH: ssh -i '{KEY_FOLDER}{self.key_name}' {self.ssh_user}@{ssh_ip}"
f"{f' -p {port}' if gateway else ''}")
self.log.log(42, f"SSH: ssh -i '{KEY_FOLDER}{self.key_name}' {self.ssh_user}@{ssh_ip}"
f"{f' -p {port}' if gateway else ''}")
self.log.log(42, f"Terminate cluster: ./bibigrid.sh -i '{self.config_path}' -t -cid {self.cluster_id}")
self.log.log(42, f"Detailed cluster info: ./bibigrid.sh -i '{self.config_path}' -l -cid {self.cluster_id}")
if self.configurations[0].get("ide"):
Expand Down
2 changes: 1 addition & 1 deletion bibigrid/core/startup.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def run_action(args, configurations, config_path):
creator = create.Create(providers=providers, configurations=configurations, log=LOG, debug=args.debug,
config_path=config_path)
LOG.log(42, "Creating a new cluster takes about 10 or more minutes depending on your cloud provider "
"and your configuration. Be patient.")
"and your configuration. Please be patient.")
exit_state = creator.create()
else:
if not args.cluster_id:
Expand Down
11 changes: 7 additions & 4 deletions bibigrid/core/utility/ansible_configurator.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@

import mergedeep
import yaml
from bibigrid.core.actions.version import __version__

from bibigrid.core.actions import create
from bibigrid.core.actions import ide
from bibigrid.core.actions.version import __version__
from bibigrid.core.utility import id_generation
from bibigrid.core.utility import yaml_dumper
from bibigrid.core.utility.handler import configuration_handler
Expand All @@ -30,6 +30,7 @@
SLURM_CONF = {"db": "slurm", "db_user": "slurm", "db_password": "changeme",
"munge_key": id_generation.generate_munge_key(),
"elastic_scheduling": {"SuspendTime": 3600, "ResumeTimeout": 900, "TreeWidth": 128}}
CLOUD_SCHEDULING = {"sshTimeout": 4}


def delete_old_vars(log):
Expand Down Expand Up @@ -182,7 +183,10 @@ def generate_common_configuration_yaml(cidrs, configurations, cluster_id, ssh_us
"slurm": master_configuration.get("slurm", True), "ssh_user": ssh_user,
"slurm_conf": mergedeep.merge({}, SLURM_CONF,
master_configuration.get("slurmConf", {}),
strategy=mergedeep.Strategy.TYPESAFE_REPLACE)}
strategy=mergedeep.Strategy.TYPESAFE_REPLACE),
"cloud_scheduling": mergedeep.merge({}, CLOUD_SCHEDULING,
master_configuration.get("cloudScheduling", {}),
strategy=mergedeep.Strategy.TYPESAFE_REPLACE)}
if master_configuration.get("nfs"):
nfs_shares = master_configuration.get("nfsShares", [])
nfs_shares = nfs_shares + DEFAULT_NFS_SHARES
Expand All @@ -199,8 +203,7 @@ def generate_common_configuration_yaml(cidrs, configurations, cluster_id, ssh_us
master_configuration.get("zabbixConf", {}),
strategy=mergedeep.Strategy.TYPESAFE_REPLACE)

for from_key, to_key in [("ansibleRoles", "ansible_roles"),
("ansibleGalaxyRoles", "ansible_galaxy_roles")]:
for from_key, to_key in [("ansibleRoles", "ansible_roles"), ("ansibleGalaxyRoles", "ansible_galaxy_roles")]:
pass_through(master_configuration, common_configuration_yaml, from_key, to_key)

if len(configurations) > 1:
Expand Down
111 changes: 43 additions & 68 deletions bibigrid/core/utility/handler/ssh_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,51 +87,52 @@ def copy_to_server(sftp, local_path, remote_path, log):
copy_to_server(sftp, os.path.join(local_path, filename), os.path.join(remote_path, filename), log)


def is_active(client, floating_ip_address, private_key, username, log, gateway, timeout=5):
def is_active(client, paramiko_key, ssh_data, log):
"""
Checks if connection is possible and therefore if server is active.
Raises paramiko.ssh_exception.NoValidConnectionsError if timeout is reached
@param client: created client
@param floating_ip_address: ip to connect to
@param private_key: SSH-private_key
@param username: SSH-username
@param paramiko_key: SSH-private_key
@param log:
@param timeout: how long to wait between ping
@param gateway: if node should be reached over a gateway port is set to 30000 + subnet * 256 + host
@param ssh_data: dict containing among other things gateway, floating_ip, username
(waiting grows quadratically till 2**timeout before accepting failure)
"""
attempts = 0
establishing_connection = True
log.info(f"Attempting to connect to {ssh_data['floating_ip']}... This might take a while")
port = 22
if ssh_data.get('gateway'):
log.info(f"Using SSH Gateway {ssh_data['gateway'].get('ip')}")
octets = {f'oct{enum + 1}': int(elem) for enum, elem in enumerate(ssh_data['floating_ip'].split("."))}
port = int(sympy.sympify(ssh_data['gateway']["portFunction"]).subs(dict(octets)))
log.info(f"Port {port} will be used (see {ssh_data['gateway']['portFunction']} and octets {octets}).")
while establishing_connection:
try:
port = 22
if gateway:
log.info(f"Using SSH Gateway {gateway.get('ip')}")
octets = {f'oct{enum + 1}': int(elem) for enum, elem in enumerate(floating_ip_address.split("."))}
port = int(sympy.sympify(gateway["portFunction"]).subs(dict(octets)))
log.info(f"Port {port} will be used (see {gateway['portFunction']} and octets {octets}).")
client.connect(hostname=gateway.get("ip") or floating_ip_address, username=username,
pkey=private_key, timeout=7, auth_timeout=5, port=port)
log.info(f"Attempt {attempts}/{ssh_data['timeout']}. Connecting to {ssh_data['floating_ip']}")
client.connect(hostname=ssh_data['gateway'].get("ip") or ssh_data['floating_ip'],
username=ssh_data['username'], pkey=paramiko_key, timeout=7,
auth_timeout=ssh_data['timeout'], port=port)
establishing_connection = False
log.info(f"Successfully connected to {floating_ip_address}")
log.info(f"Successfully connected to {ssh_data['floating_ip']}.")
except paramiko.ssh_exception.NoValidConnectionsError as exc:
log.info(f"Attempting to connect to {floating_ip_address}... This might take a while", )
if attempts < timeout:
time.sleep(2 ** attempts)
if attempts < ssh_data['timeout']:
sleep_time = 2 ** (attempts+2)
time.sleep(sleep_time)
log.info(f"Waiting {sleep_time} before attempting to reconnect.")
attempts += 1
else:
log.error(f"Attempt to connect to {floating_ip_address} failed.")
log.error(f"Attempt to connect to {ssh_data['floating_ip']} failed.")
raise ConnectionException(exc) from exc
except socket.timeout as exc:
log.warning("Socket timeout exception occurred. Try again ...")
if attempts < timeout:
if attempts < ssh_data['timeout']:
attempts += 1
else:
log.error(f"Attempt to connect to {floating_ip_address} failed, due to a socket timeout.")
log.error(f"Attempt to connect to {ssh_data['floating_ip']} failed, due to a socket timeout.")
raise ConnectionException(exc) from exc
except TimeoutError as exc: # pylint: disable=duplicate-except
log.error("The attempt to connect to %s failed. Possible known reasons:"
"\n\t-Your network's security group doesn't allow SSH.", floating_ip_address)
"\n\t-Your network's security group doesn't allow SSH.", ssh_data['floating_ip'])
raise ConnectionException(exc) from exc


Expand Down Expand Up @@ -183,61 +184,35 @@ def execute_ssh_cml_commands(client, commands, log):
raise ExecutionException(msg)


def ansible_preparation(floating_ip, private_key, username, log, gateway, commands=None, filepaths=None):
"""
Installs python and pip. Then installs ansible over pip.
Copies private key to instance so cluster-nodes are reachable and sets permission as necessary.
Copies additional files and executes additional commands if given.
The playbook is copied later, because it needs all servers setup and is not time intensive.
See: create.update_playbooks
@param floating_ip: public ip of server to ansible-prepare
@param private_key: generated private key of all cluster-server
@param username: username of all server
@param log:
@param commands: additional commands to execute
@param filepaths: additional files to copy: (localpath, remotepath)
@param gateway
"""
if filepaths is None:
filepaths = []
if commands is None:
commands = []
log.info("Ansible preparation...")
commands = ANSIBLE_SETUP + commands
filepaths.append((private_key, PRIVATE_KEY_FILE))
execute_ssh(floating_ip, private_key, username, log, gateway, commands, filepaths)


def execute_ssh(floating_ip, private_key, username, log, gateway, commands=None, filepaths=None):
def execute_ssh(ssh_data, log):
"""
Executes commands on remote and copies files given in filepaths
@param floating_ip: public ip of remote
@param private_key: key of remote
@param username: username of remote
@param commands: commands

@param ssh_data: Dict containing floating_ip, private_key, username, commands, filepaths, gateway, timeout
@param log:
@param filepaths: filepaths (localpath, remotepath)
@param gateway: gateway if used
"""
if commands is None:
commands = []
paramiko_key = paramiko.ECDSAKey.from_private_key_file(private_key)
log.debug(f"Running execute_sshc with ssh_data: {ssh_data}.")
if ssh_data.get("filepaths") is None:
ssh_data["filepaths"] = []
if ssh_data.get("commands") is None:
ssh_data["commands"] = []
paramiko_key = paramiko.ECDSAKey.from_private_key_file(ssh_data["private_key"])
with paramiko.SSHClient() as client:
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
is_active(client=client, floating_ip_address=floating_ip, username=username, private_key=paramiko_key,
log=log, gateway=gateway)
is_active(client=client, paramiko_key=paramiko_key, ssh_data=ssh_data, log=log)
except ConnectionException as exc:
log.error(f"Couldn't connect to ip {gateway or floating_ip} using private key {private_key}.")
log.error(f"Couldn't connect to ip {ssh_data['gateway'] or ssh_data['floating_ip']} using private key "
f"{ssh_data['private_key']}.")
raise exc
else:
log.debug(f"Setting up {floating_ip}")
if filepaths:
log.debug(f"Setting up filepaths for {floating_ip}")
log.debug(f"Setting up {ssh_data['floating_ip']}")
if ssh_data['filepaths']:
log.debug(f"Setting up filepaths for {ssh_data['floating_ip']}")
sftp = client.open_sftp()
for local_path, remote_path in filepaths:
for local_path, remote_path in ssh_data['filepaths']:
copy_to_server(sftp=sftp, local_path=local_path, remote_path=remote_path, log=log)
log.debug("SFTP: Files %s copied.", filepaths)
if commands:
log.debug(f"Setting up commands for {floating_ip}")
execute_ssh_cml_commands(client=client, commands=commands, log=log)
log.debug("SFTP: Files %s copied.", ssh_data['filepaths'])
if ssh_data["floating_ip"]:
log.debug(f"Setting up commands for {ssh_data['floating_ip']}")
execute_ssh_cml_commands(client=client, commands=ssh_data["commands"], log=log)
13 changes: 12 additions & 1 deletion documentation/markdown/features/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,17 @@ sshPublicKeyFiles:
- /home/user/.ssh/id_ecdsa_colleague.pub
```

#### sshTimeout (optional)
Defines the number of attempts that BiBiGrid will try to connect to the master instance via ssh.
Attempts have a pause of `2^(attempts+2)` seconds in between. Default value is 4.

#### cloudScheduling (optional)
This key allows you to influence cloud scheduling. Currently, only a single key `sshTimeout` can be set here.

##### sshTimeout (optional)
Defines the number of attempts that the master will try to connect to on demand created worker instances via ssh.
Attempts have a pause of `2^(attempts+2)` seconds in between. Default value is 4.

#### autoMount (optional)
> **Warning:** If a volume has an obscure filesystem, this might overwrite your data!

Expand Down Expand Up @@ -149,7 +160,7 @@ This is required if your provider has any post-launch services interfering with
seemingly random errors can occur when the service interrupts ansible's execution. Services are
listed on [de.NBI Wiki](https://cloud.denbi.de/wiki/) at `Computer Center Specific` (not yet).

####
#### gateway (optional)
In order to save valuable floating ips, BiBiGrid can also make use of a gateway to create the cluster.
For more information on how to set up a gateway, how gateways work and why they save floating ips please continue reading [here](https://cloud.denbi.de/wiki/Tutorials/SaveFloatingIPs/).

Expand Down
Loading
Loading