diff --git a/bot.py b/bot.py index 5b43f86..e212f35 100644 --- a/bot.py +++ b/bot.py @@ -1,4 +1,4 @@ -import logging, shelve, datetime, hashlib +import logging, shelve, datetime, hashlib, asyncio from aiogram import Bot, Dispatcher, executor, types from aiogram.types import InlineQuery, InputTextMessageContent, InlineQueryResultArticle, ParseMode, reply_keyboard from aiogram.utils import markdown @@ -7,6 +7,7 @@ from ics.parse import ParseError from requests import get from requests.exceptions import ConnectionError, InvalidSchema, MissingSchema from threading import RLock +from asyncio import sleep API_TOKEN = open("token.ini").read() logging.basicConfig(level=logging.INFO) @@ -17,10 +18,10 @@ dbL = RLock() def edt(text, user_id): with dbL: - with shelve.open("edt") as db: - if not str(user_id) in db: - return markdown.bold("Your EDT is not set !") - now = datetime.datetime.now() + with shelve.open("edt", writeback=True) as db: + if str(user_id) not in db or "url" not in db[str(user_id)]: + return markdown.bold("Your EDT is not set ! ❌") + now = datetime.datetime.now(datetime.timezone.utc).astimezone(tz=None) if text.lower() == "week": firstdate = now.date() - datetime.timedelta(days=now.isoweekday()-1) lastdate = now.date() + datetime.timedelta(days=(7 - now.isoweekday())) @@ -31,16 +32,16 @@ def edt(text, user_id): elif text == "" or text.lower() == "day": firstdate, lastdate = now.date(), now.date() else: - return markdown.bold("Invalid choice !") + return markdown.bold("Invalid choice ! ❌") - url = f"{db[str(user_id)]}&firstDate={firstdate}&lastDate={lastdate}" + url = f"{db[str(user_id)]['url']}&firstDate={firstdate}&lastDate={lastdate}" c = Calendar(get(url).text) msg = list() days = list() for e in list(c.timeline): - begin = e.begin.datetime - end = e.end.datetime + begin = e.begin.datetime.replace(tzinfo=datetime.timezone.utc).astimezone(tz=None) + end = e.end.datetime.replace(tzinfo=datetime.timezone.utc).astimezone(tz=None) if begin.date() not in days: days.append(begin.date()) msg.append(markdown.bold(f"<{str(begin.date())[5:]}>")) @@ -55,14 +56,66 @@ def edt(text, user_id): return markdown.text(*msg, sep="\n") +async def notif(): + while True: + with dbL: + with shelve.open("edt", writeback=True) as db: + for u in db: + if ("url" in db[u]) and ("notif" in db[u]) and (db[u]["notif"]["state"]): + now = datetime.datetime.now(datetime.timezone.utc).astimezone(tz=None) + c = Calendar(get(f"{db[u]['url']}&firstDate={now.date()}&lastDate={now.date()}").text) + for e in c.events: + event = e.begin.datetime.replace(tzinfo=datetime.timezone.utc).astimezone(tz=None) + if (event - now).seconds/60 <= db[u]["notif"]["time"] and\ + (now - db[u]["notif"]["last"]).seconds/60 >= db[u]["notif"]["cooldown"]: + db[u]["notif"]["last"] = now + + msg = markdown.text( + markdown.text("🔔"), + markdown.code(e.name), + markdown.text("in"), + markdown.bold((event - now).seconds//60), + markdown.text("minutes !"), + sep="\n" + ) + await bot.send_message(int(u), msg, parse_mode=ParseMode.MARKDOWN) + await sleep(60) + + @dp.message_handler(commands=["start", "help"]) async def send_welcome(message: types.Message): + with dbL: + with shelve.open("edt", writeback=True) as db: + if str(message.from_user.id) not in db: + db[str(message.from_user.id)] = dict() + msg = markdown.text( markdown.text("💠 Welcome to the TelegramEDT, a calendar bot for the Lyon 1 University ! 💠\n"), - markdown.text(markdown.code("/edt [day | week | next]"), markdown.text(", for show your next course")), - markdown.text(markdown.code("/setedt "), markdown.text(", to setup your calendar")), - markdown.text(markdown.code("/getedt"), markdown.text(", to get your calendar url")), - markdown.text(markdown.code("/help"), markdown.text(", to show this command")), + markdown.text( + markdown.text("🗓"), + markdown.code("/edt [day | week | next]"), + markdown.text(", for show your next course") + ), + markdown.text( + markdown.text("🔔"), + markdown.code("/notif "), + markdown.text(", setup notifications") + ), + markdown.text( + markdown.text("⚙ī¸"), + markdown.code("/setedt "), + markdown.text(", to setup your calendar") + ), + markdown.text( + markdown.text("🔗"), + markdown.code("/getedt"), + markdown.text(", to get your calendar url") + ), + markdown.text( + markdown.text("ℹī¸"), + markdown.code("/help"), + markdown.text(", to show this command") + ), sep="\n" ) await message.reply(msg, parse_mode=ParseMode.MARKDOWN) @@ -85,7 +138,7 @@ async def edt_cmd(message: types.Message): @dp.inline_handler() async def inline_echo(inline_query: InlineQuery): text = inline_query.query - if text not in ["week", "next", ""]: + if text not in ["day", "week", "next", ""]: text = "invalid" res = edt(text, inline_query.from_user.id) input_content = InputTextMessageContent(res) @@ -108,8 +161,8 @@ async def edt_set(message: types.Message): try: Calendar(get(url).text) - except (ParseError, ConnectionError, InvalidSchema, MissingSchema): - await message.reply("Invalid URL !") + except (ParseError, ConnectionError, InvalidSchema, MissingSchema, ValueError): + msg = markdown.bold("Invalid URL ! ❌") else: if "calType=vcal" in url: url = url[:url.find("vcal")] + "ical" @@ -117,21 +170,89 @@ async def edt_set(message: types.Message): url = url[:url.find("&firstDate")] with dbL: - with shelve.open("edt") as db: - db[str(message.from_user.id)] = url + with shelve.open("edt", writeback=True) as db: + db[str(message.from_user.id)]["url"] = url + msg = markdown.text("EDT set ✅") - await message.reply("EDT set !") + await message.reply(msg, parse_mode=ParseMode.MARKDOWN) @dp.message_handler(commands=["getedt"]) async def edt_geturl(message: types.Message): with dbL: - with shelve.open("edt") as db: - if str(message.from_user.id) in db: - await message.reply(db[str(message.from_user.id)]) + with shelve.open("edt", writeback=True) as db: + if (str(message.from_user.id) in db) and ("url" in db[str(message.from_user.id)]): + await message.reply(db[str(message.from_user.id)]["url"]) else: - await message.reply("No EDT set !") + await message.reply("No EDT set ! ❌") + + +@dp.message_handler(commands=["notif"]) +async def notif_cmd(message: types.Message): + with dbL: + with shelve.open("edt", writeback=True) as db: + if "notif" not in db[str(message.from_user.id)]: + db[str(message.from_user.id)]["notif"] = dict() + db[str(message.from_user.id)]["notif"]["state"] = False + db[str(message.from_user.id)]["notif"]["time"] = 20 + db[str(message.from_user.id)]["notif"]["cooldown"] = 20 + last = datetime.datetime.now(datetime.timezone.utc).astimezone(tz=None) - datetime.timedelta(minutes=20) + db[str(message.from_user.id)]["notif"]["last"] = last + + if message.text[7:10] == "set": + if db[str(message.from_user.id)]["notif"]["state"]: + res = False + else: + res = True + + db[str(message.from_user.id)]["notif"]["state"] = res + + msg = markdown.text( + markdown.text("Notifications set on "), + markdown.code(res), + markdown.text("✅") + ) + elif message.text[7:11] == "time" or message.text[7:15] == "cooldown": + cut = 11 if message.text[7:11] == "time" else 15 + try: + int(message.text[cut+1:]) + except ValueError: + msg = markdown.bold("Invalid number ! ❌") + else: + db[str(message.from_user.id)]["notif"][message.text[7:cut]] = int(message.text[cut+1:]) + + msg = markdown.text( + markdown.text("Notification"), + markdown.code(message.text[7:cut]), + markdown.text("set to"), + markdown.bold(message.text[cut+1:]), + markdown.text("✅") + ) + elif message.text[7:11] == "info": + msg = markdown.text( + markdown.code("Notification:"), + markdown.text( + markdown.bold("State:"), + markdown.text(db[str(message.from_user.id)]["notif"]["state"]) + ), + markdown.text( + markdown.bold("Time:"), + markdown.text(db[str(message.from_user.id)]["notif"]["time"]) + ), + markdown.text( + markdown.bold("Cooldown:"), + markdown.text(db[str(message.from_user.id)]["notif"]["cooldown"]) + ), + sep="\n" + ) + else: + msg = markdown.bold("Invalid action ! ❌") + + await message.reply(msg, parse_mode=ParseMode.MARKDOWN) if __name__ == '__main__': - executor.start_polling(dp, skip_updates=True) + loop = asyncio.get_event_loop() + loop.create_task(notif()) + loop.create_task(executor.start_polling(dp, skip_updates=True)) + loop.run_forever()