This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.
向量搜素
向量搜索 #
Batch Streaming
Flink SQL 提供了 VECTOR_SEARCH 表值函数 (TVF) 来在 SQL 查询中执行向量搜索。该函数允许您根据高维向量搜索相似的行。
VECTOR_SEARCH 函数 #
VECTOR_SEARCH 使用处理时间属性 (processing-time attribute) 将行与外部表中的最新版本数据关联起来。它与 Flink SQL 中的 lookup-join 非常相似,但区别在于 VECTOR_SEARCH 使用输入数据向量与外部表中的数据比较相似度,并返回 top-k 个最相似的行。
语法 #
SELECT * FROM input_table, LATERAL TABLE(VECTOR_SEARCH(
TABLE vector_table,
input_table.vector_column,
DESCRIPTOR(index_column),
top_k,
[CONFIG => MAP['key', 'value']]
))
参数 #
input_table: 包含待处理数据的输入表。vector_table: 允许通过向量进行搜索的外部表的名称。vector_column: 输入表中的列名,其类型应为 FLOAT ARRAY 或 DOUBLE ARRAY。index_column: 一个描述符 (descriptor),指定应使用向量表 (vector_table) 中的哪一列与输入数据进行相似度比较。top_k: 要返回的 top-k 个最相似行的数量。config: (可选) 用于向量搜索的配置选项。
配置选项 #
可以在 config map 中指定以下配置选项:
| Key | Default | Type | Description |
|---|---|---|---|
async |
(none) | Boolean | Value can be 'true' or 'false' to suggest the planner choose the corresponding predict function. If the backend search function provider does not support the suggested mode, it will throw exception to notify users. |
max-concurrent-operations |
(none) | Integer | The max number of async i/o operation that the async vector search call can trigger. |
output-mode |
(none) | Enum |
Output mode for asynchronous vector search call operations which will convert to {@see AsyncDataStream.OutputMode}, ORDERED by default. If set to ALLOW_UNORDERED, will attempt to use {@see AsyncDataStream.OutputMode.UNORDERED} when it does not affect the correctness of the result, otherwise ORDERED will be still used. Possible values:
|
timeout |
(none) | Duration | Timeout from first invoke to final completion of asynchronous vector search call operation, may include multiple retries, and will be reset in case of failover. |
示例 #
-- 基本用法
SELECT * FROM
input_table, LATERAL TABLE(VECTOR_SEARCH(
TABLE vector_table,
input_table.vector_column,
DESCRIPTOR(index_column),
10
));
-- 带配置选项
SELECT * FROM
input_table, LATERAL TABLE(VECTOR_SEARCH(
TABLE vector_table,
input_table.vector_column,
DESCRIPTOR(index_column),
10,
MAP['async', 'true', 'timeout', '100s']
));
-- 使用命名参数
SELECT * FROM
input_table, LATERAL TABLE(VECTOR_SEARCH(
SEARCH_TABLE => TABLE vector_table,
COLUMN_TO_QUERY => input_table.vector_column,
COLUMN_TO_SEARCH => DESCRIPTOR(index_column),
TOP_K => 10,
CONFIG => MAP['async', 'true', 'timeout', '100s']
));
-- 使用常量值搜索
SELECT * FROM TABLE(VECTOR_SEARCH(
TABLE vector_table,
ARRAY[10, 20],
DESCRIPTOR(index_column),
10
));
输出 #
输出表包含输入表的所有列、向量搜索表 (vector search table) 的列,以及一个名为 score 的列,用于表示输入行与匹配行之间的相似度。
注意事项 #
- 向量表 (vector table) 的实现必须实现
org.apache.flink.table.connector.source.VectorSearchTableSource接口。详情请参阅 Vector Search Table Source。 VECTOR_SEARCH仅支持读取仅 append-only 表。- 当函数调用与其它表没有关联时,
VECTOR_SEARCH不需要LATERAL关键字。例如,如果搜索列是一个常量或字面值 (literal value),LATERAL可以被省略。