adRUsboek/app.py

234 lines
7.1 KiB
Python
Raw Permalink Normal View History

2024-08-04 10:01:08 +02:00
import json
import math
from dotenv import load_dotenv
from typing import List
import logging
import os
import datetime
import dateutil
from pathlib import Path
load_dotenv() # load environment before initializing flaskoidc
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG)
from flask import render_template, session, request, abort, redirect, url_for, flash
from flask_seasurf import SeaSurf
from sqlalchemy import extract
import requests
# Select the application class from the ENV environment variable: the OIDC
# mock when it equals "development", the real FlaskOIDC class otherwise.
if "ENV" in os.environ and os.environ["ENV"] == "development":
# if we're running on localhost we cannot use the production OIDC provider
from oidcmock import FlaskOIDCMock
app = FlaskOIDCMock(__name__)
else:
from flaskoidc import FlaskOIDC
app = FlaskOIDC(__name__)
csrf = SeaSurf(app) # CSRF protection
db = app.db # FlaskOIDC already creates a db instance
# The User model is imported and create_all() is called once the application
# context has been entered.
with app.app_context():
from models import User
app.db.create_all()
# NOTE(review): cli and template_filters are not referenced by name in the
# visible part of this file; presumably imported for their side effects -- confirm.
import cli
import template_filters
from radicale import Radicale
# RADICALE_DATA_PATH is read with a plain subscript, so a missing variable
# raises KeyError while this module is being imported.
app.radicale = Radicale(data_path=os.environ["RADICALE_DATA_PATH"])
@app.before_request
def _populate_rich_user():
    """Refresh the rich user object for the current request.

    Passes the session's ``"user"`` entry to ``User.create_or_update``, stores
    the result on ``session.rich_user`` and commits the database session.
    """
    # The flaskoidc routes are left alone: the user session variable is not
    # guaranteed to be ready while they run.
    auth_paths = ["/logout", "/login", app.config.get("REDIRECT_URI", "/auth")]
    if request.path in auth_paths:
        return

    oidc_user = session.get("user")
    session.rich_user = User.create_or_update(oidc_user)
    db.session.commit()
def get_map_label(user: User):
    """Return the text shown for ``user`` on a map marker (the display name)."""
    label = user.display_name
    return label
@app.route("/")
def overview_map():
    """Render the overview map with one marker per distinct coordinate pair.

    Viewable users sharing the same ``(lat, lon)`` are merged into a single
    marker whose label joins their sorted map labels with ``" & "``.
    """
    labels_by_coords = {}
    for person in User.all_viewable_users():
        # Users with a falsy latitude or longitude get no marker.
        if not (person.lat and person.lon):
            continue
        position = (person.lat, person.lon)
        labels_by_coords.setdefault(position, []).append(get_map_label(person))

    markers = []
    for (lat, lon), labels in labels_by_coords.items():
        markers.append({"label": " & ".join(sorted(labels)), "lat": lat, "lon": lon})

    return render_template("map.html.j2", markers=markers, user=session.rich_user)
@app.route("/profile", methods=["GET", "POST"])
def profile():
    """Show the current user's profile form and store submitted changes.

    GET renders ``profile.html.j2`` for ``session.rich_user``.

    POST copies the submitted form fields onto that user, commits, hands the
    user to ``app.radicale.create_or_update_vcf`` and redirects back to this
    page.  A missing or empty ``birthdate`` field clears the stored birthdate;
    a value that is not a valid ISO date aborts the request with HTTP 400
    before the user object is modified.
    """
    user = session.rich_user
    if request.method != "POST":
        return render_template("profile.html.j2", user=user)

    # Validate the only parsed field first so that a bad request cannot leave
    # the user object half-updated.
    raw_birthdate = request.form.get("birthdate", "")
    if raw_birthdate:
        try:
            birthdate = datetime.date.fromisoformat(raw_birthdate)
        except ValueError:
            abort(400)
    else:
        birthdate = None

    # Plain text fields are stored as submitted; absent fields become "".
    text_fields = (
        "first_name",
        "last_name",
        "email",
        "phone",
        "street",
        "number",
        "postal",
        "city",
        "country",
        "lat",
        "lon",
    )
    for field in text_fields:
        setattr(user, field, request.form.get(field, ""))

    user.birthdate = birthdate
    user.last_updated = datetime.datetime.now()
    # Users are not given a choice: they are included in the views after
    # their first modification.
    user.include_in_views = True
    db.session.commit()
    app.radicale.create_or_update_vcf(user)
    flash("Je wijzigingen zijn succesvol opgeslagen!", "success")
    return redirect(url_for("profile"))
