234 lines
7.1 KiB
Python
234 lines
7.1 KiB
Python
|
import json
|
||
|
import math
|
||
|
from dotenv import load_dotenv
|
||
|
from typing import List
|
||
|
import logging
|
||
|
import os
|
||
|
import datetime
|
||
|
import dateutil
|
||
|
from pathlib import Path
|
||
|
|
||
|
# Read variables from a .env file into the process environment. This has to
# happen first because the code below reads its settings from os.environ.
load_dotenv()  # load environment before initializing flaskoidc
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Configure the root logger to emit everything from DEBUG upwards.
logging.basicConfig(level=logging.DEBUG)
|
||
|
|
||
|
from flask import render_template, session, request, abort, redirect, url_for, flash
|
||
|
from flask_seasurf import SeaSurf
|
||
|
|
||
|
from sqlalchemy import extract
|
||
|
import requests
|
||
|
|
||
|
# Create the application object; which OIDC-aware Flask class is used depends
# on the ENV environment variable.
if os.environ.get("ENV") == "development":
    # if we're running on localhost we cannot use the production OIDC provider
    from oidcmock import FlaskOIDCMock

    app = FlaskOIDCMock(__name__)
else:
    from flaskoidc import FlaskOIDC

    app = FlaskOIDC(__name__)
|
||
|
|
||
|
# Wrap the application with SeaSurf.
csrf = SeaSurf(app)  # CSRF protection
# Reuse the database handle that the application object already exposes
# instead of creating a second one.
db = app.db  # FlaskOIDC already creates a db instance
|
||
|
|
||
|
|
||
|
# Finish the setup inside an application context. These imports are placed
# here, after `app` exists, rather than at the top of the file.
# NOTE(review): the original indentation was lost; everything up to the
# Radicale assignment is assumed to belong to this `with` block - confirm.
with app.app_context():
    from models import User

    # Create any tables that do not exist yet.
    app.db.create_all()
    # Imported for their side effects only.
    # NOTE(review): presumably these register CLI commands and Jinja filters
    # on `app` - confirm.
    import cli
    import template_filters
    from radicale import Radicale

    # Attach the Radicale helper to the app; RADICALE_DATA_PATH must be set in
    # the environment, otherwise this raises KeyError at import time.
    app.radicale = Radicale(data_path=os.environ["RADICALE_DATA_PATH"])
|
||
|
|
||
|
|
||
|
@app.before_request
def _populate_rich_user():
    """Refresh ``session.rich_user`` from the session's user data before each request.

    Requests to the login, logout and OIDC redirect paths are left untouched,
    because the ``user`` session variable is not guaranteed to be ready there.
    """
    # skip flaskoidc routes; user session variable is not guaranteed to be ready
    oidc_paths = ("/logout", "/login", app.config.get("REDIRECT_URI", "/auth"))
    if request.path not in oidc_paths:
        session.rich_user = User.create_or_update(session.get("user"))
        db.session.commit()
|
||
|
|
||
|
|
||
|
def get_map_label(user: User):
    """Return the text used to label ``user`` on the map: their display name."""
    label = user.display_name
    return label
|
||
|
|
||
|
|
||
|
@app.route("/")
def overview_map():
    """Render the overview map with one marker per distinct coordinate pair.

    Users that share the same ``(lat, lon)`` are merged into a single marker
    whose label is their sorted map labels joined with ``" & "``. Users with a
    falsy latitude or longitude are left out.
    """
    labels_by_coords = {}
    for member in User.all_viewable_users():
        if not (member.lat and member.lon):
            continue
        position = (member.lat, member.lon)
        labels_by_coords.setdefault(position, []).append(get_map_label(member))

    markers = []
    for (lat, lon), labels in labels_by_coords.items():
        markers.append({"label": " & ".join(sorted(labels)), "lat": lat, "lon": lon})

    return render_template("map.html.j2", markers=markers, user=session.rich_user)
