Skip to content

Commit 549c2b3

Browse files
committed
Fix vector-store option naming; add `compactVector` parameter to the compact coordinator/planner; rename `SplitBunch` to `SpecialFieldBunch`
1 parent 4976c76 commit 549c2b3

File tree

11 files changed

+78
-66
lines changed

11 files changed

+78
-66
lines changed

paimon-api/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2230,19 +2230,19 @@ public InlineElement getDescription() {
22302230
.withDescription(
22312231
"The interval for checking visibility when visibility-callback enabled.");
22322232

2233-
public static final ConfigOption<String> VECTOR_STORE_FORMAT =
2233+
public static final ConfigOption<String> VECTOR_FILE_FORMAT =
22342234
key("vector.file.format")
22352235
.stringType()
22362236
.noDefaultValue()
22372237
.withDescription("Specify the vector store file format.");
22382238

2239-
public static final ConfigOption<String> VECTOR_STORE_FIELDS =
2239+
public static final ConfigOption<String> VECTOR_FIELDS =
22402240
key("vector-field")
22412241
.stringType()
22422242
.noDefaultValue()
22432243
.withDescription("Specify the vector store fields.");
22442244

2245-
public static final ConfigOption<MemorySize> VECTOR_STORE_TARGET_FILE_SIZE =
2245+
public static final ConfigOption<MemorySize> VECTOR_TARGET_FILE_SIZE =
22462246
key("vector.target-file-size")
22472247
.memoryType()
22482248
.noDefaultValue()
@@ -3493,21 +3493,21 @@ public Duration visibilityCallbackCheckInterval() {
34933493
}
34943494

34953495
public String vectorStoreFileFormatString() {
3496-
return normalizeFileFormat(options.get(VECTOR_STORE_FORMAT));
3496+
return normalizeFileFormat(options.get(VECTOR_FILE_FORMAT));
34973497
}
34983498

34993499
public List<String> vectorStoreFieldNames() {
3500-
String vectorStoreFields = options.get(CoreOptions.VECTOR_STORE_FIELDS);
3500+
String vectorStoreFields = options.get(CoreOptions.VECTOR_FIELDS);
35013501
if (vectorStoreFields == null || vectorStoreFields.trim().isEmpty()) {
35023502
return new ArrayList<>();
35033503
} else {
3504-
return Arrays.asList(vectorStoreFields.split(","));
3504+
return Arrays.asList(vectorStoreFields.trim().split(","));
35053505
}
35063506
}
35073507

35083508
public long vectorStoreTargetFileSize() {
35093509
// Since vectors are large, it would be better to set a larger target size for vectors.
3510-
return options.getOptional(VECTOR_STORE_TARGET_FILE_SIZE)
3510+
return options.getOptional(VECTOR_TARGET_FILE_SIZE)
35113511
.map(MemorySize::getBytes)
35123512
.orElse(10 * targetFileSize(false));
35133513
}

paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -57,14 +57,16 @@ public class DataEvolutionCompactCoordinator {
5757
private final CompactScanner scanner;
5858
private final CompactPlanner planner;
5959

60-
public DataEvolutionCompactCoordinator(FileStoreTable table, boolean compactBlob) {
61-
this(table, null, compactBlob);
60+
public DataEvolutionCompactCoordinator(
61+
FileStoreTable table, boolean compactBlob, boolean compactVector) {
62+
this(table, null, compactBlob, compactVector);
6263
}
6364

6465
public DataEvolutionCompactCoordinator(
6566
FileStoreTable table,
6667
@Nullable PartitionPredicate partitionPredicate,
67-
boolean compactBlob) {
68+
boolean compactBlob,
69+
boolean compactVector) {
6870
CoreOptions options = table.coreOptions();
6971
long targetFileSize = options.targetFileSize(false);
7072
long openFileCost = options.splitOpenFileCost();
@@ -75,7 +77,12 @@ public DataEvolutionCompactCoordinator(
7577
table.newSnapshotReader().withPartitionFilter(partitionPredicate),
7678
table.store().newScan());
7779
this.planner =
78-
new CompactPlanner(compactBlob, targetFileSize, openFileCost, compactMinFileNum);
80+
new CompactPlanner(
81+
compactBlob,
82+
compactVector,
83+
targetFileSize,
84+
openFileCost,
85+
compactMinFileNum);
7986
}
8087

8188
public List<DataEvolutionCompactTask> plan() {
@@ -134,16 +141,19 @@ List<ManifestEntry> scan() {
134141
static class CompactPlanner {
135142

136143
private final boolean compactBlob;
144+
private final boolean compactVector;
137145
private final long targetFileSize;
138146
private final long openFileCost;
139147
private final long compactMinFileNum;
140148

141149
CompactPlanner(
142150
boolean compactBlob,
151+
boolean compactVector,
143152
long targetFileSize,
144153
long openFileCost,
145154
long compactMinFileNum) {
146155
this.compactBlob = compactBlob;
156+
this.compactVector = compactVector;
147157
this.targetFileSize = targetFileSize;
148158
this.openFileCost = openFileCost;
149159
this.compactMinFileNum = compactMinFileNum;
@@ -209,7 +219,7 @@ List<DataEvolutionCompactTask> compactPlan(List<ManifestEntry> input) {
209219
}
210220
}
211221
}
212-
if (false) {
222+
if (compactVector) {
213223
// associate vector-store files to data files
214224
for (DataFileMeta vectorStoreFile : vectorStoreFiles) {
215225
Long key = treeMap.floorKey(vectorStoreFile.nonNullFirstRowId());
@@ -309,7 +319,7 @@ private List<DataEvolutionCompactTask> triggerTask(
309319
}
310320
}
311321

312-
if (false) {
322+
if (compactVector) {
313323
List<DataFileMeta> vectorStoreFiles = new ArrayList<>();
314324
for (DataFileMeta dataFile : dataFiles) {
315325
vectorStoreFiles.addAll(

paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -420,8 +420,8 @@ public static List<FieldBunch> splitFieldBunches(
420420
Function<DataFileMeta, RowType> fileToRowType,
421421
boolean rowIdPushDown) {
422422
List<FieldBunch> fieldsFiles = new ArrayList<>();
423-
Map<Integer, SplitBunch> blobBunchMap = new HashMap<>();
424-
Map<VectorStoreBunchKey, SplitBunch> vectorStoreBunchMap = new TreeMap<>();
423+
Map<Integer, SpecialFieldBunch> blobBunchMap = new HashMap<>();
424+
Map<VectorStoreBunchKey, SpecialFieldBunch> vectorStoreBunchMap = new TreeMap<>();
425425
long rowCount = -1;
426426
for (DataFileMeta file : needMergeFiles) {
427427
if (isBlobFile(file.fileName())) {
@@ -430,7 +430,8 @@ public static List<FieldBunch> splitFieldBunches(
430430
final long expectedRowCount = rowCount;
431431
blobBunchMap
432432
.computeIfAbsent(
433-
fieldId, key -> new SplitBunch(expectedRowCount, rowIdPushDown))
433+
fieldId,
434+
key -> new SpecialFieldBunch(expectedRowCount, rowIdPushDown))
434435
.add(file);
435436
} else if (isVectorStoreFile(file.fileName())) {
436437
RowType rowType = fileToRowType.apply(file);
@@ -442,7 +443,7 @@ public static List<FieldBunch> splitFieldBunches(
442443
vectorStoreBunchMap
443444
.computeIfAbsent(
444445
vectorStoreKey,
445-
key -> new SplitBunch(expectedRowCount, rowIdPushDown))
446+
key -> new SpecialFieldBunch(expectedRowCount, rowIdPushDown))
446447
.add(file);
447448
} else {
448449
// Normal file, just add it to the current merge split
@@ -483,7 +484,7 @@ public List<DataFileMeta> files() {
483484
}
484485

485486
@VisibleForTesting
486-
static class SplitBunch implements FieldBunch {
487+
static class SpecialFieldBunch implements FieldBunch {
487488

488489
final List<DataFileMeta> files;
489490
final long expectedRowCount;
@@ -494,7 +495,7 @@ static class SplitBunch implements FieldBunch {
494495
long latestMaxSequenceNumber = -1;
495496
long rowCount;
496497

497-
SplitBunch(long expectedRowCount, boolean rowIdPushDown) {
498+
SpecialFieldBunch(long expectedRowCount, boolean rowIdPushDown) {
498499
this.files = new ArrayList<>();
499500
this.rowCount = 0;
500501
this.expectedRowCount = expectedRowCount;

paimon-core/src/test/java/org/apache/paimon/append/DataEvolutionRollingFileWriterWithVectorStoreTest.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public class DataEvolutionRollingFileWriterWithVectorStoreTest {
6666
.build();
6767

6868
private static final long TARGET_FILE_SIZE = 2 * 1024 * 1024L; // 2 MB
69-
private static final long VECTOR_STORE_TARGET_FILE_SIZE = 4 * 1024 * 1024L; // 4 MB
69+
private static final long VECTOR_TARGET_FILE_SIZE = 4 * 1024 * 1024L; // 4 MB
7070
private static final long SCHEMA_ID = 1L;
7171
private static final String COMPRESSION = "none";
7272
private static final Random RANDOM = new Random(System.currentTimeMillis());
@@ -103,7 +103,7 @@ public void setUp() throws IOException {
103103
Arrays.asList("f3", "f4"),
104104
TARGET_FILE_SIZE,
105105
TARGET_FILE_SIZE,
106-
VECTOR_STORE_TARGET_FILE_SIZE,
106+
VECTOR_TARGET_FILE_SIZE,
107107
SCHEMA,
108108
pathFactory,
109109
() -> seqNumCounter,
@@ -165,8 +165,8 @@ public void testVectorStoreTargetFileSize() throws Exception {
165165
long fileSize = file.fileSize();
166166
assertThat(fileSize)
167167
.as("Vector-store file size should be close to target size")
168-
.isGreaterThanOrEqualTo(VECTOR_STORE_TARGET_FILE_SIZE)
169-
.isLessThanOrEqualTo(VECTOR_STORE_TARGET_FILE_SIZE + 256 * 1024);
168+
.isGreaterThanOrEqualTo(VECTOR_TARGET_FILE_SIZE)
169+
.isLessThanOrEqualTo(VECTOR_TARGET_FILE_SIZE + 256 * 1024);
170170
}
171171

172172
// Verify total record count
@@ -252,7 +252,7 @@ void testVectorStoreStatsVectorStorePart() throws Exception {
252252
Arrays.asList("f3", "f4"),
253253
TARGET_FILE_SIZE,
254254
TARGET_FILE_SIZE,
255-
VECTOR_STORE_TARGET_FILE_SIZE,
255+
VECTOR_TARGET_FILE_SIZE,
256256
schema,
257257
pathFactory,
258258
() -> seqNumCounter,
@@ -322,7 +322,7 @@ public void testVectorStoreNoBlob() throws Exception {
322322
Arrays.asList("f2", "f3"),
323323
TARGET_FILE_SIZE,
324324
TARGET_FILE_SIZE,
325-
VECTOR_STORE_TARGET_FILE_SIZE,
325+
VECTOR_TARGET_FILE_SIZE,
326326
schema,
327327
pathFactory,
328328
() -> seqNumCounter,
@@ -378,7 +378,7 @@ public void testVectorStoreTheSameFormat() throws Exception {
378378
Arrays.asList("f3", "f4"),
379379
TARGET_FILE_SIZE,
380380
TARGET_FILE_SIZE,
381-
VECTOR_STORE_TARGET_FILE_SIZE,
381+
VECTOR_TARGET_FILE_SIZE,
382382
SCHEMA,
383383
pathFactory,
384384
() -> seqNumCounter,

paimon-core/src/test/java/org/apache/paimon/append/VectorStoreTableTest.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -228,11 +228,11 @@ protected Schema schemaDefault() {
228228
schemaBuilder.column("f3", DataTypes.VECTOR(VECTOR_DIM, DataTypes.FLOAT()));
229229
schemaBuilder.column("f4", DataTypes.INT());
230230
schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "2 MB");
231-
schemaBuilder.option(CoreOptions.VECTOR_STORE_TARGET_FILE_SIZE.key(), "4 MB");
231+
schemaBuilder.option(CoreOptions.VECTOR_TARGET_FILE_SIZE.key(), "4 MB");
232232
schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
233233
schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
234-
schemaBuilder.option(CoreOptions.VECTOR_STORE_FIELDS.key(), "f3,f4");
235-
schemaBuilder.option(CoreOptions.VECTOR_STORE_FORMAT.key(), "json");
234+
schemaBuilder.option(CoreOptions.VECTOR_FIELDS.key(), "f3,f4");
235+
schemaBuilder.option(CoreOptions.VECTOR_FILE_FORMAT.key(), "json");
236236
schemaBuilder.option(CoreOptions.FILE_COMPRESSION.key(), "none");
237237
return schemaBuilder.build();
238238
}
@@ -244,11 +244,11 @@ private Schema schemaWithoutBlob() {
244244
schemaBuilder.column("f2", DataTypes.VECTOR(VECTOR_DIM, DataTypes.FLOAT()));
245245
schemaBuilder.column("f3", DataTypes.INT());
246246
schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "2 MB");
247-
schemaBuilder.option(CoreOptions.VECTOR_STORE_TARGET_FILE_SIZE.key(), "4 MB");
247+
schemaBuilder.option(CoreOptions.VECTOR_TARGET_FILE_SIZE.key(), "4 MB");
248248
schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
249249
schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
250-
schemaBuilder.option(CoreOptions.VECTOR_STORE_FIELDS.key(), "f2,f3");
251-
schemaBuilder.option(CoreOptions.VECTOR_STORE_FORMAT.key(), "json");
250+
schemaBuilder.option(CoreOptions.VECTOR_FIELDS.key(), "f2,f3");
251+
schemaBuilder.option(CoreOptions.VECTOR_FILE_FORMAT.key(), "json");
252252
schemaBuilder.option(CoreOptions.FILE_COMPRESSION.key(), "none");
253253
return schemaBuilder.build();
254254
}

paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinatorTest.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public void testCompactPlannerSingleFile() {
6161

6262
DataEvolutionCompactCoordinator.CompactPlanner planner =
6363
new DataEvolutionCompactCoordinator.CompactPlanner(
64-
false, 128 * 1024 * 1024, 4 * 1024 * 1024, 2);
64+
false, false, 128 * 1024 * 1024, 4 * 1024 * 1024, 2);
6565

6666
List<DataEvolutionCompactTask> tasks = planner.compactPlan(entries);
6767

@@ -78,15 +78,15 @@ public void testCompactPlannerContiguousFiles() {
7878

7979
// Use small target file size to trigger compaction
8080
DataEvolutionCompactCoordinator.CompactPlanner planner =
81-
new DataEvolutionCompactCoordinator.CompactPlanner(false, 199, 1, 2);
81+
new DataEvolutionCompactCoordinator.CompactPlanner(false, false, 199, 1, 2);
8282

8383
List<DataEvolutionCompactTask> tasks = planner.compactPlan(entries);
8484

8585
assertThat(tasks).isNotEmpty();
8686
assertThat(tasks.get(0).compactBefore())
8787
.containsExactly(entries.get(0).file(), entries.get(1).file());
8888

89-
planner = new DataEvolutionCompactCoordinator.CompactPlanner(false, 200, 1, 2);
89+
planner = new DataEvolutionCompactCoordinator.CompactPlanner(false, false, 200, 1, 2);
9090
tasks = planner.compactPlan(entries);
9191
assertThat(tasks).isNotEmpty();
9292
assertThat(tasks.get(0).compactBefore())
@@ -107,7 +107,7 @@ public void testCompactPlannerWithRowIdGap() {
107107
// Use large target file size so compaction is triggered by gap, not size
108108
DataEvolutionCompactCoordinator.CompactPlanner planner =
109109
new DataEvolutionCompactCoordinator.CompactPlanner(
110-
false, 128 * 1024 * 1024, 4 * 1024 * 1024, 2);
110+
false, false, 128 * 1024 * 1024, 4 * 1024 * 1024, 2);
111111

112112
List<DataEvolutionCompactTask> tasks = planner.compactPlan(entries);
113113

@@ -137,7 +137,7 @@ public void testCompactPlannerSkipsLargeFiles() {
137137

138138
DataEvolutionCompactCoordinator.CompactPlanner planner =
139139
new DataEvolutionCompactCoordinator.CompactPlanner(
140-
false, 100 * 1024 * 1024, 4 * 1024 * 1024, 2);
140+
false, false, 100 * 1024 * 1024, 4 * 1024 * 1024, 2);
141141

142142
List<DataEvolutionCompactTask> tasks = planner.compactPlan(entries);
143143

@@ -165,7 +165,7 @@ public void testCompactPlannerWithBlobFiles() {
165165

166166
// Use small target to trigger compaction, with blob compaction enabled
167167
DataEvolutionCompactCoordinator.CompactPlanner planner =
168-
new DataEvolutionCompactCoordinator.CompactPlanner(true, 1024, 1024, 2);
168+
new DataEvolutionCompactCoordinator.CompactPlanner(true, false, 1024, 1024, 2);
169169

170170
List<DataEvolutionCompactTask> tasks = planner.compactPlan(entries);
171171

@@ -244,7 +244,7 @@ public void testPlanWithNullManifestRowId() {
244244
.thenReturn(Arrays.asList(entry1, entry2).iterator());
245245

246246
DataEvolutionCompactCoordinator coordinator =
247-
new DataEvolutionCompactCoordinator(table, false);
247+
new DataEvolutionCompactCoordinator(table, false, false);
248248
List<DataEvolutionCompactTask> tasks = coordinator.plan();
249249

250250
assertThat(tasks).hasSize(1);

paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import org.apache.paimon.io.DataFileMeta;
2323
import org.apache.paimon.manifest.FileSource;
2424
import org.apache.paimon.operation.DataEvolutionSplitRead.FieldBunch;
25-
import org.apache.paimon.operation.DataEvolutionSplitRead.SplitBunch;
25+
import org.apache.paimon.operation.DataEvolutionSplitRead.SpecialFieldBunch;
2626
import org.apache.paimon.stats.SimpleStats;
2727
import org.apache.paimon.types.DataField;
2828
import org.apache.paimon.types.DataTypes;
@@ -42,14 +42,14 @@
4242
import static org.assertj.core.api.Assertions.assertThatCode;
4343
import static org.assertj.core.api.Assertions.assertThatThrownBy;
4444

45-
/** Tests for {@link SplitBunch}. */
45+
/** Tests for {@link SpecialFieldBunch}. */
4646
public class DataEvolutionReadTest {
4747

48-
private SplitBunch blobBunch;
48+
private SpecialFieldBunch blobBunch;
4949

5050
@BeforeEach
5151
public void setUp() {
52-
blobBunch = new SplitBunch(Long.MAX_VALUE, false);
52+
blobBunch = new SpecialFieldBunch(Long.MAX_VALUE, false);
5353
}
5454

5555
@Test
@@ -223,7 +223,7 @@ public void testComplexBlobBunchScenario2() {
223223
splitFieldBunches(batch, file -> makeBlobRowType(file.writeCols(), f -> 0));
224224
assertThat(fieldBunches.size()).isEqualTo(2);
225225

226-
SplitBunch blobBunch = (SplitBunch) fieldBunches.get(1);
226+
SpecialFieldBunch blobBunch = (SpecialFieldBunch) fieldBunches.get(1);
227227
assertThat(blobBunch.files).hasSize(4);
228228
assertThat(blobBunch.files.get(0).fileName()).contains("blob5");
229229
assertThat(blobBunch.files.get(1).fileName()).contains("blob9");
@@ -275,14 +275,14 @@ public void testComplexBlobBunchScenario3() {
275275
batch, file -> makeBlobRowType(file.writeCols(), String::hashCode));
276276
assertThat(fieldBunches.size()).isEqualTo(3);
277277

278-
SplitBunch blobBunch = (SplitBunch) fieldBunches.get(1);
278+
SpecialFieldBunch blobBunch = (SpecialFieldBunch) fieldBunches.get(1);
279279
assertThat(blobBunch.files).hasSize(4);
280280
assertThat(blobBunch.files.get(0).fileName()).contains("blob5");
281281
assertThat(blobBunch.files.get(1).fileName()).contains("blob9");
282282
assertThat(blobBunch.files.get(2).fileName()).contains("blob7");
283283
assertThat(blobBunch.files.get(3).fileName()).contains("blob8");
284284

285-
blobBunch = (SplitBunch) fieldBunches.get(2);
285+
blobBunch = (SpecialFieldBunch) fieldBunches.get(2);
286286
assertThat(blobBunch.files).hasSize(4);
287287
assertThat(blobBunch.files.get(0).fileName()).contains("blob15");
288288
assertThat(blobBunch.files.get(1).fileName()).contains("blob19");
@@ -329,22 +329,22 @@ private DataFileMeta createBlobFileWithCols(
329329

330330
@Test
331331
public void testRowIdPushDown() {
332-
SplitBunch blobBunch = new SplitBunch(Long.MAX_VALUE, true);
332+
SpecialFieldBunch blobBunch = new SpecialFieldBunch(Long.MAX_VALUE, true);
333333
DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 1);
334334
DataFileMeta blobEntry2 = createBlobFile("blob2", 200, 300, 1);
335335
blobBunch.add(blobEntry1);
336-
SplitBunch finalBlobBunch = blobBunch;
336+
SpecialFieldBunch finalBlobBunch = blobBunch;
337337
DataFileMeta finalBlobEntry = blobEntry2;
338338
assertThatCode(() -> finalBlobBunch.add(finalBlobEntry)).doesNotThrowAnyException();
339339

340-
blobBunch = new SplitBunch(Long.MAX_VALUE, true);
340+
blobBunch = new SpecialFieldBunch(Long.MAX_VALUE, true);
341341
blobEntry1 = createBlobFile("blob1", 0, 100, 1);
342342
blobEntry2 = createBlobFile("blob2", 50, 200, 2);
343343
blobBunch.add(blobEntry1);
344344
blobBunch.add(blobEntry2);
345345
assertThat(blobBunch.files).containsExactlyInAnyOrder(blobEntry2);
346346

347-
SplitBunch finalBlobBunch2 = blobBunch;
347+
SpecialFieldBunch finalBlobBunch2 = blobBunch;
348348
DataFileMeta blobEntry3 = createBlobFile("blob2", 250, 100, 2);
349349
assertThatCode(() -> finalBlobBunch2.add(blobEntry3)).doesNotThrowAnyException();
350350
}

0 commit comments

Comments (0)