@app.route("/calendar")
def calendar():
    """Render the birthday calendar for one month.

    The month is taken from the ``month`` query argument and wrapped into the
    range 1..12; when the argument is absent or not an integer, the current
    month in the Europe/Amsterdam time zone is used.  Display names of users
    with ``include_in_views`` set and a birthdate in that month are grouped
    per day, and the days are split over three columns.

    ``img_src`` is the first ``<month>-*.webp`` file under
    ``static/img/calendar/``, else the first ``default*.webp`` file, else
    ``None``.  ``img_credits`` is the part of the file stem after the first
    ``-``, when there is one.
    """
    # Imported explicitly: a bare ``import dateutil`` does not load the ``tz``
    # submodule on every python-dateutil release.
    from dateutil import tz

    # February gets 29 days so that leap-day birthdays always have a bucket.
    month_lengths = (31, 29, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31)

    now = datetime.datetime.now(tz=tz.gettz("Europe/Amsterdam"))
    try:
        month = ((int(request.args.get("month", now.month)) - 1) % 12) + 1
    except ValueError:
        month = now.month

    birthday_users = db.session.execute(
        db.select(User)
        .filter_by(include_in_views=True)
        .filter(extract("month", User.birthdate) == month)
    ).scalars()

    buckets = [
        {"day": day, "bucket": []} for day in range(1, month_lengths[month - 1] + 1)
    ]
    for u in birthday_users:
        buckets[u.birthdate.day - 1]["bucket"].append(u.display_name)

    ncols = 3
    column_size = math.ceil(len(buckets) / ncols)
    columns = [
        buckets[i : i + column_size] for i in range(0, len(buckets), column_size)
    ]

    # Fall back from the month image to the default image; a missing default
    # yields None rather than an unhandled StopIteration.
    image_dir = Path("static/img/calendar/")
    img_src = next(image_dir.glob(f"{month}-*.webp"), None)
    if img_src is None:
        img_src = next(image_dir.glob("default*.webp"), None)

    img_credits = None
    if img_src is not None and "-" in img_src.stem:
        img_credits = img_src.stem.split("-")[1]

    return render_template(
        "calendar.html.j2",
        user=session.rich_user,
        month=month,
        columns=columns,
        img_src=img_src,
        img_credits=img_credits,
        # Only highlight a day when the displayed month is the current one.
        today=now.day if month == now.month else None,
    )
def get_profile(username):
    """Look up the viewable user with the given username.

    Returns ``None`` when ``username`` is ``None`` or when no user with
    ``include_in_views`` set has that username.  Otherwise returns the user
    with a ``marker`` dict (``lat``, ``lon``, ``label``) attached.
    """
    if username is None:
        return None

    query = (
        db.select(User)
        .filter_by(include_in_views=True)
        .filter(User.username == username)
    )
    row = db.session.execute(query).first()
    if row is None:
        return None

    # The result row wraps the selected entity; unwrap it.
    found = row[0]
    found.marker = dict(
        lat=found.lat,
        lon=found.lon,
        label=get_map_label(found),
    )
    return found
@app.route("/addressbook_card/<username>/")
@app.route("/addressbook_card/", defaults={"username": None})
def addressbook_card(username):
    """Render the address book card for ``username``.

    The template receives ``profile=None`` when no username is given or the
    lookup finds nothing.
    """
    card_profile = get_profile(username)
    current_user = session.rich_user
    return render_template(
        "addressbook_card.html.j2", user=current_user, profile=card_profile
    )
@app.route("/addressbook/<username>/")
@app.route("/addressbook/", defaults={"username": None})
def addressbook(username):
    """Render the address book, optionally with one profile selected.

    Contacts are all viewable users ordered by display name.  A username that
    does not resolve to a profile redirects to the plain address book.
    """
    contacts = list(User.all_viewable_users())
    contacts.sort(key=lambda contact: contact.display_name)

    selected = get_profile(username)
    if selected is None and username:
        return redirect(url_for("addressbook"))

    return render_template(
        "addressbook.html.j2",
        user=session.rich_user,
        contacts=contacts,
        profile=selected,
    )
@app.route("/generate_carddav_password", methods=["POST"])
def generate_carddav_password():
    """Reset the current user's CardDAV password and return to the CardDAV page."""
    current_user = session.rich_user
    current_user.reset_carddav_password()
    db.session.commit()

    flash("Er is een nieuw CardDAV-wachtwoord gegenereerd!", "carddav")
    target = url_for("carddav")
    return redirect(target)
@app.route("/carddav")
def carddav():
    """Render the CardDAV page with the URL from the CARDDAV_URL environment variable."""
    current_user = session.rich_user
    carddav_url = os.environ["CARDDAV_URL"]
    return render_template(
        "carddav.html.j2",
        user=current_user,
        carddav_url=carddav_url,
    )
@app.route("/geocoding")
def geocoding():
    """Forward the query arguments to Nominatim and return the first hit.

    The request's query arguments are passed to the Nominatim search endpoint
    with ``format`` forced to ``jsonv2``.  The response body is a JSON object
    with the ``lat`` and ``lon`` of the first result.

    Aborts with 404 when there is no usable result, and with 502 when the
    upstream request fails, times out, returns an error status or does not
    return valid JSON.
    """
    params = dict(request.args)
    params["format"] = "jsonv2"
    headers = {
        "User-Agent": "adRUsboek v1.0",
    }
    try:
        # Without a timeout a stalled upstream would block this worker forever.
        resp = requests.get(
            "https://nominatim.openstreetmap.org/search",
            params=params,
            headers=headers,
            timeout=10,
        )
        resp.raise_for_status()
        data = resp.json()
    except (requests.RequestException, ValueError):
        logger.exception("Geocoding request to Nominatim failed")
        abort(502)

    # A successful search returns a list of places; anything else (for example
    # an error object) is treated as "not found".
    if (
        not isinstance(data, list)
        or not data
        or not isinstance(data[0], dict)
        or "lat" not in data[0]
        or "lon" not in data[0]
    ):
        abort(404)
    return json.dumps({"lat": data[0]["lat"], "lon": data[0]["lon"]})
if __name__ == "__main__":
    # The interactive debugger allows arbitrary code execution and this server
    # listens on all interfaces, so debug mode is limited to development runs
    # (ENV=development, the same switch that selects the OIDC mock).
    app.run(
        host="0.0.0.0",
        debug=os.environ.get("ENV") == "development",
        port=5000,
    )