# catask/app.py
#
# 715 lines
# 23 KiB
# Python

from flask import Flask, Blueprint, jsonify, request, abort, render_template, flash, session, redirect, url_for
from flask_compress import Compress
from dotenv import load_dotenv
from mysql.connector import errorcode
from functools import wraps
from werkzeug.utils import secure_filename
from pathlib import Path
import secrets
import shutil
import zipfile
import tarfile
import mysql.connector
import urllib
import functions as func
import os
import json
import constants as const
# Module-level login flag used for admin routes: refreshed from the session in
# before_request and exposed to templates by inject_stuff.
logged_in = False
# Load variables from a .env file into the process environment before any
# os.environ lookups below.
load_dotenv()
app = Flask(const.appName)
# Session signing key; None when APP_SECRET is not set.
# NOTE(review): presumably sessions cannot work without it - confirm the
# deployment always defines APP_SECRET.
app.secret_key = os.environ.get("APP_SECRET")
# Configuration loaded once at import time from the JSON config file.
cfg = func.loadJSON(const.configFile)
# NOTE(review): both calls below apply the same mapping to app.config; the
# second one looks redundant.
app.config.from_mapping(cfg)
app.config.update(cfg)
# compress to improve page load speed
Compress(app)
# -- blueprints --
# 'api' holds the server (JSON) routes, 'admin' the admin panel pages; both
# are attached to the app with their URL prefixes at the end of this module.
api_bp = Blueprint('api', const.appName)
admin_bp = Blueprint('admin', const.appName)
# -- cli commands --
@app.cli.command("init-db")
def initDatabase():
    """Create the application database if needed and load ``schema.sql`` into it.

    Registered as the ``init-db`` Flask CLI command. The database name and the
    fallback connection settings are read from the ``DB_*`` environment
    variables. Progress and failures are reported on stdout; nothing is
    returned.
    """
    dbName = os.environ.get("DB_NAME")
    print(f"Attempting to connect to database {dbName}...")
    try:
        conn = func.connectToDb()
        cursor = conn.cursor()
        print("Connected successfully")
        conn.database = dbName
    except mysql.connector.Error as error:
        if error.errno == errorcode.ER_ACCESS_DENIED_ERROR:
            # no usable connection exists, so there is nothing to initialize
            print("Bad credentials")
            return
        if error.errno != errorcode.ER_BAD_DB_ERROR:
            print("Error:", error)
            return
        # The target database does not exist yet: connect through the
        # built-in 'mysql' database, create it and switch to it.
        conn = mysql.connector.connect(
            user=os.environ.get("DB_USER"),
            password=os.environ.get("DB_PASS"),
            host=os.environ.get("DB_HOST"),
            port=os.environ.get("DB_PORT") or 3306,
            database='mysql'
        )
        cursor = conn.cursor()
        func.createDatabase(cursor, dbName)
        conn.database = dbName

    try:
        with open('schema.sql', 'r') as schema_file:
            schema = schema_file.read()
        # execute(multi=True) returns an iterator; it has to be consumed for
        # every statement of the schema to be processed.
        for _ in cursor.execute(schema, multi=True):
            pass
        conn.commit()
        print(f"Database {dbName} was successfully initialized!")
    except mysql.connector.Error as error:
        print(f"Failed to initialize database {dbName}: {error}")
    finally:
        cursor.close()
        conn.close()
# -- decorators --
def loginRequired(f):
    """Decorator that hides a view from visitors who are not logged in.

    The wrapped view is only called when the current session carries a truthy
    ``logged_in`` value; otherwise the request is aborted with 404.

    The session is read directly instead of the module-level ``logged_in``
    flag: that flag is shared by every request handled by the process, so
    under concurrent requests it can hold another visitor's login state.
    """
    @wraps(f)
    def login_required(*args, **kwargs):
        if not session.get('logged_in'):
            return abort(404)
        return f(*args, **kwargs)
    return login_required
# -- before request --
@app.before_request
def before_request():
    """Refresh the module-level ``logged_in`` flag from the current session."""
    # app.logger.debug("[CatAsk] app.before_request triggered")
    global logged_in
    session_state = session.get('logged_in', None)
    logged_in = session_state
