mirror of
https://codeberg.org/catask-org/catask.git
synced 2025-04-19 21:33:41 -05:00
466 lines
15 KiB
Python
466 lines
15 KiB
Python
from flask import Flask, Blueprint, jsonify, request, abort, render_template, flash, session, redirect, url_for
|
|
from flask_compress import Compress
|
|
from dotenv import load_dotenv
|
|
from mysql.connector import errorcode
|
|
from functools import wraps
|
|
from werkzeug.utils import secure_filename
|
|
import mysql.connector
|
|
import urllib
|
|
import functions as func
|
|
import os
|
|
import json
|
|
import constants as const
|
|
|
|
# used for admin routes
|
|
logged_in = False
|
|
|
|
load_dotenv()
|
|
|
|
app = Flask(const.appName)
|
|
app.secret_key = os.environ.get("APP_SECRET")
|
|
cfg = func.loadJSON(const.configFile)
|
|
app.config.from_mapping(cfg)
|
|
app.config.update(cfg)
|
|
# compress to improve page load speed
|
|
Compress(app)
|
|
|
|
# -- blueprints --
|
|
api_bp = Blueprint('api', const.appName)
|
|
admin_bp = Blueprint('admin', const.appName)
|
|
|
|
# -- cli commands --
|
|
|
|
@app.cli.command("init-db")
|
|
def initDatabase():
|
|
dbName = os.environ.get("DB_NAME")
|
|
print(f"Attempting to connect to database {dbName}...")
|
|
try:
|
|
conn = func.connectToDb()
|
|
cursor = conn.cursor()
|
|
print("Connected successfully")
|
|
|
|
conn.database = dbName
|
|
|
|
except mysql.connector.Error as error:
|
|
if error.errno == errorcode.ER_ACCESS_DENIED_ERROR:
|
|
print("Bad credentials")
|
|
elif error.errno == errorcode.ER_BAD_DB_ERROR:
|
|
dbPort = os.environ.get("DB_PORT")
|
|
if not dbPort:
|
|
dbPort = 3306
|
|
|
|
conn = mysql.connector.connect(
|
|
user=os.environ.get("DB_USER"),
|
|
password=os.environ.get("DB_PASS"),
|
|
host=os.environ.get("DB_HOST"),
|
|
port=dbPort,
|
|
database='mysql'
|
|
)
|
|
cursor = conn.cursor()
|
|
func.createDatabase(cursor, dbName)
|
|
conn.database = dbName
|
|
else:
|
|
print("Error:", error)
|
|
return
|
|
|
|
with open('schema.sql', 'r') as schema_file:
|
|
schema = schema_file.read()
|
|
try:
|
|
cursor.execute(schema, multi=True)
|
|
print(f"Database {dbName} was successfully initialized!")
|
|
except mysql.connector.Error as error:
|
|
print(f"Failed to initialize database {dbName}: {error}")
|
|
finally:
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
# -- decorators --
|
|
|
|
def loginRequired(f):
|
|
@wraps(f)
|
|
def login_required(*args, **kwargs):
|
|
if not logged_in:
|
|
return abort(404)
|
|
return f(*args, **kwargs)
|
|
return login_required
|
|
|
|
# -- before request --
|
|
|
|
@app.before_request
|
|
def before_request():
|
|
global logged_in
|
|
logged_in = session.get('logged_in')
|
|
|
|
# -- context processors --
|
|
|
|
@app.context_processor
|
|
def inject_stuff():
|
|
cfg = func.loadJSON(const.configFile)
|
|
# for 1.6.0 or later
|
|
# questionCount = getQuestionCount()
|
|
return dict(metadata=func.generateMetadata(), len=len, str=str, const=const, cfg=cfg, logged_in=logged_in, version_id=const.version_id, version=const.version, appName=const.appName)
|
|
|
|
# -- template filters --
|
|
@app.template_filter('render_markdown')
|
|
def render_markdown(text):
|
|
return func.renderMarkdown(text)
|
|
|
|
# -- client (frontend) routes --
|
|
|
|
@app.route('/', methods=['GET'])
|
|
def index():
|
|
conn = func.connectToDb()
|
|
cursor = conn.cursor(dictionary=True)
|
|
|
|
cursor.execute("SELECT * FROM questions WHERE answered=%s AND pinned=%s ORDER BY creation_date DESC", (True, True))
|
|
pinned_questions = cursor.fetchall()
|
|
|
|
cursor.execute("SELECT * FROM questions WHERE answered=%s AND pinned=%s ORDER BY creation_date DESC", (True, False))
|
|
non_pinned_questions = cursor.fetchall()
|
|
|
|
questions = pinned_questions + non_pinned_questions
|
|
|
|
cursor.execute("SELECT * FROM answers ORDER BY creation_date DESC")
|
|
answers = cursor.fetchall()
|
|
|
|
metadata = func.generateMetadata()
|
|
|
|
combined = []
|
|
for question in questions:
|
|
question_answers = [answer for answer in answers if answer['question_id'] == question['id']]
|
|
combined.append({
|
|
'question': question,
|
|
'answers': question_answers
|
|
})
|
|
|
|
cursor.close()
|
|
conn.close()
|
|
return render_template('index.html', combined=combined, urllib=urllib, trimContent=func.trimContent, metadata=metadata, getRandomWord=func.getRandomWord, formatRelativeTime=func.formatRelativeTime)
|
|
|
|
@app.route('/inbox/', methods=['GET'])
|
|
@loginRequired
|
|
def inbox():
|
|
conn = func.connectToDb()
|
|
cursor = conn.cursor(dictionary=True)
|
|
cursor.execute("SELECT * FROM questions WHERE answered=%s ORDER BY creation_date DESC", (False,))
|
|
questions = cursor.fetchall()
|
|
|
|
cursor.close()
|
|
conn.close()
|
|
return render_template('inbox.html', questions=questions, formatRelativeTime=func.formatRelativeTime)
|
|
|
|
@app.route('/q/<int:question_id>/', methods=['GET'])
|
|
def viewQuestion(question_id):
|
|
question = func.getQuestion(question_id)
|
|
answer = func.getAnswer(question_id)
|
|
metadata = func.generateMetadata(question, answer)
|
|
return render_template('view_question.html', question=question, urllib=urllib, answer=answer, metadata=metadata, formatRelativeTime=func.formatRelativeTime, trimContent=func.trimContent)
|
|
|
|
# TODO: implement this and private questions should be here too
|
|
"""
|
|
@app.route('/questions/', methods=['GET'])
|
|
def seeAskedQuestions():
|
|
conn = func.connectToDb()
|
|
cursor = conn.cursor(dictionary=True)
|
|
cursor.execute("SELECT * FROM questions WHERE asker_id=%s ORDER BY creation_date DESC", (asker_id,))
|
|
questions = cursor.fetchall()
|
|
|
|
cursor.close()
|
|
conn.close()
|
|
return render_template('asked_questions.html', questions=questions, formatRelativeTime=func.formatRelativeTime)
|
|
"""
|
|
|
|
# -- admin client routes --
|
|
|
|
@admin_bp.route('/login/', methods=['GET', 'POST'])
|
|
def login():
|
|
global logged_in
|
|
if request.method == 'POST':
|
|
admin_password = request.form.get('admin_password')
|
|
if admin_password == os.environ.get('ADMIN_PASSWORD'):
|
|
session['logged_in'] = True
|
|
return redirect(url_for('index'))
|
|
else:
|
|
flash("Wrong password", 'danger')
|
|
return redirect(url_for('admin.login'))
|
|
else:
|
|
if logged_in:
|
|
return redirect('index')
|
|
else:
|
|
return render_template('admin/login.html')
|
|
|
|
@admin_bp.route('/logout/')
|
|
@loginRequired
|
|
def logout():
|
|
session['logged_in'] = False
|
|
return redirect(url_for('index'))
|
|
|
|
@admin_bp.route('/', methods=['GET', 'POST'])
|
|
@loginRequired
|
|
def index():
|
|
if request.method == 'POST':
|
|
action = request.form.get('action')
|
|
blacklist = request.form.get('blacklist')
|
|
with open(const.blacklistFile, 'w') as file:
|
|
file.write(blacklist)
|
|
return {'message': 'Changes saved!'}, 200
|
|
|
|
else:
|
|
blacklist = func.readPlainFile(const.blacklistFile)
|
|
if blacklist == []:
|
|
blacklist = ''
|
|
cfg_vars = func.loadJSON(const.configFile)
|
|
|
|
return render_template('admin/index.html', blacklist=blacklist, cfg=cfg_vars)
|
|
|
|
# TODO: implement first-launch setup route
|
|
"""
|
|
@admin_bp.route('/post-install/', methods=['GET', 'POST'])
|
|
@loginRequired
|
|
def postInstall():
|
|
if config.postInstallCompleted:
|
|
return abort(404)
|
|
else:
|
|
pass
|
|
"""
|
|
|
|
# -- server routes --
|
|
|
|
@api_bp.errorhandler(404)
|
|
def notFound():
|
|
return jsonify({'error': 'Not found'}), 404
|
|
|
|
@api_bp.errorhandler(400)
|
|
def badRequest(e):
|
|
return jsonify({'error': str(e)}), 400
|
|
|
|
@api_bp.errorhandler(500)
|
|
def internalServerError(e):
|
|
return jsonify({'error': str(e)}), 500
|
|
|
|
@api_bp.route('/add_question/', methods=['POST'])
|
|
def addQuestion():
|
|
from_who = request.form.get('from_who', cfg['anonName'])
|
|
question = request.form.get('question', '')
|
|
antispam = request.form.get('antispam', '')
|
|
# reserved for version 1.5.0 or later
|
|
# private = request.form.get('private')
|
|
|
|
if not question:
|
|
abort(400, "Question field must not be empty")
|
|
if not antispam:
|
|
abort(400, "Anti-spam word must not be empty")
|
|
if len(question) > int(cfg['charLimit']) or len(from_who) > int(cfg['charLimit']):
|
|
abort(400, "Question exceeds the character limit")
|
|
|
|
antispam_wordlist = func.readPlainFile(const.antiSpamFile, split=True)
|
|
antispam_valid = antispam in antispam_wordlist
|
|
if not antispam_valid:
|
|
return {'error': 'An error has occurred'}, 500
|
|
|
|
blacklist = func.readPlainFile(const.blacklistFile, split=True)
|
|
|
|
for bad_word in blacklist:
|
|
if bad_word in question or bad_word in from_who:
|
|
# return a generic error message so bad actors wouldn't figure out the blacklist
|
|
return {'error': 'An error has occurred'}, 500
|
|
|
|
conn = func.connectToDb()
|
|
cursor = conn.cursor()
|
|
cursor.execute("INSERT INTO questions (from_who, content, answered) VALUES (%s, %s, %s)", (from_who, question, False,))
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
return {'message': 'Question asked successfully!'}, 201
|
|
|
|
@api_bp.route('/delete_question/', methods=['DELETE'])
|
|
@loginRequired
|
|
def deleteQuestion():
|
|
question_id = request.args.get('question_id', '')
|
|
if not question_id:
|
|
abort(400, "Missing 'question_id' attribute or 'question_id' is empty")
|
|
|
|
conn = func.connectToDb()
|
|
cursor = conn.cursor()
|
|
cursor.execute("DELETE FROM questions WHERE id=%s", (question_id,))
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
return {'message': 'Successfully deleted question.'}, 200
|
|
|
|
@api_bp.route('/return_to_inbox/', methods=['POST'])
|
|
@loginRequired
|
|
def returnToInbox():
|
|
question_id = request.args.get('question_id', '')
|
|
if not question_id:
|
|
abort(400, "Missing 'question_id' attribute or 'question_id' is empty")
|
|
|
|
conn = func.connectToDb()
|
|
cursor = conn.cursor()
|
|
cursor.execute("SELECT from_who, content, creation_date FROM questions WHERE id=%s", (question_id,))
|
|
row = cursor.fetchone()
|
|
|
|
question = {
|
|
'from_who': row[0],
|
|
'content': row[1],
|
|
'creation_date': row[2]
|
|
}
|
|
|
|
cursor.execute("DELETE FROM questions WHERE id=%s", (question_id,))
|
|
cursor.execute("INSERT INTO questions (from_who, content, creation_date, answered) VALUES (%s, %s, %s, %s)", (question["from_who"], question["content"], question["creation_date"], False,))
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
return {'message': 'Successfully returned question to inbox.'}, 200
|
|
|
|
@api_bp.route('/pin_question/', methods=['POST'])
|
|
@loginRequired
|
|
def pinQuestion():
|
|
question_id = request.args.get('question_id', '')
|
|
if not question_id:
|
|
abort(400, "Missing 'question_id' attribute or 'question_id' is empty")
|
|
|
|
conn = func.connectToDb()
|
|
cursor = conn.cursor()
|
|
cursor.execute("UPDATE questions SET pinned=%s WHERE id=%s", (True, question_id))
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
return {'message': 'Successfully pinned question.'}, 200
|
|
|
|
@api_bp.route('/unpin_question/', methods=['POST'])
|
|
@loginRequired
|
|
def unpinQuestion():
|
|
question_id = request.args.get('question_id', '')
|
|
if not question_id:
|
|
abort(400, "Missing 'question_id' attribute or 'question_id' is empty")
|
|
|
|
conn = func.connectToDb()
|
|
cursor = conn.cursor()
|
|
cursor.execute("UPDATE questions SET pinned=%s WHERE id=%s", (False, question_id))
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
return {'message': 'Successfully unpinned question.'}, 200
|
|
|
|
@api_bp.route('/add_answer/', methods=['POST'])
|
|
@loginRequired
|
|
def addAnswer():
|
|
question_id = request.args.get('question_id', '')
|
|
answer = request.form.get('answer')
|
|
|
|
if not question_id:
|
|
abort(400, "Missing 'question_id' attribute or 'question_id' is empty")
|
|
if not answer:
|
|
abort(400, "Missing 'answer' attribute or 'answer' is empty")
|
|
|
|
conn = func.connectToDb()
|
|
try:
|
|
cursor = conn.cursor()
|
|
cursor.execute("INSERT INTO answers (question_id, content) VALUES (%s, %s)", (question_id, answer))
|
|
answer_id = cursor.lastrowid
|
|
cursor.execute("UPDATE questions SET answered=%s, answer_id=%s WHERE id=%s", (True, answer_id, question_id))
|
|
conn.commit()
|
|
except Exception as e:
|
|
conn.rollback()
|
|
return jsonify({'error': str(e)}), 500
|
|
finally:
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
return jsonify({'message': 'Answer added successfully!'}), 201
|
|
|
|
@api_bp.route('/upload_favicon/', methods=['POST'])
|
|
@loginRequired
|
|
def uploadFavicon():
|
|
favicon = request.files.get('favicon')
|
|
|
|
if favicon and func.allowedFile(favicon.filename):
|
|
filename = secure_filename(favicon.filename)
|
|
favicon_path = const.faviconDir / filename
|
|
favicon.save(favicon_path)
|
|
|
|
func.generateFavicon(filename)
|
|
|
|
return {'message': 'Successfully updated favicon!'}, 201
|
|
elif favicon and not func.allowedFile(favicon.filename):
|
|
return {'error': 'File type is not supported'}, 400
|
|
elif not favicon:
|
|
return {'error': "favicon is not specified"}, 400
|
|
|
|
@api_bp.route('/get_question_count/', methods=['GET'])
|
|
def getQuestionCount():
|
|
conn = func.connectToDb()
|
|
cursor = conn.cursor()
|
|
cursor.execute("SELECT COUNT(id) FROM questions WHERE answered=%s", (False,))
|
|
question_count = cursor.fetchone()
|
|
|
|
cursor.close()
|
|
conn.close()
|
|
return str(question_count[0])
|
|
|
|
# unused
|
|
"""
|
|
@api_bp.route('/view_question/', methods=['GET'])
|
|
@loginRequired
|
|
def viewQuestion():
|
|
question_id = request.args.get('question_id', '')
|
|
if not question_id:
|
|
abort(400, "Missing 'question_id' attribute or 'question_id' is empty")
|
|
|
|
conn = func.connectToDb()
|
|
cursor = conn.cursor(dictionary=True)
|
|
cursor.execute("SELECT id, from_who, creation_date, content, answered, answer_id FROM questions WHERE id=%s", (question_id,))
|
|
question = cursor.fetchone()
|
|
cursor.close()
|
|
conn.close()
|
|
return jsonify(question)
|
|
|
|
@api_bp.route('/view_answer/', methods=['GET'])
|
|
@loginRequired
|
|
def viewAnswer():
|
|
answer_id = request.args.get('answer_id', '')
|
|
if not answer_id:
|
|
abort(400, "Missing 'answer_id' attribute or 'answer_id' is empty")
|
|
|
|
conn = func.connectToDb()
|
|
cursor = conn.cursor(dictionary=True)
|
|
cursor.execute("SELECT id, question_id, creation_date, content FROM answers WHERE id=%s", (answer_id,))
|
|
answer = cursor.fetchone()
|
|
cursor.close()
|
|
conn.close()
|
|
return jsonify(answer)
|
|
"""
|
|
|
|
@api_bp.route('/update_config/', methods=['POST'])
|
|
@loginRequired
|
|
def updateConfig():
|
|
cfg = func.loadJSON(const.configFile)
|
|
configuration = request.form
|
|
|
|
for key, value in configuration.items():
|
|
cleaned_key = key.removeprefix('_') if key.startswith('_') else key
|
|
nested_keys = cleaned_key.split('.')
|
|
current_dict = cfg
|
|
|
|
for nested_key in nested_keys[:-1]:
|
|
current_dict = current_dict.setdefault(nested_key, {})
|
|
|
|
# Convert the checkbox value 'True'/'False' strings to actual booleans
|
|
if value.lower() == 'true':
|
|
value = True
|
|
elif value.lower() == 'false':
|
|
value = False
|
|
|
|
current_dict[nested_keys[-1]] = value
|
|
|
|
func.saveJSON(cfg, const.configFile)
|
|
app.config.update(cfg)
|
|
return {'message': 'Changes saved!'}
|
|
|
|
|
|
app.register_blueprint(api_bp, url_prefix='/api/v1')
|
|
app.register_blueprint(admin_bp, url_prefix='/admin')
|
|
|
|
if __name__ == '__main__':
|
|
app.run(debug=True)
|