Skip to content
This repository was archived by the owner on Dec 21, 2018. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
1164a2b
Merge branch 'fea-ext-null-support-hash-joins' into fea-ext-null-supo…
jrhemstad Sep 15, 2018
e6d6a2a
Implemented null support for hash-based groupbys.
jrhemstad Sep 15, 2018
76f1322
Merge branch 'master' into fea-ext-null-suport-hash-groupby
jrhemstad Sep 15, 2018
a273f87
Merge remote-tracking branch 'public/master' into fea-ext-null-suport…
jrhemstad Sep 17, 2018
ad7eb95
Merge branch 'fea-ext-null-suport-hash-groupby' of github.com:jrhemst…
jrhemstad Sep 17, 2018
3c6c85b
Added checking for NULLs in the aggregation column.
jrhemstad Sep 19, 2018
535a777
Added function to set a specified bit in a validity bit mask.
jrhemstad Sep 20, 2018
b7db9eb
Updated handling of NULLs in the aggregation column such that NULLs a…
jrhemstad Sep 20, 2018
c904f47
Updated setting the validity bits in the aggregation output to use at…
jrhemstad Sep 20, 2018
94d1e8c
Merge remote-tracking branch 'origin/master' into fea-ext-null-suport…
jrhemstad Sep 20, 2018
e4980f6
Added kernel to gather the validity bits of a gdf_column. Used the ke…
jrhemstad Sep 20, 2018
90f0c0c
Fixed bug where the wrong bit was getting set in the aggregation outp…
jrhemstad Sep 20, 2018
bd98d2e
Corrected the AVG aggregator implementation so the validity bitmask o…
jrhemstad Sep 20, 2018
c5fcaed
Added check to ensure we don't try and sort the result if the output …
jrhemstad Sep 20, 2018
693895f
Return an error if the output aggregation column's nullmask isn't all…
jrhemstad Sep 20, 2018
9ba1fc9
No longer return an error if the aggregation column's validity buffer…
jrhemstad Sep 20, 2018
19ac0be
Updated the build_aggregation_kernel for Count such that the aggregat…
jrhemstad Sep 20, 2018
39eaa45
Updated scope of the insert location variable in the Count overload o…
jrhemstad Sep 20, 2018
d47af78
Removed turn_bit_on function as it's not used and not thread safe.
jrhemstad Sep 24, 2018
623d629
Updated documentation.
jrhemstad Sep 24, 2018
2ac5493
Updated documentation.
jrhemstad Sep 24, 2018
39a7764
Updated documentation.
jrhemstad Sep 24, 2018
c43a8c5
Updated implementation of AVG aggregator to check the validity of the…
jrhemstad Sep 24, 2018
4ee4f9d
Merge remote-tracking branch 'public/master' into fea-ext-null-suport…
jrhemstad Sep 25, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions include/gdf/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,16 @@
__host__ __device__
static
bool gdf_is_valid(const gdf_valid_type *valid, gdf_index_type pos) {
if ( valid )
if ( nullptr != valid )
return (valid[pos / GDF_VALID_BITSIZE] >> (pos % GDF_VALID_BITSIZE)) & 1;
else
return true;
}

inline gdf_size_type gdf_get_num_chars_bitmask(gdf_size_type size) { return (( size + ( GDF_VALID_BITSIZE - 1)) / GDF_VALID_BITSIZE ); }
inline gdf_size_type gdf_get_num_chars_bitmask(gdf_size_type size)
{
return (( size + ( GDF_VALID_BITSIZE - 1)) / GDF_VALID_BITSIZE );
}


#endif
84 changes: 75 additions & 9 deletions src/gdf_table.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ struct row_masker
gdf_valid_type ** column_valid_masks;
};


