| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560 |
- import logging
- import time
- from collections import namedtuple, defaultdict
- import datetime
- import functools
- import socket
- from scipy.misc import pilutil
- import numpy as np
- from concurrent import futures
- from utils.path import path
- from utils.xml import StringToTree, subelement, TreeToFile, FileToTree
- import utils.materials
- from utils.volumeimage import VolumeImage
- from utils import pos2rgb
- import numpy as np
- from scipy.misc import pilutil
- from PIL import Image, ImageDraw, ImageFont, ImageOps
- import localize as localizer
- import optimize as optimizer
- log = logging.getLogger('lws')
def storeErrors(cachefile, errors):
    """Write per-run localization errors and their averages to an XML file.

    ``errors`` maps pathid -> runid -> setting id -> (err3d, err2d).
    The root ``<errors>`` element carries the averages over all runs of the
    'end' and 'seq' settings; below it there is one ``<path>`` per path id,
    one ``<run>`` per run id and one ``<setting>`` per stored setting.

    The 'end' and 'seq' entries are read by plain indexing, so a run that
    lacks one of them raises KeyError for a plain dict or picks up (and
    stores) the default value for a defaultdict.
    """
    collected3d = defaultdict(list)
    collected2d = defaultdict(list)
    for runs in errors.values():
        for settings in runs.values():
            for setting_id in ('end', 'seq'):
                e3d, e2d = settings[setting_id]
                collected3d[setting_id].append(e3d)
                collected2d[setting_id].append(e2d)

    def average(values):
        return sum(values) / len(values)

    # attribute name -> samples, in the order the attributes are written
    summary = [
        ('avg_err2d_end', collected2d['end']),
        ('avg_err2d_seq', collected2d['seq']),
        ('avg_err3d_end', collected3d['end']),
        ('avg_err3d_seq', collected3d['seq']),
    ]
    try:
        averages = [(name, average(samples)) for name, samples in summary]
    except ZeroDivisionError:
        # nothing was collected: report every average as 0
        averages = [(name, 0) for name, _samples in summary]

    tree = StringToTree('<errors/>')
    root = tree.getroot()
    for name, value in averages:
        root.attrib[name] = '%.2f' % value

    for pathid, runs in errors.items():
        path_el = subelement(root, 'path', attribs={'id': pathid})
        for runid, settings in runs.items():
            run_el = subelement(path_el, 'run', attribs={'id': runid})
            for setting_id, (e3d, e2d) in settings.items():
                attribs = {'id': setting_id, 'err3d': '%.2f' % e3d, 'err2d': '%.2f' % e2d}
                subelement(run_el, 'setting', attribs=attribs)

    TreeToFile(tree, cachefile)
def loadErrors(cachefile):
    """Read an error cache file back into nested dictionaries.

    Returns a defaultdict pathid -> runid -> setting id -> (err3d, err2d)
    with both errors parsed as floats. Looking up a setting that is not in
    the file yields (0, 0).
    """
    loaded = defaultdict(lambda: defaultdict(lambda: defaultdict(lambda: (0, 0))))
    root = FileToTree(cachefile).getroot()
    for path_el in root.xpath('path'):
        runs = loaded[path_el.attrib['id']]
        for run_el in path_el.xpath('run'):
            settings = runs[run_el.attrib['id']]
            for setting_el in run_el.xpath('setting'):
                attrs = setting_el.attrib
                pair = (float(attrs['err3d']), float(attrs['err2d']))
                settings[attrs['id']] = pair
    return loaded
