magisch-corvee-script/magisch_corvee_script.py
2024-01-08 12:06:10 +01:00

184 lines
7.4 KiB
Python
Executable File

#! /usr/bin/env nix-shell
#!nix-shell -i python3 -p python3 python3Packages.pyyaml glpk -I nixpkgs=flake:nixpkgs
import sys
import yaml
import re
import subprocess
from collections import OrderedDict
import glpm
from typing import Any, Tuple, TypeVar
from dataclasses import dataclass, field
conf = yaml.safe_load(open('config.yaml', 'r'))
config = conf['config']
config['ignore'].append('')
@dataclass
class TaskConfig:
personen: list[str]
workload: int
req: list[int]
hardcode: list[str] | None = None
lookup: list[str] = field(default_factory=list)
tasks: dict[str, TaskConfig] = OrderedDict({k: TaskConfig(**t) for k, t in conf['tasks'].items()})
X = TypeVar("X")
def index(x: dict[X, Any]) -> dict[X, int]:
return {v: k for k, v in enumerate(x)}
daily_workloads = \
[sum(task.workload * task.req[d] for task in tasks.values()) for d in range(config['days'])]
ALL_DAYS: set[int] = set(range(config['days']))
class Person(object):
def __init__(self, conf={"dagen":ALL_DAYS}):
self.can = set()
self.loves = set()
self.hates = set()
self.does = set() # hardcoded
self.has_prefs = False
self.conf = conf
self.conf['dagen'] = set(conf['dagen'])
def vrolijkheid(self):
res = config['days'] - len(self.does)
for (_,t) in self.does:
if t in self.loves:
res += config['weights']['likes']
if t in self.hates:
res -= config['weights']['hates']
return res
def workload(self, tasks):
return sum(tasks[t]['workload'] for (_,t) in self.does)
def cost(self, num_people):
return round(sum((daily_workloads[d] for d in self.conf['dagen'])) / num_people)
# probabilistic round: int(math.floor(x + random.random()))
def read_people(conf_ppl) -> dict[str, Person]:
people = OrderedDict()
for x in conf_ppl:
val = {"dagen": ALL_DAYS}
if type(x) == dict:
x,val = x.popitem()
people[x.lower()] = Person(val)
return people
# deal with loves/hates
def make_task_lut(tasks: dict[str, TaskConfig]):
task_lut = {}
for t, taskconf in tasks.items():
for lookup in taskconf.lookup:
task_lut[lookup] = t
task_re = re.compile(config['task_re'])
def lookup_tasks(tasks):
return (task_lut[x] for x in task_re.split(tasks) if not x in config['ignore'])
return lookup_tasks
def read_prefs(pref_file, tasks, people):
lookup_tasks = make_task_lut(tasks)
# read the wiki corvee table
for [name, loves, hates] in ((q.strip().lower() for q in x.split('\t')) for x in pref_file):
p = people[name]
p.has_prefs = True
p.loves |= set(lookup_tasks(loves))
p.hates |= set(lookup_tasks(hates))
for name, p in people.items():
if not p.has_prefs:
print("warning: no preferences for", name, file=sys.stderr)
# deal with capability and hardcode
def set_capabilities(tasks: dict[str, TaskConfig], people: dict[str, Person]):
for (task,conf) in tasks.items():
if conf.personen == 'iedereen':
for p in people.values():
p.can.add(task)
elif conf.personen == 'liefhebbers':
for p in people.values():
if task in p.loves:
p.can.add(task)
else:
for p in conf.personen:
people[p.lower()].can.add(task)
if conf.hardcode is not None:
for day, pers in enumerate(conf.hardcode):
people[pers.lower()].does.add((day, task))
# format as matrices
def matrices(people: dict[str, Person], tasks: dict[str, TaskConfig]) -> Tuple[list[list[int]], list[list[int]], list[list[int]], dict[str, int], dict[Tuple[int, int], int], list[list[int]], dict[int, int], dict[int, int]]:
mat = lambda a,b: [[0 for j in b] for i in a]
loves = mat(people, tasks)
hates = mat(people, tasks)
capab = mat(people, tasks)
tsk_idx = index(tasks)
hardcode = {}
max_loads = {}
costs = {}
for i,p in enumerate(people.values()):
for t in p.loves: loves[i][tsk_idx[t]] = 1
for t in p.hates: hates[i][tsk_idx[t]] = 1
for t in p.can: capab[i][tsk_idx[t]] = 1
for (d,t) in p.does: hardcode[(i,tsk_idx[t],d)] = 1
if 'max_load' in p.conf: # max_load override for Pol
for d,l in enumerate(p.conf['max_load']):
max_loads[(i,d)] = l
# filter days that the person does not exist
for d in ALL_DAYS - p.conf['dagen']:
max_loads[(i,d)] = 0
costs[i] = p.cost(len(people))
req = mat(range(config['days']), tasks)
for di in range(config['days']):
for ti,t in enumerate(tasks.values()):
req[di][ti] = t.req[di]
workload = {tsk_idx[t]: tasks[t].workload for t in tasks}
return (loves, hates, capab, hardcode, max_loads, req, workload, costs)
def read_assignment(file, people, tasks):
def between_the_lines(f, a=">>>>\n", b="<<<<\n"):
for l in f:
if l == a: break
for l in f:
if l == b: break
yield map(int, l.strip().split())
for p in people.values():
p.does = set()
person_vl = list(people.values())
task_nm = list(tasks.keys())
for [p,d,j,W,l] in between_the_lines(file):
person_vl[p-1].does.add((d-1, task_nm[j-1]))
def write_data(people, tasks, file=sys.stdout):
[loves, hates, capab, hardcode, max_loads, req, workload, costs] = matrices(people, tasks)
print(glpm.matrix("L", loves), file=file)
print(glpm.matrix("H", hates), file=file)
print(glpm.matrix("C", capab, 1), file=file)
print(glpm.matrix("R", req, 0), file=file)
print(glpm.dict("Q", hardcode), file=file)
print(glpm.dict("Wl", workload), file=file)
print(glpm.dict("max_load", max_loads), file=file)
print(glpm.dict("Costs", costs), file=file)
print(glpm.param("D_count", config['days']), file=file)
print(glpm.param("P_count", len(people)), file=file)
print(glpm.param("J_count", len(tasks)), file=file)
print(glpm.param("ML", 6), file=file) # CHANGE THIS
print(glpm.param("WL", config['weights']['likes']), file=file)
print(glpm.param("WH", config['weights']['hates']), file=file)
def write_tasks(people, tasks, file=sys.stdout):
for name, p in people.items():
days = [[] for i in range(config['days'])]
for (d,t) in p.does:
days[d].append((t, t in p.loves, t in p.hates))
q = lambda w: ",".join([t + (" <3" if l else "") + (" :(" if h else "") for (t,l,h) in w])
days_fmt = " {} ||" * len(days)
days_filled = days_fmt.format(*map(q, days))
print("| {} ||{} {} || {}".format(name, days_filled, p.vrolijkheid(), p.workload(tasks)), file=file)
print("|-")
people = read_people(conf['people'])
with open('prefs_table', 'r') as pref_file:
read_prefs(pref_file, tasks, people)
set_capabilities(tasks, people)
if len(sys.argv)>1 and sys.argv[1] == 'in':
write_data(people, tasks)
elif len(sys.argv)>1 and sys.argv[1] == 'out':
with open('output', 'r') as out_file:
read_assignment(out_file, people, tasks)
write_tasks(people, tasks)
else:
with open('data', 'w') as out:
write_data(people, tasks, file=out)
subprocess.call(["glpsol", "--model", "model.glpm", "-d", "data", "--tmlim", "15", "--log", "output"], stdout=sys.stderr, stderr=sys.stdout)
with open('output', 'r') as file:
read_assignment(file, people, tasks)
write_tasks(people, tasks)