From e6d6a2a7e030d24cc7051720fce025a9c1d3c39e Mon Sep 17 00:00:00 2001 From: Jake Hemstad Date: Fri, 14 Sep 2018 17:39:41 -0700 Subject: [PATCH 01/18] Implemented null support for hash-based groupbys. --- src/groupby/hash/groupby_kernels.cuh | 62 ++++++++++++++++------------ 1 file changed, 35 insertions(+), 27 deletions(-) diff --git a/src/groupby/hash/groupby_kernels.cuh b/src/groupby/hash/groupby_kernels.cuh index 4aae7d33..51fa74c5 100644 --- a/src/groupby/hash/groupby_kernels.cuh +++ b/src/groupby/hash/groupby_kernels.cuh @@ -55,19 +55,23 @@ __global__ void build_aggregation_table(map_type * const __restrict__ the_map, while( i < column_size ){ - // Hash the current row of the input table - const auto row_hash = groupby_input_table.hash_row(i); - - // Attempt to insert the current row's index. - // The hash value of the row will determine the write location. - // The rows at the current row index and the existing row index - // will be compared for equality. If they are equal, the aggregation - // operation is performed. - the_map->insert(thrust::make_pair(i, aggregation_column[i]), - op, - the_comparator, - true, - row_hash); + // Only insert into the hash table if the row is valid + if( groupby_input_table.is_row_valid(i) ) + { + // Hash the current row of the input table + const auto row_hash = groupby_input_table.hash_row(i); + + // Attempt to insert the current row's index. + // The hash value of the row will determine the write location. + // The rows at the current row index and the existing row index + // will be compared for equality. If they are equal, the aggregation + // operation is performed. 
+ the_map->insert(thrust::make_pair(i, aggregation_column[i]), + op, + the_comparator, + true, + row_hash); + } i += blockDim.x * gridDim.x; } @@ -87,22 +91,26 @@ __global__ void build_aggregation_table(map_type * const __restrict__ the_map, { size_type i = threadIdx.x + blockIdx.x * blockDim.x; - // Hash the current row of the input table - const auto row_hash = groupby_input_table.hash_row(i); - while( i < column_size ){ - // When the aggregator is COUNT, ignore the aggregation column and just insert '0' - // Attempt to insert the current row's index. - // The hash value of the row will determine the write location. - // The rows at the current row index and the existing row index - // will be compared for equality. If they are equal, the aggregation - // operation is performed. - the_map->insert(thrust::make_pair(i, static_cast(0)), - op, - the_comparator, - true, - row_hash); + // Only insert into the hash table if the the row is valid + if(groupby_input_table.is_row_valid(i)) + { + // Hash the current row of the input table + const auto row_hash = groupby_input_table.hash_row(i); + + // When the aggregator is COUNT, ignore the aggregation column and just insert '0' + // Attempt to insert the current row's index. + // The hash value of the row will determine the write location. + // The rows at the current row index and the existing row index + // will be compared for equality. If they are equal, the aggregation + // operation is performed. + the_map->insert(thrust::make_pair(i, static_cast(0)), + op, + the_comparator, + true, + row_hash); + } i += blockDim.x * gridDim.x; } } From 3c6c85ba2f4e66079f8397bab0142463acb246d4 Mon Sep 17 00:00:00 2001 From: Jake Hemstad Date: Wed, 19 Sep 2018 10:04:49 -0700 Subject: [PATCH 02/18] Added checking for NULLs in the aggregation column. 
--- src/groupby/groupby.cuh | 1 + src/groupby/hash/groupby_compute_api.h | 2 ++ src/groupby/hash/groupby_kernels.cuh | 8 ++++++-- 3 files changed, 9 insertions(+), 2 deletions(-) diff --git a/src/groupby/groupby.cuh b/src/groupby/groupby.cuh index 9798860a..098c52b3 100644 --- a/src/groupby/groupby.cuh +++ b/src/groupby/groupby.cuh @@ -58,6 +58,7 @@ gdf_error typed_groupby(gdf_table const & groupby_input_table, gdf_error gdf_error_code = GroupbyHash(groupby_input_table, in_agg_col, + in_aggregation_column->valid, groupby_output_table, out_agg_col, &output_size, diff --git a/src/groupby/hash/groupby_compute_api.h b/src/groupby/hash/groupby_compute_api.h index 7ecb2a1e..8ac12b6e 100644 --- a/src/groupby/hash/groupby_compute_api.h +++ b/src/groupby/hash/groupby_compute_api.h @@ -138,6 +138,7 @@ template< typename aggregation_type, typename aggregation_operation> gdf_error GroupbyHash(gdf_table const & groupby_input_table, const aggregation_type * const in_aggregation_column, + gdf_valid_type const * const aggregation_validity_mask, gdf_table & groupby_output_table, aggregation_type * out_aggregation_column, size_type * out_size, @@ -174,6 +175,7 @@ gdf_error GroupbyHash(gdf_table const & groupby_input_table, build_aggregation_table<<>>(the_map.get(), groupby_input_table, in_aggregation_column, + aggregation_validity_mask, input_num_rows, aggregation_op, row_comparator(*the_map, groupby_input_table, groupby_input_table)); diff --git a/src/groupby/hash/groupby_kernels.cuh b/src/groupby/hash/groupby_kernels.cuh index 51fa74c5..9bd0d3fd 100644 --- a/src/groupby/hash/groupby_kernels.cuh +++ b/src/groupby/hash/groupby_kernels.cuh @@ -17,10 +17,12 @@ #ifndef GROUPBY_KERNELS_H #define GROUPBY_KERNELS_H +#include #include "../../hashmap/concurrent_unordered_map.cuh" #include "aggregation_operations.cuh" #include "../../gdf_table.cuh" + /* --------------------------------------------------------------------------*/ /** * @Synopsis Takes in two columns of equal length. 
One column to groupby and the @@ -47,6 +49,7 @@ template const & groupby_input_table, const aggregation_type * const __restrict__ aggregation_column, + gdf_valid_type const * const __restrict__ aggregation_validitity_mask, size_type column_size, aggregation_operation op, row_comparator the_comparator) @@ -56,7 +59,7 @@ __global__ void build_aggregation_table(map_type * const __restrict__ the_map, while( i < column_size ){ // Only insert into the hash table if the row is valid - if( groupby_input_table.is_row_valid(i) ) + if( groupby_input_table.is_row_valid(i) && gdf_is_valid(aggregation_validitity_mask,i)) { // Hash the current row of the input table const auto row_hash = groupby_input_table.hash_row(i); @@ -85,6 +88,7 @@ template const & groupby_input_table, const aggregation_type * const __restrict__ aggregation_column, + gdf_valid_type const * const __restrict__ aggregation_validitity_mask, size_type column_size, count_op op, row_comparator the_comparator) @@ -94,7 +98,7 @@ __global__ void build_aggregation_table(map_type * const __restrict__ the_map, while( i < column_size ){ // Only insert into the hash table if the the row is valid - if(groupby_input_table.is_row_valid(i)) + if(groupby_input_table.is_row_valid(i) && gdf_is_valid(aggregation_validitity_mask, i)) { // Hash the current row of the input table const auto row_hash = groupby_input_table.hash_row(i); From 535a777c828fdd02f411e427faa05eebe8cb8912 Mon Sep 17 00:00:00 2001 From: Jake Hemstad Date: Wed, 19 Sep 2018 18:43:19 -0700 Subject: [PATCH 03/18] Added function to set a specified bit in a validity bit mask. 
--- include/gdf/utils.h | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/include/gdf/utils.h b/include/gdf/utils.h index 7b3f1bdc..10aa0c3c 100644 --- a/include/gdf/utils.h +++ b/include/gdf/utils.h @@ -12,6 +12,19 @@ bool gdf_is_valid(const gdf_valid_type *valid, gdf_index_type pos) { return true; } -inline gdf_size_type gdf_get_num_chars_bitmask(gdf_size_type size) { return (( size + ( GDF_VALID_BITSIZE - 1)) / GDF_VALID_BITSIZE ); } +inline gdf_size_type gdf_get_num_chars_bitmask(gdf_size_type size) +{ + return (( size + ( GDF_VALID_BITSIZE - 1)) / GDF_VALID_BITSIZE ); +} + +__host__ __device__ +inline void turn_bit_on(gdf_valid_type * masks, gdf_index_type pos) +{ + if(nullptr != masks) + { + masks[pos/8] |= (gdf_valid_type(1) << (pos % 8)); + } +} + #endif From b7db9eb7849b53650f9ba27099c00c0b7caa90ae Mon Sep 17 00:00:00 2001 From: Jake Hemstad Date: Wed, 19 Sep 2018 18:43:45 -0700 Subject: [PATCH 04/18] Updated handling of NULLs in the aggregation column such that NULLs are ignored, unless all of the values for a given key are NULL. 
--- src/groupby/groupby.cuh | 1 + src/groupby/hash/groupby_compute_api.h | 23 +++- src/groupby/hash/groupby_kernels.cuh | 135 +++++++++++++++++------ src/hashmap/concurrent_unordered_map.cuh | 74 ++++++++++++- 4 files changed, 198 insertions(+), 35 deletions(-) diff --git a/src/groupby/groupby.cuh b/src/groupby/groupby.cuh index 098c52b3..9339840f 100644 --- a/src/groupby/groupby.cuh +++ b/src/groupby/groupby.cuh @@ -61,6 +61,7 @@ gdf_error typed_groupby(gdf_table const & groupby_input_table, in_aggregation_column->valid, groupby_output_table, out_agg_col, + out_aggregation_column->valid, &output_size, op_type(), sort_result); diff --git a/src/groupby/hash/groupby_compute_api.h b/src/groupby/hash/groupby_compute_api.h index 8ac12b6e..12e8ce6e 100644 --- a/src/groupby/hash/groupby_compute_api.h +++ b/src/groupby/hash/groupby_compute_api.h @@ -138,9 +138,10 @@ template< typename aggregation_type, typename aggregation_operation> gdf_error GroupbyHash(gdf_table const & groupby_input_table, const aggregation_type * const in_aggregation_column, - gdf_valid_type const * const aggregation_validity_mask, + gdf_valid_type const * const in_aggregation_validity_mask, gdf_table & groupby_output_table, aggregation_type * out_aggregation_column, + gdf_valid_type * const out_aggregation_validity_mask, size_type * out_size, aggregation_operation aggregation_op, bool sort_result = false) @@ -169,13 +170,20 @@ gdf_error GroupbyHash(gdf_table const & groupby_input_table, CUDA_TRY(cudaGetLastError()); + + // Allocate an array to indicate the state of each bucket in the hash table + bucket_state * hash_bucket_states{nullptr}; + CUDA_TRY( cudaMalloc(&hash_bucket_states, hash_table_size * sizeof(bucket_state)) ); + CUDA_TRY( cudaMemset(hash_bucket_states, bucket_state::EMPTY, hash_table_size * sizeof(bucket_state)) ); + // Inserts (i, aggregation_column[i]) as a key-value pair into the // hash table. 
When a given key already exists in the table, the aggregation operation // is computed between the new and existing value, and the result is stored back. build_aggregation_table<<>>(the_map.get(), groupby_input_table, in_aggregation_column, - aggregation_validity_mask, + in_aggregation_validity_mask, + hash_bucket_states, input_num_rows, aggregation_op, row_comparator(*the_map, groupby_input_table, groupby_input_table)); @@ -188,13 +196,22 @@ gdf_error GroupbyHash(gdf_table const & groupby_input_table, const dim3 extract_grid_size ((hash_table_size + THREAD_BLOCK_SIZE - 1) / THREAD_BLOCK_SIZE, 1, 1); + // Initialize output aggregation column's validity mask + if(nullptr != out_aggregation_validity_mask) + { + const size_type num_masks = gdf_get_num_chars_bitmask(input_num_rows); + CUDA_TRY( cudaMemset(out_aggregation_validity_mask, 0, num_masks * sizeof(gdf_valid_type)) ); + } + // Extracts every non-empty key and value into separate contiguous arrays, // which provides the result of the groupby operation extract_groupby_result<<>>(the_map.get(), hash_table_size, + hash_bucket_states, groupby_output_table, groupby_input_table, out_aggregation_column, + out_aggregation_validity_mask, global_write_index); CUDA_TRY(cudaGetLastError()); @@ -216,6 +233,8 @@ gdf_error GroupbyHash(gdf_table const & groupby_input_table, thrust::copy(agg.begin(), agg.end(), out_aggregation_column); } + CUDA_TRY( cudaFree(hash_bucket_states) ); + return GDF_SUCCESS; } #endif diff --git a/src/groupby/hash/groupby_kernels.cuh b/src/groupby/hash/groupby_kernels.cuh index 9bd0d3fd..de8e3850 100644 --- a/src/groupby/hash/groupby_kernels.cuh +++ b/src/groupby/hash/groupby_kernels.cuh @@ -23,6 +23,19 @@ #include "../../gdf_table.cuh" +/* --------------------------------------------------------------------------*/ +/** + * @Synopsis Indicates the state of a hash bucket in the hash table. 
+ */ +/* ----------------------------------------------------------------------------*/ +enum bucket_state : int +{ + EMPTY = 0, /** Indicates that the hash bucket is empty */ + NULL_VALUE, /** Indicates that the bucket contains a NULL value */ + VALID_VALUE /** Indicates that the bucket contains a valid value */ +}; +using state_t = std::underlying_type::type; + /* --------------------------------------------------------------------------*/ /** * @Synopsis Takes in two columns of equal length. One column to groupby and the @@ -50,30 +63,55 @@ __global__ void build_aggregation_table(map_type * const __restrict__ the_map, gdf_table const & groupby_input_table, const aggregation_type * const __restrict__ aggregation_column, gdf_valid_type const * const __restrict__ aggregation_validitity_mask, + bucket_state * const __restrict__ hash_bucket_states, size_type column_size, aggregation_operation op, row_comparator the_comparator) { size_type i = threadIdx.x + blockIdx.x * blockDim.x; + const auto map_start = the_map->begin(); + while( i < column_size ){ // Only insert into the hash table if the row is valid - if( groupby_input_table.is_row_valid(i) && gdf_is_valid(aggregation_validitity_mask,i)) + if( true == groupby_input_table.is_row_valid(i) ) { + // Hash the current row of the input table const auto row_hash = groupby_input_table.hash_row(i); - // Attempt to insert the current row's index. - // The hash value of the row will determine the write location. - // The rows at the current row index and the existing row index - // will be compared for equality. If they are equal, the aggregation - // operation is performed. 
- the_map->insert(thrust::make_pair(i, aggregation_column[i]), - op, - the_comparator, - true, - row_hash); + if(false == gdf_is_valid(aggregation_validitity_mask,i)) + { + const size_type insert_location = the_map->insert_key(i, the_comparator, true, row_hash); + + // If the aggregation value is NULL, and the hash bucket is empty, + // then set the state of the bucket to show that there is a NULL value for this key + // The casts are required to cast the enum type to a type supported by + // atomicCAS + // TODO Use a bitmask instead of a 32 bit flag for every bucket + atomicCAS(reinterpret_cast(&hash_bucket_states[insert_location]), + static_cast(bucket_state::EMPTY), + static_cast(bucket_state::NULL_VALUE)); + } + else + { + + // Attempt to insert the current row's index. + // The hash value of the row will determine the write location. + // The rows at the current row index and the existing row index + // will be compared for equality. If they are equal, the aggregation + // operation is performed. 
+ const size_type insert_location = the_map->insert(thrust::make_pair(i, aggregation_column[i]), + op, + the_comparator, + true, + row_hash); + // If it's not NULL, indicate that there is a valid value + // in this bucket + atomicExch(reinterpret_cast(&hash_bucket_states[insert_location]), + static_cast(bucket_state::VALID_VALUE)); + } } i += blockDim.x * gridDim.x; @@ -89,31 +127,56 @@ __global__ void build_aggregation_table(map_type * const __restrict__ the_map, gdf_table const & groupby_input_table, const aggregation_type * const __restrict__ aggregation_column, gdf_valid_type const * const __restrict__ aggregation_validitity_mask, + bucket_state * const __restrict__ hash_bucket_states, size_type column_size, count_op op, row_comparator the_comparator) { + + auto map_start = the_map->begin(); size_type i = threadIdx.x + blockIdx.x * blockDim.x; while( i < column_size ){ // Only insert into the hash table if the the row is valid - if(groupby_input_table.is_row_valid(i) && gdf_is_valid(aggregation_validitity_mask, i)) + if(groupby_input_table.is_row_valid(i) ) { // Hash the current row of the input table const auto row_hash = groupby_input_table.hash_row(i); - // When the aggregator is COUNT, ignore the aggregation column and just insert '0' - // Attempt to insert the current row's index. - // The hash value of the row will determine the write location. - // The rows at the current row index and the existing row index - // will be compared for equality. If they are equal, the aggregation - // operation is performed. 
- the_map->insert(thrust::make_pair(i, static_cast(0)), - op, - the_comparator, - true, - row_hash); + + if(false == gdf_is_valid(aggregation_validitity_mask,i)) + { + // If the aggregation value is NULL, and the hash bucket is empty, + // then set the state of the bucket to show that there is a NULL value for this key + // The casts are required to cast the enum type to a type supported by + // atomicCAS + // TODO Use a bitmask instead of a 32 bit flag for every bucket + const size_type insert_location = the_map->insert_key(i, the_comparator, true, row_hash); + + atomicCAS(reinterpret_cast(&hash_bucket_states[insert_location]), + static_cast(bucket_state::EMPTY), + static_cast(bucket_state::NULL_VALUE)); + } + else + { + // When the aggregator is COUNT, ignore the aggregation column and just insert '0' + // Attempt to insert the current row's index. + // The hash value of the row will determine the write location. + // The rows at the current row index and the existing row index + // will be compared for equality. If they are equal, the aggregation + // operation is performed. 
+ const size_type insert_location = the_map->insert(thrust::make_pair(i, static_cast(0)), + op, + the_comparator, + true, + row_hash); + + // If it's not NULL, indicate that there is a valid value + // in this bucket + atomicExch(reinterpret_cast(&hash_bucket_states[insert_location]), + static_cast(bucket_state::VALID_VALUE)); + } } i += blockDim.x * gridDim.x; } @@ -139,34 +202,44 @@ template __global__ void extract_groupby_result(const map_type * const __restrict__ the_map, const size_type map_size, + const bucket_state * const __restrict__ hash_bucket_states, gdf_table & groupby_output_table, gdf_table const & groupby_input_table, aggregation_type * const __restrict__ aggregation_out_column, + gdf_valid_type * const __restrict__ aggregation_out_valid_mask, size_type * const global_write_index) { size_type i = threadIdx.x + blockIdx.x * blockDim.x; - constexpr typename map_type::key_type unused_key{map_type::get_unused_key()}; - const typename map_type::value_type * const __restrict__ hashtabl_values = the_map->data(); // TODO: Use _shared_ thread block cache for writing temporary ouputs and then // write to the global output while(i < map_size){ - const typename map_type::key_type current_key = hashtabl_values[i].first; + const bucket_state current_state = hash_bucket_states[i]; - if( current_key != unused_key){ + // If the hash bucket isn't empty, then we need to add it to the output + if(bucket_state::EMPTY != current_state) + { + const typename map_type::key_type output_row = hashtabl_values[i].first; const size_type thread_write_index = atomicAdd(global_write_index, 1); - // Copy the row at current_key from the input table to the row at + // Copy the row from the input table to the row at // thread_write_index in the output table groupby_output_table.copy_row(groupby_input_table, thread_write_index, - current_key); - - aggregation_out_column[thread_write_index] = hashtabl_values[i].second; + output_row); + + // If this bucket hold's a valid aggregation 
value, copy it to the + // aggregation output and set it's validity bit + if( bucket_state::NULL_VALUE != current_state ) + { + aggregation_out_column[thread_write_index] = hashtabl_values[i].second; + turn_bit_on(aggregation_out_valid_mask, i); + } } + i += gridDim.x * blockDim.x; } } diff --git a/src/hashmap/concurrent_unordered_map.cuh b/src/hashmap/concurrent_unordered_map.cuh index 5f2c4446..f128c296 100644 --- a/src/hashmap/concurrent_unordered_map.cuh +++ b/src/hashmap/concurrent_unordered_map.cuh @@ -226,6 +226,12 @@ public: __host__ __device__ explicit cycle_iterator_adapter( const iterator_type& begin, const iterator_type& end, const iterator_type& current ) : m_begin( begin ), m_end( end ), m_current( current ) {} + + + template + __device__ + friend size_t operator-(const cycle_iterator_adapter& lhs, const cycle_iterator_adapter& rhs); + __host__ __device__ cycle_iterator_adapter& operator++() { @@ -308,6 +314,13 @@ __host__ __device__ bool operator!=(const cycle_iterator_adapter& lhs, const return !lhs.equal(rhs); } +template +__device__ +size_t operator-(const cycle_iterator_adapter& lhs, const cycle_iterator_adapter& rhs) +{ + return lhs.m_current - rhs.m_current; +} + /** * Does support concurrent insert, but not concurrent insert and probping. 
* @@ -481,7 +494,7 @@ public: class comparison_type = key_equal, typename hash_value_type = typename Hasher::result_type> __forceinline__ - __device__ iterator insert(const value_type& x, + __device__ size_type insert(const value_type& x, aggregation_type op, comparison_type keys_equal = key_equal(), bool precomputed_hash = false, @@ -510,6 +523,7 @@ public: const key_type insert_key = x.first; bool insert_success = false; + size_type insert_location{0}; while (false == insert_success) { @@ -533,13 +547,69 @@ public: update_existing_value(existing_value, x, op); insert_success = true; + insert_location = current_index; + } + + current_index = (current_index+1)%hashtbl_size; + current_hash_bucket = &(hashtbl_values[current_index]); + } + + //return iterator( m_hashtbl_values,m_hashtbl_values+hashtbl_size, current_hash_bucket); + return insert_location; + } + + template + __forceinline__ + __device__ size_type insert_key(const key_type & insert_key, + comparison_type keys_equal = key_equal(), + bool precomputed_hash = false, + hash_value_type precomputed_hash_value = 0) + { + const size_type hashtbl_size = m_hashtbl_size; + value_type* hashtbl_values = m_hashtbl_values; + + hash_value_type hash_value{0}; + + // If a precomputed hash value has been passed in, then use it to determine + // the write location of the new key + if(true == precomputed_hash) + { + hash_value = precomputed_hash_value; + } + // Otherwise, compute the hash value from the new key + else + { + hash_value = m_hf(insert_key); + } + + size_type current_index = hash_value % hashtbl_size; + value_type *current_hash_bucket = &(hashtbl_values[current_index]); + + bool insert_success = false; + size_type insert_location{0}; + + while (false == insert_success) { + key_type& existing_key = current_hash_bucket->first; + + // Try and set the existing_key for the current hash bucket to insert_key + const key_type old_key = atomicCAS( &existing_key, unused_key, insert_key); + + // If old_key == unused_key, 
the current hash bucket was empty + // and existing_key was updated to insert_key by the atomicCAS. + // If old_key == insert_key, this key has already been inserted. + if ( keys_equal( unused_key, old_key ) || keys_equal(insert_key, old_key) ) { + insert_success = true; + insert_location = current_index; } current_index = (current_index+1)%hashtbl_size; current_hash_bucket = &(hashtbl_values[current_index]); } - return iterator( m_hashtbl_values,m_hashtbl_values+hashtbl_size, current_hash_bucket); + //return iterator( m_hashtbl_values,m_hashtbl_values+hashtbl_size, current_hash_bucket); + // Return the index where the insert occured + return insert_location; } /* This function is not currently implemented From c904f47ec32aa1697d2a20fb528fe38cb386c3d0 Mon Sep 17 00:00:00 2001 From: Jake Hemstad Date: Wed, 19 Sep 2018 20:47:14 -0700 Subject: [PATCH 05/18] Updated setting the validity bits in the aggregation output to use atomicOrs to avoid race conditions. --- src/groupby/hash/groupby_kernels.cuh | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/src/groupby/hash/groupby_kernels.cuh b/src/groupby/hash/groupby_kernels.cuh index de8e3850..5d8fb9fe 100644 --- a/src/groupby/hash/groupby_kernels.cuh +++ b/src/groupby/hash/groupby_kernels.cuh @@ -206,7 +206,7 @@ __global__ void extract_groupby_result(const map_type * const __restrict__ the_m gdf_table & groupby_output_table, gdf_table const & groupby_input_table, aggregation_type * const __restrict__ aggregation_out_column, - gdf_valid_type * const __restrict__ aggregation_out_valid_mask, + gdf_valid_type * aggregation_out_valid_mask, size_type * const global_write_index) { size_type i = threadIdx.x + blockIdx.x * blockDim.x; @@ -236,7 +236,16 @@ __global__ void extract_groupby_result(const map_type * const __restrict__ the_m if( bucket_state::NULL_VALUE != current_state ) { aggregation_out_column[thread_write_index] = hashtabl_values[i].second; - turn_bit_on(aggregation_out_valid_mask, i); + 
+ // Set the valid bit for this row. Need to cast the valid mask type + // to a 32 bit type where atomics are supported + if(nullptr != aggregation_out_valid_mask) + { + uint32_t * valid_mask32 = reinterpret_cast(aggregation_out_valid_mask); + const uint32_t output_bit32 = (uint32_t(1) << (i % uint32_t(32))); + uint32_t * output_mask32 = &(valid_mask32[(i / uint32_t(32))]); + atomicOr(output_mask32, output_bit32); + } } } From e4980f626eb1136a2f47b0b12e460447b12de37f Mon Sep 17 00:00:00 2001 From: Jake Hemstad Date: Thu, 20 Sep 2018 08:32:45 -0700 Subject: [PATCH 06/18] Added kernel to gather the validity bits of a gdf_column. Used the kernel to rearrange the validity bits of the aggregation output column after sorting the results. --- src/gdf_table.cuh | 55 ++++++++++++++++++++++++++ src/groupby/groupby.cuh | 2 +- src/groupby/hash/groupby_compute_api.h | 32 +++++++++++---- 3 files changed, 80 insertions(+), 9 deletions(-) diff --git a/src/gdf_table.cuh b/src/gdf_table.cuh index 83fcc542..1ed9dbe7 100644 --- a/src/gdf_table.cuh +++ b/src/gdf_table.cuh @@ -127,6 +127,59 @@ void scatter_valid_mask( gdf_valid_type const * const input_mask, } } +/* --------------------------------------------------------------------------*/ +/** + * @Synopsis Gathers a validity bitmask. + * + * This kernel is used in order to gather the validity bit mask for a gdf_column. + * + * @Param input_mask The mask that will be gathered. + * @Param output_mask The output after gathering the input + * @Param gather_map The map that indicates where elements from the input + will be gathered to in the output. 
output_bit[ gather_map [i] ] = input_bit[i] + * @Param num_rows The number of bits in the masks + */ +/* ----------------------------------------------------------------------------*/ +template +__global__ +void gather_valid_mask( gdf_valid_type const * const input_mask, + gdf_valid_type * const output_mask, + size_type const * const __restrict__ gather_map, + size_type const num_rows) +{ + using mask_type = uint32_t; + constexpr uint32_t BITS_PER_MASK = 8 * sizeof(mask_type); + + // Cast the validity type to a type where atomicOr is natively supported + const mask_type * __restrict__ input_mask32 = reinterpret_cast(input_mask); + mask_type * const __restrict__ output_mask32 = reinterpret_cast(output_mask); + + size_type row_number = threadIdx.x + blockIdx.x * blockDim.x; + + while(row_number < num_rows) + { + const size_type gather_location = gather_map[row_number]; + + // Get the bit corresponding from the gathered row + const mask_type input_bit = input_mask32[gather_location/BITS_PER_MASK] & (static_cast(1) << (gather_location % BITS_PER_MASK)); + + // Only set the output bit if the input is valid + if(input_bit > 0) + { + // Construct the mask that sets the bit for the output row + const mask_type output_bit = static_cast(1) << (row_number % BITS_PER_MASK); + + // Find the mask in the output that will hold the bit for output row + const size_type output_location = row_number / BITS_PER_MASK; + + // Bitwise OR to set the gathered row's bit + atomicOr(&output_mask32[output_location], output_bit); + } + + row_number += blockDim.x * gridDim.x; + } +} + /* --------------------------------------------------------------------------*/ /** @@ -772,6 +825,8 @@ public: gdf_error gather(thrust::device_vector const & row_gather_map, gdf_table & gather_output_table) { + + // TODO Gather the bit validity masks for each column gdf_error gdf_status{GDF_SUCCESS}; // Each column can be gathered in parallel, therefore create a diff --git a/src/groupby/groupby.cuh 
b/src/groupby/groupby.cuh index 9339840f..872e28a9 100644 --- a/src/groupby/groupby.cuh +++ b/src/groupby/groupby.cuh @@ -61,7 +61,7 @@ gdf_error typed_groupby(gdf_table const & groupby_input_table, in_aggregation_column->valid, groupby_output_table, out_agg_col, - out_aggregation_column->valid, + &out_aggregation_column->valid, &output_size, op_type(), sort_result); diff --git a/src/groupby/hash/groupby_compute_api.h b/src/groupby/hash/groupby_compute_api.h index 12e8ce6e..55fcb1d6 100644 --- a/src/groupby/hash/groupby_compute_api.h +++ b/src/groupby/hash/groupby_compute_api.h @@ -141,7 +141,7 @@ gdf_error GroupbyHash(gdf_table const & groupby_input_table, gdf_valid_type const * const in_aggregation_validity_mask, gdf_table & groupby_output_table, aggregation_type * out_aggregation_column, - gdf_valid_type * const out_aggregation_validity_mask, + gdf_valid_type ** out_aggregation_validity_mask, size_type * out_size, aggregation_operation aggregation_op, bool sort_result = false) @@ -197,11 +197,12 @@ gdf_error GroupbyHash(gdf_table const & groupby_input_table, const dim3 extract_grid_size ((hash_table_size + THREAD_BLOCK_SIZE - 1) / THREAD_BLOCK_SIZE, 1, 1); // Initialize output aggregation column's validity mask - if(nullptr != out_aggregation_validity_mask) + const size_type num_masks = gdf_get_num_chars_bitmask(input_num_rows); + if(nullptr == *out_aggregation_validity_mask) { - const size_type num_masks = gdf_get_num_chars_bitmask(input_num_rows); - CUDA_TRY( cudaMemset(out_aggregation_validity_mask, 0, num_masks * sizeof(gdf_valid_type)) ); + CUDA_TRY( cudaMalloc(out_aggregation_validity_mask, num_masks * sizeof(gdf_valid_type)) ); } + CUDA_TRY( cudaMemset(*out_aggregation_validity_mask, 0, num_masks * sizeof(gdf_valid_type)) ); // Extracts every non-empty key and value into separate contiguous arrays, // which provides the result of the groupby operation @@ -211,7 +212,7 @@ gdf_error GroupbyHash(gdf_table const & groupby_input_table, groupby_output_table, 
groupby_input_table, out_aggregation_column, - out_aggregation_validity_mask, + *out_aggregation_validity_mask, global_write_index); CUDA_TRY(cudaGetLastError()); @@ -227,10 +228,25 @@ gdf_error GroupbyHash(gdf_table const & groupby_input_table, auto sorted_indices = groupby_output_table.sort(); thrust::device_vector agg(*out_size); thrust::gather(thrust::device, - sorted_indices.begin(), sorted_indices.end(), - out_aggregation_column, - agg.begin()); + sorted_indices.begin(), + sorted_indices.end(), + out_aggregation_column, + agg.begin()); thrust::copy(agg.begin(), agg.end(), out_aggregation_column); + + if(nullptr != *out_aggregation_validity_mask) + { + // Reorder the bit-validity mask of the aggregation output column + // according to the new sorted order + const size_type gather_grid_size = (*out_size + THREAD_BLOCK_SIZE - 1)/THREAD_BLOCK_SIZE; + const size_type num_masks = gdf_get_num_chars_bitmask(*out_size); + thrust::device_vector new_valid_mask(num_masks,0); + gather_valid_mask<<>>(*out_aggregation_validity_mask, + new_valid_mask.data().get(), + sorted_indices.data().get(), + *out_size); + thrust::copy(new_valid_mask.begin(), new_valid_mask.end(), *out_aggregation_validity_mask); + } } CUDA_TRY( cudaFree(hash_bucket_states) ); From 90f0c0c04148d219630d58d1f32124edf5403db1 Mon Sep 17 00:00:00 2001 From: Jake Hemstad Date: Thu, 20 Sep 2018 09:35:09 -0700 Subject: [PATCH 07/18] Fixed bug where the wrong bit was getting set in the aggregation output validity mask during the extraction kernel. 
--- src/groupby/hash/groupby_kernels.cuh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/groupby/hash/groupby_kernels.cuh b/src/groupby/hash/groupby_kernels.cuh index 5d8fb9fe..7174de26 100644 --- a/src/groupby/hash/groupby_kernels.cuh +++ b/src/groupby/hash/groupby_kernels.cuh @@ -242,8 +242,8 @@ __global__ void extract_groupby_result(const map_type * const __restrict__ the_m if(nullptr != aggregation_out_valid_mask) { uint32_t * valid_mask32 = reinterpret_cast(aggregation_out_valid_mask); - const uint32_t output_bit32 = (uint32_t(1) << (i % uint32_t(32))); - uint32_t * output_mask32 = &(valid_mask32[(i / uint32_t(32))]); + const uint32_t output_bit32 = (uint32_t(1) << (thread_write_index % uint32_t(32))); + uint32_t * output_mask32 = &(valid_mask32[(thread_write_index / uint32_t(32))]); atomicOr(output_mask32, output_bit32); } } From bd98d2eda090ca64778b5cc76170b4041c850159 Mon Sep 17 00:00:00 2001 From: Jake Hemstad Date: Thu, 20 Sep 2018 10:33:57 -0700 Subject: [PATCH 08/18] Corrected the AVG aggregator implementation so the validity bitmask of the AVG aggregator will get set correctly. 
--- src/groupby/groupby.cuh | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/src/groupby/groupby.cuh b/src/groupby/groupby.cuh index 872e28a9..9cc4b52d 100644 --- a/src/groupby/groupby.cuh +++ b/src/groupby/groupby.cuh @@ -280,6 +280,9 @@ gdf_column create_gdf_column(const size_t size) else if(std::is_same::value) gdf_col_type = GDF_FLOAT64; else assert(false && "Invalid type passed to create_gdf_column"); + const size_t num_masks = gdf_get_num_chars_bitmask(size); + cudaMalloc(&the_column.valid, num_masks * sizeof(gdf_valid_type)); + // Fill the gdf_column struct the_column.size = size; the_column.dtype = gdf_col_type; @@ -376,9 +379,19 @@ gdf_error multi_pass_avg(int ncols, default: return GDF_UNSUPPORTED_DTYPE; } + // Set the validity mask for the AVG output column, the validity + // mask will be the same as both the Count and Sum output validity masks + if(nullptr != out_aggregation_column->valid) + { + const size_t num_masks = gdf_get_num_chars_bitmask(sum_output.size); + CUDA_TRY(cudaMemcpy(out_aggregation_column->valid, sum_output.valid, num_masks * sizeof(gdf_valid_type), cudaMemcpyDeviceToDevice)); + } + // Free intermediate storage - cudaFree(count_output.data); - cudaFree(sum_output.data); + CUDA_TRY(cudaFree(count_output.data)); + CUDA_TRY(cudaFree(count_output.valid)); + CUDA_TRY(cudaFree(sum_output.data)); + CUDA_TRY(cudaFree(sum_output.valid)); return GDF_SUCCESS; } From c5fcaed5decbb8d473bfd0f250a408f1a55fd1be Mon Sep 17 00:00:00 2001 From: Jake Hemstad Date: Thu, 20 Sep 2018 11:26:55 -0700 Subject: [PATCH 09/18] Added check to ensure we don't try and sort the result if the output size is zero. 
--- src/groupby/hash/groupby_compute_api.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/groupby/hash/groupby_compute_api.h b/src/groupby/hash/groupby_compute_api.h index 55fcb1d6..f21597fe 100644 --- a/src/groupby/hash/groupby_compute_api.h +++ b/src/groupby/hash/groupby_compute_api.h @@ -224,7 +224,7 @@ gdf_error GroupbyHash(gdf_table const & groupby_input_table, groupby_output_table.set_column_length(*out_size); // Optionally sort the groupby/aggregation result columns - if(true == sort_result) { + if((*out_size > 0) && (true == sort_result)) { auto sorted_indices = groupby_output_table.sort(); thrust::device_vector agg(*out_size); thrust::gather(thrust::device, From 693895f4d4c3782e624c25caf369b0f436b85d9a Mon Sep 17 00:00:00 2001 From: Jake Hemstad Date: Thu, 20 Sep 2018 12:56:33 -0700 Subject: [PATCH 10/18] Return an error if the output aggregation column's nullmask isn't allocated. --- src/sqls_ops.cu | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/sqls_ops.cu b/src/sqls_ops.cu index be9893b6..c216bc35 100644 --- a/src/sqls_ops.cu +++ b/src/sqls_ops.cu @@ -1174,6 +1174,10 @@ gdf_error gdf_group_by_single(int ncols, // # columns } else if( ctxt->flag_method == GDF_HASH ) { + if(nullptr == out_col_agg->valid) + { + return GDF_DATASET_EMPTY; + } bool sort_result = false; From 9ba1fc9c8866d2946885c61119f3d4060e65deec Mon Sep 17 00:00:00 2001 From: Jake Hemstad Date: Thu, 20 Sep 2018 15:02:20 -0700 Subject: [PATCH 11/18] No longer return an error if the aggregation column's validity buffer isn't allocated. 
--- src/sqls_ops.cu | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/sqls_ops.cu b/src/sqls_ops.cu index c216bc35..8a5039cb 100644 --- a/src/sqls_ops.cu +++ b/src/sqls_ops.cu @@ -1174,11 +1174,6 @@ gdf_error gdf_group_by_single(int ncols, // # columns } else if( ctxt->flag_method == GDF_HASH ) { - if(nullptr == out_col_agg->valid) - { - return GDF_DATASET_EMPTY; - } - bool sort_result = false; if(1 == ctxt->flag_sort_result){ From 19ac0be28ca23e90830bb8fae4b5cf0662687257 Mon Sep 17 00:00:00 2001 From: Jake Hemstad Date: Thu, 20 Sep 2018 15:28:32 -0700 Subject: [PATCH 12/18] Updated the build_aggregation_kernel for Count such that the aggregation output can never be Null. Instead, an aggregation of all NULLs for Count will just return 0. --- src/groupby/groupby.cuh | 2 ++ src/groupby/hash/groupby_kernels.cuh | 22 +++++++--------------- 2 files changed, 9 insertions(+), 15 deletions(-) diff --git a/src/groupby/groupby.cuh b/src/groupby/groupby.cuh index 9cc4b52d..41189048 100644 --- a/src/groupby/groupby.cuh +++ b/src/groupby/groupby.cuh @@ -319,6 +319,8 @@ void compute_average(gdf_column * avg_column, gdf_column const & count_column, g thrust::device_ptr d_counts = thrust::device_pointer_cast(static_cast(count_column.data)); thrust::device_ptr d_avg = thrust::device_pointer_cast(static_cast(avg_column->data)); + // TODO Should probably make sure the value in the Count column is valid + // otherwise we could end up with a divide by 0 auto average_op = [] __device__ (sum_type sum, size_t count)->avg_type { return (sum / static_cast(count)); }; // Computes the average into the passed in output buffer for the average column diff --git a/src/groupby/hash/groupby_kernels.cuh b/src/groupby/hash/groupby_kernels.cuh index 7174de26..bd683199 100644 --- a/src/groupby/hash/groupby_kernels.cuh +++ b/src/groupby/hash/groupby_kernels.cuh @@ -144,19 +144,13 @@ __global__ void build_aggregation_table(map_type * const __restrict__ the_map, // Hash the current row of the 
input table const auto row_hash = groupby_input_table.hash_row(i); - if(false == gdf_is_valid(aggregation_validitity_mask,i)) { - // If the aggregation value is NULL, and the hash bucket is empty, - // then set the state of the bucket to show that there is a NULL value for this key - // The casts are required to cast the enum type to a type supported by - // atomicCAS - // TODO Use a bitmask instead of a 32 bit flag for every bucket + // For COUNT, the aggregation result value can never be NULL, i.e., counting an + // aggregation column of all NULL should return 0. Therefore, insert the key + // only and set the state to VALID. Since the payload is initialized with 0, + // it will return 0 for a column of all nulls as expected const size_type insert_location = the_map->insert_key(i, the_comparator, true, row_hash); - - atomicCAS(reinterpret_cast(&hash_bucket_states[insert_location]), - static_cast(bucket_state::EMPTY), - static_cast(bucket_state::NULL_VALUE)); } else { @@ -171,12 +165,10 @@ __global__ void build_aggregation_table(map_type * const __restrict__ the_map, the_comparator, true, row_hash); - - // If it's not NULL, indicate that there is a valid value - // in this bucket - atomicExch(reinterpret_cast(&hash_bucket_states[insert_location]), - static_cast(bucket_state::VALID_VALUE)); } + + atomicExch(reinterpret_cast(&hash_bucket_states[insert_location]), + static_cast(bucket_state::VALID_VALUE)); } i += blockDim.x * gridDim.x; } From 39eaa45609d0235c61ba152a0330b016a38ef746 Mon Sep 17 00:00:00 2001 From: Jake Hemstad Date: Thu, 20 Sep 2018 15:31:39 -0700 Subject: [PATCH 13/18] Updated scope of the insert location variable in the Count overload of build aggregation table. 
--- src/groupby/hash/groupby_kernels.cuh | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/groupby/hash/groupby_kernels.cuh b/src/groupby/hash/groupby_kernels.cuh index bd683199..b36b698b 100644 --- a/src/groupby/hash/groupby_kernels.cuh +++ b/src/groupby/hash/groupby_kernels.cuh @@ -144,13 +144,14 @@ __global__ void build_aggregation_table(map_type * const __restrict__ the_map, // Hash the current row of the input table const auto row_hash = groupby_input_table.hash_row(i); + size_type insert_location{0}; if(false == gdf_is_valid(aggregation_validitity_mask,i)) { // For COUNT, the aggregation result value can never be NULL, i.e., counting an // aggregation column of all NULL should return 0. Therefore, insert the key // only and set the state to VALID. Since the payload is initialized with 0, // it will return 0 for a column of all nulls as expected - const size_type insert_location = the_map->insert_key(i, the_comparator, true, row_hash); + insert_location = the_map->insert_key(i, the_comparator, true, row_hash); } else { @@ -160,7 +161,7 @@ __global__ void build_aggregation_table(map_type * const __restrict__ the_map, // The rows at the current row index and the existing row index // will be compared for equality. If they are equal, the aggregation // operation is performed. - const size_type insert_location = the_map->insert(thrust::make_pair(i, static_cast(0)), + insert_location = the_map->insert(thrust::make_pair(i, static_cast(0)), op, the_comparator, true, From d47af7894990fb1a5df1bfd9bbefe5f471ba5dee Mon Sep 17 00:00:00 2001 From: Jake Hemstad Date: Mon, 24 Sep 2018 10:53:11 -0700 Subject: [PATCH 14/18] Removed turn_bit_on function as it's not used and not thread safe. 
--- include/gdf/utils.h | 9 --------- 1 file changed, 9 deletions(-) diff --git a/include/gdf/utils.h b/include/gdf/utils.h index 10aa0c3c..e5689131 100644 --- a/include/gdf/utils.h +++ b/include/gdf/utils.h @@ -17,14 +17,5 @@ inline gdf_size_type gdf_get_num_chars_bitmask(gdf_size_type size) return (( size + ( GDF_VALID_BITSIZE - 1)) / GDF_VALID_BITSIZE ); } -__host__ __device__ -inline void turn_bit_on(gdf_valid_type * masks, gdf_index_type pos) -{ - if(nullptr != masks) - { - masks[pos/8] |= (gdf_valid_type(1) << (pos % 8)); - } -} - #endif From 623d6299cd6c5f59d89e85e47127b324ef6fa6e1 Mon Sep 17 00:00:00 2001 From: Jake Hemstad Date: Mon, 24 Sep 2018 11:08:23 -0700 Subject: [PATCH 15/18] Updated documentation. --- src/groupby/hash/groupby_compute_api.h | 5 ++++- src/groupby/hash/groupby_kernels.cuh | 14 ++++++++------ 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/src/groupby/hash/groupby_compute_api.h b/src/groupby/hash/groupby_compute_api.h index f21597fe..80c93e22 100644 --- a/src/groupby/hash/groupby_compute_api.h +++ b/src/groupby/hash/groupby_compute_api.h @@ -170,8 +170,11 @@ gdf_error GroupbyHash(gdf_table const & groupby_input_table, CUDA_TRY(cudaGetLastError()); - // Allocate an array to indicate the state of each bucket in the hash table + // There are 3 possible states: + // EMPTY: The bucket's payload is empty + // NULL_VALUE: The bucket's payload is a NULL + // VALID_VALUE: The bucket's payload is a valid value bucket_state * hash_bucket_states{nullptr}; CUDA_TRY( cudaMalloc(&hash_bucket_states, hash_table_size * sizeof(bucket_state)) ); CUDA_TRY( cudaMemset(hash_bucket_states, bucket_state::EMPTY, hash_table_size * sizeof(bucket_state)) ); diff --git a/src/groupby/hash/groupby_kernels.cuh b/src/groupby/hash/groupby_kernels.cuh index b36b698b..a3adf1e8 100644 --- a/src/groupby/hash/groupby_kernels.cuh +++ b/src/groupby/hash/groupby_kernels.cuh @@ -31,8 +31,8 @@ enum bucket_state : int { EMPTY = 0, /** Indicates that the hash bucket
is empty */ - NULL_VALUE, /** Indicates that the bucket contains a NULL value */ - VALID_VALUE /** Indicates that the bucket contains a valid value */ + NULL_VALUE, /** Indicates that the bucket's payload contains a NULL value */ + VALID_VALUE /** Indicates that the bucket's payload contains a valid value */ }; using state_t = std::underlying_type::type; @@ -83,6 +83,8 @@ __global__ void build_aggregation_table(map_type * const __restrict__ the_map, if(false == gdf_is_valid(aggregation_validitity_mask,i)) { + // If the value in the aggregation column is NULL, only insert + // the key without a payload const size_type insert_location = the_map->insert_key(i, the_comparator, true, row_hash); // If the aggregation value is NULL, and the hash bucket is empty, @@ -107,8 +109,8 @@ __global__ void build_aggregation_table(map_type * const __restrict__ the_map, the_comparator, true, row_hash); - // If it's not NULL, indicate that there is a valid value - // in this bucket + + // Indicate that the payload for this hash bucket is valid atomicExch(reinterpret_cast(&hash_bucket_states[insert_location]), static_cast(bucket_state::VALID_VALUE)); } @@ -168,6 +170,7 @@ __global__ void build_aggregation_table(map_type * const __restrict__ the_map, row_hash); } + // Indicate that the payload for this hash bucket is valid atomicExch(reinterpret_cast(&hash_bucket_states[insert_location]), static_cast(bucket_state::VALID_VALUE)); } @@ -187,7 +190,6 @@ __global__ void build_aggregation_table(map_type * const __restrict__ the_map, * @Param global_write_index A variable in device global memory used to coordinate * where threads write their output * - * @Returns */ /* ----------------------------------------------------------------------------*/ template Date: Mon, 24 Sep 2018 11:09:26 -0700 Subject: [PATCH 16/18] Updated documentation. 
--- src/groupby/hash/groupby_kernels.cuh | 1 + 1 file changed, 1 insertion(+) diff --git a/src/groupby/hash/groupby_kernels.cuh b/src/groupby/hash/groupby_kernels.cuh index a3adf1e8..0fb15472 100644 --- a/src/groupby/hash/groupby_kernels.cuh +++ b/src/groupby/hash/groupby_kernels.cuh @@ -236,6 +236,7 @@ __global__ void extract_groupby_result(const map_type * const __restrict__ the_m // to a 32 bit type where atomics are supported if(nullptr != aggregation_out_valid_mask) { + // FIXME Replace with a standard `set_bit` function uint32_t * valid_mask32 = reinterpret_cast(aggregation_out_valid_mask); const uint32_t output_bit32 = (uint32_t(1) << (thread_write_index % uint32_t(32))); uint32_t * output_mask32 = &(valid_mask32[(thread_write_index / uint32_t(32))]); From 39a7764262d8cb170ac5e76a7dceacf18edee8a9 Mon Sep 17 00:00:00 2001 From: Jake Hemstad Date: Mon, 24 Sep 2018 11:14:15 -0700 Subject: [PATCH 17/18] Updated documentation. --- src/gdf_table.cuh | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/gdf_table.cuh b/src/gdf_table.cuh index 1ed9dbe7..10ec9262 100644 --- a/src/gdf_table.cuh +++ b/src/gdf_table.cuh @@ -107,11 +107,13 @@ void scatter_valid_mask( gdf_valid_type const * const input_mask, while(row_number < num_rows) { // Get the bit corresponding to the row + // FIXME Replace with a standard `get_bit` function const mask_type input_bit = input_mask32[row_number/BITS_PER_MASK] & (static_cast(1) << (row_number % BITS_PER_MASK)); // Only scatter the input bit if it is valid if(input_bit > 0) { + // FIXME Replace with a standard `set_bit` function const size_type output_row = scatter_map[row_number]; // Set the according output bit const mask_type output_bit = static_cast(1) << (output_row % BITS_PER_MASK); @@ -161,11 +163,13 @@ void gather_valid_mask( gdf_valid_type const * const input_mask, const size_type gather_location = gather_map[row_number]; // Get the bit corresponding from the gathered row + // FIXME Replace with a standard `get_bit`
function const mask_type input_bit = input_mask32[gather_location/BITS_PER_MASK] & (static_cast(1) << (gather_location % BITS_PER_MASK)); // Only set the output bit if the input is valid if(input_bit > 0) { + // FIXME Replace with a standard `set_bit` function // Construct the mask that sets the bit for the output row const mask_type output_bit = static_cast(1) << (row_number % BITS_PER_MASK); From c43a8c540dceaa95a2243f81afffc7bd071a1815 Mon Sep 17 00:00:00 2001 From: Jake Hemstad Date: Mon, 24 Sep 2018 16:22:52 -0700 Subject: [PATCH 18/18] Updated implementation of AVG aggregator to check the validity of the Count value before doing the division. --- include/gdf/utils.h | 2 +- src/groupby/groupby.cuh | 39 +++++++++++++++++++++++---------------- 2 files changed, 24 insertions(+), 17 deletions(-) diff --git a/include/gdf/utils.h b/include/gdf/utils.h index e5689131..ae191c95 100644 --- a/include/gdf/utils.h +++ b/include/gdf/utils.h @@ -6,7 +6,7 @@ __host__ __device__ static bool gdf_is_valid(const gdf_valid_type *valid, gdf_index_type pos) { - if ( valid ) + if ( nullptr != valid ) return (valid[pos / GDF_VALID_BITSIZE] >> (pos % GDF_VALID_BITSIZE)) & 1; else return true; diff --git a/src/groupby/groupby.cuh b/src/groupby/groupby.cuh index 41189048..b6dde45a 100644 --- a/src/groupby/groupby.cuh +++ b/src/groupby/groupby.cuh @@ -314,17 +314,23 @@ void compute_average(gdf_column * avg_column, gdf_column const & count_column, g { const size_t output_size = count_column.size; - // Wrap raw device pointers in thrust device ptrs to enable usage of thrust::transform - thrust::device_ptr d_sums = thrust::device_pointer_cast(static_cast(sum_column.data)); - thrust::device_ptr d_counts = thrust::device_pointer_cast(static_cast(count_column.data)); - thrust::device_ptr d_avg = thrust::device_pointer_cast(static_cast(avg_column->data)); - - // TODO Should probably make sure the value in the Count column is valid - // otherwise we could end up with a divide by 0 - auto 
average_op = [] __device__ (sum_type sum, size_t count)->avg_type { return (sum / static_cast(count)); }; + sum_type * d_sums = static_cast(sum_column.data); + size_t * d_counts = static_cast(count_column.data); + gdf_valid_type * count_valids = count_column.valid; + + auto average_op = [d_sums, d_counts, count_valids] __device__ (size_t index)->avg_type + { + if(false == gdf_is_valid(count_valids, index)){ + return static_cast(0); + } + else{ + return static_cast( d_sums[index] / d_counts[index] ); + } + }; // Computes the average into the passed in output buffer for the average column - thrust::transform(d_sums, d_sums + output_size, d_counts, d_avg, average_op); + thrust::device_ptr d_avg = thrust::device_pointer_cast(static_cast(avg_column->data)); + thrust::tabulate(d_avg, d_avg + output_size, average_op); // Update the size of the average column avg_column->size = output_size; @@ -369,6 +375,14 @@ gdf_error multi_pass_avg(int ncols, gdf_column sum_output = create_gdf_column(output_size); gdf_group_by_hash(ncols, in_groupby_columns, in_aggregation_column, out_groupby_columns, &sum_output, sort_result); + // Set the validity mask for the AVG output column, the validity + // mask will be the same as both the Count and Sum output validity masks + if(nullptr != out_aggregation_column->valid) + { + const size_t num_masks = gdf_get_num_chars_bitmask(count_output.size); + CUDA_TRY(cudaMemcpy(out_aggregation_column->valid, count_output.valid, num_masks * sizeof(gdf_valid_type), cudaMemcpyDeviceToDevice)); + } + // Compute the average from the Sum and Count columns and store into the passed in aggregation output buffer const gdf_dtype gdf_output_type = out_aggregation_column->dtype; switch(gdf_output_type){ @@ -381,13 +395,6 @@ gdf_error multi_pass_avg(int ncols, default: return GDF_UNSUPPORTED_DTYPE; } - // Set the validity mask for the AVG output column, the validity - // mask will be the same as both the Count and Sum output validity masks - if(nullptr != 
out_aggregation_column->valid) - { - const size_t num_masks = gdf_get_num_chars_bitmask(sum_output.size); - CUDA_TRY(cudaMemcpy(out_aggregation_column->valid, sum_output.valid, num_masks * sizeof(gdf_valid_type), cudaMemcpyDeviceToDevice)); - } // Free intermediate storage CUDA_TRY(cudaFree(count_output.data));