Skip to content

Commit 6944341

Browse files
committed
Fix deadlock on PanicAbort sends
1 parent 1cf0875 commit 6944341

File tree

3 files changed

+6
-1
lines changed

3 files changed

+6
-1
lines changed

go/base/context.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,7 @@ func NewMigrationContext() *MigrationContext {
324324
pointOfInterestTimeMutex: &sync.Mutex{},
325325
lastHeartbeatOnChangelogMutex: &sync.Mutex{},
326326
ColumnRenameMap: make(map[string]string),
327-
PanicAbort: make(chan error),
327+
PanicAbort: make(chan error, 1),
328328
ctx: ctx,
329329
cancelFunc: cancelFunc,
330330
abortMutex: &sync.Mutex{},

go/logic/migrator.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,8 @@ func (this *Migrator) consumeRowCopyComplete() {
204204
if err := <-this.rowCopyComplete; err != nil {
205205
// Use helper to prevent deadlock if listenOnPanicAbort already exited
206206
_ = base.SendWithContext(this.migrationContext.GetContext(), this.migrationContext.PanicAbort, err)
207+
// Don't mark row copy as complete if there was an error
208+
return
207209
}
208210
atomic.StoreInt64(&this.rowCopyCompleteFlag, 1)
209211
this.migrationContext.MarkRowCopyEndTime()
@@ -212,6 +214,7 @@ func (this *Migrator) consumeRowCopyComplete() {
212214
if err != nil {
213215
// Use helper to prevent deadlock if listenOnPanicAbort already exited
214216
_ = base.SendWithContext(this.migrationContext.GetContext(), this.migrationContext.PanicAbort, err)
217+
return
215218
}
216219
}
217220
}()

go/logic/throttler.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -364,6 +364,7 @@ func (this *Throttler) collectGeneralThrottleMetrics() error {
364364
if base.FileExists(this.migrationContext.PanicFlagFile) {
365365
// Use helper to prevent deadlock if listenOnPanicAbort already exited
366366
_ = base.SendWithContext(this.migrationContext.GetContext(), this.migrationContext.PanicAbort, fmt.Errorf("Found panic-file %s. Aborting without cleanup", this.migrationContext.PanicFlagFile))
367+
return nil
367368
}
368369
}
369370

@@ -388,6 +389,7 @@ func (this *Throttler) collectGeneralThrottleMetrics() error {
388389
if criticalLoadMet && this.migrationContext.CriticalLoadIntervalMilliseconds == 0 {
389390
// Use helper to prevent deadlock if listenOnPanicAbort already exited
390391
_ = base.SendWithContext(this.migrationContext.GetContext(), this.migrationContext.PanicAbort, fmt.Errorf("critical-load met: %s=%d, >=%d", variableName, value, threshold))
392+
return nil
391393
}
392394
if criticalLoadMet && this.migrationContext.CriticalLoadIntervalMilliseconds > 0 {
393395
this.migrationContext.Log.Errorf("critical-load met once: %s=%d, >=%d. Will check again in %d millis", variableName, value, threshold, this.migrationContext.CriticalLoadIntervalMilliseconds)

0 commit comments

Comments
 (0)