catask/functions.py
2025-03-28 21:11:31 +03:00

916 lines
32 KiB
Python

from flask import url_for, request, jsonify, Flask, abort, session
from flask_babel import Babel, _, refresh
from markupsafe import Markup
from bleach.sanitizer import Cleaner
from datetime import datetime, timezone
from pathlib import Path
from mistune import HTMLRenderer, escape
from PIL import Image
from psycopg.rows import dict_row
import traceback
import base64
import time
import zipfile
import shutil
import subprocess
import mistune
import humanize
import psycopg
import re
import os
import random
import json
import requests
import constants as const
# Flask application object shared by the helpers in this module.
app = Flask(const.appName)
# Flask-Babel settings: fallback locale and the directory holding translations.
app.config.update(
    BABEL_DEFAULT_LOCALE='en',
    BABEL_TRANSLATION_DIRECTORIES='locales',
)
# refreshing locale
refresh()
# update this once more languages are supported
# (maps a locale code to its translated display name)
app.config['available_languages'] = dict(
    en_US=_("English (US)"),
    ru_RU=_("Russian"),
)
def getLocale():
    """Return the language stored in the session.

    When the session has no language yet, the loaded configuration is merged
    into ``app.config`` and the session is seeded with the configured default
    language before it is returned.
    """
    language = session.get('language')
    if language:
        return language
    app.config.update(cfg)
    session['language'] = cfg['languages']['default']
    return session.get('language')


# Flask-Babel extension instance; getLocale is registered as its locale selector.
babel = Babel(app, locale_selector=getLocale)
def loadJSON(file_path):
    """Load and return the parsed contents of a JSON file.

    ``file_path`` is resolved against the current working directory
    (an absolute path is used unchanged). The file is read as UTF-8.
    """
    target = Path.cwd() / file_path
    with target.open('r', encoding="utf-8") as handle:
        return json.load(handle)
def saveJSON(dict, file_path):
    """Serialize ``dict`` as JSON (4-space indent) into ``file_path``.

    ``file_path`` is resolved against the current working directory and the
    file is overwritten using UTF-8.
    """
    destination = Path.cwd() / file_path
    with destination.open('w', encoding="utf-8") as handle:
        json.dump(dict, handle, indent=4)
def appendToJSON(new_data, file_path) -> bool:
    """Append ``new_data`` to the JSON array stored in ``file_path``.

    The file is created holding an empty array when it does not exist yet.

    Returns:
        True on success, False if anything went wrong (the error is logged).
    """
    try:
        path = Path(file_path)
        if not path.is_file():
            path.write_text("[]", encoding="utf-8")
        with path.open('r+', encoding="utf-8") as file:
            file_data = json.load(file)
            file_data.append(new_data)
            file.seek(0)
            json.dump(file_data, file, indent=4)
            # The rewrite can be shorter than the previous contents (e.g. the
            # file was formatted differently before); drop any leftover bytes
            # so the file stays valid JSON.
            file.truncate()
        return True
    except Exception as e:
        app.logger.error(f"[appendToJSON] {str(e)}")
        return False
# Site configuration, loaded once at import time from the JSON file named by const.configFile.
cfg = loadJSON(const.configFile)
def formatRelativeTime(date_str: str) -> str:
    """Return a humanized description of how long ago ``date_str`` was.

    ``date_str`` must look like ``YYYY-MM-DD HH:MM:SS``; it is compared as a
    naive datetime against the naive local ``datetime.now()``.

    Raises:
        ValueError: if ``date_str`` does not match the expected format.
    """
    parsed = datetime.strptime(date_str, "%Y-%m-%d %H:%M:%S")
    elapsed = datetime.now() - parsed.replace(tzinfo=None)
    return humanize.naturaltime(elapsed)
def formatRelativeTime2(date_str: str) -> str:
    """Humanize a ``YYYY-MM-DDTHH:MM:SSZ`` timestamp relative to the current UTC time.

    Returns an empty string when ``date_str`` is empty or does not match the
    expected format.
    """
    if not date_str:
        return ''
    try:
        parsed = datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%SZ")
    except ValueError:
        return ''
    # strptime yields a naive value for this format; treat it as UTC
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    elapsed = datetime.now(timezone.utc) - parsed
    return humanize.naturaltime(elapsed)
# Database connection settings, read from the process environment.
dbHost, dbUser, dbPass, dbName = (
    os.environ.get(key) for key in ("DB_HOST", "DB_USER", "DB_PASS", "DB_NAME")
)
# NOTE(review): 3306 is the MySQL default port, while the connection code in
# this file talks to PostgreSQL and does not pass dbPort at all -- confirm
# whether this fallback is still wanted.
dbPort = os.environ.get("DB_PORT") or 3306
def createDatabase(cursor, dbName) -> None:
    """Create database ``dbName`` owned by the configured ``dbUser``.

    Args:
        cursor: an open psycopg cursor used to run the statement.
        dbName: name of the database to create.

    Raises:
        SystemExit: with status 1 when the database cannot be created.
    """
    # local import: only this function needs the SQL composition helpers
    from psycopg import sql

    # Compose identifiers safely instead of formatting them into the SQL text.
    statement = sql.SQL("CREATE DATABASE {} OWNER {}").format(
        sql.Identifier(dbName),
        sql.Identifier(dbUser),
    )
    try:
        cursor.execute(statement)
        print(f"Database {dbName} created successfully")
    except psycopg.Error as error:
        print("Failed to create database:", error)
        # same effect as exit(1), without depending on the site-provided builtin
        raise SystemExit(1)
