# snowflake.ingest.streaming
The snowflake.ingest.streaming package provides a Python SDK for streaming data into Snowflake using Snowpipe Streaming.
## API Reference
- class ChannelStatus(channel_name: str, status_code: str, latest_committed_offset_token: str | None)
Channel status information returned to users.
Provides access to channel status information, including the channel name, the status code, and the latest committed offset token reported by the Snowflake server.
- property channel_name: str
Get the channel name.
- Returns:
The name of the channel
- Return type:
str
- property status_code: str
Get the status code for the channel.
- Returns:
The status code reported by the Snowflake server
- Return type:
str
- property latest_committed_offset_token: str | None
Get the latest committed offset token for the channel.
- Returns:
The latest committed offset token, or None if nothing has been committed yet
- Return type:
Optional[str]
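A common use of ChannelStatus is deciding where a producer should resume after a restart. The sketch below assumes, purely as an application-level convention, that offset tokens are the decimal string form of an increasing integer sequence number; the SDK itself treats tokens as opaque strings. The helper name next_sequence_number and the HasCommittedOffset protocol are hypothetical.

```python
from typing import Optional, Protocol


class HasCommittedOffset(Protocol):
    """Anything exposing latest_committed_offset_token, such as ChannelStatus."""

    @property
    def latest_committed_offset_token(self) -> Optional[str]: ...


def next_sequence_number(status: HasCommittedOffset) -> int:
    """Return the first sequence number that still needs to be sent.

    Assumes (hypothetically) that the application writes offset tokens as
    str(sequence_number). A token of None means nothing has been committed.
    """
    token = status.latest_committed_offset_token
    if token is None:
        return 0
    return int(token) + 1
```

Under this convention, a status whose latest committed token is "41" means rows 0 through 41 are durable, so sending resumes at 42.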
- class StreamingIngestChannel(channel: snowflake.ingest.streaming._python_ffi.PyChannel, *, _internal: bool = False)
A channel for streaming data ingestion into Snowflake using the Snowflake Ingest SDK.
The channel is used to ingest data into Snowflake tables in a streaming fashion. Each channel is associated with a specific account/database/schema/pipe combination. It is created by calling StreamingIngestClient.open_channel() and closed by calling close(). The channel provides methods for appending single rows or batches of rows into Snowflake, with support for offset tokens to track ingestion progress and enable replay in case of failures.
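Because every channel opened with open_channel() should eventually be closed, one way to guarantee that is a small context manager. This is a minimal sketch, not part of the SDK: the name ingest_channel is hypothetical, and client is typed as Any (it is expected to be a StreamingIngestClient) so the sketch runs without the SDK installed.

```python
from contextlib import contextmanager
from typing import Any, Iterator, Tuple


@contextmanager
def ingest_channel(client: Any, channel_name: str) -> Iterator[Tuple[Any, Any]]:
    """Open a channel on `client` and always close it on exit, even on errors."""
    channel, status = client.open_channel(channel_name)
    try:
        yield channel, status
    finally:
        # close() defaults to wait_for_flush=True, so buffered rows are flushed first.
        if not channel.is_closed():
            channel.close()
```

Usage would look like `with ingest_channel(client, "orders_channel") as (channel, status): channel.append_row({"ID": 1}, offset_token="1")`, where the channel name and column are placeholders.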
Note
This class should not be instantiated directly. Use StreamingIngestClient.open_channel() to create channel instances.
- append_row(row: Dict[str, Any], offset_token: str | None = None) -> None
Append a single row into the channel.
- Parameters:
row –
Dictionary representing the row data to append with keys as column names and values as column values. Values can be of the following types:
None: null values
bool: boolean values (True, False)
int: integer values
float: floating-point values
str: string values
bytes: byte strings
bytearray: mutable byte arrays
tuple: tuples of values
list: lists of values
dict: nested dictionaries
set: sets of values
frozenset: immutable sets of values
datetime.datetime: datetime objects
datetime.date: date objects
datetime.time: time objects
decimal.Decimal: decimal values for precise numeric operations
offset_token – Optional offset token, used to track ingestion progress and to replay ingestion in case of failures. It can be None if you do not plan to replay or cannot replay.
- Raises:
ValueError, TypeError – If the row cannot be serialized to JSON
StreamingIngestError – If appending the row fails
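The sketch below builds one row from several of the value types listed above and appends it with an offset token. The column names, the helper names build_event_row and send_event, and the choice to reuse the event id as the offset token are all hypothetical; the columns must match whatever the target pipe expects.

```python
import datetime
import decimal
from typing import Any, Dict, List


def build_event_row(
    event_id: int, amount: str, tags: List[str], created_at: datetime.datetime
) -> Dict[str, Any]:
    """Build one row whose values use types listed for append_row()."""
    return {
        "EVENT_ID": event_id,               # int
        "AMOUNT": decimal.Decimal(amount),  # Decimal keeps exact precision (no float rounding)
        "TAGS": tags,                       # list of values
        "ATTRIBUTES": {"source": "web"},    # nested dict
        "CREATED_AT": created_at,           # datetime.datetime
        "NOTE": None,                       # null value
    }


def send_event(
    channel: Any, event_id: int, amount: str, tags: List[str], created_at: datetime.datetime
) -> None:
    row = build_event_row(event_id, amount, tags, created_at)
    # Reusing the event id as the offset token lets a restarted producer see how far it got.
    channel.append_row(row, offset_token=str(event_id))
```

Passing the amount as a string into decimal.Decimal avoids the binary rounding a float literal would introduce.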
- append_rows(rows: List[Dict[str, Any]], start_offset_token: str | None = None, end_offset_token: str | None = None) -> None
Append multiple rows into the channel.
- Parameters:
rows –
List of dictionaries representing the row data to append. Each dictionary’s values can be of the following types:
None: null values
bool: boolean values (True, False)
int: integer values
float: floating-point values
str: string values
bytes: byte strings
bytearray: mutable byte arrays
tuple: tuples of values
list: lists of values
dict: nested dictionaries
set: sets of values
frozenset: immutable sets of values
datetime.datetime: datetime objects
datetime.date: date objects
datetime.time: time objects
decimal.Decimal: decimal values for precise numeric operations
start_offset_token – Optional start offset token of the batch/row-set.
end_offset_token – Optional end offset token of the batch/row-set.
- Raises:
ValueError, TypeError – If the rows cannot be serialized to JSON
StreamingIngestError – If appending the rows fails
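When sending batches, each append_rows() call can carry the offset tokens of its first and last row. This sketch assumes the hypothetical convention that row i carries sequence number first_seq + i and that tokens are str(sequence_number); the helper names batches_with_offsets and send_in_batches and the default batch size are illustrative only.

```python
from typing import Any, Dict, Iterator, List, Sequence, Tuple


def batches_with_offsets(
    rows: Sequence[Dict[str, Any]], first_seq: int, batch_size: int
) -> Iterator[Tuple[List[Dict[str, Any]], str, str]]:
    """Split rows into batches, pairing each with (start, end) offset tokens."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    for start in range(0, len(rows), batch_size):
        batch = list(rows[start:start + batch_size])
        start_token = str(first_seq + start)
        end_token = str(first_seq + start + len(batch) - 1)
        yield batch, start_token, end_token


def send_in_batches(
    channel: Any, rows: Sequence[Dict[str, Any]], first_seq: int, batch_size: int = 500
) -> None:
    for batch, start_token, end_token in batches_with_offsets(rows, first_seq, batch_size):
        channel.append_rows(batch, start_offset_token=start_token, end_offset_token=end_token)
```

For five rows starting at sequence 10 with a batch size of 2, the token pairs are ("10", "11"), ("12", "13"), and ("14", "14").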
- get_latest_committed_offset_token() -> str | None
Get the latest committed offset token for the channel.
- Returns:
The latest committed offset token for the channel, or None if the channel is brand new.
- Return type:
Optional[str]
- Raises:
StreamingIngestError – If getting the latest committed offset token fails
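A producer that needs to know when its last appended row is durable can poll get_latest_committed_offset_token(). This is a minimal polling sketch; the name wait_for_commit, the default timeout and interval, and the use of exact string equality are assumptions (with integer-like tokens a numeric comparison may be more robust). The sleep parameter exists only so the loop can be exercised without real delays.

```python
import time
from typing import Any, Callable


def wait_for_commit(
    channel: Any,
    expected_token: str,
    timeout_seconds: float = 60.0,
    poll_interval_seconds: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Poll until the committed offset token equals expected_token.

    Returns True on success, or False if the timeout elapses first.
    """
    deadline = time.monotonic() + timeout_seconds
    while True:
        if channel.get_latest_committed_offset_token() == expected_token:
            return True
        if time.monotonic() >= deadline:
            return False
        sleep(poll_interval_seconds)
```

Using time.monotonic() for the deadline keeps the timeout correct even if the wall clock is adjusted while waiting.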
- get_channel_status() -> ChannelStatus
Get the status of the channel.
- Returns:
The status of the channel.
- Return type:
ChannelStatus
- Raises:
StreamingIngestError – If getting the channel status fails
- close(drop: bool = False, wait_for_flush: bool = True, timeout_seconds: int | None = None) -> None
Close the channel.
- Parameters:
drop – Whether to drop the channel. Defaults to False.
wait_for_flush – Whether to wait for the flush to complete. Defaults to True.
timeout_seconds – The timeout in seconds for the flush; None means no timeout. Defaults to None.
- Raises:
StreamingIngestError – If closing the channel fails
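When shutting down several channels, a failure to close one (for example, a flush timeout) should not prevent the others from being closed. This sketch collects failures per channel instead of stopping at the first one. The name close_all and the 30-second default are hypothetical, and catching Exception broadly is a simplification; real code might catch StreamingIngestError specifically.

```python
from typing import Any, Dict, Iterable, Optional


def close_all(channels: Iterable[Any], timeout_seconds: Optional[int] = 30) -> Dict[str, Exception]:
    """Close every open channel; return {channel_name: exception} for failures."""
    failures: Dict[str, Exception] = {}
    for channel in channels:
        if channel.is_closed():
            continue
        # Read the name first: channel_name raises ValueError once the channel is closed.
        name = channel.channel_name
        try:
            channel.close(wait_for_flush=True, timeout_seconds=timeout_seconds)
        except Exception as exc:
            failures[name] = exc
    return failures
```

An empty result means every channel closed cleanly; otherwise the caller can log or retry the named channels.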
- is_closed() -> bool
Check if the channel is closed.
- Returns:
True if the channel is closed, False otherwise
- Return type:
bool
- property db_name: str
Get the database name.
- Raises:
ValueError – If channel is already closed
- property channel_name: str
Get the channel name.
- Raises:
ValueError – If channel is already closed
- property schema_name: str
Get the schema name.
- Raises:
ValueError – If channel is already closed
- property pipe_name: str
Get the pipe name.
- Raises:
ValueError – If channel is already closed
- class StreamingIngestClient(client_name: str, db_name: str, schema_name: str, pipe_name: str, profile_json: str | None = None, properties: Dict[str, Any] | None = None)
A client that is the starting point for using the Streaming Ingest client APIs.
A single client maps to exactly one account/database/schema/pipe in Snowflake; however, multiple clients can point to the same account/database/schema/pipe. Each client contains information for Snowflake authentication and authorization, and it is used to create one or more StreamingIngestChannel instances for data ingestion.
The client manages the lifecycle of streaming ingest channels and handles the underlying communication with Snowflake services for authentication, channel management, and data transmission.
- open_channel(channel_name: str, offset_token: str | None = None) -> Tuple[StreamingIngestChannel, ChannelStatus]
Open a channel with the given name.
- Parameters:
channel_name – Name of the channel to open
offset_token – Optional offset token
- Returns:
(StreamingIngestChannel, ChannelStatus)
- Return type:
tuple
- Raises:
StreamingIngestError – If opening the channel fails
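Because open_channel() returns the channel together with its ChannelStatus, a restarted producer can skip rows that were already committed. This is a sketch under assumptions: records holds (sequence_number, row) pairs in ascending order, offset tokens follow the hypothetical convention str(sequence_number), and the name resume_ingestion is illustrative. client is typed as Any so the sketch runs without the SDK.

```python
from typing import Any, Dict, List, Tuple


def resume_ingestion(client: Any, channel_name: str, records: List[Tuple[int, Dict[str, Any]]]) -> int:
    """Open a channel, send only records newer than its committed offset, and close it.

    Returns the number of rows appended.
    """
    channel, status = client.open_channel(channel_name)
    try:
        committed = status.latest_committed_offset_token
        last_done = int(committed) if committed is not None else -1
        sent = 0
        for seq, row in records:
            if seq <= last_done:
                continue  # already committed before the restart
            channel.append_row(row, offset_token=str(seq))
            sent += 1
        return sent
    finally:
        channel.close()
```

If the committed token is "1", records 0 and 1 are skipped and sending starts at record 2; a brand-new channel (token None) receives everything.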
- close(wait_for_flush: bool = True, timeout_seconds: int | None = None) -> None
Close the client.
- Parameters:
wait_for_flush – Whether to wait for the flush to complete, defaults to True
timeout_seconds – Optional timeout in seconds for the flush operation, defaults to 60 seconds
- Raises:
StreamingIngestError – If closing the client fails
- is_closed() -> bool
Check if the client is closed.
- Returns:
True if the client is closed, False otherwise
- Return type:
bool
- Raises:
StreamingIngestError – If checking the client status fails
- get_latest_committed_offset_tokens(channel_names: List[str]) -> Dict[str, str | None]
Get the latest committed offset tokens for a list of channels.
- Parameters:
channel_names – List of channel names
- Returns:
A dictionary mapping channel names to their latest committed offset tokens. The value is None if the channel is brand new or does not exist.
- Return type:
Dict[str, Optional[str]]
- Raises:
StreamingIngestError – If getting the latest committed offset tokens fails
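A single get_latest_committed_offset_tokens() call can check progress across many channels at once. In this sketch, targets maps each channel name to the last offset token the application sent; the helper name channels_behind is hypothetical, and treating any mismatch or None (new or missing channel) as "behind" is a simplifying assumption.

```python
from typing import Any, Dict, List


def channels_behind(client: Any, targets: Dict[str, str]) -> List[str]:
    """Return sorted names of channels whose committed token differs from the target."""
    committed = client.get_latest_committed_offset_tokens(list(targets))
    return sorted(
        name for name, token in targets.items() if committed.get(name) != token
    )
```

An empty list means every channel has committed up to the last token that was sent to it.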
- get_channel_statuses(channel_names: List[str]) -> Dict[str, ChannelStatus]
Get the statuses of a list of channels.
- Parameters:
channel_names – List of channel names
- Returns:
A dictionary mapping channel names to their statuses.
- Return type:
Dict[str, ChannelStatus]
- Raises:
StreamingIngestError – If getting the channel statuses fails
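The dictionary returned by get_channel_statuses() can be regrouped for monitoring. Because this reference does not enumerate the possible status_code values, the sketch below makes no assumption about them and simply buckets channel names by whatever code is reported. The helper name group_by_status_code is hypothetical.

```python
from collections import defaultdict
from typing import Any, Dict, List


def group_by_status_code(client: Any, channel_names: List[str]) -> Dict[str, List[str]]:
    """Group channel names by the status_code of their ChannelStatus."""
    groups: Dict[str, List[str]] = defaultdict(list)
    for name, status in client.get_channel_statuses(channel_names).items():
        groups[status.status_code].append(name)
    return dict(groups)
```

A monitoring job could then alert whenever more than one status code shows up across a set of channels that are expected to behave identically.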
- drop_channel(channel_name: str) -> None
Drop a channel.
- Parameters:
channel_name – Name of the channel to drop
- Raises:
StreamingIngestError – If dropping the channel fails
- initiate_flush() -> None
Initiate a flush of the client.
- Raises:
StreamingIngestError – If initiating the flush fails
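One possible shutdown order is to start a flush, close each open channel, and finally close the client. Judging by its name, initiate_flush() appears to start a flush rather than wait for it, so this sketch relies on the close() calls (with wait_for_flush=True) to do the waiting; that reading is an assumption. The helper name shutdown and the 60-second timeout are hypothetical.

```python
from typing import Any, Iterable


def shutdown(client: Any, channels: Iterable[Any], timeout_seconds: int = 60) -> None:
    """Flush and close every open channel, then close the client itself."""
    client.initiate_flush()
    try:
        for channel in channels:
            if not channel.is_closed():
                channel.close(wait_for_flush=True, timeout_seconds=timeout_seconds)
    finally:
        # Close the client even if a channel failed to close.
        if not client.is_closed():
            client.close(wait_for_flush=True, timeout_seconds=timeout_seconds)
```

The try/finally ensures the client is released even when a channel's close() raises.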
- property client_name: str
Get the client name.
- Raises:
ValueError – If client is already closed
- property db_name: str
Get the database name.
- Raises:
ValueError – If client is already closed
- property schema_name: str
Get the schema name.
- Raises:
ValueError – If client is already closed
- property pipe_name: str
Get the pipe name.
- Raises:
ValueError – If client is already closed