#!/usr/bin/python3 import os import requests import configparser import sqlite3 import time import json import secrets import datetime import socket import threading import subprocess import asyncio from hypercorn.config import Config from hypercorn.asyncio import serve from itertools import groupby from werkzeug.utils import secure_filename from werkzeug.security import generate_password_hash, check_password_hash from quart import Quart, render_template, request, url_for, flash, redirect, session, make_response, send_from_directory, stream_with_context, Response, request, jsonify, websocket from apscheduler.schedulers.background import BackgroundScheduler # read config file config = configparser.ConfigParser() config.read("config.ini") PORT = config["config"]["PORT"] SECRET_KEY = config["config"]["SECRET_KEY"] UPLOAD_FOLDER = config["config"]["UPLOAD_FOLDER"] UPLOAD_LIMIT = config["config"]["UPLOAD_LIMIT"] PASSWORD_REQUIREMENT = config["config"]["PASSWORD_REQUIREMENT"] app = Quart(__name__) app.config["SECRET_KEY"] = SECRET_KEY app.config["MAX_CONTENT_LENGTH"] = int(UPLOAD_LIMIT) * 1000 * 1000 if SECRET_KEY == "placeholder": print("[WARNING] Secret key is not set") if not os.path.exists(UPLOAD_FOLDER): print("[WARNING] Upload folder doesn't exist, creating") os.mkdir(UPLOAD_FOLDER) if not os.path.exists("database.db"): print("[ERROR] No database exists, please run init_db") exit() def makeStrSafe(url): return str(urllib.parse.quote(url)).replace("%20", " ") def get_db_connection(): conn = sqlite3.connect("database.db") conn.row_factory = sqlite3.Row return conn def get_user(id): conn = get_db_connection() post = conn.execute("SELECT * FROM users WHERE id = ?", (id,)).fetchone() conn.close() if post is None: return "error" return post def get_comments(id): conn = get_db_connection() post = conn.execute("SELECT * FROM comments WHERE post_id = ?", (id,)).fetchall() conn.close() if post is None: return "error" return post def get_messages(chatroomid, max): conn = get_db_connection() post = conn.execute("SELECT * FROM chatmessages WHERE chatroom_id = ? ORDER BY created DESC;", (chatroomid,)).fetchmany(max + 1) conn.close() if post is None: return "error" return post app.jinja_env.globals.update(getComments=get_comments) def get_post(id): conn = get_db_connection() post = conn.execute("SELECT * FROM posts WHERE id = ?", (id,)).fetchone() conn.close() if post is None: return "error" return post app.jinja_env.globals.update(getUser=get_user) def check_username_taken(username): conn = get_db_connection() post = conn.execute("SELECT * FROM users WHERE lower(username) = ?", (username.lower(),)).fetchone() conn.close() if post is None: return "error" return post["id"] def get_session(id): conn = get_db_connection() post = conn.execute("SELECT * FROM sessions WHERE session = ?", (id,)).fetchone() conn.close() if post is None: return "error" return post full_hash = subprocess.check_output(["git", "rev-parse", "HEAD"]).decode().strip() short_hash = subprocess.check_output(["git", "rev-parse", "--short=8", "HEAD"]).decode().strip() ALLOWED_EXTENSIONS = {"png", "apng", "jpg", "jpeg", "gif", "svg", "webp", "jxl"} def allowed_file(filename): return '.' in filename and \ filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS @app.route("/", methods=("GET", "POST")) async def main(): usersession = request.cookies.get("session_DO_NOT_SHARE") conn = get_db_connection() posts = conn.execute("SELECT * FROM posts ORDER BY created DESC;").fetchall() conn.close() if usersession: userCookie = get_session(usersession) user = get_user(userCookie["id"]) return await render_template("main.html", userdata=user, posts=posts, full_hash=full_hash, short_hash=short_hash) else: return await render_template("main.html", posts=posts, full_hash=full_hash, short_hash=short_hash) @app.route("/chat", methods=("GET", "POST")) async def chat(): usersession = request.cookies.get("session_DO_NOT_SHARE") if usersession: userCookie = get_session(usersession) user = get_user(userCookie["id"]) return await render_template("chat.html", userdata=user) else: return await render_template("chat.html") @app.route("/api/chat/listrooms") async def chatlistrooms(): conn = get_db_connection() rooms = conn.execute("SELECT * FROM chatrooms ORDER BY id ASC;").fetchall() conn.close() template = [] for room in rooms: roomtemplate = { "id": room["id"], "name": room["roomname"] } template.append(roomtemplate) return(template), 200 @app.route("/api/chat/getmessages/") async def chatget(roomid): messages = get_messages(roomid, 150) template = [] for message in messages: creatorid = message["creator"] creatortemplate = { "id": message["creator"], "username": get_user(creatorid)["username"] } messagetemplate = { "id": message["id"], "content": message["content"], "creator": creatortemplate, "created": message["created"] } template.append(messagetemplate) return(template), 200 @app.route("/api/chat/send/", methods=("GET", "POST")) async def chatsend(roomid): usersession = request.cookies.get("session_DO_NOT_SHARE") if usersession: if request.method == "POST": data = await request.get_json() content = data["content"] userCookie = get_session(usersession) user = get_user(userCookie["id"]) if not user["banned"] == "0": return { "error": "banned" }, 403 chatMessageContent = { "content": content, "creator": user["username"], "roomid": roomid, "created": str(time.time()) } #message_queue.append({"message": chatMessageContent}) conn = get_db_connection() conn.execute("INSERT INTO chatmessages (content, chatroom_id, creator, created) VALUES (?, ?, ?, ?)", (content, roomid, userCookie["id"], str(time.time()))) conn.commit() conn.close() return "success", 200 @app.route("/@", methods=("GET", "POST")) async def user(pageusername): usersession = request.cookies.get("session_DO_NOT_SHARE") checkusername = check_username_taken(pageusername) if not checkusername == "error": pageuser = get_user(checkusername) if usersession: userCookie = get_session(usersession) user = get_user(userCookie["id"]) return await render_template("user.html", userdata=user, createddate=datetime.datetime.utcfromtimestamp(int(str(pageuser["created"]).split(".")[0])).strftime("%Y-%m-%d"), pageuser=pageuser) else: return await render_template("user.html", createddate=datetime.datetime.utcfromtimestamp(int(str(pageuser["created"]).split(".")[0])).strftime("%Y-%m-%d"), pageuser=pageuser) else: return """""", 404 @app.route("/api/page/", methods=("GET", "POST")) async def apipageuser(userid): pageuser = get_user(userid) addhtml = """ """ if not pageuser == "error": return addhtml + pageuser["htmldescription"] else: return """""", 404 @app.route("/@/edit", methods=("GET", "POST")) async def edituser(pageusername): usersession = request.cookies.get("session_DO_NOT_SHARE") checkusername = check_username_taken(pageusername) if not checkusername == "error": pageuser = get_user(checkusername) if usersession: userCookie = get_session(usersession) user = get_user(userCookie["id"]) if pageuser["username"] == user["username"]: if request.method == "POST": requestData = await request.form code = requestData["code"].replace("Content-Security-Policy", "").replace("