snowflake.ingest.streaming

The snowflake.ingest.streaming package provides a Python SDK for streaming data into Snowflake using Snowpipe Streaming.

API Reference

class ChannelStatus(database_name: str, schema_name: str, pipe_name: str, channel_name: str, status_code: str, latest_committed_offset_token: str | None, created_on_ms: int, rows_inserted: int, rows_parsed: int, rows_error_count: int, last_error_offset_upper_bound: str | None, last_error_message: str | None, last_error_timestamp_ms: int | None, snowflake_avg_processing_latency_ms: int | None, last_refreshed_on_ms: int)

Channel status information returned to users.

Provides access to channel status information, including the channel name, status code, and latest committed offset token reported by the Snowflake server.

property database_name: str

Get the database name.

property schema_name: str

Get the schema name.

property pipe_name: str

Get the pipe name.

property channel_name: str

Get the channel name.

Returns:

The name of the channel

Return type:

str

property status_code: str

Get the status code for the channel.

Returns:

The status code from Snowflake server

Return type:

str

property latest_committed_offset_token: str | None

Get the latest committed offset token for the channel.

Returns:

The latest committed offset token, or None if no commits yet

Return type:

Optional[str]

property created_on_ms: int

Get the creation timestamp of the channel, in milliseconds.

property rows_inserted: int

Get the number of rows inserted for the channel.

property rows_parsed: int

Get the number of rows parsed for the channel.

property rows_error_count: int

Get the number of rows with errors for the channel.

property last_error_offset_upper_bound: str | None

Get the last error offset upper bound for the channel.

property last_error_message: str | None

Get the last error message for the channel.

property last_error_timestamp_ms: int | None

Get the timestamp of the last error for the channel, in milliseconds.

property snowflake_avg_processing_latency_ms: int | None

Get the average processing latency, in milliseconds, on the Snowflake server side for ingesting the channel's data.

property last_refreshed_on_ms: int

Get the timestamp, in milliseconds, at which the channel was last refreshed. Channels are refreshed in the background every second.
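As a sketch of how these properties might be consumed, the hypothetical helper below collects the monitoring-relevant fields of a ChannelStatus into a plain dict. It relies only on the properties documented above and accepts any object exposing them, so it can be exercised without a live connection.

```python
from typing import Any, Dict


def summarize_channel_status(status: Any) -> Dict[str, Any]:
    """Collect the ChannelStatus fields that matter for monitoring into a dict.

    `status` is expected to be a ChannelStatus (or any object with the same
    properties); `summarize_channel_status` itself is a hypothetical helper.
    """
    summary: Dict[str, Any] = {
        "channel": (
            f"{status.database_name}.{status.schema_name}."
            f"{status.pipe_name}/{status.channel_name}"
        ),
        "status_code": status.status_code,
        "committed_offset": status.latest_committed_offset_token,
        "rows_inserted": status.rows_inserted,
        "rows_error_count": status.rows_error_count,
    }
    # Only surface error details when the channel has actually reported errors.
    if status.rows_error_count > 0:
        summary["last_error_message"] = status.last_error_message
        summary["last_error_offset_upper_bound"] = status.last_error_offset_upper_bound
    return summary
```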

class StreamingIngestChannel(channel: snowflake.ingest.streaming._python_ffi.PyChannel, *, _internal: bool = False)

A channel for streaming data ingestion into Snowflake using the Snowflake Ingest SDK.

The channel is used to ingest data into Snowflake tables in a streaming fashion. Each channel is associated with a specific account/database/schema/pipe combination and is created by calling open_channel() and closed by calling close().

The channel provides methods for appending single rows or batches of rows into Snowflake, with support for offset tokens to track ingestion progress and enable replay capabilities in case of failures.

Note

This class should not be instantiated directly. Use open_channel() to create channel instances.

initiate_flush() → None

Initiate a flush of the channel.

Initiates a flush of all buffered data maintained for this Channel but does not wait for the flush to complete. Calls to append_rows are still allowed on the Channel after invoking this API.

This method triggers an immediate flush of all currently buffered data in this specific channel, similar to the client-level initiate_flush() but scoped to only this channel. The flush operation will occur asynchronously and this method returns immediately.

This method is useful when you want to force immediate transmission of buffered data without waiting for the automatic (time-based or size-based) flush triggers. It gives fine-grained, per-channel control over when data is sent to Snowflake. However, calling initiate_flush at a high rate reduces overall throughput, may increase costs, and can lead to more frequent throttling by the Snowflake service.

Raises:

StreamingIngestError – If initiating the flush fails
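Given the throughput and throttling cost of frequent manual flushes, one option is to rate-limit them. The sketch below is a hypothetical wrapper (`FlushRateLimiter` is not part of the SDK) that calls the documented `initiate_flush()` at most once per interval and otherwise leaves flushing to the automatic triggers; the 5-second default is an arbitrary assumption, not a recommended value.

```python
import time
from typing import Any, Optional


class FlushRateLimiter:
    """Hypothetical helper: call channel.initiate_flush() at most once per min_interval_s."""

    def __init__(self, channel: Any, min_interval_s: float = 5.0) -> None:
        self._channel = channel
        self._min_interval_s = min_interval_s
        self._last_flush: Optional[float] = None

    def maybe_flush(self, now: Optional[float] = None) -> bool:
        """Initiate a flush if enough time has passed; return True if one was initiated."""
        now = time.monotonic() if now is None else now
        if self._last_flush is not None and now - self._last_flush < self._min_interval_s:
            # Too soon: rely on the SDK's automatic flush triggers instead.
            return False
        self._channel.initiate_flush()  # asynchronous; returns immediately
        self._last_flush = now
        return True
```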

wait_for_flush(timeout_seconds: int | None = None) → None

Wait for the channel to flush all buffered data.

Waits for all buffered data in this channel to be flushed to the Snowflake server. This method triggers a flush of all pending data and waits for the flush operation to complete. If the timeout is reached, a TimeoutError is raised.

Parameters:

timeout_seconds – Optional timeout in seconds for the flush operation. Defaults to None, meaning no timeout.

Raises:
  • ValueError – If timeout_seconds is negative

  • TimeoutError – If the timeout is reached

  • StreamingIngestError – If waiting for the flush fails
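A minimal sketch of bounding the wait, assuming the TimeoutError listed above is Python's built-in `TimeoutError`. The helper name `flush_with_timeout` and the 30-second default are hypothetical; the function converts a timeout into a boolean so the caller can decide whether to retry.

```python
from typing import Any


def flush_with_timeout(channel: Any, timeout_seconds: int = 30) -> bool:
    """Wait for buffered data to be flushed; return False instead of raising on timeout."""
    try:
        channel.wait_for_flush(timeout_seconds=timeout_seconds)
    except TimeoutError:
        # Data may still be in flight; the caller can retry or inspect the
        # latest committed offset token to see how far ingestion got.
        return False
    return True
```

Other failures (StreamingIngestError, or ValueError for a negative timeout) are deliberately not caught here, since they indicate problems a retry of the wait would not fix.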

wait_for_commit(token_checker: Callable[[str], bool], timeout_seconds: int | None = None) → None

Wait for the channel to commit all buffered data.

Waits until the latest committed offset token on the Snowflake server side satisfies the condition tested by token_checker. Snowflake commits offset tokens in batches, so by the time the check runs, the latest committed token may already be past the one you are waiting for, and token_checker must handle that case. For this reason, token_checker usually performs a range check (is the committed token greater than or equal to the expected one?) rather than an exact match.

Parameters:
  • token_checker – A callable that tests whether the current committed offset token from the server meets the desired condition. The callable receives the latest committed offset token (which may be None) and should return True when the wait condition is satisfied.

  • timeout_seconds – Optional timeout in seconds for the commit operation. Defaults to None, meaning no timeout.

Raises:
  • ValueError – If token_checker is not callable or timeout_seconds is negative

  • StreamingIngestError – If waiting for the commit fails

  • TimeoutError – If the timeout is reached
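The range check described above can be sketched as a small factory. This assumes the application chooses decimal integer strings as its offset tokens; `at_least` is a hypothetical helper, not part of the SDK. The checker returns False while nothing has been committed (token is None).

```python
from typing import Callable, Optional


def at_least(expected_offset: int) -> Callable[[Optional[str]], bool]:
    """Build a token_checker that passes once the committed token is >= expected_offset.

    Assumes offset tokens are decimal integer strings chosen by the application.
    """
    def checker(committed: Optional[str]) -> bool:
        if committed is None:
            return False  # nothing committed yet on this channel
        # Compare numerically: as plain strings, "999" would sort after "1000".
        return int(committed) >= expected_offset

    return checker
```

Usage would then look like `channel.wait_for_commit(at_least(1000), timeout_seconds=60)`, which returns once any token at or beyond 1000 has been committed, even if 1000 itself was folded into a larger batch.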

append_row(row: Dict[str, Any], offset_token: str | None = None) → None

Append a single row into the channel.

Parameters:
  • row

    Dictionary representing the row data to append with keys as column names and values as column values. Values can be of the following types:

    • None: null values

    • bool: boolean values (True, False)

    • int: integer values

    • float: floating-point values

    • str: string values

    • bytes: byte strings

    • bytearray: mutable byte arrays

    • tuple: tuples of values

    • list: lists of values

    • dict: nested dictionaries

    • set: sets of values

    • frozenset: immutable sets of values

    • datetime.datetime: datetime objects

    • datetime.date: date objects

    • datetime.time: time objects

    • decimal.Decimal: decimal values for precise numeric operations

  • offset_token – Optional offset token, used to track ingestion progress and to replay ingestion in case of failures. It can be None if you don’t plan to replay or cannot replay.

Raises:
  • ValueError, TypeError – If the row cannot be serialized to JSON

  • StreamingIngestError – If appending the row fails
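The sketch below builds a row dict from several of the value types listed above and appends it with an offset token. The column names (ORDER_ID, AMOUNT, and so on) and both helper functions are hypothetical; real keys must match the columns expected by your pipe. Using the application's own sequence number as the offset token is an assumption about how progress is tracked.

```python
import datetime
import decimal
from typing import Any, Dict


def build_order_row(order_id: int, amount: str, created_at: datetime.datetime) -> Dict[str, Any]:
    """Build a row dict (hypothetical columns) using value types supported by append_row."""
    return {
        "ORDER_ID": order_id,                 # int
        "AMOUNT": decimal.Decimal(amount),    # decimal.Decimal keeps exact precision
        "CREATED_AT": created_at,             # datetime.datetime
        "ORDER_DATE": created_at.date(),      # datetime.date
        "TAGS": ["web", "promo"],             # list
        "ATTRIBUTES": {"source": "web"},      # nested dict
        "IS_TEST": False,                     # bool
        "NOTE": None,                         # null
    }


def append_order(channel: Any, row: Dict[str, Any], offset: int) -> None:
    """Append one row; offset tokens are strings, so the sequence number is converted."""
    channel.append_row(row, offset_token=str(offset))
```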

append_rows(rows: List[Dict[str, Any]], start_offset_token: str | None = None, end_offset_token: str | None = None) → None

Append multiple rows into the channel.

Parameters:
  • rows

    List of dictionaries representing the row data to append. Each dictionary’s values can be of the following types:

    • None: null values

    • bool: boolean values (True, False)

    • int: integer values

    • float: floating-point values

    • str: string values

    • bytes: byte strings

    • bytearray: mutable byte arrays

    • tuple: tuples of values

    • list: lists of values

    • dict: nested dictionaries

    • set: sets of values

    • frozenset: immutable sets of values

    • datetime.datetime: datetime objects

    • datetime.date: date objects

    • datetime.time: time objects

    • decimal.Decimal: decimal values for precise numeric operations

  • start_offset_token – Optional start offset token of the batch/row-set.

  • end_offset_token – Optional end offset token of the batch/row-set.

Raises:
  • ValueError, TypeError – If the rows cannot be serialized to JSON

  • StreamingIngestError – If appending the rows fails
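A sketch of batching with start and end offset tokens, assuming each row is assigned a consecutive integer offset starting at `first_offset`. The helpers `batches` and `append_in_batches` and the default batch size of 500 are hypothetical choices, not SDK defaults.

```python
from typing import Any, Dict, Iterator, List, Tuple

Row = Dict[str, Any]


def batches(rows: List[Row], first_offset: int, size: int) -> Iterator[Tuple[List[Row], str, str]]:
    """Split rows into batches and compute (start, end) offset tokens for each batch."""
    if size <= 0:
        raise ValueError("size must be positive")
    for i in range(0, len(rows), size):
        chunk = rows[i:i + size]
        start = first_offset + i
        end = start + len(chunk) - 1  # inclusive offset of the last row in the chunk
        yield chunk, str(start), str(end)


def append_in_batches(channel: Any, rows: List[Row], first_offset: int, size: int = 500) -> None:
    """Append rows batch by batch, tagging each batch with its offset range."""
    for chunk, start, end in batches(rows, first_offset, size):
        channel.append_rows(chunk, start_offset_token=start, end_offset_token=end)
```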

get_latest_committed_offset_token() → str | None

Get the latest committed offset token for the channel.

Returns:

The latest committed offset token for the channel, or None if the channel is brand new.

Return type:

Optional[str]

Raises:

StreamingIngestError – If getting the latest committed offset token fails
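The committed offset token is what makes replay possible after a restart. The sketch below assumes integer offset tokens starting at 0; `next_offset_after` and `resume_offset` are hypothetical helpers that compute where the source should resume.

```python
from typing import Any, Optional


def next_offset_after(committed: Optional[str]) -> int:
    """Return the first offset to (re)send, assuming integer offset tokens starting at 0."""
    return 0 if committed is None else int(committed) + 1


def resume_offset(channel: Any) -> int:
    """Ask the channel for its latest committed token and compute the resume point."""
    # None means the channel is brand new, so start from the beginning.
    return next_offset_after(channel.get_latest_committed_offset_token())
```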

get_channel_status() → ChannelStatus

Get the status of the channel.

Returns:

The status of the channel.

Return type:

ChannelStatus

Raises:

StreamingIngestError – If getting the channel status fails

close(drop: bool = False, wait_for_flush: bool = True, timeout_seconds: int | None = None) → None

Close the channel.

Parameters:
  • drop – Whether to drop the channel, defaults to False

  • wait_for_flush – Whether to wait for the flush to complete. Default is True.

  • timeout_seconds – The timeout in seconds for the flush, None means no timeout. Default is None.

Raises:
  • ValueError – If timeout_seconds is negative

  • StreamingIngestError – If closing the channel fails
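To make sure a channel is closed (and its buffered data flushed) even when ingestion code raises, the open/close pair can be wrapped in a context manager. This is a sketch: `managed_channel` and its 60-second timeout are hypothetical, and it uses only the documented `open_channel()`, `is_closed()`, and `close()` calls.

```python
import contextlib
from typing import Any, Iterator


@contextlib.contextmanager
def managed_channel(client: Any, channel_name: str, timeout_seconds: int = 60) -> Iterator[Any]:
    """Open a channel and close it, after waiting for the flush, even if ingestion fails."""
    channel, _status = client.open_channel(channel_name)
    try:
        yield channel
    finally:
        if not channel.is_closed():
            # drop=False closes the channel without dropping it.
            channel.close(drop=False, wait_for_flush=True, timeout_seconds=timeout_seconds)
```

Usage: `with managed_channel(client, "orders-0") as channel: channel.append_rows(rows)`, where the channel name is illustrative.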

is_closed() → bool

Check if the channel is closed.

Returns:

True if the channel is closed, False otherwise

Return type:

bool

property db_name: str

Get the database name.

property channel_name: str

Get the channel name.

property schema_name: str

Get the schema name.

property pipe_name: str

Get the pipe name.

class StreamingIngestClient(client_name: str, db_name: str, schema_name: str, pipe_name: str, profile_json: str | None = None, properties: Dict[str, Any] | None = None)

A client that is the starting point for using the Streaming Ingest client APIs.

A single client maps to exactly one account/database/schema/pipe in Snowflake; however, multiple clients can point to the same account/database/schema/pipe. Each client contains information for Snowflake authentication and authorization, and it is used to create one or more StreamingIngestChannel instances for data ingestion.

The client manages the lifecycle of streaming ingest channels and handles the underlying communication with Snowflake services for authentication, channel management, and data transmission.

open_channel(channel_name: str, offset_token: str | None = None) → Tuple[StreamingIngestChannel, ChannelStatus]

Open a channel with the given name.

Parameters:
  • channel_name – Name of the channel to open

  • offset_token – Optional offset token

Returns:

(StreamingIngestChannel, ChannelStatus)

Return type:

tuple

Raises:

StreamingIngestError – If opening the channel fails
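Because open_channel() returns a ChannelStatus alongside the channel, the resume point can be read from the returned status without a separate call. A sketch assuming integer offset tokens; `open_for_resume` is a hypothetical helper.

```python
from typing import Any, Tuple


def open_for_resume(client: Any, channel_name: str) -> Tuple[Any, int]:
    """Open a channel and compute the offset the source should resume from.

    Assumes integer offset tokens; a None token means nothing has been committed yet.
    """
    channel, status = client.open_channel(channel_name)
    committed = status.latest_committed_offset_token
    next_offset = 0 if committed is None else int(committed) + 1
    return channel, next_offset
```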

close(wait_for_flush: bool = True, timeout_seconds: int | None = None) → None

Close the client.

Parameters:
  • wait_for_flush – Whether to wait for the flush to complete, defaults to True

  • timeout_seconds – Optional timeout in seconds for the flush operation, defaults to 60 seconds

Raises:
  • ValueError – If timeout_seconds is negative

  • TimeoutError – If the timeout is reached

  • StreamingIngestError – If closing the client fails

is_closed() → bool

Check if the client is closed.

Raises:

StreamingIngestError – If checking the client status fails

get_latest_committed_offset_tokens(channel_names: List[str]) → Dict[str, str | None]

Get the latest committed offset tokens for a list of channels.

Parameters:

channel_names – List of channel names

Returns:

A dictionary mapping channel names to their latest committed offset tokens.

Value is None if the channel is brand new or does not exist.

Return type:

Dict[str, Optional[str]]

Raises:

StreamingIngestError – If getting the latest committed offset tokens fails
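A sketch of using the batch lookup to find channels with nothing committed yet, for example to decide which sources must start from the beginning. `new_channels` is a hypothetical helper; it also treats a name missing from the returned dict as uncommitted.

```python
from typing import Any, List


def new_channels(client: Any, channel_names: List[str]) -> List[str]:
    """Return the channels with no committed offset token (brand new or nonexistent)."""
    tokens = client.get_latest_committed_offset_tokens(channel_names)
    return [name for name in channel_names if tokens.get(name) is None]
```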

get_channel_statuses(channel_names: List[str]) → Dict[str, ChannelStatus]

Get the statuses of a list of channels.

Parameters:

channel_names – List of channel names

Returns:

A dictionary mapping channel names to their statuses.

Return type:

Dict[str, ChannelStatus]

Raises:

StreamingIngestError – If getting the channel statuses fails
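A sketch of a periodic health check built on get_channel_statuses(): it reports each channel whose rows_error_count is non-zero together with its last error message. `channels_with_errors` is a hypothetical helper, and the `<no message>` placeholder is an arbitrary choice.

```python
from typing import Any, Dict, List


def channels_with_errors(client: Any, channel_names: List[str]) -> Dict[str, str]:
    """Map each channel that reported row errors to its last error message."""
    statuses = client.get_channel_statuses(channel_names)
    return {
        name: status.last_error_message or "<no message>"
        for name, status in statuses.items()
        if status.rows_error_count > 0
    }
```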

drop_channel(channel_name: str) → None

Drop a channel.

Parameters:

channel_name – Name of the channel to drop

Raises:

StreamingIngestError – If dropping the channel fails

initiate_flush() → None

Initiate a flush of the client.

Initiates a flush by the Client, which causes all outstanding buffered data to be flushed to Snowflake. The Client can still accept data after this call: it is asynchronous and returns once a flush has been initiated for all Channels opened by this Client.

Raises:

StreamingIngestError – If initiating the flush fails

wait_for_flush(timeout_seconds: int | None = None) → None

Wait for the client to flush all buffered data.

Waits for all buffered data in all channels managed by this client to be flushed to the Snowflake server. This method triggers a flush of all pending data across all channels and waits for the flush operations to complete. If the timeout is reached, a TimeoutError is raised.

Parameters:

timeout_seconds – Optional timeout in seconds for the flush operation. Defaults to None, meaning no timeout.

Raises:
  • ValueError – If timeout_seconds is negative

  • TimeoutError – If the timeout is reached

  • StreamingIngestError – If waiting for the flush fails

property client_name: str

Get the client name.

property db_name: str

Get the database name.

property schema_name: str

Get the schema name.

property pipe_name: str

Get the pipe name.

exception StreamingIngestError(error_code: StreamingIngestErrorCode, message: str, http_status_code: int, http_status_name: str)

Bases: Exception

The exception class for all streaming ingest errors.

property error_code: StreamingIngestErrorCode

The error code of the error.

property message: str

The message of the error.

property http_status_code: int

The HTTP status code of the error.

property http_status_name: str

The HTTP status name of the error.
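The four properties above are enough to produce a useful log line. The sketch below is a hypothetical formatter (`describe_ingest_error` is not part of the SDK); it reads the enum's `.value` string when available, for example 'ChannelNotFound', and falls back to the raw attribute otherwise.

```python
from typing import Any


def describe_ingest_error(err: Any) -> str:
    """Format the fields exposed by StreamingIngestError into one log line."""
    code = getattr(err.error_code, "value", err.error_code)  # enum value string
    return f"[{code}] HTTP {err.http_status_code} {err.http_status_name}: {err.message}"


# Typical use (channel, row, and logger come from the surrounding application):
#
#   try:
#       channel.append_row(row)
#   except StreamingIngestError as err:
#       logger.error(describe_ingest_error(err))
#       raise
```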

class StreamingIngestErrorCode(*args, **kwds)

Bases: enum.Enum

Enumeration of all possible streaming ingest error codes.

These error codes correspond to the IngestError variants in the Rust implementation and provide type-safe error handling.

CONFIG_ERROR = 'ConfigError'
INVALID_REQUEST = 'InvalidRequest'
INVALID_ARGUMENT = 'InvalidArgument'
SF_API_USER_ERROR = 'SfApiUserError'
NOT_IMPLEMENTED = 'NotImplemented'
HTTP_CLIENT_NON_RETRYABLE_ERROR = 'HttpClientNonRetryableError'
SF_API_PIPE_FAILED_OVER_ERROR = 'SfApiPipeFailedOverError'
CHANNEL_ALREADY_EXISTS = 'ChannelAlreadyExists'
AUTH_TOKEN_ERROR = 'AuthTokenError'
SF_API_AUTH_ERROR = 'SfApiAuthError'
CHANNEL_NOT_FOUND = 'ChannelNotFound'
CHANNEL_WAIT_FOR_FLUSH_TIMEOUT = 'ChannelWaitForFlushTimeout'
CLIENT_WAIT_FOR_FLUSH_TIMEOUT = 'ClientWaitForFlushTimeout'
CLOSED_CHANNEL_ERROR = 'ClosedChannelError'
CLOSED_CLIENT_ERROR = 'ClosedClientError'
CHANNEL_CLOSED_BY_USER = 'ChannelClosedByUser'
INVALID_CHANNEL_ERROR = 'InvalidChannelError'
INVALID_CLIENT_ERROR = 'InvalidClientError'
INPUT_CHANNEL_CLOSED = 'InputChannelClosed'
OUTPUT_CHANNEL_CLOSED = 'OutputChannelClosed'
RECEIVER_SATURATED = 'ReceiverSaturated'
MEMORY_THRESHOLD_EXCEEDED = 'MemoryThresholdExceeded'
MEMORY_THRESHOLD_EXCEEDED_IN_CONTAINER = 'MemoryThresholdExceededInContainer'
FATAL = 'Fatal'
NON_FATAL = 'NonFatal'
MUTEX_LOCK_FAILED = 'MutexLockFailed'
SF_API_UNEXPECTED_BEHAVIOR_ERROR = 'SfApiUnexpectedBehaviorError'
SF_API_INTERNAL_SERVER_ERROR = 'SfApiInternalServerError'
FILE_UPLOAD_ERROR = 'FileUploadError'
HTTP_RETRIES_EXHAUSTED_ERROR = 'HttpRetriesExhaustedError'
CLOSE_ALL_CHANNELS_FAILED_ERROR = 'CloseAllChannelsFailedError'
HTTP_RETRYABLE_CLIENT_ERROR = 'HttpRetryableClientError'

classmethod from_string(error_code_str: str) → StreamingIngestErrorCode | None

Convert a string error code to enum value if it exists.

Parameters:

error_code_str – The error code string to convert

Raises:

ValueError – If the error code string is invalid

Returns:

The matching StreamingIngestErrorCode enum value, or None if not found