def connectToDb():
    """Open and return a new psycopg connection to the configured database.

    Rows are returned as dictionaries (``dict_row``).
    """
    # local import: only needed to build the connection URI
    from urllib.parse import quote

    # Percent-encode the URI components so credentials containing characters
    # such as '@', ':' or '/' do not corrupt the connection string.
    user = quote(dbUser or "", safe="")
    password = quote(dbPass or "", safe="")
    database = quote(dbName or "", safe="")
    # using dict_row factory here because its easier than modifying now-legacy mysql code
    return psycopg.connect(
        f"postgresql://{user}:{password}@{dbHost}/{database}",
        row_factory=dict_row,
    )
def getQuestion(question_id: int) -> dict:
    """Fetch a single question row by id.

    The row's ``creation_date`` is normalized to a naive datetime without
    microseconds. Aborts the request with HTTP 404 when no such question
    exists.
    """
    # Connect outside the try block so a failed connection surfaces its own
    # error instead of a NameError from the cleanup code.
    conn = connectToDb()
    try:
        with conn.cursor() as cursor:
            cursor.execute("SELECT * FROM questions WHERE id=%s", (question_id,))
            question = cursor.fetchone()
        if not question:
            return abort(404)
        question['creation_date'] = question['creation_date'].replace(microsecond=0).replace(tzinfo=None)
        return question
    finally:
        conn.close()
def getAllQuestions(limit: int = None, offset: int = None) -> tuple:
    """Return all answered questions together with their answers.

    Questions are ordered pinned-first, then by latest answer date, then by
    their own creation date.

    Args:
        limit: optional maximum number of questions to return.
        offset: optional number of questions to skip.

    Returns:
        A ``(combined, metadata)`` tuple where ``combined`` is a list of
        ``{'question': row, 'answers': [rows]}`` dictionaries and ``metadata``
        is the default page metadata from ``generateMetadata()``.
    """
    query = """
    SELECT q.*, a.creation_date AS latest_answer_date
    FROM questions q
    LEFT JOIN (
        SELECT question_id, MAX(creation_date) AS creation_date
        FROM answers
        GROUP BY question_id
    ) a ON q.id = a.question_id
    WHERE q.answered = %s
    ORDER BY q.pinned DESC, (a.creation_date IS NULL), a.creation_date DESC, q.creation_date DESC
    """
    params = [True]
    if limit is not None:
        query += " LIMIT %s"
        params.append(limit)
    if offset is not None:
        query += " OFFSET %s"
        params.append(offset)

    conn = connectToDb()
    try:
        with conn.cursor() as cursor:
            app.logger.debug("[CatAsk/functions/getAllQuestions] SELECT'ing all questions with latest answers")
            cursor.execute(query, tuple(params))
            questions = cursor.fetchall()
            app.logger.debug("[CatAsk/functions/getAllQuestions] SELECT'ing answers")
            cursor.execute("SELECT * FROM answers ORDER BY creation_date DESC")
            answers = cursor.fetchall()
    finally:
        # always release the connection, even if a query fails
        conn.close()

    metadata = generateMetadata()

    # Normalize every answer once and group by question, instead of rescanning
    # the whole answer list for each question.
    answers_by_question = {}
    for answer in answers:
        answer['creation_date'] = answer['creation_date'].replace(microsecond=0).replace(tzinfo=None)
        answers_by_question.setdefault(answer['question_id'], []).append(answer)

    combined = []
    for question in questions:
        question['creation_date'] = question['creation_date'].replace(microsecond=0).replace(tzinfo=None)
        combined.append({
            'question': question,
            'answers': answers_by_question.get(question['id'], [])
        })
    return combined, metadata
def _verifyRemoteCaptcha(antispam_type: str, token: str) -> bool:
    """Ask the captcha provider named by ``antispam_type`` whether ``token`` is valid."""
    provider_cfg = cfg['antispam'][antispam_type]
    headers = None
    # it's probably bad to hardcode the siteverify urls, but meh, that will do for now
    if antispam_type == 'recaptcha':
        url = 'https://www.google.com/recaptcha/api/siteverify'
        data = {'response': token, 'secret': provider_cfg['secretkey']}
    elif antispam_type == 'turnstile':
        url = 'https://challenges.cloudflare.com/turnstile/v0/siteverify'
        data = {'response': token, 'secret': provider_cfg['secretkey']}
    else:  # 'frc'
        url = 'https://global.frcapi.com/api/v2/captcha/siteverify'
        headers = {'X-API-Key': provider_cfg['apikey']}
        data = {'response': token, 'sitekey': provider_cfg['sitekey']}
    # a timeout keeps a stalled provider from blocking the request forever
    r = requests.post(url, data=data, headers=headers, timeout=10)
    return bool(r.json().get('success'))


