catask/app.py

738 lines
24 KiB
Python

from flask import Flask, Blueprint, jsonify, request, abort, render_template, flash, session, redirect, url_for
from flask_compress import Compress
from dotenv import load_dotenv
from mysql.connector import errorcode
from functools import wraps
from werkzeug.utils import secure_filename
from pathlib import Path
import requests
import secrets
import shutil
import zipfile
import tarfile
import mysql.connector
import urllib
import functions as func
import os
import json
import constants as const
# Module-level mirror of the session's login flag. It is refreshed on every
# request by before_request() and read by the admin decorator and templates.
logged_in = False
# load variables from a .env file into the process environment
# (this module reads APP_SECRET, DB_* and ADMIN_PASSWORD via os.environ)
load_dotenv()
app = Flask(const.appName)
# used by Flask to sign the session cookie
app.secret_key = os.environ.get("APP_SECRET")
# instance configuration, loaded once at import time
cfg = func.loadJSON(const.configFile)
app.config.from_mapping(cfg)
app.config.update(cfg)
# compress to improve page load speed
Compress(app)
# -- blueprints --
# api_bp is mounted under /api/v1 and admin_bp under /admin at the end of this file
api_bp = Blueprint('api', const.appName)
admin_bp = Blueprint('admin', const.appName)
# -- cli commands --
@app.cli.command("init-db")
def initDatabase():
    """Create the database named by ``DB_NAME`` if needed and load ``schema.sql``.

    Registered as the ``init-db`` Flask CLI command. Progress and failures
    are reported with ``print``; nothing is returned.
    """
    dbName = os.environ.get("DB_NAME")
    print(f"Attempting to connect to database {dbName}...")
    try:
        conn = func.connectToDb()
        cursor = conn.cursor()
        print("Connected successfully")
        conn.database = dbName
    except mysql.connector.Error as error:
        if error.errno == errorcode.ER_ACCESS_DENIED_ERROR:
            # Without a working login there is no cursor to continue with;
            # stop here instead of failing later on an unbound variable.
            print("Bad credentials")
            return
        if error.errno != errorcode.ER_BAD_DB_ERROR:
            print("Error:", error)
            return
        # The database does not exist yet: connect to the server's built-in
        # 'mysql' database, create ours, then switch to it.
        dbPort = os.environ.get("DB_PORT") or 3306
        conn = mysql.connector.connect(
            user=os.environ.get("DB_USER"),
            password=os.environ.get("DB_PASS"),
            host=os.environ.get("DB_HOST"),
            port=dbPort,
            database='mysql'
        )
        cursor = conn.cursor()
        func.createDatabase(cursor, dbName)
        conn.database = dbName

    try:
        with open('schema.sql', 'r') as schema_file:
            schema = schema_file.read()
        # With multi=True, execute() returns an iterator and each statement
        # only runs as the iterator is consumed.
        for result in cursor.execute(schema, multi=True):
            if result.with_rows:
                # drain row-producing statements so the next one can run
                result.fetchall()
        conn.commit()
        print(f"Database {dbName} was successfully initialized!")
    except mysql.connector.Error as error:
        print(f"Failed to initialize database {dbName}: {error}")
    finally:
        # also reached when schema.sql cannot be opened
        cursor.close()
        conn.close()
# -- decorators --
def loginRequired(f):
    """Decorator that hides a view behind the admin login.

    The wrapped view only runs when the current session has a truthy
    ``logged_in`` entry; otherwise the request is aborted with 404.
    """
    @wraps(f)
    def login_required(*args, **kwargs):
        # Read the flag from this request's own session rather than from the
        # module-level ``logged_in`` global, which every request overwrites
        # and which is therefore not reliable when requests run concurrently.
        if not session.get('logged_in'):
            return abort(404)
        return f(*args, **kwargs)
    return login_required
# -- before request --
@app.before_request
def before_request():
    """Copy the session's ``logged_in`` entry into the module-level flag."""
    global logged_in
    session_flag = session.get('logged_in')
    logged_in = session_flag
# -- context processors --
@app.context_processor
def inject_stuff():
    """Provide the variables made available to every rendered template.

    The configuration file is re-read and the unread-question count is
    queried each time this context processor runs.
    """
    app.logger.debug("[CatAsk] app.context_processor inject_stuff() triggered")
    fresh_cfg = func.loadJSON(const.configFile)
    # for 1.6.0 or later
    unread_total = int(getQuestionCount())
    return {
        'metadata': func.generateMetadata(),
        'len': len,
        'str': str,
        'const': const,
        'cfg': fresh_cfg,
        'logged_in': logged_in,
        'version_id': const.version_id,
        'version': const.version,
        'appName': const.appName,
        'questionCount': unread_total,
    }
