Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Dashboard] Add third parties. #82

Open
wants to merge 11 commits into
base: dashboard-0.1.8.dev
Choose a base branch
from
82 changes: 31 additions & 51 deletions dashboard/baserpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from .rpc.permissions import DashboardRPC_Permissions
from .rpc.utils import rpccheck
from .rpc.webhooks import DashboardRPC_Webhooks
from .rpc.thirdparties import DashboardRPC_ThirdParties

HUMANIZED_PERMISSIONS = {
"view": "View server on dashboard",
Expand Down Expand Up @@ -51,6 +52,8 @@ def __init__(self, cog: commands.Cog):
self.extensions.append(DashboardRPC_Permissions(self.cog))
self.extensions.append(DashboardRPC_AliasCC(self.cog))
self.extensions.append(DashboardRPC_Webhooks(self.cog))
self.third_parties_handler = DashboardRPC_ThirdParties(self.cog)
self.extensions.append(self.third_parties_handler)

# To make sure that both RPC server and client are on the same "version"
self.version = random.randint(1, 10000)
Expand Down Expand Up @@ -171,7 +174,7 @@ async def get_variables(self):
try:
botavatar = str(self.bot.user.avatar_url_as(static_format="png"))
except AttributeError:
botavatar = str(self.bot.user.avatar)
botavatar = str(self.bot.user.display_avatar)

returning = {
"bot": {
Expand Down Expand Up @@ -199,15 +202,12 @@ async def get_variables(self):
"uptime": uptime_str,
},
},
"third_parties": await self.third_parties_handler.get_third_parties(),
}

if self.owner is None:
app_info = await self.bot.application_info()
if app_info.team:
self.owner = str(app_info.team.name)
else:
self.owner = str(app_info.owner)

self.owner = str(app_info.team.name) if app_info.team else str(app_info.owner)
returning["bot"]["owner"] = self.owner
return returning

Expand All @@ -220,42 +220,32 @@ async def get_commands(self):
returning = []
downloader = self.bot.get_cog("Downloader")
for name, cog in self.bot.cogs.copy().items():
stripped = []

for c in cog.__cog_commands__:
if not c.parent:
stripped.append(c)

stripped = [c for c in cog.__cog_commands__ if not c.parent]
cmds = await self.build_cmd_list(stripped, do_escape=False)
if not cmds:
continue
# if not cmds:
# continue

author = "Unknown"
repo = "Unknown"
# Taken from Trusty's downloader fuckery,
# https://gist.github.com/TrustyJAID/784c8c32dd45b1cc8155ed42c0c56591
if name not in self.cog_info_cache:
if downloader:
module = downloader.cog_name_from_instance(cog)
installed, cog_info = await downloader.is_installed(module)
if installed:
author = humanize_list(cog_info.author) if cog_info.author else "Unknown"
try:
repo = (
cog_info.repo.clean_url if cog_info.repo.clean_url else "Unknown"
)
except AttributeError:
repo = "Unknown (Removed from Downloader)"
elif cog.__module__.startswith("redbot."):
author = "Cog Creators"
repo = "https://github.com/Cog-Creators/Red-DiscordBot"
self.cog_info_cache[name] = {}
self.cog_info_cache[name]["author"] = author
self.cog_info_cache[name]["repo"] = repo
else:
if name in self.cog_info_cache:
author = self.cog_info_cache[name]["author"]
repo = self.cog_info_cache[name]["repo"]

elif downloader:
module = downloader.cog_name_from_instance(cog)
installed, cog_info = await downloader.is_installed(module)
if installed:
author = humanize_list(cog_info.author) if cog_info.author else "Unknown"
try:
repo = cog_info.repo.clean_url or "Unknown"
except AttributeError:
repo = "Unknown (Removed from Downloader)"
elif cog.__module__.startswith("redbot."):
author = "Cog Creators"
repo = "https://github.com/Cog-Creators/Red-DiscordBot"
self.cog_info_cache[name] = {"author": author, "repo": repo}
returning.append(
{
"name": escape(name or ""),
Expand All @@ -265,8 +255,7 @@ async def get_commands(self):
"repo": repo,
}
)
returning = sorted(returning, key=lambda k: k["name"])
return returning
return sorted(returning, key=lambda k: k["name"])

@rpccheck()
async def get_users_servers(self, userid: int, page: int):
Expand Down Expand Up @@ -308,6 +297,7 @@ async def get_users_servers(self, userid: int, page: int):
)(),
"go": False,
}

if is_owner:
guilds.append(sgd)
continue
Expand Down Expand Up @@ -347,13 +337,9 @@ async def get_server(self, userid: int, serverid: int):

user = guild.get_member(userid)
baseuser = self.bot.get_user(userid)
is_owner = False
if await self.bot.is_owner(baseuser):
is_owner = True

