diff --git a/go/base/context.go b/go/base/context.go index 5ebf09267..62421f712 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -181,6 +181,7 @@ type MigrationContext struct { TotalRowsCopied int64 TotalDMLEventsApplied int64 DMLBatchSize int64 + InspectorUptimeSeconds int64 isThrottled bool throttleReason string throttleReasonHint ThrottleReasonHint diff --git a/go/logic/inspect.go b/go/logic/inspect.go index 31184b0c2..c6d0d84f9 100644 --- a/go/logic/inspect.go +++ b/go/logic/inspect.go @@ -752,6 +752,13 @@ func (this *Inspector) readChangelogState(hint string) (string, error) { return result, err } +// readUptime reads MySQL server uptime (in seconds) +func (this *Inspector) readUptime() (uptime int64, err error) { + var dummy string + err = this.db.QueryRow("show global status like 'Uptime'").Scan(&dummy, &uptime) + return uptime, err +} + func (this *Inspector) getMasterConnectionConfig() (applierConfig *mysql.ConnectionConfig, err error) { log.Infof("Recursively searching for replication master") visitedKeys := mysql.NewInstanceKeyMap() diff --git a/go/logic/migrator.go b/go/logic/migrator.go index b1a238fda..6604c8917 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -342,6 +342,7 @@ func (this *Migrator) Migrate() (err error) { if err := this.initiateInspector(); err != nil { return err } + go this.initiateInspectorHealthCheck() if err := this.initiateStreaming(); err != nil { return err } @@ -711,6 +712,21 @@ func (this *Migrator) initiateServer() (err error) { return nil } +func (this *Migrator) initiateInspectorHealthCheck() { + ticker := time.Tick(10 * time.Second) + for range ticker { + lastUptime := atomic.LoadInt64(&this.migrationContext.InspectorUptimeSeconds) + if uptime, err := this.inspector.readUptime(); err != nil { + log.Errore(err) + } else { + if uptime < lastUptime { + this.migrationContext.PanicAbort <- fmt.Errorf("Inspector Uptime is %+v, less than previously measured uptime %+v. Has the inspector been restarted? Bailing out.", uptime, lastUptime) + } + atomic.StoreInt64(&this.migrationContext.InspectorUptimeSeconds, uptime) + } + } +} + // initiateInspector connects, validates and inspects the "inspector" server. // The "inspector" server is typically a replica; it is where we issue some // queries such as: