Complete Calendar with edt function, make light function name calendar for commands supports, attempt to fix the notifications and add getdb and eval admin commands
This commit is contained in:
parent
0d97422850
commit
857619fac3
2 changed files with 72 additions and 55 deletions
|
@ -4,19 +4,45 @@ import requests
|
|||
from ics.timeline import Timeline
|
||||
from aiogram.utils import markdown
|
||||
|
||||
URL = "http://adelb.univ-lyon1.fr/jsp/custom/modules/plannings/anonymous_cal.jsp"
|
||||
|
||||
|
||||
class Calendar(ics.Calendar):
|
||||
def __init__(self, url: list, firstdate: datetime.date, lastdate: datetime.date):
|
||||
super().__init__(requests.get(
|
||||
f"{url[0]}?resources={url[1]}&projectId={url[2]}&calType=ical&firstDate={firstdate}&lastDate={lastdate}"
|
||||
).text)
|
||||
def __init__(self, time: str, resources: int, url: str = URL, projectid: int = 4, pass_week: bool = True):
|
||||
super().__init__(requests.get(self._url(time, [url, resources, projectid], pass_week)).text)
|
||||
events = set()
|
||||
for e in self.events:
|
||||
events.add(Event(e))
|
||||
self.events = events
|
||||
|
||||
self.timeline = Timeline(self)
|
||||
|
||||
def _url(self, time: str, url: list, pass_week: bool):
|
||||
now = datetime.datetime.now(datetime.timezone.utc).astimezone(tz=None)
|
||||
if now.isoweekday() in [6, 7] and pass_week:
|
||||
now += datetime.timedelta(days=(7 - (now.isoweekday() - 1)))
|
||||
|
||||
dates = {
|
||||
"": [0, 0],
|
||||
"day": [0, 0],
|
||||
"next": [1, 1],
|
||||
"week": [-(now.isoweekday() - 1), 7 - now.isoweekday()],
|
||||
"next week": [7 - (now.isoweekday() - 1), 7 + (7 - now.isoweekday())]
|
||||
}
|
||||
firstdate = now.date() + datetime.timedelta(days=dates[time][0])
|
||||
lastdate = now.date() + datetime.timedelta(days=dates[time][1])
|
||||
return f"{url[0]}?resources={url[1]}&projectId={url[2]}&calType=ical&firstDate={firstdate}&lastDate={lastdate}"
|
||||
|
||||
def __str__(self):
|
||||
msg = str()
|
||||
|
||||
for e in list(self.timeline):
|
||||
msg += (str(e)[10:] if str(e.begin.date())[5:] in msg else str(e)) + "\n\n"
|
||||
|
||||
if len(msg) == 0:
|
||||
msg += markdown.italic("but nobody came...")
|
||||
return msg
|
||||
|
||||
|
||||
|
||||
class Event(ics.Event):
|
||||
def __init__(self, event: ics.Event):
|
||||
|
|
91
bot.py
91
bot.py
|
@ -34,46 +34,20 @@ if not isfile("token.ini"):
|
|||
API_TOKEN = open("token.ini").read()
|
||||
ADMIN_ID = 148441652
|
||||
TIMES = ["", "day", "next", "week", "next week"]
|
||||
URL = "http://adelb.univ-lyon1.fr/jsp/custom/modules/plannings/anonymous_cal.jsp"
|
||||
PROJECT_ID = 4
|
||||
|
||||
bot = Bot(token=API_TOKEN)
|
||||
dp = Dispatcher(bot)
|
||||
dbL = RLock()
|
||||
|
||||
|
||||
def edt(time: str, user_id: int):
|
||||
def calendar(time: str, user_id: int):
|
||||
with dbL:
|
||||
with shelve.open("edt", writeback=True) as db:
|
||||
if str(user_id) not in db or "resources" not in db[str(user_id)]:
|
||||
return markdown.bold("Your EDT is not set ! ❌")
|
||||
now = datetime.datetime.now(datetime.timezone.utc).astimezone(tz=None)
|
||||
|
||||
if now.isoweekday() in [6, 7]:
|
||||
now += datetime.timedelta(days=(7 - (now.isoweekday() - 1)))
|
||||
|
||||
dates = {
|
||||
"": [0, 0],
|
||||
"day": [0, 0],
|
||||
"next": [1, 1],
|
||||
"week": [-(now.isoweekday() - 1), 7 - now.isoweekday()],
|
||||
"next week": [7-(now.isoweekday() - 1), 7 + (7 - now.isoweekday())]
|
||||
}
|
||||
if time not in TIMES:
|
||||
elif time not in TIMES:
|
||||
return markdown.bold("Invalid choice ! ❌")
|
||||
|
||||
firstdate = now.date() + datetime.timedelta(days=dates[time][0])
|
||||
lastdate = now.date() + datetime.timedelta(days=dates[time][1])
|
||||
|
||||
c = Calendar([URL, db[str(user_id)]["resources"], PROJECT_ID], firstdate, lastdate)
|
||||
msg = str()
|
||||
|
||||
for e in list(c.timeline):
|
||||
msg += (str(e)[10:] if str(e.begin.date())[5:] in msg else str(e)) + "\n\n"
|
||||
|
||||
if len(msg) == 0:
|
||||
msg += markdown.italic("but nobody came...")
|
||||
return msg
|
||||
return str(Calendar(time, db[str(user_id)]["resources"]))
|
||||
|
||||
|
||||
async def notif():
|
||||
|
@ -82,31 +56,23 @@ async def notif():
|
|||
with shelve.open("edt", writeback=True) as db:
|
||||
for u in db:
|
||||
if ("resources" in db[u]) and ("notif" in db[u]) and (db[u]["notif"]["state"]):
|
||||
logger.info(f"notif check for {u}")
|
||||
now = datetime.datetime.now(datetime.timezone.utc).astimezone(tz=None)
|
||||
c = Calendar([URL, db[u]["resources"], PROJECT_ID], now.date(), now.date())
|
||||
for e in c.events:
|
||||
if (e.begin - now).seconds/60 <= db[u]["notif"]["time"] and\
|
||||
(now - db[u]["notif"]["last"]).seconds/60 >= db[u]["notif"]["cooldown"]:
|
||||
c = Calendar("day", db[u]["resources"], pass_week=False)
|
||||
for e in c.timeline:
|
||||
logger.info(f"{(e.begin - now).total_seconds().__abs__()//60.} <= {db[u]['notif']['time']} and {(now - db[u]['notif']['last']).total_seconds()//60} | >= {db[u]['notif']['cooldown']}")
|
||||
logger.info(f"{(e.begin - now).total_seconds().__abs__()//60 <= db[u]['notif']['time']} and {(now - db[u]['notif']['last']).total_seconds()//60 >= db[u]['notif']['cooldown']}")
|
||||
if (e.begin - now).total_seconds().__abs__()//60 <= db[u]["notif"]["time"] and\
|
||||
(now - db[u]["notif"]["last"]).total_seconds()//60 >= db[u]["notif"]["cooldown"]:
|
||||
db[u]["notif"]["last"] = now
|
||||
|
||||
msg = markdown.text(
|
||||
markdown.text("🔔"),
|
||||
markdown.code(e.name),
|
||||
markdown.text("in"),
|
||||
markdown.bold((e.begin - now).seconds//60),
|
||||
markdown.text("minutes !"),
|
||||
sep="\n"
|
||||
)
|
||||
await bot.send_message(int(u), msg, parse_mode=ParseMode.MARKDOWN)
|
||||
await bot.send_message(int(u), e, parse_mode=ParseMode.MARKDOWN)
|
||||
await sleep(60)
|
||||
|
||||
|
||||
@dp.inline_handler()
|
||||
async def inline_echo(inline_query: InlineQuery):
|
||||
text = inline_query.query.lower()
|
||||
if text not in TIMES:
|
||||
text = "invalid"
|
||||
res = edt(text, inline_query.from_user.id)
|
||||
text = inline_query.query.lower() if inline_query.query.lower() not in TIMES else "invalid"
|
||||
res = calendar(text, inline_query.from_user.id)
|
||||
input_content = InputTextMessageContent(res, parse_mode=ParseMode.MARKDOWN)
|
||||
result_id: str = hashlib.md5(res.encode()).hexdigest()
|
||||
item = InlineQueryResultArticle(
|
||||
|
@ -167,7 +133,7 @@ async def edt_cmd(message: types.Message):
|
|||
text = message.text.lower()
|
||||
if text[:4] == "/edt":
|
||||
text = text[5:]
|
||||
resp = edt(text, message.from_user.id)
|
||||
resp = calendar(text, message.from_user.id)
|
||||
key = reply_keyboard.ReplyKeyboardMarkup()
|
||||
key.add(reply_keyboard.KeyboardButton("Day"))
|
||||
key.add(reply_keyboard.KeyboardButton("Next"))
|
||||
|
@ -181,10 +147,9 @@ async def edt_set(message: types.Message):
|
|||
await message.chat.do(types.ChatActions.TYPING)
|
||||
logger.info(f"{message.from_user.username} do setedt command: {message.text}")
|
||||
resources = message.text[8:]
|
||||
now_test = datetime.datetime.now(datetime.timezone.utc).astimezone(tz=None)
|
||||
|
||||
try:
|
||||
Calendar([URL, int(resources), PROJECT_ID], now_test.date(), now_test.date())
|
||||
Calendar("", int(resources))
|
||||
except (ParseError, ConnectionError, InvalidSchema, MissingSchema, ValueError):
|
||||
msg = markdown.bold("Invalid resources ! ❌")
|
||||
else:
|
||||
|
@ -307,6 +272,32 @@ async def get_logs(message: types.Message):
|
|||
await message.reply(markdown.bold("Too much logs ! ❌"))
|
||||
|
||||
|
||||
@dp.message_handler(commands=["getdb"])
|
||||
async def get_db(message: types.Message):
|
||||
logger.info(f"{message.from_user.username} do getdb command: {message.text}")
|
||||
if message.from_user.id == ADMIN_ID:
|
||||
with dbL:
|
||||
with shelve.open("edt") as db:
|
||||
msg = markdown.text(
|
||||
markdown.italic("db:"),
|
||||
markdown.code(dict(db)),
|
||||
sep="\n"
|
||||
)
|
||||
await message.reply(msg, parse_mode=ParseMode.MARKDOWN)
|
||||
|
||||
|
||||
@dp.message_handler(commands=["eval"])
|
||||
async def get_db(message: types.Message):
|
||||
logger.info(f"{message.from_user.username} do eval command: {message.text}")
|
||||
if message.from_user.id == ADMIN_ID:
|
||||
msg = markdown.text(
|
||||
markdown.italic("eval:"),
|
||||
markdown.code(eval(message.text[6:])),
|
||||
sep="\n"
|
||||
)
|
||||
await message.reply(msg, parse_mode=ParseMode.MARKDOWN)
|
||||
|
||||
|
||||
@dp.errors_handler()
|
||||
async def errors(*args, **partial_data):
|
||||
msg = markdown.text(
|
||||
|
|
Reference in a new issue