Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go/base/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ type MigrationContext struct {
TotalRowsCopied int64
TotalDMLEventsApplied int64
DMLBatchSize int64
InspectorUptimeSeconds int64
isThrottled bool
throttleReason string
throttleReasonHint ThrottleReasonHint
Expand Down
7 changes: 7 additions & 0 deletions go/logic/inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -752,6 +752,13 @@ func (this *Inspector) readChangelogState(hint string) (string, error) {
return result, err
}

// readUptime reads MySQL server uptime (in seconds)
func (this *Inspector) readUptime() (uptime int64, err error) {
var dummy string
err = this.db.QueryRow("show global status like 'Uptime'").Scan(&dummy, &uptime)
return uptime, err
}

func (this *Inspector) getMasterConnectionConfig() (applierConfig *mysql.ConnectionConfig, err error) {
log.Infof("Recursively searching for replication master")
visitedKeys := mysql.NewInstanceKeyMap()
Expand Down
16 changes: 16 additions & 0 deletions go/logic/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,7 @@ func (this *Migrator) Migrate() (err error) {
if err := this.initiateInspector(); err != nil {
return err
}
go this.initiateInspectorHealthCheck()
if err := this.initiateStreaming(); err != nil {
return err
}
Expand Down Expand Up @@ -711,6 +712,21 @@ func (this *Migrator) initiateServer() (err error) {
return nil
}

func (this *Migrator) initiateInspectorHealthCheck() {
ticker := time.Tick(10 * time.Second)
for range ticker {
lastUptime := atomic.LoadInt64(&this.migrationContext.InspectorUptimeSeconds)
if uptime, err := this.inspector.readUptime(); err != nil {
log.Errore(err)
} else {
if uptime < lastUptime {
this.migrationContext.PanicAbort <- fmt.Errorf("Inspector Uptime is %+v, less than previously measured uptime %+v. Has the inspector been restarted? Bailing out.", uptime, lastUptime)
}
atomic.StoreInt64(&this.migrationContext.InspectorUptimeSeconds, uptime)
}
}
}

// initiateInspector connects, validates and inspects the "inspector" server.
// The "inspector" server is typically a replica; it is where we issue some
// queries such as:
Expand Down