207 lines
7.9 KiB
Python
Executable File
207 lines
7.9 KiB
Python
Executable File
#! /usr/bin/env nix-shell
|
|
#!nix-shell -i python3 -p python3 python3Packages.pyyaml -I nixpkgs=flake:nixpkgs
|
|
# pip install pyyaml pyscipopt pyright
|
|
import sys
|
|
import yaml
|
|
import re
|
|
import argparse
|
|
from collections import OrderedDict
|
|
from pyscipopt import Model, quicksum
|
|
from typing import Any, Tuple, TypeVar
|
|
from dataclasses import dataclass, field
|
|
# Load the YAML configuration once at import time. The context manager closes
# the file handle deterministically instead of leaving it to the garbage
# collector.
with open('config.yaml', 'r') as config_file:
    conf = yaml.safe_load(config_file)

# Global scheduling settings; this script reads the keys 'days', 'weights',
# 'task_re' and 'ignore' from it.
config = conf['config']
# The empty string is always ignored so that empty tokens produced when a
# preference string is split do not get looked up as task names.
config['ignore'].append('')

# Objective mode flag; reassigned from the command-line arguments further down.
QUADRATIC = False
|
|
|
|
@dataclass
|
|
class TaskConfig:
|
|
personen: list[str]
|
|
workload: int
|
|
req: list[int]
|
|
hardcode: list[str] | None = None
|
|
lookup: list[str] = field(default_factory=list)
|
|
|
|
# Task name -> TaskConfig, kept in the order the tasks appear in the config.
tasks: dict[str, TaskConfig] = OrderedDict(
    (task_name, TaskConfig(**task_settings))
    for task_name, task_settings in conf['tasks'].items()
)
|
|
X = TypeVar("X")


def index(x: dict[X, Any]) -> dict[X, int]:
    """Map every key of ``x`` to its position in iteration order.

    If an element occurs more than once while iterating, the last position
    wins.
    """
    positions: dict[X, int] = {}
    for position, key in enumerate(x):
        positions[key] = position
    return positions
|
|
# Every day index of the planning period.
ALL_DAYS: set[int] = set(range(config['days']))

# Total workload that has to be covered on each day: for every task, its
# workload multiplied by the number of people it requires that day.
daily_workloads = [
    sum(task.workload * task.req[day] for task in tasks.values())
    for day in range(config['days'])
]
|
|
class Person(object):
    """One participant: what they can do, what they like, and what they are assigned."""

    def __init__(self, conf=None):
        """Create a person from an optional per-person settings mapping.

        ``conf`` may contain 'dagen' (iterable of day indices on which the
        person is available) and any other per-person keys. When ``conf`` is
        omitted, or has no 'dagen' entry, the person is available on all days.

        The mapping is copied, so neither the caller's dict nor a shared
        default object is modified.
        """
        self.can: set[str] = set()
        self.loves: set[str] = set()
        self.hates: set[str] = set()
        self.does: set[tuple[int, str]] = set()  # hardcoded
        self.has_prefs = False
        # Private shallow copy: a mutable default argument would be shared
        # between all instances, and writing 'dagen' back would otherwise
        # alter the caller's configuration.
        self.conf = dict(conf) if conf is not None else {}
        if 'dagen' in self.conf:
            self.conf['dagen'] = set(self.conf['dagen'])
        else:
            self.conf['dagen'] = set(ALL_DAYS)

    def vrolijkheid(self):
        """Return the happiness score of the current assignment.

        Starts from the number of days minus the number of assigned tasks,
        then adds the 'likes' weight for each assigned loved task and
        subtracts the 'hates' weight for each assigned hated task.
        """
        weights = config['weights']
        res = config['days'] - len(self.does)
        for (_, t) in self.does:
            if t in self.loves:
                res += weights['likes']
            if t in self.hates:
                res -= weights['hates']
        return res

    def workload(self, tasks):
        """Return the summed workload of all tasks assigned to this person."""
        return sum(tasks[t].workload for (_, t) in self.does)

    def cost(self, num_people):
        """Return this person's target load.

        This is the total workload of the days the person is available,
        divided by ``num_people`` and rounded to an integer.
        """
        return round(sum(daily_workloads[d] for d in self.conf['dagen']) / num_people)
        # probabilistic round: int(math.floor(x + random.random()))
|
|
def read_people(conf_ppl) -> dict[str, Person]:
    """Build the name -> Person mapping from the ``people`` list of the config.

    Each entry is either a bare name (available on all days) or a one-item
    mapping ``{name: settings}``; in the latter case the item is popped from
    the entry and its value is passed to ``Person``. Names are lower-cased.
    """
    roster = OrderedDict()
    for entry in conf_ppl:
        if isinstance(entry, dict):
            name, settings = entry.popitem()
        else:
            name, settings = entry, {"dagen": ALL_DAYS}
        roster[name.lower()] = Person(settings)
    return roster