# -- template filters --
@app.template_filter('render_markdown')
def render_markdown(text):
    """Template filter that passes ``text`` through ``func.renderMarkdown``."""
    rendered = func.renderMarkdown(text)
    return rendered
# -- client (frontend) routes --
@app.route('/', methods=['GET'])
def index():
    """Render the home page: answered questions (pinned first) with their answers."""
    conn = func.connectToDb()
    try:
        cursor = conn.cursor(dictionary=True)
        app.logger.debug("[CatAsk/Home] SELECT'ing pinned questions")
        cursor.execute("SELECT * FROM questions WHERE answered=%s AND pinned=%s ORDER BY creation_date DESC", (True, True))
        pinned_questions = cursor.fetchall()
        app.logger.debug("[CatAsk/Home] SELECT'ing non-pinned questions")
        cursor.execute("SELECT * FROM questions WHERE answered=%s AND pinned=%s ORDER BY creation_date DESC", (True, False))
        non_pinned_questions = cursor.fetchall()
        app.logger.debug("[CatAsk/Home] SELECT'ing answers")
        cursor.execute("SELECT * FROM answers ORDER BY creation_date DESC")
        answers = cursor.fetchall()
        cursor.close()
    finally:
        # release the connection even when a query fails
        conn.close()

    # Group answers by question in one pass instead of rescanning the whole
    # answer list for every question; per-question order (newest first) is kept.
    answers_by_question = {}
    for answer in answers:
        answers_by_question.setdefault(answer['question_id'], []).append(answer)

    questions = pinned_questions + non_pinned_questions
    combined = [
        {
            'question': question,
            'answers': answers_by_question.get(question['id'], [])
        }
        for question in questions
    ]
    metadata = func.generateMetadata()
    return render_template('index.html', combined=combined, urllib=urllib, trimContent=func.trimContent, metadata=metadata, getRandomWord=func.getRandomWord, formatRelativeTime=func.formatRelativeTime)
@app.route('/inbox/', methods=['GET'])
@loginRequired
def inbox():
    """Render the admin inbox of unanswered questions and mark them as read.

    The rows passed to the template are the ones fetched before the
    ``unread`` flag is cleared in the database.
    """
    conn = func.connectToDb()
    try:
        cursor = conn.cursor(dictionary=True)
        app.logger.debug("[CatAsk/Inbox] SELECT'ing unanswered questions")
        cursor.execute("SELECT * FROM questions WHERE answered=%s ORDER BY creation_date DESC", (False,))
        questions = cursor.fetchall()
        # only the rows that are actually displayed get marked as read
        for question in questions:
            cursor.execute("UPDATE questions SET unread=%s WHERE id=%s", (False, question['id']))
        cursor.close()
    finally:
        # release the connection even when a query fails
        conn.close()
    return render_template('inbox.html', questions=questions, formatRelativeTime=func.formatRelativeTime)
@app.route('/q/<int:question_id>/', methods=['GET'])
def viewQuestion(question_id):
    """Render the page for one question together with its answer."""
    question = func.getQuestion(question_id)
    answer = func.getAnswer(question_id)
    template_context = {
        'question': question,
        'urllib': urllib,
        'answer': answer,
        'metadata': func.generateMetadata(question, answer),
        'formatRelativeTime': func.formatRelativeTime,
        'trimContent': func.trimContent,
    }
    return render_template('view_question.html', **template_context)
# TODO: implement this route; private questions should be listed here too
"""
@app.route('/questions/', methods=['GET'])
def seeAskedQuestions():
conn = func.connectToDb()
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT * FROM questions WHERE asker_id=%s ORDER BY creation_date DESC", (asker_id,))
questions = cursor.fetchall()
cursor.close()
conn.close()
return render_template('asked_questions.html', questions=questions, formatRelativeTime=func.formatRelativeTime)
"""
# -- admin client routes --
@admin_bp.route('/login/', methods=['GET', 'POST'])
def login():
    """Show the admin login form (GET) or check the submitted password (POST).

    A correct password sets ``session['logged_in']`` and redirects to the
    home page; a wrong one flashes an error and redirects back to the form.
    A GET from an already logged-in session redirects to the home page.
    """
    if request.method == 'POST':
        submitted = request.form.get('admin_password', '')
        expected = os.environ.get('ADMIN_PASSWORD')
        # Reject every attempt when no admin password is configured; a plain
        # equality check would accept a request that omits the field
        # (None == None). compare_digest avoids leaking timing information.
        password_ok = bool(expected) and secrets.compare_digest(
            submitted.encode('utf-8'), expected.encode('utf-8')
        )
        if password_ok:
            session['logged_in'] = True
            # the form value is a string; store a real boolean
            session.permanent = bool(request.form.get('remember_me', False))
            return redirect(url_for('index'))
        flash("Wrong password", 'danger')
        return redirect(url_for('admin.login'))

    if session.get('logged_in'):
        # redirect to the 'index' endpoint, not to the relative URL "index"
        return redirect(url_for('index'))
    return render_template('admin/login.html')
