diff --git a/include/gdf/utils.h b/include/gdf/utils.h index 7b3f1bdc..ae191c95 100644 --- a/include/gdf/utils.h +++ b/include/gdf/utils.h @@ -6,12 +6,16 @@ __host__ __device__ static bool gdf_is_valid(const gdf_valid_type *valid, gdf_index_type pos) { - if ( valid ) + if ( nullptr != valid ) return (valid[pos / GDF_VALID_BITSIZE] >> (pos % GDF_VALID_BITSIZE)) & 1; else return true; } -inline gdf_size_type gdf_get_num_chars_bitmask(gdf_size_type size) { return (( size + ( GDF_VALID_BITSIZE - 1)) / GDF_VALID_BITSIZE ); } +inline gdf_size_type gdf_get_num_chars_bitmask(gdf_size_type size) +{ + return (( size + ( GDF_VALID_BITSIZE - 1)) / GDF_VALID_BITSIZE ); +} + #endif diff --git a/src/gdf_table.cuh b/src/gdf_table.cuh index 1a7560ed..10ec9262 100644 --- a/src/gdf_table.cuh +++ b/src/gdf_table.cuh @@ -74,6 +74,7 @@ struct row_masker gdf_valid_type ** column_valid_masks; }; + /* --------------------------------------------------------------------------*/ /** * @Synopsis Scatters a validity bitmask. @@ -106,11 +107,13 @@ void scatter_valid_mask( gdf_valid_type const * const input_mask, while(row_number < num_rows) { // Get the bit corresponding to the row + // FIXME Replace with a standard `get_bit` function const mask_type input_bit = input_mask32[row_number/BITS_PER_MASK] & (static_cast(1) << (row_number % BITS_PER_MASK)); // Only scatter the input bit if it is valid if(input_bit > 0) { + // FIXME Replace with a standard `get_bit` function const size_type output_row = scatter_map[row_number]; // Set the according output bit const mask_type output_bit = static_cast(1) << (output_row % BITS_PER_MASK); @@ -126,6 +129,62 @@ void scatter_valid_mask( gdf_valid_type const * const input_mask, } } +/* --------------------------------------------------------------------------*/ +/** + * @Synopsis Gathers a validity bitmask. + * + * This kernel is used in order to gather the validity bit mask for a gdf_column. + * + * @Param input_mask The mask that will be gathered. 
+ * @Param output_mask The output after gathering the input + * @Param gather_map The map that indicates where elements from the input + will be gathered to in the output. output_bit[ gather_map [i] ] = input_bit[i] + * @Param num_rows The number of bits in the masks + */ +/* ----------------------------------------------------------------------------*/ +template +__global__ +void gather_valid_mask( gdf_valid_type const * const input_mask, + gdf_valid_type * const output_mask, + size_type const * const __restrict__ gather_map, + size_type const num_rows) +{ + using mask_type = uint32_t; + constexpr uint32_t BITS_PER_MASK = 8 * sizeof(mask_type); + + // Cast the validity type to a type where atomicOr is natively supported + const mask_type * __restrict__ input_mask32 = reinterpret_cast(input_mask); + mask_type * const __restrict__ output_mask32 = reinterpret_cast(output_mask); + + size_type row_number = threadIdx.x + blockIdx.x * blockDim.x; + + while(row_number < num_rows) + { + const size_type gather_location = gather_map[row_number]; + + // Get the bit corresponding from the gathered row + // FIXME Replace with a standard `get_bit` function + const mask_type input_bit = input_mask32[gather_location/BITS_PER_MASK] & (static_cast(1) << (gather_location % BITS_PER_MASK)); + + // Only set the output bit if the input is valid + if(input_bit > 0) + { + // FIXME Replace with a standard `set_bit` function + // Construct the mask that sets the bit for the output row + const mask_type output_bit = static_cast(1) << (row_number % BITS_PER_MASK); + + // Find the mask in the output that will hold the bit for output row + const size_type output_location = row_number / BITS_PER_MASK; + + // Bitwise OR to set the gathered row's bit + atomicOr(&output_mask32[output_location], output_bit); + } + + row_number += blockDim.x * gridDim.x; + } +} + + /* --------------------------------------------------------------------------*/ /** * @Synopsis A class provides useful functionality for 
operating on a set of gdf_columns. @@ -158,6 +217,7 @@ public: } + // Copy pointers to each column's data, types, and validity bitmasks // to the device as contiguous arrays device_columns_data.reserve(num_cols); @@ -173,7 +233,7 @@ public: { assert(nullptr != current_column->data); } - + // Compute the size of a row in the table in bytes int column_width_bytes{0}; if(GDF_SUCCESS == get_column_byte_width(current_column, &column_width_bytes)) @@ -254,13 +314,6 @@ public: return column_length; } - __device__ bool is_row_valid(size_type row_index) const - { - const bool row_valid = gdf_is_valid(d_row_valid, row_index); - - return row_valid; - } - /* --------------------------------------------------------------------------*/ /** @@ -276,6 +329,15 @@ public: } + + __device__ bool is_row_valid(size_type row_index) const + { + const bool row_valid = gdf_is_valid(d_row_valid, row_index); + + return row_valid; + } + + /* --------------------------------------------------------------------------*/ /** * @Synopsis Packs the elements of a specified row into a contiguous byte-buffer @@ -767,6 +829,8 @@ public: gdf_error gather(thrust::device_vector const & row_gather_map, gdf_table & gather_output_table) { + + // TODO Gather the bit validity masks for each column gdf_error gdf_status{GDF_SUCCESS}; // Each column can be gathered in parallel, therefore create a @@ -1152,7 +1216,9 @@ gdf_error scatter_column(column_type const * const __restrict__ input_column, } - const size_type num_columns; /** The number of columns in the table */ + + const size_type num_columns; /** The number of columns in the table */ + size_type column_length{0}; /** The number of rows in the table */ gdf_column ** host_columns{nullptr}; /** The set of gdf_columns that this table wraps */ diff --git a/src/groupby/groupby.cuh b/src/groupby/groupby.cuh index 9798860a..b6dde45a 100644 --- a/src/groupby/groupby.cuh +++ b/src/groupby/groupby.cuh @@ -58,8 +58,10 @@ gdf_error typed_groupby(gdf_table const & 
groupby_input_table, gdf_error gdf_error_code = GroupbyHash(groupby_input_table, in_agg_col, + in_aggregation_column->valid, groupby_output_table, out_agg_col, + &out_aggregation_column->valid, &output_size, op_type(), sort_result); @@ -278,6 +280,9 @@ gdf_column create_gdf_column(const size_t size) else if(std::is_same::value) gdf_col_type = GDF_FLOAT64; else assert(false && "Invalid type passed to create_gdf_column"); + const size_t num_masks = gdf_get_num_chars_bitmask(size); + cudaMalloc(&the_column.valid, num_masks * sizeof(gdf_valid_type)); + // Fill the gdf_column struct the_column.size = size; the_column.dtype = gdf_col_type; @@ -309,15 +314,23 @@ void compute_average(gdf_column * avg_column, gdf_column const & count_column, g { const size_t output_size = count_column.size; - // Wrap raw device pointers in thrust device ptrs to enable usage of thrust::transform - thrust::device_ptr d_sums = thrust::device_pointer_cast(static_cast(sum_column.data)); - thrust::device_ptr d_counts = thrust::device_pointer_cast(static_cast(count_column.data)); - thrust::device_ptr d_avg = thrust::device_pointer_cast(static_cast(avg_column->data)); + sum_type * d_sums = static_cast(sum_column.data); + size_t * d_counts = static_cast(count_column.data); + gdf_valid_type * count_valids = count_column.valid; - auto average_op = [] __device__ (sum_type sum, size_t count)->avg_type { return (sum / static_cast(count)); }; + auto average_op = [d_sums, d_counts, count_valids] __device__ (size_t index)->avg_type + { + if(false == gdf_is_valid(count_valids, index)){ + return static_cast(0); + } + else{ + return static_cast( d_sums[index] / d_counts[index] ); + } + }; // Computes the average into the passed in output buffer for the average column - thrust::transform(d_sums, d_sums + output_size, d_counts, d_avg, average_op); + thrust::device_ptr d_avg = thrust::device_pointer_cast(static_cast(avg_column->data)); + thrust::tabulate(d_avg, d_avg + output_size, average_op); // Update the size 
of the average column avg_column->size = output_size; @@ -362,6 +375,14 @@ gdf_error multi_pass_avg(int ncols, gdf_column sum_output = create_gdf_column(output_size); gdf_group_by_hash(ncols, in_groupby_columns, in_aggregation_column, out_groupby_columns, &sum_output, sort_result); + // Set the validity mask for the AVG output column, the validity + // mask will be the same as both the Count and Sum output validity masks + if(nullptr != out_aggregation_column->valid) + { + const size_t num_masks = gdf_get_num_chars_bitmask(count_output.size); + CUDA_TRY(cudaMemcpy(out_aggregation_column->valid, count_output.valid, num_masks * sizeof(gdf_valid_type), cudaMemcpyDeviceToDevice)); + } + // Compute the average from the Sum and Count columns and store into the passed in aggregation output buffer const gdf_dtype gdf_output_type = out_aggregation_column->dtype; switch(gdf_output_type){ @@ -374,9 +395,12 @@ gdf_error multi_pass_avg(int ncols, default: return GDF_UNSUPPORTED_DTYPE; } + // Free intermediate storage - cudaFree(count_output.data); - cudaFree(sum_output.data); + CUDA_TRY(cudaFree(count_output.data)); + CUDA_TRY(cudaFree(count_output.valid)); + CUDA_TRY(cudaFree(sum_output.data)); + CUDA_TRY(cudaFree(sum_output.valid)); return GDF_SUCCESS; } diff --git a/src/groupby/hash/groupby_compute_api.h b/src/groupby/hash/groupby_compute_api.h index 7ecb2a1e..80c93e22 100644 --- a/src/groupby/hash/groupby_compute_api.h +++ b/src/groupby/hash/groupby_compute_api.h @@ -138,8 +138,10 @@ template< typename aggregation_type, typename aggregation_operation> gdf_error GroupbyHash(gdf_table const & groupby_input_table, const aggregation_type * const in_aggregation_column, + gdf_valid_type const * const in_aggregation_validity_mask, gdf_table & groupby_output_table, aggregation_type * out_aggregation_column, + gdf_valid_type ** out_aggregation_validity_mask, size_type * out_size, aggregation_operation aggregation_op, bool sort_result = false) @@ -168,12 +170,23 @@ gdf_error 
GroupbyHash(gdf_table const & groupby_input_table, CUDA_TRY(cudaGetLastError()); + // Allocate an array to indicate the state of each bucket in the hash table + // There are 3 possible states: + // EMPTY: The bucket's payload is empty + // NULL_VALUE: The bucket's payload is a NULL + // VALID_VALUE: The bucket's payload is a valid value + bucket_state * hash_bucket_states{nullptr}; + CUDA_TRY( cudaMalloc(&hash_bucket_states, hash_table_size * sizeof(bucket_state)) ); + CUDA_TRY( cudaMemset(hash_bucket_states, bucket_state::EMPTY, hash_table_size * sizeof(bucket_state)) ); + // Inserts (i, aggregation_column[i]) as a key-value pair into the // hash table. When a given key already exists in the table, the aggregation operation // is computed between the new and existing value, and the result is stored back. build_aggregation_table<<>>(the_map.get(), groupby_input_table, in_aggregation_column, + in_aggregation_validity_mask, + hash_bucket_states, input_num_rows, aggregation_op, row_comparator(*the_map, groupby_input_table, groupby_input_table)); @@ -186,13 +199,23 @@ gdf_error GroupbyHash(gdf_table const & groupby_input_table, const dim3 extract_grid_size ((hash_table_size + THREAD_BLOCK_SIZE - 1) / THREAD_BLOCK_SIZE, 1, 1); + // Initialize output aggregation column's validity mask + const size_type num_masks = gdf_get_num_chars_bitmask(input_num_rows); + if(nullptr == *out_aggregation_validity_mask) + { + CUDA_TRY( cudaMalloc(out_aggregation_validity_mask, num_masks * sizeof(gdf_valid_type)) ); + } + CUDA_TRY( cudaMemset(*out_aggregation_validity_mask, 0, num_masks * sizeof(gdf_valid_type)) ); + // Extracts every non-empty key and value into separate contiguous arrays, // which provides the result of the groupby operation extract_groupby_result<<>>(the_map.get(), hash_table_size, + hash_bucket_states, groupby_output_table, groupby_input_table, out_aggregation_column, + *out_aggregation_validity_mask, global_write_index); CUDA_TRY(cudaGetLastError()); @@ -204,16 
+227,33 @@ gdf_error GroupbyHash(gdf_table const & groupby_input_table, groupby_output_table.set_column_length(*out_size); // Optionally sort the groupby/aggregation result columns - if(true == sort_result) { + if((*out_size > 0) && (true == sort_result)) { auto sorted_indices = groupby_output_table.sort(); thrust::device_vector agg(*out_size); thrust::gather(thrust::device, - sorted_indices.begin(), sorted_indices.end(), - out_aggregation_column, - agg.begin()); + sorted_indices.begin(), + sorted_indices.end(), + out_aggregation_column, + agg.begin()); thrust::copy(agg.begin(), agg.end(), out_aggregation_column); + + if(nullptr != *out_aggregation_validity_mask) + { + // Reorder the bit-validity mask of the aggregation output column + // according to the new sorted order + const size_type gather_grid_size = (*out_size + THREAD_BLOCK_SIZE - 1)/THREAD_BLOCK_SIZE; + const size_type num_masks = gdf_get_num_chars_bitmask(*out_size); + thrust::device_vector new_valid_mask(num_masks,0); + gather_valid_mask<<>>(*out_aggregation_validity_mask, + new_valid_mask.data().get(), + sorted_indices.data().get(), + *out_size); + thrust::copy(new_valid_mask.begin(), new_valid_mask.end(), *out_aggregation_validity_mask); + } } + CUDA_TRY( cudaFree(hash_bucket_states) ); + return GDF_SUCCESS; } #endif diff --git a/src/groupby/hash/groupby_kernels.cuh b/src/groupby/hash/groupby_kernels.cuh index 4aae7d33..0fb15472 100644 --- a/src/groupby/hash/groupby_kernels.cuh +++ b/src/groupby/hash/groupby_kernels.cuh @@ -17,10 +17,25 @@ #ifndef GROUPBY_KERNELS_H #define GROUPBY_KERNELS_H +#include #include "../../hashmap/concurrent_unordered_map.cuh" #include "aggregation_operations.cuh" #include "../../gdf_table.cuh" + +/* --------------------------------------------------------------------------*/ +/** + * @Synopsis Indicates the state of a hash bucket in the hash table. 
+ */ +/* ----------------------------------------------------------------------------*/ +enum bucket_state : int +{ + EMPTY = 0, /** Indicates that the hash bucket is empty */ + NULL_VALUE, /** Indicates that the bucket's payload contains a NULL value */ + VALID_VALUE /** Indicates that the bucket's payload contains a valid value */ +}; +using state_t = std::underlying_type::type; + /* --------------------------------------------------------------------------*/ /** * @Synopsis Takes in two columns of equal length. One column to groupby and the @@ -47,27 +62,59 @@ template const & groupby_input_table, const aggregation_type * const __restrict__ aggregation_column, + gdf_valid_type const * const __restrict__ aggregation_validitity_mask, + bucket_state * const __restrict__ hash_bucket_states, size_type column_size, aggregation_operation op, row_comparator the_comparator) { size_type i = threadIdx.x + blockIdx.x * blockDim.x; - while( i < column_size ){ + const auto map_start = the_map->begin(); - // Hash the current row of the input table - const auto row_hash = groupby_input_table.hash_row(i); + while( i < column_size ){ - // Attempt to insert the current row's index. - // The hash value of the row will determine the write location. - // The rows at the current row index and the existing row index - // will be compared for equality. If they are equal, the aggregation - // operation is performed. 
- the_map->insert(thrust::make_pair(i, aggregation_column[i]), - op, - the_comparator, - true, - row_hash); + // Only insert into the hash table if the row is valid + if( true == groupby_input_table.is_row_valid(i) ) + { + + // Hash the current row of the input table + const auto row_hash = groupby_input_table.hash_row(i); + + if(false == gdf_is_valid(aggregation_validitity_mask,i)) + { + // If the value in the aggregation column is NULL, only insert + // the key without a payload + const size_type insert_location = the_map->insert_key(i, the_comparator, true, row_hash); + + // If the aggregation value is NULL, and the hash bucket is empty, + // then set the state of the bucket to show that there is a NULL value for this key + // The casts are required to cast the enum type to a type supported by + // atomicCAS + // TODO Use a bitmask instead of a 32 bit flag for every bucket + atomicCAS(reinterpret_cast(&hash_bucket_states[insert_location]), + static_cast(bucket_state::EMPTY), + static_cast(bucket_state::NULL_VALUE)); + } + else + { + + // Attempt to insert the current row's index. + // The hash value of the row will determine the write location. + // The rows at the current row index and the existing row index + // will be compared for equality. If they are equal, the aggregation + // operation is performed. 
+ const size_type insert_location = the_map->insert(thrust::make_pair(i, aggregation_column[i]), + op, + the_comparator, + true, + row_hash); + + // Indicate that the payload for this hash bucket is valid + atomicExch(reinterpret_cast(&hash_bucket_states[insert_location]), + static_cast(bucket_state::VALID_VALUE)); + } + } i += blockDim.x * gridDim.x; } @@ -81,28 +128,52 @@ template const & groupby_input_table, const aggregation_type * const __restrict__ aggregation_column, + gdf_valid_type const * const __restrict__ aggregation_validitity_mask, + bucket_state * const __restrict__ hash_bucket_states, size_type column_size, count_op op, row_comparator the_comparator) { - size_type i = threadIdx.x + blockIdx.x * blockDim.x; - // Hash the current row of the input table - const auto row_hash = groupby_input_table.hash_row(i); + auto map_start = the_map->begin(); + size_type i = threadIdx.x + blockIdx.x * blockDim.x; while( i < column_size ){ - // When the aggregator is COUNT, ignore the aggregation column and just insert '0' - // Attempt to insert the current row's index. - // The hash value of the row will determine the write location. - // The rows at the current row index and the existing row index - // will be compared for equality. If they are equal, the aggregation - // operation is performed. - the_map->insert(thrust::make_pair(i, static_cast(0)), - op, - the_comparator, - true, - row_hash); + // Only insert into the hash table if the the row is valid + if(groupby_input_table.is_row_valid(i) ) + { + // Hash the current row of the input table + const auto row_hash = groupby_input_table.hash_row(i); + + size_type insert_location{0}; + if(false == gdf_is_valid(aggregation_validitity_mask,i)) + { + // For COUNT, the aggregation result value can never be NULL, i.e., counting an + // aggregation column of all NULL should return 0. Therefore, insert the key + // only and set the state to VALID. 
Since the payload is initialized with 0, + // it will return 0 for a column of all nulls as expected + insert_location = the_map->insert_key(i, the_comparator, true, row_hash); + } + else + { + // When the aggregator is COUNT, ignore the aggregation column and just insert '0' + // Attempt to insert the current row's index. + // The hash value of the row will determine the write location. + // The rows at the current row index and the existing row index + // will be compared for equality. If they are equal, the aggregation + // operation is performed. + insert_location = the_map->insert(thrust::make_pair(i, static_cast(0)), + op, + the_comparator, + true, + row_hash); + } + + // Indicate that the payload for this hash bucket is valid + atomicExch(reinterpret_cast(&hash_bucket_states[insert_location]), + static_cast(bucket_state::VALID_VALUE)); + } i += blockDim.x * gridDim.x; } } @@ -119,7 +190,6 @@ __global__ void build_aggregation_table(map_type * const __restrict__ the_map, * @Param global_write_index A variable in device global memory used to coordinate * where threads write their output * - * @Returns */ /* ----------------------------------------------------------------------------*/ template __global__ void extract_groupby_result(const map_type * const __restrict__ the_map, const size_type map_size, + const bucket_state * const __restrict__ hash_bucket_states, gdf_table & groupby_output_table, gdf_table const & groupby_input_table, aggregation_type * const __restrict__ aggregation_out_column, + gdf_valid_type * aggregation_out_valid_mask, size_type * const global_write_index) { size_type i = threadIdx.x + blockIdx.x * blockDim.x; - constexpr typename map_type::key_type unused_key{map_type::get_unused_key()}; - const typename map_type::value_type * const __restrict__ hashtabl_values = the_map->data(); // TODO: Use _shared_ thread block cache for writing temporary ouputs and then // write to the global output while(i < map_size){ - const typename 
map_type::key_type current_key = hashtabl_values[i].first; + const bucket_state current_state = hash_bucket_states[i]; - if( current_key != unused_key){ + // If the hash bucket isn't empty, then we need to add it to the output + if(bucket_state::EMPTY != current_state) + { + const typename map_type::key_type output_row = hashtabl_values[i].first; const size_type thread_write_index = atomicAdd(global_write_index, 1); - // Copy the row at current_key from the input table to the row at + // Copy the row from the input table to the row at // thread_write_index in the output table groupby_output_table.copy_row(groupby_input_table, thread_write_index, - current_key); - - aggregation_out_column[thread_write_index] = hashtabl_values[i].second; + output_row); + + // If this bucket holds a valid aggregation value, copy it to the + // aggregation output and set its validity bit + if( bucket_state::NULL_VALUE != current_state ) + { + aggregation_out_column[thread_write_index] = hashtabl_values[i].second; + + // Set the valid bit for this row. 
Need to cast the valid mask type + // to a 32 bit type where atomics are supported + if(nullptr != aggregation_out_valid_mask) + { + // FIXME Replace with a standard `set_bit` function + uint32_t * valid_mask32 = reinterpret_cast(aggregation_out_valid_mask); + const uint32_t output_bit32 = (uint32_t(1) << (thread_write_index % uint32_t(32))); + uint32_t * output_mask32 = &(valid_mask32[(thread_write_index / uint32_t(32))]); + atomicOr(output_mask32, output_bit32); + } + } } + i += gridDim.x * blockDim.x; } } diff --git a/src/hashmap/concurrent_unordered_map.cuh b/src/hashmap/concurrent_unordered_map.cuh index 7b68ae37..3e4d8700 100644 --- a/src/hashmap/concurrent_unordered_map.cuh +++ b/src/hashmap/concurrent_unordered_map.cuh @@ -226,6 +226,12 @@ public: __host__ __device__ explicit cycle_iterator_adapter( const iterator_type& begin, const iterator_type& end, const iterator_type& current ) : m_begin( begin ), m_end( end ), m_current( current ) {} + + + template + __device__ + friend size_t operator-(const cycle_iterator_adapter& lhs, const cycle_iterator_adapter& rhs); + __host__ __device__ cycle_iterator_adapter& operator++() { @@ -308,6 +314,13 @@ __host__ __device__ bool operator!=(const cycle_iterator_adapter& lhs, const { return !lhs.equal(rhs); } +template +__device__ +size_t operator-(const cycle_iterator_adapter& lhs, const cycle_iterator_adapter& rhs) +{ + return lhs.m_current - rhs.m_current; +} + /** * Does support concurrent insert, but not concurrent insert and probing. 
* @@ -481,7 +494,7 @@ public: class comparison_type = key_equal, typename hash_value_type = typename Hasher::result_type> __forceinline__ - __device__ iterator insert(const value_type& x, + __device__ size_type insert(const value_type& x, aggregation_type op, comparison_type keys_equal = key_equal(), bool precomputed_hash = false, @@ -510,6 +523,7 @@ public: const key_type insert_key = x.first; bool insert_success = false; + size_type insert_location{0}; while (false == insert_success) { @@ -533,13 +547,69 @@ public: update_existing_value(existing_value, x, op); insert_success = true; + insert_location = current_index; + } + + current_index = (current_index+1)%hashtbl_size; + current_hash_bucket = &(hashtbl_values[current_index]); + } + + //return iterator( m_hashtbl_values,m_hashtbl_values+hashtbl_size, current_hash_bucket); + return insert_location; + } + + template + __forceinline__ + __device__ size_type insert_key(const key_type & insert_key, + comparison_type keys_equal = key_equal(), + bool precomputed_hash = false, + hash_value_type precomputed_hash_value = 0) + { + const size_type hashtbl_size = m_hashtbl_size; + value_type* hashtbl_values = m_hashtbl_values; + + hash_value_type hash_value{0}; + + // If a precomputed hash value has been passed in, then use it to determine + // the write location of the new key + if(true == precomputed_hash) + { + hash_value = precomputed_hash_value; + } + // Otherwise, compute the hash value from the new key + else + { + hash_value = m_hf(insert_key); + } + + size_type current_index = hash_value % hashtbl_size; + value_type *current_hash_bucket = &(hashtbl_values[current_index]); + + bool insert_success = false; + size_type insert_location{0}; + + while (false == insert_success) { + key_type& existing_key = current_hash_bucket->first; + + // Try and set the existing_key for the current hash bucket to insert_key + const key_type old_key = atomicCAS( &existing_key, unused_key, insert_key); + + // If old_key == unused_key, 
the current hash bucket was empty + // and existing_key was updated to insert_key by the atomicCAS. + // If old_key == insert_key, this key has already been inserted. + if ( keys_equal( unused_key, old_key ) || keys_equal(insert_key, old_key) ) { + insert_success = true; + insert_location = current_index; } current_index = (current_index+1)%hashtbl_size; current_hash_bucket = &(hashtbl_values[current_index]); } - return iterator( m_hashtbl_values,m_hashtbl_values+hashtbl_size, current_hash_bucket); + //return iterator( m_hashtbl_values,m_hashtbl_values+hashtbl_size, current_hash_bucket); + // Return the index where the insert occurred + return insert_location; } /* This function is not currently implemented diff --git a/src/sqls_ops.cu b/src/sqls_ops.cu index be9893b6..8a5039cb 100644 --- a/src/sqls_ops.cu +++ b/src/sqls_ops.cu @@ -1174,7 +1174,6 @@ gdf_error gdf_group_by_single(int ncols, // # columns } else if( ctxt->flag_method == GDF_HASH ) { - bool sort_result = false; if(1 == ctxt->flag_sort_result){