burgernotes-server/main

577 lines
16 KiB
Python

#!/usr/bin/python3
import os
import sqlite3
import time
import secrets
import configparser
import asyncio
from hypercorn.config import Config
from hypercorn.asyncio import serve
from werkzeug.security import generate_password_hash, check_password_hash
from quart import Quart, request
from json import loads
# Load config.ini and refuse to start when the file is missing
if not os.path.exists("config.ini"):
    print("[FATAL] config.ini does not exist")
    exit(1)

config = configparser.ConfigParser()
config.read("config.ini")
print("[INFO] Config loaded at", str(int(time.time())))

# Every setting is read from the [config] section; configparser hands them
# back as strings, so PORT and MAX_STORAGE are strings here too.
HOST, PORT, SECRET_KEY, MAX_STORAGE = (
    config["config"][option]
    for option in ("HOST", "PORT", "SECRET_KEY", "MAX_STORAGE")
)

# Warn (but keep running) when the secret key is one of the known defaults
if SECRET_KEY in ("supersecretkey", "placeholder"):
    print("[WARNING] Secret key not set. Please set the secret key to a non-default value.")

# Create the Quart application
app = Quart(__name__)
app.config["SECRET_KEY"] = SECRET_KEY
# Database functions
def get_db_connection():
    """Open ``database.db`` and return a connection whose rows are ``sqlite3.Row``.

    ``sqlite3.Row`` lets callers read columns by name (``row["id"]``).
    The caller is responsible for closing the returned connection.
    """
    connection = sqlite3.connect("database.db")
    connection.row_factory = sqlite3.Row
    return connection
def get_user(identifier):
    """Return the ``users`` row whose ``id`` equals *identifier*, or ``None``."""
    connection = get_db_connection()
    cursor = connection.execute("SELECT * FROM users WHERE id = ?", (identifier,))
    user_row = cursor.fetchone()
    connection.close()
    # fetchone() already yields None when no row matched.
    return user_row
def get_note(identifier):
    """Return the ``notes`` row whose ``id`` equals *identifier*, or ``None``."""
    connection = get_db_connection()
    lookup = connection.execute("SELECT * FROM notes WHERE id = ?", (identifier,))
    note_row = lookup.fetchone()
    connection.close()
    # No match leaves note_row as None, which is passed straight through.
    return note_row
def get_space(identifier):
    """Return the number of UTF-8 bytes used by one creator's notes.

    Adds up the encoded length of every note's content and title for the
    notes whose ``creator`` equals *identifier*.
    """
    connection = get_db_connection()
    rows = connection.execute(
        "SELECT content, title FROM notes WHERE creator = ? ORDER BY id DESC;",
        (identifier,),
    ).fetchall()
    connection.close()
    return sum(
        len(row["content"].encode("utf-8")) + len(row["title"].encode("utf-8"))
        for row in rows
    )
def get_note_count(identifier):
    """Return how many notes have *identifier* as their ``creator``.

    The count is computed by SQLite (``COUNT(*)``) rather than by loading
    every note's content and title into memory just to measure the list.
    """
    conn = get_db_connection()
    try:
        row = conn.execute(
            "SELECT COUNT(*) FROM notes WHERE creator = ?",
            (identifier,),
        ).fetchone()
    finally:
        # Release the connection even if the query raises.
        conn.close()
    return row[0]
def get_session(identifier):
    """Return the ``sessions`` row whose ``session`` key equals *identifier*, or ``None``."""
    connection = get_db_connection()
    result = connection.execute("SELECT * FROM sessions WHERE session = ?", (identifier,))
    session_row = result.fetchone()
    connection.close()
    # An unknown key simply produces None from fetchone().
    return session_row
def check_username_taken(username):
    """Return the id of the user named *username*, or ``None`` if the name is free.

    Both sides of the comparison are lower-cased (``lower(username)`` in SQL,
    ``str.lower`` in Python).
    """
    connection = get_db_connection()
    lowered = (username.lower(),)
    match = connection.execute(
        "SELECT * FROM users WHERE lower(username) = ?", lowered
    ).fetchone()
    connection.close()
    return None if match is None else match["id"]