@admin_bp.route('/logout/', methods=['POST'])
@loginRequired
def logout():
    """Clear the session's login flag and redirect to the home page."""
    session['logged_in'] = False
    home_url = url_for('index')
    return redirect(home_url)
@admin_bp.route('/', methods=['GET', 'POST'])
@loginRequired
def index():
    """Redirect the admin root to the ``admin.information`` page."""
    landing_url = url_for('admin.information')
    return redirect(landing_url)
@admin_bp.route('/information/', methods=['GET', 'POST'])
@loginRequired
def information():
    """Render ``admin/categories/instance.html`` (same response for GET and POST)."""
    template_name = 'admin/categories/instance.html'
    return render_template(template_name)
@admin_bp.route('/accessibility/', methods=['GET', 'POST'])
@loginRequired
def accessibility():
    """Render ``admin/categories/accessibility.html`` (same response for GET and POST)."""
    template_name = 'admin/categories/accessibility.html'
    return render_template(template_name)
@admin_bp.route('/general/', methods=['GET', 'POST'])
@loginRequired
def general():
    """Render ``admin/categories/general.html`` (same response for GET and POST)."""
    template_name = 'admin/categories/general.html'
    return render_template(template_name)
@admin_bp.route('/customize/', methods=['GET', 'POST'])
@loginRequired
def customize():
    """Render ``admin/categories/customize.html`` (same response for GET and POST)."""
    template_name = 'admin/categories/customize.html'
    return render_template(template_name)
@admin_bp.route('/antispam/', methods=['GET', 'POST'])
@loginRequired
def antispam():
    """Render ``admin/categories/antispam.html`` (same response for GET and POST)."""
    template_name = 'admin/categories/antispam.html'
    return render_template(template_name)
@admin_bp.route('/emojis/', methods=['GET', 'POST'])
@loginRequired
def emojis():
    """Render the admin emoji page, creating the emoji directory if it is missing.

    ``json_pack`` passed to the template is true when at least one pack has
    both a ``website`` and an ``exportedAt`` entry.
    """
    emoji_dir_missing = not os.path.exists(const.emojiPath)
    if emoji_dir_missing:
        os.makedirs(const.emojiPath)
    packs = func.listEmojiPacks()
    json_pack = any(
        bool(pack.get('website') and pack.get('exportedAt'))
        for pack in (packs or [])
    )
    emoji_list = func.listEmojis()
    return render_template('admin/categories/emojis.html', json_pack=json_pack, packs=packs, emojis=emoji_list, formatRelativeTime=func.formatRelativeTime2)
@admin_bp.route('/blacklist/', methods=['GET', 'POST'])
@loginRequired
def blacklist():
    """Show the blacklist editor (GET) or overwrite the blacklist file (POST).

    POST expects a ``blacklist`` form field and replaces the whole file with
    its value. It answers 400 when the field is absent; an empty string is
    accepted and clears the file.
    """
    if request.method == 'POST':
        new_blacklist = request.form.get('blacklist')
        if new_blacklist is None:
            # file.write(None) would raise TypeError and surface as a 500
            return {'error': "Missing 'blacklist' field"}, 400
        with open(const.blacklistFile, 'w') as file:
            file.write(new_blacklist)
        return {'message': 'Blacklist updated!'}, 200

    current_blacklist = func.readPlainFile(const.blacklistFile)
    if current_blacklist == []:
        # the template is handed an empty string instead of an empty list
        current_blacklist = ''
    return render_template('admin/categories/blacklist.html', blacklist=current_blacklist)
# TODO: implement deleting exports
@admin_bp.route('/import-export/', methods=['GET'])
@loginRequired
def importExport():
    """Render the import/export page, passing the exports list or ``None`` when the exports file is missing."""
    exports = func.loadJSON(const.exportsFile) if os.path.exists(const.exportsFile) else None
    return render_template('admin/categories/import.html', exports=exports)
