This repository has been archived on 2024-10-20. You can view files and clone it, but cannot push or open issues or pull requests.
burgercat/main

702 lines
22 KiB
Python

#!/usr/bin/python3
import os
import configparser
import sqlite3
import time
import json
import secrets
import datetime
import socket
from itertools import groupby
from waitress import serve
from werkzeug.utils import secure_filename
from werkzeug.security import generate_password_hash, check_password_hash
from werkzeug.middleware.proxy_fix import ProxyFix
from flask import Flask, render_template, request, url_for, flash, redirect, session, make_response, send_from_directory, stream_with_context, Response, request
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
# read config file
config = configparser.ConfigParser()
config.read("config.ini")
PORT = config["config"]["PORT"]
SECRET_KEY = config["config"]["SECRET_KEY"]
UPLOAD_FOLDER = config["config"]["UPLOAD_FOLDER"]
UPLOAD_LIMIT = config["config"]["UPLOAD_LIMIT"]
PASSWORD_REQUIREMENT = config["config"]["PASSWORD_REQUIREMENT"]
app = Flask(__name__)
app.config["SECRET_KEY"] = SECRET_KEY
app.config["MAX_CONTENT_LENGTH"] = int(UPLOAD_LIMIT) * 1000 * 1000
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1)
limiter = Limiter(
get_remote_address,
app = app,
default_limits = ["3 per second"],
storage_uri = "memory://",
strategy = "fixed-window"
)
if SECRET_KEY == "placeholder":
print("[WARNING] Secret key is not set")
if not os.path.exists(UPLOAD_FOLDER):
print("[WARNING] Upload folder doesn't exist, creating")
os.mkdir(UPLOAD_FOLDER)
if not os.path.exists("database.db"):
print("[ERROR] No database exists, please run init_db")
exit()
def makeStrSafe(url):
return str(urllib.parse.quote(url)).replace("%20", " ")
def get_db_connection():
conn = sqlite3.connect("database.db")
conn.row_factory = sqlite3.Row
return conn
def get_user(id):
conn = get_db_connection()
post = conn.execute("SELECT * FROM users WHERE id = ?",
(id,)).fetchone()
conn.close()
if post is None:
return "error"
return post
def get_comments(id):
conn = get_db_connection()
post = conn.execute("SELECT * FROM comments WHERE post_id = ?",
(id,)).fetchall()
conn.close()
if post is None:
return "error"
return post
def get_messages(chatroomid, max):
conn = get_db_connection()
post = conn.execute("SELECT * FROM chatmessages WHERE chatroom_id = ? ORDER BY created DESC;",
(chatroomid,)).fetchmany(max + 1)
conn.close()
if post is None:
return "error"
return post
app.jinja_env.globals.update(getComments=get_comments)
def get_post(id):
conn = get_db_connection()
post = conn.execute("SELECT * FROM posts WHERE id = ?",
(id,)).fetchone()
conn.close()
if post is None:
return "error"
return post
app.jinja_env.globals.update(getUser=get_user)
def check_username_taken(username):
conn = get_db_connection()
post = conn.execute("SELECT * FROM users WHERE lower(username) = ?",
(username.lower(),)).fetchone()
conn.close()
if post is None:
return "error"
return post["id"]
def get_session(id):
conn = get_db_connection()
post = conn.execute("SELECT * FROM sessions WHERE session = ?",
(id,)).fetchone()
conn.close()
if post is None:
return "error"
return post
ALLOWED_EXTENSIONS = {"png", "apng", "jpg", "jpeg", "gif", "svg", "webp"}
def allowed_file(filename):
return '.' in filename and \
filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
@app.route("/", methods=("GET", "POST"))
def main():
usersession = request.cookies.get("session_DO_NOT_SHARE")
conn = get_db_connection()
posts = conn.execute("SELECT * FROM posts ORDER BY created DESC;").fetchall()
conn.close()
if usersession:
userCookie = get_session(usersession)
user = get_user(userCookie["id"])
return render_template("main.html", userdata=user, posts=posts)
else:
return render_template("main.html", posts=posts)
@app.route("/chat", methods=("GET", "POST"))
def chat():
usersession = request.cookies.get("session_DO_NOT_SHARE")
if usersession:
userCookie = get_session(usersession)
user = get_user(userCookie["id"])
return render_template("chat.html", userdata=user)
else:
return render_template("chat.html")
@app.route("/api/chat/listrooms")
def chatlistrooms():
conn = get_db_connection()
rooms = conn.execute("SELECT * FROM chatrooms ORDER BY roomname ASC;").fetchall()
conn.close()
template = []
for room in rooms:
roomtemplate = {
"id": room["id"],
"name": room["roomname"]
}
template.append(roomtemplate)
return(template), 200
@app.route("/api/chat/getmessages/<roomid>")
@limiter.exempt
def chatget(roomid):
messages = get_messages(roomid, 40)
template = []
for message in messages:
creatorid = message["creator"]
creatortemplate = {
"id": message["creator"],
"username": get_user(creatorid)["username"]
}
messagetemplate = {
"id": message["id"],
"content": message["content"],
"creator": creatortemplate,
"created": message["created"]
}
template.append(messagetemplate)
return(template), 200
burgerMessageUpdate = True
burgerMessageCache = ""
@app.route("/api/chat/send/<roomid>", methods=("GET", "POST"))
def chatsend(roomid):
usersession = request.cookies.get("session_DO_NOT_SHARE")
if usersession:
if request.method == "POST":
data = request.get_json()
content = data["content"]
print(content)
print(roomid)
userCookie = get_session(usersession)
user = get_user(userCookie["id"])
if not user["banned"] == "0":
return {
"error": "banned"
}, 403
conn = get_db_connection()
conn.execute("INSERT INTO chatmessages (content, chatroom_id, creator, created) VALUES (?, ?, ?, ?)",
(content, roomid, userCookie["id"], str(time.time())))
conn.commit()
conn.close()
global burgerMessageCache
global burgerMessageUpdate
burgerMessageUpdate = True
burgerMessageCache = user["username"] + ": " + content
return "success", 200
@app.route("/@<pageusername>", methods=("GET", "POST"))
def user(pageusername):
usersession = request.cookies.get("session_DO_NOT_SHARE")
checkusername = check_username_taken(pageusername)
if not checkusername == "error":
pageuser = get_user(checkusername)
if usersession:
userCookie = get_session(usersession)
user = get_user(userCookie["id"])
return render_template("user.html", userdata=user, createddate=datetime.datetime.utcfromtimestamp(int(str(pageuser["created"]).split(".")[0])).strftime("%Y-%m-%d"), pageuser=pageuser)
else:
return render_template("user.html", createddate=datetime.datetime.utcfromtimestamp(int(str(pageuser["created"]).split(".")[0])).strftime("%Y-%m-%d"), pageuser=pageuser)
else:
return """<img src="https://http.cat/images/404.jpg">""", 404
@app.route("/api/page/<userid>", methods=("GET", "POST"))
def apipageuser(userid):
pageuser = get_user(userid)
addhtml = """
<!DOCTYPE html><script>
window.stop()
</script>
<base target="_blank"/> <head><meta http-equiv="Content-Security-Policy" default-src='none'; content="img-src cdn.discordapp.com cdn.discordapp.net media.tenor.com; style-src: 'self';" /></head>"""
if not pageuser == "error":
return addhtml + pageuser["htmldescription"]
else:
return """<img src="https://http.cat/images/404.jpg">""", 404
@app.route("/@<pageusername>/edit", methods=("GET", "POST"))
def edituser(pageusername):
usersession = request.cookies.get("session_DO_NOT_SHARE")
checkusername = check_username_taken(pageusername)
if not checkusername == "error":
pageuser = get_user(checkusername)
if usersession:
userCookie = get_session(usersession)
user = get_user(userCookie["id"])
if pageuser["username"] == user["username"]:
if request.method == "POST":
code = request.form["code"].replace("Content-Security-Policy", "").replace("<iframe>", "")
conn = get_db_connection()
conn.execute("UPDATE users SET htmldescription = ? WHERE id = ?",
(code, user["id"]))
conn.commit()
conn.close()
return redirect("/@" + user["username"])
else:
return render_template("edituser.html", userdata=user, pageuser=pageuser)
else:
return """<img src="https://http.cat/images/403.jpg">""", 403
else:
return """<img src="https://http.cat/images/403.jpg">""", 403
else:
return """<img src="https://http.cat/images/404.jpg">""", 404
@app.route("/api/frontpage", methods=("GET", "POST"))
def apifrontpage():
conn = get_db_connection()
posts = conn.execute("SELECT * FROM posts ORDER BY created DESC;").fetchall()
conn.close()
result = []
for post in posts:
comments = []
for comment in get_comments(post["id"]):
commentthing = {
"title": comment["textstr"],
"id": comment["id"],
"created": comment["created"],
"creator": {
"id": comment["creator"],
"username": get_user(comment["creator"])["username"]
}
}
comments.append(commentthing)
mainthing = {
"id": post["id"],
"created": post["created"],
"title": post["textstr"],
"imgurl": post["imageurl"],
"creator": {
"id": post["creator"],
"username": get_user(post["creator"])["username"]
},
"comments": comments
}
result.append(mainthing)
return result
@app.route("/api/userinfo", methods=("GET", "POST"))
def apiuserinfo():
usersession = request.cookies.get("session_DO_NOT_SHARE")
if usersession:
userCookie = get_session(usersession)
user = get_user(userCookie["id"])
datatemplate = {
"username": user["username"],
"id": user["id"],
"created": user["created"]
}
return datatemplate
else:
return {
"error": "no authentication"
}, 403
@limiter.limit("10/minute", override_defaults=False)
@app.route("/api/login", methods=("GET", "POST"))
def apilogin():
usersession = request.cookies.get("session_DO_NOT_SHARE")
if request.method == "POST":
data = request.get_json()
username = data["username"]
password = data["password"]
userID = check_username_taken(username)
user = get_user(userID)
if user == "error":
return {
"error": "wrong username or password"
}, 401
if not check_password_hash(user["password"], (password)):
return {
"error": "wrong username or password"
}, 401
randomCharacters = secrets.token_hex(512)
conn = get_db_connection()
conn.execute("INSERT INTO sessions (session, id) VALUES (?, ?)",
(randomCharacters, userID))
conn.commit()
conn.close()
return {
"key": randomCharacters
}, 200
else:
return {
"error": "https://http.cat/images/400.jpg"
}, 400
@app.route("/apidocs", methods=("GET", "POST"))
def apidocs():
usersession = request.cookies.get("session_DO_NOT_SHARE")
if usersession:
userCookie = get_session(usersession)
user = get_user(userCookie["id"])
return render_template("apidocs.html", userdata=user)
else:
return render_template("apidocs.html")
@app.route("/post", methods=("GET", "POST"))
def post():
usersession = request.cookies.get("session_DO_NOT_SHARE")
if usersession:
if request.method == "POST":
title = request.form["title"]
if title == "":
flash("Text required :3")
return redirect(url_for("post"))
if len(title) > 300:
flash("Too long title!")
return redirect(url_for("post"))
if "file" not in request.files:
flash("No file selected :3")
return redirect(url_for("post"))
file = request.files["file"]
if file.filename == "":
flash("No file selected :3")
return redirect(url_for("post"))
if not allowed_file(file.filename):
flash("File is not an image!")
return redirect(url_for("post"))
userCookie = get_session(usersession)
user = get_user(userCookie["id"])
if not user["banned"] == "0":
flash("Your account has been banned. You may no longer perform this action.")
return redirect(url_for("post"))
filename = secure_filename(file.filename)
finalfilename = secrets.token_hex(32) + filename
file.save(os.path.join(UPLOAD_FOLDER, finalfilename))
imgurl = "/cdn/" + finalfilename
conn = get_db_connection()
conn.execute("INSERT INTO posts (textstr, imageurl, creator, created) VALUES (?, ?, ?, ?)",
(title, imgurl, userCookie["id"], str(time.time())))
conn.commit()
conn.close()
return redirect(url_for("main"))
else:
userCookie = get_session(usersession)
user = get_user(userCookie["id"])
return render_template("post.html", userdata=user)
else:
flash("A burgercat account is required to post :3")
return redirect(url_for("login"))
@app.route("/api/comment", methods=("GET", "POST"))
@limiter.limit("1/second", override_defaults=False)
def comment():
usersession = request.cookies.get("session_DO_NOT_SHARE")
if usersession:
if request.method == "POST":
data = request.get_json()
uid = data["id"]
title = data["title"]
userCookie = get_session(usersession)
user = get_user(userCookie["id"])
if len(title) > 300:
return {
"error": "too much text"
}, 403
if not user["banned"] == "0":
return {
"error": "banned"
}, 403
conn = get_db_connection()
conn.execute("INSERT INTO comments (textstr, post_id, creator, created) VALUES (?, ?, ?, ?)",
(title, uid, userCookie["id"], str(time.time())))
conn.commit()
conn.close()
return "success", 200
else:
return {
"error": "https://http.cat/images/400.jpg"
}, 400
else:
return {
"error": "https://http.cat/images/401.jpg"
}, 401
@app.route("/api/post/<post_id>/comments", methods=("GET", "POST"))
def apicomments(post_id):
postthing = get_comments(int(post_id))
if not postthing == "error":
comments = []
for comment in postthing:
commentthing = {
"title": comment["textstr"],
"created": comment["created"],
"creator": {
"id": comment["creator"],
"username": get_user(comment["creator"])["username"]
}
}
comments.append(commentthing)
return comments
@app.route("/cdn/<filename>", methods=("GET", "POST"))
@limiter.exempt
def cdn(filename):
if os.path.exists(os.path.join(UPLOAD_FOLDER, filename)):
return send_from_directory(UPLOAD_FOLDER, filename)
else:
return "file doesn't exist!!"
@app.route("/signup", methods=("GET", "POST"))
@limiter.limit("5/minute", override_defaults=False)
def signup():
usersession = request.cookies.get("session_DO_NOT_SHARE")
if usersession:
return redirect(url_for("main"))
if request.method == "POST":
if not check_username_taken(request.form["username"]) == "error":
flash("Username already taken :3")
return redirect(url_for("signup"))
if not request.form["username"].isalnum():
flash("Username must be alphanumeric :3")
return redirect(url_for("signup"))
if not len(request.form["password"]) > int(PASSWORD_REQUIREMENT):
flash("Password must contain at least " + PASSWORD_REQUIREMENT + " characters")
return redirect(url_for("signup"))
hashedpassword = generate_password_hash(request.form["password"])
conn = get_db_connection()
conn.execute("INSERT INTO users (username, password, created, htmldescription) VALUES (?, ?, ?, ?)",
(request.form["username"], hashedpassword, str(time.time()), ""))
conn.commit()
conn.close()
return redirect(url_for("login"))
else:
return render_template("signup.html")
@app.route("/login", methods=("GET", "POST"))
@limiter.limit("10/minute", override_defaults=False)
def login():
usersession = request.cookies.get("session_DO_NOT_SHARE")
if usersession:
redirect(url_for("main"))
if request.method == "POST":
userID = check_username_taken(request.form["username"])
user = get_user(userID)
if user == "error":
flash("Wrong username or password :3")
return redirect(url_for("login"))
if not check_password_hash(user["password"], (request.form["password"])):
flash("Wrong username or password :3")
return redirect(url_for("login"))
randomCharacters = secrets.token_hex(512)
conn = get_db_connection()
conn.execute("INSERT INTO sessions (session, id) VALUES (?, ?)",
(randomCharacters, userID))
conn.commit()
conn.close()
resp = make_response(redirect("/"))
resp.set_cookie("session_DO_NOT_SHARE", randomCharacters, expires=datetime.datetime.now() + datetime.timedelta(days=float(180)))
return resp
else:
return render_template("login.html")
@app.route("/settings", methods=("GET", "POST"))
def settings():
usersession = request.cookies.get("session_DO_NOT_SHARE")
if usersession:
userCookie = get_session(usersession)
user = get_user(userCookie["id"])
return render_template("settings.html", userdata=user)
else:
return redirect("/")
@app.route("/api/delete", methods=("GET", "POST"))
def delete():
usersession = request.cookies.get("session_DO_NOT_SHARE")
if request.method == "POST":
data = request.get_json()
postid = int(data["id"])
post = get_post(postid)
if not post == "error":
if usersession:
userCookie = get_session(usersession)
user = get_user(userCookie["id"])
if (str(user["administrator"]) == "1") or (int(user["id"]) == int(post["creator"])):
post = get_post(postid)
conn = get_db_connection()
conn.execute("DELETE FROM posts WHERE id = ?", (postid,))
conn.commit()
conn.close()
return "success", 200
else:
return {
"error": "https://http.cat/images/403.jpg"
}, 403
else:
return {
"error": "https://http.cat/images/400.jpg"
}, 400
@app.route("/listusers", methods=("GET", "POST"))
def listusers():
usersession = request.cookies.get("session_DO_NOT_SHARE")
if usersession:
userCookie = get_session(usersession)
user = get_user(userCookie["id"])
if str(user["administrator"]) == "1":
thing = ""
conn = get_db_connection()
users = conn.execute("SELECT * FROM users").fetchall()
conn.close()
for x in users:
thing = str(x["id"]) + " - " + x["username"] + "<br>" + thing
return thing
else:
return """<img src="https://http.cat/images/403.jpg">"""
else:
return redirect(url_for("login"))
@app.route("/settings/logout", methods=("GET", "POST"))
def logout():
resp = redirect(url_for("main"))
session = request.cookies.get("session_DO_NOT_SHARE")
resp.delete_cookie("session_DO_NOT_SHARE")
return resp
@app.errorhandler(500)
def page_not_found(e):
return """<img src="https://http.cat/images/500.jpg">""", 500
@app.errorhandler(400)
def page_not_found(e):
return """<img src="https://http.cat/images/400.jpg">""", 400
@app.errorhandler(429)
def page_not_found(e):
return """<img src="https://http.cat/images/429.jpg">""", 429
@app.errorhandler(404)
def page_not_found(e):
return """<img src="https://http.cat/images/404.jpg">""", 404
@app.errorhandler(413)
def page_not_found(e):
return "Images can't be larger than " + str(UPLOAD_LIMIT) + "MB", 413
if __name__ == "__main__":
print("[INFO] Server started")
with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as sock:
sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
sock.bind(('', int(PORT)))
serve(app, sockets=[sock])
print("[INFO] Server stopped")