# -- context processors --
@app.context_processor
def inject_stuff():
    """Provide shared helpers, constants and the current config to every template."""
    app.logger.debug("[CatAsk] app.context_processor inject_stuff() triggered")
    # the config file is re-read on every call
    current_cfg = func.loadJSON(const.configFile)
    # for 1.6.0 or later
    # questionCount = getQuestionCount()
    template_globals = {
        'metadata': func.generateMetadata(),
        'len': len,
        'str': str,
        'const': const,
        'cfg': current_cfg,
        'logged_in': logged_in,
        'version_id': const.version_id,
        'version': const.version,
        'appName': const.appName,
    }
    return template_globals
# -- template filters --
@app.template_filter('render_markdown')
def render_markdown(text):
    """Template filter that passes ``text`` through ``func.renderMarkdown``."""
    # app.logger.debug("[CatAsk] app.template_filter render_markdown(text) triggered")
    rendered = func.renderMarkdown(text)
    return rendered
# -- client (frontend) routes --
@app.route('/', methods=['GET'])
def index():
    """Render the home page: answered questions with their answers.

    Pinned questions are listed first, then the non-pinned ones; both groups
    are ordered newest first. Every question is paired with the answers whose
    ``question_id`` matches it (an empty list when there are none).
    """
    conn = func.connectToDb()
    cursor = None
    try:
        cursor = conn.cursor(dictionary=True)

        app.logger.debug("[CatAsk/Home] SELECT'ing pinned questions")
        cursor.execute("SELECT * FROM questions WHERE answered=%s AND pinned=%s ORDER BY creation_date DESC", (True, True))
        pinned_questions = cursor.fetchall()

        app.logger.debug("[CatAsk/Home] SELECT'ing non-pinned questions")
        cursor.execute("SELECT * FROM questions WHERE answered=%s AND pinned=%s ORDER BY creation_date DESC", (True, False))
        non_pinned_questions = cursor.fetchall()

        app.logger.debug("[CatAsk/Home] SELECT'ing answers")
        cursor.execute("SELECT * FROM answers ORDER BY creation_date DESC")
        answers = cursor.fetchall()
    finally:
        # release the database resources even when a query fails
        if cursor is not None:
            cursor.close()
        conn.close()

    questions = pinned_questions + non_pinned_questions

    # Group the answers once (keeping their query order) instead of scanning
    # the whole answer list for every question.
    answers_by_question = {}
    for answer in answers:
        answers_by_question.setdefault(answer['question_id'], []).append(answer)

    combined = [
        {
            'question': question,
            'answers': answers_by_question.get(question['id'], [])
        }
        for question in questions
    ]

    metadata = func.generateMetadata()
    return render_template('index.html', combined=combined, urllib=urllib, trimContent=func.trimContent, metadata=metadata, getRandomWord=func.getRandomWord, formatRelativeTime=func.formatRelativeTime)
@app.route('/inbox/', methods=['GET'])
@loginRequired
def inbox():
    """Render the inbox: all unanswered questions, newest first."""
    conn = func.connectToDb()
    cursor = None
    try:
        cursor = conn.cursor(dictionary=True)
        app.logger.debug("[CatAsk/Inbox] SELECT'ing unanswered questions")
        cursor.execute("SELECT * FROM questions WHERE answered=%s ORDER BY creation_date DESC", (False,))
        questions = cursor.fetchall()
    finally:
        # release the database resources even when the query fails
        if cursor is not None:
            cursor.close()
        conn.close()
    return render_template('inbox.html', questions=questions, formatRelativeTime=func.formatRelativeTime)
@app.route('/q/<int:question_id>/', methods=['GET'])
def viewQuestion(question_id):
    """Render the page of a single question together with its answer."""
    question = func.getQuestion(question_id)
    answer = func.getAnswer(question_id)
    template_context = {
        'question': question,
        'urllib': urllib,
        'answer': answer,
        # page metadata is generated from this question/answer pair
        'metadata': func.generateMetadata(question, answer),
        'formatRelativeTime': func.formatRelativeTime,
        'trimContent': func.trimContent,
    }
    return render_template('view_question.html', **template_context)
