1
0
Fork 0

Rework of project structure for separation by modules

This commit is contained in:
Ethanell 2019-12-26 01:03:05 +01:00
parent c1898214ac
commit 4b24b2eb31
15 changed files with 543 additions and 471 deletions

3
.gitignore vendored
View file

@ -9,8 +9,7 @@ logs/
calendars/
# Bot private
edt
edt.*
edt.db
token.ini
# PyCharm

View file

@ -1,9 +1,10 @@
import datetime
from os.path import getmtime, isfile
import ics
import requests
from ics.timeline import Timeline
from aiogram.utils import markdown
from os.path import getmtime, isfile
from ics.timeline import Timeline
URL = "http://adelb.univ-lyon1.fr/jsp/custom/modules/plannings/anonymous_cal.jsp"

97
TelegramEDT/__init__.py Normal file
View file

@ -0,0 +1,97 @@
import datetime
import logging
from os import mkdir
from os.path import isdir, isfile
from threading import RLock
from aiogram import Bot, Dispatcher, types
from aiogram.types import reply_keyboard, ContentType
from aiogram.utils.callback_data import CallbackData
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from TelegramEDT.EDTcalendar import Calendar
from TelegramEDT.base import Base, User
from TelegramEDT.lang import lang
tables = False
if not isdir("logs"):
mkdir("logs")
if not isdir("calendars"):
mkdir("calendars")
if not isfile("edt.db"):
tables = True
logger = logging.getLogger("TelegramEDT")
log_date = datetime.datetime.now(datetime.timezone.utc).astimezone(tz=None).date()
logging.basicConfig(
filename=f"logs/{log_date}.log",
format="{%(levelname)s}[%(asctime)s]: %(name)s | %(message)s",
level=logging.INFO,
)
if not isfile("token.ini"):
logger.critical("No token specified, impossible to start the bot !")
exit(1)
API_TOKEN = open("token.ini").readline().replace("\n", "")
ADMIN_ID = 148441652
TIMES = ["", "day", "next", "week", "next week"]
bot = Bot(token=API_TOKEN)
posts_cb = CallbackData("post", "id", "action")
dp = Dispatcher(bot)
engine = create_engine("sqlite:///edt.db")
Session = sessionmaker(bind=engine)
session = Session()
if tables:
Base.metadata.create_all(engine)
dbL = RLock()
key = reply_keyboard.ReplyKeyboardMarkup()
for k in ["Edt", "Kfet", "Setkfet", "Setedt", "Notif", "Settomuss"]:
key.add(reply_keyboard.KeyboardButton(k))
def check_id(user: types.User):
with dbL:
if (user.id,) not in session.query(User.id).all():
logger.info(f"{user.username} add to the db")
if user.locale and user.locale.language:
lg = user.locale.language
else:
lg = ""
session.add(User(id=user.id, language=lg))
session.commit()
from TelegramEDT.basic import start, help_cmd
dp.register_message_handler(start, commands="start")
dp.register_message_handler(help_cmd, commands="help")
from TelegramEDT.edt import edt_cmd, edt_query, inline_edt, edt_await, edt_geturl
dp.register_message_handler(edt_cmd, lambda msg: msg.text.lower() == "edt")
dp.register_inline_handler(inline_edt)
dp.register_callback_query_handler(edt_query, posts_cb.filter(action=["day", "next", "week", "next week"]))
dp.register_message_handler(edt_await, lambda msg: msg.text.lower() == "setedt")
dp.register_message_handler(edt_geturl, commands="getedt")
from TelegramEDT.kfet import kfet, kfet_set
dp.register_message_handler(kfet, lambda msg: msg.text.lower() == "kfet")
dp.register_message_handler(kfet_set, lambda msg: msg.text.lower() == "setkfet")
from TelegramEDT.tomuss import settomuss
dp.register_message_handler(settomuss, lambda msg: msg.text.lower() == "settomuss")
from TelegramEDT.notif import notif, notif_cmd, notif_query
dp.register_message_handler(notif_cmd, lambda msg: msg.text.lower() == "notif")
dp.register_callback_query_handler(notif_query, posts_cb.filter(action=["toggle", "time", "cooldown"]))
from TelegramEDT.await_cmd import await_cmd, have_await_cmd
dp.register_message_handler(await_cmd, lambda msg: have_await_cmd(msg), content_types=[ContentType.TEXT, ContentType.PHOTO])
from TelegramEDT.tools import get_id, get_logs, get_db, eval_cmd, errors
dp.register_message_handler(get_id, commands="getid")
dp.register_message_handler(get_logs, commands="getlogs")
dp.register_message_handler(get_db, commands="getdb")
dp.register_message_handler(eval_cmd, commands="eval")
dp.register_errors_handler(errors)