/* --------------------------------------------------------------------------*/
/**
* @Synopsis Scatters a validity bitmask.
Expand Down Expand Up @@ -106,11 +107,13 @@ void scatter_valid_mask( gdf_valid_type const * const input_mask,
while(row_number < num_rows)
{
// Get the bit corresponding to the row
// FIXME Replace with a standard `get_bit` function
const mask_type input_bit = input_mask32[row_number/BITS_PER_MASK] & (static_cast<mask_type>(1) << (row_number % BITS_PER_MASK));

// Only scatter the input bit if it is valid
if(input_bit > 0)
{
// FIXME Replace with a standard `set_bit` function
const size_type output_row = scatter_map[row_number];
// Set the according output bit
const mask_type output_bit = static_cast<mask_type>(1) << (output_row % BITS_PER_MASK);
Expand All @@ -126,6 +129,62 @@ void scatter_valid_mask( gdf_valid_type const * const input_mask,
}
}

/* --------------------------------------------------------------------------*/
/**
* @Synopsis Gathers a validity bitmask.
*
* This kernel is used in order to gather the validity bit mask for a gdf_column.
* Each thread processes one output row in a grid-stride loop: it looks up the
* source row via the gather map, reads that row's validity bit from the input
* mask, and (only if the bit is set) sets the corresponding bit in the output
* mask with an atomicOr.
*
* NOTE(review): bits for invalid source rows are never written, so the caller
* must zero-initialize `output_mask` before launching this kernel (the call
* site does this with a cudaMemset).
*
* NOTE(review): both masks are reinterpreted as uint32_t* — presumably
* gdf_valid_type buffers are allocated with at least 4-byte alignment and a
* size rounded up to a whole number of 32-bit words; verify at the call sites.
*
* @Param input_mask The mask that will be gathered from.
* @Param output_mask The output after gathering the input
* @Param gather_map The map that indicates which element of the input
* is gathered into each output slot: output_bit[i] = input_bit[ gather_map[i] ]
* @Param num_rows The number of bits to gather, i.e. the number of rows
* (bits) in the output mask
*/
/* ----------------------------------------------------------------------------*/
template <typename size_type>
__global__
void gather_valid_mask( gdf_valid_type const * const input_mask,
gdf_valid_type * const output_mask,
size_type const * const __restrict__ gather_map,
size_type const num_rows)
{
using mask_type = uint32_t;
constexpr uint32_t BITS_PER_MASK = 8 * sizeof(mask_type);

// Cast the validity type to a type where atomicOr is natively supported
const mask_type * __restrict__ input_mask32 = reinterpret_cast<mask_type const *>(input_mask);
mask_type * const __restrict__ output_mask32 = reinterpret_cast<mask_type * >(output_mask);

size_type row_number = threadIdx.x + blockIdx.x * blockDim.x;

while(row_number < num_rows)
{
// Source row whose validity bit will land at output position `row_number`
const size_type gather_location = gather_map[row_number];

// Read the validity bit of the source (gathered-from) row
// FIXME Replace with a standard `get_bit` function
const mask_type input_bit = input_mask32[gather_location/BITS_PER_MASK] & (static_cast<mask_type>(1) << (gather_location % BITS_PER_MASK));

// Only set the output bit if the input is valid
if(input_bit > 0)
{
// FIXME Replace with a standard `set_bit` function
// Construct the mask that sets the bit for the output row
const mask_type output_bit = static_cast<mask_type>(1) << (row_number % BITS_PER_MASK);

// Find the mask in the output that will hold the bit for output row
const size_type output_location = row_number / BITS_PER_MASK;

// Bitwise OR to set the gathered row's bit; atomic because other threads
// may concurrently set different bits within the same 32-bit word
atomicOr(&output_mask32[output_location], output_bit);
}

// Grid-stride: advance to this thread's next output row
row_number += blockDim.x * gridDim.x;
}
}


/* --------------------------------------------------------------------------*/
/**
* @Synopsis A class that provides useful functionality for operating on a set of gdf_columns.
Expand Down Expand Up @@ -158,6 +217,7 @@ public:
}



// Copy pointers to each column's data, types, and validity bitmasks
// to the device as contiguous arrays
device_columns_data.reserve(num_cols);
Expand All @@ -173,7 +233,7 @@ public:
{
assert(nullptr != current_column->data);
}

// Compute the size of a row in the table in bytes
int column_width_bytes{0};
if(GDF_SUCCESS == get_column_byte_width(current_column, &column_width_bytes))
Expand Down Expand Up @@ -254,13 +314,6 @@ public:
return column_length;
}

// Checks whether the row at `row_index` is valid (i.e., its bit is set in
// the table's row-validity bitmask; rows are treated as valid when no mask
// exists).
__device__ bool is_row_valid(size_type row_index) const
{
  return gdf_is_valid(d_row_valid, row_index);
}


/* --------------------------------------------------------------------------*/
/**
Expand All @@ -276,6 +329,15 @@ public:
}



/* Returns true when the bit for `row_index` is set in the table's
 * row-validity mask (gdf_is_valid reports true when the mask is null). */
__device__ bool is_row_valid(size_type row_index) const
{
  bool const valid{gdf_is_valid(d_row_valid, row_index)};
  return valid;
}


/* --------------------------------------------------------------------------*/
/**
* @Synopsis Packs the elements of a specified row into a contiguous byte-buffer
Expand Down Expand Up @@ -767,6 +829,8 @@ public:
gdf_error gather(thrust::device_vector<size_type> const & row_gather_map,
gdf_table<size_type> & gather_output_table)
{

// TODO Gather the bit validity masks for each column
gdf_error gdf_status{GDF_SUCCESS};

// Each column can be gathered in parallel, therefore create a
Expand Down Expand Up @@ -1152,7 +1216,9 @@ gdf_error scatter_column(column_type const * const __restrict__ input_column,
}


const size_type num_columns; /** The number of columns in the table */