def addQuestion(from_who: str, question: str, cw: str, noAntispam: bool = False) -> tuple:
    """Validate and store a new question.

    Args:
        from_who: name of the asker (may be empty).
        question: question text.
        cw: content warning for the question.
        noAntispam: skip the anti-spam check (the request form is then not
            read at all, so this can be used outside of a request).

    Returns:
        ``({'message': ...}, 201, question_id)`` on success, or
        ``({'error': ...}, 500)`` when the anti-spam or blacklist check fails.
        An empty basic anti-spam word aborts the request with HTTP 400.
    """
    antispam_cfg = cfg['antispam']
    if antispam_cfg['enabled'] and not noAntispam:
        # form field carrying the anti-spam response for each supported type
        form_fields = {
            'basic': 'antispam',
            'recaptcha': 'g-recaptcha-response',
            'turnstile': 'cf-turnstile-response',
            'frc': 'frc-captcha-response',
        }
        antispam_type = antispam_cfg['type']
        if antispam_type in form_fields:
            antispam = request.form.get(form_fields[antispam_type], '')
            if antispam_type == 'basic':
                if not antispam:
                    abort(400, "Anti-spam word must not be empty")
                antispam_wordlist = readPlainFile(const.antiSpamFile, split=True)
                if antispam not in antispam_wordlist:
                    # return a generic error message so bad actors wouldn't figure out the antispam list
                    return {'error': _('An error has occurred')}, 500
            elif not _verifyRemoteCaptcha(antispam_type, antispam):
                return {'error': _('An error has occurred')}, 500

    blacklist = readPlainFile(const.blacklistFile, split=True)
    author = from_who or ''
    for bad_word in blacklist:
        # a blank line would otherwise match (and reject) every question
        if not bad_word.strip():
            continue
        if bad_word in question or bad_word in author:
            # return a generic error message so bad actors wouldn't figure out the blacklist
            return {'error': _('An error has occurred')}, 500

    conn = connectToDb()
    try:
        with conn.cursor() as cursor:
            app.logger.debug("[CatAsk/API/add_question] INSERT'ing new question into database")
            cursor.execute(
                "INSERT INTO questions (from_who, content, answered, cw) VALUES (%s, %s, %s, %s) RETURNING id",
                (from_who, question, False, cw)
            )
            question_id = cursor.fetchone()['id']
        conn.commit()
    finally:
        conn.close()
    return {'message': _('Question asked successfully!')}, 201, question_id
def getAnswer(question_id: int) -> dict:
    """Fetch one answer row belonging to ``question_id``.

    Returns:
        The answer row with ``creation_date`` normalized to a naive datetime
        without microseconds, or ``None`` when the question has no answer.
    """
    conn = connectToDb()
    try:
        with conn.cursor() as cursor:
            cursor.execute("SELECT * FROM answers WHERE question_id=%s", (question_id,))
            answer = cursor.fetchone()
    finally:
        conn.close()
    if answer is None:
        # nothing to normalize; avoid a TypeError on questions without answers
        return None
    answer['creation_date'] = answer['creation_date'].replace(microsecond=0).replace(tzinfo=None)
    return answer
def addAnswer(question_id: int, answer: str, cw: str) -> tuple:
    """Store an answer and mark its question as answered.

    Both statements run in one transaction that is committed only when both
    succeed; on error the exception propagates and the connection is closed
    without committing.

    Returns:
        ``(jsonify({'message': ...}), 201)``.
    """
    conn = connectToDb()
    try:
        with conn.cursor() as cursor:
            app.logger.debug("[CatAsk/API/add_answer] INSERT'ing an answer into database")
            cursor.execute(
                "INSERT INTO answers (question_id, content, cw) VALUES (%s, %s, %s) RETURNING id",
                (question_id, answer, cw)
            )
            answer_id = cursor.fetchone()['id']
            app.logger.debug("[CatAsk/API/add_answer] UPDATE'ing question to set answered and answer_id")
            cursor.execute(
                "UPDATE questions SET answered=%s, answer_id=%s WHERE id=%s",
                (True, answer_id, question_id)
            )
        conn.commit()
    finally:
        conn.close()
    return jsonify({'message': _('Answer added successfully!')}), 201
# Crossposting clients are only imported and created when enabled in the config.
_fedi_cfg = cfg['crosspost']['fediverse']
if _fedi_cfg['enabled']:
    from mastodon import Mastodon

    app.logger.debug("Initializing Mastodon.py client...")
    fedi_client = Mastodon(
        client_id=_fedi_cfg['client']['id'],
        client_secret=_fedi_cfg['client']['secret'],
        api_base_url='https://' + _fedi_cfg['instance'],
        access_token=_fedi_cfg['token'],
    )

_bsky_cfg = cfg['crosspost']['bluesky']
if _bsky_cfg['enabled']:
    from atproto import Client, client_utils

    app.logger.debug("[postOnBluesky] Initializing ATProto client...")
    bsky_client = Client()
    bsky_client.login(_bsky_cfg['handle'], _bsky_cfg['appPassword'])
def postOnFediverse(question_id: int, answer: str, cw: str) -> None:
    """Crosspost an answered question to the Fediverse.

    The spoiler text is built from the configured crosspost CW, the
    question's CW and the answer's CW (``cw``), skipping empty values and
    duplicates.

    Args:
        question_id: id of the answered question.
        answer: answer text.
        cw: content warning of the answer, if any.
    """
    # reloading config file
    cfg = loadJSON(const.configFile)
    conn = connectToDb()
    try:
        with conn.cursor() as cursor:
            app.logger.debug(f"[postOnFediverse] Grabbing question {question_id} from database...")
            # cw must be selected too, otherwise the question's CW is never seen
            cursor.execute("SELECT id, content, cw FROM questions WHERE id=%s", (question_id,))
            question = cursor.fetchone()
    finally:
        conn.close()
    if question is None:
        app.logger.error(f"[postOnFediverse] Question {question_id} not found, nothing posted")
        return

    warnings = []
    for part in (cfg['crosspost']['fediverse']['cw'], question.get('cw'), cw):
        if part and part not in warnings:
            warnings.append(part)
    warning = ", ".join(warnings)

    # separate question and answer so they do not run together
    post_content = f"{question['content']} \u2014 {answer} {cfg['instance']['fullBaseUrl']}/q/{question['id']}/"
    fedi_client.status_post(post_content, visibility=cfg['crosspost']['fediverse']['visibility'], spoiler_text=warning)
    app.logger.debug(f"[postOnFediverse] Made Fediverse post: {post_content}")