93
TelegramEDT/await_cmd.py Normal file
View file

@ -0,0 +1,93 @@
import re
import requests
from PIL import Image
from aiogram import types
from aiogram.types import ParseMode
from feedparser import parse
from ics.parse import ParseError
from pyzbar.pyzbar import decode
from requests.exceptions import ConnectionError, InvalidSchema, MissingSchema
from TelegramEDT import API_TOKEN, bot, dbL, key, logger, session, check_id
from TelegramEDT.EDTcalendar import Calendar
from TelegramEDT.base import User
from TelegramEDT.lang import lang
re_url = re.compile(r"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+")
def have_await_cmd(msg: types.Message):
with dbL:
user = session.query(User).filter_by(id=msg.from_user.id).first()
return user and user.await_cmd
async def await_cmd(message: types.message):
check_id(message.from_user)
await message.chat.do(types.ChatActions.TYPING)
msg = None
with dbL:
user = session.query(User).filter_by(id=message.from_user.id).first()
logger.info(f"{message.from_user.username} do awaited commande: {user.await_cmd}")
if user.await_cmd == "setedt":
url = str()
if message.photo:
file_path = await bot.get_file(message.photo[0].file_id)
file_url = f"https://api.telegram.org/file/bot{API_TOKEN}/{file_path['file_path']}"
qr = decode(Image.open(requests.get(file_url, stream=True).raw))
if qr:
url = str(qr[0].data)
elif message.text:
msg_url = re_url.findall(message.text)
if msg_url:
url = msg_url[0]
if url:
resources = url[url.find("resources") + 10:][:4]
elif message.text:
resources = message.text
try:
Calendar("", int(resources))
except (ParseError, ConnectionError, InvalidSchema, MissingSchema, ValueError, UnboundLocalError):
msg = lang(user, "setedt_err_res")
else:
user.resources = int(resources)
msg = lang(user, "setedt")
elif user.await_cmd == "setkfet":
try:
int(message.text)
except ValueError:
msg = lang(user, "err_num")
else:
user.kfet = int(message.text)
msg = lang(user, "kfet_set")
elif user.await_cmd == "settomuss":
if not len(parse(message.text).entries):
msg = lang(user, "settomuss_error")
else:
user.tomuss_rss = message.text
msg = lang(user, "settomuss")
elif user.await_cmd in ["time", "cooldown"]:
try:
value = int(message.text)
except ValueError:
msg = lang(user, "err_num")
else:
if user.await_cmd == "time":
user.nt_time = value
else:
user.nt_cooldown = value
msg = lang(user, "notif_time_cooldown").format(user.await_cmd[6:], value)
if user.await_cmd:
user.await_cmd = str()
session.commit()
if msg:
await message.reply(msg, parse_mode=ParseMode.MARKDOWN, reply_markup=key)

View file

@ -1,9 +1,11 @@
import datetime
import requests
from EDTcalendar import Calendar
from feedparser import parse
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Boolean, Date
from sqlalchemy.ext.declarative import declarative_base
from TelegramEDT.EDTcalendar import Calendar
KFET_URL = "http://kfet.bdeinfo.org/orders"
Base = declarative_base()

24
TelegramEDT/basic.py Normal file
View file

@ -0,0 +1,24 @@
from aiogram import types
from aiogram.types import ParseMode
from TelegramEDT import dbL, key, logger, session, check_id
from TelegramEDT.base import User
from TelegramEDT.lang import lang
async def start(message: types.Message):
check_id(message.from_user)
await message.chat.do(types.ChatActions.TYPING)
logger.info(f"{message.from_user.username} start")
with dbL:
user = session.query(User).filter_by(id=message.from_user.id).first()
await message.reply(lang(user, "welcome"), parse_mode=ParseMode.MARKDOWN, reply_markup=key)
async def help_cmd(message: types.Message):
check_id(message.from_user)
await message.chat.do(types.ChatActions.TYPING)
logger.info(f"{message.from_user.username} do help command")
with dbL:
user = session.query(User).filter_by(id=message.from_user.id).first()
await message.reply(lang(user, "help"), parse_mode=ParseMode.MARKDOWN, reply_markup=key)

79
TelegramEDT/edt.py Normal file
View file

