Skip to content
Open
11 changes: 10 additions & 1 deletion doc/cut-over.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,15 @@ MySQL poses some limitations on how the table swap can take place. While it supp

The [facebook OSC](https://www.facebook.com/notes/mysql-at-facebook/online-schema-change-for-mysql/430801045932/) tool documents this nicely. Look for **"Cut-over phase"**. The Facebook solution uses a non-atomic swap: the original table is first renamed and pushed aside, then the ghost table is renamed to take its place. In between the two renames there's a brief period of time where your table just does not exist, and queries will fail.

Another option to support a atomic swap without the use of a lock is the use of triggers only in the cut-over phase,
gh-ost philosophy is avoid the use of triggers but in some environments like a Galera cluster isn't possible use the
lock command, and like the cut-over should take a little time it shouldn't be a problem. Triggers cut-over works like the following:

- A stop writes event is injected in the binlog and gh-ost disable the writes once it receive it.
- The triggers are created to handle the modifications in the MySQL side.
- A created triggers event is injected in the binlog and gh-ost wait until receive it.
- The affected rows will be in an inconsistent stata during the time between the first and the second event. For this reason, this events are checked and, the values of the fields that are part of the unique key used to do the online alter are saved to sanitize that rows.

`gh-ost` solves this by using an atomic, two-step blocking swap: while one connection holds the lock, another attempts the atomic `RENAME`. The `RENAME` is guaranteed to not be executed prematurely by positioning a sentry table which blocks the `RENAME` operation until `gh-ost` is satisfied all is in order.

This solution either:
Expand All @@ -18,4 +27,4 @@ Also note:

Internals of the atomic cut-over are discussed in [Issue #82](https://github.com/github/gh-ost/issues/82).

At this time the command-line argument `--cut-over` is supported, and defaults to the atomic cut-over algorithm described above. Also supported is `--cut-over=two-step`, which uses the FB non-atomic algorithm. We recommend using the default cut-over that has been battle tested in our production environments.
At this time the command-line argument `--cut-over` is supported, and defaults to the atomic cut-over algorithm described above. Also supported is `--cut-over=two-step`, which uses the FB non-atomic algorithm and the `--cut-over=trigger`, which use the trigger algorithm. We recommend using the default cut-over that has been battle tested in our production environments.
5 changes: 5 additions & 0 deletions go/base/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type CutOver int
const (
CutOverAtomic CutOver = iota
CutOverTwoStep
CutOverTrigger
)

type ThrottleReasonHint string
Expand Down Expand Up @@ -169,6 +170,7 @@ type MigrationContext struct {
StartTime time.Time
RowCopyStartTime time.Time
RowCopyEndTime time.Time
CreateTriggersStartTime time.Time
LockTablesStartTime time.Time
RenameTablesStartTime time.Time
RenameTablesEndTime time.Time
Expand All @@ -194,6 +196,7 @@ type MigrationContext struct {
UserCommandedUnpostponeFlag int64
CutOverCompleteFlag int64
InCutOverCriticalSectionFlag int64
ApplyDMLEventState int64
PanicAbort chan error

OriginalTableColumnsOnApplier *sql.ColumnList
Expand All @@ -215,6 +218,8 @@ type MigrationContext struct {
MigrationIterationRangeMaxValues *sql.ColumnValues
ForceTmpTableName string

TriggerCutoverUniqueKeys [][]interface{}

recentBinlogCoordinates mysql.BinlogCoordinates
}

Expand Down
2 changes: 2 additions & 0 deletions go/cmd/gh-ost/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,8 @@ func main() {
migrationContext.CutOverType = base.CutOverAtomic
case "two-step":
migrationContext.CutOverType = base.CutOverTwoStep
case "trigger":
migrationContext.CutOverType = base.CutOverTrigger
default:
log.Fatalf("Unknown cut-over: %s", *cutOver)
}
Expand Down
294 changes: 294 additions & 0 deletions go/logic/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ package logic
import (
gosql "database/sql"
"fmt"
"regexp"
"strings"
"sync/atomic"
"time"

Expand Down Expand Up @@ -516,6 +518,273 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected
return chunkSize, rowsAffected, duration, nil
}

// Create the trigger prefix
func (this *Applier) makeTriggerPrefix() string {
prefix := fmt.Sprintf(
"ghost_%s_%s",
this.migrationContext.DatabaseName,
this.migrationContext.OriginalTableName)
r := regexp.MustCompile("\\W")
prefix = r.ReplaceAllString(prefix, "_")

if len(prefix) > 60 {
oldPrefix := prefix
prefix = prefix[0:60]

log.Debugf(
"Trigger prefix %s is over 60 characters long, truncating to %s",
oldPrefix,
prefix)
}

return prefix
}

// CreateTriggersOriginalTable Create triggers from the original table to the ghost one
func (this *Applier) CreateTriggersOriginalTable() error {
prefix := this.makeTriggerPrefix()

delIndexComparations := []string{}
for _, name := range this.migrationContext.UniqueKey.Columns.Names() {
columnQuoted := sql.EscapeName(name)
comparation := fmt.Sprintf(
"%s.%s.%s <=> OLD.%s",
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetGhostTableName()),
columnQuoted,
columnQuoted)
delIndexComparations = append(delIndexComparations, comparation)
}

deleteTrigger := fmt.Sprintf(
"CREATE /* gh-ost */ TRIGGER `%s_del` AFTER DELETE ON %s.%s "+
"FOR EACH ROW "+
"DELETE /* gh-ost */ IGNORE FROM %s.%s WHERE %s",
prefix,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetGhostTableName()),
strings.Join(delIndexComparations, " AND "))

insertCols := []string{}
insertValues := []string{}
for i, origColumn := range this.migrationContext.SharedColumns.Names() {
key := sql.EscapeName(this.migrationContext.MappedSharedColumns.Columns()[i].Name)
insertCols = append(insertCols, key)

value := fmt.Sprintf("NEW.%s", sql.EscapeName(origColumn))
insertValues = append(insertValues, value)
}

insertTrigger := fmt.Sprintf(
"CREATE /* gh-ost */ TRIGGER `%s_ins` AFTER INSERT ON %s.%s "+
"FOR EACH ROW "+
"REPLACE /* gh-ost */ INTO %s.%s (%s) VALUES (%s)",
prefix,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetGhostTableName()),
strings.Join(insertCols, ", "),
strings.Join(insertValues, ", "))

updIndexComparations := []string{}
for _, name := range this.migrationContext.UniqueKey.Columns.Names() {
columnQuoted := sql.EscapeName(name)
comparation := fmt.Sprintf(
"OLD.%s <=> NEW.%s",
columnQuoted,
columnQuoted)
updIndexComparations = append(updIndexComparations, comparation)
}

updateTrigger := fmt.Sprintf(
"CREATE /* gh-ost */ TRIGGER `%s_upd` AFTER UPDATE ON %s.%s "+
"FOR EACH ROW "+
"BEGIN /* gh-ost */ "+
"DELETE IGNORE FROM %s.%s WHERE !(%s) AND %s; "+
"REPLACE INTO %s.%s (%s) VALUES (%s); "+
"END",
prefix,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetGhostTableName()),
strings.Join(updIndexComparations, " AND "),
strings.Join(delIndexComparations, " AND "),
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetGhostTableName()),
strings.Join(insertCols, ", "),
strings.Join(insertValues, ", "))

