OpenAI #

The OpenAI Model Function allows Flink SQL to call the OpenAI API for inference tasks.

Overview #

The function calls remote OpenAI model services from Flink SQL for prediction and inference tasks. The following tasks are currently supported:

  • Chat Completions: generate a model response from a list of messages comprising a conversation.
  • Embeddings: get a vector representation of a given input that can be easily consumed by machine learning models and algorithms.

Usage examples #

The following example creates a chat completions model and uses it to predict sentiment labels for movie reviews.

First, create the chat completions model with the following SQL statement:

CREATE MODEL ai_analyze_sentiment
INPUT (`input` STRING)
OUTPUT (`content` STRING)
WITH (
    'provider'='openai',
    'endpoint'='https://api.openai.com/v1/chat/completions',
    'api-key' = '<YOUR KEY>',
    'model'='gpt-3.5-turbo',
    'system-prompt' = 'Classify the text below into one of the following labels: [positive, negative, neutral, mixed]. Output only the label.'
);

Suppose the following data is stored in a table named movie_comment, and the prediction result is to be stored in a table named print_sink:

CREATE TEMPORARY VIEW movie_comment(id, movie_name, user_comment, actual_label)
AS VALUES
  (1, 'Good Stuff', 'The part where children guess the sounds is my favorite. It''s a very romantic narrative compared to other movies I''ve seen. Very gentle and full of love.', 'positive');

CREATE TEMPORARY TABLE print_sink(
  id BIGINT,
  movie_name VARCHAR,
predict_label VARCHAR,
  actual_label VARCHAR
) WITH (
  'connector' = 'print'
);

Then the following SQL statement can be used to predict sentiment labels for movie reviews:

INSERT INTO print_sink
SELECT id, movie_name, content AS predict_label, actual_label
FROM ML_PREDICT(
  TABLE movie_comment,
  MODEL ai_analyze_sentiment,
  DESCRIPTOR(user_comment));
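
If the model classifies the review as positive, the print sink emits a row similar to the following (the exact label depends on the model's response, and the print connector may prefix rows with a task identifier):

+I[1, Good Stuff, positive, positive]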

Model Options #

Common #

| Key | Default | Type | Description |
| --- | --- | --- | --- |
| api-key | (none) | String | OpenAI API key for authentication. |
| context-overflow-action | truncated-tail | Enum | Action to take when the context overflows. Possible values: "truncated-tail" (truncates excess tokens from the tail of the context), "truncated-tail-log" (same as "truncated-tail", and records a truncation log), "truncated-head" (truncates excess tokens from the head of the context), "truncated-head-log" (same as "truncated-head", and records a truncation log), "skipped" (skips the input row), "skipped-log" (skips the input row and records a skipping log). |
| endpoint | (none) | String | Full URL of the OpenAI API endpoint, e.g., https://api.openai.com/v1/chat/completions or https://api.openai.com/v1/embeddings. |
| error-handling-strategy | RETRY | Enum | Strategy for handling errors during model requests. Possible values: "RETRY" (retry sending the request), "FAILOVER" (throw an exception and fail the Flink job), "IGNORE" (ignore the input that caused the error and continue; the error itself is recorded in the log). |
| max-context-size | (none) | Integer | Maximum number of tokens in the context. context-overflow-action is triggered if this threshold is exceeded. |
| model | (none) | String | Model name, e.g., gpt-3.5-turbo, text-embedding-ada-002. |
| retry-fallback-strategy | FAILOVER | Enum | Fallback strategy to employ once retry attempts are exhausted. Only applied when error-handling-strategy is set to RETRY. Possible values: "FAILOVER" (throw an exception and fail the Flink job), "IGNORE" (ignore the input that caused the error and continue; the error itself is recorded in the log). |
| retry-num | 100 | Integer | Number of retries for OpenAI client requests. |
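
As an illustration of how the common options combine, the following variant of the sentiment model (the model name and option values are examples, not recommendations) retries failed requests up to 3 times, drops rows that still fail, and truncates inputs that exceed 4096 tokens:

CREATE MODEL ai_analyze_sentiment_resilient
INPUT (`input` STRING)
OUTPUT (`content` STRING)
WITH (
    'provider' = 'openai',
    'endpoint' = 'https://api.openai.com/v1/chat/completions',
    'api-key' = '<YOUR KEY>',
    'model' = 'gpt-3.5-turbo',
    'system-prompt' = 'Classify the text below into one of the following labels: [positive, negative, neutral, mixed]. Output only the label.',
    -- Retry each failed request up to 3 times, then skip the offending row.
    'error-handling-strategy' = 'RETRY',
    'retry-num' = '3',
    'retry-fallback-strategy' = 'IGNORE',
    -- Truncate inputs above 4096 tokens from the tail and log the truncation.
    'max-context-size' = '4096',
    'context-overflow-action' = 'truncated-tail-log'
);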

Chat Completions #

| Key | Default | Type | Description |
| --- | --- | --- | --- |
| max-tokens | (none) | Long | The maximum number of tokens that can be generated in the chat completion. |
| n | (none) | Long | How many chat completion choices to generate for each input message. Note that you are charged based on the number of generated tokens across all of the choices. Keep n at 1 to minimize costs. |
| presence-penalty | (none) | Double | Number between -2.0 and 2.0. Positive values penalize new tokens based on whether they appear in the text so far, increasing the model's likelihood to talk about new topics. |
| response-format | (none) | Enum | The format of the response. Possible values: "text", "json_object". |
| seed | (none) | Long | If specified, the model platform makes a best effort to sample deterministically, so that repeated requests with the same seed and parameters return the same result. Determinism is not guaranteed. |
| stop | (none) | String | A CSV list of strings to pass as stop sequences to the model. |
| system-prompt | "You are a helpful assistant." | String | The system message of a chat. |
| temperature | (none) | Double | Controls the randomness or "creativity" of the output. Typical values are between 0.0 and 1.0. |
| top-p | (none) | Double | The probability cutoff for token selection. Usually either temperature or top-p is specified, but not both. |
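
These options go into the same WITH clause as the common ones. The following sketch (the model name is illustrative and the option values are examples, not recommendations) requests short, near-deterministic JSON responses:

CREATE MODEL ai_extract_keywords
INPUT (`input` STRING)
OUTPUT (`content` STRING)
WITH (
    'provider' = 'openai',
    'endpoint' = 'https://api.openai.com/v1/chat/completions',
    'api-key' = '<YOUR KEY>',
    'model' = 'gpt-3.5-turbo',
    'system-prompt' = 'Extract up to three keywords from the text and return them as a JSON array under the key "keywords".',
    -- Ask for a JSON object and keep responses short and stable.
    'response-format' = 'json_object',
    'max-tokens' = '64',
    'temperature' = '0.0',
    'seed' = '42'
);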

Embeddings #

| Key | Default | Type | Description |
| --- | --- | --- | --- |
| dimension | (none) | Long | The size of the embedding result array. |
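
An embeddings model follows the same pattern as the chat completions model. The following sketch (the model name is illustrative) declares an embeddings model whose output column matches the ARRAY<FLOAT> type required by the schema below, and applies it to the movie_comment view from the usage example:

CREATE MODEL ai_embed_comment
INPUT (`input` STRING)
OUTPUT (`embedding` ARRAY<FLOAT>)
WITH (
    'provider' = 'openai',
    'endpoint' = 'https://api.openai.com/v1/embeddings',
    'api-key' = '<YOUR KEY>',
    'model' = 'text-embedding-ada-002'
);

SELECT id, embedding
FROM ML_PREDICT(
  TABLE movie_comment,
  MODEL ai_embed_comment,
  DESCRIPTOR(user_comment));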

Schema Requirement #

The following table lists the schema requirements for each task.

| Task | Input Type | Output Type |
| --- | --- | --- |
| Chat Completions | `STRING` | `STRING` |
| Embeddings | `STRING` | `ARRAY<FLOAT>` |

Available Metadata #

When configuring error-handling-strategy as IGNORE, you can choose to additionally specify the following metadata columns to surface information about failures into your stream.

  • error-string (STRING): A message associated with the error.
  • http-status-code (INT): The HTTP status code.
  • http-headers-map (MAP<STRING, ARRAY>): The headers returned with the response.

If you define these metadata columns in the output schema but the call does not fail, the columns are filled with NULL values.
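
As a rough sketch only: assuming metadata columns can be declared in the model's OUTPUT clause with the METADATA keyword, analogous to metadata columns in table DDL (this syntax is an assumption; consult the CREATE MODEL reference for your Flink version), a model surfacing error information could look like:

CREATE MODEL ai_analyze_sentiment_with_errors
INPUT (`input` STRING)
OUTPUT (
  `content` STRING,
  -- Hypothetical declarations: verify the exact metadata column syntax
  -- against the CREATE MODEL reference before using.
  `error_string` STRING METADATA FROM 'error-string',
  `http_status_code` INT METADATA FROM 'http-status-code'
)
WITH (
    'provider' = 'openai',
    'endpoint' = 'https://api.openai.com/v1/chat/completions',
    'api-key' = '<YOUR KEY>',
    'model' = 'gpt-3.5-turbo',
    'system-prompt' = 'Classify the text below into one of the following labels: [positive, negative, neutral, mixed]. Output only the label.',
    'error-handling-strategy' = 'IGNORE'
);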