1
0
Fork 0
This repository has been archived on 2024-02-17. You can view files and clone it, but cannot push or open issues or pull requests.
Administrator-py/extensions/calendar.py

284 lines
11 KiB
Python
Raw Normal View History

import re
2020-05-28 15:14:18 +02:00
from datetime import datetime, timedelta, timezone
from operator import xor
import ics
import requests
2020-05-28 14:32:34 +02:00
from discord import Embed, DMChannel, TextChannel
from discord.ext import commands
from discord.ext import tasks
from discord.ext.commands import CommandNotFound, BadArgument, MissingRequiredArgument
from bot_bde import db
from bot_bde.logger import logger
extension_name = "calendar"
logger = logger.getChild(extension_name)
url_re = re.compile(r"http:\/\/adelb\.univ-lyon1\.fr\/jsp\/custom\/modules\/plannings\/anonymous_cal\.jsp\?resources="
r"([0-9]+)&projectId=([0-9]+)")
2020-05-28 12:13:41 +02:00
def query_calendar(name: str, guild: int) -> db.Calendar:
s = db.Session()
c: db.Calendar = s.query(db.Calendar).filter(db.Calendar.server == guild).filter(db.Calendar.name == name).first()
s.close()
if not c:
raise BadArgument()
return c
async def get_one_mention(ctx: commands.Context):
if ctx.message.channel_mentions and ctx.message.mentions:
raise BadArgument()
elif ctx.message.channel_mentions:
if len(ctx.message.channel_mentions) > 1:
raise BadArgument()
else:
m = ctx.message.channel_mentions[0].id
elif ctx.message.mentions:
if len(ctx.message.mentions) > 1:
raise BadArgument()
else:
m = ctx.message.mentions[0]
if not m.dm_channel:
await m.create_dm()
m = m.dm_channel.id
else:
m = ctx.channel.id
return m
class Calendar(commands.Cog):
def __init__(self, bot: commands.Bot):
self.bot = bot
@commands.group("calendar", pass_context=True)
async def calendar(self, ctx: commands.Context):
if ctx.invoked_subcommand is None:
await ctx.invoke(self.calendar_help)
@calendar.group("help", pass_context=True)
async def calendar_help(self, ctx: commands.Context):
embed = Embed(title="Calendar help")
embed.add_field(name="calendar define <name> <url>", value="Define a calendar", inline=False)
embed.add_field(name="calendar list", value="List all server calendar", inline=False)
embed.add_field(name="calendar remove <name>", value="Remove a server calendar", inline=False)
embed.add_field(name="calendar day <name> [date]", value="show the current day or the given day", inline=False)
embed.add_field(name="calendar week <name> [date]", value="Show the week or the given week", inline=False)
embed.add_field(name="calendar notify",
value="Command group to manage calendar notifications", inline=False)
await ctx.send(embed=embed)
@calendar.group("define", pass_context=True)
@commands.guild_only()
async def calendar_define(self, ctx: commands.Context, name: str, url: str):
try:
ics.Calendar(requests.get(url).text)
except Exception:
raise BadArgument()
m = url_re.findall(url)
if not m:
raise BadArgument()
s = db.Session()
if s.query(db.Calendar).filter(db.Calendar.server == ctx.guild.id).filter(db.Calendar.name == name).first():
s.close()
raise BadArgument()
s.add(db.Calendar(name, int(m[0][0]), int(m[0][1]), ctx.guild.id))
s.commit()
s.close()
await ctx.message.add_reaction("\U0001f44d")
@calendar.group("list", pass_context=True)
@commands.guild_only()
async def calendar_list(self, ctx: commands.Context):
embed = Embed(title="Calendar list")
s = db.Session()
for c in s.query(db.Calendar).filter(db.Calendar.server == ctx.guild.id).all():
embed.add_field(name=c.name, value=f"resources: {c.resources} | project id: {c.project_id}", inline=False)
s.close()
await ctx.send(embed=embed)
@calendar.group("remove", pass_context=True)
@commands.guild_only()
async def calendar_remove(self, ctx: commands.Context, name: str = None):
if name is None:
await ctx.invoke(self.calendar_list)
else:
s = db.Session()
c = s.query(db.Calendar).filter(db.Calendar.server == ctx.guild.id).filter(db.Calendar.name == name).first()
if c:
s.delete(c)
s.commit()
s.close()
await ctx.message.add_reaction("\U0001f44d")
else:
s.close()
raise BadArgument()
@calendar.group("day", pass_context=True)
@commands.guild_only()
async def calendar_day(self, ctx: commands.Context, name: str, day: str = None):
2020-05-28 12:13:41 +02:00
c = query_calendar(name, ctx.guild.id)
if day is None:
date = datetime.now()
else:
try:
date = datetime.strptime(day, "%d/%m/%Y")
except ValueError:
raise BadArgument()
embed = Embed(title=f"Day calendar: {c.name}", description=date.strftime("%d/%m/%Y"))
for e in c.events(date, date):
embed.add_field(name=f"{e.begin.strftime('%H:%M')} - {e.end.strftime('%H:%M')}",
value=f"{e.name} | {e.location} - {e.organizer}", inline=False)
await ctx.send(embed=embed)
2020-05-28 12:13:41 +02:00
@calendar.group("week", pass_context=True)
@commands.guild_only()
2020-05-28 12:13:41 +02:00
async def calendar_week(self, ctx: commands.Context, name: str, day: str = None):
c = query_calendar(name, ctx.guild.id)
if day is None:
date = datetime.now()
else:
try:
date = datetime.strptime(day, "%d/%m/%Y")
except ValueError:
raise BadArgument()
date -= timedelta(days=date.weekday())
embed = Embed(title=f"Week calendar: {c.name}",
description=f"{date.strftime('%d/%m/%Y')} - {(date + timedelta(days=4)).strftime('%d/%m/%Y')}")
for d in range(5):
events = []
for e in c.events(date, date):
events.append(f"*{e.begin.strftime('%H:%M')} - {e.end.strftime('%H:%M')}*: "
f"**{e.name}** | {e.location} - {e.organizer}")
embed.add_field(name=date.strftime("%d/%m/%Y"), value="\n".join(events) or "Nothing !", inline=False)
date = date + timedelta(days=1)
await ctx.send(embed=embed)
@calendar.group("notify", pass_context=True)
async def calendar_notify(self, ctx: commands.Context):
if ctx.invoked_subcommand is None:
await ctx.invoke(self.calendar_notify_help)
@calendar_notify.group("help", pass_context=True)
async def calendar_notify_help(self, ctx: commands.Context):
embed = Embed(title="Calendar notify help")
embed.add_field(name="calendar notify add <name> [#channel|@user]",
value="Notify the current channel or the giver channel/user of calendar events", inline=False)
embed.add_field(name="calendar notify remove <name> [#channel|@user]",
value="Remove the calendar notify of the current channel or the given channel/user",
inline=False)
embed.add_field(name="calendar notify list [name]",
value="List all notify of all calendar or the given one", inline=False)
await ctx.send(embed=embed)
@calendar_notify.group("add", pass_context=True)
@commands.guild_only()
async def calendar_notify_set(self, ctx: commands.Context, name: str):
m = await get_one_mention(ctx)
s = db.Session()
c = query_calendar(name, ctx.guild.id)
n = s.query(db.CalendarNotify).filter(db.CalendarNotify.channel == m) \
.filter(db.CalendarNotify.calendar_id == c.id) \
.first()
if not n:
s.add(db.CalendarNotify(m, c.id))
else:
s.close()
raise BadArgument()
s.commit()
s.close()
await ctx.message.add_reaction("\U0001f44d")
@calendar_notify.group("remove", pass_context=True)
async def calendar_notify_remove(self, ctx: commands.Context, name: str):
m = await get_one_mention(ctx)
s = db.Session()
c = query_calendar(name, ctx.guild.id)
n = s.query(db.CalendarNotify).filter(db.CalendarNotify.channel == m) \
.filter(db.CalendarNotify.calendar_id == c.id) \
.first()
if n:
s.delete(n)
else:
s.close()
raise BadArgument()
s.commit()
s.close()
await ctx.message.add_reaction("\U0001f44d")
2020-05-28 14:32:34 +02:00
@calendar_notify.group("list")
@commands.guild_only()
async def calendar_notify_list(self, ctx: commands.Context, name: str = None):
2020-05-28 14:32:34 +02:00
s = db.Session()
embed = Embed(title="Notify list")
if name is None:
calendars = s.query(db.Calendar).filter(db.Calendar.server == ctx.guild.id).all()
else:
calendars = [query_calendar(name, ctx.guild.id)]
for c in calendars:
2020-05-28 14:32:34 +02:00
notify = []
for n in c.calendars_notify:
ch = self.bot.get_channel(n.channel)
if type(ch) == TextChannel:
notify.append(ch.mention)
elif type(ch) == DMChannel:
notify.append(ch.recipient.mention)
embed.add_field(name=c.name, value="\n".join(notify) or "Nothing here", inline=False)
2020-05-28 14:32:34 +02:00
await ctx.send(embed=embed)
@calendar_notify.group("trigger", pass_context=True)
@commands.guild_only()
async def calendar_notify_trigger(self, ctx: commands.Context, name: str):
c = query_calendar(name, ctx.guild.id)
now = datetime.now()
await c.notify(self.bot, c.events(now, now)[0])
2020-05-28 15:14:18 +02:00
@tasks.loop(minutes=1)
async def calendar_notify_loop(self):
s = db.Session()
now = datetime.now().replace(tzinfo=timezone.utc).astimezone(tz=None)
for c in s.query(db.Calendar).all():
for e in c.events(now, now):
if xor(e.begin >= now - timedelta(minutes=30), e.begin >= now - timedelta(minutes=10)):
self.bot.loop.create_task(await c.notify(self.bot, e))
@commands.Cog.listener()
async def on_command_error(self, ctx: commands.Context, error):
if ctx.invoked_with == extension_name or \
(ctx.command.root_parent and ctx.command.root_parent.name == extension_name):
if isinstance(error, CommandNotFound) \
or isinstance(error, BadArgument) \
or isinstance(error, MissingRequiredArgument):
await ctx.message.add_reaction("\u2753")
await ctx.message.delete(delay=30)
else:
await ctx.send("An error occurred !")
raise error
2020-05-28 15:14:18 +02:00
def cog_unload(self):
self.calendar_notify_loop.stop()
def setup(bot):
logger.info(f"Loading...")
try:
2020-05-28 15:14:18 +02:00
calendar = Calendar(bot)
bot.add_cog(calendar)
calendar.calendar_notify_loop.start()
except Exception as e:
logger.error(f"Error loading: {e}")
else:
logger.info(f"Load successful")
def teardown(bot):
logger.info(f"Unloading...")
try:
bot.remove_cog("Calendar")
except Exception as e:
logger.error(f"Error unloading: {e}")
else:
logger.info(f"Unload successful")