
Commit c014f39

update vector-field config
1 parent 0148245 commit c014f39

22 files changed: +316 -320 lines


docs/layouts/shortcodes/generated/core_configuration.html

Lines changed: 1 addition & 1 deletion
@@ -1498,7 +1498,7 @@
 <td><h5>vector-field</h5></td>
 <td style="word-wrap: break-word;">(none)</td>
 <td>String</td>
-<td>Specify the vector store fields.</td>
+<td>Specifies column names that should be stored as vector type. This is used when you want to treat an ARRAY column as a VECTOR.</td>
 </tr>
 <tr>
 <td><h5>vector.file.format</h5></td>

paimon-api/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 19 additions & 10 deletions
@@ -2310,11 +2310,13 @@ public InlineElement getDescription() {
                 .noDefaultValue()
                 .withDescription("Specify the vector store file format.");

-    public static final ConfigOption<String> VECTOR_FIELDS =
+    public static final ConfigOption<String> VECTOR_FIELD =
             key("vector-field")
                     .stringType()
                     .noDefaultValue()
-                    .withDescription("Specify the vector store fields.");
+                    .withDescription(
+                            "Specifies column names that should be stored as vector type. "
+                                    + "This is used when you want to treat an ARRAY column as a VECTOR.");

     public static final ConfigOption<MemorySize> VECTOR_TARGET_FILE_SIZE =
             key("vector.target-file-size")
@@ -3642,20 +3644,27 @@ public Duration visibilityCallbackCheckInterval() {
         return options.get(VISIBILITY_CALLBACK_CHECK_INTERVAL);
     }

-    public String vectorStoreFileFormatString() {
+    public String vectorFileFormatString() {
         return normalizeFileFormat(options.get(VECTOR_FILE_FORMAT));
     }

-    public List<String> vectorStoreFieldNames() {
-        String vectorStoreFields = options.get(CoreOptions.VECTOR_FIELDS);
-        if (vectorStoreFields == null || vectorStoreFields.trim().isEmpty()) {
-            return new ArrayList<>();
-        } else {
-            return Arrays.asList(vectorStoreFields.trim().split(","));
+    public Set<String> vectorField() {
+        String vectorFields = options.get(CoreOptions.VECTOR_FIELD);
+        if (vectorFields == null || vectorFields.trim().isEmpty()) {
+            return Collections.emptySet();
+        }
+        return Arrays.stream(vectorFields.trim().split(",")).collect(Collectors.toSet());
+    }
+
+    public static Set<String> vectorField(Map<String, String> options) {
+        String vectorFields = options.getOrDefault(CoreOptions.VECTOR_FIELD.key(), null);
+        if (vectorFields == null || vectorFields.trim().isEmpty()) {
+            return Collections.emptySet();
         }
+        return Arrays.stream(vectorFields.trim().split(",")).collect(Collectors.toSet());
     }

-    public long vectorStoreTargetFileSize() {
+    public long vectorTargetFileSize() {
         // Since vectors are large, it would be better to set a larger target size for vectors.
         return options.getOptional(VECTOR_TARGET_FILE_SIZE)
                 .map(MemorySize::getBytes)

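As a side note, here is a minimal sketch (not part of this commit) of how the renamed option and its new static parser fit together; the column names "embedding" and "feature" are hypothetical:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;

    import org.apache.paimon.CoreOptions;

    public class VectorFieldSketch {
        public static void main(String[] args) {
            // Hypothetical table options: treat two ARRAY columns as VECTOR.
            Map<String, String> tableOptions = new HashMap<>();
            tableOptions.put("vector-field", "embedding,feature");

            // The static overload added in this diff parses the comma-separated
            // value of 'vector-field' into a set of column names.
            Set<String> vectorFields = CoreOptions.vectorField(tableOptions);
            System.out.println(vectorFields); // e.g. [embedding, feature]
        }
    }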
paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java

Lines changed: 4 additions & 7 deletions
@@ -107,15 +107,12 @@ public static FileFormat fileFormat(CoreOptions options) {
         return FileFormat.fromIdentifier(options.fileFormatString(), options.toConfiguration());
     }

-    public static FileFormat vectorStoreFileFormat(CoreOptions options) {
-        if (options.vectorStoreFieldNames().isEmpty()) {
-            return null;
-        }
-        String vectorStoreFileFormat = options.vectorStoreFileFormatString();
-        if (vectorStoreFileFormat == null) {
+    public static FileFormat vectorFileFormat(CoreOptions options) {
+        String vectorFileFormat = options.vectorFileFormatString();
+        if (vectorFileFormat == null) {
             return fileFormat(options);
         }
-        return FileFormat.fromIdentifier(vectorStoreFileFormat, options.toConfiguration());
+        return FileFormat.fromIdentifier(vectorFileFormat, options.toConfiguration());
     }

     public static FileFormat manifestFormat(CoreOptions options) {

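Worth noting: vectorFileFormat no longer returns null when no vector fields are configured; it always resolves a format, falling back to the table format when vector.file.format is unset (the empty-field check appears to have moved to callers via VectorStoreUtils.fieldsInVectorFile). A minimal sketch of the fallback, assuming CoreOptions can be built from a plain options map and the parquet format factory is on the classpath:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.paimon.CoreOptions;
    import org.apache.paimon.format.FileFormat;

    public class VectorFormatFallbackSketch {
        public static void main(String[] args) {
            Map<String, String> conf = new HashMap<>();
            conf.put("file.format", "parquet"); // base table format
            // 'vector.file.format' deliberately left unset.

            CoreOptions options = new CoreOptions(conf);
            // With no explicit vector format, vectorFileFormat falls back
            // to fileFormat(options) instead of returning null.
            FileFormat format = FileFormat.vectorFileFormat(options);
            System.out.println(format.getFormatIdentifier()); // parquet
        }
    }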
paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java

Lines changed: 35 additions & 38 deletions
@@ -77,11 +77,10 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner {
     private final FileIO fileIO;
     private final long schemaId;
     private final FileFormat fileFormat;
-    private final FileFormat vectorStoreFileFormat;
-    private final List<String> vectorStoreFieldNames;
+    private final FileFormat vectorFileFormat;
     private final long targetFileSize;
     private final long blobTargetFileSize;
-    private final long vectorStoreTargetFileSize;
+    private final long vectorTargetFileSize;
     private final RowType writeSchema;
     @Nullable private final List<String> writeCols;
     private final DataFilePathFactory pathFactory;
@@ -112,11 +111,10 @@ public AppendOnlyWriter(
             @Nullable IOManager ioManager,
             long schemaId,
             FileFormat fileFormat,
-            FileFormat vectorStoreFileFormat,
-            List<String> vectorStoreFieldNames,
+            FileFormat vectorFileFormat,
             long targetFileSize,
             long blobTargetFileSize,
-            long vectorStoreTargetFileSize,
+            long vectorTargetFileSize,
             RowType writeSchema,
             @Nullable List<String> writeCols,
             long maxSequenceNumber,
@@ -139,11 +137,10 @@ public AppendOnlyWriter(
         this.fileIO = fileIO;
         this.schemaId = schemaId;
         this.fileFormat = fileFormat;
-        this.vectorStoreFileFormat = vectorStoreFileFormat;
-        this.vectorStoreFieldNames = vectorStoreFieldNames;
+        this.vectorFileFormat = vectorFileFormat;
         this.targetFileSize = targetFileSize;
         this.blobTargetFileSize = blobTargetFileSize;
-        this.vectorStoreTargetFileSize = vectorStoreTargetFileSize;
+        this.vectorTargetFileSize = vectorTargetFileSize;
         this.writeSchema = writeSchema;
         this.writeCols = writeCols;
         this.pathFactory = pathFactory;
@@ -317,33 +314,38 @@ public void toBufferedWriter() throws Exception {
     }

     private RollingFileWriter<InternalRow, DataFileMeta> createRollingRowWriter() {
-        boolean hasBlob = (blobContext != null);
-        List<DataField> fieldsInBlobFile =
-                hasBlob
-                        ? fieldsInBlobFile(writeSchema, blobContext.blobDescriptorFields())
-                        : Collections.emptyList();
-        Set<String> blobFieldNames =
-                fieldsInBlobFile.stream().map(DataField::name).collect(Collectors.toSet());
-        boolean hasVectorStore = !vectorStoreFieldNames.isEmpty();
-        boolean hasNormal =
-                writeSchema.getFields().stream()
-                        .anyMatch(
-                                f ->
-                                        !blobFieldNames.contains(f.name())
-                                                && !vectorStoreFieldNames.contains(f.name()));
-        boolean hasSeparatedVectorStore =
-                VectorStoreUtils.isDifferentFormat(vectorStoreFileFormat, fileFormat);
-
-        if (hasBlob || (hasNormal && hasVectorStore && hasSeparatedVectorStore)) {
+        boolean hasNormal, hasBlob, hasVectorStore;
+        {
+            hasBlob = (blobContext != null);
+
+            List<DataField> fieldsInVectorFile =
+                    VectorStoreUtils.fieldsInVectorFile(writeSchema, fileFormat, vectorFileFormat);
+            Set<String> vectorFieldNames =
+                    fieldsInVectorFile.stream().map(DataField::name).collect(Collectors.toSet());
+            hasVectorStore = !fieldsInVectorFile.isEmpty();
+
+            List<DataField> fieldsInBlobFile =
+                    hasBlob
+                            ? fieldsInBlobFile(writeSchema, blobContext.blobDescriptorFields())
+                            : Collections.emptyList();
+            Set<String> blobFieldNames =
+                    fieldsInBlobFile.stream().map(DataField::name).collect(Collectors.toSet());
+            hasNormal =
+                    writeSchema.getFields().stream()
+                            .anyMatch(
+                                    f ->
+                                            !blobFieldNames.contains(f.name())
+                                                    && !vectorFieldNames.contains(f.name()));
+        }
+        if (hasBlob || (hasNormal && hasVectorStore)) {
             return new DataEvolutionRollingFileWriter(
                     fileIO,
                     schemaId,
                     fileFormat,
-                    vectorStoreFileFormat,
-                    vectorStoreFieldNames,
+                    vectorFileFormat,
                     targetFileSize,
                     blobTargetFileSize,
-                    vectorStoreTargetFileSize,
+                    vectorTargetFileSize,
                     writeSchema,
                     pathFactory,
                     seqNumCounterProvider,
@@ -354,20 +356,15 @@ private RollingFileWriter<InternalRow, DataFileMeta> createRollingRowWriter() {
                     statsDenseStore,
                     blobContext);
         }
-        FileFormat realFileFormat = hasNormal ? fileFormat : vectorStoreFileFormat;
-        long realTargetFileSize = hasNormal ? targetFileSize : vectorStoreTargetFileSize;
-        DataFilePathFactory realPathFactory =
-                hasNormal
-                        ? pathFactory
-                        : pathFactory.vectorStorePathFactory(
-                                vectorStoreFileFormat.getFormatIdentifier());
+        FileFormat realFileFormat = hasNormal ? fileFormat : vectorFileFormat;
+        long realTargetFileSize = hasNormal ? targetFileSize : vectorTargetFileSize;
         return new RowDataRollingFileWriter(
                 fileIO,
                 schemaId,
                 realFileFormat,
                 realTargetFileSize,
                 writeSchema,
-                realPathFactory,
+                pathFactory,
                 seqNumCounterProvider,
                 fileCompression,
                 statsCollectorFactories.statsCollectors(writeSchema.getFieldNames()),

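The restructured selection logic above is easier to see in isolation: the multi-file DataEvolutionRollingFileWriter is needed only when blob columns exist, or when normal and vector columns must be split across separate files. A toy sketch with hypothetical column names, using plain strings in place of DataField:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class WriterSelectionSketch {
        public static void main(String[] args) {
            List<String> writeSchema = Arrays.asList("id", "name", "embedding");
            Set<String> vectorFields = new HashSet<>(Collections.singletonList("embedding"));
            Set<String> blobFields = Collections.emptySet(); // no blob columns here

            boolean hasBlob = !blobFields.isEmpty();
            boolean hasVectorStore = !vectorFields.isEmpty();
            boolean hasNormal =
                    writeSchema.stream()
                            .anyMatch(f -> !blobFields.contains(f) && !vectorFields.contains(f));

            // Mirrors the condition in the diff above.
            if (hasBlob || (hasNormal && hasVectorStore)) {
                System.out.println("DataEvolutionRollingFileWriter"); // printed for this schema
            } else {
                System.out.println("RowDataRollingFileWriter");
            }
        }
    }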
paimon-core/src/main/java/org/apache/paimon/append/DataEvolutionRollingFileWriter.java

Lines changed: 28 additions & 25 deletions
@@ -36,7 +36,6 @@
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.LongCounter;
-import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.StatsCollectorFactories;
 import org.apache.paimon.utils.VectorStoreUtils;
@@ -52,10 +51,11 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;

 import static org.apache.paimon.types.BlobType.fieldsInBlobFile;
-import static org.apache.paimon.types.BlobType.fieldsNotInBlobFile;

 /**
  * A rolling file writer that handles both normal data and blob data. This writer creates separate
@@ -114,11 +114,10 @@ public DataEvolutionRollingFileWriter(
             FileIO fileIO,
             long schemaId,
             FileFormat fileFormat,
-            FileFormat vectorStoreFileFormat,
-            List<String> vectorStoreFieldNames,
+            FileFormat vectorFileFormat,
             long targetFileSize,
             long blobTargetFileSize,
-            long vectorStoreTargetFileSize,
+            long vectorTargetFileSize,
             RowType writeSchema,
             DataFilePathFactory pathFactory,
             Supplier<LongCounter> seqNumCounterSupplier,
@@ -140,20 +139,22 @@ public DataEvolutionRollingFileWriter(
         boolean asyncFileWrite = false;

         // Split into normal, blob, and vector-store parts
-        List<DataField> fieldsInNormalFile, fieldsInBlobFile, fieldsInVectorFile;
+        List<DataField> fieldsInBlobFile =
+                (context == null)
+                        ? Collections.emptyList()
+                        : fieldsInBlobFile(writeSchema, context.blobDescriptorFields());
+        List<DataField> fieldsInVectorFile =
+                VectorStoreUtils.fieldsInVectorFile(writeSchema, fileFormat, vectorFileFormat);
+        List<DataField> fieldsInNormalFile = new ArrayList<>();
         {
-            fieldsInBlobFile = fieldsInBlobFile(writeSchema, context.blobDescriptorFields());
-            List<DataField> fieldsNotInBlobFile =
-                    fieldsNotInBlobFile(writeSchema, context.blobDescriptorFields());
-            if (VectorStoreUtils.isDifferentFormat(vectorStoreFileFormat, fileFormat)) {
-                Pair<RowType, RowType> typeWithVectorStore =
-                        VectorStoreUtils.splitVectorStore(
-                                fieldsNotInBlobFile, vectorStoreFieldNames);
-                fieldsInNormalFile = typeWithVectorStore.getLeft().getFields();
-                fieldsInVectorFile = typeWithVectorStore.getRight().getFields();
-            } else {
-                fieldsInNormalFile = fieldsNotInBlobFile;
-                fieldsInVectorFile = Collections.emptyList();
+            Set<String> fieldsNotInNormalFile =
+                    fieldsInBlobFile.stream().map(DataField::name).collect(Collectors.toSet());
+            fieldsNotInNormalFile.addAll(
+                    fieldsInVectorFile.stream().map(DataField::name).collect(Collectors.toSet()));
+            for (DataField field : writeSchema.getFields()) {
+                if (!fieldsNotInNormalFile.contains(field.name())) {
+                    fieldsInNormalFile.add(field);
+                }
             }
         }

@@ -195,7 +196,7 @@ public DataEvolutionRollingFileWriter(
         }

         // Initialize writer for descriptor fields backed by external storage if needed.
-        if (!context.blobExternalStorageFields().isEmpty()) {
+        if ((context != null) && !context.blobExternalStorageFields().isEmpty()) {
             this.externalStorageBlobWriter =
                     new ExternalStorageBlobWriter(
                             fileIO,
@@ -215,22 +216,24 @@ public DataEvolutionRollingFileWriter(

         // Initialize vector-store writer
         if (!fieldsInVectorFile.isEmpty()) {
+            List<String> vectorFieldNames =
+                    fieldsInVectorFile.stream().map(DataField::name).collect(Collectors.toList());
             this.vectorStoreWriterFactory =
                     () ->
                             createVectorStoreWriter(
                                     fileIO,
                                     schemaId,
-                                    vectorStoreFileFormat,
+                                    vectorFileFormat,
                                     fieldsInVectorFile,
                                     writeSchema,
                                     pathFactory,
                                     seqNumCounterSupplier,
                                     fileCompression,
-                                    statsCollectorFactories.statsCollectors(vectorStoreFieldNames),
+                                    statsCollectorFactories.statsCollectors(vectorFieldNames),
                                     fileSource,
                                     asyncFileWrite,
                                     statsDenseStore,
-                                    vectorStoreTargetFileSize);
+                                    vectorTargetFileSize);
         } else {
             this.vectorStoreWriterFactory = null;
         }
@@ -286,7 +289,7 @@ public DataEvolutionRollingFileWriter(
             createVectorStoreWriter(
                     FileIO fileIO,
                     long schemaId,
-                    FileFormat vectorStoreFileFormat,
+                    FileFormat vectorFileFormat,
                     List<DataField> fieldsInVectorStore,
                     RowType writeSchema,
                     DataFilePathFactory pathFactory,
@@ -302,14 +305,14 @@ public DataEvolutionRollingFileWriter(
         int[] vectorStoreProjection = writeSchema.projectIndexes(vectorStoreColumnNames);

         DataFilePathFactory vectorStorePathFactory =
-                pathFactory.vectorStorePathFactory(vectorStoreFileFormat.getFormatIdentifier());
+                pathFactory.vectorStorePathFactory(vectorFileFormat.getFormatIdentifier());
         return new ProjectedFileWriter<>(
                 new RollingFileWriterImpl<>(
                         () ->
                                 new RowDataFileWriter(
                                         fileIO,
                                         RollingFileWriter.createFileWriterContext(
-                                                vectorStoreFileFormat,
+                                                vectorFileFormat,
                                                 vectorStoreRowType,
                                                 statsCollectors,
                                                 fileCompression),

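The constructor now derives the normal-file field list as the complement of the blob and vector field sets, replacing the removed Pair-based splitVectorStore path. The same complement computation reduced to plain strings (schema and column names hypothetical):

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;
    import java.util.stream.Collectors;

    public class FieldPartitionSketch {
        public static void main(String[] args) {
            List<String> writeSchema = Arrays.asList("id", "name", "embedding", "image");
            List<String> fieldsInBlobFile = Collections.singletonList("image");
            List<String> fieldsInVectorFile = Collections.singletonList("embedding");

            // Everything that is neither a blob column nor a vector column
            // lands in the normal data file.
            Set<String> fieldsNotInNormalFile = new HashSet<>(fieldsInBlobFile);
            fieldsNotInNormalFile.addAll(fieldsInVectorFile);
            List<String> fieldsInNormalFile =
                    writeSchema.stream()
                            .filter(f -> !fieldsNotInNormalFile.contains(f))
                            .collect(Collectors.toList());

            System.out.println(fieldsInNormalFile); // [id, name]
        }
    }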
paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTask.java

Lines changed: 9 additions & 5 deletions
@@ -33,6 +33,7 @@
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.CommitMessageImpl;
 import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataTypeRoot;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileStorePathFactory;
@@ -43,6 +44,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.stream.Collectors;

 import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -74,12 +76,14 @@ public CommitMessage doCompact(FileStoreTable table, String commitUser) throws E
             // TODO: support vector-store file compaction
             throw new UnsupportedOperationException("Vector-store task is not supported");
         }
-        List<String> separatedVectorStoreFields =
-                VectorStoreUtils.isDifferentFormat(
-                                FileFormat.vectorStoreFileFormat(table.coreOptions()),
+        Set<String> separatedVectorStoreFields =
+                VectorStoreUtils.fieldsInVectorFile(
+                                table.rowType(),
+                                FileFormat.vectorFileFormat(table.coreOptions()),
                                 FileFormat.fileFormat(table.coreOptions()))
-                        ? table.coreOptions().vectorStoreFieldNames()
-                        : Collections.emptyList();
+                        .stream()
+                        .map(DataField::name)
+                        .collect(Collectors.toSet());

         table = table.copy(DYNAMIC_WRITE_OPTIONS);
         long firstRowId = compactBefore.get(0).nonNullFirstRowId();
long firstRowId = compactBefore.get(0).nonNullFirstRowId();

paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java

Lines changed: 1 addition & 1 deletion
@@ -264,7 +264,7 @@ private VectorStoreWrapper(DataFilePathFactory base, String formatIdentifier) {

         @Override
         protected String makeExtension(String compressExtension, String formatIdentifier) {
-            return ".vector-store" + super.makeExtension(compressExtension, formatIdentifier);
+            return ".vector" + super.makeExtension(compressExtension, formatIdentifier);
         }
     }
 }

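In effect, vector-store data files written after this change carry an extension like .vector.parquet instead of .vector-store.parquet; the exact file name shape (prefix, UUID, compression suffix) depends on the surrounding path factory, so treat that as illustrative.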
paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java

Lines changed: 2 additions & 3 deletions
@@ -131,11 +131,10 @@ protected RecordWriter<InternalRow> createWriter(
                 ioManager,
                 schemaId,
                 fileFormat,
-                FileFormat.vectorStoreFileFormat(options),
-                options.vectorStoreFieldNames(),
+                FileFormat.vectorFileFormat(options),
                 options.targetFileSize(false),
                 options.blobTargetFileSize(),
-                options.vectorStoreTargetFileSize(),
+                options.vectorTargetFileSize(),
                 writeType,
                 writeCols,
                 restoredMaxSeqNumber,
