| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381 |
import itertools
import logging
import random
import time
from collections import defaultdict
from Queue import PriorityQueue, Empty
from threading import RLock
from zipfile import ZipFile

from runphoton import prepareControlFile, BBox, Resolution
from utils import daemonthread
from utils.materials import loadMaterials, materialsAsXml, Brdf
from utils.path import path
from utils.xml import FileToTree, prettyPrint, TreeToFile, StringToTree, subelement
# module-level logger for the lightweight simulation service
log = logging.getLogger('lws')
# remove each run's scratch directory once its results are stored
CLEANUP_RUN_DIRECTORIES = True
MAX_RUNTIME = 300 # 5 min is max runtime of a job, after that it will be rescheduled
WORKER_TIMEOUT = 300 # after 5 min seconds inactivity we assume a known worker has gone down
# PriorityQueue priorities: lower value is served first
HIGH_PRIO = 0
NORMAL_PRIO = 10
# Skeleton control file for a run.  The placeholder values (power,
# coordinates, bbox, output filename "XXX", sizes) are filled in later by
# prepareControlFile() when Run.build() prepares the job for a worker.
BASE_SCENE = '''
<scene>
<light type="monolight" name="AP 1" power="0.0" mode="undirected">
<from x="0.0" y="0.0" z="0.0" />
<wavelength value="0.12" />
</light>
<rwp>
<simulation>
<initial_ior r="1" i="0" />
<photons value="0" />
<recursion depth="15" diffractions="0" />
<diffraction_radius value="3" />
<bbox>
<min x="0.0" y="0.0" z="0.0" />
<max x="0.0" y="0.0" z="0.0" />
</bbox>
</simulation>
<volumerender>
<output filename="XXX" save_mode="float" />
<size x="0" y="0" z="0" />
<density_estimator>
<radius value="0.0" />
</density_estimator>
</volumerender>
</rwp>
</scene>
'''
class AccessPoint(object):
    ''' A transmitting access point: identity, power and scene position. '''

    def __init__(self, id='default', x=0.0, y=0.0, z=0.0, powerid='all', power=1.0):
        self.id = id
        self.powerid = powerid
        self.power = power
        # position in scene coordinates
        self.x = x
        self.y = y
        self.z = z
