This repository has been archived on 2024-10-20. You can view files and clone it, but cannot push or open issues or pull requests.
burgercat/main
TestingPlant 24b278788e feat!: zero-downtime restarts and bind to all IPs
The server can now restart with zero downtime. The server also now binds
to all IPv4 and IPv6 addresses, so the ADDR config value is removed.
2023-07-10 14:28:15 -05:00

655 lines
21 KiB
Python

#!/usr/bin/python3
import os
import configparser
import sqlite3
import time
import json
import secrets
import datetime
import socket
from itertools import groupby
from waitress import serve
from werkzeug.utils import secure_filename
from werkzeug.security import generate_password_hash, check_password_hash
from werkzeug.middleware.proxy_fix import ProxyFix
from flask import Flask, render_template, request, url_for, flash, redirect, session, make_response, send_from_directory, stream_with_context, Response, request
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
# read config file
config = configparser.ConfigParser()
config.read("config.ini")
PORT = config["config"]["PORT"]
SECRET_KEY = config["config"]["SECRET_KEY"]
UPLOAD_FOLDER = config["config"]["UPLOAD_FOLDER"]
UPLOAD_LIMIT = config["config"]["UPLOAD_LIMIT"]
PASSWORD_REQUIREMENT = config["config"]["PASSWORD_REQUIREMENT"]
app = Flask(__name__)
app.config["SECRET_KEY"] = SECRET_KEY
app.config["MAX_CONTENT_LENGTH"] = int(UPLOAD_LIMIT) * 1000 * 1000
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1)
limiter = Limiter(
get_remote_address,
app = app,
default_limits = ["3 per second"],
storage_uri = "memory://",
strategy = "fixed-window"
)
if SECRET_KEY == "placeholder":
print("[WARNING] Secret key is not set")
if not os.path.exists(UPLOAD_FOLDER):
print("[WARNING] Upload folder doesn't exist, creating")
os.mkdir(UPLOAD_FOLDER)
if not os.path.exists("database.db"):
print("[ERROR] No database exists, please run init_db")
exit()
def makeStrSafe(url):
return str(urllib.parse.quote(url)).replace("%20", " ")
def get_db_connection():
conn = sqlite3.connect("database.db")
conn.row_factory = sqlite3.Row
return conn
def get_user(id):
conn = get_db_connection()
post = conn.execute("SELECT * FROM users WHERE id = ?",
(id,)).fetchone()
conn.close()
if post is None:
return "error"
return post
def get_comments(id):
conn = get_db_connection()
post = conn.execute("SELECT * FROM comments WHERE post_id = ?",
(id,)).fetchall()
conn.close()
if post is None:
return "error"
return post
app.jinja_env.globals.update(getComments=get_comments)
def get_post(id):
conn = get_db_connection()
post = conn.execute("SELECT * FROM posts WHERE id = ?",
(id,)).fetchone()
conn.close()
if post is None:
return "error"
return post
app.jinja_env.globals.update(getUser=get_user)
def check_username_taken(username):
conn = get_db_connection()
post = conn.execute("SELECT * FROM users WHERE lower(username) = ?",
(username.lower(),)).fetchone()
conn.close()
if post is None:
return "error"
return post["id"]
def get_session(id):
conn = get_db_connection()
post = conn.execute("SELECT * FROM sessions WHERE session = ?",
(id,)).fetchone()
conn.close()
if post is None:
return "error"
return post
ALLOWED_EXTENSIONS = {"png", "apng", "jpg", "jpeg", "gif", "svg", "webp"}
def allowed_file(filename):
return '.' in filename and \
filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
@app.route("/", methods=("GET", "POST"))
def main():
usersession = request.cookies.get("session_DO_NOT_SHARE")
conn = get_db_connection()
posts = conn.execute("SELECT * FROM posts ORDER BY created DESC;").fetchall()
conn.close()
if usersession:
userCookie = get_session(usersession)
user = get_user(userCookie["id"])
return render_template("main.html", userdata=user, posts=posts)
else:
return render_template("main.html", posts=posts)
@app.route("/@<pageusername>", methods=("GET", "POST"))
def user(pageusername):
usersession = request.cookies.get("session_DO_NOT_SHARE")
checkusername = check_username_taken(pageusername)
if not checkusername == "error":
pageuser = get_user(checkusername)
if usersession:
userCookie = get_session(usersession)
user = get_user(userCookie["id"])
return render_template("user.html", userdata=user, createddate=datetime.datetime.utcfromtimestamp(int(str(pageuser["created"]).split(".")[0])).strftime("%Y-%m-%d"), pageuser=pageuser)
else:
return render_template("user.html", createddate=datetime.datetime.utcfromtimestamp(int(str(pageuser["created"]).split(".")[0])).strftime("%Y-%m-%d"), pageuser=pageuser)
else:
return """<img src="https://http.cat/images/404.jpg">""", 404
@app.route("/api/page/<userid>", methods=("GET", "POST"))
def apipageuser(userid):
pageuser = get_user(userid)
addhtml = """
<!DOCTYPE html><script>
window.stop()
</script>
<base target="_blank"/> <head><meta http-equiv="Content-Security-Policy" default-src='none'; content="img-src cdn.discordapp.com cdn.discordapp.net media.tenor.com; style-src: 'self';" /></head>"""
if not pageuser == "error":
return addhtml + pageuser["htmldescription"]
else:
return """<img src="https://http.cat/images/404.jpg">""", 404
@app.route("/@<pageusername>/edit", methods=("GET", "POST"))
def edituser(pageusername):
usersession = request.cookies.get("session_DO_NOT_SHARE")
checkusername = check_username_taken(pageusername)
if not checkusername == "error":
pageuser = get_user(checkusername)
if usersession:
userCookie = get_session(usersession)
user = get_user(userCookie["id"])
if pageuser["username"] == user["username"]:
if request.method == "POST":
code = request.form["code"].replace("Content-Security-Policy", "").replace("<iframe>", "")
conn = get_db_connection()
conn.execute("UPDATE users SET htmldescription = ? WHERE id = ?",
(code, user["id"]))
conn.commit()
conn.close()
return redirect("/@" + user["username"])
else:
return render_template("edituser.html", userdata=user, pageuser=pageuser)
else:
return """<img src="https://http.cat/images/403.jpg">""", 403
else:
return """<img src="https://http.cat/images/403.jpg">""", 403
else:
return """<img src="https://http.cat/images/404.jpg">""", 404
@app.route("/api/frontpage", methods=("GET", "POST"))
def apifrontpage():
conn = get_db_connection()
posts = conn.execute("SELECT * FROM posts ORDER BY created DESC;").fetchall()
conn.close()
result = []
for post in posts:
comments = []
for comment in get_comments(post["id"]):
commentthing = {
"title": comment["textstr"],
"id": comment["id"],
"created": comment["created"],
"creator": {
"id": comment["creator"],
"username": get_user(comment["creator"])["username"]
}
}
comments.append(commentthing)
mainthing = {
"id": post["id"],
"created": post["created"],
"title": post["textstr"],
"imgurl": post["imageurl"],
"creator": {
"id": post["creator"],
"username": get_user(post["creator"])["username"]
},
"comments": comments
}
result.append(mainthing)
return result
@app.route("/api/userinfo", methods=("GET", "POST"))
def apiuserinfo():
usersession = request.cookies.get("session_DO_NOT_SHARE")
if usersession:
userCookie = get_session(usersession)
user = get_user(userCookie["id"])
datatemplate = {
"username": user["username"],
"id": user["id"],
"created": user["created"]
}
return datatemplate
else:
return {
"error": "no authentication"
}, 403
@limiter.limit("10/minute", override_defaults=False)
@app.route("/api/login", methods=("GET", "POST"))
def apilogin():
usersession = request.cookies.get("session_DO_NOT_SHARE")
if request.method == "POST":
data = request.get_json()
username = data["username"]
password = data["password"]
userID = check_username_taken(username)
user = get_user(userID)
if user == "error":
return {
"error": "wrong username or password"
}, 401
if not check_password_hash(user["password"], (password)):
return {
"error": "wrong username or password"
}, 401
randomCharacters = secrets.token_hex(512)
conn = get_db_connection()
conn.execute("INSERT INTO sessions (session, id) VALUES (?, ?)",
(randomCharacters, userID))
conn.commit()
conn.close()
return {
"key": randomCharacters
}, 100
else:
return {
"error": "https://http.cat/images/400.jpg"
}, 400
@app.route("/api/post", methods=("GET", "POST"))
def apipost():
usersession = request.cookies.get("session_DO_NOT_SHARE")
if usersession:
if request.method == "POST":
data = request.get_json()
title = data["id"]
if title == "":
return {
"error": "no title"
}, 403
if "file" not in request.files:
return {
"error": "no file"
}, 403
file = request.files["file"]
if file.filename == "":
return {
"error": "no file"
}, 403
if not allowed_file(file.filename):
return {
"error": "invalid file format"
}, 403
filename = secure_filename(file.filename)
finalfilename = secrets.token_hex(64) + filename
file.save(os.path.join(UPLOAD_FOLDER, finalfilename))
imgurl = "/cdn/" + finalfilename
userCookie = get_session(usersession)
user = get_user(userCookie["id"])
if not user["banned"] == "0":
return {
"error": "banned"
}, 403
conn = get_db_connection()
conn.execute("INSERT INTO posts (textstr, imageurl, creator, created) VALUES (?, ?, ?, ?)",
(title, imgurl, userCookie["id"], str(time.time())))
conn.commit()
conn.close()
return "success", 100
@app.route("/apidocs", methods=("GET", "POST"))
def apidocs():
usersession = request.cookies.get("session_DO_NOT_SHARE")
if usersession:
userCookie = get_session(usersession)
user = get_user(userCookie["id"])
return render_template("apidocs.html", userdata=user)
else:
return render_template("apidocs.html")
@app.route("/post", methods=("GET", "POST"))
def post():
usersession = request.cookies.get("session_DO_NOT_SHARE")
if usersession:
if request.method == "POST":
title = request.form["title"]
if title == "":
flash("Text required :3")
return redirect(url_for("post"))
if len(title) > 300:
flash("Too long title!")
return redirect(url_for("post"))
if "file" not in request.files:
flash("No file selected :3")
return redirect(url_for("post"))
file = request.files["file"]
if file.filename == "":
flash("No file selected :3")
return redirect(url_for("post"))
if not allowed_file(file.filename):
flash("File is not an image!")
return redirect(url_for("post"))
userCookie = get_session(usersession)
user = get_user(userCookie["id"])
if not user["banned"] == "0":
flash("Your account has been banned. You may no longer perform this action.")
return redirect(url_for("post"))
filename = secure_filename(file.filename)
finalfilename = secrets.token_hex(32) + filename
file.save(os.path.join(UPLOAD_FOLDER, finalfilename))
imgurl = "/cdn/" + finalfilename
conn = get_db_connection()
conn.execute("INSERT INTO posts (textstr, imageurl, creator, created) VALUES (?, ?, ?, ?)",
(title, imgurl, userCookie["id"], str(time.time())))
conn.commit()
conn.close()
return redirect(url_for("main"))
else:
userCookie = get_session(usersession)
user = get_user(userCookie["id"])
return render_template("post.html", userdata=user)
else:
flash("A burgercat account is required to post :3")
return redirect(url_for("login"))
@app.route("/api/comment", methods=("GET", "POST"))
@limiter.limit("1/second", override_defaults=False)
def comment():
usersession = request.cookies.get("session_DO_NOT_SHARE")
if usersession:
if request.method == "POST":
data = request.get_json()
uid = data["id"]
title = data["title"]
userCookie = get_session(usersession)
user = get_user(userCookie["id"])
if len(title) > 300:
return {
"error": "too much text"
}, 403
if not user["banned"] == "0":
return {
"error": "banned"
}, 403
conn = get_db_connection()
conn.execute("INSERT INTO comments (textstr, post_id, creator, created) VALUES (?, ?, ?, ?)",
(title, uid, userCookie["id"], str(time.time())))
conn.commit()
conn.close()
return "success", 100
else:
return {
"error": "https://http.cat/images/400.jpg"
}, 400
else:
return {
"error": "https://http.cat/images/401.jpg"
}, 401
@app.route("/api/post/<post_id>/comments", methods=("GET", "POST"))
def apicomments(post_id):
postthing = get_comments(int(post_id))
if not postthing == "error":
comments = []
for comment in postthing:
commentthing = {
"title": comment["textstr"],
"created": comment["created"],
"creator": {
"id": comment["creator"],
"username": get_user(comment["creator"])["username"]
}
}
comments.append(commentthing)
return comments
@app.route("/cdn/<filename>", methods=("GET", "POST"))
@limiter.exempt
def cdn(filename):
if os.path.exists(os.path.join(UPLOAD_FOLDER, filename)):
return send_from_directory(UPLOAD_FOLDER, filename)
else:
return "file doesn't exist!!"
@app.route("/signup", methods=("GET", "POST"))
@limiter.limit("5/minute", override_defaults=False)
def signup():
usersession = request.cookies.get("session_DO_NOT_SHARE")
if usersession:
return redirect(url_for("main"))
if request.method == "POST":
if not check_username_taken(request.form["username"]) == "error":
flash("Username already taken :3")
return redirect(url_for("signup"))
if not request.form["username"].isalnum():
flash("Username must be alphanumeric :3")
return redirect(url_for("signup"))
if not len(request.form["password"]) > int(PASSWORD_REQUIREMENT):
flash("Password must contain at least " + PASSWORD_REQUIREMENT + " characters")
return redirect(url_for("signup"))
hashedpassword = generate_password_hash(request.form["password"])
conn = get_db_connection()
conn.execute("INSERT INTO users (username, password, created, htmldescription) VALUES (?, ?, ?, ?)",
(request.form["username"], hashedpassword, str(time.time()), ""))
conn.commit()
conn.close()
return redirect(url_for("login"))
else:
return render_template("signup.html")
@app.route("/login", methods=("GET", "POST"))
@limiter.limit("10/minute", override_defaults=False)
def login():
usersession = request.cookies.get("session_DO_NOT_SHARE")
if usersession:
redirect(url_for("main"))
if request.method == "POST":
userID = check_username_taken(request.form["username"])
user = get_user(userID)
if user == "error":
flash("Wrong username or password :3")
return redirect(url_for("login"))
if not check_password_hash(user["password"], (request.form["password"])):
flash("Wrong username or password :3")
return redirect(url_for("login"))
randomCharacters = secrets.token_hex(512)
conn = get_db_connection()
conn.execute("INSERT INTO sessions (session, id) VALUES (?, ?)",
(randomCharacters, userID))
conn.commit()
conn.close()
resp = make_response(redirect("/"))
resp.set_cookie("session_DO_NOT_SHARE", randomCharacters, expires=datetime.datetime.now() + datetime.timedelta(days=float(180)))
return resp
else:
return render_template("login.html")
@app.route("/settings", methods=("GET", "POST"))
def settings():
usersession = request.cookies.get("session_DO_NOT_SHARE")
if usersession:
userCookie = get_session(usersession)
user = get_user(userCookie["id"])
return render_template("settings.html", userdata=user)
else:
return redirect("/")
@app.route("/api/delete", methods=("GET", "POST"))
def delete():
usersession = request.cookies.get("session_DO_NOT_SHARE")
if request.method == "POST":
data = request.get_json()
postid = int(data["id"])
post = get_post(postid)
if not post == "error":
if usersession:
userCookie = get_session(usersession)
user = get_user(userCookie["id"])
if (str(user["administrator"]) == "1") or (int(user["id"]) == int(post["creator"])):
post = get_post(postid)
conn = get_db_connection()
conn.execute("DELETE FROM posts WHERE id = ?", (postid,))
conn.commit()
conn.close()
return "success", 100
else:
return {
"error": "https://http.cat/images/403.jpg"
}, 403
else:
return {
"error": "https://http.cat/images/400.jpg"
}, 400
@app.route("/listusers", methods=("GET", "POST"))
def listusers():
usersession = request.cookies.get("session_DO_NOT_SHARE")
if usersession:
userCookie = get_session(usersession)
user = get_user(userCookie["id"])
if str(user["administrator"]) == "1":
thing = ""
conn = get_db_connection()
users = conn.execute("SELECT * FROM users").fetchall()
conn.close()
for x in users:
thing = str(x["id"]) + " - " + x["username"] + "<br>" + thing
return thing
else:
return """<img src="https://http.cat/images/403.jpg">"""
else:
return redirect(url_for("login"))
@app.route("/settings/logout", methods=("GET", "POST"))
def logout():
resp = redirect(url_for("main"))
session = request.cookies.get("session_DO_NOT_SHARE")
resp.delete_cookie("session_DO_NOT_SHARE")
return resp
@app.errorhandler(500)
def page_not_found(e):
return """<img src="https://http.cat/images/500.jpg">""", 500
@app.errorhandler(400)
def page_not_found(e):
return """<img src="https://http.cat/images/400.jpg">""", 400
@app.errorhandler(429)
def page_not_found(e):
return """<img src="https://http.cat/images/429.jpg">""", 429
@app.errorhandler(404)
def page_not_found(e):
return """<img src="https://http.cat/images/404.jpg">""", 404
@app.errorhandler(413)
def page_not_found(e):
return "Images can't be larger than " + str(UPLOAD_LIMIT) + "MB", 413
if __name__ == "__main__":
print("[INFO] Server started")
with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as sock:
sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
sock.bind(('', int(PORT)))
serve(app, sockets=[sock])
print("[INFO] Server stopped")