# TODO: implement first-launch setup route
"""
@admin_bp.route('/post-install/', methods=['GET', 'POST'])
@loginRequired
def postInstall():
if config.postInstallCompleted:
return abort(404)
else:
pass
"""
# -- server routes --
@api_bp.errorhandler(404)
def notFound(e=None):
    """Return a JSON 404 body for 404 errors handled by the API blueprint.

    Flask calls error handlers with the raised exception, so the handler must
    accept it; the default keeps argument-less calls working.
    """
    return jsonify({'error': 'Not found'}), 404
@api_bp.errorhandler(400)
def badRequest(e):
    """Return the 400 error's string form as a JSON ``error`` body."""
    payload = {'error': str(e)}
    return jsonify(payload), 400
@api_bp.errorhandler(500)
def internalServerError(e):
    """Return the 500 error's string form as a JSON ``error`` body."""
    payload = {'error': str(e)}
    return jsonify(payload), 500
# why should i have a manifest.json file when i can just make it a route
@api_bp.route("/manifest.json", methods=['GET'])
def pwaManifest():
    """Serve the PWA web-app manifest as JSON.

    Title and theme colour come from the module-level ``cfg``.
    """
    icons = [
        {
            "src": url_for("static", filename=f"icons/favicon/android-chrome-{size}.png"),
            "sizes": size,
            "type": "image/png"
        }
        for size in ("192x192", "512x512")
    ]
    manifest = {
        "short_name": const.appName,
        "name": cfg['instance']['title'],
        "icons": icons,
        "start_url": "/",
        "display": "standalone",
        # not sure about theme_color but whateva
        "theme_color": cfg['style']['accentLight'],
        "background_color": ""
    }
    return jsonify(manifest)
# -- question routes --
@api_bp.route('/add_question/', methods=['POST'])
def addQuestion():
    """Validate a submitted question and hand it to ``func.addQuestion``.

    Aborts with 400 when the question is empty, or when the question or the
    sender name is longer than the configured ``charLimit``.
    """
    form = request.form
    from_who = form.get('from_who', cfg['anonName'])
    question = form.get('question', '')
    cw = form.get('cw', '')
    if not question:
        abort(400, "Question field must not be empty")
    char_limit = int(cfg['charLimit'])
    if max(len(question), len(from_who)) > char_limit:
        abort(400, "Question exceeds the character limit")
    return func.addQuestion(from_who, question, cw)
@api_bp.route('/delete_question/', methods=['DELETE'])
@loginRequired
def deleteQuestion():
    """Delete the question identified by the ``question_id`` query parameter.

    Aborts with 400 when the parameter is missing or empty.
    """
    question_id = request.args.get('question_id', '')
    if not question_id:
        abort(400, "Missing 'question_id' attribute or 'question_id' is empty")
    conn = func.connectToDb()
    try:
        cursor = conn.cursor()
        app.logger.debug("[CatAsk/API/delete_question] DELETE'ing a question from database")
        cursor.execute("DELETE FROM questions WHERE id=%s", (question_id,))
        cursor.close()
    finally:
        # release the connection even when the query fails
        conn.close()
    return {'message': 'Successfully deleted question.'}, 200
@api_bp.route('/return_to_inbox/', methods=['POST'])
@loginRequired
def returnToInbox():
    """Move a question back to the inbox.

    The row is read, deleted, and re-inserted as unanswered with the same
    sender, content, creation date and content warning. Aborts with 400 when
    ``question_id`` is missing and answers 404 when no such question exists.
    """
    question_id = request.args.get('question_id', '')
    if not question_id:
        abort(400, "Missing 'question_id' attribute or 'question_id' is empty")
    conn = func.connectToDb()
    try:
        cursor = conn.cursor()
        app.logger.debug("[CatAsk/API/return_to_inbox] SELECT'ing a question from database")
        cursor.execute("SELECT from_who, content, creation_date, cw FROM questions WHERE id=%s", (question_id,))
        row = cursor.fetchone()
        if row is None:
            # unknown id: nothing to move (indexing None would raise TypeError)
            return {'error': 'Question not found.'}, 404
        from_who, content, creation_date, cw = row
        app.logger.debug("[CatAsk/API/return_to_inbox] DELETE'ing a question from database")
        cursor.execute("DELETE FROM questions WHERE id=%s", (question_id,))
        app.logger.debug("[CatAsk/API/return_to_inbox] INSERT'ing a question into database")
        cursor.execute("INSERT INTO questions (from_who, content, creation_date, answered, cw) VALUES (%s, %s, %s, %s, %s)", (from_who, content, creation_date, False, cw))
        cursor.close()
    finally:
        # release the connection on every path, including errors
        conn.close()
    return {'message': 'Successfully returned question to inbox.'}, 200
