Skip to content

Commit

Permalink
Looks ok
Browse files Browse the repository at this point in the history
  • Loading branch information
aticie committed Feb 17, 2022
1 parent ca3ceac commit ebed2d8
Show file tree
Hide file tree
Showing 9 changed files with 313 additions and 195 deletions.
179 changes: 172 additions & 7 deletions src/bots/bot_manager.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,178 @@
from helpers.database_helper import UserDatabase, StatisticsDatabase
import asyncio
import datetime
import os
import sqlite3
from itertools import islice
from multiprocessing import Process, Lock
from typing import List

import requests
from azure.core.exceptions import ResourceNotFoundError
from azure.servicebus import ServiceBusMessage
from azure.servicebus.aio import ServiceBusClient
from azure.servicebus.aio.management import ServiceBusAdministrationClient

from bots.irc_bot import IrcBot
from bots.twitch_bot import TwitchBot
from helpers.logger import RonniaLogger


def batcher(iterable, batch_size):
iterator = iter(iterable)
while batch := list(islice(iterator, batch_size)):
yield batch


class TwitchAPI:
def __init__(self, client_id, client_secret):
self.client_id = client_id
self.client_secret = client_secret

self.access_token = self.get_token()

def get_token(self):
"""
Gets access token from Twitch API
"""
url = "https://id.twitch.tv/oauth2/token?client_id={}&client_secret={}&grant_type=client_credentials".format(
self.client_id, self.client_secret)
response = requests.post(url)
return response.json()['access_token']

def get_streams(self, user_ids: List[int]):
"""
Gets streams from Twitch API helix/streams only users playing osu!
"""
headers = {'Authorization': 'Bearer {}'.format(self.access_token),
'Client-ID': self.client_id}
streams = []
for user_id in batcher(user_ids, 100):
# game_id = 21465 is osu!
url = f"https://api.twitch.tv/helix/streams?first=100&game_id=21465&" + "&".join(
[f"user_id={user}" for user in user_id])
response = requests.get(url, headers=headers)
streams += response.json()['data']
return streams


class TwitchProcess(Process):
def __init__(self, user_list: List[str], join_lock: Lock):
super().__init__()
self.join_lock = join_lock
self.user_list = user_list
self.bot = None

def initialize(self):
self.bot = TwitchBot(initial_channel_ids=self.user_list, join_lock=self.join_lock)

def run(self) -> None:
self.initialize()
self.bot.run()


class IRCProcess(Process):
def __init__(self):
super().__init__()
self.bot = None

def initialize(self) -> None:
self.bot = IrcBot("#osu", os.getenv('OSU_USERNAME'), "irc.ppy.sh", password=os.getenv("IRC_PASSWORD"))

def run(self) -> None:
self.initialize()
self.bot.start()


class BotManager:
def __init__(self, ):
self.users_db = sqlite3.connect(os.path.join(os.getenv('DB_DIR'), 'users.db'))
self.join_lock = Lock()
self.instance_message_queue = None

self.twitch_client = TwitchAPI(os.getenv('CLIENT_ID'), os.getenv('CLIENT_SECRET'))
self._loop = asyncio.get_event_loop()

self.servicebus_connection_string = os.getenv('SERVICE_BUS_CONN_STRING')
self.servicebus_webserver_queue_name = 'webserver-signups'
self.servicebus_webserver_reply_queue_name = 'webserver-signups-reply'
self.servicebus_bot_queue_name = 'bot-signups'
self.servicebus_bot_reply_queue_name = 'bot-signups-reply'
self.servicebus_queues = {'webserver-signups': {'max_delivery_count': 100,
'default_message_time_to_live': datetime.timedelta(seconds=10)},
'webserver-signups-reply': {'max_delivery_count': 100,
'default_message_time_to_live': datetime.timedelta(
seconds=10)},
'bot-signups': {'max_delivery_count': 100,
'default_message_time_to_live': datetime.timedelta(seconds=10)},
'bot-signups-reply': {'max_delivery_count': 100,
'default_message_time_to_live': datetime.timedelta(seconds=10)},
'twitch-to-irc': {'max_delivery_count': 100,
'default_message_time_to_live': datetime.timedelta(seconds=10)},
}

self.servicebus_mgmt = ServiceBusAdministrationClient.from_connection_string(self.servicebus_connection_string)
self.servicebus_client = ServiceBusClient.from_connection_string(conn_str=self.servicebus_connection_string)

self.bot_instances = []
self.bot_processes = []

self.irc_process = IRCProcess()

def start(self):
self._loop.run_until_complete(self.initialize_queues())

all_users = self.users_db.execute('SELECT * FROM users;').fetchall()
all_user_twitch_ids = [user[4] for user in all_users]
streaming_user_ids = [user['user_id'] for user in self.twitch_client.get_streams(all_user_twitch_ids)]

for user_id in all_user_twitch_ids:
if user_id not in streaming_user_ids:
streaming_user_ids.append(user_id)

self.irc_process.start()

for user_id_list in batcher(streaming_user_ids, 100):
p = TwitchProcess(user_id_list, self.join_lock)
p.start()
self.bot_processes.append(p)

async def initialize_queues(self):
"""
Initializes webserver & bot, signup and reply queues
"""
logger.info('Initializing queues...')
for queue_name, queue_properties in self.servicebus_queues.items():
try:
queue_details = await self.servicebus_mgmt.get_queue(queue_name)
except ResourceNotFoundError:
await self.servicebus_mgmt.create_queue(queue_name, **queue_properties)