return func() error {
this.migrationContext.CreateTriggersStartTime = time.Now()

tx, err := this.db.Begin()
if err != nil {
return err
}

if _, err := tx.Exec(deleteTrigger); err != nil {
return err
}
if _, err := tx.Exec(insertTrigger); err != nil {
return err
}
if _, err := tx.Exec(updateTrigger); err != nil {
return err
}

if err := tx.Commit(); err != nil {
return err
}
return nil
}()
}

// DropTriggersOldTableIfExists Drop triggers from the old table if them exists
func (this *Applier) DropTriggersOldTableIfExists() error {
prefix := this.makeTriggerPrefix()

dropDeleteTrigger := fmt.Sprintf(
"DROP /* gh-ost */ TRIGGER IF EXISTS %s.`%s_del`",
sql.EscapeName(this.migrationContext.DatabaseName),
prefix)

dropInsertTrigger := fmt.Sprintf(
"DROP /* gh-ost */ TRIGGER IF EXISTS %s.`%s_ins`",
sql.EscapeName(this.migrationContext.DatabaseName),
prefix)

dropUpdateTrigger := fmt.Sprintf(
"DROP /* gh-ost */ TRIGGER IF EXISTS %s.`%s_upd`",
sql.EscapeName(this.migrationContext.DatabaseName),
prefix)

return func() error {
tx, err := this.db.Begin()
if err != nil {
return err
}

if _, err := tx.Exec(dropDeleteTrigger); err != nil {
return err
}
if _, err := tx.Exec(dropInsertTrigger); err != nil {
return err
}
if _, err := tx.Exec(dropUpdateTrigger); err != nil {
return err
}

if err := tx.Commit(); err != nil {
return err
}
return nil
}()
}