const size_type num_columns; /** The number of columns in the table */

size_type column_length{0}; /** The number of rows in the table */

gdf_column ** host_columns{nullptr}; /** The set of gdf_columns that this table wraps */
Expand Down
40 changes: 32 additions & 8 deletions src/groupby/groupby.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,10 @@ gdf_error typed_groupby(gdf_table<size_type> const & groupby_input_table,

gdf_error gdf_error_code = GroupbyHash(groupby_input_table,
in_agg_col,
in_aggregation_column->valid,
groupby_output_table,
out_agg_col,
&out_aggregation_column->valid,
&output_size,
op_type(),
sort_result);
Expand Down Expand Up @@ -278,6 +280,9 @@ gdf_column create_gdf_column(const size_t size)
else if(std::is_same<col_type,double>::value) gdf_col_type = GDF_FLOAT64;
else assert(false && "Invalid type passed to create_gdf_column");

const size_t num_masks = gdf_get_num_chars_bitmask(size);
cudaMalloc(&the_column.valid, num_masks * sizeof(gdf_valid_type));

// Fill the gdf_column struct
the_column.size = size;
the_column.dtype = gdf_col_type;
Expand Down Expand Up @@ -309,15 +314,23 @@ void compute_average(gdf_column * avg_column, gdf_column const & count_column, g
{
const size_t output_size = count_column.size;

// Wrap raw device pointers in thrust device ptrs to enable usage of thrust::transform
thrust::device_ptr<sum_type> d_sums = thrust::device_pointer_cast(static_cast<sum_type*>(sum_column.data));
thrust::device_ptr<size_t> d_counts = thrust::device_pointer_cast(static_cast<size_t*>(count_column.data));
thrust::device_ptr<avg_type> d_avg = thrust::device_pointer_cast(static_cast<avg_type*>(avg_column->data));
sum_type * d_sums = static_cast<sum_type*>(sum_column.data);
size_t * d_counts = static_cast<size_t*>(count_column.data);
gdf_valid_type * count_valids = count_column.valid;

auto average_op = [] __device__ (sum_type sum, size_t count)->avg_type { return (sum / static_cast<avg_type>(count)); };
auto average_op = [d_sums, d_counts, count_valids] __device__ (size_t index)->avg_type
{
if(false == gdf_is_valid(count_valids, index)){
return static_cast<avg_type>(0);
}
else{
return static_cast<avg_type>( d_sums[index] / d_counts[index] );
}
};

// Computes the average into the passed in output buffer for the average column
thrust::transform(d_sums, d_sums + output_size, d_counts, d_avg, average_op);
thrust::device_ptr<avg_type> d_avg = thrust::device_pointer_cast(static_cast<avg_type*>(avg_column->data));
thrust::tabulate(d_avg, d_avg + output_size, average_op);

// Update the size of the average column
avg_column->size = output_size;
Expand Down Expand Up @@ -362,6 +375,14 @@ gdf_error multi_pass_avg(int ncols,
gdf_column sum_output = create_gdf_column<sum_type>(output_size);
gdf_group_by_hash<sum_op>(ncols, in_groupby_columns, in_aggregation_column, out_groupby_columns, &sum_output, sort_result);

// Set the validity mask for the AVG output column, the validity
// mask will be the same as both the Count and Sum output validity masks
if(nullptr != out_aggregation_column->valid)
{
const size_t num_masks = gdf_get_num_chars_bitmask(count_output.size);
CUDA_TRY(cudaMemcpy(out_aggregation_column->valid, count_output.valid, num_masks * sizeof(gdf_valid_type), cudaMemcpyDeviceToDevice));
}

// Compute the average from the Sum and Count columns and store into the passed in aggregation output buffer
const gdf_dtype gdf_output_type = out_aggregation_column->dtype;
switch(gdf_output_type){
Expand All @@ -374,9 +395,12 @@ gdf_error multi_pass_avg(int ncols,
default: return GDF_UNSUPPORTED_DTYPE;
}


// Free intermediate storage
cudaFree(count_output.data);
cudaFree(sum_output.data);
CUDA_TRY(cudaFree(count_output.data));
CUDA_TRY(cudaFree(count_output.valid));
CUDA_TRY(cudaFree(sum_output.data));
CUDA_TRY(cudaFree(sum_output.valid));

return GDF_SUCCESS;
}
Expand Down
48 changes: 44 additions & 4 deletions src/groupby/hash/groupby_compute_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,10 @@ template< typename aggregation_type,
typename aggregation_operation>
gdf_error GroupbyHash(gdf_table<size_type> const & groupby_input_table,
const aggregation_type * const in_aggregation_column,
gdf_valid_type const * const in_aggregation_validity_mask,
gdf_table<size_type> & groupby_output_table,
aggregation_type * out_aggregation_column,
gdf_valid_type ** out_aggregation_validity_mask,
size_type * out_size,
aggregation_operation aggregation_op,
bool sort_result = false)
Expand Down Expand Up @@ -168,12 +170,23 @@ gdf_error GroupbyHash(gdf_table<size_type> const & groupby_input_table,

CUDA_TRY(cudaGetLastError());

// Allocate an array to indicate the state of each bucket in the hash table
// There are 3 possible states:
// EMPTY: The bucket's payload is empty
// NULL_VALUE: The bucket's payload is a NULL
// VALID_VALUE: The bucket's payload is a valid value
bucket_state * hash_bucket_states{nullptr};
CUDA_TRY( cudaMalloc(&hash_bucket_states, hash_table_size * sizeof(bucket_state)) );
CUDA_TRY( cudaMemset(hash_bucket_states, bucket_state::EMPTY, hash_table_size * sizeof(bucket_state)) );

// Inserts (i, aggregation_column[i]) as a key-value pair into the
// hash table. When a given key already exists in the table, the aggregation operation
// is computed between the new and existing value, and the result is stored back.
build_aggregation_table<<<build_grid_size, block_size>>>(the_map.get(),
groupby_input_table,
in_aggregation_column,
in_aggregation_validity_mask,
hash_bucket_states,
input_num_rows,
aggregation_op,
row_comparator<map_type, size_type>(*the_map, groupby_input_table, groupby_input_table));
Expand All @@ -186,13 +199,23 @@ gdf_error GroupbyHash(gdf_table<size_type> const & groupby_input_table,

const dim3 extract_grid_size ((hash_table_size + THREAD_BLOCK_SIZE - 1) / THREAD_BLOCK_SIZE, 1, 1);

// Initialize output aggregation column's validity mask
const size_type num_masks = gdf_get_num_chars_bitmask(input_num_rows);
if(nullptr == *out_aggregation_validity_mask)
{
CUDA_TRY( cudaMalloc(out_aggregation_validity_mask, num_masks * sizeof(gdf_valid_type)) );
}
CUDA_TRY( cudaMemset(*out_aggregation_validity_mask, 0, num_masks * sizeof(gdf_valid_type)) );

// Extracts every non-empty key and value into separate contiguous arrays,
// which provides the result of the groupby operation
extract_groupby_result<<<extract_grid_size, block_size>>>(the_map.get(),
hash_table_size,
hash_bucket_states,
groupby_output_table,
groupby_input_table,
out_aggregation_column,
*out_aggregation_validity_mask,
global_write_index);

CUDA_TRY(cudaGetLastError());
Expand All @@ -204,16 +227,33 @@ gdf_error GroupbyHash(gdf_table<size_type> const & groupby_input_table,
groupby_output_table.set_column_length(*out_size);

// Optionally sort the groupby/aggregation result columns
if(true == sort_result) {
if((*out_size > 0) && (true == sort_result)) {
auto sorted_indices = groupby_output_table.sort();
thrust::device_vector<aggregation_type> agg(*out_size);
thrust::gather(thrust::device,
sorted_indices.begin(), sorted_indices.end(),
out_aggregation_column,
agg.begin());
sorted_indices.begin(),
sorted_indices.end(),
out_aggregation_column,
agg.begin());
thrust::copy(agg.begin(), agg.end(), out_aggregation_column);

if(nullptr != *out_aggregation_validity_mask)
{
// Reorder the bit-validity mask of the aggregation output column
// according to the new sorted order
const size_type gather_grid_size = (*out_size + THREAD_BLOCK_SIZE - 1)/THREAD_BLOCK_SIZE;
const size_type num_masks = gdf_get_num_chars_bitmask(*out_size);
thrust::device_vector<gdf_valid_type> new_valid_mask(num_masks,0);
gather_valid_mask<<<gather_grid_size, THREAD_BLOCK_SIZE>>>(*out_aggregation_validity_mask,
new_valid_mask.data().get(),
sorted_indices.data().get(),
*out_size);
thrust::copy(new_valid_mask.begin(), new_valid_mask.end(), *out_aggregation_validity_mask);
}
}

CUDA_TRY( cudaFree(hash_bucket_states) );

return GDF_SUCCESS;
}
#endif
Loading