@api_bp.route('/pin_question/', methods=['POST'])
@loginRequired
def pinQuestion():
    """Set ``pinned`` to true for the question given by ``question_id``.

    Aborts with 400 when the parameter is missing or empty.
    """
    question_id = request.args.get('question_id', '')
    if not question_id:
        abort(400, "Missing 'question_id' attribute or 'question_id' is empty")
    conn = func.connectToDb()
    try:
        cursor = conn.cursor()
        app.logger.debug("[CatAsk/API/pin_question] UPDATE'ing a question to pin it")
        cursor.execute("UPDATE questions SET pinned=%s WHERE id=%s", (True, question_id))
        cursor.close()
    finally:
        # release the connection even when the query fails
        conn.close()
    return {'message': 'Successfully pinned question.'}, 200
@api_bp.route('/unpin_question/', methods=['POST'])
@loginRequired
def unpinQuestion():
    """Set ``pinned`` to false for the question given by ``question_id``.

    Aborts with 400 when the parameter is missing or empty.
    """
    question_id = request.args.get('question_id', '')
    if not question_id:
        abort(400, "Missing 'question_id' attribute or 'question_id' is empty")
    conn = func.connectToDb()
    try:
        cursor = conn.cursor()
        app.logger.debug("[CatAsk/API/unpin_question] UPDATE'ing a question to unpin it")
        cursor.execute("UPDATE questions SET pinned=%s WHERE id=%s", (False, question_id))
        cursor.close()
    finally:
        # release the connection even when the query fails
        conn.close()
    return {'message': 'Successfully unpinned question.'}, 200
@api_bp.route('/add_answer/', methods=['POST'])
@loginRequired
def addAnswer():
    """Validate an answer submission and hand it to ``func.addAnswer``.

    ``question_id`` comes from the query string; ``answer`` and the optional
    ``cw`` come from the form. Aborts with 400 when either required value is
    missing or empty.
    """
    question_id = request.args.get('question_id', '')
    form = request.form
    answer = form.get('answer')
    cw = form.get('cw', '')
    if not question_id:
        abort(400, "Missing 'question_id' attribute or 'question_id' is empty")
    if not answer:
        abort(400, "Missing 'answer' attribute or 'answer' is empty")
    return func.addAnswer(question_id, answer, cw)
# -- uploaders --
@api_bp.route('/upload_favicon/', methods=['POST'])
@loginRequired
def uploadFavicon():
    """Save an uploaded favicon and pass its filename to ``func.generateFavicon``.

    Answers 400 when no ``favicon`` file is supplied or when its filename is
    rejected by ``func.allowedFile``.
    """
    favicon = request.files.get('favicon')
    if not favicon:
        return {'error': "favicon is not specified"}, 400
    if not func.allowedFile(favicon.filename):
        return {'error': 'File type is not supported'}, 400
    filename = secure_filename(favicon.filename)
    favicon.save(const.faviconDir / filename)
    func.generateFavicon(filename)
    return {'message': 'Successfully updated favicon!'}, 201
@api_bp.route('/upload_emoji/', methods=['POST'])
@loginRequired
def uploadEmoji():
    """Save a single uploaded emoji image into the emoji directory.

    Answers 400 when the ``emoji`` part is absent, has an empty filename, or
    is rejected by ``func.allowedFile``.
    """
    emoji = request.files.get('emoji')
    if emoji is None:
        return jsonify({'error': 'No file part in the request'}), 400
    if emoji.filename == '':
        return jsonify({'error': 'No file selected for uploading'}), 400
    if not func.allowedFile(emoji.filename):
        return jsonify({'error': 'Invalid file type. Only png, jpg, jpeg, webp, bmp, jxl supported'}), 400
    # sanitise the client-supplied name before using it as a path component
    filename = secure_filename(emoji.filename)
    emoji.save(const.emojiPath / filename)
    return jsonify({'message': f'Emoji {filename} successfully uploaded'}), 201
def _extractTarSafely(tar_ref, destination):
    """Extract ``tar_ref`` into ``destination`` without letting members escape it.

    Raises ``tarfile.TarError`` or ``ValueError`` for an unsafe archive.
    """
    if hasattr(tarfile, 'data_filter'):
        # the stdlib 'data' filter refuses absolute paths, '..' traversal,
        # links pointing outside the destination and special files
        tar_ref.extractall(destination, filter='data')
        return
    # interpreters without extraction filters: validate every member by hand
    root = Path(destination).resolve()
    for member in tar_ref.getmembers():
        target = (root / member.name).resolve()
        if target != root and root not in target.parents:
            raise ValueError(f'Unsafe path in archive: {member.name}')
        if not (member.isfile() or member.isdir()):
            raise ValueError(f'Unsupported member type in archive: {member.name}')
    tar_ref.extractall(destination)