@ -0,0 +1,79 @@
import hashlib
from aiogram import types
from aiogram.types import InlineKeyboardMarkup, InlineKeyboardButton, ParseMode, InputTextMessageContent, \
InlineQueryResultArticle, InlineQuery
from TelegramEDT import dbL, key, logger, posts_cb, session, TIMES, bot, check_id
from TelegramEDT.base import User
from TelegramEDT.lang import lang
def calendar(time: str, user_id: int):
with dbL:
user = session.query(User).filter_by(id=user_id).first()
if not user.resources:
return lang(user, "edt_err_set")
elif time not in TIMES:
return lang(user, "edt_err_choice")
return str(user.calendar(time))
def edt_key():
keys = InlineKeyboardMarkup()
for i, n in enumerate(["Day", "Next", "Week", "Next week"]):
keys.add(InlineKeyboardButton(n, callback_data=posts_cb.new(id=i, action=n.lower())))
return keys
async def edt_cmd(message: types.Message):
check_id(message.from_user)
await message.chat.do(types.ChatActions.TYPING)
logger.info(f"{message.from_user.username} do edt")
await message.reply(calendar("day", message.from_user.id), parse_mode=ParseMode.MARKDOWN, reply_markup=edt_key())
async def inline_edt(inline_query: InlineQuery):
check_id(inline_query.from_user)
text = inline_query.query.lower() if inline_query.query.lower() in TIMES else "invalid"
res = calendar(text, inline_query.from_user.id)
input_content = InputTextMessageContent(res, parse_mode=ParseMode.MARKDOWN)
result_id: str = hashlib.md5(res.encode()).hexdigest()
item = InlineQueryResultArticle(
id=result_id,
title=f"Your {text} course",
input_message_content=input_content,
)
await bot.answer_inline_query(inline_query.id, results=[item], cache_time=1)
async def edt_query(query: types.CallbackQuery, callback_data: dict):
check_id(query.message.from_user)
await query.message.chat.do(types.ChatActions.TYPING)
logger.info(f"{query.message.from_user.username} do edt query")
await query.message.reply(calendar(callback_data["action"], query.from_user.id), parse_mode=ParseMode.MARKDOWN,
reply_markup=edt_key())
async def edt_await(message: types.Message):
check_id(message.from_user)
await message.chat.do(types.ChatActions.TYPING)
logger.info(f"{message.from_user.username} do setedt")
with dbL:
user = session.query(User).filter_by(id=message.from_user.id).first()
user.await_cmd = "setedt"
session.commit()
await message.reply(lang(user, "setedt_wait"), parse_mode=ParseMode.MARKDOWN, reply_markup=key)
async def edt_geturl(message: types.Message):
check_id(message.from_user)
await message.chat.do(types.ChatActions.TYPING)
logger.info(f"{message.from_user.username} do getedt command")
with dbL:
user = session.query(User).filter_by(id=message.from_user.id).first()
if user.resources:
await message.reply(user.resources, reply_markup=key)
else:
await message.reply(lang(user, "getedt_err"), reply_markup=key)

47
TelegramEDT/kfet.py Normal file
View file

@ -0,0 +1,47 @@
import datetime
import requests
from aiogram import types
from aiogram.types import ParseMode
from aiogram.utils import markdown
from TelegramEDT import dbL, key, logger, session, check_id
from TelegramEDT.base import User, KFET_URL
from TelegramEDT.lang import lang
def get_now():
return datetime.datetime.now(datetime.timezone.utc).astimezone(tz=None)
async def kfet(message: types.Message):
check_id(message.from_user)
await message.chat.do(types.ChatActions.TYPING)
logger.info(f"{message.from_user.username} do kfet")
with dbL:
user = session.query(User).filter_by(id=message.from_user.id).first()
if not 9 < get_now().hour < 14 or not get_now().isoweekday() < 6:
msg = lang(user, "kfet_close")
else:
msg = lang(user, "kfet_list")
cmds = requests.get(KFET_URL).json()
if cmds:
for c in cmds:
msg += markdown.code(c) + " " if cmds[c] == "ok" else ""
await message.reply(msg, parse_mode=ParseMode.MARKDOWN, reply_markup=key)
async def kfet_set(message: types.Message):
check_id(message.from_user)
await message.chat.do(types.ChatActions.TYPING)
logger.info(f"{message.from_user.username} do setkfet")
with dbL:
user = session.query(User).filter_by(id=message.from_user.id).first()
if not 9 < get_now().hour < 14 or not get_now().isoweekday() < 5:
msg = lang(user, "kfet_close")
else:
user.await_cmd = "setkfet"
msg = lang(user, "kfet_set_await")
session.commit()
await message.reply(msg, parse_mode=ParseMode.MARKDOWN, reply_markup=key)

View file

@ -1,5 +1,6 @@
import json
from base import User
from TelegramEDT.base import User
LANG = ["en"]

83
TelegramEDT/notif.py Normal file
View file

