import discord from discord.ext import commands from cogs.utils import checks from cogs.utils.converters import GlobalUser from __main__ import set_cog from .utils.dataIO import dataIO from .utils.chat_formatting import pagify, box import importlib import traceback import logging import asyncio import threading import datetime import glob import os import aiohttp log = logging.getLogger("red.owner") class CogNotFoundError(Exception): pass class CogLoadError(Exception): pass class NoSetupError(CogLoadError): pass class CogUnloadError(Exception): pass class OwnerUnloadWithoutReloadError(CogUnloadError): pass class Owner: """All owner-only commands that relate to debug bot operations.""" def __init__(self, bot): self.bot = bot self.setowner_lock = False self.disabled_commands = dataIO.load_json("data/red/disabled_commands.json") self.global_ignores = dataIO.load_json("data/red/global_ignores.json") self.session = aiohttp.ClientSession(loop=self.bot.loop) def __unload(self): self.session.close() @commands.command() @checks.is_owner() async def load(self, *, cog_name: str): """Loads a cog Example: load mod""" module = cog_name.strip() if "cogs." not in module: module = "cogs." + module try: self._load_cog(module) except CogNotFoundError: await self.bot.say("That cog could not be found.") except CogLoadError as e: log.exception(e) traceback.print_exc() await self.bot.say("There was an issue loading the cog. Check" " your console or logs for more information.") except Exception as e: log.exception(e) traceback.print_exc() await self.bot.say('Cog was found and possibly loaded but ' 'something went wrong. Check your console ' 'or logs for more information.') else: set_cog(module, True) await self.disable_commands() await self.bot.say("The cog has been loaded.") @commands.group(invoke_without_command=True) @checks.is_owner() async def unload(self, *, cog_name: str): """Unloads a cog Example: unload mod""" module = cog_name.strip() if "cogs." not in module: module = "cogs." + module if not self._does_cogfile_exist(module): await self.bot.say("That cog file doesn't exist. I will not" " turn off autoloading at start just in case" " this isn't supposed to happen.") else: set_cog(module, False) try: # No matter what we should try to unload it self._unload_cog(module) except OwnerUnloadWithoutReloadError: await self.bot.say("I cannot allow you to unload the Owner plugin" " unless you are in the process of reloading.") except CogUnloadError as e: log.exception(e) traceback.print_exc() await self.bot.say('Unable to safely unload that cog.') else: await self.bot.say("The cog has been unloaded.") @unload.command(name="all") @checks.is_owner() async def unload_all(self): """Unloads all cogs""" cogs = self._list_cogs() still_loaded = [] for cog in cogs: set_cog(cog, False) try: self._unload_cog(cog) except OwnerUnloadWithoutReloadError: pass except CogUnloadError as e: log.exception(e) traceback.print_exc() still_loaded.append(cog) if still_loaded: still_loaded = ", ".join(still_loaded) await self.bot.say("I was unable to unload these cogs: " "{}".format(still_loaded)) else: await self.bot.say("All cogs are now unloaded.") @checks.is_owner() @commands.command(name="reload") async def _reload(self, *, cog_name: str): """Reloads a cog Example: reload audio""" module = cog_name.strip() if "cogs." not in module: module = "cogs." + module try: self._unload_cog(module, reloading=True) except: pass try: self._load_cog(module) except CogNotFoundError: await self.bot.say("That cog cannot be found.") except NoSetupError: await self.bot.say("That cog does not have a setup function.") except CogLoadError as e: log.exception(e) traceback.print_exc() await self.bot.say("That cog could not be loaded. Check your" " console or logs for more information.") else: set_cog(module, True) await self.disable_commands() await self.bot.say("The cog has been reloaded.") @commands.command(name="cogs") @checks.is_owner() async def _show_cogs(self): """Shows loaded/unloaded cogs""" # This function assumes that all cogs are in the cogs folder, # which is currently true. # Extracting filename from __module__ Example: cogs.owner loaded = [c.__module__.split(".")[1] for c in self.bot.cogs.values()] # What's in the folder but not loaded is unloaded unloaded = [c.split(".")[1] for c in self._list_cogs() if c.split(".")[1] not in loaded] if not unloaded: unloaded = ["None"] msg = ("+ Loaded\n" "{}\n\n" "- Unloaded\n" "{}" "".format(", ".join(sorted(loaded)), ", ".join(sorted(unloaded))) ) for page in pagify(msg, [" "], shorten_by=16): await self.bot.say(box(page.lstrip(" "), lang="diff")) @commands.command(pass_context=True, hidden=True) @checks.is_owner() async def debug(self, ctx, *, code): """Evaluates code""" def check(m): if m.content.strip().lower() == "more": return True author = ctx.message.author channel = ctx.message.channel code = code.strip('` ') result = None global_vars = globals().copy() global_vars['bot'] = self.bot global_vars['ctx'] = ctx global_vars['message'] = ctx.message global_vars['author'] = ctx.message.author global_vars['channel'] = ctx.message.channel global_vars['server'] = ctx.message.server try: result = eval(code, global_vars, locals()) except Exception as e: await self.bot.say(box('{}: {}'.format(type(e).__name__, str(e)), lang="py")) return if asyncio.iscoroutine(result): result = await result result = str(result) if not ctx.message.channel.is_private: censor = (self.bot.settings.email, self.bot.settings.password, self.bot.settings.token) r = "[EXPUNGED]" for w in censor: if w is None or w == "": continue result = result.replace(w, r) result = result.replace(w.lower(), r) result = result.replace(w.upper(), r) result = list(pagify(result, shorten_by=16)) for i, page in enumerate(result): if i != 0 and i % 4 == 0: last = await self.bot.say("There are still {} messages. " "Type `more` to continue." "".format(len(result) - (i+1))) msg = await self.bot.wait_for_message(author=author, channel=channel, check=check, timeout=10) if msg is None: try: await self.bot.delete_message(last) except: pass finally: break await self.bot.say(box(page, lang="py")) @commands.group(name="set", pass_context=True) async def _set(self, ctx): """Changes Red's core settings""" if ctx.invoked_subcommand is None: await self.bot.send_cmd_help(ctx) return @_set.command(pass_context=True) async def owner(self, ctx): """Sets owner""" if self.bot.settings.no_prompt is True: await self.bot.say("Console interaction is disabled. Start Red " "without the `--no-prompt` flag to use this " "command.") return if self.setowner_lock: await self.bot.say("A set owner command is already pending.") return if self.bot.settings.owner is not None: await self.bot.say( "The owner is already set. Remember that setting the owner " "to someone else other than who hosts the bot has security " "repercussions and is *NOT recommended*. Proceed at your own risk." ) await asyncio.sleep(3) await self.bot.say("Confirm in the console that you're the owner.") self.setowner_lock = True t = threading.Thread(target=self._wait_for_answer, args=(ctx.message.author,)) t.start() @_set.command() @checks.is_owner() async def defaultmodrole(self, *, role_name: str): """Sets the default mod role name This is used if a server-specific role is not set""" self.bot.settings.default_mod = role_name self.bot.settings.save_settings() await self.bot.say("The default mod role name has been set.") @_set.command() @checks.is_owner() async def defaultadminrole(self, *, role_name: str): """Sets the default admin role name This is used if a server-specific role is not set""" self.bot.settings.default_admin = role_name self.bot.settings.save_settings() await self.bot.say("The default admin role name has been set.") @_set.command(pass_context=True) @checks.is_owner() async def prefix(self, ctx, *prefixes): """Sets Red's global prefixes Accepts multiple prefixes separated by a space. Enclose in double quotes if a prefix contains spaces. Example: set prefix ! $ ? "two words" """ if prefixes == (): await self.bot.send_cmd_help(ctx) return self.bot.settings.prefixes = sorted(prefixes, reverse=True) self.bot.settings.save_settings() log.debug("Setting global prefixes to:\n\t{}" "".format(self.bot.settings.prefixes)) p = "prefixes" if len(prefixes) > 1 else "prefix" await self.bot.say("Global {} set".format(p)) @_set.command(pass_context=True, no_pm=True) @checks.serverowner_or_permissions(administrator=True) async def serverprefix(self, ctx, *prefixes): """Sets Red's prefixes for this server Accepts multiple prefixes separated by a space. Enclose in double quotes if a prefix contains spaces. Example: set serverprefix ! $ ? "two words" Issuing this command with no parameters will reset the server prefixes and the global ones will be used instead.""" server = ctx.message.server if prefixes == (): self.bot.settings.set_server_prefixes(server, []) self.bot.settings.save_settings() current_p = ", ".join(self.bot.settings.prefixes) await self.bot.say("Server prefixes reset. Current prefixes: " "`{}`".format(current_p)) return prefixes = sorted(prefixes, reverse=True) self.bot.settings.set_server_prefixes(server, prefixes) self.bot.settings.save_settings() log.debug("Setting server's {} prefixes to:\n\t{}" "".format(server.id, self.bot.settings.prefixes)) p = "Prefixes" if len(prefixes) > 1 else "Prefix" await self.bot.say("{} set for this server.\n" "To go back to the global prefixes, do" " `{}set serverprefix` " "".format(p, prefixes[0])) @_set.command(pass_context=True) @checks.is_owner() async def name(self, ctx, *, name): """Sets Red's name""" name = name.strip() if name != "": try: await self.bot.edit_profile(self.bot.settings.password, username=name) except: await self.bot.say("Failed to change name. Remember that you" " can only do it up to 2 times an hour." "Use nicknames if you need frequent " "changes. {}set nickname" "".format(ctx.prefix)) else: await self.bot.say("Done.") else: await self.bot.send_cmd_help(ctx) @_set.command(pass_context=True, no_pm=True) @checks.is_owner() async def nickname(self, ctx, *, nickname=""): """Sets Red's nickname Leaving this empty will remove it.""" nickname = nickname.strip() if nickname == "": nickname = None try: await self.bot.change_nickname(ctx.message.server.me, nickname) await self.bot.say("Done.") except discord.Forbidden: await self.bot.say("I cannot do that, I lack the " "\"Change Nickname\" permission.") @_set.command(pass_context=True) @checks.is_owner() async def game(self, ctx, *, game=None): """Sets Red's playing status Leaving this empty will clear it.""" server = ctx.message.server current_status = server.me.status if server is not None else None if game: game = game.strip() await self.bot.change_presence(game=discord.Game(name=game), status=current_status) log.debug('Status set to "{}" by owner'.format(game)) else: await self.bot.change_presence(game=None, status=current_status) log.debug('status cleared by owner') await self.bot.say("Done.") @_set.command(pass_context=True) @checks.is_owner() async def status(self, ctx, *, status=None): """Sets Red's status Statuses: online idle dnd invisible""" statuses = { "online" : discord.Status.online, "idle" : discord.Status.idle, "dnd" : discord.Status.dnd, "invisible" : discord.Status.invisible } server = ctx.message.server current_game = server.me.game if server is not None else None if status is None: await self.bot.change_presence(status=discord.Status.online, game=current_game) await self.bot.say("Status reset.") else: status = statuses.get(status.lower(), None) if status: await self.bot.change_presence(status=status, game=current_game) await self.bot.say("Status changed.") else: await self.bot.send_cmd_help(ctx) @_set.command(pass_context=True) @checks.is_owner() async def stream(self, ctx, streamer=None, *, stream_title=None): """Sets Red's streaming status Leaving both streamer and stream_title empty will clear it.""" server = ctx.message.server current_status = server.me.status if server is not None else None if stream_title: stream_title = stream_title.strip() if "twitch.tv/" not in streamer: streamer = "https://www.twitch.tv/" + streamer game = discord.Game(type=1, url=streamer, name=stream_title) await self.bot.change_presence(game=game, status=current_status) log.debug('Owner has set streaming status and url to "{}" and {}'.format(stream_title, streamer)) elif streamer is not None: await self.bot.send_cmd_help(ctx) return else: await self.bot.change_presence(game=None, status=current_status) log.debug('stream cleared by owner') await self.bot.say("Done.") @_set.command() @checks.is_owner() async def avatar(self, url): """Sets Red's avatar""" try: async with self.session.get(url) as r: data = await r.read() await self.bot.edit_profile(self.bot.settings.password, avatar=data) await self.bot.say("Done.") log.debug("changed avatar") except Exception as e: await self.bot.say("Error, check your console or logs for " "more information.") log.exception(e) traceback.print_exc() @_set.command(name="token") @checks.is_owner() async def _token(self, token): """Sets Red's login token""" if len(token) < 50: await self.bot.say("Invalid token.") else: self.bot.settings.token = token self.bot.settings.save_settings() await self.bot.say("Token set. Restart me.") log.debug("Token changed.") @_set.command(name="adminrole", pass_context=True, no_pm=True) @checks.serverowner() async def _server_adminrole(self, ctx, *, role: discord.Role): """Sets the admin role for this server""" server = ctx.message.server if server.id not in self.bot.settings.servers: await self.bot.say("Remember to set modrole too.") self.bot.settings.set_server_admin(server, role.name) await self.bot.say("Admin role set to '{}'".format(role.name)) @_set.command(name="modrole", pass_context=True, no_pm=True) @checks.serverowner() async def _server_modrole(self, ctx, *, role: discord.Role): """Sets the mod role for this server""" server = ctx.message.server if server.id not in self.bot.settings.servers: await self.bot.say("Remember to set adminrole too.") self.bot.settings.set_server_mod(server, role.name) await self.bot.say("Mod role set to '{}'".format(role.name)) @commands.group(pass_context=True) @checks.is_owner() async def blacklist(self, ctx): """Blacklist management commands Blacklisted users will be unable to issue commands""" if ctx.invoked_subcommand is None: await self.bot.send_cmd_help(ctx) @blacklist.command(name="add") async def _blacklist_add(self, user: GlobalUser): """Adds user to Red's global blacklist""" if user.id not in self.global_ignores["blacklist"]: self.global_ignores["blacklist"].append(user.id) self.save_global_ignores() await self.bot.say("User has been blacklisted.") else: await self.bot.say("User is already blacklisted.") @blacklist.command(name="remove") async def _blacklist_remove(self, user: GlobalUser): """Removes user from Red's global blacklist""" if user.id in self.global_ignores["blacklist"]: self.global_ignores["blacklist"].remove(user.id) self.save_global_ignores() await self.bot.say("User has been removed from the blacklist.") else: await self.bot.say("User is not blacklisted.") @blacklist.command(name="list") async def _blacklist_list(self): """Lists users on the blacklist""" blacklist = self._populate_list(self.global_ignores["blacklist"]) if blacklist: for page in blacklist: await self.bot.say(box(page)) else: await self.bot.say("The blacklist is empty.") @blacklist.command(name="clear") async def _blacklist_clear(self): """Clears the global blacklist""" self.global_ignores["blacklist"] = [] self.save_global_ignores() await self.bot.say("Blacklist is now empty.") @commands.group(pass_context=True) @checks.is_owner() async def whitelist(self, ctx): """Whitelist management commands If the whitelist is not empty, only whitelisted users will be able to use Red""" if ctx.invoked_subcommand is None: await self.bot.send_cmd_help(ctx) @whitelist.command(name="add") async def _whitelist_add(self, user: GlobalUser): """Adds user to Red's global whitelist""" if user.id not in self.global_ignores["whitelist"]: if not self.global_ignores["whitelist"]: msg = "\nNon-whitelisted users will be ignored." else: msg = "" self.global_ignores["whitelist"].append(user.id) self.save_global_ignores() await self.bot.say("User has been whitelisted." + msg) else: await self.bot.say("User is already whitelisted.") @whitelist.command(name="remove") async def _whitelist_remove(self, user: GlobalUser): """Removes user from Red's global whitelist""" if user.id in self.global_ignores["whitelist"]: self.global_ignores["whitelist"].remove(user.id) self.save_global_ignores() await self.bot.say("User has been removed from the whitelist.") else: await self.bot.say("User is not whitelisted.") @whitelist.command(name="list") async def _whitelist_list(self): """Lists users on the whitelist""" whitelist = self._populate_list(self.global_ignores["whitelist"]) if whitelist: for page in whitelist: await self.bot.say(box(page)) else: await self.bot.say("The whitelist is empty.") @whitelist.command(name="clear") async def _whitelist_clear(self): """Clears the global whitelist""" self.global_ignores["whitelist"] = [] self.save_global_ignores() await self.bot.say("Whitelist is now empty.") @commands.command() @checks.is_owner() async def shutdown(self, silently : bool=False): """Shuts down Red""" wave = "\N{WAVING HAND SIGN}" skin = "\N{EMOJI MODIFIER FITZPATRICK TYPE-3}" try: # We don't want missing perms to stop our shutdown if not silently: await self.bot.say("Shutting down... " + wave + skin) except: pass await self.bot.shutdown() @commands.command() @checks.is_owner() async def restart(self, silently : bool=False): """Attempts to restart Red Makes Red quit with exit code 26 The restart is not guaranteed: it must be dealt with by the process manager in use""" try: if not silently: await self.bot.say("Restarting...") except: pass await self.bot.shutdown(restart=True) @commands.group(name="command", pass_context=True) @checks.is_owner() async def command_disabler(self, ctx): """Disables/enables commands With no subcommands returns the disabled commands list""" if ctx.invoked_subcommand is None: await self.bot.send_cmd_help(ctx) if self.disabled_commands: msg = "Disabled commands:\n```xl\n" for cmd in self.disabled_commands: msg += "{}, ".format(cmd) msg = msg.strip(", ") await self.bot.whisper("{}```".format(msg)) @command_disabler.command() async def disable(self, *, command): """Disables commands/subcommands""" comm_obj = await self.get_command(command) if comm_obj is KeyError: await self.bot.say("That command doesn't seem to exist.") elif comm_obj is False: await self.bot.say("You cannot disable owner restricted commands.") else: comm_obj.enabled = False comm_obj.hidden = True self.disabled_commands.append(command) self.save_disabled_commands() await self.bot.say("Command has been disabled.") @command_disabler.command() async def enable(self, *, command): """Enables commands/subcommands""" if command in self.disabled_commands: self.disabled_commands.remove(command) self.save_disabled_commands() await self.bot.say("Command enabled.") else: await self.bot.say("That command is not disabled.") return try: comm_obj = await self.get_command(command) comm_obj.enabled = True comm_obj.hidden = False except: # In case it was in the disabled list but not currently loaded pass # No point in even checking what returns async def get_command(self, command): command = command.split() try: comm_obj = self.bot.commands[command[0]] if len(command) > 1: command.pop(0) for cmd in command: comm_obj = comm_obj.commands[cmd] except KeyError: return KeyError for check in comm_obj.checks: if hasattr(check, "__name__") and check.__name__ == "is_owner_check": return False return comm_obj async def disable_commands(self): # runs at boot for cmd in self.disabled_commands: cmd_obj = await self.get_command(cmd) try: cmd_obj.enabled = False cmd_obj.hidden = True except: pass @commands.command() @checks.is_owner() async def join(self): """Shows Red's invite URL""" if self.bot.user.bot: await self.bot.whisper("Invite URL: " + self.bot.oauth_url) @commands.command(pass_context=True, no_pm=True) @checks.is_owner() async def leave(self, ctx): """Leaves server""" message = ctx.message await self.bot.say("Are you sure you want me to leave this server?" " Type yes to confirm.") response = await self.bot.wait_for_message(author=message.author) if response.content.lower().strip() == "yes": await self.bot.say("Alright. Bye :wave:") log.debug('Leaving "{}"'.format(message.server.name)) await self.bot.leave_server(message.server) else: await self.bot.say("Ok I'll stay here then.") @commands.command(pass_context=True) @checks.is_owner() async def servers(self, ctx): """Lists and allows to leave servers""" owner = ctx.message.author servers = sorted(list(self.bot.servers), key=lambda s: s.name.lower()) msg = "" for i, server in enumerate(servers): msg += "{}: {}\n".format(i, server.name) msg += "\nTo leave a server just type its number." for page in pagify(msg, ['\n']): await self.bot.say(page) while msg is not None: msg = await self.bot.wait_for_message(author=owner, timeout=15) try: msg = int(msg.content) await self.leave_confirmation(servers[msg], owner, ctx) break except (IndexError, ValueError, AttributeError): pass async def leave_confirmation(self, server, owner, ctx): await self.bot.say("Are you sure you want me " "to leave {}? (yes/no)".format(server.name)) msg = await self.bot.wait_for_message(author=owner, timeout=15) if msg is None: await self.bot.say("I guess not.") elif msg.content.lower().strip() in ("yes", "y"): await self.bot.leave_server(server) if server != ctx.message.server: await self.bot.say("Done.") else: await self.bot.say("Alright then.") @commands.command(pass_context=True) @commands.cooldown(1, 60, commands.BucketType.user) async def contact(self, ctx, *, message : str): """Sends a message to the owner""" if self.bot.settings.owner is None: await self.bot.say("I have no owner set.") return server = ctx.message.server owner = discord.utils.get(self.bot.get_all_members(), id=self.bot.settings.owner) author = ctx.message.author footer = "User ID: " + author.id if ctx.message.server is None: source = "through DM" else: source = "from {}".format(server) footer += " | Server ID: " + server.id if isinstance(author, discord.Member): colour = author.colour else: colour = discord.Colour.red() description = "Sent by {} {}".format(author, source) e = discord.Embed(colour=colour, description=message) if author.avatar_url: e.set_author(name=description, icon_url=author.avatar_url) else: e.set_author(name=description) e.set_footer(text=footer) try: await self.bot.send_message(owner, embed=e) except discord.InvalidArgument: await self.bot.say("I cannot send your message, I'm unable to find" " my owner... *sigh*") except discord.HTTPException: await self.bot.say("Your message is too long.") except: await self.bot.say("I'm unable to deliver your message. Sorry.") else: await self.bot.say("Your message has been sent.") @commands.command() async def info(self): """Shows info about Red""" author_repo = "https://git.drycat.fr/Geekcat/KiTTY-Discord-Bot/" red_repo = author_repo + "" server_url = "https://discord.io/techcordfr" dpy_repo = "https://github.com/" python_url = "https://www.python.org/" since = datetime.datetime(2017, 1, 2, 0, 0) days_since = (datetime.datetime.utcnow() - since).days dpy_version = "[{}]({})".format(discord.__version__, dpy_repo) py_version = "[{}.{}.{}]({})".format(*os.sys.version_info[:3], python_url) owner_set = self.bot.settings.owner is not None owner = self.bot.settings.owner if owner_set else None if owner: owner = discord.utils.get(self.bot.get_all_members(), id=owner) if not owner: try: owner = await self.bot.get_user_info(self.bot.settings.owner) except: owner = None if not owner: owner = "Unknown" about = ( "This is [KiTTY, an open source cat Discord bot]({})." "".format(author_repo)) embed = discord.Embed(colour=discord.Colour.red()) embed.add_field(name="Instance owned by", value=str(owner)) embed.add_field(name="Python", value=py_version) embed.add_field(name="discord.py", value=dpy_version) embed.add_field(name="About KiTTY", value=about, inline=False) embed.set_footer(text="Bringing joy since 2017 (over " "{} days ago!)".format(days_since)) try: await self.bot.say(embed=embed) except discord.HTTPException: await self.bot.say("I need the `Embed links` permission " "to send this") @commands.command() async def uptime(self): """Shows Red's uptime""" since = self.bot.uptime.strftime("%Y-%m-%d %H:%M:%S") passed = self.get_bot_uptime() await self.bot.say("Been up for: **{}** (since {} UTC)" "".format(passed, since)) @commands.command() async def version(self): """Shows Red's current version""" response = self.bot.loop.run_in_executor(None, self._get_version) result = await asyncio.wait_for(response, timeout=10) try: await self.bot.say(embed=result) except discord.HTTPException: await self.bot.say("I need the `Embed links` permission " "to send this") @commands.command(pass_context=True) @checks.is_owner() async def traceback(self, ctx, public: bool=False): """Sends to the owner the last command exception that has occurred If public (yes is specified), it will be sent to the chat instead""" if not public: destination = ctx.message.author else: destination = ctx.message.channel if self.bot._last_exception: for page in pagify(self.bot._last_exception): await self.bot.send_message(destination, box(page, lang="py")) else: await self.bot.say("No exception has occurred yet.") def _populate_list(self, _list): """Used for both whitelist / blacklist Returns a paginated list""" users = [] total = len(_list) for user_id in _list: user = discord.utils.get(self.bot.get_all_members(), id=user_id) if user: users.append("{} ({})".format(user, user.id)) if users: not_found = total - len(users) users = ", ".join(users) if not_found: users += "\n\n ... and {} users I could not find".format(not_found) return list(pagify(users, delims=[" ", "\n"])) return [] def _load_cog(self, cogname): if not self._does_cogfile_exist(cogname): raise CogNotFoundError(cogname) try: mod_obj = importlib.import_module(cogname) importlib.reload(mod_obj) self.bot.load_extension(mod_obj.__name__) except SyntaxError as e: raise CogLoadError(*e.args) except: raise def _unload_cog(self, cogname, reloading=False): if not reloading and cogname == "cogs.owner": raise OwnerUnloadWithoutReloadError( "Can't unload the owner plugin :P") try: self.bot.unload_extension(cogname) except: raise CogUnloadError def _list_cogs(self): cogs = [os.path.basename(f) for f in glob.glob("cogs/*.py")] return ["cogs." + os.path.splitext(f)[0] for f in cogs] def _does_cogfile_exist(self, module): if "cogs." not in module: module = "cogs." + module if module not in self._list_cogs(): return False return True def _wait_for_answer(self, author): print(author.name + " requested to be set as owner. If this is you, " "type 'yes'. Otherwise press enter.") print() print("*DO NOT* set anyone else as owner. This has security " "repercussions.") choice = "None" while choice.lower() != "yes" and choice == "None": choice = input("> ") if choice == "yes": self.bot.settings.owner = author.id self.bot.settings.save_settings() print(author.name + " has been set as owner.") self.setowner_lock = False self.owner.hidden = True else: print("The set owner request has been ignored.") self.setowner_lock = False def _get_version(self): if not os.path.isdir(".git"): msg = "This instance of Red hasn't been installed with git." e = discord.Embed(title=msg, colour=discord.Colour.red()) return e commands = " && ".join(( r'git config --get remote.origin.url', # Remote URL r'git rev-list --count HEAD', # Number of commits r'git rev-parse --abbrev-ref HEAD', # Branch name r'git show -s -n 3 HEAD --format="%cr|%s|%H"' # Last 3 commits )) result = os.popen(commands).read() url, ncommits, branch, commits = result.split("\n", 3) if url.endswith(".git"): url = url[:-4] if url.startswith("git@"): domain, _, resource = url[4:].partition(':') url = 'https://{}/{}'.format(domain, resource) repo_name = url.split("/")[-1] embed = discord.Embed(title="Updates of " + repo_name, description="Last three updates", colour=discord.Colour.red(), url="{}/tree/{}".format(url, branch)) for line in commits.split('\n'): if not line: continue when, commit, chash = line.split("|") commit_url = url + "/commit/" + chash content = "[{}]({}) - {} ".format(chash[:6], commit_url, commit) embed.add_field(name=when, value=content, inline=False) embed.set_footer(text="Total commits: " + ncommits) return embed def get_bot_uptime(self, *, brief=False): # Courtesy of Danny now = datetime.datetime.utcnow() delta = now - self.bot.uptime hours, remainder = divmod(int(delta.total_seconds()), 3600) minutes, seconds = divmod(remainder, 60) days, hours = divmod(hours, 24) if not brief: if days: fmt = '{d} days, {h} hours, {m} minutes, and {s} seconds' else: fmt = '{h} hours, {m} minutes, and {s} seconds' else: fmt = '{h}h {m}m {s}s' if days: fmt = '{d}d ' + fmt return fmt.format(d=days, h=hours, m=minutes, s=seconds) def save_global_ignores(self): dataIO.save_json("data/red/global_ignores.json", self.global_ignores) def save_disabled_commands(self): dataIO.save_json("data/red/disabled_commands.json", self.disabled_commands) def _import_old_data(data): """Migration from mod.py""" try: data["blacklist"] = dataIO.load_json("data/mod/blacklist.json") except FileNotFoundError: pass try: data["whitelist"] = dataIO.load_json("data/mod/whitelist.json") except FileNotFoundError: pass return data def check_files(): if not os.path.isfile("data/red/disabled_commands.json"): print("Creating empty disabled_commands.json...") dataIO.save_json("data/red/disabled_commands.json", []) if not os.path.isfile("data/red/global_ignores.json"): print("Creating empty global_ignores.json...") data = {"blacklist": [], "whitelist": []} try: data = _import_old_data(data) except Exception as e: log.error("Failed to migrate blacklist / whitelist data from " "mod.py: {}".format(e)) dataIO.save_json("data/red/global_ignores.json", data) def setup(bot): check_files() n = Owner(bot) bot.add_cog(n)