# TODO: implement this route; private questions should be listed here too
"""
@app.route('/questions/', methods=['GET'])
def seeAskedQuestions():
conn = func.connectToDb()
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT * FROM questions WHERE asker_id=%s ORDER BY creation_date DESC", (asker_id,))
questions = cursor.fetchall()
cursor.close()
conn.close()
return render_template('asked_questions.html', questions=questions, formatRelativeTime=func.formatRelativeTime)
"""
# -- admin client routes --
@admin_bp.route('/login/', methods=['GET', 'POST'])
def login():
    """Show the admin login form (GET) or check the submitted password (POST).

    On a correct password the session is marked as logged in (and made
    permanent when ``remember_me`` was submitted) and the user is redirected
    to the home page. On a wrong password a "Wrong password" message is
    flashed and the login page is shown again. A GET request from an already
    logged-in session is redirected to the home page.
    """
    if request.method == 'POST':
        submitted = request.form.get('admin_password') or ''
        expected = os.environ.get('ADMIN_PASSWORD') or ''
        # An unset or empty ADMIN_PASSWORD must never authenticate anybody;
        # compare_digest keeps the comparison time independent of the content
        # (bytes are used because str arguments must be ASCII-only).
        if expected and secrets.compare_digest(submitted.encode('utf-8'), expected.encode('utf-8')):
            session['logged_in'] = True
            session.permanent = bool(request.form.get('remember_me', False))
            return redirect(url_for('index'))
        flash("Wrong password", 'danger')
        return redirect(url_for('admin.login'))

    if session.get('logged_in'):
        # url_for is required here: a bare 'index' would be treated as a
        # relative URL below /admin/login/ instead of the home page endpoint
        return redirect(url_for('index'))
    return render_template('admin/login.html')
@admin_bp.route('/logout/', methods=['POST'])
@loginRequired
def logout():
    """Mark the session as logged out and redirect to the home page."""
    session['logged_in'] = False
    home_url = url_for('index')
    return redirect(home_url)
@admin_bp.route('/', methods=['GET', 'POST'])
@loginRequired
def index():
    """Redirect the admin root to the 'information' admin page."""
    information_url = url_for('admin.information')
    return redirect(information_url)
@admin_bp.route('/information/', methods=['GET', 'POST'])
@loginRequired
def information():
    """Render the 'information' admin page (template ``admin/categories/instance.html``)."""
    template_name = 'admin/categories/instance.html'
    return render_template(template_name)
@admin_bp.route('/general/', methods=['GET', 'POST'])
@loginRequired
def general():
    """Render the 'general' admin page."""
    template_name = 'admin/categories/general.html'
    return render_template(template_name)
@admin_bp.route('/customize/', methods=['GET', 'POST'])
@loginRequired
def customize():
    """Render the 'customize' admin page."""
    template_name = 'admin/categories/customize.html'
    return render_template(template_name)
@admin_bp.route('/emojis/', methods=['GET', 'POST'])
@loginRequired
def emojis():
    """Render the emoji admin page with the installed packs and single emojis.

    ``json_pack`` passed to the template is True when at least one pack has
    both a ``website`` and an ``exportedAt`` entry.
    """
    # exist_ok avoids failing when the directory appears between a separate
    # existence check and the creation
    os.makedirs(const.emojiPath, exist_ok=True)

    packs = func.listEmojiPacks()
    json_pack = any(
        pack.get('website') and pack.get('exportedAt')
        for pack in packs or []
    )

    emoji_list = func.listEmojis()
    return render_template('admin/categories/emojis.html', json_pack=json_pack, packs=packs, emojis=emoji_list, formatRelativeTime=func.formatRelativeTime2)
@admin_bp.route('/blacklist/', methods=['GET', 'POST'])
@loginRequired
def blacklist():
    """Show the blacklist editor (GET) or overwrite the blacklist file (POST).

    POST expects the new content in the ``blacklist`` form field and answers
    with a JSON message; a request without that field is rejected with 400
    instead of failing while writing the file.
    """
    if request.method == 'POST':
        new_content = request.form.get('blacklist')
        if new_content is None:
            return {'error': "Missing 'blacklist' attribute"}, 400
        with open(const.blacklistFile, 'w') as file:
            file.write(new_content)
        return {'message': 'Blacklist updated!'}, 200

    current_content = func.readPlainFile(const.blacklistFile)
    # an empty list from readPlainFile is shown as an empty text field
    if current_content == []:
        current_content = ''
    return render_template('admin/categories/blacklist.html', blacklist=current_content)