@ -0,0 +1,83 @@
from asyncio import sleep
from aiogram import types
from aiogram.types import InlineKeyboardMarkup, InlineKeyboardButton, ParseMode
from aiogram.utils import markdown
from TelegramEDT import bot, dbL, logger, posts_cb, session, check_id
from TelegramEDT.base import User
from TelegramEDT.lang import lang
async def notif():
while True:
with dbL:
for u in session.query(User).all():
nt = None
kf = None
tm = None
try:
nt = u.get_notif()
kf = u.get_kfet()
tm = u.get_tomuss()
except Exception as e:
logger.error(e)
if nt:
await bot.send_message(u.id, lang(u, "notif_event")+str(nt), parse_mode=ParseMode.MARKDOWN)
if kf:
if kf == 1:
kf = lang(u, "kfet")
elif kf == 2:
kf = lang(u, "kfet_prb")
else:
kf = lang(u, "kfet_err")
await bot.send_message(u.id, kf, parse_mode=ParseMode.MARKDOWN)
if tm:
for i in tm:
msg = markdown.text(
markdown.bold(i.title),
markdown.code(i.summary.replace("<br>", "\n").replace("<b>", "").replace("</b>", "")),
sep="\n"
)
await bot.send_message(u.id, msg, parse_mode=ParseMode.MARKDOWN)
u.tomuss_last = str(i)
session.commit()
await sleep(60)
async def notif_cmd(message: types.Message):
check_id(message.from_user)
await message.chat.do(types.ChatActions.TYPING)
logger.info(f"{message.from_user.username} do notif")
keys = InlineKeyboardMarkup()
for i, n in enumerate(["Toggle", "Time", "Cooldown"]):
keys.add(InlineKeyboardButton(n, callback_data=posts_cb.new(id=i, action=n.lower())))
with dbL:
user = session.query(User).filter_by(id=message.from_user.id).first()
msg = lang(user, "notif_info").format(user.nt, user.nt_time, user.nt_cooldown)
await message.reply(msg, parse_mode=ParseMode.MARKDOWN, reply_markup=keys)
async def notif_query(query: types.CallbackQuery, callback_data: dict):
check_id(query.message.from_user)
await query.message.chat.do(types.ChatActions.TYPING)
logger.info(f"{query.message.from_user.username} do notif query")
with dbL:
user = session.query(User).filter_by(id=query.from_user.id).first()
if callback_data["action"] == "toggle":
if user.nt:
res = False
else:
res = True
user.nt = res
msg = lang(user, "notif_set").format(res)
elif callback_data["action"] in ["time", "cooldown"]:
user.await_cmd = callback_data["action"]
msg = lang(user, "notif_await")
session.commit()
await query.message.reply(msg, parse_mode=ParseMode.MARKDOWN)

18
TelegramEDT/tomuss.py Normal file
View file

@ -0,0 +1,18 @@
from aiogram import types
from aiogram.types import ParseMode
from TelegramEDT import dbL, key, logger, session, check_id
from TelegramEDT.base import User
from TelegramEDT.lang import lang
async def settomuss(message: types.Message):
check_id(message.from_user)
await message.chat.do(types.ChatActions.TYPING)
logger.info(f"{message.from_user.username} do settomuss")
with dbL:
user = session.query(User).filter_by(id=message.from_user.id).first()
user.await_cmd = "settomuss"
session.commit()
await message.reply(lang(user, "settomuss_wait"), parse_mode=ParseMode.MARKDOWN, reply_markup=key)

81
TelegramEDT/tools.py Normal file
View file

