# catask/app.py
# 2025-03-28 19:51:57 +03:00
#
# 1015 lines
# 37 KiB
# Python
# Raw Permalink Blame History
#
# This file contains invisible Unicode characters
#
# This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from contextlib import closing
from functools import wraps
from html import escape as html_escape
from pathlib import Path
import io
import json
import os
import secrets
import shutil
import tarfile
import threading
import unicodedata
import urllib
import urllib.parse
import zipfile

from dotenv import load_dotenv
from flask import Flask, Blueprint, jsonify, request, abort, render_template, flash, session, redirect, send_file, url_for
from flask_babel import Babel, _, refresh
from flask_compress import Compress
from werkzeug.utils import secure_filename
import psycopg
import requests

import constants as const
import functions as func
# used for admin routes
logged_in = False
load_dotenv()
app = Flask(const.appName)
app.secret_key = os.environ.get("APP_SECRET")
cfg = func.loadJSON(const.configFile)
app.config.from_mapping(cfg)
app.config.update(cfg)
# compress to improve page load speed
Compress(app)
app.config['BABEL_DEFAULT_LOCALE'] = 'en'
app.config['BABEL_TRANSLATION_DIRECTORIES'] = 'locales'
# refreshing locale
refresh()
# update this once more languages are supported
app.config['available_languages'] = {
"en_US": _("English (US)"),
"ru_RU": _("Russian")
}
def getLocale():
if not session.get('language'):
app.config.update(cfg)
session['language'] = cfg['languages']['default']
return session.get('language')
babel = Babel(app, locale_selector=getLocale)
# -- blueprints --
api_bp = Blueprint('api', const.appName)
admin_bp = Blueprint('admin', const.appName)
# -- cli commands --
@app.cli.command("init-db")
def initDatabase():
dbName = os.environ.get("DB_NAME")
print("Attempting to connect to database {dbName}...")
try:
conn = func.connectToDb()
cursor = conn.cursor()
print("Connected successfully")
conn.database = dbName
except psycopg.Error as error:
app.logger.error("Database error:", error)
# if error.errno == errorcode.ER_ACCESS_DENIED_ERROR:
# print("Bad credentials")
# elif error.errno == errorcode.ER_BAD_DB_ERROR:
# dbPort = os.environ.get("DB_PORT")
# if not dbPort:
# dbPort = 3306
#
# conn = func.connectToDb()
# cursor = conn.cursor()
# func.createDatabase(cursor, dbName)
# conn.database = dbName
# else:
# print("Error:", error)
# return
with open('schema.sql', 'r') as schema_file:
schema = schema_file.read()
try:
cursor.execute(schema)
conn.commit()
print(f"Database {dbName} was successfully initialized!")
except psycopg.Error as error:
print(f"Failed to initialize database {dbName}: {error}")
finally:
cursor.close()
conn.close()
# -- decorators --
def loginRequired(f):
@wraps(f)
def login_required(*args, **kwargs):
if not logged_in:
return abort(404)
return f(*args, **kwargs)
return login_required
# -- before request --
@app.before_request
def before_request():
# app.logger.debug("[CatAsk] app.before_request triggered")
global logged_in
logged_in = session.get('logged_in')
# -- context processors --
@app.context_processor
def inject_stuff():
app.logger.debug("[CatAsk] app.context_processor inject_stuff() triggered")
cfg = func.loadJSON(const.configFile)
# for 1.6.0 or later
unreadQuestionCount = int(getQuestionCount(False, True))
totalQuestionCount = int(getQuestionCount())
return dict(metadata=func.generateMetadata(), len=len, str=str, const=const, cfg=cfg, logged_in=logged_in, version_id=const.version_id, version=const.version, appName=const.appName, unreadQuestionCount=unreadQuestionCount, totalQuestionCount=totalQuestionCount)
# -- template filters --
# {text} | render_markdown
@app.template_filter('render_markdown')
def render_markdown(text):
if text.startswith("![") and not (text.startswith(':') and text.endswith(':')):
return text
else:
# app.logger.debug("[CatAsk] app.template_filter render_markdown(text) triggered")
return func.renderMarkdown(text)
# render_markdown({text}, fromWho=False|True)
@app.template_global('render_markdown')
def render_markdown(text, fromWho=False):
if (text.startswith("![") or "![" in text) and not (text.startswith(':') and text.endswith(':')):
# ensuring that only inline images get escaped, not emojis
escaped = text.replace("![", "!\[")
return func.renderMarkdown(escaped)
else:
# don't want people inserting buttons and other stuff into name field
allowed_tags = ["p", "img"] if fromWho else None
return func.renderMarkdown(text, allowed_tags)
# -- error handlers --
@app.errorhandler(404)
def notFound(e):
return render_template('errors/404.html'), 404
@app.errorhandler(500)
def internalServerError(e):
return render_template('errors/500.html'), 500
@app.errorhandler(502)
def badGateway(e):
return render_template('errors/502.html'), 502
# -- client (frontend) routes --
@app.route('/', methods=['GET'])
def index():
per_page = 25
offset = 0
page = 1
func_val = func.getAllQuestions(limit=per_page, offset=offset)
combined = func_val[0]
metadata = func_val[1]
emojis = func.listEmojis()
packs = func.listEmojiPacks()
return render_template(
'index.html',
combined=combined,
urllib=urllib,
trimContent=func.trimContent,
metadata=metadata,
getRandomWord=func.getRandomWord,
formatRelativeTime=func.formatRelativeTime,
emojis=emojis,
packs=packs,
page=page,
per_page=per_page
)
@app.route('/inbox/', methods=['GET'])
@loginRequired
def inbox():
conn = func.connectToDb()
cursor = conn.cursor()
app.logger.debug("[CatAsk/Inbox] SELECT'ing unanswered questions")
cursor.execute("SELECT * FROM questions WHERE answered=%s ORDER BY creation_date DESC", (False,))
questions = cursor.fetchall()
# postgres shenanigans
for question in questions:
question['creation_date'] = question['creation_date'].replace(microsecond=0).replace(tzinfo=None)
cursor.execute("UPDATE questions SET unread=%s WHERE id=%s", (False, question['id']))
conn.commit()
cursor.close()
conn.close()
return render_template('inbox.html', questions=questions, formatRelativeTime=func.formatRelativeTime)
@app.route('/q/<int:question_id>/', methods=['GET'])
def viewQuestion(question_id):
question = func.getQuestion(question_id)
answer = func.getAnswer(question_id)
if not question or not answer:
return abort(404)
else:
metadata = func.generateMetadata(question, answer)
return render_template('view_question.html', question=question, urllib=urllib, answer=answer, metadata=metadata, formatRelativeTime=func.formatRelativeTime, trimContent=func.trimContent)
# TODO: implement this and private questions should be here too
"""
@app.route('/questions/', methods=['GET'])
def seeAskedQuestions():
conn = func.connectToDb()
cursor = conn.cursor()
cursor.execute("SELECT * FROM questions WHERE asker_id=%s ORDER BY creation_date DESC", (asker_id,))
questions = cursor.fetchall()
cursor.close()
conn.close()
return render_template('asked_questions.html', questions=questions, formatRelativeTime=func.formatRelativeTime)
"""
# -- admin client routes --
@admin_bp.route('/login/', methods=['GET', 'POST'])
def login():
global logged_in
if request.method == 'POST':
admin_password = request.form.get('admin_password')
if admin_password == os.environ.get('ADMIN_PASSWORD'):
session['logged_in'] = True
session.permanent = request.form.get('remember_me', False)
return redirect(url_for('index'))
else:
flash(_("Wrong password"), 'danger')
return redirect(url_for('admin.login'))
else:
if logged_in:
return redirect('index')
else:
return render_template('admin/login.html')
@admin_bp.route('/logout/', methods=['POST'])
@loginRequired
def logout():
session['logged_in'] = False
return redirect(url_for('index'))
@admin_bp.route('/', methods=['GET', 'POST'])
@loginRequired
def index():
return redirect(url_for('admin.information'))
@admin_bp.route('/information/', methods=['GET', 'POST'])
@loginRequired
def information():
return render_template('admin/categories/instance.html')
@admin_bp.route('/accessibility/', methods=['GET', 'POST'])
@loginRequired
def accessibility():
return render_template('admin/categories/accessibility.html')
@admin_bp.route('/crosspost/', methods=['GET', 'POST'])
@loginRequired
def crosspost():
return render_template('admin/categories/crosspost.html')
@admin_bp.route('/languages/', methods=['GET', 'POST'])
@loginRequired
def languages():
return render_template('admin/categories/languages.html')
@admin_bp.route('/notifications/', methods=['GET', 'POST'])
@loginRequired
def notifications():
return render_template('admin/categories/notifications.html')
@admin_bp.route('/general/', methods=['GET', 'POST'])
@loginRequired
def general():
return render_template('admin/categories/general.html')
@admin_bp.route('/customize/', methods=['GET', 'POST'])
@loginRequired
def customize():
return render_template('admin/categories/customize.html')
@admin_bp.route('/antispam/', methods=['GET', 'POST'])
@loginRequired
def antispam():
return render_template('admin/categories/antispam.html')
@admin_bp.route('/emojis/', methods=['GET', 'POST'])
@loginRequired
def emojis():
if not os.path.exists(const.emojiPath):
os.makedirs(const.emojiPath)
packs = func.listEmojiPacks()
if packs:
for pack in packs:
json_pack = bool(pack.get('website') and pack.get('exportedAt'))
if json_pack:
break
else:
json_pack = False
emojis = func.listEmojis()
return render_template('admin/categories/emojis.html', json_pack=json_pack, packs=packs, emojis=emojis, formatRelativeTime=func.formatRelativeTime2)
@admin_bp.route('/blacklist/', methods=['GET', 'POST'])
@loginRequired
def blacklist():
if request.method == 'POST':
action = request.form.get('action')
blacklist = request.form.get('blacklist')
with open(const.blacklistFile, 'w') as file:
file.write(blacklist)
return {'message': _("Blacklist updated!")}, 200
else:
blacklist = func.readPlainFile(const.blacklistFile)
if blacklist == []:
blacklist = ''
return render_template('admin/categories/blacklist.html', blacklist=blacklist)
# TODO: implement deleting exports
@admin_bp.route('/import-export/', methods=['GET'])
@loginRequired
def importExport():
if os.path.exists(const.exportsFile):
exports = func.loadJSON(const.exportsFile)
else:
exports = None
return render_template('admin/categories/import.html', exports=exports)
# TODO: implement first-launch setup route
"""
@admin_bp.route('/post-install/', methods=['GET', 'POST'])
@loginRequired
def postInstall():
if config.postInstallCompleted:
return abort(404)
else:
pass
"""
# -- server routes --
@api_bp.errorhandler(404)
def notFound(e):
return jsonify({'error': 'Not found'}), 404
@api_bp.errorhandler(400)
def badRequest(e):
return jsonify({'error': str(e)}), 400
@api_bp.errorhandler(500)
def internalServerError(e):
return jsonify({'error': str(e)}), 500
# why should i have a manifest.json file when i can just make it a route
@api_bp.route("/manifest.json", methods=['GET'])
def pwaManifest():
# not sure about theme_color but whateva
return jsonify({
"short_name": const.appName,
"name": cfg['instance']['title'],
"icons": [
{
"src": url_for("static", filename="icons/favicon/android-chrome-192x192.png"),
"sizes": "192x192",
"type": "image/png"
},
{
"src": url_for("static", filename="icons/favicon/android-chrome-512x512.png"),
"sizes": "512x512",
"type": "image/png"
}
],
"start_url": "/",
"display": "standalone",
"theme_color": cfg['style']['accentLight'],
"background_color": ""
})
@api_bp.route('/ts/download/<author>/<theme>/', methods=['GET'])
@loginRequired
def downloadTheme(author, theme):
return send_file(
requests.get(f"{ cfg['themeStoreUrl'] }/api/v1/dl/{author}/{theme}/"),
download_name=f"{ theme }.css")
@api_bp.route('/ts/card_row/', methods=['GET'])
@loginRequired
def themeStore_themes():
cfg = func.loadJSON(const.configFile)
url = f"{ cfg['themeStoreUrl'] }/api/v1/themes/"
themes = requests.get(url).json()
cards = ""
i = 0
for theme in themes:
i += 1
cards += f"""
<div class="col-md-6 col-lg-4 col-xl-3 mb-4">
<div class="card">
<div class="card-header p-0 preview-img">
<button type="button" class="btn border-0 p-0" data-bs-toggle="modal" data-bs-target="#theme-modal-{i}" data-dontshowtoast>
<img src="{ cfg['themeStoreUrl'] }{ theme['preview_url'] }" class="card-img-top" alt="Preview of { theme['name'] }" style="min-height: 160px; object-fit: cover;">
</button>
</div>
<div class="card-body">
<p class="card-text text-body-secondary mb-1 small">{ _("Updated") } { func.formatRelativeTime(theme['updated_at']) }</p>
<h5 class="card-title m-0"><button type="button" class="btn p-0 fs-5" data-bs-toggle="modal" data-bs-target="#theme-modal-{i}" data-dontshowtoast>{ theme['name'] }</button></h5>
<p class="card-text text-body-secondary">
{ _("by") }
<a href="{ theme['author_url'] or 'javascript:;' }" class="link-offset-2 text-body text-decoration-none underline-hover">{ theme['author'] }</a>
</p>
</div>
</div>
</div>
"""
html = f"""
<div class="row">
{cards}
</div>
"""
return html
@api_bp.route('/ts/modals/', methods=['GET'])
@loginRequired
def themeStore_themeModals():
cfg = func.loadJSON(const.configFile)
url = f"{cfg['themeStoreUrl']}/api/v1/themes/"
themes = requests.get(url).json()
i = 0
modals = ""
for theme in themes:
i += 1
theme_name = theme['name']
singleton = f"data-bs-toggle='modal' data-bs-target='#theme-modal-confirm-{i}'" if cfg['style']['customCss'] != "" else f"""data-bs-dismiss='modal' onclick='useTheme({i}, "{ theme_name }")'"""
modals += f"""
<div class="modal fade" id="theme-modal-{i}" tabindex="-1" aria-labelledby="ts-modal-label" aria-hidden="true">
<div class="modal-dialog modal-fullscreen-md-down modal-xl modal-dialog-centered">
<div class="modal-content">
<div class="modal-header border-0 d-flex align-items-center ps-2 ms-1">
<h1 class="modal-title fs-5 fw-normal d-flex align-items-center" id="q-modal-label">
<button type="button" class="btn btn-basic fs-5 me-1 px-2 py-1" data-bs-target="#theme-store-modal" data-bs-toggle="modal">
<i class="bi bi-chevron-left"></i>
</button>
<span>{ theme['name'] } <span class="text-body-secondary">{ _("by") } { theme['author'] }</span></span>
</h1>
<button type="button" class="btn-close d-flex align-items-center fs-5" data-bs-dismiss="modal" aria-label="Close"><i class="bi bi-x-lg"></i></button>
</div>
<div class="modal-body py-0">
<section id="preview">
<img src="{ cfg['themeStoreUrl'] }{ theme['preview_url'] }" class="img-fluid rounded mx-auto d-block" alt="Screenshot of { theme['name'] } by { theme['author'] }">
</section>
<section id="header">
<div class="d-md-flex justify-content-between align-items-center mb-4 mt-3">
<div class="d-flex">
<span class="border py-1 px-2 rounded-start user-select-all overflow-hidden text-nowrap ts-share">{ cfg['themeStoreUrl'] }/themes/{ theme['escaped_name'] }/</span>
<button class="btn btn-secondary rounded-start-0" type="button" onclick="copyFull(`{ cfg['themeStoreUrl'] }/themes/{ theme['escaped_name'] }/`)" style="min-width: 6em;"><i class="bi bi-copy me-1"></i> { _("Copy") }</button>
</div>
<br class="d-md-none">
<div class="d-flex flex-column flex-md-row align-items-md-center gap-2">
<button type="button" class="btn btn-primary" {singleton}><i class="bi bi-palette me-1"></i> { _("Use") }</button>
{ f"""<a href="{ theme['homepage'] }/" class="btn btn-secondary">
<i class="bi bi-house-door"></i>
<span class="d-inline d-sm-none ms-1">{ _("Homepage") }</span>
</a>""" if theme['homepage'] else "" }
<a href="{ cfg['themeStoreUrl'] }/dl/{ theme['author'] }/{ theme['name'] }/" class="btn btn-secondary">
<i class="bi bi-download"></i>
<span class="d-inline d-sm-none ms-1">{ _("Download") }</span>
</a>
<a href="{ cfg['themeStoreUrl'] }/themes/{ theme['escaped_name'] }/" class="btn btn-secondary" target="_blank">
<i class="bi bi-box-arrow-up-right"></i>
<span class="d-inline d-sm-none ms-1">{ _("Go to Store page") }</span>
</a>
</div>
</div>
</section>
<section id="details" class="mb-4">
<h2>{ _("Details") }</h2>
<p class="mb-2">
<span class="tab">{ _("Author") }</span>
<a href="{ theme['author_url'] }" target="_blank" class="text-decoration-none underline-hover">{ theme['author'] }</a>
</p>
<p class="mb-2">
<span class="tab">{ _("Size") }</span>
{ theme['size'] }
</p>
<p class="mb-2">
<span class="tab">{ _("Created") }</span>
{ func.formatRelativeTime(theme['created_at']) }
</p>
<p class="mb-2">
<span class="tab">{ _("Updated") }</span>
{ func.formatRelativeTime(theme['updated_at']) }
</p>
</section>
<section id="source-code">
<h2>{ _("Source code") }</h2>
<pre class="border p-2 rounded"><code style="tab-size: 4;" id="theme-source-code-{i}">{ theme['file_contents'] }</code></pre>
</section>
</div>
</div>
</div>
</div>
<div class="modal fade" id="theme-modal-confirm-{i}" tabindex="-1" aria-labelledby="ts-modal-label" aria-hidden="true">
<div class="modal-dialog modal-dialog-centered">
<div class="modal-content">
<div class="modal-header border-0">
<h1 class="modal-title fs-5 fw-normal" id="q-modal-label">{ _('Confirmation') }</h1>
<button type="button" class="btn-close d-flex align-items-center fs-5" data-bs-toggle='modal' data-bs-target='#theme-modal-{i}' aria-label="Close"><i class="bi bi-x-lg"></i></button>
</div>
<div class="modal-body py-0">
<p>{ _('This action will overwrite your existing custom CSS, are you sure you want to apply this theme?') }</p>
</div>
<div class="modal-footer pt-1 flex-row align-items-stretch w-100 border-0">
<button type="button" class="btn btn-outline-secondary flex-fill flex-lg-grow-0" data-bs-toggle='modal' data-bs-target='#theme-modal-{i}'>{ _('Cancel') }</button>
<button type="button" class="btn btn-danger flex-fill flex-lg-grow-0" data-bs-dismiss="modal" onclick="useTheme({ i }, '{ theme_name }')">{ _('Confirm') }</button>
</div>
</div>
</div>
</div>
"""
return modals
@api_bp.route("/change_language/", methods=['POST'])
def changeLanguage():
if cfg['languages']['allowChanging']:
lang = request.form.get('lang')
if lang not in app.config['available_languages'].keys():
# fallback to en_US on malformed request
session['language'] == 'en_US'
flash("400 Bad Request: The browser (or proxy) sent a request that this server could not understand.", "danger")
return redirect(request.referrer)
session['language'] = lang
return redirect(request.referrer)
else:
return abort(404)
# wip, scheduled for 1.8.0 release
# @api_bp.route('/hyper/widget/', methods=['GET'])
# def widget():
# func_val = func.getAllQuestions()
# combined = func_val[0]
# metadata = func_val[1]
# return render_template('widget.html', combined=combined, urllib=urllib, trimContent=func.trimContent, metadata=metadata, getRandomWord=func.getRandomWord, formatRelativeTime=func.formatRelativeTime)
@api_bp.route('/hyper/load_more_questions/', methods=['GET'])
def load_more_questions():
page = request.args.get('page', default=1, type=int)
per_page = 25
offset = (page - 1) * per_page
func_val = func.getAllQuestions(limit=per_page, offset=offset)
combined = func_val[0]
if not combined:
return ""
return render_template('snippets/layout/load_more_questions.html', combined=combined, page=page, per_page=per_page, formatRelativeTime=func.formatRelativeTime, trimContent=func.trimContent, urllib=urllib)
# -- question routes --
@api_bp.route('/add_question/', methods=['POST'])
def addQuestion():
try:
app.logger.debug("[CatAsk/API/add_question] started question flow")
from_who = request.form.get('from_who', cfg['anonName'])
question = request.form.get('question', '')
cw = request.form.get('cw', '')
question_normalized = unicodedata.normalize('NFKC', question).replace("", "").strip()
if not question or not question_normalized:
abort(400, _("Question field must not be empty"))
if len(question) > int(cfg['charLimit']) or len(from_who) > int(cfg['charLimit']):
abort(400, _("Question exceeds the character limit"))
return_val = func.addQuestion(from_who, question, cw)
app.logger.debug("[CatAsk/API/add_question] finished question flow")
return return_val[0]
finally:
if cfg['ntfy']['enabled']:
# cw, return_val, from_who, question
ntfy_thread = threading.Thread(target=func.ntfySend, name=f"ntfy thread for {return_val[2]}", args=(cw, return_val, from_who, question,))
ntfy_thread.start()
@api_bp.route('/delete_question/', methods=['DELETE'])
@loginRequired
def deleteQuestion():
question_id = request.args.get('question_id', '')
if not question_id:
abort(400, _("Missing 'question_id' attribute or 'question_id' is empty"))
conn = func.connectToDb()
cursor = conn.cursor()
app.logger.debug("[CatAsk/API/delete_question] DELETE'ing a question from database")
cursor.execute("DELETE FROM questions WHERE id=%s", (question_id,))
conn.commit()
cursor.close()
conn.close()
return {'message': _("Successfully deleted question.")}, 200
@api_bp.route('/return_to_inbox/', methods=['POST'])
@loginRequired
def returnToInbox():
question_id = request.args.get('question_id', '')
if not question_id:
abort(400, _("Missing 'question_id' attribute or 'question_id' is empty"))
conn = func.connectToDb()
cursor = conn.cursor()
app.logger.debug("[CatAsk/API/return_to_inbox] SELECT'ing a question from database")
cursor.execute("SELECT from_who, content, creation_date, cw FROM questions WHERE id=%s", (question_id,))
question = cursor.fetchone()
# question = {
# 'from_who': row[0],
# 'content': row[1],
# 'creation_date': row[2],
# 'cw': row[3]
# }
app.logger.debug("[CatAsk/API/return_to_inbox] DELETE'ing a question from database")
cursor.execute("DELETE FROM questions WHERE id=%s", (question_id,))
app.logger.debug("[CatAsk/API/return_to_inbox] INSERT'ing a question into database")
cursor.execute("INSERT INTO questions (from_who, content, creation_date, answered, cw) VALUES (%s, %s, %s, %s, %s)", (question["from_who"], question["content"], question["creation_date"], False, question['cw']))
conn.commit()
cursor.close()
conn.close()
return {'message': _("Successfully returned question to inbox.")}, 200
@api_bp.route('/pin_question/', methods=['POST'])
@loginRequired
def pinQuestion():
question_id = request.args.get('question_id', '')
if not question_id:
abort(400, _("Missing 'question_id' attribute or 'question_id' is empty"))
conn = func.connectToDb()
cursor = conn.cursor()
app.logger.debug("[CatAsk/API/pin_question] UPDATE'ing a question to pin it")
cursor.execute("UPDATE questions SET pinned=%s WHERE id=%s", (True, question_id))
conn.commit()
cursor.close()
conn.close()
return {'message': _("Successfully pinned question.")}, 200
@api_bp.route('/unpin_question/', methods=['POST'])
@loginRequired
def unpinQuestion():
question_id = request.args.get('question_id', '')
if not question_id:
abort(400, _("Missing 'question_id' attribute or 'question_id' is empty"))
conn = func.connectToDb()
cursor = conn.cursor()
app.logger.debug("[CatAsk/API/unpin_question] UPDATE'ing a question to unpin it")
cursor.execute("UPDATE questions SET pinned=%s WHERE id=%s", (False, question_id))
conn.commit()
cursor.close()
conn.close()
return {'message': _("Successfully unpinned question.")}, 200
@api_bp.route('/add_answer/', methods=['POST'])
@loginRequired
def addAnswer():
question_id = request.args.get('question_id', '')
answer = request.form.get('answer')
cw = request.form.get('cw', '')
if not question_id:
abort(400, _("Missing 'question_id' attribute or 'question_id' is empty"))
if not answer:
abort(400, _("Missing 'answer' attribute or 'answer' is empty"))
return_val = func.addAnswer(question_id, answer, cw)
if cfg['crosspost']['fediverse']['enabled']:
fedi_crosspost_thread = threading.Thread(target=func.postOnFediverse, name=f"fediverse crosspost thread for {question_id}", args=(question_id, answer, cw,))
fedi_crosspost_thread.start()
if cfg['crosspost']['bluesky']['enabled']:
bsky_crosspost_thread = threading.Thread(target=func.postOnBluesky, name=f"bluesky crosspost thread for {question_id}", args=(question_id, answer, cw,))
bsky_crosspost_thread.start()
return return_val
# -- uploaders --
@api_bp.route('/upload_favicon/', methods=['POST'])
@loginRequired
def uploadFavicon():
favicon = request.files.get('favicon')
if favicon and func.allowedFile(favicon.filename):
filename = secure_filename(favicon.filename)
favicon_path = const.faviconDir / filename
favicon.save(favicon_path)
func.generateFavicon(filename)
return {'message': _("Successfully updated favicon!")}, 201
elif favicon and not func.allowedFile(favicon.filename):
return {'error': _("File type is not supported")}, 400
elif not favicon:
return {'error': _("favicon is not specified")}, 400
@api_bp.route('/upload_emoji/', methods=['POST'])
@loginRequired
def uploadEmoji():
if 'emoji' not in request.files:
return jsonify({'error': _("No file part in the request")}), 400
emoji = request.files['emoji']
if emoji.filename == '':
return jsonify({'error': _("No file selected for uploading")}), 400
if not func.allowedFile(emoji.filename):
return jsonify({'error': _("Invalid file type. Only png, jpg, jpeg, webp, bmp, jxl, gif supported")}), 400
# Secure the filename and determine the archive name
filename = secure_filename(emoji.filename)
emoji_path = const.emojiPath / filename
emoji.save(emoji_path)
return jsonify({'message': _("Emoji {} successfully uploaded").format(filename)}), 201
@api_bp.route('/upload_emoji_pack/', methods=['POST'])
@loginRequired
def uploadEmojiPack():
if 'emoji_archive' not in request.files:
return jsonify({'error': _("No file part in the request")}), 400
emoji_archive = request.files['emoji_archive']
if emoji_archive.filename == '':
return jsonify({'error': _("No file selected for uploading")}), 400
if not func.allowedArchive(emoji_archive.filename):
return jsonify({'error': _("Invalid file type. Only .zip, .tar, .tar.gz, .tar.bz2 allowed")}), 400
filename = secure_filename(emoji_archive.filename)
archive_name = func.stripArchExtension(filename)
extract_path = const.emojiPath / archive_name
extract_path.mkdir(parents=True, exist_ok=True)
archive_path = extract_path / filename
emoji_archive.save(archive_path)
try:
if zipfile.is_zipfile(archive_path):
with zipfile.ZipFile(archive_path, 'r') as zip_ref:
zip_ref.extractall(extract_path)
elif tarfile.is_tarfile(archive_path):
with tarfile.open(archive_path, 'r:*') as tar_ref:
tar_ref.extractall(extract_path)
else:
return jsonify({'error': _("Unsupported archive format")}), 400
# parse meta.json if it exists
meta_json_path = extract_path / 'meta.json'
if meta_json_path.exists():
processed_emojis = func.processEmojis(meta_json_path)
return jsonify({'message': _('Successfully uploaded and processed {} emojis from archive "{}".').format(len(processed_emojis), filename)}), 201
else:
return jsonify({'message': _("Archive {} successfully uploaded and extracted.").format(filename)}), 201
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
archive_path.unlink()
# -- getters --
# this isn't used anywhere yet, but maybe it will in the future
@api_bp.route('/get_emojis/', methods=['GET'])
@loginRequired
def getEmojis():
return func.listEmojis()
@api_bp.route('/hyper/get_emoji_packs/', methods=['GET'])
@loginRequired
def getEmojiPacks():
index = int(request.args.get('index', ''))
packs = func.listEmojiPacks()
pack = packs[index]
rel_path = pack['relative_path']
emojis = pack['emojis']
html = ''
for emoji in emojis:
html += f"""
<div class="mb-2">
<img src="/{ rel_path }/{ emoji['file_name'] }" width="28" height="28" class="emoji" title=":{ emoji['name'] }:">
<span>{ emoji['name'] }</span>
</div>
"""
return html
# -- delete routes --
@api_bp.route('/delete_emoji/', methods=['DELETE'])
@loginRequired
def deleteEmoji():
emoji_name = request.args.get('emoji_name')
emoji_base_path = const.emojiPath
emoji_file_path = emoji_base_path / emoji_name
emoji_ext = os.path.splitext(f"{emoji_base_path}/{emoji_file_path}")
if not emoji_file_path.exists() or not emoji_file_path.is_file():
return jsonify({'error': _("Emoji not found")}), 404
try:
emoji_file_path.unlink()
return jsonify({'message': _('Emoji "{}" deleted successfully').format(emoji_name)}), 200
except Exception as e:
return jsonify({'error': str(e)}), 500
@api_bp.route('/delete_emoji_pack/', methods=['DELETE'])
@loginRequired
def deleteEmojiPack():
pack_name = request.args.get('pack_name')
emojis_path = const.emojiPath
emoji_base_path = Path.cwd() / 'static' / 'emojis' / pack_name.lower()
if not emoji_base_path.exists() or not emoji_base_path.is_dir():
return jsonify({'error': _('Emoji pack "{}" not found').format(pack_name)}), 404
try:
for json_file in emojis_path.glob(f'{pack_name.lower()}.json'):
json_file.unlink()
shutil.rmtree(emoji_base_path)
return jsonify({'message': _('Emoji pack "{}" deleted successfully.').format(pack_name)}), 200
except Exception as e:
return jsonify({'error': str(e)}), 500
# -- misc --
@api_bp.route('/update_blacklist/', methods=['PUT'])
@loginRequired
def updateBlacklist():
action = request.form.get('action')
blacklist = request.form.get('blacklist')
with open(const.blacklistFile, 'w') as file:
file.write(blacklist)
return {'message': _("Blacklist updated!")}, 200
@api_bp.route('/get_question_count/', methods=['GET'])
def getQuestionCount(answered: bool = None, unread: bool = None):
conn = func.connectToDb()
cursor = conn.cursor()
app.logger.debug("[CatAsk/API/get_question_count] SELECT'ing question count from database")
query = "SELECT COUNT(id) FROM questions"
if (answered != None) and not unread:
query += " WHERE answered=%s"
cursor.execute(query, (answered,))
elif (answered != None) and (unread != None):
query += " WHERE answered=%s AND unread=%s"
cursor.execute(query, (answered, unread))
elif (unread != None) and not answered:
query += " WHERE unread=%s"
cursor.execute(query, (unread,))
else:
cursor.execute(query)
question_count = cursor.fetchone()
cursor.close()
conn.close()
return str(question_count['count'])
# -- import/export --
@api_bp.route('/import_data/', methods=['PUT'])
def importData():
os.makedirs(const.tempDir, exist_ok=True)
archive = request.files['import_archive']
filename = secure_filename(archive.filename)
file_path = const.tempDir / filename
archive.save(file_path)
return func.importData(file_path)
@api_bp.route('/import_rs_data/', methods=['PUT'])
def importRsData():
os.makedirs(const.tempDir, exist_ok=True)
archive = request.files['import_archive']
filename = secure_filename(archive.filename)
file_path = const.tempDir / filename
archive.save(file_path)
return func.retrospringImport(file_path)
@api_bp.route('/create_export/', methods=['POST'])
@loginRequired
def createExport():
return func.createExport()
@api_bp.route('/delete_export/', methods=['DELETE'])
@loginRequired
def deleteExport():
timestamp = request.args.get('timestamp', '')
return func.deleteExport(timestamp)
# unused
@api_bp.route('/view_question/', methods=['GET'])
@loginRequired
def viewQuestion():
question_id = request.args.get('question_id', '')
if not question_id:
abort(400, _("Missing 'question_id' attribute or 'question_id' is empty"))
conn = func.connectToDb()
cursor = conn.cursor()
cursor.execute("SELECT id, from_who, creation_date, content, answered, answer_id FROM questions WHERE id=%s", (question_id,))
question = cursor.fetchone()
cursor.close()
conn.close()
return jsonify(question)
@api_bp.route('/view_answer/', methods=['GET'])
@loginRequired
def viewAnswer():
answer_id = request.args.get('answer_id', '')
if not answer_id:
abort(400, _("Missing 'answer_id' attribute or 'answer_id' is empty"))
conn = func.connectToDb()
cursor = conn.cursor()
cursor.execute("SELECT id, question_id, creation_date, content FROM answers WHERE id=%s", (answer_id,))
answer = cursor.fetchone()
cursor.close()
conn.close()
return jsonify(answer)
@api_bp.route('/update_config/', methods=['POST'])
@loginRequired
def updateConfig():
cfg = func.loadJSON(const.configFile)
configuration = request.form
for key, value in configuration.items():
cleaned_key = key.removeprefix('_') if key.startswith('_') else key
nested_keys = cleaned_key.split('.')
current_dict = cfg
for nested_key in nested_keys[:-1]:
current_dict = current_dict.setdefault(nested_key, {})
# Convert the checkbox value 'True'/'False' strings to actual booleans
if value.lower() == 'true':
value = True
elif value.lower() == 'false':
value = False
current_dict[nested_keys[-1]] = value
func.saveJSON(cfg, const.configFile)
app.config.update(cfg)
# loading the config file again to read new values
func.loadJSON(const.configFile)
return {'message': _("Settings saved!")}
app.register_blueprint(api_bp, url_prefix='/api/v1')
app.register_blueprint(admin_bp, url_prefix='/admin')
if __name__ == '__main__':
app.run(debug=True)