def buildDeviceAdaptationArrays(lws, optrundir):
    """Save measured and estimated signal arrays for every known station.

    For each station in ``lws.config['knownStations']`` two arrays of shape
    (number of APs with 'optimize' set, highest known location id + 1) are
    written into ``optrundir``:

    * ``<station>_apid_locid2measurements.npy`` - the measured value per
      AP and location id,
    * ``<station>_apid_locid2estimates.npy`` - the raw value read from the
      AP's volume image at that location.

    Cells without a measurement hold ``inf`` in both arrays.
    """
    ap_configs = lws.config['aps']
    activeAPs = [apid for apid in ap_configs if ap_configs[apid]['optimize']]
    num_locids = max(lws.known_locations.keys()) + 1
    # volume images are shared between stations, keyed by their data file
    volume_cache = {}

    for station in lws.config['knownStations']:
        log.info('building da arrays for %s' % station)

        # start with infinity everywhere; only measured cells get overwritten
        measured = np.zeros(shape=(len(activeAPs), num_locids)) + np.inf
        for row, apid in enumerate(activeAPs):
            per_location = lws.getMeasurements(station, apid)
            if per_location is None:
                continue
            for locid, (_x, _y, _z, value, _all_rssis) in per_location.items():
                measured[row, locid] = float(value)
        np.save(optrundir.joinpath('%s_apid_locid2measurements.npy' % station), measured)

        estimated = np.zeros(shape=measured.shape) + np.inf
        for row, apid in enumerate(activeAPs):
            datfile = optrundir.joinpath('%s_%s.dat' % (lws.activeScene.name, apid))
            for locid in range(measured.shape[1]):
                if not np.isfinite(measured[row, locid]):
                    continue

                loc = lws.known_locations[locid]
                # load the volume image lazily, on the first cell that needs it
                if datfile not in volume_cache:
                    volume_cache[datfile] = VolumeImage(datfile)
                vi = volume_cache[datfile]

                vx, vy, vz = vi.translate(loc.x, loc.y, loc.z)
                # read vi.data directly, i.e. the unnormalized raw estimate
                estimated[row, locid] = vi.data[vx, vy, vz]
        np.save(optrundir.joinpath('%s_apid_locid2estimates.npy' % station), estimated)
-
-
def evaluate(lws, optrun, station, cubewidth, algo, pathids2runids, refresh):
    """Return the localization errors of ``station`` for ``optrun``.

    Results are cached in an XML file below ``lws.config['tmp']`` whose
    name encodes optrun, station, cubewidth and algo. The cached errors are
    returned when the file exists and ``refresh`` is false. Otherwise every
    run listed in ``pathids2runids`` is evaluated with an ``Evaluator``,
    the result is written to the cache file and returned.
    """
    cachename = 'errors_%s_%s_%s_%s.xml' % (optrun, station, cubewidth, algo)
    cachefile = lws.config['tmp'].joinpath(cachename)
    if cachefile.exists() and not refresh:
        return loadErrors(cachefile)

    scene = lws.activeScene
    optrundir = lws.config['tmp_apdata'].joinpath(optrun)
    locfile = scene['locationfile']
    objfile = scene['objfile']
    # pattern of the per-AP data files; '%s' is left in place for the AP id
    vip = optrundir.joinpath(scene.name + '_%s.dat')

    ap_configs = lws.config['aps']
    aps = [(apid, cfg['x'], cfg['y'], cfg['z'])
           for apid, cfg in ((a, ap_configs[a]) for a in ap_configs.sections)]

    # No device adaptation values are computed here: the station always
    # gets an empty list.
    davalues = []
    log.info('using davalues: %s' % (davalues,))

    reload(localizer)

    env = localizer.Environment(objfile=objfile, aps=aps, locationfile=locfile,
                                tmpdir=lws.config['tmp'], vi_path=vip,
                                bbox2d=scene['bbox'][:4], davalues=davalues)

    datadir = lws.config['tmp_tracked']
    evaluator = Evaluator(optrun, station, env, cubewidth, datadir,
                          freespace_scan=10, algo=algo)
    # the list of failed runs is not used here
    errors, _failures = evaluator.evalAll(pathids2runids)

    storeErrors(cachefile, errors)
    return errors
