diff --git a/.gitignore b/.gitignore index 7b12180..5c1395e 100644 --- a/.gitignore +++ b/.gitignore @@ -54,7 +54,10 @@ docs/_build/ target/ # other +*.pyx.orig +cython_debug/ bquery/ctable_ext.c +bquery/ctable_ext.cpp bcolz bquery/benchmarks/vb_suite/benchmarks.db bquery/benchmarks/vb_suite/source \ No newline at end of file diff --git a/bquery/ctable.py b/bquery/ctable.py index 804edf5..0afa1f5 100644 --- a/bquery/ctable.py +++ b/bquery/ctable.py @@ -59,11 +59,8 @@ def cache_factor(self, col_list, refresh=False): col_factor_rootdir = col_rootdir + '.factor' col_values_rootdir = col_rootdir + '.values' - carray_factor = \ - bcolz.carray([], dtype='int64', expectedlen=self.size, - rootdir=col_factor_rootdir, mode='w') - _, values = \ - ctable_ext.factorize(self[col], labels=carray_factor) + carray_factor, values = \ + ctable_ext.factorize(self[col], rootdir=col_factor_rootdir) carray_factor.flush() carray_values = \ @@ -194,7 +191,6 @@ def factorize_groupby_cols(self, groupby_cols): return factor_list, values_list - @staticmethod def make_group_index(self, factor_list, values_list, groupby_cols, array_length, bool_arr): # create unique groups for groupby loop diff --git a/bquery/ctable_ext.pyx b/bquery/ctable_ext.pyx index 1e4becb..cb3cc04 100644 --- a/bquery/ctable_ext.pyx +++ b/bquery/ctable_ext.pyx @@ -1,15 +1,25 @@ +cimport cython +from cython.parallel import parallel, prange, threadid +from openmp cimport * + import numpy as np -import cython -from numpy cimport ndarray, dtype, npy_intp, npy_int32, npy_uint64, npy_int64, npy_float64 -from libc.stdlib cimport malloc -from libc.string cimport strcpy +from numpy cimport (ndarray, dtype, npy_intp, npy_uint8, npy_int32, npy_uint64, + npy_int64, npy_float64, uint64_t) + +from libc.stdint cimport uintptr_t +from libc.stdlib cimport malloc, calloc, free +from libc.string cimport strcpy, memcpy, memset +from libcpp.vector cimport vector + from khash cimport * -from bcolz.carray_ext cimport carray, chunk +import bcolz 
as bz +from bcolz.carray_ext cimport carray, chunks, chunk + +include "parallel_processing.pxi" # ---------------------------------------------------------------------------- # GLOBAL DEFINITIONS # ---------------------------------------------------------------------------- - SUM = 0 DEF _SUM = 0 @@ -24,101 +34,173 @@ DEF _COUNT_DISTINCT = 3 SORTED_COUNT_DISTINCT = 4 DEF _SORTED_COUNT_DISTINCT = 4 -# ---------------------------------------------------------------------------- -# Factorize Section + +# ---------------------------------------------------------------------------- +# FACTORIZE SECTION +# ---------------------------------------------------------------------------- @cython.wraparound(False) @cython.boundscheck(False) cdef void _factorize_str_helper(Py_ssize_t iter_range, Py_ssize_t allocation_size, - ndarray in_buffer, - ndarray[npy_uint64] out_buffer, + char * in_buffer_ptr, + uint64_t * out_buffer, kh_str_t *table, Py_ssize_t * count, - dict reverse, - ): + unsigned int thread_id, + omp_lock_t kh_locks[], + unsigned int num_threads, + ) nogil: cdef: Py_ssize_t i, idx - int ret + unsigned int j + khiter_t k + char * element char * insert - khiter_t k - ret = 0 + int ret = 0 + Py_ssize_t itemsize = allocation_size - 1 + + # allocate enough memory to hold the string element, add one for the + # null byte that marks the end of the string. + # TODO: understand why zero-filling is necessary. 
Without zero-filling + # the buffer, duplicate keys occur in the reverse dict + element = calloc(allocation_size, sizeof(char)) + + # lock ensures that table is in consistend state during access + omp_set_lock(&kh_locks[thread_id]) + for i in xrange(iter_range): + # appends null character to element string + memcpy(element, in_buffer_ptr, itemsize) + in_buffer_ptr += itemsize - for i in range(iter_range): - # TODO: Consider indexing directly into the array for efficiency - element = in_buffer[i] k = kh_get_str(table, element) if k != table.n_buckets: idx = table.vals[k] else: - # allocate enough memory to hold the string, add one for the - # null byte that marks the end of the string. + omp_unset_lock(&kh_locks[thread_id]) insert = malloc(allocation_size) # TODO: is strcpy really the best way to copy a string? strcpy(insert, element) - k = kh_put_str(table, insert, &ret) - table.vals[k] = idx = count[0] - reverse[count[0]] = element - count[0] += 1 + + # acquire locks for all threads to avoid inconsistent state + for j in xrange(num_threads): + omp_set_lock(&kh_locks[j]) + # check whether another thread has already added the entry by now + k = kh_get_str(table, element) + if k != table.n_buckets: + idx = table.vals[k] + # if not, add element + else: + k = kh_put_str(table, insert, &ret) + table.vals[k] = idx = count[0] + count[0] += 1 + # release all locks + for j in xrange(num_threads): + omp_unset_lock(&kh_locks[j]) + # acquire our own lock again, to indicate we are reading the table + omp_set_lock(&kh_locks[thread_id]) out_buffer[i] = idx + omp_unset_lock(&kh_locks[thread_id]) + free(element) + @cython.wraparound(False) @cython.boundscheck(False) -def factorize_str(carray carray_, carray labels=None): +def factorize_str(carray carray_, unsigned int num_threads_requested = 0, **kwargs): cdef: - chunk chunk_ - Py_ssize_t n, i, count, chunklen, leftover_elements - dict reverse + Py_ssize_t i, blocklen, nblocks + khint_t j + carray labels ndarray in_buffer 
ndarray[npy_uint64] out_buffer + unsigned int thread_id + par_info_t par_info + kh_str_t *table + char * out_buffer_org_ptr + char * in_buffer_ptr + uint64_t * out_buffer_ptr - count = 0 - ret = 0 - reverse = {} + Py_ssize_t count = 0 + dict reverse = {} - n = len(carray_) - chunklen = carray_.chunklen - if labels is None: - labels = carray([], dtype='int64', expectedlen=n) - # in-buffer isn't typed, because cython doesn't support string arrays (?) - out_buffer = np.empty(chunklen, dtype='uint64') - in_buffer = np.empty(chunklen, dtype=carray_.dtype) - table = kh_init_str() + bint first_thread = True + Py_ssize_t element_allocation_size = carray_.dtype.itemsize + 1 + Py_ssize_t carray_chunklen = carray_.chunklen - for i in range(carray_.nchunks): - chunk_ = carray_.chunks[i] - # decompress into in_buffer - chunk_._getitem(0, chunklen, in_buffer.data) - _factorize_str_helper(chunklen, - carray_.dtype.itemsize + 1, - in_buffer, - out_buffer, - table, - &count, - reverse, - ) - # compress out_buffer into labels - labels.append(out_buffer.astype(np.int64)) - - leftover_elements = cython.cdiv(carray_.leftover, carray_.atomsize) - if leftover_elements > 0: - _factorize_str_helper(leftover_elements, - carray_.dtype.itemsize + 1, - carray_.leftover_array, - out_buffer, - table, - &count, - reverse, - ) - - # compress out_buffer into labels - labels.append(out_buffer[:leftover_elements].astype(np.int64)) + labels = create_labels_carray(carray_, **kwargs) - kh_destroy_str(table) + out_buffer = np.empty(labels.chunklen, dtype='uint64') + out_buffer_org_ptr = out_buffer.data + table = kh_init_str() + + block_iterator = bz.iterblocks(carray_, blen=labels.chunklen) + nblocks = (len(carray_)/labels.chunklen+0.5)+1 + + with nogil, parallel(num_threads=num_threads_requested): + # Initialise some parallel processing stuff + omp_start_critical() + if first_thread == True: + (&first_thread)[0] = False + with gil: + par_initialise(&par_info, carray_) + omp_end_critical() + + # allocate 
thread-local in- and out-buffers + with gil: + in_buffer_ptr = par_allocate_buffer(labels.chunklen, carray_.dtype.itemsize) + out_buffer_ptr = par_allocate_buffer(labels.chunklen, labels.dtype.itemsize) + + # factorise the chunks in parallel + for i in prange(0, nblocks, schedule='dynamic', chunksize=1): + thread_id = threadid() + + omp_set_lock(&par_info.chunk_lock) + with gil: + in_buffer = np.ascontiguousarray(block_iterator.next()) + blocklen = len(in_buffer) + memcpy(in_buffer_ptr, in_buffer.data, in_buffer.nbytes) + omp_unset_lock(&par_info.chunk_lock) + + _factorize_str_helper(blocklen, + element_allocation_size, + in_buffer_ptr, + out_buffer_ptr, + table, + &count, + thread_id, + par_info.kh_locks, + par_info.num_threads, + ) + + # compress out_buffer into labels + omp_set_lock(&par_info.out_buffer_lock) + with gil: + out_buffer.data = out_buffer_ptr + par_save_block(i, labels, out_buffer, blocklen, nblocks) + omp_unset_lock(&par_info.out_buffer_lock) + + # Clean-up thread local variables + free(in_buffer_ptr) + free(out_buffer_ptr) + + # Clean-up some parallel processing stuff + par_terminate(par_info) + + # restore out_buffer data pointer to allow python to free the object's data + out_buffer.data = out_buffer_org_ptr + + # construct python dict from vectors and free element memory + for j in range(table.n_buckets): + if not kh_exist_str(table, j): + continue + reverse[table.vals[j]] = table.keys[j] + free(table.keys[j]) + kh_destroy_str(table) + return labels, reverse @cython.wraparound(False) @@ -140,6 +222,7 @@ cdef void _factorize_int64_helper(Py_ssize_t iter_range, ret = 0 for i in range(iter_range): + # TODO: Consider indexing directly into the array for efficiency element = in_buffer[i] k = kh_get_int64(table, element) if k != table.n_buckets: @@ -155,8 +238,7 @@ cdef void _factorize_int64_helper(Py_ssize_t iter_range, @cython.boundscheck(False) def factorize_int64(carray carray_, carray labels=None): cdef: - chunk chunk_ - Py_ssize_t n, i, count, 
chunklen, leftover_elements + Py_ssize_t len_carray, count, chunklen, len_in_buffer dict reverse ndarray[npy_int64] in_buffer ndarray[npy_uint64] out_buffer @@ -166,19 +248,17 @@ def factorize_int64(carray carray_, carray labels=None): ret = 0 reverse = {} - n = len(carray_) + len_carray = len(carray_) chunklen = carray_.chunklen if labels is None: - labels = carray([], dtype='int64', expectedlen=n) + labels = carray([], dtype='int64', expectedlen=len_carray) + # in-buffer isn't typed, because cython doesn't support string arrays (?) out_buffer = np.empty(chunklen, dtype='uint64') - in_buffer = np.empty(chunklen, dtype='int64') table = kh_init_int64() - for i in range(carray_.nchunks): - chunk_ = carray_.chunks[i] - # decompress into in_buffer - chunk_._getitem(0, chunklen, in_buffer.data) - _factorize_int64_helper(chunklen, + for in_buffer in bz.iterblocks(carray_): + len_in_buffer = len(in_buffer) + _factorize_int64_helper(len_in_buffer, carray_.dtype.itemsize + 1, in_buffer, out_buffer, @@ -187,21 +267,7 @@ def factorize_int64(carray carray_, carray labels=None): reverse, ) # compress out_buffer into labels - labels.append(out_buffer.astype(np.int64)) - - leftover_elements = cython.cdiv(carray_.leftover, carray_.atomsize) - if leftover_elements > 0: - _factorize_int64_helper(leftover_elements, - carray_.dtype.itemsize + 1, - carray_.leftover_array, - out_buffer, - table, - &count, - reverse, - ) - - # compress out_buffer into labels - labels.append(out_buffer[:leftover_elements].astype(np.int64)) + labels.append(out_buffer[:len_in_buffer].astype(np.int64)) kh_destroy_int64(table) @@ -226,6 +292,7 @@ cdef void _factorize_int32_helper(Py_ssize_t iter_range, ret = 0 for i in range(iter_range): + # TODO: Consider indexing directly into the array for efficiency element = in_buffer[i] k = kh_get_int32(table, element) if k != table.n_buckets: @@ -241,8 +308,7 @@ cdef void _factorize_int32_helper(Py_ssize_t iter_range, @cython.boundscheck(False) def 
factorize_int32(carray carray_, carray labels=None): cdef: - chunk chunk_ - Py_ssize_t n, i, count, chunklen, leftover_elements + Py_ssize_t len_carray, count, chunklen, len_in_buffer dict reverse ndarray[npy_int32] in_buffer ndarray[npy_uint64] out_buffer @@ -252,20 +318,17 @@ def factorize_int32(carray carray_, carray labels=None): ret = 0 reverse = {} - n = len(carray_) + len_carray = len(carray_) chunklen = carray_.chunklen if labels is None: - labels = carray([], dtype='int64', expectedlen=n) + labels = carray([], dtype='int64', expectedlen=len_carray) # in-buffer isn't typed, because cython doesn't support string arrays (?) out_buffer = np.empty(chunklen, dtype='uint64') - in_buffer = np.empty(chunklen, dtype='int32') table = kh_init_int32() - for i in range(carray_.nchunks): - chunk_ = carray_.chunks[i] - # decompress into in_buffer - chunk_._getitem(0, chunklen, in_buffer.data) - _factorize_int32_helper(chunklen, + for in_buffer in bz.iterblocks(carray_): + len_in_buffer = len(in_buffer) + _factorize_int32_helper(len_in_buffer, carray_.dtype.itemsize + 1, in_buffer, out_buffer, @@ -274,21 +337,7 @@ def factorize_int32(carray carray_, carray labels=None): reverse, ) # compress out_buffer into labels - labels.append(out_buffer.astype(np.int64)) - - leftover_elements = cython.cdiv(carray_.leftover, carray_.atomsize) - if leftover_elements > 0: - _factorize_int32_helper(leftover_elements, - carray_.dtype.itemsize + 1, - carray_.leftover_array, - out_buffer, - table, - &count, - reverse, - ) - - # compress out_buffer into labels - labels.append(out_buffer[:leftover_elements].astype(np.int64)) + labels.append(out_buffer[:len_in_buffer].astype(np.int64)) kh_destroy_int32(table) @@ -329,8 +378,7 @@ cdef void _factorize_float64_helper(Py_ssize_t iter_range, @cython.boundscheck(False) def factorize_float64(carray carray_, carray labels=None): cdef: - chunk chunk_ - Py_ssize_t n, i, count, chunklen, leftover_elements + Py_ssize_t len_carray, count, chunklen, 
len_in_buffer dict reverse ndarray[npy_float64] in_buffer ndarray[npy_uint64] out_buffer @@ -340,20 +388,17 @@ def factorize_float64(carray carray_, carray labels=None): ret = 0 reverse = {} - n = len(carray_) + len_carray = len(carray_) chunklen = carray_.chunklen if labels is None: - labels = carray([], dtype='int64', expectedlen=n) + labels = carray([], dtype='int64', expectedlen=len_carray) # in-buffer isn't typed, because cython doesn't support string arrays (?) out_buffer = np.empty(chunklen, dtype='uint64') - in_buffer = np.empty(chunklen, dtype='float64') table = kh_init_float64() - for i in range(carray_.nchunks): - chunk_ = carray_.chunks[i] - # decompress into in_buffer - chunk_._getitem(0, chunklen, in_buffer.data) - _factorize_float64_helper(chunklen, + for in_buffer in bz.iterblocks(carray_): + len_in_buffer = len(in_buffer) + _factorize_float64_helper(len_in_buffer, carray_.dtype.itemsize + 1, in_buffer, out_buffer, @@ -362,36 +407,22 @@ def factorize_float64(carray carray_, carray labels=None): reverse, ) # compress out_buffer into labels - labels.append(out_buffer.astype(np.int64)) - - leftover_elements = cython.cdiv(carray_.leftover, carray_.atomsize) - if leftover_elements > 0: - _factorize_float64_helper(leftover_elements, - carray_.dtype.itemsize + 1, - carray_.leftover_array, - out_buffer, - table, - &count, - reverse, - ) - - # compress out_buffer into labels - labels.append(out_buffer[:leftover_elements].astype(np.int64)) + labels.append(out_buffer[:len_in_buffer].astype(np.int64)) kh_destroy_float64(table) return labels, reverse -cpdef factorize(carray carray_, carray labels=None): +def factorize(carray carray_, **kwargs): if carray_.dtype == 'int32': - labels, reverse = factorize_int32(carray_, labels=labels) + labels, reverse = factorize_int32(carray_, **kwargs) elif carray_.dtype == 'int64': - labels, reverse = factorize_int64(carray_, labels=labels) + labels, reverse = factorize_int64(carray_, **kwargs) elif carray_.dtype == 'float64': 
- labels, reverse = factorize_float64(carray_, labels=labels) + labels, reverse = factorize_float64(carray_, **kwargs) else: #TODO: check that the input is a string_ dtype type - labels, reverse = factorize_str(carray_, labels=labels) + labels, reverse = factorize_str(carray_, **kwargs) return labels, reverse # --------------------------------------------------------------------------- @@ -400,32 +431,20 @@ cpdef factorize(carray carray_, carray labels=None): @cython.boundscheck(False) cpdef translate_int64(carray input_, carray output_, dict lookup, npy_int64 default=-1): cdef: - chunk chunk_ - Py_ssize_t i, chunklen, leftover_elements + Py_ssize_t chunklen, leftover_elements, len_in_buffer ndarray[npy_int64] in_buffer ndarray[npy_int64] out_buffer chunklen = input_.chunklen out_buffer = np.empty(chunklen, dtype='int64') - in_buffer = np.empty(chunklen, dtype='int64') - for i in range(input_.nchunks): - chunk_ = input_.chunks[i] - # decompress into in_buffer - chunk_._getitem(0, chunklen, in_buffer.data) - for i in range(chunklen): + for in_buffer in bz.iterblocks(input_): + len_in_buffer = len(in_buffer) + for i in range(len_in_buffer): element = in_buffer[i] out_buffer[i] = lookup.get(element, default) # compress out_buffer into labels - output_.append(out_buffer.astype(np.int64)) - - leftover_elements = cython.cdiv(input_.leftover, input_.atomsize) - if leftover_elements > 0: - in_buffer = input_.leftover_array - for i in range(leftover_elements): - element = in_buffer[i] - out_buffer[i] = lookup.get(element, default) - output_.append(out_buffer[:leftover_elements].astype(np.int64)) + output_.append(out_buffer[:len_in_buffer].astype(np.int64)) # --------------------------------------------------------------------------- # Aggregation Section (old) @@ -458,7 +477,7 @@ def agg_sum(iter_): @cython.wraparound(False) def groupsort_indexer(carray index, Py_ssize_t ngroups): cdef: - Py_ssize_t i, label, n + Py_ssize_t i, label, n, len_in_buffer ndarray[int64_t] 
counts, where, np_result # -- carray c_result @@ -475,22 +494,10 @@ def groupsort_indexer(carray index, Py_ssize_t ngroups): counts = np.zeros(ngroups + 1, dtype=np.int64) n = len(index) - for index_chunk_nr in range(index.nchunks): - # fill input buffer - input_chunk = index.chunks[index_chunk_nr] - input_chunk._getitem(0, index_chunk_len, in_buffer.data) - + for in_buffer in bz.iterblocks(index): + len_in_buffer = len(in_buffer) # loop through rows - for i in range(index_chunk_len): - counts[index[i] + 1] += 1 - - leftover_elements = cython.cdiv(index.leftover, index.atomsize) - if leftover_elements > 0: - # fill input buffer - in_buffer = index.leftover_array - - # loop through rows - for i in range(leftover_elements): + for i in range(len_in_buffer): counts[index[i] + 1] += 1 # mark the start of each contiguous group of like-indexed data @@ -589,12 +596,11 @@ cdef count_unique_int32(ndarray[int32_t] values): @cython.wraparound(False) @cython.boundscheck(False) cdef sum_float64(carray ca_input, carray ca_factor, - Py_ssize_t nr_groups, Py_ssize_t skip_key, agg_method=_SUM): + Py_ssize_t nr_groups, Py_ssize_t skip_key, agg_method=_SUM): cdef: - chunk input_chunk, factor_chunk - Py_ssize_t input_chunk_nr, input_chunk_len - Py_ssize_t factor_chunk_nr, factor_chunk_len, factor_chunk_row - Py_ssize_t current_index, i, j, end_counts, start_counts, factor_total_chunks, leftover_elements + Py_ssize_t in_buffer_len, factor_buffer_len + Py_ssize_t factor_chunk_nr, factor_chunk_row + Py_ssize_t current_index, i, j, end_counts, start_counts ndarray[npy_float64] in_buffer ndarray[npy_int64] factor_buffer @@ -605,10 +611,10 @@ cdef sum_float64(carray ca_input, carray ca_factor, bint count_distinct_started = 0 carray num_uniques - count = 0 ret = 0 reverse = {} + iter_ca_factor = bz.iterblocks(ca_factor) if agg_method == _COUNT_DISTINCT: num_uniques = carray([], dtype='int64') @@ -625,36 +631,22 @@ cdef sum_float64(carray ca_input, carray ca_factor, return num_uniques - 
input_chunk_len = ca_input.chunklen - in_buffer = np.empty(input_chunk_len, dtype='float64') - factor_chunk_len = ca_factor.chunklen - factor_total_chunks = ca_factor.nchunks factor_chunk_nr = 0 - factor_buffer = np.empty(factor_chunk_len, dtype='int64') - if factor_total_chunks > 0: - factor_chunk = ca_factor.chunks[factor_chunk_nr] - factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data) - else: - factor_buffer = ca_factor.leftover_array + factor_buffer = iter_ca_factor.next() + factor_buffer_len = len(factor_buffer) factor_chunk_row = 0 out_buffer = np.zeros(nr_groups, dtype='float64') - for input_chunk_nr in range(ca_input.nchunks): - # fill input buffer - input_chunk = ca_input.chunks[input_chunk_nr] - input_chunk._getitem(0, input_chunk_len, in_buffer.data) + for in_buffer in bz.iterblocks(ca_input): + in_buffer_len = len(in_buffer) # loop through rows - for i in range(input_chunk_len): + for i in range(in_buffer_len): # go to next factor buffer if necessary - if factor_chunk_row == factor_chunk_len: + if factor_chunk_row == factor_buffer_len: factor_chunk_nr += 1 - if factor_chunk_nr < factor_total_chunks: - factor_chunk = ca_factor.chunks[factor_chunk_nr] - factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data) - else: - factor_buffer = ca_factor.leftover_array + factor_buffer = iter_ca_factor.next() factor_chunk_row = 0 # retrieve index @@ -668,53 +660,7 @@ cdef sum_float64(carray ca_input, carray ca_factor, elif agg_method == _COUNT: out_buffer[current_index] += 1 elif agg_method == _COUNT_NA: - v = in_buffer[i] - if v == v: # skip NA values - out_buffer[current_index] += 1 - elif agg_method == _SORTED_COUNT_DISTINCT: - v = in_buffer[i] - if not count_distinct_started: - count_distinct_started = 1 - last_values = np.zeros(nr_groups, dtype='float64') - last_values[0] = v - out_buffer[0] = 1 - else: - if v != last_values[current_index]: - out_buffer[current_index] += 1 - - last_values[current_index] = v - else: - raise 
NotImplementedError('sumtype not supported') - - leftover_elements = cython.cdiv(ca_input.leftover, ca_input.atomsize) - if leftover_elements > 0: - # fill input buffer - in_buffer = ca_input.leftover_array - - # loop through rows - for i in range(leftover_elements): - - # go to next factor buffer if necessary - if factor_chunk_row == factor_chunk_len: - factor_chunk_nr += 1 - if factor_chunk_nr < factor_total_chunks: - factor_chunk = ca_factor.chunks[factor_chunk_nr] - factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data) - else: - factor_buffer = ca_factor.leftover_array - factor_chunk_row = 0 - - # retrieve index - current_index = factor_buffer[factor_chunk_row] - factor_chunk_row += 1 - # update value if it's not an invalid index - if current_index != skip_key: - if agg_method == _SUM: - out_buffer[current_index] += in_buffer[i] - elif agg_method == _COUNT: - out_buffer[current_index] += 1 - elif agg_method == _COUNT_NA: v = in_buffer[i] if v == v: # skip NA values out_buffer[current_index] += 1 @@ -744,10 +690,9 @@ cdef sum_float64(carray ca_input, carray ca_factor, cdef sum_int32(carray ca_input, carray ca_factor, Py_ssize_t nr_groups, Py_ssize_t skip_key, agg_method=_SUM): cdef: - chunk input_chunk, factor_chunk - Py_ssize_t input_chunk_nr, input_chunk_len - Py_ssize_t factor_chunk_nr, factor_chunk_len, factor_chunk_row - Py_ssize_t current_index, i, j, end_counts, start_counts, factor_total_chunks, leftover_elements + Py_ssize_t in_buffer_len, factor_buffer_len + Py_ssize_t factor_chunk_nr, factor_chunk_row + Py_ssize_t current_index, i, j, end_counts, start_counts ndarray[npy_int32] in_buffer ndarray[npy_int64] factor_buffer @@ -761,6 +706,7 @@ cdef sum_int32(carray ca_input, carray ca_factor, count = 0 ret = 0 reverse = {} + iter_ca_factor = bz.iterblocks(ca_factor) if agg_method == _COUNT_DISTINCT: num_uniques = carray([], dtype='int64') @@ -777,36 +723,22 @@ cdef sum_int32(carray ca_input, carray ca_factor, return num_uniques - input_chunk_len 
= ca_input.chunklen - in_buffer = np.empty(input_chunk_len, dtype='int32') - factor_chunk_len = ca_factor.chunklen - factor_total_chunks = ca_factor.nchunks factor_chunk_nr = 0 - factor_buffer = np.empty(factor_chunk_len, dtype='int64') - if factor_total_chunks > 0: - factor_chunk = ca_factor.chunks[factor_chunk_nr] - factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data) - else: - factor_buffer = ca_factor.leftover_array + factor_buffer = iter_ca_factor.next() + factor_buffer_len = len(factor_buffer) factor_chunk_row = 0 out_buffer = np.zeros(nr_groups, dtype='int32') - for input_chunk_nr in range(ca_input.nchunks): - # fill input buffer - input_chunk = ca_input.chunks[input_chunk_nr] - input_chunk._getitem(0, input_chunk_len, in_buffer.data) + for in_buffer in bz.iterblocks(ca_input): + in_buffer_len = len(in_buffer) # loop through rows - for i in range(input_chunk_len): + for i in range(in_buffer_len): # go to next factor buffer if necessary - if factor_chunk_row == factor_chunk_len: + if factor_chunk_row == factor_buffer_len: factor_chunk_nr += 1 - if factor_chunk_nr < factor_total_chunks: - factor_chunk = ca_factor.chunks[factor_chunk_nr] - factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data) - else: - factor_buffer = ca_factor.leftover_array + factor_buffer = iter_ca_factor.next() factor_chunk_row = 0 # retrieve index @@ -820,52 +752,7 @@ cdef sum_int32(carray ca_input, carray ca_factor, elif agg_method == _COUNT: out_buffer[current_index] += 1 elif agg_method == _COUNT_NA: - # TODO: Warning: int does not support NA values, is this what we need? 
- out_buffer[current_index] += 1 - elif agg_method == _SORTED_COUNT_DISTINCT: - v = in_buffer[i] - if not count_distinct_started: - count_distinct_started = 1 - last_values = np.zeros(nr_groups, dtype='int32') - last_values[0] = v - out_buffer[0] = 1 - else: - if v != last_values[current_index]: - out_buffer[current_index] += 1 - - last_values[current_index] = v - else: - raise NotImplementedError('sumtype not supported') - - leftover_elements = cython.cdiv(ca_input.leftover, ca_input.atomsize) - if leftover_elements > 0: - # fill input buffer - in_buffer = ca_input.leftover_array - - # loop through rows - for i in range(leftover_elements): - # go to next factor buffer if necessary - if factor_chunk_row == factor_chunk_len: - factor_chunk_nr += 1 - if factor_chunk_nr < factor_total_chunks: - factor_chunk = ca_factor.chunks[factor_chunk_nr] - factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data) - else: - factor_buffer = ca_factor.leftover_array - factor_chunk_row = 0 - - # retrieve index - current_index = factor_buffer[factor_chunk_row] - factor_chunk_row += 1 - - # update value if it's not an invalid index - if current_index != skip_key: - if agg_method == _SUM: - out_buffer[current_index] += in_buffer[i] - elif agg_method == _COUNT: - out_buffer[current_index] += 1 - elif agg_method == _COUNT_NA: # TODO: Warning: int does not support NA values, is this what we need? 
out_buffer[current_index] += 1 elif agg_method == _SORTED_COUNT_DISTINCT: @@ -894,10 +781,9 @@ cdef sum_int32(carray ca_input, carray ca_factor, cdef sum_int64(carray ca_input, carray ca_factor, Py_ssize_t nr_groups, Py_ssize_t skip_key, agg_method=_SUM): cdef: - chunk input_chunk, factor_chunk - Py_ssize_t input_chunk_nr, input_chunk_len - Py_ssize_t factor_chunk_nr, factor_chunk_len, factor_chunk_row - Py_ssize_t current_index, i, j, end_counts, start_counts, factor_total_chunks, leftover_elements + Py_ssize_t in_buffer_len, factor_buffer_len + Py_ssize_t factor_chunk_nr, factor_chunk_row + Py_ssize_t current_index, i, j, end_counts, start_counts ndarray[npy_int64] in_buffer ndarray[npy_int64] factor_buffer @@ -911,6 +797,7 @@ cdef sum_int64(carray ca_input, carray ca_factor, count = 0 ret = 0 reverse = {} + iter_ca_factor = bz.iterblocks(ca_factor) if agg_method == _COUNT_DISTINCT: num_uniques = carray([], dtype='int64') @@ -927,36 +814,22 @@ cdef sum_int64(carray ca_input, carray ca_factor, return num_uniques - input_chunk_len = ca_input.chunklen - in_buffer = np.empty(input_chunk_len, dtype='int64') - factor_chunk_len = ca_factor.chunklen - factor_total_chunks = ca_factor.nchunks factor_chunk_nr = 0 - factor_buffer = np.empty(factor_chunk_len, dtype='int64') - if factor_total_chunks > 0: - factor_chunk = ca_factor.chunks[factor_chunk_nr] - factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data) - else: - factor_buffer = ca_factor.leftover_array + factor_buffer = iter_ca_factor.next() + factor_buffer_len = len(factor_buffer) factor_chunk_row = 0 out_buffer = np.zeros(nr_groups, dtype='int64') - for input_chunk_nr in range(ca_input.nchunks): - # fill input buffer - input_chunk = ca_input.chunks[input_chunk_nr] - input_chunk._getitem(0, input_chunk_len, in_buffer.data) + for in_buffer in bz.iterblocks(ca_input): + in_buffer_len = len(in_buffer) # loop through rows - for i in range(input_chunk_len): + for i in range(in_buffer_len): # go to next factor 
buffer if necessary - if factor_chunk_row == factor_chunk_len: + if factor_chunk_row == factor_buffer_len: factor_chunk_nr += 1 - if factor_chunk_nr < factor_total_chunks: - factor_chunk = ca_factor.chunks[factor_chunk_nr] - factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data) - else: - factor_buffer = ca_factor.leftover_array + factor_buffer = iter_ca_factor.next() factor_chunk_row = 0 # retrieve index @@ -970,52 +843,7 @@ cdef sum_int64(carray ca_input, carray ca_factor, elif agg_method == _COUNT: out_buffer[current_index] += 1 elif agg_method == _COUNT_NA: - # TODO: Warning: int does not support NA values, is this what we need? - out_buffer[current_index] += 1 - elif agg_method == _SORTED_COUNT_DISTINCT: - v = in_buffer[i] - if not count_distinct_started: - count_distinct_started = 1 - last_values = np.zeros(nr_groups, dtype='int64') - last_values[0] = v - out_buffer[0] = 1 - else: - if v != last_values[current_index]: - out_buffer[current_index] += 1 - last_values[current_index] = v - else: - raise NotImplementedError('sumtype not supported') - - leftover_elements = cython.cdiv(ca_input.leftover, ca_input.atomsize) - if leftover_elements > 0: - # fill input buffer - in_buffer = ca_input.leftover_array - - # loop through rows - for i in range(leftover_elements): - - # go to next factor buffer if necessary - if factor_chunk_row == factor_chunk_len: - factor_chunk_nr += 1 - if factor_chunk_nr < factor_total_chunks: - factor_chunk = ca_factor.chunks[factor_chunk_nr] - factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data) - else: - factor_buffer = ca_factor.leftover_array - factor_chunk_row = 0 - - # retrieve index - current_index = factor_buffer[factor_chunk_row] - factor_chunk_row += 1 - - # update value if it's not an invalid index - if current_index != skip_key: - if agg_method == _SUM: - out_buffer[current_index] += in_buffer[i] - elif agg_method == _COUNT: - out_buffer[current_index] += 1 - elif agg_method == _COUNT_NA: # TODO: Warning: int 
does not support NA values, is this what we need? out_buffer[current_index] += 1 elif agg_method == _SORTED_COUNT_DISTINCT: @@ -1043,10 +871,9 @@ cdef sum_int64(carray ca_input, carray ca_factor, @cython.boundscheck(False) cdef groupby_value(carray ca_input, carray ca_factor, Py_ssize_t nr_groups, Py_ssize_t skip_key): cdef: - chunk input_chunk, factor_chunk - Py_ssize_t input_chunk_nr, input_chunk_len - Py_ssize_t factor_chunk_nr, factor_chunk_len, factor_chunk_row - Py_ssize_t current_index, i, factor_total_chunks, leftover_elements + Py_ssize_t in_buffer_len, factor_buffer_len + Py_ssize_t factor_chunk_nr, factor_chunk_row + Py_ssize_t current_index, i, factor_total_chunks ndarray in_buffer ndarray[npy_int64] factor_buffer @@ -1055,61 +882,25 @@ cdef groupby_value(carray ca_input, carray ca_factor, Py_ssize_t nr_groups, Py_s count = 0 ret = 0 reverse = {} + iter_ca_factor = bz.iterblocks(ca_factor) + - input_chunk_len = ca_input.chunklen - in_buffer = np.empty(input_chunk_len, dtype=ca_input.dtype) - factor_chunk_len = ca_factor.chunklen factor_total_chunks = ca_factor.nchunks factor_chunk_nr = 0 - factor_buffer = np.empty(factor_chunk_len, dtype='int64') - if factor_total_chunks > 0: - factor_chunk = ca_factor.chunks[factor_chunk_nr] - factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data) - else: - factor_buffer = ca_factor.leftover_array + factor_buffer = iter_ca_factor.next() + factor_buffer_len = len(factor_buffer) factor_chunk_row = 0 out_buffer = np.zeros(nr_groups, dtype=ca_input.dtype) - for input_chunk_nr in range(ca_input.nchunks): - - # fill input buffer - input_chunk = ca_input.chunks[input_chunk_nr] - input_chunk._getitem(0, input_chunk_len, in_buffer.data) - - for i in range(input_chunk_len): - - # go to next factor buffer if necessary - if factor_chunk_row == factor_chunk_len: - factor_chunk_nr += 1 - if factor_chunk_nr < factor_total_chunks: - factor_chunk = ca_factor.chunks[factor_chunk_nr] - factor_chunk._getitem(0, factor_chunk_len, 
factor_buffer.data) - else: - factor_buffer = ca_factor.leftover_array - factor_chunk_row = 0 - - # retrieve index - current_index = factor_buffer[factor_chunk_row] - factor_chunk_row += 1 - - # update value if it's not an invalid index - if current_index != skip_key: - out_buffer[current_index] = in_buffer[i] - - leftover_elements = cython.cdiv(ca_input.leftover, ca_input.atomsize) - if leftover_elements > 0: - in_buffer = ca_input.leftover_array + for in_buffer in bz.iterblocks(ca_input): + in_buffer_len = len(in_buffer) - for i in range(leftover_elements): + for i in range(in_buffer_len): # go to next factor buffer if necessary - if factor_chunk_row == factor_chunk_len: + if factor_chunk_row == factor_buffer_len: factor_chunk_nr += 1 - if factor_chunk_nr < factor_total_chunks: - factor_chunk = ca_factor.chunks[factor_chunk_nr] - factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data) - else: - factor_buffer = ca_factor.leftover_array + factor_buffer = iter_ca_factor.next() factor_chunk_row = 0 # retrieve index diff --git a/bquery/khash.h b/bquery/khash.h index c0421c4..4350ff0 100644 --- a/bquery/khash.h +++ b/bquery/khash.h @@ -33,6 +33,7 @@ int main() { khiter_t k; khash_t(32) *h = kh_init(32); k = kh_put(32, h, 5, &ret); + if (!ret) kh_del(32, h, k); kh_value(h, k) = 10; k = kh_get(32, h, 10); is_missing = (k == kh_end(h)); @@ -46,23 +47,6 @@ int main() { */ /* - 2013-05-02 (0.2.8): - - * Use quadratic probing. When the capacity is power of 2, stepping function - i*(i+1)/2 guarantees to traverse each bucket. It is better than double - hashing on cache performance and is more robust than linear probing. - - In theory, double hashing should be more robust than quadratic probing. - However, my implementation is probably not for large hash tables, because - the second hash function is closely tied to the first hash function, - which reduce the effectiveness of double hashing. 
- - Reference: http://research.cs.vt.edu/AVresearch/hashing/quadratic.php - - 2011-12-29 (0.2.7): - - * Minor code clean up; no actual effect. - 2011-09-16 (0.2.6): * The capacity is a power of 2. This seems to dramatically improve the @@ -116,7 +100,6 @@ int main() { #ifndef __AC_KHASH_H #define __AC_KHASH_H -#endif /*! @header @@ -124,13 +107,12 @@ int main() { Generic hash table library. */ -#define AC_VERSION_KHASH_H "0.2.8" +#define AC_VERSION_KHASH_H "0.2.6" #include #include #include -/* compiler specific configuration */ #if UINT_MAX == 0xffffffffu typedef unsigned int khint32_t; @@ -146,13 +128,20 @@ typedef unsigned long long khuint64_t; typedef signed long long khint64_t; #endif +typedef double khfloat64_t; -#ifdef _MSC_VER -#define kh_inline __inline -#else -#define kh_inline inline +#ifndef PANDAS_INLINE + #if defined(__GNUC__) + #define PANDAS_INLINE __inline__ + #elif defined(_MSC_VER) + #define PANDAS_INLINE __inline + #elif defined (__STDC_VERSION__) && __STDC_VERSION__ >= 199901L + #define PANDAS_INLINE inline + #else + #define PANDAS_INLINE + #endif +#endif -typedef double khfloat64_t; typedef khint32_t khint_t; typedef khint_t khiter_t; @@ -165,54 +154,51 @@ typedef khint_t khiter_t; #define __ac_set_isboth_false(flag, i) __ac_set_isempty_false(flag, i) #define __ac_set_isdel_true(flag, i) (0) +#ifdef KHASH_LINEAR +#define __ac_inc(k, m) 1 +#else +#define __ac_inc(k, m) (((k)>>3 ^ (k)<<3) | 1) & (m) +#endif + #define __ac_fsize(m) ((m) < 32? 
1 : (m)>>5) #ifndef kroundup32 #define kroundup32(x) (--(x), (x)|=(x)>>1, (x)|=(x)>>2, (x)|=(x)>>4, (x)|=(x)>>8, (x)|=(x)>>16, ++(x)) #endif -#ifndef kcalloc -#define kcalloc(N,Z) calloc(N,Z) -#endif -#ifndef kmalloc -#define kmalloc(Z) malloc(Z) -#endif -#ifndef krealloc -#define krealloc(P,Z) realloc(P,Z) -#endif -#ifndef kfree -#define kfree(P) free(P) -#endif - static const double __ac_HASH_UPPER = 0.77; -#define __KHASH_TYPE(name, khkey_t, khval_t) \ - typedef struct { \ - khint_t n_buckets, size, n_occupied, upper_bound; \ - khint32_t *flags; \ - khkey_t *keys; \ - khval_t *vals; \ - } kh_##name##_t; - -#define __KHASH_PROTOTYPES(name, khkey_t, khval_t) \ - extern kh_##name##_t *kh_init_##name(void); \ +#define KHASH_DECLARE(name, khkey_t, khval_t) \ + typedef struct { \ + khint_t n_buckets, size, n_occupied, upper_bound; \ + khint32_t *flags; \ + khkey_t *keys; \ + khval_t *vals; \ + } kh_##name##_t; \ + extern kh_##name##_t *kh_init_##name(); \ extern void kh_destroy_##name(kh_##name##_t *h); \ extern void kh_clear_##name(kh_##name##_t *h); \ extern khint_t kh_get_##name(const kh_##name##_t *h, khkey_t key); \ - extern int kh_resize_##name(kh_##name##_t *h, khint_t new_n_buckets); \ + extern void kh_resize_##name(kh_##name##_t *h, khint_t new_n_buckets); \ extern khint_t kh_put_##name(kh_##name##_t *h, khkey_t key, int *ret); \ extern void kh_del_##name(kh_##name##_t *h, khint_t x); -#define __KHASH_IMPL(name, SCOPE, khkey_t, khval_t, kh_is_map, __hash_func, __hash_equal) \ - SCOPE kh_##name##_t *kh_init_##name(void) { \ - return (kh_##name##_t*)kcalloc(1, sizeof(kh_##name##_t)); \ +#define KHASH_INIT2(name, SCOPE, khkey_t, khval_t, kh_is_map, __hash_func, __hash_equal) \ + typedef struct { \ + khint_t n_buckets, size, n_occupied, upper_bound; \ + khint32_t *flags; \ + khkey_t *keys; \ + khval_t *vals; \ + } kh_##name##_t; \ + SCOPE kh_##name##_t *kh_init_##name(void) { \ + return (kh_##name##_t*)calloc(1, sizeof(kh_##name##_t)); \ } \ SCOPE void 
kh_destroy_##name(kh_##name##_t *h) \ { \ if (h) { \ - kfree((void *)h->keys); kfree(h->flags); \ - kfree((void *)h->vals); \ - kfree(h); \ + free(h->keys); free(h->flags); \ + free(h->vals); \ + free(h); \ } \ } \ SCOPE void kh_clear_##name(kh_##name##_t *h) \ @@ -225,19 +211,19 @@ static const double __ac_HASH_UPPER = 0.77; SCOPE khint_t kh_get_##name(const kh_##name##_t *h, khkey_t key) \ { \ if (h->n_buckets) { \ - khint_t k, i, last, mask, step = 0; \ + khint_t inc, k, i, last, mask; \ mask = h->n_buckets - 1; \ k = __hash_func(key); i = k & mask; \ - last = i; \ + inc = __ac_inc(k, mask); last = i; /* inc==1 for linear probing */ \ while (!__ac_isempty(h->flags, i) && (__ac_isdel(h->flags, i) || !__hash_equal(h->keys[i], key))) { \ - i = (i + (++step)) & mask; \ + i = (i + inc) & mask; \ if (i == last) return h->n_buckets; \ } \ return __ac_iseither(h->flags, i)? h->n_buckets : i; \ } else return 0; \ } \ - SCOPE int kh_resize_##name(kh_##name##_t *h, khint_t new_n_buckets) \ - { /* This function uses 0.25*n_buckets bytes of working space instead of [sizeof(key_t+val_t)+.25]*n_buckets. */ \ + SCOPE void kh_resize_##name(kh_##name##_t *h, khint_t new_n_buckets) \ + { /* This function uses 0.25*n_buckets bytes of working space instead of [sizeof(key_t+val_t)+.25]*n_buckets. 
*/ \ khint32_t *new_flags = 0; \ khint_t j = 1; \ { \ @@ -245,18 +231,11 @@ static const double __ac_HASH_UPPER = 0.77; if (new_n_buckets < 4) new_n_buckets = 4; \ if (h->size >= (khint_t)(new_n_buckets * __ac_HASH_UPPER + 0.5)) j = 0; /* requested size is too small */ \ else { /* hash table size to be changed (shrink or expand); rehash */ \ - new_flags = (khint32_t*)kmalloc(__ac_fsize(new_n_buckets) * sizeof(khint32_t)); \ - if (!new_flags) return -1; \ + new_flags = (khint32_t*)malloc(__ac_fsize(new_n_buckets) * sizeof(khint32_t)); \ memset(new_flags, 0xff, __ac_fsize(new_n_buckets) * sizeof(khint32_t)); \ if (h->n_buckets < new_n_buckets) { /* expand */ \ - khkey_t *new_keys = (khkey_t*)krealloc((void *)h->keys, new_n_buckets * sizeof(khkey_t)); \ - if (!new_keys) { kfree(new_flags); return -1; } \ - h->keys = new_keys; \ - if (kh_is_map) { \ - khval_t *new_vals = (khval_t*)krealloc((void *)h->vals, new_n_buckets * sizeof(khval_t)); \ - if (!new_vals) { kfree(new_flags); return -1; } \ - h->vals = new_vals; \ - } \ + h->keys = (khkey_t*)realloc(h->keys, new_n_buckets * sizeof(khkey_t)); \ + if (kh_is_map) h->vals = (khval_t*)realloc(h->vals, new_n_buckets * sizeof(khval_t)); \ } /* otherwise shrink */ \ } \ } \ @@ -270,10 +249,11 @@ static const double __ac_HASH_UPPER = 0.77; if (kh_is_map) val = h->vals[j]; \ __ac_set_isempty_true(h->flags, j); \ while (1) { /* kick-out process; sort of like in Cuckoo hashing */ \ - khint_t k, i, step = 0; \ + khint_t inc, k, i; \ k = __hash_func(key); \ i = k & new_mask; \ - while (!__ac_isempty(new_flags, i)) i = (i + (++step)) & new_mask; \ + inc = __ac_inc(k, new_mask); \ + while (!__ac_isempty(new_flags, i)) i = (i + inc) & new_mask; \ __ac_set_isempty_false(new_flags, i); \ if (i < h->n_buckets && __ac_iseither(h->flags, i) == 0) { /* kick out the existing element */ \ { khkey_t tmp = h->keys[i]; h->keys[i] = key; key = tmp; } \ @@ -288,38 +268,32 @@ static const double __ac_HASH_UPPER = 0.77; } \ } \ if (h->n_buckets > 
new_n_buckets) { /* shrink the hash table */ \ - h->keys = (khkey_t*)krealloc((void *)h->keys, new_n_buckets * sizeof(khkey_t)); \ - if (kh_is_map) h->vals = (khval_t*)krealloc((void *)h->vals, new_n_buckets * sizeof(khval_t)); \ + h->keys = (khkey_t*)realloc(h->keys, new_n_buckets * sizeof(khkey_t)); \ + if (kh_is_map) h->vals = (khval_t*)realloc(h->vals, new_n_buckets * sizeof(khval_t)); \ } \ - kfree(h->flags); /* free the working space */ \ + free(h->flags); /* free the working space */ \ h->flags = new_flags; \ h->n_buckets = new_n_buckets; \ h->n_occupied = h->size; \ h->upper_bound = (khint_t)(h->n_buckets * __ac_HASH_UPPER + 0.5); \ } \ - return 0; \ } \ SCOPE khint_t kh_put_##name(kh_##name##_t *h, khkey_t key, int *ret) \ { \ khint_t x; \ if (h->n_occupied >= h->upper_bound) { /* update the hash table */ \ - if (h->n_buckets > (h->size<<1)) { \ - if (kh_resize_##name(h, h->n_buckets - 1) < 0) { /* clear "deleted" elements */ \ - *ret = -1; return h->n_buckets; \ - } \ - } else if (kh_resize_##name(h, h->n_buckets + 1) < 0) { /* expand the hash table */ \ - *ret = -1; return h->n_buckets; \ - } \ + if (h->n_buckets > (h->size<<1)) kh_resize_##name(h, h->n_buckets - 1); /* clear "deleted" elements */ \ + else kh_resize_##name(h, h->n_buckets + 1); /* expand the hash table */ \ } /* TODO: to implement automatically shrinking; resize() already support shrinking */ \ { \ - khint_t k, i, site, last, mask = h->n_buckets - 1, step = 0; \ + khint_t inc, k, i, site, last, mask = h->n_buckets - 1; \ x = site = h->n_buckets; k = __hash_func(key); i = k & mask; \ if (__ac_isempty(h->flags, i)) x = i; /* for speed up */ \ else { \ - last = i; \ + inc = __ac_inc(k, mask); last = i; \ while (!__ac_isempty(h->flags, i) && (__ac_isdel(h->flags, i) || !__hash_equal(h->keys[i], key))) { \ if (__ac_isdel(h->flags, i)) site = i; \ - i = (i + (++step)) & mask; \ + i = (i + inc) & mask; \ if (i == last) { x = site; break; } \ } \ if (x == h->n_buckets) { \ @@ -349,16 +323,8 @@ 
static const double __ac_HASH_UPPER = 0.77; } \ } -#define KHASH_DECLARE(name, khkey_t, khval_t) \ - __KHASH_TYPE(name, khkey_t, khval_t) \ - __KHASH_PROTOTYPES(name, khkey_t, khval_t) - -#define KHASH_INIT2(name, SCOPE, khkey_t, khval_t, kh_is_map, __hash_func, __hash_equal) \ - __KHASH_TYPE(name, khkey_t, khval_t) \ - __KHASH_IMPL(name, SCOPE, khkey_t, khval_t, kh_is_map, __hash_func, __hash_equal) - #define KHASH_INIT(name, khkey_t, khval_t, kh_is_map, __hash_func, __hash_equal) \ - KHASH_INIT2(name, static kh_inline, khkey_t, khval_t, kh_is_map, __hash_func, __hash_equal) + KHASH_INIT2(name, static PANDAS_INLINE, khkey_t, khval_t, kh_is_map, __hash_func, __hash_equal) /* --- BEGIN OF HASH FUNCTIONS --- */ @@ -388,10 +354,10 @@ static const double __ac_HASH_UPPER = 0.77; @param s Pointer to a null terminated string @return The hash value */ -static kh_inline khint_t __ac_X31_hash_string(const char *s) +static PANDAS_INLINE khint_t __ac_X31_hash_string(const char *s) { - khint_t h = (khint_t)*s; - if (h) for (++s ; *s; ++s) h = (h << 5) - h + (khint_t)*s; + khint_t h = *s; + if (h) for (++s ; *s; ++s) h = (h << 5) - h + *s; return h; } /*! 
@function @@ -405,7 +371,7 @@ static kh_inline khint_t __ac_X31_hash_string(const char *s) */ #define kh_str_hash_equal(a, b) (strcmp(a, b) == 0) -static kh_inline khint_t __ac_Wang_hash(khint_t key) +static PANDAS_INLINE khint_t __ac_Wang_hash(khint_t key) { key += ~(key << 15); key ^= (key >> 10); @@ -461,8 +427,7 @@ static kh_inline khint_t __ac_Wang_hash(khint_t key) @param name Name of the hash table [symbol] @param h Pointer to the hash table [khash_t(name)*] @param k Key [type of keys] - @param r Extra return code: -1 if the operation failed; - 0 if the key is present in the hash table; + @param r Extra return code: 0 if the key is present in the hash table; 1 if the bucket is empty (never used); 2 if the element in the bucket has been deleted [int*] @return Iterator to the inserted element [khint_t] @@ -474,7 +439,7 @@ static kh_inline khint_t __ac_Wang_hash(khint_t key) @param name Name of the hash table [symbol] @param h Pointer to the hash table [khash_t(name)*] @param k Key [type of keys] - @return Iterator to the found element, or kh_end(h) if the element is absent [khint_t] + @return Iterator to the found element, or kh_end(h) if the element is absent [khint_t] */ #define kh_get(name, h, k) kh_get_##name(h, k) @@ -544,34 +509,6 @@ static kh_inline khint_t __ac_Wang_hash(khint_t key) */ #define kh_n_buckets(h) ((h)->n_buckets) -/*! @function - @abstract Iterate over the entries in the hash table - @param h Pointer to the hash table [khash_t(name)*] - @param kvar Variable to which key will be assigned - @param vvar Variable to which value will be assigned - @param code Block of code to execute - */ -#define kh_foreach(h, kvar, vvar, code) { khint_t __i; \ - for (__i = kh_begin(h); __i != kh_end(h); ++__i) { \ - if (!kh_exist(h,__i)) continue; \ - (kvar) = kh_key(h,__i); \ - (vvar) = kh_val(h,__i); \ - code; \ - } } - -/*! 
@function - @abstract Iterate over the values in the hash table - @param h Pointer to the hash table [khash_t(name)*] - @param vvar Variable to which value will be assigned - @param code Block of code to execute - */ -#define kh_foreach_value(h, vvar, code) { khint_t __i; \ - for (__i = kh_begin(h); __i != kh_end(h); ++__i) { \ - if (!kh_exist(h,__i)) continue; \ - (vvar) = kh_val(h,__i); \ - code; \ - } } - /* More conenient interfaces */ /*! @function diff --git a/bquery/khash.pxd b/bquery/khash.pxd index a8fd51a..5e855c7 100644 --- a/bquery/khash.pxd +++ b/bquery/khash.pxd @@ -48,9 +48,9 @@ cdef extern from "khash_python.h": inline kh_str_t* kh_init_str() inline void kh_destroy_str(kh_str_t*) inline void kh_clear_str(kh_str_t*) - inline khint_t kh_get_str(kh_str_t*, kh_cstr_t) + inline khint_t kh_get_str(kh_str_t*, kh_cstr_t) nogil inline void kh_resize_str(kh_str_t*, khint_t) - inline khint_t kh_put_str(kh_str_t*, kh_cstr_t, int*) + inline khint_t kh_put_str(kh_str_t*, kh_cstr_t, int*) nogil inline void kh_del_str(kh_str_t*, khint_t) bint kh_exist_str(kh_str_t*, khiter_t) diff --git a/bquery/khash_python.h b/bquery/khash_python.h index b5b0de2..cdd94b5 100644 --- a/bquery/khash_python.h +++ b/bquery/khash_python.h @@ -5,7 +5,7 @@ // kludge #define kh_float64_hash_func _Py_HashDouble -#define kh_float64_hash_equal kh_int64_hash_equal +#define kh_float64_hash_equal(a, b) ((a) == (b) || ((b) != (b) && (a) != (a))) #define KHASH_MAP_INIT_FLOAT64(name, khval_t) \ KHASH_INIT(name, khfloat64_t, khval_t, 1, kh_float64_hash_func, kh_float64_hash_equal) @@ -13,7 +13,7 @@ KHASH_MAP_INIT_FLOAT64(float64, size_t) -int kh_inline pyobject_cmp(PyObject* a, PyObject* b) { +int PANDAS_INLINE pyobject_cmp(PyObject* a, PyObject* b) { int result = PyObject_RichCompareBool(a, b, Py_EQ); if (result < 0) { PyErr_Clear(); @@ -46,4 +46,4 @@ KHASH_SET_INIT_PYOBJECT(pyset) #define kh_exist_pymap(h, k) (kh_exist(h, k)) #define kh_exist_pyset(h, k) (kh_exist(h, k)) 
-KHASH_MAP_INIT_STR(strbox, kh_pyobject_t) \ No newline at end of file +KHASH_MAP_INIT_STR(strbox, kh_pyobject_t) diff --git a/requirements.txt b/requirements.txt index 86f0f4f..c85e29c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1 @@ -bcolz>=0.8.0 +bcolz>=0.8.1 diff --git a/setup.py b/setup.py index d311871..acbb51f 100644 --- a/setup.py +++ b/setup.py @@ -91,6 +91,64 @@ def check_import(pkgname, pkgver): ########### End of checks ########## +########### Project-specific build options ########### + +copt = {'cygwin': ['-fopenmp'], + 'emx': ['-fopenmp'], + 'intel': ['-openmp'], + 'intele': ['-openmp'], + 'intelem': ['-openmp'], + 'mingw32': ['-fopenmp'], + 'msvc': ['/openmp'], + } +lopt = {'cygwin': ['-fopenmp'], + 'emx': ['-fopenmp'], + 'mingw32': ['-fopenmp'], + } + +class bquery_build_ext(build_ext): + user_options = build_ext.user_options + \ + [ + ('from-templates', None, + "rebuild project from code generation templates"), + ] + + def initialize_options(self): + self.from_templates = False + build_ext.initialize_options(self) + + def run(self): + # regenerate cython code from templates + if self.from_templates: + try: + import jinja2 + except: + exit_with_error( + "You need the python package jinja2 to rebuild the " + \ + "extension from the templates") + execfile("bquery/templates/run_templates.py") + + build_ext.run(self) + + def build_extensions(self): + # set compiler-specific build flags, e.g. for openmp + c = self.compiler.compiler_type + if copt.has_key(c): + for e in self.extensions: + e.extra_compile_args += copt[ c ] + else: + print_warning( + "Openmp flags for compiler '%s' not configured in setup.py." 
+ + "Building without parallel processing support.") + if lopt.has_key(c): + for e in self.extensions: + e.extra_link_args += lopt[ c ] + build_ext.build_extensions(self) + + +######### End project-specific build options ######### + + # bquery version VERSION = open('VERSION').read().strip() # Create the version.py file @@ -128,7 +186,7 @@ def check_import(pkgname, pkgver): url='https://github.com/visualfabriq/bquery', license='http://www.opensource.org/licenses/bsd-license.php', platforms=['any'], - cmdclass={'build_ext': build_ext}, + cmdclass={'build_ext': bquery_build_ext}, ext_modules=[ Extension("bquery.ctable_ext", include_dirs=inc_dirs, @@ -137,7 +195,8 @@ def check_import(pkgname, pkgver): library_dirs=lib_dirs, libraries=libs, extra_link_args=LFLAGS, - extra_compile_args=CFLAGS), + extra_compile_args=CFLAGS, + language='c++'), ], packages=['bquery', 'bquery.tests'], )