From 0d97422850aa3b5399a79c4fd2ea7c874e7d08bb Mon Sep 17 00:00:00 2001 From: flifloo Date: Mon, 16 Sep 2019 22:02:24 +0200 Subject: [PATCH] Rework calendar and edt function --- EDTcalendar.py | 38 +++++++++++++++++ bot.py | 108 ++++++++++++++++++++++--------------------------- 2 files changed, 86 insertions(+), 60 deletions(-) create mode 100644 EDTcalendar.py diff --git a/EDTcalendar.py b/EDTcalendar.py new file mode 100644 index 0000000..392dfc6 --- /dev/null +++ b/EDTcalendar.py @@ -0,0 +1,38 @@ +import datetime +import ics +import requests +from ics.timeline import Timeline +from aiogram.utils import markdown + + +class Calendar(ics.Calendar): + def __init__(self, url: list, firstdate: datetime.date, lastdate: datetime.date): + super().__init__(requests.get( + f"{url[0]}?resources={url[1]}&projectId={url[2]}&calType=ical&firstDate={firstdate}&lastDate={lastdate}" + ).text) + events = set() + for e in self.events: + events.add(Event(e)) + self.events = events + + self.timeline = Timeline(self) + + +class Event(ics.Event): + def __init__(self, event: ics.Event): + super().__init__() + for v in event.__dict__: + setattr(self, v, event.__dict__[v]) + + self.begin = self.begin.datetime.replace(tzinfo=datetime.timezone.utc).astimezone(tz=None) + self.end = self.end.datetime.replace(tzinfo=datetime.timezone.utc).astimezone(tz=None) + self.organizer = self.description.split('\n')[3] + + def __str__(self): + return markdown.text( + markdown.bold(f"<{str(self.begin.date())[5:]}>"), + markdown.code(f"📓[{self.name}]:"), + markdown.text(f"⌚️ {str(self.begin.time())[:-3]} -> {str(self.end.time())[:-3]}"), + markdown.italic(f"📍 {self.location} 👨‍🏫 {self.organizer}"), + sep="\n" + ) diff --git a/bot.py b/bot.py index a5dd77d..f617ee0 100644 --- a/bot.py +++ b/bot.py @@ -12,9 +12,8 @@ from aiogram import Bot, Dispatcher, executor, types from aiogram.types import InlineQuery, InputTextMessageContent, InlineQueryResultArticle, ParseMode, reply_keyboard from aiogram.utils import markdown from aiogram.utils.exceptions import MessageIsTooLong -from ics import Calendar +from EDTcalendar import Calendar from ics.parse import ParseError -from requests import get from requests.exceptions import ConnectionError, InvalidSchema, MissingSchema @@ -34,50 +33,47 @@ if not isfile("token.ini"): exit(1) API_TOKEN = open("token.ini").read() ADMIN_ID = 148441652 +TIMES = ["", "day", "next", "week", "next week"] +URL = "http://adelb.univ-lyon1.fr/jsp/custom/modules/plannings/anonymous_cal.jsp" +PROJECT_ID = 4 bot = Bot(token=API_TOKEN) dp = Dispatcher(bot) dbL = RLock() -def edt(text, user_id): +def edt(time: str, user_id: int): with dbL: with shelve.open("edt", writeback=True) as db: - if str(user_id) not in db or "url" not in db[str(user_id)]: + if str(user_id) not in db or "resources" not in db[str(user_id)]: return markdown.bold("Your EDT is not set ! ❌") now = datetime.datetime.now(datetime.timezone.utc).astimezone(tz=None) - if text.lower() == "week": - firstdate = now.date() - datetime.timedelta(days=now.isoweekday()-1) - lastdate = now.date() + datetime.timedelta(days=(7 - now.isoweekday())) - elif text.lower() == "next": - now += datetime.timedelta(days=7) - firstdate = now.date() - datetime.timedelta(days=now.isoweekday()) - lastdate = now.date() + datetime.timedelta(days=(7 - now.isoweekday())) - elif text == "" or text.lower() == "day": - firstdate, lastdate = now.date(), now.date() - else: + + if now.isoweekday() in [6, 7]: + now += datetime.timedelta(days=(7 - (now.isoweekday() - 1))) + + dates = { + "": [0, 0], + "day": [0, 0], + "next": [1, 1], + "week": [-(now.isoweekday() - 1), 7 - now.isoweekday()], + "next week": [7-(now.isoweekday() - 1), 7 + (7 - now.isoweekday())] + } + if time not in TIMES: return markdown.bold("Invalid choice ! ❌") - url = f"{db[str(user_id)]['url']}&firstDate={firstdate}&lastDate={lastdate}" - c = Calendar(get(url).text) - msg = list() - days = list() + firstdate = now.date() + datetime.timedelta(days=dates[time][0]) + lastdate = now.date() + datetime.timedelta(days=dates[time][1]) + + c = Calendar([URL, db[str(user_id)]["resources"], PROJECT_ID], firstdate, lastdate) + msg = str() for e in list(c.timeline): - begin = e.begin.datetime.replace(tzinfo=datetime.timezone.utc).astimezone(tz=None) - end = e.end.datetime.replace(tzinfo=datetime.timezone.utc).astimezone(tz=None) - if begin.date() not in days: - days.append(begin.date()) - msg.append(markdown.bold(f"<{str(begin.date())[5:]}>")) - - msg.append(markdown.code(f"📓[{e.name}]:")) - msg.append(markdown.text(f"⌚️ {str(begin.time())[:-3]} -> {str(end.time())[:-3]}")) - prof = markdown.text(e.description.split('\n')[3]) - msg.append(markdown.italic(f"📍 {e.location} 👨‍🏫 {prof}" + "\n")) + msg += (str(e)[10:] if str(e.begin.date())[5:] in msg else str(e)) + "\n\n" if len(msg) == 0: - msg.append(markdown.italic("but nobody came...")) - return markdown.text(*msg, sep="\n") + msg += markdown.italic("but nobody came...") + return msg async def notif(): @@ -85,12 +81,11 @@ async def notif(): with dbL: with shelve.open("edt", writeback=True) as db: for u in db: - if ("url" in db[u]) and ("notif" in db[u]) and (db[u]["notif"]["state"]): + if ("resources" in db[u]) and ("notif" in db[u]) and (db[u]["notif"]["state"]): now = datetime.datetime.now(datetime.timezone.utc).astimezone(tz=None) - c = Calendar(get(f"{db[u]['url']}&firstDate={now.date()}&lastDate={now.date()}").text) + c = Calendar([URL, db[u]["resources"], PROJECT_ID], now.date(), now.date()) for e in c.events: - event = e.begin.datetime.replace(tzinfo=datetime.timezone.utc).astimezone(tz=None) - if (event - now).seconds/60 <= db[u]["notif"]["time"] and\ + if (e.begin - now).seconds/60 <= db[u]["notif"]["time"] and\ (now - db[u]["notif"]["last"]).seconds/60 >= db[u]["notif"]["cooldown"]: db[u]["notif"]["last"] = now @@ -98,7 +93,7 @@ async def notif(): markdown.text("🔔"), markdown.code(e.name), markdown.text("in"), - markdown.bold((event - now).seconds//60), + markdown.bold((e.begin - now).seconds//60), markdown.text("minutes !"), sep="\n" ) @@ -108,8 +103,8 @@ async def notif(): @dp.inline_handler() async def inline_echo(inline_query: InlineQuery): - text = inline_query.query - if text not in ["day", "week", "next", ""]: + text = inline_query.query.lower() + if text not in TIMES: text = "invalid" res = edt(text, inline_query.from_user.id) input_content = InputTextMessageContent(res, parse_mode=ParseMode.MARKDOWN) @@ -136,7 +131,7 @@ async def send_welcome(message: types.Message): markdown.text("💠 Welcome to the TelegramEDT, a calendar bot for the Lyon 1 University ! 💠\n"), markdown.text( markdown.text("🗓"), - markdown.code("/edt [day | week | next]"), + markdown.code("/edt [day | next | week | next week]"), markdown.text(", for show your next course") ), markdown.text( @@ -146,8 +141,8 @@ async def send_welcome(message: types.Message): ), markdown.text( markdown.text("⚙️"), - markdown.code("/setedt "), - markdown.text(", to setup your calendar") + markdown.code("/setedt "), + markdown.text(", to setup your calendar\nThe resources can be get on the url of exported calendar") ), markdown.text( markdown.text("🔗"), @@ -165,18 +160,19 @@ async def send_welcome(message: types.Message): @dp.message_handler(commands=["edt"]) -@dp.message_handler(lambda msg: msg.text.lower() in ["day", "week", "next"]) +@dp.message_handler(lambda msg: msg.text.lower() in TIMES[1:]) async def edt_cmd(message: types.Message): await message.chat.do(types.ChatActions.TYPING) logger.info(f"{message.from_user.username} do edt command: {message.text}") - text = message.text - if message.text[:4] == "/edt": - text = message.text[5:] + text = message.text.lower() + if text[:4] == "/edt": + text = text[5:] resp = edt(text, message.from_user.id) key = reply_keyboard.ReplyKeyboardMarkup() key.add(reply_keyboard.KeyboardButton("Day")) - key.add(reply_keyboard.KeyboardButton("Week")) key.add(reply_keyboard.KeyboardButton("Next")) + key.add(reply_keyboard.KeyboardButton("Week")) + key.add(reply_keyboard.KeyboardButton("Next week")) await message.reply(resp, parse_mode=ParseMode.MARKDOWN, reply_markup=key) @@ -184,25 +180,17 @@ async def edt_cmd(message: types.Message): async def edt_set(message: types.Message): await message.chat.do(types.ChatActions.TYPING) logger.info(f"{message.from_user.username} do setedt command: {message.text}") - url_http = message.text.find("http") - url_end = message.text.find(" ", url_http) - if url_end == -1: - url_end = None - url = message.text[url_http:url_end] + resources = message.text[8:] + now_test = datetime.datetime.now(datetime.timezone.utc).astimezone(tz=None) try: - Calendar(get(url).text) + Calendar([URL, int(resources), PROJECT_ID], now_test.date(), now_test.date()) except (ParseError, ConnectionError, InvalidSchema, MissingSchema, ValueError): - msg = markdown.bold("Invalid URL ! ❌") + msg = markdown.bold("Invalid resources ! ❌") else: - if "calType=vcal" in url: - url = url[:url.find("vcal")] + "ical" - elif "firstDate" in url: - url = url[:url.find("&firstDate")] - with dbL: with shelve.open("edt", writeback=True) as db: - db[str(message.from_user.id)]["url"] = url + db[str(message.from_user.id)]["resources"] = int(resources) msg = markdown.text("EDT set ✅") await message.reply(msg, parse_mode=ParseMode.MARKDOWN) @@ -214,8 +202,8 @@ async def edt_geturl(message: types.Message): logger.info(f"{message.from_user.username} do getedt command: {message.text}") with dbL: with shelve.open("edt", writeback=True) as db: - if (str(message.from_user.id) in db) and ("url" in db[str(message.from_user.id)]): - await message.reply(db[str(message.from_user.id)]["url"]) + if (str(message.from_user.id) in db) and ("resources" in db[str(message.from_user.id)]): + await message.reply(db[str(message.from_user.id)]["resources"]) else: await message.reply("No EDT set ! ❌")