# TODO: implement first-launch setup route
"""
@admin_bp.route('/post-install/', methods=['GET', 'POST'])
@loginRequired
def postInstall():
if config.postInstallCompleted:
return abort(404)
else:
pass
"""
# -- server routes --
@api_bp.errorhandler(404)
def notFound(e=None):
    """Return a JSON body for 404 errors raised inside the API blueprint.

    Flask calls error handlers with the error as the only argument, so the
    handler has to accept it; the default keeps argument-less calls working.
    """
    return jsonify({'error': 'Not found'}), 404
@api_bp.errorhandler(400)
def badRequest(e):
    """Return the text of a 400 error as a JSON body."""
    payload = {'error': str(e)}
    return jsonify(payload), 400
@api_bp.errorhandler(500)
def internalServerError(e):
    """Return the text of a 500 error as a JSON body."""
    payload = {'error': str(e)}
    return jsonify(payload), 500
# why should i have a manifest.json file when i can just make it a route
@api_bp.route("/manifest.json", methods=['GET'])
def pwaManifest():
    """Serve the web app manifest as JSON, built from constants and the config."""
    icons = [
        {
            "src": url_for("static", filename=f"icons/favicon/android-chrome-{size}.png"),
            "sizes": size,
            "type": "image/png"
        }
        for size in ("192x192", "512x512")
    ]
    # not sure about theme_color but whateva
    manifest = {
        "short_name": const.appName,
        "name": cfg['instance']['title'],
        "icons": icons,
        "start_url": "/",
        "display": "standalone",
        "theme_color": cfg['style']['accentLight'],
        "background_color": ""
    }
    return jsonify(manifest)
# -- question routes --
@api_bp.route('/add_question/', methods=['POST'])
def addQuestion():
    """Validate a submitted question and hand it to ``func.addQuestion``.

    Aborts with 400 when the question is empty or when the question or the
    sender name is longer than the configured character limit.
    """
    form = request.form
    from_who = form.get('from_who', cfg['anonName'])
    question = form.get('question', '')
    cw = form.get('cw', '')

    if not question:
        abort(400, "Question field must not be empty")

    char_limit = int(cfg['charLimit'])
    longest_field = max(len(question), len(from_who))
    if longest_field > char_limit:
        abort(400, "Question exceeds the character limit")

    return func.addQuestion(from_who, question, cw)
@api_bp.route('/delete_question/', methods=['DELETE'])
@loginRequired
def deleteQuestion():
    """Delete the question given by the ``question_id`` query parameter.

    Aborts with 400 when ``question_id`` is missing or empty.
    """
    question_id = request.args.get('question_id', '')
    if not question_id:
        abort(400, "Missing 'question_id' attribute or 'question_id' is empty")

    conn = func.connectToDb()
    cursor = None
    try:
        cursor = conn.cursor()
        app.logger.debug("[CatAsk/API/delete_question] DELETE'ing a question from database")
        cursor.execute("DELETE FROM questions WHERE id=%s", (question_id,))
        # explicit commit, as addAnswer does; harmless if the connection
        # already commits automatically
        conn.commit()
    finally:
        # release the database resources even when the statement fails
        if cursor is not None:
            cursor.close()
        conn.close()

    return {'message': 'Successfully deleted question.'}, 200
