from flask import url_for, request, jsonify, Flask, abort, session
from flask_babel import Babel, _, refresh
from markupsafe import Markup
from bleach.sanitizer import Cleaner
from datetime import datetime, timezone
from pathlib import Path
from mistune import HTMLRenderer, escape
from PIL import Image
from psycopg.rows import dict_row

import traceback
import base64
import time
import zipfile
import shutil
import subprocess
import mistune
import humanize
import psycopg
import re
import os
import random
import json
import requests

import constants as const

app = Flask(const.appName)

app.config['BABEL_DEFAULT_LOCALE'] = 'en'
app.config['BABEL_TRANSLATION_DIRECTORIES'] = 'locales'
# refreshing locale
refresh()

# update this once more languages are supported
app.config['available_languages'] = {
    "en_US": _("English (US)"),
    "ru_RU": _("Russian")
}

def getLocale():
    # cfg is loaded at module scope below; this selector only runs
    # per-request, long after the module has finished importing
    if not session.get('language'):
        app.config.update(cfg)
        session['language'] = cfg['languages']['default']
    return session.get('language')

babel = Babel(app, locale_selector=getLocale)

# load json file
def loadJSON(file_path):
    # open the file
    path = Path.cwd() / file_path
    with open(path, 'r', encoding="utf-8") as file:
        # return loaded file
        return json.load(file)

# save json file
def saveJSON(data, file_path):
    # open the file
    path = Path.cwd() / file_path
    with open(path, 'w', encoding="utf-8") as file:
        # dump the contents
        json.dump(data, file, indent=4)

# append to a json file
def appendToJSON(new_data, file_path) -> bool:
    try:
        # open the file, creating it with an empty list if it doesn't exist yet
        path = Path(file_path)
        if not path.is_file():
            with open(path, 'w', encoding="utf-8") as file:
                json.dump([], file)

        with open(path, 'r+', encoding="utf-8") as file:
            file_data = json.load(file)
            file_data.append(new_data)
            file.seek(0)
            json.dump(file_data, file, indent=4)
        return True
    except Exception as e:
        app.logger.error(f"[appendToJSON] {str(e)}")
        return False
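
# Usage sketch (hypothetical file name):
#   appendToJSON({'timestamp': '2025-01-01'}, 'exports.json')
# creates the file as an empty JSON list if needed, then appends the record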

cfg = loadJSON(const.configFile)

def formatRelativeTime(date_str: str) -> str:
    date_format = "%Y-%m-%d %H:%M:%S"
    past_date = datetime.strptime(date_str, date_format).replace(tzinfo=None)

    now = datetime.now()
    time_difference = now - past_date

    return humanize.naturaltime(time_difference)
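
# e.g. formatRelativeTime("2025-01-01 00:00:00") returns something like
# "3 months ago" (the exact wording is up to humanize)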

def formatRelativeTime2(date_str: str) -> str:
    date_format = "%Y-%m-%dT%H:%M:%SZ"

    past_date = None
    try:
        if date_str:
            past_date = datetime.strptime(date_str, date_format)
    except ValueError:
        pass

    if past_date is None:
        return ''
        # raise ValueError("Date string does not match any supported format.")

    if past_date.tzinfo is None:
        past_date = past_date.replace(tzinfo=timezone.utc)

    now = datetime.now(timezone.utc)
    time_difference = now - past_date

    return humanize.naturaltime(time_difference)

dbHost = os.environ.get("DB_HOST")
dbUser = os.environ.get("DB_USER")
dbPass = os.environ.get("DB_PASS")
dbName = os.environ.get("DB_NAME")
dbPort = os.environ.get("DB_PORT")
if not dbPort:
    # default PostgreSQL port (the old 3306 default was a leftover from MySQL)
    dbPort = 5432

def createDatabase(cursor, dbName) -> None:
    try:
        # dbName/dbUser come from the environment, not from user input
        cursor.execute("CREATE DATABASE {} OWNER {}".format(dbName, dbUser))
        print(f"Database {dbName} created successfully")
    except psycopg.Error as error:
        print("Failed to create database:", error)
        exit(1)

def connectToDb():
    # using dict_row factory here because it's easier than modifying now-legacy mysql code;
    # dbPort was previously read from the environment but never used in the DSN
    return psycopg.connect(f"postgresql://{dbUser}:{dbPass}@{dbHost}:{dbPort}/{dbName}", row_factory=dict_row)
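
# With row_factory=dict_row, cursors return mappings instead of tuples, e.g.
#   conn.cursor().execute("SELECT 1 AS n").fetchone()  ->  {'n': 1}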

def getQuestion(question_id: int) -> dict:
    # connect outside the try block so the finally clause
    # never sees an unbound cursor if the connection fails
    conn = connectToDb()
    cursor = conn.cursor()
    try:
        cursor.execute("SELECT * FROM questions WHERE id=%s", (question_id,))
        question = cursor.fetchone()

        if not question:
            return abort(404)

        question['creation_date'] = question['creation_date'].replace(microsecond=0).replace(tzinfo=None)

        return question

    finally:
        cursor.close()
        conn.close()

def getAllQuestions(limit: int = None, offset: int = None) -> tuple:
    conn = connectToDb()
    cursor = conn.cursor()

    app.logger.debug("[CatAsk/functions/getAllQuestions] SELECT'ing all questions with latest answers")

    query = """
    SELECT q.*, a.creation_date AS latest_answer_date
    FROM questions q
    LEFT JOIN (
        SELECT question_id, MAX(creation_date) AS creation_date
        FROM answers
        GROUP BY question_id
    ) a ON q.id = a.question_id
    WHERE q.answered = %s
    ORDER BY q.pinned DESC, (a.creation_date IS NULL), a.creation_date DESC, q.creation_date DESC
    """

    params = [True]
    if limit is not None:
        query += " LIMIT %s"
        params.append(limit)
    if offset is not None:
        query += " OFFSET %s"
        params.append(offset)

    cursor.execute(query, tuple(params))
    questions = cursor.fetchall()

    app.logger.debug("[CatAsk/functions/getAllQuestions] SELECT'ing answers")
    cursor.execute("SELECT * FROM answers ORDER BY creation_date DESC")
    answers = cursor.fetchall()

    metadata = generateMetadata()

    # normalize answer timestamps once, instead of re-doing it for every question
    for answer in answers:
        answer['creation_date'] = answer['creation_date'].replace(microsecond=0).replace(tzinfo=None)

    combined = []
    for question in questions:
        question['creation_date'] = question['creation_date'].replace(microsecond=0).replace(tzinfo=None)
        question_answers = [answer for answer in answers if answer['question_id'] == question['id']]
        combined.append({
            'question': question,
            'answers': question_answers
        })

    cursor.close()
    conn.close()

    return combined, metadata
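
# combined has the shape [{'question': {...}, 'answers': [{...}, ...]}, ...];
# metadata is the site-level dict from generateMetadata()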

def addQuestion(from_who: str, question: str, cw: str, noAntispam: bool = False) -> tuple:

    if cfg['antispam']['type'] == 'basic':
        antispam = request.form.get('antispam', '')
    elif cfg['antispam']['type'] == 'recaptcha':
        antispam = request.form.get('g-recaptcha-response', '')
    elif cfg['antispam']['type'] == 'turnstile':
        antispam = request.form.get('cf-turnstile-response', '')
    elif cfg['antispam']['type'] == 'frc':
        antispam = request.form.get('frc-captcha-response', '')
    else:
        # unknown antispam type; keep the name bound so nothing below can raise NameError
        antispam = ''

    if cfg['antispam']['enabled'] and not noAntispam:

        if cfg['antispam']['type'] == 'basic':
            if not antispam:
                abort(400, "Anti-spam word must not be empty")

            antispam_wordlist = readPlainFile(const.antiSpamFile, split=True)
            antispam_valid = antispam in antispam_wordlist
            if not antispam_valid:
                # return a generic error message so bad actors wouldn't figure out the antispam list
                return {'error': _('An error has occurred')}, 500
        # it's probably bad to hardcode the siteverify urls, but meh, that will do for now
        elif cfg['antispam']['type'] == 'recaptcha':
            r = requests.post(
                'https://www.google.com/recaptcha/api/siteverify',
                data={'response': antispam, 'secret': cfg['antispam']['recaptcha']['secretkey']}
            )
            json_r = r.json()
            success = json_r['success']
            if not success:
                return {'error': _('An error has occurred')}, 500
        elif cfg['antispam']['type'] == 'turnstile':
            r = requests.post(
                'https://challenges.cloudflare.com/turnstile/v0/siteverify',
                data={'response': antispam, 'secret': cfg['antispam']['turnstile']['secretkey']}
            )
            json_r = r.json()
            success = json_r['success']
            if not success:
                return {'error': _('An error has occurred')}, 500
        elif cfg['antispam']['type'] == 'frc':
            url = 'https://global.frcapi.com/api/v2/captcha/siteverify'
            headers = {'X-API-Key': cfg['antispam']['frc']['apikey']}
            data = {'response': antispam, 'sitekey': cfg['antispam']['frc']['sitekey']}
            r = requests.post(url, data=data, headers=headers)
            json_r = r.json()
            success = json_r['success']
            if not success:
                return {'error': _('An error has occurred')}, 500

    blacklist = readPlainFile(const.blacklistFile, split=True)

    for bad_word in blacklist:
        if bad_word in question or bad_word in from_who:
            # return a generic error message so bad actors wouldn't figure out the blacklist
            return {'error': _('An error has occurred')}, 500

    conn = connectToDb()
    cursor = conn.cursor()

    app.logger.debug("[CatAsk/API/add_question] INSERT'ing new question into database")

    cursor.execute("INSERT INTO questions (from_who, content, answered, cw) VALUES (%s, %s, %s, %s) RETURNING id", (from_who, question, False, cw))
    question_id = cursor.fetchone()['id']
    conn.commit()
    cursor.close()
    conn.close()

    return {'message': _('Question asked successfully!')}, 201, question_id

def getAnswer(question_id: int) -> dict:
    conn = connectToDb()
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM answers WHERE question_id=%s", (question_id,))
    answer = cursor.fetchone()
    # guard against unanswered questions, mirroring getQuestion
    if not answer:
        cursor.close()
        conn.close()
        return abort(404)
    answer['creation_date'] = answer['creation_date'].replace(microsecond=0).replace(tzinfo=None)
    cursor.close()
    conn.close()

    return answer

def addAnswer(question_id: int, answer: str, cw: str) -> tuple:
    # connect before the try block so the finally clause never sees an unbound cursor
    conn = connectToDb()
    cursor = conn.cursor()
    try:
        app.logger.debug("[CatAsk/API/add_answer] INSERT'ing an answer into database")

        cursor.execute("INSERT INTO answers (question_id, content, cw) VALUES (%s, %s, %s) RETURNING id", (question_id, answer, cw))
        answer_id = cursor.fetchone()['id']

        app.logger.debug("[CatAsk/API/add_answer] UPDATE'ing question to set answered and answer_id")

        cursor.execute("UPDATE questions SET answered=%s, answer_id=%s WHERE id=%s", (True, answer_id, question_id))
        conn.commit()

    # except Exception as e:
    #     conn.rollback()
    #     app.logger.error(e)
    #     return jsonify({'error': str(e)}), 500
    finally:
        cursor.close()
        conn.close()

    return jsonify({'message': _('Answer added successfully!')}), 201

if cfg['crosspost']['fediverse']['enabled']:
    from mastodon import Mastodon

    app.logger.debug("Initializing Mastodon.py client...")

    fedi_client = Mastodon(
        client_id = cfg['crosspost']['fediverse']['client']['id'],
        client_secret = cfg['crosspost']['fediverse']['client']['secret'],
        api_base_url = 'https://' + cfg['crosspost']['fediverse']['instance'],
        access_token = cfg['crosspost']['fediverse']['token']
    )

if cfg['crosspost']['bluesky']['enabled']:
    from atproto import Client, client_utils

    app.logger.debug("[postOnBluesky] Initializing ATProto client...")

    bsky_client = Client()
    bsky_client.login(cfg['crosspost']['bluesky']['handle'], cfg['crosspost']['bluesky']['appPassword'])

def postOnFediverse(question_id: int, answer: str, cw: str) -> None:
    global fedi_client

    # reloading config file
    cfg = loadJSON(const.configFile)

    conn = connectToDb()
    cursor = conn.cursor()

    app.logger.debug(f"[postOnFediverse] Grabbing question {question_id} from database...")

    # cw has to be selected here too (as in postOnBluesky), otherwise the
    # question.get('cw') checks below can never match
    cursor.execute("SELECT id, content, cw FROM questions WHERE id=%s", (question_id,))
    question = cursor.fetchone()

    warning = f"{cfg['crosspost']['fediverse']['cw']}"

    if question.get('cw') and not cw:
        warning += f", {question['cw']}"

    elif question.get('cw') and cw and (question['cw'] != cw):
        warning += f", {question['cw']}, {cw}"

    elif (question.get('cw') and cw) and question['cw'] == cw:
        warning += f", {cw}"

    elif cw and not question.get('cw'):
        warning += f", {cw}"

    post_content = f"{question['content']} — {answer} {cfg['instance']['fullBaseUrl']}/q/{question['id']}/"
    fedi_client.status_post(post_content, visibility=cfg['crosspost']['fediverse']['visibility'], spoiler_text=warning)

    app.logger.debug(f"[postOnFediverse] Made Fediverse post: {post_content}")

def postOnBluesky(question_id: int, answer: str, cw: str) -> None:
    global bsky_client

    # reloading config file
    cfg = loadJSON(const.configFile)

    conn = connectToDb()
    cursor = conn.cursor()

    app.logger.debug(f"[postOnBluesky] Grabbing question {question_id} from database...")

    cursor.execute("SELECT id, content, cw FROM questions WHERE id=%s", (question_id,))
    question = cursor.fetchone()

    if question.get('cw') and not cw:
        warning = f"[CW: {question['cw']}]\n\n"

    elif question.get('cw') and cw and (question['cw'] != cw):
        warning = f"[CW: {question['cw']}, {cw}]\n\n"

    # both CWs must be set before comparing them; a bare equality check also
    # matched two empty CWs and produced a stray "[CW: None]" prefix
    elif question.get('cw') and cw and question['cw'] == cw:
        warning = f"[CW: {cw}]\n\n"

    elif cw and not question.get('cw'):
        warning = f"[CW: {cw}]\n\n"

    else:
        warning = ""

    # warning = f"{ '[CW:' if cw or question.get('cw') else '' }{ ' ' + question['cw'] if question.get('cw') else '' }{ ', ' + cw if cw else '' }{']\n\n' if cw or question.get('cw') else '' }"

    text_builder = client_utils.TextBuilder()
    text_builder.text(f"{warning}{question['content']} — {answer} ")
    text_builder.link(f"{cfg['instance']['fullBaseUrl']}/q/{question['id']}/", f"{cfg['instance']['fullBaseUrl']}/q/{question['id']}/")
    bsky_client.send_post(text_builder)

    app.logger.debug(f"[postOnBluesky] Made Bluesky post: {text_builder.build_text()}")

def ntfySend(cw, return_val, from_who, question) -> None:
    app.logger.debug("[CatAsk/functions/ntfySend] started ntfy flow")
    ntfy_cw = f" [CW: {cw}]" if cw else ""
    ntfy_host = cfg['ntfy']['host']
    ntfy_topic = cfg['ntfy']['topic']
    question_id = return_val[2]
    # doesn't work otherwise
    from_who = from_who if from_who else cfg['anonName']

    headers = {
        "Title": f"New question from {from_who}{ntfy_cw}",
        "Actions": f"view, View question, {cfg['instance']['fullBaseUrl']}/inbox/#question-{question_id}",
        "Tags": "question"
    }

    # only add the Authorization header when credentials are configured
    if cfg['ntfy']['user'] and cfg['ntfy']['pass']:
        ntfy_user = cfg['ntfy']['user']
        ntfy_pass = cfg['ntfy']['pass']
        ascii_auth = f"{ntfy_user}:{ntfy_pass}".encode('ascii')
        b64_auth = base64.b64encode(ascii_auth)
        headers["Authorization"] = f"Basic {b64_auth.decode('ascii')}"

    r = requests.put(
        f"{ntfy_host}/{ntfy_topic}".encode('utf-8'),
        data=trimContent(question, int(cfg['trimContentAfter'])),
        headers=headers
    )
    app.logger.debug("[CatAsk/functions/ntfySend] finished ntfy flow")

def readPlainFile(file, split=False):
    if os.path.exists(file):
        with open(file, 'r', encoding="utf-8") as f:
            if split:
                return f.read().splitlines()
            else:
                return f.read()
    else:
        return []

def savePlainFile(file, contents) -> None:
    # write as UTF-8 so round-trips with readPlainFile don't depend on the platform default
    with open(file, 'w', encoding="utf-8") as f:
        f.write(contents)

def getRandomWord() -> str:
    items = readPlainFile(const.antiSpamFile, split=True)
    return random.choice(items)

def trimContent(var, trim) -> str:
    trim = int(trim)
    if trim > 0:
        # '>' rather than '>=', so a string of exactly `trim` characters
        # isn't given a pointless ellipsis
        trimmed = var[:trim] + '…' if len(var) > trim else var
        trimmed = trimmed.rstrip()
        return trimmed
    else:
        return var
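
# e.g. trimContent("hello world", 5) -> "hello…", trimContent("hi", 5) -> "hi",
# and trim=0 disables trimming entirely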

# mistune plugin
inlineBtnPattern = r'\[btn\](?P<button_text>.+?)\[/btn\]'

def parse_inline_button(inline, m, state):
    text = m.group("button_text")
    state.append_token({"type": "inline_button", "raw": text})
    return m.end()

def render_inline_button(renderer, text):
    return f"<button class='btn btn-secondary' type='button'>{text}</button>"

def button(md):
    md.inline.register('inline_button', inlineBtnPattern, parse_inline_button, before='link')
    if md.renderer and md.renderer.NAME == 'html':
        md.renderer.register('inline_button', render_inline_button)
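
# e.g. "[btn]Click me[/btn]" in question text renders to
# <button class='btn btn-secondary' type='button'>Click me</button>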

# Base directory where emoji packs are stored
EMOJI_BASE_PATH = Path.cwd() / 'static' / 'emojis'

emoji_cache = {}

def to_snake_case(name):
    name = re.sub(r'(.)([A-Z][a-z]+)', r'\1_\2', name)
    return re.sub(r'([a-z0-9])([A-Z])', r'\1_\2', name).lower()
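
# e.g. to_snake_case("BlobCat") -> "blob_cat"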

def find_emoji_path(emoji_name):
    # serve repeated lookups from the cache that the branches below populate
    if emoji_name in emoji_cache:
        return emoji_cache[emoji_name]

    if '_' in emoji_name:
        head, sep, tail = emoji_name.partition('_')
    else:
        head = to_snake_case(emoji_name).split('_')[0]
    if any(Path(EMOJI_BASE_PATH).glob(f'{head}.json')):
        for json_file in Path(EMOJI_BASE_PATH).glob('*.json'):
            app.logger.debug("[CatAsk/functions/find_emoji_path] Using JSON meta file")
            pack_data = loadJSON(json_file)
            emojis = pack_data.get('emojis', [])

            for emoji in emojis:
                if emoji['name'] == emoji_name:
                    rel_dir = json_file.stem
                    emoji_path = os.path.join('static/emojis', rel_dir, emoji['file_name'])
                    emoji_cache[emoji_name] = emoji_path
                    return emoji_path

    else:
        for address, dirs, files in os.walk(EMOJI_BASE_PATH):
            app.logger.debug("[CatAsk/functions/find_emoji_path] Falling back to scanning directories")
            for file in files:
                if os.path.splitext(file)[0] == emoji_name:  # Check if the filename matches the emoji_name
                    rel_dir = os.path.relpath(address, EMOJI_BASE_PATH)
                    emoji_path = os.path.join("static/emojis", rel_dir, file)  # Use the actual file name
                    emoji_cache[emoji_name] = emoji_path
                    return emoji_path

    return None

emojiPattern = r':(?P<emoji_name>[a-zA-Z0-9_]+):'

def parse_emoji(inline, m, state):
    emoji_name = m.group("emoji_name")
    state.append_token({"type": "emoji", "raw": emoji_name})
    return m.end()

def render_emoji(renderer, emoji_name):
    emoji_path = find_emoji_path(emoji_name)

    if emoji_path:
        absolute_emoji_path = url_for('static', filename=emoji_path.replace('static/', ''))
        return f"<img src='{absolute_emoji_path}' alt=':{emoji_name}:' title=':{emoji_name}:' class='emoji' loading='lazy' width='28' height='28' />"

    return f":{emoji_name}:"

def emoji(md):
    md.inline.register('emoji', emojiPattern, parse_emoji, before='link')

    if md.renderer and md.renderer.NAME == 'html':
        md.renderer.register('emoji', render_emoji)
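
# e.g. ":blobcat:" becomes an <img class='emoji' ...> tag when a matching file
# or pack entry exists, and is left as literal ":blobcat:" text otherwise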

def listEmojis() -> list:
    emojis = []
    emoji_base_path = Path.cwd() / 'static' / 'emojis'
    os.makedirs(emoji_base_path, exist_ok=True)

    # Iterate over files that are directly in the emoji base path (not in subdirectories)
    for file in emoji_base_path.iterdir():
        # Only include files, not directories
        if file.is_file() and file.suffix in {'.png', '.jpg', '.jpeg', '.webp', '.gif'}:
            # Get the relative path and name for the emoji
            relative_path = os.path.relpath(file, emoji_base_path)
            emojis.append({
                'name': file.stem,  # Get the file name without the extension
                'image': os.path.join('static/emojis', relative_path),  # Full relative path for image
                'relative_path': relative_path
            })

    return emojis

def listEmojiPacks() -> list:
    emoji_packs = []
    emoji_base_path = const.emojiPath

    # Iterate through all directories in the emoji base path
    for pack_dir in emoji_base_path.iterdir():
        if pack_dir.is_dir():
            relative_path = os.path.relpath(pack_dir, emoji_base_path)

            # Check if a <pack>.json meta file exists in the base directory
            # (pack_dir.name, not the full pack_dir path, names the meta file)
            meta_json_path = const.emojiPath / f"{pack_dir.name}.json"
            if meta_json_path.exists():
                app.logger.debug(f"[CatAsk/functions/listEmojiPacks] Using meta.json file ({meta_json_path})")
                # Load data from the meta.json file
                pack_data = loadJSON(meta_json_path)

                emoji_packs.append({
                    'name': pack_data.get('name', pack_dir.name),
                    'exportedAt': pack_data.get('exportedAt', 'Unknown'),
                    'preview_image': pack_data.get('preview_image', ''),
                    'website': pack_data.get('website', ''),
                    'relative_path': f'static/emojis/{relative_path}',
                    'emojis': pack_data.get('emojis', [])
                })
            else:
                app.logger.debug(f"[CatAsk/functions/listEmojiPacks] Falling back to directory scan ({pack_dir})")
                # If no meta.json is found, fall back to directory scan
                preview_image = None
                # Find the first image in the directory for preview
                for file in pack_dir.iterdir():
                    if file.suffix in {'.png', '.jpg', '.jpeg', '.webp', '.gif'}:
                        preview_image = os.path.join('static/emojis', relative_path, file.name)
                        break

                # Append pack info without meta.json
                emoji_packs.append({
                    'name': pack_dir.name,
                    'preview_image': preview_image,
                    'relative_path': f'static/emojis/{relative_path}'
                })

    return emoji_packs

def processEmojis(meta_json_path) -> list:
    emoji_metadata = loadJSON(meta_json_path)
    emojis = emoji_metadata.get('emojis', [])
    pack_name = emojis[0]['emoji']['category']
    exported_at = emoji_metadata.get('exportedAt', 'Unknown')
    website = emoji_metadata.get('host', '')
    preview_image = os.path.join('static/emojis', pack_name.lower(), emojis[0]['fileName'])
    relative_path = os.path.join('static/emojis', pack_name.lower())

    processed_emojis = []
    for emoji in emojis:
        emoji_info = {
            'name': emoji['emoji']['name'],
            'file_name': emoji['fileName'],
        }
        processed_emojis.append(emoji_info)
        app.logger.debug(f"[CatAsk/API/upload_emoji_pack] Processed emoji: {emoji_info['name']}\t(File: {emoji_info['file_name']})")

    # Create the pack info structure
    pack_info = {
        'name': pack_name,
        'exportedAt': exported_at,
        'preview_image': preview_image,
        'relative_path': relative_path,
        'website': website,
        'emojis': processed_emojis
    }

    # Save the combined pack info to <pack_name>.json
    pack_json_name = const.emojiPath / f"{pack_name.lower()}.json"
    saveJSON(pack_info, pack_json_name)

    return processed_emojis

def renderMarkdown(text: str, allowed_tags: list = None) -> str:
    plugins = [
        'strikethrough',
        button,
        emoji
    ]
    if not allowed_tags:
        allowed_tags = [
            'p',
            'em',
            'b',
            'strong',
            'i',
            'br',
            's',
            'del',
            'a',
            'button',
            'ol',
            'li',
            'hr',
            'img',
            'code',
            'pre'
        ]
    allowed_attrs = {
        'a': ['href'],
        'button': ['class'],
        'img': {
            'class': lambda value: value == "emoji",
            'src': True,  # Allow specific attributes on emoji images
            'alt': True,
            'title': True,
            'width': True,
            'height': True,
            'loading': True
        }
    }
    # hard_wrap=True means that newlines will be converted into <br> tags
    #
    # yes, markdown usually lets you make line breaks only
    # with 2 spaces or a <br> tag, but hard_wrap is enabled to keep
    # the sanity of whoever will use this software
    # (after all, not everyone knows markdown syntax)
    md = mistune.create_markdown(
        escape=True,
        plugins=plugins,
        hard_wrap=True
    )
    html = md(text)
    cleaner = Cleaner(tags=allowed_tags, attributes=allowed_attrs)
    clean_html = cleaner.clean(html)
    return Markup(clean_html)
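
# Rough sketch of the pipeline: renderMarkdown("**hi** :blobcat:") first renders
# markdown to HTML via mistune, then bleach strips any tag or attribute that
# isn't on the allowlists above before the result is marked safe for templates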

def generateMetadata(question: dict = None, answer: dict = None) -> dict:
    metadata = {
        'title': cfg['instance']['title'],
        'description': cfg['instance']['description'],
        'url': cfg['instance']['fullBaseUrl'],
        'image': cfg['instance']['image']
    }

    # if question is specified, generate metadata for that question
    if question and answer:
        metadata.update({
            'title': trimContent(question['content'], 150) + " | " + cfg['instance']['title'],
            'description': trimContent(answer['content'], 150),
            'url': cfg['instance']['fullBaseUrl'] + url_for('viewQuestion', question_id=question['id']),
            'image': cfg['instance']['image']
        })

    # return 'metadata' dictionary
    return metadata

allowedFileExtensions = {'png', 'jpg', 'jpeg', 'webp', 'bmp', 'jxl', 'gif'}
allowedArchiveExtensions = {'zip', 'tar', 'gz', 'bz2', 'xz'}

def allowedFile(filename: str) -> bool:
    return '.' in filename and filename.rsplit('.', 1)[1].lower() in allowedFileExtensions

def allowedArchive(filename: str) -> bool:
    return '.' in filename and filename.rsplit('.', 1)[1].lower() in allowedArchiveExtensions

def stripArchExtension(filename: str) -> str:
    if filename.endswith(('.tar.gz', '.tar.bz2', '.tar.xz')):
        filename = filename.rsplit('.', 2)[0]
    else:
        filename = filename.rsplit('.', 1)[0]
    return filename
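
# e.g. allowedFile("cat.PNG") -> True, allowedArchive("pack.tar.gz") -> True,
# stripArchExtension("pack.tar.gz") -> "pack", stripArchExtension("pack.zip") -> "pack"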

def generateFavicon(file_name: str) -> None:
    sizes = {
        'apple-touch-icon.png': (180, 180),
        'android-chrome-192x192.png': (192, 192),
        'android-chrome-512x512.png': (512, 512),
        'favicon-32x32.png': (32, 32),
        'favicon-16x16.png': (16, 16),
        'favicon.ico': (16, 16)
    }

    # make sure the favicon directory exists before reading from / writing to it
    if not os.path.exists(const.faviconDir):
        os.makedirs(const.faviconDir)

    img = Image.open(const.faviconDir / file_name)

    for filename, size in sizes.items():
        resized_img = img.resize(size)
        resized_img_absolute_path = const.faviconDir / filename
        resized_img.save(resized_img_absolute_path)

def createExport() -> dict:
    try:
        # just to test if connection works
        conn = connectToDb()
        conn.close()

        timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        timestamp_morereadable = datetime.now().strftime('%b %d, %Y %H:%M')
        export_dir = const.exportsDir
        temp_dir = const.tempDir
        os.makedirs(export_dir, exist_ok=True)
        os.makedirs(temp_dir, exist_ok=True)

        config_dest_path = temp_dir / const.configFile
        shutil.copy(const.configFile, config_dest_path)

        # Export database to SQL file
        dump_file = temp_dir / 'database.sql'
        result = subprocess.Popen(
            f'pg_dump -h {dbHost} -U {dbUser} -d {dbName} -F c -E UTF8 -f {dump_file}',
            stdin=subprocess.PIPE,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            encoding="utf-8",
            env=dict(os.environ, PGPASSWORD=dbPass)
        )

        # wait for pg_dump to finish instead of sleeping and hoping it's done,
        # otherwise the zip below can pick up a half-written dump
        result.communicate()

        # Create export zip archive
        zip_file_path = export_dir / f'export-{timestamp}.zip'
        with zipfile.ZipFile(zip_file_path, 'w') as export_zip:
            export_zip.write(config_dest_path, arcname=const.configFile)
            export_zip.write(dump_file, arcname='database.sql')

            # Add favicon and emojis folders to the zip archive
            favicon_dir = Path('static/icons/favicon')
            emojis_dir = Path('static/emojis')

            if favicon_dir.exists():
                for root, unused, files in os.walk(favicon_dir):
                    for file_ in files:
                        file_path = Path(root) / file_
                        export_zip.write(file_path, arcname=file_path.relative_to(favicon_dir.parent.parent))

            if emojis_dir.exists():
                for root, unused, files in os.walk(emojis_dir):
                    for file_ in files:
                        file_path = Path(root) / file_
                        export_zip.write(file_path, arcname=file_path.relative_to(emojis_dir.parent))

        # Record export metadata
        export_data = {
            'timestamp_esc': timestamp,
            'timestamp': timestamp_morereadable,
            'downloadPath': str(zip_file_path)
        }
        appendToJSON(export_data, const.exportsFile)
        shutil.rmtree(temp_dir)

        return {'message': _('Export created successfully!')}
    except psycopg.Error as e:
        app.logger.error(f"[psycopg.Error] {traceback.format_exc()}")
        return {'error': str(e)}, 500
    except Exception as e:
        app.logger.error(f"[Exception] {traceback.format_exc()}")
        return {'error': str(e)}, 500

def importData(export_file) -> dict:
    try:
        shutil.unpack_archive(export_file, const.tempDir)

        # Replace config file
        os.remove(const.configFile)
        shutil.move(const.tempDir / const.configFile, Path.cwd() / const.configFile)

        # Replace favicon and emojis folders
        favicon_dest = Path('static/icons/favicon')
        emojis_dest = Path('static/emojis')

        shutil.rmtree(favicon_dest)
        shutil.copytree(const.tempDir / 'icons' / 'favicon', favicon_dest)

        shutil.rmtree(emojis_dest)
        shutil.copytree(const.tempDir / 'emojis', emojis_dest)

        # Restore database from SQL file
        # (just to test if connection works; pg_restore itself runs out-of-process)
        conn = connectToDb()
        conn.close()

        dump_file = const.tempDir / 'database.sql'
        process = subprocess.Popen(
            f'pg_restore -h {dbHost} --clean -U {dbUser} -d {dbName} {dump_file}',
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            encoding="utf-8",
            env=dict(os.environ, PGPASSWORD=dbPass)
        )
        # wait for pg_restore to finish before deleting the temp dir it reads from
        process.communicate()
        shutil.rmtree(const.tempDir)

        return {'message': _('Data imported successfully!')}
    except Exception as e:
        return {'error': str(e)}, 500

# will probably get to it in 1.8.0 because my brain can't do it rn
# 2.1.0 maybe -1/12/25
"""
def retrospringImport(export_file):
    shutil.unpack_archive(export_file, const.tempDir)
    # probably a hack but whateva
    export_dirname = Path(export_file).stem
    export_dir = const.tempDir / export_dirname

    conn = connectToDb()
    cursor = conn.cursor()

    questions_file = loadJSON(export_dir / 'questions.json')
    answers_file = loadJSON(export_dir / 'answers.json')

    # Extract answers list
    questions_list = questions_file.get('questions', [])
    answers_list = answers_file.get('answers', [])
    # ['related']['question']['anonymous']

    for question in questions_list:
        # addQuestion(answer['related']['question']['anonymous'], question['content'], None, noAntispam=True)
        for answer in answers_list:
            print("anonymous:", answer['related']['question']['anonymous'])
            print(question['id'], answer['content'], None)
            # addAnswer(question['id'], answer['content'], None)

    # shutil.rmtree(const.tempDir)

    cursor.close()
    conn.close()
"""

def deleteExport(timestamp: str) -> dict:
    try:
        export_file = Path('static') / 'exports' / f'export-{timestamp}.zip'
        data = loadJSON(const.exportsFile)
        data = [export for export in data if export["timestamp_esc"] != timestamp]
        export_file.unlink()
        saveJSON(data, const.exportsFile)
        return {'message': _('Export {} deleted successfully.').format(timestamp)}
    except Exception as e:
        return {'error': str(e)}, 500

# reserved for 1.7.0 or later
"""
def getUserIp():
    if request.environ.get('HTTP_X_FORWARDED_FOR') is None:
        return request.environ['REMOTE_ADDR']
    else:
        return request.environ['HTTP_X_FORWARDED_FOR']

def isIpBlacklisted(user_ip):
    blacklist = readPlainFile(const.ipBlacklistFile, split=True)
    return user_ip in blacklist
"""