diff --git a/doc/cut-over.md b/doc/cut-over.md index ada5100c6..bb0060873 100644 --- a/doc/cut-over.md +++ b/doc/cut-over.md @@ -6,6 +6,15 @@ MySQL poses some limitations on how the table swap can take place. While it supp The [facebook OSC](https://www.facebook.com/notes/mysql-at-facebook/online-schema-change-for-mysql/430801045932/) tool documents this nicely. Look for **"Cut-over phase"**. The Facebook solution uses a non-atomic swap: the original table is first renamed and pushed aside, then the ghost table is renamed to take its place. In between the two renames there's a brief period of time where your table just does not exist, and queries will fail. +Another option to support a atomic swap without the use of a lock is the use of triggers only in the cut-over phase, +gh-ost philosophy is avoid the use of triggers but in some environments like a Galera cluster isn't possible use the +lock command, and like the cut-over should take a little time it shouldn't be a problem. Triggers cut-over works like the following: + +- A stop writes event is injected in the binlog and gh-ost disable the writes once it receive it. +- The triggers are created to handle the modifications in the MySQL side. +- A created triggers event is injected in the binlog and gh-ost wait until receive it. +- The affected rows will be in an inconsistent stata during the time between the first and the second event. For this reason, this events are checked and, the values of the fields that are part of the unique key used to do the online alter are saved to sanitize that rows. + `gh-ost` solves this by using an atomic, two-step blocking swap: while one connection holds the lock, another attempts the atomic `RENAME`. The `RENAME` is guaranteed to not be executed prematurely by positioning a sentry table which blocks the `RENAME` operation until `gh-ost` is satisfied all is in order. This solution either: @@ -18,4 +27,4 @@ Also note: Internals of the atomic cut-over are discussed in [Issue #82](https://github.com/github/gh-ost/issues/82). -At this time the command-line argument `--cut-over` is supported, and defaults to the atomic cut-over algorithm described above. Also supported is `--cut-over=two-step`, which uses the FB non-atomic algorithm. We recommend using the default cut-over that has been battle tested in our production environments. +At this time the command-line argument `--cut-over` is supported, and defaults to the atomic cut-over algorithm described above. Also supported is `--cut-over=two-step`, which uses the FB non-atomic algorithm and the `--cut-over=trigger`, which use the trigger algorithm. We recommend using the default cut-over that has been battle tested in our production environments. diff --git a/go/base/context.go b/go/base/context.go index 5ebf09267..7347f647b 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -38,6 +38,7 @@ type CutOver int const ( CutOverAtomic CutOver = iota CutOverTwoStep + CutOverTrigger ) type ThrottleReasonHint string @@ -169,6 +170,7 @@ type MigrationContext struct { StartTime time.Time RowCopyStartTime time.Time RowCopyEndTime time.Time + CreateTriggersStartTime time.Time LockTablesStartTime time.Time RenameTablesStartTime time.Time RenameTablesEndTime time.Time @@ -194,6 +196,7 @@ type MigrationContext struct { UserCommandedUnpostponeFlag int64 CutOverCompleteFlag int64 InCutOverCriticalSectionFlag int64 + ApplyDMLEventState int64 PanicAbort chan error OriginalTableColumnsOnApplier *sql.ColumnList @@ -215,6 +218,8 @@ type MigrationContext struct { MigrationIterationRangeMaxValues *sql.ColumnValues ForceTmpTableName string + TriggerCutoverUniqueKeys [][]interface{} + recentBinlogCoordinates mysql.BinlogCoordinates } diff --git a/go/cmd/gh-ost/main.go b/go/cmd/gh-ost/main.go index ce63b03e5..233f5ad19 100644 --- a/go/cmd/gh-ost/main.go +++ b/go/cmd/gh-ost/main.go @@ -226,6 +226,8 @@ func main() { migrationContext.CutOverType = base.CutOverAtomic case "two-step": migrationContext.CutOverType = base.CutOverTwoStep + case "trigger": + migrationContext.CutOverType = base.CutOverTrigger default: log.Fatalf("Unknown cut-over: %s", *cutOver) } diff --git a/go/logic/applier.go b/go/logic/applier.go index 5fb795bac..88948f3eb 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -8,6 +8,8 @@ package logic import ( gosql "database/sql" "fmt" + "regexp" + "strings" "sync/atomic" "time" @@ -516,6 +518,273 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected return chunkSize, rowsAffected, duration, nil } +// Create the trigger prefix +func (this *Applier) makeTriggerPrefix() string { + prefix := fmt.Sprintf( + "ghost_%s_%s", + this.migrationContext.DatabaseName, + this.migrationContext.OriginalTableName) + r := regexp.MustCompile("\\W") + prefix = r.ReplaceAllString(prefix, "_") + + if len(prefix) > 60 { + oldPrefix := prefix + prefix = prefix[0:60] + + log.Debugf( + "Trigger prefix %s is over 60 characters long, truncating to %s", + oldPrefix, + prefix) + } + + return prefix +} + +// CreateTriggersOriginalTable Create triggers from the original table to the ghost one +func (this *Applier) CreateTriggersOriginalTable() error { + prefix := this.makeTriggerPrefix() + + delIndexComparations := []string{} + for _, name := range this.migrationContext.UniqueKey.Columns.Names() { + columnQuoted := sql.EscapeName(name) + comparation := fmt.Sprintf( + "%s.%s.%s <=> OLD.%s", + sql.EscapeName(this.migrationContext.DatabaseName), + sql.EscapeName(this.migrationContext.GetGhostTableName()), + columnQuoted, + columnQuoted) + delIndexComparations = append(delIndexComparations, comparation) + } + + deleteTrigger := fmt.Sprintf( + "CREATE /* gh-ost */ TRIGGER `%s_del` AFTER DELETE ON %s.%s "+ + "FOR EACH ROW "+ + "DELETE /* gh-ost */ IGNORE FROM %s.%s WHERE %s", + prefix, + sql.EscapeName(this.migrationContext.DatabaseName), + sql.EscapeName(this.migrationContext.OriginalTableName), + sql.EscapeName(this.migrationContext.DatabaseName), + sql.EscapeName(this.migrationContext.GetGhostTableName()), + strings.Join(delIndexComparations, " AND ")) + + insertCols := []string{} + insertValues := []string{} + for i, origColumn := range this.migrationContext.SharedColumns.Names() { + key := sql.EscapeName(this.migrationContext.MappedSharedColumns.Columns()[i].Name) + insertCols = append(insertCols, key) + + value := fmt.Sprintf("NEW.%s", sql.EscapeName(origColumn)) + insertValues = append(insertValues, value) + } + + insertTrigger := fmt.Sprintf( + "CREATE /* gh-ost */ TRIGGER `%s_ins` AFTER INSERT ON %s.%s "+ + "FOR EACH ROW "+ + "REPLACE /* gh-ost */ INTO %s.%s (%s) VALUES (%s)", + prefix, + sql.EscapeName(this.migrationContext.DatabaseName), + sql.EscapeName(this.migrationContext.OriginalTableName), + sql.EscapeName(this.migrationContext.DatabaseName), + sql.EscapeName(this.migrationContext.GetGhostTableName()), + strings.Join(insertCols, ", "), + strings.Join(insertValues, ", ")) + + updIndexComparations := []string{} + for _, name := range this.migrationContext.UniqueKey.Columns.Names() { + columnQuoted := sql.EscapeName(name) + comparation := fmt.Sprintf( + "OLD.%s <=> NEW.%s", + columnQuoted, + columnQuoted) + updIndexComparations = append(updIndexComparations, comparation) + } + + updateTrigger := fmt.Sprintf( + "CREATE /* gh-ost */ TRIGGER `%s_upd` AFTER UPDATE ON %s.%s "+ + "FOR EACH ROW "+ + "BEGIN /* gh-ost */ "+ + "DELETE IGNORE FROM %s.%s WHERE !(%s) AND %s; "+ + "REPLACE INTO %s.%s (%s) VALUES (%s); "+ + "END", + prefix, + sql.EscapeName(this.migrationContext.DatabaseName), + sql.EscapeName(this.migrationContext.OriginalTableName), + sql.EscapeName(this.migrationContext.DatabaseName), + sql.EscapeName(this.migrationContext.GetGhostTableName()), + strings.Join(updIndexComparations, " AND "), + strings.Join(delIndexComparations, " AND "), + sql.EscapeName(this.migrationContext.DatabaseName), + sql.EscapeName(this.migrationContext.GetGhostTableName()), + strings.Join(insertCols, ", "), + strings.Join(insertValues, ", ")) + + return func() error { + this.migrationContext.CreateTriggersStartTime = time.Now() + + tx, err := this.db.Begin() + if err != nil { + return err + } + + if _, err := tx.Exec(deleteTrigger); err != nil { + return err + } + if _, err := tx.Exec(insertTrigger); err != nil { + return err + } + if _, err := tx.Exec(updateTrigger); err != nil { + return err + } + + if err := tx.Commit(); err != nil { + return err + } + return nil + }() +} + +// DropTriggersOldTableIfExists Drop triggers from the old table if them exists +func (this *Applier) DropTriggersOldTableIfExists() error { + prefix := this.makeTriggerPrefix() + + dropDeleteTrigger := fmt.Sprintf( + "DROP /* gh-ost */ TRIGGER IF EXISTS %s.`%s_del`", + sql.EscapeName(this.migrationContext.DatabaseName), + prefix) + + dropInsertTrigger := fmt.Sprintf( + "DROP /* gh-ost */ TRIGGER IF EXISTS %s.`%s_ins`", + sql.EscapeName(this.migrationContext.DatabaseName), + prefix) + + dropUpdateTrigger := fmt.Sprintf( + "DROP /* gh-ost */ TRIGGER IF EXISTS %s.`%s_upd`", + sql.EscapeName(this.migrationContext.DatabaseName), + prefix) + + return func() error { + tx, err := this.db.Begin() + if err != nil { + return err + } + + if _, err := tx.Exec(dropDeleteTrigger); err != nil { + return err + } + if _, err := tx.Exec(dropInsertTrigger); err != nil { + return err + } + if _, err := tx.Exec(dropUpdateTrigger); err != nil { + return err + } + + if err := tx.Commit(); err != nil { + return err + } + return nil + }() +} + +func (this *Applier) ObtainUniqueKeyValuesOfEvent(dmlEvent *binlog.BinlogDMLEvent) (uniqueKeys [][]interface{}) { + switch dmlEvent.DML { + case binlog.DeleteDML: + return append(uniqueKeys, + sql.ObtainUniqueKeyValues( + this.migrationContext.OriginalTableColumns, + &this.migrationContext.UniqueKey.Columns, + dmlEvent.WhereColumnValues.AbstractValues())) + + case binlog.InsertDML: + return append(uniqueKeys, + sql.ObtainUniqueKeyValues( + this.migrationContext.OriginalTableColumns, + &this.migrationContext.UniqueKey.Columns, + dmlEvent.NewColumnValues.AbstractValues())) + + case binlog.UpdateDML: + { + if _, isModified := this.updateModifiesUniqueKeyColumns(dmlEvent); isModified { + uniqueKeys = append(uniqueKeys, + sql.ObtainUniqueKeyValues( + this.migrationContext.OriginalTableColumns, + &this.migrationContext.UniqueKey.Columns, + dmlEvent.WhereColumnValues.AbstractValues())) + } + return append(uniqueKeys, + sql.ObtainUniqueKeyValues( + this.migrationContext.OriginalTableColumns, + &this.migrationContext.UniqueKey.Columns, + dmlEvent.NewColumnValues.AbstractValues())) + } + } + + return uniqueKeys +} + +func (this *Applier) SanitizeRowsDuringCutOver() error { + for len(this.migrationContext.TriggerCutoverUniqueKeys) > 0 { + cutIndex := int64(len(this.migrationContext.TriggerCutoverUniqueKeys)) - this.migrationContext.ChunkSize + if cutIndex < 0 { + cutIndex = 0 + } + + chunkValues := this.migrationContext.TriggerCutoverUniqueKeys[cutIndex:] + deleteQuery, deleteArgs, deleteErr := sql.BuildDeleteQuery( + this.migrationContext.DatabaseName, + this.migrationContext.GetGhostTableName(), + &this.migrationContext.UniqueKey.Columns, + chunkValues) + + if deleteErr != nil { + return deleteErr + } + + insertSelectQuery, insertSelectArgs, insertSelectErr := sql.BuildInsertSelectQuery( + this.migrationContext.DatabaseName, + this.migrationContext.OriginalTableName, + this.migrationContext.GetGhostTableName(), + this.migrationContext.SharedColumns.Names(), + this.migrationContext.MappedSharedColumns.Names(), + this.migrationContext.UniqueKey.Name, + &this.migrationContext.UniqueKey.Columns, + chunkValues) + + if insertSelectErr != nil { + return insertSelectErr + } + + log.Infof("Sanitizing chunk of created rows during trigger creation (%d/%d)", + int64(len(this.migrationContext.TriggerCutoverUniqueKeys))-cutIndex, + len(this.migrationContext.TriggerCutoverUniqueKeys)) + + err := func() error { + tx, err := this.db.Begin() + if err != nil { + return err + } + + if _, err := tx.Exec(deleteQuery, deleteArgs...); err != nil { + return err + } + if _, err := tx.Exec(insertSelectQuery, insertSelectArgs...); err != nil { + return err + } + if err := tx.Commit(); err != nil { + return err + } + return nil + }() + + if err != nil { + return err + } + + this.migrationContext.TriggerCutoverUniqueKeys = this.migrationContext.TriggerCutoverUniqueKeys[:cutIndex] + } + + return nil +} + // LockOriginalTable places a write lock on the original table func (this *Applier) LockOriginalTable() error { query := fmt.Sprintf(`lock /* gh-ost */ tables %s.%s write`, @@ -545,6 +814,31 @@ func (this *Applier) UnlockTables() error { return nil } +// SwapTables issues a one-step swap table operation: +// - rename original table to _old +// - rename ghost table to original +func (this *Applier) SwapTables() error { + query := fmt.Sprintf(`rename /* gh-ost */ table %s.%s to %s.%s, %s.%s to %s.%s`, + sql.EscapeName(this.migrationContext.DatabaseName), + sql.EscapeName(this.migrationContext.OriginalTableName), + sql.EscapeName(this.migrationContext.DatabaseName), + sql.EscapeName(this.migrationContext.GetOldTableName()), + sql.EscapeName(this.migrationContext.DatabaseName), + sql.EscapeName(this.migrationContext.GetGhostTableName()), + sql.EscapeName(this.migrationContext.DatabaseName), + sql.EscapeName(this.migrationContext.OriginalTableName), + ) + log.Infof("Swaping original and new table: %s", query) + this.migrationContext.RenameTablesStartTime = time.Now() + if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil { + return err + } + this.migrationContext.RenameTablesEndTime = time.Now() + + log.Infof("Tables swaped") + return nil +} + // SwapTablesQuickAndBumpy issues a two-step swap table operation: // - rename original table to _old // - rename ghost table to original diff --git a/go/logic/migrator.go b/go/logic/migrator.go index b1a238fda..5b21de836 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -25,8 +25,10 @@ import ( type ChangelogState string const ( - GhostTableMigrated ChangelogState = "GhostTableMigrated" - AllEventsUpToLockProcessed = "AllEventsUpToLockProcessed" + GhostTableMigrated ChangelogState = "GhostTableMigrated" + AllEventsUpToLockProcessed = "AllEventsUpToLockProcessed" + StopWriteEvents = "StopWriteEvents" + AllEventsUpToTriggersProcessed = "AllEventsUpToTriggersProcessed" ) func ReadChangelogState(s string) ChangelogState { @@ -71,10 +73,14 @@ type Migrator struct { hooksExecutor *HooksExecutor migrationContext *base.MigrationContext - firstThrottlingCollected chan bool - ghostTableMigrated chan bool - rowCopyComplete chan error - allEventsUpToLockProcessed chan string + firstThrottlingCollected chan bool + ghostTableMigrated chan bool + rowCopyComplete chan error + allEventsUpToLockProcessed chan string + stoppedWriteEvents chan string + allEventsUpToTriggersProcessed chan string + + triggerCutoverUniqueKeys [][]interface{} rowCopyCompleteFlag int64 // copyRowsQueue should not be buffered; if buffered some non-damaging but @@ -89,12 +95,14 @@ type Migrator struct { func NewMigrator(context *base.MigrationContext) *Migrator { migrator := &Migrator{ - migrationContext: context, - parser: sql.NewParser(), - ghostTableMigrated: make(chan bool), - firstThrottlingCollected: make(chan bool, 3), - rowCopyComplete: make(chan error), - allEventsUpToLockProcessed: make(chan string), + migrationContext: context, + parser: sql.NewParser(), + ghostTableMigrated: make(chan bool), + firstThrottlingCollected: make(chan bool, 3), + rowCopyComplete: make(chan error), + allEventsUpToLockProcessed: make(chan string), + stoppedWriteEvents: make(chan string), + allEventsUpToTriggersProcessed: make(chan string), copyRowsQueue: make(chan tableWriteFunc), applyEventsQueue: make(chan *applyEventStruct, base.MaxEventsBatchSize), @@ -237,6 +245,38 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er this.applyEventsQueue <- newApplyEventStructByFunc(&applyEventFunc) }() } + case StopWriteEvents: + { + var applyEventFunc tableWriteFunc = func() error { + atomic.StoreInt64(&this.migrationContext.ApplyDMLEventState, 1) + this.stoppedWriteEvents <- changelogStateString + return nil + } + // at this point we know that the triggers will be created and we don't want write the + // next events from the streamer, because the streamer works sequentially. So those + // events are either already handled, or have event functions in applyEventsQueue. + // So as not to create a potential deadlock, we write this func to applyEventsQueue + // asynchronously, understanding it doesn't really matter. + go func() { + this.applyEventsQueue <- newApplyEventStructByFunc(&applyEventFunc) + }() + } + case AllEventsUpToTriggersProcessed: + { + var applyEventFunc tableWriteFunc = func() error { + atomic.StoreInt64(&this.migrationContext.ApplyDMLEventState, 2) + this.allEventsUpToTriggersProcessed <- changelogStateString + return nil + } + // at this point we know that the triggers are created and we want to sanitize the inconsistent + // rows between the stop writes and the triggers event, because the streamer works sequentially. + // So those events are either already handled, or have event functions in applyEventsQueue. + // So as not to create a potential deadlock, we write this func to applyEventsQueue + // asynchronously, understanding it doesn't really matter. + go func() { + this.applyEventsQueue <- newApplyEventStructByFunc(&applyEventFunc) + }() + } default: { return fmt.Errorf("Unknown changelog state: %+v", changelogState) @@ -530,9 +570,100 @@ func (this *Migrator) cutOver() (err error) { this.handleCutOverResult(err) return err } + if this.migrationContext.CutOverType == base.CutOverTrigger { + err := this.cutOverTrigger() + this.handleCutOverResult(err) + return err + } return log.Fatalf("Unknown cut-over type: %d; should never get here!", this.migrationContext.CutOverType) } +// waitForStopWriteEvents Inject the "StopWriteEvents" state hint, +// wait for it to appear in the binary logs, make sure the queue is drained. +func (this *Migrator) waitForStopWriteEvents() (err error) { + timeout := time.NewTimer(time.Second * time.Duration(this.migrationContext.CutOverLockTimeoutSeconds)) + + this.migrationContext.MarkPointOfInterest() + stopWriteEventsStartTime := time.Now() + + stopWriteEventsChallenge := fmt.Sprintf( + "%s:%d", + string(StopWriteEvents), + stopWriteEventsStartTime.UnixNano()) + + log.Infof("Writing changelog state: %+v", stopWriteEventsChallenge) + if _, err := this.applier.WriteChangelogState(stopWriteEventsChallenge); err != nil { + return err + } + log.Infof("Waiting for stop writes") + for found := false; !found; { + select { + case <-timeout.C: + { + return log.Errorf("Timeout while waiting for stop writes") + } + case state := <-this.stoppedWriteEvents: + { + if state == stopWriteEventsChallenge { + log.Infof("Waiting for stop writes: got %s", state) + found = true + } else { + log.Infof("Waiting for stop writes: skipping %s", state) + } + } + } + } + stopWriteEventsDuration := time.Since(stopWriteEventsStartTime) + + log.Infof("Done waiting for stop writes; duration=%+v", stopWriteEventsDuration) + this.printStatus(ForcePrintStatusAndHintRule) + + return nil +} + +// waitForEventsUpToTriggers Inject the "AllEventsUpToTriggersProcessed" state hint, +// wait for it to appear in the binary logs, make sure the queue is drained. +func (this *Migrator) waitForEventsUpToTriggers() (err error) { + timeout := time.NewTimer(time.Second * time.Duration(this.migrationContext.CutOverLockTimeoutSeconds)) + + this.migrationContext.MarkPointOfInterest() + waitForEventsUpToTriggersStartTime := time.Now() + + allEventsUpToTriggersProcessedChallenge := fmt.Sprintf( + "%s:%d", + string(AllEventsUpToTriggersProcessed), + waitForEventsUpToTriggersStartTime.UnixNano()) + + log.Infof("Writing changelog state: %+v", allEventsUpToTriggersProcessedChallenge) + if _, err := this.applier.WriteChangelogState(allEventsUpToTriggersProcessedChallenge); err != nil { + return err + } + log.Infof("Waiting for events up to triggers") + for found := false; !found; { + select { + case <-timeout.C: + { + return log.Errorf("Timeout while waiting for events up to triggers") + } + case state := <-this.allEventsUpToTriggersProcessed: + { + if state == allEventsUpToTriggersProcessedChallenge { + log.Infof("Waiting for events up to triggers: got %s", state) + found = true + } else { + log.Infof("Waiting for events up to triggers: skipping %s", state) + } + } + } + } + waitForEventsUpToTriggersDuration := time.Since(waitForEventsUpToTriggersStartTime) + + log.Infof("Done waiting for events up to triggers; duration=%+v", waitForEventsUpToTriggersDuration) + this.printStatus(ForcePrintStatusAndHintRule) + + return nil +} + // Inject the "AllEventsUpToLockProcessed" state hint, wait for it to appear in the binary logs, // make sure the queue is drained. func (this *Migrator) waitForEventsUpToLock() (err error) { @@ -602,6 +733,69 @@ func (this *Migrator) cutOverTwoStep() (err error) { return nil } +// cutOverTrigger will create some INSERT, UPDATE and DELETE triggers to keep the table updated, +// After this it will copy the changes between the last insert and the triggers creation and do a +// simple RENAME TABLE. +func (this *Migrator) cutOverTrigger() (err error) { + atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 1) + defer atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 0) + + if err := this.waitForStopWriteEvents(); err != nil { + return err + } + + defer this.applier.DropTriggersOldTableIfExists() + + log.Infof( + "Creating triggers for %s.%s", + this.migrationContext.DatabaseName, + this.migrationContext.OriginalTableName) + + if err := this.retryOperation(this.applier.CreateTriggersOriginalTable); err != nil { + return err + } + + if err := this.waitForEventsUpToTriggers(); err != nil { + return log.Errore(err) + } + + this.migrationContext.TriggerCutoverUniqueKeys = nil + +CompareNewRow: + for _, rowToAdd := range this.triggerCutoverUniqueKeys { + CompareAddedRow: + for _, rowAdded := range this.migrationContext.TriggerCutoverUniqueKeys { + for i := range rowAdded { + if rowToAdd[i] != rowAdded[i] { + continue CompareAddedRow + } + } + + continue CompareNewRow + } + + this.migrationContext.TriggerCutoverUniqueKeys = append( + this.migrationContext.TriggerCutoverUniqueKeys, + rowToAdd) + } + + if err := this.retryOperation(this.applier.SanitizeRowsDuringCutOver); err != nil { + return err + } + + if err := this.retryOperation(this.applier.SwapTables); err != nil { + return err + } + if err := this.retryOperation(this.applier.DropTriggersOldTableIfExists); err != nil { + return err + } + + createTriggersAndRenameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.CreateTriggersStartTime) + renameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.RenameTablesStartTime) + log.Debugf("Create triggers & rename duration: %s (rename only: %s). During this time, queries on %s were locked or failing", createTriggersAndRenameDuration, renameDuration, sql.EscapeName(this.migrationContext.OriginalTableName)) + return nil +} + // atomicCutOver func (this *Migrator) atomicCutOver() (err error) { atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 1) @@ -988,6 +1182,7 @@ func (this *Migrator) initiateStreaming() error { if err := this.eventsStreamer.InitDBConnections(); err != nil { return err } + atomic.StoreInt64(&this.migrationContext.ApplyDMLEventState, 0) this.eventsStreamer.AddListener( false, this.migrationContext.DatabaseName, @@ -1164,6 +1359,17 @@ func (this *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error { return handleNonDMLEventStruct(eventStruct) } if eventStruct.dmlEvent != nil { + applyDMLEventState := atomic.LoadInt64(&this.migrationContext.ApplyDMLEventState) + if applyDMLEventState == 1 { + this.triggerCutoverUniqueKeys = append( + this.triggerCutoverUniqueKeys, + this.applier.ObtainUniqueKeyValuesOfEvent(eventStruct.dmlEvent)...) + return nil + } + if applyDMLEventState == 2 { + return nil + } + dmlEvents := [](*binlog.BinlogDMLEvent){} dmlEvents = append(dmlEvents, eventStruct.dmlEvent) var nonDmlStructToApply *applyEventStruct diff --git a/go/sql/builder.go b/go/sql/builder.go index 2c5a7ae28..ec7f1e030 100644 --- a/go/sql/builder.go +++ b/go/sql/builder.go @@ -508,3 +508,84 @@ func BuildDMLUpdateQuery(databaseName, tableName string, tableColumns, sharedCol ) return result, sharedArgs, uniqueKeyArgs, nil } + +func ObtainUniqueKeyValues(tableColumns, uniqueKeyColumns *ColumnList, args []interface{}) (uniqueKeyArgs []interface{}) { + for _, column := range uniqueKeyColumns.Columns() { + tableOrdinal := tableColumns.Ordinals[column.Name] + arg := column.convertArg(args[tableOrdinal]) + uniqueKeyArgs = append(uniqueKeyArgs, arg) + } + return uniqueKeyArgs +} + +func buildColumnsPreparedValuesWhere(columns *ColumnList) []string { + values := make([]string, columns.Len(), columns.Len()) + for i, name := range columns.Names() { + values[i] = fmt.Sprintf( + "%s <=> ?", + EscapeName(name)) + } + return values +} + +func BuildDeleteQuery(databaseName, ghostTableName string, uniqueKeyColumns *ColumnList, uniqueKeyArgs [][]interface{}) (result string, explodedArgs []interface{}, err error) { + preparedValuesGetKey := buildColumnsPreparedValuesWhere(uniqueKeyColumns) + + template := fmt.Sprintf( + "(%s)", + strings.Join(preparedValuesGetKey, " and ")) + + preparedValuesWhere := make([]string, len(uniqueKeyArgs), len(uniqueKeyArgs)) + for i, uniqueKey := range uniqueKeyArgs { + explodedArgs = append(explodedArgs, uniqueKey...) + preparedValuesWhere[i] = template + } + + result = fmt.Sprintf( + "delete /* gh-ost */ from %s.%s where %s", + EscapeName(databaseName), + EscapeName(ghostTableName), + strings.Join(preparedValuesWhere, " or ")) + + return result, explodedArgs, nil +} + +func BuildInsertSelectQuery(databaseName, originalTableName, ghostTableName string, sharedColumns []string, mappedSharedColumns []string, uniqueKey string, uniqueKeyColumns *ColumnList, uniqueKeyArgs [][]interface{}) (result string, explodedArgs []interface{}, err error) { + databaseName = EscapeName(databaseName) + originalTableName = EscapeName(originalTableName) + ghostTableName = EscapeName(ghostTableName) + + mappedSharedColumns = duplicateNames(mappedSharedColumns) + for i := range mappedSharedColumns { + mappedSharedColumns[i] = EscapeName(mappedSharedColumns[i]) + } + mappedSharedColumnsListing := strings.Join(mappedSharedColumns, ", ") + + sharedColumns = duplicateNames(sharedColumns) + for i := range sharedColumns { + sharedColumns[i] = EscapeName(sharedColumns[i]) + } + sharedColumnsListing := strings.Join(sharedColumns, ", ") + + preparedValuesGetKey := buildColumnsPreparedValuesWhere(uniqueKeyColumns) + + template := fmt.Sprintf( + "(%s)", + strings.Join(preparedValuesGetKey, " and ")) + + preparedValuesWhere := make([]string, len(uniqueKeyArgs), len(uniqueKeyArgs)) + for i, uniqueKey := range uniqueKeyArgs { + explodedArgs = append(explodedArgs, uniqueKey...) + preparedValuesWhere[i] = template + } + + result = fmt.Sprintf( + "insert /* gh-ost %s.%s */ into %s.%s (%s) select %s from %s.%s force index (%s) where %s", + databaseName, originalTableName, + databaseName, ghostTableName, mappedSharedColumnsListing, + sharedColumnsListing, databaseName, originalTableName, + uniqueKey, + strings.Join(preparedValuesWhere, " or ")) + + return result, explodedArgs, nil +}