def postOnBluesky(question_id: int, answer: str, cw: str) -> None:
    """Crosspost an answered question to Bluesky.

    A ``[CW: ...]`` prefix is added when the question and/or the answer has a
    content warning; empty values and duplicates are skipped.

    Args:
        question_id: id of the answered question.
        answer: answer text.
        cw: content warning of the answer, if any.
    """
    # reloading config file
    cfg = loadJSON(const.configFile)
    conn = connectToDb()
    try:
        with conn.cursor() as cursor:
            app.logger.debug(f"[postOnBluesky] Grabbing question {question_id} from database...")
            cursor.execute("SELECT id, content, cw FROM questions WHERE id=%s", (question_id,))
            question = cursor.fetchone()
    finally:
        conn.close()
    if question is None:
        app.logger.error(f"[postOnBluesky] Question {question_id} not found, nothing posted")
        return

    # Collect the distinct non-empty warnings; comparing the two values
    # directly would treat "both missing" as a warning named "None".
    warnings = []
    for part in (question.get('cw'), cw):
        if part and part not in warnings:
            warnings.append(part)
    warning = f"[CW: {', '.join(warnings)}]\n\n" if warnings else ""

    question_url = f"{cfg['instance']['fullBaseUrl']}/q/{question['id']}/"
    text_builder = client_utils.TextBuilder()
    # separate question and answer so they do not run together
    text_builder.text(f"{warning}{question['content']} \u2014 {answer} ")
    text_builder.link(question_url, question_url)
    bsky_client.send_post(text_builder)
    app.logger.debug(f"[postOnBluesky] Made Bluesky post: {text_builder.build_text()}")
def ntfySend(cw, return_val, from_who, question) -> None:
    """Send an ntfy notification about a newly asked question.

    Args:
        cw: content warning of the question, if any.
        return_val: the tuple returned by ``addQuestion``; index 2 is the new
            question id.
        from_who: name of the asker; the configured anonymous name is used
            when empty.
        question: question text (trimmed to ``cfg['trimContentAfter']``).
    """
    app.logger.debug("[CatAsk/functions/ntfySend] started ntfy flow")
    ntfy_cw = f" [CW: {cw}]" if cw else ""
    ntfy_host = cfg['ntfy']['host']
    ntfy_topic = cfg['ntfy']['topic']
    question_id = return_val[2]
    # doesn't work otherwise
    from_who = from_who if from_who else cfg['anonName']

    title = f"New question from {from_who}{ntfy_cw}"
    if not title.isascii():
        # HTTP header values cannot carry arbitrary Unicode; send the title
        # as an RFC 2047 encoded word instead of crashing on encoding.
        title = "=?UTF-8?B?" + base64.b64encode(title.encode('utf-8')).decode('ascii') + "?="

    headers = {
        "Title": title,
        "Actions": f"view, View question, {cfg['instance']['fullBaseUrl']}/inbox/#question-{question_id}",
        "Tags": "question"
    }
    ntfy_user = cfg['ntfy']['user']
    ntfy_pass = cfg['ntfy']['pass']
    if ntfy_user and ntfy_pass:
        credentials = base64.b64encode(f"{ntfy_user}:{ntfy_pass}".encode('utf-8'))
        headers["Authorization"] = f"Basic {credentials.decode('ascii')}"

    requests.put(
        f"{ntfy_host}/{ntfy_topic}",
        # encode explicitly so non-Latin-1 question text is sent as UTF-8
        data=trimContent(question, int(cfg['trimContentAfter'])).encode('utf-8'),
        headers=headers,
        timeout=10
    )
    app.logger.debug("[CatAsk/functions/ntfySend] finished ntfy flow")
def readPlainFile(file, split=False):
    """Read a UTF-8 text file.

    Args:
        file: path of the file to read.
        split: when true, return a list of lines (without line endings)
            instead of the raw text.

    Returns:
        The file contents as ``str``, or as a list of lines when ``split`` is
        true. A missing file yields an empty value of the matching type
        (``''`` or ``[]``).
    """
    try:
        with open(file, 'r', encoding="utf-8") as handle:
            contents = handle.read()
    except FileNotFoundError:
        return [] if split else ''
    return contents.splitlines() if split else contents
def savePlainFile(file, contents) -> None:
    """Overwrite the text file ``file`` with ``contents``.

    The file is written as UTF-8 so it round-trips with ``readPlainFile``,
    which reads UTF-8 regardless of the platform's default encoding.
    """
    with open(file, 'w', encoding="utf-8") as handle:
        handle.write(contents)
def getRandomWord() -> str:
    """Pick a random word from the anti-spam word list.

    Blank lines in the list are ignored. Returns an empty string (and logs a
    warning) when the list is missing or has no usable words.
    """
    items = [
        word for word in readPlainFile(const.antiSpamFile, split=True)
        if word.strip()
    ]
    if not items:
        app.logger.warning("[CatAsk/functions/getRandomWord] anti-spam word list is empty")
        return ''
    return random.choice(items)
def trimContent(var, trim) -> str:
    """Shorten ``var`` to at most ``trim`` characters.

    Args:
        var: text to shorten.
        trim: maximum length (int or numeric string); values <= 0 disable
            trimming and return ``var`` unchanged.

    Returns:
        ``var`` with trailing whitespace removed when it already fits, or its
        first ``trim`` characters (trailing whitespace removed) followed by an
        ellipsis when something was actually cut off.
    """
    trim = int(trim)
    if trim <= 0:
        return var
    if len(var) <= trim:
        # nothing is cut off, so no truncation marker is added
        return var.rstrip()
    return var[:trim].rstrip() + '\u2026'
# mistune plugin: renders [btn]text[/btn] as a (non-functional) button
inlineBtnPattern = r'\[btn\](?P<button_text>.+?)\[/btn\]'


def parse_inline_button(inline, m, state):
    """Inline parser hook: emit an ``inline_button`` token carrying the raw button text."""
    text = m.group("button_text")
    state.append_token({"type": "inline_button", "raw": text})
    return m.end()