-
class Run(object):
    ''' A single simulation run of one access point in one scene.

    Every instance registers itself in the class-level ``known`` registry
    under a unique id ``<scenename>.<ap.id>.<counter>`` so workers can
    refer to runs by id.  NOTE(review): the registry is never pruned, so a
    long-running process accumulates Run instances.
    '''

    known = {}           # all runs ever created; key: runid, value: Run()
    runcount = 0         # monotonically increasing id counter
    countlock = RLock()  # guards runcount across threads

    def __init__(self, scenename, objfile, materials, ap, bbox, resolution,
                 density, numphotons, code, locidinfo=None, disabledObjects=None):
        # allocate a process-wide unique id under the shared counter lock
        with self.countlock:
            Run.runcount += 1
            self.id = '%s.%s.%s' % (scenename, ap.id, Run.runcount)

        self.scenename = scenename
        self.objfile = objfile
        self.materials = materials
        self.ap = ap
        self.bbox = bbox
        self.resolution = resolution
        self.density = density
        self.numphotons = numphotons
        self.disabledObjects = disabledObjects

        self.qts = None  # timestamp queued
        self.rts = None  # timestamp running
        self.fts = None  # timestamp finished

        self.worker = None  # name of the worker that handles this job

        self.resultfiles = []  # .raw and .dat file and .zip file containing both
        Run.known[self.id] = self

        # suffix selecting the worker_code_<code>.py payload shipped to workers
        self.code = code

        # hold location coords: key: locid, value: namedtuple Measurement
        self.locidinfo = locidinfo
        # hold result of a measurement worker run
        self.locid2rssi = None

        self.tmpdir = None  # per-run scratch directory, set by build()

    def build(self, tmpdir):
        ''' Prepare the run directory and control file below *tmpdir*.

        Wipes any leftover directory for this run id, writes the scene
        control file (optionally augmented with measurement locations) and
        returns the path of the prepared control file for the worker.
        '''
        self.tmpdir = _tmpdir = tmpdir.joinpath(self.id)
        if _tmpdir.exists():
            _tmpdir.rmtree()
        _tmpdir.mkdir()

        outname = '%s_%s' % (self.scenename, self.ap.id)

        # the worker-side python payload is embedded into the control file
        code = path(__file__).parent.joinpath('worker_code_%s.py' % self.code).text()

        rootAttribs = {'name': self.scenename, 'code': code, 'runid': self.id}
        ctrlfile = _tmpdir.joinpath('%s.xml' % self.scenename)

        if self.locidinfo is None:
            ctrlfile.write_text(BASE_SCENE)
        else:
            # inject location info into ctrlfile
            tree = StringToTree(BASE_SCENE)
            locsEl = subelement(tree.getroot(), 'locations')
            for locid, loc in self.locidinfo.items():
                subelement(locsEl, 'loc', attribs={'id': locid, 'x': loc.x, 'y': loc.y, 'z': loc.z})

            TreeToFile(tree, ctrlfile)

        # Material remapping is disabled.  A dead experiment that collapsed
        # most materials onto MatLightWalls/MatConcrete was removed here:
        # the mapping was built and then unconditionally discarded.
        remapMaterials = None

        preparedFile = prepareControlFile(ctrlfile, self.objfile, self.materials, self.ap,
                                          bbox=self.bbox, resolution=self.resolution,
                                          density=self.density, numphotons=self.numphotons,
                                          tmpdir=_tmpdir, outname=outname, rootAttribs=rootAttribs,
                                          disabledObjects=self.disabledObjects, remapMaterials=remapMaterials)
        return preparedFile

    @classmethod
    def get(cls, runid):
        ''' Return the known Run for *runid*, or None. '''
        return cls.known.get(runid, None)

    def cleanup(self):
        ''' Best-effort removal of the per-run scratch directory. '''
        if self.tmpdir is not None:
            try:
                self.tmpdir.rmtree()
            except (IOError, OSError):
                # rmtree raises OSError (not IOError) on most failures;
                # catch both so cleanup never propagates into the caller
                log.exception('cannot remove %s' % self.tmpdir.abspath())
-
class Worker(object):
    ''' Bookkeeping for one remote worker, keyed by its name. '''

    def __init__(self, name):
        self.name = name
        self.jobcount = 0      # jobs handed to this worker
        self.failurecount = 0  # jobs assumed broken on this worker
        self.durations = []    # list of job durations in seconds
        self.active = True     # cleared after WORKER_TIMEOUT of silence
        self.firstseen = time.time()
        # last contact timestamp.  Previously only assigned externally in
        # Simulator.getWork(), leaving freshly constructed instances
        # without the attribute the expiry check reads.
        self.lastts = self.firstseen
