Skip to content

Commit

Permalink
Allow wsrelay to fail without FATAL (#15191)
Browse files Browse the repository at this point in the history
We have not identify the root cause of wsrelay failure but attempt to make wsrelay restart itself resulted in postgres and redis connection leak. We were not able to fully identify where the redis connection leak comes from so reverting back to failing and removing startsecs 30 will prevent wsrelay to FATAL
  • Loading branch information
TheRealHaoLiu authored May 20, 2024
1 parent 7de350d commit fc9064e
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 87 deletions.
32 changes: 18 additions & 14 deletions awx/main/management/commands/run_wsrelay.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,9 @@ def handle(self, *arg, **options):
migrating = bool(executor.migration_plan(executor.loader.graph.leaf_nodes()))
connection.close() # Because of async nature, main loop will use new connection, so close this
except Exception as exc:
logger.warning(f'Error on startup of run_wsrelay (error: {exc}), retry in 10s...')
time.sleep(10)
time.sleep(10) # Prevent supervisor from restarting the service too quickly and the service to enter FATAL state
# sleeping before logging because logging rely on setting which require database connection...
logger.warning(f'Error on startup of run_wsrelay (error: {exc}), slept for 10s...')
return

# In containerized deployments, migrations happen in the task container,
Expand All @@ -121,13 +122,14 @@ def handle(self, *arg, **options):
return

try:
my_hostname = Instance.objects.my_hostname()
my_hostname = Instance.objects.my_hostname() # This relies on settings.CLUSTER_HOST_ID which requires database connection
logger.info('Active instance with hostname {} is registered.'.format(my_hostname))
except RuntimeError as e:
# the CLUSTER_HOST_ID in the task, and web instance must match and
# ensure network connectivity between the task and web instance
logger.info('Unable to return currently active instance: {}, retry in 5s...'.format(e))
time.sleep(5)
time.sleep(10) # Prevent supervisor from restarting the service too quickly and the service to enter FATAL state
# sleeping before logging because logging rely on setting which require database connection...
logger.warning(f"Unable to return currently active instance: {e}, slept for 10s before return.")
return

if options.get('status'):
Expand Down Expand Up @@ -166,12 +168,14 @@ def handle(self, *arg, **options):

WebsocketsMetricsServer().start()

while True:
try:
asyncio.run(WebSocketRelayManager().run())
except KeyboardInterrupt:
logger.info('Shutting down Websocket Relayer')
break
except Exception as e:
logger.exception('Error in Websocket Relayer, exception: {}. Restarting in 10 seconds'.format(e))
time.sleep(10)
try:
logger.info('Starting Websocket Relayer...')
websocket_relay_manager = WebSocketRelayManager()
asyncio.run(websocket_relay_manager.run())
except KeyboardInterrupt:
logger.info('Terminating Websocket Relayer')
except BaseException as e: # BaseException is used to catch all exceptions including asyncio.CancelledError
time.sleep(10) # Prevent supervisor from restarting the service too quickly and the service to enter FATAL state
# sleeping before logging because logging rely on setting which require database connection...
logger.warning(f"Encounter error while running Websocket Relayer {e}, slept for 10s...")
return
114 changes: 42 additions & 72 deletions awx/main/wsrelay.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,8 +285,6 @@ async def cleanup_offline_host(self, hostname):
except asyncio.CancelledError:
# Handle the case where the task was already cancelled by the time we got here.
pass
except Exception as e:
logger.warning(f"Failed to cancel relay connection for {hostname}: {e}")

del self.relay_connections[hostname]

Expand All @@ -297,16 +295,13 @@ async def cleanup_offline_host(self, hostname):
self.stats_mgr.delete_remote_host_stats(hostname)
except KeyError:
pass
except Exception as e:
logger.warning(f"Failed to delete stats for {hostname}: {e}")

async def run(self):
event_loop = asyncio.get_running_loop()

self.stats_mgr = RelayWebsocketStatsManager(event_loop, self.local_hostname)
self.stats_mgr.start()

# Set up a pg_notify consumer for allowing web nodes to "provision" and "deprovision" themselves gracefully.
database_conf = deepcopy(settings.DATABASES['default'])
database_conf['OPTIONS'] = deepcopy(database_conf.get('OPTIONS', {}))

Expand All @@ -318,79 +313,54 @@ async def run(self):
if 'PASSWORD' in database_conf:
database_conf['OPTIONS']['password'] = database_conf.pop('PASSWORD')

task = None
async_conn = await psycopg.AsyncConnection.connect(
dbname=database_conf['NAME'],
host=database_conf['HOST'],
user=database_conf['USER'],
port=database_conf['PORT'],
**database_conf.get("OPTIONS", {}),
)

# Managing the async_conn here so that we can close it if we need to restart the connection
async_conn = None
await async_conn.set_autocommit(True)
on_ws_heartbeat_task = event_loop.create_task(self.on_ws_heartbeat(async_conn))

# Establishes a websocket connection to /websocket/relay on all API servers
try:
while True:
if not task or task.done():
try:
# Try to close the connection if it's open
if async_conn:
try:
await async_conn.close()
except Exception as e:
logger.warning(f"Failed to close connection to database for pg_notify: {e}")

# and re-establish the connection
async_conn = await psycopg.AsyncConnection.connect(
dbname=database_conf['NAME'],
host=database_conf['HOST'],
user=database_conf['USER'],
port=database_conf['PORT'],
**database_conf.get("OPTIONS", {}),
)
await async_conn.set_autocommit(True)

# before creating the task that uses the connection
task = event_loop.create_task(self.on_ws_heartbeat(async_conn), name="on_ws_heartbeat")
logger.info("Creating `on_ws_heartbeat` task in event loop.")

except Exception as e:
logger.warning(f"Failed to connect to database for pg_notify: {e}")

future_remote_hosts = self.known_hosts.keys()
current_remote_hosts = self.relay_connections.keys()
deleted_remote_hosts = set(current_remote_hosts) - set(future_remote_hosts)
new_remote_hosts = set(future_remote_hosts) - set(current_remote_hosts)

# This loop handles if we get an advertisement from a host we already know about but
# the advertisement has a different IP than we are currently connected to.
for hostname, address in self.known_hosts.items():
if hostname not in self.relay_connections:
# We've picked up a new hostname that we don't know about yet.
continue
while True:
if on_ws_heartbeat_task.done():
raise Exception("on_ws_heartbeat_task has exited")

future_remote_hosts = self.known_hosts.keys()
current_remote_hosts = self.relay_connections.keys()
deleted_remote_hosts = set(current_remote_hosts) - set(future_remote_hosts)
new_remote_hosts = set(future_remote_hosts) - set(current_remote_hosts)

# This loop handles if we get an advertisement from a host we already know about but
# the advertisement has a different IP than we are currently connected to.
for hostname, address in self.known_hosts.items():
if hostname not in self.relay_connections:
# We've picked up a new hostname that we don't know about yet.
continue

if address != self.relay_connections[hostname].remote_host:
deleted_remote_hosts.add(hostname)
new_remote_hosts.add(hostname)
if address != self.relay_connections[hostname].remote_host:
deleted_remote_hosts.add(hostname)
new_remote_hosts.add(hostname)

# Delete any hosts with closed connections
for hostname, relay_conn in self.relay_connections.items():
if not relay_conn.connected:
deleted_remote_hosts.add(hostname)
# Delete any hosts with closed connections
for hostname, relay_conn in self.relay_connections.items():
if not relay_conn.connected:
deleted_remote_hosts.add(hostname)

if deleted_remote_hosts:
logger.info(f"Removing {deleted_remote_hosts} from websocket broadcast list")
await asyncio.gather(*[self.cleanup_offline_host(h) for h in deleted_remote_hosts])
if deleted_remote_hosts:
logger.info(f"Removing {deleted_remote_hosts} from websocket broadcast list")
await asyncio.gather(*[self.cleanup_offline_host(h) for h in deleted_remote_hosts])

if new_remote_hosts:
logger.info(f"Adding {new_remote_hosts} to websocket broadcast list")
if new_remote_hosts:
logger.info(f"Adding {new_remote_hosts} to websocket broadcast list")

for h in new_remote_hosts:
stats = self.stats_mgr.new_remote_host_stats(h)
relay_connection = WebsocketRelayConnection(name=self.local_hostname, stats=stats, remote_host=self.known_hosts[h])
relay_connection.start()
self.relay_connections[h] = relay_connection
for h in new_remote_hosts:
stats = self.stats_mgr.new_remote_host_stats(h)
relay_connection = WebsocketRelayConnection(name=self.local_hostname, stats=stats, remote_host=self.known_hosts[h])
relay_connection.start()
self.relay_connections[h] = relay_connection

await asyncio.sleep(settings.BROADCAST_WEBSOCKET_NEW_INSTANCE_POLL_RATE_SECONDS)
finally:
if async_conn:
logger.info("Shutting down db connection for wsrelay.")
try:
await async_conn.close()
except Exception as e:
logger.info(f"Failed to close connection to database for pg_notify: {e}")
await asyncio.sleep(settings.BROADCAST_WEBSOCKET_NEW_INSTANCE_POLL_RATE_SECONDS)
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ command = awx-manage run_wsrelay
directory = /var/lib/awx
{% endif %}
autorestart = true
startsecs = 30
stopasgroup=true
killasgroup=true
stdout_logfile=/dev/stdout
Expand Down

0 comments on commit fc9064e

Please sign in to comment.