diff --git a/awx/main/management/commands/run_wsrelay.py b/awx/main/management/commands/run_wsrelay.py index ee7cbca6825e..a3016ffc9296 100644 --- a/awx/main/management/commands/run_wsrelay.py +++ b/awx/main/management/commands/run_wsrelay.py @@ -101,8 +101,9 @@ def handle(self, *arg, **options): migrating = bool(executor.migration_plan(executor.loader.graph.leaf_nodes())) connection.close() # Because of async nature, main loop will use new connection, so close this except Exception as exc: - logger.warning(f'Error on startup of run_wsrelay (error: {exc}), retry in 10s...') - time.sleep(10) + time.sleep(10) # Prevent supervisor from restarting the service too quickly and the service from entering the FATAL state + # Sleep before logging because logging relies on settings, which require a database connection... + logger.warning(f'Error on startup of run_wsrelay (error: {exc}), slept for 10s...') return # In containerized deployments, migrations happen in the task container, @@ -121,13 +122,14 @@ def handle(self, *arg, **options): return try: - my_hostname = Instance.objects.my_hostname() + my_hostname = Instance.objects.my_hostname() # This relies on settings.CLUSTER_HOST_ID, which requires a database connection logger.info('Active instance with hostname {} is registered.'.format(my_hostname)) except RuntimeError as e: # the CLUSTER_HOST_ID in the task, and web instance must match and # ensure network connectivity between the task and web instance - logger.info('Unable to return currently active instance: {}, retry in 5s...'.format(e)) - time.sleep(5) + time.sleep(10) # Prevent supervisor from restarting the service too quickly and the service from entering the FATAL state + # Sleep before logging because logging relies on settings, which require a database connection...
+ logger.warning(f"Unable to return currently active instance: {e}, slept for 10s before return.") return if options.get('status'): @@ -166,12 +168,14 @@ def handle(self, *arg, **options): WebsocketsMetricsServer().start() - while True: - try: - asyncio.run(WebSocketRelayManager().run()) - except KeyboardInterrupt: - logger.info('Shutting down Websocket Relayer') - break - except Exception as e: - logger.exception('Error in Websocket Relayer, exception: {}. Restarting in 10 seconds'.format(e)) - time.sleep(10) + try: + logger.info('Starting Websocket Relayer...') + websocket_relay_manager = WebSocketRelayManager() + asyncio.run(websocket_relay_manager.run()) + except KeyboardInterrupt: + logger.info('Terminating Websocket Relayer') + except BaseException as e: # BaseException is used to catch all exceptions including asyncio.CancelledError + time.sleep(10) # Prevent supervisor from restarting the service too quickly and the service from entering the FATAL state + # Sleep before logging because logging relies on settings, which require a database connection... + logger.warning(f"Encounter error while running Websocket Relayer {e}, slept for 10s...") + return diff --git a/awx/main/wsrelay.py b/awx/main/wsrelay.py index a4c94bd7e60b..bedf68efb030 100644 --- a/awx/main/wsrelay.py +++ b/awx/main/wsrelay.py @@ -285,8 +285,6 @@ async def cleanup_offline_host(self, hostname): except asyncio.CancelledError: # Handle the case where the task was already cancelled by the time we got here.
pass - except Exception as e: - logger.warning(f"Failed to cancel relay connection for {hostname}: {e}") del self.relay_connections[hostname] @@ -297,8 +295,6 @@ async def cleanup_offline_host(self, hostname): self.stats_mgr.delete_remote_host_stats(hostname) except KeyError: pass - except Exception as e: - logger.warning(f"Failed to delete stats for {hostname}: {e}") async def run(self): event_loop = asyncio.get_running_loop() @@ -306,7 +302,6 @@ async def run(self): self.stats_mgr = RelayWebsocketStatsManager(event_loop, self.local_hostname) self.stats_mgr.start() - # Set up a pg_notify consumer for allowing web nodes to "provision" and "deprovision" themselves gracefully. database_conf = deepcopy(settings.DATABASES['default']) database_conf['OPTIONS'] = deepcopy(database_conf.get('OPTIONS', {})) @@ -318,79 +313,54 @@ async def run(self): if 'PASSWORD' in database_conf: database_conf['OPTIONS']['password'] = database_conf.pop('PASSWORD') - task = None + async_conn = await psycopg.AsyncConnection.connect( + dbname=database_conf['NAME'], + host=database_conf['HOST'], + user=database_conf['USER'], + port=database_conf['PORT'], + **database_conf.get("OPTIONS", {}), + ) - # Managing the async_conn here so that we can close it if we need to restart the connection - async_conn = None + await async_conn.set_autocommit(True) + on_ws_heartbeat_task = event_loop.create_task(self.on_ws_heartbeat(async_conn)) # Establishes a websocket connection to /websocket/relay on all API servers - try: - while True: - if not task or task.done(): - try: - # Try to close the connection if it's open - if async_conn: - try: - await async_conn.close() - except Exception as e: - logger.warning(f"Failed to close connection to database for pg_notify: {e}") - - # and re-establish the connection - async_conn = await psycopg.AsyncConnection.connect( - dbname=database_conf['NAME'], - host=database_conf['HOST'], - user=database_conf['USER'], - port=database_conf['PORT'], - 
**database_conf.get("OPTIONS", {}), - ) - await async_conn.set_autocommit(True) - - # before creating the task that uses the connection - task = event_loop.create_task(self.on_ws_heartbeat(async_conn), name="on_ws_heartbeat") - logger.info("Creating `on_ws_heartbeat` task in event loop.") - - except Exception as e: - logger.warning(f"Failed to connect to database for pg_notify: {e}") - - future_remote_hosts = self.known_hosts.keys() - current_remote_hosts = self.relay_connections.keys() - deleted_remote_hosts = set(current_remote_hosts) - set(future_remote_hosts) - new_remote_hosts = set(future_remote_hosts) - set(current_remote_hosts) - - # This loop handles if we get an advertisement from a host we already know about but - # the advertisement has a different IP than we are currently connected to. - for hostname, address in self.known_hosts.items(): - if hostname not in self.relay_connections: - # We've picked up a new hostname that we don't know about yet. - continue + while True: + if on_ws_heartbeat_task.done(): + raise Exception("on_ws_heartbeat_task has exited") + + future_remote_hosts = self.known_hosts.keys() + current_remote_hosts = self.relay_connections.keys() + deleted_remote_hosts = set(current_remote_hosts) - set(future_remote_hosts) + new_remote_hosts = set(future_remote_hosts) - set(current_remote_hosts) + + # This loop handles if we get an advertisement from a host we already know about but + # the advertisement has a different IP than we are currently connected to. + for hostname, address in self.known_hosts.items(): + if hostname not in self.relay_connections: + # We've picked up a new hostname that we don't know about yet. 
+ continue - if address != self.relay_connections[hostname].remote_host: - deleted_remote_hosts.add(hostname) - new_remote_hosts.add(hostname) + if address != self.relay_connections[hostname].remote_host: + deleted_remote_hosts.add(hostname) + new_remote_hosts.add(hostname) - # Delete any hosts with closed connections - for hostname, relay_conn in self.relay_connections.items(): - if not relay_conn.connected: - deleted_remote_hosts.add(hostname) + # Delete any hosts with closed connections + for hostname, relay_conn in self.relay_connections.items(): + if not relay_conn.connected: + deleted_remote_hosts.add(hostname) - if deleted_remote_hosts: - logger.info(f"Removing {deleted_remote_hosts} from websocket broadcast list") - await asyncio.gather(*[self.cleanup_offline_host(h) for h in deleted_remote_hosts]) + if deleted_remote_hosts: + logger.info(f"Removing {deleted_remote_hosts} from websocket broadcast list") + await asyncio.gather(*[self.cleanup_offline_host(h) for h in deleted_remote_hosts]) - if new_remote_hosts: - logger.info(f"Adding {new_remote_hosts} to websocket broadcast list") + if new_remote_hosts: + logger.info(f"Adding {new_remote_hosts} to websocket broadcast list") - for h in new_remote_hosts: - stats = self.stats_mgr.new_remote_host_stats(h) - relay_connection = WebsocketRelayConnection(name=self.local_hostname, stats=stats, remote_host=self.known_hosts[h]) - relay_connection.start() - self.relay_connections[h] = relay_connection + for h in new_remote_hosts: + stats = self.stats_mgr.new_remote_host_stats(h) + relay_connection = WebsocketRelayConnection(name=self.local_hostname, stats=stats, remote_host=self.known_hosts[h]) + relay_connection.start() + self.relay_connections[h] = relay_connection - await asyncio.sleep(settings.BROADCAST_WEBSOCKET_NEW_INSTANCE_POLL_RATE_SECONDS) - finally: - if async_conn: - logger.info("Shutting down db connection for wsrelay.") - try: - await async_conn.close() - except Exception as e: - logger.info(f"Failed to 
close connection to database for pg_notify: {e}") + await asyncio.sleep(settings.BROADCAST_WEBSOCKET_NEW_INSTANCE_POLL_RATE_SECONDS) diff --git a/tools/ansible/roles/dockerfile/templates/supervisor_task.conf.j2 b/tools/ansible/roles/dockerfile/templates/supervisor_task.conf.j2 index 90b6313635b2..b102a50977b1 100644 --- a/tools/ansible/roles/dockerfile/templates/supervisor_task.conf.j2 +++ b/tools/ansible/roles/dockerfile/templates/supervisor_task.conf.j2 @@ -31,7 +31,6 @@ command = awx-manage run_wsrelay directory = /var/lib/awx {% endif %} autorestart = true -startsecs = 30 stopasgroup=true killasgroup=true stdout_logfile=/dev/stdout