import re
from datetime import datetime
from time import mktime
from discord import Embed, Forbidden, HTTPException
from discord.ext import commands, tasks
from discord.ext.commands import BadArgument
from discord_slash import SlashContext, cog_ext, SlashCommandOptionType
from discord_slash.utils import manage_commands
from feedparser import parse
import db
from administrator import slash
from administrator.check import is_enabled
from administrator.logger import logger
# Name of this extension; reused below as the suffix of its child logger.
extension_name = "tomuss"
# Rebind the imported project logger to a child logger dedicated to this extension.
logger = logger.getChild(extension_name)
# Shape a Tomuss RSS URL must have to be accepted:
# https://tomuss.univ-lyon1.fr/S/<4 digits>/<letters>/rss/<at least one character>
url_re = re.compile(r"https://tomuss\.univ-lyon1\.fr/S/[0-9]{4}/[a-zA-Z]+/rss/.+")
class Tomuss(commands.Cog):
    """Cog relaying new entries of a user's Tomuss RSS feed to them by DM.

    Users register a feed URL with ``/tomuss set`` and remove it with
    ``/tomuss unset``. A background task started in ``__init__`` polls every
    registered feed every five minutes and sends the entries published since
    the last relayed one.
    """

    # Matches <br>, <br/> and <br /> in any letter case.
    _br_re = re.compile(r"<br\s*/?>", re.IGNORECASE)

    def __init__(self, bot: commands.Bot):
        """Store the bot, start the polling task and register the slash commands."""
        self.bot = bot
        self.tomuss_loop.start()
        slash.get_cog_commands(self)

    def description(self):
        """Return the short human-readable description of this extension."""
        return "PCP Univ Lyon 1"

    @staticmethod
    def _stamp(entry) -> tuple:
        """Return the (year, month, day, hour, minute, second) fields of an entry's publication time."""
        return tuple(entry.published_parsed[:6])

    @staticmethod
    def _published(entry) -> datetime:
        """Return an entry's publication time as a naive datetime.

        The datetime is built directly from the calendar fields of
        ``published_parsed``. Going through ``mktime``/``fromtimestamp``
        instead would apply a local-time conversion to those fields and could
        shift the stored value by the DST offset.
        """
        return datetime(*entry.published_parsed[:6])

    @staticmethod
    def _dated_entries(feed) -> list:
        """Return the entries of a parsed feed that carry a publication time."""
        return [e for e in feed.entries if getattr(e, "published_parsed", None)]

    @staticmethod
    def _truncate(text: str, limit: int) -> str:
        """Return ``text`` cut to at most ``limit`` characters, ending in ``...`` when cut."""
        if len(text) > limit:
            return text[:limit - 3] + "..."
        return text

    @cog_ext.cog_subcommand(base="tomuss", name="set", description="Set your tomuss RSS feed", options=[
        manage_commands.create_option("url", "The RSS URL", SlashCommandOptionType.STRING, True)])
    async def tomuss_set(self, ctx: SlashContext, url: str):
        """Register (or replace) the caller's Tomuss RSS feed.

        :param ctx: slash command context; ``ctx.author.id`` keys the stored row.
        :param url: RSS feed URL, which must fully match ``url_re``.
        :raises BadArgument: if the URL is malformed or the feed yields no
            entry with a publication time.
        """
        if not url_re.fullmatch(url):
            raise BadArgument()
        # NOTE(review): parse() fetches the URL synchronously and blocks the
        # event loop while it does; consider running it in an executor.
        entries = self._dated_entries(parse(url))
        if not entries:
            raise BadArgument()
        # NOTE(review): the baseline is the OLDEST entry, so the first polling
        # pass relays every other entry already in the feed. Use max() here if
        # only future entries should be relayed - confirm the intent.
        last = self._published(min(entries, key=self._stamp))

        s = db.Session()
        try:
            t = s.query(db.Tomuss).get(ctx.author.id)
            if t:
                t.url = url
                t.last = last
            else:
                s.add(db.Tomuss(ctx.author.id, url, last))
            s.commit()
        finally:
            # Always release the session, even if the query or commit fails.
            s.close()
        # NOTE(review): this replies through the channel rather than ctx.send(),
        # unlike tomuss_unset; kept as-is because the reason is not visible here.
        await ctx.channel.send(f"Tomuss RSS set for {ctx.author.mention} \U0001f44d")

    @cog_ext.cog_subcommand(base="tomuss", name="unset", description="Unset your tomuss RSS feed")
    async def tomuss_unset(self, ctx: SlashContext):
        """Remove the caller's registered Tomuss RSS feed.

        :param ctx: slash command context; ``ctx.author.id`` keys the stored row.
        :raises BadArgument: if the caller has no registered feed.
        """
        s = db.Session()
        try:
            t = s.query(db.Tomuss).get(ctx.author.id)
            if not t:
                raise BadArgument()
            s.delete(t)
            s.commit()
        finally:
            # Also covers the BadArgument path, which previously leaked the session.
            s.close()
        await ctx.send(content="\U0001f44d")

    @tasks.loop(minutes=5)
    async def tomuss_loop(self):
        """Poll every registered feed and DM its owner the newly published entries."""
        s = db.Session()
        try:
            for t in s.query(db.Tomuss).all():
                user_id = t.user_id
                try:
                    await self._relay(s, t)
                except Exception:
                    # One broken subscription must not stop the whole task
                    # (an exception escaping a tasks.loop body ends the loop).
                    logger.exception(f"Tomuss update failed for user {user_id}")
                    s.rollback()
        finally:
            s.close()

    async def _relay(self, s, t):
        """Send one subscriber the entries published after ``t.last`` and advance ``t.last``.

        The row is deleted when the user no longer exists or refuses DMs.
        """
        try:
            u = await self.bot.fetch_user(t.user_id)
        except HTTPException as error:
            # fetch_user reports an unknown user with a 404 error rather than
            # by returning None; anything else is left to the caller.
            if error.status != 404:
                raise
            u = None
        if not u:
            s.delete(t)
            s.commit()
            return

        last = t.last.utctimetuple()[:6]
        # NOTE(review): parse() is a blocking network call inside a coroutine.
        entries = sorted((e for e in self._dated_entries(parse(t.url)) if self._stamp(e) > last),
                         key=self._stamp)
        if not entries:
            return

        embed = Embed(title="Tomuss update !")
        for e in entries:
            title = self._truncate(e.title, 256)
            # NOTE(review): the tag literals of the original replace() chain
            # were lost; reconstructed as <br> -> newline and <b>/</b> -> bold
            # markers. Confirm against a real feed.
            summary = self._br_re.sub("\n", e.summary).replace("<b>", "**").replace("</b>", "**")
            embed.add_field(name=title, value=self._truncate(summary, 1024))

        try:
            await u.send(embed=embed)
        except Forbidden:
            # The user blocks DMs: drop the subscription, and do not touch the
            # deleted row afterwards.
            s.delete(t)
            s.commit()
            return
        except HTTPException:
            try:
                await u.send("Too much to send, I can't handle it sorry...")
            except HTTPException:
                logger.warning(f"Could not notify user {t.user_id} about an oversized update")

        # Advance the marker even when the embed was rejected, so the same
        # oversized batch is not retried on every pass.
        t.last = self._published(entries[-1])
        s.commit()

    def cog_unload(self):
        """Stop the polling task when the cog is unloaded."""
        self.tomuss_loop.stop()
def setup(bot):
    """Extension entry point: attach a ``Tomuss`` cog to ``bot``.

    Any failure while building or adding the cog is logged and swallowed; the
    success message is only logged when no error occurred.
    """
    logger.info("Loading...")
    try:
        cog = Tomuss(bot)
        bot.add_cog(cog)
    except Exception as error:
        logger.error(f"Error loading: {error}")
        return
    logger.info("Load successful")
def teardown(bot):
    """Extension exit point: detach the ``Tomuss`` cog from ``bot``.

    Any failure while removing the cog is logged and swallowed; the success
    message is only logged when no error occurred.
    """
    logger.info("Unloading...")
    try:
        bot.remove_cog("Tomuss")
    except Exception as error:
        logger.error(f"Error unloading: {error}")
        return
    logger.info("Unload successful")