magisch-corvee-script/magisch_corvee_script.py

228 lines
8.8 KiB
Python
Executable File

#! /usr/bin/env nix-shell
#!nix-shell -i python3 -p python3 python3Packages.pyyaml -I nixpkgs=flake:nixpkgs
# pip install pyyaml pyscipopt pyright
import sys
import yaml
import re
import argparse
from collections import OrderedDict, defaultdict
from pyscipopt import Model, quicksum
from typing import Any, Tuple, TypeVar
from dataclasses import dataclass, field
from tabulate import tabulate
conf = yaml.safe_load(open('config.yaml', 'r'))
DEFAULT_CONFIG = {
"max_load_person": 6,
"day_names": [f"dag {str(i)}" for i in range(conf['config']['days'])],
}
config = DEFAULT_CONFIG | conf['config']
config['ignore'].append('')
assert(len(config['day_names']) == config['days'])
QUADRATIC = False
@dataclass
class TaskConfig:
personen: list[str]
workload: int
req: list[int]
name: str
hardcode: list[str] | None = None
lookup: list[str] = field(default_factory=list)
tasks: dict[str, TaskConfig] = OrderedDict({k: TaskConfig(**({"name": k} | t)) for k, t in conf['tasks'].items()})
X = TypeVar("X")
def index(x: dict[X, Any]) -> dict[X, int]:
return {v: k for k, v in enumerate(x)}
daily_workloads = \
[sum(task.workload * task.req[d] for task in tasks.values()) for d in range(config['days'])]
ALL_DAYS: set[int] = set(range(config['days']))
class Person(object):
def __init__(self, name: str, conf={"dagen":ALL_DAYS}):
self.name = name
self.can: set[str] = set()
self.loves: set[str] = set()
self.hates: set[str] = set()
self.does: set[Tuple[int, str]] = set() # hardcoded
self.has_prefs = False
self.conf = conf
self.conf['dagen'] = set(conf['dagen'])
def vrolijkheid(self):
res = config['days'] - len(self.does)
for (_,t) in self.does:
if t in self.loves:
res += config['weights']['likes']
if t in self.hates:
res -= config['weights']['hates']
return res
def workload(self, tasks):
return sum(tasks[t].workload for (_,t) in self.does)
def cost(self, num_people):
return round(sum((daily_workloads[d] for d in self.conf['dagen'])) / num_people)
# probabilistic round: int(math.floor(x + random.random()))
def read_people(conf_ppl) -> dict[str, Person]:
people = OrderedDict()
for x in conf_ppl:
val = {"dagen": ALL_DAYS}
if isinstance(x, dict):
x,val = x.popitem()
people[x.lower()] = Person(x, val)
return people
# deal with loves/hates
def make_task_lut(tasks: dict[str, TaskConfig]):
task_lut = defaultdict(set)
for t, taskconf in tasks.items():
for lookup in taskconf.lookup:
task_lut[lookup] |= {t.lower()}
task_re = re.compile(config['task_re'])
def lookup_tasks(tasks):
return set.union(set(), *(task_lut[x.strip()] for x in task_re.split(tasks) if x not in config['ignore']))
return lookup_tasks
def read_prefs(pref_file, tasks, people):
lookup_tasks = make_task_lut(tasks)
# read the wiki corvee table
for [name, loves, hates] in ((q.strip().lower() for q in x.split('\t')) for x in pref_file):
p = people[name]
p.has_prefs = True
p.loves |= set(lookup_tasks(loves))
p.hates |= set(lookup_tasks(hates))
for name, p in people.items():
if not p.has_prefs:
print("warning: no preferences for", name, file=sys.stderr)
# deal with capability and hardcode
def set_capabilities(tasks: dict[str, TaskConfig], people: dict[str, Person]):
for (task,conf) in tasks.items():
if conf.personen == 'iedereen':
for p in people.values():
p.can.add(task)
elif conf.personen == 'liefhebbers':
for p in people.values():
if task in p.loves:
p.can.add(task)
else:
for p in conf.personen:
people[p.lower()].can.add(task)
if conf.hardcode is not None:
for day, pers in enumerate(conf.hardcode):
if pers:
people[pers.lower()].does.add((day, task))
def write_tasks(people: dict[str, Person], tasks: dict[str, TaskConfig], file=sys.stdout):
headers = ["wie"] + config['day_names']
if not args.simple:
headers += ["workload", "vrolijkheid"]
tabl = []
for p in people.values():
days = [[] for _ in range(config['days'])]
for (d,t) in p.does:
days[d].append((t, t in p.loves, t in p.hates))
if not args.simple:
def q(w):
return ",".join([tasks[t].name + (" :)" if love else "") + (" :(" if hate else "") for (t,love,hate) in w])
else:
def q(w):
return ",".join([tasks[t].name for (t,_,_) in w])
row = [p.name, *map(q, days)]
if not args.simple:
row += [p.workload(tasks), p.vrolijkheid()]
tabl.append(row)
print(tabulate(tabl, headers=headers, tablefmt=args.output_format), file=file)
def scipsol(people: dict[str, Person], tasks: dict[str, TaskConfig]):
max_loads: dict[Tuple[str, int], int] = {}
for i,p in people.items():
if 'max_load' in p.conf: # max_load override for Pol
for d,load in enumerate(p.conf['max_load']):
max_loads[(i,d)] = load
# filter days that the person does not exist
for d in ALL_DAYS - p.conf['dagen']:
max_loads[(i,d)] = 0
m = Model()
does = {}
happiness = []
stdevs = []
errors = []
for pname, person in people.items():
workloads = []
p_error = m.addVar(vtype="I", name=f"{pname}_error", lb=0, ub=None)
for d in ALL_DAYS:
pdt = []
for task in tasks:
var = m.addVar(vtype="B", name=f"{pname}_does_{task}@{d}")
pdt.append(var)
does[(pname, d, task)] = var
# a person only does what (s)he is capable of
if task not in person.can:
m.addCons(var == 0)
workloads.append(var * tasks[task].workload)
for task in person.loves:
happiness.append(does[(pname, d, task)] * config['weights']['likes'])
for task in person.hates:
happiness.append(does[(pname, d, task)] * (config['weights']['hates'] * -1))
# max_load_person: a person only has one task per day at most
m.addCons(quicksum(pdt) <= max_loads.get((pname, d), 1))
m.addCons(p_error >= person.cost(len(people)) - quicksum(workloads))
m.addCons(p_error >= quicksum(workloads) - person.cost(len(people)))
errors.append(p_error)
stdevs.append((person.cost(len(people)) - quicksum(workloads)) ** 2)
# min_load_person: person has at least 1 task
m.addCons(quicksum([does[(pname, d, task)] for d in ALL_DAYS for task in tasks]) >= 1)
# duplicate_jobs: a person does not perform the same job on all days
for task in tasks:
m.addCons(quicksum(does[(pname, d, task)] for d in ALL_DAYS) <= len(ALL_DAYS) - 1)
# max_load_person_total
m.addCons(quicksum([does[(pname, d, task)] * tasks[task].workload for d in ALL_DAYS for task in tasks]) <= config['max_load_person'])
# hardcode constraint
for d, task in person.does:
m.addCons(does[(pname, d, task)] == 1)
# all_allocated: each task in allocated
for j in tasks:
for d in ALL_DAYS:
m.addCons(quicksum(does[(p, d, j)] for p in people) == tasks[j].req[d])
objective = m.addVar(name="objvar", vtype="C", lb=None, ub=None)
m.setObjective(objective, "maximize")
if QUADRATIC:
m.addCons(objective <= quicksum(happiness) - quicksum(stdevs) / 2)
else:
if args.max_total_error is not None:
m.addCons(quicksum(errors) <= args.max_total_error)
m.addCons(objective <= quicksum(happiness) - quicksum(errors))
m.setObjective(objective, "maximize")
m.solveConcurrent()
for pname, person in people.items():
for d in ALL_DAYS:
for task in tasks:
if m.getVal(does[(pname, d, task)]):
person.does.add((d, task))
write_tasks(people, tasks)
print("Totale vrolijkheid:", sum(p.vrolijkheid() for p in people.values()))
print("workload deviation:", m.getVal(quicksum(stdevs)))
parser = argparse.ArgumentParser()
parser.add_argument("-q", "--quadratic", action="store_true")
parser.add_argument("--simple", action="store_true", help="hide workload and happiness")
parser.add_argument("--max_total_error", type=int, default=None)
parser.add_argument("--output-format", default="mediawiki", help="`tabulate` output format")
args = parser.parse_args()
QUADRATIC = args.quadratic
people = read_people(conf['people'])
with open('prefs_table', 'r') as pref_file:
read_prefs(pref_file, tasks, people)
set_capabilities(tasks, people)
scipsol(people, tasks)