snowflake.ingest.streaming.StreamingIngestChannel

WAIT_FOR_COMMIT_CHECK_INTERVAL_SECONDS = 1
class StreamingIngestChannel(channel: snowflake.ingest.streaming._python_ffi.PyChannel, *, _internal: bool = False)

A channel for streaming data ingestion into Snowflake using the Snowflake Ingest SDK.

The channel is used to ingest data into Snowflake tables in a streaming fashion. Each channel is associated with a specific account/database/schema/pipe combination. It is created by calling open_channel() and closed by calling close().

The channel provides methods for appending single rows or batches of rows into Snowflake, with support for offset tokens to track ingestion progress and enable replay capabilities in case of failures.

Note

This class should not be instantiated directly. Use open_channel() to create channel instances.

initiate_flush() -> None

Initiate a flush of the channel.

Initiates a flush of all buffered data maintained for this Channel but does not wait for the flush to complete. Calls to append_rows are still allowed on the Channel after invoking this API.

This method triggers an immediate flush of all currently buffered data in this specific channel, similar to the client-level initiate_flush() but scoped to only this channel. The flush operation will occur asynchronously and this method returns immediately.

This method is useful when you want to force immediate transmission of buffered data without waiting for the automatic flush triggers (time-based or size-based). It provides fine-grained, per-channel control over when data is sent to Snowflake. However, calling initiate_flush at a high rate reduces overall throughput, may increase costs, and can lead to more frequent throttling by the Snowflake service.

Raises:

StreamingIngestError – If initiating the flush fails
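The throughput warning above suggests limiting how often explicit flushes are issued. The following is a minimal sketch of one way to do that, not an SDK feature: `FlushThrottle` is a hypothetical helper, and `FakeChannel` is a stand-in that only counts calls so the example runs without a Snowflake connection. With a real channel, the object returned by open_channel() would be passed instead.

```python
import time
from typing import Callable


class FlushThrottle:
    """Hypothetical helper (not part of the SDK): forwards initiate_flush()
    to a channel at most once every `min_interval_seconds`."""

    def __init__(self, channel, min_interval_seconds: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._channel = channel
        self._min_interval = min_interval_seconds
        self._clock = clock
        self._last_flush = None  # time of the last forwarded flush, if any

    def maybe_flush(self) -> bool:
        """Call channel.initiate_flush() unless one was issued too recently.

        Returns True if a flush was initiated, False if it was skipped."""
        now = self._clock()
        if self._last_flush is not None and now - self._last_flush < self._min_interval:
            return False
        self._channel.initiate_flush()  # returns immediately; the flush is asynchronous
        self._last_flush = now
        return True


class FakeChannel:
    """Stand-in for illustration only; counts initiate_flush() calls."""

    def __init__(self) -> None:
        self.flush_calls = 0

    def initiate_flush(self) -> None:
        self.flush_calls += 1


# Drive the throttle with a fake clock so the example is deterministic.
fake_now = [0.0]
channel = FakeChannel()
throttle = FlushThrottle(channel, min_interval_seconds=5.0, clock=lambda: fake_now[0])

results = []
for t in (0.0, 1.0, 4.9, 5.0, 12.0):
    fake_now[0] = t
    results.append(throttle.maybe_flush())
```

The five-second interval is an arbitrary value chosen for the example; a suitable interval depends on the workload.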

wait_for_flush(timeout_seconds: int | None = None) -> None

Wait for the channel to flush all buffered data.

Waits for all buffered data in this channel to be flushed to the Snowflake server side. This method triggers a flush of all pending data and waits for the flush operation to complete. If the timeout is reached, a TimeoutError is raised.

Parameters:

timeout_seconds – Optional timeout in seconds for the flush operation. Defaults to None, which means no timeout.

Raises:
  • ValueError – If timeout_seconds is negative

  • TimeoutError – If the timeout is reached

  • StreamingIngestError – If waiting for the flush fails
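A bounded wait can be wrapped so that a timeout becomes an ordinary return value. This is a sketch under stated assumptions: `flush_with_deadline` is a hypothetical helper, `FakeChannel` is a stand-in that imitates only the documented error contract of wait_for_flush(), and the documented TimeoutError is assumed to be Python's built-in exception.

```python
class FakeChannel:
    """Stand-in for illustration only; mimics the documented wait_for_flush()
    contract (ValueError on a negative timeout, TimeoutError on expiry)."""

    def __init__(self, seconds_needed: int) -> None:
        self.seconds_needed = seconds_needed  # pretend flush duration

    def wait_for_flush(self, timeout_seconds=None) -> None:
        if timeout_seconds is not None and timeout_seconds < 0:
            raise ValueError("timeout_seconds must be non-negative")
        if timeout_seconds is not None and self.seconds_needed > timeout_seconds:
            raise TimeoutError("flush did not complete in time")


def flush_with_deadline(channel, timeout_seconds: int) -> bool:
    """Return True if the flush completed, False if the timeout was reached."""
    try:
        channel.wait_for_flush(timeout_seconds=timeout_seconds)
    except TimeoutError:
        # This sketch only reports the timeout; retry policy is left to the caller.
        return False
    return True


fast = flush_with_deadline(FakeChannel(seconds_needed=1), timeout_seconds=10)
slow = flush_with_deadline(FakeChannel(seconds_needed=30), timeout_seconds=10)
```

ValueError and StreamingIngestError are deliberately not caught here, since they indicate a caller bug or a failed flush rather than a slow one.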

wait_for_commit(token_checker: Callable[[str], bool], timeout_seconds: int | None = None) -> None

Wait for the channel to commit all buffered data.

Waits for an offset token to be committed on the Snowflake server side by checking whether the latest committed offset token meets the commit condition provided by token_checker. Snowflake commits offset tokens in batches, so token_checker must handle the case where the latest committed offset token is already past the expected one. For this reason, token_checker usually performs a range check (the committed token is greater than or equal to the expected one) rather than an exact match.

Parameters:
  • token_checker – A callable that tests whether the current committed offset token from the server meets the desired condition. The callable receives the latest committed offset token (which may be None) and should return True when the wait condition is satisfied.

  • timeout_seconds – Optional timeout in seconds for the commit operation. Defaults to None, which means no timeout.

Raises:
  • ValueError – If token_checker is not callable or timeout_seconds is negative

  • StreamingIngestError – If waiting for the commit fails

  • TimeoutError – If the timeout is reached
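The range check described above can be sketched as a small factory that builds a token_checker. The helper name `at_least` is hypothetical, and the example assumes offset tokens are decimal integers encoded as strings; the SDK treats tokens as opaque strings, so that encoding is a convention of this sketch only.

```python
from typing import Callable, Optional


def at_least(expected: int) -> Callable[[Optional[str]], bool]:
    """Build a token_checker that passes once the committed offset is >= expected.

    Assumes offset tokens are decimal integers encoded as strings, which is a
    convention of this example and not something the SDK requires."""

    def checker(committed: Optional[str]) -> bool:
        if committed is None:  # nothing has been committed yet
            return False
        return int(committed) >= expected

    return checker


checker = at_least(100)
observed = [checker(None), checker("99"), checker("100"), checker("250")]
# With a real channel this would be passed as:
#   channel.wait_for_commit(checker, timeout_seconds=30)
```

Because the check is `>=` rather than `==`, a commit that jumps from "99" straight to "250" still satisfies the wait.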

append_row(row: Dict[str, Any], offset_token: str | None = None) -> None

Append a single row into the channel.

Parameters:
  • row

    Dictionary representing the row data to append with keys as column names and values as column values. Values can be of the following types:

    • None: null values

    • bool: boolean values (True, False)

    • int: integer values

    • float: floating-point values

    • str: string values

    • bytes: byte strings

    • bytearray: mutable byte arrays

    • tuple: tuples of values

    • list: lists of values

    • dict: nested dictionaries

    • set: sets of values

    • frozenset: immutable sets of values

    • datetime.datetime: datetime objects

    • datetime.date: date objects

    • datetime.time: time objects

    • decimal.Decimal: decimal values for precise numeric operations

  • offset_token – Optional offset token, used to track ingestion progress and to replay ingestion in case of failures. It may be None if the user does not plan to replay or cannot replay.

Raises:
  • ValueError, TypeError – If the row cannot be serialized to JSON

  • StreamingIngestError – If appending the row fails
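To illustrate the value types listed above, the following sketch builds a row that mixes several of them and runs a client-side pre-check before appending. `unsupported_columns` is a hypothetical helper, not part of the SDK, and it only inspects top-level values; the column names are invented for the example.

```python
import datetime
import decimal
from typing import Any, Dict, List

# Types listed in the append_row documentation above.
SUPPORTED_TYPES = (
    type(None), bool, int, float, str, bytes, bytearray, tuple, list, dict,
    set, frozenset, datetime.datetime, datetime.date, datetime.time,
    decimal.Decimal,
)


def unsupported_columns(row: Dict[str, Any]) -> List[str]:
    """Return names of columns whose top-level value has an unlisted type.

    Hypothetical pre-check, not part of the SDK; it does not inspect values
    nested inside containers."""
    return [name for name, value in row.items()
            if not isinstance(value, SUPPORTED_TYPES)]


row = {
    "id": 42,
    "name": "sensor-a",
    "reading": decimal.Decimal("21.50"),
    "recorded_at": datetime.datetime(2024, 1, 1, 12, 0, 0),
    "tags": ["lab", "north"],
    "payload": None,
}
bad_row = {"id": 43, "value": complex(1, 2)}  # complex is not in the list

ok_columns = unsupported_columns(row)
bad_columns = unsupported_columns(bad_row)
# With a real channel: channel.append_row(row, offset_token="42")
```

A pre-check like this is optional; per the Raises section, append_row itself reports ValueError or TypeError when a row cannot be serialized.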

append_rows(rows: List[Dict[str, Any]], start_offset_token: str | None = None, end_offset_token: str | None = None) -> None

Append multiple rows into the channel.

Parameters:
  • rows

    List of dictionaries representing the row data to append. Each dictionary’s values can be of the following types:

    • None: null values

    • bool: boolean values (True, False)

    • int: integer values

    • float: floating-point values

    • str: string values

    • bytes: byte strings

    • bytearray: mutable byte arrays

    • tuple: tuples of values

    • list: lists of values

    • dict: nested dictionaries

    • set: sets of values

    • frozenset: immutable sets of values

    • datetime.datetime: datetime objects

    • datetime.date: date objects

    • datetime.time: time objects

    • decimal.Decimal: decimal values for precise numeric operations

  • start_offset_token – Optional start offset token of the batch/row-set.

  • end_offset_token – Optional end offset token of the batch/row-set.

Raises:
  • ValueError, TypeError – If the rows cannot be serialized to JSON

  • StreamingIngestError – If appending the rows fails
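One way to derive start_offset_token and end_offset_token is to split the source rows into fixed-size batches and label each batch with the offsets of its first and last row. This is a sketch only: `batches_with_offsets` is a hypothetical helper, and it assumes consecutive integer offsets encoded as decimal strings, which is a convention of the example.

```python
from typing import Any, Dict, Iterator, List, Tuple

Row = Dict[str, Any]


def batches_with_offsets(
    rows: List[Row], first_offset: int, batch_size: int
) -> Iterator[Tuple[List[Row], str, str]]:
    """Yield (batch, start_offset_token, end_offset_token) triples.

    Assumes row i of `rows` has source offset first_offset + i and that tokens
    are decimal strings; both are conventions of this example only."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    for start in range(0, len(rows), batch_size):
        batch = rows[start:start + batch_size]
        start_token = str(first_offset + start)
        end_token = str(first_offset + start + len(batch) - 1)
        yield batch, start_token, end_token


rows = [{"id": i} for i in range(5)]
plan = [(len(batch), start, end)
        for batch, start, end in batches_with_offsets(rows, first_offset=100, batch_size=2)]
# With a real channel, each triple would be sent as:
#   channel.append_rows(batch, start_offset_token=start, end_offset_token=end)
```

The last batch may be shorter than batch_size, in which case its start and end tokens can coincide.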

get_latest_committed_offset_token() -> str | None

Get the latest committed offset token for the channel.

Returns:

The latest committed offset token for the channel, or None if the channel is brand new.

Return type:

Optional[str]

Raises:

StreamingIngestError – If getting the latest committed offset token fails
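The returned token is what makes replay after a restart possible. A minimal sketch of the resume decision follows; `rows_to_replay` is a hypothetical helper, and it assumes the offset token of the i-th source row is `str(i)`, which is an encoding chosen for the example rather than anything the SDK prescribes.

```python
from typing import List, Optional


def rows_to_replay(source: List[dict], committed_token: Optional[str]) -> List[dict]:
    """Return the rows that still need to be sent after a restart.

    Assumes the offset token of source[i] is str(i); this encoding is a
    convention of the example, not something the SDK prescribes."""
    if committed_token is None:
        return list(source)  # nothing committed yet: send everything
    return source[int(committed_token) + 1:]


source = [{"id": i} for i in range(5)]
fresh = rows_to_replay(source, None)
resumed = rows_to_replay(source, "2")
# With a real channel:
#   token = channel.get_latest_committed_offset_token()
#   pending = rows_to_replay(source, token)
```

Handling None explicitly matters because that is the documented return value for a brand-new channel.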

get_channel_status() -> ChannelStatus

Get the status of the channel.

Returns:

The status of the channel.

Return type:

ChannelStatus

Raises:

StreamingIngestError – If getting the channel status fails

close(drop: bool = False, wait_for_flush: bool = True, timeout_seconds: int | None = None) -> None

Close the channel.

Parameters:
  • drop – Whether to drop the channel. Default is False.

  • wait_for_flush – Whether to wait for the flush to complete. Default is True.

  • timeout_seconds – The timeout in seconds for the flush, None means no timeout. Default is None.

Raises:
  • ValueError – If timeout_seconds is negative

  • StreamingIngestError – If closing the channel fails
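Because a channel must be closed explicitly, it can help to guarantee the close() call even when appending raises. The sketch below uses a hypothetical `closing_channel` context manager built on `contextlib`; it makes no assumption about whether the SDK class supports the `with` statement itself. `FakeChannel` is a stand-in that only records how close() was called.

```python
from contextlib import contextmanager


@contextmanager
def closing_channel(channel, timeout_seconds=None):
    """Hypothetical helper (not part of the SDK): always close the channel,
    waiting for buffered data to flush, even if the body raises."""
    try:
        yield channel
    finally:
        if not channel.is_closed():
            channel.close(drop=False, wait_for_flush=True,
                          timeout_seconds=timeout_seconds)


class FakeChannel:
    """Stand-in for illustration only; records how close() was called."""

    def __init__(self) -> None:
        self.closed = False
        self.close_kwargs = None

    def close(self, drop=False, wait_for_flush=True, timeout_seconds=None) -> None:
        if timeout_seconds is not None and timeout_seconds < 0:
            raise ValueError("timeout_seconds must be non-negative")
        self.close_kwargs = {"drop": drop, "wait_for_flush": wait_for_flush,
                             "timeout_seconds": timeout_seconds}
        self.closed = True

    def is_closed(self) -> bool:
        return self.closed


channel = FakeChannel()
try:
    with closing_channel(channel, timeout_seconds=60):
        raise RuntimeError("simulated failure while appending")
except RuntimeError:
    pass  # the channel has been closed by now despite the error
```

The sixty-second timeout is an arbitrary example value; passing None waits without a timeout, as documented above.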

is_closed() -> bool

Check if the channel is closed.

Returns:

True if the channel is closed, False otherwise

Return type:

bool

property db_name: str

Get the database name.

property channel_name: str

Get the channel name.

property schema_name: str

Get the schema name.

property pipe_name: str

Get the pipe name.