|
||
|
|
||
|
|
||
|
@app.route("/profile", methods=["GET", "POST"])
def profile():
    """Show the current user's profile form and store submitted changes.

    GET renders the form. POST copies the submitted fields onto the current
    user, commits, updates the user's vCard through ``app.radicale`` and
    redirects back to this page with a success message.

    A missing or empty ``birthdate`` clears the stored birthdate. A value that
    is not a valid ISO date aborts the request with 400.
    """
    user = session.rich_user
    if request.method != "POST":
        return render_template("profile.html.j2", user=user)

    # Parse the birthdate before touching the user object, so that a rejected
    # request cannot leave it half-updated.
    raw_birthdate = request.form.get("birthdate")
    try:
        birthdate = (
            datetime.date.fromisoformat(raw_birthdate) if raw_birthdate else None
        )
    except ValueError:
        abort(400)

    # Plain text fields: a field missing from the form is stored as "".
    text_fields = (
        "first_name",
        "last_name",
        "email",
        "phone",
        "street",
        "number",
        "postal",
        "city",
        "country",
        "lat",
        "lon",
    )
    for field in text_fields:
        setattr(user, field, request.form.get(field, ""))

    user.birthdate = birthdate
    user.last_updated = datetime.datetime.now()
    # do not give users a choice; include after first modification
    user.include_in_views = True
    db.session.commit()
    app.radicale.create_or_update_vcf(user)
    flash("Je wijzigingen zijn succesvol opgeslagen!", "success")
    return redirect(url_for("profile"))
|
||
|
|
||
|
|
||
|
@app.route("/calendar")
def calendar():
    """Render the birthday calendar for a single month.

    The month comes from the ``month`` query argument and is wrapped into the
    range 1-12. A missing or non-integer value falls back to the current month
    in the Europe/Amsterdam timezone. Only users with ``include_in_views`` set
    and a birthdate in that month are listed.
    """
    # Import the submodule explicitly: a bare ``import dateutil`` does not
    # guarantee that ``dateutil.tz`` has been loaded.
    from dateutil import tz

    def days_in_month(m):
        # February counts 29 days so that a 29 February birthdate has a bucket.
        return [31, 29, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31][m - 1]

    now = datetime.datetime.now().astimezone(tz.gettz("Europe/Amsterdam"))
    try:
        month = ((int(request.args.get("month", now.month)) - 1) % 12) + 1
    except ValueError:
        month = now.month

    all_users = db.session.execute(
        db.select(User)
        .filter_by(include_in_views=True)
        .filter(extract("month", User.birthdate) == month)
    ).scalars()

    # One bucket per day of the month, each collecting the display names of
    # the users whose birthday falls on that day.
    buckets = [
        {"day": day, "bucket": []} for day in range(1, days_in_month(month) + 1)
    ]
    for u in all_users:
        buckets[u.birthdate.day - 1]["bucket"].append(u.display_name)

    # Split the days into (at most) three columns of equal length, except
    # possibly the last one.
    ncols = 3
    column_size = math.ceil(len(buckets) / ncols)
    columns = [
        buckets[i : i + column_size] for i in range(0, len(buckets), column_size)
    ]

    # Prefer an image whose file name starts with "<month>-"; otherwise use
    # the default image. The path is relative to the working directory.
    image_dir = Path("static/img/calendar/")
    try:
        img_src = next(image_dir.glob(f"{month}-*.webp"))
    except StopIteration:
        # NOTE: still raises StopIteration when no default image exists.
        img_src = next(image_dir.glob("default*.webp"))

    # The part of the file name after the first "-" is passed on as credits.
    img_credits = None
    if "-" in img_src.stem:
        img_credits = img_src.stem.split("-")[1]

    return render_template(
        "calendar.html.j2",
        user=session.rich_user,
        month=month,
        columns=columns,
        img_src=img_src,
        img_credits=img_credits,
        today=now.day if month == now.month else None,
    )
|
||
|
|
||
|
|
||
|
def get_profile(username):
    """Look up the viewable user with the given username.

    Returns ``None`` when ``username`` is ``None`` or when no user with
    ``include_in_views`` set has that username. Otherwise returns the user
    with a ``marker`` dict (``lat``, ``lon``, ``label``) attached to it.
    """
    if username is None:
        return None

    query = (
        db.select(User)
        .filter_by(include_in_views=True)
        .filter(User.username == username)
    )
    row = db.session.execute(query).first()
    if row is None:
        return None

    # The result row holds the User entity in its first column.
    found = row[0]
    found.marker = {
        "lat": found.lat,
        "lon": found.lon,
        "label": get_map_label(found),
    }
    return found
