The aim of this project is to compare the time it takes to extract, transform and load large datasets using three different methods. Please read the Medium post for more information.
The data used in this project is the publicly available Stack Exchange Data Dump. `Users.xml` and `Posts.xml` were converted to `users.csv.gz` and `posts.csv.gz` and used as the source files for this project.
For the ISO 3166-1 country codes, the CSV from DataHub was used (saved as `country_codes.csv`).
For Google Cloud:
- Create a project
- Create a Cloud Storage bucket and upload the `posts.csv.gz`, `users.csv.gz` and `country_codes.csv` files
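
The rest of this README uses shell variables such as `${PROJECT_ID}`, `${BUCKET_NAME}`, `${REGION}`, `${ZONE}` and `${DATASET}`. A minimal sketch of this step, with placeholder values that are not part of the project:

```sh
# Placeholder values; substitute your own
export PROJECT_ID=my-etl-project
export REGION=europe-west2
export ZONE=europe-west2-a
export BUCKET_NAME=my-etl-bucket
export DATASET=stackoverflow

gcloud projects create ${PROJECT_ID}
gcloud config set project ${PROJECT_ID}

# Create the bucket and upload the source files
gsutil mb -l ${REGION} gs://${BUCKET_NAME}
gsutil cp posts.csv.gz users.csv.gz country_codes.csv gs://${BUCKET_NAME}/
```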
For Spark:
- Install sbt
- Install SDKMAN!
- In the `spark` folder, use SDKMAN! to install a JDK (8 or 11 currently supported) and set the JDK version of the project using `sdk env init`
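
A rough sketch of that setup (one way to do it; the JDK identifier below is an assumption, run `sdk list java` to see what is currently available):

```sh
# Install SDKMAN!, sbt and a JDK, then pin the project's JDK version
curl -s "https://get.sdkman.io" | bash
source "$HOME/.sdkman/bin/sdkman-init.sh"
sdk install sbt
sdk install java 11.0.21-tem   # placeholder identifier; any JDK 8/11 distribution works
cd spark
sdk env init                   # writes .sdkmanrc pinning the current JDK
sdk env                        # activate the pinned version in this shell
```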
For Dask:
- Python 3.x installed
- Install packages from `requirements.txt` -> `pip install -r /dask/requirements.txt` (for running locally)
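
A minimal local-run sketch, assuming paths relative to the repository root (a `dask/` folder containing `requirements.txt` and the `transform.py` script used later on Dataproc) and that the script can also be run locally:

```sh
# Create an isolated environment and install the Dask requirements
python3 -m venv .venv
source .venv/bin/activate
pip install -r dask/requirements.txt

# Run the transformation locally
python dask/transform.py
```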
For dbt:
- Python 3.x installed
- Install packages from `requirements.txt` -> `pip install -r /dbt/requirements.txt`
- Copy the ISO 3166-1 country codes CSV into `./bigquery_dbt/seeds/country_codes.csv`
- Setup a dbt profile in `~/.dbt/profiles.yml` called `bigquery_dbt` for BigQuery (Example)
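
A minimal sketch of such a profile, written here as a shell heredoc so the `${PROJECT_ID}` and `${DATASET}` variables used elsewhere in this README are expanded; it assumes OAuth credentials from `gcloud auth application-default login` (adjust `method`, `dataset` and `threads` as needed):

```sh
# NOTE: overwrites any existing ~/.dbt/profiles.yml
cat > ~/.dbt/profiles.yml <<EOF
bigquery_dbt:
  target: dev
  outputs:
    dev:
      type: bigquery
      method: oauth
      project: ${PROJECT_ID}
      dataset: ${DATASET}
      threads: 4
EOF
```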
- Make BigQuery dataset

  ```sh
  bq mk --dataset ${PROJECT_ID}:${DATASET}
  ```

- Load files into BigQuery as tables (can be done concurrently)

  ```sh
  bq load \
    --autodetect \
    --source_format=CSV \
    ${DATASET}.posts \
    gs://${BUCKET_NAME}/posts.csv.gz

  bq load \
    --autodetect \
    --source_format=CSV \
    ${DATASET}.users \
    gs://${BUCKET_NAME}/users.csv.gz
  ```
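  Optionally, once the loads finish, confirm the tables exist and review the auto-detected schemas:

  ```sh
  # Optional sanity check on the loaded tables
  bq show ${DATASET}.posts
  bq show ${DATASET}.users
  ```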
- Ensure the Google project id is specified in the `database` field in `schema.yml`
- Run dbt

  ```sh
  cd ./bigquery_dbt
  dbt build  # Load CSV as reference table (via seeds), run tests etc.
  dbt run
  ```
- Load created table into GCS

  ```sh
  bq extract \
    --destination_format CSV \
    --compression GZIP \
    --field_delimiter ',' \
    ${PROJECT_ID}:${DATASET}.aggregated_users \
    gs://${BUCKET_NAME}/dbt_bigquery/agg_users.csv.gz
  ```
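  A quick way to check that the export landed (adjust the path if you changed it above):

  ```sh
  # Optional: list the exported object(s)
  gsutil ls -l gs://${BUCKET_NAME}/dbt_bigquery/
  ```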
- Ensure that you change the `gcsBucket` value in `aggregate-users.scala`
- Run the following (in the `spark` folder) to compile and package the project into a `.jar` for Dataproc:
  ```sh
  sbt
  ```

  Then within the sbt console:

  ```
  package
  ```
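  Equivalently, the same build can be run non-interactively from the shell:

  ```sh
  # One-shot, non-interactive compile and package (run from the spark folder)
  sbt package
  ```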
- Copy the `jar` from local to GCS (optional):

  ```sh
  gsutil cp spark/target/scala-2.12/${JAR_FILENAME}.jar gs://${BUCKET_NAME}/spark/aggregateusers.jar
  ```
- Create Dataproc cluster:

  ```sh
  gcloud dataproc clusters create ${SPARK_CLUSTER_NAME} \
    --project=${PROJECT_ID} \
    --region=${REGION} \
    --image-version=2.0 \
    --master-machine-type n1-standard-8 \
    --worker-machine-type n1-standard-8 \
    --num-workers 6
  ```
- Submit Spark job on Dataproc cluster

  ```sh
  gcloud dataproc jobs submit spark \
    --cluster=${SPARK_CLUSTER_NAME} \
    --class=stackoverflow.AggregateUsers \
    --jars=gs://${BUCKET_NAME}/spark/aggregateusers.jar \
    --region=${REGION}
  ```
- Delete cluster when finished
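
  For example:

  ```sh
  # Tear down the Spark cluster to stop incurring charges
  gcloud dataproc clusters delete ${SPARK_CLUSTER_NAME} --region=${REGION}
  ```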
- Copy initialisation actions to local bucket (optional):

  ```sh
  gsutil cp gs://goog-dataproc-initialization-actions-${ZONE}/dask/dask.sh gs://${BUCKET_NAME}/dask/
  ```
- Create cluster

  ```sh
  gcloud dataproc clusters create ${DASK_CLUSTER_NAME} \
    --project=${PROJECT_ID} \
    --region=${REGION} \
    --master-machine-type n1-standard-8 \
    --worker-machine-type n1-standard-8 \
    --num-workers 6 \
    --image-version preview-ubuntu \
    --initialization-actions gs://${BUCKET_NAME}/dask/dask.sh \
    --metadata dask-runtime=yarn \
    --enable-component-gateway
  ```
- Copy files

  ```sh
  gcloud compute scp \
    --project=${PROJECT_ID} \
    --zone=${ZONE} \
    --recurse ./dask/ ${DASK_CLUSTER_NAME}-m:~/
  ```
- Install package requirements & run

  ```sh
  gcloud compute ssh ${DASK_CLUSTER_NAME}-m --zone ${ZONE}
  /opt/conda/default/bin/python -m pip install python-dotenv
  /opt/conda/default/bin/python ./dask/transform.py
  ```
- Delete cluster when finished
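
  For example:

  ```sh
  # Tear down the Dask cluster to stop incurring charges
  gcloud dataproc clusters delete ${DASK_CLUSTER_NAME} --region=${REGION}
  ```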