# Allow cross-origin requests: every response gets wildcard CORS headers
@app.after_request
async def add_cors_headers(response):
    """Add ``*`` for the allowed origin, headers and methods, then return *response*."""
    for header_name in (
        "Access-Control-Allow-Origin",
        "Access-Control-Allow-Headers",
        "Access-Control-Allow-Methods",
    ):
        response.headers.add(header_name, "*")
    return response
# Live editing store
# In-memory queue of pending edit notifications: /api/editnote appends the
# edited note's id to the list stored under the editing user's id, and
# /api/waitforedit pops from that list.
messages: dict = {}
@app.route("/api/version", methods=("GET", "POST"))
async def apiversion():
    """Return the server's version banner as plain text."""
    version_banner = "Burgernotes Version 1.2"
    return version_banner
@app.route("/api/signup", methods=("GET", "POST"))
async def apisignup():
    """Create an account and return a session key for it.

    Expects a JSON body with ``username`` and ``password``.

    Responses:
        200 ``{"key": <session key>}`` on success.
        405 for non-POST requests.
        409 when the username is already taken.
        422 when the username is empty, longer than 20 characters or not
            alphanumeric, when the password is shorter than 14 characters,
            or when either field is not a string.
    """
    if request.method != "POST":
        return {}, 405

    data = await request.get_json()
    username = data["username"]
    password = data["password"]

    if not isinstance(username, str) or not isinstance(password, str):
        return {}, 422
    if not username or len(username) > 20 or not username.isalnum():
        return {}, 422
    # An empty password is covered by the minimum-length rule.
    if len(password) < 14:
        return {}, 422
    if check_username_taken(username) is not None:
        return {}, 409

    hashed_password = generate_password_hash(password)
    session_key = secrets.token_hex(512)

    conn = get_db_connection()
    try:
        cursor = conn.execute(
            "INSERT INTO users (username, password, created) VALUES (?, ?, ?)",
            (username, hashed_password, str(time.time())),
        )
        # Read the new id back on THIS connection: the insert is not committed
        # yet, so a lookup through a second connection would not see the row
        # and the session would be stored without an owner.
        user_id = conn.execute(
            "SELECT id FROM users WHERE rowid = ?", (cursor.lastrowid,)
        ).fetchone()["id"]
        conn.execute(
            "INSERT INTO sessions (session, id, device) VALUES (?, ?, ?)",
            (session_key, user_id, request.headers.get("user-agent")),
        )
        conn.commit()
    finally:
        conn.close()

    return {"key": session_key}, 200
@app.route("/api/login", methods=("GET", "POST"))
async def apilogin():
    """Check credentials, open a new session and optionally change the password.

    Expects a JSON body with ``username`` and ``password``. When
    ``passwordchange`` equals ``"yes"``, ``newpass`` replaces the stored
    password; both keys may be omitted for a plain login.

    Responses:
        200 ``{"key": <session key>}`` on success.
        401 when the username is unknown or the password does not match.
        405 for non-POST requests.
        422 when a password change is requested without a string ``newpass``.
    """
    if request.method != "POST":
        return {}, 405

    data = await request.get_json()
    username = data["username"]
    password = data["password"]
    password_change = data.get("passwordchange")
    new_password = data.get("newpass")

    # One lookup is enough: it returns the user's id or None.
    user_id = check_username_taken(username)
    if user_id is None:
        return {}, 401
    user = get_user(user_id)
    if user is None or not check_password_hash(user["password"], password):
        return {}, 401

    wants_change = password_change == "yes"
    if wants_change and not isinstance(new_password, str):
        return {}, 422

    session_key = secrets.token_hex(512)
    conn = get_db_connection()
    try:
        conn.execute(
            "INSERT INTO sessions (session, id, device) VALUES (?, ?, ?)",
            (session_key, user_id, request.headers.get("user-agent")),
        )
        if wants_change:
            # Update by id: the username lookup above is case-insensitive, so
            # matching on the username exactly as typed could update no row.
            conn.execute(
                "UPDATE users SET password = ? WHERE id = ?",
                (generate_password_hash(new_password), user_id),
            )
        conn.commit()
    finally:
        conn.close()

    return {"key": session_key}, 200
