# snowflake.ingest.streaming
The snowflake.ingest.streaming package provides a Python SDK for streaming data into Snowflake using Snowpipe Streaming.
## Table of Contents
Core Classes
## API Reference
- class ChannelStatus(database_name: str, schema_name: str, pipe_name: str, channel_name: str, status_code: str, latest_committed_offset_token: str | None, created_on_ms: int, rows_inserted: int, rows_parsed: int, rows_error_count: int, last_error_offset_upper_bound: str | None, last_error_message: str | None, last_error_timestamp_ms: int | None, snowflake_avg_processing_latency_ms: int | None, last_refreshed_on_ms: int)
Channel status information returned to users.
Provides access to channel status information from the Snowflake server, including the channel name, status code, latest committed offset token, row counts, error details, and processing latency.
- property database_name: str
Get the database name.
- property schema_name: str
Get the schema name.
- property pipe_name: str
Get the pipe name.
- property channel_name: str
Get the channel name.
- Returns:
The name of the channel
- Return type:
str
- property status_code: str
Get the status code for the channel.
- Returns:
The status code from the Snowflake server
- Return type:
str
- property latest_committed_offset_token: str | None
Get the latest committed offset token for the channel.
- Returns:
The latest committed offset token, or None if nothing has been committed yet
- Return type:
Optional[str]
- property created_on_ms: int
Get the channel's creation timestamp, in milliseconds.
- property rows_inserted: int
Get the number of rows inserted for the channel.
- property rows_parsed: int
Get the number of rows parsed for the channel.
- property rows_error_count: int
Get the number of rows with errors for the channel.
- property last_error_offset_upper_bound: str | None
Get the offset upper bound of the last error for the channel.
- property last_error_message: str | None
Get the last error message for the channel.
- property last_error_timestamp_ms: int | None
Get the timestamp of the last error for the channel, in milliseconds.
- property snowflake_avg_processing_latency_ms: int | None
Get the average processing latency on the Snowflake server side for ingesting this channel's data, in milliseconds.
- property last_refreshed_on_ms: int
Get the timestamp at which the channel status was last refreshed, in milliseconds. Channel status is refreshed in the background every second.
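The error and refresh properties can be combined into a simple health check. This is a minimal sketch and is not part of the SDK. The function name channel_health_issues, the 10-second max_staleness_ms default, and the SimpleNamespace stand-in are hypothetical. The sketch also assumes last_refreshed_on_ms is a Unix epoch timestamp in milliseconds. It only reads attributes documented on ChannelStatus, so a real status returned by get_channel_status() can be passed in place of the stand-in.

```python
import time
from types import SimpleNamespace
from typing import Any, List, Optional


def channel_health_issues(status: Any, now_ms: Optional[int] = None,
                          max_staleness_ms: int = 10_000) -> List[str]:
    """Return human-readable problems found in a ChannelStatus-like object.

    Hypothetical helper: it only reads attributes documented on ChannelStatus.
    Assumes last_refreshed_on_ms is a Unix epoch timestamp in milliseconds.
    """
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    issues: List[str] = []
    # Any row-level errors are reported together with the last error message.
    if status.rows_error_count > 0:
        issues.append(
            f"{status.rows_error_count} row(s) failed; last error: "
            f"{status.last_error_message!r}"
        )
    # Flag a status snapshot that is older than the allowed staleness window.
    if now_ms - status.last_refreshed_on_ms > max_staleness_ms:
        issues.append("status has not been refreshed recently")
    return issues


# Stand-in for a real ChannelStatus, for illustration only.
example = SimpleNamespace(rows_error_count=2, last_error_message="bad value",
                          last_refreshed_on_ms=1_000)
print(channel_health_issues(example, now_ms=2_000))
```

Passing now_ms explicitly keeps the check deterministic in tests. In production code it can be left as None so that the current wall-clock time is used.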
- class StreamingIngestChannel(channel: snowflake.ingest.streaming._python_ffi.PyChannel, *, _internal: bool = False)
A channel for streaming data ingestion into Snowflake using the Snowflake Ingest SDK.
The channel is used to ingest data into Snowflake tables in a streaming fashion. Each channel is associated with a specific account/database/schema/pipe combination and is created by calling open_channel() and closed by calling close(). The channel provides methods for appending single rows or batches of rows into Snowflake, with support for offset tokens to track ingestion progress and enable replay in case of failures.
Note
This class should not be instantiated directly. Use open_channel() to create channel instances.
- initiate_flush() → None
Initiate a flush of the channel.
Initiates a flush of all buffered data maintained for this Channel but does not wait for the flush to complete. Calls to append_rows are still allowed on the Channel after invoking this API.
This method triggers an immediate flush of all currently buffered data in this specific channel, similar to the client-level initiate_flush() but scoped to only this channel. The flush occurs asynchronously and this method returns immediately.
This method is useful when you want to force immediate transmission of buffered data without waiting for automatic flush triggers (time-based or size-based). It provides fine-grained, per-channel control over when data is sent to Snowflake. However, calling initiate_flush at a high rate will reduce overall throughput, may increase costs, and could lead to more frequent throttling by the Snowflake service.
- Raises:
StreamingIngestError – If initiating the flush fails
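Frequent manual flushes reduce throughput, so an application that calls initiate_flush() from a hot loop may want to rate-limit it. The sketch below shows one way to do that. FlushThrottle and its min_interval_s parameter are hypothetical names, and the SDK does not provide this class.

```python
import time
from typing import Optional


class FlushThrottle:
    """Hypothetical helper that limits how often initiate_flush() is called.

    Allows a flush only when at least min_interval_s seconds have passed
    since the previous allowed flush.
    """

    def __init__(self, min_interval_s: float) -> None:
        self.min_interval_s = min_interval_s
        self._last_flush: Optional[float] = None

    def should_flush(self, now: Optional[float] = None) -> bool:
        # Use a monotonic clock by default so wall-clock changes do not matter.
        now = time.monotonic() if now is None else now
        if self._last_flush is None or now - self._last_flush >= self.min_interval_s:
            self._last_flush = now  # remember when the last flush was allowed
            return True
        return False
```

In a producer loop, each manual flush would be guarded with `if throttle.should_flush(): channel.initiate_flush()`. The automatic time- and size-based flush triggers handle everything else.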
- wait_for_flush(timeout_seconds: int | None = None) → None
Wait for the channel to flush all buffered data.
Waits for all buffered data in this channel to be flushed to the Snowflake server side. This method triggers a flush of all pending data and waits for the flush operation to complete. If the timeout is reached, a TimeoutError is raised.
- Parameters:
timeout_seconds – Optional timeout in seconds for the flush operation. Defaults to None, meaning no timeout.
- Raises:
ValueError – If timeout_seconds is negative
TimeoutError – If the timeout is reached
StreamingIngestError – If waiting for the flush fails
- wait_for_commit(token_checker: Callable[[str], bool], timeout_seconds: int | None = None) → None
Wait for the channel to commit all buffered data.
Waits until the latest committed offset token on the Snowflake server side satisfies the condition checked by token_checker. Snowflake commits offset tokens in batches, so the latest committed token may already be past the one you expected, and token_checker must handle that case. In practice, token_checker usually performs a range check (is the committed token greater than or equal to the expected one?) rather than an exact match.
- Parameters:
token_checker – A callable that tests whether the current committed offset token from the server meets the desired condition. The callable receives the latest committed offset token (which may be None) and should return True when the wait condition is satisfied.
timeout_seconds – Optional timeout in seconds for the commit operation. Defaults to None, meaning no timeout.
- Raises:
ValueError – If token_checker is not callable or timeout_seconds is negative
StreamingIngestError – If waiting for the commit fails
TimeoutError – If the timeout is reached
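A range-checking token_checker can be built with a small factory. This sketch assumes the application writes offset tokens as monotonically increasing decimal integers; the SDK does not require that format, and the name at_least is hypothetical. Converting to int matters because string comparison would rank "99" above "100".

```python
from typing import Callable, Optional


def at_least(expected: int) -> Callable[[Optional[str]], bool]:
    """Build a token_checker for wait_for_commit().

    Assumes offset tokens are decimal integers that increase monotonically.
    """
    def checker(committed: Optional[str]) -> bool:
        # None means nothing has been committed on this channel yet.
        if committed is None:
            return False
        # Range check rather than equality: commits happen in batches, so the
        # committed token may already be past the expected one.
        return int(committed) >= expected

    return checker
```

For example, after appending a row with offset token "100", a caller could wait with `channel.wait_for_commit(at_least(100), timeout_seconds=60)`.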
- append_row(row: Dict[str, Any], offset_token: str | None = None) → None
Append a single row into the channel.
- Parameters:
row –
Dictionary representing the row data to append with keys as column names and values as column values. Values can be of the following types:
None: null values
bool: boolean values (True, False)
int: integer values
float: floating-point values
str: string values
bytes: byte strings
bytearray: mutable byte arrays
tuple: tuples of values
list: lists of values
dict: nested dictionaries
set: sets of values
frozenset: immutable sets of values
datetime.datetime: datetime objects
datetime.date: date objects
datetime.time: time objects
decimal.Decimal: decimal values for precise numeric operations
offset_token – Optional offset token, used to track ingestion progress and to replay ingestion in case of failures. It can be None if you do not plan to replay, or cannot replay, ingestion.
- Raises:
ValueError, TypeError – If the row cannot be serialized to JSON
StreamingIngestError – If the row appending fails
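Because unsupported values only surface as ValueError or TypeError at append time, some applications pre-check rows. The sketch below recursively checks values against the type list documented above. The helper names and the example column names are hypothetical. Passing this check does not guarantee that a value matches the target column's type in Snowflake.

```python
import datetime
import decimal
from typing import Any, Dict, List

# Scalar types listed in the append_row() documentation.
_SCALARS = (type(None), bool, int, float, str, bytes, bytearray,
            datetime.datetime, datetime.date, datetime.time, decimal.Decimal)


def is_supported_value(value: Any) -> bool:
    """Recursively check a value against the documented append_row() types."""
    if isinstance(value, _SCALARS):
        return True
    # Containers are supported when every element is supported.
    if isinstance(value, (tuple, list, set, frozenset)):
        return all(is_supported_value(v) for v in value)
    if isinstance(value, dict):
        return all(is_supported_value(v) for v in value.values())
    return False


def unsupported_columns(row: Dict[str, Any]) -> List[str]:
    """Return the column names whose values fail the type check."""
    return [col for col, val in row.items() if not is_supported_value(val)]


# Hypothetical row; column names are illustrative only.
row = {
    "id": 1,
    "name": "sensor-7",
    "reading": decimal.Decimal("21.50"),
    "tags": ["indoor", "calibrated"],
    "attrs": {"floor": 3},
    "seen_at": datetime.datetime(2024, 1, 1, 12, 0),
}
```

If unsupported_columns(row) returns an empty list, the row could then be sent with `channel.append_row(row, offset_token="1")`.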
- append_rows(rows: List[Dict[str, Any]], start_offset_token: str | None = None, end_offset_token: str | None = None) → None
Append multiple rows into the channel.
- Parameters:
rows –
List of dictionaries representing the row data to append. Each dictionary’s values can be of the following types:
None: null values
bool: boolean values (True, False)
int: integer values
float: floating-point values
str: string values
bytes: byte strings
bytearray: mutable byte arrays
tuple: tuples of values
list: lists of values
dict: nested dictionaries
set: sets of values
frozenset: immutable sets of values
datetime.datetime: datetime objects
datetime.date: date objects
datetime.time: time objects
decimal.Decimal: decimal values for precise numeric operations
start_offset_token – Optional start offset token of the batch/row-set.
end_offset_token – Optional end offset token of the batch/row-set.
- Raises:
ValueError, TypeError – If the rows cannot be serialized to JSON
StreamingIngestError – If the rows appending fails
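When a source stream is sent in batches, each batch needs a start and end offset token. This sketch assumes the offset of each row is its position in the source stream (first_offset plus index), encoded as a decimal string. The helper batches_with_offsets is hypothetical.

```python
from typing import Any, Dict, Iterator, List, Sequence, Tuple


def batches_with_offsets(rows: Sequence[Dict[str, Any]], batch_size: int,
                         first_offset: int = 0
                         ) -> Iterator[Tuple[List[Dict[str, Any]], str, str]]:
    """Split rows into batches and attach start/end offset tokens.

    Assumes each row's offset is first_offset + its index in `rows`.
    """
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    for start in range(0, len(rows), batch_size):
        batch = list(rows[start:start + batch_size])
        # Tokens are inclusive: the end token is the offset of the last row.
        start_token = str(first_offset + start)
        end_token = str(first_offset + start + len(batch) - 1)
        yield batch, start_token, end_token
```

Each yielded tuple maps directly onto the call `channel.append_rows(batch, start_offset_token=start_token, end_offset_token=end_token)`.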
- get_latest_committed_offset_token() → str | None
Get the latest committed offset token for the channel.
- Returns:
The latest committed offset token for the channel, or None if the channel is brand new.
- Return type:
Optional[str]
- Raises:
StreamingIngestError – If getting the latest committed offset token fails
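The committed offset token is what makes replay possible after a restart: anything after it has to be sent again. This sketch assumes each record's index in the source sequence was used as its offset token, written as a decimal string. The helper pending_records is hypothetical.

```python
from typing import Optional, Sequence, TypeVar

T = TypeVar("T")


def pending_records(records: Sequence[T], committed_token: Optional[str]) -> Sequence[T]:
    """Return the source records that still need to be (re)sent.

    Assumes record i was appended with offset token str(i). A committed token
    of None means nothing has been committed on the channel yet.
    """
    start = 0 if committed_token is None else int(committed_token) + 1
    return records[start:]
```

After reopening a channel, a caller could pass `channel.get_latest_committed_offset_token()` as committed_token and re-append only the returned records, keeping the same offset numbering.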
- get_channel_status() → ChannelStatus
Get the status of the channel.
- Returns:
The status of the channel.
- Return type:
ChannelStatus
- Raises:
StreamingIngestError – If getting the channel status fails
- close(drop: bool = False, wait_for_flush: bool = True, timeout_seconds: int | None = None) → None
Close the channel.
- Parameters:
drop – Whether to drop the channel. Defaults to False.
wait_for_flush – Whether to wait for the flush to complete. Defaults to True.
timeout_seconds – The timeout in seconds for the flush; None means no timeout. Defaults to None.
- Raises:
ValueError – If timeout_seconds is negative
StreamingIngestError – If closing the channel fails
- is_closed() → bool
Check if the channel is closed.
- Returns:
True if the channel is closed, False otherwise
- Return type:
bool
- property db_name: str
Get the database name.
- property channel_name: str
Get the channel name.
- property schema_name: str
Get the schema name.
- property pipe_name: str
Get the pipe name.
- class StreamingIngestClient(client_name: str, db_name: str, schema_name: str, pipe_name: str, profile_json: str | None = None, properties: Dict[str, Any] | None = None)
A client that is the starting point for using the Streaming Ingest client APIs.
A single client maps to exactly one account/database/schema/pipe in Snowflake; however, multiple clients can point to the same account/database/schema/pipe. Each client contains information for Snowflake authentication and authorization, and it is used to create one or more StreamingIngestChannel instances for data ingestion.
The client manages the lifecycle of streaming ingest channels and handles the underlying communication with Snowflake services for authentication, channel management, and data transmission.
- open_channel(channel_name: str, offset_token: str | None = None) → Tuple[StreamingIngestChannel, ChannelStatus]
Open a channel with the given name.
- Parameters:
channel_name – Name of the channel to open
offset_token – Optional offset token
- Returns:
(StreamingIngestChannel, ChannelStatus)
- Return type:
tuple
- Raises:
StreamingIngestError – If opening the channel fails
- close(wait_for_flush: bool = True, timeout_seconds: int | None = None) → None
Close the client.
- Parameters:
wait_for_flush – Whether to wait for the flush to complete, defaults to True
timeout_seconds – Optional timeout in seconds for the flush operation, defaults to 60 seconds
- Raises:
ValueError – If timeout_seconds is negative
TimeoutError – If the timeout is reached
StreamingIngestError – If closing the client fails
- is_closed() → bool
Check if the client is closed.
- Raises:
StreamingIngestError – If checking the client status fails
- get_latest_committed_offset_tokens(channel_names: List[str]) → Dict[str, str | None]
Get the latest committed offset tokens for a list of channels.
- Parameters:
channel_names – List of channel names
- Returns:
A dictionary mapping channel names to their latest committed offset tokens. The value is None if the channel is brand new or does not exist.
- Return type:
Dict[str, Optional[str]]
- Raises:
StreamingIngestError – If getting the latest committed offset tokens fails
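The batch lookup is convenient for finding channels whose commits have not yet caught up with what was sent. This sketch assumes the application uses decimal integer offset tokens and tracks the last offset it sent per channel. The helper lagging_channels and the expected mapping are hypothetical. A None value, meaning a brand-new or nonexistent channel, counts as lagging.

```python
from typing import Dict, List, Optional


def lagging_channels(committed: Dict[str, Optional[str]],
                     expected: Dict[str, int]) -> List[str]:
    """Return sorted channel names whose committed offset is behind `expected`."""
    behind: List[str] = []
    for name, want in expected.items():
        token = committed.get(name)  # missing keys are treated like None
        if token is None or int(token) < want:
            behind.append(name)
    return sorted(behind)
```

With a mapping sent of channel name to last offset sent, the call would be `lagging_channels(client.get_latest_committed_offset_tokens(list(sent)), sent)`.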
- get_channel_statuses(channel_names: List[str]) → Dict[str, ChannelStatus]
Get the statuses of a list of channels.
- Parameters:
channel_names – List of channel names
- Returns:
A dictionary mapping channel names to their statuses.
- Return type:
Dict[str, ChannelStatus]
- Raises:
StreamingIngestError – If getting the channel statuses fails
- drop_channel(channel_name: str) → None
Drop a channel.
- Parameters:
channel_name – Name of the channel to drop
- Raises:
StreamingIngestError – If dropping the channel fails
- initiate_flush() → None
Initiate a flush of the client.
Initiates a flush by the client, which causes all outstanding buffered data to be flushed to Snowflake. The client can still accept data while the flush runs: this is an asynchronous call that returns once a flush has been initiated for all channels opened by this client.
- Raises:
StreamingIngestError – If initiating the flush fails
- wait_for_flush(timeout_seconds: int | None = None) → None
Wait for the client to flush all buffered data.
Waits for all buffered data in all channels managed by this client to be flushed to the Snowflake server side. This method triggers a flush of all pending data across all channels and waits for the flush operations to complete. If the timeout is reached, a TimeoutError is raised.
- Parameters:
timeout_seconds – Optional timeout in seconds for the flush operation. Defaults to None, meaning no timeout.
- Raises:
ValueError – If timeout_seconds is negative
TimeoutError – If the timeout is reached
StreamingIngestError – If waiting for the flush fails
- property client_name: str
Get the client name.
- property db_name: str
Get the database name.
- property schema_name: str
Get the schema name.
- property pipe_name: str
Get the pipe name.
- exception StreamingIngestError(error_code: StreamingIngestErrorCode, message: str, http_status_code: int, http_status_name: str)
Bases: Exception
Exception class for all streaming ingest errors.
- property error_code: StreamingIngestErrorCode
The error code of the error.
- property message: str
The message of the error.
- property http_status_code: int
The HTTP status code of the error.
- property http_status_name: str
The HTTP status name of the error.
- class StreamingIngestErrorCode(*args, **kwds)
Bases: enum.Enum
Enumeration of all possible streaming ingest error codes.
These error codes correspond to the IngestError variants in the Rust implementation and provide type-safe error handling.
- CONFIG_ERROR = 'ConfigError'
- INVALID_REQUEST = 'InvalidRequest'
- INVALID_ARGUMENT = 'InvalidArgument'
- SF_API_USER_ERROR = 'SfApiUserError'
- NOT_IMPLEMENTED = 'NotImplemented'
- HTTP_CLIENT_NON_RETRYABLE_ERROR = 'HttpClientNonRetryableError'
- SF_API_PIPE_FAILED_OVER_ERROR = 'SfApiPipeFailedOverError'
- CHANNEL_ALREADY_EXISTS = 'ChannelAlreadyExists'
- AUTH_TOKEN_ERROR = 'AuthTokenError'
- SF_API_AUTH_ERROR = 'SfApiAuthError'
- CHANNEL_NOT_FOUND = 'ChannelNotFound'
- CHANNEL_WAIT_FOR_FLUSH_TIMEOUT = 'ChannelWaitForFlushTimeout'
- CLIENT_WAIT_FOR_FLUSH_TIMEOUT = 'ClientWaitForFlushTimeout'
- CLOSED_CHANNEL_ERROR = 'ClosedChannelError'
- CLOSED_CLIENT_ERROR = 'ClosedClientError'
- CHANNEL_CLOSED_BY_USER = 'ChannelClosedByUser'
- INVALID_CHANNEL_ERROR = 'InvalidChannelError'
- INVALID_CLIENT_ERROR = 'InvalidClientError'
- INPUT_CHANNEL_CLOSED = 'InputChannelClosed'
- OUTPUT_CHANNEL_CLOSED = 'OutputChannelClosed'
- RECEIVER_SATURATED = 'ReceiverSaturated'
- MEMORY_THRESHOLD_EXCEEDED = 'MemoryThresholdExceeded'
- MEMORY_THRESHOLD_EXCEEDED_IN_CONTAINER = 'MemoryThresholdExceededInContainer'
- FATAL = 'Fatal'
- NON_FATAL = 'NonFatal'
- MUTEX_LOCK_FAILED = 'MutexLockFailed'
- SF_API_UNEXPECTED_BEHAVIOR_ERROR = 'SfApiUnexpectedBehaviorError'
- SF_API_INTERNAL_SERVER_ERROR = 'SfApiInternalServerError'
- FILE_UPLOAD_ERROR = 'FileUploadError'
- HTTP_RETRIES_EXHAUSTED_ERROR = 'HttpRetriesExhaustedError'
- CLOSE_ALL_CHANNELS_FAILED_ERROR = 'CloseAllChannelsFailedError'
- HTTP_RETRYABLE_CLIENT_ERROR = 'HttpRetryableClientError'
- classmethod from_string(error_code_str: str) → StreamingIngestErrorCode | None
Convert an error code string to the matching enum value, if one exists.
- Parameters:
error_code_str – The error code string to convert
- Raises:
ValueError – If the error code string is invalid
- Returns:
The matching StreamingIngestErrorCode enum value, or None if not found
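Since StreamingIngestError exposes error_code as a StreamingIngestErrorCode, whose .value is one of the strings listed above, an application can centralize its retry decision. The sketch below assumes a retry policy. The set TRANSIENT_CODES and the function should_retry are hypothetical, and which codes are safe to retry is an application choice that this reference does not specify.

```python
from typing import FrozenSet

# Hypothetical policy: StreamingIngestErrorCode values this application
# chooses to treat as transient. Adjust after verifying each code's behavior.
TRANSIENT_CODES: FrozenSet[str] = frozenset({
    "HttpRetryableClientError",
    "ReceiverSaturated",
    "MemoryThresholdExceeded",
    "SfApiInternalServerError",
})


def should_retry(error_code_value: str) -> bool:
    """Decide whether to retry, given StreamingIngestError.error_code.value."""
    return error_code_value in TRANSIENT_CODES
```

Inside an `except StreamingIngestError as err:` handler, a caller would check `should_retry(err.error_code.value)`. It would re-raise when the result is False, and back off before retrying otherwise.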