import math import time import logging import subprocess import sys from threading import Thread, RLock from collections import namedtuple import colorsys import numpy as np from path import path # ================================================== # probably make this a real extension module from distutils.extension import Extension utils_path = path(__file__).parent.parent.joinpath('utils') msvc_compile_args = ['/openmp', '/wd4244', '/O2'] gcc_compile_args = ['-fopenmp'] gcc_link_args=['-fopenmp'] if sys.platform != 'win32': compile_args = gcc_compile_args link_args = gcc_link_args else: compile_args = msvc_compile_args link_args = [] ext_modules = [Extension("utils.accelerated", [str(utils_path.joinpath("accelerated.pyx").abspath())], extra_compile_args=compile_args, extra_link_args=link_args) ] import pyximport; pyximport.install(setup_args={'include_dirs': [np.get_include(), str(utils_path)], 'ext_modules': ext_modules}) from utils import accelerated # ================================================== log = logging.getLogger() def _debug(): """rpdb2 debug starter""" import rpdb2 print 'stopping for embedded rpdb2 debugging...' rpdb2.start_embedded_debugger('debug', timeout=60 * 20) # add to builtins import __builtin__; __builtin__._debug = _debug def extract(Z, shape, position, fill=np.NaN): """ Extract a sub-array from Z using given shape and centered on position. If some part of the sub-array is out of Z bounds, result is padded with fill value. **Parameters** `Z` : array_like Input array. `shape` : tuple Shape of the output array `position` : tuple Position within Z `fill` : scalar Fill value **Returns** `out` : array_like Z slice with given shape and center **Examples** >>> Z = np.arange(0,16).reshape((4,4)) >>> extract(Z, shape=(3,3), position=(0,0)) [[ NaN NaN NaN] [ NaN 0. 1.] [ NaN 4. 5.]] Schema: +-----------+ | 0 0 0 | = extract (Z, shape=(3,3), position=(0,0)) | +---------------+ | 0 | 0 1 | 2 3 | = Z | | | | | 0 | 4 5 | 6 7 | +---|-------+ | | 8 9 10 11 | | | | 12 13 14 15 | +---------------+ >>> Z = np.arange(0,16).reshape((4,4)) >>> extract(Z, shape=(3,3), position=(3,3)) [[ 10. 11. NaN] [ 14. 15. NaN] [ NaN NaN NaN]] Schema: +---------------+ | 0 1 2 3 | = Z | | | 4 5 6 7 | | +-----------+ | 8 9 |10 11 | 0 | = extract (Z, shape=(3,3), position=(3,3)) | | | | | 12 13 |14 15 | 0 | +---------------+ | | 0 0 0 | +-----------+ """ # assert(len(position) == len(Z.shape)) # if len(shape) < len(Z.shape): # shape = shape + Z.shape[len(Z.shape)-len(shape):] R = np.ones(shape, dtype=Z.dtype)*fill P = np.array(list(position)).astype(int) Rs = np.array(list(R.shape)).astype(int) Zs = np.array(list(Z.shape)).astype(int) R_start = np.zeros((len(shape),)).astype(int) R_stop = np.array(list(shape)).astype(int) Z_start = (P-Rs//2) Z_stop = (P+Rs//2) + Rs % 2 R_start = (R_start - np.minimum(Z_start,0)).tolist() Z_start = (np.maximum(Z_start,0)).tolist() R_stop = (R_stop - np.maximum(Z_stop-Zs,0)).tolist() Z_stop = (np.minimum(Z_stop,Zs)).tolist() r = [slice(start,stop) for start,stop in zip(R_start,R_stop)] z = [slice(start,stop) for start,stop in zip(Z_start,Z_stop)] R[r] = Z[z] return R _MATPLOTLIB = None plotLock = RLock() def getPyPlot(): '''for lazy loading matplotlib to decrease startup time''' global _MATPLOTLIB if _MATPLOTLIB is None: t = time.time() import matplotlib matplotlib.use('Agg') from matplotlib import pyplot import pylab from matplotlib import dates from matplotlib import font_manager from matplotlib import ticker _MATPLOTLIB = [pyplot, pylab, dates, font_manager, ticker] log.debug('loaded matplotlib in %.3f sec' % (time.time() - t)) return _MATPLOTLIB def cython_annotate(fname): ''' create cython annotation html (next to fname) ''' #~ f = path(sys.executable).parent.joinpath('Scripts/cython.py') f = path(sys.executable).parent.joinpath('Scripts/cython-script.py') try: subprocess.check_call([sys.executable, f, '-a', fname]) except subprocess.CalledProcessError: log.exception('Error during cython annotation') def daemonthread(target, name=None, autostart=True, args=(), kwargs=None): """Start a thread as daemon thread """ thread = Thread(target=target, name=name, args=args, kwargs=kwargs) thread.setDaemon(True) if autostart: thread.start() return thread MAX_RSSI = 140.0 RSSI_EQ = 90.0 def normalizeLogarithmic(x, max_rssi=MAX_RSSI, rssi_eq=RSSI_EQ): if int(rssi_eq) == 0: return x # we work with positive values rssi_eq = abs(rssi_eq) a = math.exp(math.log(max_rssi)/(100-rssi_eq) - math.log(rssi_eq)/(100-rssi_eq)) b = (rssi_eq * math.log(a) - math.log(rssi_eq)) / math.log(a) if -x > rssi_eq: return -(b + math.log(-x, a)) else: return x def normalizeDeviceAdaption(x, da_values, adaptMin=-150): da_values.insert(0, (adaptMin, -100 - adaptMin)) for left, right in zip(da_values, da_values[1:]): if left[0] <= x < right[0]: diff1 = right[0] - left[0] diff2 = right[0] + right[1] - (left[0] + left[1]) m = diff2 / float(diff1) return left[0] + left[1] + (x - left[0]) * m return x Location = namedtuple('Location', 'id x y z') def loadLocations(f): f = path(f) log.debug('loading locations from %s' % f.abspath()) lines = [l for l in f.lines() if l.strip() and not l.strip().startswith('#')] res = {} for l in lines: i, x, y, z = [float(e.strip()) for e in l.split()] i = int(i) res[i] = Location(i, x, y, z) return res def pos2rgb(i, max, offset=0.0, spread=1.0, saturation=1.0): rgb = list(colorsys.hsv_to_rgb(spread * i / float(max) + offset, saturation, 1.0)) for j in range(3): rgb[j] = int(rgb[j] * 255) return ("#%02x%02x%02x" % tuple(rgb)).upper()