|
|
# deal with loves/hates
|
|
def make_task_lut(tasks: dict[str, TaskConfig]):
    """Return a function that turns a preference string into task names.

    The returned ``lookup_tasks(text)`` splits ``text`` with the regular
    expression from ``config['task_re']``, drops every token listed in
    ``config['ignore']`` and lazily yields the task name registered for each
    remaining token through the tasks' ``lookup`` lists. An unknown token
    raises ``KeyError`` when the result is consumed.
    """
    # Every alternative spelling points back at the task that declares it.
    alias_to_task = {
        alias: task_name
        for task_name, task_conf in tasks.items()
        for alias in task_conf.lookup
    }
    splitter = re.compile(config['task_re'])

    def lookup_tasks(tasks):
        tokens = splitter.split(tasks)
        return (
            alias_to_task[token]
            for token in tokens
            if token not in config['ignore']
        )

    return lookup_tasks
|
|
def read_prefs(pref_file, tasks, people):
    """Read the tab-separated preference table into ``people``.

    Every non-blank line of ``pref_file`` must hold three tab-separated
    columns: name, loved tasks, hated tasks. Cells are stripped and
    lower-cased; the task cells are translated to task names and merged into
    the person's ``loves`` / ``hates`` sets. People without a row get a
    warning on stderr.

    Raises:
        ValueError: a non-blank line does not have exactly three columns.
        KeyError: a row names an unknown person or an unknown task.
    """
    lookup_tasks = make_task_lut(tasks)
    # read the wiki corvee table
    for line_no, line in enumerate(pref_file, start=1):
        if not line.strip():
            # Blank lines (e.g. a trailing newline at the end of the file)
            # carry no data and must not abort the whole run.
            continue
        cells = [cell.strip().lower() for cell in line.split('\t')]
        if len(cells) != 3:
            raise ValueError(
                f"preference table line {line_no}: expected 3 tab-separated "
                f"columns (name, loves, hates), got {len(cells)}"
            )
        name, loves, hates = cells
        p = people[name]
        p.has_prefs = True
        p.loves.update(lookup_tasks(loves))
        p.hates.update(lookup_tasks(hates))
    for name, p in people.items():
        if not p.has_prefs:
            print("warning: no preferences for", name, file=sys.stderr)
|
|
# deal with capability and hardcode
|
|
def set_capabilities(tasks: dict[str, TaskConfig], people: dict[str, Person]):
    """Fill in who can do which task, and register pinned assignments.

    For each task, ``personen`` decides who gets the task added to ``can``:
    'iedereen' means every person, 'liefhebbers' means every person who loves
    the task, anything else is iterated as a list of names. If the task has a
    ``hardcode`` list, entry ``day`` names the person whose ``does`` set
    receives ``(day, task)``.
    """
    everyone = people.values()
    for task_name, task_conf in tasks.items():
        who = task_conf.personen
        if who == 'iedereen':
            for person in everyone:
                person.can.add(task_name)
        elif who == 'liefhebbers':
            for fan in (person for person in everyone if task_name in person.loves):
                fan.can.add(task_name)
        else:
            for person_name in who:
                people[person_name.lower()].can.add(task_name)

        pinned = task_conf.hardcode
        if pinned is None:
            continue
        for day, person_name in enumerate(pinned):
            people[person_name.lower()].does.add((day, task_name))
|
|
|
|
def _format_day_cell(entries):
    """Render one day's ``(task, loved, hated)`` entries as a comma-separated cell."""
    return ",".join(
        task + (" <3" if loved else "") + (" :(" if hated else "")
        for (task, loved, hated) in entries
    )


def write_tasks(people, tasks, file=sys.stdout):
    """Write the schedule as wiki-table rows, one row per person.

    Each row holds the person's name, one cell per day listing the assigned
    tasks (marked `` <3`` when loved and `` :(`` when hated), the happiness
    score and the total workload. Every row is followed by a ``|-`` separator
    line. All output, including the separators, goes to ``file``.
    """
    num_days = config['days']
    for name, p in people.items():
        days = [[] for _ in range(num_days)]
        for (d, t) in p.does:
            days[d].append((t, t in p.loves, t in p.hates))
        days_filled = "".join(" {} ||".format(_format_day_cell(day)) for day in days)
        print("| {} ||{} {} || {}".format(name, days_filled, p.vrolijkheid(), p.workload(tasks)), file=file)
        # The separator must follow the row into the same stream; without
        # ``file=`` it always ended up on stdout.
        print("|-", file=file)