# Axis-aligned box given as min/max pairs per axis, in the field order x, y, z.
BBox = namedtuple('BBox', ['x1', 'x2', 'y1', 'y2', 'z1', 'z2'])
# One value per axis.
Resolution = namedtuple('Resolution', ['x', 'y', 'z'])
def buildOptrunFromConfig(lws):
    """Unfinished: set up a simulation build from the active scene config.

    Clears the per-AP data directory, collects AP positions and power ids,
    reloads the localizer module and prints the scene settings a build
    would need. No simulation is started yet.

    NOTE(review): ``optrun`` is neither a parameter nor a local of this
    function. Unless a module-level ``optrun`` exists outside the part of
    the file shown here, the first statement raises NameError - confirm the
    intended directory name before relying on this function.
    """
    vidatadir = lws.config['tmp_apdata'].joinpath(optrun)
    if vidatadir.exists():
        for stale in vidatadir.files():
            stale.remove()

    scene = lws.activeScene
    # the values below are collected but not used yet
    vip = vidatadir.joinpath(scene.name + '_%s.dat')
    powerids = set()
    aps = []
    ap_configs = lws.config['aps']
    for apid in ap_configs.sections:
        cfg = ap_configs[apid]
        aps.append((apid, cfg['x'], cfg['y'], cfg['z']))
        powerids.add(cfg['powerid'])
    objfile = scene['objfile']

    reload(localizer)

    # TODO: take resolution, bbox, scene, numphotons and density from
    # lws.config instead of an optrun's init.txt; for now they are only
    # printed.
    print(lws.config['scenes']['active'])
    for key in ('bbox', 'resolution', 'numphotons', 'density'):
        print(scene[key])
def buildOptrun(lws, optrun=None):
    """Rebuild the per-AP simulation data for a finished optimization run.

    Reads ``init.txt`` (simulation setup) and the last non-empty line of
    ``mins.txt`` (material and power parameters) from the optrun's result
    directory below ``lws.optimizer.tmpdir``, removes the files of the
    optrun's directory below ``lws.config['tmp_apdata']`` and asks the
    localizer environment to build the simulations again.

    Without ``optrun`` the work is delegated to ``buildOptrunFromConfig``.

    Raises ValueError when ``init.txt`` lacks one of the required entries
    (resolution, bbox, scene, numphotons, density). The check runs before
    any existing data file is removed.
    """
    if not optrun:
        buildOptrunFromConfig(lws)
        # was a bare ``exit`` (a no-op expression), which fell through and
        # continued with optrun=None
        return

    resultdir = lws.optimizer.tmpdir.joinpath(optrun)
    init_lines = [s for s in resultdir.joinpath('init.txt').lines() if s.strip()]
    min_params = [s for s in resultdir.joinpath('mins.txt').lines() if s.strip()][-1].split()

    # 'key: value' entries of init.txt and how to parse their value part
    parsers = (
        ('resolution', lambda v: [int(e) for e in v.split(',')]),
        ('bbox', lambda v: [float(e) for e in v.split(',')]),
        ('scene', lambda v: v.strip()),
        ('numphotons', int),
        ('density', float),
    )
    init = {}
    for line in init_lines:
        for key, parse in parsers:
            prefix = '%s: ' % key
            if line.startswith(prefix):
                init[key] = parse(line[len(prefix):])
                break
    missing = [key for key, _parse in parsers if key not in init]
    if missing:
        raise ValueError('init.txt of optrun %s lacks: %s' % (optrun, ', '.join(missing)))

    resolution = init['resolution']
    bbox = init['bbox']
    scene_name = init['scene']
    numphotons = init['numphotons']
    density = init['density']

    # drop the data files of a previous build of this optrun
    vidatadir = lws.config['tmp_apdata'].joinpath(optrun)
    if vidatadir.exists():
        for stale in vidatadir.files():
            stale.remove()

    # pattern of the per-AP data files; '%s' is left in place for the AP id
    vip = vidatadir.joinpath(lws.activeScene.name + '_%s.dat')

    powerids = set()
    aps = []
    for apid in lws.config['aps'].sections:
        apcfg = lws.config['aps'][apid]
        aps.append((apid, apcfg['x'], apcfg['y'], apcfg['z']))
        powerids.add(apcfg['powerid'])

    objfile = lws.activeScene['objfile']

    reload(localizer)

    # Entries of the mins.txt line look like '<material>:<reflect>,<alpha>'
    # or '<powerid>:<power>'.
    powerid2power = {}
    materials = {}
    for min_param in min_params:
        for matname in lws.activeScene['materials'].scalars:
            prefix = '%s:' % matname
            if min_param.startswith(prefix):
                reflect, alpha = min_param[len(prefix):].split(',')
                materials[matname] = utils.materials.Brdf(reflect, alpha)
        for pid in powerids:
            prefix = '%s:' % pid
            if min_param.startswith(prefix):
                powerid2power[pid] = float(min_param[len(prefix):])

    ap2power = {ap[0]: powerid2power[lws.config['aps'][ap[0]]['powerid']] for ap in aps}

    # origin and per-cell extent of the grid described by bbox/resolution
    tx, ty, tz = bbox[0], bbox[2], bbox[4]
    dx = round((bbox[1] - bbox[0]) / resolution[0], 6)
    dy = round((bbox[3] - bbox[2]) / resolution[1], 6)
    dz = round((bbox[5] - bbox[4]) / resolution[2], 6)

    di = localizer.Dimensions(tx, ty, tz, dx, dy, dz)
    env = localizer.Environment(objfile=objfile, aps=aps, tmpdir=lws.config['tmp'],
                                di=di, vi_path=vip, bbox2d=lws.activeScene['bbox'][:4],
                                davalues=[])

    BASE_URL = 'http://127.0.0.1:%s' % lws.config['httpPort']
    disabledObjects = []

    def onResult(*args):
        # results are not processed here
        pass

    env.buildSimulations(scene_name, materials, ap2power, BBox(*bbox), Resolution(*resolution),
                         density, numphotons, disabledObjects, BASE_URL, onResult=onResult)
