TableEnvironment #
This document is an introduction to the TableEnvironment, the central concept of the Table API.
It includes detailed descriptions of the public interfaces of the TableEnvironment class.
Create a TableEnvironment #
The recommended way to create a TableEnvironment is to create it from an EnvironmentSettings object:
Java:
```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// create a streaming TableEnvironment
EnvironmentSettings settings = EnvironmentSettings
    .newInstance()
    .inStreamingMode()
    .build();

TableEnvironment tableEnv = TableEnvironment.create(settings);
```

Scala:
```scala
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

// create a streaming TableEnvironment
val settings = EnvironmentSettings
  .newInstance()
  .inStreamingMode()
  .build()

val tableEnv = TableEnvironment.create(settings)
```

Python:
```python
from pyflink.common import Configuration
from pyflink.table import EnvironmentSettings, TableEnvironment

# create a streaming TableEnvironment
config = Configuration()
config.set_string('execution.buffer-timeout', '1 min')
env_settings = EnvironmentSettings \
    .new_instance() \
    .in_streaming_mode() \
    .with_configuration(config) \
    .build()

table_env = TableEnvironment.create(env_settings)
```
Alternatively, users can create a StreamTableEnvironment from an existing StreamExecutionEnvironment to interoperate with the DataStream API.
Java:
```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// create a streaming TableEnvironment from a StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
```

Scala:
```scala
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

// create a streaming TableEnvironment from a StreamExecutionEnvironment
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)
```

Python:
```python
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

# create a streaming TableEnvironment from a StreamExecutionEnvironment
env = StreamExecutionEnvironment.get_execution_environment()
table_env = StreamTableEnvironment.create(env)
```
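Once a StreamTableEnvironment exists, tables and data streams can be converted back and forth. A minimal Python sketch, with made-up sample elements:

```python
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
table_env = StreamTableEnvironment.create(env)

# convert a DataStream into a Table and back again
ds = env.from_collection([(1, 'Alice'), (2, 'Bob')])
table = table_env.from_data_stream(ds)
ds_again = table_env.to_data_stream(table)
```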
TableEnvironment API #
Table/SQL Operations #
These APIs are used to create/remove Table API/SQL Tables and write queries:
| Java/Scala | Python | Description |
|---|---|---|
| `fromValues(values...)` | `from_elements(elements, schema)` | Creates a table from a collection of values. |
| N/A | `from_pandas(pdf, schema)` | Creates a table from a pandas DataFrame. |
| `from(path)` | `from_path(path)` | Creates a table from a registered table under the specified path. |
| `createTemporaryView(path, table)` | `create_temporary_view(path, table)` | Registers a Table as a temporary view, similar to SQL temporary views. |
| `dropTemporaryView(path)` | `drop_temporary_view(path)` | Drops a temporary view registered under the given path. |
| `createTemporaryTable(path, descriptor)` | `create_temporary_table(path, descriptor)` | Creates a temporary table from a TableDescriptor. |
| `createTable(path, descriptor)` | `create_table(path, descriptor)` | Creates a catalog table from a TableDescriptor. |
| `dropTemporaryTable(path)` | `drop_temporary_table(path)` | Drops a temporary table registered under the given path. |
| `dropTable(path)` | `drop_table(path)` | Drops a table registered under the given path. |
| `executeSql(stmt)` | `execute_sql(stmt)` | Executes the given SQL statement and returns the execution result. Supports DDL/DML/DQL/SHOW/DESCRIBE/EXPLAIN/USE statements. |
| `sqlQuery(query)` | `sql_query(query)` | Evaluates a SQL query and retrieves the result as a Table. |
See the Java API docs and Python API docs for the complete API reference.
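As a small end-to-end illustration of these methods, the Python sketch below builds a table from literal values, registers it as a temporary view, and queries it with SQL; the view name and sample rows are invented for the example:

```python
from pyflink.table import EnvironmentSettings, TableEnvironment

table_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# build a table from literal rows and register it as a temporary view
source = table_env.from_elements([(1, 'Alice'), (2, 'Bob')], ['id', 'name'])
table_env.create_temporary_view('people', source)

# query the view with SQL and print the result
result = table_env.sql_query("SELECT id, UPPER(name) AS name FROM people")
result.execute().print()
```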
Deprecated APIs

| Java/Scala | Python | Description | Replacement |
|---|---|---|---|
| `scan(tablePath...)` | `scan(*table_path)` | Scans a registered table from the catalog. | `from` / `from_path` |
| `registerTable(name, table)` | `register_table(name, table)` | Registers a Table under a unique name. | `createTemporaryView` / `create_temporary_view` |
| `insertInto(targetPath, table)` | `insert_into(target_path, table)` | Writes the content of a Table to a sink. | `Table.executeInsert()` / `Table.execute_insert()` |
| N/A | `sql_update(stmt)` | Evaluates a SQL statement. | `execute_sql` |
Execute/Explain Jobs #
These APIs are used to explain/execute jobs. Note that executeSql / execute_sql can also be used to execute jobs.
| Java/Scala | Python | Description |
|---|---|---|
| `explainSql(stmt, extraDetails...)` | `explain_sql(stmt, *extra_details)` | Returns the AST and execution plan of the specified statement. |
| `createStatementSet()` | `create_statement_set()` | Creates a StatementSet for executing multiple statements as a single job. |
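For instance, a StatementSet lets several INSERT statements be optimized and submitted as a single job. A Python sketch, using the built-in datagen and blackhole connectors so the placeholder tables actually run:

```python
from pyflink.table import EnvironmentSettings, TableEnvironment

table_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# placeholder tables backed by the built-in datagen/blackhole test connectors
table_env.execute_sql(
    "CREATE TABLE src (id BIGINT) WITH ('connector' = 'datagen', 'number-of-rows' = '10')")
table_env.execute_sql(
    "CREATE TABLE sink_a (id BIGINT) WITH ('connector' = 'blackhole')")
table_env.execute_sql(
    "CREATE TABLE sink_b (id BIGINT) WITH ('connector' = 'blackhole')")

# inspect the plan of a single statement
print(table_env.explain_sql("SELECT id FROM src"))

# bundle two INSERTs; they are optimized and submitted as one Flink job
stmt_set = table_env.create_statement_set()
stmt_set.add_insert_sql("INSERT INTO sink_a SELECT id FROM src")
stmt_set.add_insert_sql("INSERT INTO sink_b SELECT id * 2 FROM src")
stmt_set.execute().wait()
```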
Create/Drop User Defined Functions #
These APIs are used to register and remove user-defined functions (UDFs).
Note that executeSql / execute_sql can also be used to register/remove UDFs via SQL DDL.
For more details about UDFs, see User Defined Functions.
| Java/Scala | Python | Description |
|---|---|---|
| `createTemporaryFunction(path, class)` | `create_temporary_function(path, function)` | Registers a function as a temporary catalog function. |
| `createTemporarySystemFunction(name, class)` | `create_temporary_system_function(name, function)` | Registers a function as a temporary system function. |
| `createFunction(path, class)` | `create_java_function(path, function_class_name)` | Registers a function class as a catalog function. |
| N/A | `create_java_temporary_function(path, function_class_name)` | Registers a Java UDF class as a temporary catalog function. |
| N/A | `create_java_temporary_system_function(name, function_class_name)` | Registers a Java UDF class as a temporary system function. |
| `dropFunction(path)` | `drop_function(path)` | Drops a catalog function registered under the given path. |
| `dropTemporaryFunction(path)` | `drop_temporary_function(path)` | Drops a temporary function registered under the given path. |
| `dropTemporarySystemFunction(name)` | `drop_temporary_system_function(name)` | Drops a temporary system function registered under the given name. |
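As an illustration, the following Python sketch registers a trivial scalar function (invented for the example) as a temporary system function and calls it from SQL:

```python
from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
from pyflink.table.udf import udf

table_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# a trivial scalar function, made up for the example
add_one = udf(lambda i: i + 1, result_type=DataTypes.BIGINT())

# register it as a temporary system function, then call it from SQL
table_env.create_temporary_system_function("add_one", add_one)
table_env.execute_sql("SELECT add_one(CAST(41 AS BIGINT))").print()
```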
Dependency Management (Python only) #
These APIs are used to manage Python dependencies required by Python UDFs. See Dependency Management for more details.
| Method | Description |
|---|---|
| `add_python_file(file_path)` | Adds a Python dependency (file, package, or directory) to the PYTHONPATH of UDF workers. |
| `set_python_requirements(requirements_file_path, cache_dir)` | Specifies a requirements.txt file for third-party dependencies. |
| `add_python_archive(archive_path, target_dir)` | Adds a Python archive file to be extracted in the working directory of UDF workers. |
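A short Python sketch of typical usage; all paths below are placeholders:

```python
from pyflink.table import EnvironmentSettings, TableEnvironment

table_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# all paths below are placeholders for illustration
table_env.add_python_file("/path/to/my_util.py")
table_env.set_python_requirements("/path/to/requirements.txt", "/path/to/cache_dir")
table_env.add_python_archive("/path/to/venv.zip", "venv")
```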
Configuration #
A TableEnvironment exposes a TableConfig that holds the configuration of the current session:

Java:
```java
// get the TableConfig
TableConfig config = tableEnv.getConfig();

// set configuration options
config.set("parallelism.default", "8");
config.set("pipeline.name", "my_first_job");
```

Scala:
```scala
// get the TableConfig
val config = tableEnv.getConfig

// set configuration options
config.set("parallelism.default", "8")
config.set("pipeline.name", "my_first_job")
```

Python:
```python
# get the TableConfig
config = table_env.get_config()

# set configuration options
config.set("parallelism.default", "8")
config.set("pipeline.name", "my_first_job")
```

See Configuration and Python Configuration for all available options.
Catalog APIs #
These APIs are used to access catalogs and modules. See Modules and Catalogs for more details.
| Java/Scala | Python | Description |
|---|---|---|
| `registerCatalog(name, catalog)` | `register_catalog(name, catalog)` | Registers a Catalog under a unique name. |
| `getCatalog(name)` | `get_catalog(name)` | Gets a registered Catalog by name. |
| `useCatalog(name)` | `use_catalog(name)` | Sets the current catalog. |
| `getCurrentCatalog()` | `get_current_catalog()` | Gets the current default catalog name. |
| `useDatabase(name)` | `use_database(name)` | Sets the current default database. |
| `getCurrentDatabase()` | `get_current_database()` | Gets the current default database name. |
| `loadModule(name, module)` | `load_module(name, module)` | Loads a Module under a unique name. |
| `unloadModule(name)` | `unload_module(name)` | Unloads a Module with the given name. |
| `useModules(names...)` | `use_modules(*names)` | Enables and changes the resolution order of loaded modules. |
| `listCatalogs()` | `list_catalogs()` | Gets the names of all registered catalogs. |
| `listModules()` | `list_modules()` | Gets the names of all enabled modules. |
| N/A | `list_full_modules()` | Gets the names of all loaded modules (including disabled ones). |
| `listDatabases()` | `list_databases()` | Gets the names of all databases in the current catalog. |
| `listTables()` | `list_tables()` | Gets the names of all tables and views in the current database. |
| `listViews()` | `list_views()` | Gets the names of all views in the current database. |
| `listFunctions()` | `list_functions()` | Gets the names of all functions in this environment. |
| N/A | `list_user_defined_functions()` | Gets the names of all user-defined functions. |
| `listTemporaryTables()` | `list_temporary_tables()` | Gets the names of all temporary tables and views. |
| `listTemporaryViews()` | `list_temporary_views()` | Gets the names of all temporary views. |
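For example, a short Python sketch using Flink's built-in default catalog and database names:

```python
from pyflink.table import EnvironmentSettings, TableEnvironment

table_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# "default_catalog" and "default_database" are Flink's built-in defaults
table_env.use_catalog("default_catalog")
table_env.use_database("default_database")

print(table_env.get_current_catalog())   # default_catalog
print(table_env.get_current_database())  # default_database
print(table_env.list_tables())           # empty until tables are registered
```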
State Backend, Checkpointing, and Restart Strategy #
You can configure the state backend, checkpointing, and the restart strategy by setting key-value options in TableConfig.
See Fault Tolerance, State Backends, and Checkpointing for more details.
Java:
```java
TableConfig config = tableEnv.getConfig();

// set the restart strategy to "fixed-delay"
config.set("restart-strategy.type", "fixed-delay");
config.set("restart-strategy.fixed-delay.attempts", "3");
config.set("restart-strategy.fixed-delay.delay", "30s");

// set the checkpoint mode to EXACTLY_ONCE
config.set("execution.checkpointing.mode", "EXACTLY_ONCE");
config.set("execution.checkpointing.interval", "3min");

// set the state backend type to "rocksdb"
config.set("state.backend.type", "rocksdb");

// set the checkpoint directory
config.set("execution.checkpointing.dir", "file:///tmp/checkpoints/");
```

Scala:
```scala
val config = tableEnv.getConfig

// set the restart strategy to "fixed-delay"
config.set("restart-strategy.type", "fixed-delay")
config.set("restart-strategy.fixed-delay.attempts", "3")
config.set("restart-strategy.fixed-delay.delay", "30s")

// set the checkpoint mode to EXACTLY_ONCE
config.set("execution.checkpointing.mode", "EXACTLY_ONCE")
config.set("execution.checkpointing.interval", "3min")

// set the state backend type to "rocksdb"
config.set("state.backend.type", "rocksdb")

// set the checkpoint directory
config.set("execution.checkpointing.dir", "file:///tmp/checkpoints/")
```

Python:
```python
config = table_env.get_config()

# set the restart strategy to "fixed-delay"
config.set("restart-strategy.type", "fixed-delay")
config.set("restart-strategy.fixed-delay.attempts", "3")
config.set("restart-strategy.fixed-delay.delay", "30s")

# set the checkpoint mode to EXACTLY_ONCE
config.set("execution.checkpointing.mode", "EXACTLY_ONCE")
config.set("execution.checkpointing.interval", "3min")

# set the state backend type to "rocksdb"
config.set("state.backend.type", "rocksdb")

# set the checkpoint directory
config.set("execution.checkpointing.dir", "file:///tmp/checkpoints/")
```