if not user:
if not baseuser and not is_owner:
return {"status": 0}
is_owner = bool(await self.bot.is_owner(baseuser))
if not user and not baseuser and not is_owner:
return {"status": 0}

if is_owner:
humanized = ["Everything (Bot Owner)"]
Expand Down Expand Up @@ -401,23 +387,17 @@ async def get_server(self, userid: int, serverid: int):
else:
vl = "Unknown"

if not self.cog.configcache.get(serverid, {"roles": []})["roles"]:
warn = True
else:
warn = False

warn = not self.cog.configcache.get(serverid, {"roles": []})["roles"]
adminroles = []
ar = await self.bot._config.guild(guild).admin_role()
for rid in ar:
r = guild.get_role(rid)
if r:
if r := guild.get_role(rid):
adminroles.append((rid, r.name))

modroles = []
mr = await self.bot._config.guild(guild).mod_role()
for rid in mr:
r = guild.get_role(rid)
if r:
if r := guild.get_role(rid):
modroles.append((rid, r.name))

all_roles = [(r.id, r.name) for r in guild.roles]
Expand Down
164 changes: 164 additions & 0 deletions dashboard/rpc/thirdparties.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
from redbot.core.bot import Red
from redbot.core.commands import commands

import discord
import inspect
import typing

from .utils import rpccheck


def dashboard_page(name: typing.Optional[str] = None, methods: typing.List[str] = ["GET"], context_ids: typing.List[str] = None, required_kwargs: typing.List[str] = None, permissions_required: typing.List[str] = ["view"], hidden: typing.Optional[bool] = None):
if context_ids is None:
context_ids = []
if required_kwargs is None:
required_kwargs = []

def decorator(func: typing.Callable):
if name is not None and not isinstance(name, str):
raise TypeError("Name of a page must be a string.")
if name is not None:
discord.app_commands.commands.validate_name(name)
if not inspect.iscoroutinefunction(func):
raise TypeError("Func must be a coroutine.")
params = {"name": name, "methods": methods, "context_ids": context_ids, "required_kwargs": required_kwargs, "permissions_required": permissions_required, "hidden": hidden, "real_cog_name": None}
for key, value in inspect.signature(func).parameters.items():
if value.name == "self" or value.kind in [inspect._ParameterKind.POSITIONAL_ONLY, inspect._ParameterKind.VAR_KEYWORD]:
continue
if value.default is not inspect._empty:
continue
if key in ["user_id", "guild_id", "member_id", "role_id", "channel_id"] and key not in params["context_ids"]:
params["context_ids"].append(key)
elif f"{key}_id" in ["user_id", "guild_id", "member_id", "role_id", "channel_id"] and f"{key}_id" not in params["context_ids"]:
params["context_ids"].append(f"{key}_id")
elif key not in ["method", "lang_code"]:
params["required_kwargs"].append(key)
# A guild must be chose for these kwargs.
for key in ["member_id", "role_id", "channel_id"]:
if key in params["context_ids"] and "guild_id" not in params["context_ids"]:
params["context_ids"].append("guild_id")
# No guild available without user connection.
if (
"guild_id" in params["context_ids"]
and "user_id" not in params["context_ids"]
):
params["context_ids"].append("user_id")
if params["hidden"] is None:
params["hidden"] = params["required_kwargs"] or [x for x in params["context_ids"] if x not in ["user_id", "guild_id"]]
func.__dashboard_params__ = params.copy()
return func

return decorator


class DashboardRPC_ThirdParties:
def __init__(self, cog: commands.Cog):
self.bot: Red = cog.bot
self.cog: commands.Cog = cog

self.third_parties: typing.Dict[str, typing.Dict[str, typing.Tuple[typing.Callable, typing.Dict[str, bool]]]] = {}
self.third_parties_cogs: typing.Dict[str, commands.Cog] = {}

self.bot.register_rpc_handler(self.data_receive)
self.bot.add_listener(self.on_cog_add)
self.bot.add_listener(self.on_cog_remove)
self.bot.dispatch("dashboard_cog_add", self.cog)

def unload(self):
self.bot.unregister_rpc_handler(self.data_receive)
self.bot.remove_listener(self.on_cog_add)
self.bot.remove_listener(self.on_cog_remove)

@commands.Cog.listener()
async def on_cog_add(self, cog: commands.Cog):
ev = "on_dashboard_cog_add"
funcs = [listener[1] for listener in cog.get_listeners() if listener[0] == ev]
for func in funcs:
self.bot._schedule_event(func, ev, self.cog) # like in `bot.dispatch`

@commands.Cog.listener()
async def on_cog_remove(self, cog: commands.Cog):
if cog not in self.third_parties_cogs.values():
return
self.remove_third_party(cog)