def getCollectedPathids2runids(station, paths, sourcedir, filter=None):
    """Map each collected path id to the list of its run ids.

    The runs are taken from ``getCollectedPaths(station, paths, sourcedir)``.
    With ``filter`` set, only path ids containing that substring are kept;
    one trailing '_r' of the filter is ignored. Paths without any run do
    not appear in the result.

    Returns a defaultdict(list) pathid -> [runid, ...].
    """
    # Normalize the filter once. It used to be stripped inside the loop, so
    # a filter with several '_r' suffixes changed from one path to the next.
    needle = filter
    if needle is not None and needle.endswith('_r'):
        needle = needle[:-2]

    tree = getCollectedPaths(station, paths, sourcedir)
    pathids2runids = defaultdict(list)
    for pathEl in tree.getroot().xpath('path'):
        pathid = pathEl.attrib['id']
        if needle is not None and needle not in pathid:
            continue
        for runEl in pathEl.xpath('run'):
            pathids2runids[pathid].append(runEl.attrib['id'])
    return pathids2runids
-
def getCollectedPaths(station, paths, sourcedir):
    """Build an XML tree describing the recorded runs of the given paths.

    ``paths`` maps a path id to a sequence of locations. Every path gets a
    ``<path id=... locations=...>`` element. For each file
    ``measurements_<runid>.txt`` found in ``sourcedir/<station>/<pathid>``
    a ``<run id=...>`` child is added that holds the file's text in
    ``<measurements>`` and the text of ``locations_<runid>.txt`` (empty if
    that file does not exist) in ``<locations>``.
    """
    tree = StringToTree('<paths/>')
    root = tree.getroot()
    for pathid, loc in paths.items():
        attribs = {'id': pathid, 'locations': ','.join(map(str, loc))}
        path_el = subelement(root, 'path', attribs=attribs)

        rundir = sourcedir.joinpath(station, pathid)
        if rundir.exists():
            for mfile in sorted(rundir.files('measurements_*.txt')):
                # run id = file name without 'measurements_' and '.txt'
                runid = mfile.name[len('measurements_'):-4]
                run_el = subelement(path_el, 'run', attribs={'id': runid})
                subelement(run_el, 'measurements').text = mfile.text()
                locfile = rundir.joinpath('locations_%s.txt' % runid)
                subelement(run_el, 'locations').text = locfile.text() if locfile.exists() else ''
    return tree
# Debug switch: when True, paths whose id contains 'stairs' are decoded by a
# second localizer that is always built with a cube width of 3.
STAIR_HACK = False


