This repository has been archived on 2024-10-20. You can view files and clone it, but cannot push or open issues or pull requests.
burgercat/main

716 lines
23 KiB
Python

#!/usr/bin/python3
import os
import configparser
import sqlite3
import time
import json
import secrets
import datetime
import socket
import subprocess
from itertools import groupby
from waitress import serve
from werkzeug.utils import secure_filename
from werkzeug.security import generate_password_hash, check_password_hash
from werkzeug.middleware.proxy_fix import ProxyFix
from flask import Flask, render_template, request, url_for, flash, redirect, session, make_response, send_from_directory, stream_with_context, Response, request
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
# read config file
config = configparser.ConfigParser()
config.read("config.ini")
PORT = config["config"]["PORT"]
SECRET_KEY = config["config"]["SECRET_KEY"]
UPLOAD_FOLDER = config["config"]["UPLOAD_FOLDER"]
UPLOAD_LIMIT = config["config"]["UPLOAD_LIMIT"]
PASSWORD_REQUIREMENT = config["config"]["PASSWORD_REQUIREMENT"]
app = Flask(__name__)
app.config["SECRET_KEY"] = SECRET_KEY
app.config["MAX_CONTENT_LENGTH"] = int(UPLOAD_LIMIT) * 1000 * 1000
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1)
limiter = Limiter(
get_remote_address,
app = app,
default_limits = ["3 per second"],
storage_uri = "memory://",
strategy = "fixed-window"
)
if SECRET_KEY == "placeholder":
print("[WARNING] Secret key is not set")
if not os.path.exists(UPLOAD_FOLDER):
print("[WARNING] Upload folder doesn't exist, creating")
os.mkdir(UPLOAD_FOLDER)
if not os.path.exists("database.db"):
print("[ERROR] No database exists, please run init_db")
exit()
def makeStrSafe(url):
return str(urllib.parse.quote(url)).replace("%20", " ")
def get_db_connection():
conn = sqlite3.connect("database.db")
conn.row_factory = sqlite3.Row
return conn
def get_user(id):
conn = get_db_connection()
post = conn.execute("SELECT * FROM users WHERE id = ?",
(id,)).fetchone()
conn.close()
if post is None:
return "error"
return post
def get_comments(id):
conn = get_db_connection()
post = conn.execute("SELECT * FROM comments WHERE post_id = ?",
(id,)).fetchall()
conn.close()
if post is None:
return "error"
return post
def get_messages(chatroomid, max):
conn = get_db_connection()
post = conn.execute("SELECT * FROM chatmessages WHERE chatroom_id = ? ORDER BY created DESC;",
(chatroomid,)).fetchmany(max + 1)
conn.close()
if post is None:
return "error"
return post
app.jinja_env.globals.update(getComments=get_comments)
def get_post(id):
conn = get_db_connection()
post = conn.execute("SELECT * FROM posts WHERE id = ?",
(id,)).fetchone()
conn.close()
if post is None:
return "error"
return post
app.jinja_env.globals.update(getUser=get_user)
def check_username_taken(username):
conn = get_db_connection()
post = conn.execute("SELECT * FROM users WHERE lower(username) = ?",
(username.lower(),)).fetchone()
conn.close()
if post is None:
return "error"
return post["id"]
def get_session(id):
conn = get_db_connection()
post = conn.execute("SELECT * FROM sessions WHERE session = ?",
(id,)).fetchone()
conn.close()
if post is None:
return "error"
return post
def get_current_commit(output_format="full"):
if output_format == "short":
length = 8
else:
length = 40
try:
output = subprocess.check_output(["git", "rev-parse", f"--short={length}", "HEAD"]).decode().strip()
return output
except subprocess.CalledProcessError:
return "Error fetching git commit"
ALLOWED_EXTENSIONS = {"png", "apng", "jpg", "jpeg", "gif", "svg", "webp"}
def allowed_file(filename):
return '.' in filename and \
filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
@app.route("/", methods=("GET", "POST"))
def main():
usersession = request.cookies.get("session_DO_NOT_SHARE")
conn = get_db_connection()
posts = conn.execute("SELECT * FROM posts ORDER BY created DESC;").fetchall()
commit_hash_long = get_current_commit()
commit_hash_short = get_current_commit(output_format="short")
conn.close()
if usersession:
userCookie = get_session(usersession)
user = get_user(userCookie["id"])
return render_template("main.html", userdata=user, posts=posts, commit_hash_long=commit_hash_long, commit_hash_short=commit_hash_short)
else:
return render_template("main.html", posts=posts, commit_hash_long=commit_hash_long, commit_hash_short=commit_hash_short)
@app.route("/chat", methods=("GET", "POST"))
def chat():
usersession = request.cookies.get("session_DO_NOT_SHARE")
if usersession:
userCookie = get_session(usersession)
user = get_user(userCookie["id"])
return render_template("chat.html", userdata=user)
else:
return render_template("chat.html")
@app.route("/api/chat/listrooms")
def chatlistrooms():
conn = get_db_connection()
rooms = conn.execute("SELECT * FROM chatrooms ORDER BY roomname ASC;").fetchall()
conn.close()
template = []
for room in rooms:
roomtemplate = {
"id": room["id"],
"name": room["roomname"]
}
template.append(roomtemplate)
return(template), 200
@app.route("/api/chat/getmessages/<roomid>")
@limiter.exempt
def chatget(roomid):
messages = get_messages(roomid, 40)
template = []
for message in messages:
creatorid = message["creator"]
creatortemplate = {
"id": message["creator"],
"username": get_user(creatorid)["username"]
}
messagetemplate = {
"id": message["id"],
"content": message["content"],
"creator": creatortemplate,
"created": message["created"]
}
template.append(messagetemplate)
return(template), 200
burgerMessageUpdate = True
burgerMessageCache = ""
@app.route("/api/chat/send/<roomid>", methods=("GET", "POST"))
def chatsend(roomid):
usersession = request.cookies.get("session_DO_NOT_SHARE")
if usersession:
if request.method == "POST":
data = request.get_json()
content = data["content"]
print(content)
print(roomid)
userCookie = get_session(usersession)
user = get_user(userCookie["id"])
if not user["banned"] == "0":
return {
"error": "banned"
}, 403
conn = get_db_connection()
conn.execute("INSERT INTO chatmessages (content, chatroom_id, creator, created) VALUES (?, ?, ?, ?)",
(content, roomid, userCookie["id"], str(time.time())))
conn.commit()
conn.close()
global burgerMessageCache
global burgerMessageUpdate
burgerMessageUpdate = True
burgerMessageCache = user["username"] + ": " + content
return "success", 200
@app.route("/@<pageusername>", methods=("GET", "POST"))
def user(pageusername):
usersession = request.cookies.get("session_DO_NOT_SHARE")
checkusername = check_username_taken(pageusername)
if not checkusername == "error":
pageuser = get_user(checkusername)
if usersession:
userCookie = get_session(usersession)
user = get_user(userCookie["id"])
return render_template("user.html", userdata=user, createddate=datetime.datetime.utcfromtimestamp(int(str(pageuser["created"]).split(".")[0])).strftime("%Y-%m-%d"), pageuser=pageuser)
else:
return render_template("user.html", createddate=datetime.datetime.utcfromtimestamp(int(str(pageuser["created"]).split(".")[0])).strftime("%Y-%m-%d"), pageuser=pageuser)
else:
return """<img src="https://http.cat/images/404.jpg">""", 404
@app.route("/api/page/<userid>", methods=("GET", "POST"))
def apipageuser(userid):
pageuser = get_user(userid)
addhtml = """
<!DOCTYPE html><script>
window.stop()
</script>
<base target="_blank"/> <head><meta http-equiv="Content-Security-Policy" default-src='none'; content="img-src cdn.discordapp.com cdn.discordapp.net media.tenor.com; style-src: 'self';" /></head>"""
if not pageuser == "error":
return addhtml + pageuser["htmldescription"]
else:
return """<img src="https://http.cat/images/404.jpg">""", 404
@app.route("/@<pageusername>/edit", methods=("GET", "POST"))
def edituser(pageusername):
usersession = request.cookies.get("session_DO_NOT_SHARE")
checkusername = check_username_taken(pageusername)
if not checkusername == "error":
pageuser = get_user(checkusername)
if usersession:
userCookie = get_session(usersession)
user = get_user(userCookie["id"])
if pageuser["username"] == user["username"]:
if request.method == "POST":
code = request.form["code"].replace("Content-Security-Policy", "").replace("<iframe>", "")
conn = get_db_connection()
conn.execute("UPDATE users SET htmldescription = ? WHERE id = ?",
(code, user["id"]))
conn.commit()
conn.close()
return redirect("/@" + user["username"])
else:
return render_template("edituser.html", userdata=user, pageuser=pageuser)
else:
return """<img src="https://http.cat/images/403.jpg">""", 403
else:
return """<img src="https://http.cat/images/403.jpg">""", 403
else:
return """<img src="https://http.cat/images/404.jpg">""", 404
@app.route("/api/frontpage", methods=("GET", "POST"))
def apifrontpage():
conn = get_db_connection()
posts = conn.execute("SELECT * FROM posts ORDER BY created DESC;").fetchall()
conn.close()
result = []
for post in posts:
comments = []
for comment in get_comments(post["id"]):
commentthing = {
"title": comment["textstr"],
"id": comment["id"],
"created": comment["created"],
"creator": {
"id": comment["creator"],
"username": get_user(comment["creator"])["username"]
}
}
comments.append(commentthing)
mainthing = {
"id": post["id"],
"created": post["created"],
"title": post["textstr"],
"imgurl": post["imageurl"],
"creator": {
"id": post["creator"],
"username": get_user(post["creator"])["username"]
},
"comments": comments
}
result.append(mainthing)
return result
@app.route("/api/userinfo", methods=("GET", "POST"))
def apiuserinfo():
usersession = request.cookies.get("session_DO_NOT_SHARE")
if usersession:
userCookie = get_session(usersession)
user = get_user(userCookie["id"])
datatemplate = {
"username": user["username"],
"id": user["id"],
"created": user["created"]
}
return datatemplate
else:
return {
"error": "no authentication"
}, 403
@limiter.limit("10/minute", override_defaults=False)
@app.route("/api/login", methods=("GET", "POST"))
def apilogin():
usersession = request.cookies.get("session_DO_NOT_SHARE")
if request.method == "POST":
data = request.get_json()
username = data["username"]
password = data["password"]
userID = check_username_taken(username)
user = get_user(userID)
if user == "error":
return {
"error": "wrong username or password"
}, 401
if not check_password_hash(user["password"], (password)):
return {
"error": "wrong username or password"
}, 401
randomCharacters = secrets.token_hex(512)
conn = get_db_connection()
conn.execute("INSERT INTO sessions (session, id) VALUES (?, ?)",
(randomCharacters, userID))
conn.commit()
conn.close()
return {
"key": randomCharacters
}, 200
else:
return {
"error": "https://http.cat/images/400.jpg"
}, 400
@app.route("/apidocs", methods=("GET", "POST"))
def apidocs():
usersession = request.cookies.get("session_DO_NOT_SHARE")
if usersession:
userCookie = get_session(usersession)
user = get_user(userCookie["id"])
return render_template("apidocs.html", userdata=user)
else:
return render_template("apidocs.html")
@app.route("/post", methods=("GET", "POST"))
def post():
usersession = request.cookies.get("session_DO_NOT_SHARE")
if usersession:
if request.method == "POST":
title = request.form["title"]
if title == "":
flash("Text required :3")
return redirect(url_for("post"))
if len(title) > 300:
flash("Too long title!")
return redirect(url_for("post"))
if "file" not in request.files:
flash("No file selected :3")
return redirect(url_for("post"))
file = request.files["file"]
if file.filename == "":
flash("No file selected :3")
return redirect(url_for("post"))
if not allowed_file(file.filename):
flash("File is not an image!")
return redirect(url_for("post"))
userCookie = get_session(usersession)
user = get_user(userCookie["id"])
if not user["banned"] == "0":
flash("Your account has been banned. You may no longer perform this action.")
return redirect(url_for("post"))
filename = secure_filename(file.filename)
finalfilename = secrets.token_hex(32) + filename
file.save(os.path.join(UPLOAD_FOLDER, finalfilename))
imgurl = "/cdn/" + finalfilename
conn = get_db_connection()
conn.execute("INSERT INTO posts (textstr, imageurl, creator, created) VALUES (?, ?, ?, ?)",
(title, imgurl, userCookie["id"], str(time.time())))
conn.commit()
conn.close()
return redirect(url_for("main"))
else:
userCookie = get_session(usersession)
user = get_user(userCookie["id"])
return render_template("post.html", userdata=user)
else:
flash("A burgercat account is required to post :3")
return redirect(url_for("login"))
@app.route("/api/comment", methods=("GET", "POST"))
@limiter.limit("1/second", override_defaults=False)
def comment():
usersession = request.cookies.get("session_DO_NOT_SHARE")
if usersession:
if request.method == "POST":
data = request.get_json()
uid = data["id"]
title = data["title"]
userCookie = get_session(usersession)
user = get_user(userCookie["id"])
if len(title) > 300:
return {
"error": "too much text"
}, 403
if not user["banned"] == "0":
return {
"error": "banned"
}, 403
conn = get_db_connection()
conn.execute("INSERT INTO comments (textstr, post_id, creator, created) VALUES (?, ?, ?, ?)",
(title, uid, userCookie["id"], str(time.time())))
conn.commit()
conn.close()
return "success", 200
else:
return {
"error": "https://http.cat/images/400.jpg"
}, 400
else:
return {
"error": "https://http.cat/images/401.jpg"
}, 401
@app.route("/api/post/<post_id>/comments", methods=("GET", "POST"))
def apicomments(post_id):
postthing = get_comments(int(post_id))
if not postthing == "error":
comments = []
for comment in postthing:
commentthing = {
"title": comment["textstr"],
"created": comment["created"],
"creator": {
"id": comment["creator"],
"username": get_user(comment["creator"])["username"]
}
}
comments.append(commentthing)
return comments
@app.route("/cdn/<filename>", methods=("GET", "POST"))
@limiter.exempt
def cdn(filename):
if os.path.exists(os.path.join(UPLOAD_FOLDER, filename)):
return send_from_directory(UPLOAD_FOLDER, filename)
else:
return "file doesn't exist!!"
@app.route("/signup", methods=("GET", "POST"))
@limiter.limit("5/minute", override_defaults=False)
def signup():
usersession = request.cookies.get("session_DO_NOT_SHARE")
if usersession:
return redirect(url_for("main"))
if request.method == "POST":
if not check_username_taken(request.form["username"]) == "error":
flash("Username already taken :3")
return redirect(url_for("signup"))
if not request.form["username"].isalnum():
flash("Username must be alphanumeric :3")
return redirect(url_for("signup"))
if not len(request.form["password"]) > int(PASSWORD_REQUIREMENT):
flash("Password must contain at least " + PASSWORD_REQUIREMENT + " characters")
return redirect(url_for("signup"))
hashedpassword = generate_password_hash(request.form["password"])
conn = get_db_connection()
conn.execute("INSERT INTO users (username, password, created, htmldescription) VALUES (?, ?, ?, ?)",
(request.form["username"], hashedpassword, str(time.time()), ""))
conn.commit()
conn.close()
return redirect(url_for("login"))
else:
return render_template("signup.html")
@app.route("/login", methods=("GET", "POST"))
@limiter.limit("10/minute", override_defaults=False)
def login():
usersession = request.cookies.get("session_DO_NOT_SHARE")
if usersession:
redirect(url_for("main"))
if request.method == "POST":
userID = check_username_taken(request.form["username"])
user = get_user(userID)
if user == "error":
flash("Wrong username or password :3")
return redirect(url_for("login"))
if not check_password_hash(user["password"], (request.form["password"])):
flash("Wrong username or password :3")
return redirect(url_for("login"))
randomCharacters = secrets.token_hex(512)
conn = get_db_connection()
conn.execute("INSERT INTO sessions (session, id) VALUES (?, ?)",
(randomCharacters, userID))
conn.commit()
conn.close()
resp = make_response(redirect("/"))
resp.set_cookie("session_DO_NOT_SHARE", randomCharacters, expires=datetime.datetime.now() + datetime.timedelta(days=float(180)))
return resp
else:
return render_template("login.html")
@app.route("/settings", methods=("GET", "POST"))
def settings():
usersession = request.cookies.get("session_DO_NOT_SHARE")
if usersession:
userCookie = get_session(usersession)
user = get_user(userCookie["id"])
return render_template("settings.html", userdata=user, createddate=datetime.datetime.utcfromtimestamp(int(str(user["created"]).split(".")[0])).strftime("%Y-%m-%d %H:%m:%S"))
else:
return redirect("/")
@app.route("/api/delete", methods=("GET", "POST"))
def delete():
usersession = request.cookies.get("session_DO_NOT_SHARE")
if request.method == "POST":
data = request.get_json()
postid = int(data["id"])
post = get_post(postid)
if not post == "error":
if usersession:
userCookie = get_session(usersession)
user = get_user(userCookie["id"])
if (str(user["administrator"]) == "1") or (int(user["id"]) == int(post["creator"])):
post = get_post(postid)
conn = get_db_connection()
conn.execute("DELETE FROM posts WHERE id = ?", (postid,))
conn.commit()
conn.close()
return "success", 200
else:
return {
"error": "https://http.cat/images/403.jpg"
}, 403
else:
return {
"error": "https://http.cat/images/400.jpg"
}, 400
@app.route("/listusers", methods=("GET", "POST"))
def listusers():
usersession = request.cookies.get("session_DO_NOT_SHARE")
if usersession:
userCookie = get_session(usersession)
user = get_user(userCookie["id"])
if str(user["administrator"]) == "1":
thing = ""
conn = get_db_connection()
users = conn.execute("SELECT * FROM users").fetchall()
conn.close()
for x in users:
thing = str(x["id"]) + " - " + x["username"] + "<br>" + thing
return thing
else:
return """<img src="https://http.cat/images/403.jpg">"""
else:
return redirect(url_for("login"))
@app.route("/settings/logout", methods=("GET", "POST"))
def logout():
resp = redirect(url_for("main"))
session = request.cookies.get("session_DO_NOT_SHARE")
resp.delete_cookie("session_DO_NOT_SHARE")
return resp
@app.errorhandler(500)
def page_not_found(e):
return """<img src="https://http.cat/images/500.jpg">""", 500
@app.errorhandler(400)
def page_not_found(e):
return """<img src="https://http.cat/images/400.jpg">""", 400
@app.errorhandler(429)
def page_not_found(e):
return """<img src="https://http.cat/images/429.jpg">""", 429
@app.errorhandler(404)
def page_not_found(e):
return """<img src="https://http.cat/images/404.jpg">""", 404
@app.errorhandler(413)
def page_not_found(e):
return "Images can't be larger than " + str(UPLOAD_LIMIT) + "MB", 413
if __name__ == "__main__":
print("[INFO] Server started")
with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as sock:
sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
sock.bind(('', int(PORT)))
serve(app, sockets=[sock])
print("[INFO] Server stopped")