@api_bp.route('/return_to_inbox/', methods=['POST'])
@loginRequired
def returnToInbox():
    """Move an answered question back to the inbox.

    The question row is deleted and inserted again as unanswered, keeping its
    sender, content, creation date and content warning. Aborts with 400 when
    ``question_id`` is missing or empty and answers 404 when no such question
    exists.
    """
    question_id = request.args.get('question_id', '')
    if not question_id:
        abort(400, "Missing 'question_id' attribute or 'question_id' is empty")

    conn = func.connectToDb()
    cursor = None
    try:
        cursor = conn.cursor()

        app.logger.debug("[CatAsk/API/return_to_inbox] SELECT'ing a question from database")
        cursor.execute("SELECT from_who, content, creation_date, cw FROM questions WHERE id=%s", (question_id,))
        row = cursor.fetchone()
        if row is None:
            # nothing to move; without this check unpacking the row would fail
            return {'error': 'Question not found.'}, 404
        from_who, content, creation_date, cw = row

        app.logger.debug("[CatAsk/API/return_to_inbox] DELETE'ing a question from database")
        cursor.execute("DELETE FROM questions WHERE id=%s", (question_id,))

        app.logger.debug("[CatAsk/API/return_to_inbox] INSERT'ing a question into database")
        cursor.execute("INSERT INTO questions (from_who, content, creation_date, answered, cw) VALUES (%s, %s, %s, %s, %s)", (from_who, content, creation_date, False, cw))

        # commit the DELETE and the INSERT together
        conn.commit()
    except mysql.connector.Error:
        # do not keep the DELETE when the INSERT failed
        conn.rollback()
        raise
    finally:
        if cursor is not None:
            cursor.close()
        conn.close()

    return {'message': 'Successfully returned question to inbox.'}, 200
@api_bp.route('/pin_question/', methods=['POST'])
@loginRequired
def pinQuestion():
    """Pin the question given by the ``question_id`` query parameter.

    Aborts with 400 when ``question_id`` is missing or empty.
    """
    question_id = request.args.get('question_id', '')
    if not question_id:
        abort(400, "Missing 'question_id' attribute or 'question_id' is empty")

    conn = func.connectToDb()
    cursor = None
    try:
        cursor = conn.cursor()
        app.logger.debug("[CatAsk/API/pin_question] UPDATE'ing a question to pin it")
        cursor.execute("UPDATE questions SET pinned=%s WHERE id=%s", (True, question_id))
        # explicit commit, as addAnswer does; harmless if the connection
        # already commits automatically
        conn.commit()
    finally:
        # release the database resources even when the statement fails
        if cursor is not None:
            cursor.close()
        conn.close()

    return {'message': 'Successfully pinned question.'}, 200
@api_bp.route('/unpin_question/', methods=['POST'])
@loginRequired
def unpinQuestion():
    """Unpin the question given by the ``question_id`` query parameter.

    Aborts with 400 when ``question_id`` is missing or empty.
    """
    question_id = request.args.get('question_id', '')
    if not question_id:
        abort(400, "Missing 'question_id' attribute or 'question_id' is empty")

    conn = func.connectToDb()
    cursor = None
    try:
        cursor = conn.cursor()
        app.logger.debug("[CatAsk/API/unpin_question] UPDATE'ing a question to unpin it")
        cursor.execute("UPDATE questions SET pinned=%s WHERE id=%s", (False, question_id))
        # explicit commit, as addAnswer does; harmless if the connection
        # already commits automatically
        conn.commit()
    finally:
        # release the database resources even when the statement fails
        if cursor is not None:
            cursor.close()
        conn.close()

    return {'message': 'Successfully unpinned question.'}, 200
@api_bp.route('/add_answer/', methods=['POST'])
@loginRequired
def addAnswer():
    """Store an answer and mark its question as answered.

    ``question_id`` comes from the query string, ``answer`` and the optional
    ``cw`` from the form. Aborts with 400 when ``question_id`` or ``answer``
    is missing or empty. Both statements are committed together; on any
    failure the transaction is rolled back and the error text is returned
    with status 500.
    """
    question_id = request.args.get('question_id', '')
    answer = request.form.get('answer')
    cw = request.form.get('cw', '')

    if not question_id:
        abort(400, "Missing 'question_id' attribute or 'question_id' is empty")
    if not answer:
        abort(400, "Missing 'answer' attribute or 'answer' is empty")

    conn = func.connectToDb()
    # stays None when conn.cursor() itself fails, so the cleanup below cannot
    # hit an unbound name and hide the original error
    cursor = None
    try:
        cursor = conn.cursor()

        app.logger.debug("[CatAsk/API/add_answer] INSERT'ing an answer into database")
        cursor.execute("INSERT INTO answers (question_id, content, cw) VALUES (%s, %s, %s)", (question_id, answer, cw))
        answer_id = cursor.lastrowid

        app.logger.debug("[CatAsk/API/add_answer] UPDATE'ing question to set answered and answer_id")
        cursor.execute("UPDATE questions SET answered=%s, answer_id=%s WHERE id=%s", (True, answer_id, question_id))

        conn.commit()
    except Exception as e:
        conn.rollback()
        return jsonify({'error': str(e)}), 500
    finally:
        if cursor is not None:
            cursor.close()
        conn.close()

    return jsonify({'message': 'Answer added successfully!'}), 201
