1
0
Fork 0
This repository has been archived on 2024-02-17. You can view files and clone it, but cannot push or open issues or pull requests.
Administrator-py/extensions/invite.py

142 lines
4.8 KiB
Python

import re
from discord import Embed, Member, Guild, Role, CategoryChannel
from discord.abc import GuildChannel
from discord.errors import Forbidden
from discord.ext import commands
from discord.ext.commands import BadArgument
from discord_slash import cog_ext, SlashContext, SlashCommandOptionType
from discord_slash.utils import manage_commands
import db
from administrator import slash
from administrator.check import is_enabled, guild_only, has_permissions
from administrator.logger import logger
from administrator.utils import event_is_enabled
extension_name = "invite"
logger = logger.getChild(extension_name)
role_mention_re = re.compile(r"<@&[0-9]+>")
channel_mention_re = re.compile(r"<#[0-9]+>")
class Invite(commands.Cog):
def __init__(self, bot: commands.Bot):
self.bot = bot
self.invites = {}
slash.get_cog_commands(self)
self.bot.loop.create_task(self.update_invites())
def description(self):
return "Get role from a special invite link"
@is_enabled()
@guild_only()
@has_permissions(administrator=True)
async def invite_add(self, ctx: SlashContext, channel: GuildChannel, role: Role):
if isinstance(channel, CategoryChannel):
raise BadArgument()
inv = await channel.create_invite()
s = db.Session()
s.add(db.InviteRole(ctx.guild.id, inv.code, role.id))
s.commit()
s.close()
await ctx.send(content=f"Invite created: `{inv.url}`")
@cog_ext.cog_subcommand(base="invite", name="delete", description="Remove a invite", options=[
manage_commands.create_option("code", "The invitation code", SlashCommandOptionType.STRING, True)])
@is_enabled()
@guild_only()
@has_permissions(administrator=True)
async def invite_delete(self, ctx: SlashContext, code: str):
inv = next(filter(lambda i: i.code == code, await ctx.guild.invites()), None)
if not inv:
raise BadArgument()
s = db.Session()
invite_role = s.query(db.InviteRole).get({"guild_id": ctx.guild.id, "invite_code": code})
if not invite_role:
s.close()
raise BadArgument()
s.delete(invite_role)
s.commit()
s.close()
await inv.delete()
await ctx.send(content="\U0001f44d")
async def update_invites(self):
for g in self.bot.guilds:
self.invites[g.id] = await g.invites()
@commands.Cog.listener()
async def on_ready(self):
await self.update_invites()
@commands.Cog.listener()
async def on_member_join(self, member: Member):
if not event_is_enabled(self.qualified_name, member.guild.id):
return
user_invites = await member.guild.invites()
for i in self.invites[member.guild.id]:
for ui in user_invites:
if i.code == ui.code and i.uses < ui.uses:
s = db.Session()
invite_role = s.query(db.InviteRole).get({"guild_id": member.guild.id, "invite_code": i.code})
s.close()
if invite_role:
try:
await member.add_roles(member.guild.get_role(invite_role.role_id))
except Forbidden:
pass
self.invites[member.guild.id] = user_invites
@commands.Cog.listener()
async def on_invite_create(self, invite):
if not event_is_enabled(self.qualified_name, invite.guild.id):
return
self.invites[invite.guild.id] = await invite.guild.invites()
@commands.Cog.listener()
async def on_invite_delete(self, invite):
s = db.Session()
if not event_is_enabled(self.qualified_name, invite.guild.id, s):
return
invite_role = s.query(db.InviteRole).get({"guild_id": invite.guild.id, "invite_code": invite.code})
if invite_role:
s.delete(invite_role)
s.commit()
s.close()
self.invites[invite.guild.id] = await invite.guild.invites()
@commands.Cog.listener()
async def on_guild_join(self, guild: Guild):
self.invites[guild.id] = await guild.invites()
@commands.Cog.listener()
async def on_guild_remove(self, guild: Guild):
s = db.Session()
for g in s.query(db.InviteRole).filter(db.InviteRole.guild_id == guild.id).all():
s.delete(g)
s.commit()
s.close()
del self.invites[guild.id]
def setup(bot):
logger.info(f"Loading...")
try:
bot.add_cog(Invite(bot))
except Exception as e:
logger.error(f"Error loading: {e}")
else:
logger.info(f"Load successful")
def teardown(bot):
logger.info(f"Unloading...")
try:
bot.remove_cog("Invite")
except Exception as e:
logger.error(f"Error unloading: {e}")
else:
logger.info(f"Unload successful")