From aabca5258a08548228bcd8417f0629e540f9f2db Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20Manuel=20Fern=C3=A1ndez=20Garc=C3=ADa-Minguill?= =?UTF-8?q?=C3=A1n?= Date: Thu, 28 Mar 2019 11:08:23 +0100 Subject: [PATCH 01/17] Added changes to use a cutover based on triggers --- go/base/context.go | 2 + go/cmd/gh-ost/main.go | 2 + go/logic/applier.go | 220 ++++++++++++++++++++++++++++++++++++++++++ go/logic/migrator.go | 161 +++++++++++++++++++++++++++---- go/sql/builder.go | 72 ++++++++++++++ 5 files changed, 439 insertions(+), 18 deletions(-) diff --git a/go/base/context.go b/go/base/context.go index f8054d6dc..a1a9749b9 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -38,6 +38,7 @@ type CutOver int const ( CutOverAtomic CutOver = iota CutOverTwoStep + CutOverTrigger ) type ThrottleReasonHint string @@ -201,6 +202,7 @@ type MigrationContext struct { UserCommandedUnpostponeFlag int64 CutOverCompleteFlag int64 InCutOverCriticalSectionFlag int64 + ApplyDMLEventState int64 PanicAbort chan error OriginalTableColumnsOnApplier *sql.ColumnList diff --git a/go/cmd/gh-ost/main.go b/go/cmd/gh-ost/main.go index 7b5af54f9..ee84d5c90 100644 --- a/go/cmd/gh-ost/main.go +++ b/go/cmd/gh-ost/main.go @@ -241,6 +241,8 @@ func main() { migrationContext.CutOverType = base.CutOverAtomic case "two-step": migrationContext.CutOverType = base.CutOverTwoStep + case "trigger": + migrationContext.CutOverType = base.CutOverTrigger default: migrationContext.Log.Fatalf("Unknown cut-over: %s", *cutOver) } diff --git a/go/logic/applier.go b/go/logic/applier.go index 9bd2ba86a..ae4e8a305 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -11,6 +11,8 @@ import ( "sync" "sync/atomic" "time" + "regexp" + "strings" "github.com/github/gh-ost/go/base" "github.com/github/gh-ost/go/binlog" @@ -545,6 +547,224 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected return chunkSize, rowsAffected, duration, nil } +// Create triggers from the original table to the 
new one +func (this *Applier) CreateTriggersOriginalTable() error { + /////////////////////// + // Trigger preffix // + /////////////////////// + + prefix := fmt.Sprintf( + "ghost_%s_%s", + this.migrationContext.DatabaseName, + this.migrationContext.OriginalTableName) + r := regexp.MustCompile("\\W") + prefix = r.ReplaceAllString(prefix, "_") + + if len(prefix) > 60 { + oldPrefix := prefix + prefix = prefix[0:60] + + log.Debugf( + "Trigger prefix %s is over 60 characters long, truncating to %s", + oldPrefix, + prefix) + } + + ////////////////////// + // Delete trigger // + ////////////////////// + + delIndexComparations := []string{} + for _, name := range this.migrationContext.UniqueKey.Columns.Names() { + columnQuoted := sql.EscapeName(name) + comparation := fmt.Sprintf( + "%s.%s.%s <=> OLD.%s", + sql.EscapeName(this.migrationContext.DatabaseName), + sql.EscapeName(this.migrationContext.GetGhostTableName()), + columnQuoted, + columnQuoted) + delIndexComparations = append(delIndexComparations, comparation) + } + + deleteTrigger := fmt.Sprintf( + "CREATE /* gh-ost */ TRIGGER `%s_del` AFTER DELETE ON %s.%s " + + "FOR EACH ROW " + + "DELETE /* gh-ost */ IGNORE FROM %s.%s WHERE %s", + prefix, + sql.EscapeName(this.migrationContext.DatabaseName), + sql.EscapeName(this.migrationContext.OriginalTableName), + sql.EscapeName(this.migrationContext.DatabaseName), + sql.EscapeName(this.migrationContext.GetGhostTableName()), + strings.Join(delIndexComparations, " AND ")) + + ////////////////////// + // Insert trigger // + ////////////////////// + + insertCols := []string{} + insertValues := []string{} + for i, origColumn := range this.migrationContext.SharedColumns.Names() { + key := sql.EscapeName(this.migrationContext.MappedSharedColumns.Columns()[i].Name) + insertCols = append(insertCols, key) + + value := fmt.Sprintf("NEW.%s", sql.EscapeName(origColumn)) + insertValues = append(insertValues, value) + } + + + insertTrigger := fmt.Sprintf( + "CREATE /* gh-ost */ TRIGGER 
`%s_ins` AFTER INSERT ON %s.%s " + + "FOR EACH ROW " + + "REPLACE /* gh-ost */ INTO %s.%s (%s) VALUES (%s)", + prefix, + sql.EscapeName(this.migrationContext.DatabaseName), + sql.EscapeName(this.migrationContext.OriginalTableName), + sql.EscapeName(this.migrationContext.DatabaseName), + sql.EscapeName(this.migrationContext.GetGhostTableName()), + strings.Join(insertCols, ", "), + strings.Join(insertValues, ", ")) + + ////////////////////// + // Update trigger // + ////////////////////// + + updIndexComparations := []string{} + for _, name := range this.migrationContext.UniqueKey.Columns.Names() { + columnQuoted := sql.EscapeName(name) + comparation := fmt.Sprintf( + "OLD.%s <=> NEW.%s", + columnQuoted, + columnQuoted) + updIndexComparations = append(updIndexComparations, comparation) + } + + updateTrigger := fmt.Sprintf( + "CREATE /* gh-ost */ TRIGGER `%s_upd` AFTER UPDATE ON %s.%s " + + "FOR EACH ROW " + + "BEGIN /* gh-ost */ " + + "DELETE IGNORE FROM %s.%s WHERE !(%s) AND %s; " + + "REPLACE INTO %s.%s (%s) VALUES (%s); " + + "END", + prefix, + sql.EscapeName(this.migrationContext.DatabaseName), + sql.EscapeName(this.migrationContext.OriginalTableName), + sql.EscapeName(this.migrationContext.DatabaseName), + sql.EscapeName(this.migrationContext.GetGhostTableName()), + strings.Join(updIndexComparations, " AND "), + strings.Join(delIndexComparations, " AND "), + sql.EscapeName(this.migrationContext.DatabaseName), + sql.EscapeName(this.migrationContext.GetGhostTableName()), + strings.Join(insertCols, ", "), + strings.Join(insertValues, ", ")) + + /////////////////////////// + // Create the triggers // + /////////////////////////// + + return func() error { + tx, err := this.db.Begin() + if err != nil { + return err + } + + if _, err := tx.Exec(deleteTrigger); err != nil { + return err + } + if _, err := tx.Exec(insertTrigger); err != nil { + return err + } + if _, err := tx.Exec(updateTrigger); err != nil { + return err + } + + if err := tx.Commit(); err != nil { + 
return err + } + return nil + }() +} + +//TODO +func (this *Applier) ObtainUniqueKeyValuesOfEvent(dmlEvent *binlog.BinlogDMLEvent) (uniqueKeys [][]interface{}) { + switch dmlEvent.DML { + case binlog.DeleteDML: + return append(uniqueKeys, + sql.ObtainUniqueKeyValues( + this.migrationContext.OriginalTableColumns, + &this.migrationContext.UniqueKey.Columns, + dmlEvent.WhereColumnValues.AbstractValues())) + + case binlog.InsertDML: + return append(uniqueKeys, + sql.ObtainUniqueKeyValues( + this.migrationContext.OriginalTableColumns, + &this.migrationContext.UniqueKey.Columns, + dmlEvent.NewColumnValues.AbstractValues())) + + case binlog.UpdateDML: + { + if _, isModified := this.updateModifiesUniqueKeyColumns(dmlEvent); isModified { + uniqueKeys = append(uniqueKeys, + sql.ObtainUniqueKeyValues( + this.migrationContext.OriginalTableColumns, + &this.migrationContext.UniqueKey.Columns, + dmlEvent.WhereColumnValues.AbstractValues())) + } + return append(uniqueKeys, + sql.ObtainUniqueKeyValues( + this.migrationContext.OriginalTableColumns, + &this.migrationContext.UniqueKey.Columns, + dmlEvent.NewColumnValues.AbstractValues())) + } + } + + return uniqueKeys +} + +//TODO +func (this *Applier) DropUniqueKeysGhostTable(uniqueKeysArgs [][]interface{}) error { + query, args, err := sql.BuildDeleteQuery( + this.migrationContext.DatabaseName, + this.migrationContext.OriginalTableName, + this.migrationContext.GetGhostTableName(), + &this.migrationContext.UniqueKey.Columns, + uniqueKeysArgs) + + if err != nil { + return err + } + + log.Infof("Delete query: %s", query) + log.Infof("Delete query args: %+v", args) + + return nil + + //return func() error { + //tx, err := this.db.Begin() + //if err != nil { + //return err + //} + + //if _, err := tx.Exec(deleteTrigger); err != nil { + //return err + //} + //if _, err := tx.Exec(insertTrigger); err != nil { + //return err + //} + //if _, err := tx.Exec(updateTrigger); err != nil { + //return err + //} + + //if err := tx.Commit(); err != 
nil { + //return err + //} + //return nil + //}() +} + + + + // LockOriginalTable places a write lock on the original table func (this *Applier) LockOriginalTable() error { query := fmt.Sprintf(`lock /* gh-ost */ tables %s.%s write`, diff --git a/go/logic/migrator.go b/go/logic/migrator.go index c12c21fc3..6197f4c09 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -19,13 +19,15 @@ import ( "github.com/github/gh-ost/go/binlog" "github.com/github/gh-ost/go/mysql" "github.com/github/gh-ost/go/sql" + "github.com/openark/golib/log" ) type ChangelogState string const ( - GhostTableMigrated ChangelogState = "GhostTableMigrated" - AllEventsUpToLockProcessed = "AllEventsUpToLockProcessed" + GhostTableMigrated ChangelogState = "GhostTableMigrated" + AllEventsUpToLockProcessed = "AllEventsUpToLockProcessed" + AllEventsUpToTriggersProcessed = "AllEventsUpToTriggersProcessed" ) func ReadChangelogState(s string) ChangelogState { @@ -70,10 +72,13 @@ type Migrator struct { hooksExecutor *HooksExecutor migrationContext *base.MigrationContext - firstThrottlingCollected chan bool - ghostTableMigrated chan bool - rowCopyComplete chan error - allEventsUpToLockProcessed chan string + firstThrottlingCollected chan bool + ghostTableMigrated chan bool + rowCopyComplete chan error + allEventsUpToLockProcessed chan string + allEventsUpToTriggersProcessed chan string + + triggerCutoverUniqueKeys [][]interface{} rowCopyCompleteFlag int64 // copyRowsQueue should not be buffered; if buffered some non-damaging but @@ -88,17 +93,17 @@ type Migrator struct { func NewMigrator(context *base.MigrationContext) *Migrator { migrator := &Migrator{ - migrationContext: context, - parser: sql.NewAlterTableParser(), - ghostTableMigrated: make(chan bool), - firstThrottlingCollected: make(chan bool, 3), - rowCopyComplete: make(chan error), - allEventsUpToLockProcessed: make(chan string), - - copyRowsQueue: make(chan tableWriteFunc), - applyEventsQueue: make(chan *applyEventStruct, 
base.MaxEventsBatchSize), - handledChangelogStates: make(map[string]bool), - finishedMigrating: 0, + migrationContext: context, + parser: sql.NewAlterTableParser(), + ghostTableMigrated: make(chan bool), + firstThrottlingCollected: make(chan bool, 3), + rowCopyComplete: make(chan error), + allEventsUpToLockProcessed: make(chan string), + allEventsUpToTriggersProcessed: make(chan string), + copyRowsQueue: make(chan tableWriteFunc), + applyEventsQueue: make(chan *applyEventStruct, base.MaxEventsBatchSize), + handledChangelogStates: make(map[string]bool), + finishedMigrating: 0, } return migrator } @@ -244,6 +249,17 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er this.applyEventsQueue <- newApplyEventStructByFunc(&applyEventFunc) }() } + case AllEventsUpToTriggersProcessed: + { + var applyEventFunc tableWriteFunc = func() error { + this.allEventsUpToTriggersProcessed <- changelogStateString + return nil + } + //TODO Explain + go func() { + this.applyEventsQueue <- newApplyEventStructByFunc(&applyEventFunc) + }() + } default: { return fmt.Errorf("Unknown changelog state: %+v", changelogState) @@ -556,9 +572,58 @@ func (this *Migrator) cutOver() (err error) { this.handleCutOverResult(err) return err } + if this.migrationContext.CutOverType == base.CutOverTrigger { + err := this.cutOverTrigger() + this.handleCutOverResult(err) + return err + } return this.migrationContext.Log.Fatalf("Unknown cut-over type: %d; should never get here!", this.migrationContext.CutOverType) } +// Inject the "AllEventsUpToTriggersProcessed" state hint, wait for it to appear in the binary logs, +// make sure the queue is drained. 
+func (this *Migrator) waitForEventsUpToTriggers() (err error) { + timeout := time.NewTimer(time.Second * time.Duration(this.migrationContext.CutOverLockTimeoutSeconds)) + + this.migrationContext.MarkPointOfInterest() + waitForEventsUpToTriggersStartTime := time.Now() + + allEventsUpToTriggersProcessedChallenge := fmt.Sprintf( + "%s:%d", + string(AllEventsUpToTriggersProcessed), + waitForEventsUpToTriggersStartTime.UnixNano()) + + log.Infof("Writing changelog state: %+v", allEventsUpToTriggersProcessedChallenge) + if _, err := this.applier.WriteChangelogState(allEventsUpToTriggersProcessedChallenge); err != nil { + return err + } + log.Infof("Waiting for events up to triggers") + //atomic.StoreInt64(&this.migrationContext.AllEventsUpToTriggersProcessedInjectedFlag, 1) + for found := false; !found; { + select { + case <-timeout.C: + { + return log.Errorf("Timeout while waiting for events up to triggers") + } + case state := <-this.allEventsUpToTriggersProcessed: + { + if state == allEventsUpToTriggersProcessedChallenge { + log.Infof("Waiting for events up to triggers: got %s", state) + found = true + } else { + log.Infof("Waiting for events up to triggers: skipping %s", state) + } + } + } + } + waitForEventsUpToTriggersDuration := time.Since(waitForEventsUpToTriggersStartTime) + + log.Infof("Done waiting for events up to triggers; duration=%+v", waitForEventsUpToTriggersDuration) + this.printStatus(ForcePrintStatusAndHintRule) + + return nil +} + // Inject the "AllEventsUpToLockProcessed" state hint, wait for it to appear in the binary logs, // make sure the queue is drained. func (this *Migrator) waitForEventsUpToLock() (err error) { @@ -628,6 +693,52 @@ func (this *Migrator) cutOverTwoStep() (err error) { return nil } +// cutOverTrigger will create some INSERT, UPDATE and DELETE triggers to keep the table updated, +// After this it will copy the changes between the last insert and the triggers creation and do a +// simple RENAME TABLE. 
+func (this *Migrator) cutOverTrigger() (err error) { + atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 1) + defer atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 0) + atomic.StoreInt64(&this.migrationContext.ApplyDMLEventState, 1) + + log.Infof( + "Creating triggers for %s.%s", + this.migrationContext.DatabaseName, + this.migrationContext.OriginalTableName) + + time.Sleep(30 * time.Second) + + if err := this.retryOperation(this.applier.CreateTriggersOriginalTable); err != nil { + return err + } + + if err := this.waitForEventsUpToTriggers(); err != nil { + return log.Errore(err) + } + + uniqueKeys := this.triggerCutoverUniqueKeys + + if err := this.applier.DropUniqueKeysGhostTable(uniqueKeys); err != nil { + return err + } + + //if err := this.retryOperation(this.waitForEventsUpToLock); err != nil { + //return err + //} + //if err := this.retryOperation(this.applier.SwapTablesQuickAndBumpy); err != nil { + //return err + //} + //if err := this.retryOperation(this.applier.UnlockTables); err != nil { + //return err + //} + + //lockAndRenameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.LockTablesStartTime) + //renameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.RenameTablesStartTime) + //log.Debugf("Lock & rename duration: %s (rename only: %s). 
During this time, queries on %s were locked or failing", lockAndRenameDuration, renameDuration, sql.EscapeName(this.migrationContext.OriginalTableName)) + return nil + +} + // atomicCutOver func (this *Migrator) atomicCutOver() (err error) { atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 1) @@ -1027,6 +1138,7 @@ func (this *Migrator) initiateStreaming() error { if err := this.eventsStreamer.InitDBConnections(); err != nil { return err } + atomic.StoreInt64(&this.migrationContext.ApplyDMLEventState, 0) this.eventsStreamer.AddListener( false, this.migrationContext.DatabaseName, @@ -1209,8 +1321,14 @@ func (this *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error { } if eventStruct.dmlEvent == nil { return handleNonDMLEventStruct(eventStruct) - } + } // And this (?) if eventStruct.dmlEvent != nil { + addCutoverKeys := atomic.LoadInt64(&this.migrationContext.ApplyDMLEventState) == 1 + if addCutoverKeys { + this.triggerCutoverUniqueKeys = append( + this.triggerCutoverUniqueKeys, + this.applier.ObtainUniqueKeyValuesOfEvent(eventStruct.dmlEvent)...) + } dmlEvents := [](*binlog.BinlogDMLEvent){} dmlEvents = append(dmlEvents, eventStruct.dmlEvent) var nonDmlStructToApply *applyEventStruct @@ -1230,6 +1348,13 @@ func (this *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error { break } dmlEvents = append(dmlEvents, additionalStruct.dmlEvent) + + addCutoverKeys = addCutoverKeys && (atomic.LoadInt64(&this.migrationContext.ApplyDMLEventState) == 1) + if addCutoverKeys { + this.triggerCutoverUniqueKeys = append( + this.triggerCutoverUniqueKeys, + this.applier.ObtainUniqueKeyValuesOfEvent(additionalStruct.dmlEvent)...) 
+ } } // Create a task to apply the DML event; this will be execute by executeWriteFuncs() var applyEventFunc tableWriteFunc = func() error { diff --git a/go/sql/builder.go b/go/sql/builder.go index 7fe366c6f..c53d3b40c 100644 --- a/go/sql/builder.go +++ b/go/sql/builder.go @@ -515,3 +515,75 @@ func BuildDMLUpdateQuery(databaseName, tableName string, tableColumns, sharedCol ) return result, sharedArgs, uniqueKeyArgs, nil } + +func ObtainUniqueKeyValues(tableColumns, uniqueKeyColumns *ColumnList, args []interface{}) (uniqueKeyArgs []interface{}) { + for _, column := range uniqueKeyColumns.Columns() { + tableOrdinal := tableColumns.Ordinals[column.Name] + arg := column.convertArg(args[tableOrdinal]) + uniqueKeyArgs = append(uniqueKeyArgs, arg) + } + return uniqueKeyArgs +} + +func buildColumnsPreparedValuesWhere(databaseName string, columns *ColumnList) []string { + values := make([]string, columns.Len(), columns.Len()) + for i, name := range columns.Names() { + values[i] = fmt.Sprintf( + "%s.%s = ?", + databaseName, + EscapeName(name)) + } + return values +} + +func buildColumnsEqualsWhere(databaseName, originalTableName, ghostTableName string, columns *ColumnList) []string { + values := make([]string, columns.Len(), columns.Len()) + for i, name := range columns.Names() { + name = EscapeName(name) + values[i] = fmt.Sprintf( + "%s.%s.%s <=> %s.%s.%s", + databaseName, + originalTableName, + name, + databaseName, + ghostTableName, + name) + } + return values +} + +func BuildDeleteQuery(databaseName, originalTableName, ghostTableName string, uniqueKeyColumns *ColumnList, uniqueKeyArgs [][]interface{}) (result string, explodedArgs []interface{}, err error) { + databaseName = EscapeName(databaseName) + originalTableName = EscapeName(originalTableName) + ghostTableName = EscapeName(ghostTableName) + + preparedValuesGetKey := buildColumnsPreparedValuesWhere( + databaseName, + uniqueKeyColumns) + preparedValuesExistsKey := buildColumnsEqualsWhere( + databaseName, + 
originalTableName, + ghostTableName, + uniqueKeyColumns) + + template := fmt.Sprintf( + "(%s and !(%s))", + strings.Join(preparedValuesGetKey, " and "), + strings.Join(preparedValuesExistsKey, " and ")) + + // WHERE ((gho.PK = ?) AND !(orig.PK <=> gho.PK)) OR (XXX) + + preparedValuesWhere := make([]string, len(uniqueKeyArgs), len(uniqueKeyArgs)) + for i, uniqueKey := range uniqueKeyArgs { + explodedArgs = append(explodedArgs, uniqueKey...) + preparedValuesWhere[i] = template + } + + result = fmt.Sprintf( + "DELETE /* gh-ost */ FROM %s.%s WHERE %s", + databaseName, + ghostTableName, + strings.Join(preparedValuesWhere, " or ")) + + return result, explodedArgs, nil +} From 0f1624f1cde141c9d6e1b00b2a0c8289cc3a38dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20Manuel=20Fern=C3=A1ndez=20Garc=C3=ADa-Minguill?= =?UTF-8?q?=C3=A1n?= Date: Thu, 28 Mar 2019 13:39:17 +0100 Subject: [PATCH 02/17] Remove triggers --- go/base/context.go | 1 - go/logic/applier.go | 130 ++++++++++++++++++++++++++----------------- go/logic/migrator.go | 47 +++------------- 3 files changed, 89 insertions(+), 89 deletions(-) diff --git a/go/base/context.go b/go/base/context.go index a1a9749b9..09b4e4c4f 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -202,7 +202,6 @@ type MigrationContext struct { UserCommandedUnpostponeFlag int64 CutOverCompleteFlag int64 InCutOverCriticalSectionFlag int64 - ApplyDMLEventState int64 PanicAbort chan error OriginalTableColumnsOnApplier *sql.ColumnList diff --git a/go/logic/applier.go b/go/logic/applier.go index ae4e8a305..60ebb1930 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -547,12 +547,8 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected return chunkSize, rowsAffected, duration, nil } -// Create triggers from the original table to the new one -func (this *Applier) CreateTriggersOriginalTable() error { - /////////////////////// - // Trigger preffix // - /////////////////////// - +// Create the trigger 
prefix +func (this *Applier) makeTriggerPrefix() string { prefix := fmt.Sprintf( "ghost_%s_%s", this.migrationContext.DatabaseName, @@ -570,6 +566,13 @@ func (this *Applier) CreateTriggersOriginalTable() error { prefix) } + return prefix +} + +// Create triggers from the original table to the new one +func (this *Applier) CreateTriggersOriginalTable() error { + prefix := this.makeTriggerPrefix() + ////////////////////// // Delete trigger // ////////////////////// @@ -684,6 +687,52 @@ func (this *Applier) CreateTriggersOriginalTable() error { }() } +// Drop triggers from the old table +func (this *Applier) DropTriggersOldTable() error { + prefix := this.makeTriggerPrefix() + + dropDeleteTrigger := fmt.Sprintf( + "DROP /* gh-ost */ TRIGGER %s.`%s_del`", + sql.EscapeName(this.migrationContext.DatabaseName), + prefix) + + dropInsertTrigger := fmt.Sprintf( + "DROP /* gh-ost */ TRIGGER %s.`%s_ins`", + sql.EscapeName(this.migrationContext.DatabaseName), + prefix) + + dropUpdateTrigger := fmt.Sprintf( + "DROP /* gh-ost */ TRIGGER %s.`%s_upd`", + sql.EscapeName(this.migrationContext.DatabaseName), + prefix) + + ///////////////////////// + // Drop the triggers // + ///////////////////////// + + return func() error { + tx, err := this.db.Begin() + if err != nil { + return err + } + + if _, err := tx.Exec(dropDeleteTrigger); err != nil { + return err + } + if _, err := tx.Exec(dropInsertTrigger); err != nil { + return err + } + if _, err := tx.Exec(dropUpdateTrigger); err != nil { + return err + } + + if err := tx.Commit(); err != nil { + return err + } + return nil + }() +} + //TODO func (this *Applier) ObtainUniqueKeyValuesOfEvent(dmlEvent *binlog.BinlogDMLEvent) (uniqueKeys [][]interface{}) { switch dmlEvent.DML { @@ -721,50 +770,6 @@ func (this *Applier) ObtainUniqueKeyValuesOfEvent(dmlEvent *binlog.BinlogDMLEven return uniqueKeys } -//TODO -func (this *Applier) DropUniqueKeysGhostTable(uniqueKeysArgs [][]interface{}) error { - query, args, err := sql.BuildDeleteQuery( - 
this.migrationContext.DatabaseName, - this.migrationContext.OriginalTableName, - this.migrationContext.GetGhostTableName(), - &this.migrationContext.UniqueKey.Columns, - uniqueKeysArgs) - - if err != nil { - return err - } - - log.Infof("Delete query: %s", query) - log.Infof("Delete query args: %+v", args) - - return nil - - //return func() error { - //tx, err := this.db.Begin() - //if err != nil { - //return err - //} - - //if _, err := tx.Exec(deleteTrigger); err != nil { - //return err - //} - //if _, err := tx.Exec(insertTrigger); err != nil { - //return err - //} - //if _, err := tx.Exec(updateTrigger); err != nil { - //return err - //} - - //if err := tx.Commit(); err != nil { - //return err - //} - //return nil - //}() -} - - - - // LockOriginalTable places a write lock on the original table func (this *Applier) LockOriginalTable() error { query := fmt.Sprintf(`lock /* gh-ost */ tables %s.%s write`, @@ -794,6 +799,31 @@ func (this *Applier) UnlockTables() error { return nil } +// SwapTables issues a one-step swap table operation: +// - rename original table to _old +// - rename ghost table to original +func (this *Applier) SwapTables() error { + query := fmt.Sprintf(`rename /* gh-ost */ table %s.%s to %s.%s, %s.%s to %s.%s`, + sql.EscapeName(this.migrationContext.DatabaseName), + sql.EscapeName(this.migrationContext.OriginalTableName), + sql.EscapeName(this.migrationContext.DatabaseName), + sql.EscapeName(this.migrationContext.GetOldTableName()), + sql.EscapeName(this.migrationContext.DatabaseName), + sql.EscapeName(this.migrationContext.GetGhostTableName()), + sql.EscapeName(this.migrationContext.DatabaseName), + sql.EscapeName(this.migrationContext.OriginalTableName), + ) + log.Infof("Swaping original and new table: %s", query) + this.migrationContext.RenameTablesStartTime = time.Now() + if _, err := sqlutils.ExecNoPrepare(this.singletonDB, query); err != nil { + return err + } + this.migrationContext.RenameTablesEndTime = time.Now() + + log.Infof("Tables 
swaped") + return nil +} + // SwapTablesQuickAndBumpy issues a two-step swap table operation: // - rename original table to _old // - rename ghost table to original diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 6197f4c09..ca4e47192 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -78,8 +78,6 @@ type Migrator struct { allEventsUpToLockProcessed chan string allEventsUpToTriggersProcessed chan string - triggerCutoverUniqueKeys [][]interface{} - rowCopyCompleteFlag int64 // copyRowsQueue should not be buffered; if buffered some non-damaging but // excessive work happens at the end of the iteration as new copy-jobs arrive before realizing the copy is complete @@ -699,15 +697,13 @@ func (this *Migrator) cutOverTwoStep() (err error) { func (this *Migrator) cutOverTrigger() (err error) { atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 1) defer atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 0) - atomic.StoreInt64(&this.migrationContext.ApplyDMLEventState, 1) + atomic.StoreInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag, 0) log.Infof( "Creating triggers for %s.%s", this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName) - time.Sleep(30 * time.Second) - if err := this.retryOperation(this.applier.CreateTriggersOriginalTable); err != nil { return err } @@ -715,28 +711,17 @@ func (this *Migrator) cutOverTrigger() (err error) { if err := this.waitForEventsUpToTriggers(); err != nil { return log.Errore(err) } - - uniqueKeys := this.triggerCutoverUniqueKeys - - if err := this.applier.DropUniqueKeysGhostTable(uniqueKeys); err != nil { + if err := this.retryOperation(this.applier.SwapTables); err != nil { + return err + } + if err := this.retryOperation(this.applier.DropTriggersOldTable); err != nil { return err } - //if err := this.retryOperation(this.waitForEventsUpToLock); err != nil { - //return err - //} - //if err := 
this.retryOperation(this.applier.SwapTablesQuickAndBumpy); err != nil { - //return err - //} - //if err := this.retryOperation(this.applier.UnlockTables); err != nil { - //return err - //} - - //lockAndRenameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.LockTablesStartTime) - //renameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.RenameTablesStartTime) - //log.Debugf("Lock & rename duration: %s (rename only: %s). During this time, queries on %s were locked or failing", lockAndRenameDuration, renameDuration, sql.EscapeName(this.migrationContext.OriginalTableName)) + lockAndRenameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.LockTablesStartTime) + renameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.RenameTablesStartTime) + log.Debugf("Lock & rename duration: %s (rename only: %s). During this time, queries on %s were locked or failing", lockAndRenameDuration, renameDuration, sql.EscapeName(this.migrationContext.OriginalTableName)) return nil - } // atomicCutOver @@ -1138,7 +1123,6 @@ func (this *Migrator) initiateStreaming() error { if err := this.eventsStreamer.InitDBConnections(); err != nil { return err } - atomic.StoreInt64(&this.migrationContext.ApplyDMLEventState, 0) this.eventsStreamer.AddListener( false, this.migrationContext.DatabaseName, @@ -1321,14 +1305,8 @@ func (this *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error { } if eventStruct.dmlEvent == nil { return handleNonDMLEventStruct(eventStruct) - } // And this (?) + } if eventStruct.dmlEvent != nil { - addCutoverKeys := atomic.LoadInt64(&this.migrationContext.ApplyDMLEventState) == 1 - if addCutoverKeys { - this.triggerCutoverUniqueKeys = append( - this.triggerCutoverUniqueKeys, - this.applier.ObtainUniqueKeyValuesOfEvent(eventStruct.dmlEvent)...) 
- } dmlEvents := [](*binlog.BinlogDMLEvent){} dmlEvents = append(dmlEvents, eventStruct.dmlEvent) var nonDmlStructToApply *applyEventStruct @@ -1348,13 +1326,6 @@ func (this *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error { break } dmlEvents = append(dmlEvents, additionalStruct.dmlEvent) - - addCutoverKeys = addCutoverKeys && (atomic.LoadInt64(&this.migrationContext.ApplyDMLEventState) == 1) - if addCutoverKeys { - this.triggerCutoverUniqueKeys = append( - this.triggerCutoverUniqueKeys, - this.applier.ObtainUniqueKeyValuesOfEvent(additionalStruct.dmlEvent)...) - } } // Create a task to apply the DML event; this will be execute by executeWriteFuncs() var applyEventFunc tableWriteFunc = func() error { From deb6f632cd6f2fc2c4ebd245b1c29b899f4bfe9d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20Manuel=20Fern=C3=A1ndez=20Garc=C3=ADa-Minguill?= =?UTF-8?q?=C3=A1n?= Date: Thu, 28 Mar 2019 18:22:37 +0100 Subject: [PATCH 03/17] Reverted --- go/base/context.go | 1 + go/logic/migrator.go | 24 +++++++++++++++++++++++- 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/go/base/context.go b/go/base/context.go index 09b4e4c4f..3b45718c7 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -202,6 +202,7 @@ type MigrationContext struct { UserCommandedUnpostponeFlag int64 CutOverCompleteFlag int64 InCutOverCriticalSectionFlag int64 + ApplyDMLEventState int64 PanicAbort chan error OriginalTableColumnsOnApplier *sql.ColumnList diff --git a/go/logic/migrator.go b/go/logic/migrator.go index ca4e47192..fdb5197a6 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -78,6 +78,8 @@ type Migrator struct { allEventsUpToLockProcessed chan string allEventsUpToTriggersProcessed chan string + triggerCutoverUniqueKeys [][]interface {} + rowCopyCompleteFlag int64 // copyRowsQueue should not be buffered; if buffered some non-damaging but // excessive work happens at the end of the iteration as new copy-jobs arrive before realizing the copy is complete 
@@ -697,7 +699,7 @@ func (this *Migrator) cutOverTwoStep() (err error) { func (this *Migrator) cutOverTrigger() (err error) { atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 1) defer atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 0) - atomic.StoreInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag, 0) + atomic.StoreInt64(&this.migrationContext.ApplyDMLEventState, 1) log.Infof( "Creating triggers for %s.%s", @@ -711,6 +713,11 @@ func (this *Migrator) cutOverTrigger() (err error) { if err := this.waitForEventsUpToTriggers(); err != nil { return log.Errore(err) } + + //TODO remove + log.Infof("Received trigger event, waiting 30s after swap it") + time.Sleep(30 * time.Second) + if err := this.retryOperation(this.applier.SwapTables); err != nil { return err } @@ -1123,6 +1130,7 @@ func (this *Migrator) initiateStreaming() error { if err := this.eventsStreamer.InitDBConnections(); err != nil { return err } + atomic.StoreInt64(&this.migrationContext.ApplyDMLEventState, 0) this.eventsStreamer.AddListener( false, this.migrationContext.DatabaseName, @@ -1307,6 +1315,13 @@ func (this *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error { return handleNonDMLEventStruct(eventStruct) } if eventStruct.dmlEvent != nil { + addCutoverKeys := atomic.LoadInt64(&this.migrationContext.ApplyDMLEventState) == 1 + if addCutoverKeys { + this.triggerCutoverUniqueKeys = append( + this.triggerCutoverUniqueKeys, + this.applier.ObtainUniqueKeyValuesOfEvent(eventStruct.dmlEvent)...) 
+ } + dmlEvents := [](*binlog.BinlogDMLEvent){} dmlEvents = append(dmlEvents, eventStruct.dmlEvent) var nonDmlStructToApply *applyEventStruct @@ -1326,6 +1341,13 @@ func (this *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error { break } dmlEvents = append(dmlEvents, additionalStruct.dmlEvent) + + addCutoverKeys = addCutoverKeys && (atomic.LoadInt64(&this.migrationContext.ApplyDMLEventState) == 1) + if addCutoverKeys { + this.triggerCutoverUniqueKeys = append( + this.triggerCutoverUniqueKeys, + this.applier.ObtainUniqueKeyValuesOfEvent(additionalStruct.dmlEvent)...) + } } // Create a task to apply the DML event; this will be execute by executeWriteFuncs() var applyEventFunc tableWriteFunc = func() error { From 75d1207708fe8379a8ba370cbfe4ed9e1965a56b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20Manuel=20Fern=C3=A1ndez=20Garc=C3=ADa-Minguill?= =?UTF-8?q?=C3=A1n?= Date: Fri, 29 Mar 2019 10:09:59 +0100 Subject: [PATCH 04/17] Added logic to insert events during trigger creation --- go/logic/applier.go | 46 +++++++++++++++++++++++++++ go/logic/migrator.go | 69 ++++++++++++++++++++++++++++++++-------- go/sql/builder.go | 75 +++++++++++++++++++++++++------------------- 3 files changed, 144 insertions(+), 46 deletions(-) diff --git a/go/logic/applier.go b/go/logic/applier.go index 60ebb1930..fcd6d2c0a 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -770,6 +770,52 @@ func (this *Applier) ObtainUniqueKeyValuesOfEvent(dmlEvent *binlog.BinlogDMLEven return uniqueKeys } +func (this *Applier) DropUniqueKeysGhostTable(uniqueKeysArgs [][]interface{}) error { + if len(uniqueKeysArgs) == 0 { + return nil + } + + query, args, err := sql.BuildDeleteQuery( + this.migrationContext.DatabaseName, + this.migrationContext.GetGhostTableName(), + &this.migrationContext.UniqueKey.Columns, + uniqueKeysArgs) + + if err != nil { + return err + } + + log.Infof("Removing %d created rows during trigger creation", + len(uniqueKeysArgs)) + _, err = 
sqlutils.ExecNoPrepare(this.db, query, args...) + return err +} + +func (this *Applier) InsertSelectUniqueKeysGhostTable(uniqueKeysArgs [][]interface{}) error { + if len(uniqueKeysArgs) == 0 { + return nil + } + + query, args, err := sql.BuildInsertSelectQuery( + this.migrationContext.DatabaseName, + this.migrationContext.OriginalTableName, + this.migrationContext.GetGhostTableName(), + this.migrationContext.SharedColumns.Names(), + this.migrationContext.MappedSharedColumns.Names(), + this.migrationContext.UniqueKey.Name, + &this.migrationContext.UniqueKey.Columns, + uniqueKeysArgs) + + if err != nil { + return err + } + + log.Infof("Inserting %d created rows during trigger creation", + len(uniqueKeysArgs)) + _, err = sqlutils.ExecNoPrepare(this.db, query, args...) + return err +} + // LockOriginalTable places a write lock on the original table func (this *Applier) LockOriginalTable() error { query := fmt.Sprintf(`lock /* gh-ost */ tables %s.%s write`, diff --git a/go/logic/migrator.go b/go/logic/migrator.go index fdb5197a6..0ce6f7532 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -27,6 +27,7 @@ type ChangelogState string const ( GhostTableMigrated ChangelogState = "GhostTableMigrated" AllEventsUpToLockProcessed = "AllEventsUpToLockProcessed" + StopWriteEvents = "StopWriteEvents" AllEventsUpToTriggersProcessed = "AllEventsUpToTriggersProcessed" ) @@ -78,7 +79,7 @@ type Migrator struct { allEventsUpToLockProcessed chan string allEventsUpToTriggersProcessed chan string - triggerCutoverUniqueKeys [][]interface {} + triggerCutoverUniqueKeys [][]interface{} rowCopyCompleteFlag int64 // copyRowsQueue should not be buffered; if buffered some non-damaging but @@ -249,9 +250,21 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er this.applyEventsQueue <- newApplyEventStructByFunc(&applyEventFunc) }() } + case StopWriteEvents: + { + var applyEventFunc tableWriteFunc = func() error { + 
atomic.StoreInt64(&this.migrationContext.ApplyDMLEventState, 1) + return nil + } + //TODO Explain + go func() { + this.applyEventsQueue <- newApplyEventStructByFunc(&applyEventFunc) + }() + } case AllEventsUpToTriggersProcessed: { var applyEventFunc tableWriteFunc = func() error { + atomic.StoreInt64(&this.migrationContext.ApplyDMLEventState, 2) this.allEventsUpToTriggersProcessed <- changelogStateString return nil } @@ -580,6 +593,26 @@ func (this *Migrator) cutOver() (err error) { return this.migrationContext.Log.Fatalf("Unknown cut-over type: %d; should never get here!", this.migrationContext.CutOverType) } +// Inject the "StopWriteEvents" state hint +func (this *Migrator) injectStopWriteEvents() (err error) { + this.migrationContext.MarkPointOfInterest() + injectStopWriteEventsStartTime := time.Now() + + injectStopWriteEventsChallenge := fmt.Sprintf( + "%s:%d", + string(StopWriteEvents), + injectStopWriteEventsStartTime.UnixNano()) + + log.Infof("Writing changelog state: %+v", injectStopWriteEventsChallenge) + if _, err := this.applier.WriteChangelogState(injectStopWriteEventsChallenge); err != nil { + return err + } + + this.printStatus(ForcePrintStatusAndHintRule) + + return nil +} + // Inject the "AllEventsUpToTriggersProcessed" state hint, wait for it to appear in the binary logs, // make sure the queue is drained. 
func (this *Migrator) waitForEventsUpToTriggers() (err error) { @@ -699,7 +732,10 @@ func (this *Migrator) cutOverTwoStep() (err error) { func (this *Migrator) cutOverTrigger() (err error) { atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 1) defer atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 0) - atomic.StoreInt64(&this.migrationContext.ApplyDMLEventState, 1) + + if err := this.injectStopWriteEvents(); err != nil { + return err + } log.Infof( "Creating triggers for %s.%s", @@ -714,9 +750,19 @@ func (this *Migrator) cutOverTrigger() (err error) { return log.Errore(err) } + //TODO retry the operation + if err := this.applier.DropUniqueKeysGhostTable(this.triggerCutoverUniqueKeys); err != nil { + return err + } + + //TODO retry the operation + if err := this.applier.InsertSelectUniqueKeysGhostTable(this.triggerCutoverUniqueKeys); err != nil { + return err + } + //TODO remove - log.Infof("Received trigger event, waiting 30s after swap it") - time.Sleep(30 * time.Second) + //log.Infof("Received trigger event, waiting 30s after swap it") + //time.Sleep(30 * time.Second) if err := this.retryOperation(this.applier.SwapTables); err != nil { return err @@ -1315,11 +1361,15 @@ func (this *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error { return handleNonDMLEventStruct(eventStruct) } if eventStruct.dmlEvent != nil { - addCutoverKeys := atomic.LoadInt64(&this.migrationContext.ApplyDMLEventState) == 1 - if addCutoverKeys { + applyDMLEventState := atomic.LoadInt64(&this.migrationContext.ApplyDMLEventState) + if applyDMLEventState == 1 { this.triggerCutoverUniqueKeys = append( this.triggerCutoverUniqueKeys, this.applier.ObtainUniqueKeyValuesOfEvent(eventStruct.dmlEvent)...) 
+ return nil + } + if applyDMLEventState == 2 { + return nil } dmlEvents := [](*binlog.BinlogDMLEvent){} @@ -1341,13 +1391,6 @@ func (this *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error { break } dmlEvents = append(dmlEvents, additionalStruct.dmlEvent) - - addCutoverKeys = addCutoverKeys && (atomic.LoadInt64(&this.migrationContext.ApplyDMLEventState) == 1) - if addCutoverKeys { - this.triggerCutoverUniqueKeys = append( - this.triggerCutoverUniqueKeys, - this.applier.ObtainUniqueKeyValuesOfEvent(additionalStruct.dmlEvent)...) - } } // Create a task to apply the DML event; this will be execute by executeWriteFuncs() var applyEventFunc tableWriteFunc = func() error { diff --git a/go/sql/builder.go b/go/sql/builder.go index c53d3b40c..4b3ec6291 100644 --- a/go/sql/builder.go +++ b/go/sql/builder.go @@ -525,53 +525,60 @@ func ObtainUniqueKeyValues(tableColumns, uniqueKeyColumns *ColumnList, args []in return uniqueKeyArgs } -func buildColumnsPreparedValuesWhere(databaseName string, columns *ColumnList) []string { +func buildColumnsPreparedValuesWhere(columns *ColumnList) []string { values := make([]string, columns.Len(), columns.Len()) for i, name := range columns.Names() { values[i] = fmt.Sprintf( - "%s.%s = ?", - databaseName, + "%s <=> ?", EscapeName(name)) } return values } -func buildColumnsEqualsWhere(databaseName, originalTableName, ghostTableName string, columns *ColumnList) []string { - values := make([]string, columns.Len(), columns.Len()) - for i, name := range columns.Names() { - name = EscapeName(name) - values[i] = fmt.Sprintf( - "%s.%s.%s <=> %s.%s.%s", - databaseName, - originalTableName, - name, - databaseName, - ghostTableName, - name) +func BuildDeleteQuery(databaseName, ghostTableName string, uniqueKeyColumns *ColumnList, uniqueKeyArgs [][]interface{}) (result string, explodedArgs []interface{}, err error) { + preparedValuesGetKey := buildColumnsPreparedValuesWhere(uniqueKeyColumns) + + template := fmt.Sprintf( + "(%s)", + 
strings.Join(preparedValuesGetKey, " and ")) + + preparedValuesWhere := make([]string, len(uniqueKeyArgs), len(uniqueKeyArgs)) + for i, uniqueKey := range uniqueKeyArgs { + explodedArgs = append(explodedArgs, uniqueKey...) + preparedValuesWhere[i] = template } - return values + + result = fmt.Sprintf( + "delete /* gh-ost */ from %s.%s where %s", + EscapeName(databaseName), + EscapeName(ghostTableName), + strings.Join(preparedValuesWhere, " or ")) + + return result, explodedArgs, nil } -func BuildDeleteQuery(databaseName, originalTableName, ghostTableName string, uniqueKeyColumns *ColumnList, uniqueKeyArgs [][]interface{}) (result string, explodedArgs []interface{}, err error) { +func BuildInsertSelectQuery(databaseName, originalTableName, ghostTableName string, sharedColumns []string, mappedSharedColumns []string, uniqueKey string, uniqueKeyColumns *ColumnList, uniqueKeyArgs [][]interface{}) (result string, explodedArgs []interface{}, err error) { databaseName = EscapeName(databaseName) originalTableName = EscapeName(originalTableName) ghostTableName = EscapeName(ghostTableName) - preparedValuesGetKey := buildColumnsPreparedValuesWhere( - databaseName, - uniqueKeyColumns) - preparedValuesExistsKey := buildColumnsEqualsWhere( - databaseName, - originalTableName, - ghostTableName, - uniqueKeyColumns) + mappedSharedColumns = duplicateNames(mappedSharedColumns) + for i := range mappedSharedColumns { + mappedSharedColumns[i] = EscapeName(mappedSharedColumns[i]) + } + mappedSharedColumnsListing := strings.Join(mappedSharedColumns, ", ") - template := fmt.Sprintf( - "(%s and !(%s))", - strings.Join(preparedValuesGetKey, " and "), - strings.Join(preparedValuesExistsKey, " and ")) + sharedColumns = duplicateNames(sharedColumns) + for i := range sharedColumns { + sharedColumns[i] = EscapeName(sharedColumns[i]) + } + sharedColumnsListing := strings.Join(sharedColumns, ", ") + + preparedValuesGetKey := buildColumnsPreparedValuesWhere(uniqueKeyColumns) - // WHERE ((gho.PK = ?) 
AND !(orig.PK <=> gho.PK)) OR (XXX) + template := fmt.Sprintf( + "(%s)", + strings.Join(preparedValuesGetKey, " and ")) preparedValuesWhere := make([]string, len(uniqueKeyArgs), len(uniqueKeyArgs)) for i, uniqueKey := range uniqueKeyArgs { @@ -580,9 +587,11 @@ func BuildDeleteQuery(databaseName, originalTableName, ghostTableName string, un } result = fmt.Sprintf( - "DELETE /* gh-ost */ FROM %s.%s WHERE %s", - databaseName, - ghostTableName, + "insert /* gh-ost %s.%s */ into %s.%s (%s) select %s from %s.%s force index (%s) where %s", + databaseName, originalTableName, + databaseName, ghostTableName, mappedSharedColumnsListing, + sharedColumnsListing, databaseName, originalTableName, + uniqueKey, strings.Join(preparedValuesWhere, " or ")) return result, explodedArgs, nil From c0a2bd71a6b3c3b2ec2186a469a704febfd9a4af Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20Manuel=20Fern=C3=A1ndez=20Garc=C3=ADa-Minguill?= =?UTF-8?q?=C3=A1n?= Date: Fri, 29 Mar 2019 13:15:56 +0100 Subject: [PATCH 05/17] Improved sanitize step --- go/base/context.go | 2 + go/logic/applier.go | 103 +++++++++++++++++++++++++------------------ go/logic/migrator.go | 28 ++++++++---- go/sql/builder.go | 4 +- 4 files changed, 83 insertions(+), 54 deletions(-) diff --git a/go/base/context.go b/go/base/context.go index 3b45718c7..695faad0a 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -225,6 +225,8 @@ type MigrationContext struct { MigrationIterationRangeMaxValues *sql.ColumnValues ForceTmpTableName string + TriggerCutoverUniqueKeys [][]interface {} + recentBinlogCoordinates mysql.BinlogCoordinates Log Logger diff --git a/go/logic/applier.go b/go/logic/applier.go index fcd6d2c0a..14b2843b9 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -770,50 +770,67 @@ func (this *Applier) ObtainUniqueKeyValuesOfEvent(dmlEvent *binlog.BinlogDMLEven return uniqueKeys } -func (this *Applier) DropUniqueKeysGhostTable(uniqueKeysArgs [][]interface{}) error { - if len(uniqueKeysArgs) == 0 { - 
return nil - } - - query, args, err := sql.BuildDeleteQuery( - this.migrationContext.DatabaseName, - this.migrationContext.GetGhostTableName(), - &this.migrationContext.UniqueKey.Columns, - uniqueKeysArgs) - - if err != nil { - return err - } - - log.Infof("Removing %d created rows during trigger creation", - len(uniqueKeysArgs)) - _, err = sqlutils.ExecNoPrepare(this.db, query, args...) - return err -} +func (this *Applier) SanitizeRowsDuringCutOver() error { + for len(this.migrationContext.TriggerCutoverUniqueKeys) > 0 { + cutIndex := int64(len(this.migrationContext.TriggerCutoverUniqueKeys)) - this.migrationContext.ChunkSize + if cutIndex < 0 { + cutIndex = 0 + } + + chunkValues := this.migrationContext.TriggerCutoverUniqueKeys[cutIndex:] + deleteQuery, deleteArgs, deleteErr := sql.BuildDeleteQuery( + this.migrationContext.DatabaseName, + this.migrationContext.GetGhostTableName(), + &this.migrationContext.UniqueKey.Columns, + chunkValues) + + if deleteErr != nil { + return deleteErr + } + + insertSelectQuery, insertSelectArgs, insertSelectErr := sql.BuildInsertSelectQuery( + this.migrationContext.DatabaseName, + this.migrationContext.OriginalTableName, + this.migrationContext.GetGhostTableName(), + this.migrationContext.SharedColumns.Names(), + this.migrationContext.MappedSharedColumns.Names(), + this.migrationContext.UniqueKey.Name, + &this.migrationContext.UniqueKey.Columns, + chunkValues) + + if insertSelectErr != nil { + return insertSelectErr + } + + log.Infof("Sanitizing chunk of %d created rows during trigger creation", + len(chunkValues)) -func (this *Applier) InsertSelectUniqueKeysGhostTable(uniqueKeysArgs [][]interface{}) error { - if len(uniqueKeysArgs) == 0 { - return nil - } - - query, args, err := sql.BuildInsertSelectQuery( - this.migrationContext.DatabaseName, - this.migrationContext.OriginalTableName, - this.migrationContext.GetGhostTableName(), - this.migrationContext.SharedColumns.Names(), - this.migrationContext.MappedSharedColumns.Names(), - 
this.migrationContext.UniqueKey.Name, - &this.migrationContext.UniqueKey.Columns, - uniqueKeysArgs) - - if err != nil { - return err - } - - log.Infof("Inserting %d created rows during trigger creation", - len(uniqueKeysArgs)) - _, err = sqlutils.ExecNoPrepare(this.db, query, args...) - return err + err := func() error { + tx, err := this.db.Begin() + if err != nil { + return err + } + + if _, err := tx.Exec(deleteQuery, deleteArgs...); err != nil { + return err + } + if _, err := tx.Exec(insertSelectQuery, insertSelectArgs...); err != nil { + return err + } + if err := tx.Commit(); err != nil { + return err + } + return nil + }() + + if err != nil { + return err + } + + this.migrationContext.TriggerCutoverUniqueKeys = this.migrationContext.TriggerCutoverUniqueKeys[:cutIndex] + } + + return nil } // LockOriginalTable places a write lock on the original table diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 0ce6f7532..20708f343 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -750,20 +750,30 @@ func (this *Migrator) cutOverTrigger() (err error) { return log.Errore(err) } - //TODO retry the operation - if err := this.applier.DropUniqueKeysGhostTable(this.triggerCutoverUniqueKeys); err != nil { - return err + this.migrationContext.TriggerCutoverUniqueKeys = nil + +CompareNewRow: + for _, rowToAdd := range this.triggerCutoverUniqueKeys { + CompareAddedRow: + for _, rowAdded := range this.migrationContext.TriggerCutoverUniqueKeys { + for i := range rowAdded { + if rowToAdd[i] != rowAdded[i] { + continue CompareAddedRow + } + } + + continue CompareNewRow + } + + this.migrationContext.TriggerCutoverUniqueKeys = append( + this.migrationContext.TriggerCutoverUniqueKeys, + rowToAdd) } - //TODO retry the operation - if err := this.applier.InsertSelectUniqueKeysGhostTable(this.triggerCutoverUniqueKeys); err != nil { + if err := this.retryOperation(this.applier.SanitizeRowsDuringCutOver); err != nil { return err } - //TODO remove - 
//log.Infof("Received trigger event, waiting 30s after swap it") - //time.Sleep(30 * time.Second) - if err := this.retryOperation(this.applier.SwapTables); err != nil { return err } diff --git a/go/sql/builder.go b/go/sql/builder.go index 4b3ec6291..dc7729d0d 100644 --- a/go/sql/builder.go +++ b/go/sql/builder.go @@ -549,7 +549,7 @@ func BuildDeleteQuery(databaseName, ghostTableName string, uniqueKeyColumns *Col } result = fmt.Sprintf( - "delete /* gh-ost */ from %s.%s where %s", + "delete /* gh-ost */ ignore from %s.%s where %s", EscapeName(databaseName), EscapeName(ghostTableName), strings.Join(preparedValuesWhere, " or ")) @@ -587,7 +587,7 @@ func BuildInsertSelectQuery(databaseName, originalTableName, ghostTableName stri } result = fmt.Sprintf( - "insert /* gh-ost %s.%s */ into %s.%s (%s) select %s from %s.%s force index (%s) where %s", + "insert /* gh-ost %s.%s */ ignore into %s.%s (%s) select %s from %s.%s force index (%s) where %s", databaseName, originalTableName, databaseName, ghostTableName, mappedSharedColumnsListing, sharedColumnsListing, databaseName, originalTableName, From 537bd96d337a995a84bff3730e32bfa406c02e8a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20Manuel=20Fern=C3=A1ndez=20Garc=C3=ADa-Minguill?= =?UTF-8?q?=C3=A1n?= Date: Fri, 29 Mar 2019 13:16:06 +0100 Subject: [PATCH 06/17] Added doc --- doc/cut-over.md | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/doc/cut-over.md b/doc/cut-over.md index ada5100c6..ddaa99245 100644 --- a/doc/cut-over.md +++ b/doc/cut-over.md @@ -6,6 +6,18 @@ MySQL poses some limitations on how the table swap can take place. While it supp The [facebook OSC](https://www.facebook.com/notes/mysql-at-facebook/online-schema-change-for-mysql/430801045932/) tool documents this nicely. Look for **"Cut-over phase"**. The Facebook solution uses a non-atomic swap: the original table is first renamed and pushed aside, then the ghost table is renamed to take its place. 
In between the two renames there's a brief period of time where your table just does not exist, and queries will fail. +Another option to support a atomic swap without the use of a lock is the use of triggers only in the cut-over phase, +gh-ost philosophy is avoid the use of triggers but in some environments like a Galera cluster isn't possible use the +lock command, and like the cut-over should take a little time it shouldn't be a problem. This method work in the next +way: + +- A stop writes event is injected in the binlog and gh-ost disable the writes once it receive it. +- The triggers are created to handle the modifications in the MySQL side. +- A created triggers event is injected in the binlog and gh-ost wait until receive it. +- Since while the first event and the second one the affected rows will be in a inconsistent state, the unique key + values of this events are tracked. +- The rows tracked are deleted and inserted back from the original table, so like we have already the triggers is assured his consistence. + `gh-ost` solves this by using an atomic, two-step blocking swap: while one connection holds the lock, another attempts the atomic `RENAME`. The `RENAME` is guaranteed to not be executed prematurely by positioning a sentry table which blocks the `RENAME` operation until `gh-ost` is satisfied all is in order. This solution either: @@ -18,4 +30,4 @@ Also note: Internals of the atomic cut-over are discussed in [Issue #82](https://github.com/github/gh-ost/issues/82). -At this time the command-line argument `--cut-over` is supported, and defaults to the atomic cut-over algorithm described above. Also supported is `--cut-over=two-step`, which uses the FB non-atomic algorithm. We recommend using the default cut-over that has been battle tested in our production environments. +At this time the command-line argument `--cut-over` is supported, and defaults to the atomic cut-over algorithm described above. 
Also supported is `--cut-over=two-step`, which uses the FB non-atomic algorithm and the `--cut-over=trigger`, which use the trigger algorithm. We recommend using the default cut-over that has been battle tested in our production environments. From 0817b2882dba2e8015f1f6e3f2fdc4d670845542 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20Manuel=20Fern=C3=A1ndez=20Garc=C3=ADa-Minguill?= =?UTF-8?q?=C3=A1n?= Date: Fri, 29 Mar 2019 15:27:53 +0100 Subject: [PATCH 07/17] Added correction fixes --- doc/cut-over.md | 7 ++----- go/logic/applier.go | 25 ++----------------------- go/logic/migrator.go | 6 +++--- 3 files changed, 7 insertions(+), 31 deletions(-) diff --git a/doc/cut-over.md b/doc/cut-over.md index ddaa99245..bb0060873 100644 --- a/doc/cut-over.md +++ b/doc/cut-over.md @@ -8,15 +8,12 @@ The [facebook OSC](https://www.facebook.com/notes/mysql-at-facebook/online-schem Another option to support a atomic swap without the use of a lock is the use of triggers only in the cut-over phase, gh-ost philosophy is avoid the use of triggers but in some environments like a Galera cluster isn't possible use the -lock command, and like the cut-over should take a little time it shouldn't be a problem. This method work in the next -way: +lock command, and like the cut-over should take a little time it shouldn't be a problem. Triggers cut-over works like the following: - A stop writes event is injected in the binlog and gh-ost disable the writes once it receive it. - The triggers are created to handle the modifications in the MySQL side. - A created triggers event is injected in the binlog and gh-ost wait until receive it. -- Since while the first event and the second one the affected rows will be in a inconsistent state, the unique key - values of this events are tracked. -- The rows tracked are deleted and inserted back from the original table, so like we have already the triggers is assured his consistence. 
+- The affected rows will be in an inconsistent stata during the time between the first and the second event. For this reason, this events are checked and, the values of the fields that are part of the unique key used to do the online alter are saved to sanitize that rows. `gh-ost` solves this by using an atomic, two-step blocking swap: while one connection holds the lock, another attempts the atomic `RENAME`. The `RENAME` is guaranteed to not be executed prematurely by positioning a sentry table which blocks the `RENAME` operation until `gh-ost` is satisfied all is in order. diff --git a/go/logic/applier.go b/go/logic/applier.go index 14b2843b9..0c7adc775 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -569,14 +569,10 @@ func (this *Applier) makeTriggerPrefix() string { return prefix } -// Create triggers from the original table to the new one +// CreateTriggersOriginalTable Create triggers from the original table to the ghost one func (this *Applier) CreateTriggersOriginalTable() error { prefix := this.makeTriggerPrefix() - ////////////////////// - // Delete trigger // - ////////////////////// - delIndexComparations := []string{} for _, name := range this.migrationContext.UniqueKey.Columns.Names() { columnQuoted := sql.EscapeName(name) @@ -600,10 +596,6 @@ func (this *Applier) CreateTriggersOriginalTable() error { sql.EscapeName(this.migrationContext.GetGhostTableName()), strings.Join(delIndexComparations, " AND ")) - ////////////////////// - // Insert trigger // - ////////////////////// - insertCols := []string{} insertValues := []string{} for i, origColumn := range this.migrationContext.SharedColumns.Names() { @@ -627,10 +619,6 @@ func (this *Applier) CreateTriggersOriginalTable() error { strings.Join(insertCols, ", "), strings.Join(insertValues, ", ")) - ////////////////////// - // Update trigger // - ////////////////////// - updIndexComparations := []string{} for _, name := range this.migrationContext.UniqueKey.Columns.Names() { columnQuoted := 
sql.EscapeName(name) @@ -660,10 +648,6 @@ func (this *Applier) CreateTriggersOriginalTable() error { strings.Join(insertCols, ", "), strings.Join(insertValues, ", ")) - /////////////////////////// - // Create the triggers // - /////////////////////////// - return func() error { tx, err := this.db.Begin() if err != nil { @@ -687,7 +671,7 @@ func (this *Applier) CreateTriggersOriginalTable() error { }() } -// Drop triggers from the old table +// DropTriggersOldTable Drop triggers from the old table func (this *Applier) DropTriggersOldTable() error { prefix := this.makeTriggerPrefix() @@ -706,10 +690,6 @@ func (this *Applier) DropTriggersOldTable() error { sql.EscapeName(this.migrationContext.DatabaseName), prefix) - ///////////////////////// - // Drop the triggers // - ///////////////////////// - return func() error { tx, err := this.db.Begin() if err != nil { @@ -733,7 +713,6 @@ func (this *Applier) DropTriggersOldTable() error { }() } -//TODO func (this *Applier) ObtainUniqueKeyValuesOfEvent(dmlEvent *binlog.BinlogDMLEvent) (uniqueKeys [][]interface{}) { switch dmlEvent.DML { case binlog.DeleteDML: diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 20708f343..aa36a8061 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -593,7 +593,7 @@ func (this *Migrator) cutOver() (err error) { return this.migrationContext.Log.Fatalf("Unknown cut-over type: %d; should never get here!", this.migrationContext.CutOverType) } -// Inject the "StopWriteEvents" state hint +// injectStopWriteEvents Inject the "StopWriteEvents" state hint func (this *Migrator) injectStopWriteEvents() (err error) { this.migrationContext.MarkPointOfInterest() injectStopWriteEventsStartTime := time.Now() @@ -613,8 +613,8 @@ func (this *Migrator) injectStopWriteEvents() (err error) { return nil } -// Inject the "AllEventsUpToTriggersProcessed" state hint, wait for it to appear in the binary logs, -// make sure the queue is drained. 
+// waitForEventsUpToTriggers Inject the "AllEventsUpToTriggersProcessed" state hint, +// wait for it to appear in the binary logs, make sure the queue is drained. func (this *Migrator) waitForEventsUpToTriggers() (err error) { timeout := time.NewTimer(time.Second * time.Duration(this.migrationContext.CutOverLockTimeoutSeconds)) From 19d6324ed1aa963bf46e9442e326bf4d4b838fc9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20Manuel=20Fern=C3=A1ndez=20Garc=C3=ADa-Minguill?= =?UTF-8?q?=C3=A1n?= Date: Mon, 1 Apr 2019 13:13:29 +0200 Subject: [PATCH 08/17] Added doc and fixed queries --- go/logic/migrator.go | 12 ++++++++++-- go/sql/builder.go | 4 ++-- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/go/logic/migrator.go b/go/logic/migrator.go index aa36a8061..7e58adacb 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -256,7 +256,11 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er atomic.StoreInt64(&this.migrationContext.ApplyDMLEventState, 1) return nil } - //TODO Explain + // at this point we know that the triggers will be created and we don't want write the + // next events from the streamer, because the streamer works sequentially. So those + // events are either already handled, or have event functions in applyEventsQueue. + // So as not to create a potential deadlock, we write this func to applyEventsQueue + // asynchronously, understanding it doesn't really matter. go func() { this.applyEventsQueue <- newApplyEventStructByFunc(&applyEventFunc) }() @@ -268,7 +272,11 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er this.allEventsUpToTriggersProcessed <- changelogStateString return nil } - //TODO Explain + // at this point we know that the triggers are created and we want to sanitize the inconsistent + // rows between the stop and the triggers event, because the streamer works sequentially. 
+ // So those events are either already handled, or have event functions in applyEventsQueue. + // So as not to create a potential deadlock, we write this func to applyEventsQueue + // asynchronously, understanding it doesn't really matter. go func() { this.applyEventsQueue <- newApplyEventStructByFunc(&applyEventFunc) }() diff --git a/go/sql/builder.go b/go/sql/builder.go index dc7729d0d..4b3ec6291 100644 --- a/go/sql/builder.go +++ b/go/sql/builder.go @@ -549,7 +549,7 @@ func BuildDeleteQuery(databaseName, ghostTableName string, uniqueKeyColumns *Col } result = fmt.Sprintf( - "delete /* gh-ost */ ignore from %s.%s where %s", + "delete /* gh-ost */ from %s.%s where %s", EscapeName(databaseName), EscapeName(ghostTableName), strings.Join(preparedValuesWhere, " or ")) @@ -587,7 +587,7 @@ func BuildInsertSelectQuery(databaseName, originalTableName, ghostTableName stri } result = fmt.Sprintf( - "insert /* gh-ost %s.%s */ ignore into %s.%s (%s) select %s from %s.%s force index (%s) where %s", + "insert /* gh-ost %s.%s */ into %s.%s (%s) select %s from %s.%s force index (%s) where %s", databaseName, originalTableName, databaseName, ghostTableName, mappedSharedColumnsListing, sharedColumnsListing, databaseName, originalTableName, From ddcae4bf0ac1a0f0aaa339a08f8f25ad0dae1baa Mon Sep 17 00:00:00 2001 From: Juan Manuel Fernandez Date: Sat, 18 May 2019 00:43:57 +0200 Subject: [PATCH 09/17] Fixed cut-over logic and changed doc --- doc/cut-over.md | 4 +-- go/base/context.go | 1 + go/logic/applier.go | 2 ++ go/logic/migrator.go | 58 ++++++++++++++------------------------------ 4 files changed, 23 insertions(+), 42 deletions(-) diff --git a/doc/cut-over.md b/doc/cut-over.md index bb0060873..9395997d6 100644 --- a/doc/cut-over.md +++ b/doc/cut-over.md @@ -10,10 +10,10 @@ Another option to support a atomic swap without the use of a lock is the use of gh-ost philosophy is avoid the use of triggers but in some environments like a Galera cluster isn't possible use the lock command, 
and like the cut-over should take a little time it shouldn't be a problem. Triggers cut-over works like the following: -- A stop writes event is injected in the binlog and gh-ost disable the writes once it receive it. +- gh-ost disable the writes but it keep reading the binlog to keep a record of the changed rows. - The triggers are created to handle the modifications in the MySQL side. - A created triggers event is injected in the binlog and gh-ost wait until receive it. -- The affected rows will be in an inconsistent stata during the time between the first and the second event. For this reason, this events are checked and, the values of the fields that are part of the unique key used to do the online alter are saved to sanitize that rows. +- The affected rows during the change of writer could be in an inconsistent state, for this reason is necessary sanitize the modified rows removing them if exists and adding them if exists from the original table to the ghost one. `gh-ost` solves this by using an atomic, two-step blocking swap: while one connection holds the lock, another attempts the atomic `RENAME`. The `RENAME` is guaranteed to not be executed prematurely by positioning a sentry table which blocks the `RENAME` operation until `gh-ost` is satisfied all is in order. 
diff --git a/go/base/context.go b/go/base/context.go index 695faad0a..5808d504d 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -174,6 +174,7 @@ type MigrationContext struct { StartTime time.Time RowCopyStartTime time.Time RowCopyEndTime time.Time + CreateTriggersStartTime time.Time LockTablesStartTime time.Time RenameTablesStartTime time.Time RenameTablesEndTime time.Time diff --git a/go/logic/applier.go b/go/logic/applier.go index 0c7adc775..7ad8540c2 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -649,6 +649,8 @@ func (this *Applier) CreateTriggersOriginalTable() error { strings.Join(insertValues, ", ")) return func() error { + this.migrationContext.CreateTriggersStartTime = time.Now() + tx, err := this.db.Begin() if err != nil { return err diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 7e58adacb..de35b60a7 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -27,7 +27,6 @@ type ChangelogState string const ( GhostTableMigrated ChangelogState = "GhostTableMigrated" AllEventsUpToLockProcessed = "AllEventsUpToLockProcessed" - StopWriteEvents = "StopWriteEvents" AllEventsUpToTriggersProcessed = "AllEventsUpToTriggersProcessed" ) @@ -77,6 +76,7 @@ type Migrator struct { ghostTableMigrated chan bool rowCopyComplete chan error allEventsUpToLockProcessed chan string + stoppedBinlogWrites chan bool allEventsUpToTriggersProcessed chan string triggerCutoverUniqueKeys [][]interface{} @@ -250,30 +250,15 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er this.applyEventsQueue <- newApplyEventStructByFunc(&applyEventFunc) }() } - case StopWriteEvents: - { - var applyEventFunc tableWriteFunc = func() error { - atomic.StoreInt64(&this.migrationContext.ApplyDMLEventState, 1) - return nil - } - // at this point we know that the triggers will be created and we don't want write the - // next events from the streamer, because the streamer works sequentially. 
So those - // events are either already handled, or have event functions in applyEventsQueue. - // So as not to create a potential deadlock, we write this func to applyEventsQueue - // asynchronously, understanding it doesn't really matter. - go func() { - this.applyEventsQueue <- newApplyEventStructByFunc(&applyEventFunc) - }() - } case AllEventsUpToTriggersProcessed: { var applyEventFunc tableWriteFunc = func() error { - atomic.StoreInt64(&this.migrationContext.ApplyDMLEventState, 2) + atomic.StoreInt64(&this.migrationContext.ApplyDMLEventState, 3) this.allEventsUpToTriggersProcessed <- changelogStateString return nil } // at this point we know that the triggers are created and we want to sanitize the inconsistent - // rows between the stop and the triggers event, because the streamer works sequentially. + // rows between the stop writes and the triggers event, because the streamer works sequentially. // So those events are either already handled, or have event functions in applyEventsQueue. // So as not to create a potential deadlock, we write this func to applyEventsQueue // asynchronously, understanding it doesn't really matter. 
@@ -601,22 +586,13 @@ func (this *Migrator) cutOver() (err error) { return this.migrationContext.Log.Fatalf("Unknown cut-over type: %d; should never get here!", this.migrationContext.CutOverType) } -// injectStopWriteEvents Inject the "StopWriteEvents" state hint -func (this *Migrator) injectStopWriteEvents() (err error) { - this.migrationContext.MarkPointOfInterest() - injectStopWriteEventsStartTime := time.Now() +// waitForStopWrites Disable the writes from the binlog and keep a record of the affected rows +func (this *Migrator) waitForStopWrites() (err error) { + log.Infof("Stopping the writes") + atomic.StoreInt64(&this.migrationContext.ApplyDMLEventState, 1) + <-this.stoppedBinlogWrites - injectStopWriteEventsChallenge := fmt.Sprintf( - "%s:%d", - string(StopWriteEvents), - injectStopWriteEventsStartTime.UnixNano()) - - log.Infof("Writing changelog state: %+v", injectStopWriteEventsChallenge) - if _, err := this.applier.WriteChangelogState(injectStopWriteEventsChallenge); err != nil { - return err - } - - this.printStatus(ForcePrintStatusAndHintRule) + log.Infof("Done waiting to stop the writes") return nil } @@ -639,7 +615,6 @@ func (this *Migrator) waitForEventsUpToTriggers() (err error) { return err } log.Infof("Waiting for events up to triggers") - //atomic.StoreInt64(&this.migrationContext.AllEventsUpToTriggersProcessedInjectedFlag, 1) for found := false; !found; { select { case <-timeout.C: @@ -741,9 +716,7 @@ func (this *Migrator) cutOverTrigger() (err error) { atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 1) defer atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 0) - if err := this.injectStopWriteEvents(); err != nil { - return err - } + this.waitForStopWrites() log.Infof( "Creating triggers for %s.%s", @@ -789,9 +762,9 @@ CompareNewRow: return err } - lockAndRenameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.LockTablesStartTime) + createTriggersAndRenameDuration := 
this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.CreateTriggersStartTime) renameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.RenameTablesStartTime) - log.Debugf("Lock & rename duration: %s (rename only: %s). During this time, queries on %s were locked or failing", lockAndRenameDuration, renameDuration, sql.EscapeName(this.migrationContext.OriginalTableName)) + log.Debugf("Create triggers & rename duration: %s (rename only: %s). During this time, queries on %s were locked or failing", createTriggersAndRenameDuration, renameDuration, sql.EscapeName(this.migrationContext.OriginalTableName)) return nil } @@ -1381,12 +1354,17 @@ func (this *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error { if eventStruct.dmlEvent != nil { applyDMLEventState := atomic.LoadInt64(&this.migrationContext.ApplyDMLEventState) if applyDMLEventState == 1 { + atomic.StoreInt64(&this.migrationContext.ApplyDMLEventState, 2) + applyDMLEventState = 2 + this.stoppedBinlogWrites <- true + } + if applyDMLEventState == 2 { this.triggerCutoverUniqueKeys = append( this.triggerCutoverUniqueKeys, this.applier.ObtainUniqueKeyValuesOfEvent(eventStruct.dmlEvent)...) 
return nil } - if applyDMLEventState == 2 { + if applyDMLEventState == 3 { return nil } From 28a462967eb9d0e07b41a419cfbc01e14d5db7b1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20Manuel=20Fern=C3=A1ndez=20Garc=C3=ADa-Minguill?= =?UTF-8?q?=C3=A1n?= Date: Mon, 20 May 2019 17:11:18 +0200 Subject: [PATCH 10/17] Added logic to remove the triggers in case of error and created channel --- go/logic/applier.go | 6 +++--- go/logic/migrator.go | 3 +++ 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/go/logic/applier.go b/go/logic/applier.go index 7ad8540c2..906e1cb88 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -678,17 +678,17 @@ func (this *Applier) DropTriggersOldTable() error { prefix := this.makeTriggerPrefix() dropDeleteTrigger := fmt.Sprintf( - "DROP /* gh-ost */ TRIGGER %s.`%s_del`", + "DROP /* gh-ost */ TRIGGER IF EXISTS %s.`%s_del`", sql.EscapeName(this.migrationContext.DatabaseName), prefix) dropInsertTrigger := fmt.Sprintf( - "DROP /* gh-ost */ TRIGGER %s.`%s_ins`", + "DROP /* gh-ost */ TRIGGER IF EXISTS %s.`%s_ins`", sql.EscapeName(this.migrationContext.DatabaseName), prefix) dropUpdateTrigger := fmt.Sprintf( - "DROP /* gh-ost */ TRIGGER %s.`%s_upd`", + "DROP /* gh-ost */ TRIGGER IF EXISTS %s.`%s_upd`", sql.EscapeName(this.migrationContext.DatabaseName), prefix) diff --git a/go/logic/migrator.go b/go/logic/migrator.go index de35b60a7..665cbf638 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -100,6 +100,7 @@ func NewMigrator(context *base.MigrationContext) *Migrator { firstThrottlingCollected: make(chan bool, 3), rowCopyComplete: make(chan error), allEventsUpToLockProcessed: make(chan string), + stoppedBinlogWrites: make(chan bool), allEventsUpToTriggersProcessed: make(chan string), copyRowsQueue: make(chan tableWriteFunc), applyEventsQueue: make(chan *applyEventStruct, base.MaxEventsBatchSize), @@ -718,6 +719,8 @@ func (this *Migrator) cutOverTrigger() (err error) { this.waitForStopWrites() + defer 
this.applier.DropTriggersOldTable() + log.Infof( "Creating triggers for %s.%s", this.migrationContext.DatabaseName, From bed3b88cce8f0b7b3f9ee159e06349d7207d3461 Mon Sep 17 00:00:00 2001 From: Juan Manuel Fernandez Date: Mon, 20 May 2019 19:34:44 +0200 Subject: [PATCH 11/17] Explained better method logic --- go/logic/applier.go | 4 ++-- go/logic/migrator.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/go/logic/applier.go b/go/logic/applier.go index 906e1cb88..0e76e54df 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -673,8 +673,8 @@ func (this *Applier) CreateTriggersOriginalTable() error { }() } -// DropTriggersOldTable Drop triggers from the old table -func (this *Applier) DropTriggersOldTable() error { +// DropTriggersOldTableIfExists Drop triggers from the old table if they exist +func (this *Applier) DropTriggersOldTableIfExists() error { prefix := this.makeTriggerPrefix() dropDeleteTrigger := fmt.Sprintf( diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 665cbf638..db07da280 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -719,7 +719,7 @@ func (this *Migrator) cutOverTrigger() (err error) { this.waitForStopWrites() - defer this.applier.DropTriggersOldTable() + defer this.applier.DropTriggersOldTableIfExists() log.Infof( "Creating triggers for %s.%s", @@ -761,7 +761,7 @@ CompareNewRow: if err := this.retryOperation(this.applier.SwapTables); err != nil { return err } - if err := this.retryOperation(this.applier.DropTriggersOldTable); err != nil { + if err := this.retryOperation(this.applier.DropTriggersOldTableIfExists); err != nil { return err } From 728a9e1aba5b6902ff146b3d5804ccadfc1b4004 Mon Sep 17 00:00:00 2001 From: Juan Manuel Fernandez Date: Mon, 20 May 2019 20:05:31 +0200 Subject: [PATCH 12/17] Fixed wait forever if not updates take place in the table during the cut-over --- doc/cut-over.md | 4 +-- go/logic/migrator.go | 78 +++++++++++++++++++++++++++++++++++--------- 2 files
changed, 64 insertions(+), 18 deletions(-) diff --git a/doc/cut-over.md b/doc/cut-over.md index 9395997d6..bb0060873 100644 --- a/doc/cut-over.md +++ b/doc/cut-over.md @@ -10,10 +10,10 @@ Another option to support a atomic swap without the use of a lock is the use of gh-ost philosophy is avoid the use of triggers but in some environments like a Galera cluster isn't possible use the lock command, and like the cut-over should take a little time it shouldn't be a problem. Triggers cut-over works like the following: -- gh-ost disable the writes but it keep reading the binlog to keep a record of the changed rows. +- A stop writes event is injected in the binlog and gh-ost disable the writes once it receive it. - The triggers are created to handle the modifications in the MySQL side. - A created triggers event is injected in the binlog and gh-ost wait until receive it. -- The affected rows during the change of writer could be in an inconsistent state, for this reason is necessary sanitize the modified rows removing them if exists and adding them if exists from the original table to the ghost one. +- The affected rows will be in an inconsistent stata during the time between the first and the second event. For this reason, this events are checked and, the values of the fields that are part of the unique key used to do the online alter are saved to sanitize that rows. `gh-ost` solves this by using an atomic, two-step blocking swap: while one connection holds the lock, another attempts the atomic `RENAME`. The `RENAME` is guaranteed to not be executed prematurely by positioning a sentry table which blocks the `RENAME` operation until `gh-ost` is satisfied all is in order. 
diff --git a/go/logic/migrator.go b/go/logic/migrator.go index db07da280..4c7ac910d 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -27,6 +27,7 @@ type ChangelogState string const ( GhostTableMigrated ChangelogState = "GhostTableMigrated" AllEventsUpToLockProcessed = "AllEventsUpToLockProcessed" + StopWriteEvents = "StopWriteEvents" AllEventsUpToTriggersProcessed = "AllEventsUpToTriggersProcessed" ) @@ -76,7 +77,7 @@ type Migrator struct { ghostTableMigrated chan bool rowCopyComplete chan error allEventsUpToLockProcessed chan string - stoppedBinlogWrites chan bool + stoppedWriteEvents chan string allEventsUpToTriggersProcessed chan string triggerCutoverUniqueKeys [][]interface{} @@ -100,7 +101,7 @@ func NewMigrator(context *base.MigrationContext) *Migrator { firstThrottlingCollected: make(chan bool, 3), rowCopyComplete: make(chan error), allEventsUpToLockProcessed: make(chan string), - stoppedBinlogWrites: make(chan bool), + stoppedWriteEvents: make(chan string), allEventsUpToTriggersProcessed: make(chan string), copyRowsQueue: make(chan tableWriteFunc), applyEventsQueue: make(chan *applyEventStruct, base.MaxEventsBatchSize), @@ -251,10 +252,26 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er this.applyEventsQueue <- newApplyEventStructByFunc(&applyEventFunc) }() } + case StopWriteEvents: + { + var applyEventFunc tableWriteFunc = func() error { + atomic.StoreInt64(&this.migrationContext.ApplyDMLEventState, 1) + this.stoppedWriteEvents <- changelogStateString + return nil + } + // at this point we know that the triggers will be created and we don't want write the + // next events from the streamer, because the streamer works sequentially. So those + // events are either already handled, or have event functions in applyEventsQueue. + // So as not to create a potential deadlock, we write this func to applyEventsQueue + // asynchronously, understanding it doesn't really matter. 
+ go func() { + this.applyEventsQueue <- newApplyEventStructByFunc(&applyEventFunc) + }() + } case AllEventsUpToTriggersProcessed: { var applyEventFunc tableWriteFunc = func() error { - atomic.StoreInt64(&this.migrationContext.ApplyDMLEventState, 3) + atomic.StoreInt64(&this.migrationContext.ApplyDMLEventState, 2) this.allEventsUpToTriggersProcessed <- changelogStateString return nil } @@ -587,13 +604,45 @@ func (this *Migrator) cutOver() (err error) { return this.migrationContext.Log.Fatalf("Unknown cut-over type: %d; should never get here!", this.migrationContext.CutOverType) } -// waitForStopWrites Disable the writes from the binlog and keep a record of the affected rows -func (this *Migrator) waitForStopWrites() (err error) { - log.Infof("Stopping the writes") - atomic.StoreInt64(&this.migrationContext.ApplyDMLEventState, 1) - <-this.stoppedBinlogWrites +// waitForStopWriteEvents Inject the "StopWriteEvents" state hint, +// wait for it to appear in the binary logs, make sure the queue is drained. 
+func (this *Migrator) waitForStopWriteEvents() (err error) { + timeout := time.NewTimer(time.Second * time.Duration(this.migrationContext.CutOverLockTimeoutSeconds)) - log.Infof("Done waiting to stop the writes") + this.migrationContext.MarkPointOfInterest() + stopWriteEventsStartTime := time.Now() + + stopWriteEventsChallenge := fmt.Sprintf( + "%s:%d", + string(StopWriteEvents), + stopWriteEventsStartTime.UnixNano()) + + log.Infof("Writing changelog state: %+v", stopWriteEventsChallenge) + if _, err := this.applier.WriteChangelogState(stopWriteEventsChallenge); err != nil { + return err + } + log.Infof("Waiting for stop writes") + for found := false; !found; { + select { + case <-timeout.C: + { + return log.Errorf("Timeout while waiting for stop writes") + } + case state := <-this.stoppedWriteEvents: + { + if state == stopWriteEventsChallenge { + log.Infof("Waiting for stop writes: got %s", state) + found = true + } else { + log.Infof("Waiting for stop writes: skipping %s", state) + } + } + } + } + stopWriteEventsDuration := time.Since(stopWriteEventsStartTime) + + log.Infof("Done waiting for stop writes; duration=%+v", stopWriteEventsDuration) + this.printStatus(ForcePrintStatusAndHintRule) return nil } @@ -717,7 +766,9 @@ func (this *Migrator) cutOverTrigger() (err error) { atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 1) defer atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 0) - this.waitForStopWrites() + if err := this.waitForStopWriteEvents(); err != nil { + return err + } defer this.applier.DropTriggersOldTableIfExists() @@ -1357,17 +1408,12 @@ func (this *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error { if eventStruct.dmlEvent != nil { applyDMLEventState := atomic.LoadInt64(&this.migrationContext.ApplyDMLEventState) if applyDMLEventState == 1 { - atomic.StoreInt64(&this.migrationContext.ApplyDMLEventState, 2) - applyDMLEventState = 2 - this.stoppedBinlogWrites <- true - } - if 
applyDMLEventState == 2 { this.triggerCutoverUniqueKeys = append( this.triggerCutoverUniqueKeys, this.applier.ObtainUniqueKeyValuesOfEvent(eventStruct.dmlEvent)...) return nil } - if applyDMLEventState == 3 { + if applyDMLEventState == 2 { return nil } From 87e61a0cfb6d4994be6d638ddb609ad63f704dcf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20Manuel=20Fern=C3=A1ndez=20Garc=C3=ADa-Minguill?= =?UTF-8?q?=C3=A1n?= Date: Tue, 10 Sep 2019 10:47:37 +0200 Subject: [PATCH 13/17] Don't use singleton connection for the swap --- go/logic/applier.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/logic/applier.go b/go/logic/applier.go index 0e76e54df..d239c0653 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -859,7 +859,7 @@ func (this *Applier) SwapTables() error { ) log.Infof("Swaping original and new table: %s", query) this.migrationContext.RenameTablesStartTime = time.Now() - if _, err := sqlutils.ExecNoPrepare(this.singletonDB, query); err != nil { + if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil { return err } this.migrationContext.RenameTablesEndTime = time.Now() From 71b1a0b08d619bfb7bb7cc66a3f7069367ab68d3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20Manuel=20Fern=C3=A1ndez=20Garc=C3=ADa-Minguill?= =?UTF-8?q?=C3=A1n?= Date: Tue, 10 Sep 2019 15:37:06 +0200 Subject: [PATCH 14/17] Runned automatic formater --- go/base/context.go | 2 +- go/logic/applier.go | 29 ++++++++++++++--------------- 2 files changed, 15 insertions(+), 16 deletions(-) diff --git a/go/base/context.go b/go/base/context.go index 5808d504d..daaa7c719 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -226,7 +226,7 @@ type MigrationContext struct { MigrationIterationRangeMaxValues *sql.ColumnValues ForceTmpTableName string - TriggerCutoverUniqueKeys [][]interface {} + TriggerCutoverUniqueKeys [][]interface{} recentBinlogCoordinates mysql.BinlogCoordinates diff --git a/go/logic/applier.go b/go/logic/applier.go index d239c0653..4a16181d5 
100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -8,11 +8,11 @@ package logic import ( gosql "database/sql" "fmt" + "regexp" + "strings" "sync" "sync/atomic" "time" - "regexp" - "strings" "github.com/github/gh-ost/go/base" "github.com/github/gh-ost/go/binlog" @@ -586,9 +586,9 @@ func (this *Applier) CreateTriggersOriginalTable() error { } deleteTrigger := fmt.Sprintf( - "CREATE /* gh-ost */ TRIGGER `%s_del` AFTER DELETE ON %s.%s " + - "FOR EACH ROW " + - "DELETE /* gh-ost */ IGNORE FROM %s.%s WHERE %s", + "CREATE /* gh-ost */ TRIGGER `%s_del` AFTER DELETE ON %s.%s "+ + "FOR EACH ROW "+ + "DELETE /* gh-ost */ IGNORE FROM %s.%s WHERE %s", prefix, sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName), @@ -606,11 +606,10 @@ func (this *Applier) CreateTriggersOriginalTable() error { insertValues = append(insertValues, value) } - insertTrigger := fmt.Sprintf( - "CREATE /* gh-ost */ TRIGGER `%s_ins` AFTER INSERT ON %s.%s " + - "FOR EACH ROW " + - "REPLACE /* gh-ost */ INTO %s.%s (%s) VALUES (%s)", + "CREATE /* gh-ost */ TRIGGER `%s_ins` AFTER INSERT ON %s.%s "+ + "FOR EACH ROW "+ + "REPLACE /* gh-ost */ INTO %s.%s (%s) VALUES (%s)", prefix, sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName), @@ -630,12 +629,12 @@ func (this *Applier) CreateTriggersOriginalTable() error { } updateTrigger := fmt.Sprintf( - "CREATE /* gh-ost */ TRIGGER `%s_upd` AFTER UPDATE ON %s.%s " + - "FOR EACH ROW " + - "BEGIN /* gh-ost */ " + - "DELETE IGNORE FROM %s.%s WHERE !(%s) AND %s; " + - "REPLACE INTO %s.%s (%s) VALUES (%s); " + - "END", + "CREATE /* gh-ost */ TRIGGER `%s_upd` AFTER UPDATE ON %s.%s "+ + "FOR EACH ROW "+ + "BEGIN /* gh-ost */ "+ + "DELETE IGNORE FROM %s.%s WHERE !(%s) AND %s; "+ + "REPLACE INTO %s.%s (%s) VALUES (%s); "+ + "END", prefix, sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName), From 
e604ef49941cc15d5b648aab092355fd5f9f0fa7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20Manuel=20Fern=C3=A1ndez=20Garc=C3=ADa-Minguill?= =?UTF-8?q?=C3=A1n?= Date: Mon, 27 Jan 2020 10:42:14 +0100 Subject: [PATCH 15/17] Improve sanitize message --- go/logic/applier.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/go/logic/applier.go b/go/logic/applier.go index 4a16181d5..6fdbe75c3 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -782,8 +782,9 @@ func (this *Applier) SanitizeRowsDuringCutOver() error { return insertSelectErr } - log.Infof("Sanitizing chunk of %d created rows during trigger creation", - len(chunkValues)) + log.Infof("Sanitizing chunk of created rows during trigger creation (%d/%d)", + int64(len(this.migrationContext.TriggerCutoverUniqueKeys))-cutIndex, + len(this.migrationContext.TriggerCutoverUniqueKeys)) err := func() error { tx, err := this.db.Begin() From 08f4e34dd964af214f3eddc39e2977e51f0304ce Mon Sep 17 00:00:00 2001 From: Alon Peer Date: Thu, 26 Aug 2021 10:14:47 +0200 Subject: [PATCH 16/17] Set missing isUniqueKeyColumn value --- go/sql/builder.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/sql/builder.go b/go/sql/builder.go index 4b3ec6291..11fd8cf18 100644 --- a/go/sql/builder.go +++ b/go/sql/builder.go @@ -519,7 +519,7 @@ func BuildDMLUpdateQuery(databaseName, tableName string, tableColumns, sharedCol func ObtainUniqueKeyValues(tableColumns, uniqueKeyColumns *ColumnList, args []interface{}) (uniqueKeyArgs []interface{}) { for _, column := range uniqueKeyColumns.Columns() { tableOrdinal := tableColumns.Ordinals[column.Name] - arg := column.convertArg(args[tableOrdinal]) + arg := column.convertArg(args[tableOrdinal], true) uniqueKeyArgs = append(uniqueKeyArgs, arg) } return uniqueKeyArgs From 9a67740c1ee62ede6df044e14ea61a6f127f102b Mon Sep 17 00:00:00 2001 From: Alon Pe'er Date: Wed, 15 Sep 2021 13:15:42 +0200 Subject: [PATCH 17/17] Grammar fix Co-authored-by: Rashiq --- 
doc/cut-over.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/cut-over.md b/doc/cut-over.md index bb0060873..86a2f5999 100644 --- a/doc/cut-over.md +++ b/doc/cut-over.md @@ -12,7 +12,7 @@ lock command, and like the cut-over should take a little time it shouldn't be a - A stop writes event is injected in the binlog and gh-ost disable the writes once it receive it. - The triggers are created to handle the modifications in the MySQL side. -- A created triggers event is injected in the binlog and gh-ost wait until receive it. +- A created triggers event is injected in the binlog and gh-ost waits until it receives it. - The affected rows will be in an inconsistent stata during the time between the first and the second event. For this reason, this events are checked and, the values of the fields that are part of the unique key used to do the online alter are saved to sanitize that rows. `gh-ost` solves this by using an atomic, two-step blocking swap: while one connection holds the lock, another attempts the atomic `RENAME`. The `RENAME` is guaranteed to not be executed prematurely by positioning a sentry table which blocks the `RENAME` operation until `gh-ost` is satisfied all is in order.