# -- uploaders --
@api_bp.route('/upload_favicon/', methods=['POST'])
@loginRequired
def uploadFavicon():
    """Save the uploaded ``favicon`` file and generate the favicon from it.

    Answers 400 when no file was sent or when its name is not accepted by
    ``func.allowedFile``.
    """
    favicon = request.files.get('favicon')

    if not favicon:
        return {'error': "favicon is not specified"}, 400
    if not func.allowedFile(favicon.filename):
        return {'error': 'File type is not supported'}, 400

    filename = secure_filename(favicon.filename)
    favicon.save(const.faviconDir / filename)
    func.generateFavicon(filename)
    return {'message': 'Successfully updated favicon!'}, 201
@api_bp.route('/upload_emoji/', methods=['POST'])
@loginRequired
def uploadEmoji():
    """Store a single uploaded emoji image in the emoji directory.

    Answers 400 when the ``emoji`` file part is missing, has no file name or
    is not accepted by ``func.allowedFile``.
    """
    emoji = request.files.get('emoji')
    if emoji is None:
        return jsonify({'error': 'No file part in the request'}), 400

    original_name = emoji.filename
    if original_name == '':
        return jsonify({'error': 'No file selected for uploading'}), 400
    if not func.allowedFile(original_name):
        return jsonify({'error': 'Invalid file type. Only png, jpg, jpeg, webp, bmp, jxl supported'}), 400

    # sanitize the client-supplied name before using it as a file name
    filename = secure_filename(original_name)
    emoji.save(const.emojiPath / filename)

    return jsonify({'message': f'Emoji {filename} successfully uploaded'}), 201
def _unsafeTarMembers(tar_ref, destination):
    """Return the names of tar members that must not be extracted below ``destination``."""
    root = destination.resolve()
    return [
        member.name
        for member in tar_ref.getmembers()
        # only regular files and directories are accepted (no links, devices
        # or FIFOs), and every target has to stay inside the destination
        if not (member.isfile() or member.isdir())
        or not (root / member.name).resolve().is_relative_to(root)
    ]


@api_bp.route('/upload_emoji_pack/', methods=['POST'])
@loginRequired
def uploadEmojiPack():
    """Upload an emoji pack archive and extract it into its own directory.

    The archive is sent as the ``emoji_archive`` file part. It is extracted
    into ``const.emojiPath / <archive name without extension>``; when the pack
    contains a ``meta.json`` it is handed to ``func.processEmojis``. The
    uploaded archive file itself is removed afterwards in every case.

    Answers 400 for a missing/unnamed/unsupported upload or a tar archive
    with unsafe members, 500 (with the error text) when extraction or
    processing fails, and 201 on success.
    """
    if 'emoji_archive' not in request.files:
        return jsonify({'error': 'No file part in the request'}), 400

    emoji_archive = request.files['emoji_archive']
    if emoji_archive.filename == '':
        return jsonify({'error': 'No file selected for uploading'}), 400
    if not func.allowedArchive(emoji_archive.filename):
        return jsonify({'error': 'Invalid file type. Only .zip, .tar, .tar.gz, .tar.bz2 allowed'}), 400

    filename = secure_filename(emoji_archive.filename)
    archive_name = func.stripArchExtension(filename)
    if not filename or not archive_name:
        # the sanitized name is empty, so there is no directory name to use
        return jsonify({'error': 'No file selected for uploading'}), 400

    extract_path = const.emojiPath / archive_name
    extract_path.mkdir(parents=True, exist_ok=True)

    archive_path = extract_path / filename
    emoji_archive.save(archive_path)

    try:
        if zipfile.is_zipfile(archive_path):
            with zipfile.ZipFile(archive_path, 'r') as zip_ref:
                zip_ref.extractall(extract_path)
        elif tarfile.is_tarfile(archive_path):
            with tarfile.open(archive_path, 'r:*') as tar_ref:
                # tar members may point outside the target directory or be
                # links; refuse such archives instead of extracting them
                if _unsafeTarMembers(tar_ref, extract_path):
                    return jsonify({'error': 'Archive contains unsafe paths or links'}), 400
                tar_ref.extractall(extract_path)
        else:
            return jsonify({'error': 'Unsupported archive format'}), 400

        # parse meta.json if it exists
        meta_json_path = extract_path / 'meta.json'
        if meta_json_path.exists():
            processed_emojis = func.processEmojis(meta_json_path)
            return jsonify({'message': f'Successfully uploaded and processed {len(processed_emojis)} emojis from archive "{filename}".'}), 201
        return jsonify({'message': f'Archive {filename} successfully uploaded and extracted.'}), 201
    except Exception as e:
        return jsonify({'error': str(e)}), 500
    finally:
        # missing_ok: a cleanup problem must not replace the real response
        archive_path.unlink(missing_ok=True)
