Skip to content

Commit 81ec000

Browse files
committed
Replace log.Fatale with context-based error propagation
This patch modifies gh-ost to use a cancellable context instead of calling log.Fatale() in listenOnPanicAbort. When gh-ost is used as a library, this lets the calling application recover from an abort (e.g. log the failure reason) instead of having the entire process terminate via os.Exit(). On abort, the error is now stored on the migration context and the context is cancelled, which signals all goroutines to stop gracefully.
1 parent c72b237 commit 81ec000

File tree

8 files changed

+544
-15
lines changed

8 files changed

+544
-15
lines changed

go/base/context.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package base
77

88
import (
9+
"context"
910
"fmt"
1011
"math"
1112
"os"
@@ -225,6 +226,16 @@ type MigrationContext struct {
225226
InCutOverCriticalSectionFlag int64
226227
PanicAbort chan error
227228

229+
// Context for cancellation signaling across all goroutines
230+
// Stored in struct as it spans the entire migration lifecycle, not per-function.
231+
// context.Context is safe for concurrent use by multiple goroutines.
232+
ctx context.Context //nolint:containedctx
233+
cancelFunc context.CancelFunc
234+
235+
// Stores the fatal error that triggered abort
236+
AbortError error
237+
abortMutex *sync.Mutex
238+
228239
OriginalTableColumnsOnApplier *sql.ColumnList
229240
OriginalTableColumns *sql.ColumnList
230241
OriginalTableVirtualColumns *sql.ColumnList
@@ -293,6 +304,7 @@ type ContextConfig struct {
293304
}
294305

295306
func NewMigrationContext() *MigrationContext {
307+
ctx, cancelFunc := context.WithCancel(context.Background())
296308
return &MigrationContext{
297309
Uuid: uuid.NewString(),
298310
defaultNumRetries: 60,
@@ -313,6 +325,9 @@ func NewMigrationContext() *MigrationContext {
313325
lastHeartbeatOnChangelogMutex: &sync.Mutex{},
314326
ColumnRenameMap: make(map[string]string),
315327
PanicAbort: make(chan error),
328+
ctx: ctx,
329+
cancelFunc: cancelFunc,
330+
abortMutex: &sync.Mutex{},
316331
Log: NewDefaultLogger(),
317332
}
318333
}
@@ -982,3 +997,33 @@ func (this *MigrationContext) GetGhostTriggerName(triggerName string) string {
982997
func (this *MigrationContext) ValidateGhostTriggerLengthBelowMaxLength(triggerName string) bool {
983998
return utf8.RuneCountInString(triggerName) <= mysql.MaxTableNameLength
984999
}
1000+
1001+
// GetContext returns the cancellable context stored in the ctx field.
// Callers watch its Done() channel to learn that the migration was aborted
// through CancelContext.
1002+
func (this *MigrationContext) GetContext() context.Context {
1003+
return this.ctx
1004+
}
1005+
1006+
// SetAbortError records err as the fatal error that triggered the abort.
1007+
// Only the first non-nil error is kept: once AbortError is set, later calls
// leave it unchanged. The check-and-set runs while holding abortMutex.
1008+
func (this *MigrationContext) SetAbortError(err error) {
1009+
this.abortMutex.Lock()
1010+
defer this.abortMutex.Unlock()
1011+
if this.AbortError == nil {
1012+
this.AbortError = err
1013+
}
1014+
}
1015+
1016+
// GetAbortError returns the error recorded by SetAbortError, or nil if no
// abort error has been stored. The read is performed while holding abortMutex.
1017+
func (this *MigrationContext) GetAbortError() error {
1018+
this.abortMutex.Lock()
1019+
defer this.abortMutex.Unlock()
1020+
return this.AbortError
1021+
}
1022+
1023+
// CancelContext cancels the migration context to signal all goroutines to stop.
1024+
// Per the context package, a CancelFunc may be called multiple times and from
// multiple goroutines; calls after the first do nothing.
// NOTE(review): the nil check presumably covers a MigrationContext that was
// not built by NewMigrationContext and so has no cancelFunc - confirm.
1025+
func (this *MigrationContext) CancelContext() {
1026+
if this.cancelFunc != nil {
1027+
this.cancelFunc()
1028+
}
1029+
}

go/base/context_test.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@
66
package base
77

88
import (
9+
"errors"
910
"os"
1011
"strings"
12+
"sync"
1113
"testing"
1214
"time"
1315

@@ -213,3 +215,58 @@ func TestReadConfigFile(t *testing.T) {
213215
}
214216
}
215217
}
218+
219+
// TestSetAbortError_StoresFirstError verifies that when SetAbortError is
// called twice in sequence, GetAbortError returns the error from the first call.
func TestSetAbortError_StoresFirstError(t *testing.T) {
220+
ctx := NewMigrationContext()
221+
222+
err1 := errors.New("first error")
223+
err2 := errors.New("second error")
224+
225+
ctx.SetAbortError(err1)
226+
// The second call must not overwrite the error stored by the first.
ctx.SetAbortError(err2)
227+
228+
got := ctx.GetAbortError()
229+
if got != err1 { //nolint:errorlint // Testing pointer equality for sentinel error
230+
t.Errorf("Expected first error %v, got %v", err1, got)
231+
}
232+
}
233+
234+
// TestSetAbortError_ThreadSafe calls SetAbortError from three goroutines at
// once and checks that GetAbortError afterwards returns one of the errors
// that were passed in. The assertions do not detect data races by themselves;
// run the test with the race detector (go test -race) for that.
func TestSetAbortError_ThreadSafe(t *testing.T) {
235+
ctx := NewMigrationContext()
236+
237+
var wg sync.WaitGroup
238+
// NOTE(review): this local slice shadows the imported errors package for the
// rest of the function. The errors.New calls inside the literal still resolve
// to the package because the new name only comes into scope after the
// declaration. Consider renaming the slice (e.g. errs).
errors := []error{
239+
errors.New("error 1"),
240+
errors.New("error 2"),
241+
errors.New("error 3"),
242+
}
243+
244+
// Launch 3 goroutines trying to set error concurrently
245+
for _, err := range errors {
246+
wg.Add(1)
247+
// err is passed as an argument so each goroutine works on its own copy.
go func(e error) {
248+
defer wg.Done()
249+
ctx.SetAbortError(e)
250+
}(err)
251+
}
252+
253+
// Block until every goroutine has finished its SetAbortError call.
wg.Wait()
254+
255+
// Should store exactly one of the errors
256+
got := ctx.GetAbortError()
257+
if got == nil {
258+
t.Fatal("Expected error to be stored, got nil")
259+
}
260+
261+
// Verify it's one of the errors we sent
262+
found := false
263+
for _, err := range errors {
264+
if got == err { //nolint:errorlint // Testing pointer equality for sentinel error
265+
found = true
266+
break
267+
}
268+
}
269+
if !found {
270+
t.Errorf("Stored error %v not in list of sent errors", got)
271+
}
272+
}

go/logic/applier.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -695,7 +695,17 @@ func (this *Applier) InitiateHeartbeat() {
695695

696696
ticker := time.NewTicker(time.Duration(this.migrationContext.HeartbeatIntervalMilliseconds) * time.Millisecond)
697697
defer ticker.Stop()
698-
for range ticker.C {
698+
for {
699+
// Check for context cancellation each iteration
700+
ctx := this.migrationContext.GetContext()
701+
select {
702+
case <-ctx.Done():
703+
this.migrationContext.Log.Debugf("Heartbeat injection cancelled")
704+
return
705+
case <-ticker.C:
706+
// Process heartbeat
707+
}
708+
699709
if atomic.LoadInt64(&this.finishedMigrating) > 0 {
700710
return
701711
}
@@ -706,7 +716,12 @@ func (this *Applier) InitiateHeartbeat() {
706716
continue
707717
}
708718
if err := injectHeartbeat(); err != nil {
709-
this.migrationContext.PanicAbort <- fmt.Errorf("injectHeartbeat writing failed %d times, last error: %w", numSuccessiveFailures, err)
719+
select {
720+
case this.migrationContext.PanicAbort <- fmt.Errorf("injectHeartbeat writing failed %d times, last error: %w", numSuccessiveFailures, err):
721+
// Error sent successfully
722+
case <-this.migrationContext.GetContext().Done():
723+
// Context cancelled, someone else already reported an error
724+
}
710725
return
711726
}
712727
}

0 commit comments

Comments
 (0)