From 85ab1bdfced7a1e2eceae520b4a584432c283846 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20Manuel=20Fern=C3=A1ndez=20Garc=C3=ADa-Minguill?= =?UTF-8?q?=C3=A1n?= Date: Thu, 28 Mar 2019 11:08:23 +0100 Subject: [PATCH 01/15] Added changes to use a cutover based on triggers --- go/base/context.go | 2 + go/cmd/gh-ost/main.go | 2 + go/logic/applier.go | 220 ++++++++++++++++++++++++++++++++++++++++++ go/logic/migrator.go | 147 +++++++++++++++++++++++++--- go/sql/builder.go | 72 ++++++++++++++ 5 files changed, 432 insertions(+), 11 deletions(-) diff --git a/go/base/context.go b/go/base/context.go index 5ebf09267..0668511a1 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -38,6 +38,7 @@ type CutOver int const ( CutOverAtomic CutOver = iota CutOverTwoStep + CutOverTrigger ) type ThrottleReasonHint string @@ -194,6 +195,7 @@ type MigrationContext struct { UserCommandedUnpostponeFlag int64 CutOverCompleteFlag int64 InCutOverCriticalSectionFlag int64 + ApplyDMLEventState int64 PanicAbort chan error OriginalTableColumnsOnApplier *sql.ColumnList diff --git a/go/cmd/gh-ost/main.go b/go/cmd/gh-ost/main.go index ce63b03e5..233f5ad19 100644 --- a/go/cmd/gh-ost/main.go +++ b/go/cmd/gh-ost/main.go @@ -226,6 +226,8 @@ func main() { migrationContext.CutOverType = base.CutOverAtomic case "two-step": migrationContext.CutOverType = base.CutOverTwoStep + case "trigger": + migrationContext.CutOverType = base.CutOverTrigger default: log.Fatalf("Unknown cut-over: %s", *cutOver) } diff --git a/go/logic/applier.go b/go/logic/applier.go index 5fb795bac..828ebb6d6 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -10,6 +10,8 @@ import ( "fmt" "sync/atomic" "time" + "regexp" + "strings" "github.com/github/gh-ost/go/base" "github.com/github/gh-ost/go/binlog" @@ -516,6 +518,224 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected return chunkSize, rowsAffected, duration, nil } +// Create triggers from the original table to the new one +func (this *Applier) CreateTriggersOriginalTable() error { + /////////////////////// + // Trigger preffix // + /////////////////////// + + prefix := fmt.Sprintf( + "ghost_%s_%s", + this.migrationContext.DatabaseName, + this.migrationContext.OriginalTableName) + r := regexp.MustCompile("\\W") + prefix = r.ReplaceAllString(prefix, "_") + + if len(prefix) > 60 { + oldPrefix := prefix + prefix = prefix[0:60] + + log.Debugf( + "Trigger prefix %s is over 60 characters long, truncating to %s", + oldPrefix, + prefix) + } + + ////////////////////// + // Delete trigger // + ////////////////////// + + delIndexComparations := []string{} + for _, name := range this.migrationContext.UniqueKey.Columns.Names() { + columnQuoted := sql.EscapeName(name) + comparation := fmt.Sprintf( + "%s.%s.%s <=> OLD.%s", + sql.EscapeName(this.migrationContext.DatabaseName), + sql.EscapeName(this.migrationContext.GetGhostTableName()), + columnQuoted, + columnQuoted) + delIndexComparations = append(delIndexComparations, comparation) + } + + deleteTrigger := fmt.Sprintf( + "CREATE /* gh-ost */ TRIGGER `%s_del` AFTER DELETE ON %s.%s " + + "FOR EACH ROW " + + "DELETE /* gh-ost */ IGNORE FROM %s.%s WHERE %s", + prefix, + sql.EscapeName(this.migrationContext.DatabaseName), + sql.EscapeName(this.migrationContext.OriginalTableName), + sql.EscapeName(this.migrationContext.DatabaseName), + sql.EscapeName(this.migrationContext.GetGhostTableName()), + strings.Join(delIndexComparations, " AND ")) + + ////////////////////// + // Insert trigger // + ////////////////////// + + insertCols := []string{} + insertValues := []string{} + for i, origColumn := range this.migrationContext.SharedColumns.Names() { + key := sql.EscapeName(this.migrationContext.MappedSharedColumns.Columns()[i].Name) + insertCols = append(insertCols, key) + + value := fmt.Sprintf("NEW.%s", sql.EscapeName(origColumn)) + insertValues = append(insertValues, value) + } + + + insertTrigger := fmt.Sprintf( + "CREATE /* gh-ost */ TRIGGER `%s_ins` AFTER INSERT ON %s.%s " + + "FOR EACH ROW " + + "REPLACE /* gh-ost */ INTO %s.%s (%s) VALUES (%s)", + prefix, + sql.EscapeName(this.migrationContext.DatabaseName), + sql.EscapeName(this.migrationContext.OriginalTableName), + sql.EscapeName(this.migrationContext.DatabaseName), + sql.EscapeName(this.migrationContext.GetGhostTableName()), + strings.Join(insertCols, ", "), + strings.Join(insertValues, ", ")) + + ////////////////////// + // Update trigger // + ////////////////////// + + updIndexComparations := []string{} + for _, name := range this.migrationContext.UniqueKey.Columns.Names() { + columnQuoted := sql.EscapeName(name) + comparation := fmt.Sprintf( + "OLD.%s <=> NEW.%s", + columnQuoted, + columnQuoted) + updIndexComparations = append(updIndexComparations, comparation) + } + + updateTrigger := fmt.Sprintf( + "CREATE /* gh-ost */ TRIGGER `%s_upd` AFTER UPDATE ON %s.%s " + + "FOR EACH ROW " + + "BEGIN /* gh-ost */ " + + "DELETE IGNORE FROM %s.%s WHERE !(%s) AND %s; " + + "REPLACE INTO %s.%s (%s) VALUES (%s); " + + "END", + prefix, + sql.EscapeName(this.migrationContext.DatabaseName), + sql.EscapeName(this.migrationContext.OriginalTableName), + sql.EscapeName(this.migrationContext.DatabaseName), + sql.EscapeName(this.migrationContext.GetGhostTableName()), + strings.Join(updIndexComparations, " AND "), + strings.Join(delIndexComparations, " AND "), + sql.EscapeName(this.migrationContext.DatabaseName), + sql.EscapeName(this.migrationContext.GetGhostTableName()), + strings.Join(insertCols, ", "), + strings.Join(insertValues, ", ")) + + /////////////////////////// + // Create the triggers // + /////////////////////////// + + return func() error { + tx, err := this.db.Begin() + if err != nil { + return err + } + + if _, err := tx.Exec(deleteTrigger); err != nil { + return err + } + if _, err := tx.Exec(insertTrigger); err != nil { + return err + } + if _, err := tx.Exec(updateTrigger); err != nil { + return err + } + + if err := tx.Commit(); err != nil { + return err + } + return nil + }() +} + +//TODO +func (this *Applier) ObtainUniqueKeyValuesOfEvent(dmlEvent *binlog.BinlogDMLEvent) (uniqueKeys [][]interface{}) { + switch dmlEvent.DML { + case binlog.DeleteDML: + return append(uniqueKeys, + sql.ObtainUniqueKeyValues( + this.migrationContext.OriginalTableColumns, + &this.migrationContext.UniqueKey.Columns, + dmlEvent.WhereColumnValues.AbstractValues())) + + case binlog.InsertDML: + return append(uniqueKeys, + sql.ObtainUniqueKeyValues( + this.migrationContext.OriginalTableColumns, + &this.migrationContext.UniqueKey.Columns, + dmlEvent.NewColumnValues.AbstractValues())) + + case binlog.UpdateDML: + { + if _, isModified := this.updateModifiesUniqueKeyColumns(dmlEvent); isModified { + uniqueKeys = append(uniqueKeys, + sql.ObtainUniqueKeyValues( + this.migrationContext.OriginalTableColumns, + &this.migrationContext.UniqueKey.Columns, + dmlEvent.WhereColumnValues.AbstractValues())) + } + return append(uniqueKeys, + sql.ObtainUniqueKeyValues( + this.migrationContext.OriginalTableColumns, + &this.migrationContext.UniqueKey.Columns, + dmlEvent.NewColumnValues.AbstractValues())) + } + } + + return uniqueKeys +} + +//TODO +func (this *Applier) DropUniqueKeysGhostTable(uniqueKeysArgs [][]interface{}) error { + query, args, err := sql.BuildDeleteQuery( + this.migrationContext.DatabaseName, + this.migrationContext.OriginalTableName, + this.migrationContext.GetGhostTableName(), + &this.migrationContext.UniqueKey.Columns, + uniqueKeysArgs) + + if err != nil { + return err + } + + log.Infof("Delete query: %s", query) + log.Infof("Delete query args: %+v", args) + + return nil + + //return func() error { + //tx, err := this.db.Begin() + //if err != nil { + //return err + //} + + //if _, err := tx.Exec(deleteTrigger); err != nil { + //return err + //} + //if _, err := tx.Exec(insertTrigger); err != nil { + //return err + //} + //if _, err := tx.Exec(updateTrigger); err != nil { + //return err + //} + + //if err := tx.Commit(); err != nil { + //return err + //} + //return nil + //}() +} + + + + // LockOriginalTable places a write lock on the original table func (this *Applier) LockOriginalTable() error { query := fmt.Sprintf(`lock /* gh-ost */ tables %s.%s write`, diff --git a/go/logic/migrator.go b/go/logic/migrator.go index b1a238fda..eca3cbc36 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -27,6 +27,7 @@ type ChangelogState string const ( GhostTableMigrated ChangelogState = "GhostTableMigrated" AllEventsUpToLockProcessed = "AllEventsUpToLockProcessed" + AllEventsUpToTriggersProcessed = "AllEventsUpToTriggersProcessed" ) func ReadChangelogState(s string) ChangelogState { @@ -71,10 +72,13 @@ type Migrator struct { hooksExecutor *HooksExecutor migrationContext *base.MigrationContext - firstThrottlingCollected chan bool - ghostTableMigrated chan bool - rowCopyComplete chan error - allEventsUpToLockProcessed chan string + firstThrottlingCollected chan bool + ghostTableMigrated chan bool + rowCopyComplete chan error + allEventsUpToLockProcessed chan string + allEventsUpToTriggersProcessed chan string + + triggerCutoverUniqueKeys [][]interface {} rowCopyCompleteFlag int64 // copyRowsQueue should not be buffered; if buffered some non-damaging but @@ -89,12 +93,13 @@ type Migrator struct { func NewMigrator(context *base.MigrationContext) *Migrator { migrator := &Migrator{ - migrationContext: context, - parser: sql.NewParser(), - ghostTableMigrated: make(chan bool), - firstThrottlingCollected: make(chan bool, 3), - rowCopyComplete: make(chan error), - allEventsUpToLockProcessed: make(chan string), + migrationContext: context, + parser: sql.NewParser(), + ghostTableMigrated: make(chan bool), + firstThrottlingCollected: make(chan bool, 3), + rowCopyComplete: make(chan error), + allEventsUpToLockProcessed: make(chan string), + allEventsUpToTriggersProcessed: make(chan string), copyRowsQueue: make(chan tableWriteFunc), applyEventsQueue: make(chan *applyEventStruct, base.MaxEventsBatchSize), @@ -237,6 +242,17 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er this.applyEventsQueue <- newApplyEventStructByFunc(&applyEventFunc) }() } + case AllEventsUpToTriggersProcessed: + { + var applyEventFunc tableWriteFunc = func() error { + this.allEventsUpToTriggersProcessed <- changelogStateString + return nil + } + //TODO Explain + go func() { + this.applyEventsQueue <- newApplyEventStructByFunc(&applyEventFunc) + }() + } default: { return fmt.Errorf("Unknown changelog state: %+v", changelogState) @@ -530,9 +546,58 @@ func (this *Migrator) cutOver() (err error) { this.handleCutOverResult(err) return err } + if this.migrationContext.CutOverType == base.CutOverTrigger { + err := this.cutOverTrigger() + this.handleCutOverResult(err) + return err + } return log.Fatalf("Unknown cut-over type: %d; should never get here!", this.migrationContext.CutOverType) } +// Inject the "AllEventsUpToTriggersProcessed" state hint, wait for it to appear in the binary logs, +// make sure the queue is drained. +func (this *Migrator) waitForEventsUpToTriggers() (err error) { + timeout := time.NewTimer(time.Second * time.Duration(this.migrationContext.CutOverLockTimeoutSeconds)) + + this.migrationContext.MarkPointOfInterest() + waitForEventsUpToTriggersStartTime := time.Now() + + allEventsUpToTriggersProcessedChallenge := fmt.Sprintf( + "%s:%d", + string(AllEventsUpToTriggersProcessed), + waitForEventsUpToTriggersStartTime.UnixNano()) + + log.Infof("Writing changelog state: %+v", allEventsUpToTriggersProcessedChallenge) + if _, err := this.applier.WriteChangelogState(allEventsUpToTriggersProcessedChallenge); err != nil { + return err + } + log.Infof("Waiting for events up to triggers") + //atomic.StoreInt64(&this.migrationContext.AllEventsUpToTriggersProcessedInjectedFlag, 1) + for found := false; !found; { + select { + case <-timeout.C: + { + return log.Errorf("Timeout while waiting for events up to triggers") + } + case state := <-this.allEventsUpToTriggersProcessed: + { + if state == allEventsUpToTriggersProcessedChallenge { + log.Infof("Waiting for events up to triggers: got %s", state) + found = true + } else { + log.Infof("Waiting for events up to triggers: skipping %s", state) + } + } + } + } + waitForEventsUpToTriggersDuration := time.Since(waitForEventsUpToTriggersStartTime) + + log.Infof("Done waiting for events up to triggers; duration=%+v", waitForEventsUpToTriggersDuration) + this.printStatus(ForcePrintStatusAndHintRule) + + return nil +} + // Inject the "AllEventsUpToLockProcessed" state hint, wait for it to appear in the binary logs, // make sure the queue is drained. func (this *Migrator) waitForEventsUpToLock() (err error) { @@ -602,6 +667,52 @@ func (this *Migrator) cutOverTwoStep() (err error) { return nil } +// cutOverTrigger will create some INSERT, UPDATE and DELETE triggers to keep the table updated, +// After this it will copy the changes between the last insert and the triggers creation and do a +// simple RENAME TABLE. +func (this *Migrator) cutOverTrigger() (err error) { + atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 1) + defer atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 0) + atomic.StoreInt64(&this.migrationContext.ApplyDMLEventState, 1) + + log.Infof( + "Creating triggers for %s.%s", + this.migrationContext.DatabaseName, + this.migrationContext.OriginalTableName); + + time.Sleep(30 * time.Second) + + if err := this.retryOperation(this.applier.CreateTriggersOriginalTable); err != nil { + return err + } + + if err := this.waitForEventsUpToTriggers(); err != nil { + return log.Errore(err) + } + + uniqueKeys := this.triggerCutoverUniqueKeys + + if err := this.applier.DropUniqueKeysGhostTable(uniqueKeys); err != nil { + return err + } + + //if err := this.retryOperation(this.waitForEventsUpToLock); err != nil { + //return err + //} + //if err := this.retryOperation(this.applier.SwapTablesQuickAndBumpy); err != nil { + //return err + //} + //if err := this.retryOperation(this.applier.UnlockTables); err != nil { + //return err + //} + + //lockAndRenameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.LockTablesStartTime) + //renameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.RenameTablesStartTime) + //log.Debugf("Lock & rename duration: %s (rename only: %s). During this time, queries on %s were locked or failing", lockAndRenameDuration, renameDuration, sql.EscapeName(this.migrationContext.OriginalTableName)) + return nil + +} + // atomicCutOver func (this *Migrator) atomicCutOver() (err error) { atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 1) @@ -988,6 +1099,7 @@ func (this *Migrator) initiateStreaming() error { if err := this.eventsStreamer.InitDBConnections(); err != nil { return err } + atomic.StoreInt64(&this.migrationContext.ApplyDMLEventState, 0) this.eventsStreamer.AddListener( false, this.migrationContext.DatabaseName, @@ -1162,8 +1274,14 @@ func (this *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error { } if eventStruct.dmlEvent == nil { return handleNonDMLEventStruct(eventStruct) - } + } // And this (?) if eventStruct.dmlEvent != nil { + addCutoverKeys := atomic.LoadInt64(&this.migrationContext.ApplyDMLEventState) == 1 + if addCutoverKeys { + this.triggerCutoverUniqueKeys = append( + this.triggerCutoverUniqueKeys, + this.applier.ObtainUniqueKeyValuesOfEvent(eventStruct.dmlEvent)...) + } dmlEvents := [](*binlog.BinlogDMLEvent){} dmlEvents = append(dmlEvents, eventStruct.dmlEvent) var nonDmlStructToApply *applyEventStruct @@ -1183,6 +1301,13 @@ func (this *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error { break } dmlEvents = append(dmlEvents, additionalStruct.dmlEvent) + + addCutoverKeys = addCutoverKeys && (atomic.LoadInt64(&this.migrationContext.ApplyDMLEventState) == 1) + if addCutoverKeys { + this.triggerCutoverUniqueKeys = append( + this.triggerCutoverUniqueKeys, + this.applier.ObtainUniqueKeyValuesOfEvent(additionalStruct.dmlEvent)...) + } } // Create a task to apply the DML event; this will be execute by executeWriteFuncs() var applyEventFunc tableWriteFunc = func() error { diff --git a/go/sql/builder.go b/go/sql/builder.go index 2c5a7ae28..4da8b5874 100644 --- a/go/sql/builder.go +++ b/go/sql/builder.go @@ -508,3 +508,75 @@ func BuildDMLUpdateQuery(databaseName, tableName string, tableColumns, sharedCol ) return result, sharedArgs, uniqueKeyArgs, nil } + +func ObtainUniqueKeyValues(tableColumns, uniqueKeyColumns *ColumnList, args []interface{}) (uniqueKeyArgs []interface{}) { + for _, column := range uniqueKeyColumns.Columns() { + tableOrdinal := tableColumns.Ordinals[column.Name] + arg := column.convertArg(args[tableOrdinal]) + uniqueKeyArgs = append(uniqueKeyArgs, arg) + } + return uniqueKeyArgs +} + +func buildColumnsPreparedValuesWhere(databaseName string, columns *ColumnList) []string { + values := make([]string, columns.Len(), columns.Len()) + for i, name := range columns.Names() { + values[i] = fmt.Sprintf( + "%s.%s = ?", + databaseName, + EscapeName(name)) + } + return values +} + +func buildColumnsEqualsWhere(databaseName, originalTableName, ghostTableName string, columns *ColumnList) []string { + values := make([]string, columns.Len(), columns.Len()) + for i, name := range columns.Names() { + name = EscapeName(name) + values[i] = fmt.Sprintf( + "%s.%s.%s <=> %s.%s.%s", + databaseName, + originalTableName, + name, + databaseName, + ghostTableName, + name) + } + return values +} + +func BuildDeleteQuery(databaseName, originalTableName, ghostTableName string, uniqueKeyColumns *ColumnList, uniqueKeyArgs [][]interface{}) (result string, explodedArgs []interface{}, err error) { + databaseName = EscapeName(databaseName) + originalTableName = EscapeName(originalTableName) + ghostTableName = EscapeName(ghostTableName) + + preparedValuesGetKey := buildColumnsPreparedValuesWhere( + databaseName, + uniqueKeyColumns) + preparedValuesExistsKey := buildColumnsEqualsWhere( + databaseName, + originalTableName, + ghostTableName, + uniqueKeyColumns) + + template := fmt.Sprintf( + "(%s and !(%s))", + strings.Join(preparedValuesGetKey, " and "), + strings.Join(preparedValuesExistsKey, " and ")) + + // WHERE ((gho.PK = ?) AND !(orig.PK <=> gho.PK)) OR (XXX) + + preparedValuesWhere := make([]string, len(uniqueKeyArgs), len(uniqueKeyArgs)) + for i, uniqueKey := range uniqueKeyArgs { + explodedArgs = append(explodedArgs, uniqueKey...) + preparedValuesWhere[i] = template + } + + result = fmt.Sprintf( + "DELETE /* gh-ost */ FROM %s.%s WHERE %s", + databaseName, + ghostTableName, + strings.Join(preparedValuesWhere, " or ")) + + return result, explodedArgs, nil +} From d6f00892a67fa1055e9f8e0a5059995bf5d14473 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20Manuel=20Fern=C3=A1ndez=20Garc=C3=ADa-Minguill?= =?UTF-8?q?=C3=A1n?= Date: Thu, 28 Mar 2019 13:39:17 +0100 Subject: [PATCH 02/15] Remove triggers --- go/base/context.go | 1 - go/logic/applier.go | 130 ++++++++++++++++++++++++++----------------- go/logic/migrator.go | 47 +++------------- 3 files changed, 89 insertions(+), 89 deletions(-) diff --git a/go/base/context.go b/go/base/context.go index 0668511a1..e7f96e82e 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -195,7 +195,6 @@ type MigrationContext struct { UserCommandedUnpostponeFlag int64 CutOverCompleteFlag int64 InCutOverCriticalSectionFlag int64 - ApplyDMLEventState int64 PanicAbort chan error OriginalTableColumnsOnApplier *sql.ColumnList diff --git a/go/logic/applier.go b/go/logic/applier.go index 828ebb6d6..bb8a84329 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -518,12 +518,8 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected return chunkSize, rowsAffected, duration, nil } -// Create triggers from the original table to the new one -func (this *Applier) CreateTriggersOriginalTable() error { - /////////////////////// - // Trigger preffix // - /////////////////////// - +// Create the trigger prefix +func (this *Applier) makeTriggerPrefix() string { prefix := fmt.Sprintf( "ghost_%s_%s", this.migrationContext.DatabaseName, @@ -541,6 +537,13 @@ func (this *Applier) CreateTriggersOriginalTable() error { prefix) } + return prefix +} + +// Create triggers from the original table to the new one +func (this *Applier) CreateTriggersOriginalTable() error { + prefix := this.makeTriggerPrefix() + ////////////////////// // Delete trigger // ////////////////////// @@ -655,6 +658,52 @@ func (this *Applier) CreateTriggersOriginalTable() error { }() } +// Drop triggers from the old table +func (this *Applier) DropTriggersOldTable() error { + prefix := this.makeTriggerPrefix() + + dropDeleteTrigger := fmt.Sprintf( + "DROP /* gh-ost */ TRIGGER %s.`%s_del`", + sql.EscapeName(this.migrationContext.DatabaseName), + prefix) + + dropInsertTrigger := fmt.Sprintf( + "DROP /* gh-ost */ TRIGGER %s.`%s_ins`", + sql.EscapeName(this.migrationContext.DatabaseName), + prefix) + + dropUpdateTrigger := fmt.Sprintf( + "DROP /* gh-ost */ TRIGGER %s.`%s_upd`", + sql.EscapeName(this.migrationContext.DatabaseName), + prefix) + + ///////////////////////// + // Drop the triggers // + ///////////////////////// + + return func() error { + tx, err := this.db.Begin() + if err != nil { + return err + } + + if _, err := tx.Exec(dropDeleteTrigger); err != nil { + return err + } + if _, err := tx.Exec(dropInsertTrigger); err != nil { + return err + } + if _, err := tx.Exec(dropUpdateTrigger); err != nil { + return err + } + + if err := tx.Commit(); err != nil { + return err + } + return nil + }() +} + //TODO func (this *Applier) ObtainUniqueKeyValuesOfEvent(dmlEvent *binlog.BinlogDMLEvent) (uniqueKeys [][]interface{}) { switch dmlEvent.DML { @@ -692,50 +741,6 @@ func (this *Applier) ObtainUniqueKeyValuesOfEvent(dmlEvent *binlog.BinlogDMLEven return uniqueKeys } -//TODO -func (this *Applier) DropUniqueKeysGhostTable(uniqueKeysArgs [][]interface{}) error { - query, args, err := sql.BuildDeleteQuery( - this.migrationContext.DatabaseName, - this.migrationContext.OriginalTableName, - this.migrationContext.GetGhostTableName(), - &this.migrationContext.UniqueKey.Columns, - uniqueKeysArgs) - - if err != nil { - return err - } - - log.Infof("Delete query: %s", query) - log.Infof("Delete query args: %+v", args) - - return nil - - //return func() error { - //tx, err := this.db.Begin() - //if err != nil { - //return err - //} - - //if _, err := tx.Exec(deleteTrigger); err != nil { - //return err - //} - //if _, err := tx.Exec(insertTrigger); err != nil { - //return err - //} - //if _, err := tx.Exec(updateTrigger); err != nil { - //return err - //} - - //if err := tx.Commit(); err != nil { - //return err - //} - //return nil - //}() -} - - - - // LockOriginalTable places a write lock on the original table func (this *Applier) LockOriginalTable() error { query := fmt.Sprintf(`lock /* gh-ost */ tables %s.%s write`, @@ -765,6 +770,31 @@ func (this *Applier) UnlockTables() error { return nil } +// SwapTables issues a one-step swap table operation: +// - rename original table to _old +// - rename ghost table to original +func (this *Applier) SwapTables() error { + query := fmt.Sprintf(`rename /* gh-ost */ table %s.%s to %s.%s, %s.%s to %s.%s`, + sql.EscapeName(this.migrationContext.DatabaseName), + sql.EscapeName(this.migrationContext.OriginalTableName), + sql.EscapeName(this.migrationContext.DatabaseName), + sql.EscapeName(this.migrationContext.GetOldTableName()), + sql.EscapeName(this.migrationContext.DatabaseName), + sql.EscapeName(this.migrationContext.GetGhostTableName()), + sql.EscapeName(this.migrationContext.DatabaseName), + sql.EscapeName(this.migrationContext.OriginalTableName), + ) + log.Infof("Swaping original and new table: %s", query) + this.migrationContext.RenameTablesStartTime = time.Now() + if _, err := sqlutils.ExecNoPrepare(this.singletonDB, query); err != nil { + return err + } + this.migrationContext.RenameTablesEndTime = time.Now() + + log.Infof("Tables swaped") + return nil +} + // SwapTablesQuickAndBumpy issues a two-step swap table operation: // - rename original table to _old // - rename ghost table to original diff --git a/go/logic/migrator.go b/go/logic/migrator.go index eca3cbc36..d8c55b658 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -78,8 +78,6 @@ type Migrator struct { allEventsUpToLockProcessed chan string allEventsUpToTriggersProcessed chan string - triggerCutoverUniqueKeys [][]interface {} - rowCopyCompleteFlag int64 // copyRowsQueue should not be buffered; if buffered some non-damaging but // excessive work happens at the end of the iteration as new copy-jobs arrive before realizing the copy is complete @@ -673,15 +671,13 @@ func (this *Migrator) cutOverTwoStep() (err error) { func (this *Migrator) cutOverTrigger() (err error) { atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 1) defer atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 0) - atomic.StoreInt64(&this.migrationContext.ApplyDMLEventState, 1) + atomic.StoreInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag, 0) log.Infof( "Creating triggers for %s.%s", this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName); - time.Sleep(30 * time.Second) - if err := this.retryOperation(this.applier.CreateTriggersOriginalTable); err != nil { return err } @@ -689,28 +685,17 @@ func (this *Migrator) cutOverTrigger() (err error) { if err := this.waitForEventsUpToTriggers(); err != nil { return log.Errore(err) } - - uniqueKeys := this.triggerCutoverUniqueKeys - - if err := this.applier.DropUniqueKeysGhostTable(uniqueKeys); err != nil { + if err := this.retryOperation(this.applier.SwapTables); err != nil { + return err + } + if err := this.retryOperation(this.applier.DropTriggersOldTable); err != nil { return err } - //if err := this.retryOperation(this.waitForEventsUpToLock); err != nil { - //return err - //} - //if err := this.retryOperation(this.applier.SwapTablesQuickAndBumpy); err != nil { - //return err - //} - //if err := this.retryOperation(this.applier.UnlockTables); err != nil { - //return err - //} - - //lockAndRenameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.LockTablesStartTime) - //renameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.RenameTablesStartTime) - //log.Debugf("Lock & rename duration: %s (rename only: %s). During this time, queries on %s were locked or failing", lockAndRenameDuration, renameDuration, sql.EscapeName(this.migrationContext.OriginalTableName)) + lockAndRenameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.LockTablesStartTime) + renameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.RenameTablesStartTime) + log.Debugf("Lock & rename duration: %s (rename only: %s). During this time, queries on %s were locked or failing", lockAndRenameDuration, renameDuration, sql.EscapeName(this.migrationContext.OriginalTableName)) return nil - } // atomicCutOver @@ -1099,7 +1084,6 @@ func (this *Migrator) initiateStreaming() error { if err := this.eventsStreamer.InitDBConnections(); err != nil { return err } - atomic.StoreInt64(&this.migrationContext.ApplyDMLEventState, 0) this.eventsStreamer.AddListener( false, this.migrationContext.DatabaseName, @@ -1274,14 +1258,8 @@ func (this *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error { } if eventStruct.dmlEvent == nil { return handleNonDMLEventStruct(eventStruct) - } // And this (?) + } if eventStruct.dmlEvent != nil { - addCutoverKeys := atomic.LoadInt64(&this.migrationContext.ApplyDMLEventState) == 1 - if addCutoverKeys { - this.triggerCutoverUniqueKeys = append( - this.triggerCutoverUniqueKeys, - this.applier.ObtainUniqueKeyValuesOfEvent(eventStruct.dmlEvent)...) - } dmlEvents := [](*binlog.BinlogDMLEvent){} dmlEvents = append(dmlEvents, eventStruct.dmlEvent) var nonDmlStructToApply *applyEventStruct @@ -1301,13 +1279,6 @@ func (this *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error { break } dmlEvents = append(dmlEvents, additionalStruct.dmlEvent) - - addCutoverKeys = addCutoverKeys && (atomic.LoadInt64(&this.migrationContext.ApplyDMLEventState) == 1) - if addCutoverKeys { - this.triggerCutoverUniqueKeys = append( - this.triggerCutoverUniqueKeys, - this.applier.ObtainUniqueKeyValuesOfEvent(additionalStruct.dmlEvent)...) - } } // Create a task to apply the DML event; this will be execute by executeWriteFuncs() var applyEventFunc tableWriteFunc = func() error { From ceed8440cc4b049bd137fe2bd61b889c1ead37f2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20Manuel=20Fern=C3=A1ndez=20Garc=C3=ADa-Minguill?= =?UTF-8?q?=C3=A1n?= Date: Thu, 28 Mar 2019 18:22:37 +0100 Subject: [PATCH 03/15] Reverted --- go/base/context.go | 1 + go/logic/migrator.go | 24 +++++++++++++++++++++++- 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/go/base/context.go b/go/base/context.go index e7f96e82e..2df9a20b0 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -195,6 +195,7 @@ type MigrationContext struct { UserCommandedUnpostponeFlag int64 CutOverCompleteFlag int64 InCutOverCriticalSectionFlag int64 + ApplyDMLEventState int64 PanicAbort chan error OriginalTableColumnsOnApplier *sql.ColumnList diff --git a/go/logic/migrator.go b/go/logic/migrator.go index d8c55b658..5e8f43edf 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -78,6 +78,8 @@ type Migrator struct { allEventsUpToLockProcessed chan string allEventsUpToTriggersProcessed chan string + triggerCutoverUniqueKeys [][]interface {} + rowCopyCompleteFlag int64 // copyRowsQueue should not be buffered; if buffered some non-damaging but // excessive work happens at the end of the iteration as new copy-jobs arrive before realizing the copy is complete @@ -671,7 +673,7 @@ func (this *Migrator) cutOverTwoStep() (err error) { func (this *Migrator) cutOverTrigger() (err error) { atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 1) defer atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 0) - atomic.StoreInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag, 0) + atomic.StoreInt64(&this.migrationContext.ApplyDMLEventState, 1) log.Infof( "Creating triggers for %s.%s", @@ -685,6 +687,11 @@ func (this *Migrator) cutOverTrigger() (err error) { if err := this.waitForEventsUpToTriggers(); err != nil { return log.Errore(err) } + + //TODO remove + log.Infof("Received trigger event, waiting 30s after swap it") + time.Sleep(30 * time.Second) + if err := this.retryOperation(this.applier.SwapTables); err != nil { return err } @@ -1084,6 +1091,7 @@ func (this *Migrator) initiateStreaming() error { if err := this.eventsStreamer.InitDBConnections(); err != nil { return err } + atomic.StoreInt64(&this.migrationContext.ApplyDMLEventState, 0) this.eventsStreamer.AddListener( false, this.migrationContext.DatabaseName, @@ -1260,6 +1268,13 @@ func (this *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error { return handleNonDMLEventStruct(eventStruct) } if eventStruct.dmlEvent != nil { + addCutoverKeys := atomic.LoadInt64(&this.migrationContext.ApplyDMLEventState) == 1 + if addCutoverKeys { + this.triggerCutoverUniqueKeys = append( + this.triggerCutoverUniqueKeys, + this.applier.ObtainUniqueKeyValuesOfEvent(eventStruct.dmlEvent)...) + } + dmlEvents := [](*binlog.BinlogDMLEvent){} dmlEvents = append(dmlEvents, eventStruct.dmlEvent) var nonDmlStructToApply *applyEventStruct @@ -1279,6 +1294,13 @@ func (this *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error { break } dmlEvents = append(dmlEvents, additionalStruct.dmlEvent) + + addCutoverKeys = addCutoverKeys && (atomic.LoadInt64(&this.migrationContext.ApplyDMLEventState) == 1) + if addCutoverKeys { + this.triggerCutoverUniqueKeys = append( + this.triggerCutoverUniqueKeys, + this.applier.ObtainUniqueKeyValuesOfEvent(additionalStruct.dmlEvent)...) + } } // Create a task to apply the DML event; this will be execute by executeWriteFuncs() var applyEventFunc tableWriteFunc = func() error { From cad9b3b6a14aaee51be21b26ca66b7806a8c3ab3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20Manuel=20Fern=C3=A1ndez=20Garc=C3=ADa-Minguill?= =?UTF-8?q?=C3=A1n?= Date: Fri, 29 Mar 2019 10:09:59 +0100 Subject: [PATCH 04/15] Added logic to insert events during trigger creation --- go/logic/applier.go | 46 +++++++++++++++++++++++++++ go/logic/migrator.go | 67 ++++++++++++++++++++++++++++++++------- go/sql/builder.go | 75 +++++++++++++++++++++++++------------------- 3 files changed, 143 insertions(+), 45 deletions(-) diff --git a/go/logic/applier.go b/go/logic/applier.go index bb8a84329..1a37d9280 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -741,6 +741,52 @@ func (this *Applier) ObtainUniqueKeyValuesOfEvent(dmlEvent *binlog.BinlogDMLEven return uniqueKeys } +func (this *Applier) DropUniqueKeysGhostTable(uniqueKeysArgs [][]interface{}) error { + if len(uniqueKeysArgs) == 0 { + return nil + } + + query, args, err := sql.BuildDeleteQuery( + this.migrationContext.DatabaseName, + this.migrationContext.GetGhostTableName(), + &this.migrationContext.UniqueKey.Columns, + uniqueKeysArgs) + + if err != nil { + return err + } + + log.Infof("Removing %d created rows during trigger creation", + len(uniqueKeysArgs)) + _, err = sqlutils.ExecNoPrepare(this.db, query, args...) + return err +} + +func (this *Applier) InsertSelectUniqueKeysGhostTable(uniqueKeysArgs [][]interface{}) error { + if len(uniqueKeysArgs) == 0 { + return nil + } + + query, args, err := sql.BuildInsertSelectQuery( + this.migrationContext.DatabaseName, + this.migrationContext.OriginalTableName, + this.migrationContext.GetGhostTableName(), + this.migrationContext.SharedColumns.Names(), + this.migrationContext.MappedSharedColumns.Names(), + this.migrationContext.UniqueKey.Name, + &this.migrationContext.UniqueKey.Columns, + uniqueKeysArgs) + + if err != nil { + return err + } + + log.Infof("Inserting %d created rows during trigger creation", + len(uniqueKeysArgs)) + _, err = sqlutils.ExecNoPrepare(this.db, query, args...) + return err +} + // LockOriginalTable places a write lock on the original table func (this *Applier) LockOriginalTable() error { query := fmt.Sprintf(`lock /* gh-ost */ tables %s.%s write`, diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 5e8f43edf..4605d27d2 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -27,6 +27,7 @@ type ChangelogState string const ( GhostTableMigrated ChangelogState = "GhostTableMigrated" AllEventsUpToLockProcessed = "AllEventsUpToLockProcessed" + StopWriteEvents = "StopWriteEvents" AllEventsUpToTriggersProcessed = "AllEventsUpToTriggersProcessed" ) @@ -242,9 +243,21 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er this.applyEventsQueue <- newApplyEventStructByFunc(&applyEventFunc) }() } + case StopWriteEvents: + { + var applyEventFunc tableWriteFunc = func() error { + atomic.StoreInt64(&this.migrationContext.ApplyDMLEventState, 1) + return nil + } + //TODO Explain + go func() { + this.applyEventsQueue <- newApplyEventStructByFunc(&applyEventFunc) + }() + } case AllEventsUpToTriggersProcessed: { var applyEventFunc tableWriteFunc = func() error { + atomic.StoreInt64(&this.migrationContext.ApplyDMLEventState, 2) this.allEventsUpToTriggersProcessed <- changelogStateString return nil } @@ -554,6 +567,26 @@ func (this *Migrator) cutOver() (err error) { return log.Fatalf("Unknown cut-over type: %d; should never get here!", this.migrationContext.CutOverType) } +// Inject the "StopWriteEvents" state hint +func (this *Migrator) injectStopWriteEvents() (err error) { + this.migrationContext.MarkPointOfInterest() + injectStopWriteEventsStartTime := time.Now() + + injectStopWriteEventsChallenge := fmt.Sprintf( + "%s:%d", + string(StopWriteEvents), + injectStopWriteEventsStartTime.UnixNano()) + + log.Infof("Writing changelog state: %+v", injectStopWriteEventsChallenge) + if _, err := this.applier.WriteChangelogState(injectStopWriteEventsChallenge); err != nil { + return err + } + + this.printStatus(ForcePrintStatusAndHintRule) + + return nil +} + // Inject the "AllEventsUpToTriggersProcessed" state hint, wait for it to appear in the binary logs, // make sure the queue is drained. func (this *Migrator) waitForEventsUpToTriggers() (err error) { @@ -673,7 +706,10 @@ func (this *Migrator) cutOverTwoStep() (err error) { func (this *Migrator) cutOverTrigger() (err error) { atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 1) defer atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 0) - atomic.StoreInt64(&this.migrationContext.ApplyDMLEventState, 1) + + if err := this.injectStopWriteEvents(); err != nil { + return err + } log.Infof( "Creating triggers for %s.%s", @@ -688,9 +724,19 @@ func (this *Migrator) cutOverTrigger() (err error) { return log.Errore(err) } + //TODO retry the operation + if err := this.applier.DropUniqueKeysGhostTable(this.triggerCutoverUniqueKeys); err != nil { + return err + } + + //TODO retry the operation + if err := this.applier.InsertSelectUniqueKeysGhostTable(this.triggerCutoverUniqueKeys); err != nil { + return err + } + //TODO remove - log.Infof("Received trigger event, waiting 30s after swap it") - time.Sleep(30 * time.Second) + //log.Infof("Received trigger event, waiting 30s after swap it") + //time.Sleep(30 * time.Second) if err := this.retryOperation(this.applier.SwapTables); err != nil { return err @@ -1268,11 +1314,15 @@ func (this *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error { return handleNonDMLEventStruct(eventStruct) } if eventStruct.dmlEvent != nil { - addCutoverKeys := atomic.LoadInt64(&this.migrationContext.ApplyDMLEventState) == 1 - if addCutoverKeys { + applyDMLEventState := atomic.LoadInt64(&this.migrationContext.ApplyDMLEventState) + if applyDMLEventState == 1 { this.triggerCutoverUniqueKeys = append( this.triggerCutoverUniqueKeys, this.applier.ObtainUniqueKeyValuesOfEvent(eventStruct.dmlEvent)...) + return nil + } + if applyDMLEventState == 2 { + return nil } dmlEvents := [](*binlog.BinlogDMLEvent){} @@ -1294,13 +1344,6 @@ func (this *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error { break } dmlEvents = append(dmlEvents, additionalStruct.dmlEvent) - - addCutoverKeys = addCutoverKeys && (atomic.LoadInt64(&this.migrationContext.ApplyDMLEventState) == 1) - if addCutoverKeys { - this.triggerCutoverUniqueKeys = append( - this.triggerCutoverUniqueKeys, - this.applier.ObtainUniqueKeyValuesOfEvent(additionalStruct.dmlEvent)...) - } } // Create a task to apply the DML event; this will be execute by executeWriteFuncs() var applyEventFunc tableWriteFunc = func() error { diff --git a/go/sql/builder.go b/go/sql/builder.go index 4da8b5874..ec7f1e030 100644 --- a/go/sql/builder.go +++ b/go/sql/builder.go @@ -518,53 +518,60 @@ func ObtainUniqueKeyValues(tableColumns, uniqueKeyColumns *ColumnList, args []in return uniqueKeyArgs } -func buildColumnsPreparedValuesWhere(databaseName string, columns *ColumnList) []string { +func buildColumnsPreparedValuesWhere(columns *ColumnList) []string { values := make([]string, columns.Len(), columns.Len()) for i, name := range columns.Names() { values[i] = fmt.Sprintf( - "%s.%s = ?", - databaseName, + "%s <=> ?", EscapeName(name)) } return values } -func buildColumnsEqualsWhere(databaseName, originalTableName, ghostTableName string, columns *ColumnList) []string { - values := make([]string, columns.Len(), columns.Len()) - for i, name := range columns.Names() { - name = EscapeName(name) - values[i] = fmt.Sprintf( - "%s.%s.%s <=> %s.%s.%s", - databaseName, - originalTableName, - name, - databaseName, - ghostTableName, - name) +func BuildDeleteQuery(databaseName, ghostTableName string, uniqueKeyColumns *ColumnList, uniqueKeyArgs [][]interface{}) (result string, explodedArgs []interface{}, err error) { + preparedValuesGetKey := buildColumnsPreparedValuesWhere(uniqueKeyColumns) + + template := fmt.Sprintf( + "(%s)", + strings.Join(preparedValuesGetKey, " and ")) + + preparedValuesWhere := make([]string, len(uniqueKeyArgs), len(uniqueKeyArgs)) + for i, uniqueKey := range uniqueKeyArgs { + explodedArgs = append(explodedArgs, uniqueKey...) + preparedValuesWhere[i] = template } - return values + + result = fmt.Sprintf( + "delete /* gh-ost */ from %s.%s where %s", + EscapeName(databaseName), + EscapeName(ghostTableName), + strings.Join(preparedValuesWhere, " or ")) + + return result, explodedArgs, nil } -func BuildDeleteQuery(databaseName, originalTableName, ghostTableName string, uniqueKeyColumns *ColumnList, uniqueKeyArgs [][]interface{}) (result string, explodedArgs []interface{}, err error) { +func BuildInsertSelectQuery(databaseName, originalTableName, ghostTableName string, sharedColumns []string, mappedSharedColumns []string, uniqueKey string, uniqueKeyColumns *ColumnList, uniqueKeyArgs [][]interface{}) (result string, explodedArgs []interface{}, err error) { databaseName = EscapeName(databaseName) originalTableName = EscapeName(originalTableName) ghostTableName = EscapeName(ghostTableName) - preparedValuesGetKey := buildColumnsPreparedValuesWhere( - databaseName, - uniqueKeyColumns) - preparedValuesExistsKey := buildColumnsEqualsWhere( - databaseName, - originalTableName, - ghostTableName, - uniqueKeyColumns) + mappedSharedColumns = duplicateNames(mappedSharedColumns) + for i := range mappedSharedColumns { + mappedSharedColumns[i] = EscapeName(mappedSharedColumns[i]) + } + mappedSharedColumnsListing := strings.Join(mappedSharedColumns, ", ") - template := fmt.Sprintf( - "(%s and !(%s))", - strings.Join(preparedValuesGetKey, " and "), - strings.Join(preparedValuesExistsKey, " and ")) + sharedColumns = duplicateNames(sharedColumns) + for i := range sharedColumns { + sharedColumns[i] = EscapeName(sharedColumns[i]) + } + sharedColumnsListing := strings.Join(sharedColumns, ", ") + + preparedValuesGetKey := buildColumnsPreparedValuesWhere(uniqueKeyColumns) - // WHERE ((gho.PK = ?) AND !(orig.PK <=> gho.PK)) OR (XXX) + template := fmt.Sprintf( + "(%s)", + strings.Join(preparedValuesGetKey, " and ")) preparedValuesWhere := make([]string, len(uniqueKeyArgs), len(uniqueKeyArgs)) for i, uniqueKey := range uniqueKeyArgs { @@ -573,9 +580,11 @@ func BuildDeleteQuery(databaseName, originalTableName, ghostTableName string, un } result = fmt.Sprintf( - "DELETE /* gh-ost */ FROM %s.%s WHERE %s", - databaseName, - ghostTableName, + "insert /* gh-ost %s.%s */ into %s.%s (%s) select %s from %s.%s force index (%s) where %s", + databaseName, originalTableName, + databaseName, ghostTableName, mappedSharedColumnsListing, + sharedColumnsListing, databaseName, originalTableName, + uniqueKey, strings.Join(preparedValuesWhere, " or ")) return result, explodedArgs, nil From 82f8e729f77c9401569a74836760a9dc2cae903a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20Manuel=20Fern=C3=A1ndez=20Garc=C3=ADa-Minguill?= =?UTF-8?q?=C3=A1n?= Date: Fri, 29 Mar 2019 13:15:56 +0100 Subject: [PATCH 05/15] Improved sanitize step --- go/base/context.go | 2 + go/logic/applier.go | 103 +++++++++++++++++++++++++------------------ go/logic/migrator.go | 36 +++++++++------ go/sql/builder.go | 4 +- 4 files changed, 87 insertions(+), 58 deletions(-) diff --git a/go/base/context.go b/go/base/context.go index 2df9a20b0..900be4cf2 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -217,6 +217,8 @@ type MigrationContext struct { MigrationIterationRangeMaxValues *sql.ColumnValues ForceTmpTableName string + TriggerCutoverUniqueKeys [][]interface {} + recentBinlogCoordinates mysql.BinlogCoordinates } diff --git a/go/logic/applier.go b/go/logic/applier.go index 1a37d9280..d9192ef81 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -741,50 +741,67 @@ func (this *Applier) ObtainUniqueKeyValuesOfEvent(dmlEvent *binlog.BinlogDMLEven return uniqueKeys } -func (this *Applier) DropUniqueKeysGhostTable(uniqueKeysArgs [][]interface{}) error { - if len(uniqueKeysArgs) == 0 { - return nil - } - - query, args, err := sql.BuildDeleteQuery( - this.migrationContext.DatabaseName, - this.migrationContext.GetGhostTableName(), - &this.migrationContext.UniqueKey.Columns, - uniqueKeysArgs) - - if err != nil { - return err - } - - log.Infof("Removing %d created rows during trigger creation", - len(uniqueKeysArgs)) - _, err = sqlutils.ExecNoPrepare(this.db, query, args...) - return err -} +func (this *Applier) SanitizeRowsDuringCutOver() error { + for len(this.migrationContext.TriggerCutoverUniqueKeys) > 0 { + cutIndex := int64(len(this.migrationContext.TriggerCutoverUniqueKeys)) - this.migrationContext.ChunkSize + if cutIndex < 0 { + cutIndex = 0 + } + + chunkValues := this.migrationContext.TriggerCutoverUniqueKeys[cutIndex:] + deleteQuery, deleteArgs, deleteErr := sql.BuildDeleteQuery( + this.migrationContext.DatabaseName, + this.migrationContext.GetGhostTableName(), + &this.migrationContext.UniqueKey.Columns, + chunkValues) + + if deleteErr != nil { + return deleteErr + } + + insertSelectQuery, insertSelectArgs, insertSelectErr := sql.BuildInsertSelectQuery( + this.migrationContext.DatabaseName, + this.migrationContext.OriginalTableName, + this.migrationContext.GetGhostTableName(), + this.migrationContext.SharedColumns.Names(), + this.migrationContext.MappedSharedColumns.Names(), + this.migrationContext.UniqueKey.Name, + &this.migrationContext.UniqueKey.Columns, + chunkValues) + + if insertSelectErr != nil { + return insertSelectErr + } + + log.Infof("Sanitizing chunk of %d created rows during trigger creation", + len(chunkValues)) -func (this *Applier) InsertSelectUniqueKeysGhostTable(uniqueKeysArgs [][]interface{}) error { - if len(uniqueKeysArgs) == 0 { - return nil - } - - query, args, err := sql.BuildInsertSelectQuery( - this.migrationContext.DatabaseName, - this.migrationContext.OriginalTableName, - this.migrationContext.GetGhostTableName(), - this.migrationContext.SharedColumns.Names(), - this.migrationContext.MappedSharedColumns.Names(), - this.migrationContext.UniqueKey.Name, - &this.migrationContext.UniqueKey.Columns, - uniqueKeysArgs) - - if err != nil { - return err - } - - log.Infof("Inserting %d created rows during trigger creation", - len(uniqueKeysArgs)) - _, err = sqlutils.ExecNoPrepare(this.db, query, args...) - return err + err := func() error { + tx, err := this.db.Begin() + if err != nil { + return err + } + + if _, err := tx.Exec(deleteQuery, deleteArgs...); err != nil { + return err + } + if _, err := tx.Exec(insertSelectQuery, insertSelectArgs...); err != nil { + return err + } + if err := tx.Commit(); err != nil { + return err + } + return nil + }() + + if err != nil { + return err + } + + this.migrationContext.TriggerCutoverUniqueKeys = this.migrationContext.TriggerCutoverUniqueKeys[:cutIndex] + } + + return nil } // LockOriginalTable places a write lock on the original table diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 4605d27d2..74a4cbd10 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -724,19 +724,29 @@ func (this *Migrator) cutOverTrigger() (err error) { return log.Errore(err) } - //TODO retry the operation - if err := this.applier.DropUniqueKeysGhostTable(this.triggerCutoverUniqueKeys); err != nil { - return err - } - - //TODO retry the operation - if err := this.applier.InsertSelectUniqueKeysGhostTable(this.triggerCutoverUniqueKeys); err != nil { - return err - } - - //TODO remove - //log.Infof("Received trigger event, waiting 30s after swap it") - //time.Sleep(30 * time.Second) + this.migrationContext.TriggerCutoverUniqueKeys = nil + +CompareNewRow: + for _, rowToAdd := range this.triggerCutoverUniqueKeys { +CompareAddedRow: + for _, rowAdded := range this.migrationContext.TriggerCutoverUniqueKeys { + for i := range rowAdded { + if rowToAdd[i] != rowAdded[i] { + continue CompareAddedRow + } + } + + continue CompareNewRow + } + + this.migrationContext.TriggerCutoverUniqueKeys = append( + this.migrationContext.TriggerCutoverUniqueKeys, + rowToAdd) + } + + if err := this.retryOperation(this.applier.SanitizeRowsDuringCutOver); err != nil { + return err + } if err := this.retryOperation(this.applier.SwapTables); err != nil { return err diff --git a/go/sql/builder.go b/go/sql/builder.go index ec7f1e030..b057d4607 100644 --- a/go/sql/builder.go +++ b/go/sql/builder.go @@ -542,7 +542,7 @@ func BuildDeleteQuery(databaseName, ghostTableName string, uniqueKeyColumns *Col } result = fmt.Sprintf( - "delete /* gh-ost */ from %s.%s where %s", + "delete /* gh-ost */ ignore from %s.%s where %s", EscapeName(databaseName), EscapeName(ghostTableName), strings.Join(preparedValuesWhere, " or ")) @@ -580,7 +580,7 @@ func BuildInsertSelectQuery(databaseName, originalTableName, ghostTableName stri } result = fmt.Sprintf( - "insert /* gh-ost %s.%s */ into %s.%s (%s) select %s from %s.%s force index (%s) where %s", + "insert /* gh-ost %s.%s */ ignore into %s.%s (%s) select %s from %s.%s force index (%s) where %s", databaseName, originalTableName, databaseName, ghostTableName, mappedSharedColumnsListing, sharedColumnsListing, databaseName, originalTableName, From 7481c296f77692809021116d487fe038c173a8ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20Manuel=20Fern=C3=A1ndez=20Garc=C3=ADa-Minguill?= =?UTF-8?q?=C3=A1n?= Date: Fri, 29 Mar 2019 13:16:06 +0100 Subject: [PATCH 06/15] Added doc --- doc/cut-over.md | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/doc/cut-over.md b/doc/cut-over.md index ada5100c6..ddaa99245 100644 --- a/doc/cut-over.md +++ b/doc/cut-over.md @@ -6,6 +6,18 @@ MySQL poses some limitations on how the table swap can take place. While it supp The [facebook OSC](https://www.facebook.com/notes/mysql-at-facebook/online-schema-change-for-mysql/430801045932/) tool documents this nicely. Look for **"Cut-over phase"**. The Facebook solution uses a non-atomic swap: the original table is first renamed and pushed aside, then the ghost table is renamed to take its place. In between the two renames there's a brief period of time where your table just does not exist, and queries will fail. +Another option to support a atomic swap without the use of a lock is the use of triggers only in the cut-over phase, +gh-ost philosophy is avoid the use of triggers but in some environments like a Galera cluster isn't possible use the +lock command, and like the cut-over should take a little time it shouldn't be a problem. This method work in the next +way: + +- A stop writes event is injected in the binlog and gh-ost disable the writes once it receive it. +- The triggers are created to handle the modifications in the MySQL side. +- A created triggers event is injected in the binlog and gh-ost wait until receive it. +- Since while the first event and the second one the affected rows will be in a inconsistent state, the unique key + values of this events are tracked. +- The rows tracked are deleted and inserted back from the original table, so like we have already the triggers is assured his consistence. + `gh-ost` solves this by using an atomic, two-step blocking swap: while one connection holds the lock, another attempts the atomic `RENAME`. The `RENAME` is guaranteed to not be executed prematurely by positioning a sentry table which blocks the `RENAME` operation until `gh-ost` is satisfied all is in order. This solution either: @@ -18,4 +30,4 @@ Also note: Internals of the atomic cut-over are discussed in [Issue #82](https://github.com/github/gh-ost/issues/82). -At this time the command-line argument `--cut-over` is supported, and defaults to the atomic cut-over algorithm described above. Also supported is `--cut-over=two-step`, which uses the FB non-atomic algorithm. We recommend using the default cut-over that has been battle tested in our production environments. +At this time the command-line argument `--cut-over` is supported, and defaults to the atomic cut-over algorithm described above. Also supported is `--cut-over=two-step`, which uses the FB non-atomic algorithm and the `--cut-over=trigger`, which use the trigger algorithm. We recommend using the default cut-over that has been battle tested in our production environments. From 97c09aea0e8f78852530e117df2f6a6c2426b83c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20Manuel=20Fern=C3=A1ndez=20Garc=C3=ADa-Minguill?= =?UTF-8?q?=C3=A1n?= Date: Fri, 29 Mar 2019 15:27:53 +0100 Subject: [PATCH 07/15] Added correction fixes --- doc/cut-over.md | 7 ++----- go/logic/applier.go | 25 ++----------------------- go/logic/migrator.go | 6 +++--- 3 files changed, 7 insertions(+), 31 deletions(-) diff --git a/doc/cut-over.md b/doc/cut-over.md index ddaa99245..bb0060873 100644 --- a/doc/cut-over.md +++ b/doc/cut-over.md @@ -8,15 +8,12 @@ The [facebook OSC](https://www.facebook.com/notes/mysql-at-facebook/online-schem Another option to support a atomic swap without the use of a lock is the use of triggers only in the cut-over phase, gh-ost philosophy is avoid the use of triggers but in some environments like a Galera cluster isn't possible use the -lock command, and like the cut-over should take a little time it shouldn't be a problem. This method work in the next -way: +lock command, and like the cut-over should take a little time it shouldn't be a problem. Triggers cut-over works like the following: - A stop writes event is injected in the binlog and gh-ost disable the writes once it receive it. - The triggers are created to handle the modifications in the MySQL side. - A created triggers event is injected in the binlog and gh-ost wait until receive it. -- Since while the first event and the second one the affected rows will be in a inconsistent state, the unique key - values of this events are tracked. -- The rows tracked are deleted and inserted back from the original table, so like we have already the triggers is assured his consistence. +- The affected rows will be in an inconsistent stata during the time between the first and the second event. For this reason, this events are checked and, the values of the fields that are part of the unique key used to do the online alter are saved to sanitize that rows. `gh-ost` solves this by using an atomic, two-step blocking swap: while one connection holds the lock, another attempts the atomic `RENAME`. The `RENAME` is guaranteed to not be executed prematurely by positioning a sentry table which blocks the `RENAME` operation until `gh-ost` is satisfied all is in order. diff --git a/go/logic/applier.go b/go/logic/applier.go index d9192ef81..8322e698e 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -540,14 +540,10 @@ func (this *Applier) makeTriggerPrefix() string { return prefix } -// Create triggers from the original table to the new one +// CreateTriggersOriginalTable Create triggers from the original table to the ghost one func (this *Applier) CreateTriggersOriginalTable() error { prefix := this.makeTriggerPrefix() - ////////////////////// - // Delete trigger // - ////////////////////// - delIndexComparations := []string{} for _, name := range this.migrationContext.UniqueKey.Columns.Names() { columnQuoted := sql.EscapeName(name) @@ -571,10 +567,6 @@ func (this *Applier) CreateTriggersOriginalTable() error { sql.EscapeName(this.migrationContext.GetGhostTableName()), strings.Join(delIndexComparations, " AND ")) - ////////////////////// - // Insert trigger // - ////////////////////// - insertCols := []string{} insertValues := []string{} for i, origColumn := range this.migrationContext.SharedColumns.Names() { @@ -598,10 +590,6 @@ func (this *Applier) CreateTriggersOriginalTable() error { strings.Join(insertCols, ", "), strings.Join(insertValues, ", ")) - ////////////////////// - // Update trigger // - ////////////////////// - updIndexComparations := []string{} for _, name := range this.migrationContext.UniqueKey.Columns.Names() { columnQuoted := sql.EscapeName(name) @@ -631,10 +619,6 @@ func (this *Applier) CreateTriggersOriginalTable() error { strings.Join(insertCols, ", "), strings.Join(insertValues, ", ")) - /////////////////////////// - // Create the triggers // - /////////////////////////// - return func() error { tx, err := this.db.Begin() if err != nil { @@ -658,7 +642,7 @@ func (this *Applier) CreateTriggersOriginalTable() error { }() } -// Drop triggers from the old table +// DropTriggersOldTable Drop triggers from the old table func (this *Applier) DropTriggersOldTable() error { prefix := this.makeTriggerPrefix() @@ -677,10 +661,6 @@ func (this *Applier) DropTriggersOldTable() error { sql.EscapeName(this.migrationContext.DatabaseName), prefix) - ///////////////////////// - // Drop the triggers // - ///////////////////////// - return func() error { tx, err := this.db.Begin() if err != nil { @@ -704,7 +684,6 @@ func (this *Applier) DropTriggersOldTable() error { }() } -//TODO func (this *Applier) ObtainUniqueKeyValuesOfEvent(dmlEvent *binlog.BinlogDMLEvent) (uniqueKeys [][]interface{}) { switch dmlEvent.DML { case binlog.DeleteDML: diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 74a4cbd10..4c4fd0a87 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -567,7 +567,7 @@ func (this *Migrator) cutOver() (err error) { return log.Fatalf("Unknown cut-over type: %d; should never get here!", this.migrationContext.CutOverType) } -// Inject the "StopWriteEvents" state hint +// injectStopWriteEvents Inject the "StopWriteEvents" state hint func (this *Migrator) injectStopWriteEvents() (err error) { this.migrationContext.MarkPointOfInterest() injectStopWriteEventsStartTime := time.Now() @@ -587,8 +587,8 @@ func (this *Migrator) injectStopWriteEvents() (err error) { return nil } -// Inject the "AllEventsUpToTriggersProcessed" state hint, wait for it to appear in the binary logs, -// make sure the queue is drained. +// waitForEventsUpToTriggers Inject the "AllEventsUpToTriggersProcessed" state hint, +// wait for it to appear in the binary logs, make sure the queue is drained. func (this *Migrator) waitForEventsUpToTriggers() (err error) { timeout := time.NewTimer(time.Second * time.Duration(this.migrationContext.CutOverLockTimeoutSeconds)) From d4e0d3960ad4ee2382dd4f9f0c3ecefa1f727a7f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20Manuel=20Fern=C3=A1ndez=20Garc=C3=ADa-Minguill?= =?UTF-8?q?=C3=A1n?= Date: Mon, 1 Apr 2019 13:13:29 +0200 Subject: [PATCH 08/15] Added doc and fixed queries --- go/logic/migrator.go | 12 ++++++++++-- go/sql/builder.go | 4 ++-- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 4c4fd0a87..4ceaca6bd 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -249,7 +249,11 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er atomic.StoreInt64(&this.migrationContext.ApplyDMLEventState, 1) return nil } - //TODO Explain + // at this point we know that the triggers will be created and we don't want write the + // next events from the streamer, because the streamer works sequentially. So those + // events are either already handled, or have event functions in applyEventsQueue. + // So as not to create a potential deadlock, we write this func to applyEventsQueue + // asynchronously, understanding it doesn't really matter. go func() { this.applyEventsQueue <- newApplyEventStructByFunc(&applyEventFunc) }() @@ -261,7 +265,11 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er this.allEventsUpToTriggersProcessed <- changelogStateString return nil } - //TODO Explain + // at this point we know that the triggers are created and we want to sanitize the inconsistent + // rows between the stop and the triggers event, because the streamer works sequentially. + // So those events are either already handled, or have event functions in applyEventsQueue. + // So as not to create a potential deadlock, we write this func to applyEventsQueue + // asynchronously, understanding it doesn't really matter. go func() { this.applyEventsQueue <- newApplyEventStructByFunc(&applyEventFunc) }() diff --git a/go/sql/builder.go b/go/sql/builder.go index b057d4607..ec7f1e030 100644 --- a/go/sql/builder.go +++ b/go/sql/builder.go @@ -542,7 +542,7 @@ func BuildDeleteQuery(databaseName, ghostTableName string, uniqueKeyColumns *Col } result = fmt.Sprintf( - "delete /* gh-ost */ ignore from %s.%s where %s", + "delete /* gh-ost */ from %s.%s where %s", EscapeName(databaseName), EscapeName(ghostTableName), strings.Join(preparedValuesWhere, " or ")) @@ -580,7 +580,7 @@ func BuildInsertSelectQuery(databaseName, originalTableName, ghostTableName stri } result = fmt.Sprintf( - "insert /* gh-ost %s.%s */ ignore into %s.%s (%s) select %s from %s.%s force index (%s) where %s", + "insert /* gh-ost %s.%s */ into %s.%s (%s) select %s from %s.%s force index (%s) where %s", databaseName, originalTableName, databaseName, ghostTableName, mappedSharedColumnsListing, sharedColumnsListing, databaseName, originalTableName, From d9f172fdaa97e6b89acba8d283910f6225ae8472 Mon Sep 17 00:00:00 2001 From: Juan Manuel Fernandez Date: Sat, 18 May 2019 00:43:57 +0200 Subject: [PATCH 09/15] Fixed cut-over logic and changed doc --- doc/cut-over.md | 4 +-- go/base/context.go | 1 + go/logic/applier.go | 2 ++ go/logic/migrator.go | 58 ++++++++++++++------------------------------ 4 files changed, 23 insertions(+), 42 deletions(-) diff --git a/doc/cut-over.md b/doc/cut-over.md index bb0060873..9395997d6 100644 --- a/doc/cut-over.md +++ b/doc/cut-over.md @@ -10,10 +10,10 @@ Another option to support a atomic swap without the use of a lock is the use of gh-ost philosophy is avoid the use of triggers but in some environments like a Galera cluster isn't possible use the lock command, and like the cut-over should take a little time it shouldn't be a problem. Triggers cut-over works like the following: -- A stop writes event is injected in the binlog and gh-ost disable the writes once it receive it. +- gh-ost disable the writes but it keep reading the binlog to keep a record of the changed rows. - The triggers are created to handle the modifications in the MySQL side. - A created triggers event is injected in the binlog and gh-ost wait until receive it. -- The affected rows will be in an inconsistent stata during the time between the first and the second event. For this reason, this events are checked and, the values of the fields that are part of the unique key used to do the online alter are saved to sanitize that rows. +- The affected rows during the change of writer could be in an inconsistent state, for this reason is necessary sanitize the modified rows removing them if exists and adding them if exists from the original table to the ghost one. `gh-ost` solves this by using an atomic, two-step blocking swap: while one connection holds the lock, another attempts the atomic `RENAME`. The `RENAME` is guaranteed to not be executed prematurely by positioning a sentry table which blocks the `RENAME` operation until `gh-ost` is satisfied all is in order. diff --git a/go/base/context.go b/go/base/context.go index 900be4cf2..3f99b9080 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -170,6 +170,7 @@ type MigrationContext struct { StartTime time.Time RowCopyStartTime time.Time RowCopyEndTime time.Time + CreateTriggersStartTime time.Time LockTablesStartTime time.Time RenameTablesStartTime time.Time RenameTablesEndTime time.Time diff --git a/go/logic/applier.go b/go/logic/applier.go index 8322e698e..0f3ddcb85 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -620,6 +620,8 @@ func (this *Applier) CreateTriggersOriginalTable() error { strings.Join(insertValues, ", ")) return func() error { + this.migrationContext.CreateTriggersStartTime = time.Now() + tx, err := this.db.Begin() if err != nil { return err diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 4ceaca6bd..ba99918a9 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -27,7 +27,6 @@ type ChangelogState string const ( GhostTableMigrated ChangelogState = "GhostTableMigrated" AllEventsUpToLockProcessed = "AllEventsUpToLockProcessed" - StopWriteEvents = "StopWriteEvents" AllEventsUpToTriggersProcessed = "AllEventsUpToTriggersProcessed" ) @@ -77,6 +76,7 @@ type Migrator struct { ghostTableMigrated chan bool rowCopyComplete chan error allEventsUpToLockProcessed chan string + stoppedBinlogWrites chan bool allEventsUpToTriggersProcessed chan string triggerCutoverUniqueKeys [][]interface {} @@ -243,30 +243,15 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er this.applyEventsQueue <- newApplyEventStructByFunc(&applyEventFunc) }() } - case StopWriteEvents: - { - var applyEventFunc tableWriteFunc = func() error { - atomic.StoreInt64(&this.migrationContext.ApplyDMLEventState, 1) - return nil - } - // at this point we know that the triggers will be created and we don't want write the - // next events from the streamer, because the streamer works sequentially. So those - // events are either already handled, or have event functions in applyEventsQueue. - // So as not to create a potential deadlock, we write this func to applyEventsQueue - // asynchronously, understanding it doesn't really matter. - go func() { - this.applyEventsQueue <- newApplyEventStructByFunc(&applyEventFunc) - }() - } case AllEventsUpToTriggersProcessed: { var applyEventFunc tableWriteFunc = func() error { - atomic.StoreInt64(&this.migrationContext.ApplyDMLEventState, 2) + atomic.StoreInt64(&this.migrationContext.ApplyDMLEventState, 3) this.allEventsUpToTriggersProcessed <- changelogStateString return nil } // at this point we know that the triggers are created and we want to sanitize the inconsistent - // rows between the stop and the triggers event, because the streamer works sequentially. + // rows between the stop writes and the triggers event, because the streamer works sequentially. // So those events are either already handled, or have event functions in applyEventsQueue. // So as not to create a potential deadlock, we write this func to applyEventsQueue // asynchronously, understanding it doesn't really matter. @@ -575,22 +560,13 @@ func (this *Migrator) cutOver() (err error) { return log.Fatalf("Unknown cut-over type: %d; should never get here!", this.migrationContext.CutOverType) } -// injectStopWriteEvents Inject the "StopWriteEvents" state hint -func (this *Migrator) injectStopWriteEvents() (err error) { - this.migrationContext.MarkPointOfInterest() - injectStopWriteEventsStartTime := time.Now() - - injectStopWriteEventsChallenge := fmt.Sprintf( - "%s:%d", - string(StopWriteEvents), - injectStopWriteEventsStartTime.UnixNano()) - - log.Infof("Writing changelog state: %+v", injectStopWriteEventsChallenge) - if _, err := this.applier.WriteChangelogState(injectStopWriteEventsChallenge); err != nil { - return err - } +// waitForStopWrites Disable the writes from the binlog and keep a record of the affected rows +func (this *Migrator) waitForStopWrites() (err error) { + log.Infof("Stopping the writes") + atomic.StoreInt64(&this.migrationContext.ApplyDMLEventState, 1) + <-this.stoppedBinlogWrites - this.printStatus(ForcePrintStatusAndHintRule) + log.Infof("Done waiting to stop the writes") return nil } @@ -613,7 +589,6 @@ func (this *Migrator) waitForEventsUpToTriggers() (err error) { return err } log.Infof("Waiting for events up to triggers") - //atomic.StoreInt64(&this.migrationContext.AllEventsUpToTriggersProcessedInjectedFlag, 1) for found := false; !found; { select { case <-timeout.C: @@ -715,9 +690,7 @@ func (this *Migrator) cutOverTrigger() (err error) { atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 1) defer atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 0) - if err := this.injectStopWriteEvents(); err != nil { - return err - } + this.waitForStopWrites() log.Infof( "Creating triggers for %s.%s", @@ -763,9 +736,9 @@ CompareAddedRow: return err } - lockAndRenameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.LockTablesStartTime) + createTriggersAndRenameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.CreateTriggersStartTime) renameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.RenameTablesStartTime) - log.Debugf("Lock & rename duration: %s (rename only: %s). During this time, queries on %s were locked or failing", lockAndRenameDuration, renameDuration, sql.EscapeName(this.migrationContext.OriginalTableName)) + log.Debugf("Create triggers & rename duration: %s (rename only: %s). During this time, queries on %s were locked or failing", createTriggersAndRenameDuration, renameDuration, sql.EscapeName(this.migrationContext.OriginalTableName)) return nil } @@ -1334,12 +1307,17 @@ func (this *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error { if eventStruct.dmlEvent != nil { applyDMLEventState := atomic.LoadInt64(&this.migrationContext.ApplyDMLEventState) if applyDMLEventState == 1 { + atomic.StoreInt64(&this.migrationContext.ApplyDMLEventState, 2) + applyDMLEventState = 2 + this.stoppedBinlogWrites <- true + } + if applyDMLEventState == 2 { this.triggerCutoverUniqueKeys = append( this.triggerCutoverUniqueKeys, this.applier.ObtainUniqueKeyValuesOfEvent(eventStruct.dmlEvent)...) return nil } - if applyDMLEventState == 2 { + if applyDMLEventState == 3 { return nil } From 2961cadaa850301ec747c78fed8652943c4e4e6b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20Manuel=20Fern=C3=A1ndez=20Garc=C3=ADa-Minguill?= =?UTF-8?q?=C3=A1n?= Date: Mon, 20 May 2019 17:11:18 +0200 Subject: [PATCH 10/15] Added logic to remove the triggers in case of error and created channel --- go/logic/applier.go | 6 +++--- go/logic/migrator.go | 3 +++ 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/go/logic/applier.go b/go/logic/applier.go index 0f3ddcb85..6c2a6b8e4 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -649,17 +649,17 @@ func (this *Applier) DropTriggersOldTable() error { prefix := this.makeTriggerPrefix() dropDeleteTrigger := fmt.Sprintf( - "DROP /* gh-ost */ TRIGGER %s.`%s_del`", + "DROP /* gh-ost */ TRIGGER IF EXISTS %s.`%s_del`", sql.EscapeName(this.migrationContext.DatabaseName), prefix) dropInsertTrigger := fmt.Sprintf( - "DROP /* gh-ost */ TRIGGER %s.`%s_ins`", + "DROP /* gh-ost */ TRIGGER IF EXISTS %s.`%s_ins`", sql.EscapeName(this.migrationContext.DatabaseName), prefix) dropUpdateTrigger := fmt.Sprintf( - "DROP /* gh-ost */ TRIGGER %s.`%s_upd`", + "DROP /* gh-ost */ TRIGGER IF EXISTS %s.`%s_upd`", sql.EscapeName(this.migrationContext.DatabaseName), prefix) diff --git a/go/logic/migrator.go b/go/logic/migrator.go index ba99918a9..a1c034062 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -100,6 +100,7 @@ func NewMigrator(context *base.MigrationContext) *Migrator { firstThrottlingCollected: make(chan bool, 3), rowCopyComplete: make(chan error), allEventsUpToLockProcessed: make(chan string), + stoppedBinlogWrites: make(chan bool), allEventsUpToTriggersProcessed: make(chan string), copyRowsQueue: make(chan tableWriteFunc), @@ -692,6 +693,8 @@ func (this *Migrator) cutOverTrigger() (err error) { this.waitForStopWrites() + defer this.applier.DropTriggersOldTable() + log.Infof( "Creating triggers for %s.%s", this.migrationContext.DatabaseName, From f5b7fb019c44dd9526c942e8b29eab4650c6cafd Mon Sep 17 00:00:00 2001 From: Juan Manuel Fernandez Date: Mon, 20 May 2019 19:34:44 +0200 Subject: [PATCH 11/15] Explained better method logic --- go/logic/applier.go | 4 ++-- go/logic/migrator.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/go/logic/applier.go b/go/logic/applier.go index 6c2a6b8e4..fcdc67b8c 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -644,8 +644,8 @@ func (this *Applier) CreateTriggersOriginalTable() error { }() } -// DropTriggersOldTable Drop triggers from the old table -func (this *Applier) DropTriggersOldTable() error { +// DropTriggersOldTableIfExists Drop triggers from the old table if them exists +func (this *Applier) DropTriggersOldTableIfExists() error { prefix := this.makeTriggerPrefix() dropDeleteTrigger := fmt.Sprintf( diff --git a/go/logic/migrator.go b/go/logic/migrator.go index a1c034062..4d239eb2b 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -693,7 +693,7 @@ func (this *Migrator) cutOverTrigger() (err error) { this.waitForStopWrites() - defer this.applier.DropTriggersOldTable() + defer this.applier.DropTriggersOldTableIfExists() log.Infof( "Creating triggers for %s.%s", @@ -735,7 +735,7 @@ CompareAddedRow: if err := this.retryOperation(this.applier.SwapTables); err != nil { return err } - if err := this.retryOperation(this.applier.DropTriggersOldTable); err != nil { + if err := this.retryOperation(this.applier.DropTriggersOldTableIfExists); err != nil { return err } From 884ba90a17a3d2a64d3e5012b9c84045ce2deae5 Mon Sep 17 00:00:00 2001 From: Juan Manuel Fernandez Date: Mon, 20 May 2019 20:05:31 +0200 Subject: [PATCH 12/15] Fixed wait forever if not updates take place in the table during the cut-over --- doc/cut-over.md | 4 +-- go/logic/migrator.go | 78 +++++++++++++++++++++++++++++++++++--------- 2 files changed, 64 insertions(+), 18 deletions(-) diff --git a/doc/cut-over.md b/doc/cut-over.md index 9395997d6..bb0060873 100644 --- a/doc/cut-over.md +++ b/doc/cut-over.md @@ -10,10 +10,10 @@ Another option to support a atomic swap without the use of a lock is the use of gh-ost philosophy is avoid the use of triggers but in some environments like a Galera cluster isn't possible use the lock command, and like the cut-over should take a little time it shouldn't be a problem. Triggers cut-over works like the following: -- gh-ost disable the writes but it keep reading the binlog to keep a record of the changed rows. +- A stop writes event is injected in the binlog and gh-ost disable the writes once it receive it. - The triggers are created to handle the modifications in the MySQL side. - A created triggers event is injected in the binlog and gh-ost wait until receive it. -- The affected rows during the change of writer could be in an inconsistent state, for this reason is necessary sanitize the modified rows removing them if exists and adding them if exists from the original table to the ghost one. +- The affected rows will be in an inconsistent stata during the time between the first and the second event. For this reason, this events are checked and, the values of the fields that are part of the unique key used to do the online alter are saved to sanitize that rows. `gh-ost` solves this by using an atomic, two-step blocking swap: while one connection holds the lock, another attempts the atomic `RENAME`. The `RENAME` is guaranteed to not be executed prematurely by positioning a sentry table which blocks the `RENAME` operation until `gh-ost` is satisfied all is in order. diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 4d239eb2b..301d3e365 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -27,6 +27,7 @@ type ChangelogState string const ( GhostTableMigrated ChangelogState = "GhostTableMigrated" AllEventsUpToLockProcessed = "AllEventsUpToLockProcessed" + StopWriteEvents = "StopWriteEvents" AllEventsUpToTriggersProcessed = "AllEventsUpToTriggersProcessed" ) @@ -76,7 +77,7 @@ type Migrator struct { ghostTableMigrated chan bool rowCopyComplete chan error allEventsUpToLockProcessed chan string - stoppedBinlogWrites chan bool + stoppedWriteEvents chan string allEventsUpToTriggersProcessed chan string triggerCutoverUniqueKeys [][]interface {} @@ -100,7 +101,7 @@ func NewMigrator(context *base.MigrationContext) *Migrator { firstThrottlingCollected: make(chan bool, 3), rowCopyComplete: make(chan error), allEventsUpToLockProcessed: make(chan string), - stoppedBinlogWrites: make(chan bool), + stoppedWriteEvents: make(chan string), allEventsUpToTriggersProcessed: make(chan string), copyRowsQueue: make(chan tableWriteFunc), @@ -244,10 +245,26 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er this.applyEventsQueue <- newApplyEventStructByFunc(&applyEventFunc) }() } + case StopWriteEvents: + { + var applyEventFunc tableWriteFunc = func() error { + atomic.StoreInt64(&this.migrationContext.ApplyDMLEventState, 1) + this.stoppedWriteEvents <- changelogStateString + return nil + } + // at this point we know that the triggers will be created and we don't want write the + // next events from the streamer, because the streamer works sequentially. So those + // events are either already handled, or have event functions in applyEventsQueue. + // So as not to create a potential deadlock, we write this func to applyEventsQueue + // asynchronously, understanding it doesn't really matter. + go func() { + this.applyEventsQueue <- newApplyEventStructByFunc(&applyEventFunc) + }() + } case AllEventsUpToTriggersProcessed: { var applyEventFunc tableWriteFunc = func() error { - atomic.StoreInt64(&this.migrationContext.ApplyDMLEventState, 3) + atomic.StoreInt64(&this.migrationContext.ApplyDMLEventState, 2) this.allEventsUpToTriggersProcessed <- changelogStateString return nil } @@ -561,13 +578,45 @@ func (this *Migrator) cutOver() (err error) { return log.Fatalf("Unknown cut-over type: %d; should never get here!", this.migrationContext.CutOverType) } -// waitForStopWrites Disable the writes from the binlog and keep a record of the affected rows -func (this *Migrator) waitForStopWrites() (err error) { - log.Infof("Stopping the writes") - atomic.StoreInt64(&this.migrationContext.ApplyDMLEventState, 1) - <-this.stoppedBinlogWrites +// waitForStopWriteEvents Inject the "StopWriteEvents" state hint, +// wait for it to appear in the binary logs, make sure the queue is drained. +func (this *Migrator) waitForStopWriteEvents() (err error) { + timeout := time.NewTimer(time.Second * time.Duration(this.migrationContext.CutOverLockTimeoutSeconds)) - log.Infof("Done waiting to stop the writes") + this.migrationContext.MarkPointOfInterest() + stopWriteEventsStartTime := time.Now() + + stopWriteEventsChallenge := fmt.Sprintf( + "%s:%d", + string(StopWriteEvents), + stopWriteEventsStartTime.UnixNano()) + + log.Infof("Writing changelog state: %+v", stopWriteEventsChallenge) + if _, err := this.applier.WriteChangelogState(stopWriteEventsChallenge); err != nil { + return err + } + log.Infof("Waiting for stop writes") + for found := false; !found; { + select { + case <-timeout.C: + { + return log.Errorf("Timeout while waiting for stop writes") + } + case state := <-this.stoppedWriteEvents: + { + if state == stopWriteEventsChallenge { + log.Infof("Waiting for stop writes: got %s", state) + found = true + } else { + log.Infof("Waiting for stop writes: skipping %s", state) + } + } + } + } + stopWriteEventsDuration := time.Since(stopWriteEventsStartTime) + + log.Infof("Done waiting for stop writes; duration=%+v", stopWriteEventsDuration) + this.printStatus(ForcePrintStatusAndHintRule) return nil } @@ -691,7 +740,9 @@ func (this *Migrator) cutOverTrigger() (err error) { atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 1) defer atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 0) - this.waitForStopWrites() + if err := this.waitForStopWriteEvents(); err != nil { + return err + } defer this.applier.DropTriggersOldTableIfExists() @@ -1310,17 +1361,12 @@ func (this *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error { if eventStruct.dmlEvent != nil { applyDMLEventState := atomic.LoadInt64(&this.migrationContext.ApplyDMLEventState) if applyDMLEventState == 1 { - atomic.StoreInt64(&this.migrationContext.ApplyDMLEventState, 2) - applyDMLEventState = 2 - this.stoppedBinlogWrites <- true - } - if applyDMLEventState == 2 { this.triggerCutoverUniqueKeys = append( this.triggerCutoverUniqueKeys, this.applier.ObtainUniqueKeyValuesOfEvent(eventStruct.dmlEvent)...) return nil } - if applyDMLEventState == 3 { + if applyDMLEventState == 2 { return nil } From 2a76b6770a9f5661d2df0592a8c65cebd5d84af7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20Manuel=20Fern=C3=A1ndez=20Garc=C3=ADa-Minguill?= =?UTF-8?q?=C3=A1n?= Date: Tue, 10 Sep 2019 10:47:37 +0200 Subject: [PATCH 13/15] Don't use singleton connection for the swap --- go/logic/applier.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/logic/applier.go b/go/logic/applier.go index fcdc67b8c..4e025483a 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -830,7 +830,7 @@ func (this *Applier) SwapTables() error { ) log.Infof("Swaping original and new table: %s", query) this.migrationContext.RenameTablesStartTime = time.Now() - if _, err := sqlutils.ExecNoPrepare(this.singletonDB, query); err != nil { + if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil { return err } this.migrationContext.RenameTablesEndTime = time.Now() From 605a3b267e4ff209c7f76c7bd095a4e983f3c34e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20Manuel=20Fern=C3=A1ndez=20Garc=C3=ADa-Minguill?= =?UTF-8?q?=C3=A1n?= Date: Tue, 10 Sep 2019 15:37:06 +0200 Subject: [PATCH 14/15] Runned automatic formater --- go/base/context.go | 2 +- go/logic/applier.go | 29 ++++++++++++++--------------- go/logic/migrator.go | 14 +++++++------- 3 files changed, 22 insertions(+), 23 deletions(-) diff --git a/go/base/context.go b/go/base/context.go index 3f99b9080..7347f647b 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -218,7 +218,7 @@ type MigrationContext struct { MigrationIterationRangeMaxValues *sql.ColumnValues ForceTmpTableName string - TriggerCutoverUniqueKeys [][]interface {} + TriggerCutoverUniqueKeys [][]interface{} recentBinlogCoordinates mysql.BinlogCoordinates } diff --git a/go/logic/applier.go b/go/logic/applier.go index 4e025483a..53864a609 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -8,10 +8,10 @@ package logic import ( gosql "database/sql" "fmt" - "sync/atomic" - "time" "regexp" "strings" + "sync/atomic" + "time" "github.com/github/gh-ost/go/base" "github.com/github/gh-ost/go/binlog" @@ -557,9 +557,9 @@ func (this *Applier) CreateTriggersOriginalTable() error { } deleteTrigger := fmt.Sprintf( - "CREATE /* gh-ost */ TRIGGER `%s_del` AFTER DELETE ON %s.%s " + - "FOR EACH ROW " + - "DELETE /* gh-ost */ IGNORE FROM %s.%s WHERE %s", + "CREATE /* gh-ost */ TRIGGER `%s_del` AFTER DELETE ON %s.%s "+ + "FOR EACH ROW "+ + "DELETE /* gh-ost */ IGNORE FROM %s.%s WHERE %s", prefix, sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName), @@ -577,11 +577,10 @@ func (this *Applier) CreateTriggersOriginalTable() error { insertValues = append(insertValues, value) } - insertTrigger := fmt.Sprintf( - "CREATE /* gh-ost */ TRIGGER `%s_ins` AFTER INSERT ON %s.%s " + - "FOR EACH ROW " + - "REPLACE /* gh-ost */ INTO %s.%s (%s) VALUES (%s)", + "CREATE /* gh-ost */ TRIGGER `%s_ins` AFTER INSERT ON %s.%s "+ + "FOR EACH ROW "+ + "REPLACE /* gh-ost */ INTO %s.%s (%s) VALUES (%s)", prefix, sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName), @@ -601,12 +600,12 @@ func (this *Applier) CreateTriggersOriginalTable() error { } updateTrigger := fmt.Sprintf( - "CREATE /* gh-ost */ TRIGGER `%s_upd` AFTER UPDATE ON %s.%s " + - "FOR EACH ROW " + - "BEGIN /* gh-ost */ " + - "DELETE IGNORE FROM %s.%s WHERE !(%s) AND %s; " + - "REPLACE INTO %s.%s (%s) VALUES (%s); " + - "END", + "CREATE /* gh-ost */ TRIGGER `%s_upd` AFTER UPDATE ON %s.%s "+ + "FOR EACH ROW "+ + "BEGIN /* gh-ost */ "+ + "DELETE IGNORE FROM %s.%s WHERE !(%s) AND %s; "+ + "REPLACE INTO %s.%s (%s) VALUES (%s); "+ + "END", prefix, sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName), diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 301d3e365..5b21de836 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -25,10 +25,10 @@ import ( type ChangelogState string const ( - GhostTableMigrated ChangelogState = "GhostTableMigrated" - AllEventsUpToLockProcessed = "AllEventsUpToLockProcessed" - StopWriteEvents = "StopWriteEvents" - AllEventsUpToTriggersProcessed = "AllEventsUpToTriggersProcessed" + GhostTableMigrated ChangelogState = "GhostTableMigrated" + AllEventsUpToLockProcessed = "AllEventsUpToLockProcessed" + StopWriteEvents = "StopWriteEvents" + AllEventsUpToTriggersProcessed = "AllEventsUpToTriggersProcessed" ) func ReadChangelogState(s string) ChangelogState { @@ -80,7 +80,7 @@ type Migrator struct { stoppedWriteEvents chan string allEventsUpToTriggersProcessed chan string - triggerCutoverUniqueKeys [][]interface {} + triggerCutoverUniqueKeys [][]interface{} rowCopyCompleteFlag int64 // copyRowsQueue should not be buffered; if buffered some non-damaging but @@ -749,7 +749,7 @@ func (this *Migrator) cutOverTrigger() (err error) { log.Infof( "Creating triggers for %s.%s", this.migrationContext.DatabaseName, - this.migrationContext.OriginalTableName); + this.migrationContext.OriginalTableName) if err := this.retryOperation(this.applier.CreateTriggersOriginalTable); err != nil { return err @@ -763,7 +763,7 @@ func (this *Migrator) cutOverTrigger() (err error) { CompareNewRow: for _, rowToAdd := range this.triggerCutoverUniqueKeys { -CompareAddedRow: + CompareAddedRow: for _, rowAdded := range this.migrationContext.TriggerCutoverUniqueKeys { for i := range rowAdded { if rowToAdd[i] != rowAdded[i] { From 9167ccf93ae3a3f395d8dad7072a2275739b4615 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20Manuel=20Fern=C3=A1ndez=20Garc=C3=ADa-Minguill?= =?UTF-8?q?=C3=A1n?= Date: Mon, 27 Jan 2020 10:42:14 +0100 Subject: [PATCH 15/15] Improve sanitize message --- go/logic/applier.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/go/logic/applier.go b/go/logic/applier.go index 53864a609..88948f3eb 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -753,8 +753,9 @@ func (this *Applier) SanitizeRowsDuringCutOver() error { return insertSelectErr } - log.Infof("Sanitizing chunk of %d created rows during trigger creation", - len(chunkValues)) + log.Infof("Sanitizing chunk of created rows during trigger creation (%d/%d)", + int64(len(this.migrationContext.TriggerCutoverUniqueKeys))-cutIndex, + len(this.migrationContext.TriggerCutoverUniqueKeys)) err := func() error { tx, err := this.db.Begin()