import numpy as np cimport numpy as np cimport cython from cython.parallel import prange from libc.stdlib cimport malloc, free, calloc cdef import from "math.h": double log(double) nogil cimport openmp cdef import from "omp.h": void omp_set_num_threads(int num_threads) ctypedef struct omp_lock_t: pass import time, os cdef double INFINITY = np.inf cdef double NEG_INFINITY = -np.inf cdef class SortedXYZList: ''' skiplist like structure that ensures inserted elements are always sorted and that removes the smallest entries if capacity is exceeded http://igoro.com/archive/skip-lists-are-fascinating/ http://code.activestate.com/recipes/576930-efficient-running-median-using-an-indexable-skipli/ ''' cdef public: double minimum char isfilled cdef: int maxlevels, capacity, insertion_count int *nodelinks double *nodevalues int *xyz int fill def __init__(self, int capacity): self.fill = 2 # start at position 2 as the head and the tail are allready in lists self.maxlevels = int(1 + log(capacity) / log(2)) self.capacity = capacity + 2 self.clear(True) self.isfilled = 0 @cython.cdivision(True) cdef void insert(SortedXYZList self, double value, int x, int y, int z) nogil: cdef int level, maxlevels, curr, next, new, smallest cdef int offset_curr, offset_smallest, offset_xyz maxlevels = self.maxlevels if self.isfilled: # insert a new element new = self.fill self.fill += 1 if self.fill == self.capacity: smallest = self.nodelinks[0] # the head node points to the smallest element offset_smallest = smallest * maxlevels # update the minimum to the value of the smallest element self.minimum = self.nodevalues[self.nodelinks[offset_smallest]] # check if curent value is even smaller if value < self.minimum: self.minimum = value self.isfilled = 1 # update minimum #~ if value < self.minimum: #~ self.minimum = value else: if value <= self.minimum: # nothing to do - we will not insert a element # that is too small return # replace the smallest element smallest = self.nodelinks[0] # the head node points to the smallest element offset_smallest = smallest * maxlevels # update the minimum to the value of the second-smallest element self.minimum = self.nodevalues[self.nodelinks[offset_smallest]] if value < self.minimum: # the new incoming value is still smaller than the *currently* second smallest self.minimum = value # now update all next-links on head node to the outgoing links of the "node to be removed = the smallest" for level in range(maxlevels): next_on_level = self.nodelinks[offset_smallest + level] if next_on_level > 0: # will be 0 if the "historical" level_for_insert is reached self.nodelinks[level] = next_on_level # clear old entries! self.nodelinks[offset_smallest + level] = 0 else: break # the new node will get the position of the smallest node (in the array) new = smallest # With a probability of 1/2, make the node a part of the lowest-level list only. # With 1/4 probability, the node will be a part of the lowest two lists. # With 1/8 probability, the node will be a part of three lists. And so forth. cdef int level_for_insert, i #~ level_for_insert = min(self.maxlevels, 1 - int(log(random()) / log(2))) for i in range(self.maxlevels - 1, -1, -1): if self.insertion_count % (1 << i) == 0: level_for_insert = i + 1 break self.insertion_count += 1 curr = 0 # the head node self.nodevalues[new] = value # store xyz values offset_xyz = new*3 self.xyz[offset_xyz] = x self.xyz[offset_xyz + 1] = y self.xyz[offset_xyz + 2] = z for level in range(self.maxlevels - 1, -1, -1): offset_curr = curr * maxlevels + level next = self.nodelinks[offset_curr] while value > self.nodevalues[next]: curr = next offset_curr = curr * maxlevels + level next = self.nodelinks[offset_curr] if level < level_for_insert: self.nodelinks[new * maxlevels + level] = self.nodelinks[offset_curr] self.nodelinks[offset_curr] = new def getall(self): ''' debug function''' cdef int curr = 0, next, offset_curr = curr * self.maxlevels next = self.nodelinks[offset_curr] while self.nodevalues[next] < INFINITY: v = self.nodevalues[self.nodelinks[curr * self.maxlevels]] curr = next print v, curr, self.nodelinks[curr * self.maxlevels] next = self.nodelinks[curr * self.maxlevels] @cython.boundscheck(False) def xyzvAsArray(self): '''transform result to sorted np.array''' cdef np.ndarray[np.double_t, ndim=2] result = np.zeros((self.fill - 2, 4), dtype=np.double) cdef int i, offset cdef int curr = 0, next, offset_curr = curr * self.maxlevels next = self.nodelinks[offset_curr] i = 0 while self.nodevalues[next] < INFINITY: offset = self.nodelinks[curr * self.maxlevels] * 3 result[i, 0] = self.xyz[offset] result[i, 1] = self.xyz[offset+1] result[i, 2] = self.xyz[offset+2] result[i, 3] = self.nodevalues[self.nodelinks[curr * self.maxlevels]] curr = next next = self.nodelinks[curr * self.maxlevels] i += 1 return result cdef clear(SortedXYZList self, realloc=False): cdef int i if realloc: self.nodelinks = calloc(self.capacity * self.maxlevels, sizeof(int)) self.nodevalues = calloc(self.capacity, sizeof(double)) self.xyz = calloc(self.capacity * 3, sizeof(int)) else: # only set links to 0 for i in range(self.capacity * self.maxlevels): self.nodelinks[i] = 0 # start at position 2 as the head and the tail are allready in lists self.fill = 2 self.isfilled = 0 # initialize head node for i in range(self.maxlevels): self.nodelinks[0 + i] = 1 # initialize tail node self.nodevalues[1] = INFINITY self.insertion_count = 1 # used for generating probabilities # will be set automatically to valid values if initial loading happens self.minimum = NEG_INFINITY def __dealloc__(SortedXYZList self): free(self.nodelinks) free(self.nodevalues) def getData(n, load_stored, seed): t = time.time() if not load_stored or not os.path.exists('data_%s_%s.npz' % (n, seed)): print 'building data with SEED %s' % seed np.random.seed(seed) values = np.random.random_sample(size=n).astype(np.double) xyz = np.random.randint(256, size=n*3).astype(np.uint8).reshape((n, 3)) np.savez('data_%s_%s.npz' % (n, seed), values=values, xyz=xyz) print 'random + store to ./data_%s_%s.npz: %.3f sec' % (n, seed, time.time() - t) else: npzfile = np.load('data_%s_%s.npz' % (n, seed)) xyz = npzfile['xyz'] values = npzfile['values'] print 'load: %.3f sec' % (time.time() - t) return values, xyz @cython.boundscheck(False) def test(int size, int n, parallel=True, load_stored=False, seed=190180): cdef np.ndarray[np.double_t, ndim=1] values cdef np.ndarray[np.uint8_t, ndim=2] xyz cdef omp_lock_t mylock values, xyz = getData(n, load_stored, seed) cdef int i ll = SortedXYZList(size) t = time.time() if not parallel: for i in range(n): #~ print '%s: inserting %s' % (i, xx[i]) ll.insert(values[i], xyz[i, 0], xyz[i, 1], xyz[i, 2]) pass else: omp_set_num_threads(8) openmp.omp_init_lock(&mylock) # initialize for i in prange(n, nogil=True): if values[i] > ll.minimum: openmp.omp_set_lock(&mylock) # acquire ll.insert(values[i], xyz[i, 0], xyz[i, 1], xyz[i, 2]) openmp.omp_unset_lock(&mylock) # release openmp.omp_destroy_lock(&mylock) # deallocate the lock #~ ll.getall() print 'insert: %.3f secs, %.2fmio per sec' % (time.time() - t, n / 1000.0**2 / (time.time() - t + 0.000000000001)) #~ if seed == 190180 and size == 1e4 and n == 1e8: #~ # do not compare double values #~ res = ll.xyzvAsArray() #~ np.savetxt('result_190180_1e4_1e8.txt', res, fmt='%d %d %d %s') #~ stored_res = np.loadtxt('result_190180_1e4_1e8.txt')[:, :3] #~ assert np.all(np.equal(res[:, :3], stored_res)), 'skiplist result differs'