|
||
|
|
||
|
|
||
|
@app.route("/addressbook_card/<username>/")
@app.route("/addressbook_card/", defaults={"username": None})
def addressbook_card(username):
    """Render the address book card for ``username``.

    The template receives ``profile=None`` when no username is given or no
    matching viewable user exists.
    """
    context = {
        "profile": get_profile(username),
        "user": session.rich_user,
    }
    return render_template("addressbook_card.html.j2", **context)
|
||
|
|
||
|
|
||
|
@app.route("/addressbook/<username>/")
@app.route("/addressbook/", defaults={"username": None})
def addressbook(username):
    """Render the address book, optionally with one profile selected.

    All viewable users are listed sorted by display name. When a username is
    given but does not resolve to a viewable user, the request is redirected
    to the plain address book.
    """
    contacts = list(User.all_viewable_users())
    contacts.sort(key=lambda contact: contact.display_name)

    selected = get_profile(username)
    if selected is None and username:
        return redirect(url_for("addressbook"))

    return render_template(
        "addressbook.html.j2",
        user=session.rich_user,
        contacts=contacts,
        profile=selected,
    )
|
||
|
|
||
|
|
||
|
@app.route("/generate_carddav_password", methods=["POST"])
def generate_carddav_password():
    """Reset the current user's CardDAV password and return to the CardDAV page.

    Calls ``reset_carddav_password()`` on the current user, commits the
    change, flashes a confirmation in the ``carddav`` category and redirects.
    """
    current_user = session.rich_user
    current_user.reset_carddav_password()
    db.session.commit()

    flash("Er is een nieuw CardDAV-wachtwoord gegenereerd!", "carddav")
    target = url_for("carddav")
    return redirect(target)
|
||
|
|
||
|
|
||
|
@app.route("/carddav")
def carddav():
    """Render the CardDAV page for the current user.

    The CardDAV URL is read from the ``CARDDAV_URL`` environment variable;
    a ``KeyError`` is raised when it is not set.
    """
    carddav_url = os.environ["CARDDAV_URL"]
    return render_template(
        "carddav.html.j2",
        user=session.rich_user,
        carddav_url=carddav_url,
    )
|
||
|
|
||
|
|
||
|
@app.route("/geocoding")
def geocoding():
    """Forward a geocoding query to Nominatim and return the first hit.

    All query arguments of the incoming request are passed on to the
    Nominatim search endpoint, with ``format`` forced to ``jsonv2``.

    Returns:
        A JSON string ``{"lat": ..., "lon": ...}`` taken from the first
        search result.

    Aborts with 404 when the search yields no usable result, and with 502
    when Nominatim cannot be reached in time, answers with an HTTP error, or
    sends a body that is not valid JSON.
    """
    params = dict(request.args)
    params.update({"format": "jsonv2"})
    headers = {
        "User-Agent": "adRUsboek v1.0",
    }
    try:
        # Always bound the wait: without a timeout a stalled upstream would
        # block this worker indefinitely.
        resp = requests.get(
            "https://nominatim.openstreetmap.org/search",
            params=params,
            headers=headers,
            timeout=10,
        )
        resp.raise_for_status()
        data = json.loads(resp.content.decode("utf8"))
    except (requests.RequestException, ValueError):
        # ValueError covers both undecodable bytes and malformed JSON.
        abort(502)

    # A successful search is a non-empty list; anything else (for example an
    # error object) is treated as "not found".
    if (
        not isinstance(data, list)
        or not data
        or "lat" not in data[0]
        or "lon" not in data[0]
    ):
        abort(404)
    return json.dumps({"lat": data[0]["lat"], "lon": data[0]["lon"]})
|
||
|
|
||
|
|
||
|
if __name__ == "__main__":
    # The server listens on all interfaces, and Flask's debug mode exposes an
    # interactive debugger that can execute arbitrary code. Only enable it
    # when ENV is "development", the same switch used to select the mock
    # OIDC provider.
    app.run(
        host="0.0.0.0",
        debug=os.environ.get("ENV") == "development",
        port=5000,
    )
|