def add_third_party(self, cog: commands.Cog, overwrite: bool = False):
cog_name = cog.qualified_name.lower()
if cog_name in self.third_parties and not overwrite:
raise RuntimeError(f"The cog {cog_name} is already an existing third party.")
_pages = {}
for attr in dir(cog):
if hasattr((func := getattr(cog, attr)), "__dashboard_params__"):
page = func.__dashboard_params__["name"]
if page in _pages:
raise RuntimeError(f"The page {page} is already an existing page for this third party.")
func.__dashboard_params__["real_cog_name"] = cog.qualified_name
_pages[page] = (func, func.__dashboard_params__)
if not _pages:
raise RuntimeError("No page found.")
self.third_parties[cog_name] = _pages
self.third_parties_cogs[cog_name] = cog

def remove_third_party(self, cog: commands.Cog):
cog_name = cog.qualified_name.lower()
try:
del self.third_parties_cogs[cog_name]
except KeyError:
pass
return self.third_parties.pop(cog_name, None)

@rpccheck()
async def get_third_parties(self):
return {key: {k: v[1] for k, v in value.items()} for key, value in self.third_parties.items()}

@rpccheck()
async def data_receive(self, method: str, cog_name: str, page: str, context_ids: typing.Optional[typing.Dict[str, int]] = None, kwargs: typing.Dict[str, typing.Any] = None, lang_code: typing.Optional[str] = None) -> typing.Dict[str, typing.Any]:
if context_ids is None:
context_ids = {}
if kwargs is None:
kwargs = {}
cog_name = cog_name.lower()
if not cog_name or cog_name not in self.third_parties or cog_name not in self.third_parties_cogs:
return {"status": 1, "message": "Third party not found.", "error_message": "404: Looks like that third party doesn't exist... Strange..."}
if self.bot.get_cog(self.third_parties_cogs[cog_name].qualified_name) is None:
return {"status": 1, "message": "Third party not loaded.", "error_message": "404: Looks like that third party doesn't exist... Strange..."}
page = page.lower() if page is not None else page
if page not in self.third_parties[cog_name]:
return {"status": 1, "message": "Page not found.", "error_message": "404: Looks like that page doesn't exist... Strange..."}
kwargs["method"] = method
if "user_id" in self.third_parties[cog_name][page][1]["context_ids"]:
if (user := self.bot.get_user(context_ids["user_id"])) is None:
return {"status": 1, "message": "Page not found.", "error_message": "404: Looks like that I do not share any server with you..."}
kwargs["user_id"] = context_ids["user_id"]
kwargs["user"] = user
if "guild_id" in self.third_parties[cog_name][page][1]["context_ids"] and "user_id" in self.third_parties[cog_name][page][1]["context_ids"]:
if (guild := self.bot.get_guild(context_ids["guild_id"])) is None:
return {"status": 1, "message": "Page not found.", "error_message": "404: Looks like that I'm not in this server..."}
if (m := guild.get_member(context_ids["user_id"])) is None:
return {"status": 1, "message": "Page not found.", "error_message": "403: Looks like that you're not in this server..."}
if m.id != guild.owner.id:
perms = self.cog.rpc.get_perms(guildid=guild.id, m=m)
if perms is None:
return {"status": 1, "message": "Page not found.", "error_message": "403: Looks like that you haven't permissions in this server..."}
for permission in self.third_parties[cog_name][page][1]["permissions_required"]:
if permission not in perms:
return {"status": 1, "message": "Page not found.", "error_message": "403: Looks like that you haven't permissions in this server..."}
kwargs["guild_id"] = context_ids["guild_id"]
kwargs["guild"] = guild
if "member_id" in self.third_parties[cog_name][page][1]["context_ids"]:
if (member := guild.get_member(context_ids["member_id"])) is None:
return {"status": 1, "message": "Page not found.", "error_message": "404: Looks like that this member is not found in this guild..."}
kwargs["member_id"] = context_ids["member_id"]
kwargs["member"] = member
if "role_id" in self.third_parties[cog_name][page][1]["context_ids"]:
if (role := guild.get_role(context_ids["role_id"])) is None:
return {"status": 1, "message": "Page not found.", "error_message": "404: Looks like that this role is not found in this guild..."}
kwargs["role_id"] = context_ids["role_id"]
kwargs["role"] = role
if "channel_id" in self.third_parties[cog_name][page][1]["context_ids"]:
if (channel := guild.get_channel(context_ids["channel_id"])) is None:
return {"status": 1, "message": "Page not found.", "error_message": "404: Looks like that this channel is not found in this guild..."}
kwargs["channel_id"] = context_ids["channel_id"]
kwargs["channel"] = channel
kwargs["lang_code"] = lang_code or "en-EN"
return await self.third_parties[cog_name][page][0](**kwargs)