snowflake.ingest.streaming.StreamingIngestChannel
- WAIT_FOR_COMMIT_CHECK_INTERVAL_SECONDS = 1
- class StreamingIngestChannel(channel: snowflake.ingest.streaming._python_ffi.PyChannel, *, _internal: bool = False)
A channel for streaming data ingestion into Snowflake using the Snowflake Ingest SDK.
The channel is used to ingest data into Snowflake tables in a streaming fashion. Each channel is associated with a specific account/database/schema/pipe combination, is created by calling open_channel(), and is closed by calling close(). The channel provides methods for appending single rows or batches of rows into Snowflake, with support for offset tokens that track ingestion progress and enable replay in case of failures.
Note
This class should not be instantiated directly. Use open_channel() to create channel instances.
- initiate_flush() -> None
Initiate a flush of the channel.
Initiates a flush of all buffered data maintained for this Channel but does not wait for the flush to complete. Calls to append_rows are still allowed on the Channel after invoking this API.
This method triggers an immediate flush of all data currently buffered in this channel, similar to the client-level initiate_flush() but scoped to this channel only. The flush happens asynchronously, and the method returns immediately.
This method is useful when you want to send buffered data right away instead of waiting for the automatic time-based or size-based flush triggers. It gives fine-grained, per-channel control over when data is sent to Snowflake. However, calling initiate_flush at a high rate will lower overall throughput, may increase costs, and can lead to a higher incidence of throttling by the Snowflake service.
- Raises:
StreamingIngestError – If initiating the flush fails
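Because frequent calls to initiate_flush hurt throughput, one option is to rate-limit them in application code. The sketch below is a hypothetical helper (ThrottledFlusher and min_interval_seconds are not part of the SDK) that forwards initiate_flush() at most once per interval. It does not import the SDK; a minimal stand-in object plays the role of the channel, and the clock is injectable so the behavior is deterministic.

```python
import time
from typing import Any, Callable, Optional


class ThrottledFlusher:
    """Hypothetical helper: forwards initiate_flush() to a channel at most
    once per `min_interval_seconds`, to avoid flushing at a high rate."""

    def __init__(self, channel: Any, min_interval_seconds: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._channel = channel
        self._min_interval = min_interval_seconds
        self._clock = clock
        self._last_flush: Optional[float] = None

    def maybe_flush(self) -> bool:
        """Call channel.initiate_flush() if enough time has passed since the
        previous call. Returns True if a flush was initiated."""
        now = self._clock()
        if self._last_flush is not None and now - self._last_flush < self._min_interval:
            return False  # too soon: skip this flush request
        self._channel.initiate_flush()  # returns immediately; flush is asynchronous
        self._last_flush = now
        return True


# Usage with a stand-in channel (hypothetical, for illustration only).
class _FakeChannel:
    def __init__(self) -> None:
        self.flushes = 0

    def initiate_flush(self) -> None:
        self.flushes += 1


ticks = iter([0.0, 0.5, 1.2, 1.3])  # simulated timestamps in seconds
fake = _FakeChannel()
flusher = ThrottledFlusher(fake, min_interval_seconds=1.0, clock=lambda: next(ticks))
results = [flusher.maybe_flush() for _ in range(4)]
```

With a one-second minimum interval, only the requests at t=0.0 and t=1.2 reach the channel; the others are dropped rather than queued.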
- wait_for_flush(timeout_seconds: int | None = None) -> None
Wait for the channel to flush all buffered data.
Waits until all buffered data in this channel has been flushed to the Snowflake server. This method triggers a flush of all pending data and waits for the flush to complete. If the timeout is reached, a TimeoutError is raised.
- Parameters:
timeout_seconds – Optional timeout in seconds for the flush operation. Defaults to None, which means no timeout.
- Raises:
ValueError – If timeout_seconds is negative
TimeoutError – If the timeout is reached
StreamingIngestError – If waiting for the flush fails
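A bounded wait lets a caller decide what to do when the flush does not finish in time. The sketch below wraps wait_for_flush in a hypothetical helper, flush_with_deadline, that turns a timeout into a False return value while letting other errors (such as StreamingIngestError) propagate. It assumes the TimeoutError raised by the SDK is Python's built-in TimeoutError; two stand-in channels simulate a fast and a slow flush.

```python
from typing import Any


def flush_with_deadline(channel: Any, timeout_seconds: int) -> bool:
    """Hypothetical helper: wait up to `timeout_seconds` for buffered data
    to be flushed. Returns True on success, False on timeout.
    Assumes the timeout is signaled with the built-in TimeoutError."""
    try:
        channel.wait_for_flush(timeout_seconds=timeout_seconds)
    except TimeoutError:
        return False  # data may still be buffered; caller decides what to do
    return True


# Stand-in channels (hypothetical, for illustration only).
class _FastChannel:
    def wait_for_flush(self, timeout_seconds=None) -> None:
        return None


class _SlowChannel:
    def wait_for_flush(self, timeout_seconds=None) -> None:
        raise TimeoutError("flush did not finish in time")


ok = flush_with_deadline(_FastChannel(), timeout_seconds=5)
timed_out = flush_with_deadline(_SlowChannel(), timeout_seconds=5)
```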
- wait_for_commit(token_checker: Callable[[str], bool], timeout_seconds: int | None = None) -> None
Wait for the channel to commit all buffered data.
Waits for an offset token to be committed on the Snowflake server side by checking whether the latest committed offset token satisfies the condition implemented by token_checker. Snowflake commits offset tokens in batches, so the latest committed offset token may already be past the one you expect, and token_checker must handle that case. For this reason, token_checker usually performs a range check (is the committed token greater than or equal to the expected one?) rather than an exact match.
- Parameters:
token_checker – A callable that tests whether the current committed offset token from the server meets the desired condition. The callable receives the latest committed offset token (which may be None) and should return True when the wait condition is satisfied.
timeout_seconds – Optional timeout in seconds for the commit operation. Defaults to None, which means no timeout.
- Raises:
ValueError – If token_checker is not callable or timeout_seconds is negative
StreamingIngestError – If waiting for the commit fails
TimeoutError – If the timeout is reached
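The range check described above can be packaged as a small factory that builds a token_checker. This is a sketch under an application-level assumption, not an SDK requirement: offset tokens are decimal integer strings that increase monotonically. The token is parsed as an integer before comparing, because comparing the raw strings would be lexicographic ("99" sorts after "100"). The checker also returns False for None, since the callable may receive None when nothing has been committed yet.

```python
from typing import Callable, Optional


def at_least(expected: int) -> Callable[[Optional[str]], bool]:
    """Build a token_checker satisfied once the committed offset token,
    parsed as an integer, is >= `expected`.

    Assumes tokens are monotonically increasing decimal integer strings
    (an application convention, not something the SDK enforces)."""
    def checker(committed: Optional[str]) -> bool:
        if committed is None:  # nothing committed yet
            return False
        # Compare numerically: as strings, "99" >= "100" would be True.
        return int(committed) >= expected
    return checker


checker = at_least(100)
# With a real channel this would be passed as, for example:
# channel.wait_for_commit(at_least(100), timeout_seconds=60)
```

Because commits arrive in batches, a committed token of "250" also satisfies a wait for token 100, which is exactly the case the range check is meant to cover.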
- append_row(row: Dict[str, Any], offset_token: str | None = None) -> None
Append a single row into the channel.
- Parameters:
row –
Dictionary representing the row data to append with keys as column names and values as column values. Values can be of the following types:
None: null values
bool: boolean values (True, False)
int: integer values
float: floating-point values
str: string values
bytes: byte strings
bytearray: mutable byte arrays
tuple: tuples of values
list: lists of values
dict: nested dictionaries
set: sets of values
frozenset: immutable sets of values
datetime.datetime: datetime objects
datetime.date: date objects
datetime.time: time objects
decimal.Decimal: decimal values for precise numeric operations
offset_token – Optional offset token, used to track ingestion progress and to replay ingestion in case of failures. It can be None if you do not plan to replay, or cannot replay.
- Raises:
ValueError, TypeError – If the row cannot be serialized to JSON
StreamingIngestError – If appending the row fails
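To catch bad values before they reach append_row, an application could pre-check each row against the value types listed above. The helpers below (is_supported_value and check_row) are hypothetical and only mirror the documented list; they assume that values nested inside tuple, list, set, frozenset, and dict must themselves be supported types. The SDK performs its own serialization, so passing this check does not guarantee the append will succeed.

```python
import datetime
import decimal
from typing import Any, Dict

# Scalar types listed in the append_row() documentation.
_SCALARS = (type(None), bool, int, float, str, bytes, bytearray,
            datetime.datetime, datetime.date, datetime.time, decimal.Decimal)


def is_supported_value(value: Any) -> bool:
    """Hypothetical pre-check mirroring the documented value types.
    Assumes nested container elements must also be supported types."""
    if isinstance(value, _SCALARS):
        return True
    if isinstance(value, (tuple, list, set, frozenset)):
        return all(is_supported_value(v) for v in value)
    if isinstance(value, dict):
        return all(is_supported_value(v) for v in value.values())
    return False


def check_row(row: Dict[str, Any]) -> None:
    """Raise TypeError naming the first column with an unsupported value."""
    for column, value in row.items():
        if not is_supported_value(value):
            raise TypeError(f"column {column!r}: unsupported type {type(value).__name__}")


row = {
    "id": 1,
    "name": "sensor-a",
    "reading": decimal.Decimal("12.50"),
    "ts": datetime.datetime(2024, 1, 1, 12, 0, 0),
    "tags": ["x", "y"],
    "meta": {"ok": True},
}
check_row(row)  # passes: every value is one of the documented types
# channel.append_row(row, offset_token="1")
```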
- append_rows(rows: List[Dict[str, Any]], start_offset_token: str | None = None, end_offset_token: str | None = None) -> None
Append multiple rows into the channel.
- Parameters:
rows –
List of dictionaries representing the row data to append. Each dictionary’s values can be of the following types:
None: null values
bool: boolean values (True, False)
int: integer values
float: floating-point values
str: string values
bytes: byte strings
bytearray: mutable byte arrays
tuple: tuples of values
list: lists of values
dict: nested dictionaries
set: sets of values
frozenset: immutable sets of values
datetime.datetime: datetime objects
datetime.date: date objects
datetime.time: time objects
decimal.Decimal: decimal values for precise numeric operations
start_offset_token – Optional start offset token of the batch/row-set.
end_offset_token – Optional end offset token of the batch/row-set.
- Raises:
ValueError, TypeError – If the rows cannot be serialized to JSON
StreamingIngestError – If appending the rows fails
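When rows come from an ordered source, each append_rows call can carry the offset range of its batch. The generator below is a sketch built on a hypothetical convention: row i of the source has offset first_offset + i, encoded as a decimal string, and the end token is the offset of the last row in the batch. Those conventions are assumptions for illustration; the SDK only documents that the tokens are optional strings.

```python
from typing import Any, Dict, Iterator, List, Sequence, Tuple


def batches_with_offsets(rows: Sequence[Dict[str, Any]], batch_size: int,
                         first_offset: int = 0
                         ) -> Iterator[Tuple[List[Dict[str, Any]], str, str]]:
    """Split rows into batches and pair each with (start, end) offset tokens.

    Hypothetical convention: row i has offset first_offset + i, and the
    end token is the offset of the last row in the batch (inclusive)."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    for start in range(0, len(rows), batch_size):
        batch = list(rows[start:start + batch_size])
        start_token = str(first_offset + start)
        end_token = str(first_offset + start + len(batch) - 1)
        yield batch, start_token, end_token


rows = [{"id": i} for i in range(5)]
plan = [(len(b), s, e) for b, s, e in batches_with_offsets(rows, batch_size=2)]
# With a real channel:
# for batch, s, e in batches_with_offsets(rows, batch_size=2):
#     channel.append_rows(batch, start_offset_token=s, end_offset_token=e)
```

Five rows in batches of two produce the ranges 0 to 1, 2 to 3, and 4 to 4.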
- get_latest_committed_offset_token() -> str | None
Get the latest committed offset token for the channel.
- Returns:
The latest committed offset token for the channel, or None if the channel is brand new.
- Return type:
Optional[str]
- Raises:
StreamingIngestError – If getting the latest committed offset token fails
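The committed offset token is what makes replay possible after a failure: on restart, the application reads it and re-appends only what Snowflake has not committed. The sketch below assumes the same hypothetical convention as before (row i of the source was appended with offset token str(i)); rows_to_replay is not an SDK function, and a stand-in channel supplies the committed token.

```python
from typing import Any, Dict, List, Optional, Sequence


def rows_to_replay(channel: Any, source: Sequence[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Return the source rows that still need to be appended.

    Hypothetical convention: row i of `source` was appended with offset
    token str(i), so everything after the committed index is replayed."""
    committed = channel.get_latest_committed_offset_token()
    if committed is None:  # brand-new channel: nothing committed, replay all
        return list(source)
    return list(source[int(committed) + 1:])


# Stand-in channel (hypothetical, for illustration only).
class _FakeChannel:
    def __init__(self, token: Optional[str]) -> None:
        self._token = token

    def get_latest_committed_offset_token(self) -> Optional[str]:
        return self._token


source = [{"id": i} for i in range(5)]
fresh = rows_to_replay(_FakeChannel(None), source)
resumed = rows_to_replay(_FakeChannel("2"), source)
```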
- get_channel_status() -> ChannelStatus
Get the status of the channel.
- Returns:
The status of the channel.
- Return type:
- Raises:
StreamingIngestError – If getting the channel status fails
- close(drop: bool = False, wait_for_flush: bool = True, timeout_seconds: int | None = None) -> None
Close the channel.
- Parameters:
drop – Whether to drop the channel. Default is False.
wait_for_flush – Whether to wait for the flush to complete. Default is True.
timeout_seconds – The timeout in seconds for the flush, None means no timeout. Default is None.
- Raises:
ValueError – If timeout_seconds is negative
StreamingIngestError – If closing the channel fails
- is_closed() -> bool
Check if the channel is closed.
- Returns:
True if the channel is closed, False otherwise
- Return type:
bool
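By default, close() waits for the flush with no timeout. A context manager can guarantee the channel is closed with a bounded wait even if the body raises. The sketch below uses the standard library's contextlib.contextmanager; closing_channel is a hypothetical helper, and its 30-second default timeout is an arbitrary choice. It calls is_closed() first so that a channel already closed inside the block (for example, with drop=True) is not closed twice.

```python
from contextlib import contextmanager
from typing import Any, Iterator, Optional


@contextmanager
def closing_channel(channel: Any, timeout_seconds: Optional[int] = 30) -> Iterator[Any]:
    """Hypothetical helper: yield the channel and close it on exit, waiting
    up to `timeout_seconds` for buffered data to flush. Skips close() if the
    channel was already closed inside the block."""
    try:
        yield channel
    finally:
        if not channel.is_closed():
            channel.close(wait_for_flush=True, timeout_seconds=timeout_seconds)


# Stand-in channel (hypothetical, for illustration only).
class _FakeChannel:
    def __init__(self) -> None:
        self.closed = False
        self.close_calls = []

    def is_closed(self) -> bool:
        return self.closed

    def close(self, drop=False, wait_for_flush=True, timeout_seconds=None) -> None:
        self.close_calls.append((drop, wait_for_flush, timeout_seconds))
        self.closed = True


ch = _FakeChannel()
with closing_channel(ch, timeout_seconds=10) as c:
    pass  # e.g. c.append_row({"id": 1}, offset_token="1")

ch2 = _FakeChannel()
with closing_channel(ch2) as c:
    c.close(drop=True)  # closed explicitly; the context manager skips close()
```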
- property db_name: str
Get the database name.
- property channel_name: str
Get the channel name.
- property schema_name: str
Get the schema name.
- property pipe_name: str
Get the pipe name.