def render_inline_button(renderer, text):
    """Render an ``inline_button`` token as an HTML ``<button>``.

    ``text`` is the raw user-supplied text between the tags, so it is
    HTML-escaped before being placed into the markup.
    """
    # local import: stdlib escaping for element content
    from html import escape as escape_html

    safe_text = escape_html(text, quote=False)
    return f"<button class='btn btn-secondary' type='button'>{safe_text}</button>"


def button(md):
    """mistune plugin entry point registering the inline button syntax and its HTML renderer."""
    md.inline.register('inline_button', inlineBtnPattern, parse_inline_button, before='link')
    if md.renderer and md.renderer.NAME == 'html':
        md.renderer.register('inline_button', render_inline_button)
# Base directory where emoji packs are stored
# (resolved against the working directory at import time)
EMOJI_BASE_PATH = Path.cwd() / 'static' / 'emojis'
# Maps an emoji name to the relative path found for it by find_emoji_path().
emoji_cache = {}
def to_snake_case(name):
    """Convert a camelCase / PascalCase identifier to lower snake_case."""
    # first pass: break before a capitalised word ("blobCat" -> "blob_Cat")
    with_word_breaks = re.sub(r'(.)([A-Z][a-z]+)', r'\1_\2', name)
    # second pass: break between a lowercase letter or digit and a capital
    with_all_breaks = re.sub(r'([a-z0-9])([A-Z])', r'\1_\2', with_word_breaks)
    return with_all_breaks.lower()
def find_emoji_path(emoji_name):
    """Locate the image file for ``emoji_name``.

    The pack prefix is the part of the name before the first underscore (or
    the first snake_case word of a camelCase name). When ``<prefix>.json``
    exists in the emoji directory, all pack JSON files are searched for the
    emoji; otherwise the emoji directory tree is scanned for a file whose
    stem equals ``emoji_name``.

    Returns:
        A path relative to the application root (``static/emojis/...``), or
        ``None`` when the emoji is unknown. Successful lookups are cached in
        ``emoji_cache``.
    """
    # Serve repeated lookups from the cache, but only while the file still
    # exists so removed packs do not leave stale entries behind.
    cached = emoji_cache.get(emoji_name)
    if cached and (EMOJI_BASE_PATH.parent.parent / cached).is_file():
        return cached

    if '_' in emoji_name:
        head = emoji_name.partition('_')[0]
    else:
        head = to_snake_case(emoji_name).split('_')[0]

    if (EMOJI_BASE_PATH / f'{head}.json').exists():
        app.logger.debug("[CatAsk/functions/find_emoji_path] Using JSON meta file")
        for json_file in EMOJI_BASE_PATH.glob('*.json'):
            pack_data = loadJSON(json_file)
            for entry in pack_data.get('emojis', []):
                if entry['name'] == emoji_name:
                    emoji_path = os.path.join('static/emojis', json_file.stem, entry['file_name'])
                    emoji_cache[emoji_name] = emoji_path
                    return emoji_path
    else:
        app.logger.debug("[CatAsk/functions/find_emoji_path] Falling back to scanning directories")
        for address, _dirs, files in os.walk(EMOJI_BASE_PATH):
            for file in files:
                # Check if the filename matches the emoji_name
                if os.path.splitext(file)[0] == emoji_name:
                    rel_dir = os.path.relpath(address, EMOJI_BASE_PATH)
                    # Use the actual file name
                    emoji_path = os.path.join("static/emojis", rel_dir, file)
                    emoji_cache[emoji_name] = emoji_path
                    return emoji_path
    return None
# mistune plugin: renders :emoji_name: shortcodes as custom emoji images
emojiPattern = r':(?P<emoji_name>[a-zA-Z0-9_]+):'


def parse_emoji(inline, m, state):
    """Inline parser hook: emit an ``emoji`` token carrying the shortcode name."""
    state.append_token({"type": "emoji", "raw": m.group("emoji_name")})
    return m.end()


def render_emoji(renderer, emoji_name):
    """Render an ``emoji`` token as an ``<img>``, or as the literal shortcode when the emoji is unknown."""
    emoji_path = find_emoji_path(emoji_name)
    if not emoji_path:
        return f":{emoji_name}:"
    absolute_emoji_path = url_for('static', filename=emoji_path.replace('static/', ''))
    return f"<img src='{absolute_emoji_path}' alt=':{emoji_name}:' title=':{emoji_name}:' class='emoji' loading='lazy' width='28' height='28' />"


def emoji(md):
    """mistune plugin entry point registering the emoji shortcode syntax and its HTML renderer."""
    md.inline.register('emoji', emojiPattern, parse_emoji, before='link')
    renderer = md.renderer
    if renderer and renderer.NAME == 'html':
        renderer.register('emoji', render_emoji)
def listEmojis() -> list:
    """List the standalone emoji images stored directly in ``static/emojis``.

    The directory is created when missing. Sub-directories (emoji packs) and
    files without an image extension are ignored.

    Returns:
        A list of ``{'name', 'image', 'relative_path'}`` dictionaries.
    """
    image_suffixes = {'.png', '.jpg', '.jpeg', '.webp', '.gif'}
    base = Path.cwd() / 'static' / 'emojis'
    os.makedirs(base, exist_ok=True)

    def describe(entry):
        # path of the image relative to the emoji directory
        relative_path = os.path.relpath(entry, base)
        return {
            'name': entry.stem,
            'image': os.path.join('static/emojis', relative_path),
            'relative_path': relative_path
        }

    return [
        describe(entry)
        for entry in base.iterdir()
        if entry.is_file() and entry.suffix in image_suffixes
    ]
