#!/usr/bin/python3 import os import configparser import sqlite3 import time import json import secrets import datetime import socket import subprocess from itertools import groupby from waitress import serve from werkzeug.utils import secure_filename from werkzeug.security import generate_password_hash, check_password_hash from werkzeug.middleware.proxy_fix import ProxyFix from flask import Flask, render_template, request, url_for, flash, redirect, session, make_response, send_from_directory, stream_with_context, Response, request from flask_limiter import Limiter from flask_limiter.util import get_remote_address # read config file config = configparser.ConfigParser() config.read("config.ini") PORT = config["config"]["PORT"] SECRET_KEY = config["config"]["SECRET_KEY"] UPLOAD_FOLDER = config["config"]["UPLOAD_FOLDER"] UPLOAD_LIMIT = config["config"]["UPLOAD_LIMIT"] PASSWORD_REQUIREMENT = config["config"]["PASSWORD_REQUIREMENT"] app = Flask(__name__) app.config["SECRET_KEY"] = SECRET_KEY app.config["MAX_CONTENT_LENGTH"] = int(UPLOAD_LIMIT) * 1000 * 1000 app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1) limiter = Limiter( get_remote_address, app = app, default_limits = ["3 per second"], storage_uri = "memory://", strategy = "fixed-window" ) if SECRET_KEY == "placeholder": print("[WARNING] Secret key is not set") if not os.path.exists(UPLOAD_FOLDER): print("[WARNING] Upload folder doesn't exist, creating") os.mkdir(UPLOAD_FOLDER) if not os.path.exists("database.db"): print("[ERROR] No database exists, please run init_db") exit() def makeStrSafe(url): return str(urllib.parse.quote(url)).replace("%20", " ") def get_db_connection(): conn = sqlite3.connect("database.db") conn.row_factory = sqlite3.Row return conn def get_user(id): conn = get_db_connection() post = conn.execute("SELECT * FROM users WHERE id = ?", (id,)).fetchone() conn.close() if post is None: return "error" return post def get_comments(id): conn = get_db_connection() post = conn.execute("SELECT * FROM comments WHERE post_id = ?", (id,)).fetchall() conn.close() if post is None: return "error" return post def get_messages(chatroomid, max): conn = get_db_connection() post = conn.execute("SELECT * FROM chatmessages WHERE chatroom_id = ? ORDER BY created DESC;", (chatroomid,)).fetchmany(max + 1) conn.close() if post is None: return "error" return post app.jinja_env.globals.update(getComments=get_comments) def get_post(id): conn = get_db_connection() post = conn.execute("SELECT * FROM posts WHERE id = ?", (id,)).fetchone() conn.close() if post is None: return "error" return post app.jinja_env.globals.update(getUser=get_user) def check_username_taken(username): conn = get_db_connection() post = conn.execute("SELECT * FROM users WHERE lower(username) = ?", (username.lower(),)).fetchone() conn.close() if post is None: return "error" return post["id"] def get_session(id): conn = get_db_connection() post = conn.execute("SELECT * FROM sessions WHERE session = ?", (id,)).fetchone() conn.close() if post is None: return "error" return post def get_current_commit(output_format="full"): if output_format == "short": length = 8 else: length = 40 try: output = subprocess.check_output(["git", "rev-parse", f"--short={length}", "HEAD"]).decode().strip() return output except subprocess.CalledProcessError: return "Error fetching git commit" ALLOWED_EXTENSIONS = {"png", "apng", "jpg", "jpeg", "gif", "svg", "webp"} def allowed_file(filename): return '.' in filename and \ filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS @app.route("/", methods=("GET", "POST")) def main(): usersession = request.cookies.get("session_DO_NOT_SHARE") conn = get_db_connection() posts = conn.execute("SELECT * FROM posts ORDER BY created DESC;").fetchall() commit_hash_long = get_current_commit() commit_hash_short = get_current_commit(output_format="short") conn.close() if usersession: userCookie = get_session(usersession) user = get_user(userCookie["id"]) return render_template("main.html", userdata=user, posts=posts, commit_hash_long=commit_hash_long, commit_hash_short=commit_hash_short) else: return render_template("main.html", posts=posts, commit_hash_long=commit_hash_long, commit_hash_short=commit_hash_short) @app.route("/chat", methods=("GET", "POST")) def chat(): usersession = request.cookies.get("session_DO_NOT_SHARE") if usersession: userCookie = get_session(usersession) user = get_user(userCookie["id"]) return render_template("chat.html", userdata=user) else: return render_template("chat.html") @app.route("/api/chat/listrooms") def chatlistrooms(): conn = get_db_connection() rooms = conn.execute("SELECT * FROM chatrooms ORDER BY roomname ASC;").fetchall() conn.close() template = [] for room in rooms: roomtemplate = { "id": room["id"], "name": room["roomname"] } template.append(roomtemplate) return(template), 200 @app.route("/api/chat/getmessages/") @limiter.exempt def chatget(roomid): messages = get_messages(roomid, 40) template = [] for message in messages: creatorid = message["creator"] creatortemplate = { "id": message["creator"], "username": get_user(creatorid)["username"] } messagetemplate = { "id": message["id"], "content": message["content"], "creator": creatortemplate, "created": message["created"] } template.append(messagetemplate) return(template), 200 burgerMessageUpdate = True burgerMessageCache = "" @app.route("/api/chat/send/", methods=("GET", "POST")) def chatsend(roomid): usersession = request.cookies.get("session_DO_NOT_SHARE") if usersession: if request.method == "POST": data = request.get_json() content = data["content"] print(content) print(roomid) userCookie = get_session(usersession) user = get_user(userCookie["id"]) if not user["banned"] == "0": return { "error": "banned" }, 403 conn = get_db_connection() conn.execute("INSERT INTO chatmessages (content, chatroom_id, creator, created) VALUES (?, ?, ?, ?)", (content, roomid, userCookie["id"], str(time.time()))) conn.commit() conn.close() global burgerMessageCache global burgerMessageUpdate burgerMessageUpdate = True burgerMessageCache = user["username"] + ": " + content return "success", 200 @app.route("/@", methods=("GET", "POST")) def user(pageusername): usersession = request.cookies.get("session_DO_NOT_SHARE") checkusername = check_username_taken(pageusername) if not checkusername == "error": pageuser = get_user(checkusername) if usersession: userCookie = get_session(usersession) user = get_user(userCookie["id"]) return render_template("user.html", userdata=user, createddate=datetime.datetime.utcfromtimestamp(int(str(pageuser["created"]).split(".")[0])).strftime("%Y-%m-%d"), pageuser=pageuser) else: return render_template("user.html", createddate=datetime.datetime.utcfromtimestamp(int(str(pageuser["created"]).split(".")[0])).strftime("%Y-%m-%d"), pageuser=pageuser) else: return """""", 404 @app.route("/api/page/", methods=("GET", "POST")) def apipageuser(userid): pageuser = get_user(userid) addhtml = """ """ if not pageuser == "error": return addhtml + pageuser["htmldescription"] else: return """""", 404 @app.route("/@/edit", methods=("GET", "POST")) def edituser(pageusername): usersession = request.cookies.get("session_DO_NOT_SHARE") checkusername = check_username_taken(pageusername) if not checkusername == "error": pageuser = get_user(checkusername) if usersession: userCookie = get_session(usersession) user = get_user(userCookie["id"]) if pageuser["username"] == user["username"]: if request.method == "POST": code = request.form["code"].replace("Content-Security-Policy", "").replace("