"""Photon render worker.

Repeatedly asks a job server (``--url``) for a job, runs the external
``photon`` command on the received scene description and then hands the
result to post-processing code that is delivered together with the job.

NOTE: written for Python 2 (uses ``urllib.quote`` and relies on ``exec``
running server code inside ``main``'s scope).
"""
import logging
import os
import random
import socket
import subprocess
import sys
import time
import urllib
from optparse import OptionParser

# VolumeImage, prettyPrint and subelement are not referenced in this file;
# presumably the server-supplied code executed in main() expects them as
# module globals -- TODO confirm before removing any of these imports.
from utils.path import path
from utils.volumeimage import VolumeImage
from utils.xml import TreeToFile, prettyPrint, subelement, StringToTree
from utils.url import getOpener

MAX_RUNS = 100000          # upper bound on polling iterations in main()
ERR_COUNT = 0              # consecutive photon failures, reset on success
avg_jobtime = 120          # seconds; twice this is kept as time-limit margin
START_TIME = time.time()

parser = OptionParser()
parser.add_option("-w", "--worker", dest="wid",
                  help="worker id, empty will lead to hostname, 'auto' to a randomly choosen Int concatenated to hostname",
                  default="")
parser.add_option("-p", "--photon", dest="photoncmd",
                  help="photon command line", default="photon")
parser.add_option("-u", "--url", dest="url",
                  help="server url", default="http://loco.visual-library.de")
parser.add_option("-t", "--time", dest="timelimit",
                  help="timelimit in seconds", default="3600")
options, args = parser.parse_args()

# Worker id: bare short hostname when no id was given, otherwise
# "<hostname>_<id>" where 'auto' is replaced by a random integer.
wid = options.wid
if wid == 'auto':
    wid = random.randint(10000, 100000)
_hostname = socket.gethostname().split('.')[0]
if wid == '':
    WORKER_ID = _hostname
else:
    WORKER_ID = '%s_%s' % (_hostname, wid)
print('got WORKER_ID %s' % WORKER_ID)

PHOTON_PATH = options.photoncmd
MAX_TIME = int(options.timelimit)

# Per-worker scratch directory: ./worker_tmp/<WORKER_ID>
TMPDIR = path('./worker_tmp')
if not TMPDIR.exists():
    TMPDIR.mkdir()
TMPDIR = TMPDIR.joinpath(WORKER_ID)
if not TMPDIR.exists():
    TMPDIR.mkdir()

log = logging.getLogger()
logging.basicConfig(level=logging.DEBUG)

VERBOSE = True


def runPhoton(ctrlfile):
    """Run the photon command on *ctrlfile* and wait for it to finish.

    ctrlfile -- path object of the scene control file; its absolute path is
                passed to photon via ``-s``.

    Raises subprocess.CalledProcessError when photon exits non-zero.
    """
    cmd = [PHOTON_PATH, '-s', str(ctrlfile.abspath()), '-p', '1']
    if VERBOSE:
        subprocess.check_call(cmd)
        return
    # Discard output via os.devnull instead of subprocess.PIPE: nobody reads
    # the pipes, so a chatty child would block once the pipe buffer is full.
    with open(os.devnull, 'w') as devnull:
        subprocess.check_call(cmd, stdout=devnull, stderr=devnull)


def itsbroken(runid, opener):
    """Report job *runid* to the server's ``/brokenJob`` URL.

    runid  -- run id taken from the job's scene description.
    opener -- object with an ``open(url)`` method used for the request.

    Best effort: any error while contacting the server is logged and
    swallowed so that the caller's own error handling can continue.
    """
    try:
        opener.open(options.url + ('/brokenJob?worker=%s&runid=%s' % (WORKER_ID, runid)))
    except Exception:
        log.exception('error during communicating brokennes')