@app.route("/api/userinfo", methods=("GET", "POST"))
async def apiuserinfo():
    """Return account details for the session given in ``secretKey``.

    Responses:
        200 (default status) with username, id, creation stamp, bytes used,
            the configured ``MAX_STORAGE`` value and the note count.
        401 when the session key is unknown or its user no longer exists.
        405 for non-POST requests.
    """
    if request.method != "POST":
        return {}, 405

    data = await request.get_json()
    session_row = get_session(data["secretKey"])
    if session_row is None:
        return {}, 401
    user = get_user(session_row["id"])
    if user is None:
        return {}, 401

    user_id = user["id"]
    return {
        "username": user["username"],
        "id": user_id,
        "created": user["created"],
        "storageused": get_space(user_id),
        "storagemax": MAX_STORAGE,
        "notecount": get_note_count(user_id),
    }
@app.route("/api/loggedin", methods=("GET", "POST"))
async def apiloggedin():
    """Tell the client whether ``secretKey`` names an existing session.

    Responses:
        200 when the session exists.
        400 when it does not.
        405 for non-POST requests.
    """
    if request.method != "POST":
        return {}, 405

    data = await request.get_json()
    if get_session(data["secretKey"]) is None:
        return {}, 400
    return {}, 200
@app.route("/api/listnotes", methods=("GET", "POST"))
async def apilistnotes():
    """List the id and title of the caller's notes, most recently edited first.

    Responses:
        200 with a list of ``{"id": ..., "title": ...}`` objects.
        401 when the session key is unknown or its user no longer exists.
        405 for non-POST requests.
    """
    if request.method != "POST":
        return {}, 405

    data = await request.get_json()
    session_row = get_session(data["secretKey"])
    if session_row is None:
        return {}, 401
    user = get_user(session_row["id"])
    if user is None:
        return {}, 401

    conn = get_db_connection()
    try:
        # Only the two listed columns are needed; skip loading note bodies.
        notes = conn.execute(
            "SELECT id, title FROM notes WHERE creator = ? ORDER BY edited DESC;",
            (user["id"],),
        ).fetchall()
    finally:
        conn.close()

    return [{"id": note["id"], "title": note["title"]} for note in notes], 200
@app.route("/api/exportnotes", methods=("GET", "POST"))
async def apiexportnotes():
    """Return every note owned by the caller, including content and timestamps.

    Responses:
        200 with a list of note objects (``id``, ``created``, ``edited``,
            ``title``, ``content``), highest id first.
        401 when the session key is unknown or its user no longer exists.
        405 for non-POST requests.
    """
    if request.method != "POST":
        return {}, 405

    data = await request.get_json()
    session_row = get_session(data["secretKey"])
    if session_row is None:
        return {}, 401
    user = get_user(session_row["id"])
    if user is None:
        return {}, 401

    conn = get_db_connection()
    try:
        notes = conn.execute(
            "SELECT * FROM notes WHERE creator = ? ORDER BY id DESC;",
            (user["id"],),
        ).fetchall()
    finally:
        conn.close()

    exported_fields = ("id", "created", "edited", "title", "content")
    return [{field: note[field] for field in exported_fields} for note in notes], 200
@app.route("/api/importnotes", methods=("GET", "POST"))
async def importnotes():
    """Insert a batch of notes for the caller.

    Expects ``secretKey`` and ``notes``. ``notes`` is a JSON-encoded string
    holding a list of objects with ``title``, ``content``, ``created`` and
    ``edited``; an already-decoded list is accepted as well.

    Responses:
        200 once all notes are inserted (nothing is stored on failure).
        401 when the session key is unknown.
        405 for non-POST requests.
        422 when ``notes`` cannot be decoded, an entry lacks a required key,
            or a title/content is not a string.
    """
    if request.method != "POST":
        return {}, 405

    data = await request.get_json()
    session_row = get_session(data["secretKey"])
    if session_row is None:
        return {}, 401
    notes = data["notes"]

    rows = []
    try:
        decoded = loads(notes) if isinstance(notes, str) else notes
        for note in decoded:
            title, content = note["title"], note["content"]
            # The storage accounting encodes both fields as text, so a
            # non-string value stored here would break it later.
            if not isinstance(title, str) or not isinstance(content, str):
                return {}, 422
            rows.append((title, content, session_row["id"], note["created"], note["edited"]))
    except (ValueError, TypeError, KeyError):
        # Invalid JSON, a non-iterable payload, or an entry of the wrong shape.
        return {}, 422

    # NOTE(review): imports are not checked against MAX_STORAGE, matching the
    # previous behavior; confirm whether the quota should apply here too.
    conn = get_db_connection()
    try:
        conn.executemany(
            "INSERT INTO notes (title, content, creator, created, edited) VALUES (?, ?, ?, ?, ?)",
            rows,
        )
        conn.commit()
    finally:
        conn.close()
    return {}, 200