# -- getters --
# this isn't used anywhere yet, but maybe it will in the future
@api_bp.route('/get_emojis/', methods=['GET'])
@loginRequired
def getEmojis():
    """Return whatever ``func.listEmojis`` reports."""
    emoji_list = func.listEmojis()
    return emoji_list
@api_bp.route('/hyper/get_emoji_packs/', methods=['GET'])
@loginRequired
def getEmojiPacks():
    """Return an HTML fragment listing the emojis of one emoji pack.

    The pack is selected by the ``index`` query parameter, a zero-based
    position in the list returned by ``func.listEmojiPacks``. Aborts with 400
    when ``index`` is missing, not an integer or out of range.
    """
    # local import: keeps this block self-contained; the values come from
    # uploaded pack data and must be escaped before going into HTML
    from html import escape

    try:
        index = int(request.args.get('index', ''))
    except ValueError:
        abort(400, "Missing 'index' attribute or 'index' is not an integer")

    packs = func.listEmojiPacks() or []
    if not 0 <= index < len(packs):
        abort(400, "'index' is out of range")

    pack = packs[index]
    rel_path = escape(str(pack['relative_path']))

    fragments = []
    for emoji in pack['emojis']:
        name = escape(str(emoji['name']))
        file_name = escape(str(emoji['file_name']))
        fragments.append(f"""
<div class="mb-2">
<img src="/{rel_path}/{file_name}" width="28" height="28" class="emoji" title=":{name}:">
<span>{name}</span>
</div>
""")
    return ''.join(fragments)
# -- delete routes --
@api_bp.route('/delete_emoji/', methods=['DELETE'])
@loginRequired
def deleteEmoji():
    """Delete one emoji file, named by the ``emoji_name`` query parameter.

    ``emoji_name`` is a path relative to the emoji directory. Aborts with 400
    when it is missing/empty or points outside that directory, answers 404
    when the file does not exist and 500 (with the error text) when the
    deletion fails.
    """
    emoji_name = request.args.get('emoji_name')
    if not emoji_name:
        abort(400, "Missing 'emoji_name' attribute or 'emoji_name' is empty")

    emoji_base_path = Path(const.emojiPath).resolve()
    emoji_file_path = (emoji_base_path / emoji_name).resolve()
    # refuse names such as '../../app.py' that escape the emoji directory
    if not emoji_file_path.is_relative_to(emoji_base_path):
        abort(400, "Invalid 'emoji_name'")

    if not emoji_file_path.is_file():
        return jsonify({'error': 'Emoji not found'}), 404

    try:
        emoji_file_path.unlink()
        return jsonify({'message': f'Emoji "{emoji_name}" deleted successfully'}), 200
    except Exception as e:
        return jsonify({'error': str(e)}), 500
