This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.
DataGen
DataGen SQL 连接器 #
Scan Source: 有界 Scan Source: 无界
DataGen 连接器允许基于内存生成数据来创建表。 在本地开发时,若不访问外部系统(如 Kafka),这会非常有用。 可以使用计算列语法灵活地生成记录。
DataGen 连接器是内置的,不需要额外的依赖项。
用法 #
默认情况下,DataGen 表将创建无限数量的行,每列都有一个随机值。 还可以指定总行数,从而生成有界表。
DataGen 连接器可以生成符合其 schema 的数据,应该注意的是,它按如下方式处理长度受限的字段:
- 对于固定长度的数据类型(char、binary),字段长度只能由 schema 定义,且不支持自定义;
- 对于可变长度数据类型 (varchar、varbinary),字段默认长度由 schema 定义,且自定义长度不能大于 schema 定义;
- 对于超长字段(string、bytes),字段默认长度为 100,但可以定义为小于 2^31 的长度。
还支持序列生成器,您可以指定序列的起始和结束值。 如果表中有任一列是序列类型,则该表将是有界的,并在第一个序列完成时结束。
时间类型字段对应的值始终是本地机器当前系统时间。
CREATE TABLE Orders (
order_number BIGINT,
price DECIMAL(32,2),
buyer ROW<first_name STRING, last_name STRING>,
order_time TIMESTAMP(3)
) WITH (
'connector' = 'datagen'
)
DataGen 连接器通常与 LIKE 子句结合使用,以模拟物理表。
CREATE TABLE Orders (
order_number BIGINT,
price DECIMAL(32,2),
buyer ROW<first_name STRING, last_name STRING>,
order_time TIMESTAMP(3)
) WITH (...)
-- create a bounded mock table
CREATE TEMPORARY TABLE GenOrders
WITH (
'connector' = 'datagen',
'number-of-rows' = '10'
)
LIKE Orders (EXCLUDING ALL)
此外,对于可变长度类型(varchar、string、varbinary 和 bytes),您可以指定是否生成可变长度的数据。
CREATE TABLE Orders (
order_number BIGINT,
price DECIMAL(32,2),
buyer ROW<first_name STRING, last_name STRING>,
order_time TIMESTAMP(3),
seller VARCHAR(150)
) WITH (
'connector' = 'datagen',
'fields.seller.var-len' = 'true'
)
对于集合类型,你可以指定集合的大小(元素个数)。
CREATE TABLE Orders (
f0 Array<INT>,
f1 Map<INT, STRING>,
f2 MULTISET<INT>
) WITH (
'connector' = 'datagen',
'fields.f0.length' = '10',
'fields.f1.length' = '11',
'fields.f2.length' = '12'
);
字段类型 #
| Type | Supported Generators | Notes |
|---|---|---|
| BOOLEAN | random | |
| CHAR | random / sequence | |
| VARCHAR | random / sequence | |
| BINARY | random / sequence | |
| VARBINARY | random / sequence | |
| STRING | random / sequence | |
| DECIMAL | random / sequence | |
| TINYINT | random / sequence | |
| SMALLINT | random / sequence | |
| INT | random / sequence | |
| BIGINT | random / sequence | |
| FLOAT | random / sequence | |
| DOUBLE | random / sequence | |
| DATE | random | 总是解析为本地机器的当前日期。 |
| TIME | random | 总是解析为本地机器的当前时间。 |
| TIMESTAMP | random | 解析为相对于本地机器的当前时间戳向过去偏移的时间戳。偏移的最大值可以通过 'max-past' 选项指定。 |
| TIMESTAMP_LTZ | random | 解析为相对于本地机器的当前时间戳向过去偏移的时间戳。偏移的最大值可以通过 'max-past' 选项指定。 |
| INTERVAL YEAR TO MONTH | random | |
| INTERVAL DAY TO MONTH | random | |
| ROW | random | 生成具有随机字段数据的行。 |
| ARRAY | random | 生成具有随机元素的数组。 |
| MAP | random | 生成具有随机元素的 Map。 |
| MULTISET | random | 生成具有随机元素的多重集。 |
连接器参数 #
| 参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
|---|---|---|---|---|
connector |
必须 | (none) | String | 指定要使用的连接器,这里是 'datagen'。 |
rows-per-second |
可选 | 10000 | Long | 每秒生成的行数,用以控制数据发出速率。 |
number-of-rows |
可选 | (none) | Long | 生成数据的总行数。默认情况下,该表是无界的。 |
scan.parallelism |
可选 | (none) | Integer | 定义算子并行度。不设置将使用全局默认并发。 |
fields.#.kind |
可选 | random | String | 指定 '#' 字段的生成器。可以是 'sequence' 或 'random'。 |
fields.#.min |
可选 | (Minimum value of type) | (Type of field) | 随机生成器的最小值,仅适用于数字类型。 |
fields.#.max |
可选 | (Maximum value of type) | (Type of field) | 随机生成器的最大值,仅适用于数字类型。 |
fields.#.max-past |
可选 | 0 | Duration | 对于 string/bytes 类型为 100,对于 array/map/multiset 类型为 3。 |
fields.#.length |
可选 | 100 | Integer | 随机生成器生成字符的长度,适用于 varchar、varbinary、string、bytes、array、map、multiset。 请注意对于可变长字段(varchar、varbinary),默认长度由 schema 定义,且长度不可设置为大于它; 对于超长字段(string、bytes),默认长度是 100 且可设置为小于 2^31 的长度; 对于结构化字段(数组、Map、多重集),默认元素数量为 3 且可以自定义。 |
fields.#.var-len |
可选 | false | Boolean | 是否生成变长数据,请注意只能用于变长类型(varchar、string、varbinary、bytes)。 |
fields.#.start |
可选 | (none) | (Type of field) | 序列生成器的起始值。 |
fields.#.end |
可选 | (none) | (Type of field) | 序列生成器的结束值。 |
fields.#.null-rate |
optional | 0 | (Type of field) | 空值比例。 |