@app.route("/api/newnote", methods=("GET", "POST"))
async def apinewnote():
    """Create an empty note titled ``noteName`` for the caller.

    Responses:
        200 when the note is stored.
        401 when the session key is unknown or its user no longer exists.
        405 for non-POST requests.
    """
    if request.method != "POST":
        return {}, 405

    data = await request.get_json()
    note_name = data["noteName"]
    session_row = get_session(data["secretKey"])
    if session_row is None:
        return {}, 401
    user = get_user(session_row["id"])
    if user is None:
        return {}, 401

    # Take the timestamp once so "created" and "edited" start out identical.
    now = str(time.time())
    conn = get_db_connection()
    try:
        conn.execute(
            "INSERT INTO notes (title, content, creator, created, edited) VALUES (?, ?, ?, ?, ?)",
            (note_name, "", user["id"], now, now),
        )
        conn.commit()
    finally:
        conn.close()
    return {}, 200
@app.route("/api/readnote", methods=("GET", "POST"))
async def apireadnote():
    """Return the content of note ``noteId`` when it belongs to the caller.

    Responses:
        200 ``{"content": ...}`` for the caller's own note.
        401 when the session key is unknown or its user no longer exists.
        405 for non-POST requests.
        422 when the note does not exist or belongs to someone else.
    """
    if request.method != "POST":
        return {}, 405

    data = await request.get_json()
    note_id = data["noteId"]
    session_row = get_session(data["secretKey"])
    if session_row is None:
        return {}, 401
    user = get_user(session_row["id"])
    if user is None:
        return {}, 401

    note = get_note(note_id)
    # Missing and foreign notes deliberately share one status code here.
    if note is None or user["id"] != note["creator"]:
        return {}, 422
    return {"content": note["content"]}, 200
@app.route("/api/waitforedit", methods=("GET", "POST"))
async def waitforedit():
    """Long-poll until one of the caller's notes is edited, for up to 20 seconds.

    Responses:
        200 ``{"note": <note id>}`` as soon as an edit is queued for the user.
        400 when nothing was queued within 20 seconds.
        401 when the session key is unknown or its user no longer exists.
        405 for non-POST requests.
    """
    if request.method != "POST":
        return {}, 405

    data = await request.get_json()
    session_row = get_session(data["secretKey"])
    if session_row is None:
        return {}, 401
    user = get_user(session_row["id"])
    if user is None:
        return {}, 401

    user_id = user["id"]
    deadline = time.monotonic() + 20
    # A missing key and an empty list both mean "nothing queued yet".
    while not messages.get(user_id):
        if time.monotonic() >= deadline:
            # Return before touching the queue: on timeout there is nothing
            # to pop, and a bare integer is not a valid response.
            return {}, 400
        # Sleep for real between checks; sleep(0) would spin the event loop.
        await asyncio.sleep(0.1)

    message = messages[user_id].pop(0)
    # As before, drop the user's whole queue once one notification is delivered.
    del messages[user_id]
    return {"note": message}, 200
@app.route("/api/editnote", methods=("GET", "POST"))
async def apieditnote():
    """Replace a note's content and title, then queue a live-edit notification.

    Expects ``secretKey``, ``noteId``, ``content`` and ``title``.

    Responses:
        200 when the note is updated.
        401 when the session key is unknown or its user no longer exists.
        403 when the note belongs to another user.
        405 for non-POST requests.
        418 when the edit would push the user's usage above ``MAX_STORAGE``.
        422 when the note does not exist or content/title is not a string.
    """
    if request.method != "POST":
        return {}, 405

    data = await request.get_json()
    note_id = data["noteId"]
    content = data["content"]
    title = data["title"]
    if not isinstance(content, str) or not isinstance(title, str):
        return {}, 422

    session_row = get_session(data["secretKey"])
    if session_row is None:
        return {}, 401
    user = get_user(session_row["id"])
    if user is None:
        return {}, 401
    user_id = user["id"]

    note = get_note(note_id)
    if note is None:
        return {}, 422
    if user_id != note["creator"]:
        return {}, 403

    # get_space() already includes this note's current content and title, so
    # subtract them before adding the replacement; otherwise the note would
    # be counted twice.
    old_size = len(note["content"].encode("utf-8")) + len(note["title"].encode("utf-8"))
    new_size = len(content.encode("utf-8")) + len(title.encode("utf-8"))
    if get_space(user_id) - old_size + new_size > int(MAX_STORAGE):
        return {}, 418

    conn = get_db_connection()
    try:
        conn.execute(
            "UPDATE notes SET content = ?, title = ?, edited = ? WHERE id = ?",
            (content, title, str(time.time()), note_id),
        )
        conn.commit()
    finally:
        conn.close()

    # Let any /api/waitforedit poll for this user know which note changed.
    messages.setdefault(user_id, []).append(note_id)
    return {}, 200
