From 1a8c372947bd441ab95bf6a4b2ea5b1922475eb4 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Sun, 28 Jun 2020 08:39:16 +0300 Subject: [PATCH 01/22] Using golang 1.14 --- .github/workflows/ci.yml | 7 +++---- .github/workflows/replica-tests.yml | 7 +++---- Dockerfile.packaging | 2 +- Dockerfile.test | 2 +- build.sh | 4 ++-- script/ensure-go-installed | 4 ++-- 6 files changed, 12 insertions(+), 14 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ec8978d5f..ded3c8b1f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -8,13 +8,12 @@ jobs: runs-on: ubuntu-latest steps: - - uses: actions/checkout@master + - uses: actions/checkout@v2 - - name: Set up Go 1.12 + - name: Set up Go 1.14 uses: actions/setup-go@v1 with: - version: 1.12 - id: go + go-version: 1.14 - name: Build run: script/cibuild diff --git a/.github/workflows/replica-tests.yml b/.github/workflows/replica-tests.yml index d7751315f..31e205206 100644 --- a/.github/workflows/replica-tests.yml +++ b/.github/workflows/replica-tests.yml @@ -8,13 +8,12 @@ jobs: runs-on: ubuntu-latest steps: - - uses: actions/checkout@master + - uses: actions/checkout@v2 - - name: Set up Go 1.12 + - name: Set up Go 1.14 uses: actions/setup-go@v1 with: - version: 1.12 - id: go + go-version: 1.14 - name: migration tests run: script/cibuild-gh-ost-replica-tests diff --git a/Dockerfile.packaging b/Dockerfile.packaging index 214c70c97..74aa1903d 100644 --- a/Dockerfile.packaging +++ b/Dockerfile.packaging @@ -1,6 +1,6 @@ # -FROM golang:1.12.6 +FROM golang:1.14.4 RUN apt-get update RUN apt-get install -y ruby ruby-dev rubygems build-essential diff --git a/Dockerfile.test b/Dockerfile.test index 6abc4a053..3b3bfe541 100644 --- a/Dockerfile.test +++ b/Dockerfile.test @@ -1,4 +1,4 @@ -FROM golang:1.12.1 +FROM golang:1.14.4 LABEL maintainer="github@github.com" RUN apt-get update diff --git a/build.sh b/build.sh index 46db9c237..98633cdef 100755 --- a/build.sh +++ b/build.sh @@ -18,8 +18,8 @@ function build { GOOS=$3 GOARCH=$4 - if ! go version | egrep -q 'go(1\.1[234])' ; then - echo "go version must be 1.12 or above" + if ! go version | egrep -q 'go(1\.1[23456])' ; then + echo "go version must be 1.14 or above" exit 1 fi diff --git a/script/ensure-go-installed b/script/ensure-go-installed index c9c2ae863..1abdeb337 100755 --- a/script/ensure-go-installed +++ b/script/ensure-go-installed @@ -1,7 +1,7 @@ #!/bin/bash -PREFERRED_GO_VERSION=go1.12.6 -SUPPORTED_GO_VERSIONS='go1.1[234]' +PREFERRED_GO_VERSION=go1.14 +SUPPORTED_GO_VERSIONS='go1.1[23456]' GO_PKG_DARWIN=${PREFERRED_GO_VERSION}.darwin-amd64.pkg GO_PKG_DARWIN_SHA=ea78245e43de2996fa0973033064b33f48820cfe39f4f3c6e953040925cc5815 From fb4aca156751db491d8a83096367c379cdebcc64 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Sun, 28 Jun 2020 08:49:30 +0300 Subject: [PATCH 02/22] checksums --- script/ensure-go-installed | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/script/ensure-go-installed b/script/ensure-go-installed index 1abdeb337..32ade460b 100755 --- a/script/ensure-go-installed +++ b/script/ensure-go-installed @@ -1,13 +1,13 @@ #!/bin/bash -PREFERRED_GO_VERSION=go1.14 +PREFERRED_GO_VERSION=go1.14.4 SUPPORTED_GO_VERSIONS='go1.1[23456]' GO_PKG_DARWIN=${PREFERRED_GO_VERSION}.darwin-amd64.pkg -GO_PKG_DARWIN_SHA=ea78245e43de2996fa0973033064b33f48820cfe39f4f3c6e953040925cc5815 +GO_PKG_DARWIN_SHA=b518f21f823759ee30faddb1f623810a432499f050c9338777523d9c8551c62c GO_PKG_LINUX=${PREFERRED_GO_VERSION}.linux-amd64.tar.gz -GO_PKG_LINUX_SHA=dbcf71a3c1ea53b8d54ef1b48c85a39a6c9a935d01fc8291ff2b92028e59913c +GO_PKG_LINUX_SHA=aed845e4185a0b2a3c3d5e1d0a35491702c55889192bb9c30e67a3de6849c067 export ROOTDIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )/.." && pwd )" cd $ROOTDIR From 2b71b73285338e96798d2530c8adec9aeb28910f Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Sun, 28 Jun 2020 08:57:19 +0300 Subject: [PATCH 03/22] Actions/workflows: upload binary artifact --- .github/workflows/ci.yml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ded3c8b1f..3556e1ef9 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -17,3 +17,9 @@ jobs: - name: Build run: script/cibuild + + - name: Upload gh-ost binary artifact + uses: actions/upload-artifact@v1 + with: + name: gh-ost + path: bin/gh-ost From b60b12d2ca2125104159b076b4b6198cf02c0d7c Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Mon, 29 Jun 2020 08:53:51 +0300 Subject: [PATCH 04/22] Support --checksum-data flag, on-the-fly checksum verification --- go/base/checksum_comparison.go | 46 +++++++++++++++++++++++++ go/base/context.go | 1 + go/cmd/gh-ost/main.go | 1 + go/logic/applier.go | 61 ++++++++++++++++++++++++++++++---- go/logic/migrator.go | 58 +++++++++++++++++++++++++++----- go/sql/builder.go | 47 ++++++++++++++++++++++---- go/sql/builder_test.go | 2 +- localtests/test.sh | 4 ++- 8 files changed, 197 insertions(+), 23 deletions(-) create mode 100644 go/base/checksum_comparison.go diff --git a/go/base/checksum_comparison.go b/go/base/checksum_comparison.go new file mode 100644 index 000000000..b0d470908 --- /dev/null +++ b/go/base/checksum_comparison.go @@ -0,0 +1,46 @@ +/* + Copyright 2015 Shlomi Noach, courtesy Booking.com + See https://github.com/github/gh-ost/blob/master/LICENSE +*/ + +package base + +import ( + "fmt" + + "github.com/github/gh-ost/go/sql" +) + +type ChecksumFunc func() (checksum string, err error) + +// BinlogCoordinates described binary log coordinates in the form of log file & log position. +type ChecksumComparison struct { + OriginalTableChecksumFunc ChecksumFunc + GhostTableChecksumFunc ChecksumFunc + MigrationIterationRangeMinValues *sql.ColumnValues + MigrationIterationRangeMaxValues *sql.ColumnValues + Attempts int +} + +func NewChecksumComparison( + originalTableChecksumFunc, ghostTableChecksumFunc ChecksumFunc, + rangeMinValues, rangeMaxValues *sql.ColumnValues, +) *ChecksumComparison { + return &ChecksumComparison{ + OriginalTableChecksumFunc: originalTableChecksumFunc, + GhostTableChecksumFunc: ghostTableChecksumFunc, + MigrationIterationRangeMinValues: rangeMinValues, + MigrationIterationRangeMaxValues: rangeMaxValues, + Attempts: 0, + } +} + +func (this *ChecksumComparison) IncrementAttempts() { + this.Attempts = this.Attempts + 1 +} + +func (this *ChecksumComparison) String() string { + return fmt.Sprintf("range: [%s]..[%s]; attempts: %d", + this.MigrationIterationRangeMinValues, this.MigrationIterationRangeMaxValues, this.Attempts, + ) +} diff --git a/go/base/context.go b/go/base/context.go index 52d02dd34..a7ed8e799 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -82,6 +82,7 @@ type MigrationContext struct { CountTableRows bool ConcurrentCountTableRows bool + ChecksumData bool AllowedRunningOnMaster bool AllowedMasterMaster bool SwitchToRowBinlogFormat bool diff --git a/go/cmd/gh-ost/main.go b/go/cmd/gh-ost/main.go index 33fc6f4cb..ffb70620c 100644 --- a/go/cmd/gh-ost/main.go +++ b/go/cmd/gh-ost/main.go @@ -66,6 +66,7 @@ func main() { flag.StringVar(&migrationContext.AlterStatement, "alter", "", "alter statement (mandatory)") flag.BoolVar(&migrationContext.CountTableRows, "exact-rowcount", false, "actually count table rows as opposed to estimate them (results in more accurate progress estimation)") flag.BoolVar(&migrationContext.ConcurrentCountTableRows, "concurrent-rowcount", true, "(with --exact-rowcount), when true (default): count rows after row-copy begins, concurrently, and adjust row estimate later on; when false: first count rows, then start row copy") + flag.BoolVar(&migrationContext.ChecksumData, "checksum-data", false, "if true, checksum original and ghost table shared data on the fly, fail migration if checksum mismatches. Checksum queries run on applier node (the master unless testing on replica)") flag.BoolVar(&migrationContext.AllowedRunningOnMaster, "allow-on-master", false, "allow this migration to run directly on master. Preferably it would run on a replica") flag.BoolVar(&migrationContext.AllowedMasterMaster, "allow-master-master", false, "explicitly allow running in a master-master setup") flag.BoolVar(&migrationContext.NullableUniqueKeyAllowed, "allow-nullable-unique-key", false, "allow gh-ost to migrate based on a unique key with nullable columns. As long as no NULL values exist, this should be OK. If NULL values exist in chosen key, data may be corrupted. Use at your own risk!") diff --git a/go/logic/applier.go b/go/logic/applier.go index 5fb795bac..35f31543c 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -455,11 +455,11 @@ func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange boo // ApplyIterationInsertQuery issues a chunk-INSERT query on the ghost table. It is where // data actually gets copied from original table. -func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected int64, duration time.Duration, err error) { +func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected int64, duration time.Duration, checksumComparison *base.ChecksumComparison, err error) { startTime := time.Now() chunkSize = atomic.LoadInt64(&this.migrationContext.ChunkSize) - query, explodedArgs, err := sql.BuildRangeInsertPreparedQuery( + insertQuery, originalChecksumQuery, ghostChecksumQuery, explodedArgs, err := sql.BuildRangeInsertPreparedQuery( this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName, this.migrationContext.GetGhostTableName(), @@ -473,7 +473,7 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected this.migrationContext.IsTransactionalTable(), ) if err != nil { - return chunkSize, rowsAffected, duration, err + return chunkSize, rowsAffected, duration, checksumComparison, err } sqlResult, err := func() (gosql.Result, error) { @@ -492,7 +492,7 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected if _, err := tx.Exec(sessionQuery); err != nil { return nil, err } - result, err := tx.Exec(query, explodedArgs...) + result, err := tx.Exec(insertQuery, explodedArgs...) if err != nil { return nil, err } @@ -503,7 +503,7 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected }() if err != nil { - return chunkSize, rowsAffected, duration, err + return chunkSize, rowsAffected, duration, checksumComparison, err } rowsAffected, _ = sqlResult.RowsAffected() duration = time.Since(startTime) @@ -513,7 +513,56 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected this.migrationContext.MigrationIterationRangeMaxValues, this.migrationContext.GetIteration(), chunkSize) - return chunkSize, rowsAffected, duration, nil + + groupConcatMaxLen := 1024 * 1024 + var originalTableChecksumFunc base.ChecksumFunc = func() (checksum string, err error) { + tx, err := this.db.Begin() + if err != nil { + return checksum, err + } + defer tx.Rollback() + if _, err := tx.Exec(`set session group_concat_max_len := ?`, groupConcatMaxLen); err != nil { + return checksum, err + } + + err = tx.QueryRow(originalChecksumQuery, explodedArgs...).Scan(&checksum) + return checksum, err + } + var ghostTableChecksumFunc base.ChecksumFunc = func() (checksum string, err error) { + tx, err := this.db.Begin() + if err != nil { + return checksum, err + } + defer tx.Rollback() + if _, err := tx.Exec(`set session group_concat_max_len := ?`, groupConcatMaxLen); err != nil { + return checksum, err + } + + err = tx.QueryRow(ghostChecksumQuery, explodedArgs...).Scan(&checksum) + return checksum, err + } + checksumComparison = base.NewChecksumComparison( + originalTableChecksumFunc, ghostTableChecksumFunc, + this.migrationContext.MigrationIterationRangeMinValues, + this.migrationContext.MigrationIterationRangeMaxValues, + ) + + return chunkSize, rowsAffected, duration, checksumComparison, nil +} + +func (this *Applier) CompareChecksum(checksumComparison *base.ChecksumComparison) error { + originalChecksum, err := checksumComparison.OriginalTableChecksumFunc() + if err != nil { + return err + } + ghostChecksum, err := checksumComparison.GhostTableChecksumFunc() + if err != nil { + return err + } + if originalChecksum != ghostChecksum { + return fmt.Errorf("Checksum failure") + } + return nil } // LockOriginalTable places a write lock on the original table diff --git a/go/logic/migrator.go b/go/logic/migrator.go index b1a238fda..3e65c9848 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -79,8 +79,9 @@ type Migrator struct { rowCopyCompleteFlag int64 // copyRowsQueue should not be buffered; if buffered some non-damaging but // excessive work happens at the end of the iteration as new copy-jobs arrive before realizing the copy is complete - copyRowsQueue chan tableWriteFunc - applyEventsQueue chan *applyEventStruct + copyRowsQueue chan tableWriteFunc + applyEventsQueue chan *applyEventStruct + checksumComparisonQueue chan *base.ChecksumComparison handledChangelogStates map[string]bool @@ -96,10 +97,11 @@ func NewMigrator(context *base.MigrationContext) *Migrator { rowCopyComplete: make(chan error), allEventsUpToLockProcessed: make(chan string), - copyRowsQueue: make(chan tableWriteFunc), - applyEventsQueue: make(chan *applyEventStruct, base.MaxEventsBatchSize), - handledChangelogStates: make(map[string]bool), - finishedMigrating: 0, + copyRowsQueue: make(chan tableWriteFunc), + applyEventsQueue: make(chan *applyEventStruct, base.MaxEventsBatchSize), + checksumComparisonQueue: make(chan *base.ChecksumComparison), + handledChangelogStates: make(map[string]bool), + finishedMigrating: 0, } return migrator } @@ -252,6 +254,18 @@ func (this *Migrator) listenOnPanicAbort() { log.Fatale(err) } +func (this *Migrator) processChecksumComparisons() { + for checksumComparison := range this.checksumComparisonQueue { + if err := this.applier.CompareChecksum(checksumComparison); err != nil { + checksumComparison.IncrementAttempts() + go func() { this.checksumComparisonQueue <- checksumComparison }() + log.Errorf("Checksum error. Checksum=%s, err=%+v", checksumComparison.String(), err) + } else { + log.Debugf("Checksum match. Checksum=%s", checksumComparison.String()) + } + } +} + // validateStatement validates the `alter` statement meets criteria. // At this time this means: // - column renames are approved @@ -390,6 +404,7 @@ func (this *Migrator) Migrate() (err error) { } go this.executeWriteFuncs() go this.iterateChunks() + go this.processChecksumComparisons() this.migrationContext.MarkRowCopyStartTime() go this.initiateStatus() @@ -533,6 +548,25 @@ func (this *Migrator) cutOver() (err error) { return log.Fatalf("Unknown cut-over type: %d; should never get here!", this.migrationContext.CutOverType) } +func (this *Migrator) waitForChecksumToClear() (err error) { + timeout := time.NewTimer(time.Second * time.Duration(this.migrationContext.CutOverLockTimeoutSeconds)) + for { + select { + case <-timeout.C: + { + return log.Errorf("Timeout while waiting for checksums to clear. There are still checksum mismatches") + } + default: + { + if len(this.checksumComparisonQueue) == 0 { + return nil + } + time.Sleep(250 * time.Millisecond) + } + } + } +} + // Inject the "AllEventsUpToLockProcessed" state hint, wait for it to appear in the binary logs, // make sure the queue is drained. func (this *Migrator) waitForEventsUpToLock() (err error) { @@ -589,6 +623,9 @@ func (this *Migrator) cutOverTwoStep() (err error) { if err := this.retryOperation(this.waitForEventsUpToLock); err != nil { return err } + if err := this.retryOperation(this.waitForChecksumToClear); err != nil { + return err + } if err := this.retryOperation(this.applier.SwapTablesQuickAndBumpy); err != nil { return err } @@ -633,7 +670,9 @@ func (this *Migrator) atomicCutOver() (err error) { if err := this.waitForEventsUpToLock(); err != nil { return log.Errore(err) } - + if err := this.waitForChecksumToClear(); err != nil { + return log.Errore(err) + } // Step 2 // We now attempt an atomic RENAME on original & ghost tables, and expect it to block. this.migrationContext.RenameTablesStartTime = time.Now() @@ -1132,10 +1171,13 @@ func (this *Migrator) iterateChunks() error { // _ghost_ table, which no longer exists. So, bothering error messages and all, but no damage. return nil } - _, rowsAffected, _, err := this.applier.ApplyIterationInsertQuery() + _, rowsAffected, _, checksumComparison, err := this.applier.ApplyIterationInsertQuery() if err != nil { return err // wrapping call will retry } + if this.migrationContext.ChecksumData { + go func() { this.checksumComparisonQueue <- checksumComparison }() + } atomic.AddInt64(&this.migrationContext.TotalRowsCopied, rowsAffected) atomic.AddInt64(&this.migrationContext.Iteration, 1) return nil diff --git a/go/sql/builder.go b/go/sql/builder.go index 2c5a7ae28..73628139e 100644 --- a/go/sql/builder.go +++ b/go/sql/builder.go @@ -178,9 +178,9 @@ func BuildRangePreparedComparison(columns *ColumnList, args []interface{}, compa return BuildRangeComparison(columns.Names(), values, args, comparisonSign) } -func BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName string, sharedColumns []string, mappedSharedColumns []string, uniqueKey string, uniqueKeyColumns *ColumnList, rangeStartValues, rangeEndValues []string, rangeStartArgs, rangeEndArgs []interface{}, includeRangeStartValues bool, transactionalTable bool) (result string, explodedArgs []interface{}, err error) { +func BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName string, sharedColumns []string, mappedSharedColumns []string, uniqueKey string, uniqueKeyColumns *ColumnList, rangeStartValues, rangeEndValues []string, rangeStartArgs, rangeEndArgs []interface{}, includeRangeStartValues bool, transactionalTable bool) (insertQuery, originalChecksumQuery, ghostChecksumQuery string, explodedArgs []interface{}, err error) { if len(sharedColumns) == 0 { - return "", explodedArgs, fmt.Errorf("Got 0 shared columns in BuildRangeInsertQuery") + return "", "", "", explodedArgs, fmt.Errorf("Got 0 shared columns in BuildRangeInsertQuery") } databaseName = EscapeName(databaseName) originalTableName = EscapeName(originalTableName) @@ -205,19 +205,19 @@ func BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName strin } rangeStartComparison, rangeExplodedArgs, err := BuildRangeComparison(uniqueKeyColumns.Names(), rangeStartValues, rangeStartArgs, minRangeComparisonSign) if err != nil { - return "", explodedArgs, err + return "", "", "", explodedArgs, err } explodedArgs = append(explodedArgs, rangeExplodedArgs...) rangeEndComparison, rangeExplodedArgs, err := BuildRangeComparison(uniqueKeyColumns.Names(), rangeEndValues, rangeEndArgs, LessThanOrEqualsComparisonSign) if err != nil { - return "", explodedArgs, err + return "", "", "", explodedArgs, err } explodedArgs = append(explodedArgs, rangeExplodedArgs...) transactionalClause := "" if transactionalTable { transactionalClause = "lock in share mode" } - result = fmt.Sprintf(` + insertQuery = fmt.Sprintf(` insert /* gh-ost %s.%s */ ignore into %s.%s (%s) (select %s from %s.%s force index (%s) where (%s and %s) %s @@ -225,10 +225,43 @@ func BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName strin `, databaseName, originalTableName, databaseName, ghostTableName, mappedSharedColumnsListing, sharedColumnsListing, databaseName, originalTableName, uniqueKey, rangeStartComparison, rangeEndComparison, transactionalClause) - return result, explodedArgs, nil + // escape unique key columns for comparison queries + uniqueKeyColumnNames := duplicateNames(uniqueKeyColumns.Names()) + for i := range uniqueKeyColumnNames { + uniqueKeyColumnNames[i] = EscapeName(uniqueKeyColumnNames[i]) + } + uniqueKeyColumnsListing := strings.Join(uniqueKeyColumnNames, ", ") + + originalChecksumQuery = fmt.Sprintf(` + select /* gh-ost checksum %s.%s */ + sha2( + group_concat( + sha2(concat_ws(',', %s), 256) order by %s + ), 256 + ) + from %s.%s force index (%s) + where (%s and %s) + `, databaseName, originalTableName, + sharedColumnsListing, uniqueKeyColumnsListing, + databaseName, originalTableName, uniqueKey, + rangeStartComparison, rangeEndComparison) + ghostChecksumQuery = fmt.Sprintf(` + select /* gh-ost checksum %s.%s */ + sha2( + group_concat( + sha2(concat_ws(',', %s), 256) order by %s + ), 256 + ) + from %s.%s force index (%s) + where (%s and %s) + `, databaseName, ghostTableName, + mappedSharedColumnsListing, uniqueKeyColumnsListing, + databaseName, ghostTableName, uniqueKey, + rangeStartComparison, rangeEndComparison) + return insertQuery, originalChecksumQuery, ghostChecksumQuery, explodedArgs, nil } -func BuildRangeInsertPreparedQuery(databaseName, originalTableName, ghostTableName string, sharedColumns []string, mappedSharedColumns []string, uniqueKey string, uniqueKeyColumns *ColumnList, rangeStartArgs, rangeEndArgs []interface{}, includeRangeStartValues bool, transactionalTable bool) (result string, explodedArgs []interface{}, err error) { +func BuildRangeInsertPreparedQuery(databaseName, originalTableName, ghostTableName string, sharedColumns []string, mappedSharedColumns []string, uniqueKey string, uniqueKeyColumns *ColumnList, rangeStartArgs, rangeEndArgs []interface{}, includeRangeStartValues bool, transactionalTable bool) (insertQuery, originalChecksumQuery, ghostChecksumQuery string, explodedArgs []interface{}, err error) { rangeStartValues := buildColumnsPreparedValues(uniqueKeyColumns) rangeEndValues := buildColumnsPreparedValues(uniqueKeyColumns) return BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, includeRangeStartValues, transactionalTable) diff --git a/go/sql/builder_test.go b/go/sql/builder_test.go index a178c4ccb..0781ed7af 100644 --- a/go/sql/builder_test.go +++ b/go/sql/builder_test.go @@ -261,7 +261,7 @@ func TestBuildRangeInsertPreparedQuery(t *testing.T) { rangeStartArgs := []interface{}{3, 17} rangeEndArgs := []interface{}{103, 117} - query, explodedArgs, err := BuildRangeInsertPreparedQuery(databaseName, originalTableName, ghostTableName, sharedColumns, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartArgs, rangeEndArgs, true, true) + query, _, _, explodedArgs, err := BuildRangeInsertPreparedQuery(databaseName, originalTableName, ghostTableName, sharedColumns, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartArgs, rangeEndArgs, true, true) test.S(t).ExpectNil(err) expected := ` insert /* gh-ost mydb.tbl */ ignore into mydb.ghost (id, name, position) diff --git a/localtests/test.sh b/localtests/test.sh index d4b3f1723..fdaf729a3 100755 --- a/localtests/test.sh +++ b/localtests/test.sh @@ -148,6 +148,7 @@ test_single() { --alter='engine=innodb' \ --exact-rowcount \ --assume-rbr \ + --checksum-data=true \ --initially-drop-old-table \ --initially-drop-ghost-table \ --throttle-query='select timestampdiff(second, min(last_update), now()) < 5 from _gh_ost_test_ghc' \ @@ -229,11 +230,12 @@ build_binary() { echo "Using binary: $ghost_binary" return 0 fi - go build -o $ghost_binary go/cmd/gh-ost/main.go + ./script/build if [ $? -ne 0 ] ; then echo "Build failure" exit 1 fi + cp bin/gh-ost $ghost_binary } test_all() { From 5c0d9abb69638af43e79cb3201c1c35a3aca3ea6 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Mon, 29 Jun 2020 09:01:36 +0300 Subject: [PATCH 05/22] extra table timeout when checksum-data is enabled --- go/logic/applier.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/go/logic/applier.go b/go/logic/applier.go index 35f31543c..52c7c60d3 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -861,6 +861,10 @@ func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocke } tableLockTimeoutSeconds := this.migrationContext.CutOverLockTimeoutSeconds * 2 + if this.migrationContext.ChecksumData { + // Allow extra time for checksum to evaluate + tableLockTimeoutSeconds += this.migrationContext.CutOverLockTimeoutSeconds + } log.Infof("Setting LOCK timeout as %d seconds", tableLockTimeoutSeconds) query = fmt.Sprintf(`set session lock_wait_timeout:=%d`, tableLockTimeoutSeconds) if _, err := tx.Exec(query); err != nil { From 4be4cb96acbc97584b71a96ae5fffa1f615f5c38 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Mon, 29 Jun 2020 09:10:29 +0300 Subject: [PATCH 06/22] builder tests --- go/sql/builder_test.go | 56 +++++++++++++++++++++++++++++++++++++++--- 1 file changed, 52 insertions(+), 4 deletions(-) diff --git a/go/sql/builder_test.go b/go/sql/builder_test.go index 0781ed7af..e4055249f 100644 --- a/go/sql/builder_test.go +++ b/go/sql/builder_test.go @@ -172,7 +172,7 @@ func TestBuildRangeInsertQuery(t *testing.T) { rangeStartArgs := []interface{}{3} rangeEndArgs := []interface{}{103} - query, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, false) + insertQuery, originalChecksumQuery, ghostChecksumQuery, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, false) test.S(t).ExpectNil(err) expected := ` insert /* gh-ost mydb.tbl */ ignore into mydb.ghost (id, name, position) @@ -180,7 +180,31 @@ func TestBuildRangeInsertQuery(t *testing.T) { where (((id > @v1s) or ((id = @v1s))) and ((id < @v1e) or ((id = @v1e)))) ) ` - test.S(t).ExpectEquals(normalizeQuery(query), normalizeQuery(expected)) + test.S(t).ExpectEquals(normalizeQuery(insertQuery), normalizeQuery(expected)) + + expectedOriginalChecksumQuery := ` + select /* gh-ost checksum gh-ost mydb.tbl */ + sha2( + group_concat( + sha2(concat_ws(',', id, name, position), 256) order by id + ), 256 + ) + from mydb.tbl force index (PRIMARY) + where (((id > @v1s) or ((id = @v1s))) and ((id < @v1e) or ((id = @v1e)))) + ` + test.S(t).ExpectEquals(normalizeQuery(originalChecksumQuery), normalizeQuery(expectedOriginalChecksumQuery)) + + expectedOriginalChecksumQuery := ` + select /* gh-ost checksum gh-ost mydb.ghost */ + sha2( + group_concat( + sha2(concat_ws(',', id, name, position), 256) order by id + ), 256 + ) + from mydb.ghost force index (PRIMARY) + where (((id > @v1s) or ((id = @v1s))) and ((id < @v1e) or ((id = @v1e)))) + ` + test.S(t).ExpectEquals(normalizeQuery(originalChecksumQuery), normalizeQuery(expectedOriginalChecksumQuery)) test.S(t).ExpectTrue(reflect.DeepEqual(explodedArgs, []interface{}{3, 3, 103, 103})) } { @@ -191,7 +215,7 @@ func TestBuildRangeInsertQuery(t *testing.T) { rangeStartArgs := []interface{}{3, 17} rangeEndArgs := []interface{}{103, 117} - query, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, false) + insertQuery, originalChecksumQuery, ghostChecksumQuery, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, false) test.S(t).ExpectNil(err) expected := ` insert /* gh-ost mydb.tbl */ ignore into mydb.ghost (id, name, position) @@ -199,7 +223,31 @@ func TestBuildRangeInsertQuery(t *testing.T) { where (((name > @v1s) or (((name = @v1s)) AND (position > @v2s)) or ((name = @v1s) and (position = @v2s))) and ((name < @v1e) or (((name = @v1e)) AND (position < @v2e)) or ((name = @v1e) and (position = @v2e)))) ) ` - test.S(t).ExpectEquals(normalizeQuery(query), normalizeQuery(expected)) + test.S(t).ExpectEquals(normalizeQuery(insertQuery), normalizeQuery(expected)) + + expectedOriginalChecksumQuery := ` + select /* gh-ost checksum gh-ost mydb.tbl */ + sha2( + group_concat( + sha2(concat_ws(',', id, name, position), 256) order by name, position + ), 256 + ) + from mydb.tbl force index (name_position_uidx) + where (((name > @v1s) or (((name = @v1s)) AND (position > @v2s)) or ((name = @v1s) and (position = @v2s))) and ((name < @v1e) or (((name = @v1e)) AND (position < @v2e)) or ((name = @v1e) and (position = @v2e)))) + ` + test.S(t).ExpectEquals(normalizeQuery(originalChecksumQuery), normalizeQuery(expectedOriginalChecksumQuery)) + + expectedOriginalChecksumQuery := ` + select /* gh-ost checksum gh-ost mydb.ghost */ + sha2( + group_concat( + sha2(concat_ws(',', id, name, position), 256) order by name, position + ), 256 + ) + from mydb.ghost force index (name_position_uidx) + where (((name > @v1s) or (((name = @v1s)) AND (position > @v2s)) or ((name = @v1s) and (position = @v2s))) and ((name < @v1e) or (((name = @v1e)) AND (position < @v2e)) or ((name = @v1e) and (position = @v2e)))) + ` + test.S(t).ExpectEquals(normalizeQuery(originalChecksumQuery), normalizeQuery(expectedOriginalChecksumQuery)) test.S(t).ExpectTrue(reflect.DeepEqual(explodedArgs, []interface{}{3, 3, 17, 3, 17, 103, 103, 117, 103, 117})) } } From ed7aa8570047bf4e515b9716e006630a6952b5d9 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Mon, 29 Jun 2020 09:18:13 +0300 Subject: [PATCH 07/22] builder tests --- go/sql/builder_test.go | 63 ++++++++++++++++++++++++++++++++++++++---- 1 file changed, 57 insertions(+), 6 deletions(-) diff --git a/go/sql/builder_test.go b/go/sql/builder_test.go index e4055249f..89f243c2f 100644 --- a/go/sql/builder_test.go +++ b/go/sql/builder_test.go @@ -194,7 +194,7 @@ func TestBuildRangeInsertQuery(t *testing.T) { ` test.S(t).ExpectEquals(normalizeQuery(originalChecksumQuery), normalizeQuery(expectedOriginalChecksumQuery)) - expectedOriginalChecksumQuery := ` + expectedGhostChecksumQuery := ` select /* gh-ost checksum gh-ost mydb.ghost */ sha2( group_concat( @@ -204,7 +204,7 @@ func TestBuildRangeInsertQuery(t *testing.T) { from mydb.ghost force index (PRIMARY) where (((id > @v1s) or ((id = @v1s))) and ((id < @v1e) or ((id = @v1e)))) ` - test.S(t).ExpectEquals(normalizeQuery(originalChecksumQuery), normalizeQuery(expectedOriginalChecksumQuery)) + test.S(t).ExpectEquals(normalizeQuery(ghostChecksumQuery), normalizeQuery(expectedGhostChecksumQuery)) test.S(t).ExpectTrue(reflect.DeepEqual(explodedArgs, []interface{}{3, 3, 103, 103})) } { @@ -237,7 +237,7 @@ func TestBuildRangeInsertQuery(t *testing.T) { ` test.S(t).ExpectEquals(normalizeQuery(originalChecksumQuery), normalizeQuery(expectedOriginalChecksumQuery)) - expectedOriginalChecksumQuery := ` + expectedGhostChecksumQuery := ` select /* gh-ost checksum gh-ost mydb.ghost */ sha2( group_concat( @@ -247,7 +247,8 @@ func TestBuildRangeInsertQuery(t *testing.T) { from mydb.ghost force index (name_position_uidx) where (((name > @v1s) or (((name = @v1s)) AND (position > @v2s)) or ((name = @v1s) and (position = @v2s))) and ((name < @v1e) or (((name = @v1e)) AND (position < @v2e)) or ((name = @v1e) and (position = @v2e)))) ` - test.S(t).ExpectEquals(normalizeQuery(originalChecksumQuery), normalizeQuery(expectedOriginalChecksumQuery)) + test.S(t).ExpectEquals(normalizeQuery(ghostChecksumQuery), normalizeQuery(expectedGhostChecksumQuery)) + test.S(t).ExpectTrue(reflect.DeepEqual(explodedArgs, []interface{}{3, 3, 17, 3, 17, 103, 103, 117, 103, 117})) } } @@ -266,7 +267,7 @@ func TestBuildRangeInsertQueryRenameMap(t *testing.T) { rangeStartArgs := []interface{}{3} rangeEndArgs := []interface{}{103} - query, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, false) + insertQuery, originalChecksumQuery, ghostChecksumQuery, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, false) test.S(t).ExpectNil(err) expected := ` insert /* gh-ost mydb.tbl */ ignore into mydb.ghost (id, name, location) @@ -274,7 +275,32 @@ func TestBuildRangeInsertQueryRenameMap(t *testing.T) { where (((id > @v1s) or ((id = @v1s))) and ((id < @v1e) or ((id = @v1e)))) ) ` - test.S(t).ExpectEquals(normalizeQuery(query), normalizeQuery(expected)) + test.S(t).ExpectEquals(normalizeQuery(insertQuery), normalizeQuery(expected)) + + expectedOriginalChecksumQuery := ` + select /* gh-ost checksum gh-ost mydb.tbl */ + sha2( + group_concat( + sha2(concat_ws(',', id, name, position), 256) order by id + ), 256 + ) + from mydb.tbl force index (PRIMARY) + where (((id > @v1s) or ((id = @v1s))) and ((id < @v1e) or ((id = @v1e)))) + ` + test.S(t).ExpectEquals(normalizeQuery(originalChecksumQuery), normalizeQuery(expectedOriginalChecksumQuery)) + + expectedGhostChecksumQuery := ` + select /* gh-ost checksum gh-ost mydb.ghost */ + sha2( + group_concat( + sha2(concat_ws(',', id, name, location), 256) order by id + ), 256 + ) + from mydb.ghost force index (PRIMARY) + where (((id > @v1s) or ((id = @v1s))) and ((id < @v1e) or ((id = @v1e)))) + ` + test.S(t).ExpectEquals(normalizeQuery(ghostChecksumQuery), normalizeQuery(expectedGhostChecksumQuery)) + test.S(t).ExpectTrue(reflect.DeepEqual(explodedArgs, []interface{}{3, 3, 103, 103})) } { @@ -294,6 +320,31 @@ func TestBuildRangeInsertQueryRenameMap(t *testing.T) { ) ` test.S(t).ExpectEquals(normalizeQuery(query), normalizeQuery(expected)) + + expectedOriginalChecksumQuery := ` + select /* gh-ost checksum gh-ost mydb.tbl */ + sha2( + group_concat( + sha2(concat_ws(',', id, name, position), 256) order by name, position + ), 256 + ) + from mydb.tbl force index (name_position_uidx) + where (((name > @v1s) or (((name = @v1s)) AND (position > @v2s)) or ((name = @v1s) and (position = @v2s))) and ((name < @v1e) or (((name = @v1e)) AND (position < @v2e)) or ((name = @v1e) and (position = @v2e)))) + ` + test.S(t).ExpectEquals(normalizeQuery(originalChecksumQuery), normalizeQuery(expectedOriginalChecksumQuery)) + + expectedGhostChecksumQuery := ` + select /* gh-ost checksum gh-ost mydb.ghost */ + sha2( + group_concat( + sha2(concat_ws(',', id, name, location), 256) order by name, location + ), 256 + ) + from mydb.ghost force index (name_position_uidx) + where (((name > @v1s) or (((name = @v1s)) AND (position > @v2s)) or ((name = @v1s) and (position = @v2s))) and ((name < @v1e) or (((name = @v1e)) AND (position < @v2e)) or ((name = @v1e) and (position = @v2e)))) + ` + test.S(t).ExpectEquals(normalizeQuery(ghostChecksumQuery), normalizeQuery(expectedGhostChecksumQuery)) + test.S(t).ExpectTrue(reflect.DeepEqual(explodedArgs, []interface{}{3, 3, 17, 3, 17, 103, 103, 117, 103, 117})) } } From edc10535fab512981bae99e193a4b501a0866324 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Mon, 29 Jun 2020 09:40:41 +0300 Subject: [PATCH 08/22] builder tests --- go/logic/applier.go | 1 + go/sql/builder.go | 39 +++++++++++++++++++++++++++++++++++---- go/sql/builder_test.go | 31 +++++++++++++++++-------------- 3 files changed, 53 insertions(+), 18 deletions(-) diff --git a/go/logic/applier.go b/go/logic/applier.go index 52c7c60d3..7f3346f40 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -465,6 +465,7 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected this.migrationContext.GetGhostTableName(), this.migrationContext.SharedColumns.Names(), this.migrationContext.MappedSharedColumns.Names(), + this.migrationContext.ColumnRenameMap, this.migrationContext.UniqueKey.Name, &this.migrationContext.UniqueKey.Columns, this.migrationContext.MigrationIterationRangeMinValues.AbstractValues(), diff --git a/go/sql/builder.go b/go/sql/builder.go index 73628139e..c3d512771 100644 --- a/go/sql/builder.go +++ b/go/sql/builder.go @@ -178,7 +178,17 @@ func BuildRangePreparedComparison(columns *ColumnList, args []interface{}, compa return BuildRangeComparison(columns.Names(), values, args, comparisonSign) } -func BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName string, sharedColumns []string, mappedSharedColumns []string, uniqueKey string, uniqueKeyColumns *ColumnList, rangeStartValues, rangeEndValues []string, rangeStartArgs, rangeEndArgs []interface{}, includeRangeStartValues bool, transactionalTable bool) (insertQuery, originalChecksumQuery, ghostChecksumQuery string, explodedArgs []interface{}, err error) { +func BuildRangeInsertQuery( + databaseName, originalTableName, ghostTableName string, + sharedColumns []string, mappedSharedColumns []string, + columnRenameMap map[string]string, + uniqueKey string, uniqueKeyColumns *ColumnList, + rangeStartValues, rangeEndValues []string, + rangeStartArgs, rangeEndArgs []interface{}, + includeRangeStartValues bool, transactionalTable bool, +) ( + insertQuery, originalChecksumQuery, ghostChecksumQuery string, explodedArgs []interface{}, err error, +) { if len(sharedColumns) == 0 { return "", "", "", explodedArgs, fmt.Errorf("Got 0 shared columns in BuildRangeInsertQuery") } @@ -225,6 +235,7 @@ func BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName strin `, databaseName, originalTableName, databaseName, ghostTableName, mappedSharedColumnsListing, sharedColumnsListing, databaseName, originalTableName, uniqueKey, rangeStartComparison, rangeEndComparison, transactionalClause) + // escape unique key columns for comparison queries uniqueKeyColumnNames := duplicateNames(uniqueKeyColumns.Names()) for i := range uniqueKeyColumnNames { @@ -245,6 +256,17 @@ func BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName strin sharedColumnsListing, uniqueKeyColumnsListing, databaseName, originalTableName, uniqueKey, rangeStartComparison, rangeEndComparison) + + mappedUniqueKeyColumnNames := duplicateNames(uniqueKeyColumns.Names()) + for i, name := range mappedUniqueKeyColumnNames { + if mappedName, ok := columnRenameMap[name]; ok { + mappedUniqueKeyColumnNames[i] = EscapeName(mappedName) + } else { + mappedUniqueKeyColumnNames[i] = EscapeName(name) + } + } + mappedUniqueKeyColumnsListing := strings.Join(mappedUniqueKeyColumnNames, ", ") + ghostChecksumQuery = fmt.Sprintf(` select /* gh-ost checksum %s.%s */ sha2( @@ -255,16 +277,25 @@ func BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName strin from %s.%s force index (%s) where (%s and %s) `, databaseName, ghostTableName, - mappedSharedColumnsListing, uniqueKeyColumnsListing, + mappedSharedColumnsListing, mappedUniqueKeyColumnsListing, databaseName, ghostTableName, uniqueKey, rangeStartComparison, rangeEndComparison) return insertQuery, originalChecksumQuery, ghostChecksumQuery, explodedArgs, nil } -func BuildRangeInsertPreparedQuery(databaseName, originalTableName, ghostTableName string, sharedColumns []string, mappedSharedColumns []string, uniqueKey string, uniqueKeyColumns *ColumnList, rangeStartArgs, rangeEndArgs []interface{}, includeRangeStartValues bool, transactionalTable bool) (insertQuery, originalChecksumQuery, ghostChecksumQuery string, explodedArgs []interface{}, err error) { +func BuildRangeInsertPreparedQuery( + databaseName, originalTableName, ghostTableName string, + sharedColumns []string, mappedSharedColumns []string, + columnRenameMap map[string]string, + uniqueKey string, uniqueKeyColumns *ColumnList, + rangeStartArgs, rangeEndArgs []interface{}, + includeRangeStartValues bool, transactionalTable bool, +) ( + insertQuery, originalChecksumQuery, ghostChecksumQuery string, explodedArgs []interface{}, err error, +) { rangeStartValues := buildColumnsPreparedValues(uniqueKeyColumns) rangeEndValues := buildColumnsPreparedValues(uniqueKeyColumns) - return BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, includeRangeStartValues, transactionalTable) + return BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, columnRenameMap, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, includeRangeStartValues, transactionalTable) } func BuildUniqueKeyRangeEndPreparedQueryViaOffset(databaseName, tableName string, uniqueKeyColumns *ColumnList, rangeStartArgs, rangeEndArgs []interface{}, chunkSize int64, includeRangeStartValues bool, hint string) (result string, explodedArgs []interface{}, err error) { diff --git a/go/sql/builder_test.go b/go/sql/builder_test.go index 89f243c2f..f95fa1afc 100644 --- a/go/sql/builder_test.go +++ b/go/sql/builder_test.go @@ -164,6 +164,7 @@ func TestBuildRangeInsertQuery(t *testing.T) { originalTableName := "tbl" ghostTableName := "ghost" sharedColumns := []string{"id", "name", "position"} + columnRenameMap := map[string]string{} { uniqueKey := "PRIMARY" uniqueKeyColumns := NewColumnList([]string{"id"}) @@ -172,7 +173,7 @@ func TestBuildRangeInsertQuery(t *testing.T) { rangeStartArgs := []interface{}{3} rangeEndArgs := []interface{}{103} - insertQuery, originalChecksumQuery, ghostChecksumQuery, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, false) + insertQuery, originalChecksumQuery, ghostChecksumQuery, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, sharedColumns, columnRenameMap, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, false) test.S(t).ExpectNil(err) expected := ` insert /* gh-ost mydb.tbl */ ignore into mydb.ghost (id, name, position) @@ -183,7 +184,7 @@ func TestBuildRangeInsertQuery(t *testing.T) { test.S(t).ExpectEquals(normalizeQuery(insertQuery), normalizeQuery(expected)) expectedOriginalChecksumQuery := ` - select /* gh-ost checksum gh-ost mydb.tbl */ + select /* gh-ost checksum mydb.tbl */ sha2( group_concat( sha2(concat_ws(',', id, name, position), 256) order by id @@ -195,7 +196,7 @@ func TestBuildRangeInsertQuery(t *testing.T) { test.S(t).ExpectEquals(normalizeQuery(originalChecksumQuery), normalizeQuery(expectedOriginalChecksumQuery)) expectedGhostChecksumQuery := ` - select /* gh-ost checksum gh-ost mydb.ghost */ + select /* gh-ost checksum mydb.ghost */ sha2( group_concat( sha2(concat_ws(',', id, name, position), 256) order by id @@ -215,7 +216,7 @@ func TestBuildRangeInsertQuery(t *testing.T) { rangeStartArgs := []interface{}{3, 17} rangeEndArgs := []interface{}{103, 117} - insertQuery, originalChecksumQuery, ghostChecksumQuery, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, false) + insertQuery, originalChecksumQuery, ghostChecksumQuery, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, sharedColumns, columnRenameMap, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, false) test.S(t).ExpectNil(err) expected := ` insert /* gh-ost mydb.tbl */ ignore into mydb.ghost (id, name, position) @@ -226,7 +227,7 @@ func TestBuildRangeInsertQuery(t *testing.T) { test.S(t).ExpectEquals(normalizeQuery(insertQuery), normalizeQuery(expected)) expectedOriginalChecksumQuery := ` - select /* gh-ost checksum gh-ost mydb.tbl */ + select /* gh-ost checksum mydb.tbl */ sha2( group_concat( sha2(concat_ws(',', id, name, position), 256) order by name, position @@ -238,7 +239,7 @@ func TestBuildRangeInsertQuery(t *testing.T) { test.S(t).ExpectEquals(normalizeQuery(originalChecksumQuery), normalizeQuery(expectedOriginalChecksumQuery)) expectedGhostChecksumQuery := ` - select /* gh-ost checksum gh-ost mydb.ghost */ + select /* gh-ost checksum mydb.ghost */ sha2( group_concat( sha2(concat_ws(',', id, name, position), 256) order by name, position @@ -259,6 +260,7 @@ func TestBuildRangeInsertQueryRenameMap(t *testing.T) { ghostTableName := "ghost" sharedColumns := []string{"id", "name", "position"} mappedSharedColumns := []string{"id", "name", "location"} + columnRenameMap := map[string]string{"position": "location"} { uniqueKey := "PRIMARY" uniqueKeyColumns := NewColumnList([]string{"id"}) @@ -267,7 +269,7 @@ func TestBuildRangeInsertQueryRenameMap(t *testing.T) { rangeStartArgs := []interface{}{3} rangeEndArgs := []interface{}{103} - insertQuery, originalChecksumQuery, ghostChecksumQuery, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, false) + insertQuery, originalChecksumQuery, ghostChecksumQuery, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, columnRenameMap, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, false) test.S(t).ExpectNil(err) expected := ` insert /* gh-ost mydb.tbl */ ignore into mydb.ghost (id, name, location) @@ -278,7 +280,7 @@ func TestBuildRangeInsertQueryRenameMap(t *testing.T) { test.S(t).ExpectEquals(normalizeQuery(insertQuery), normalizeQuery(expected)) expectedOriginalChecksumQuery := ` - select /* gh-ost checksum gh-ost mydb.tbl */ + select /* gh-ost checksum mydb.tbl */ sha2( group_concat( sha2(concat_ws(',', id, name, position), 256) order by id @@ -290,7 +292,7 @@ func TestBuildRangeInsertQueryRenameMap(t *testing.T) { test.S(t).ExpectEquals(normalizeQuery(originalChecksumQuery), normalizeQuery(expectedOriginalChecksumQuery)) expectedGhostChecksumQuery := ` - select /* gh-ost checksum gh-ost mydb.ghost */ + select /* gh-ost checksum mydb.ghost */ sha2( group_concat( sha2(concat_ws(',', id, name, location), 256) order by id @@ -311,7 +313,7 @@ func TestBuildRangeInsertQueryRenameMap(t *testing.T) { rangeStartArgs := []interface{}{3, 17} rangeEndArgs := []interface{}{103, 117} - query, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, false) + insertQuery, originalChecksumQuery, ghostChecksumQuery, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, columnRenameMap, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, false) test.S(t).ExpectNil(err) expected := ` insert /* gh-ost mydb.tbl */ ignore into mydb.ghost (id, name, location) @@ -319,10 +321,10 @@ func TestBuildRangeInsertQueryRenameMap(t *testing.T) { where (((name > @v1s) or (((name = @v1s)) AND (position > @v2s)) or ((name = @v1s) and (position = @v2s))) and ((name < @v1e) or (((name = @v1e)) AND (position < @v2e)) or ((name = @v1e) and (position = @v2e)))) ) ` - test.S(t).ExpectEquals(normalizeQuery(query), normalizeQuery(expected)) + test.S(t).ExpectEquals(normalizeQuery(insertQuery), normalizeQuery(expected)) expectedOriginalChecksumQuery := ` - select /* gh-ost checksum gh-ost mydb.tbl */ + select /* gh-ost checksum mydb.tbl */ sha2( group_concat( sha2(concat_ws(',', id, name, position), 256) order by name, position @@ -334,7 +336,7 @@ func TestBuildRangeInsertQueryRenameMap(t *testing.T) { test.S(t).ExpectEquals(normalizeQuery(originalChecksumQuery), normalizeQuery(expectedOriginalChecksumQuery)) expectedGhostChecksumQuery := ` - select /* gh-ost checksum gh-ost mydb.ghost */ + select /* gh-ost checksum mydb.ghost */ sha2( group_concat( sha2(concat_ws(',', id, name, location), 256) order by name, location @@ -354,13 +356,14 @@ func TestBuildRangeInsertPreparedQuery(t *testing.T) { originalTableName := "tbl" ghostTableName := "ghost" sharedColumns := []string{"id", "name", "position"} + columnRenameMap := map[string]string{} { uniqueKey := "name_position_uidx" uniqueKeyColumns := NewColumnList([]string{"name", "position"}) rangeStartArgs := []interface{}{3, 17} rangeEndArgs := []interface{}{103, 117} - query, _, _, explodedArgs, err := BuildRangeInsertPreparedQuery(databaseName, originalTableName, ghostTableName, sharedColumns, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartArgs, rangeEndArgs, true, true) + query, _, _, explodedArgs, err := BuildRangeInsertPreparedQuery(databaseName, originalTableName, ghostTableName, sharedColumns, sharedColumns, columnRenameMap, uniqueKey, uniqueKeyColumns, rangeStartArgs, rangeEndArgs, true, true) test.S(t).ExpectNil(err) expected := ` insert /* gh-ost mydb.tbl */ ignore into mydb.ghost (id, name, position) From 8eb300bdc8bb183e79e4f50f40288e114aa5c55a Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Mon, 29 Jun 2020 09:52:47 +0300 Subject: [PATCH 09/22] expect 1.14 and above in build scripts; update to readme.md --- README.md | 2 +- build.sh | 2 +- script/ensure-go-installed | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index cd6f59269..7b2bc1dee 100644 --- a/README.md +++ b/README.md @@ -94,7 +94,7 @@ Please see [Coding gh-ost](doc/coding-ghost.md) for a guide to getting started d [Download latest release here](https://github.com/github/gh-ost/releases/latest) -`gh-ost` is a Go project; it is built with Go `1.12` and above. To build on your own, use either: +`gh-ost` is a Go project; it is built with Go `1.14` and above. To build on your own, use either: - [script/build](https://github.com/github/gh-ost/blob/master/script/build) - this is the same build script used by CI hence the authoritative; artifact is `./bin/gh-ost` binary. - [build.sh](https://github.com/github/gh-ost/blob/master/build.sh) for building `tar.gz` artifacts in `/tmp/gh-ost` diff --git a/build.sh b/build.sh index 98633cdef..b5d465935 100755 --- a/build.sh +++ b/build.sh @@ -18,7 +18,7 @@ function build { GOOS=$3 GOARCH=$4 - if ! go version | egrep -q 'go(1\.1[23456])' ; then + if ! go version | egrep -q 'go(1\.1[456])' ; then echo "go version must be 1.14 or above" exit 1 fi diff --git a/script/ensure-go-installed b/script/ensure-go-installed index 32ade460b..98ccdc1db 100755 --- a/script/ensure-go-installed +++ b/script/ensure-go-installed @@ -1,7 +1,7 @@ #!/bin/bash PREFERRED_GO_VERSION=go1.14.4 -SUPPORTED_GO_VERSIONS='go1.1[23456]' +SUPPORTED_GO_VERSIONS='go1.1[456]' GO_PKG_DARWIN=${PREFERRED_GO_VERSION}.darwin-amd64.pkg GO_PKG_DARWIN_SHA=b518f21f823759ee30faddb1f623810a432499f050c9338777523d9c8551c62c From b774dc1f0965b66876760a79830381745a3aa561 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Mon, 29 Jun 2020 16:25:26 +0300 Subject: [PATCH 10/22] better iteration on checksum comparison --- go/base/checksum_comparison.go | 3 +++ go/logic/applier.go | 1 + go/logic/migrator.go | 41 ++++++++++++++++++++++------------ 3 files changed, 31 insertions(+), 14 deletions(-) diff --git a/go/base/checksum_comparison.go b/go/base/checksum_comparison.go index b0d470908..6c605452e 100644 --- a/go/base/checksum_comparison.go +++ b/go/base/checksum_comparison.go @@ -15,6 +15,7 @@ type ChecksumFunc func() (checksum string, err error) // BinlogCoordinates described binary log coordinates in the form of log file & log position. type ChecksumComparison struct { + Iteration int64 OriginalTableChecksumFunc ChecksumFunc GhostTableChecksumFunc ChecksumFunc MigrationIterationRangeMinValues *sql.ColumnValues @@ -23,10 +24,12 @@ type ChecksumComparison struct { } func NewChecksumComparison( + iteration int64, originalTableChecksumFunc, ghostTableChecksumFunc ChecksumFunc, rangeMinValues, rangeMaxValues *sql.ColumnValues, ) *ChecksumComparison { return &ChecksumComparison{ + Iteration: iteration, OriginalTableChecksumFunc: originalTableChecksumFunc, GhostTableChecksumFunc: ghostTableChecksumFunc, MigrationIterationRangeMinValues: rangeMinValues, diff --git a/go/logic/applier.go b/go/logic/applier.go index 7f3346f40..bbe8f183c 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -543,6 +543,7 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected return checksum, err } checksumComparison = base.NewChecksumComparison( + this.migrationContext.GetIteration(), originalTableChecksumFunc, ghostTableChecksumFunc, this.migrationContext.MigrationIterationRangeMinValues, this.migrationContext.MigrationIterationRangeMaxValues, diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 3e65c9848..44221c0f2 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -79,9 +79,11 @@ type Migrator struct { rowCopyCompleteFlag int64 // copyRowsQueue should not be buffered; if buffered some non-damaging but // excessive work happens at the end of the iteration as new copy-jobs arrive before realizing the copy is complete - copyRowsQueue chan tableWriteFunc - applyEventsQueue chan *applyEventStruct + copyRowsQueue chan tableWriteFunc + applyEventsQueue chan *applyEventStruct + checksumComparisonQueue chan *base.ChecksumComparison + checksumComparisonMap map[int64]*base.ChecksumComparison handledChangelogStates map[string]bool @@ -97,11 +99,14 @@ func NewMigrator(context *base.MigrationContext) *Migrator { rowCopyComplete: make(chan error), allEventsUpToLockProcessed: make(chan string), - copyRowsQueue: make(chan tableWriteFunc), - applyEventsQueue: make(chan *applyEventStruct, base.MaxEventsBatchSize), + copyRowsQueue: make(chan tableWriteFunc), + applyEventsQueue: make(chan *applyEventStruct, base.MaxEventsBatchSize), + checksumComparisonQueue: make(chan *base.ChecksumComparison), - handledChangelogStates: make(map[string]bool), - finishedMigrating: 0, + checksumComparisonMap: make(map[int64]*base.ChecksumComparison), + + handledChangelogStates: make(map[string]bool), + finishedMigrating: 0, } return migrator } @@ -255,14 +260,22 @@ func (this *Migrator) listenOnPanicAbort() { } func (this *Migrator) processChecksumComparisons() { - for checksumComparison := range this.checksumComparisonQueue { - if err := this.applier.CompareChecksum(checksumComparison); err != nil { - checksumComparison.IncrementAttempts() - go func() { this.checksumComparisonQueue <- checksumComparison }() - log.Errorf("Checksum error. Checksum=%s, err=%+v", checksumComparison.String(), err) - } else { - log.Debugf("Checksum match. Checksum=%s", checksumComparison.String()) + for { + newChecksums := len(this.checksumComparisonQueue) + for i := 0; i < newChecksums; i++ { + checksumComparison := <-this.checksumComparisonQueue + this.checksumComparisonMap[checksumComparison.Iteration] = checksumComparison + } + for iteration, checksumComparison := range this.checksumComparisonMap { + if err := this.applier.CompareChecksum(checksumComparison); err != nil { + checksumComparison.IncrementAttempts() + log.Errorf("Checksum error. Checksum=%s, err=%+v", checksumComparison.String(), err) + } else { + delete(this.checksumComparisonMap, iteration) + log.Debugf("Checksum match. Checksum=%s", checksumComparison.String()) + } } + time.Sleep(100 * time.Millisecond) } } @@ -558,7 +571,7 @@ func (this *Migrator) waitForChecksumToClear() (err error) { } default: { - if len(this.checksumComparisonQueue) == 0 { + if len(this.checksumComparisonMap) == 0 { return nil } time.Sleep(250 * time.Millisecond) From aa33f10e5e7974d28b73a4fd272f69ca9c1deb23 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Mon, 29 Jun 2020 16:26:52 +0300 Subject: [PATCH 11/22] iteration on string representation --- go/base/checksum_comparison.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go/base/checksum_comparison.go b/go/base/checksum_comparison.go index 6c605452e..adfe26c86 100644 --- a/go/base/checksum_comparison.go +++ b/go/base/checksum_comparison.go @@ -43,7 +43,7 @@ func (this *ChecksumComparison) IncrementAttempts() { } func (this *ChecksumComparison) String() string { - return fmt.Sprintf("range: [%s]..[%s]; attempts: %d", - this.MigrationIterationRangeMinValues, this.MigrationIterationRangeMaxValues, this.Attempts, + return fmt.Sprintf("iteration: %d, range: [%s]..[%s], attempts: %d", + this.Iteration, this.MigrationIterationRangeMinValues, this.MigrationIterationRangeMaxValues, this.Attempts, ) } From f430ba439e19cfa78d4d97df9b4c313a4ec2a170 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 1 Jul 2020 10:12:25 +0300 Subject: [PATCH 12/22] Visibility into pending/successful cehcksum comparisons --- go/base/context.go | 2 ++ go/logic/migrator.go | 11 +++++++++-- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/go/base/context.go b/go/base/context.go index a7ed8e799..98477377f 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -178,6 +178,8 @@ type MigrationContext struct { pointOfInterestTimeMutex *sync.Mutex CurrentLag int64 currentProgress uint64 + PendingChecksumComparisons int64 + SuccessfulChecksumComparisons int64 ThrottleHTTPStatusCode int64 controlReplicasLagResult mysql.ReplicationLagResult TotalRowsCopied int64 diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 44221c0f2..d5ee202d8 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -261,21 +261,27 @@ func (this *Migrator) listenOnPanicAbort() { func (this *Migrator) processChecksumComparisons() { for { + // Avoid blocking. Only pull from the queue the amount of items known at this time: newChecksums := len(this.checksumComparisonQueue) for i := 0; i < newChecksums; i++ { checksumComparison := <-this.checksumComparisonQueue this.checksumComparisonMap[checksumComparison.Iteration] = checksumComparison } + // Iterate the pending checksums. Some of these have been pulled from the queue just above; + // others may be subsuccessful checksums from previous iterations for iteration, checksumComparison := range this.checksumComparisonMap { if err := this.applier.CompareChecksum(checksumComparison); err != nil { checksumComparison.IncrementAttempts() log.Errorf("Checksum error. Checksum=%s, err=%+v", checksumComparison.String(), err) } else { + atomic.AddInt64(&this.migrationContext.SuccessfulChecksumComparisons, 1) delete(this.checksumComparisonMap, iteration) log.Debugf("Checksum match. Checksum=%s", checksumComparison.String()) } } - time.Sleep(100 * time.Millisecond) + atomic.StoreInt64(&this.migrationContext.PendingChecksumComparisons, int64(len(this.checksumComparisonMap))) + + time.Sleep(250 * time.Millisecond) } } @@ -1012,10 +1018,11 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) { currentBinlogCoordinates := *this.eventsStreamer.GetCurrentBinlogCoordinates() - status := fmt.Sprintf("Copy: %d/%d %.1f%%; Applied: %d; Backlog: %d/%d; Time: %+v(total), %+v(copy); streamer: %+v; Lag: %.2fs, State: %s; ETA: %s", + status := fmt.Sprintf("Copy: %d/%d %.1f%%; Applied: %d; Backlog: %d/%d; Checksums: %d,%d, Time: %+v(total), %+v(copy); streamer: %+v; Lag: %.2fs, State: %s; ETA: %s", totalRowsCopied, rowsEstimate, progressPct, atomic.LoadInt64(&this.migrationContext.TotalDMLEventsApplied), len(this.applyEventsQueue), cap(this.applyEventsQueue), + atomic.LoadInt64(&this.migrationContext.PendingChecksumComparisons), atomic.LoadInt64(&this.migrationContext.SuccessfulChecksumComparisons), base.PrettifyDurationOutput(elapsedTime), base.PrettifyDurationOutput(this.migrationContext.ElapsedRowCopyTime()), currentBinlogCoordinates, this.migrationContext.GetCurrentLagDuration().Seconds(), From 1f47f526d3236e48e0d27bbbb028669b5d12aadb Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Sun, 5 Jul 2020 11:49:31 +0300 Subject: [PATCH 13/22] better synchronization logic --- go/base/context.go | 1 + go/logic/applier.go | 28 ++++-------------- go/logic/migrator.go | 64 ++++++++++++++++++++++++++++++------------ go/mysql/connection.go | 11 ++++++-- localtests/test.sh | 2 +- 5 files changed, 62 insertions(+), 44 deletions(-) diff --git a/go/base/context.go b/go/base/context.go index 98477377f..17b2f3c27 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -180,6 +180,7 @@ type MigrationContext struct { currentProgress uint64 PendingChecksumComparisons int64 SuccessfulChecksumComparisons int64 + SubmittedChecksumComparisons int64 ThrottleHTTPStatusCode int64 controlReplicasLagResult mysql.ReplicationLagResult TotalRowsCopied int64 diff --git a/go/logic/applier.go b/go/logic/applier.go index bbe8f183c..c6628ada4 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -22,6 +22,7 @@ import ( const ( atomicCutOverMagicHint = "ghost-cut-over-sentry" + groupConcatMaxLength = 1024 * 1024 ) type dmlBuildResult struct { @@ -69,7 +70,7 @@ func NewApplier(migrationContext *base.MigrationContext) *Applier { func (this *Applier) InitDBConnections() (err error) { - applierUri := this.connectionConfig.GetDBUri(this.migrationContext.DatabaseName) + applierUri := this.connectionConfig.GetDBUri(this.migrationContext.DatabaseName, fmt.Sprintf("group_concat_max_len=%d", groupConcatMaxLength)) if this.db, _, err = mysql.GetDB(this.migrationContext.Uuid, applierUri); err != nil { return err } @@ -515,31 +516,12 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected this.migrationContext.GetIteration(), chunkSize) - groupConcatMaxLen := 1024 * 1024 var originalTableChecksumFunc base.ChecksumFunc = func() (checksum string, err error) { - tx, err := this.db.Begin() - if err != nil { - return checksum, err - } - defer tx.Rollback() - if _, err := tx.Exec(`set session group_concat_max_len := ?`, groupConcatMaxLen); err != nil { - return checksum, err - } - - err = tx.QueryRow(originalChecksumQuery, explodedArgs...).Scan(&checksum) + err = this.db.QueryRow(originalChecksumQuery, explodedArgs...).Scan(&checksum) return checksum, err } var ghostTableChecksumFunc base.ChecksumFunc = func() (checksum string, err error) { - tx, err := this.db.Begin() - if err != nil { - return checksum, err - } - defer tx.Rollback() - if _, err := tx.Exec(`set session group_concat_max_len := ?`, groupConcatMaxLen); err != nil { - return checksum, err - } - - err = tx.QueryRow(ghostChecksumQuery, explodedArgs...).Scan(&checksum) + err = this.db.QueryRow(ghostChecksumQuery, explodedArgs...).Scan(&checksum) return checksum, err } checksumComparison = base.NewChecksumComparison( @@ -562,7 +544,7 @@ func (this *Applier) CompareChecksum(checksumComparison *base.ChecksumComparison return err } if originalChecksum != ghostChecksum { - return fmt.Errorf("Checksum failure") + return fmt.Errorf("Checksum failure. Iteration: %d", checksumComparison.Iteration) } return nil } diff --git a/go/logic/migrator.go b/go/logic/migrator.go index d5ee202d8..e31b29698 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -11,6 +11,7 @@ import ( "math" "os" "strings" + "sync" "sync/atomic" "time" @@ -25,8 +26,9 @@ import ( type ChangelogState string const ( - GhostTableMigrated ChangelogState = "GhostTableMigrated" - AllEventsUpToLockProcessed = "AllEventsUpToLockProcessed" + GhostTableMigrated ChangelogState = "GhostTableMigrated" + AllEventsUpToLockProcessed = "AllEventsUpToLockProcessed" + checksumComparisonQueueBuffer = 10 ) func ReadChangelogState(s string) ChangelogState { @@ -82,8 +84,9 @@ type Migrator struct { copyRowsQueue chan tableWriteFunc applyEventsQueue chan *applyEventStruct - checksumComparisonQueue chan *base.ChecksumComparison - checksumComparisonMap map[int64]*base.ChecksumComparison + rowChecksumCompleteQueue chan bool + checksumComparisonQueue chan *base.ChecksumComparison + checksumComparisonMap map[int64]*base.ChecksumComparison handledChangelogStates map[string]bool @@ -102,8 +105,9 @@ func NewMigrator(context *base.MigrationContext) *Migrator { copyRowsQueue: make(chan tableWriteFunc), applyEventsQueue: make(chan *applyEventStruct, base.MaxEventsBatchSize), - checksumComparisonQueue: make(chan *base.ChecksumComparison), - checksumComparisonMap: make(map[int64]*base.ChecksumComparison), + rowChecksumCompleteQueue: make(chan bool), + checksumComparisonQueue: make(chan *base.ChecksumComparison, checksumComparisonQueueBuffer), + checksumComparisonMap: make(map[int64]*base.ChecksumComparison), handledChangelogStates: make(map[string]bool), finishedMigrating: 0, @@ -211,6 +215,10 @@ func (this *Migrator) consumeRowCopyComplete() { }() } +func (this *Migrator) consumeChecksumComparisonsComplete() { + <-this.rowChecksumCompleteQueue +} + func (this *Migrator) canStopStreaming() bool { return atomic.LoadInt64(&this.migrationContext.CutOverCompleteFlag) != 0 } @@ -260,27 +268,39 @@ func (this *Migrator) listenOnPanicAbort() { } func (this *Migrator) processChecksumComparisons() { + var completeOnce sync.Once for { - // Avoid blocking. Only pull from the queue the amount of items known at this time: - newChecksums := len(this.checksumComparisonQueue) - for i := 0; i < newChecksums; i++ { - checksumComparison := <-this.checksumComparisonQueue - this.checksumComparisonMap[checksumComparison.Iteration] = checksumComparison - } + func() { + // Avoid blocking. Only pull from the queue the available events + for { + select { + case checksumComparison := <-this.checksumComparisonQueue: + log.Debugf("new checksums!!!!!!!!! %+v", checksumComparison) + this.checksumComparisonMap[checksumComparison.Iteration] = checksumComparison + default: + return + } + } + }() // Iterate the pending checksums. Some of these have been pulled from the queue just above; // others may be subsuccessful checksums from previous iterations + atomic.StoreInt64(&this.migrationContext.PendingChecksumComparisons, int64(len(this.checksumComparisonMap))) + log.Debugf("-----checksum iterations") for iteration, checksumComparison := range this.checksumComparisonMap { + log.Debugf("-----checksum iteration") if err := this.applier.CompareChecksum(checksumComparison); err != nil { checksumComparison.IncrementAttempts() - log.Errorf("Checksum error. Checksum=%s, err=%+v", checksumComparison.String(), err) + log.Errorf("--------------Checksum error. Checksum=%s, err=%+v", checksumComparison.String(), err) } else { atomic.AddInt64(&this.migrationContext.SuccessfulChecksumComparisons, 1) delete(this.checksumComparisonMap, iteration) - log.Debugf("Checksum match. Checksum=%s", checksumComparison.String()) + log.Debugf("-------------Checksum match. Checksum=%s", checksumComparison.String()) } } atomic.StoreInt64(&this.migrationContext.PendingChecksumComparisons, int64(len(this.checksumComparisonMap))) - + if atomic.LoadInt64(&this.rowCopyCompleteFlag) == 1 { + go completeOnce.Do(func() { this.rowChecksumCompleteQueue <- true }) + } time.Sleep(250 * time.Millisecond) } } @@ -433,6 +453,11 @@ func (this *Migrator) Migrate() (err error) { if err := this.hooksExecutor.onRowCopyComplete(); err != nil { return err } + if this.migrationContext.ChecksumData { + log.Debugf("Operating until checksum comparison iteration is complete") + this.consumeChecksumComparisonsComplete() + log.Infof("+ checksum comparison iteration compelete") + } this.printStatus(ForcePrintStatusRule) if err := this.hooksExecutor.onBeforeCutOver(); err != nil { @@ -1018,11 +1043,11 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) { currentBinlogCoordinates := *this.eventsStreamer.GetCurrentBinlogCoordinates() - status := fmt.Sprintf("Copy: %d/%d %.1f%%; Applied: %d; Backlog: %d/%d; Checksums: %d,%d, Time: %+v(total), %+v(copy); streamer: %+v; Lag: %.2fs, State: %s; ETA: %s", + status := fmt.Sprintf("Copy: %d/%d %.1f%%; Applied: %d; Backlog: %d/%d; Checksums: %d/%d,%d, Time: %+v(total), %+v(copy); streamer: %+v; Lag: %.2fs, State: %s; ETA: %s", totalRowsCopied, rowsEstimate, progressPct, atomic.LoadInt64(&this.migrationContext.TotalDMLEventsApplied), len(this.applyEventsQueue), cap(this.applyEventsQueue), - atomic.LoadInt64(&this.migrationContext.PendingChecksumComparisons), atomic.LoadInt64(&this.migrationContext.SuccessfulChecksumComparisons), + atomic.LoadInt64(&this.migrationContext.SuccessfulChecksumComparisons), atomic.LoadInt64(&this.migrationContext.SubmittedChecksumComparisons), atomic.LoadInt64(&this.migrationContext.PendingChecksumComparisons), base.PrettifyDurationOutput(elapsedTime), base.PrettifyDurationOutput(this.migrationContext.ElapsedRowCopyTime()), currentBinlogCoordinates, this.migrationContext.GetCurrentLagDuration().Seconds(), @@ -1195,8 +1220,11 @@ func (this *Migrator) iterateChunks() error { if err != nil { return err // wrapping call will retry } + if this.migrationContext.ChecksumData { - go func() { this.checksumComparisonQueue <- checksumComparison }() + log.Debugf("adding checksum") + atomic.AddInt64(&this.migrationContext.SubmittedChecksumComparisons, 1) + this.checksumComparisonQueue <- checksumComparison } atomic.AddInt64(&this.migrationContext.TotalRowsCopied, rowsAffected) atomic.AddInt64(&this.migrationContext.Iteration, 1) diff --git a/go/mysql/connection.go b/go/mysql/connection.go index 654998cc2..bd809963e 100644 --- a/go/mysql/connection.go +++ b/go/mysql/connection.go @@ -12,6 +12,7 @@ import ( "fmt" "io/ioutil" "net" + "strings" "github.com/go-sql-driver/mysql" ) @@ -102,7 +103,7 @@ func (this *ConnectionConfig) TLSConfig() *tls.Config { return this.tlsConfig } -func (this *ConnectionConfig) GetDBUri(databaseName string) string { +func (this *ConnectionConfig) GetDBUri(databaseName string, extraOptions ...string) string { hostname := this.Key.Hostname var ip = net.ParseIP(hostname) if (ip != nil) && (ip.To4() == nil) { @@ -116,5 +117,11 @@ func (this *ConnectionConfig) GetDBUri(databaseName string) string { if this.tlsConfig != nil { tlsOption = TLS_CONFIG_KEY } - return fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?interpolateParams=%t&autocommit=true&charset=utf8mb4,utf8,latin1&tls=%s", this.User, this.Password, hostname, this.Key.Port, databaseName, interpolateParams, tlsOption) + extraOptionsParams := "" + if len(extraOptions) > 0 { + extraOptionsParams = fmt.Sprintf("&%s", strings.Join(extraOptions, "&")) + } + return fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?interpolateParams=%t&autocommit=true&charset=utf8mb4,utf8,latin1&tls=%s%s", + this.User, this.Password, hostname, this.Key.Port, databaseName, interpolateParams, tlsOption, extraOptionsParams, + ) } diff --git a/localtests/test.sh b/localtests/test.sh index fdaf729a3..938dda53b 100755 --- a/localtests/test.sh +++ b/localtests/test.sh @@ -148,7 +148,7 @@ test_single() { --alter='engine=innodb' \ --exact-rowcount \ --assume-rbr \ - --checksum-data=true \ + --checksum-data \ --initially-drop-old-table \ --initially-drop-ghost-table \ --throttle-query='select timestampdiff(second, min(last_update), now()) < 5 from _gh_ost_test_ghc' \ From 3907a139f9b9d513954902bafa87ec33f55aa39d Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Mon, 6 Jul 2020 13:55:15 +0300 Subject: [PATCH 14/22] GhostUniqueKey --- go/base/context.go | 1 + go/logic/inspect.go | 12 +++++++----- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/go/base/context.go b/go/base/context.go index 17b2f3c27..ce5794f2b 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -209,6 +209,7 @@ type MigrationContext struct { GhostTableVirtualColumns *sql.ColumnList GhostTableUniqueKeys [](*sql.UniqueKey) UniqueKey *sql.UniqueKey + GhostUniqueKey *sql.UniqueKey SharedColumns *sql.ColumnList ColumnRenameMap map[string]string DroppedColumnsMap map[string]bool diff --git a/go/logic/inspect.go b/go/logic/inspect.go index 31184b0c2..0aa4da10a 100644 --- a/go/logic/inspect.go +++ b/go/logic/inspect.go @@ -126,7 +126,7 @@ func (this *Inspector) inspectOriginalAndGhostTables() (err error) { if err != nil { return err } - sharedUniqueKeys, err := this.getSharedUniqueKeys(this.migrationContext.OriginalTableUniqueKeys, this.migrationContext.GhostTableUniqueKeys) + sharedUniqueKeys, ghostSharedUniqueKeys, err := this.getSharedUniqueKeys(this.migrationContext.OriginalTableUniqueKeys, this.migrationContext.GhostTableUniqueKeys) if err != nil { return err } @@ -151,13 +151,14 @@ func (this *Inspector) inspectOriginalAndGhostTables() (err error) { } if uniqueKeyIsValid { this.migrationContext.UniqueKey = sharedUniqueKeys[i] + this.migrationContext.GhostUniqueKey = ghostSharedUniqueKeys[i] break } } if this.migrationContext.UniqueKey == nil { return fmt.Errorf("No shared unique key can be found after ALTER! Bailing out") } - log.Infof("Chosen shared unique key is %s", this.migrationContext.UniqueKey.Name) + log.Infof("Chosen shared unique key is %s. ghost unique key is %s", this.migrationContext.UniqueKey.Name, this.migrationContext.GhostUniqueKey.Name) if this.migrationContext.UniqueKey.HasNullable { if this.migrationContext.NullableUniqueKeyAllowed { log.Warningf("Chosen key (%s) has nullable columns. You have supplied with --allow-nullable-unique-key and so this migration proceeds. As long as there aren't NULL values in this key's column, migration should be fine. NULL values will corrupt migration's data", this.migrationContext.UniqueKey) @@ -669,17 +670,18 @@ func (this *Inspector) getCandidateUniqueKeys(tableName string) (uniqueKeys [](* // getSharedUniqueKeys returns the intersection of two given unique keys, // testing by list of columns -func (this *Inspector) getSharedUniqueKeys(originalUniqueKeys, ghostUniqueKeys [](*sql.UniqueKey)) (uniqueKeys [](*sql.UniqueKey), err error) { +func (this *Inspector) getSharedUniqueKeys(originalUniqueKeys, ghostUniqueKeys [](*sql.UniqueKey)) (sharedUniqueKeys [](*sql.UniqueKey), ghostSharedUniqueKeys [](*sql.UniqueKey), err error) { // We actually do NOT rely on key name, just on the set of columns. This is because maybe // the ALTER is on the name itself... for _, originalUniqueKey := range originalUniqueKeys { for _, ghostUniqueKey := range ghostUniqueKeys { if originalUniqueKey.Columns.EqualsByNames(&ghostUniqueKey.Columns) { - uniqueKeys = append(uniqueKeys, originalUniqueKey) + sharedUniqueKeys = append(sharedUniqueKeys, originalUniqueKey) + ghostSharedUniqueKeys = append(ghostSharedUniqueKeys, ghostUniqueKey) } } } - return uniqueKeys, nil + return sharedUniqueKeys, ghostSharedUniqueKeys, nil } // getSharedColumns returns the intersection of two lists of columns in same order as the first list From 1182ad098e39d68c4cf207bd775901167ad647a9 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Mon, 6 Jul 2020 14:17:01 +0300 Subject: [PATCH 15/22] Using GhostUniqueKey for building checksum query on ghost table --- go/logic/applier.go | 5 ++-- go/logic/inspect.go | 1 + go/sql/builder.go | 53 ++++++++++++++++++--------------- go/sql/builder_test.go | 67 ++++++++++++++++++++++++++++++------------ 4 files changed, 82 insertions(+), 44 deletions(-) diff --git a/go/logic/applier.go b/go/logic/applier.go index c6628ada4..6e579cc4e 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -466,9 +466,8 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected this.migrationContext.GetGhostTableName(), this.migrationContext.SharedColumns.Names(), this.migrationContext.MappedSharedColumns.Names(), - this.migrationContext.ColumnRenameMap, - this.migrationContext.UniqueKey.Name, - &this.migrationContext.UniqueKey.Columns, + this.migrationContext.UniqueKey, + this.migrationContext.GhostUniqueKey, this.migrationContext.MigrationIterationRangeMinValues.AbstractValues(), this.migrationContext.MigrationIterationRangeMaxValues.AbstractValues(), this.migrationContext.GetIteration() == 0, diff --git a/go/logic/inspect.go b/go/logic/inspect.go index 0aa4da10a..e35880188 100644 --- a/go/logic/inspect.go +++ b/go/logic/inspect.go @@ -159,6 +159,7 @@ func (this *Inspector) inspectOriginalAndGhostTables() (err error) { return fmt.Errorf("No shared unique key can be found after ALTER! Bailing out") } log.Infof("Chosen shared unique key is %s. ghost unique key is %s", this.migrationContext.UniqueKey.Name, this.migrationContext.GhostUniqueKey.Name) + log.Infof("Chosen shared unique key columns %+v. ghost unique key columns: %+v", this.migrationContext.UniqueKey.Columns.Names(), this.migrationContext.GhostUniqueKey.Columns.Names()) if this.migrationContext.UniqueKey.HasNullable { if this.migrationContext.NullableUniqueKeyAllowed { log.Warningf("Chosen key (%s) has nullable columns. You have supplied with --allow-nullable-unique-key and so this migration proceeds. As long as there aren't NULL values in this key's column, migration should be fine. NULL values will corrupt migration's data", this.migrationContext.UniqueKey) diff --git a/go/sql/builder.go b/go/sql/builder.go index c3d512771..da3ee14c6 100644 --- a/go/sql/builder.go +++ b/go/sql/builder.go @@ -181,8 +181,8 @@ func BuildRangePreparedComparison(columns *ColumnList, args []interface{}, compa func BuildRangeInsertQuery( databaseName, originalTableName, ghostTableName string, sharedColumns []string, mappedSharedColumns []string, - columnRenameMap map[string]string, - uniqueKey string, uniqueKeyColumns *ColumnList, + uniqueKey *UniqueKey, + ghostUniqueKey *UniqueKey, rangeStartValues, rangeEndValues []string, rangeStartArgs, rangeEndArgs []interface{}, includeRangeStartValues bool, transactionalTable bool, @@ -208,17 +208,18 @@ func BuildRangeInsertQuery( } sharedColumnsListing := strings.Join(sharedColumns, ", ") - uniqueKey = EscapeName(uniqueKey) + uniqueKeyName := EscapeName(uniqueKey.Name) + ghostUniqueKeyName := EscapeName(ghostUniqueKey.Name) var minRangeComparisonSign ValueComparisonSign = GreaterThanComparisonSign if includeRangeStartValues { minRangeComparisonSign = GreaterThanOrEqualsComparisonSign } - rangeStartComparison, rangeExplodedArgs, err := BuildRangeComparison(uniqueKeyColumns.Names(), rangeStartValues, rangeStartArgs, minRangeComparisonSign) + rangeStartComparison, rangeExplodedArgs, err := BuildRangeComparison(uniqueKey.Columns.Names(), rangeStartValues, rangeStartArgs, minRangeComparisonSign) if err != nil { return "", "", "", explodedArgs, err } explodedArgs = append(explodedArgs, rangeExplodedArgs...) - rangeEndComparison, rangeExplodedArgs, err := BuildRangeComparison(uniqueKeyColumns.Names(), rangeEndValues, rangeEndArgs, LessThanOrEqualsComparisonSign) + rangeEndComparison, rangeExplodedArgs, err := BuildRangeComparison(uniqueKey.Columns.Names(), rangeEndValues, rangeEndArgs, LessThanOrEqualsComparisonSign) if err != nil { return "", "", "", explodedArgs, err } @@ -233,11 +234,11 @@ func BuildRangeInsertQuery( where (%s and %s) %s ) `, databaseName, originalTableName, databaseName, ghostTableName, mappedSharedColumnsListing, - sharedColumnsListing, databaseName, originalTableName, uniqueKey, + sharedColumnsListing, databaseName, originalTableName, uniqueKeyName, rangeStartComparison, rangeEndComparison, transactionalClause) // escape unique key columns for comparison queries - uniqueKeyColumnNames := duplicateNames(uniqueKeyColumns.Names()) + uniqueKeyColumnNames := duplicateNames(uniqueKey.Columns.Names()) for i := range uniqueKeyColumnNames { uniqueKeyColumnNames[i] = EscapeName(uniqueKeyColumnNames[i]) } @@ -254,18 +255,24 @@ func BuildRangeInsertQuery( where (%s and %s) `, databaseName, originalTableName, sharedColumnsListing, uniqueKeyColumnsListing, - databaseName, originalTableName, uniqueKey, + databaseName, originalTableName, uniqueKeyName, rangeStartComparison, rangeEndComparison) - mappedUniqueKeyColumnNames := duplicateNames(uniqueKeyColumns.Names()) - for i, name := range mappedUniqueKeyColumnNames { - if mappedName, ok := columnRenameMap[name]; ok { - mappedUniqueKeyColumnNames[i] = EscapeName(mappedName) - } else { - mappedUniqueKeyColumnNames[i] = EscapeName(name) - } + // mappedUniqueKeyColumnNames := duplicateNames(uniqueKey.Columns.Names()) + // for i, name := range mappedUniqueKeyColumnNames { + // if mappedName, ok := columnRenameMap[name]; ok { + // mappedUniqueKeyColumnNames[i] = EscapeName(mappedName) + // } else { + // mappedUniqueKeyColumnNames[i] = EscapeName(name) + // } + // } + // mappedUniqueKeyColumnsListing := strings.Join(mappedUniqueKeyColumnNames, ", ") + + ghostUniqueKeyColumnNames := duplicateNames(ghostUniqueKey.Columns.Names()) + for i := range ghostUniqueKeyColumnNames { + ghostUniqueKeyColumnNames[i] = EscapeName(ghostUniqueKeyColumnNames[i]) } - mappedUniqueKeyColumnsListing := strings.Join(mappedUniqueKeyColumnNames, ", ") + ghostUniqueKeyColumnsListing := strings.Join(ghostUniqueKeyColumnNames, ", ") ghostChecksumQuery = fmt.Sprintf(` select /* gh-ost checksum %s.%s */ @@ -277,8 +284,8 @@ func BuildRangeInsertQuery( from %s.%s force index (%s) where (%s and %s) `, databaseName, ghostTableName, - mappedSharedColumnsListing, mappedUniqueKeyColumnsListing, - databaseName, ghostTableName, uniqueKey, + mappedSharedColumnsListing, ghostUniqueKeyColumnsListing, + databaseName, ghostTableName, ghostUniqueKeyName, rangeStartComparison, rangeEndComparison) return insertQuery, originalChecksumQuery, ghostChecksumQuery, explodedArgs, nil } @@ -286,16 +293,16 @@ func BuildRangeInsertQuery( func BuildRangeInsertPreparedQuery( databaseName, originalTableName, ghostTableName string, sharedColumns []string, mappedSharedColumns []string, - columnRenameMap map[string]string, - uniqueKey string, uniqueKeyColumns *ColumnList, + uniqueKey *UniqueKey, + ghostUniqueKey *UniqueKey, rangeStartArgs, rangeEndArgs []interface{}, includeRangeStartValues bool, transactionalTable bool, ) ( insertQuery, originalChecksumQuery, ghostChecksumQuery string, explodedArgs []interface{}, err error, ) { - rangeStartValues := buildColumnsPreparedValues(uniqueKeyColumns) - rangeEndValues := buildColumnsPreparedValues(uniqueKeyColumns) - return BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, columnRenameMap, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, includeRangeStartValues, transactionalTable) + rangeStartValues := buildColumnsPreparedValues(&uniqueKey.Columns) + rangeEndValues := buildColumnsPreparedValues(&uniqueKey.Columns) + return BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, ghostUniqueKey, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, includeRangeStartValues, transactionalTable) } func BuildUniqueKeyRangeEndPreparedQueryViaOffset(databaseName, tableName string, uniqueKeyColumns *ColumnList, rangeStartArgs, rangeEndArgs []interface{}, chunkSize int64, includeRangeStartValues bool, hint string) (result string, explodedArgs []interface{}, err error) { diff --git a/go/sql/builder_test.go b/go/sql/builder_test.go index f95fa1afc..7a8241571 100644 --- a/go/sql/builder_test.go +++ b/go/sql/builder_test.go @@ -164,16 +164,22 @@ func TestBuildRangeInsertQuery(t *testing.T) { originalTableName := "tbl" ghostTableName := "ghost" sharedColumns := []string{"id", "name", "position"} - columnRenameMap := map[string]string{} { - uniqueKey := "PRIMARY" - uniqueKeyColumns := NewColumnList([]string{"id"}) + uniqueKey := &UniqueKey{ + Name: "PRIMARY", + Columns: *NewColumnList([]string{"id"}), + } + ghostUniqueKey := &UniqueKey{ + Name: "PRIMARY", + Columns: *NewColumnList([]string{"id"}), + } rangeStartValues := []string{"@v1s"} rangeEndValues := []string{"@v1e"} rangeStartArgs := []interface{}{3} rangeEndArgs := []interface{}{103} - insertQuery, originalChecksumQuery, ghostChecksumQuery, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, sharedColumns, columnRenameMap, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, false) + insertQuery, originalChecksumQuery, ghostChecksumQuery, explodedArgs, err := BuildRangeInsertQuery( + databaseName, originalTableName, ghostTableName, sharedColumns, sharedColumns, uniqueKey, ghostUniqueKey, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, false) test.S(t).ExpectNil(err) expected := ` insert /* gh-ost mydb.tbl */ ignore into mydb.ghost (id, name, position) @@ -209,14 +215,21 @@ func TestBuildRangeInsertQuery(t *testing.T) { test.S(t).ExpectTrue(reflect.DeepEqual(explodedArgs, []interface{}{3, 3, 103, 103})) } { - uniqueKey := "name_position_uidx" - uniqueKeyColumns := NewColumnList([]string{"name", "position"}) + uniqueKey := &UniqueKey{ + Name: "name_position_uidx", + Columns: *NewColumnList([]string{"name", "position"}), + } + ghostUniqueKey := &UniqueKey{ + Name: "name_position_uidx", + Columns: *NewColumnList([]string{"name", "position"}), + } rangeStartValues := []string{"@v1s", "@v2s"} rangeEndValues := []string{"@v1e", "@v2e"} rangeStartArgs := []interface{}{3, 17} rangeEndArgs := []interface{}{103, 117} - insertQuery, originalChecksumQuery, ghostChecksumQuery, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, sharedColumns, columnRenameMap, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, false) + insertQuery, originalChecksumQuery, ghostChecksumQuery, explodedArgs, err := BuildRangeInsertQuery( + databaseName, originalTableName, ghostTableName, sharedColumns, sharedColumns, uniqueKey, ghostUniqueKey, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, false) test.S(t).ExpectNil(err) expected := ` insert /* gh-ost mydb.tbl */ ignore into mydb.ghost (id, name, position) @@ -260,16 +273,21 @@ func TestBuildRangeInsertQueryRenameMap(t *testing.T) { ghostTableName := "ghost" sharedColumns := []string{"id", "name", "position"} mappedSharedColumns := []string{"id", "name", "location"} - columnRenameMap := map[string]string{"position": "location"} { - uniqueKey := "PRIMARY" - uniqueKeyColumns := NewColumnList([]string{"id"}) + uniqueKey := &UniqueKey{ + Name: "PRIMARY", + Columns: *NewColumnList([]string{"id"}), + } + ghostUniqueKey := &UniqueKey{ + Name: "PRIMARY", + Columns: *NewColumnList([]string{"id"}), + } rangeStartValues := []string{"@v1s"} rangeEndValues := []string{"@v1e"} rangeStartArgs := []interface{}{3} rangeEndArgs := []interface{}{103} - insertQuery, originalChecksumQuery, ghostChecksumQuery, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, columnRenameMap, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, false) + insertQuery, originalChecksumQuery, ghostChecksumQuery, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, ghostUniqueKey, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, false) test.S(t).ExpectNil(err) expected := ` insert /* gh-ost mydb.tbl */ ignore into mydb.ghost (id, name, location) @@ -306,14 +324,21 @@ func TestBuildRangeInsertQueryRenameMap(t *testing.T) { test.S(t).ExpectTrue(reflect.DeepEqual(explodedArgs, []interface{}{3, 3, 103, 103})) } { - uniqueKey := "name_position_uidx" - uniqueKeyColumns := NewColumnList([]string{"name", "position"}) + uniqueKey := &UniqueKey{ + Name: "name_position_uidx", + Columns: *NewColumnList([]string{"name", "position"}), + } + ghostUniqueKey := &UniqueKey{ + Name: "name_position_uidx", + Columns: *NewColumnList([]string{"name", "location"}), + } rangeStartValues := []string{"@v1s", "@v2s"} rangeEndValues := []string{"@v1e", "@v2e"} rangeStartArgs := []interface{}{3, 17} rangeEndArgs := []interface{}{103, 117} - insertQuery, originalChecksumQuery, ghostChecksumQuery, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, columnRenameMap, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, false) + insertQuery, originalChecksumQuery, ghostChecksumQuery, explodedArgs, err := BuildRangeInsertQuery( + databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, ghostUniqueKey, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, false) test.S(t).ExpectNil(err) expected := ` insert /* gh-ost mydb.tbl */ ignore into mydb.ghost (id, name, location) @@ -356,14 +381,20 @@ func TestBuildRangeInsertPreparedQuery(t *testing.T) { originalTableName := "tbl" ghostTableName := "ghost" sharedColumns := []string{"id", "name", "position"} - columnRenameMap := map[string]string{} { - uniqueKey := "name_position_uidx" - uniqueKeyColumns := NewColumnList([]string{"name", "position"}) + uniqueKey := &UniqueKey{ + Name: "name_position_uidx", + Columns: *NewColumnList([]string{"name", "position"}), + } + ghostUniqueKey := &UniqueKey{ + Name: "name_position_uidx", + Columns: *NewColumnList([]string{"name", "position"}), + } rangeStartArgs := []interface{}{3, 17} rangeEndArgs := []interface{}{103, 117} - query, _, _, explodedArgs, err := BuildRangeInsertPreparedQuery(databaseName, originalTableName, ghostTableName, sharedColumns, sharedColumns, columnRenameMap, uniqueKey, uniqueKeyColumns, rangeStartArgs, rangeEndArgs, true, true) + query, _, _, explodedArgs, err := BuildRangeInsertPreparedQuery( + databaseName, originalTableName, ghostTableName, sharedColumns, sharedColumns, uniqueKey, ghostUniqueKey, rangeStartArgs, rangeEndArgs, true, true) test.S(t).ExpectNil(err) expected := ` insert /* gh-ost mydb.tbl */ ignore into mydb.ghost (id, name, position) From 8e43847b62ef65527d3cf8facfe8d5a90e4602ad Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Tue, 7 Jul 2020 11:04:39 +0300 Subject: [PATCH 16/22] stricter checksum with IFNULL --- go/sql/builder.go | 36 ++++++++++++++++++++---------------- go/sql/builder_test.go | 16 ++++++++-------- 2 files changed, 28 insertions(+), 24 deletions(-) diff --git a/go/sql/builder.go b/go/sql/builder.go index da3ee14c6..d4a05d954 100644 --- a/go/sql/builder.go +++ b/go/sql/builder.go @@ -196,18 +196,18 @@ func BuildRangeInsertQuery( originalTableName = EscapeName(originalTableName) ghostTableName = EscapeName(ghostTableName) - mappedSharedColumns = duplicateNames(mappedSharedColumns) - for i := range mappedSharedColumns { - mappedSharedColumns[i] = EscapeName(mappedSharedColumns[i]) - } - mappedSharedColumnsListing := strings.Join(mappedSharedColumns, ", ") - sharedColumns = duplicateNames(sharedColumns) for i := range sharedColumns { sharedColumns[i] = EscapeName(sharedColumns[i]) } sharedColumnsListing := strings.Join(sharedColumns, ", ") + mappedSharedColumns = duplicateNames(mappedSharedColumns) + for i := range mappedSharedColumns { + mappedSharedColumns[i] = EscapeName(mappedSharedColumns[i]) + } + mappedSharedColumnsListing := strings.Join(mappedSharedColumns, ", ") + uniqueKeyName := EscapeName(uniqueKey.Name) ghostUniqueKeyName := EscapeName(ghostUniqueKey.Name) var minRangeComparisonSign ValueComparisonSign = GreaterThanComparisonSign @@ -237,6 +237,20 @@ func BuildRangeInsertQuery( sharedColumnsListing, databaseName, originalTableName, uniqueKeyName, rangeStartComparison, rangeEndComparison, transactionalClause) + // Now for checksum comparisons + + sharedColumns = duplicateNames(sharedColumns) // already escaped + for i := range sharedColumns { + sharedColumns[i] = fmt.Sprintf(`IFNULL(%s, 'NULL')`, sharedColumns[i]) + } + sharedColumnsListing = strings.Join(sharedColumns, ", ") + + mappedSharedColumns = duplicateNames(mappedSharedColumns) + for i := range mappedSharedColumns { + mappedSharedColumns[i] = fmt.Sprintf(`IFNULL(%s, 'NULL')`, mappedSharedColumns[i]) + } + mappedSharedColumnsListing = strings.Join(mappedSharedColumns, ", ") + // escape unique key columns for comparison queries uniqueKeyColumnNames := duplicateNames(uniqueKey.Columns.Names()) for i := range uniqueKeyColumnNames { @@ -258,16 +272,6 @@ func BuildRangeInsertQuery( databaseName, originalTableName, uniqueKeyName, rangeStartComparison, rangeEndComparison) - // mappedUniqueKeyColumnNames := duplicateNames(uniqueKey.Columns.Names()) - // for i, name := range mappedUniqueKeyColumnNames { - // if mappedName, ok := columnRenameMap[name]; ok { - // mappedUniqueKeyColumnNames[i] = EscapeName(mappedName) - // } else { - // mappedUniqueKeyColumnNames[i] = EscapeName(name) - // } - // } - // mappedUniqueKeyColumnsListing := strings.Join(mappedUniqueKeyColumnNames, ", ") - ghostUniqueKeyColumnNames := duplicateNames(ghostUniqueKey.Columns.Names()) for i := range ghostUniqueKeyColumnNames { ghostUniqueKeyColumnNames[i] = EscapeName(ghostUniqueKeyColumnNames[i]) diff --git a/go/sql/builder_test.go b/go/sql/builder_test.go index 7a8241571..60c94ddb6 100644 --- a/go/sql/builder_test.go +++ b/go/sql/builder_test.go @@ -193,7 +193,7 @@ func TestBuildRangeInsertQuery(t *testing.T) { select /* gh-ost checksum mydb.tbl */ sha2( group_concat( - sha2(concat_ws(',', id, name, position), 256) order by id + sha2(concat_ws(',', IFNULL(id, 'NULL'), IFNULL(name, 'NULL'), IFNULL(position, 'NULL')), 256) order by id ), 256 ) from mydb.tbl force index (PRIMARY) @@ -205,7 +205,7 @@ func TestBuildRangeInsertQuery(t *testing.T) { select /* gh-ost checksum mydb.ghost */ sha2( group_concat( - sha2(concat_ws(',', id, name, position), 256) order by id + sha2(concat_ws(',', IFNULL(id, 'NULL'), IFNULL(name, 'NULL'), IFNULL(position, 'NULL')), 256) order by id ), 256 ) from mydb.ghost force index (PRIMARY) @@ -243,7 +243,7 @@ func TestBuildRangeInsertQuery(t *testing.T) { select /* gh-ost checksum mydb.tbl */ sha2( group_concat( - sha2(concat_ws(',', id, name, position), 256) order by name, position + sha2(concat_ws(',', IFNULL(id, 'NULL'), IFNULL(name, 'NULL'), IFNULL(position, 'NULL')), 256) order by name, position ), 256 ) from mydb.tbl force index (name_position_uidx) @@ -255,7 +255,7 @@ func TestBuildRangeInsertQuery(t *testing.T) { select /* gh-ost checksum mydb.ghost */ sha2( group_concat( - sha2(concat_ws(',', id, name, position), 256) order by name, position + sha2(concat_ws(',', IFNULL(id, 'NULL'), IFNULL(name, 'NULL'), IFNULL(position, 'NULL')), 256) order by name, position ), 256 ) from mydb.ghost force index (name_position_uidx) @@ -301,7 +301,7 @@ func TestBuildRangeInsertQueryRenameMap(t *testing.T) { select /* gh-ost checksum mydb.tbl */ sha2( group_concat( - sha2(concat_ws(',', id, name, position), 256) order by id + sha2(concat_ws(',', IFNULL(id, 'NULL'), IFNULL(name, 'NULL'), IFNULL(position, 'NULL')), 256) order by id ), 256 ) from mydb.tbl force index (PRIMARY) @@ -313,7 +313,7 @@ func TestBuildRangeInsertQueryRenameMap(t *testing.T) { select /* gh-ost checksum mydb.ghost */ sha2( group_concat( - sha2(concat_ws(',', id, name, location), 256) order by id + sha2(concat_ws(',', IFNULL(id, 'NULL'), IFNULL(name, 'NULL'), IFNULL(location, 'NULL')), 256) order by id ), 256 ) from mydb.ghost force index (PRIMARY) @@ -352,7 +352,7 @@ func TestBuildRangeInsertQueryRenameMap(t *testing.T) { select /* gh-ost checksum mydb.tbl */ sha2( group_concat( - sha2(concat_ws(',', id, name, position), 256) order by name, position + sha2(concat_ws(',', IFNULL(id, 'NULL'), IFNULL(name, 'NULL'), IFNULL(position, 'NULL')), 256) order by name, position ), 256 ) from mydb.tbl force index (name_position_uidx) @@ -364,7 +364,7 @@ func TestBuildRangeInsertQueryRenameMap(t *testing.T) { select /* gh-ost checksum mydb.ghost */ sha2( group_concat( - sha2(concat_ws(',', id, name, location), 256) order by name, location + sha2(concat_ws(',', IFNULL(id, 'NULL'), IFNULL(name, 'NULL'), IFNULL(location, 'NULL')), 256) order by name, location ), 256 ) from mydb.ghost force index (name_position_uidx) From 6c7b4736e15e0fc8c1768395d16cc514c95036a0 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 22 Jul 2020 12:33:02 +0300 Subject: [PATCH 17/22] Support a complete ALTER TABLE statement in --alter --- go/cmd/gh-ost/main.go | 21 +++++++-- go/logic/migrator.go | 4 +- go/sql/parser.go | 83 +++++++++++++++++++++++++++------ go/sql/parser_test.go | 106 ++++++++++++++++++++++++++++++++++++------ 4 files changed, 177 insertions(+), 37 deletions(-) diff --git a/go/cmd/gh-ost/main.go b/go/cmd/gh-ost/main.go index 33fc6f4cb..57c6ff84e 100644 --- a/go/cmd/gh-ost/main.go +++ b/go/cmd/gh-ost/main.go @@ -14,6 +14,7 @@ import ( "github.com/github/gh-ost/go/base" "github.com/github/gh-ost/go/logic" + "github.com/github/gh-ost/go/sql" _ "github.com/go-sql-driver/mysql" "github.com/outbrain/golib/log" @@ -172,14 +173,24 @@ func main() { log.SetLevel(log.ERROR) } + if migrationContext.AlterStatement == "" { + log.Fatalf("--alter must be provided and statement must not be empty") + } + parser := sql.NewParserFromAlterStatement(migrationContext.AlterStatement) + if migrationContext.DatabaseName == "" { - log.Fatalf("--database must be provided and database name must not be empty") + if parser.HasExplicitSchema() { + migrationContext.DatabaseName = parser.GetExplicitSchema() + } else { + log.Fatalf("--database must be provided and database name must not be empty, or --alter must specify database name") + } } if migrationContext.OriginalTableName == "" { - log.Fatalf("--table must be provided and table name must not be empty") - } - if migrationContext.AlterStatement == "" { - log.Fatalf("--alter must be provided and statement must not be empty") + if parser.HasExplicitTable() { + migrationContext.OriginalTableName = parser.GetExplicitTable() + } else { + log.Fatalf("--table must be provided and table name must not be empty, or --alter must specify table name") + } } migrationContext.Noop = !(*executeFlag) if migrationContext.AllowedRunningOnMaster && migrationContext.TestOnReplica { diff --git a/go/logic/migrator.go b/go/logic/migrator.go index b1a238fda..cff64e1d4 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -62,7 +62,7 @@ const ( // Migrator is the main schema migration flow manager. type Migrator struct { - parser *sql.Parser + parser *sql.AlterTableParser inspector *Inspector applier *Applier eventsStreamer *EventsStreamer @@ -90,7 +90,7 @@ type Migrator struct { func NewMigrator(context *base.MigrationContext) *Migrator { migrator := &Migrator{ migrationContext: context, - parser: sql.NewParser(), + parser: sql.NewAlterTableParser(), ghostTableMigrated: make(chan bool), firstThrottlingCollected: make(chan bool, 3), rowCopyComplete: make(chan error), diff --git a/go/sql/parser.go b/go/sql/parser.go index 447cbcc16..d050fcd54 100644 --- a/go/sql/parser.go +++ b/go/sql/parser.go @@ -12,26 +12,47 @@ import ( ) var ( - sanitizeQuotesRegexp = regexp.MustCompile("('[^']*')") - renameColumnRegexp = regexp.MustCompile(`(?i)\bchange\s+(column\s+|)([\S]+)\s+([\S]+)\s+`) - dropColumnRegexp = regexp.MustCompile(`(?i)\bdrop\s+(column\s+|)([\S]+)$`) - renameTableRegexp = regexp.MustCompile(`(?i)\brename\s+(to|as)\s+`) + sanitizeQuotesRegexp = regexp.MustCompile("('[^']*')") + renameColumnRegexp = regexp.MustCompile(`(?i)\bchange\s+(column\s+|)([\S]+)\s+([\S]+)\s+`) + dropColumnRegexp = regexp.MustCompile(`(?i)\bdrop\s+(column\s+|)([\S]+)$`) + renameTableRegexp = regexp.MustCompile(`(?i)\brename\s+(to|as)\s+`) + alterTableExplicitSchemaTableRegexps = []*regexp.Regexp{ + regexp.MustCompile(`(?i)\balter\s+table\s+` + "`" + `([^` + "`" + `]+)` + "`" + `[.]` + "`" + `([^` + "`" + `]+)` + "`" + `\s+(.*$)`), + regexp.MustCompile(`(?i)\balter\s+table\s+` + "`" + `([^` + "`" + `]+)` + "`" + `[.]([\S]+)\s+(.*$)`), + regexp.MustCompile(`(?i)\balter\s+table\s+([\S]+)[.]` + "`" + `([^` + "`" + `]+)` + "`" + `\s+(.*$)`), + regexp.MustCompile(`(?i)\balter\s+table\s+([\S]+)[.]([\S]+)\s+(.*$)`), + } + alterTableExplicitTableRegexps = []*regexp.Regexp{ + regexp.MustCompile(`(?i)\balter\s+table\s+` + "`" + `([^` + "`" + `]+)` + "`" + `\s+(.*$)`), + regexp.MustCompile(`(?i)\balter\s+table\s+([\S]+)\s+(.*$)`), + } ) -type Parser struct { +type AlterTableParser struct { columnRenameMap map[string]string droppedColumns map[string]bool isRenameTable bool + + alterTokens []string + + explicitSchema string + explicitTable string } -func NewParser() *Parser { - return &Parser{ +func NewAlterTableParser() *AlterTableParser { + return &AlterTableParser{ columnRenameMap: make(map[string]string), droppedColumns: make(map[string]bool), } } -func (this *Parser) tokenizeAlterStatement(alterStatement string) (tokens []string, err error) { +func NewParserFromAlterStatement(alterStatement string) *AlterTableParser { + parser := NewAlterTableParser() + parser.ParseAlterStatement(alterStatement) + return parser +} + +func (this *AlterTableParser) tokenizeAlterStatement(alterStatement string) (tokens []string, err error) { terminatingQuote := rune(0) f := func(c rune) bool { switch { @@ -58,13 +79,13 @@ func (this *Parser) tokenizeAlterStatement(alterStatement string) (tokens []stri return tokens, nil } -func (this *Parser) sanitizeQuotesFromAlterStatement(alterStatement string) (strippedStatement string) { +func (this *AlterTableParser) sanitizeQuotesFromAlterStatement(alterStatement string) (strippedStatement string) { strippedStatement = alterStatement strippedStatement = sanitizeQuotesRegexp.ReplaceAllString(strippedStatement, "''") return strippedStatement } -func (this *Parser) parseAlterToken(alterToken string) (err error) { +func (this *AlterTableParser) parseAlterToken(alterToken string) (err error) { { // rename allStringSubmatch := renameColumnRegexp.FindAllStringSubmatch(alterToken, -1) @@ -97,16 +118,33 @@ func (this *Parser) parseAlterToken(alterToken string) (err error) { return nil } -func (this *Parser) ParseAlterStatement(alterStatement string) (err error) { +func (this *AlterTableParser) ParseAlterStatement(alterStatement string) (err error) { + + for _, alterTableRegexp := range alterTableExplicitSchemaTableRegexps { + if submatch := alterTableRegexp.FindStringSubmatch(alterStatement); len(submatch) > 0 { + this.explicitSchema = submatch[1] + this.explicitTable = submatch[2] + alterStatement = submatch[3] + break + } + } + for _, alterTableRegexp := range alterTableExplicitTableRegexps { + if submatch := alterTableRegexp.FindStringSubmatch(alterStatement); len(submatch) > 0 { + this.explicitTable = submatch[1] + alterStatement = submatch[2] + break + } + } alterTokens, _ := this.tokenizeAlterStatement(alterStatement) for _, alterToken := range alterTokens { alterToken = this.sanitizeQuotesFromAlterStatement(alterToken) this.parseAlterToken(alterToken) + this.alterTokens = append(this.alterTokens, alterToken) } return nil } -func (this *Parser) GetNonTrivialRenames() map[string]string { +func (this *AlterTableParser) GetNonTrivialRenames() map[string]string { result := make(map[string]string) for column, renamed := range this.columnRenameMap { if column != renamed { @@ -116,14 +154,29 @@ func (this *Parser) GetNonTrivialRenames() map[string]string { return result } -func (this *Parser) HasNonTrivialRenames() bool { +func (this *AlterTableParser) HasNonTrivialRenames() bool { return len(this.GetNonTrivialRenames()) > 0 } -func (this *Parser) DroppedColumnsMap() map[string]bool { +func (this *AlterTableParser) DroppedColumnsMap() map[string]bool { return this.droppedColumns } -func (this *Parser) IsRenameTable() bool { +func (this *AlterTableParser) IsRenameTable() bool { return this.isRenameTable } +func (this *AlterTableParser) GetExplicitSchema() string { + return this.explicitSchema +} + +func (this *AlterTableParser) HasExplicitSchema() bool { + return this.GetExplicitSchema() != "" +} + +func (this *AlterTableParser) GetExplicitTable() string { + return this.explicitTable +} + +func (this *AlterTableParser) HasExplicitTable() bool { + return this.GetExplicitTable() != "" +} diff --git a/go/sql/parser_test.go b/go/sql/parser_test.go index 5d381304e..626fe751e 100644 --- a/go/sql/parser_test.go +++ b/go/sql/parser_test.go @@ -19,7 +19,7 @@ func init() { func TestParseAlterStatement(t *testing.T) { statement := "add column t int, engine=innodb" - parser := NewParser() + parser := NewAlterTableParser() err := parser.ParseAlterStatement(statement) test.S(t).ExpectNil(err) test.S(t).ExpectFalse(parser.HasNonTrivialRenames()) @@ -27,7 +27,7 @@ func TestParseAlterStatement(t *testing.T) { func TestParseAlterStatementTrivialRename(t *testing.T) { statement := "add column t int, change ts ts timestamp, engine=innodb" - parser := NewParser() + parser := NewAlterTableParser() err := parser.ParseAlterStatement(statement) test.S(t).ExpectNil(err) test.S(t).ExpectFalse(parser.HasNonTrivialRenames()) @@ -37,7 +37,7 @@ func TestParseAlterStatementTrivialRename(t *testing.T) { func TestParseAlterStatementTrivialRenames(t *testing.T) { statement := "add column t int, change ts ts timestamp, CHANGE f `f` float, engine=innodb" - parser := NewParser() + parser := NewAlterTableParser() err := parser.ParseAlterStatement(statement) test.S(t).ExpectNil(err) test.S(t).ExpectFalse(parser.HasNonTrivialRenames()) @@ -58,7 +58,7 @@ func TestParseAlterStatementNonTrivial(t *testing.T) { } for _, statement := range statements { - parser := NewParser() + parser := NewAlterTableParser() err := parser.ParseAlterStatement(statement) test.S(t).ExpectNil(err) renames := parser.GetNonTrivialRenames() @@ -69,7 +69,7 @@ func TestParseAlterStatementNonTrivial(t *testing.T) { } func TestTokenizeAlterStatement(t *testing.T) { - parser := NewParser() + parser := NewAlterTableParser() { alterStatement := "add column t int" tokens, _ := parser.tokenizeAlterStatement(alterStatement) @@ -108,7 +108,7 @@ func TestTokenizeAlterStatement(t *testing.T) { } func TestSanitizeQuotesFromAlterStatement(t *testing.T) { - parser := NewParser() + parser := NewAlterTableParser() { alterStatement := "add column e enum('a','b','c')" strippedStatement := parser.sanitizeQuotesFromAlterStatement(alterStatement) @@ -124,7 +124,7 @@ func TestSanitizeQuotesFromAlterStatement(t *testing.T) { func TestParseAlterStatementDroppedColumns(t *testing.T) { { - parser := NewParser() + parser := NewAlterTableParser() statement := "drop column b" err := parser.ParseAlterStatement(statement) test.S(t).ExpectNil(err) @@ -132,7 +132,7 @@ func TestParseAlterStatementDroppedColumns(t *testing.T) { test.S(t).ExpectTrue(parser.droppedColumns["b"]) } { - parser := NewParser() + parser := NewAlterTableParser() statement := "drop column b, drop key c_idx, drop column `d`" err := parser.ParseAlterStatement(statement) test.S(t).ExpectNil(err) @@ -141,7 +141,7 @@ func TestParseAlterStatementDroppedColumns(t *testing.T) { test.S(t).ExpectTrue(parser.droppedColumns["d"]) } { - parser := NewParser() + parser := NewAlterTableParser() statement := "drop column b, drop key c_idx, drop column `d`, drop `e`, drop primary key, drop foreign key fk_1" err := parser.ParseAlterStatement(statement) test.S(t).ExpectNil(err) @@ -151,7 +151,7 @@ func TestParseAlterStatementDroppedColumns(t *testing.T) { test.S(t).ExpectTrue(parser.droppedColumns["e"]) } { - parser := NewParser() + parser := NewAlterTableParser() statement := "drop column b, drop bad statement, add column i int" err := parser.ParseAlterStatement(statement) test.S(t).ExpectNil(err) @@ -163,38 +163,114 @@ func TestParseAlterStatementDroppedColumns(t *testing.T) { func TestParseAlterStatementRenameTable(t *testing.T) { { - parser := NewParser() + parser := NewAlterTableParser() statement := "drop column b" err := parser.ParseAlterStatement(statement) test.S(t).ExpectNil(err) test.S(t).ExpectFalse(parser.isRenameTable) } { - parser := NewParser() + parser := NewAlterTableParser() statement := "rename as something_else" err := parser.ParseAlterStatement(statement) test.S(t).ExpectNil(err) test.S(t).ExpectTrue(parser.isRenameTable) } { - parser := NewParser() + parser := NewAlterTableParser() statement := "drop column b, rename as something_else" err := parser.ParseAlterStatement(statement) test.S(t).ExpectNil(err) test.S(t).ExpectTrue(parser.isRenameTable) } { - parser := NewParser() + parser := NewAlterTableParser() statement := "engine=innodb rename as something_else" err := parser.ParseAlterStatement(statement) test.S(t).ExpectNil(err) test.S(t).ExpectTrue(parser.isRenameTable) } { - parser := NewParser() + parser := NewAlterTableParser() statement := "rename as something_else, engine=innodb" err := parser.ParseAlterStatement(statement) test.S(t).ExpectNil(err) test.S(t).ExpectTrue(parser.isRenameTable) } } + +func TestParseAlterStatementExplicitTable(t *testing.T) { + + { + parser := NewAlterTableParser() + statement := "drop column b" + err := parser.ParseAlterStatement(statement) + test.S(t).ExpectNil(err) + test.S(t).ExpectEquals(parser.explicitSchema, "") + test.S(t).ExpectEquals(parser.explicitTable, "") + test.S(t).ExpectTrue(reflect.DeepEqual(parser.alterTokens, []string{"drop column b"})) + } + { + parser := NewAlterTableParser() + statement := "alter table tbl drop column b" + err := parser.ParseAlterStatement(statement) + test.S(t).ExpectNil(err) + test.S(t).ExpectEquals(parser.explicitSchema, "") + test.S(t).ExpectEquals(parser.explicitTable, "tbl") + test.S(t).ExpectTrue(reflect.DeepEqual(parser.alterTokens, []string{"drop column b"})) + } + { + parser := NewAlterTableParser() + statement := "alter table `tbl` drop column b" + err := parser.ParseAlterStatement(statement) + test.S(t).ExpectNil(err) + test.S(t).ExpectEquals(parser.explicitSchema, "") + test.S(t).ExpectEquals(parser.explicitTable, "tbl") + test.S(t).ExpectTrue(reflect.DeepEqual(parser.alterTokens, []string{"drop column b"})) + } + { + parser := NewAlterTableParser() + statement := "alter table `scm with spaces`.`tbl` drop column b" + err := parser.ParseAlterStatement(statement) + test.S(t).ExpectNil(err) + test.S(t).ExpectEquals(parser.explicitSchema, "scm with spaces") + test.S(t).ExpectEquals(parser.explicitTable, "tbl") + test.S(t).ExpectTrue(reflect.DeepEqual(parser.alterTokens, []string{"drop column b"})) + } + { + parser := NewAlterTableParser() + statement := "alter table `scm`.`tbl with spaces` drop column b" + err := parser.ParseAlterStatement(statement) + test.S(t).ExpectNil(err) + test.S(t).ExpectEquals(parser.explicitSchema, "scm") + test.S(t).ExpectEquals(parser.explicitTable, "tbl with spaces") + test.S(t).ExpectTrue(reflect.DeepEqual(parser.alterTokens, []string{"drop column b"})) + } + { + parser := NewAlterTableParser() + statement := "alter table `scm`.tbl drop column b" + err := parser.ParseAlterStatement(statement) + test.S(t).ExpectNil(err) + test.S(t).ExpectEquals(parser.explicitSchema, "scm") + test.S(t).ExpectEquals(parser.explicitTable, "tbl") + test.S(t).ExpectTrue(reflect.DeepEqual(parser.alterTokens, []string{"drop column b"})) + } + { + parser := NewAlterTableParser() + statement := "alter table scm.`tbl` drop column b" + err := parser.ParseAlterStatement(statement) + test.S(t).ExpectNil(err) + test.S(t).ExpectEquals(parser.explicitSchema, "scm") + test.S(t).ExpectEquals(parser.explicitTable, "tbl") + test.S(t).ExpectTrue(reflect.DeepEqual(parser.alterTokens, []string{"drop column b"})) + } + { + parser := NewAlterTableParser() + statement := "alter table scm.tbl drop column b" + err := parser.ParseAlterStatement(statement) + test.S(t).ExpectNil(err) + test.S(t).ExpectEquals(parser.explicitSchema, "scm") + test.S(t).ExpectEquals(parser.explicitTable, "tbl") + test.S(t).ExpectTrue(reflect.DeepEqual(parser.alterTokens, []string{"drop column b"})) + } +} From c9249f2b71ba30d13cbf0ead2717f287786e780f Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Thu, 23 Jul 2020 11:38:05 +0300 Subject: [PATCH 18/22] Updating and using AlterTableOptions --- go/base/context.go | 7 ++++--- go/cmd/gh-ost/main.go | 1 + go/logic/applier.go | 2 +- go/sql/parser.go | 18 ++++++++++++------ 4 files changed, 18 insertions(+), 10 deletions(-) diff --git a/go/base/context.go b/go/base/context.go index 1030463e5..cee66efe7 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -77,9 +77,10 @@ func NewThrottleCheckResult(throttle bool, reason string, reasonHint ThrottleRea type MigrationContext struct { Uuid string - DatabaseName string - OriginalTableName string - AlterStatement string + DatabaseName string + OriginalTableName string + AlterStatement string + AlterStatementOptions string // anything following the 'ALTER TABLE [schema.]table' from AlterStatement CountTableRows bool ConcurrentCountTableRows bool diff --git a/go/cmd/gh-ost/main.go b/go/cmd/gh-ost/main.go index 31729a508..d287d1176 100644 --- a/go/cmd/gh-ost/main.go +++ b/go/cmd/gh-ost/main.go @@ -177,6 +177,7 @@ func main() { log.Fatalf("--alter must be provided and statement must not be empty") } parser := sql.NewParserFromAlterStatement(migrationContext.AlterStatement) + migrationContext.AlterStatementOptions = parser.GetAlterStatementOptions() if migrationContext.DatabaseName == "" { if parser.HasExplicitSchema() { diff --git a/go/logic/applier.go b/go/logic/applier.go index 926023ffe..3b1b9bf06 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -190,7 +190,7 @@ func (this *Applier) AlterGhost() error { query := fmt.Sprintf(`alter /* gh-ost */ table %s.%s %s`, sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.GetGhostTableName()), - this.migrationContext.AlterStatement, + this.migrationContext.AlterStatementOptions, ) this.migrationContext.Log.Infof("Altering ghost table %s.%s", sql.EscapeName(this.migrationContext.DatabaseName), diff --git a/go/sql/parser.go b/go/sql/parser.go index d050fcd54..f7333a77c 100644 --- a/go/sql/parser.go +++ b/go/sql/parser.go @@ -33,7 +33,8 @@ type AlterTableParser struct { droppedColumns map[string]bool isRenameTable bool - alterTokens []string + alterStatementOptions string + alterTokens []string explicitSchema string explicitTable string @@ -120,22 +121,23 @@ func (this *AlterTableParser) parseAlterToken(alterToken string) (err error) { func (this *AlterTableParser) ParseAlterStatement(alterStatement string) (err error) { + this.alterStatementOptions = alterStatement for _, alterTableRegexp := range alterTableExplicitSchemaTableRegexps { - if submatch := alterTableRegexp.FindStringSubmatch(alterStatement); len(submatch) > 0 { + if submatch := alterTableRegexp.FindStringSubmatch(this.alterStatementOptions); len(submatch) > 0 { this.explicitSchema = submatch[1] this.explicitTable = submatch[2] - alterStatement = submatch[3] + this.alterStatementOptions = submatch[3] break } } for _, alterTableRegexp := range alterTableExplicitTableRegexps { - if submatch := alterTableRegexp.FindStringSubmatch(alterStatement); len(submatch) > 0 { + if submatch := alterTableRegexp.FindStringSubmatch(this.alterStatementOptions); len(submatch) > 0 { this.explicitTable = submatch[1] - alterStatement = submatch[2] + this.alterStatementOptions = submatch[2] break } } - alterTokens, _ := this.tokenizeAlterStatement(alterStatement) + alterTokens, _ := this.tokenizeAlterStatement(this.alterStatementOptions) for _, alterToken := range alterTokens { alterToken = this.sanitizeQuotesFromAlterStatement(alterToken) this.parseAlterToken(alterToken) @@ -180,3 +182,7 @@ func (this *AlterTableParser) GetExplicitTable() string { func (this *AlterTableParser) HasExplicitTable() bool { return this.GetExplicitTable() != "" } + +func (this *AlterTableParser) GetAlterStatementOptions() string { + return this.alterStatementOptions +} From 731df3cd152656ac21781bbdcf43e397542042e6 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Thu, 23 Jul 2020 14:04:14 +0300 Subject: [PATCH 19/22] comments --- go/sql/parser.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/go/sql/parser.go b/go/sql/parser.go index f7333a77c..ebb8b3883 100644 --- a/go/sql/parser.go +++ b/go/sql/parser.go @@ -17,13 +17,19 @@ var ( dropColumnRegexp = regexp.MustCompile(`(?i)\bdrop\s+(column\s+|)([\S]+)$`) renameTableRegexp = regexp.MustCompile(`(?i)\brename\s+(to|as)\s+`) alterTableExplicitSchemaTableRegexps = []*regexp.Regexp{ + // ALTER TABLE `scm`.`tbl` something regexp.MustCompile(`(?i)\balter\s+table\s+` + "`" + `([^` + "`" + `]+)` + "`" + `[.]` + "`" + `([^` + "`" + `]+)` + "`" + `\s+(.*$)`), + // ALTER TABLE `scm`.tbl something regexp.MustCompile(`(?i)\balter\s+table\s+` + "`" + `([^` + "`" + `]+)` + "`" + `[.]([\S]+)\s+(.*$)`), + // ALTER TABLE scm.`tbl` something regexp.MustCompile(`(?i)\balter\s+table\s+([\S]+)[.]` + "`" + `([^` + "`" + `]+)` + "`" + `\s+(.*$)`), + // ALTER TABLE scm.tbl something regexp.MustCompile(`(?i)\balter\s+table\s+([\S]+)[.]([\S]+)\s+(.*$)`), } alterTableExplicitTableRegexps = []*regexp.Regexp{ + // ALTER TABLE `tbl` something regexp.MustCompile(`(?i)\balter\s+table\s+` + "`" + `([^` + "`" + `]+)` + "`" + `\s+(.*$)`), + // ALTER TABLE tbl something regexp.MustCompile(`(?i)\balter\s+table\s+([\S]+)\s+(.*$)`), } ) From 317c807f1f9ad0859086f280db9b4dfcb27ab646 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Tue, 28 Jul 2020 11:44:35 +0300 Subject: [PATCH 20/22] removed debug messages --- go/logic/migrator.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 7e17b0855..c9e1606cb 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -283,16 +283,12 @@ func (this *Migrator) processChecksumComparisons() { // Iterate the pending checksums. Some of these have been pulled from the queue just above; // others may be subsuccessful checksums from previous iterations atomic.StoreInt64(&this.migrationContext.PendingChecksumComparisons, int64(len(this.checksumComparisonMap))) - log.Debugf("-----checksum iterations") for iteration, checksumComparison := range this.checksumComparisonMap { - log.Debugf("-----checksum iteration") if err := this.applier.CompareChecksum(checksumComparison); err != nil { checksumComparison.IncrementAttempts() - log.Errorf("--------------Checksum error. Checksum=%s, err=%+v", checksumComparison.String(), err) } else { atomic.AddInt64(&this.migrationContext.SuccessfulChecksumComparisons, 1) delete(this.checksumComparisonMap, iteration) - log.Debugf("-------------Checksum match. Checksum=%s", checksumComparison.String()) } } atomic.StoreInt64(&this.migrationContext.PendingChecksumComparisons, int64(len(this.checksumComparisonMap))) From 1de2b5d8fa980bdb5c4ca651d17436aadb76c96f Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Tue, 28 Jul 2020 11:46:55 +0300 Subject: [PATCH 21/22] fix log call --- go/logic/migrator.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/go/logic/migrator.go b/go/logic/migrator.go index c9e1606cb..2a54e1dda 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -273,7 +273,7 @@ func (this *Migrator) processChecksumComparisons() { for { select { case checksumComparison := <-this.checksumComparisonQueue: - log.Debugf("new checksums!!!!!!!!! %+v", checksumComparison) + this.migrationContext.Log.Debugf("new checksums!!!!!!!!! %+v", checksumComparison) this.checksumComparisonMap[checksumComparison.Iteration] = checksumComparison default: return @@ -448,9 +448,9 @@ func (this *Migrator) Migrate() (err error) { return err } if this.migrationContext.ChecksumData { - log.Debugf("Operating until checksum comparison iteration is complete") + this.migrationContext.Log.Debugf("Operating until checksum comparison iteration is complete") this.consumeChecksumComparisonsComplete() - log.Infof("+ checksum comparison iteration compelete") + this.migrationContext.Log.Infof("+ checksum comparison iteration compelete") } this.printStatus(ForcePrintStatusRule) @@ -592,7 +592,7 @@ func (this *Migrator) waitForChecksumToClear() (err error) { select { case <-timeout.C: { - return log.Errorf("Timeout while waiting for checksums to clear. There are still checksum mismatches") + return this.migrationContext.Log.Errorf("Timeout while waiting for checksums to clear. There are still checksum mismatches") } default: { @@ -709,7 +709,7 @@ func (this *Migrator) atomicCutOver() (err error) { return this.migrationContext.Log.Errore(err) } if err := this.waitForChecksumToClear(); err != nil { - return log.Errore(err) + return this.migrationContext.Log.Errore(err) } // Step 2 // We now attempt an atomic RENAME on original & ghost tables, and expect it to block. @@ -1216,7 +1216,7 @@ func (this *Migrator) iterateChunks() error { } if this.migrationContext.ChecksumData { - log.Debugf("adding checksum") + this.migrationContext.Log.Debugf("adding checksum") atomic.AddInt64(&this.migrationContext.SubmittedChecksumComparisons, 1) this.checksumComparisonQueue <- checksumComparison } From ae4dd1867a230c2864538d3e45adbf2a2804c4ff Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 29 Jul 2020 15:06:13 +0300 Subject: [PATCH 22/22] extra unit test checks --- go/sql/parser_test.go | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/go/sql/parser_test.go b/go/sql/parser_test.go index 626fe751e..79faa630e 100644 --- a/go/sql/parser_test.go +++ b/go/sql/parser_test.go @@ -22,6 +22,7 @@ func TestParseAlterStatement(t *testing.T) { parser := NewAlterTableParser() err := parser.ParseAlterStatement(statement) test.S(t).ExpectNil(err) + test.S(t).ExpectEquals(parser.alterStatementOptions, statement) test.S(t).ExpectFalse(parser.HasNonTrivialRenames()) } @@ -30,6 +31,7 @@ func TestParseAlterStatementTrivialRename(t *testing.T) { parser := NewAlterTableParser() err := parser.ParseAlterStatement(statement) test.S(t).ExpectNil(err) + test.S(t).ExpectEquals(parser.alterStatementOptions, statement) test.S(t).ExpectFalse(parser.HasNonTrivialRenames()) test.S(t).ExpectEquals(len(parser.columnRenameMap), 1) test.S(t).ExpectEquals(parser.columnRenameMap["ts"], "ts") @@ -40,6 +42,7 @@ func TestParseAlterStatementTrivialRenames(t *testing.T) { parser := NewAlterTableParser() err := parser.ParseAlterStatement(statement) test.S(t).ExpectNil(err) + test.S(t).ExpectEquals(parser.alterStatementOptions, statement) test.S(t).ExpectFalse(parser.HasNonTrivialRenames()) test.S(t).ExpectEquals(len(parser.columnRenameMap), 2) test.S(t).ExpectEquals(parser.columnRenameMap["ts"], "ts") @@ -61,6 +64,7 @@ func TestParseAlterStatementNonTrivial(t *testing.T) { parser := NewAlterTableParser() err := parser.ParseAlterStatement(statement) test.S(t).ExpectNil(err) + test.S(t).ExpectEquals(parser.alterStatementOptions, statement) renames := parser.GetNonTrivialRenames() test.S(t).ExpectEquals(len(renames), 2) test.S(t).ExpectEquals(renames["i"], "count") @@ -136,6 +140,7 @@ func TestParseAlterStatementDroppedColumns(t *testing.T) { statement := "drop column b, drop key c_idx, drop column `d`" err := parser.ParseAlterStatement(statement) test.S(t).ExpectNil(err) + test.S(t).ExpectEquals(parser.alterStatementOptions, statement) test.S(t).ExpectEquals(len(parser.droppedColumns), 2) test.S(t).ExpectTrue(parser.droppedColumns["b"]) test.S(t).ExpectTrue(parser.droppedColumns["d"]) @@ -181,6 +186,7 @@ func TestParseAlterStatementRenameTable(t *testing.T) { statement := "drop column b, rename as something_else" err := parser.ParseAlterStatement(statement) test.S(t).ExpectNil(err) + test.S(t).ExpectEquals(parser.alterStatementOptions, statement) test.S(t).ExpectTrue(parser.isRenameTable) } { @@ -208,6 +214,7 @@ func TestParseAlterStatementExplicitTable(t *testing.T) { test.S(t).ExpectNil(err) test.S(t).ExpectEquals(parser.explicitSchema, "") test.S(t).ExpectEquals(parser.explicitTable, "") + test.S(t).ExpectEquals(parser.alterStatementOptions, "drop column b") test.S(t).ExpectTrue(reflect.DeepEqual(parser.alterTokens, []string{"drop column b"})) } { @@ -217,6 +224,7 @@ func TestParseAlterStatementExplicitTable(t *testing.T) { test.S(t).ExpectNil(err) test.S(t).ExpectEquals(parser.explicitSchema, "") test.S(t).ExpectEquals(parser.explicitTable, "tbl") + test.S(t).ExpectEquals(parser.alterStatementOptions, "drop column b") test.S(t).ExpectTrue(reflect.DeepEqual(parser.alterTokens, []string{"drop column b"})) } { @@ -226,6 +234,7 @@ func TestParseAlterStatementExplicitTable(t *testing.T) { test.S(t).ExpectNil(err) test.S(t).ExpectEquals(parser.explicitSchema, "") test.S(t).ExpectEquals(parser.explicitTable, "tbl") + test.S(t).ExpectEquals(parser.alterStatementOptions, "drop column b") test.S(t).ExpectTrue(reflect.DeepEqual(parser.alterTokens, []string{"drop column b"})) } { @@ -235,6 +244,7 @@ func TestParseAlterStatementExplicitTable(t *testing.T) { test.S(t).ExpectNil(err) test.S(t).ExpectEquals(parser.explicitSchema, "scm with spaces") test.S(t).ExpectEquals(parser.explicitTable, "tbl") + test.S(t).ExpectEquals(parser.alterStatementOptions, "drop column b") test.S(t).ExpectTrue(reflect.DeepEqual(parser.alterTokens, []string{"drop column b"})) } { @@ -244,6 +254,7 @@ func TestParseAlterStatementExplicitTable(t *testing.T) { test.S(t).ExpectNil(err) test.S(t).ExpectEquals(parser.explicitSchema, "scm") test.S(t).ExpectEquals(parser.explicitTable, "tbl with spaces") + test.S(t).ExpectEquals(parser.alterStatementOptions, "drop column b") test.S(t).ExpectTrue(reflect.DeepEqual(parser.alterTokens, []string{"drop column b"})) } { @@ -253,6 +264,7 @@ func TestParseAlterStatementExplicitTable(t *testing.T) { test.S(t).ExpectNil(err) test.S(t).ExpectEquals(parser.explicitSchema, "scm") test.S(t).ExpectEquals(parser.explicitTable, "tbl") + test.S(t).ExpectEquals(parser.alterStatementOptions, "drop column b") test.S(t).ExpectTrue(reflect.DeepEqual(parser.alterTokens, []string{"drop column b"})) } { @@ -262,6 +274,7 @@ func TestParseAlterStatementExplicitTable(t *testing.T) { test.S(t).ExpectNil(err) test.S(t).ExpectEquals(parser.explicitSchema, "scm") test.S(t).ExpectEquals(parser.explicitTable, "tbl") + test.S(t).ExpectEquals(parser.alterStatementOptions, "drop column b") test.S(t).ExpectTrue(reflect.DeepEqual(parser.alterTokens, []string{"drop column b"})) } { @@ -271,6 +284,17 @@ func TestParseAlterStatementExplicitTable(t *testing.T) { test.S(t).ExpectNil(err) test.S(t).ExpectEquals(parser.explicitSchema, "scm") test.S(t).ExpectEquals(parser.explicitTable, "tbl") + test.S(t).ExpectEquals(parser.alterStatementOptions, "drop column b") test.S(t).ExpectTrue(reflect.DeepEqual(parser.alterTokens, []string{"drop column b"})) } + { + parser := NewAlterTableParser() + statement := "alter table scm.tbl drop column b, add index idx(i)" + err := parser.ParseAlterStatement(statement) + test.S(t).ExpectNil(err) + test.S(t).ExpectEquals(parser.explicitSchema, "scm") + test.S(t).ExpectEquals(parser.explicitTable, "tbl") + test.S(t).ExpectEquals(parser.alterStatementOptions, "drop column b, add index idx(i)") + test.S(t).ExpectTrue(reflect.DeepEqual(parser.alterTokens, []string{"drop column b", "add index idx(i)"})) + } }