@api_bp.route('/upload_emoji_pack/', methods=['POST'])
@loginRequired
def uploadEmojiPack():
    """Upload an emoji pack archive and extract it into its own directory.

    The archive is saved under ``const.emojiPath / <archive name>``,
    extracted in place and then removed. When the pack contains a
    ``meta.json`` it is passed to ``func.processEmojis``.

    Answers 400 for a missing, unnamed, unsupported or unsafe archive, 500
    for any other extraction or processing failure, and 201 on success.
    """
    if 'emoji_archive' not in request.files:
        return jsonify({'error': 'No file part in the request'}), 400
    emoji_archive = request.files['emoji_archive']
    if emoji_archive.filename == '':
        return jsonify({'error': 'No file selected for uploading'}), 400
    if not func.allowedArchive(emoji_archive.filename):
        return jsonify({'error': 'Invalid file type. Only .zip, .tar, .tar.gz, .tar.bz2 allowed'}), 400
    filename = secure_filename(emoji_archive.filename)
    archive_name = func.stripArchExtension(filename)
    extract_path = const.emojiPath / archive_name
    extract_path.mkdir(parents=True, exist_ok=True)
    archive_path = extract_path / filename
    emoji_archive.save(archive_path)
    try:
        if zipfile.is_zipfile(archive_path):
            with zipfile.ZipFile(archive_path, 'r') as zip_ref:
                zip_ref.extractall(extract_path)
        elif tarfile.is_tarfile(archive_path):
            with tarfile.open(archive_path, 'r:*') as tar_ref:
                # tar members may carry absolute or '..' paths; never extract
                # an uploaded tarball unchecked
                _extractTarSafely(tar_ref, extract_path)
        else:
            return jsonify({'error': 'Unsupported archive format'}), 400
        # parse meta.json if it exists
        meta_json_path = extract_path / 'meta.json'
        if meta_json_path.exists():
            processed_emojis = func.processEmojis(meta_json_path)
            return jsonify({'message': f'Successfully uploaded and processed {len(processed_emojis)} emojis from archive "{filename}".'}), 201
        return jsonify({'message': f'Archive {filename} successfully uploaded and extracted.'}), 201
    except (tarfile.TarError, zipfile.BadZipFile, ValueError) as e:
        # malformed or unsafe archive: the client's fault, not the server's
        return jsonify({'error': str(e)}), 400
    except Exception as e:
        return jsonify({'error': str(e)}), 500
    finally:
        # the uploaded archive is only needed for extraction; tolerate it
        # already being gone so cleanup cannot mask the real response
        archive_path.unlink(missing_ok=True)
# -- getters --
# this isn't used anywhere yet, but maybe it will in the future
@api_bp.route('/get_emojis/', methods=['GET'])
@loginRequired
def getEmojis():
    """Return whatever ``func.listEmojis`` reports as the response body."""
    emoji_list = func.listEmojis()
    return emoji_list
@api_bp.route('/hyper/get_emoji_packs/', methods=['GET'])
@loginRequired
def getEmojiPacks():
    """Return an HTML fragment listing the emojis of one pack.

    The pack is selected by the zero-based ``index`` query parameter into
    ``func.listEmojiPacks()``. Aborts with 400 when ``index`` is missing or
    not an integer, and answers 404 when it is out of range.
    """
    from html import escape  # stdlib; imported locally to keep the block self-contained

    try:
        pack_index = int(request.args.get('index', ''))
    except ValueError:
        abort(400, "Missing 'index' attribute or 'index' is not an integer")
    packs = func.listEmojiPacks()
    if not packs or not 0 <= pack_index < len(packs):
        return jsonify({'error': 'Emoji pack not found'}), 404
    pack = packs[pack_index]
    # pack metadata originates from uploaded archives, so escape it before
    # interpolating it into markup
    rel_path = escape(str(pack['relative_path']), quote=True)
    fragments = []
    for emoji in pack['emojis']:
        file_name = escape(str(emoji['file_name']), quote=True)
        name = escape(str(emoji['name']), quote=True)
        fragments.append(f"""
        <div class="mb-2">
            <img src="/{rel_path}/{file_name}" width="28" height="28" class="emoji" title=":{name}:">
            <span>{name}</span>
        </div>
        """)
    return ''.join(fragments)