func (this *Applier) ObtainUniqueKeyValuesOfEvent(dmlEvent *binlog.BinlogDMLEvent) (uniqueKeys [][]interface{}) {
switch dmlEvent.DML {
case binlog.DeleteDML:
return append(uniqueKeys,
sql.ObtainUniqueKeyValues(
this.migrationContext.OriginalTableColumns,
&this.migrationContext.UniqueKey.Columns,
dmlEvent.WhereColumnValues.AbstractValues()))

case binlog.InsertDML:
return append(uniqueKeys,
sql.ObtainUniqueKeyValues(
this.migrationContext.OriginalTableColumns,
&this.migrationContext.UniqueKey.Columns,
dmlEvent.NewColumnValues.AbstractValues()))

case binlog.UpdateDML:
{
if _, isModified := this.updateModifiesUniqueKeyColumns(dmlEvent); isModified {
uniqueKeys = append(uniqueKeys,
sql.ObtainUniqueKeyValues(
this.migrationContext.OriginalTableColumns,
&this.migrationContext.UniqueKey.Columns,
dmlEvent.WhereColumnValues.AbstractValues()))
}
return append(uniqueKeys,
sql.ObtainUniqueKeyValues(
this.migrationContext.OriginalTableColumns,
&this.migrationContext.UniqueKey.Columns,
dmlEvent.NewColumnValues.AbstractValues()))
}
}

return uniqueKeys
}

func (this *Applier) SanitizeRowsDuringCutOver() error {
for len(this.migrationContext.TriggerCutoverUniqueKeys) > 0 {
cutIndex := int64(len(this.migrationContext.TriggerCutoverUniqueKeys)) - this.migrationContext.ChunkSize
if cutIndex < 0 {
cutIndex = 0
}

chunkValues := this.migrationContext.TriggerCutoverUniqueKeys[cutIndex:]
deleteQuery, deleteArgs, deleteErr := sql.BuildDeleteQuery(
this.migrationContext.DatabaseName,
this.migrationContext.GetGhostTableName(),
&this.migrationContext.UniqueKey.Columns,
chunkValues)

if deleteErr != nil {
return deleteErr
}

insertSelectQuery, insertSelectArgs, insertSelectErr := sql.BuildInsertSelectQuery(
this.migrationContext.DatabaseName,
this.migrationContext.OriginalTableName,
this.migrationContext.GetGhostTableName(),
this.migrationContext.SharedColumns.Names(),
this.migrationContext.MappedSharedColumns.Names(),
this.migrationContext.UniqueKey.Name,
&this.migrationContext.UniqueKey.Columns,
chunkValues)

if insertSelectErr != nil {
return insertSelectErr
}

log.Infof("Sanitizing chunk of created rows during trigger creation (%d/%d)",
int64(len(this.migrationContext.TriggerCutoverUniqueKeys))-cutIndex,
len(this.migrationContext.TriggerCutoverUniqueKeys))

err := func() error {
tx, err := this.db.Begin()
if err != nil {
return err
}

if _, err := tx.Exec(deleteQuery, deleteArgs...); err != nil {
return err
}
if _, err := tx.Exec(insertSelectQuery, insertSelectArgs...); err != nil {
return err
}
if err := tx.Commit(); err != nil {
return err
}
return nil
}()

if err != nil {
return err
}

this.migrationContext.TriggerCutoverUniqueKeys = this.migrationContext.TriggerCutoverUniqueKeys[:cutIndex]
}

return nil
}

// LockOriginalTable places a write lock on the original table
func (this *Applier) LockOriginalTable() error {
query := fmt.Sprintf(`lock /* gh-ost */ tables %s.%s write`,
Expand Down Expand Up @@ -545,6 +814,31 @@ func (this *Applier) UnlockTables() error {
return nil
}

// SwapTables issues a one-step swap table operation:
// - rename original table to _old
// - rename ghost table to original
func (this *Applier) SwapTables() error {
query := fmt.Sprintf(`rename /* gh-ost */ table %s.%s to %s.%s, %s.%s to %s.%s`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetOldTableName()),
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetGhostTableName()),
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
)
log.Infof("Swaping original and new table: %s", query)
this.migrationContext.RenameTablesStartTime = time.Now()
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
return err
}
this.migrationContext.RenameTablesEndTime = time.Now()

log.Infof("Tables swaped")
return nil
}

// SwapTablesQuickAndBumpy issues a two-step swap table operation:
// - rename original table to _old
// - rename ghost table to original
Expand Down
Loading