|
|
|
|
def scipsol(people: dict[str, Person], tasks: dict[str, TaskConfig]):
    """Assign tasks to people by solving an integer program with SCIP.

    One binary variable is created per (person, day, task). The constraints
    are marked with comments below. The objective maximizes summed happiness
    (likes minus hates weights) minus a workload-imbalance penalty: squared
    deviations when ``QUADRATIC`` is set, absolute deviations otherwise.

    Reads the module-level ``config``, ``ALL_DAYS``, ``QUADRATIC`` and
    ``args``. After solving, the chosen assignments are added to each
    person's ``does`` set, the table is printed with ``write_tasks`` and two
    summary lines are printed to stdout.
    """
    # Per-(person, day) cap on the number of tasks; pairs that are absent
    # fall back to 1 when the constraint is built.
    max_loads: dict[Tuple[str, int], int] = {}
    for pname, person in people.items():
        if 'max_load' in person.conf:  # max_load override for Pol
            for d, load in enumerate(person.conf['max_load']):
                max_loads[(pname, d)] = load
        # filter days that the person does not exist
        for d in ALL_DAYS - person.conf['dagen']:
            max_loads[(pname, d)] = 0

    m = Model()
    does = {}
    happiness = []
    stdevs = []
    errors = []
    for pname, person in people.items():
        workloads = []
        # Target load of this person; it does not depend on the model state.
        target = person.cost(len(people))
        p_error = m.addVar(vtype="I", name=f"{pname}_error", lb=0, ub=None)
        for d in ALL_DAYS:
            pdt = []
            for task in tasks:
                var = m.addVar(vtype="B", name=f"{pname}_does_{task}@{d}")
                pdt.append(var)
                does[(pname, d, task)] = var
                # a person only does what (s)he is capable of
                if task not in person.can:
                    m.addCons(var == 0)
                workloads.append(var * tasks[task].workload)
            for task in person.loves:
                happiness.append(does[(pname, d, task)] * config['weights']['likes'])
            for task in person.hates:
                happiness.append(does[(pname, d, task)] * (config['weights']['hates'] * -1))

            # max_load_person: a person only has one task per day at most
            m.addCons(quicksum(pdt) <= max_loads.get((pname, d), 1))

        # p_error is bounded below by |target - actual workload|.
        m.addCons(p_error >= target - quicksum(workloads))
        m.addCons(p_error >= quicksum(workloads) - target)
        errors.append(p_error)

        stdevs.append((target - quicksum(workloads)) ** 2)

        # min_load_person: person has at least 1 task
        m.addCons(quicksum([does[(pname, d, task)] for d in ALL_DAYS for task in tasks]) >= 1)

        # duplicate_jobs: a person does not perform the same job on all days
        for task in tasks:
            m.addCons(quicksum(does[(pname, d, task)] for d in ALL_DAYS) <= len(ALL_DAYS) - 1)

        # max_load_person_total
        m.addCons(quicksum([does[(pname, d, task)] * tasks[task].workload for d in ALL_DAYS for task in tasks]) <= 6)

        # hardcode constraint
        for d, task in person.does:
            m.addCons(does[(pname, d, task)] == 1)

    # all_allocated: each task in allocated
    for j in tasks:
        for d in ALL_DAYS:
            m.addCons(quicksum(does[(p, d, j)] for p in people) == tasks[j].req[d])

    # The objective is expressed through an auxiliary variable bounded above
    # by the real objective expression.
    objective = m.addVar(name="objvar", vtype="C", lb=None, ub=None)
    if QUADRATIC:
        m.addCons(objective <= quicksum(happiness) - quicksum(stdevs) / 2)
    else:
        if args.max_total_error is not None:
            m.addCons(quicksum(errors) <= args.max_total_error)
        m.addCons(objective <= quicksum(happiness) - quicksum(errors))
    m.setObjective(objective, "maximize")
    m.solveConcurrent()

    for pname, person in people.items():
        for d in ALL_DAYS:
            for task in tasks:
                # Solution values are floats; compare against 0.5 so that a
                # binary reported as e.g. 1e-9 is not treated as "assigned".
                if m.getVal(does[(pname, d, task)]) > 0.5:
                    person.does.add((d, task))
    write_tasks(people, tasks)

    print("Totale vrolijkheid:", sum(p.vrolijkheid() for p in people.values()))
    print("workload deviation:", m.getVal(quicksum(stdevs)))
|
|
|
|
# Command-line options: -q/--quadratic selects the squared-deviation
# objective; --max_total_error optionally caps the summed absolute workload
# error in the linear objective.
parser = argparse.ArgumentParser()
parser.add_argument("-q", "--quadratic", action="store_true")
parser.add_argument("--max_total_error", type=int, default=None)

# Both names stay at module level because the solver function reads them.
args = parser.parse_args()
QUADRATIC = args.quadratic

# Pipeline: people from the config, preferences from the table, capabilities
# and pinned assignments from the task settings, then solve and print.
people = read_people(conf['people'])
with open('prefs_table', 'r') as prefs:
    read_prefs(prefs, tasks, people)
set_capabilities(tasks, people)
scipsol(people, tasks)
|