Commit b6571ad2 authored by Gabriel Dengler's avatar Gabriel Dengler
Browse files

Initial commit

parents
*.info
.env
env
__pycache__
# Übersicht
Fork von [campus_bot](https://gitlab.cs.fau.de/ux45upys/campus_bot), angepasst für Telegram.
#!/usr/bin/env python3
import time
import requests
from credentials import get_credentials, update_credentials
from contextlib import ExitStack
from bs4 import BeautifulSoup
from log import print_info, print_warn
from urllib.parse import urlparse
from colorama import init as colorama_init, Fore, Style
from lxml.html import fromstring
from urllib.parse import unquote
headers = {
'user-agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36'
+ '(KHTML, like Gecko) Chrome/80.0.3987.122 Safari/537.36'
}
url_login_form = 'https://www.campus.uni-erlangen.de/qisserver/rds?' \
'state=user&type=1'
url_exams_page = 'https://www.campus.uni-erlangen.de/qisserver/rds?' \
'state=template&template=pruefungen'
AUTH_STATE_URL = 'https://www.campus.uni-erlangen.de/Shibboleth.sso/Login'
#AUTH_STATE_URL = 'https://www.studon.fau.de/studon/saml.php?target=ilias_app_oauth2'
OAUTH_URL = 'https://www.sso.uni-erlangen.de/simplesaml/module.php/core/loginuserpass.php?'
#SAML_URL = 'https://www.studon.fau.de/simplesaml/module.php/saml/sp/saml2-acs.php/default-sp'
SAML_URL = 'https://www.campus.uni-erlangen.de/Shibboleth.sso/SAML2/POST'
#SAML_URL = 'https://www.campus.uni-erlangen.de/Shibboleth.sso/Login'
class Connection:
def __init__(self, username=None):
self.url_exam_results = None
self._print = False
self.username = username
self.password = None
self.form_data = {'submit': 'Anmelden'}
colorama_init(autoreset=True)
def __enter__(self):
self.s = requests.session()
with ExitStack() as stack:
stack.enter_context(self.s)
self._stack = stack.pop_all()
self.username, self.password = get_credentials(self.username)
if not self.__connect():
print_warn('Wrong username or password for user '+self.username)
#self.username, self.password = update_credentials(self.username)
return None
else:
print_info('connection successful')
return self
def get_username(self):
return self.username
def __exit__(self, type, value, traceback):
self._stack.__exit__(type, value, traceback)
def __get(self, url):
print_info(f"get {urlparse(url).netloc}")
r = self.s.get(url, headers=headers, timeout=5)
r.raise_for_status()
return r
def __post(self, url, data):
print_info(f"get {urlparse(url).netloc}")
r = self.s.post(url, data=data, headers=headers, timeout=5)
r.raise_for_status()
return r
def __connect(self):
self.form_data['username'] = self.username
self.form_data['password'] = self.password
use_sso = True
if use_sso:
success = self.do_sso()
if not success:
return False
r = self.__get(url_exams_page)
url_overview_page = find_overview_page_url(r.content)
if not url_overview_page:
return False
r = self.__get(url_overview_page)
self.url_exam_results = find_exam_results_url(r.content)
if self.url_exam_results:
return True
else:
while True:
print_info(f'logging in as {self.username}')
self.__post(url_login_form, self.form_data)
r = self.__get(url_exams_page)
url_overview_page = find_overview_page_url(r.content)
if not url_overview_page:
return False
r = self.__get(url_overview_page)
self.url_exam_results = find_exam_results_url(r.content)
if self.url_exam_results:
return True
time.sleep(1)
def do_sso(self):
print(Style.DIM + 'generate auth state session...', end=' ', flush=True)
self.s = requests.Session()
auth_state_rq = self.s.get(AUTH_STATE_URL)
dom = fromstring(auth_state_rq.text)
auth_state = dom.xpath('//input[@name=\'AuthState\']/@value')
if len(auth_state) == 0:
auth_state_rq = self.s.get(
unquote(dom.xpath('//a[@id=\'redirect\']/@href')[0]))
dom = fromstring(auth_state_rq.text)
auth_state = dom.xpath('//input[@name=\'AuthState\']/@value')[0]
print(Style.DIM, 'done')
data = {
'username': self.username,
'password': self.password,
'AuthState': auth_state
}
oauth_rq = self.s.post(OAUTH_URL, data=data)
if 'AuthState' in oauth_rq.text:
return False
dom = fromstring(oauth_rq.text)
saml_res = dom.xpath('//input[@name=\'SAMLResponse\']/@value')[0]
payload = {'SAMLResponse': saml_res}
saml_rq = self.s.post(SAML_URL, data=payload)
return True
def __get_grades_page(self):
while True:
try:
return self.__get(self.url_exam_results).content
except Exception:
print_warn('connection lost, reconnecting:')
try:
while not self.__connect():
pass
except Exception as e:
print_warn(e)
time.sleep(1)
def get_grades(self):
rows = None
while not rows:
try:
grades_html = self.__get_grades_page()
rows = BeautifulSoup(grades_html, 'html5lib') \
.find('table', attrs={'id': 'notenspiegel'}) \
.find('tbody').find_all('tr')
except Exception as e:
print_warn(e)
self.__connect()
def p(list): return [' '.join(s.get_text().split()) for s in list]
headers = [p(r) for r in [r.find_all('th')
for r in rows] if len(r) > 0]
grades = [p(r) for r in [r.find_all('td') for r in rows] if len(r) > 0]
header = next((h for h in headers if h[0].startswith('#')))
grades = [dict(zip(header, row)) for row in grades]
modules = []
for grade in grades:
if not grade.get('Semester'):
grade['exams'] = []
modules.append(grade)
else:
if len(modules) == 0:
modules.append({ 'exams': [] })
modules[-1]['exams'].append(grade)
return modules
def find_overview_page_url(page):
try:
return BeautifulSoup(page, 'html5lib') \
.find('a', attrs={'id': 'notenspiegelStudent'}) \
.get('href')
except Exception as e:
print_warn(e)
return None
def find_exam_results_url(page):
return BeautifulSoup(page, 'html5lib') \
.find('meta', attrs={'http-equiv': 'refresh'}) \
.get('content')[6:]
if __name__ == '__main__':
with Connection() as c:
grades = c.get_grades()
for grade in grades:
print(f'{grade.get("Prüfungstext")}:'.rjust(
60), f'{grade.get("Note")}')
#!/usr/bin/env python3
import keyring
from getpass import getpass
from log import print_info, print_warn, color, BOLD, WHITE, CLEAR
service_id = 'campus'
last_user = 'last_user'
def get_credentials(username=None):
if not username:
username = get_username()
password = get_password(username)
return (username, password)
def update_credentials(username=None):
if not username:
username = get_username()
password = update_password(username)
return (username, password)
def confirm_username(username):
s = input(f'{color(BOLD, WHITE)}log in as {username}?{CLEAR} [Y,n] ')
if s == 'y' or s == 'Y' or not s:
return True
return False
def get_username():
username = None
try:
with open(last_user, 'r') as f:
username = f.readline()
except FileNotFoundError:
pass
if username and confirm_username(username):
return username
username = input('username: ')
with open(last_user, 'w') as f:
f.write(username)
return username
def get_password(username):
password = None
try:
password = keyring.get_password(service_id, username)
except:
pass
if not password:
print_info(f"couldn't find password for {username} in keyring")
return update_password(username)
else:
print_info(f'found password for {username} in keyring')
return password
def update_password(username):
password = getpass(f'enter password for {username}: ')
print_info(f'updating password for {username} in keyring')
try:
keyring.set_password(service_id, username, password)
except Exception as e:
print_warn("failed to set password:")
print_warn(e)
return password
def delete_user_from_key_ring(username):
keyring.delete_password(service_id, username)
print_info(username, 'has been deleted from keyring')
from datetime import datetime
BLACK = 30
RED = 31
GREEN = 32
YELLOW = 33
BLUE = 34
MAGENTA = 35
CYAN = 36
WHITE = 37
BG_BLACK = 40
BG_RED = 41
BG_GREEN = 42
BG_YELLOW = 43
BG_BLUE = 44
BG_MAGENTA = 45
BG_CYAN = 46
BG_WHITE = 47
NORMAL = 0
BOLD = 1
ITALIC = 3
UNDERLINED = 4
BLINK = 5
REVERSE = 7
CONCEALED = 8
CLEAR = "\033[m"
def color(type, color):
return "\033[" + str(type) + ";" + str(color) + "m"
def print_time():
time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f'[{color(BOLD, GREEN)}{time}{CLEAR}] ', end='')
def print_info(s, end='\n'):
print_time()
print(s, end=end)
def print_warn(s, end='\n'):
print_time()
print(f'{color(BOLD, YELLOW)}{s}{CLEAR}', end=end)
def print_error(s, end='\n'):
print_time()
print(f'{color(BOLD, RED)}{s}{CLEAR}', end=end)
APScheduler==3.6.3
beautifulsoup4==4.9.3
bs4==0.0.1
certifi==2020.12.5
cffi==1.14.4
chardet==3.0.4
colorama==0.4.4
cryptography==3.3.1
html5lib==1.1
idna==2.10
jeepney==0.6.0
keyring==21.5.0
keyrings.alt==4.0.2
lxml==4.6.2
pycparser==2.20
python-dotenv==0.16.0
python-telegram-bot==13.3
pytz==2021.1
requests==2.25.0
SecretStorage==3.3.0
six==1.15.0
soupsieve==2.1
tornado==6.1
tzlocal==2.1
urllib3==1.26.2
webencodings==0.5.1
import campus
import pickle
import time
import threading
import sys
import os
from dotenv import load_dotenv
import logging
from credentials import get_credentials, update_credentials
from telegram.ext import Updater
from telegram.ext import CommandHandler
from telegram.ext import MessageHandler, Filters
# Structures
pw = None
token = None
# Ids of exams which are to ignore
to_ignore = {}
# Grades
current_grades = dict()
# Requests
request_lock = threading.Lock()
class Request:
def __init__(self, chat_id, username):
self.chat_id = chat_id
self.username = username
def __eq__(self, other):
return isinstance(other, Request) and other.username == self.username and other.chat_id == self.chat_id
def __hash__(self):
return (self.chat_id, self.username).__hash__()
request_queue = set()
def load_requests():
global request_queue
try:
with open('requests.info', 'rb') as f:
request_queue = pickle.load(f)
except FileNotFoundError:
pass
def save_requests():
global request_queue
with open('requests.info', 'wb') as f:
pickle.dump(request_queue, f)
def add_request(username, chat_id):
ret = False
if request_lock.acquire(False):
try:
request_queue.add(Request(chat_id, username))
save_requests()
ret = True
finally:
request_lock.release()
return ret
# Associations
username_to_chats = dict()
chat_to_usernames = dict()
def add_user(username):
if username not in username_to_chats.keys():
username_to_chats[username] = set()
current_grades[username] = []
save_associations()
def add_chat(chat_id):
if chat_id not in chat_to_usernames.keys():
chat_to_usernames[chat_id] = set()
def remove_user(username):
if username in username_to_chats.keys():
for chat_id in username_to_chats.pop(username):
chat_to_usernames[chat_id].remove(username)
save_associations()
def remove_chat(chat_id):
if chat_id in chat_to_usernames.keys():
for username in chat_to_usernames.pop(chat_id):
username_to_chats[username].remove(chat_id)
save_associations()
def associate_chat_with_username(username, chat_id):
if username not in username_to_chats.keys():
logging.warning("Attempted to associate chat to non-existing username")
return False
if chat_id not in chat_to_usernames.keys():
add_chat(chat_id)
username_to_chats[username].add(chat_id)
chat_to_usernames[chat_id].add(username)
save_associations()
return True
def load_associations():
global username_to_chats
try:
with open('chats.info', 'rb') as f:
username_to_chats = pickle.load(f)
except FileNotFoundError:
pass
for username in username_to_chats.keys():
for chat_id in username_to_chats[username]:
associate_chat_with_username(username, chat_id)
def save_associations():
with open('chats.info', 'wb') as f:
pickle.dump(username_to_chats, f)
# Telegram
updater = None
dispatcher = None
# Telegram Helper-Functions
def send_message(chat_id, message):
if updater.bot.send_message(chat_id=chat_id, text=message) is None:
remove_chat(chat_id)
logging.warning("Tried to send message to an invalid chat.")
def send_startup_message(chat_id):
send_message(chat_id, "I am here.")
def send_goodbye_message(chat_id):
send_message(chat_id, "I am not here anymore.")
def is_relevant_module(module):
try:
id = int(module['#'])
return id not in to_ignore and module['exams']
except:
return False
def read_module_properties(module):
try:
exam = module['Prüfungstext']
date = module['Prüfungsdatum']
grade = module['Note']
status = module['Status']
ects = module['ECTS']
if not grade or not ects:
ects = 0
grade_ects = 0
for child_exam in module['exams']:
try:
raw_ects = ''.join(filter(lambda x: x == '.' or x.isdigit(), child_exam['ECTS'].replace(',', '.')))
raw_grade = ''.join(filter(lambda x: x == '.' or x.isdigit(), child_exam['Note'].replace(',', '.')))
ects += float(raw_ects)
grade_ects += float(raw_ects) * float(raw_grade)
except ValueError:
pass
grade = grade_ects/ects
grade = '{:.1f}'.format(grade).replace('.', ',')
ects = '{:.1f}'.format(ects).replace('.', ',')
if not date:
date = set()
for child_exam in module['exams']:
if child_exam['Prüfungsdatum']: date.add(child_exam['Prüfungsdatum'])
date = '/'.join(date)
if not status:
status = set()
for child_exam in module['exams']:
if child_exam['Status']: status.add(child_exam['Status'])
status = '/'.join(status)
return exam, date, grade, status, ects
except:
return None
def construct_update_message(change, username):
try:
if not is_relevant_module(change):
return None
exam, date, grade, status, ects = read_module_properties(change)
return '\n'.join([
'There is an update to the exams of user '+username+':',
'Exam: ' + exam,
'On: ' + date,
'Grade: ' + grade,
'Status: ' + status,
'ECTS: ' + ects,
'Raw change: ' + str(change)
])
except:
return None
# Telegram Commands
def tgc_on_register(update, context):
chat_id = update.effective_chat.id
args = " ".join(update.message.text.split()).split(' ')
if len(args) != 3:
context.bot.send_message(chat_id=chat_id, text="Usage: /register PW Username")
return
if args[1] != pw:
context.bot.send_message(chat_id=chat_id, text="This password is incorrect.")
return
if args[2] not in username_to_chats.keys():
context.bot.send_message(chat_id=chat_id, text="Given username is not known.")
return
# Add Request
if add_request(args[2], chat_id):
context.bot.send_message(chat_id=chat_id, text="Added request.")
else:
context.bot.send_message(chat_id=chat_id, text="Did not add request. Try again later.")
def tgc_on_grades(update, context):