def listEmojiPacks() -> list:
    """List the emoji packs (sub-directories) found in ``const.emojiPath``.

    For a pack directory ``<name>`` the metadata file ``<name>.json`` next to
    it is used when present; otherwise the pack is described from a directory
    scan, using the first image found as its preview.

    Returns:
        A list of pack dictionaries. Packs with metadata carry ``name``,
        ``exportedAt``, ``preview_image``, ``website``, ``relative_path`` and
        ``emojis``; packs without it carry ``name``, ``preview_image`` and
        ``relative_path``.
    """
    image_suffixes = {'.png', '.jpg', '.jpeg', '.webp', '.gif'}
    emoji_packs = []
    emoji_base_path = const.emojiPath
    # same as listEmojis(): a fresh install may not have the directory yet
    os.makedirs(emoji_base_path, exist_ok=True)

    # Iterate through all directories in the emoji base path
    for pack_dir in emoji_base_path.iterdir():
        if not pack_dir.is_dir():
            continue
        relative_path = os.path.relpath(pack_dir, emoji_base_path)

        # The metadata file sits next to the pack directory. Build its path
        # from the directory *name*; joining the whole pack path again only
        # works by accident when the base path is absolute.
        meta_json_path = emoji_base_path / f"{pack_dir.name}.json"
        if meta_json_path.exists():
            app.logger.debug(f"[CatAsk/functions/listEmojiPacks] Using meta.json file ({meta_json_path})")
            pack_data = loadJSON(meta_json_path)
            emoji_packs.append({
                'name': pack_data.get('name', pack_dir.name),
                'exportedAt': pack_data.get('exportedAt', 'Unknown'),
                'preview_image': pack_data.get('preview_image', ''),
                'website': pack_data.get('website', ''),
                'relative_path': f'static/emojis/{relative_path}',
                'emojis': pack_data.get('emojis', [])
            })
            continue

        app.logger.debug(f"[CatAsk/functions/listEmojiPacks] Falling back to directory scan ({pack_dir})")
        # Find the first image in the directory for preview
        preview_image = None
        for file in pack_dir.iterdir():
            if file.suffix in image_suffixes:
                preview_image = os.path.join('static/emojis', relative_path, file.name)
                break
        # Append pack info without metadata
        emoji_packs.append({
            'name': pack_dir.name,
            'preview_image': preview_image,
            'relative_path': f'static/emojis/{relative_path}'
        })
    return emoji_packs
def processEmojis(meta_json_path) -> list:
    """Convert an uploaded pack's metadata file into CatAsk's own pack JSON.

    The pack name is taken from the category of the first emoji in the
    metadata; the resulting pack description is saved as
    ``<pack name, lowercased>.json`` in ``const.emojiPath``.

    Returns:
        The list of ``{'name', 'file_name'}`` dictionaries, one per emoji.
    """
    emoji_metadata = loadJSON(meta_json_path)
    emojis = emoji_metadata.get('emojis', [])

    # NOTE(review): the pack name comes straight from the metadata file and is
    # used to build file paths below -- confirm it is trusted/validated upstream.
    pack_name = emoji_metadata['emojis'][0]['emoji']['category']
    pack_dir_name = pack_name.lower()

    # Create the pack info structure
    pack_info = {
        'name': pack_name,
        'exportedAt': emoji_metadata.get('exportedAt', 'Unknown'),
        'preview_image': os.path.join('static/emojis', pack_dir_name, emoji_metadata['emojis'][0]['fileName']),
        'relative_path': os.path.join('static/emojis', pack_dir_name),
        'website': emoji_metadata.get('host', ''),
        'emojis': []
    }
    processed_emojis = pack_info['emojis']

    for entry in emojis:
        emoji_info = {'name': entry['emoji']['name'], 'file_name': entry['fileName']}
        processed_emojis.append(emoji_info)
        app.logger.debug(f"[CatAsk/API/upload_emoji_pack] Processed emoji: {emoji_info['name']}\t(File: {emoji_info['file_name']})")

    # Save the combined pack info to <pack_name>.json
    saveJSON(pack_info, const.emojiPath / f"{pack_dir_name}.json")
    return processed_emojis
def renderMarkdown(text: str, allowed_tags: list = None) -> str:
    """Render user-supplied Markdown to sanitized HTML.

    Args:
        text: Markdown source.
        allowed_tags: optional list of HTML tags to keep; a default
            allow-list is used when omitted or empty.

    Returns:
        The sanitized HTML wrapped in ``Markup``.
    """
    plugins = [
        'strikethrough',
        button,
        emoji
    ]
    if not allowed_tags:
        allowed_tags = [
            'p',
            'em',
            'b',
            'strong',
            'i',
            'br',
            's',
            'del',
            'a',
            'button',
            'ol',
            # unordered lists use the same <li> children as ordered ones
            'ul',
            'li',
            'hr',
            'img',
            'code',
            'pre'
        ]

    # attributes the emoji plugin puts on its <img> tags
    emoji_img_attrs = {'src', 'alt', 'title', 'width', 'height', 'loading'}

    def imgAttrAllowed(tag, name, value):
        # only the emoji class is accepted on images
        if name == 'class':
            return value == "emoji"
        return name in emoji_img_attrs

    # bleach expects, per tag, either a list of attribute names or a callable
    # taking (tag, name, value). A bare string would be matched by substring,
    # and a nested dict of callables would never be evaluated.
    allowed_attrs = {
        'a': ['href'],
        'button': ['class'],
        'img': imgAttrAllowed
    }
    # hard_wrap=True means that newlines will be
    # converted into <br> tags
    #
    # yes, markdown usually lets you make line breaks only
    # with 2 spaces or <br> tag, but hard_wrap is enabled to keep
    # sanity of whoever will use this software
    # (after all, not everyone knows markdown syntax)
    md = mistune.create_markdown(
        escape=True,
        plugins=plugins,
        hard_wrap=True
    )
    html = md(text)
    cleaner = Cleaner(tags=allowed_tags, attributes=allowed_attrs)
    clean_html = cleaner.clean(html)
    return Markup(clean_html)
