From 5bdaae0655e27f44baf287bba3303b6287b0aa1f Mon Sep 17 00:00:00 2001 From: Alexander Date: Fri, 24 Jun 2016 16:32:39 +0400 Subject: [PATCH 1/4] Parallel upload for SNP data --- .../transmart/etl/SNPDataProcessor.groovy | 44 +++++++++++++++++-- 1 file changed, 40 insertions(+), 4 deletions(-) diff --git a/src/main/groovy/com/thomsonreuters/lsps/transmart/etl/SNPDataProcessor.groovy b/src/main/groovy/com/thomsonreuters/lsps/transmart/etl/SNPDataProcessor.groovy index e4689ef9..1aa6d983 100644 --- a/src/main/groovy/com/thomsonreuters/lsps/transmart/etl/SNPDataProcessor.groovy +++ b/src/main/groovy/com/thomsonreuters/lsps/transmart/etl/SNPDataProcessor.groovy @@ -25,9 +25,14 @@ import com.thomsonreuters.lsps.transmart.files.CsvLikeFile import groovy.sql.Sql import java.nio.file.Path +import java.util.concurrent.Callable +import java.util.concurrent.ExecutorService +import java.util.concurrent.Executors +import java.util.concurrent.Future class SNPDataProcessor extends AbstractDataProcessor { + int THREAD_COUNT = 4 public SNPDataProcessor(Object conf) { super(conf); } @@ -59,15 +64,15 @@ class SNPDataProcessor extends AbstractDataProcessor { // Load data to tmp tables LT_SNP_CALLS_BY_GSM and LT_SNP_COPY_NUMBER def callsFileList = studyInfo['callsFileNameList'] as List if (callsFileList.size() > 0) { - callsFileList.each { String name -> - processSnpCallsFile(sql, dir.resolve(name)) + parallerCall(callsFileList) { fileName, sqlInst -> + processSnpCallsFile(sqlInst, dir.resolve(fileName)) } } def copyNumberFileList = studyInfo['copyNumberFileList'] as List if (copyNumberFileList.size() > 0) { - copyNumberFileList.each { String name -> - processSnpCopyNumberFile(sql, dir.resolve(name)) + parallerCall(copyNumberFileList) { fileName, sqlInst -> + processSnpCopyNumberFile(sqlInst, dir.resolve(fileName)) } } @@ -78,6 +83,37 @@ class SNPDataProcessor extends AbstractDataProcessor { return true; } + private void parallerCall(fileList, Closure uploadFunction) { + ExecutorService executorService = Executors.newFixedThreadPool(THREAD_COUNT) + List> tasks = new ArrayList<>(); + try { + fileList.each { String name -> + tasks.add(new Callable() { + @Override + Object call() throws Exception { + Sql threadSql = database.newSql() + threadSql.connection.autoCommit = false + try { + uploadFunction(name, threadSql) + threadSql.commit() + } + finally { + threadSql.connection.close() + } + return null; + } + }); + } + List> invokeAll = executorService.invokeAll(tasks); + } + catch (InterruptedException e) { + config.logger.log(LogType.ERROR, "Data wasn't upload") + } + finally { + executorService.shutdown() + } + } + private void processSnpCallsFile(Sql sql, Path f) { config.logger.log(LogType.MESSAGE, "Processing calls for ${f.getFileName()}") loadFileToTable(sql, f, "lt_snp_calls_by_gsm", ['GSM_NUM', 'SNP_NAME', 'SNP_CALLS']) From 472d53a91c80b8afd5ca062fbad6fef9c07163af Mon Sep 17 00:00:00 2001 From: Alexander Date: Wed, 29 Jun 2016 17:38:11 +0400 Subject: [PATCH 2/4] Parallel upload for Clinical Data --- .../ExpectedSummaryStatisticAnotherOrder.txt | 11 ++++++++ .../etl/ClinicalDataProcessor.groovy | 25 ++++++++++++++++++- .../etl/ClinicalDataProcessorTest.groovy | 5 ++-- 3 files changed, 38 insertions(+), 3 deletions(-) create mode 100644 fixtures/Test Studies/Test Study_GSE0/ClinicalDataToUpload/ExpectedSummaryStatisticAnotherOrder.txt diff --git a/fixtures/Test Studies/Test Study_GSE0/ClinicalDataToUpload/ExpectedSummaryStatisticAnotherOrder.txt b/fixtures/Test Studies/Test Study_GSE0/ClinicalDataToUpload/ExpectedSummaryStatisticAnotherOrder.txt new file mode 100644 index 00000000..fab23a24 --- /dev/null +++ b/fixtures/Test Studies/Test Study_GSE0/ClinicalDataToUpload/ExpectedSummaryStatisticAnotherOrder.txt @@ -0,0 +1,11 @@ +File Variable Variable Type N null Mean Median IQR Min Max SD Count Required Validation rule QC missing data QC data range +TST_DEMO.txt SUBJ_ID ID 9 0 Yes OK +TST_DEMO.txt Age In Years Numerical 9 0 30.555556 20.9 11.1 11.5 90.0 23.734843 Yes "Greater than 30, when ""Sex"" is equal to ""Male""; Lesser than 50; Greater than or equal to 20; Lesser than or equal to 20; >30; <50; >=20; <=20; 30-50; Between 30 to 50" OK "Range checks failed: >30 ('HCC2935', 'HCC4006', 'HCC827', 'NCIH3255', 'PC14', 'SW48'); 30-50 ('HCC2935', 'HCC4006', 'HCC827', 'NCIH3255', 'PC14', 'SKMEL28', 'SW48'); Between 30 to 50 ('HCC2935', 'HCC4006', 'HCC827', 'NCIH3255', 'PC14', 'SKMEL28', 'SW48'); Lesser than or equal to 20 ('HCC4006', 'HCC827', 'NCIH1650', 'NCIH1975', 'PC14', 'SKMEL28'); <=20 ('HCC4006', 'HCC827', 'NCIH1650', 'NCIH1975', 'PC14', 'SKMEL28'); Greater than 30, when ""Sex"" is equal to ""Male"" ('HCC827', 'NCIH3255'); Greater than or equal to 20 ('NCIH3255', 'SW48'); >=20 ('NCIH3255', 'SW48'); Lesser than 50 ('SKMEL28'); <50 ('SKMEL28')" +TST_DEMO.txt Sex Categorical 7 2 Female: 5, Male: 2 Yes 2 missing ('HCC4006', 'SW48') +TST_DEMO.txt Assessment Date Date 9 0 +TST_DEMO.txt Language Text 3 6 +TST001.txt SUBJ_ID ID 12 0 Yes OK +TST001.txt Mutant Allele (Genomic) Text 12 0 +TST001.txt Mutant Allele (cDNA) Text 12 0 +TST001.txt Mutation Type Text 12 0 +TST001.txt Variant Type Text 12 0 diff --git a/src/main/groovy/com/thomsonreuters/lsps/transmart/etl/ClinicalDataProcessor.groovy b/src/main/groovy/com/thomsonreuters/lsps/transmart/etl/ClinicalDataProcessor.groovy index 4a885614..ea7c5974 100644 --- a/src/main/groovy/com/thomsonreuters/lsps/transmart/etl/ClinicalDataProcessor.groovy +++ b/src/main/groovy/com/thomsonreuters/lsps/transmart/etl/ClinicalDataProcessor.groovy @@ -35,10 +35,12 @@ import groovy.transform.CompileStatic import java.nio.file.Files import java.nio.file.Path import java.sql.SQLException +import java.util.concurrent.* class ClinicalDataProcessor extends AbstractDataProcessor { StatisticCollector statistic = new StatisticCollector() def usedStudyId = '' + int THREAD_COUNT = 4 public ClinicalDataProcessor(Object conf) { super(conf); @@ -179,9 +181,30 @@ class ClinicalDataProcessor extends AbstractDataProcessor { mergeMode = getMergeMode(mappingFile) + final ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT); + final ExecutorCompletionService completionService = new ExecutorCompletionService<>(threadPool); + mapping.eachFileMapping { fileMapping -> - this.processFile(sql, dir.resolve(fileMapping.fileName), fileMapping) + completionService.submit(new Callable() { + @Override + Object call() throws Exception { + sql.connection.autoCommit = false + processFile(sql, dir.resolve(fileMapping.fileName), fileMapping) + } + }) + } + + for (int i = 0; i< mapping.mappings.size(); i++){ + final Future f = completionService.take() + try { + Object result = f.get() + } catch (ExecutionException e){ + throw new DataProcessingException(e.getCause().message) + } } + + threadPool.shutdown() + mappingFileFound = true } if (!mappingFileFound) { diff --git a/src/test/groovy/com/thomsonreuters/lsps/transmart/etl/ClinicalDataProcessorTest.groovy b/src/test/groovy/com/thomsonreuters/lsps/transmart/etl/ClinicalDataProcessorTest.groovy index cf74d25b..fbb3d147 100644 --- a/src/test/groovy/com/thomsonreuters/lsps/transmart/etl/ClinicalDataProcessorTest.groovy +++ b/src/test/groovy/com/thomsonreuters/lsps/transmart/etl/ClinicalDataProcessorTest.groovy @@ -50,6 +50,7 @@ class ClinicalDataProcessorTest extends Specification implements ConfigAwareTest Study.deleteById(config, clinicalData.studyId) def expectedFile = new File(clinicalData.dir, 'ExpectedSummaryStatistic.txt') + def expectedFileAnotherOrder = new File(clinicalData.dir, 'ExpectedSummaryStatisticAnotherOrder.txt') def actualFile = new File(clinicalData.dir, 'SummaryStatistic.txt') actualFile.delete() def result = clinicalData.load(config) @@ -57,7 +58,7 @@ class ClinicalDataProcessorTest extends Specification implements ConfigAwareTest then: assertThat("Clinical data loading shouldn't fail", result, equalTo(true)) actualFile.exists() - actualFile.readLines() == expectedFile.readLines() + actualFile.readLines() == expectedFile.readLines() || actualFile.readLines() == expectedFileAnotherOrder.readLines() } def "it should collect statistic"() { @@ -73,7 +74,7 @@ class ClinicalDataProcessorTest extends Specification implements ConfigAwareTest expect: statistic != null - statistic.tables.keySet() as List == ['TST001.txt', 'TST_DEMO.txt'] + statistic.tables.keySet() as List == ['TST001.txt', 'TST_DEMO.txt'] || statistic.tables.keySet() as List == ['TST_DEMO.txt', 'TST001.txt'] def demo = statistic.tables.'TST_DEMO.txt' demo != null demo.variables.keySet() as List == ['SUBJ_ID', 'Age In Years', 'Sex', 'Assessment Date', 'Language'] From 713a1124839384b59b4bb2c4c5e1988e8f514cbe Mon Sep 17 00:00:00 2001 From: Alexander Date: Wed, 29 Jun 2016 17:38:51 +0400 Subject: [PATCH 3/4] Fixed fixture data for test --- .../ClinicalDataToUpload/TST_DEMO.txt | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/fixtures/Test Studies/Test Study With Different StudyID_GSE0DIFFSID/ClinicalDataToUpload/TST_DEMO.txt b/fixtures/Test Studies/Test Study With Different StudyID_GSE0DIFFSID/ClinicalDataToUpload/TST_DEMO.txt index b0616076..a42c3b91 100644 --- a/fixtures/Test Studies/Test Study With Different StudyID_GSE0DIFFSID/ClinicalDataToUpload/TST_DEMO.txt +++ b/fixtures/Test Studies/Test Study With Different StudyID_GSE0DIFFSID/ClinicalDataToUpload/TST_DEMO.txt @@ -1,12 +1,12 @@ STUDY_ID SUBJ_ID Age In Years Sex Assessment Date Language -GSE0DUB HCC2935 20 Female 09/15/2014 -GSE0DUB HCC4006 20.5 08/31/2014 -GSE0DUB HCC827 20.9 Male 10/01/2014 Spain -GSE0DUB NCIH1650 31.1 Female 10/01/2014 -GSE0DUB NCIH1975 40 Female 10/10/2014 -GSE0DUB NCIH3255 18 Male 09/17/2014 English -GSE0DUB PC14 23 Female 09/28/2014 -GSE0DUB PC14 25 Male 09/28/2014 -GSE0DUB SKMEL28 90 Female 10/12/2014 English -GSE0DUB SW48 11.5 09/28/2014 -GSE0DUB SW48 12 09/28/2014 +GSE0DIFFSID HCC2935 20 Female 09/15/2014 +GSE0DIFFSID HCC4006 20.5 08/31/2014 +GSE0DIFFSID HCC827 20.9 Male 10/01/2014 Spain +GSE0DIFFSID NCIH1650 31.1 Female 10/01/2014 +GSE0DIFFSID NCIH1975 40 Female 10/10/2014 +GSE0DIFFSID NCIH3255 18 Male 09/17/2014 English +GSE0DIFFSID PC14 23 Female 09/28/2014 +GSE0DIFFSID PC14 25 Male 09/28/2014 +GSE0DIFFSID SKMEL28 90 Female 10/12/2014 English +GSE0DIFFSID SW48 11.5 09/28/2014 +GSE0DIFFSID SW48 12 09/28/2014 From 83de6dea22cf3dc20f8a2e3c612a86d184003879 Mon Sep 17 00:00:00 2001 From: Alexander Date: Thu, 19 Jan 2017 12:04:59 +0400 Subject: [PATCH 4/4] Fixed wrong merge --- .../transmart/etl/SNPDataProcessor.groovy | 31 ------------------- 1 file changed, 31 deletions(-) diff --git a/src/main/groovy/com/thomsonreuters/lsps/transmart/etl/SNPDataProcessor.groovy b/src/main/groovy/com/thomsonreuters/lsps/transmart/etl/SNPDataProcessor.groovy index 2d797787..50b8c56e 100644 --- a/src/main/groovy/com/thomsonreuters/lsps/transmart/etl/SNPDataProcessor.groovy +++ b/src/main/groovy/com/thomsonreuters/lsps/transmart/etl/SNPDataProcessor.groovy @@ -114,37 +114,6 @@ class SNPDataProcessor extends AbstractDataProcessor { } } - private void parallerCall(fileList, Closure uploadFunction) { - ExecutorService executorService = Executors.newFixedThreadPool(THREAD_COUNT) - List> tasks = new ArrayList<>(); - try { - fileList.each { String name -> - tasks.add(new Callable() { - @Override - Object call() throws Exception { - Sql threadSql = database.newSql() - threadSql.connection.autoCommit = false - try { - uploadFunction(name, threadSql) - threadSql.commit() - } - finally { - threadSql.connection.close() - } - return null; - } - }); - } - List> invokeAll = executorService.invokeAll(tasks); - } - catch (InterruptedException e) { - config.logger.log(LogType.ERROR, "Data wasn't upload") - } - finally { - executorService.shutdown() - } - } - private void processSnpCallsFile(Sql sql, Path f) { config.logger.log(LogType.MESSAGE, "Processing calls for ${f.getFileName()}") loadFileToTable(sql, f, "lt_snp_calls_by_gsm", ['GSM_NUM', 'SNP_NAME', 'SNP_CALLS'])