@app.route("/api/editnotetitle", methods=("GET", "POST"))
async def apieditnotetitle():
    """Rename a note; the new title arrives in the ``content`` field.

    Responses:
        200 when the title is updated.
        401 when the session key is unknown or its user no longer exists.
        403 when the note belongs to another user.
        405 for non-POST requests.
        418 when the rename would push the user's usage above ``MAX_STORAGE``.
        422 when the note does not exist or the new title is not a string.
    """
    if request.method != "POST":
        return {}, 405

    data = await request.get_json()
    note_id = data["noteId"]
    new_title = data["content"]
    if not isinstance(new_title, str):
        return {}, 422

    session_row = get_session(data["secretKey"])
    if session_row is None:
        return {}, 401
    user = get_user(session_row["id"])
    if user is None:
        return {}, 401
    user_id = user["id"]

    note = get_note(note_id)
    if note is None:
        return {}, 422
    if user_id != note["creator"]:
        return {}, 403

    # The current title is already part of get_space(); swap it for the new
    # one instead of stacking the new title on top of it.
    old_size = len(note["title"].encode("utf-8"))
    new_size = len(new_title.encode("utf-8"))
    if get_space(user_id) - old_size + new_size > int(MAX_STORAGE):
        return {}, 418

    conn = get_db_connection()
    try:
        conn.execute(
            "UPDATE notes SET title = ?, edited = ? WHERE id = ?",
            (new_title, str(time.time()), note_id),
        )
        conn.commit()
    finally:
        conn.close()
    return {}, 200
@app.route("/api/removenote", methods=("GET", "POST"))
async def apiremovenote():
    """Delete note ``noteId`` when it belongs to the caller.

    Responses:
        200 when the note is deleted.
        401 when the session key is unknown or its user no longer exists.
        403 when the note belongs to another user.
        405 for non-POST requests.
        422 when the note does not exist.
    """
    if request.method != "POST":
        return {}, 405

    data = await request.get_json()
    note_id = data["noteId"]
    session_row = get_session(data["secretKey"])
    if session_row is None:
        return {}, 401
    user = get_user(session_row["id"])
    if user is None:
        return {}, 401

    note = get_note(note_id)
    if note is None:
        return {}, 422
    if user["id"] != note["creator"]:
        return {}, 403

    conn = get_db_connection()
    try:
        conn.execute("DELETE FROM notes WHERE id = ?", (note_id,))
        conn.commit()
    finally:
        conn.close()
    return {}, 200
@app.route("/api/deleteaccount", methods=("GET", "POST"))
async def apideleteaccount():
    """Delete the caller's notes, sessions and user row.

    Responses:
        200 when the account is removed.
        401 when the session key is unknown.
        405 for non-POST requests.
    """
    if request.method != "POST":
        return {}, 405

    data = await request.get_json()
    session_row = get_session(data["secretKey"])
    if session_row is None:
        return {}, 401
    user_id = session_row["id"]

    conn = get_db_connection()
    try:
        conn.execute("DELETE FROM notes WHERE creator = ?", (user_id,))
        # Remove the sessions too: a session left behind would still be found
        # by its key but point at a user that no longer exists.
        conn.execute("DELETE FROM sessions WHERE id = ?", (user_id,))
        conn.execute("DELETE FROM users WHERE id = ?", (user_id,))
        conn.commit()
    finally:
        conn.close()
    return {}, 200