def generateMetadata(question: str = None, answer: str = None) -> dict:
    """Build the page metadata (title, description, url, image).

    Without arguments the instance-wide values from the configuration are
    returned. When both ``question`` and ``answer`` are given (they are
    indexed like rows, with ``content`` / ``id`` keys), title, description
    and url describe that question instead.
    """
    instance = cfg['instance']
    metadata = {
        'title': instance['title'],
        'description': instance['description'],
        'url': instance['fullBaseUrl'],
        'image': instance['image']
    }
    if not (question and answer):
        return metadata

    # question-specific values override the instance defaults
    metadata['title'] = trimContent(question['content'], 150) + " | " + instance['title']
    metadata['description'] = trimContent(answer['content'], 150)
    metadata['url'] = instance['fullBaseUrl'] + url_for('viewQuestion', question_id=question['id'])
    metadata['image'] = instance['image']
    return metadata
# Upload allow-lists, matched case-insensitively against the last extension.
allowedFileExtensions = {'png', 'jpg', 'jpeg', 'webp', 'bmp', 'jxl', 'gif'}
allowedArchiveExtensions = {'zip', 'tar', 'gz', 'bz2', 'xz'}


def allowedFile(filename: str) -> bool:
    """Tell whether ``filename`` ends in an allowed image extension."""
    stem, dot, extension = filename.rpartition('.')
    if not dot:
        return False
    return extension.lower() in allowedFileExtensions


def allowedArchive(filename: str) -> bool:
    """Tell whether ``filename`` ends in an allowed archive extension."""
    stem, dot, extension = filename.rpartition('.')
    if not dot:
        return False
    return extension.lower() in allowedArchiveExtensions
def stripArchExtension(filename: str) -> str:
    """Remove the archive extension from ``filename``.

    Compound ``.tar.gz`` / ``.tar.bz2`` / ``.tar.xz`` suffixes are removed as a
    whole; otherwise only the last extension is dropped. A name without a dot
    is returned unchanged.
    """
    for compound_suffix in ('.tar.gz', '.tar.bz2', '.tar.xz'):
        if filename.endswith(compound_suffix):
            return filename[:-len(compound_suffix)]
    stem, dot, extension = filename.rpartition('.')
    return stem if dot else filename
def generateFavicon(file_name: str) -> None:
    """Generate the favicon variants from an image in ``const.faviconDir``.

    Args:
        file_name: name of the source image inside ``const.faviconDir``.

    Each variant is written into the same directory under a fixed file name
    and size.
    """
    sizes = {
        'apple-touch-icon.png': (180, 180),
        'android-chrome-192x192.png': (192, 192),
        'android-chrome-512x512.png': (512, 512),
        'favicon-32x32.png': (32, 32),
        'favicon-16x16.png': (16, 16),
        'favicon.ico': (16, 16)
    }
    # make sure the directory exists before touching anything inside it
    os.makedirs(const.faviconDir, exist_ok=True)
    # the context manager releases the source file handle when done
    with Image.open(const.faviconDir / file_name) as img:
        for filename, size in sizes.items():
            img.resize(size).save(const.faviconDir / filename)
def _addTreeToExport(export_zip, directory: Path, base: Path) -> None:
    """Add every file below ``directory`` to ``export_zip``, named relative to ``base``."""
    for root, _dirs, files in os.walk(directory):
        for file_ in files:
            file_path = Path(root) / file_
            export_zip.write(file_path, arcname=file_path.relative_to(base))


def createExport() -> dict:
    """Create a zip export holding the config, a database dump, favicons and emojis.

    The archive is written to ``const.exportsDir`` and recorded in
    ``const.exportsFile``.

    Returns:
        ``{'message': ...}`` on success, or ``({'error': ...}, 500)`` when
        anything fails (the traceback is logged).
    """
    temp_dir = const.tempDir
    try:
        # just to test if connection works
        connectToDb().close()

        # take the time once so both representations describe the same moment
        now = datetime.now()
        timestamp = now.strftime("%Y-%m-%d_%H-%M-%S")
        timestamp_morereadable = now.strftime('%b %d, %Y %H:%M')

        export_dir = const.exportsDir
        os.makedirs(export_dir, exist_ok=True)
        os.makedirs(temp_dir, exist_ok=True)

        config_dest_path = temp_dir / const.configFile
        shutil.copy(const.configFile, config_dest_path)

        # Export database (pg_dump custom format) and wait for it to finish;
        # an argument list avoids shell quoting problems.
        dump_file = temp_dir / 'database.sql'
        result = subprocess.run(
            ['pg_dump', '-h', dbHost, '-U', dbUser, '-d', dbName,
             '-F', 'c', '-E', 'UTF8', '-f', str(dump_file)],
            stdin=subprocess.DEVNULL,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            encoding="utf-8",
            env=dict(os.environ, PGPASSWORD=dbPass)
        )
        if result.returncode != 0:
            raise RuntimeError(f"pg_dump failed: {result.stderr.strip()}")

        # Create export zip archive
        zip_file_path = export_dir / f'export-{timestamp}.zip'
        with zipfile.ZipFile(zip_file_path, 'w') as export_zip:
            export_zip.write(config_dest_path, arcname=const.configFile)
            export_zip.write(dump_file, arcname='database.sql')
            # Add favicon and emojis folders, stored relative to 'static'
            static_dir = Path('static')
            for directory in (static_dir / 'icons' / 'favicon', static_dir / 'emojis'):
                if directory.exists():
                    _addTreeToExport(export_zip, directory, static_dir)

        # Record export metadata
        export_data = {
            'timestamp_esc': timestamp,
            'timestamp': timestamp_morereadable,
            'downloadPath': str(zip_file_path)
        }
        appendToJSON(export_data, const.exportsFile)
        return {'message': _('Export created successfully!')}
    except psycopg.Error as e:
        app.logger.error(f"[psycopg.Error] {traceback.format_exc()}")
        return {'error': str(e)}, 500
    except Exception as e:
        app.logger.error(f"[Exception] {traceback.format_exc()}")
        return {'error': str(e)}, 500
    finally:
        # never leave the temporary copy of config and dump behind
        shutil.rmtree(temp_dir, ignore_errors=True)
