diff --git a/bquery/ctable_ext.pyx b/bquery/ctable_ext.pyx index 1b6a362..95a6c0a 100644 --- a/bquery/ctable_ext.pyx +++ b/bquery/ctable_ext.pyx @@ -102,8 +102,7 @@ cdef void _factorize_str_helper(Py_ssize_t iter_range, @cython.boundscheck(False) def factorize_str(carray carray_, carray labels=None): cdef: - chunk chunk_ - Py_ssize_t n, i, count, chunklen, leftover_elements + Py_ssize_t len_carray, count, chunklen, len_in_buffer dict reverse np.ndarray in_buffer np.ndarray[np.uint64_t] out_buffer @@ -113,20 +112,23 @@ def factorize_str(carray carray_, carray labels=None): ret = 0 reverse = {} - n = len(carray_) + len_carray = len(carray_) chunklen = carray_.chunklen if labels is None: - labels = carray([], dtype='int64', expectedlen=n) + labels = carray([], dtype='int64', expectedlen=len_carray) # in-buffer isn't typed, because cython doesn't support string arrays (?) out_buffer = np.empty(chunklen, dtype='uint64') in_buffer = np.empty(chunklen, dtype=carray_.dtype) table = kh_init_str() - for i in range(carray_.nchunks): - chunk_ = carray_.chunks[i] - # decompress into in_buffer - chunk_._getitem(0, chunklen, in_buffer.data) - _factorize_str_helper(chunklen, + for in_buffer in bz.iterblocks(carray_): + len_in_buffer = len(in_buffer) + + # should only happen on leftovers + if(len_in_buffer != chunklen): + out_buffer = np.empty(len_in_buffer, dtype='uint64') + + _factorize_str_helper(len_in_buffer, carray_.dtype.itemsize + 1, in_buffer, out_buffer, @@ -134,22 +136,9 @@ def factorize_str(carray carray_, carray labels=None): &count, reverse, ) - # compress out_buffer into labels - labels.append(out_buffer.astype(np.int64)) - - leftover_elements = cython.cdiv(carray_.leftover, carray_.atomsize) - if leftover_elements > 0: - _factorize_str_helper(leftover_elements, - carray_.dtype.itemsize + 1, - carray_.leftover_array, - out_buffer, - table, - &count, - reverse, - ) - - # compress out_buffer into labels - labels.append(out_buffer[:leftover_elements].astype(np.int64)) + + # TODO: need to use an explicit loop here for speed? + labels.append(out_buffer) kh_destroy_str(table) @@ -238,8 +227,7 @@ cdef factorize_number(carray carray_, numpy_native_number_input typehint, carra p_dtype = np.float64 cdef: - chunk chunk_ - Py_ssize_t n, i, count, chunklen, leftover_elements + Py_ssize_t len_carray, i, count, chunklen, len_in_buffer dict reverse np.ndarray[numpy_native_number_input] in_buffer np.ndarray[np.uint64_t] out_buffer @@ -248,13 +236,12 @@ cdef factorize_number(carray carray_, numpy_native_number_input typehint, carra count = 0 reverse = {} - n = len(carray_) + len_carray = len(carray_) chunklen = carray_.chunklen if labels is None: - labels = carray([], dtype='int64', expectedlen=n) + labels = carray([], dtype='int64', expectedlen=len_carray) # in-buffer isn't typed, because cython doesn't support string arrays (?) out_buffer = np.empty(chunklen, dtype='uint64') - in_buffer = np.empty(chunklen, dtype=p_dtype) if numpy_native_number_input is np.int64_t: table = kh_init_int64() @@ -265,12 +252,14 @@ cdef factorize_number(carray carray_, numpy_native_number_input typehint, carra else: table = kh_init_int64() + for in_buffer in bz.iterblocks(carray_): + len_in_buffer = len(in_buffer) + + # should only happen on leftovers + if(len_in_buffer != chunklen): + out_buffer = np.empty(len_in_buffer, dtype='uint64') - for i in range(carray_.nchunks): - chunk_ = carray_.chunks[i] - # decompress into in_buffer - chunk_._getitem(0, chunklen, in_buffer.data) - _factorize_number_helper[numpy_native_number_input](chunklen, + _factorize_number_helper[numpy_native_number_input](len_in_buffer, carray_.dtype.itemsize + 1, in_buffer, out_buffer, @@ -278,22 +267,11 @@ cdef factorize_number(carray carray_, numpy_native_number_input typehint, carra &count, reverse, ) + + + # TODO: need to use an explicit loop here for speed? # compress out_buffer into labels - labels.append(out_buffer.astype(np.int64)) - - leftover_elements = cython.cdiv(carray_.leftover, carray_.atomsize) - if leftover_elements > 0: - _factorize_number_helper[numpy_native_number_input](leftover_elements, - carray_.dtype.itemsize + 1, - carray_.leftover_array, - out_buffer, - table, - &count, - reverse, - ) - - # compress out_buffer into labels - labels.append(out_buffer[:leftover_elements].astype(np.int64)) + labels.append(out_buffer) if numpy_native_number_input is np.int64_t: kh_destroy_int64(table) @@ -327,11 +305,10 @@ cpdef factorize(carray carray_, carray labels=None): @cython.wraparound(False) def groupsort_indexer(carray index, Py_ssize_t ngroups): cdef: - Py_ssize_t i, label, n + Py_ssize_t label, n, i, len_in_buffer np.ndarray[int64_t] counts, where, np_result # -- carray c_result - chunk input_chunk, index_chunk Py_ssize_t index_chunk_nr, index_chunk_len, leftover_elements np.ndarray[int64_t] in_buffer @@ -344,22 +321,10 @@ def groupsort_indexer(carray index, Py_ssize_t ngroups): counts = np.zeros(ngroups + 1, dtype=np.int64) n = len(index) - for index_chunk_nr in range(index.nchunks): - # fill input buffer - input_chunk = index.chunks[index_chunk_nr] - input_chunk._getitem(0, index_chunk_len, in_buffer.data) - - # loop through rows - for i in range(index_chunk_len): - counts[index[i] + 1] += 1 - - leftover_elements = cython.cdiv(index.leftover, index.atomsize) - if leftover_elements > 0: - # fill input buffer - in_buffer = index.leftover_array + for in_buffer in bz.iterblocks(index): + len_in_buffer = len(in_buffer) - # loop through rows - for i in range(leftover_elements): + for i in range(len_in_buffer): counts[index[i] + 1] += 1 # mark the start of each contiguous group of like-indexed data @@ -464,10 +429,9 @@ cpdef aggregate(carray ca_input, carray ca_factor, p_dtype = np.float64 cdef: - chunk input_chunk, factor_chunk - Py_ssize_t input_chunk_nr, input_chunk_len - Py_ssize_t factor_chunk_nr, factor_chunk_len, factor_chunk_row - Py_ssize_t current_index, i, j, end_counts, start_counts, factor_total_chunks, leftover_elements + Py_ssize_t in_buffer_len, factor_buffer_len + Py_ssize_t factor_chunk_nr, factor_chunk_row, factor_total_chunks + Py_ssize_t current_index, i, j, end_counts, start_counts np.ndarray[np.int64_t] factor_buffer @@ -480,6 +444,7 @@ cpdef aggregate(carray ca_input, carray ca_factor, count = 0 ret = 0 reverse = {} + iter_ca_factor = bz.iterblocks(ca_factor) if agg_method == _COUNT_DISTINCT: positions, counts = groupsort_indexer(ca_factor, nr_groups) @@ -494,16 +459,10 @@ cpdef aggregate(carray ca_input, carray ca_factor, return - input_chunk_len = ca_input.chunklen - factor_chunk_len = ca_factor.chunklen factor_total_chunks = ca_factor.nchunks factor_chunk_nr = 0 - factor_buffer = np.empty(factor_chunk_len, dtype='int64') - if factor_total_chunks > 0: - factor_chunk = ca_factor.chunks[factor_chunk_nr] - factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data) - else: - factor_buffer = ca_factor.leftover_array + factor_buffer = next(iter_ca_factor) + factor_buffer_len = len(factor_buffer) factor_chunk_row = 0 # create special buffers for complex operations @@ -512,22 +471,20 @@ cpdef aggregate(carray ca_input, carray ca_factor, if agg_method == _STDEV: mean_buffer = np.zeros(nr_groups, dtype='float64') - for input_chunk_nr in range(ca_input.nchunks): - # fill input buffer - input_chunk = ca_input.chunks[input_chunk_nr] - input_chunk._getitem(0, input_chunk_len, in_buffer.data) + for in_buffer in bz.iterblocks(ca_input): + len_in_buffer = len(in_buffer) # loop through rows - for i in range(input_chunk_len): + for i in range(len_in_buffer): # go to next factor buffer if necessary - if factor_chunk_row == factor_chunk_len: + # TODO: make chunk sizes the same for ca_factor + # and ca_input, to eliminate separate iteration + if (factor_chunk_row == factor_buffer_len + and factor_chunk_nr < factor_total_chunks): factor_chunk_nr += 1 - if factor_chunk_nr < factor_total_chunks: - factor_chunk = ca_factor.chunks[factor_chunk_nr] - factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data) - else: - factor_buffer = ca_factor.leftover_array + factor_buffer = next(iter_ca_factor) + factor_buffer_len = len(factor_buffer) factor_chunk_row = 0 # retrieve index @@ -571,63 +528,7 @@ cpdef aggregate(carray ca_input, carray ca_factor, else: raise NotImplementedError('sumtype not supported') - leftover_elements = cython.cdiv(ca_input.leftover, ca_input.atomsize) - if leftover_elements > 0: - # fill input buffer - in_buffer = ca_input.leftover_array - - # loop through rows - for i in range(leftover_elements): - - # go to next factor buffer if necessary - if factor_chunk_row == factor_chunk_len: - factor_chunk_nr += 1 - if factor_chunk_nr < factor_total_chunks: - factor_chunk = ca_factor.chunks[factor_chunk_nr] - factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data) - else: - factor_buffer = ca_factor.leftover_array - factor_chunk_row = 0 - - # retrieve index - current_index = factor_buffer[factor_chunk_row] - factor_chunk_row += 1 - - # update value if it's not an invalid index - if current_index != skip_key: - if agg_method == _SUM: - out_buffer[current_index] += in_buffer[i] - elif agg_method == _MEAN: - # method from Knuth - count_buffer[current_index] += 1 - delta = in_buffer[i] - out_buffer[current_index] - out_buffer[current_index] += delta / count_buffer[current_index] - elif agg_method == _STDEV: - count_buffer[current_index] += 1 - delta = in_buffer[i] - mean_buffer[current_index] - mean_buffer[current_index] += delta / count_buffer[current_index] - # M2 = M2 + delta*(x - mean) - out_buffer[current_index] += delta * (in_buffer[i] - mean_buffer[current_index]) - elif agg_method == _COUNT: - out_buffer[current_index] += 1 - elif agg_method == _COUNT_NA: - v = in_buffer[i] - if v == v: # skip NA values - out_buffer[current_index] += 1 - elif agg_method == _SORTED_COUNT_DISTINCT: - v = in_buffer[i] - if not count_distinct_started: - count_distinct_started = 1 - last_values = np.zeros(nr_groups, dtype=p_dtype) - last_values[0] = v - out_buffer[0] = 1 - else: - if v != last_values[current_index]: - out_buffer[current_index] += 1 - - last_values[current_index] = v - else: - raise NotImplementedError('sumtype not supported') + del iter_ca_factor if agg_method == _STDEV: for i in range(len(out_buffer)): @@ -641,10 +542,9 @@ cpdef aggregate(carray ca_input, carray ca_factor, @cython.boundscheck(False) cpdef groupby_value(carray ca_input, carray ca_factor, Py_ssize_t nr_groups, Py_ssize_t skip_key): cdef: - chunk input_chunk, factor_chunk - Py_ssize_t input_chunk_nr, input_chunk_len - Py_ssize_t factor_chunk_nr, factor_chunk_len, factor_chunk_row - Py_ssize_t current_index, i, factor_total_chunks, leftover_elements + Py_ssize_t in_buffer_len, factor_buffer_len + Py_ssize_t factor_chunk_nr, factor_chunk_row + Py_ssize_t current_index, i, factor_total_chunks np.ndarray in_buffer np.ndarray[np.int64_t] factor_buffer @@ -653,61 +553,29 @@ cpdef groupby_value(carray ca_input, carray ca_factor, Py_ssize_t nr_groups, Py_ count = 0 ret = 0 reverse = {} + iter_ca_factor = bz.iterblocks(ca_factor) + - input_chunk_len = ca_input.chunklen - in_buffer = np.empty(input_chunk_len, dtype=ca_input.dtype) - factor_chunk_len = ca_factor.chunklen factor_total_chunks = ca_factor.nchunks factor_chunk_nr = 0 - factor_buffer = np.empty(factor_chunk_len, dtype='int64') - if factor_total_chunks > 0: - factor_chunk = ca_factor.chunks[factor_chunk_nr] - factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data) - else: - factor_buffer = ca_factor.leftover_array + factor_buffer = next(iter_ca_factor) + factor_buffer_len = len(factor_buffer) factor_chunk_row = 0 - out_buffer = np.zeros(nr_groups, dtype=ca_input.dtype) - for input_chunk_nr in range(ca_input.nchunks): + out_buffer = np.zeros(nr_groups, dtype=ca_input.dtype) - # fill input buffer - input_chunk = ca_input.chunks[input_chunk_nr] - input_chunk._getitem(0, input_chunk_len, in_buffer.data) + for in_buffer in bz.iterblocks(ca_input): + len_in_buffer = len(in_buffer) - for i in range(input_chunk_len): + for i in range(len_in_buffer): # go to next factor buffer if necessary - if factor_chunk_row == factor_chunk_len: - factor_chunk_nr += 1 - if factor_chunk_nr < factor_total_chunks: - factor_chunk = ca_factor.chunks[factor_chunk_nr] - factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data) - else: - factor_buffer = ca_factor.leftover_array - factor_chunk_row = 0 - - # retrieve index - current_index = factor_buffer[factor_chunk_row] - factor_chunk_row += 1 - - # update value if it's not an invalid index - if current_index != skip_key: - out_buffer[current_index] = in_buffer[i] + if (factor_chunk_row == factor_buffer_len + and factor_chunk_nr < factor_total_chunks): - leftover_elements = cython.cdiv(ca_input.leftover, ca_input.atomsize) - if leftover_elements > 0: - in_buffer = ca_input.leftover_array - - for i in range(leftover_elements): - - # go to next factor buffer if necessary - if factor_chunk_row == factor_chunk_len: factor_chunk_nr += 1 - if factor_chunk_nr < factor_total_chunks: - factor_chunk = ca_factor.chunks[factor_chunk_nr] - factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data) - else: - factor_buffer = ca_factor.leftover_array + factor_buffer = next(iter_ca_factor) + factor_buffer_len = len(factor_buffer) factor_chunk_row = 0 # retrieve index @@ -718,6 +586,8 @@ cpdef groupby_value(carray ca_input, carray ca_factor, Py_ssize_t nr_groups, Py_ if current_index != skip_key: out_buffer[current_index] = in_buffer[i] + del iter_ca_factor + # check whether a row has to be fixed if skip_key < nr_groups: np.delete(out_buffer, skip_key) @@ -845,29 +715,19 @@ cpdef translate_int64(carray input_, carray output_, dict lookup, np.npy_int64 d :return: """ cdef: - chunk chunk_ - Py_ssize_t i, chunklen, leftover_elements + Py_ssize_t i, chunklen, len_in_buffer np.ndarray[np.npy_int64] in_buffer np.ndarray[np.npy_int64] out_buffer chunklen = input_.chunklen out_buffer = np.empty(chunklen, dtype='int64') - in_buffer = np.empty(chunklen, dtype='int64') - for i in range(input_.nchunks): - chunk_ = input_.chunks[i] - # decompress into in_buffer - chunk_._getitem(0, chunklen, in_buffer.data) - for i in range(chunklen): - element = in_buffer[i] - out_buffer[i] = lookup.get(element, default) - # compress out_buffer into labels - output_.append(out_buffer.astype(np.int64)) + for in_buffer in bz.iterblocks(input_): + len_in_buffer = len(in_buffer) - leftover_elements = cython.cdiv(input_.leftover, input_.atomsize) - if leftover_elements > 0: - in_buffer = input_.leftover_array - for i in range(leftover_elements): + for i in range(len_in_buffer): element = in_buffer[i] out_buffer[i] = lookup.get(element, default) - output_.append(out_buffer[:leftover_elements].astype(np.int64)) + + # compress out_buffer into labels + output_.append(out_buffer[:len_in_buffer].astype(np.int64))