async def run_service_bus_receiver(self):
"""
Creates the receiver for the webserver queue
Forwards incoming messages to the bot instance
Replies to the webserver with a reply queue
"""
receiver = self.servicebus_client.get_queue_receiver(queue_name=self.servicebus_webserver_queue_name)
logger.info('Started servicebus receiver, listening for messages...')
async for message in receiver:
await self.receive_and_parse_message(message)
await receiver.complete_message(message)

async def receive_and_parse_message(self, message):
"""
Receive a message from the webserver signup queue and parse it, forward it to bot queue.
"""
logger.info(f'Received signup message: {message}')
async with ServiceBusClient.from_connection_string(self.servicebus_connection_string) as sb_client:
sender = sb_client.get_queue_sender(queue_name=self.servicebus_bot_queue_name)
logger.debug(f'Sending message to bot: {message}')
await sender.send_messages(message)


self.users_db = UserDatabase()
self.messages_db = StatisticsDatabase()
pass
if __name__ == '__main__':
logger = RonniaLogger(__name__)

async def get_user_data(self, user_id):
await self.users_db.get_user_data(user_id)
pass
bot_manager = BotManager()
bot_manager.start()
asyncio.run(bot_manager.run_service_bus_receiver())
53 changes: 22 additions & 31 deletions src/bots/irc_bot.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
import asyncio
import json
import os
import sqlite3
from typing import Union

import attr
import logging
from threading import Lock

import irc.bot
from azure.servicebus.aio import ServiceBusClient
from irc.client import Event, ServerConnection

from helpers.database_helper import UserDatabase, StatisticsDatabase
from helpers.logger import RonniaLogger

logger = logging.getLogger('ronnia')
logger = RonniaLogger(__name__)


@attr.s
Expand All @@ -28,20 +29,30 @@ def __init__(self, channel, nickname, server, port=6667, password=None):
self.channel = channel
self.users_db = UserDatabase()
self.messages_db = StatisticsDatabase()
self._loop = asyncio.get_event_loop()

self.message_lock = Lock()
self.servicebus_connection_string = os.getenv('SERVICE_BUS_CONN_STRING')
self.servicebus_client = ServiceBusClient.from_connection_string(conn_str=self.servicebus_connection_string)
self.listen_queue_name = 'twitch-to-irc'
self._loop = asyncio.get_event_loop()

self._commands = {'disable': self.disable_requests_on_channel,
'echo': self.toggle_notifications,
'feedback': self.toggle_notifications,
'enable': self.enable_requests_on_channel,
'register': self.register_bot_on_channel,
'help': self.show_help_message,
'setsr': self.set_sr_rating
}
self.connection.set_rate_limit(1)

async def receive_servicebus_queue(self):
receiver = self.servicebus_client.get_queue_receiver(queue_name=self.listen_queue_name)
async for message in receiver:
logger.info(f'Received message from service bus: {str(message)}')
message_dict = json.loads(str(message))
target_channel = message_dict['target_channel']
message_contents = message_dict['message']
self.send_message(target_channel, message_contents)

def on_welcome(self, c: ServerConnection, e: Event):
logger.info(f"Successfully joined irc!")
self._loop.run_until_complete(self.users_db.initialize())
Expand Down Expand Up @@ -73,21 +84,10 @@ def do_command(self, e: Event):

# Check if the user is registered
existing_user = self.users_db.get_user_from_osu_username(db_nick)
if existing_user is None and cmd == 'register':
if existing_user is None:
self.send_message(e.source.nick,
f'Hello! Thanks for your interest in this bot! '
f'But, registering for the bot automatically is not supported currently. '
f'I\'m hosting this bot with the free tier compute engine... '
f'So, if it gets too many requests it might blow up! '
f'That\'s why I\'m manually allowing requests right now. '
f'(Check out the project page if you haven\'t already.)'
f'[https://github.com/aticie/ronnia] '
f'Contact me on discord and I can enable it for you! heyronii#9925')
f'Please register your osu! account (from here)[https://ronnia.me/].')
return
elif existing_user is None:
self.send_message(e.source.nick, f'Sorry, you are not registered. '
f'(Check out the project page for details.)'
f'[https://github.com/aticie/ronnia]')
else:
# Check if command is valid
try:
Expand All @@ -111,16 +111,6 @@ def disable_requests_on_channel(self, event: Event, *args, user_details: Union[d
f'If you want to re-enable requests, type !enable anytime.')
self.messages_db.add_command('disable', 'osu_irc', event.source.nick)

def register_bot_on_channel(self, event: Event, *args, user_details: Union[dict, sqlite3.Row]):
"""
Registers bot on twitch channel
:param event: Event of the current message
:param user_details: User Details Sqlite row factory
Currently not supported... TODO: Register user -> ask twitch
"""
logger.debug(f'Register bot on channel: {user_details}')

def enable_requests_on_channel(self, event: Event, *args, user_details: Union[dict, sqlite3.Row]):
"""
Enables requests on twitch channel
Expand Down Expand Up @@ -160,7 +150,8 @@ def show_help_message(self, event: Event, *args, user_details: Union[dict, sqlit
self.send_message(event.source.nick,
f'Check out the (project page)[https://github.com/aticie/ronnia] for more information. '
f'List of available commands are (listed here)'
f'[https://github.com/aticie/ronnia/wiki/Commands].')
f'[https://github.com/aticie/ronnia/wiki/Commands]. '
f'(Click here)[https://ronnia.me/ to access your dashboard. )')
self.messages_db.add_command('help', 'osu_irc', event.source.nick)

def set_sr_rating(self, event: Event, *args, user_details: Union[dict, sqlite3.Row]):
Expand Down
Loading

0 comments on commit ebed2d8

Please sign in to comment.