def importData(export_file) -> dict:
    """Restore an export archive created by ``createExport``.

    The archive is unpacked into ``const.tempDir`` and validated first. The
    database is restored before any file is replaced, so a failed restore
    leaves the current config, favicons and emojis untouched.

    Returns:
        ``{'message': ...}`` on success, or ``({'error': ...}, 500)`` when
        anything fails (the traceback is logged).
    """
    temp_dir = const.tempDir
    try:
        shutil.unpack_archive(export_file, temp_dir)

        # validate the archive before destroying any current data
        new_config = temp_dir / const.configFile
        dump_file = temp_dir / 'database.sql'
        missing = [str(path.name) for path in (new_config, dump_file) if not path.is_file()]
        if missing:
            raise FileNotFoundError(f"Export archive is missing: {', '.join(missing)}")

        # Restore database from the dump and wait for pg_restore to finish
        # (the dump file must still exist while it runs).
        result = subprocess.run(
            ['pg_restore', '-h', dbHost, '--clean', '--if-exists',
             '-U', dbUser, '-d', dbName, str(dump_file)],
            stdin=subprocess.DEVNULL,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            encoding="utf-8",
            env=dict(os.environ, PGPASSWORD=dbPass)
        )
        if result.returncode != 0:
            raise RuntimeError(f"pg_restore failed: {result.stderr.strip()}")

        # Replace config file
        config_path = Path.cwd() / const.configFile
        if config_path.exists():
            os.remove(config_path)
        shutil.move(str(new_config), str(config_path))

        # Replace favicon and emojis folders, but only those the archive
        # actually contains (exports skip folders that did not exist).
        replacements = (
            (temp_dir / 'icons' / 'favicon', Path('static/icons/favicon')),
            (temp_dir / 'emojis', Path('static/emojis')),
        )
        for source, destination in replacements:
            if not source.is_dir():
                continue
            shutil.rmtree(destination, ignore_errors=True)
            shutil.copytree(source, destination)

        return {'message': _('Data imported successfully!')}
    except Exception as e:
        app.logger.error(f"[importData] {traceback.format_exc()}")
        return {'error': str(e)}, 500
    finally:
        shutil.rmtree(temp_dir, ignore_errors=True)
# TODO: finish the Retrospring import drafted below. It was first planned for
# 1.8.0; as of 1/12/25 it is tentatively pushed back to 2.1.0.
"""
def retrospringImport(export_file):
shutil.unpack_archive(export_file, const.tempDir)
# probably a hack but whateva
export_dirname = Path(export_file).stem
export_dir = const.tempDir / export_dirname
conn = connectToDb()
cursor = conn.cursor()
questions_file = loadJSON(export_dir / 'questions.json')
answers_file = loadJSON(export_dir / 'answers.json')
# Extract answers list
questions_list = questions_file.get('questions', [])
answers_list = answers_file.get('answers', [])
# ['related']['question']['anonymous']
for question in questions_list:
# addQuestion(answer['related']['question']['anonymous'], question['content'], None, noAntispam=True)
for answer in answers_list:
print("anonymous:", answer['related']['question']['anonymous'])
print(question['id'], answer['content'], None)
# addAnswer(question['id'], answer['content'], None)
# shutil.rmtree(const.tempDir)
cursor.close()
conn.close()
"""
def deleteExport(timestamp: str) -> dict:
    """Delete an export archive and its entry in ``const.exportsFile``.

    Args:
        timestamp: the ``timestamp_esc`` value identifying the export.

    Returns:
        ``{'message': ...}`` on success, ``({'error': ...}, 400)`` for a
        timestamp that does not form a plain file name, or
        ``({'error': ...}, 500)`` when anything else fails.
    """
    try:
        file_name = f'export-{timestamp}.zip'
        # refuse values that would make the path point outside the exports folder
        if Path(file_name).name != file_name:
            return {'error': _('Invalid export timestamp')}, 400
        export_file = Path('static') / 'exports' / file_name

        data = loadJSON(const.exportsFile)
        remaining = [export for export in data if export["timestamp_esc"] != timestamp]
        try:
            export_file.unlink()
        except FileNotFoundError:
            # the archive is already gone; still drop its metadata entry so it
            # does not stay listed forever
            pass
        saveJSON(remaining, const.exportsFile)
        return {'message': _('Export {} deleted successfully.').format(timestamp)}
    except Exception as e:
        return {'error': str(e)}, 500
# Disabled draft of the IP blacklist helpers, reserved for 1.7.0 or later.
"""
def getUserIp():
if request.environ.get('HTTP_X_FORWARDED_FOR') is None:
return request.environ['REMOTE_ADDR']
else:
return request.environ['HTTP_X_FORWARDED_FOR']
def isIpBlacklisted(user_ip):
blacklist = readPlainFile(const.ipBlacklistFile, split=True)
return user_ip in blacklist
"""