class Evaluator(object):
    """Run a localizer over tracked measurement runs and score the result.

    One instance wraps the localizer selected by ``algo`` for one device
    and one optimization run. ``evaluate`` scores a single path/run pair,
    ``evalAll`` distributes many of them over a thread pool.
    """

    # cube width of the extra localizer used when STAIR_HACK is enabled
    _STAIR_CUBEWIDTH = 3
    # default HMM 'prune_to' per cube width; other widths fall back to 0.05
    _CUBEWIDTH2PRUNING = {1: 0.06, 2: 0.09, 3: 0.06, 4: 0.05}
    _ERRORTYPES = ('mean', 'median')

    def __init__(self, optrun, device, env, cubewidth, sourcedir,
                 analyze=True, interpolate=True, max_aps=None,
                 verbose=False, algo='hmm', output_html=True,
                 errortype='mean', **kwds):
        """Create the localizer and the output directory.

        optrun      -- name of the optimization run; names the output
                       directory ``env.tmpdir/evaluator/<optrun>``
        device      -- device id used when loading tracked measurements
        env         -- localizer environment
        cubewidth   -- cube width handed to the localizer
        sourcedir   -- directory holding the tracked paths
        analyze     -- write analysis output for every evaluated run
        interpolate -- passed on to ``evaluateMeasurements``
        max_aps, verbose -- passed on to the localizer
        algo        -- 'lmse', 'hmm' or 'pf'
        output_html -- with ``analyze``: draw result images and run the
                       localizer's path sequence analysis
        errortype   -- 'mean' or 'median'; how per-position errors of a
                       path are reduced to one number
        kwds        -- ``hmmconfig`` / ``pfconfig``: extra keyword arguments
                       for the HMM / PF localizer; other keys are ignored

        Raises ValueError for an unknown ``algo`` or ``errortype``.
        """
        if errortype not in self._ERRORTYPES:
            raise ValueError('unsupported errortype: %r' % (errortype,))

        self.env = env
        self.cubewidth = cubewidth

        if algo == 'lmse':
            factory = localizer.LMSELocalizer
            extra = {}
        elif algo == 'hmm':
            factory = localizer.HMMLocalizer
            # copy, so the caller's dict is not modified by the default below
            extra = dict(kwds.get('hmmconfig', {}))
            extra.setdefault('prune_to', self._CUBEWIDTH2PRUNING.get(cubewidth, 0.05))
        elif algo == 'pf':
            factory = localizer.PFLocalizer
            extra = dict(kwds.get('pfconfig', {}))
        else:
            raise ValueError('unsupported algo: %r' % (algo,))

        self.localizer = factory(env, cubewidth=cubewidth, verbose=verbose,
                                 max_aps=max_aps, **extra)
        if STAIR_HACK:
            self.localizer3 = factory(env, cubewidth=self._STAIR_CUBEWIDTH, verbose=verbose,
                                      max_aps=max_aps, **extra)

        self.algo = algo

        self.sourcedir = path(sourcedir)  # location of tracked paths
        self.tmpdir = self.env.tmpdir.joinpath('evaluator', optrun)
        if not self.tmpdir.exists():
            self.tmpdir.makedirs()

        self.analyze = analyze
        self.interpolate = interpolate
        self.device = device
        self.optrun = optrun

        self.output_html = output_html
        self.errortype = errortype

    def evaluate(self, pathid, runid):
        """Localize one tracked run and return its errors.

        Returns ``(error_3d, error_2d)``, two dicts keyed by path name
        ('end', 'seq', 'seq_avg' - whichever the localizer produced).
        Any exception is logged and answered with two empty defaultdicts.
        """
        try:
            # The cube width must match the localizer that produced the
            # path, so width 3 is only used together with the stair localizer.
            if STAIR_HACK and 'stairs' in pathid:
                active_localizer = self.localizer3
                cubewidth = self._STAIR_CUBEWIDTH
            else:
                active_localizer = self.localizer
                cubewidth = self.cubewidth

            measurements = localizer.Measurements.fromTrackedMeasurements(
                self.env.locid2location, self.sourcedir, self.device, pathid, runid)

            started = time.time()
            paths, measurements = active_localizer.evaluateMeasurements(
                measurements, interpolate=self.interpolate)

            error_3d, error_2d, errors_2d, errors_3d = self.getError(measurements, paths, cubewidth)

            if self.analyze:
                self._writeAnalysis(pathid, runid, cubewidth, paths, measurements,
                                    error_3d, error_2d, errors_3d, errors_2d)

            log.info(self._formatSummary(pathid, runid, error_3d, error_2d, started))
        except Exception:
            log.exception('error during evaluation of %s/%s' % (pathid, runid))
            error_3d = defaultdict(int)
            error_2d = defaultdict(int)

        return error_3d, error_2d

    def _writeAnalysis(self, pathid, runid, cubewidth, paths, measurements,
                       error_3d, error_2d, errors_3d, errors_2d):
        """Draw one result image per path and run the path sequence analysis."""
        if not self.output_html:
            return

        c2m = self.env.cube2meter
        captions = {'end': 'Offline', 'seq': 'Online', 'seq_avg': 'Online/Avg'}
        for pname, pseq in paths.items():
            fname = '%s_%s_%s_%s_%s_%s' % (self.device, cubewidth, self.algo, pname, pathid, runid)
            in_meter = [c2m(x, y, z, cubewidth) for x, y, z in pseq]
            caption = captions[pname]
            text = '%s Error 3D: %.2f, %s Error 2D: %.2f' % (caption, error_3d[pname], caption, error_2d[pname])
            self.drawLocalizationResult2D(in_meter, measurements, name=fname, text=text)

        # HACK: PF and HMM deliver an 'end' path; LMSE does not, so its
        # 'seq' path is passed in both positions.
        first = 'end' if 'end' in errors_2d else 'seq'
        fname = '%s_%s_%s_end_%s_%s' % (self.device, cubewidth, self.algo, pathid, runid)
        self.localizer.analyzePathSequence(self.tmpdir, paths[first], paths['seq'], measurements,
                                           errors_2d[first], errors_2d['seq'], fname,
                                           errors_3d[first], errors_3d['seq'])

    def _formatSummary(self, pathid, runid, error_3d, error_2d, started):
        """Build the one-line log summary for an evaluated run."""
        head = (self.algo, pathid, runid)
        seq = (error_3d['seq'], error_2d['seq'])
        if 'seq_avg' in error_3d:
            vals = head + (error_3d['end'], error_2d['end']) + seq + (
                error_3d['seq_avg'], error_2d['seq_avg'], time.time() - started)
            return '%s-run %s/%s: end-3d: %.1fm, end-2d: %.1fm, seq-3d: %.1fm, seq-2d: %.1fm seq_avg-3d: %.1fm, seq_avg-2d: %.1fm [%.2f sec]' % vals
        if 'end' in error_3d:
            vals = head + (error_3d['end'], error_2d['end']) + seq + (time.time() - started,)
            return '%s-run %s/%s: error-3d: %.1fm, error-2d: %.1fm, seq-error-3d: %.1fm, seq-error-2d: %.1fm [%.2f sec]' % vals
        vals = head + seq + (time.time() - started,)
        return '%s-run %s/%s: seq-error-3d: %.1fm, seq-error-2d: %.1fm [%.2f sec]' % vals

    def evalAll(self, pathid2runids, limit=None):
        """Evaluate many runs in parallel.

        ``pathid2runids`` maps a path id to a sequence of run ids; at most
        ``limit`` runs per path are evaluated (all if None). Path ids
        containing '95' are skipped.

        Returns ``(errors, failures)``: ``errors`` is a nested defaultdict
        pathid -> runid -> path name -> (err3d, err2d), ``failures`` a list
        of (pathid, runid) pairs whose evaluation produced no errors.
        """
        errors = defaultdict(lambda: defaultdict(lambda: defaultdict(lambda: (0, 0))))
        failures = []

        def got_result(pathid, runid, f):
            error_3d, error_2d = f.result()
            if not error_2d or not error_3d:
                failures.append((pathid, runid))
                return
            for pname, e3d in error_3d.items():
                errors[pathid][runid][pname] = (e3d, error_2d[pname])

        if self.algo == 'hmm':
            max_workers = 2 if self.cubewidth < 2 else 6
        else:
            max_workers = 8

        # Call strptime once up front, as using it for the first time from
        # a thread leads to http://bugs.python.org/issue7980
        datetime.datetime.strptime('2000', '%Y')

        started = time.time()
        with futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            for pathid, runids in pathid2runids.items():
                if '95' in pathid:
                    continue

                for runid in runids[:limit]:
                    f = executor.submit(functools.partial(self.evaluate, pathid, runid))
                    f.add_done_callback(functools.partial(got_result, pathid, runid))

        log.info('evaluate all paths in %.3f secs' % (time.time() - started))

        avg2d = defaultdict(list)
        for runs in errors.values():
            for settings in runs.values():
                for pname, (_err3d, err2d) in settings.items():
                    avg2d[pname].append(err2d)

        for pname, _errors in avg2d.items():
            if _errors:
                # floor division: the index must be an integer
                median_e = sorted(_errors)[len(_errors) // 2]
                mean_e = sum(_errors) / float(len(_errors))
                log.debug('total mean error-%s 2d: %.2f, median error: %.2f' % (pname, mean_e, median_e))

        return errors, failures

    def getError(self, measurements, paths, cubewidth):
        """Compare estimated paths with the measured positions.

        ``paths`` maps a path name to a sequence of (x, y, z) cube
        coordinates, one per measurement. Coordinates are converted with
        ``env.cube2meter`` and compared with ``m.pos``; measurements whose
        position equals (0, 0, 0) are left out.

        Returns ``(error_3d, error_2d, errors_2d, errors_3d)``: the first
        two map a path name to the mean or median distance (depending on
        ``errortype``), the last two to the list of single distances.

        Raises ValueError if a path and the measurements differ in length.
        """
        if any(len(pseq) != len(measurements) for pseq in paths.values()):
            raise ValueError('every estimated path needs one position per measurement')

        c2m = self.env.cube2meter
        combine = np.median if self.errortype == 'median' else np.mean

        errors_3d = defaultdict(list)
        errors_2d = defaultdict(list)
        error_3d = {}
        error_2d = {}

        for pname, pseq in paths.items():
            dists_3d = errors_3d[pname]
            dists_2d = errors_2d[pname]
            for (x, y, z), m in zip(pseq, measurements):
                if m.pos != (0, 0, 0):
                    x, y, z = c2m(x, y, z, cubewidth)
                    planar = (x - m.pos.x)**2 + (y - m.pos.y)**2
                    dists_3d.append((planar + (z - m.pos.z)**2)**0.5)
                    dists_2d.append(planar**0.5)

            error_3d[pname] = combine(dists_3d)
            error_2d[pname] = combine(dists_2d)

        return error_3d, error_2d, errors_2d, errors_3d

    def drawLocalizationResult2D(self, estimated_path, measurements, name='locresult2d', text=''):
        """Draw an estimated path onto a 2D cut image and save it as PNG.

        ``estimated_path`` holds (x, y, z) positions that are handed to
        ``env.translateMeterToMesh2dPixel``. Each position is drawn as a
        colored dot; where the matching measurement has a position other
        than (0, 0, 0), a line connects the dot with it. The image is
        written to ``<tmpdir>/<name>.png`` with ``text`` in the top left.
        """
        zs = [p[2] for p in estimated_path]
        avg_z = sum(zs) / float(len(zs)) + 0.5

        # snap to one of three fixed cut heights (presumably one per floor)
        if avg_z < 0:
            cut_z = -2.5
        elif avg_z < 2.0:
            cut_z = 1.0
        else:
            cut_z = 3.5

        img = Image.open(self.env.getCutFile(cut_z))
        img = ImageOps.invert(img)
        img = img.convert('RGBA')

        draw = ImageDraw.Draw(img)
        r = self.cubewidth
        total = len(estimated_path)

        for i, (x1, y1, _z1) in enumerate(estimated_path):
            m2dx, m2dy = self.env.translateMeterToMesh2dPixel(x1, y1)
            hexval = pos2rgb(i, total, saturation=1.0, spread=1.0)

            # NOTE(review): the left edge uses r while the other three edges
            # use r * 1.1 - kept as found, confirm whether that is intended.
            draw.ellipse((m2dx - r, m2dy - r * 1.1, m2dx + r * 1.1, m2dy + r * 1.1), fill=hexval)

            pos = measurements[i].pos
            if pos != (0, 0, 0):
                m2dx2, m2dy2 = self.env.translateMeterToMesh2dPixel(pos.x, pos.y)
                draw.line((m2dx, m2dy, m2dx2, m2dy2), fill='#FF5555')

        # the TrueType font is only used on the host named 'nostromo'
        if socket.gethostname() == 'nostromo':
            font = ImageFont.truetype("/usr/share/fonts/truetype/freefont/FreeSans.ttf", 16)
        else:
            font = ImageFont.load_default()

        draw.text((10, 10), text, fill='#CC1111', font=font)
        img.save(self.tmpdir.joinpath('%s.png' % name))
-
-
-
|