# -- delete routes --
@api_bp.route('/delete_emoji/', methods=['DELETE'])
@loginRequired
def deleteEmoji():
    """Delete one emoji file located under ``const.emojiPath``.

    The file is named by the ``emoji_name`` query parameter. Aborts with 400
    when the parameter is missing, answers 400 when the name resolves outside
    the emoji directory, 404 when the file does not exist and 500 when
    removal fails.
    """
    emoji_name = request.args.get('emoji_name', '')
    if not emoji_name:
        abort(400, "Missing 'emoji_name' attribute or 'emoji_name' is empty")
    emoji_base_path = Path(const.emojiPath).resolve()
    emoji_file_path = (emoji_base_path / emoji_name).resolve()
    # refuse names such as "../../app.py" that would escape the emoji directory
    if emoji_base_path not in emoji_file_path.parents:
        return jsonify({'error': 'Invalid emoji name'}), 400
    if not emoji_file_path.is_file():
        return jsonify({'error': 'Emoji not found'}), 404
    try:
        emoji_file_path.unlink()
        return jsonify({'message': f'Emoji "{emoji_name}" deleted successfully'}), 200
    except Exception as e:
        return jsonify({'error': str(e)}), 500
@api_bp.route('/delete_emoji_pack/', methods=['DELETE'])
@loginRequired
def deleteEmojiPack():
    """Delete an emoji pack directory and its ``<pack>.json`` file.

    The pack is named by the ``pack_name`` query parameter (lower-cased to
    form the directory name under ``static/emojis``). Aborts with 400 when
    the parameter is missing, answers 400 when the name is not a plain
    directory name, 404 when the pack directory does not exist and 500 when
    removal fails.
    """
    pack_name = request.args.get('pack_name', '')
    if not pack_name:
        abort(400, "Missing 'pack_name' attribute or 'pack_name' is empty")
    folder_name = pack_name.lower()
    # Only a plain directory name is acceptable: "..", "." or anything with a
    # path separator would let rmtree remove directories outside the pack folder.
    if folder_name in ('.', '..') or '/' in folder_name or '\\' in folder_name:
        return jsonify({'error': 'Invalid emoji pack name'}), 400
    emojis_path = const.emojiPath
    emoji_base_path = Path.cwd() / 'static' / 'emojis' / folder_name
    if not emoji_base_path.is_dir():
        return jsonify({'error': f'Emoji pack "{pack_name.capitalize()}" not found'}), 404
    try:
        # address the pack's JSON file directly; using the name as a glob
        # pattern would let wildcard characters match other packs' files
        json_file = emojis_path / f'{folder_name}.json'
        if json_file.is_file():
            json_file.unlink()
        shutil.rmtree(emoji_base_path)
        return jsonify({'message': f'Emoji pack "{pack_name.capitalize()}" deleted successfully.'}), 200
    except Exception as e:
        return jsonify({'error': str(e)}), 500
# -- misc --
@api_bp.route('/update_blacklist/', methods=['PUT'])
@loginRequired
def updateBlacklist():
    """Overwrite the blacklist file with the ``blacklist`` form field.

    Aborts with 400 when the field is absent; an empty string is accepted
    and clears the file.
    """
    new_blacklist = request.form.get('blacklist')
    if new_blacklist is None:
        # file.write(None) would raise TypeError and surface as a 500
        abort(400, "Missing 'blacklist' attribute")
    with open(const.blacklistFile, 'w') as file:
        file.write(new_blacklist)
    return {'message': 'Blacklist updated!'}, 200
@api_bp.route('/get_question_count/', methods=['GET'])
def getQuestionCount():
    """Return the number of unanswered, unread questions as a string.

    Also called directly by the template context processor, which converts
    the result back to an int.
    """
    conn = func.connectToDb()
    try:
        cursor = conn.cursor()
        app.logger.debug("[CatAsk/API/get_question_count] SELECT'ing question count from database")
        cursor.execute("SELECT COUNT(id) FROM questions WHERE answered=%s AND unread=%s", (False, True))
        question_count = cursor.fetchone()
        cursor.close()
    finally:
        # release the connection even when the query fails
        conn.close()
    return str(question_count[0])
# -- import/export --
@api_bp.route('/import_data/', methods=['PUT'])
@loginRequired
def importData():
    """Save an uploaded archive to the temp directory and import it.

    Restricted to a logged-in admin, like the export endpoints. Expects the
    archive in the ``import_archive`` file field, aborts with 400 when it is
    missing or has no usable filename, and otherwise returns the result of
    ``func.importData``.
    """
    archive = request.files.get('import_archive')
    filename = secure_filename(archive.filename) if archive else ''
    if not filename:
        # an empty name would make file_path point at the temp directory itself
        abort(400, "Missing 'import_archive' file or its filename is empty")
    os.makedirs(const.tempDir, exist_ok=True)
    file_path = const.tempDir / filename
    archive.save(file_path)
    return func.importData(file_path)
