# photon_worker.py
  1. import time
  2. import logging
  3. import urllib
  4. import sys
  5. from optparse import OptionParser
  6. import os
  7. import subprocess
  8. from utils.path import path
  9. from utils.volumeimage import VolumeImage
  10. from utils.xml import TreeToFile, prettyPrint, subelement, StringToTree
  11. from utils.url import getOpener
  12. import socket
  13. MAX_RUNS = 100000
  14. ERR_COUNT = 0
  15. avg_jobtime = 120
  16. START_TIME = time.time()
  17. import random
  18. parser = OptionParser()
  19. parser.add_option("-w", "--worker", dest="wid",
  20. help="worker id, empty will lead to hostname, 'auto' to a randomly choosen Int concatenated to hostname", default="")
  21. parser.add_option("-p", "--photon", dest="photoncmd",
  22. help="photon command line", default="photon")
  23. parser.add_option("-u", "--url", dest="url",
  24. help="server url", default="http://loco.visual-library.de")
  25. parser.add_option("-t", "--time", dest="timelimit",
  26. help="timelimit in seconds", default="3600")
  27. options, args = parser.parse_args()
  28. wid = options.wid
  29. if wid == 'auto':
  30. wid = random.randint(10000, 100000)
  31. WORKER_ID = '%s_%s' % (socket.gethostname().split('.')[0], wid)
  32. elif wid == '':
  33. WORKER_ID = socket.gethostname().split('.')[0]
  34. else:
  35. WORKER_ID = '%s_%s' % (socket.gethostname().split('.')[0], wid)
  36. print 'got WORKER_ID %s' % WORKER_ID
  37. PHOTON_PATH = options.photoncmd
  38. #if path('/home/dr872860/photon/photon').exists():
  39. MAX_TIME = int(options.timelimit)
  40. TMPDIR = path('./worker_tmp')
  41. if not TMPDIR.exists():
  42. TMPDIR.mkdir()
  43. TMPDIR = TMPDIR.joinpath(WORKER_ID)
  44. if not TMPDIR.exists():
  45. TMPDIR.mkdir()
  46. log = logging.getLogger()
  47. logging.basicConfig(level=logging.DEBUG)
  48. VERBOSE = True
  49. def runPhoton(ctrlfile):
  50. cmd = [PHOTON_PATH, '-s', str(ctrlfile.abspath()), '-p', '1']
  51. if not VERBOSE:
  52. subprocess.check_call(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
  53. else:
  54. subprocess.check_call(cmd)
  55. def itsbroken(runid, opener):
  56. try:
  57. opener.open(options.url + '/brokenJob?worker=%s&runid=%s' % (WORKER_ID, runid))
  58. except Exception:
  59. log.exception('error during communicating brokennes')
  60. def main():
  61. global ERR_COUNT
  62. log.info('Worker-ID: %s' % WORKER_ID)
  63. headers = {'User-Agent': 'photon-worker/%s' % WORKER_ID}
  64. # multipart AND gzip does not work well together so we use two configurations
  65. opener = getOpener(headers=headers, enableGzip=True)
  66. mpart_opener = getOpener(headers=headers, enableMultipart=True)
  67. geturl = options.url + '/getJob?worker=%s' % WORKER_ID
  68. objurl = options.url + '/getObjData?worker=%s&objfile=%%s&runid=%%s' % WORKER_ID
  69. for i in range(MAX_RUNS):
  70. log.info('run %s of %s' % (i, MAX_RUNS))
  71. if MAX_TIME is not None and time.time() - START_TIME > MAX_TIME - avg_jobtime * 2:
  72. log.warn('timelimit reached, breaking')
  73. break
  74. try:
  75. log.info('fetching %s ...' % geturl)
  76. ctrlxml = opener.open(geturl).read()
  77. except Exception:
  78. log.exception('error during fetching %s' % geturl)
  79. time.sleep(5)
  80. continue
  81. try:
  82. tree = StringToTree(ctrlxml)
  83. except Exception:
  84. log.exception('cannot parse:\n%s' % ctrlxml)
  85. time.sleep(5)
  86. continue
  87. if tree.getroot().tag == 'scene':
  88. name = tree.getroot().attrib['name']
  89. code = tree.getroot().attrib['code']
  90. runid = tree.getroot().attrib['runid']
  91. ctrlfile = TMPDIR.joinpath('%s.xml' % name)
  92. for includeEl in tree.xpath('//mesh/include'):
  93. h = includeEl.attrib['hash']
  94. cachedpath = TMPDIR.joinpath('%s.obj' % h)
  95. if not cachedpath.exists():
  96. url = objurl % (urllib.quote(includeEl.attrib['file']), runid)
  97. log.info('fetching objfile from %s' % url)
  98. objdata = opener.open(url).read()
  99. cachedpath.write_bytes(objdata)
  100. includeEl.attrib['file'] = cachedpath.name
  101. #~ outputEl = tree.xpath('//volumerender/output')[0]
  102. #~ outputEl.attrib['filename'] = outputEl.attrib['filename']
  103. log.info('storing photon ctrltree to %s' % ctrlfile.abspath())
  104. TreeToFile(tree, ctrlfile)
  105. try:
  106. t = time.time()
  107. log.info('running photon...')
  108. runPhoton(ctrlfile)
  109. log.info('duration: %.1f' % (time.time() - t))
  110. ERR_COUNT = 0
  111. except Exception:
  112. log.exception('error during running photon')
  113. itsbroken(runid, opener)
  114. if ERR_COUNT > 5: break
  115. ERR_COUNT += 1
  116. time.sleep(5)
  117. continue
  118. outname = tree.xpath('string(//volumerender/output/@filename)')
  119. datfile = path('.').parent.joinpath('%s.dat' % outname)
  120. log.debug('processing photon result at %s' % datfile.abspath())
  121. try:
  122. exec(code) # will redefine after_simulation()
  123. after_simulation(tree, datfile, opener, mpart_opener, options)
  124. except Exception:
  125. log.exception('error during running after_simulation')
  126. itsbroken(runid, opener)
  127. time.sleep(5)
  128. #datfile.copy(path('/tmp/bad_phot').joinpath(datfile.name))
  129. #rawfile = datfile.parent.joinpath(datfile.namebase + '.raw')
  130. #rawfile.copy(path('/tmp/bad_phot').joinpath(rawfile))
  131. elif tree.getroot().tag == 'empty':
  132. time.sleep(5)
  133. if __name__ == '__main__':
  134. try:
  135. main()
  136. except Exception:
  137. log.exception('error during mainloop')