-
class Simulator(object):
    ''' Hands queued Run jobs to polling workers and collects their results.

    Thread-safety: ``joblock`` guards the running/finished job sets,
    ``workerlock`` guards the worker registry; ``waiting_jobs`` is a
    thread-safe PriorityQueue.
    '''

    def __init__(self, tmpdir):
        # Queue entries are (priority, seq, Run).  seq is a monotonic
        # tie-breaker so equal-priority runs are served FIFO and Run
        # instances are never compared against each other by the heap.
        self.waiting_jobs = PriorityQueue()
        self._seq = itertools.count()
        self.running_jobs = set()
        self.finished_jobs = set()

        self.joblock = RLock()
        self.tmpdir = tmpdir.joinpath('simulator')
        if not self.tmpdir.exists():
            self.tmpdir.mkdir()

        daemonthread(target=self._checkJobs, name="checkJobsLoop")

        self.workerlock = RLock()
        self.workers = {}  # key worker_name, value: Worker()

    def queue(self, run, priority=NORMAL_PRIO):
        ''' Enqueue *run*; lower priority values are served first. '''
        run.qts = time.time()
        self.waiting_jobs.put((priority, next(self._seq), run))

    def _checkJobs(self):
        ''' Daemon loop: append queue statistics to simulator.stats whenever
        they change, and requeue jobs whose worker moved on to another job
        or that exceeded MAX_RUNTIME. '''
        old_w, old_r, old_f = 0, 0, 0
        while True:
            try:
                statsfile = self.tmpdir.parent.joinpath('simulator.stats')
                # mode 'a' creates the file when missing, so the previous
                # exists()-check selecting 'w' was redundant
                with open(statsfile, 'a') as fh:
                    w, r, f = self.queryRuns()
                    w, r, f = len(w), len(r), len(f)
                    if (w, r, f) != (old_w, old_r, old_f):
                        fh.write('ts: %.3f wait: %s run: %s fin: %s\n' % (time.time(), w, r, f))
                        old_w, old_r, old_f = w, r, f

                with self.joblock:
                    worker_with_job = {}
                    # oldest first: if a worker turns up with a second job,
                    # its older one is assumed lost and requeued; jobs past
                    # MAX_RUNTIME are requeued as well
                    for _run in sorted(set(self.running_jobs), key=lambda r: r.rts):
                        if _run.worker in worker_with_job:
                            oldrun = worker_with_job[_run.worker]
                            log.info('discarding old job of worker %s' % _run.worker)
                            self.brokenRun(_run.worker, oldrun)

                        if time.time() - _run.rts > MAX_RUNTIME:
                            log.info('runtime limit of %s seconds exceeded for job %s on worker %s' % (MAX_RUNTIME, _run.id, _run.worker))
                            self.brokenRun(_run.worker, _run)
                        else:
                            worker_with_job[_run.worker] = _run

            except Exception:
                log.exception('error during checking jobs')

            time.sleep(5)

    def getWork(self, wname):
        ''' Called by worker *wname* to fetch the next prepared control
        file, or None when the queue is empty.  Also refreshes worker
        liveness bookkeeping. '''
        with self.workerlock:
            worker = self.workers.get(wname)
            if worker is None:
                self.workers[wname] = worker = Worker(wname)

            worker.lastts = time.time()
            worker.active = True
            # flag workers silent for more than WORKER_TIMEOUT as inactive
            for _wname, w in self.workers.items():
                if time.time() - w.lastts > WORKER_TIMEOUT:
                    self.workers[_wname].active = False

        try:
            _prio, _seq, run = self.waiting_jobs.get_nowait()
        except Empty:
            return None

        ctrlfile = run.build(self.tmpdir)

        with self.joblock:
            run.rts = time.time()
            run.worker = wname
            self.running_jobs.add(run)

        worker.jobcount += 1
        return ctrlfile

    def brokenRun(self, worker, run):
        ''' Requeue *run* (either a runid or a Run instance) at high
        priority and charge a failure to *worker*. '''
        if isinstance(run, basestring):
            runid = run
            run = Run.known.get(runid)
            if run is None:
                log.error('unknown runid %s, will not requeue' % runid)
                return

        if isinstance(run, Run):
            log.warn('assuming run %s on worker %s is broken, requeueing' % (run.id, worker))
            with self.joblock:
                # previously mutated without the lock, racing _runfinished()
                self.running_jobs.discard(run)
            self.waiting_jobs.put((HIGH_PRIO, next(self._seq), run))

        with self.workerlock:
            w = self.workers.get(worker)
            if w is not None:
                w.failurecount += 1

    def storeTransferedResult(self, worker, runid, zippeddata):
        ''' Store the zipped result payload uploaded by *worker* for
        *runid*, unpack it next to a renamed copy of the archive, and mark
        the run finished.  Unknown runids are logged and ignored. '''
        run = Run.known.get(runid)
        if run is None:
            log.error('unknown runid %s, will not store result' % runid)
            return

        resultsdir = self.tmpdir.joinpath('results', runid)
        if not resultsdir.exists():
            resultsdir.makedirs()

        tempf = resultsdir.joinpath('_tmp.zip')
        log.debug('storing %.1f kb to %s' % (len(zippeddata) / 1024.0, tempf.abspath()))
        tempf.write_bytes(zippeddata)

        zf = ZipFile(tempf)
        try:
            fname = None
            for n in zf.namelist():
                log.debug('extracting %s' % n)
                if n.endswith('.dat'):
                    fname = path(n).namebase
                s = zf.read(n)
                f = resultsdir.joinpath(n)
                log.debug('storing %.1f kb to %s' % (len(s) / 1024.0, f.abspath()))
                f.write_bytes(s)
                run.resultfiles.append(f)
        finally:
            # the archive was never closed before, leaking the handle and
            # making the rename below fail on platforms that lock open files
            zf.close()

        if fname is not None:
            f = tempf.parent.joinpath('%s.zip' % fname)
            tempf.rename(f)
            run.resultfiles.append(f)

        self._runfinished(run)

    def _runfinished(self, run):
        ''' Move *run* to the finished set, stamp missing timestamps,
        record the duration with its worker and optionally clean up. '''
        with self.joblock:
            self.running_jobs.discard(run)

            if run.qts is None:  # an unqueued job - from finishAll()
                run.qts = time.time()
            if run.rts is None:
                run.rts = time.time()

            run.fts = time.time()
            self.finished_jobs.add(run)

        w = self.workers.get(run.worker)
        if w is not None:
            w.durations.append(run.fts - run.rts)

        if CLEANUP_RUN_DIRECTORIES:
            run.cleanup()

    def storeResultAtLocIDs(self, worker, runid, data):
        ''' Store the per-location measurement result *data* reported by
        *worker* for *runid* and mark the run finished.  Unknown runids are
        silently ignored. '''
        run = Run.known.get(runid)
        if run is None:
            return

        run.locid2rssi = data
        self._runfinished(run)

    def queryRuns(self, runids=None):
        ''' Return (waiting, running, finished) sets of run ids, restricted
        to *runids* when given (defaults to all known runs). '''
        if runids is None:
            runids = Run.known

        runids = set(runids)
        with self.joblock:
            running = {run.id for run in self.running_jobs if run.id in runids}
            finished = {run.id for run in self.finished_jobs if run.id in runids}
            waiting = set(Run.known.keys()).intersection(runids) - running - finished
        return waiting, running, finished

    def finishAll(self):
        ''' Drain the waiting queue, marking every queued run finished. '''
        i = 0
        while True:
            try:
                _prio, _seq, run = self.waiting_jobs.get_nowait()
            except Empty:
                break
            self._runfinished(run)
            i += 1
        log.warn('removed %s runs from waiting queue' % i)
