789 lines
26 KiB
Python
789 lines
26 KiB
Python
#!/usr/bin/python3
|
|
import os
|
|
import requests
|
|
import configparser
|
|
import sqlite3
|
|
import time
|
|
import json
|
|
import secrets
|
|
import datetime
|
|
import socket
|
|
import threading
|
|
import subprocess
|
|
import asyncio
|
|
from hypercorn.config import Config
|
|
from hypercorn.asyncio import serve
|
|
from itertools import groupby
|
|
from werkzeug.utils import secure_filename
|
|
from werkzeug.security import generate_password_hash, check_password_hash
|
|
from quart import Quart, render_template, request, url_for, flash, redirect, session, make_response, send_from_directory, stream_with_context, Response, request, jsonify, websocket
|
|
from apscheduler.schedulers.background import BackgroundScheduler
|
|
|
|
# read config file
|
|
config = configparser.ConfigParser()
|
|
config.read("config.ini")
|
|
|
|
PORT = config["config"]["PORT"]
|
|
SECRET_KEY = config["config"]["SECRET_KEY"]
|
|
UPLOAD_FOLDER = config["config"]["UPLOAD_FOLDER"]
|
|
UPLOAD_LIMIT = config["config"]["UPLOAD_LIMIT"]
|
|
PASSWORD_REQUIREMENT = config["config"]["PASSWORD_REQUIREMENT"]
|
|
|
|
app = Quart(__name__)
|
|
app.config["SECRET_KEY"] = SECRET_KEY
|
|
app.config["MAX_CONTENT_LENGTH"] = int(UPLOAD_LIMIT) * 1000 * 1000
|
|
|
|
if SECRET_KEY == "placeholder":
|
|
print("[WARNING] Secret key is not set")
|
|
|
|
if not os.path.exists(UPLOAD_FOLDER):
|
|
print("[WARNING] Upload folder doesn't exist, creating")
|
|
os.mkdir(UPLOAD_FOLDER)
|
|
|
|
if not os.path.exists("database.db"):
|
|
print("[ERROR] No database exists, please run init_db")
|
|
exit()
|
|
|
|
|
|
def makeStrSafe(url):
|
|
return str(urllib.parse.quote(url)).replace("%20", " ")
|
|
|
|
|
|
def get_db_connection():
|
|
conn = sqlite3.connect("database.db")
|
|
conn.row_factory = sqlite3.Row
|
|
return conn
|
|
|
|
|
|
def get_user(id):
|
|
conn = get_db_connection()
|
|
post = conn.execute("SELECT * FROM users WHERE id = ?",
|
|
(id,)).fetchone()
|
|
conn.close()
|
|
if post is None:
|
|
return "error"
|
|
return post
|
|
|
|
|
|
def get_comments(id):
|
|
conn = get_db_connection()
|
|
post = conn.execute("SELECT * FROM comments WHERE post_id = ?",
|
|
(id,)).fetchall()
|
|
conn.close()
|
|
if post is None:
|
|
return "error"
|
|
return post
|
|
|
|
|
|
def get_messages(chatroomid, max):
|
|
conn = get_db_connection()
|
|
post = conn.execute("SELECT * FROM chatmessages WHERE chatroom_id = ? ORDER BY created DESC;",
|
|
(chatroomid,)).fetchmany(max + 1)
|
|
conn.close()
|
|
if post is None:
|
|
return "error"
|
|
return post
|
|
|
|
|
|
app.jinja_env.globals.update(getComments=get_comments)
|
|
|
|
def get_post(id):
|
|
conn = get_db_connection()
|
|
post = conn.execute("SELECT * FROM posts WHERE id = ?",
|
|
(id,)).fetchone()
|
|
conn.close()
|
|
if post is None:
|
|
return "error"
|
|
return post
|
|
|
|
|
|
app.jinja_env.globals.update(getUser=get_user)
|
|
|
|
|
|
def check_username_taken(username):
|
|
conn = get_db_connection()
|
|
post = conn.execute("SELECT * FROM users WHERE lower(username) = ?",
|
|
(username.lower(),)).fetchone()
|
|
conn.close()
|
|
if post is None:
|
|
return "error"
|
|
return post["id"]
|
|
|
|
def check_sub_taken(sub):
|
|
conn = get_db_connection()
|
|
post = conn.execute("SELECT * FROM users WHERE password = ?",
|
|
(str("OAUTH-" + sub),)).fetchone()
|
|
conn.close()
|
|
if post is None:
|
|
return "error"
|
|
return post["id"]
|
|
|
|
def get_session(id):
|
|
conn = get_db_connection()
|
|
post = conn.execute("SELECT * FROM sessions WHERE session = ?",
|
|
(id,)).fetchone()
|
|
conn.close()
|
|
if post is None:
|
|
return "error"
|
|
return post
|
|
|
|
full_hash = subprocess.check_output(["git", "rev-parse", "HEAD"]).decode().strip()
|
|
short_hash = subprocess.check_output(["git", "rev-parse", "--short=8", "HEAD"]).decode().strip()
|
|
|
|
ALLOWED_EXTENSIONS = {"png", "apng", "jpg", "jpeg", "gif", "svg", "webp", "jxl"}
|
|
|
|
def allowed_file(filename):
|
|
return '.' in filename and \
|
|
filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
|
|
|
|
@app.route("/", methods=("GET", "POST"))
|
|
async def main():
|
|
usersession = request.cookies.get("session_DO_NOT_SHARE")
|
|
conn = get_db_connection()
|
|
posts = conn.execute("SELECT * FROM posts ORDER BY created DESC;").fetchall()
|
|
conn.close()
|
|
|
|
if usersession:
|
|
userCookie = get_session(usersession)
|
|
user = get_user(userCookie["id"])
|
|
return await render_template("main.html", userdata=user, posts=posts, full_hash=full_hash, short_hash=short_hash)
|
|
else:
|
|
return await render_template("main.html", posts=posts, full_hash=full_hash, short_hash=short_hash)
|
|
|
|
@app.route("/chat", methods=("GET", "POST"))
|
|
async def chat():
|
|
usersession = request.cookies.get("session_DO_NOT_SHARE")
|
|
if usersession:
|
|
userCookie = get_session(usersession)
|
|
user = get_user(userCookie["id"])
|
|
return await render_template("chat.html", userdata=user)
|
|
else:
|
|
return await render_template("chat.html")
|
|
|
|
@app.route("/api/chat/listrooms")
|
|
async def chatlistrooms():
|
|
conn = get_db_connection()
|
|
rooms = conn.execute("SELECT * FROM chatrooms ORDER BY id ASC;").fetchall()
|
|
conn.close()
|
|
|
|
template = []
|
|
|
|
for room in rooms:
|
|
roomtemplate = {
|
|
"id": room["id"],
|
|
"name": room["roomname"]
|
|
}
|
|
template.append(roomtemplate)
|
|
|
|
return(template), 200
|
|
|
|
@app.route("/api/chat/getmessages/<roomid>")
|
|
async def chatget(roomid):
|
|
messages = get_messages(roomid, 150)
|
|
|
|
template = []
|
|
|
|
for message in messages:
|
|
creatorid = message["creator"]
|
|
|
|
creatortemplate = {
|
|
"id": message["creator"],
|
|
"username": get_user(creatorid)["username"]
|
|
}
|
|
|
|
messagetemplate = {
|
|
"id": message["id"],
|
|
"content": message["content"],
|
|
"creator": creatortemplate,
|
|
"created": message["created"]
|
|
}
|
|
template.append(messagetemplate)
|
|
|
|
return(template), 200
|
|
|
|
|
|
@app.route("/api/chat/send/<roomid>", methods=("GET", "POST"))
|
|
async def chatsend(roomid):
|
|
usersession = request.cookies.get("session_DO_NOT_SHARE")
|
|
if usersession:
|
|
if request.method == "POST":
|
|
|
|
data = await request.get_json()
|
|
content = data["content"]
|
|
|
|
userCookie = get_session(usersession)
|
|
user = get_user(userCookie["id"])
|
|
|
|
if not user["banned"] == "0":
|
|
return {
|
|
"error": "banned"
|
|
}, 403
|
|
|
|
chatMessageContent = {
|
|
"content": content,
|
|
"creator": user["username"],
|
|
"roomid": roomid,
|
|
"created": str(time.time())
|
|
}
|
|
|
|
#message_queue.append({"message": chatMessageContent})
|
|
|
|
conn = get_db_connection()
|
|
conn.execute("INSERT INTO chatmessages (content, chatroom_id, creator, created) VALUES (?, ?, ?, ?)",
|
|
(content, roomid, userCookie["id"], str(time.time())))
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
return "success", 200
|
|
|
|
|
|
@app.route("/@<pageusername>", methods=("GET", "POST"))
|
|
async def user(pageusername):
|
|
usersession = request.cookies.get("session_DO_NOT_SHARE")
|
|
|
|
checkusername = check_username_taken(pageusername)
|
|
|
|
if not checkusername == "error":
|
|
pageuser = get_user(checkusername)
|
|
if usersession:
|
|
userCookie = get_session(usersession)
|
|
user = get_user(userCookie["id"])
|
|
return await render_template("user.html", userdata=user, createddate=datetime.datetime.fromtimestamp(int(str(pageuser["created"]).split(".")[0])).strftime("%Y-%m-%d"), pageuser=pageuser)
|
|
else:
|
|
return await render_template("user.html", createddate=datetime.datetime.fromtimestamp(int(str(pageuser["created"]).split(".")[0])).strftime("%Y-%m-%d"), pageuser=pageuser)
|
|
else:
|
|
return """<img src="https://http.cat/images/404.jpg">""", 404
|
|
|
|
@app.route("/api/page/<userid>", methods=("GET", "POST"))
|
|
async def apipageuser(userid):
|
|
pageuser = get_user(userid)
|
|
addhtml = """
|
|
<!DOCTYPE html><script>
|
|
window.stop()
|
|
</script>
|
|
<base target="_blank"/> <head><meta http-equiv="Content-Security-Policy" default-src='none'; child-src='none'; content="img-src cdn.discordapp.com cdn.discordapp.net media.tenor.com; media-src: fonts.gstatic.com fonts.googleapis.com;" /></head>"""
|
|
|
|
if not pageuser == "error":
|
|
return addhtml + pageuser["htmldescription"]
|
|
else:
|
|
return """<img src="https://http.cat/images/404.jpg">""", 404
|
|
|
|
@app.route("/@<pageusername>/edit", methods=("GET", "POST"))
|
|
async def edituser(pageusername):
|
|
usersession = request.cookies.get("session_DO_NOT_SHARE")
|
|
|
|
checkusername = check_username_taken(pageusername)
|
|
|
|
if not checkusername == "error":
|
|
pageuser = get_user(checkusername)
|
|
if usersession:
|
|
userCookie = get_session(usersession)
|
|
user = get_user(userCookie["id"])
|
|
if pageuser["username"] == user["username"]:
|
|
if request.method == "POST":
|
|
requestData = await request.form
|
|
|
|
code = requestData["code"].replace("Content-Security-Policy", "").replace("<iframe>", "")
|
|
conn = get_db_connection()
|
|
conn.execute("UPDATE users SET htmldescription = ? WHERE id = ?",
|
|
(code, user["id"]))
|
|
conn.commit()
|
|
conn.close()
|
|
return redirect("/@" + user["username"])
|
|
else:
|
|
return await render_template("edituser.html", userdata=user, pageuser=pageuser)
|
|
else:
|
|
return """<img src="https://http.cat/images/403.jpg">""", 403
|
|
else:
|
|
return """<img src="https://http.cat/images/403.jpg">""", 403
|
|
else:
|
|
return """<img src="https://http.cat/images/404.jpg">""", 404
|
|
|
|
|
|
@app.route("/api/frontpage", methods=("GET", "POST"))
|
|
async def apifrontpage():
|
|
conn = get_db_connection()
|
|
posts = conn.execute("SELECT * FROM posts ORDER BY created DESC;").fetchall()
|
|
conn.close()
|
|
|
|
result = []
|
|
|
|
for post in posts:
|
|
comments = []
|
|
|
|
for comment in get_comments(post["id"]):
|
|
commentthing = {
|
|
"title": comment["textstr"],
|
|
"id": comment["id"],
|
|
"created": comment["created"],
|
|
"creator": {
|
|
"id": comment["creator"],
|
|
"username": get_user(comment["creator"])["username"]
|
|
}
|
|
}
|
|
comments.append(commentthing)
|
|
|
|
|
|
mainthing = {
|
|
"id": post["id"],
|
|
"created": post["created"],
|
|
"title": post["textstr"],
|
|
"imgurl": post["imageurl"],
|
|
"creator": {
|
|
"id": post["creator"],
|
|
"username": get_user(post["creator"])["username"]
|
|
},
|
|
"comments": comments
|
|
}
|
|
|
|
result.append(mainthing)
|
|
|
|
return result
|
|
|
|
@app.route("/api/userinfo", methods=("GET", "POST"))
|
|
async def apiuserinfo():
|
|
usersession = request.cookies.get("session_DO_NOT_SHARE")
|
|
if usersession:
|
|
userCookie = get_session(usersession)
|
|
user = get_user(userCookie["id"])
|
|
datatemplate = {
|
|
"username": user["username"],
|
|
"id": user["id"],
|
|
"created": user["created"]
|
|
}
|
|
return datatemplate
|
|
else:
|
|
return {
|
|
"error": "no authentication"
|
|
}, 403
|
|
|
|
|
|
@app.route("/api/login", methods=("GET", "POST"))
|
|
async def apilogin():
|
|
usersession = request.cookies.get("session_DO_NOT_SHARE")
|
|
if request.method == "POST":
|
|
data = await request.get_json()
|
|
username = data["username"]
|
|
password = data["password"]
|
|
|
|
userID = check_username_taken(username)
|
|
user = get_user(userID)
|
|
|
|
if user == "error":
|
|
return {
|
|
"error": "wrong username or password"
|
|
}, 401
|
|
|
|
if not check_password_hash(user["password"], (password)):
|
|
return {
|
|
"error": "wrong username or password"
|
|
}, 401
|
|
|
|
randomCharacters = secrets.token_hex(512)
|
|
|
|
conn = get_db_connection()
|
|
conn.execute("INSERT INTO sessions (session, id) VALUES (?, ?)",
|
|
(randomCharacters, userID))
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
return {
|
|
"key": randomCharacters
|
|
}, 200
|
|
else:
|
|
return {
|
|
"error": "https://http.cat/images/400.jpg"
|
|
}, 400
|
|
|
|
@app.route("/api/migrate", methods=("GET", "POST"))
|
|
async def migrate():
|
|
usersession = request.cookies.get("session_DO_NOT_SHARE")
|
|
if request.method == "POST":
|
|
data = await request.get_json()
|
|
sub = data["sub"]
|
|
password = data["access_token"]
|
|
userCookie = get_session(usersession)
|
|
user = get_user(userCookie["id"])
|
|
if user == "error":
|
|
return { "error": "User doesn't exist" }, 403
|
|
|
|
conn = get_db_connection()
|
|
subdata = '{"access_token":"' + password + '"}'
|
|
response = requests.post("https://auth.hectabit.org/api/uniqueid", subdata)
|
|
if response.status_code == 200:
|
|
data = response.json()
|
|
conn.execute("UPDATE users SET password = ? WHERE id = ?",
|
|
(str("OAUTH-" + data['uniqueid']), user["id"]))
|
|
else:
|
|
return {"error": response.json()["error"]}, response.status_code
|
|
conn.commit()
|
|
conn.close()
|
|
return {"success": "true"}, 200
|
|
|
|
@app.route("/api/oauth", methods=("GET", "POST"))
|
|
async def apioauth():
|
|
if request.method == "POST":
|
|
data = await request.get_json()
|
|
username = data["username"]
|
|
sub = data["sub"]
|
|
password = data["access_token"]
|
|
|
|
conn = get_db_connection()
|
|
subdata = '{"access_token":"' + password + '"}'
|
|
response = requests.post("https://auth.hectabit.org/api/loggedin", subdata)
|
|
if response.status_code == 200:
|
|
userID = check_sub_taken(sub)
|
|
user = get_user(userID)
|
|
if user == "error":
|
|
userID = check_username_taken(username)
|
|
user = get_user(userID)
|
|
if user == "error":
|
|
conn.execute("INSERT INTO users (username, password, created, htmldescription, banned) VALUES (?, ?, ?, ?, ?)",
|
|
(username, str("OAUTH-" + sub), str(time.time()), "", "0"))
|
|
userID = conn.execute("SELECT * FROM users WHERE lower(username) = ?",
|
|
(username.lower(),)).fetchone()["id"]
|
|
else:
|
|
if user["password"] != "OAUTH-" + sub:
|
|
return {"error": "Migration required or username taken"}, 422
|
|
else:
|
|
return {"error": response.json()["error"]}, response.status_code
|
|
|
|
randomCharacters = secrets.token_hex(512)
|
|
|
|
conn.execute("INSERT INTO sessions (session, id) VALUES (?, ?)",
|
|
(randomCharacters, userID))
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
return {
|
|
"key": randomCharacters
|
|
}, 200
|
|
else:
|
|
return {
|
|
"error": "https://http.cat/images/405.jpg"
|
|
}, 405
|
|
|
|
@app.route("/apidocs", methods=("GET", "POST"))
|
|
async def apidocs():
|
|
usersession = request.cookies.get("session_DO_NOT_SHARE")
|
|
|
|
if usersession:
|
|
userCookie = get_session(usersession)
|
|
user = get_user(userCookie["id"])
|
|
return await render_template("apidocs.html", userdata=user)
|
|
else:
|
|
return await render_template("apidocs.html")
|
|
|
|
@app.route("/post", methods=("GET", "POST"))
|
|
async def post():
|
|
usersession = request.cookies.get("session_DO_NOT_SHARE")
|
|
if usersession:
|
|
if request.method == "POST":
|
|
formData = await request.form
|
|
formFiles = await request.files
|
|
|
|
title = formData["title"]
|
|
if title == "":
|
|
await flash("Text required :3")
|
|
return redirect(url_for("post"))
|
|
|
|
if len(title) > 300:
|
|
await flash("Too long title!")
|
|
return redirect(url_for("post"))
|
|
|
|
file = formFiles["file"]
|
|
|
|
if (file):
|
|
if not allowed_file(file.filename):
|
|
await flash("File is not an image!")
|
|
return redirect(url_for("post"))
|
|
|
|
userCookie = get_session(usersession)
|
|
user = get_user(userCookie["id"])
|
|
|
|
if not user["banned"] == "0":
|
|
await flash("Your account has been suspended. You may no longer post")
|
|
return redirect(url_for("post"))
|
|
|
|
filename = secure_filename(file.filename)
|
|
finalfilename = secrets.token_hex(32) + filename
|
|
|
|
await file.save(os.path.join(UPLOAD_FOLDER, finalfilename))
|
|
imgurl = "/cdn/" + finalfilename
|
|
|
|
if (not file):
|
|
imgurl = ""
|
|
|
|
conn = get_db_connection()
|
|
conn.execute("INSERT INTO posts (textstr, imageurl, creator, created) VALUES (?, ?, ?, ?)",
|
|
(title, imgurl, userCookie["id"], str(time.time())))
|
|
conn.commit()
|
|
conn.close()
|
|
return redirect(url_for("main"))
|
|
|
|
else:
|
|
userCookie = get_session(usersession)
|
|
user = get_user(userCookie["id"])
|
|
return await render_template("post.html", userdata=user)
|
|
else:
|
|
await flash("A burgercat account is required to post :3")
|
|
return redirect(url_for("login"))
|
|
|
|
|
|
@app.route("/api/comment", methods=("GET", "POST"))
|
|
async def comment():
|
|
usersession = request.cookies.get("session_DO_NOT_SHARE")
|
|
if usersession:
|
|
if request.method == "POST":
|
|
|
|
data = await request.get_json()
|
|
uid = data["id"]
|
|
title = data["title"]
|
|
|
|
userCookie = get_session(usersession)
|
|
user = get_user(userCookie["id"])
|
|
|
|
if len(title) > 300:
|
|
return {
|
|
"error": "too much text"
|
|
}, 403
|
|
|
|
if not user["banned"] == "0":
|
|
return {
|
|
"error": "banned"
|
|
}, 403
|
|
|
|
conn = get_db_connection()
|
|
conn.execute("INSERT INTO comments (textstr, post_id, creator, created) VALUES (?, ?, ?, ?)",
|
|
(title, uid, userCookie["id"], str(time.time())))
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
return "success", 200
|
|
|
|
else:
|
|
return {
|
|
"error": "https://http.cat/images/400.jpg"
|
|
}, 400
|
|
else:
|
|
return {
|
|
"error": "https://http.cat/images/401.jpg"
|
|
}, 401
|
|
|
|
@app.route("/api/post/<post_id>/comments", methods=("GET", "POST"))
|
|
async def apicomments(post_id):
|
|
postthing = get_comments(int(post_id))
|
|
|
|
if not postthing == "error":
|
|
comments = []
|
|
for comment in postthing:
|
|
commentthing = {
|
|
"title": comment["textstr"],
|
|
"created": comment["created"],
|
|
"creator": {
|
|
"id": comment["creator"],
|
|
"username": get_user(comment["creator"])["username"]
|
|
}
|
|
}
|
|
comments.append(commentthing)
|
|
return comments
|
|
|
|
@app.route("/cdn/<filename>", methods=("GET", "POST"))
|
|
async def cdn(filename):
|
|
if os.path.exists(os.path.join(UPLOAD_FOLDER, filename)):
|
|
return await send_from_directory(UPLOAD_FOLDER, filename)
|
|
else:
|
|
return "file doesn't exist!!"
|
|
|
|
@app.route("/legacysignup", methods=("GET", "POST"))
|
|
async def legacysignup():
|
|
usersession = request.cookies.get("session_DO_NOT_SHARE")
|
|
if usersession:
|
|
return redirect(url_for("main"))
|
|
if request.method == "POST":
|
|
requestData = await request.form
|
|
|
|
if not check_username_taken(requestData["username"]) == "error":
|
|
await flash("Username already taken :3")
|
|
return redirect(url_for("signup"))
|
|
|
|
if not requestData["username"].isalnum():
|
|
await flash("Username must be alphanumeric :3")
|
|
return redirect(url_for("signup"))
|
|
|
|
if not len(requestData["password"]) > int(PASSWORD_REQUIREMENT):
|
|
await flash("Password must contain at least " + PASSWORD_REQUIREMENT + " characters")
|
|
return redirect(url_for("signup"))
|
|
|
|
hashedpassword = generate_password_hash(requestData["password"])
|
|
|
|
conn = get_db_connection()
|
|
conn.execute("INSERT INTO users (username, password, created, htmldescription) VALUES (?, ?, ?, ?)",
|
|
(requestData["username"], hashedpassword, str(time.time()), ""))
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
return redirect(url_for("login"))
|
|
else:
|
|
return await render_template("signup.html")
|
|
|
|
@app.route("/signup", methods=("GET", "POST"))
|
|
async def signup():
|
|
usersession = request.cookies.get("session_DO_NOT_SHARE")
|
|
if usersession:
|
|
return redirect(url_for("main"))
|
|
else:
|
|
return redirect(url_for("oauth"))
|
|
|
|
@app.route("/login", methods=("GET", "POST"))
|
|
async def login():
|
|
usersession = request.cookies.get("session_DO_NOT_SHARE")
|
|
if usersession:
|
|
redirect(url_for("main"))
|
|
if request.method == "POST":
|
|
requestData = await request.form
|
|
|
|
userID = check_username_taken(requestData["username"])
|
|
user = get_user(userID)
|
|
|
|
if user == "error":
|
|
await flash("Wrong username or password :3")
|
|
return redirect(url_for("login"))
|
|
|
|
if not check_password_hash(user["password"], (requestData["password"])):
|
|
await flash("Wrong username or password :3")
|
|
return redirect(url_for("login"))
|
|
|
|
randomCharacters = secrets.token_hex(512)
|
|
|
|
conn = get_db_connection()
|
|
conn.execute("INSERT INTO sessions (session, id) VALUES (?, ?)",
|
|
(randomCharacters, userID))
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
response = Response("""<script>window.location.href = "/oauth";</script>""")
|
|
response.set_cookie("session_DO_NOT_SHARE", randomCharacters)
|
|
response.set_cookie("legacy_migrate", "1")
|
|
return response
|
|
|
|
#resp = await make_response(redirect("/"))
|
|
#resp.set_cookie("session_DO_NOT_SHARE", randomCharacters, expires=datetime.datetime.now() + datetime.timedelta(days=float(180)))
|
|
|
|
#return redirect("/")
|
|
|
|
else:
|
|
return await render_template("login.html")
|
|
|
|
@app.route("/oauth", methods=("GET", "POST"))
|
|
async def oauth():
|
|
legacymigrate = request.cookies.get("legacy_migrate")
|
|
if legacymigrate != "1":
|
|
return await render_template("oauth.html")
|
|
else:
|
|
return await render_template("migrate.html")
|
|
|
|
@app.route("/settings", methods=("GET", "POST"))
|
|
async def settings():
|
|
usersession = request.cookies.get("session_DO_NOT_SHARE")
|
|
if usersession:
|
|
userCookie = get_session(usersession)
|
|
user = get_user(userCookie["id"])
|
|
|
|
return await render_template("settings.html", userdata=user, createddate=datetime.datetime.fromtimestamp(int(str(user["created"]).split(".")[0])).strftime("%Y-%m-%d %H:%m:%S"))
|
|
else:
|
|
return redirect("/")
|
|
|
|
|
|
@app.route("/api/delete", methods=("GET", "POST"))
|
|
async def delete():
|
|
usersession = request.cookies.get("session_DO_NOT_SHARE")
|
|
|
|
if request.method == "POST":
|
|
data = await request.get_json()
|
|
postid = int(data["id"])
|
|
|
|
post = get_post(postid)
|
|
if not post == "error":
|
|
if usersession:
|
|
userCookie = get_session(usersession)
|
|
user = get_user(userCookie["id"])
|
|
|
|
if (str(user["administrator"]) == "1") or (int(user["id"]) == int(post["creator"])):
|
|
post = get_post(postid)
|
|
conn = get_db_connection()
|
|
conn.execute("DELETE FROM posts WHERE id = ?", (postid,))
|
|
conn.commit()
|
|
conn.close()
|
|
return "success", 200
|
|
else:
|
|
return {
|
|
"error": "https://http.cat/images/403.jpg"
|
|
}, 403
|
|
else:
|
|
return {
|
|
"error": "https://http.cat/images/400.jpg"
|
|
}, 400
|
|
|
|
@app.route("/listusers", methods=("GET", "POST"))
|
|
async def listusers():
|
|
usersession = request.cookies.get("session_DO_NOT_SHARE")
|
|
if usersession:
|
|
userCookie = get_session(usersession)
|
|
user = get_user(userCookie["id"])
|
|
if str(user["administrator"]) == "1":
|
|
thing = ""
|
|
|
|
conn = get_db_connection()
|
|
users = conn.execute("SELECT * FROM users").fetchall()
|
|
conn.close()
|
|
for x in users:
|
|
thing = str(x["id"]) + " - " + x["username"] + "<br>" + thing
|
|
|
|
return thing
|
|
else:
|
|
return """<img src="https://http.cat/images/403.jpg">"""
|
|
else:
|
|
return redirect(url_for("login"))
|
|
|
|
|
|
@app.route("/settings/logout", methods=("GET", "POST"))
|
|
async def logout():
|
|
resp = redirect(url_for("main"))
|
|
session = request.cookies.get("session_DO_NOT_SHARE")
|
|
resp.delete_cookie("session_DO_NOT_SHARE")
|
|
resp.delete_cookie("prefuser")
|
|
resp.delete_cookie("legacy_migrate")
|
|
return resp
|
|
|
|
@app.errorhandler(500)
|
|
async def page_not_found(e):
|
|
return """<img src="https://http.cat/images/500.jpg">""", 500
|
|
|
|
@app.errorhandler(400)
|
|
async def page_not_found(e):
|
|
return """<img src="https://http.cat/images/400.jpg">""", 400
|
|
|
|
@app.errorhandler(429)
|
|
async def page_not_found(e):
|
|
return """<img src="https://http.cat/images/429.jpg">""", 429
|
|
|
|
@app.errorhandler(404)
|
|
async def page_not_found(e):
|
|
return """<img src="https://http.cat/images/404.jpg">""", 404
|
|
|
|
@app.errorhandler(413)
|
|
async def page_not_found(e):
|
|
return "Images can't be larger than " + str(UPLOAD_LIMIT) + "MB", 413
|
|
|
|
|
|
if __name__ == "__main__":
|
|
print("[INFO] Server started")
|
|
|
|
hypercornconfig = Config()
|
|
hypercornconfig.bind = ("0.0.0.0" + ":" + PORT)
|
|
|
|
asyncio.run(serve(app, hypercornconfig))
|
|
|
|
print("[INFO] Server stopped")
|