diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ec8978d5f..3556e1ef9 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -8,13 +8,18 @@ jobs: runs-on: ubuntu-latest steps: - - uses: actions/checkout@master + - uses: actions/checkout@v2 - - name: Set up Go 1.12 + - name: Set up Go 1.14 uses: actions/setup-go@v1 with: - version: 1.12 - id: go + go-version: 1.14 - name: Build run: script/cibuild + + - name: Upload gh-ost binary artifact + uses: actions/upload-artifact@v1 + with: + name: gh-ost + path: bin/gh-ost diff --git a/.github/workflows/replica-tests.yml b/.github/workflows/replica-tests.yml index d7751315f..31e205206 100644 --- a/.github/workflows/replica-tests.yml +++ b/.github/workflows/replica-tests.yml @@ -8,13 +8,12 @@ jobs: runs-on: ubuntu-latest steps: - - uses: actions/checkout@master + - uses: actions/checkout@v2 - - name: Set up Go 1.12 + - name: Set up Go 1.14 uses: actions/setup-go@v1 with: - version: 1.12 - id: go + go-version: 1.14 - name: migration tests run: script/cibuild-gh-ost-replica-tests diff --git a/Dockerfile.packaging b/Dockerfile.packaging index 214c70c97..74aa1903d 100644 --- a/Dockerfile.packaging +++ b/Dockerfile.packaging @@ -1,6 +1,6 @@ # -FROM golang:1.12.6 +FROM golang:1.14.4 RUN apt-get update RUN apt-get install -y ruby ruby-dev rubygems build-essential diff --git a/Dockerfile.test b/Dockerfile.test index 6abc4a053..3b3bfe541 100644 --- a/Dockerfile.test +++ b/Dockerfile.test @@ -1,4 +1,4 @@ -FROM golang:1.12.1 +FROM golang:1.14.4 LABEL maintainer="github@github.com" RUN apt-get update diff --git a/README.md b/README.md index cd6f59269..7b2bc1dee 100644 --- a/README.md +++ b/README.md @@ -94,7 +94,7 @@ Please see [Coding gh-ost](doc/coding-ghost.md) for a guide to getting started d [Download latest release here](https://github.com/github/gh-ost/releases/latest) -`gh-ost` is a Go project; it is built with Go `1.12` and above. 
To build on your own, use either: +`gh-ost` is a Go project; it is built with Go `1.14` and above. To build on your own, use either: - [script/build](https://github.com/github/gh-ost/blob/master/script/build) - this is the same build script used by CI hence the authoritative; artifact is `./bin/gh-ost` binary. - [build.sh](https://github.com/github/gh-ost/blob/master/build.sh) for building `tar.gz` artifacts in `/tmp/gh-ost` diff --git a/build.sh b/build.sh index 46db9c237..b5d465935 100755 --- a/build.sh +++ b/build.sh @@ -18,8 +18,8 @@ function build { GOOS=$3 GOARCH=$4 - if ! go version | egrep -q 'go(1\.1[234])' ; then - echo "go version must be 1.12 or above" + if ! go version | egrep -q 'go(1\.1[456])' ; then + echo "go version must be 1.14 or above" exit 1 fi diff --git a/go/base/checksum_comparison.go b/go/base/checksum_comparison.go new file mode 100644 index 000000000..adfe26c86 --- /dev/null +++ b/go/base/checksum_comparison.go @@ -0,0 +1,49 @@ +/* + Copyright 2015 Shlomi Noach, courtesy Booking.com + See https://github.com/github/gh-ost/blob/master/LICENSE +*/ + +package base + +import ( + "fmt" + + "github.com/github/gh-ost/go/sql" +) + +type ChecksumFunc func() (checksum string, err error) + +// ChecksumComparison pairs the original-table and ghost-table checksum functions for a single migration iteration, together with that iteration's range min/max values and the number of comparison attempts made so far.
+type ChecksumComparison struct { + Iteration int64 + OriginalTableChecksumFunc ChecksumFunc + GhostTableChecksumFunc ChecksumFunc + MigrationIterationRangeMinValues *sql.ColumnValues + MigrationIterationRangeMaxValues *sql.ColumnValues + Attempts int +} + +func NewChecksumComparison( + iteration int64, + originalTableChecksumFunc, ghostTableChecksumFunc ChecksumFunc, + rangeMinValues, rangeMaxValues *sql.ColumnValues, +) *ChecksumComparison { + return &ChecksumComparison{ + Iteration: iteration, + OriginalTableChecksumFunc: originalTableChecksumFunc, + GhostTableChecksumFunc: ghostTableChecksumFunc, + MigrationIterationRangeMinValues: rangeMinValues, + MigrationIterationRangeMaxValues: rangeMaxValues, + Attempts: 0, + } +} + +func (this *ChecksumComparison) IncrementAttempts() { + this.Attempts = this.Attempts + 1 +} + +func (this *ChecksumComparison) String() string { + return fmt.Sprintf("iteration: %d, range: [%s]..[%s], attempts: %d", + this.Iteration, this.MigrationIterationRangeMinValues, this.MigrationIterationRangeMaxValues, this.Attempts, + ) +} diff --git a/go/base/context.go b/go/base/context.go index 1030463e5..533717896 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -77,12 +77,14 @@ func NewThrottleCheckResult(throttle bool, reason string, reasonHint ThrottleRea type MigrationContext struct { Uuid string - DatabaseName string - OriginalTableName string - AlterStatement string + DatabaseName string + OriginalTableName string + AlterStatement string + AlterStatementOptions string // anything following the 'ALTER TABLE [schema.]table' from AlterStatement CountTableRows bool ConcurrentCountTableRows bool + ChecksumData bool AllowedRunningOnMaster bool AllowedMasterMaster bool SwitchToRowBinlogFormat bool @@ -178,6 +180,9 @@ type MigrationContext struct { pointOfInterestTimeMutex *sync.Mutex CurrentLag int64 currentProgress uint64 + PendingChecksumComparisons int64 + SuccessfulChecksumComparisons int64 + SubmittedChecksumComparisons int64 
ThrottleHTTPStatusCode int64 controlReplicasLagResult mysql.ReplicationLagResult TotalRowsCopied int64 @@ -206,6 +211,7 @@ type MigrationContext struct { GhostTableVirtualColumns *sql.ColumnList GhostTableUniqueKeys [](*sql.UniqueKey) UniqueKey *sql.UniqueKey + GhostUniqueKey *sql.UniqueKey SharedColumns *sql.ColumnList ColumnRenameMap map[string]string DroppedColumnsMap map[string]bool diff --git a/go/cmd/gh-ost/main.go b/go/cmd/gh-ost/main.go index ee0c98686..9e408cf29 100644 --- a/go/cmd/gh-ost/main.go +++ b/go/cmd/gh-ost/main.go @@ -14,6 +14,7 @@ import ( "github.com/github/gh-ost/go/base" "github.com/github/gh-ost/go/logic" + "github.com/github/gh-ost/go/sql" _ "github.com/go-sql-driver/mysql" "github.com/outbrain/golib/log" @@ -66,6 +67,7 @@ func main() { flag.StringVar(&migrationContext.AlterStatement, "alter", "", "alter statement (mandatory)") flag.BoolVar(&migrationContext.CountTableRows, "exact-rowcount", false, "actually count table rows as opposed to estimate them (results in more accurate progress estimation)") flag.BoolVar(&migrationContext.ConcurrentCountTableRows, "concurrent-rowcount", true, "(with --exact-rowcount), when true (default): count rows after row-copy begins, concurrently, and adjust row estimate later on; when false: first count rows, then start row copy") + flag.BoolVar(&migrationContext.ChecksumData, "checksum-data", false, "if true, checksum original and ghost table shared data on the fly, fail migration if checksum mismatches. Checksum queries run on applier node (the master unless testing on replica)") flag.BoolVar(&migrationContext.AllowedRunningOnMaster, "allow-on-master", false, "allow this migration to run directly on master. 
Preferably it would run on a replica") flag.BoolVar(&migrationContext.AllowedMasterMaster, "allow-master-master", false, "explicitly allow running in a master-master setup") flag.BoolVar(&migrationContext.NullableUniqueKeyAllowed, "allow-nullable-unique-key", false, "allow gh-ost to migrate based on a unique key with nullable columns. As long as no NULL values exist, this should be OK. If NULL values exist in chosen key, data may be corrupted. Use at your own risk!") @@ -172,14 +174,25 @@ func main() { migrationContext.Log.SetLevel(log.ERROR) } + if migrationContext.AlterStatement == "" { + log.Fatalf("--alter must be provided and statement must not be empty") + } + parser := sql.NewParserFromAlterStatement(migrationContext.AlterStatement) + migrationContext.AlterStatementOptions = parser.GetAlterStatementOptions() + if migrationContext.DatabaseName == "" { - migrationContext.Log.Fatalf("--database must be provided and database name must not be empty") + if parser.HasExplicitSchema() { + migrationContext.DatabaseName = parser.GetExplicitSchema() + } else { + log.Fatalf("--database must be provided and database name must not be empty, or --alter must specify database name") + } } if migrationContext.OriginalTableName == "" { - migrationContext.Log.Fatalf("--table must be provided and table name must not be empty") - } - if migrationContext.AlterStatement == "" { - migrationContext.Log.Fatalf("--alter must be provided and statement must not be empty") + if parser.HasExplicitTable() { + migrationContext.OriginalTableName = parser.GetExplicitTable() + } else { + log.Fatalf("--table must be provided and table name must not be empty, or --alter must specify table name") + } } migrationContext.Noop = !(*executeFlag) if migrationContext.AllowedRunningOnMaster && migrationContext.TestOnReplica { diff --git a/go/logic/applier.go b/go/logic/applier.go index 926023ffe..3b1d9b61f 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -21,6 +21,7 @@ import ( const ( 
atomicCutOverMagicHint = "ghost-cut-over-sentry" + groupConcatMaxLength = 1024 * 1024 ) type dmlBuildResult struct { @@ -68,7 +69,7 @@ func NewApplier(migrationContext *base.MigrationContext) *Applier { func (this *Applier) InitDBConnections() (err error) { - applierUri := this.connectionConfig.GetDBUri(this.migrationContext.DatabaseName) + applierUri := this.connectionConfig.GetDBUri(this.migrationContext.DatabaseName, fmt.Sprintf("group_concat_max_len=%d", groupConcatMaxLength)) if this.db, _, err = mysql.GetDB(this.migrationContext.Uuid, applierUri); err != nil { return err } @@ -190,7 +191,7 @@ func (this *Applier) AlterGhost() error { query := fmt.Sprintf(`alter /* gh-ost */ table %s.%s %s`, sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.GetGhostTableName()), - this.migrationContext.AlterStatement, + this.migrationContext.AlterStatementOptions, ) this.migrationContext.Log.Infof("Altering ghost table %s.%s", sql.EscapeName(this.migrationContext.DatabaseName), @@ -454,25 +455,25 @@ func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange boo // ApplyIterationInsertQuery issues a chunk-INSERT query on the ghost table. It is where // data actually gets copied from original table. 
-func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected int64, duration time.Duration, err error) { +func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected int64, duration time.Duration, checksumComparison *base.ChecksumComparison, err error) { startTime := time.Now() chunkSize = atomic.LoadInt64(&this.migrationContext.ChunkSize) - query, explodedArgs, err := sql.BuildRangeInsertPreparedQuery( + insertQuery, originalChecksumQuery, ghostChecksumQuery, explodedArgs, err := sql.BuildRangeInsertPreparedQuery( this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName, this.migrationContext.GetGhostTableName(), this.migrationContext.SharedColumns.Names(), this.migrationContext.MappedSharedColumns.Names(), - this.migrationContext.UniqueKey.Name, - &this.migrationContext.UniqueKey.Columns, + this.migrationContext.UniqueKey, + this.migrationContext.GhostUniqueKey, this.migrationContext.MigrationIterationRangeMinValues.AbstractValues(), this.migrationContext.MigrationIterationRangeMaxValues.AbstractValues(), this.migrationContext.GetIteration() == 0, this.migrationContext.IsTransactionalTable(), ) if err != nil { - return chunkSize, rowsAffected, duration, err + return chunkSize, rowsAffected, duration, checksumComparison, err } sqlResult, err := func() (gosql.Result, error) { @@ -491,7 +492,7 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected if _, err := tx.Exec(sessionQuery); err != nil { return nil, err } - result, err := tx.Exec(query, explodedArgs...) + result, err := tx.Exec(insertQuery, explodedArgs...) 
if err != nil { return nil, err } @@ -502,7 +503,7 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected }() if err != nil { - return chunkSize, rowsAffected, duration, err + return chunkSize, rowsAffected, duration, checksumComparison, err } rowsAffected, _ = sqlResult.RowsAffected() duration = time.Since(startTime) @@ -512,7 +513,38 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected this.migrationContext.MigrationIterationRangeMaxValues, this.migrationContext.GetIteration(), chunkSize) - return chunkSize, rowsAffected, duration, nil + + var originalTableChecksumFunc base.ChecksumFunc = func() (checksum string, err error) { + err = this.db.QueryRow(originalChecksumQuery, explodedArgs...).Scan(&checksum) + return checksum, err + } + var ghostTableChecksumFunc base.ChecksumFunc = func() (checksum string, err error) { + err = this.db.QueryRow(ghostChecksumQuery, explodedArgs...).Scan(&checksum) + return checksum, err + } + checksumComparison = base.NewChecksumComparison( + this.migrationContext.GetIteration(), + originalTableChecksumFunc, ghostTableChecksumFunc, + this.migrationContext.MigrationIterationRangeMinValues, + this.migrationContext.MigrationIterationRangeMaxValues, + ) + + return chunkSize, rowsAffected, duration, checksumComparison, nil +} + +func (this *Applier) CompareChecksum(checksumComparison *base.ChecksumComparison) error { + originalChecksum, err := checksumComparison.OriginalTableChecksumFunc() + if err != nil { + return err + } + ghostChecksum, err := checksumComparison.GhostTableChecksumFunc() + if err != nil { + return err + } + if originalChecksum != ghostChecksum { + return fmt.Errorf("Checksum failure. 
Iteration: %d", checksumComparison.Iteration) + } + return nil } // LockOriginalTable places a write lock on the original table @@ -811,6 +843,10 @@ func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocke } tableLockTimeoutSeconds := this.migrationContext.CutOverLockTimeoutSeconds * 2 + if this.migrationContext.ChecksumData { + // Allow extra time for checksum to evaluate + tableLockTimeoutSeconds += this.migrationContext.CutOverLockTimeoutSeconds + } this.migrationContext.Log.Infof("Setting LOCK timeout as %d seconds", tableLockTimeoutSeconds) query = fmt.Sprintf(`set session lock_wait_timeout:=%d`, tableLockTimeoutSeconds) if _, err := tx.Exec(query); err != nil { diff --git a/go/logic/inspect.go b/go/logic/inspect.go index bc1083061..e7beeff1e 100644 --- a/go/logic/inspect.go +++ b/go/logic/inspect.go @@ -125,7 +125,7 @@ func (this *Inspector) inspectOriginalAndGhostTables() (err error) { if err != nil { return err } - sharedUniqueKeys, err := this.getSharedUniqueKeys(this.migrationContext.OriginalTableUniqueKeys, this.migrationContext.GhostTableUniqueKeys) + sharedUniqueKeys, ghostSharedUniqueKeys, err := this.getSharedUniqueKeys(this.migrationContext.OriginalTableUniqueKeys, this.migrationContext.GhostTableUniqueKeys) if err != nil { return err } @@ -150,13 +150,15 @@ func (this *Inspector) inspectOriginalAndGhostTables() (err error) { } if uniqueKeyIsValid { this.migrationContext.UniqueKey = sharedUniqueKeys[i] + this.migrationContext.GhostUniqueKey = ghostSharedUniqueKeys[i] break } } if this.migrationContext.UniqueKey == nil { return fmt.Errorf("No shared unique key can be found after ALTER! Bailing out") } - this.migrationContext.Log.Infof("Chosen shared unique key is %s", this.migrationContext.UniqueKey.Name) + this.migrationContext.Log.Infof("Chosen shared unique key is %s. 
ghost unique key is %s", this.migrationContext.UniqueKey.Name, this.migrationContext.GhostUniqueKey.Name) + this.migrationContext.Log.Infof("Chosen shared unique key columns %+v. ghost unique key columns: %+v", this.migrationContext.UniqueKey.Columns.Names(), this.migrationContext.GhostUniqueKey.Columns.Names()) if this.migrationContext.UniqueKey.HasNullable { if this.migrationContext.NullableUniqueKeyAllowed { this.migrationContext.Log.Warningf("Chosen key (%s) has nullable columns. You have supplied with --allow-nullable-unique-key and so this migration proceeds. As long as there aren't NULL values in this key's column, migration should be fine. NULL values will corrupt migration's data", this.migrationContext.UniqueKey) @@ -668,17 +670,18 @@ func (this *Inspector) getCandidateUniqueKeys(tableName string) (uniqueKeys [](* // getSharedUniqueKeys returns the intersection of two given unique keys, // testing by list of columns -func (this *Inspector) getSharedUniqueKeys(originalUniqueKeys, ghostUniqueKeys [](*sql.UniqueKey)) (uniqueKeys [](*sql.UniqueKey), err error) { +func (this *Inspector) getSharedUniqueKeys(originalUniqueKeys, ghostUniqueKeys [](*sql.UniqueKey)) (sharedUniqueKeys [](*sql.UniqueKey), ghostSharedUniqueKeys [](*sql.UniqueKey), err error) { // We actually do NOT rely on key name, just on the set of columns. This is because maybe // the ALTER is on the name itself... 
for _, originalUniqueKey := range originalUniqueKeys { for _, ghostUniqueKey := range ghostUniqueKeys { if originalUniqueKey.Columns.EqualsByNames(&ghostUniqueKey.Columns) { - uniqueKeys = append(uniqueKeys, originalUniqueKey) + sharedUniqueKeys = append(sharedUniqueKeys, originalUniqueKey) + ghostSharedUniqueKeys = append(ghostSharedUniqueKeys, ghostUniqueKey) } } } - return uniqueKeys, nil + return sharedUniqueKeys, ghostSharedUniqueKeys, nil } // getSharedColumns returns the intersection of two lists of columns in same order as the first list diff --git a/go/logic/migrator.go b/go/logic/migrator.go index bb5407a8a..943ffb034 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -11,6 +11,7 @@ import ( "math" "os" "strings" + "sync" "sync/atomic" "time" @@ -23,8 +24,9 @@ import ( type ChangelogState string const ( - GhostTableMigrated ChangelogState = "GhostTableMigrated" - AllEventsUpToLockProcessed = "AllEventsUpToLockProcessed" + GhostTableMigrated ChangelogState = "GhostTableMigrated" + AllEventsUpToLockProcessed = "AllEventsUpToLockProcessed" + checksumComparisonQueueBuffer = 10 ) func ReadChangelogState(s string) ChangelogState { @@ -60,7 +62,7 @@ const ( // Migrator is the main schema migration flow manager. 
type Migrator struct { - parser *sql.Parser + parser *sql.AlterTableParser inspector *Inspector applier *Applier eventsStreamer *EventsStreamer @@ -80,6 +82,10 @@ type Migrator struct { copyRowsQueue chan tableWriteFunc applyEventsQueue chan *applyEventStruct + rowChecksumCompleteQueue chan bool + checksumComparisonQueue chan *base.ChecksumComparison + checksumComparisonMap map[int64]*base.ChecksumComparison + handledChangelogStates map[string]bool finishedMigrating int64 @@ -88,14 +94,19 @@ type Migrator struct { func NewMigrator(context *base.MigrationContext) *Migrator { migrator := &Migrator{ migrationContext: context, - parser: sql.NewParser(), + parser: sql.NewAlterTableParser(), ghostTableMigrated: make(chan bool), firstThrottlingCollected: make(chan bool, 3), rowCopyComplete: make(chan error), allEventsUpToLockProcessed: make(chan string), - copyRowsQueue: make(chan tableWriteFunc), - applyEventsQueue: make(chan *applyEventStruct, base.MaxEventsBatchSize), + copyRowsQueue: make(chan tableWriteFunc), + applyEventsQueue: make(chan *applyEventStruct, base.MaxEventsBatchSize), + + rowChecksumCompleteQueue: make(chan bool), + checksumComparisonQueue: make(chan *base.ChecksumComparison, checksumComparisonQueueBuffer), + checksumComparisonMap: make(map[int64]*base.ChecksumComparison), + handledChangelogStates: make(map[string]bool), finishedMigrating: 0, } @@ -202,6 +213,10 @@ func (this *Migrator) consumeRowCopyComplete() { }() } +func (this *Migrator) consumeChecksumComparisonsComplete() { + <-this.rowChecksumCompleteQueue +} + func (this *Migrator) canStopStreaming() bool { return atomic.LoadInt64(&this.migrationContext.CutOverCompleteFlag) != 0 } @@ -250,6 +265,40 @@ func (this *Migrator) listenOnPanicAbort() { this.migrationContext.Log.Fatale(err) } +func (this *Migrator) processChecksumComparisons() { + var completeOnce sync.Once + for { + func() { + // Avoid blocking. 
Only pull from the queue the available events + for { + select { + case checksumComparison := <-this.checksumComparisonQueue: + this.migrationContext.Log.Debugf("new checksums!!!!!!!!! %+v", checksumComparison) + this.checksumComparisonMap[checksumComparison.Iteration] = checksumComparison + default: + return + } + } + }() + // Iterate the pending checksums. Some of these have been pulled from the queue just above; + // others may be unsuccessful checksums from previous iterations + atomic.StoreInt64(&this.migrationContext.PendingChecksumComparisons, int64(len(this.checksumComparisonMap))) + for iteration, checksumComparison := range this.checksumComparisonMap { + if err := this.applier.CompareChecksum(checksumComparison); err != nil { + checksumComparison.IncrementAttempts() + } else { + atomic.AddInt64(&this.migrationContext.SuccessfulChecksumComparisons, 1) + delete(this.checksumComparisonMap, iteration) + } + } + atomic.StoreInt64(&this.migrationContext.PendingChecksumComparisons, int64(len(this.checksumComparisonMap))) + if atomic.LoadInt64(&this.rowCopyCompleteFlag) == 1 { + go completeOnce.Do(func() { this.rowChecksumCompleteQueue <- true }) + } + time.Sleep(250 * time.Millisecond) + } +} + // validateStatement validates the `alter` statement meets criteria. 
// At this time this means: // - column renames are approved @@ -388,6 +437,7 @@ func (this *Migrator) Migrate() (err error) { } go this.executeWriteFuncs() go this.iterateChunks() + go this.processChecksumComparisons() this.migrationContext.MarkRowCopyStartTime() go this.initiateStatus() @@ -397,6 +447,11 @@ func (this *Migrator) Migrate() (err error) { if err := this.hooksExecutor.onRowCopyComplete(); err != nil { return err } + if this.migrationContext.ChecksumData { + this.migrationContext.Log.Debugf("Operating until checksum comparison iteration is complete") + this.consumeChecksumComparisonsComplete() + this.migrationContext.Log.Infof("+ checksum comparison iteration compelete") + } this.printStatus(ForcePrintStatusRule) if err := this.hooksExecutor.onBeforeCutOver(); err != nil { @@ -531,6 +586,25 @@ func (this *Migrator) cutOver() (err error) { return this.migrationContext.Log.Fatalf("Unknown cut-over type: %d; should never get here!", this.migrationContext.CutOverType) } +func (this *Migrator) waitForChecksumToClear() (err error) { + timeout := time.NewTimer(time.Second * time.Duration(this.migrationContext.CutOverLockTimeoutSeconds)) + for { + select { + case <-timeout.C: + { + return this.migrationContext.Log.Errorf("Timeout while waiting for checksums to clear. There are still checksum mismatches") + } + default: + { + if len(this.checksumComparisonMap) == 0 { + return nil + } + time.Sleep(250 * time.Millisecond) + } + } + } +} + // Inject the "AllEventsUpToLockProcessed" state hint, wait for it to appear in the binary logs, // make sure the queue is drained. 
func (this *Migrator) waitForEventsUpToLock() (err error) { @@ -587,6 +661,9 @@ func (this *Migrator) cutOverTwoStep() (err error) { if err := this.retryOperation(this.waitForEventsUpToLock); err != nil { return err } + if err := this.retryOperation(this.waitForChecksumToClear); err != nil { + return err + } if err := this.retryOperation(this.applier.SwapTablesQuickAndBumpy); err != nil { return err } @@ -631,7 +708,9 @@ func (this *Migrator) atomicCutOver() (err error) { if err := this.waitForEventsUpToLock(); err != nil { return this.migrationContext.Log.Errore(err) } - + if err := this.waitForChecksumToClear(); err != nil { + return this.migrationContext.Log.Errore(err) + } // Step 2 // We now attempt an atomic RENAME on original & ghost tables, and expect it to block. this.migrationContext.RenameTablesStartTime = time.Now() @@ -958,10 +1037,11 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) { currentBinlogCoordinates := *this.eventsStreamer.GetCurrentBinlogCoordinates() - status := fmt.Sprintf("Copy: %d/%d %.1f%%; Applied: %d; Backlog: %d/%d; Time: %+v(total), %+v(copy); streamer: %+v; Lag: %.2fs, State: %s; ETA: %s", + status := fmt.Sprintf("Copy: %d/%d %.1f%%; Applied: %d; Backlog: %d/%d; Checksums: %d/%d,%d, Time: %+v(total), %+v(copy); streamer: %+v; Lag: %.2fs, State: %s; ETA: %s", totalRowsCopied, rowsEstimate, progressPct, atomic.LoadInt64(&this.migrationContext.TotalDMLEventsApplied), len(this.applyEventsQueue), cap(this.applyEventsQueue), + atomic.LoadInt64(&this.migrationContext.SuccessfulChecksumComparisons), atomic.LoadInt64(&this.migrationContext.SubmittedChecksumComparisons), atomic.LoadInt64(&this.migrationContext.PendingChecksumComparisons), base.PrettifyDurationOutput(elapsedTime), base.PrettifyDurationOutput(this.migrationContext.ElapsedRowCopyTime()), currentBinlogCoordinates, this.migrationContext.GetCurrentLagDuration().Seconds(), @@ -1130,10 +1210,16 @@ func (this *Migrator) iterateChunks() error { // 
_ghost_ table, which no longer exists. So, bothering error messages and all, but no damage. return nil } - _, rowsAffected, _, err := this.applier.ApplyIterationInsertQuery() + _, rowsAffected, _, checksumComparison, err := this.applier.ApplyIterationInsertQuery() if err != nil { return err // wrapping call will retry } + + if this.migrationContext.ChecksumData { + this.migrationContext.Log.Debugf("adding checksum") + atomic.AddInt64(&this.migrationContext.SubmittedChecksumComparisons, 1) + this.checksumComparisonQueue <- checksumComparison + } atomic.AddInt64(&this.migrationContext.TotalRowsCopied, rowsAffected) atomic.AddInt64(&this.migrationContext.Iteration, 1) return nil diff --git a/go/mysql/connection.go b/go/mysql/connection.go index 654998cc2..bd809963e 100644 --- a/go/mysql/connection.go +++ b/go/mysql/connection.go @@ -12,6 +12,7 @@ import ( "fmt" "io/ioutil" "net" + "strings" "github.com/go-sql-driver/mysql" ) @@ -102,7 +103,7 @@ func (this *ConnectionConfig) TLSConfig() *tls.Config { return this.tlsConfig } -func (this *ConnectionConfig) GetDBUri(databaseName string) string { +func (this *ConnectionConfig) GetDBUri(databaseName string, extraOptions ...string) string { hostname := this.Key.Hostname var ip = net.ParseIP(hostname) if (ip != nil) && (ip.To4() == nil) { @@ -116,5 +117,11 @@ func (this *ConnectionConfig) GetDBUri(databaseName string) string { if this.tlsConfig != nil { tlsOption = TLS_CONFIG_KEY } - return fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?interpolateParams=%t&autocommit=true&charset=utf8mb4,utf8,latin1&tls=%s", this.User, this.Password, hostname, this.Key.Port, databaseName, interpolateParams, tlsOption) + extraOptionsParams := "" + if len(extraOptions) > 0 { + extraOptionsParams = fmt.Sprintf("&%s", strings.Join(extraOptions, "&")) + } + return fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?interpolateParams=%t&autocommit=true&charset=utf8mb4,utf8,latin1&tls=%s%s", + this.User, this.Password, hostname, this.Key.Port, databaseName, interpolateParams, 
tlsOption, extraOptionsParams, + ) } diff --git a/go/sql/builder.go b/go/sql/builder.go index 2c5a7ae28..d4a05d954 100644 --- a/go/sql/builder.go +++ b/go/sql/builder.go @@ -178,60 +178,135 @@ func BuildRangePreparedComparison(columns *ColumnList, args []interface{}, compa return BuildRangeComparison(columns.Names(), values, args, comparisonSign) } -func BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName string, sharedColumns []string, mappedSharedColumns []string, uniqueKey string, uniqueKeyColumns *ColumnList, rangeStartValues, rangeEndValues []string, rangeStartArgs, rangeEndArgs []interface{}, includeRangeStartValues bool, transactionalTable bool) (result string, explodedArgs []interface{}, err error) { +func BuildRangeInsertQuery( + databaseName, originalTableName, ghostTableName string, + sharedColumns []string, mappedSharedColumns []string, + uniqueKey *UniqueKey, + ghostUniqueKey *UniqueKey, + rangeStartValues, rangeEndValues []string, + rangeStartArgs, rangeEndArgs []interface{}, + includeRangeStartValues bool, transactionalTable bool, +) ( + insertQuery, originalChecksumQuery, ghostChecksumQuery string, explodedArgs []interface{}, err error, +) { if len(sharedColumns) == 0 { - return "", explodedArgs, fmt.Errorf("Got 0 shared columns in BuildRangeInsertQuery") + return "", "", "", explodedArgs, fmt.Errorf("Got 0 shared columns in BuildRangeInsertQuery") } databaseName = EscapeName(databaseName) originalTableName = EscapeName(originalTableName) ghostTableName = EscapeName(ghostTableName) - mappedSharedColumns = duplicateNames(mappedSharedColumns) - for i := range mappedSharedColumns { - mappedSharedColumns[i] = EscapeName(mappedSharedColumns[i]) - } - mappedSharedColumnsListing := strings.Join(mappedSharedColumns, ", ") - sharedColumns = duplicateNames(sharedColumns) for i := range sharedColumns { sharedColumns[i] = EscapeName(sharedColumns[i]) } sharedColumnsListing := strings.Join(sharedColumns, ", ") - uniqueKey = 
EscapeName(uniqueKey) + mappedSharedColumns = duplicateNames(mappedSharedColumns) + for i := range mappedSharedColumns { + mappedSharedColumns[i] = EscapeName(mappedSharedColumns[i]) + } + mappedSharedColumnsListing := strings.Join(mappedSharedColumns, ", ") + + uniqueKeyName := EscapeName(uniqueKey.Name) + ghostUniqueKeyName := EscapeName(ghostUniqueKey.Name) var minRangeComparisonSign ValueComparisonSign = GreaterThanComparisonSign if includeRangeStartValues { minRangeComparisonSign = GreaterThanOrEqualsComparisonSign } - rangeStartComparison, rangeExplodedArgs, err := BuildRangeComparison(uniqueKeyColumns.Names(), rangeStartValues, rangeStartArgs, minRangeComparisonSign) + rangeStartComparison, rangeExplodedArgs, err := BuildRangeComparison(uniqueKey.Columns.Names(), rangeStartValues, rangeStartArgs, minRangeComparisonSign) if err != nil { - return "", explodedArgs, err + return "", "", "", explodedArgs, err } explodedArgs = append(explodedArgs, rangeExplodedArgs...) - rangeEndComparison, rangeExplodedArgs, err := BuildRangeComparison(uniqueKeyColumns.Names(), rangeEndValues, rangeEndArgs, LessThanOrEqualsComparisonSign) + rangeEndComparison, rangeExplodedArgs, err := BuildRangeComparison(uniqueKey.Columns.Names(), rangeEndValues, rangeEndArgs, LessThanOrEqualsComparisonSign) if err != nil { - return "", explodedArgs, err + return "", "", "", explodedArgs, err } explodedArgs = append(explodedArgs, rangeExplodedArgs...) 
transactionalClause := "" if transactionalTable { transactionalClause = "lock in share mode" } - result = fmt.Sprintf(` + insertQuery = fmt.Sprintf(` insert /* gh-ost %s.%s */ ignore into %s.%s (%s) (select %s from %s.%s force index (%s) where (%s and %s) %s ) `, databaseName, originalTableName, databaseName, ghostTableName, mappedSharedColumnsListing, - sharedColumnsListing, databaseName, originalTableName, uniqueKey, + sharedColumnsListing, databaseName, originalTableName, uniqueKeyName, rangeStartComparison, rangeEndComparison, transactionalClause) - return result, explodedArgs, nil + + // Now for checksum comparisons + + sharedColumns = duplicateNames(sharedColumns) // already escaped + for i := range sharedColumns { + sharedColumns[i] = fmt.Sprintf(`IFNULL(%s, 'NULL')`, sharedColumns[i]) + } + sharedColumnsListing = strings.Join(sharedColumns, ", ") + + mappedSharedColumns = duplicateNames(mappedSharedColumns) + for i := range mappedSharedColumns { + mappedSharedColumns[i] = fmt.Sprintf(`IFNULL(%s, 'NULL')`, mappedSharedColumns[i]) + } + mappedSharedColumnsListing = strings.Join(mappedSharedColumns, ", ") + + // escape unique key columns for comparison queries + uniqueKeyColumnNames := duplicateNames(uniqueKey.Columns.Names()) + for i := range uniqueKeyColumnNames { + uniqueKeyColumnNames[i] = EscapeName(uniqueKeyColumnNames[i]) + } + uniqueKeyColumnsListing := strings.Join(uniqueKeyColumnNames, ", ") + + originalChecksumQuery = fmt.Sprintf(` + select /* gh-ost checksum %s.%s */ + sha2( + group_concat( + sha2(concat_ws(',', %s), 256) order by %s + ), 256 + ) + from %s.%s force index (%s) + where (%s and %s) + `, databaseName, originalTableName, + sharedColumnsListing, uniqueKeyColumnsListing, + databaseName, originalTableName, uniqueKeyName, + rangeStartComparison, rangeEndComparison) + + ghostUniqueKeyColumnNames := duplicateNames(ghostUniqueKey.Columns.Names()) + for i := range ghostUniqueKeyColumnNames { + ghostUniqueKeyColumnNames[i] = 
EscapeName(ghostUniqueKeyColumnNames[i]) + } + ghostUniqueKeyColumnsListing := strings.Join(ghostUniqueKeyColumnNames, ", ") + + ghostChecksumQuery = fmt.Sprintf(` + select /* gh-ost checksum %s.%s */ + sha2( + group_concat( + sha2(concat_ws(',', %s), 256) order by %s + ), 256 + ) + from %s.%s force index (%s) + where (%s and %s) + `, databaseName, ghostTableName, + mappedSharedColumnsListing, ghostUniqueKeyColumnsListing, + databaseName, ghostTableName, ghostUniqueKeyName, + rangeStartComparison, rangeEndComparison) + return insertQuery, originalChecksumQuery, ghostChecksumQuery, explodedArgs, nil } -func BuildRangeInsertPreparedQuery(databaseName, originalTableName, ghostTableName string, sharedColumns []string, mappedSharedColumns []string, uniqueKey string, uniqueKeyColumns *ColumnList, rangeStartArgs, rangeEndArgs []interface{}, includeRangeStartValues bool, transactionalTable bool) (result string, explodedArgs []interface{}, err error) { - rangeStartValues := buildColumnsPreparedValues(uniqueKeyColumns) - rangeEndValues := buildColumnsPreparedValues(uniqueKeyColumns) - return BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, includeRangeStartValues, transactionalTable) +func BuildRangeInsertPreparedQuery( + databaseName, originalTableName, ghostTableName string, + sharedColumns []string, mappedSharedColumns []string, + uniqueKey *UniqueKey, + ghostUniqueKey *UniqueKey, + rangeStartArgs, rangeEndArgs []interface{}, + includeRangeStartValues bool, transactionalTable bool, +) ( + insertQuery, originalChecksumQuery, ghostChecksumQuery string, explodedArgs []interface{}, err error, +) { + rangeStartValues := buildColumnsPreparedValues(&uniqueKey.Columns) + rangeEndValues := buildColumnsPreparedValues(&uniqueKey.Columns) + return BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, 
mappedSharedColumns, uniqueKey, ghostUniqueKey, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, includeRangeStartValues, transactionalTable) } func BuildUniqueKeyRangeEndPreparedQueryViaOffset(databaseName, tableName string, uniqueKeyColumns *ColumnList, rangeStartArgs, rangeEndArgs []interface{}, chunkSize int64, includeRangeStartValues bool, hint string) (result string, explodedArgs []interface{}, err error) { diff --git a/go/sql/builder_test.go b/go/sql/builder_test.go index a178c4ccb..60c94ddb6 100644 --- a/go/sql/builder_test.go +++ b/go/sql/builder_test.go @@ -165,14 +165,21 @@ func TestBuildRangeInsertQuery(t *testing.T) { ghostTableName := "ghost" sharedColumns := []string{"id", "name", "position"} { - uniqueKey := "PRIMARY" - uniqueKeyColumns := NewColumnList([]string{"id"}) + uniqueKey := &UniqueKey{ + Name: "PRIMARY", + Columns: *NewColumnList([]string{"id"}), + } + ghostUniqueKey := &UniqueKey{ + Name: "PRIMARY", + Columns: *NewColumnList([]string{"id"}), + } rangeStartValues := []string{"@v1s"} rangeEndValues := []string{"@v1e"} rangeStartArgs := []interface{}{3} rangeEndArgs := []interface{}{103} - query, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, false) + insertQuery, originalChecksumQuery, ghostChecksumQuery, explodedArgs, err := BuildRangeInsertQuery( + databaseName, originalTableName, ghostTableName, sharedColumns, sharedColumns, uniqueKey, ghostUniqueKey, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, false) test.S(t).ExpectNil(err) expected := ` insert /* gh-ost mydb.tbl */ ignore into mydb.ghost (id, name, position) @@ -180,18 +187,49 @@ func TestBuildRangeInsertQuery(t *testing.T) { where (((id > @v1s) or ((id = @v1s))) and ((id < @v1e) or ((id = @v1e)))) ) ` - test.S(t).ExpectEquals(normalizeQuery(query), 
normalizeQuery(expected)) + test.S(t).ExpectEquals(normalizeQuery(insertQuery), normalizeQuery(expected)) + + expectedOriginalChecksumQuery := ` + select /* gh-ost checksum mydb.tbl */ + sha2( + group_concat( + sha2(concat_ws(',', IFNULL(id, 'NULL'), IFNULL(name, 'NULL'), IFNULL(position, 'NULL')), 256) order by id + ), 256 + ) + from mydb.tbl force index (PRIMARY) + where (((id > @v1s) or ((id = @v1s))) and ((id < @v1e) or ((id = @v1e)))) + ` + test.S(t).ExpectEquals(normalizeQuery(originalChecksumQuery), normalizeQuery(expectedOriginalChecksumQuery)) + + expectedGhostChecksumQuery := ` + select /* gh-ost checksum mydb.ghost */ + sha2( + group_concat( + sha2(concat_ws(',', IFNULL(id, 'NULL'), IFNULL(name, 'NULL'), IFNULL(position, 'NULL')), 256) order by id + ), 256 + ) + from mydb.ghost force index (PRIMARY) + where (((id > @v1s) or ((id = @v1s))) and ((id < @v1e) or ((id = @v1e)))) + ` + test.S(t).ExpectEquals(normalizeQuery(ghostChecksumQuery), normalizeQuery(expectedGhostChecksumQuery)) test.S(t).ExpectTrue(reflect.DeepEqual(explodedArgs, []interface{}{3, 3, 103, 103})) } { - uniqueKey := "name_position_uidx" - uniqueKeyColumns := NewColumnList([]string{"name", "position"}) + uniqueKey := &UniqueKey{ + Name: "name_position_uidx", + Columns: *NewColumnList([]string{"name", "position"}), + } + ghostUniqueKey := &UniqueKey{ + Name: "name_position_uidx", + Columns: *NewColumnList([]string{"name", "position"}), + } rangeStartValues := []string{"@v1s", "@v2s"} rangeEndValues := []string{"@v1e", "@v2e"} rangeStartArgs := []interface{}{3, 17} rangeEndArgs := []interface{}{103, 117} - query, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, false) + insertQuery, originalChecksumQuery, ghostChecksumQuery, explodedArgs, err := BuildRangeInsertQuery( + databaseName, originalTableName, ghostTableName, 
sharedColumns, sharedColumns, uniqueKey, ghostUniqueKey, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, false) test.S(t).ExpectNil(err) expected := ` insert /* gh-ost mydb.tbl */ ignore into mydb.ghost (id, name, position) @@ -199,7 +237,32 @@ func TestBuildRangeInsertQuery(t *testing.T) { where (((name > @v1s) or (((name = @v1s)) AND (position > @v2s)) or ((name = @v1s) and (position = @v2s))) and ((name < @v1e) or (((name = @v1e)) AND (position < @v2e)) or ((name = @v1e) and (position = @v2e)))) ) ` - test.S(t).ExpectEquals(normalizeQuery(query), normalizeQuery(expected)) + test.S(t).ExpectEquals(normalizeQuery(insertQuery), normalizeQuery(expected)) + + expectedOriginalChecksumQuery := ` + select /* gh-ost checksum mydb.tbl */ + sha2( + group_concat( + sha2(concat_ws(',', IFNULL(id, 'NULL'), IFNULL(name, 'NULL'), IFNULL(position, 'NULL')), 256) order by name, position + ), 256 + ) + from mydb.tbl force index (name_position_uidx) + where (((name > @v1s) or (((name = @v1s)) AND (position > @v2s)) or ((name = @v1s) and (position = @v2s))) and ((name < @v1e) or (((name = @v1e)) AND (position < @v2e)) or ((name = @v1e) and (position = @v2e)))) + ` + test.S(t).ExpectEquals(normalizeQuery(originalChecksumQuery), normalizeQuery(expectedOriginalChecksumQuery)) + + expectedGhostChecksumQuery := ` + select /* gh-ost checksum mydb.ghost */ + sha2( + group_concat( + sha2(concat_ws(',', IFNULL(id, 'NULL'), IFNULL(name, 'NULL'), IFNULL(position, 'NULL')), 256) order by name, position + ), 256 + ) + from mydb.ghost force index (name_position_uidx) + where (((name > @v1s) or (((name = @v1s)) AND (position > @v2s)) or ((name = @v1s) and (position = @v2s))) and ((name < @v1e) or (((name = @v1e)) AND (position < @v2e)) or ((name = @v1e) and (position = @v2e)))) + ` + test.S(t).ExpectEquals(normalizeQuery(ghostChecksumQuery), normalizeQuery(expectedGhostChecksumQuery)) + test.S(t).ExpectTrue(reflect.DeepEqual(explodedArgs, []interface{}{3, 3, 17, 3, 17, 103, 
103, 117, 103, 117})) } } @@ -211,14 +274,20 @@ func TestBuildRangeInsertQueryRenameMap(t *testing.T) { sharedColumns := []string{"id", "name", "position"} mappedSharedColumns := []string{"id", "name", "location"} { - uniqueKey := "PRIMARY" - uniqueKeyColumns := NewColumnList([]string{"id"}) + uniqueKey := &UniqueKey{ + Name: "PRIMARY", + Columns: *NewColumnList([]string{"id"}), + } + ghostUniqueKey := &UniqueKey{ + Name: "PRIMARY", + Columns: *NewColumnList([]string{"id"}), + } rangeStartValues := []string{"@v1s"} rangeEndValues := []string{"@v1e"} rangeStartArgs := []interface{}{3} rangeEndArgs := []interface{}{103} - query, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, false) + insertQuery, originalChecksumQuery, ghostChecksumQuery, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, ghostUniqueKey, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, false) test.S(t).ExpectNil(err) expected := ` insert /* gh-ost mydb.tbl */ ignore into mydb.ghost (id, name, location) @@ -226,18 +295,50 @@ func TestBuildRangeInsertQueryRenameMap(t *testing.T) { where (((id > @v1s) or ((id = @v1s))) and ((id < @v1e) or ((id = @v1e)))) ) ` - test.S(t).ExpectEquals(normalizeQuery(query), normalizeQuery(expected)) + test.S(t).ExpectEquals(normalizeQuery(insertQuery), normalizeQuery(expected)) + + expectedOriginalChecksumQuery := ` + select /* gh-ost checksum mydb.tbl */ + sha2( + group_concat( + sha2(concat_ws(',', IFNULL(id, 'NULL'), IFNULL(name, 'NULL'), IFNULL(position, 'NULL')), 256) order by id + ), 256 + ) + from mydb.tbl force index (PRIMARY) + where (((id > @v1s) or ((id = @v1s))) and ((id < @v1e) or ((id = @v1e)))) + ` + test.S(t).ExpectEquals(normalizeQuery(originalChecksumQuery), 
normalizeQuery(expectedOriginalChecksumQuery)) + + expectedGhostChecksumQuery := ` + select /* gh-ost checksum mydb.ghost */ + sha2( + group_concat( + sha2(concat_ws(',', IFNULL(id, 'NULL'), IFNULL(name, 'NULL'), IFNULL(location, 'NULL')), 256) order by id + ), 256 + ) + from mydb.ghost force index (PRIMARY) + where (((id > @v1s) or ((id = @v1s))) and ((id < @v1e) or ((id = @v1e)))) + ` + test.S(t).ExpectEquals(normalizeQuery(ghostChecksumQuery), normalizeQuery(expectedGhostChecksumQuery)) + test.S(t).ExpectTrue(reflect.DeepEqual(explodedArgs, []interface{}{3, 3, 103, 103})) } { - uniqueKey := "name_position_uidx" - uniqueKeyColumns := NewColumnList([]string{"name", "position"}) + uniqueKey := &UniqueKey{ + Name: "name_position_uidx", + Columns: *NewColumnList([]string{"name", "position"}), + } + ghostUniqueKey := &UniqueKey{ + Name: "name_position_uidx", + Columns: *NewColumnList([]string{"name", "location"}), + } rangeStartValues := []string{"@v1s", "@v2s"} rangeEndValues := []string{"@v1e", "@v2e"} rangeStartArgs := []interface{}{3, 17} rangeEndArgs := []interface{}{103, 117} - query, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, false) + insertQuery, originalChecksumQuery, ghostChecksumQuery, explodedArgs, err := BuildRangeInsertQuery( + databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, ghostUniqueKey, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, false) test.S(t).ExpectNil(err) expected := ` insert /* gh-ost mydb.tbl */ ignore into mydb.ghost (id, name, location) @@ -245,7 +346,32 @@ func TestBuildRangeInsertQueryRenameMap(t *testing.T) { where (((name > @v1s) or (((name = @v1s)) AND (position > @v2s)) or ((name = @v1s) and (position = @v2s))) and ((name < @v1e) or (((name = @v1e)) AND (position < @v2e)) or ((name 
= @v1e) and (position = @v2e)))) ) ` - test.S(t).ExpectEquals(normalizeQuery(query), normalizeQuery(expected)) + test.S(t).ExpectEquals(normalizeQuery(insertQuery), normalizeQuery(expected)) + + expectedOriginalChecksumQuery := ` + select /* gh-ost checksum mydb.tbl */ + sha2( + group_concat( + sha2(concat_ws(',', IFNULL(id, 'NULL'), IFNULL(name, 'NULL'), IFNULL(position, 'NULL')), 256) order by name, position + ), 256 + ) + from mydb.tbl force index (name_position_uidx) + where (((name > @v1s) or (((name = @v1s)) AND (position > @v2s)) or ((name = @v1s) and (position = @v2s))) and ((name < @v1e) or (((name = @v1e)) AND (position < @v2e)) or ((name = @v1e) and (position = @v2e)))) + ` + test.S(t).ExpectEquals(normalizeQuery(originalChecksumQuery), normalizeQuery(expectedOriginalChecksumQuery)) + + expectedGhostChecksumQuery := ` + select /* gh-ost checksum mydb.ghost */ + sha2( + group_concat( + sha2(concat_ws(',', IFNULL(id, 'NULL'), IFNULL(name, 'NULL'), IFNULL(location, 'NULL')), 256) order by name, location + ), 256 + ) + from mydb.ghost force index (name_position_uidx) + where (((name > @v1s) or (((name = @v1s)) AND (position > @v2s)) or ((name = @v1s) and (position = @v2s))) and ((name < @v1e) or (((name = @v1e)) AND (position < @v2e)) or ((name = @v1e) and (position = @v2e)))) + ` + test.S(t).ExpectEquals(normalizeQuery(ghostChecksumQuery), normalizeQuery(expectedGhostChecksumQuery)) + test.S(t).ExpectTrue(reflect.DeepEqual(explodedArgs, []interface{}{3, 3, 17, 3, 17, 103, 103, 117, 103, 117})) } } @@ -256,12 +382,19 @@ func TestBuildRangeInsertPreparedQuery(t *testing.T) { ghostTableName := "ghost" sharedColumns := []string{"id", "name", "position"} { - uniqueKey := "name_position_uidx" - uniqueKeyColumns := NewColumnList([]string{"name", "position"}) + uniqueKey := &UniqueKey{ + Name: "name_position_uidx", + Columns: *NewColumnList([]string{"name", "position"}), + } + ghostUniqueKey := &UniqueKey{ + Name: "name_position_uidx", + Columns: 
*NewColumnList([]string{"name", "position"}), + } rangeStartArgs := []interface{}{3, 17} rangeEndArgs := []interface{}{103, 117} - query, explodedArgs, err := BuildRangeInsertPreparedQuery(databaseName, originalTableName, ghostTableName, sharedColumns, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartArgs, rangeEndArgs, true, true) + query, _, _, explodedArgs, err := BuildRangeInsertPreparedQuery( + databaseName, originalTableName, ghostTableName, sharedColumns, sharedColumns, uniqueKey, ghostUniqueKey, rangeStartArgs, rangeEndArgs, true, true) test.S(t).ExpectNil(err) expected := ` insert /* gh-ost mydb.tbl */ ignore into mydb.ghost (id, name, position) diff --git a/go/sql/parser.go b/go/sql/parser.go index 447cbcc16..ebb8b3883 100644 --- a/go/sql/parser.go +++ b/go/sql/parser.go @@ -12,26 +12,54 @@ import ( ) var ( - sanitizeQuotesRegexp = regexp.MustCompile("('[^']*')") - renameColumnRegexp = regexp.MustCompile(`(?i)\bchange\s+(column\s+|)([\S]+)\s+([\S]+)\s+`) - dropColumnRegexp = regexp.MustCompile(`(?i)\bdrop\s+(column\s+|)([\S]+)$`) - renameTableRegexp = regexp.MustCompile(`(?i)\brename\s+(to|as)\s+`) + sanitizeQuotesRegexp = regexp.MustCompile("('[^']*')") + renameColumnRegexp = regexp.MustCompile(`(?i)\bchange\s+(column\s+|)([\S]+)\s+([\S]+)\s+`) + dropColumnRegexp = regexp.MustCompile(`(?i)\bdrop\s+(column\s+|)([\S]+)$`) + renameTableRegexp = regexp.MustCompile(`(?i)\brename\s+(to|as)\s+`) + alterTableExplicitSchemaTableRegexps = []*regexp.Regexp{ + // ALTER TABLE `scm`.`tbl` something + regexp.MustCompile(`(?i)\balter\s+table\s+` + "`" + `([^` + "`" + `]+)` + "`" + `[.]` + "`" + `([^` + "`" + `]+)` + "`" + `\s+(.*$)`), + // ALTER TABLE `scm`.tbl something + regexp.MustCompile(`(?i)\balter\s+table\s+` + "`" + `([^` + "`" + `]+)` + "`" + `[.]([\S]+)\s+(.*$)`), + // ALTER TABLE scm.`tbl` something + regexp.MustCompile(`(?i)\balter\s+table\s+([\S]+)[.]` + "`" + `([^` + "`" + `]+)` + "`" + `\s+(.*$)`), + // ALTER TABLE scm.tbl something + 
regexp.MustCompile(`(?i)\balter\s+table\s+([\S]+)[.]([\S]+)\s+(.*$)`), + } + alterTableExplicitTableRegexps = []*regexp.Regexp{ + // ALTER TABLE `tbl` something + regexp.MustCompile(`(?i)\balter\s+table\s+` + "`" + `([^` + "`" + `]+)` + "`" + `\s+(.*$)`), + // ALTER TABLE tbl something + regexp.MustCompile(`(?i)\balter\s+table\s+([\S]+)\s+(.*$)`), + } ) -type Parser struct { +type AlterTableParser struct { columnRenameMap map[string]string droppedColumns map[string]bool isRenameTable bool + + alterStatementOptions string + alterTokens []string + + explicitSchema string + explicitTable string } -func NewParser() *Parser { - return &Parser{ +func NewAlterTableParser() *AlterTableParser { + return &AlterTableParser{ columnRenameMap: make(map[string]string), droppedColumns: make(map[string]bool), } } -func (this *Parser) tokenizeAlterStatement(alterStatement string) (tokens []string, err error) { +func NewParserFromAlterStatement(alterStatement string) *AlterTableParser { + parser := NewAlterTableParser() + parser.ParseAlterStatement(alterStatement) + return parser +} + +func (this *AlterTableParser) tokenizeAlterStatement(alterStatement string) (tokens []string, err error) { terminatingQuote := rune(0) f := func(c rune) bool { switch { @@ -58,13 +86,13 @@ func (this *Parser) tokenizeAlterStatement(alterStatement string) (tokens []stri return tokens, nil } -func (this *Parser) sanitizeQuotesFromAlterStatement(alterStatement string) (strippedStatement string) { +func (this *AlterTableParser) sanitizeQuotesFromAlterStatement(alterStatement string) (strippedStatement string) { strippedStatement = alterStatement strippedStatement = sanitizeQuotesRegexp.ReplaceAllString(strippedStatement, "''") return strippedStatement } -func (this *Parser) parseAlterToken(alterToken string) (err error) { +func (this *AlterTableParser) parseAlterToken(alterToken string) (err error) { { // rename allStringSubmatch := renameColumnRegexp.FindAllStringSubmatch(alterToken, -1) @@ -97,16 +125,34 
@@ func (this *Parser) parseAlterToken(alterToken string) (err error) { return nil } -func (this *Parser) ParseAlterStatement(alterStatement string) (err error) { - alterTokens, _ := this.tokenizeAlterStatement(alterStatement) +func (this *AlterTableParser) ParseAlterStatement(alterStatement string) (err error) { + + this.alterStatementOptions = alterStatement + for _, alterTableRegexp := range alterTableExplicitSchemaTableRegexps { + if submatch := alterTableRegexp.FindStringSubmatch(this.alterStatementOptions); len(submatch) > 0 { + this.explicitSchema = submatch[1] + this.explicitTable = submatch[2] + this.alterStatementOptions = submatch[3] + break + } + } + for _, alterTableRegexp := range alterTableExplicitTableRegexps { + if submatch := alterTableRegexp.FindStringSubmatch(this.alterStatementOptions); len(submatch) > 0 { + this.explicitTable = submatch[1] + this.alterStatementOptions = submatch[2] + break + } + } + alterTokens, _ := this.tokenizeAlterStatement(this.alterStatementOptions) for _, alterToken := range alterTokens { alterToken = this.sanitizeQuotesFromAlterStatement(alterToken) this.parseAlterToken(alterToken) + this.alterTokens = append(this.alterTokens, alterToken) } return nil } -func (this *Parser) GetNonTrivialRenames() map[string]string { +func (this *AlterTableParser) GetNonTrivialRenames() map[string]string { result := make(map[string]string) for column, renamed := range this.columnRenameMap { if column != renamed { @@ -116,14 +162,33 @@ func (this *Parser) GetNonTrivialRenames() map[string]string { return result } -func (this *Parser) HasNonTrivialRenames() bool { +func (this *AlterTableParser) HasNonTrivialRenames() bool { return len(this.GetNonTrivialRenames()) > 0 } -func (this *Parser) DroppedColumnsMap() map[string]bool { +func (this *AlterTableParser) DroppedColumnsMap() map[string]bool { return this.droppedColumns } -func (this *Parser) IsRenameTable() bool { +func (this *AlterTableParser) IsRenameTable() bool { return 
this.isRenameTable } +func (this *AlterTableParser) GetExplicitSchema() string { + return this.explicitSchema +} + +func (this *AlterTableParser) HasExplicitSchema() bool { + return this.GetExplicitSchema() != "" +} + +func (this *AlterTableParser) GetExplicitTable() string { + return this.explicitTable +} + +func (this *AlterTableParser) HasExplicitTable() bool { + return this.GetExplicitTable() != "" +} + +func (this *AlterTableParser) GetAlterStatementOptions() string { + return this.alterStatementOptions +} diff --git a/go/sql/parser_test.go b/go/sql/parser_test.go index 5d381304e..79faa630e 100644 --- a/go/sql/parser_test.go +++ b/go/sql/parser_test.go @@ -19,17 +19,19 @@ func init() { func TestParseAlterStatement(t *testing.T) { statement := "add column t int, engine=innodb" - parser := NewParser() + parser := NewAlterTableParser() err := parser.ParseAlterStatement(statement) test.S(t).ExpectNil(err) + test.S(t).ExpectEquals(parser.alterStatementOptions, statement) test.S(t).ExpectFalse(parser.HasNonTrivialRenames()) } func TestParseAlterStatementTrivialRename(t *testing.T) { statement := "add column t int, change ts ts timestamp, engine=innodb" - parser := NewParser() + parser := NewAlterTableParser() err := parser.ParseAlterStatement(statement) test.S(t).ExpectNil(err) + test.S(t).ExpectEquals(parser.alterStatementOptions, statement) test.S(t).ExpectFalse(parser.HasNonTrivialRenames()) test.S(t).ExpectEquals(len(parser.columnRenameMap), 1) test.S(t).ExpectEquals(parser.columnRenameMap["ts"], "ts") @@ -37,9 +39,10 @@ func TestParseAlterStatementTrivialRename(t *testing.T) { func TestParseAlterStatementTrivialRenames(t *testing.T) { statement := "add column t int, change ts ts timestamp, CHANGE f `f` float, engine=innodb" - parser := NewParser() + parser := NewAlterTableParser() err := parser.ParseAlterStatement(statement) test.S(t).ExpectNil(err) + test.S(t).ExpectEquals(parser.alterStatementOptions, statement) 
test.S(t).ExpectFalse(parser.HasNonTrivialRenames()) test.S(t).ExpectEquals(len(parser.columnRenameMap), 2) test.S(t).ExpectEquals(parser.columnRenameMap["ts"], "ts") @@ -58,9 +61,10 @@ func TestParseAlterStatementNonTrivial(t *testing.T) { } for _, statement := range statements { - parser := NewParser() + parser := NewAlterTableParser() err := parser.ParseAlterStatement(statement) test.S(t).ExpectNil(err) + test.S(t).ExpectEquals(parser.alterStatementOptions, statement) renames := parser.GetNonTrivialRenames() test.S(t).ExpectEquals(len(renames), 2) test.S(t).ExpectEquals(renames["i"], "count") @@ -69,7 +73,7 @@ func TestParseAlterStatementNonTrivial(t *testing.T) { } func TestTokenizeAlterStatement(t *testing.T) { - parser := NewParser() + parser := NewAlterTableParser() { alterStatement := "add column t int" tokens, _ := parser.tokenizeAlterStatement(alterStatement) @@ -108,7 +112,7 @@ func TestTokenizeAlterStatement(t *testing.T) { } func TestSanitizeQuotesFromAlterStatement(t *testing.T) { - parser := NewParser() + parser := NewAlterTableParser() { alterStatement := "add column e enum('a','b','c')" strippedStatement := parser.sanitizeQuotesFromAlterStatement(alterStatement) @@ -124,7 +128,7 @@ func TestSanitizeQuotesFromAlterStatement(t *testing.T) { func TestParseAlterStatementDroppedColumns(t *testing.T) { { - parser := NewParser() + parser := NewAlterTableParser() statement := "drop column b" err := parser.ParseAlterStatement(statement) test.S(t).ExpectNil(err) @@ -132,16 +136,17 @@ func TestParseAlterStatementDroppedColumns(t *testing.T) { test.S(t).ExpectTrue(parser.droppedColumns["b"]) } { - parser := NewParser() + parser := NewAlterTableParser() statement := "drop column b, drop key c_idx, drop column `d`" err := parser.ParseAlterStatement(statement) test.S(t).ExpectNil(err) + test.S(t).ExpectEquals(parser.alterStatementOptions, statement) test.S(t).ExpectEquals(len(parser.droppedColumns), 2) test.S(t).ExpectTrue(parser.droppedColumns["b"]) 
test.S(t).ExpectTrue(parser.droppedColumns["d"]) } { - parser := NewParser() + parser := NewAlterTableParser() statement := "drop column b, drop key c_idx, drop column `d`, drop `e`, drop primary key, drop foreign key fk_1" err := parser.ParseAlterStatement(statement) test.S(t).ExpectNil(err) @@ -151,7 +156,7 @@ func TestParseAlterStatementDroppedColumns(t *testing.T) { test.S(t).ExpectTrue(parser.droppedColumns["e"]) } { - parser := NewParser() + parser := NewAlterTableParser() statement := "drop column b, drop bad statement, add column i int" err := parser.ParseAlterStatement(statement) test.S(t).ExpectNil(err) @@ -163,38 +168,133 @@ func TestParseAlterStatementDroppedColumns(t *testing.T) { func TestParseAlterStatementRenameTable(t *testing.T) { { - parser := NewParser() + parser := NewAlterTableParser() statement := "drop column b" err := parser.ParseAlterStatement(statement) test.S(t).ExpectNil(err) test.S(t).ExpectFalse(parser.isRenameTable) } { - parser := NewParser() + parser := NewAlterTableParser() statement := "rename as something_else" err := parser.ParseAlterStatement(statement) test.S(t).ExpectNil(err) test.S(t).ExpectTrue(parser.isRenameTable) } { - parser := NewParser() + parser := NewAlterTableParser() statement := "drop column b, rename as something_else" err := parser.ParseAlterStatement(statement) test.S(t).ExpectNil(err) + test.S(t).ExpectEquals(parser.alterStatementOptions, statement) test.S(t).ExpectTrue(parser.isRenameTable) } { - parser := NewParser() + parser := NewAlterTableParser() statement := "engine=innodb rename as something_else" err := parser.ParseAlterStatement(statement) test.S(t).ExpectNil(err) test.S(t).ExpectTrue(parser.isRenameTable) } { - parser := NewParser() + parser := NewAlterTableParser() statement := "rename as something_else, engine=innodb" err := parser.ParseAlterStatement(statement) test.S(t).ExpectNil(err) test.S(t).ExpectTrue(parser.isRenameTable) } } + +func TestParseAlterStatementExplicitTable(t *testing.T) { + 
+ { + parser := NewAlterTableParser() + statement := "drop column b" + err := parser.ParseAlterStatement(statement) + test.S(t).ExpectNil(err) + test.S(t).ExpectEquals(parser.explicitSchema, "") + test.S(t).ExpectEquals(parser.explicitTable, "") + test.S(t).ExpectEquals(parser.alterStatementOptions, "drop column b") + test.S(t).ExpectTrue(reflect.DeepEqual(parser.alterTokens, []string{"drop column b"})) + } + { + parser := NewAlterTableParser() + statement := "alter table tbl drop column b" + err := parser.ParseAlterStatement(statement) + test.S(t).ExpectNil(err) + test.S(t).ExpectEquals(parser.explicitSchema, "") + test.S(t).ExpectEquals(parser.explicitTable, "tbl") + test.S(t).ExpectEquals(parser.alterStatementOptions, "drop column b") + test.S(t).ExpectTrue(reflect.DeepEqual(parser.alterTokens, []string{"drop column b"})) + } + { + parser := NewAlterTableParser() + statement := "alter table `tbl` drop column b" + err := parser.ParseAlterStatement(statement) + test.S(t).ExpectNil(err) + test.S(t).ExpectEquals(parser.explicitSchema, "") + test.S(t).ExpectEquals(parser.explicitTable, "tbl") + test.S(t).ExpectEquals(parser.alterStatementOptions, "drop column b") + test.S(t).ExpectTrue(reflect.DeepEqual(parser.alterTokens, []string{"drop column b"})) + } + { + parser := NewAlterTableParser() + statement := "alter table `scm with spaces`.`tbl` drop column b" + err := parser.ParseAlterStatement(statement) + test.S(t).ExpectNil(err) + test.S(t).ExpectEquals(parser.explicitSchema, "scm with spaces") + test.S(t).ExpectEquals(parser.explicitTable, "tbl") + test.S(t).ExpectEquals(parser.alterStatementOptions, "drop column b") + test.S(t).ExpectTrue(reflect.DeepEqual(parser.alterTokens, []string{"drop column b"})) + } + { + parser := NewAlterTableParser() + statement := "alter table `scm`.`tbl with spaces` drop column b" + err := parser.ParseAlterStatement(statement) + test.S(t).ExpectNil(err) + test.S(t).ExpectEquals(parser.explicitSchema, "scm") + 
test.S(t).ExpectEquals(parser.explicitTable, "tbl with spaces") + test.S(t).ExpectEquals(parser.alterStatementOptions, "drop column b") + test.S(t).ExpectTrue(reflect.DeepEqual(parser.alterTokens, []string{"drop column b"})) + } + { + parser := NewAlterTableParser() + statement := "alter table `scm`.tbl drop column b" + err := parser.ParseAlterStatement(statement) + test.S(t).ExpectNil(err) + test.S(t).ExpectEquals(parser.explicitSchema, "scm") + test.S(t).ExpectEquals(parser.explicitTable, "tbl") + test.S(t).ExpectEquals(parser.alterStatementOptions, "drop column b") + test.S(t).ExpectTrue(reflect.DeepEqual(parser.alterTokens, []string{"drop column b"})) + } + { + parser := NewAlterTableParser() + statement := "alter table scm.`tbl` drop column b" + err := parser.ParseAlterStatement(statement) + test.S(t).ExpectNil(err) + test.S(t).ExpectEquals(parser.explicitSchema, "scm") + test.S(t).ExpectEquals(parser.explicitTable, "tbl") + test.S(t).ExpectEquals(parser.alterStatementOptions, "drop column b") + test.S(t).ExpectTrue(reflect.DeepEqual(parser.alterTokens, []string{"drop column b"})) + } + { + parser := NewAlterTableParser() + statement := "alter table scm.tbl drop column b" + err := parser.ParseAlterStatement(statement) + test.S(t).ExpectNil(err) + test.S(t).ExpectEquals(parser.explicitSchema, "scm") + test.S(t).ExpectEquals(parser.explicitTable, "tbl") + test.S(t).ExpectEquals(parser.alterStatementOptions, "drop column b") + test.S(t).ExpectTrue(reflect.DeepEqual(parser.alterTokens, []string{"drop column b"})) + } + { + parser := NewAlterTableParser() + statement := "alter table scm.tbl drop column b, add index idx(i)" + err := parser.ParseAlterStatement(statement) + test.S(t).ExpectNil(err) + test.S(t).ExpectEquals(parser.explicitSchema, "scm") + test.S(t).ExpectEquals(parser.explicitTable, "tbl") + test.S(t).ExpectEquals(parser.alterStatementOptions, "drop column b, add index idx(i)") + test.S(t).ExpectTrue(reflect.DeepEqual(parser.alterTokens, []string{"drop 
column b", "add index idx(i)"})) + } +} diff --git a/localtests/test.sh b/localtests/test.sh index d4b3f1723..938dda53b 100755 --- a/localtests/test.sh +++ b/localtests/test.sh @@ -148,6 +148,7 @@ test_single() { --alter='engine=innodb' \ --exact-rowcount \ --assume-rbr \ + --checksum-data \ --initially-drop-old-table \ --initially-drop-ghost-table \ --throttle-query='select timestampdiff(second, min(last_update), now()) < 5 from _gh_ost_test_ghc' \ @@ -229,11 +230,12 @@ build_binary() { echo "Using binary: $ghost_binary" return 0 fi - go build -o $ghost_binary go/cmd/gh-ost/main.go + ./script/build if [ $? -ne 0 ] ; then echo "Build failure" exit 1 fi + cp bin/gh-ost $ghost_binary } test_all() { diff --git a/script/ensure-go-installed b/script/ensure-go-installed index c9c2ae863..98ccdc1db 100755 --- a/script/ensure-go-installed +++ b/script/ensure-go-installed @@ -1,13 +1,13 @@ #!/bin/bash -PREFERRED_GO_VERSION=go1.12.6 -SUPPORTED_GO_VERSIONS='go1.1[234]' +PREFERRED_GO_VERSION=go1.14.4 +SUPPORTED_GO_VERSIONS='go1.1[456]' GO_PKG_DARWIN=${PREFERRED_GO_VERSION}.darwin-amd64.pkg -GO_PKG_DARWIN_SHA=ea78245e43de2996fa0973033064b33f48820cfe39f4f3c6e953040925cc5815 +GO_PKG_DARWIN_SHA=b518f21f823759ee30faddb1f623810a432499f050c9338777523d9c8551c62c GO_PKG_LINUX=${PREFERRED_GO_VERSION}.linux-amd64.tar.gz -GO_PKG_LINUX_SHA=dbcf71a3c1ea53b8d54ef1b48c85a39a6c9a935d01fc8291ff2b92028e59913c +GO_PKG_LINUX_SHA=aed845e4185a0b2a3c3d5e1d0a35491702c55889192bb9c30e67a3de6849c067 export ROOTDIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )/.." && pwd )" cd $ROOTDIR