-
def testQueue(sim, code='default'):
    ''' Queue a single hard-coded UMIC test run on *sim*.

    *code* selects the worker_code_<code>.py payload shipped with the run.
    The original call constructed Run() without the mandatory ``code``
    argument, which raised a TypeError.  NOTE(review): 'default' is a
    placeholder -- confirm which worker_code_*.py files actually exist.
    '''
    objfile = path('/home/dirk/loco/maps/UMIC/umic.obj')
    materials = loadMaterials(path('/home/dirk/loco/maps/UMIC/materials.xml'))
    ap = AccessPoint(x=35.3, y=14.2, z=1.0, powerid='asus', id='107')
    bbox = BBox(x1=-1.0, x2=60.0, y1=-1.0, y2=18.0, z1=-4, z2=4.0)
    resolution = Resolution(x=320, y=99, z=42)
    density = 0.3
    numphotons = 20000

    # best-effort cleanup of a previous run; the bare except also swallowed
    # KeyboardInterrupt/SystemExit before
    try:
        path('./tmp/1').rmtree()
    except (IOError, OSError):
        pass
    r = Run('umic', objfile, materials, ap, bbox, resolution, density, numphotons, code)
    sim.queue(r)
-
-
- if __name__ == '__main__':
-
- sim = Simulator(path('./tmp'))
- testQueue(sim)
- print sim.getWork()
|