def main():
    """Poll the server for jobs and process them until a limit is hit.

    The loop ends after MAX_RUNS iterations, when the elapsed time comes
    within ``2 * avg_jobtime`` seconds of MAX_TIME, or when photon has
    failed repeatedly in a row (tracked in the global ERR_COUNT).

    NOTE: local variable names in this function are visible to the
    server-supplied code run through ``exec`` below, so they are
    intentionally left unchanged.
    """
    global ERR_COUNT
    log.info('Worker-ID: %s', WORKER_ID)
    headers = {'User-Agent': 'photon-worker/%s' % WORKER_ID}
    # multipart AND gzip does not work well together so we use two configurations
    opener = getOpener(headers=headers, enableGzip=True)
    mpart_opener = getOpener(headers=headers, enableMultipart=True)
    geturl = options.url + ('/getJob?worker=%s' % WORKER_ID)
    # The doubled %% placeholders survive this formatting step and are
    # filled with (objfile, runid) for each download.
    objurl = options.url + ('/getObjData?worker=%s&objfile=%%s&runid=%%s' % WORKER_ID)

    for i in range(MAX_RUNS):
        log.info('run %s of %s', i, MAX_RUNS)
        if MAX_TIME is not None and time.time() - START_TIME > MAX_TIME - avg_jobtime * 2:
            log.warning('timelimit reached, breaking')
            break

        try:
            log.info('fetching %s ...', geturl)
            ctrlxml = opener.open(geturl).read()
        except Exception:
            log.exception('error during fetching %s', geturl)
            time.sleep(5)
            continue

        try:
            tree = StringToTree(ctrlxml)
        except Exception:
            log.exception('cannot parse:\n%s', ctrlxml)
            time.sleep(5)
            continue

        root = tree.getroot()
        if root.tag == 'scene':
            name = root.attrib['name']
            code = root.attrib['code']
            runid = root.attrib['runid']
            ctrlfile = TMPDIR.joinpath('%s.xml' % name)

            # Download every referenced mesh that is not cached yet (cache
            # key is the include's hash) and point the include at the local
            # copy. A failure here gives up on this job but keeps the worker
            # alive, like the other error paths in this loop.
            try:
                for includeEl in tree.xpath('//mesh/include'):
                    h = includeEl.attrib['hash']
                    cachedpath = TMPDIR.joinpath('%s.obj' % h)
                    if not cachedpath.exists():
                        url = objurl % (urllib.quote(includeEl.attrib['file']), runid)
                        log.info('fetching objfile from %s', url)
                        objdata = opener.open(url).read()
                        cachedpath.write_bytes(objdata)
                    includeEl.attrib['file'] = cachedpath.name
            except Exception:
                log.exception('error during fetching objfiles')
                itsbroken(runid, opener)
                time.sleep(5)
                continue

            log.info('storing photon ctrltree to %s', ctrlfile.abspath())
            TreeToFile(tree, ctrlfile)

            try:
                t = time.time()
                log.info('running photon...')
                runPhoton(ctrlfile)
                log.info('duration: %.1f', time.time() - t)
                ERR_COUNT = 0
            except Exception:
                log.exception('error during running photon')
                itsbroken(runid, opener)
                # Stop the worker once photon keeps failing; the counter is
                # checked before it is incremented.
                if ERR_COUNT > 5:
                    break
                ERR_COUNT += 1
                time.sleep(5)
                continue

            outname = tree.xpath('string(//volumerender/output/@filename)')
            datfile = path('.').parent.joinpath('%s.dat' % outname)
            log.debug('processing photon result at %s', datfile.abspath())
            try:
                # SECURITY(review): `code` comes straight from the server
                # response and is executed without any verification. Only
                # point --url at a trusted server.
                exec(code)  # will redefine after_simulation()
                after_simulation(tree, datfile, opener, mpart_opener, options)
            except Exception:
                log.exception('error during running after_simulation')
                itsbroken(runid, opener)
                time.sleep(5)
                # Debugging hint: copy datfile and its '.raw' sibling aside
                # here (e.g. to /tmp/bad_phot) to inspect failed results.
        elif root.tag == 'empty':
            # No job available right now; wait before asking again.
            time.sleep(5)
        else:
            # Unknown answer: wait as well instead of re-polling immediately.
            log.warning('unexpected root tag %r, retrying', root.tag)
            time.sleep(5)


if __name__ == '__main__':
    try:
        main()
    except Exception:
        log.exception('error during mainloop')