@api_bp.route('/import_rs_data/', methods=['PUT'])
@loginRequired
def importRsData():
    """Save an uploaded archive to the temp directory and run the Retrospring import.

    Restricted to a logged-in admin, like the export endpoints. Expects the
    archive in the ``import_archive`` file field, aborts with 400 when it is
    missing or has no usable filename, and otherwise returns the result of
    ``func.retrospringImport``.
    """
    archive = request.files.get('import_archive')
    filename = secure_filename(archive.filename) if archive else ''
    if not filename:
        # an empty name would make file_path point at the temp directory itself
        abort(400, "Missing 'import_archive' file or its filename is empty")
    os.makedirs(const.tempDir, exist_ok=True)
    file_path = const.tempDir / filename
    archive.save(file_path)
    return func.retrospringImport(file_path)
@api_bp.route('/create_export/', methods=['POST'])
@loginRequired
def createExport():
    """Delegate to ``func.createExport`` and return its result."""
    result = func.createExport()
    return result
@api_bp.route('/delete_export/', methods=['DELETE'])
@loginRequired
def deleteExport():
    """Pass the ``timestamp`` query parameter (default ``''``) to ``func.deleteExport``."""
    export_timestamp = request.args.get('timestamp', '')
    return func.deleteExport(export_timestamp)
# unused
@api_bp.route('/view_question/', methods=['GET'])
@loginRequired
def viewQuestion():
    """Return one question row as JSON, selected by the ``question_id`` query parameter.

    Aborts with 400 when the parameter is missing or empty.
    """
    question_id = request.args.get('question_id', '')
    if not question_id:
        abort(400, "Missing 'question_id' attribute or 'question_id' is empty")
    conn = func.connectToDb()
    try:
        cursor = conn.cursor(dictionary=True)
        cursor.execute("SELECT id, from_who, creation_date, content, answered, answer_id FROM questions WHERE id=%s", (question_id,))
        question = cursor.fetchone()
        cursor.close()
    finally:
        # release the connection even when the query fails
        conn.close()
    return jsonify(question)
@api_bp.route('/view_answer/', methods=['GET'])
@loginRequired
def viewAnswer():
    """Return one answer row as JSON, selected by the ``answer_id`` query parameter.

    Aborts with 400 when the parameter is missing or empty.
    """
    answer_id = request.args.get('answer_id', '')
    if not answer_id:
        abort(400, "Missing 'answer_id' attribute or 'answer_id' is empty")
    conn = func.connectToDb()
    try:
        cursor = conn.cursor(dictionary=True)
        cursor.execute("SELECT id, question_id, creation_date, content FROM answers WHERE id=%s", (answer_id,))
        answer = cursor.fetchone()
        cursor.close()
    finally:
        # release the connection even when the query fails
        conn.close()
    return jsonify(answer)
@api_bp.route('/update_config/', methods=['POST'])
@loginRequired
def updateConfig():
    """Merge the submitted form fields into the config file and the live config.

    Each form key is a dotted path (an optional leading ``_`` is dropped)
    into the nested configuration; missing intermediate sections are
    created. The strings ``'true'`` / ``'false'`` (any case) are stored as
    booleans, everything else as the submitted string.

    Aborts with 400 when a path runs through an existing value that is not a
    section.
    """
    global cfg
    new_cfg = func.loadJSON(const.configFile)
    for key, value in request.form.items():
        # removeprefix is already a no-op when the prefix is absent
        nested_keys = key.removeprefix('_').split('.')
        current_dict = new_cfg
        for nested_key in nested_keys[:-1]:
            if nested_key not in current_dict:
                current_dict[nested_key] = {}
            current_dict = current_dict[nested_key]
            if not isinstance(current_dict, dict):
                # e.g. "anonName.x" when anonName is a plain string
                abort(400, f"'{key}' does not address a configuration section")
        # Convert the checkbox value 'True'/'False' strings to actual booleans
        lowered = value.lower()
        if lowered == 'true':
            value = True
        elif lowered == 'false':
            value = False
        current_dict[nested_keys[-1]] = value
    func.saveJSON(new_cfg, const.configFile)
    app.config.update(new_cfg)
    # refresh the module-level copy read by other routes, which would
    # otherwise keep serving the values loaded at import time
    cfg = new_cfg
    return {'message': 'Settings saved!'}
# mount the JSON API and the admin panel under their URL prefixes
app.register_blueprint(blueprint=api_bp, url_prefix='/api/v1')
app.register_blueprint(blueprint=admin_bp, url_prefix='/admin')

if __name__ == '__main__':
    # NOTE(review): debug=True turns on Flask's debug mode; confirm this
    # entry point is only used for local development.
    app.run(debug=True)