@ -0,0 +1,81 @@
from aiogram import types
from aiogram.types import ParseMode
from aiogram.utils import markdown
from aiogram.utils.exceptions import MessageIsTooLong
from TelegramEDT import ADMIN_ID, bot, dbL, key, log_date, logger, session, check_id
from TelegramEDT.base import User
async def get_id(message: types.Message):
check_id(message.from_user)
await message.chat.do(types.ChatActions.TYPING)
logger.info(f"{message.from_user.username} do getid command")
await message.reply(message.from_user.id, reply_markup=key)
async def get_logs(message: types.Message):
check_id(message.from_user)
logger.info(f"{message.from_user.username} do getlog command")
if message.from_user.id == ADMIN_ID:
try:
int(message.text[9:])
except ValueError:
await message.chat.do(types.ChatActions.UPLOAD_DOCUMENT)
await message.reply_document(types.InputFile(f"logs/{log_date}.log"), caption=f"The {log_date} logs",
reply_markup=key)
else:
await message.chat.do(types.ChatActions.TYPING)
logs = (open(f"logs/{log_date}.log", "r").readlines())[-int(message.text[9:]):]
log = str()
for i in logs:
log += i
msg = markdown.text(
markdown.italic("logs:"),
markdown.code(log),
sep="\n"
)
try:
await message.reply(msg, parse_mode=ParseMode.MARKDOWN, reply_markup=key)
except MessageIsTooLong:
await message.reply(markdown.bold("Too much logs ! ❌"), reply_markup=key)
async def get_db(message: types.Message):
check_id(message.from_user)
logger.info(f"{message.from_user.username} do getdb command")
if message.from_user.id == ADMIN_ID:
with dbL:
users = dict()
for u in session.query(User).all():
users[u] = u.__dict__
msg = markdown.text(
markdown.italic("db:"),
markdown.code(users),
sep="\n"
)
await message.reply(msg, parse_mode=ParseMode.MARKDOWN, reply_markup=key)
async def eval_cmd(message: types.Message):
check_id(message.from_user)
logger.info(f"{message.from_user.username} do eval command")
if message.from_user.id == ADMIN_ID:
msg = markdown.text(
markdown.italic("eval:"),
markdown.code(eval(message.text[6:])),
sep="\n"
)
await message.reply(msg, parse_mode=ParseMode.MARKDOWN, reply_markup=key)
async def errors(*args, **partial_data):
if "This Session's transaction has been rolled back due to a previous exception during flush" in args:
session.rollback()
msg = markdown.text(
markdown.bold("⚠️ An error occurred:"),
markdown.code(args),
markdown.code(partial_data),
sep="\n"
)
await bot.send_message(ADMIN_ID, msg, parse_mode=ParseMode.MARKDOWN)

463
bot.py
View file

