#!/usr/bin/python3 import os import configparser import sqlite3 import time import json import secrets import datetime import socket from itertools import groupby from waitress import serve from werkzeug.utils import secure_filename from werkzeug.security import generate_password_hash, check_password_hash from werkzeug.middleware.proxy_fix import ProxyFix from flask import Flask, render_template, request, url_for, flash, redirect, session, make_response, send_from_directory, stream_with_context, Response, request from flask_limiter import Limiter from flask_limiter.util import get_remote_address # read config file config = configparser.ConfigParser() config.read("config.ini") PORT = config["config"]["PORT"] SECRET_KEY = config["config"]["SECRET_KEY"] UPLOAD_FOLDER = config["config"]["UPLOAD_FOLDER"] UPLOAD_LIMIT = config["config"]["UPLOAD_LIMIT"] PASSWORD_REQUIREMENT = config["config"]["PASSWORD_REQUIREMENT"] app = Flask(__name__) app.config["SECRET_KEY"] = SECRET_KEY app.config["MAX_CONTENT_LENGTH"] = int(UPLOAD_LIMIT) * 1000 * 1000 app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1) limiter = Limiter( get_remote_address, app = app, default_limits = ["3 per second"], storage_uri = "memory://", strategy = "fixed-window" ) if SECRET_KEY == "placeholder": print("[WARNING] Secret key is not set") if not os.path.exists(UPLOAD_FOLDER): print("[WARNING] Upload folder doesn't exist, creating") os.mkdir(UPLOAD_FOLDER) if not os.path.exists("database.db"): print("[ERROR] No database exists, please run init_db") exit() def makeStrSafe(url): return str(urllib.parse.quote(url)).replace("%20", " ") def get_db_connection(): conn = sqlite3.connect("database.db") conn.row_factory = sqlite3.Row return conn def get_user(id): conn = get_db_connection() post = conn.execute("SELECT * FROM users WHERE id = ?", (id,)).fetchone() conn.close() if post is None: return "error" return post def get_comments(id): conn = get_db_connection() post = conn.execute("SELECT * FROM comments WHERE post_id = ?", (id,)).fetchall() conn.close() if post is None: return "error" return post def get_messages(chatroomid): conn = get_db_connection() post = conn.execute("SELECT * FROM chatmessages WHERE chatroom_id = ? ORDER BY created DESC;", (chatroomid,)).fetchall() conn.close() if post is None: return "error" return post app.jinja_env.globals.update(getComments=get_comments) def get_post(id): conn = get_db_connection() post = conn.execute("SELECT * FROM posts WHERE id = ?", (id,)).fetchone() conn.close() if post is None: return "error" return post app.jinja_env.globals.update(getUser=get_user) def check_username_taken(username): conn = get_db_connection() post = conn.execute("SELECT * FROM users WHERE lower(username) = ?", (username.lower(),)).fetchone() conn.close() if post is None: return "error" return post["id"] def get_session(id): conn = get_db_connection() post = conn.execute("SELECT * FROM sessions WHERE session = ?", (id,)).fetchone() conn.close() if post is None: return "error" return post ALLOWED_EXTENSIONS = {"png", "apng", "jpg", "jpeg", "gif", "svg", "webp"} def allowed_file(filename): return '.' in filename and \ filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS @app.route("/", methods=("GET", "POST")) def main(): usersession = request.cookies.get("session_DO_NOT_SHARE") conn = get_db_connection() posts = conn.execute("SELECT * FROM posts ORDER BY created DESC;").fetchall() conn.close() if usersession: userCookie = get_session(usersession) user = get_user(userCookie["id"]) return render_template("main.html", userdata=user, posts=posts) else: return render_template("main.html", posts=posts) @app.route("/chat", methods=("GET", "POST")) def chat(): usersession = request.cookies.get("session_DO_NOT_SHARE") if usersession: userCookie = get_session(usersession) user = get_user(userCookie["id"]) return render_template("chat.html", userdata=user) else: return render_template("chat.html") @app.route("/api/chat/listrooms") def chatlistrooms(): conn = get_db_connection() rooms = conn.execute("SELECT * FROM chatrooms ORDER BY roomname ASC;").fetchall() conn.close() template = [] for room in rooms: roomtemplate = { "id": room["id"], "name": room["roomname"] } template.append(roomtemplate) return(template), 200 @app.route("/api/chat/getmessages/") def chatget(roomid): messages = get_messages(roomid) template = [] for message in messages: creatorid = message["creator"] creatortemplate = { "id": message["creator"], "username": get_user(creatorid)["username"] } messagetemplate = { "id": message["id"], "content": message["content"], "creator": creatortemplate, "created": message["created"] } template.append(messagetemplate) return(template), 200 burgerMessageUpdate = True burgerMessageCache = "" @app.route("/api/chat/send/", methods=("GET", "POST")) def chatsend(roomid): usersession = request.cookies.get("session_DO_NOT_SHARE") if usersession: if request.method == "POST": data = request.get_json() content = data["content"] print(content) print(roomid) userCookie = get_session(usersession) user = get_user(userCookie["id"]) if not user["banned"] == "0": return { "error": "banned" }, 403 conn = get_db_connection() if not content.isspace(): conn.execute("INSERT INTO chatmessages (content, chatroom_id, creator, created) VALUES (?, ?, ?, ?)", (content, roomid, userCookie["id"], str(time.time()))) conn.commit() conn.close() else: # do something global burgerMessageCache global burgerMessageUpdate burgerMessageUpdate = True burgerMessageCache = user["username"] + ": " + content return "success", 200 @app.route("/@", methods=("GET", "POST")) def user(pageusername): usersession = request.cookies.get("session_DO_NOT_SHARE") checkusername = check_username_taken(pageusername) if not checkusername == "error": pageuser = get_user(checkusername) if usersession: userCookie = get_session(usersession) user = get_user(userCookie["id"]) return render_template("user.html", userdata=user, createddate=datetime.datetime.utcfromtimestamp(int(str(pageuser["created"]).split(".")[0])).strftime("%Y-%m-%d"), pageuser=pageuser) else: return render_template("user.html", createddate=datetime.datetime.utcfromtimestamp(int(str(pageuser["created"]).split(".")[0])).strftime("%Y-%m-%d"), pageuser=pageuser) else: return """""", 404 @app.route("/api/page/", methods=("GET", "POST")) def apipageuser(userid): pageuser = get_user(userid) addhtml = """ """ if not pageuser == "error": return addhtml + pageuser["htmldescription"] else: return """""", 404 @app.route("/@/edit", methods=("GET", "POST")) def edituser(pageusername): usersession = request.cookies.get("session_DO_NOT_SHARE") checkusername = check_username_taken(pageusername) if not checkusername == "error": pageuser = get_user(checkusername) if usersession: userCookie = get_session(usersession) user = get_user(userCookie["id"]) if pageuser["username"] == user["username"]: if request.method == "POST": code = request.form["code"].replace("Content-Security-Policy", "").replace("