@app.route("/api/sessions/list", methods=("GET", "POST"))
async def apisessionslist():
    """List the caller's sessions and flag the one making this request.

    Responses:
        200 with a list of ``{"id", "thisSession", "device"}`` objects, where
            ``id`` is the row's ``sessionid`` and ``thisSession`` is true for
            the session whose key equals ``secretKey``.
        401 when the session key is unknown or its user no longer exists.
        405 for non-POST requests.
    """
    if request.method != "POST":
        return {}, 405

    data = await request.get_json()
    secret_key = data["secretKey"]
    session_row = get_session(secret_key)
    if session_row is None:
        return {}, 401
    user = get_user(session_row["id"])
    if user is None:
        return {}, 401

    conn = get_db_connection()
    try:
        # NOTE(review): every matched row has the same id, so this ORDER BY
        # does not sort anything; the query is kept unchanged.
        sessions = conn.execute(
            "SELECT * FROM sessions WHERE id = ? ORDER BY id DESC;",
            (user["id"],),
        ).fetchall()
    finally:
        conn.close()

    return [
        {
            "id": row["sessionid"],
            "thisSession": row["session"] == secret_key,
            "device": row["device"],
        }
        for row in sessions
    ], 200
@app.route("/api/sessions/remove", methods=("GET", "POST"))
async def apisessionsremove():
    """Delete session ``sessionId`` when it belongs to the caller.

    Responses:
        200 when the session is deleted.
        401 when the caller's session key is unknown or its user no longer exists.
        403 when the target session belongs to another user.
        405 for non-POST requests.
        422 when the target session does not exist.
    """
    if request.method != "POST":
        return {}, 405

    data = await request.get_json()
    session_id = data["sessionId"]
    session_row = get_session(data["secretKey"])
    if session_row is None:
        return {}, 401
    user = get_user(session_row["id"])
    if user is None:
        return {}, 401

    conn = get_db_connection()
    try:
        target = conn.execute(
            "SELECT id FROM sessions WHERE sessionid = ?", (session_id,)
        ).fetchone()
        if target is None:
            return {}, 422
        # Compare the TARGET session's owner with the caller. Comparing the
        # caller with itself would let any user delete any session.
        if target["id"] != user["id"]:
            return {}, 403
        conn.execute("DELETE FROM sessions WHERE sessionid = ?", (session_id,))
        conn.commit()
    finally:
        conn.close()
    return {}, 200
@app.route("/api/listusers", methods=("GET", "POST"))
async def listusers():
    """List every user with their storage usage; requires the master key.

    The request's ``masterkey`` must equal the configured ``SECRET_KEY``.

    Responses:
        200 with a list of ``{"id", "username", "space"}`` objects whose
            values are all strings.
        401 when the master key is missing the right value or is not a string.
        405 for non-POST requests.
    """
    if request.method != "POST":
        return {}, 405

    data = await request.get_json()
    masterkey = data["masterkey"]
    # Compare as bytes in constant time so response timing does not reveal
    # how much of the key matched; encoding also allows non-ASCII keys.
    if not isinstance(masterkey, str) or not secrets.compare_digest(
        masterkey.encode("utf-8"), SECRET_KEY.encode("utf-8")
    ):
        return {}, 401

    conn = get_db_connection()
    try:
        users = conn.execute("SELECT * FROM users").fetchall()
    finally:
        conn.close()

    return [
        {
            "id": str(row["id"]),
            "username": str(row["username"]),
            "space": str(get_space(row["id"])),
        }
        for row in users
    ], 200
@app.errorhandler(500)
async def burger(error=None):
    """Answer internal server errors with an empty JSON object and status 500.

    Quart calls error handlers with the exception that triggered them, so the
    handler has to accept it; the value is not used.
    """
    return {}, 500
@app.errorhandler(404)
async def burger(error=None):
    """Answer unknown routes with an empty JSON object and status 404.

    Quart calls error handlers with the exception that triggered them, so the
    handler has to accept it; the value is not used.
    """
    return {}, 404
# Build the Hypercorn configuration and, when run as a script, serve the app
hypercornconfig = Config()
hypercornconfig.bind = ":".join((HOST, PORT))

if __name__ == "__main__":
    print("[INFO] Server started at", str(int(time.time())))
    welcome = "[INFO] Welcome to Burgernotes! Today we are running on IP " + HOST + " on port " + PORT + "."
    print(welcome)
    asyncio.run(serve(app, hypercornconfig))