@ -1,463 +0,0 @@
import asyncio
import datetime
import hashlib
import logging
import re
from asyncio import sleep
from os import mkdir
from os.path import isdir, isfile
from threading import RLock
import requests
from PIL import Image
from aiogram import Bot, Dispatcher, executor, types
from aiogram.types import InlineQuery, InputTextMessageContent, InlineKeyboardMarkup, InlineKeyboardButton, \
InlineQueryResultArticle, ParseMode, reply_keyboard, ContentType
from aiogram.utils import markdown
from aiogram.utils.callback_data import CallbackData
from aiogram.utils.exceptions import MessageIsTooLong
from feedparser import parse
from ics.parse import ParseError
from pyzbar.pyzbar import decode
from requests.exceptions import ConnectionError, InvalidSchema, MissingSchema
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from EDTcalendar import Calendar
from base import User, KFET_URL, Base
from lang import lang
tables = False
if not isdir("logs"):
mkdir("logs")
if not isdir("calendars"):
mkdir("calendars")
if not isfile("edt.db"):
tables = True
logger = logging.getLogger("TelegramEDT")
log_date = datetime.datetime.now(datetime.timezone.utc).astimezone(tz=None).date()
logging.basicConfig(
filename=f"logs/{log_date}.log",
format="{%(levelname)s}[%(asctime)s]: %(name)s | %(message)s",
level=logging.INFO,
)
if not isfile("token.ini"):
logger.critical("No token specified, impossible to start the bot !")
exit(1)
API_TOKEN = open("token.ini").readline().replace("\n", "")
ADMIN_ID = 148441652
TIMES = ["", "day", "next", "week", "next week"]
bot = Bot(token=API_TOKEN)
posts_cb = CallbackData("post", "id", "action")
dp = Dispatcher(bot)
engine = create_engine("sqlite:///edt.db")
Session = sessionmaker(bind=engine)
session = Session()
if tables:
Base.metadata.create_all(engine)
dbL = RLock()
re_url = re.compile(r"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+")
key = reply_keyboard.ReplyKeyboardMarkup()
for k in ["Edt", "Kfet", "Setkfet", "Setedt", "Notif", "Settomuss"]:
key.add(reply_keyboard.KeyboardButton(k))
def get_now():
return datetime.datetime.now(datetime.timezone.utc).astimezone(tz=None)
def check_id(user: types.User):
with dbL:
if (user.id,) not in session.query(User.id).all():
logger.info(f"{user.username} add to the db")
if user.locale and user.locale.language:
lg = user.locale.language
else:
lg = ""
session.add(User(id=user.id, language=lg))
session.commit()
def have_await_cmd(msg: types.Message):
with dbL:
return session.query(User).filter_by(id=msg.from_user.id).first().await_cmd
def edt_key():
keys = InlineKeyboardMarkup()
for i, n in enumerate(["Day", "Next", "Week", "Next week"]):
keys.add(InlineKeyboardButton(n, callback_data=posts_cb.new(id=i, action=n.lower())))
return keys
def calendar(time: str, user_id: int):
with dbL:
user = session.query(User).filter_by(id=user_id).first()
if not user.resources:
return lang(user, "edt_err_set")
elif time not in TIMES:
return lang(user, "edt_err_choice")
return str(user.calendar(time))
async def notif():
while True:
with dbL:
for u in session.query(User).all():
nt = None
kf = None
tm = None
try:
nt = u.get_notif()
kf = u.get_kfet()
tm = u.get_tomuss()
except Exception as e:
logger.error(e)
if nt:
await bot.send_message(u.id, lang(u, "notif_event")+str(nt), parse_mode=ParseMode.MARKDOWN)
if kf:
if kf == 1:
kf = lang(u, "kfet")
elif kf == 2:
kf = lang(u, "kfet_prb")
else:
kf = lang(u, "kfet_err")
await bot.send_message(u.id, kf, parse_mode=ParseMode.MARKDOWN)
if tm:
for i in tm:
msg = markdown.text(
markdown.bold(i.title),
markdown.code(i.summary.replace("<br>", "\n").replace("<b>", "").replace("</b>", "")),
sep="\n"
)
await bot.send_message(u.id, msg, parse_mode=ParseMode.MARKDOWN)
u.tomuss_last = str(i)
session.commit()
await sleep(60)
@dp.inline_handler()
async def inline_edt(inline_query: InlineQuery):
check_id(inline_query.from_user)
text = inline_query.query.lower() if inline_query.query.lower() in TIMES else "invalid"
res = calendar(text, inline_query.from_user.id)
input_content = InputTextMessageContent(res, parse_mode=ParseMode.MARKDOWN)
result_id: str = hashlib.md5(res.encode()).hexdigest()
item = InlineQueryResultArticle(
id=result_id,
title=f"Your {text} course",
input_message_content=input_content,
)
await bot.answer_inline_query(inline_query.id, results=[item], cache_time=1)
@dp.message_handler(commands="start")
async def start(message: types.Message):
check_id(message.from_user)
await message.chat.do(types.ChatActions.TYPING)
logger.info(f"{message.from_user.username} start")
with dbL:
user = session.query(User).filter_by(id=message.from_user.id).first()
await message.reply(lang(user, "welcome"), parse_mode=ParseMode.MARKDOWN, reply_markup=key)
@dp.message_handler(commands="help")
async def help_cmd(message: types.Message):
check_id(message.from_user)
await message.chat.do(types.ChatActions.TYPING)
logger.info(f"{message.from_user.username} do help command")
with dbL:
user = session.query(User).filter_by(id=message.from_user.id).first()
await message.reply(lang(user, "help"), parse_mode=ParseMode.MARKDOWN, reply_markup=key)
@dp.message_handler(lambda msg: msg.text.lower() == "edt")
async def edt_cmd(message: types.Message):
check_id(message.from_user)
await message.chat.do(types.ChatActions.TYPING)
logger.info(f"{message.from_user.username} do edt")
await message.reply(calendar("day", message.from_user.id), parse_mode=ParseMode.MARKDOWN, reply_markup=edt_key())
@dp.callback_query_handler(posts_cb.filter(action=["day", "next", "week", "next week"]))
async def edt_query(query: types.CallbackQuery, callback_data: dict):
check_id(query.message.from_user)
await query.message.chat.do(types.ChatActions.TYPING)
logger.info(f"{query.message.from_user.username} do edt query")
await query.message.reply(calendar(callback_data["action"], query.from_user.id), parse_mode=ParseMode.MARKDOWN,
reply_markup=edt_key())
@dp.message_handler(lambda msg: msg.text.lower() == "kfet")
async def kfet(message: types.Message):
check_id(message.from_user)
await message.chat.do(types.ChatActions.TYPING)
logger.info(f"{message.from_user.username} do kfet")
with dbL:
user = session.query(User).filter_by(id=message.from_user.id).first()
if not 9 < get_now().hour < 14 or not get_now().isoweekday() < 6:
msg = lang(user, "kfet_close")
else:
msg = lang(user, "kfet_list")
cmds = requests.get(KFET_URL).json()
if cmds:
for c in cmds:
msg += markdown.code(c) + " " if cmds[c] == "ok" else ""
await message.reply(msg, parse_mode=ParseMode.MARKDOWN, reply_markup=key)
@dp.message_handler(lambda msg: msg.text.lower() == "setkfet")
async def kfet_set(message: types.Message):
check_id(message.from_user)
await message.chat.do(types.ChatActions.TYPING)
logger.info(f"{message.from_user.username} do setkfet")
with dbL:
user = session.query(User).filter_by(id=message.from_user.id).first()
if not 9 < get_now().hour < 14 or not get_now().isoweekday() < 5:
msg = lang(user, "kfet_close")
else:
user.await_cmd = "setkfet"
msg = lang(user, "kfet_set_await")
session.commit()
await message.reply(msg, parse_mode=ParseMode.MARKDOWN, reply_markup=key)
@dp.message_handler(lambda msg: msg.text.lower() == "setedt")
async def edt_await(message: types.Message):
check_id(message.from_user)
await message.chat.do(types.ChatActions.TYPING)
logger.info(f"{message.from_user.username} do setedt")
with dbL:
user = session.query(User).filter_by(id=message.from_user.id).first()
user.await_cmd = "setedt"
session.commit()
await message.reply(lang(user, "setedt_wait"), parse_mode=ParseMode.MARKDOWN, reply_markup=key)
@dp.message_handler(lambda msg: msg.text.lower() == "settomuss")
async def edt_await(message: types.Message):
check_id(message.from_user)
await message.chat.do(types.ChatActions.TYPING)
logger.info(f"{message.from_user.username} do settomuss")
with dbL:
user = session.query(User).filter_by(id=message.from_user.id).first()
user.await_cmd = "settomuss"
session.commit()
await message.reply(lang(user, "settomuss_wait"), parse_mode=ParseMode.MARKDOWN, reply_markup=key)
@dp.message_handler(commands="getedt")
async def edt_geturl(message: types.Message):
check_id(message.from_user)
await message.chat.do(types.ChatActions.TYPING)
logger.info(f"{message.from_user.username} do getedt command")
with dbL:
user = session.query(User).filter_by(id=message.from_user.id).first()
if user.resources:
await message.reply(user.resources, reply_markup=key)
else:
await message.reply(lang(user, "getedt_err"), reply_markup=key)
@dp.message_handler(lambda msg: msg.text.lower() == "notif")
async def notif_cmd(message: types.Message):
check_id(message.from_user)
await message.chat.do(types.ChatActions.TYPING)
logger.info(f"{message.from_user.username} do notif")
keys = InlineKeyboardMarkup()
for i, n in enumerate(["Toggle", "Time", "Cooldown"]):
keys.add(InlineKeyboardButton(n, callback_data=posts_cb.new(id=i, action=n.lower())))
with dbL:
user = session.query(User).filter_by(id=message.from_user.id).first()
msg = lang(user, "notif_info").format(user.nt, user.nt_time, user.nt_cooldown)
await message.reply(msg, parse_mode=ParseMode.MARKDOWN, reply_markup=keys)
@dp.callback_query_handler(posts_cb.filter(action=["toggle", "time", "cooldown"]))
async def notif_query(query: types.CallbackQuery, callback_data: dict):
check_id(query.message.from_user)
await query.message.chat.do(types.ChatActions.TYPING)
logger.info(f"{query.message.from_user.username} do notif query")
with dbL:
user = session.query(User).filter_by(id=query.from_user.id).first()
if callback_data["action"] == "toggle":
if user.nt:
res = False
else:
res = True
user.nt = res
msg = lang(user, "notif_set").format(res)
elif callback_data["action"] in ["time", "cooldown"]:
user.await_cmd = callback_data["action"]
msg = lang(user, "notif_await")
session.commit()
await query.message.reply(msg, parse_mode=ParseMode.MARKDOWN)
@dp.message_handler(lambda msg: have_await_cmd(msg), content_types=[ContentType.TEXT, ContentType.PHOTO])
async def await_cmd(message: types.message):
check_id(message.from_user)
await message.chat.do(types.ChatActions.TYPING)
msg = None
with dbL:
user = session.query(User).filter_by(id=message.from_user.id).first()
logger.info(f"{message.from_user.username} do awaited commande: {user.await_cmd}")
if user.await_cmd == "setedt":
url = str()
if message.photo:
file_path = await bot.get_file(message.photo[0].file_id)
file_url = f"https://api.telegram.org/file/bot{API_TOKEN}/{file_path['file_path']}"
qr = decode(Image.open(requests.get(file_url, stream=True).raw))
if qr:
url = str(qr[0].data)
elif message.text:
msg_url = re_url.findall(message.text)
if msg_url:
url = msg_url[0]
if url:
resources = url[url.find("resources") + 10:][:4]
elif message.text:
resources = message.text
try:
Calendar("", int(resources))
except (ParseError, ConnectionError, InvalidSchema, MissingSchema, ValueError, UnboundLocalError):
msg = lang(user, "setedt_err_res")
else:
user.resources = int(resources)
msg = lang(user, "setedt")
elif user.await_cmd == "setkfet":
try:
int(message.text)
except ValueError:
msg = lang(user, "err_num")
else:
user.kfet = int(message.text)
msg = lang(user, "kfet_set")
elif user.await_cmd == "settomuss":
if not len(parse(message.text).entries):
msg = lang(user, "settomuss_error")
else:
user.tomuss_rss = message.text
msg = lang(user, "settomuss")
elif user.await_cmd in ["time", "cooldown"]:
try:
value = int(message.text)
except ValueError:
msg = lang(user, "err_num")
else:
if user.await_cmd == "time":
user.nt_time = value
else:
user.nt_cooldown = value
msg = lang(user, "notif_time_cooldown").format(user.await_cmd[6:], value)
if user.await_cmd:
user.await_cmd = str()
session.commit()
if msg:
await message.reply(msg, parse_mode=ParseMode.MARKDOWN, reply_markup=key)
@dp.message_handler(commands="getid")
async def get_id(message: types.Message):
check_id(message.from_user)
await message.chat.do(types.ChatActions.TYPING)
logger.info(f"{message.from_user.username} do getid command")
await message.reply(message.from_user.id, reply_markup=key)
@dp.message_handler(commands="getlogs")
async def get_logs(message: types.Message):
check_id(message.from_user)
logger.info(f"{message.from_user.username} do getlog command")
if message.from_user.id == ADMIN_ID:
try:
int(message.text[9:])
except ValueError:
await message.chat.do(types.ChatActions.UPLOAD_DOCUMENT)
await message.reply_document(types.InputFile(f"logs/{log_date}.log"), caption=f"The {log_date} logs",
reply_markup=key)
else:
await message.chat.do(types.ChatActions.TYPING)
logs = (open(f"logs/{log_date}.log", "r").readlines())[-int(message.text[9:]):]
log = str()
for i in logs:
log += i
msg = markdown.text(
markdown.italic("logs:"),
markdown.code(log),
sep="\n"
)
try:
await message.reply(msg, parse_mode=ParseMode.MARKDOWN, reply_markup=key)
except MessageIsTooLong:
await message.reply(markdown.bold("Too much logs ! ❌"), reply_markup=key)
@dp.message_handler(commands="getdb")
async def get_db(message: types.Message):
check_id(message.from_user)
logger.info(f"{message.from_user.username} do getdb command")
if message.from_user.id == ADMIN_ID:
with dbL:
users = dict()
for u in session.query(User).all():
users[u] = u.__dict__
msg = markdown.text(
markdown.italic("db:"),
markdown.code(users),
sep="\n"
)
await message.reply(msg, parse_mode=ParseMode.MARKDOWN, reply_markup=key)
@dp.message_handler(commands="eval")
async def eval_cmd(message: types.Message):
check_id(message.from_user)
logger.info(f"{message.from_user.username} do eval command")
if message.from_user.id == ADMIN_ID:
msg = markdown.text(
markdown.italic("eval:"),
markdown.code(eval(message.text[6:])),
sep="\n"
)
await message.reply(msg, parse_mode=ParseMode.MARKDOWN, reply_markup=key)
@dp.errors_handler()
async def errors(*args, **partial_data):
msg = markdown.text(
markdown.bold("⚠️ An error occurred:"),
markdown.code(args),
markdown.code(partial_data),
sep="\n"
)
await bot.send_message(ADMIN_ID, msg, parse_mode=ParseMode.MARKDOWN)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.create_task(notif())
loop.create_task(executor.start_polling(dp, skip_updates=True))
loop.run_forever()

10
main.py Normal file
View file

@ -0,0 +1,10 @@
import asyncio
from aiogram.utils import executor
from TelegramEDT import notif, dp
loop = asyncio.get_event_loop()
loop.create_task(notif())
loop.create_task(executor.start_polling(dp, skip_updates=True))
loop.run_forever()

2
update.sh Normal file → Executable file
View file

@ -3,7 +3,7 @@ if [ -f edt.db ]; then
if [ ! -d alembic ]; then
alembic init alembic
sed -i '/sqlalchemy.url/s/= .*/= sqlite:\/\/\/edt.db/' alembic.ini
sed -i "/target_metadata = None/s/target_metadata.*/import os, sys\nsys.path.append(os.getcwd())\nfrom base import Base\ntarget_metadata = Base.metadata/" alembic/env.py
sed -i "/target_metadata = None/s/target_metadata.*/import os, sys\nsys.path.append(os.getcwd())\nfrom TelegramEDT import Base\ntarget_metadata = Base.metadata/" alembic/env.py
fi
alembic revision --autogenerate -m "Auto upgrade"