@api_bp.route('/delete_emoji_pack/', methods=['DELETE'])
@loginRequired
def deleteEmojiPack():
    """Delete an emoji pack directory and its ``<pack>.json`` file.

    The pack is named by the ``pack_name`` query parameter (lower-cased to get
    the directory name). Aborts with 400 when the name is missing/empty or is
    not a single path component, answers 404 when the pack directory does not
    exist and 500 (with the error text) when the deletion fails.
    """
    pack_name = request.args.get('pack_name')
    if not pack_name:
        abort(400, "Missing 'pack_name' attribute or 'pack_name' is empty")

    folder_name = pack_name.lower()
    # Only a plain directory name is acceptable: an empty name, '..' or a
    # name with separators would make rmtree remove something else than one
    # pack (for example the whole emoji directory).
    if folder_name in ('.', '..') or Path(folder_name).name != folder_name:
        abort(400, "Invalid 'pack_name'")

    emojis_path = const.emojiPath
    emoji_base_path = Path.cwd() / 'static' / 'emojis' / folder_name

    if not emoji_base_path.is_dir():
        return jsonify({'error': f'Emoji pack "{pack_name.capitalize()}" not found'}), 404

    try:
        # address the JSON file directly so that the pack name is never
        # interpreted as a glob pattern
        (emojis_path / f'{folder_name}.json').unlink(missing_ok=True)
        shutil.rmtree(emoji_base_path)
        return jsonify({'message': f'Emoji pack "{pack_name.capitalize()}" deleted successfully.'}), 200
    except Exception as e:
        return jsonify({'error': str(e)}), 500
# -- misc --
@api_bp.route('/get_question_count/', methods=['GET'])
def getQuestionCount():
    """Return the number of unanswered questions as a plain string."""
    conn = func.connectToDb()
    cursor = None
    try:
        cursor = conn.cursor()
        app.logger.debug("[CatAsk/API/get_question_count] SELECT'ing question count from database")
        cursor.execute("SELECT COUNT(id) FROM questions WHERE answered=%s", (False,))
        question_count = cursor.fetchone()
    finally:
        # release the database resources even when the query fails
        if cursor is not None:
            cursor.close()
        conn.close()
    return str(question_count[0])
# unused
"""
@api_bp.route('/view_question/', methods=['GET'])
@loginRequired
def viewQuestion():
question_id = request.args.get('question_id', '')
if not question_id:
abort(400, "Missing 'question_id' attribute or 'question_id' is empty")
conn = func.connectToDb()
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT id, from_who, creation_date, content, answered, answer_id FROM questions WHERE id=%s", (question_id,))
question = cursor.fetchone()
cursor.close()
conn.close()
return jsonify(question)
@api_bp.route('/view_answer/', methods=['GET'])
@loginRequired
def viewAnswer():
answer_id = request.args.get('answer_id', '')
if not answer_id:
abort(400, "Missing 'answer_id' attribute or 'answer_id' is empty")
conn = func.connectToDb()
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT id, question_id, creation_date, content FROM answers WHERE id=%s", (answer_id,))
answer = cursor.fetchone()
cursor.close()
conn.close()
return jsonify(answer)
"""
def _setConfigValue(config, dotted_key, value):
    """Store ``value`` in the nested dict ``config`` at the path given by ``dotted_key``."""
    # a leading underscore in the form field name is not part of the key
    nested_keys = dotted_key.removeprefix('_').split('.')
    current_dict = config
    for nested_key in nested_keys[:-1]:
        current_dict = current_dict.setdefault(nested_key, {})
    # Convert the checkbox value 'True'/'False' strings to actual booleans
    lowered = value.lower()
    if lowered == 'true':
        value = True
    elif lowered == 'false':
        value = False
    current_dict[nested_keys[-1]] = value


@api_bp.route('/update_config/', methods=['POST'])
@loginRequired
def updateConfig():
    """Apply the submitted form fields to the config file and the running app.

    Every form field name is a dot-separated path into the configuration
    (e.g. ``instance.title``). The updated configuration is written back to
    the config file, copied into ``app.config`` and also published as the
    module-level ``cfg``, so that code in this module reading ``cfg`` does
    not keep using the values loaded at import time.
    """
    global cfg

    new_cfg = func.loadJSON(const.configFile)
    for key, value in request.form.items():
        _setConfigValue(new_cfg, key, value)

    func.saveJSON(new_cfg, const.configFile)
    # publish only after the file was written successfully
    cfg = new_cfg
    app.config.update(new_cfg)
    return {'message': 'Changes saved!'}
# Attach the blueprints: API routes live under /api/v1, admin pages under /admin.
app.register_blueprint(api_bp, url_prefix='/api/v1')
app.register_blueprint(admin_bp, url_prefix='/admin')
# Entry point when the module is executed directly.
# NOTE(review): debug=True is hard-coded here - confirm this path is never
# used to serve the app publicly.
if __name__ == '__main__':
    app.run(debug=True)