OpenAI #
The OpenAI Model Function allows Flink SQL to call the OpenAI API for inference tasks.
Overview #
The function supports calling remote OpenAI model services via Flink SQL for prediction/inference tasks. Currently, the following tasks are supported:
- Chat Completions: generate a model response from a list of messages comprising a conversation.
- Embeddings: get a vector representation of a given input that can be easily consumed by machine learning models and algorithms.
Usage examples #
The following example creates a chat completions model and uses it to predict sentiment labels for movie reviews.
First, create the chat completions model with the following SQL statement:
CREATE MODEL ai_analyze_sentiment
INPUT (`input` STRING)
OUTPUT (`content` STRING)
WITH (
'provider' = 'openai',
'endpoint' = 'https://api.openai.com/v1/chat/completions',
'api-key' = '<YOUR KEY>',
'model' = 'gpt-3.5-turbo',
'system-prompt' = 'Classify the text below into one of the following labels: [positive, negative, neutral, mixed]. Output only the label.'
);
Suppose the following data is stored in a table named movie_comment, and the prediction result is to be stored in a table named print_sink:
CREATE TEMPORARY VIEW movie_comment(id, movie_name, user_comment, actual_label)
AS VALUES
(1, 'Good Stuff', 'The part where children guess the sounds is my favorite. It''s a very romantic narrative compared to other movies I''ve seen. Very gentle and full of love.', 'positive');
CREATE TEMPORARY TABLE print_sink(
id BIGINT,
movie_name VARCHAR,
predict_label VARCHAR,
actual_label VARCHAR
) WITH (
'connector' = 'print'
);
Then the following SQL statement can be used to predict sentiment labels for movie reviews:
INSERT INTO print_sink
SELECT id, movie_name, content AS predict_label, actual_label
FROM ML_PREDICT(
TABLE movie_comment,
MODEL ai_analyze_sentiment,
DESCRIPTOR(user_comment));
Model Options #
Common #
| Key | Default | Type | Description |
|---|---|---|---|
| api-key | (none) | String | OpenAI API key for authentication. |
| context-overflow-action | truncated-tail | Enum | Action to handle context overflows. |
| endpoint | (none) | String | Full URL of the OpenAI API endpoint, e.g., https://api.openai.com/v1/chat/completions or https://api.openai.com/v1/embeddings. |
| error-handling-strategy | RETRY | Enum | Strategy for handling errors during model requests. Possible values include RETRY and IGNORE. |
| max-context-size | (none) | Integer | Maximum number of tokens for the context. context-overflow-action is triggered if this threshold is exceeded. |
| model | (none) | String | Model name, e.g., gpt-3.5-turbo, text-embedding-ada-002. |
| retry-fallback-strategy | FAILOVER | Enum | Fallback strategy to employ once retry attempts are exhausted. Only applies when error-handling-strategy is set to RETRY. |
| retry-num | 100 | Integer | Number of retries for OpenAI client requests. |
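These options are set in the WITH clause of CREATE MODEL, just like the provider and endpoint. The following sketch configures retry behavior and context handling for the sentiment model from the example above; the model name and the specific values are illustrative, not recommendations:
CREATE MODEL ai_analyze_sentiment_with_retries
INPUT (`input` STRING)
OUTPUT (`content` STRING)
WITH (
'provider' = 'openai',
'endpoint' = 'https://api.openai.com/v1/chat/completions',
'api-key' = '<YOUR KEY>',
'model' = 'gpt-3.5-turbo',
-- retry failed requests up to 10 times before applying the fallback strategy
'error-handling-strategy' = 'RETRY',
'retry-num' = '10',
-- truncate the tail of the context once it exceeds 4096 tokens
'max-context-size' = '4096',
'context-overflow-action' = 'truncated-tail'
);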
Chat Completions #
| Key | Default | Type | Description |
|---|---|---|---|
| max-tokens | (none) | Long | The maximum number of tokens that can be generated in the chat completion. |
| n | (none) | Long | How many chat completion choices to generate for each input message. Note that you will be charged based on the number of generated tokens across all of the choices. Keep n at 1 to minimize costs. |
| presence-penalty | (none) | Double | Number between -2.0 and 2.0. Positive values penalize new tokens based on whether they appear in the text so far, increasing the model's likelihood to talk about new topics. |
| response-format | (none) | Enum | The format of the response, e.g., 'text' or 'json_object'. |
| seed | (none) | Long | If specified, the model platform will make a best effort to sample deterministically, so that repeated requests with the same seed and parameters return the same result. Determinism is not guaranteed. |
| stop | (none) | String | A CSV list of strings to pass as stop sequences to the model. |
| system-prompt | "You are a helpful assistant." | String | The system message of a chat. |
| temperature | (none) | Double | Controls the randomness or "creativity" of the output. Typical values are between 0.0 and 1.0. |
| top-p | (none) | Double | The probability cutoff for token selection. Usually either temperature or top-p is specified, but not both. |
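As a sketch of how the chat-specific options combine with the common ones, the sentiment model from the usage example could be tuned for shorter, more deterministic outputs as follows (the model name and option values are illustrative):
CREATE MODEL ai_analyze_sentiment_deterministic
INPUT (`input` STRING)
OUTPUT (`content` STRING)
WITH (
'provider' = 'openai',
'endpoint' = 'https://api.openai.com/v1/chat/completions',
'api-key' = '<YOUR KEY>',
'model' = 'gpt-3.5-turbo',
'system-prompt' = 'Classify the text below into one of the following labels: [positive, negative, neutral, mixed]. Output only the label.',
-- illustrative tuning: low randomness, fixed seed, short responses
'temperature' = '0.0',
'seed' = '42',
'max-tokens' = '16'
);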
Embeddings #
| Key | Default | Type | Description |
|---|---|---|---|
| dimension | (none) | Long | The size of the embedding result array. |
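There is no end-to-end embeddings example above, so here is a minimal sketch mirroring the chat example. The model and output column names are illustrative; per the schema requirements below, the output column must be of type ARRAY<FLOAT>:
CREATE MODEL ai_embed_text
INPUT (`input` STRING)
OUTPUT (`embedding` ARRAY<FLOAT>)
WITH (
'provider' = 'openai',
'endpoint' = 'https://api.openai.com/v1/embeddings',
'api-key' = '<YOUR KEY>',
'model' = 'text-embedding-ada-002'
);

-- compute an embedding for each movie comment
SELECT id, embedding
FROM ML_PREDICT(
TABLE movie_comment,
MODEL ai_embed_text,
DESCRIPTOR(user_comment));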
Schema Requirement #
The following table lists the schema requirements for each task.
| Task | Input Type | Output Type |
|---|---|---|
| Chat Completions | STRING | STRING |
| Embeddings | STRING | ARRAY<FLOAT> |
Available Metadata #
When configuring error-handling-strategy as ignore, you can choose to additionally specify the
following metadata columns to surface information about failures into your stream.
- error-string (STRING): A message associated with the error
- http-status-code (INT): The HTTP status code
- http-headers-map (MAP<STRING, ARRAY<STRING>>): The headers returned with the response
If you define these metadata columns in the output schema but the call does not fail, the columns are filled with null values.