simulate.py
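'''Job scheduling for photon-mapping simulation runs.

A Simulator keeps a priority queue of Run objects, hands prepared control
files to polling workers via getWork(), and collects results through
storeTransferedResult() / storeResultAtLocIDs().  Jobs whose worker stops
polling or that exceed MAX_RUNTIME are requeued at high priority by the
_checkJobs background thread.
'''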
from Queue import PriorityQueue, Empty
import random
import time
from threading import RLock
from zipfile import ZipFile
import logging
from collections import defaultdict
from utils.path import path
from utils.xml import FileToTree, prettyPrint, TreeToFile, StringToTree, subelement
from runphoton import prepareControlFile, BBox, Resolution
from utils.materials import loadMaterials, materialsAsXml, Brdf
from utils import daemonthread

log = logging.getLogger('lws')

CLEANUP_RUN_DIRECTORIES = True
MAX_RUNTIME = 300     # max runtime of a job in seconds (5 min); after that it will be rescheduled
WORKER_TIMEOUT = 300  # after 5 minutes of inactivity we assume a known worker has gone down

HIGH_PRIO = 0
NORMAL_PRIO = 10
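
# Template control file for a run.  The zeroed light/bbox/size values and the
# 'XXX' output filename look like placeholders; prepareControlFile() from
# runphoton presumably replaces them with the Run's AP, bbox, resolution and
# density settings (see Run.build below).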
BASE_SCENE = '''
<scene>
  <light type="monolight" name="AP 1" power="0.0" mode="undirected">
    <from x="0.0" y="0.0" z="0.0" />
    <wavelength value="0.12" />
  </light>
  <rwp>
    <simulation>
      <initial_ior r="1" i="0" />
      <photons value="0" />
      <recursion depth="15" diffractions="0" />
      <diffraction_radius value="3" />
      <bbox>
        <min x="0.0" y="0.0" z="0.0" />
        <max x="0.0" y="0.0" z="0.0" />
      </bbox>
    </simulation>
    <volumerender>
      <output filename="XXX" save_mode="float" />
      <size x="0" y="0" z="0" />
      <density_estimator>
        <radius value="0.0" />
      </density_estimator>
    </volumerender>
  </rwp>
</scene>
'''


class AccessPoint(object):
    ''' an access point (photon source) placed in the simulated scene '''

    def __init__(self, id='default', x=0.0, y=0.0, z=0.0, powerid='all', power=1.0):
        self.id = id
        self.x = x
        self.y = y
        self.z = z
        self.power = power
        self.powerid = powerid


class Run(object):
    ''' a simulation run '''

    known = {}       # known runs, key: runid, value: Run()
    runcount = 0
    countlock = RLock()

    def __init__(self, scenename, objfile, materials, ap, bbox, resolution,
                 density, numphotons, code, locidinfo=None, disabledObjects=None):
        with self.countlock:
            Run.runcount += 1
        self.id = '%s.%s.%s' % (scenename, ap.id, Run.runcount)
        self.scenename = scenename
        self.objfile = objfile
        self.materials = materials
        self.ap = ap
        self.bbox = bbox
        self.resolution = resolution
        self.density = density
        self.numphotons = numphotons
        self.disabledObjects = disabledObjects
        self.qts = None        # timestamp queued
        self.rts = None        # timestamp running
        self.fts = None        # timestamp finished
        self.worker = None     # the worker that handles this job
        self.resultfiles = []  # .raw and .dat file and .zip file containing both
        Run.known[self.id] = self
        self.code = code
        # holds location coords, key: locid, value: namedtuple Measurement
        self.locidinfo = locidinfo
        # holds the result of a measurement worker run
        self.locid2rssi = None
        self.tmpdir = None

    def build(self, tmpdir):
        self.tmpdir = _tmpdir = tmpdir.joinpath(self.id)
        if _tmpdir.exists():
            _tmpdir.rmtree()
        _tmpdir.mkdir()
        outname = '%s_%s' % (self.scenename, self.ap.id)
        code = path(__file__).parent.joinpath('worker_code_%s.py' % self.code).text()
        rootAttribs = {'name': self.scenename, 'code': code, 'runid': self.id}
        ctrlfile = _tmpdir.joinpath('%s.xml' % self.scenename)
        if self.locidinfo is None:
            ctrlfile.write_text(BASE_SCENE)
        else:
            # inject location info into the control file
            tree = StringToTree(BASE_SCENE)
            locsEl = subelement(tree.getroot(), 'locations')
            for locid, loc in self.locidinfo.items():
                subelement(locsEl, 'loc', attribs={'id': locid, 'x': loc.x, 'y': loc.y, 'z': loc.z})
            TreeToFile(tree, ctrlfile)
        remapMaterials = defaultdict(lambda: 'MatConcrete')
        remapMaterials['MatLightWalls'] = 'MatLightWalls'
        for n in ['MatCupboard', 'MatDoors', 'MatFascade', 'MatGlassDoor', 'MatGlassWindow',
                  'MatHardware', 'MatIronDoor', 'MatLightWalls', 'MatRailing', 'MatTable']:
            remapMaterials[n] = 'MatLightWalls'
        remapMaterials = None  # material remapping is currently disabled; the table above is kept for reference
        preparedFile = prepareControlFile(ctrlfile, self.objfile, self.materials, self.ap,
                                          bbox=self.bbox, resolution=self.resolution,
                                          density=self.density, numphotons=self.numphotons,
                                          tmpdir=_tmpdir, outname=outname, rootAttribs=rootAttribs,
                                          disabledObjects=self.disabledObjects, remapMaterials=remapMaterials)
        return preparedFile

    @classmethod
    def get(cls, runid):
        return cls.known.get(runid, None)

    def cleanup(self):
        if self.tmpdir is not None:
            try:
                self.tmpdir.rmtree()
            except (IOError, OSError):
                log.exception('cannot remove %s' % self.tmpdir.abspath())


class Worker(object):
    ''' bookkeeping for a single worker; lastts is updated in Simulator.getWork() '''

    def __init__(self, name):
        self.name = name
        self.jobcount = 0
        self.failurecount = 0
        self.durations = []  # list of job durations in seconds
        self.active = True
        self.firstseen = time.time()


class Simulator(object):

    def __init__(self, tmpdir):
        self.waiting_jobs = PriorityQueue()
        self.running_jobs = set()
        self.finished_jobs = set()
        self.joblock = RLock()
        self.tmpdir = tmpdir.joinpath('simulator')
        if not self.tmpdir.exists():
            self.tmpdir.mkdir()
        daemonthread(target=self._checkJobs, name="checkJobsLoop")
        self.workerlock = RLock()
        self.workers = {}  # key: worker name, value: Worker()

    def queue(self, run, priority=NORMAL_PRIO):
        run.qts = time.time()
        self.waiting_jobs.put((priority, run))

    def _checkJobs(self):
        old_w, old_r, old_f = 0, 0, 0
        while True:
            try:
                statsfile = self.tmpdir.parent.joinpath('simulator.stats')
                mode = 'a' if statsfile.exists() else 'w'
                with open(statsfile, mode) as fh:
                    w, r, f = self.queryRuns()
                    w, r, f = len(w), len(r), len(f)
                    if (w, r, f) != (old_w, old_r, old_f):
                        fh.write('ts: %.3f wait: %s run: %s fin: %s\n' % (time.time(), w, r, f))
                        old_w, old_r, old_f = w, r, f
                with self.joblock:
                    worker_with_job = {}
                    # check for jobs whose worker has since taken another job
                    # and for jobs that seem not to have been returned in time
                    for _run in sorted(set(self.running_jobs), key=lambda r: r.rts):
                        if _run.worker in worker_with_job:
                            oldrun = worker_with_job[_run.worker]
                            log.info('discarding old job of worker %s' % _run.worker)
                            self.brokenRun(_run.worker, oldrun)
                        if time.time() - _run.rts > MAX_RUNTIME:
                            log.info('runtime limit of %s seconds exceeded for job %s on worker %s'
                                     % (MAX_RUNTIME, _run.id, _run.worker))
                            self.brokenRun(_run.worker, _run)
                        else:
                            worker_with_job[_run.worker] = _run
            except Exception:
                log.exception('error during checking jobs')
            time.sleep(5)
    def getWork(self, wname):
        with self.workerlock:
            worker = self.workers.get(wname)
            if worker is None:
                self.workers[wname] = worker = Worker(wname)
            worker.lastts = time.time()
            worker.active = True
            # check for expiration of known workers
            for _wname, w in self.workers.items():
                if time.time() - w.lastts > WORKER_TIMEOUT:
                    self.workers[_wname].active = False
        try:
            _, run = self.waiting_jobs.get_nowait()
            ctrlfile = run.build(self.tmpdir)
            with self.joblock:
                run.rts = time.time()
                run.worker = wname
                self.running_jobs.add(run)
                worker.jobcount += 1
            return ctrlfile
        except Empty:
            return None
    def brokenRun(self, worker, run):
        ''' run is either a runid or a Run instance '''
        if isinstance(run, basestring):
            runid = run
            run = Run.known.get(runid)
            if run is None:
                log.error('unknown runid %s, will not requeue' % runid)
                return
        if isinstance(run, Run):
            log.warn('assuming run %s on worker %s is broken, requeueing' % (run.id, worker))
            self.running_jobs.discard(run)
            self.waiting_jobs.put((HIGH_PRIO, run))
        with self.workerlock:
            w = self.workers.get(worker)
            if w is not None:
                w.failurecount += 1
    def storeTransferedResult(self, worker, runid, zippeddata):
        run = Run.known.get(runid)
        if run is None:
            log.error('unknown runid %s, will not store result' % runid)
            return
        resultsdir = self.tmpdir.joinpath('results', runid)
        if not resultsdir.exists():
            resultsdir.makedirs()
        tempf = resultsdir.joinpath('_tmp.zip')
        log.debug('storing %.1f kb to %s' % (len(zippeddata) / 1024.0, tempf.abspath()))
        tempf.write_bytes(zippeddata)
        zf = ZipFile(tempf)
        fname = None
        for n in zf.namelist():
            log.debug('extracting %s' % n)
            if n.endswith('.dat'):
                fname = path(n).namebase
            s = zf.read(n)
            f = resultsdir.joinpath(n)
            log.debug('storing %.1f kb to %s' % (len(s) / 1024.0, f.abspath()))
            f.write_bytes(s)
            run.resultfiles.append(f)
        if fname is not None:
            f = tempf.parent.joinpath('%s.zip' % fname)
            tempf.rename(f)
            run.resultfiles.append(f)
        self._runfinished(run)
    def _runfinished(self, run):
        with self.joblock:
            self.running_jobs.discard(run)
            if run.qts is None:  # an unqueued job - from finishAll()
                run.qts = time.time()
            if run.rts is None:
                run.rts = time.time()
            run.fts = time.time()
            self.finished_jobs.add(run)
            w = self.workers.get(run.worker)
            if w is not None:
                w.durations.append(run.fts - run.rts)
        if CLEANUP_RUN_DIRECTORIES:
            run.cleanup()
    def storeResultAtLocIDs(self, worker, runid, data):
        run = Run.known.get(runid)
        t = time.time()
        if run is None:
            return
        #~ print 'got %s' % runid
        run.locid2rssi = data
        self._runfinished(run)
    def queryRuns(self, runids=None):
        if runids is None:
            runids = Run.known
        runids = set(runids)
        waiting, running, finished = [], [], []
        with self.joblock:
            running = {run.id for run in self.running_jobs if run.id in runids}
            finished = {run.id for run in self.finished_jobs if run.id in runids}
            waiting = set(Run.known.keys()).intersection(runids) - running - finished
        return waiting, running, finished
    def finishAll(self):
        i = 0
        while True:
            try:
                # clear the job queue
                _, run = self.waiting_jobs.get_nowait()
                self._runfinished(run)
            except Empty:
                break
            i += 1
        log.warn('removed %s runs from waiting queue' % i)


def testQueue(sim):
    objfile = path('/home/dirk/loco/maps/UMIC/umic.obj')
    materials = loadMaterials(path('/home/dirk/loco/maps/UMIC/materials.xml'))
    ap = AccessPoint(x=35.3, y=14.2, z=1.0, powerid='asus', id='107')
    bbox = BBox(x1=-1.0, x2=60.0, y1=-1.0, y2=18.0, z1=-4, z2=4.0)
    resolution = Resolution(x=320, y=99, z=42)
    density = 0.3
    numphotons = 20000
    try:
        path('./tmp/1').rmtree()
    except Exception:
        pass
    # Run() also needs a worker code id, which selects worker_code_<code>.py
    # next to this module; 'default' is a placeholder, pick one that exists.
    r = Run('umic', objfile, materials, ap, bbox, resolution, density, numphotons, code='default')
    sim.queue(r)
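

# The following is an illustrative sketch (unused by this module) of how a
# worker is expected to talk to the Simulator, inferred from the API above:
# poll getWork() for a prepared control file, run the external photon
# simulator on it, and hand back a zip of the produced .raw/.dat files via
# storeTransferedResult().  The helper below and the FileToTree usage
# (assumed to behave like ElementTree's parse) are assumptions, not part of
# the real worker code.
def _runPhotonAndZip(ctrlfile):
    # placeholder: the real worker runs the photon simulator on ctrlfile and
    # returns the zipped .raw/.dat results as a byte string
    raise NotImplementedError


def exampleWorkerLoop(sim, wname):
    while True:
        ctrlfile = sim.getWork(wname)
        if ctrlfile is None:
            time.sleep(1)
            continue
        # Run.build() stores the run id in the 'runid' attribute of the
        # control file's root element
        runid = FileToTree(ctrlfile).getroot().get('runid')
        sim.storeTransferedResult(wname, runid, _runPhotonAndZip(ctrlfile))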


if __name__ == '__main__':
    sim = Simulator(path('./tmp'))
    testQueue(sim)
    print sim.getWork('testworker')  # getWork() needs a worker name; 'testworker' is arbitrary