from Queue import PriorityQueue, Empty
import random
import time
from threading import RLock
from zipfile import ZipFile
import logging
from collections import defaultdict

from utils.path import path
from utils.xml import FileToTree, prettyPrint, TreeToFile, StringToTree, subelement
from runphoton import prepareControlFile, BBox, Resolution
from utils.materials import loadMaterials, materialsAsXml, Brdf
from utils import daemonthread

log = logging.getLogger('lws')

CLEANUP_RUN_DIRECTORIES = True
MAX_RUNTIME = 300       # 5 min is the max runtime of a job, after that it will be rescheduled
WORKER_TIMEOUT = 300    # after 5 min of inactivity we assume a known worker has gone down
HIGH_PRIO = 0
NORMAL_PRIO = 10

# XML template for the control file (contents not shown here)
BASE_SCENE = '''
'''


class AccessPoint(object):

    def __init__(self, id='default', x=0.0, y=0.0, z=0.0, powerid='all', power=1.0):
        self.id = id
        self.x = x
        self.y = y
        self.z = z
        self.power = power
        self.powerid = powerid


class Run(object):
    ''' a simulation run '''

    known = {}          # known runs, key: runid, value: Run()
    runcount = 0
    countlock = RLock()

    def __init__(self, scenename, objfile, materials, ap, bbox, resolution,
                 density, numphotons, code, locidinfo=None, disabledObjects=None):
        with self.countlock:
            Run.runcount += 1
        self.id = '%s.%s.%s' % (scenename, ap.id, Run.runcount)
        self.scenename = scenename
        self.objfile = objfile
        self.materials = materials
        self.ap = ap
        self.bbox = bbox
        self.resolution = resolution
        self.density = density
        self.numphotons = numphotons
        self.disabledObjects = disabledObjects
        self.qts = None          # timestamp queued
        self.rts = None          # timestamp running
        self.fts = None          # timestamp finished
        self.worker = None       # the worker that handles this job
        self.resultfiles = []    # .raw and .dat file and .zip file containing both
        Run.known[self.id] = self
        self.code = code
        # holds location coords: key: locid, value: namedtuple Measurement
        self.locidinfo = locidinfo
        # holds the result of a measurement worker run
        self.locid2rssi = None
        self.tmpdir = None

    def build(self, tmpdir):
        self.tmpdir = _tmpdir = tmpdir.joinpath(self.id)
        if _tmpdir.exists():
            _tmpdir.rmtree()
        _tmpdir.mkdir()
        outname = '%s_%s' % (self.scenename, self.ap.id)
        code = path(__file__).parent.joinpath('worker_code_%s.py' % self.code).text()
        rootAttribs = {'name': self.scenename, 'code': code, 'runid': self.id}
        ctrlfile = _tmpdir.joinpath('%s.xml' % self.scenename)
        if self.locidinfo is None:
            ctrlfile.write_text(BASE_SCENE)
        else:
            # inject location info into ctrlfile
            tree = StringToTree(BASE_SCENE)
            locsEl = subelement(tree.getroot(), 'locations')
            for locid, loc in self.locidinfo.items():
                subelement(locsEl, 'loc', attribs={'id': locid, 'x': loc.x, 'y': loc.y, 'z': loc.z})
            TreeToFile(tree, ctrlfile)

        remapMaterials = defaultdict(lambda: 'MatConcrete')
        remapMaterials['MatLightWalls'] = 'MatLightWalls'
        for n in ['MatCupboard', 'MatDoors', 'MatFascade', 'MatGlassDoor',
                  'MatGlassWindow', 'MatHardware', 'MatIronDoor', 'MatLightWalls',
                  'MatRailing', 'MatTable']:
            remapMaterials[n] = 'MatLightWalls'
        # remapping is currently disabled; the mapping above is kept for reference
        remapMaterials = None

        preparedFile = prepareControlFile(ctrlfile, self.objfile, self.materials, self.ap,
                                          bbox=self.bbox, resolution=self.resolution,
                                          density=self.density, numphotons=self.numphotons,
                                          tmpdir=_tmpdir, outname=outname,
                                          rootAttribs=rootAttribs,
                                          disabledObjects=self.disabledObjects,
                                          remapMaterials=remapMaterials)
        return preparedFile

    @classmethod
    def get(cls, runid):
        return cls.known.get(runid, None)

    def cleanup(self):
        if self.tmpdir is not None:
            try:
                self.tmpdir.rmtree()
            except IOError:
                log.exception('cannot remove %s' % self.tmpdir.abspath())

class Worker(object):

    def __init__(self, name):
        self.name = name
        self.jobcount = 0
        self.failurecount = 0
        self.durations = []      # list of job durations in seconds
        self.active = True
        self.firstseen = time.time()


class Simulator(object):

    def __init__(self, tmpdir):
        self.waiting_jobs = PriorityQueue()
        self.running_jobs = set()
        self.finished_jobs = set()
        self.joblock = RLock()
        self.tmpdir = tmpdir.joinpath('simulator')
        if not self.tmpdir.exists():
            self.tmpdir.mkdir()
        daemonthread(target=self._checkJobs, name="checkJobsLoop")
        self.workerlock = RLock()
        self.workers = {}        # key: worker_name, value: Worker()

    def queue(self, run, priority=NORMAL_PRIO):
        run.qts = time.time()
        self.waiting_jobs.put((priority, run))

    def _checkJobs(self):
        old_w, old_r, old_f = 0, 0, 0
        while True:
            try:
                statsfile = self.tmpdir.parent.joinpath('simulator.stats')
                mode = 'a' if statsfile.exists() else 'w'
                with open(statsfile, mode) as fh:
                    w, r, f = self.queryRuns()
                    w, r, f = len(w), len(r), len(f)
                    if (w, r, f) != (old_w, old_r, old_f):
                        fh.write('ts: %.3f wait: %s run: %s fin: %s\n' % (time.time(), w, r, f))
                        old_w, old_r, old_f = w, r, f
                with self.joblock:
                    worker_with_job = {}
                    # check for jobs that a worker has abandoned
                    # and for jobs that seem not to have been returned
                    for _run in sorted(set(self.running_jobs), key=lambda r: r.rts):
                        if _run.worker in worker_with_job:
                            oldrun = worker_with_job[_run.worker]
                            log.info('discarding old job of worker %s' % _run.worker)
                            self.brokenRun(_run.worker, oldrun)
                        if time.time() - _run.rts > MAX_RUNTIME:
                            log.info('runtime limit of %s seconds exceeded for job %s on worker %s'
                                     % (MAX_RUNTIME, _run.id, _run.worker))
                            self.brokenRun(_run.worker, _run)
                        else:
                            worker_with_job[_run.worker] = _run
            except Exception:
                log.exception('error during checking jobs')
            time.sleep(5)

    def getWork(self, wname):
        with self.workerlock:
            worker = self.workers.get(wname)
            if worker is None:
                self.workers[wname] = worker = Worker(wname)
            worker.lastts = time.time()
            worker.active = True
            # mark workers that have not polled for a while as inactive
            for _wname, w in self.workers.items():
                if time.time() - w.lastts > WORKER_TIMEOUT:
                    self.workers[_wname].active = False
        try:
            _, run = self.waiting_jobs.get_nowait()
            ctrlfile = run.build(self.tmpdir)
            with self.joblock:
                run.rts = time.time()
                run.worker = wname
                self.running_jobs.add(run)
            worker.jobcount += 1
            return ctrlfile
        except Empty:
            return None

    def brokenRun(self, worker, run):
        ''' run is either a runid or a Run instance '''
        if isinstance(run, basestring):
            runid = run
            run = Run.known.get(runid)
            if run is None:
                log.error('unknown runid %s, will not requeue' % runid)
                return
        if isinstance(run, Run):
            log.warn('assuming run %s on worker %s is broken, requeueing' % (run.id, worker))
            self.running_jobs.discard(run)
            self.waiting_jobs.put((HIGH_PRIO, run))
            with self.workerlock:
                w = self.workers.get(worker)
                if w is not None:
                    w.failurecount += 1

    def storeTransferedResult(self, worker, runid, zippeddata):
        run = Run.known.get(runid)
        if run is None:
            log.error('unknown runid %s, will not store result' % runid)
            return
        resultsdir = self.tmpdir.joinpath('results', runid)
        if not resultsdir.exists():
            resultsdir.makedirs()
        tempf = resultsdir.joinpath('_tmp.zip')
        log.debug('storing %.1f kb to %s' % (len(zippeddata) / 1024.0, tempf.abspath()))
        tempf.write_bytes(zippeddata)
        zf = ZipFile(tempf)
        fname = None
        for n in zf.namelist():
            log.debug('extracting %s' % n)
            if n.endswith('.dat'):
                fname = path(n).namebase
            s = zf.read(n)
            f = resultsdir.joinpath(n)
            log.debug('storing %.1f kb to %s' % (len(s) / 1024.0, f.abspath()))
            f.write_bytes(s)
            run.resultfiles.append(f)
        if fname is not None:
            f = tempf.parent.joinpath('%s.zip' % fname)
            tempf.rename(f)
            run.resultfiles.append(f)
        self._runfinished(run)

    def _runfinished(self, run):
        with self.joblock:
            self.running_jobs.discard(run)
            if run.qts is None:
                # an unqueued job - from finishAll()
                run.qts = time.time()
            if run.rts is None:
                run.rts = time.time()
            run.fts = time.time()
            self.finished_jobs.add(run)
            w = self.workers.get(run.worker)
            if w is not None:
                w.durations.append(run.fts - run.rts)
        if CLEANUP_RUN_DIRECTORIES:
            run.cleanup()

    def storeResultAtLocIDs(self, worker, runid, data):
        run = Run.known.get(runid)
        if run is None:
            return
        #~ print 'got %s' % runid
        run.locid2rssi = data
        self._runfinished(run)

    def queryRuns(self, runids=None):
        if runids is None:
            runids = Run.known
        runids = set(runids)
        with self.joblock:
            running = {run.id for run in self.running_jobs if run.id in runids}
            finished = {run.id for run in self.finished_jobs if run.id in runids}
            waiting = set(Run.known.keys()).intersection(runids) - running - finished
        return waiting, running, finished

    def finishAll(self):
        i = 0
        while True:
            try:
                # clear job queue
                _, run = self.waiting_jobs.get_nowait()
                self._runfinished(run)
            except Empty:
                break
            i += 1
        log.warn('removed %s runs from waiting queue' % i)


def testQueue(sim):
    objfile = path('/home/dirk/loco/maps/UMIC/umic.obj')
    materials = loadMaterials(path('/home/dirk/loco/maps/UMIC/materials.xml'))
    ap = AccessPoint(x=35.3, y=14.2, z=1.0, powerid='asus', id='107')
    bbox = BBox(x1=-1.0, x2=60.0, y1=-1.0, y2=18.0, z1=-4, z2=4.0)
    resolution = Resolution(x=320, y=99, z=42)
    density = 0.3
    numphotons = 20000
    try:
        path('./tmp/1').rmtree()
    except Exception:
        pass
    # NOTE: Run.__init__ also requires a 'code' argument naming a
    # worker_code_<code>.py file; the original call omitted it, the value
    # below is only a placeholder.
    r = Run('umic', objfile, materials, ap, bbox, resolution, density, numphotons,
            code='photon')
    sim.queue(r)


if __name__ == '__main__':
    sim = Simulator(path('./tmp'))
    testQueue(sim)
    print sim.getWork()
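

# --- Illustrative sketch only (not part of the original module) ---
# A minimal, hypothetical in-process worker loop showing how the Simulator API
# above is meant to be driven: poll getWork(), run the simulation, and hand the
# zipped results back via storeTransferedResult(). It assumes the run id can be
# read back from the 'runid' root attribute that Run.build() passes to
# prepareControlFile(); the actual photon simulation step is stubbed out.

def runSimulationAndZipResults(ctrlfile):
    # Placeholder: a real worker would launch the photon simulation for
    # ctrlfile and return the zipped .dat/.raw output as a byte string.
    raise NotImplementedError


def exampleWorkerLoop(sim, wname='worker-1', idle_sleep=2.0):
    import xml.etree.ElementTree as ET
    while True:
        ctrlfile = sim.getWork(wname)
        if ctrlfile is None:
            time.sleep(idle_sleep)       # nothing queued, poll again later
            continue
        runid = ET.parse(str(ctrlfile)).getroot().get('runid')
        try:
            zippeddata = runSimulationAndZipResults(ctrlfile)
        except Exception:
            sim.brokenRun(wname, runid)  # requeue the job with high priority
            continue
        sim.storeTransferedResult(wname, runid, zippeddata)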