From ba1dfabc5b8da3241b8d86325348e0bada3e7dd5 Mon Sep 17 00:00:00 2001 From: Waylon Flinn Date: Mon, 5 Oct 2015 20:27:14 -0500 Subject: [PATCH 1/3] use iterblocks --- bquery/ctable_ext.pyx | 390 +++++++++++++----------------------------- 1 file changed, 121 insertions(+), 269 deletions(-) diff --git a/bquery/ctable_ext.pyx b/bquery/ctable_ext.pyx index 1b6a362..f6d3dd9 100644 --- a/bquery/ctable_ext.pyx +++ b/bquery/ctable_ext.pyx @@ -102,8 +102,7 @@ cdef void _factorize_str_helper(Py_ssize_t iter_range, @cython.boundscheck(False) def factorize_str(carray carray_, carray labels=None): cdef: - chunk chunk_ - Py_ssize_t n, i, count, chunklen, leftover_elements + Py_ssize_t len_carray, count, chunklen, len_in_buffer dict reverse np.ndarray in_buffer np.ndarray[np.uint64_t] out_buffer @@ -113,20 +112,19 @@ def factorize_str(carray carray_, carray labels=None): ret = 0 reverse = {} - n = len(carray_) + len_carray = len(carray_) chunklen = carray_.chunklen if labels is None: - labels = carray([], dtype='int64', expectedlen=n) + labels = carray([], dtype='int64', expectedlen=len_carray) # in-buffer isn't typed, because cython doesn't support string arrays (?) out_buffer = np.empty(chunklen, dtype='uint64') in_buffer = np.empty(chunklen, dtype=carray_.dtype) table = kh_init_str() - for i in range(carray_.nchunks): - chunk_ = carray_.chunks[i] - # decompress into in_buffer - chunk_._getitem(0, chunklen, in_buffer.data) - _factorize_str_helper(chunklen, + for in_buffer in bz.iterblocks(carray_): + len_in_buffer = len(in_buffer) + + _factorize_str_helper(len_in_buffer, carray_.dtype.itemsize + 1, in_buffer, out_buffer, @@ -134,22 +132,9 @@ def factorize_str(carray carray_, carray labels=None): &count, reverse, ) - # compress out_buffer into labels - labels.append(out_buffer.astype(np.int64)) - - leftover_elements = cython.cdiv(carray_.leftover, carray_.atomsize) - if leftover_elements > 0: - _factorize_str_helper(leftover_elements, - carray_.dtype.itemsize + 1, - carray_.leftover_array, - out_buffer, - table, - &count, - reverse, - ) - - # compress out_buffer into labels - labels.append(out_buffer[:leftover_elements].astype(np.int64)) + + # TODO: need to use an explicit loop here for speed? + labels.append(out_buffer[:len_in_buffer].astype(np.int64)) kh_destroy_str(table) @@ -238,8 +223,7 @@ cdef factorize_number(carray carray_, numpy_native_number_input typehint, carra p_dtype = np.float64 cdef: - chunk chunk_ - Py_ssize_t n, i, count, chunklen, leftover_elements + Py_ssize_t len_carray, i, count, chunklen, len_in_buffer dict reverse np.ndarray[numpy_native_number_input] in_buffer np.ndarray[np.uint64_t] out_buffer @@ -248,13 +232,12 @@ cdef factorize_number(carray carray_, numpy_native_number_input typehint, carra count = 0 reverse = {} - n = len(carray_) + len_carray = len(carray_) chunklen = carray_.chunklen if labels is None: - labels = carray([], dtype='int64', expectedlen=n) + labels = carray([], dtype='int64', expectedlen=len_carray) # in-buffer isn't typed, because cython doesn't support string arrays (?) out_buffer = np.empty(chunklen, dtype='uint64') - in_buffer = np.empty(chunklen, dtype=p_dtype) if numpy_native_number_input is np.int64_t: table = kh_init_int64() @@ -265,12 +248,9 @@ cdef factorize_number(carray carray_, numpy_native_number_input typehint, carra else: table = kh_init_int64() - - for i in range(carray_.nchunks): - chunk_ = carray_.chunks[i] - # decompress into in_buffer - chunk_._getitem(0, chunklen, in_buffer.data) - _factorize_number_helper[numpy_native_number_input](chunklen, + for in_buffer in bz.iterblocks(carray_): + len_in_buffer = len(in_buffer) + _factorize_number_helper[numpy_native_number_input](len_in_buffer, carray_.dtype.itemsize + 1, in_buffer, out_buffer, @@ -278,22 +258,11 @@ cdef factorize_number(carray carray_, numpy_native_number_input typehint, carra &count, reverse, ) + + + # TODO: need to use an explicit loop here for speed? # compress out_buffer into labels - labels.append(out_buffer.astype(np.int64)) - - leftover_elements = cython.cdiv(carray_.leftover, carray_.atomsize) - if leftover_elements > 0: - _factorize_number_helper[numpy_native_number_input](leftover_elements, - carray_.dtype.itemsize + 1, - carray_.leftover_array, - out_buffer, - table, - &count, - reverse, - ) - - # compress out_buffer into labels - labels.append(out_buffer[:leftover_elements].astype(np.int64)) + labels.append(out_buffer[:len_in_buffer].astype(np.int64)) if numpy_native_number_input is np.int64_t: kh_destroy_int64(table) @@ -327,11 +296,10 @@ cpdef factorize(carray carray_, carray labels=None): @cython.wraparound(False) def groupsort_indexer(carray index, Py_ssize_t ngroups): cdef: - Py_ssize_t i, label, n + Py_ssize_t label, n, i, len_in_buffer np.ndarray[int64_t] counts, where, np_result # -- carray c_result - chunk input_chunk, index_chunk Py_ssize_t index_chunk_nr, index_chunk_len, leftover_elements np.ndarray[int64_t] in_buffer @@ -344,22 +312,10 @@ def groupsort_indexer(carray index, Py_ssize_t ngroups): counts = np.zeros(ngroups + 1, dtype=np.int64) n = len(index) - for index_chunk_nr in range(index.nchunks): - # fill input buffer - input_chunk = index.chunks[index_chunk_nr] - input_chunk._getitem(0, index_chunk_len, in_buffer.data) + for in_buffer in bz.iterblocks(index): + len_in_buffer = len(in_buffer) - # loop through rows - for i in range(index_chunk_len): - counts[index[i] + 1] += 1 - - leftover_elements = cython.cdiv(index.leftover, index.atomsize) - if leftover_elements > 0: - # fill input buffer - in_buffer = index.leftover_array - - # loop through rows - for i in range(leftover_elements): + for i in range(len_in_buffer): counts[index[i] + 1] += 1 # mark the start of each contiguous group of like-indexed data @@ -464,10 +420,9 @@ cpdef aggregate(carray ca_input, carray ca_factor, p_dtype = np.float64 cdef: - chunk input_chunk, factor_chunk - Py_ssize_t input_chunk_nr, input_chunk_len - Py_ssize_t factor_chunk_nr, factor_chunk_len, factor_chunk_row - Py_ssize_t current_index, i, j, end_counts, start_counts, factor_total_chunks, leftover_elements + Py_ssize_t in_buffer_len, factor_buffer_len + Py_ssize_t factor_chunk_nr, factor_chunk_row + Py_ssize_t current_index, i, j, end_counts, start_counts np.ndarray[np.int64_t] factor_buffer @@ -480,6 +435,7 @@ cpdef aggregate(carray ca_input, carray ca_factor, count = 0 ret = 0 reverse = {} + iter_ca_factor = bz.iterblocks(ca_factor) if agg_method == _COUNT_DISTINCT: positions, counts = groupsort_indexer(ca_factor, nr_groups) @@ -494,16 +450,9 @@ cpdef aggregate(carray ca_input, carray ca_factor, return - input_chunk_len = ca_input.chunklen - factor_chunk_len = ca_factor.chunklen - factor_total_chunks = ca_factor.nchunks factor_chunk_nr = 0 - factor_buffer = np.empty(factor_chunk_len, dtype='int64') - if factor_total_chunks > 0: - factor_chunk = ca_factor.chunks[factor_chunk_nr] - factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data) - else: - factor_buffer = ca_factor.leftover_array + factor_buffer = next(iter_ca_factor) + factor_buffer_len = len(factor_buffer) factor_chunk_row = 0 # create special buffers for complex operations @@ -512,122 +461,65 @@ cpdef aggregate(carray ca_input, carray ca_factor, if agg_method == _STDEV: mean_buffer = np.zeros(nr_groups, dtype='float64') - for input_chunk_nr in range(ca_input.nchunks): - # fill input buffer - input_chunk = ca_input.chunks[input_chunk_nr] - input_chunk._getitem(0, input_chunk_len, in_buffer.data) - - # loop through rows - for i in range(input_chunk_len): - - # go to next factor buffer if necessary - if factor_chunk_row == factor_chunk_len: - factor_chunk_nr += 1 - if factor_chunk_nr < factor_total_chunks: - factor_chunk = ca_factor.chunks[factor_chunk_nr] - factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data) - else: - factor_buffer = ca_factor.leftover_array - factor_chunk_row = 0 - - # retrieve index - current_index = factor_buffer[factor_chunk_row] - factor_chunk_row += 1 - - # update value if it's not an invalid index - if current_index != skip_key: - if agg_method == _SUM: - out_buffer[current_index] += in_buffer[i] - elif agg_method == _MEAN: - # method from Knuth - count_buffer[current_index] += 1 - delta = in_buffer[i] - out_buffer[current_index] - out_buffer[current_index] += delta / count_buffer[current_index] - elif agg_method == _STDEV: - count_buffer[current_index] += 1 - delta = in_buffer[i] - mean_buffer[current_index] - mean_buffer[current_index] += delta / count_buffer[current_index] - # M2 = M2 + delta*(x - mean) - out_buffer[current_index] += delta * (in_buffer[i] - mean_buffer[current_index]) - elif agg_method == _COUNT: - out_buffer[current_index] += 1 - elif agg_method == _COUNT_NA: - - v = in_buffer[i] - if v == v: # skip NA values + try: + for in_buffer in bz.iterblocks(ca_input): + len_in_buffer = len(in_buffer) + + # loop through rows + for i in range(len_in_buffer): + + # go to next factor buffer if necessary + + if factor_chunk_row == factor_buffer_len: + factor_chunk_nr += 1 + factor_buffer = next(iter_ca_factor) + factor_buffer_len = len(factor_buffer) + factor_chunk_row = 0 + + # retrieve index + current_index = factor_buffer[factor_chunk_row] + factor_chunk_row += 1 + + # update value if it's not an invalid index + if current_index != skip_key: + if agg_method == _SUM: + out_buffer[current_index] += in_buffer[i] + elif agg_method == _MEAN: + # method from Knuth + count_buffer[current_index] += 1 + delta = in_buffer[i] - out_buffer[current_index] + out_buffer[current_index] += delta / count_buffer[current_index] + elif agg_method == _STDEV: + count_buffer[current_index] += 1 + delta = in_buffer[i] - mean_buffer[current_index] + mean_buffer[current_index] += delta / count_buffer[current_index] + # M2 = M2 + delta*(x - mean) + out_buffer[current_index] += delta * (in_buffer[i] - mean_buffer[current_index]) + elif agg_method == _COUNT: out_buffer[current_index] += 1 - elif agg_method == _SORTED_COUNT_DISTINCT: - v = in_buffer[i] - if not count_distinct_started: - count_distinct_started = 1 - last_values = np.zeros(nr_groups, dtype=p_dtype) - last_values[0] = v - out_buffer[0] = 1 - else: - if v != last_values[current_index]: - out_buffer[current_index] += 1 + elif agg_method == _COUNT_NA: - last_values[current_index] = v - else: - raise NotImplementedError('sumtype not supported') - - leftover_elements = cython.cdiv(ca_input.leftover, ca_input.atomsize) - if leftover_elements > 0: - # fill input buffer - in_buffer = ca_input.leftover_array - - # loop through rows - for i in range(leftover_elements): - - # go to next factor buffer if necessary - if factor_chunk_row == factor_chunk_len: - factor_chunk_nr += 1 - if factor_chunk_nr < factor_total_chunks: - factor_chunk = ca_factor.chunks[factor_chunk_nr] - factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data) - else: - factor_buffer = ca_factor.leftover_array - factor_chunk_row = 0 - - # retrieve index - current_index = factor_buffer[factor_chunk_row] - factor_chunk_row += 1 - - # update value if it's not an invalid index - if current_index != skip_key: - if agg_method == _SUM: - out_buffer[current_index] += in_buffer[i] - elif agg_method == _MEAN: - # method from Knuth - count_buffer[current_index] += 1 - delta = in_buffer[i] - out_buffer[current_index] - out_buffer[current_index] += delta / count_buffer[current_index] - elif agg_method == _STDEV: - count_buffer[current_index] += 1 - delta = in_buffer[i] - mean_buffer[current_index] - mean_buffer[current_index] += delta / count_buffer[current_index] - # M2 = M2 + delta*(x - mean) - out_buffer[current_index] += delta * (in_buffer[i] - mean_buffer[current_index]) - elif agg_method == _COUNT: - out_buffer[current_index] += 1 - elif agg_method == _COUNT_NA: - v = in_buffer[i] - if v == v: # skip NA values - out_buffer[current_index] += 1 - elif agg_method == _SORTED_COUNT_DISTINCT: - v = in_buffer[i] - if not count_distinct_started: - count_distinct_started = 1 - last_values = np.zeros(nr_groups, dtype=p_dtype) - last_values[0] = v - out_buffer[0] = 1 - else: - if v != last_values[current_index]: + v = in_buffer[i] + if v == v: # skip NA values out_buffer[current_index] += 1 - - last_values[current_index] = v - else: - raise NotImplementedError('sumtype not supported') + elif agg_method == _SORTED_COUNT_DISTINCT: + v = in_buffer[i] + if not count_distinct_started: + count_distinct_started = 1 + last_values = np.zeros(nr_groups, dtype=p_dtype) + last_values[0] = v + out_buffer[0] = 1 + else: + if v != last_values[current_index]: + out_buffer[current_index] += 1 + + last_values[current_index] = v + else: + raise NotImplementedError('sumtype not supported') + except StopIteration: + pass + finally: + del iter_ca_factor if agg_method == _STDEV: for i in range(len(out_buffer)): @@ -641,10 +533,9 @@ cpdef aggregate(carray ca_input, carray ca_factor, @cython.boundscheck(False) cpdef groupby_value(carray ca_input, carray ca_factor, Py_ssize_t nr_groups, Py_ssize_t skip_key): cdef: - chunk input_chunk, factor_chunk - Py_ssize_t input_chunk_nr, input_chunk_len - Py_ssize_t factor_chunk_nr, factor_chunk_len, factor_chunk_row - Py_ssize_t current_index, i, factor_total_chunks, leftover_elements + Py_ssize_t in_buffer_len, factor_buffer_len + Py_ssize_t factor_chunk_nr, factor_chunk_row + Py_ssize_t current_index, i, factor_total_chunks np.ndarray in_buffer np.ndarray[np.int64_t] factor_buffer @@ -653,70 +544,41 @@ cpdef groupby_value(carray ca_input, carray ca_factor, Py_ssize_t nr_groups, Py_ count = 0 ret = 0 reverse = {} + iter_ca_factor = bz.iterblocks(ca_factor) + - input_chunk_len = ca_input.chunklen - in_buffer = np.empty(input_chunk_len, dtype=ca_input.dtype) - factor_chunk_len = ca_factor.chunklen factor_total_chunks = ca_factor.nchunks factor_chunk_nr = 0 - factor_buffer = np.empty(factor_chunk_len, dtype='int64') - if factor_total_chunks > 0: - factor_chunk = ca_factor.chunks[factor_chunk_nr] - factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data) - else: - factor_buffer = ca_factor.leftover_array + factor_buffer = next(iter_ca_factor) + factor_buffer_len = len(factor_buffer) factor_chunk_row = 0 - out_buffer = np.zeros(nr_groups, dtype=ca_input.dtype) - - for input_chunk_nr in range(ca_input.nchunks): - - # fill input buffer - input_chunk = ca_input.chunks[input_chunk_nr] - input_chunk._getitem(0, input_chunk_len, in_buffer.data) - for i in range(input_chunk_len): - - # go to next factor buffer if necessary - if factor_chunk_row == factor_chunk_len: - factor_chunk_nr += 1 - if factor_chunk_nr < factor_total_chunks: - factor_chunk = ca_factor.chunks[factor_chunk_nr] - factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data) - else: - factor_buffer = ca_factor.leftover_array - factor_chunk_row = 0 - - # retrieve index - current_index = factor_buffer[factor_chunk_row] - factor_chunk_row += 1 + out_buffer = np.zeros(nr_groups, dtype=ca_input.dtype) - # update value if it's not an invalid index - if current_index != skip_key: - out_buffer[current_index] = in_buffer[i] + try: + for in_buffer in bz.iterblocks(ca_input): + len_in_buffer = len(in_buffer) - leftover_elements = cython.cdiv(ca_input.leftover, ca_input.atomsize) - if leftover_elements > 0: - in_buffer = ca_input.leftover_array + for i in range(len_in_buffer): - for i in range(leftover_elements): + # go to next factor buffer if necessary + if factor_chunk_row == factor_buffer_len: + factor_chunk_nr += 1 + factor_buffer = next(iter_ca_factor) + factor_buffer_len = len(factor_buffer) + factor_chunk_row = 0 - # go to next factor buffer if necessary - if factor_chunk_row == factor_chunk_len: - factor_chunk_nr += 1 - if factor_chunk_nr < factor_total_chunks: - factor_chunk = ca_factor.chunks[factor_chunk_nr] - factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data) - else: - factor_buffer = ca_factor.leftover_array - factor_chunk_row = 0 + # retrieve index + current_index = factor_buffer[factor_chunk_row] + factor_chunk_row += 1 - # retrieve index - current_index = factor_buffer[factor_chunk_row] - factor_chunk_row += 1 - - # update value if it's not an invalid index - if current_index != skip_key: - out_buffer[current_index] = in_buffer[i] + # update value if it's not an invalid index + if current_index != skip_key: + out_buffer[current_index] = in_buffer[i] + except StopIteration: + pass + finally: + del iter_ca_factor # check whether a row has to be fixed if skip_key < nr_groups: @@ -845,29 +707,19 @@ cpdef translate_int64(carray input_, carray output_, dict lookup, np.npy_int64 d :return: """ cdef: - chunk chunk_ - Py_ssize_t i, chunklen, leftover_elements + Py_ssize_t i, chunklen, len_in_buffer np.ndarray[np.npy_int64] in_buffer np.ndarray[np.npy_int64] out_buffer chunklen = input_.chunklen out_buffer = np.empty(chunklen, dtype='int64') - in_buffer = np.empty(chunklen, dtype='int64') - for i in range(input_.nchunks): - chunk_ = input_.chunks[i] - # decompress into in_buffer - chunk_._getitem(0, chunklen, in_buffer.data) - for i in range(chunklen): - element = in_buffer[i] - out_buffer[i] = lookup.get(element, default) - # compress out_buffer into labels - output_.append(out_buffer.astype(np.int64)) + for in_buffer in bz.iterblocks(input_): + len_in_buffer = len(in_buffer) - leftover_elements = cython.cdiv(input_.leftover, input_.atomsize) - if leftover_elements > 0: - in_buffer = input_.leftover_array - for i in range(leftover_elements): + for i in range(len_in_buffer): element = in_buffer[i] out_buffer[i] = lookup.get(element, default) - output_.append(out_buffer[:leftover_elements].astype(np.int64)) + + # compress out_buffer into labels + output_.append(out_buffer[:len_in_buffer].astype(np.int64)) From 2331fbd82f7fdb5bd9afc6c87cccb037198791d2 Mon Sep 17 00:00:00 2001 From: Waylon Flinn Date: Mon, 5 Oct 2015 20:42:10 -0500 Subject: [PATCH 2/3] simplify factorize final copy --- bquery/ctable_ext.pyx | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/bquery/ctable_ext.pyx b/bquery/ctable_ext.pyx index f6d3dd9..623cf40 100644 --- a/bquery/ctable_ext.pyx +++ b/bquery/ctable_ext.pyx @@ -124,6 +124,10 @@ def factorize_str(carray carray_, carray labels=None): for in_buffer in bz.iterblocks(carray_): len_in_buffer = len(in_buffer) + # should only happen on leftovers + if(len_in_buffer != chunklen): + out_buffer = np.empty(len_in_buffer, dtype='uint64') + _factorize_str_helper(len_in_buffer, carray_.dtype.itemsize + 1, in_buffer, @@ -134,7 +138,7 @@ def factorize_str(carray carray_, carray labels=None): ) # TODO: need to use an explicit loop here for speed? - labels.append(out_buffer[:len_in_buffer].astype(np.int64)) + labels.append(out_buffer) kh_destroy_str(table) @@ -250,6 +254,11 @@ cdef factorize_number(carray carray_, numpy_native_number_input typehint, carra for in_buffer in bz.iterblocks(carray_): len_in_buffer = len(in_buffer) + + # should only happen on leftovers + if(len_in_buffer != chunklen): + out_buffer = np.empty(len_in_buffer, dtype='uint64') + _factorize_number_helper[numpy_native_number_input](len_in_buffer, carray_.dtype.itemsize + 1, in_buffer, @@ -262,7 +271,7 @@ cdef factorize_number(carray carray_, numpy_native_number_input typehint, carra # TODO: need to use an explicit loop here for speed? # compress out_buffer into labels - labels.append(out_buffer[:len_in_buffer].astype(np.int64)) + labels.append(out_buffer) if numpy_native_number_input is np.int64_t: kh_destroy_int64(table) From f70a208d9ab366512d7ca95847f1c152c3eb75e3 Mon Sep 17 00:00:00 2001 From: Waylon Flinn Date: Tue, 6 Oct 2015 07:43:42 -0500 Subject: [PATCH 3/3] replaced try/except with bounds check --- bquery/ctable_ext.pyx | 163 +++++++++++++++++++++--------------------- 1 file changed, 81 insertions(+), 82 deletions(-) diff --git a/bquery/ctable_ext.pyx b/bquery/ctable_ext.pyx index 623cf40..95a6c0a 100644 --- a/bquery/ctable_ext.pyx +++ b/bquery/ctable_ext.pyx @@ -430,7 +430,7 @@ cpdef aggregate(carray ca_input, carray ca_factor, cdef: Py_ssize_t in_buffer_len, factor_buffer_len - Py_ssize_t factor_chunk_nr, factor_chunk_row + Py_ssize_t factor_chunk_nr, factor_chunk_row, factor_total_chunks Py_ssize_t current_index, i, j, end_counts, start_counts np.ndarray[np.int64_t] factor_buffer @@ -459,6 +459,7 @@ cpdef aggregate(carray ca_input, carray ca_factor, return + factor_total_chunks = ca_factor.nchunks factor_chunk_nr = 0 factor_buffer = next(iter_ca_factor) factor_buffer_len = len(factor_buffer) @@ -470,65 +471,64 @@ cpdef aggregate(carray ca_input, carray ca_factor, if agg_method == _STDEV: mean_buffer = np.zeros(nr_groups, dtype='float64') - try: - for in_buffer in bz.iterblocks(ca_input): - len_in_buffer = len(in_buffer) - - # loop through rows - for i in range(len_in_buffer): - - # go to next factor buffer if necessary - - if factor_chunk_row == factor_buffer_len: - factor_chunk_nr += 1 - factor_buffer = next(iter_ca_factor) - factor_buffer_len = len(factor_buffer) - factor_chunk_row = 0 - - # retrieve index - current_index = factor_buffer[factor_chunk_row] - factor_chunk_row += 1 - - # update value if it's not an invalid index - if current_index != skip_key: - if agg_method == _SUM: - out_buffer[current_index] += in_buffer[i] - elif agg_method == _MEAN: - # method from Knuth - count_buffer[current_index] += 1 - delta = in_buffer[i] - out_buffer[current_index] - out_buffer[current_index] += delta / count_buffer[current_index] - elif agg_method == _STDEV: - count_buffer[current_index] += 1 - delta = in_buffer[i] - mean_buffer[current_index] - mean_buffer[current_index] += delta / count_buffer[current_index] - # M2 = M2 + delta*(x - mean) - out_buffer[current_index] += delta * (in_buffer[i] - mean_buffer[current_index]) - elif agg_method == _COUNT: - out_buffer[current_index] += 1 - elif agg_method == _COUNT_NA: + for in_buffer in bz.iterblocks(ca_input): + len_in_buffer = len(in_buffer) - v = in_buffer[i] - if v == v: # skip NA values - out_buffer[current_index] += 1 - elif agg_method == _SORTED_COUNT_DISTINCT: - v = in_buffer[i] - if not count_distinct_started: - count_distinct_started = 1 - last_values = np.zeros(nr_groups, dtype=p_dtype) - last_values[0] = v - out_buffer[0] = 1 - else: - if v != last_values[current_index]: - out_buffer[current_index] += 1 - - last_values[current_index] = v + # loop through rows + for i in range(len_in_buffer): + + # go to next factor buffer if necessary + # TODO: make chunk sizes the same for ca_factor + # and ca_input, to eliminate separate iteration + if (factor_chunk_row == factor_buffer_len + and factor_chunk_nr < factor_total_chunks): + factor_chunk_nr += 1 + factor_buffer = next(iter_ca_factor) + factor_buffer_len = len(factor_buffer) + factor_chunk_row = 0 + + # retrieve index + current_index = factor_buffer[factor_chunk_row] + factor_chunk_row += 1 + + # update value if it's not an invalid index + if current_index != skip_key: + if agg_method == _SUM: + out_buffer[current_index] += in_buffer[i] + elif agg_method == _MEAN: + # method from Knuth + count_buffer[current_index] += 1 + delta = in_buffer[i] - out_buffer[current_index] + out_buffer[current_index] += delta / count_buffer[current_index] + elif agg_method == _STDEV: + count_buffer[current_index] += 1 + delta = in_buffer[i] - mean_buffer[current_index] + mean_buffer[current_index] += delta / count_buffer[current_index] + # M2 = M2 + delta*(x - mean) + out_buffer[current_index] += delta * (in_buffer[i] - mean_buffer[current_index]) + elif agg_method == _COUNT: + out_buffer[current_index] += 1 + elif agg_method == _COUNT_NA: + + v = in_buffer[i] + if v == v: # skip NA values + out_buffer[current_index] += 1 + elif agg_method == _SORTED_COUNT_DISTINCT: + v = in_buffer[i] + if not count_distinct_started: + count_distinct_started = 1 + last_values = np.zeros(nr_groups, dtype=p_dtype) + last_values[0] = v + out_buffer[0] = 1 else: - raise NotImplementedError('sumtype not supported') - except StopIteration: - pass - finally: - del iter_ca_factor + if v != last_values[current_index]: + out_buffer[current_index] += 1 + + last_values[current_index] = v + else: + raise NotImplementedError('sumtype not supported') + + del iter_ca_factor if agg_method == _STDEV: for i in range(len(out_buffer)): @@ -564,30 +564,29 @@ cpdef groupby_value(carray ca_input, carray ca_factor, Py_ssize_t nr_groups, Py_ out_buffer = np.zeros(nr_groups, dtype=ca_input.dtype) - try: - for in_buffer in bz.iterblocks(ca_input): - len_in_buffer = len(in_buffer) - - for i in range(len_in_buffer): - - # go to next factor buffer if necessary - if factor_chunk_row == factor_buffer_len: - factor_chunk_nr += 1 - factor_buffer = next(iter_ca_factor) - factor_buffer_len = len(factor_buffer) - factor_chunk_row = 0 - - # retrieve index - current_index = factor_buffer[factor_chunk_row] - factor_chunk_row += 1 - - # update value if it's not an invalid index - if current_index != skip_key: - out_buffer[current_index] = in_buffer[i] - except StopIteration: - pass - finally: - del iter_ca_factor + for in_buffer in bz.iterblocks(ca_input): + len_in_buffer = len(in_buffer) + + for i in range(len_in_buffer): + + # go to next factor buffer if necessary + if (factor_chunk_row == factor_buffer_len + and factor_chunk_nr < factor_total_chunks): + + factor_chunk_nr += 1 + factor_buffer = next(iter_ca_factor) + factor_buffer_len = len(factor_buffer) + factor_chunk_row = 0 + + # retrieve index + current_index = factor_buffer[factor_chunk_row] + factor_chunk_row += 1 + + # update value if it's not an invalid index + if current_index != skip_key: + out_buffer[current_index] = in_buffer[i] + + del iter_ca_factor # check whether a row has to be fixed if skip_key < nr_groups: