Skip to content

Commit 7aea210

Browse files
authored
Merge branch 'master' into grodowski/remove-unused-code
2 parents e5760f7 + 753cf88 commit 7aea210

File tree

15 files changed

+507
-38
lines changed

15 files changed

+507
-38
lines changed

doc/hooks.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ The full list of supported hooks is best found in code: [hooks.go](https://githu
4949
- `gh-ost-on-before-cut-over`
5050
- `gh-ost-on-success`
5151
- `gh-ost-on-failure`
52+
- `gh-ost-on-batch-copy-retry`
5253

5354
### Context
5455

@@ -82,6 +83,7 @@ The following variable are available on particular hooks:
8283

8384
- `GH_OST_COMMAND` is only available in `gh-ost-on-interactive-command`
8485
- `GH_OST_STATUS` is only available in `gh-ost-on-status`
86+
- `GH_OST_LAST_BATCH_COPY_ERROR` is only available in `gh-ost-on-batch-copy-retry`
8587

8688
### Examples
8789

go/base/context.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -611,6 +611,13 @@ func (this *MigrationContext) GetIteration() int64 {
611611
return atomic.LoadInt64(&this.Iteration)
612612
}
613613

614+
func (this *MigrationContext) SetNextIterationRangeMinValues() {
615+
this.MigrationIterationRangeMinValues = this.MigrationIterationRangeMaxValues
616+
if this.MigrationIterationRangeMinValues == nil {
617+
this.MigrationIterationRangeMinValues = this.MigrationRangeMinValues
618+
}
619+
}
620+
614621
func (this *MigrationContext) MarkPointOfInterest() int64 {
615622
this.pointOfInterestTimeMutex.Lock()
616623
defer this.pointOfInterestTimeMutex.Unlock()

go/logic/applier.go

Lines changed: 37 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -819,17 +819,6 @@ func (this *Applier) ReadMigrationRangeValues() error {
819819
// no further chunk to work through, i.e. we're past the last chunk and are done with
820820
// iterating the range (and thus done with copying row chunks)
821821
func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange bool, err error) {
822-
this.LastIterationRangeMutex.Lock()
823-
if this.migrationContext.MigrationIterationRangeMinValues != nil && this.migrationContext.MigrationIterationRangeMaxValues != nil {
824-
this.LastIterationRangeMinValues = this.migrationContext.MigrationIterationRangeMinValues.Clone()
825-
this.LastIterationRangeMaxValues = this.migrationContext.MigrationIterationRangeMaxValues.Clone()
826-
}
827-
this.LastIterationRangeMutex.Unlock()
828-
829-
this.migrationContext.MigrationIterationRangeMinValues = this.migrationContext.MigrationIterationRangeMaxValues
830-
if this.migrationContext.MigrationIterationRangeMinValues == nil {
831-
this.migrationContext.MigrationIterationRangeMinValues = this.migrationContext.MigrationRangeMinValues
832-
}
833822
for i := 0; i < 2; i++ {
834823
buildFunc := sql.BuildUniqueKeyRangeEndPreparedQueryViaOffset
835824
if i == 1 {
@@ -1557,6 +1546,43 @@ func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent))
15571546
if execErr != nil {
15581547
return rollback(execErr)
15591548
}
1549+
1550+
// Check for warnings when PanicOnWarnings is enabled
1551+
if this.migrationContext.PanicOnWarnings {
1552+
//nolint:execinquery
1553+
rows, err := tx.Query("SHOW WARNINGS")
1554+
if err != nil {
1555+
return rollback(err)
1556+
}
1557+
defer rows.Close()
1558+
if err = rows.Err(); err != nil {
1559+
return rollback(err)
1560+
}
1561+
1562+
var sqlWarnings []string
1563+
for rows.Next() {
1564+
var level, message string
1565+
var code int
1566+
if err := rows.Scan(&level, &code, &message); err != nil {
1567+
this.migrationContext.Log.Warningf("Failed to read SHOW WARNINGS row")
1568+
continue
1569+
}
1570+
// Duplicate warnings are formatted differently across mysql versions, hence the optional table name prefix
1571+
migrationUniqueKeyExpression := fmt.Sprintf("for key '(%s\\.)?%s'", this.migrationContext.GetGhostTableName(), this.migrationContext.UniqueKey.NameInGhostTable)
1572+
matched, _ := regexp.MatchString(migrationUniqueKeyExpression, message)
1573+
if strings.Contains(message, "Duplicate entry") && matched {
1574+
// Duplicate entry on migration unique key is expected during binlog replay
1575+
// (row was already copied during bulk copy phase)
1576+
continue
1577+
}
1578+
sqlWarnings = append(sqlWarnings, fmt.Sprintf("%s: %s (%d)", level, message, code))
1579+
}
1580+
if len(sqlWarnings) > 0 {
1581+
warningMsg := fmt.Sprintf("Warnings detected during DML event application: %v", sqlWarnings)
1582+
return rollback(errors.New(warningMsg))
1583+
}
1584+
}
1585+
15601586
if err := tx.Commit(); err != nil {
15611587
return err
15621588
}

go/logic/applier_test.go

Lines changed: 260 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ func TestApplierBuildDMLEventQuery(t *testing.T) {
147147
require.Len(t, res, 1)
148148
require.NoError(t, res[0].err)
149149
require.Equal(t,
150-
`replace /* gh-ost `+"`test`.`_test_gho`"+` */
150+
`insert /* gh-ost `+"`test`.`_test_gho`"+` */ ignore
151151
into
152152
`+"`test`.`_test_gho`"+`
153153
`+"(`id`, `item_id`)"+`
@@ -542,7 +542,9 @@ func (suite *ApplierTestSuite) TestPanicOnWarningsInApplyIterationInsertQuerySuc
542542
err = applier.ReadMigrationRangeValues()
543543
suite.Require().NoError(err)
544544

545+
migrationContext.SetNextIterationRangeMinValues()
545546
hasFurtherRange, err := applier.CalculateNextIterationRangeEndValues()
547+
546548
suite.Require().NoError(err)
547549
suite.Require().True(hasFurtherRange)
548550

@@ -620,6 +622,7 @@ func (suite *ApplierTestSuite) TestPanicOnWarningsInApplyIterationInsertQueryFai
620622
err = applier.AlterGhost()
621623
suite.Require().NoError(err)
622624

625+
migrationContext.SetNextIterationRangeMinValues()
623626
hasFurtherRange, err := applier.CalculateNextIterationRangeEndValues()
624627
suite.Require().NoError(err)
625628
suite.Require().True(hasFurtherRange)
@@ -721,6 +724,262 @@ func (suite *ApplierTestSuite) TestWriteCheckpoint() {
721724
suite.Require().Equal(chk.IsCutover, gotChk.IsCutover)
722725
}
723726

727+
func (suite *ApplierTestSuite) TestPanicOnWarningsWithDuplicateKeyOnNonMigrationIndex() {
728+
ctx := context.Background()
729+
730+
var err error
731+
732+
// Create table with id and email columns, where id is the primary key
733+
_, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY, email VARCHAR(100));", getTestTableName()))
734+
suite.Require().NoError(err)
735+
736+
// Create ghost table with same schema plus a new unique index on email
737+
_, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY, email VARCHAR(100), UNIQUE KEY email_unique (email));", getTestGhostTableName()))
738+
suite.Require().NoError(err)
739+
740+
connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer)
741+
suite.Require().NoError(err)
742+
743+
migrationContext := newTestMigrationContext()
744+
migrationContext.ApplierConnectionConfig = connectionConfig
745+
migrationContext.SetConnectionConfig("innodb")
746+
747+
migrationContext.PanicOnWarnings = true
748+
749+
migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id", "email"})
750+
migrationContext.SharedColumns = sql.NewColumnList([]string{"id", "email"})
751+
migrationContext.MappedSharedColumns = sql.NewColumnList([]string{"id", "email"})
752+
migrationContext.UniqueKey = &sql.UniqueKey{
753+
Name: "PRIMARY",
754+
NameInGhostTable: "PRIMARY",
755+
Columns: *sql.NewColumnList([]string{"id"}),
756+
}
757+
758+
applier := NewApplier(migrationContext)
759+
suite.Require().NoError(applier.prepareQueries())
760+
defer applier.Teardown()
761+
762+
err = applier.InitDBConnections()
763+
suite.Require().NoError(err)
764+
765+
// Insert initial rows into ghost table (simulating bulk copy phase)
766+
_, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (id, email) VALUES (1, 'user1@example.com'), (2, 'user2@example.com'), (3, 'user3@example.com');", getTestGhostTableName()))
767+
suite.Require().NoError(err)
768+
769+
// Simulate binlog event: try to insert a row with duplicate email
770+
// This should fail with a warning because the ghost table has a unique index on email
771+
dmlEvents := []*binlog.BinlogDMLEvent{
772+
{
773+
DatabaseName: testMysqlDatabase,
774+
TableName: testMysqlTableName,
775+
DML: binlog.InsertDML,
776+
NewColumnValues: sql.ToColumnValues([]interface{}{4, "user2@example.com"}), // duplicate email
777+
},
778+
}
779+
780+
// This should return an error when PanicOnWarnings is enabled
781+
err = applier.ApplyDMLEventQueries(dmlEvents)
782+
suite.Require().Error(err)
783+
suite.Require().Contains(err.Error(), "Duplicate entry")
784+
785+
// Verify that the ghost table still has only 3 rows (no data loss)
786+
rows, err := suite.db.Query("SELECT * FROM " + getTestGhostTableName() + " ORDER BY id")
787+
suite.Require().NoError(err)
788+
defer rows.Close()
789+
790+
var count int
791+
for rows.Next() {
792+
var id int
793+
var email string
794+
err = rows.Scan(&id, &email)
795+
suite.Require().NoError(err)
796+
count += 1
797+
}
798+
suite.Require().NoError(rows.Err())
799+
800+
// All 3 original rows should still be present
801+
suite.Require().Equal(3, count)
802+
}
803+
804+
// TestUpdateModifyingUniqueKeyWithDuplicateOnOtherIndex tests the scenario where:
805+
// 1. An UPDATE modifies the unique key (converted to DELETE+INSERT)
806+
// 2. The INSERT would create a duplicate on a NON-migration unique index
807+
// 3. Without warning detection: DELETE succeeds, INSERT IGNORE skips = DATA LOSS
808+
// 4. With PanicOnWarnings: Warning detected, transaction rolled back, no data loss
809+
// This test verifies that PanicOnWarnings correctly prevents the data loss scenario.
810+
func (suite *ApplierTestSuite) TestUpdateModifyingUniqueKeyWithDuplicateOnOtherIndex() {
811+
ctx := context.Background()
812+
813+
var err error
814+
815+
// Create table with id (PRIMARY) and email (NO unique constraint yet)
816+
_, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY, email VARCHAR(100));", getTestTableName()))
817+
suite.Require().NoError(err)
818+
819+
// Create ghost table with id (PRIMARY) AND email unique index (being added)
820+
_, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY, email VARCHAR(100), UNIQUE KEY email_unique (email));", getTestGhostTableName()))
821+
suite.Require().NoError(err)
822+
823+
connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer)
824+
suite.Require().NoError(err)
825+
826+
migrationContext := newTestMigrationContext()
827+
migrationContext.ApplierConnectionConfig = connectionConfig
828+
migrationContext.SetConnectionConfig("innodb")
829+
830+
migrationContext.PanicOnWarnings = true
831+
832+
migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id", "email"})
833+
migrationContext.SharedColumns = sql.NewColumnList([]string{"id", "email"})
834+
migrationContext.MappedSharedColumns = sql.NewColumnList([]string{"id", "email"})
835+
migrationContext.UniqueKey = &sql.UniqueKey{
836+
Name: "PRIMARY",
837+
NameInGhostTable: "PRIMARY",
838+
Columns: *sql.NewColumnList([]string{"id"}),
839+
}
840+
841+
applier := NewApplier(migrationContext)
842+
suite.Require().NoError(applier.prepareQueries())
843+
defer applier.Teardown()
844+
845+
err = applier.InitDBConnections()
846+
suite.Require().NoError(err)
847+
848+
// Setup: Insert initial rows into ghost table
849+
// Row 1: id=1, email='bob@example.com'
850+
// Row 2: id=2, email='charlie@example.com'
851+
_, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (id, email) VALUES (1, 'bob@example.com'), (2, 'charlie@example.com');", getTestGhostTableName()))
852+
suite.Require().NoError(err)
853+
854+
// Simulate binlog event: UPDATE that changes BOTH PRIMARY KEY and email
855+
// From: id=2, email='charlie@example.com'
856+
// To: id=3, email='bob@example.com' (duplicate email with id=1)
857+
// This will be converted to DELETE (id=2) + INSERT (id=3, 'bob@example.com')
858+
// With INSERT IGNORE, the INSERT will skip because email='bob@example.com' already exists in id=1
859+
// Result: id=2 deleted, id=3 never inserted = DATA LOSS
860+
dmlEvents := []*binlog.BinlogDMLEvent{
861+
{
862+
DatabaseName: testMysqlDatabase,
863+
TableName: testMysqlTableName,
864+
DML: binlog.UpdateDML,
865+
NewColumnValues: sql.ToColumnValues([]interface{}{3, "bob@example.com"}), // new: id=3, email='bob@example.com'
866+
WhereColumnValues: sql.ToColumnValues([]interface{}{2, "charlie@example.com"}), // old: id=2, email='charlie@example.com'
867+
},
868+
}
869+
870+
// First verify this would be converted to DELETE+INSERT
871+
buildResults := applier.buildDMLEventQuery(dmlEvents[0])
872+
suite.Require().Len(buildResults, 2, "UPDATE modifying unique key should be converted to DELETE+INSERT")
873+
874+
// Apply the event - this should FAIL because INSERT will have duplicate email warning
875+
err = applier.ApplyDMLEventQueries(dmlEvents)
876+
suite.Require().Error(err, "Should fail when DELETE+INSERT causes duplicate on non-migration unique key")
877+
suite.Require().Contains(err.Error(), "Duplicate entry", "Error should mention duplicate entry")
878+
879+
// Verify that BOTH rows still exist (transaction rolled back)
880+
rows, err := suite.db.Query("SELECT id, email FROM " + getTestGhostTableName() + " ORDER BY id")
881+
suite.Require().NoError(err)
882+
defer rows.Close()
883+
884+
var count int
885+
var ids []int
886+
var emails []string
887+
for rows.Next() {
888+
var id int
889+
var email string
890+
err = rows.Scan(&id, &email)
891+
suite.Require().NoError(err)
892+
ids = append(ids, id)
893+
emails = append(emails, email)
894+
count++
895+
}
896+
suite.Require().NoError(rows.Err())
897+
898+
// Transaction should have rolled back, so original 2 rows should still be there
899+
suite.Require().Equal(2, count, "Should still have 2 rows after failed transaction")
900+
suite.Require().Equal([]int{1, 2}, ids, "Should have original ids")
901+
suite.Require().Equal([]string{"bob@example.com", "charlie@example.com"}, emails)
902+
}
903+
904+
// TestNormalUpdateWithPanicOnWarnings tests that normal UPDATEs (not modifying unique key) work correctly
905+
func (suite *ApplierTestSuite) TestNormalUpdateWithPanicOnWarnings() {
906+
ctx := context.Background()
907+
908+
var err error
909+
910+
// Create table with id (PRIMARY) and email
911+
_, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY, email VARCHAR(100));", getTestTableName()))
912+
suite.Require().NoError(err)
913+
914+
// Create ghost table with same schema plus unique index on email
915+
_, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY, email VARCHAR(100), UNIQUE KEY email_unique (email));", getTestGhostTableName()))
916+
suite.Require().NoError(err)
917+
918+
connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer)
919+
suite.Require().NoError(err)
920+
921+
migrationContext := newTestMigrationContext()
922+
migrationContext.ApplierConnectionConfig = connectionConfig
923+
migrationContext.SetConnectionConfig("innodb")
924+
925+
migrationContext.PanicOnWarnings = true
926+
927+
migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id", "email"})
928+
migrationContext.SharedColumns = sql.NewColumnList([]string{"id", "email"})
929+
migrationContext.MappedSharedColumns = sql.NewColumnList([]string{"id", "email"})
930+
migrationContext.UniqueKey = &sql.UniqueKey{
931+
Name: "PRIMARY",
932+
NameInGhostTable: "PRIMARY",
933+
Columns: *sql.NewColumnList([]string{"id"}),
934+
}
935+
936+
applier := NewApplier(migrationContext)
937+
suite.Require().NoError(applier.prepareQueries())
938+
defer applier.Teardown()
939+
940+
err = applier.InitDBConnections()
941+
suite.Require().NoError(err)
942+
943+
// Setup: Insert initial rows into ghost table
944+
_, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (id, email) VALUES (1, 'alice@example.com'), (2, 'bob@example.com');", getTestGhostTableName()))
945+
suite.Require().NoError(err)
946+
947+
// Simulate binlog event: Normal UPDATE that only changes email (not PRIMARY KEY)
948+
// This should use UPDATE query, not DELETE+INSERT
949+
dmlEvents := []*binlog.BinlogDMLEvent{
950+
{
951+
DatabaseName: testMysqlDatabase,
952+
TableName: testMysqlTableName,
953+
DML: binlog.UpdateDML,
954+
NewColumnValues: sql.ToColumnValues([]interface{}{2, "robert@example.com"}), // update email only
955+
WhereColumnValues: sql.ToColumnValues([]interface{}{2, "bob@example.com"}),
956+
},
957+
}
958+
959+
// Verify this generates a single UPDATE query (not DELETE+INSERT)
960+
buildResults := applier.buildDMLEventQuery(dmlEvents[0])
961+
suite.Require().Len(buildResults, 1, "Normal UPDATE should generate single UPDATE query")
962+
963+
// Apply the event - should succeed
964+
err = applier.ApplyDMLEventQueries(dmlEvents)
965+
suite.Require().NoError(err)
966+
967+
// Verify the update was applied correctly
968+
rows, err := suite.db.Query("SELECT id, email FROM " + getTestGhostTableName() + " WHERE id = 2")
969+
suite.Require().NoError(err)
970+
defer rows.Close()
971+
972+
var id int
973+
var email string
974+
suite.Require().True(rows.Next(), "Should find updated row")
975+
err = rows.Scan(&id, &email)
976+
suite.Require().NoError(err)
977+
suite.Require().Equal(2, id)
978+
suite.Require().Equal("robert@example.com", email)
979+
suite.Require().False(rows.Next(), "Should only have one row")
980+
suite.Require().NoError(rows.Err())
981+
}
982+
724983
func TestApplier(t *testing.T) {
725984
suite.Run(t, new(ApplierTestSuite))
726985
}

0 commit comments

Comments
 (0)