snowflake.ingest.streaming

The snowflake.ingest.streaming package provides a Python SDK for streaming data into Snowflake using Snowpipe Streaming.

## Table of Contents

Core Classes

API Reference

class ChannelStatus(channel_name: str, status_code: str, latest_committed_offset_token: str | None)

Channel status information returned to users.

Provides access to channel status information including the channel name, status code, and latest committed offset token from Snowflake server.

property channel_name: str

Get the channel name.

Returns:

The name of the channel

Return type:

str

property status_code: str

Get the status code for the channel.

Returns:

The status code from Snowflake server

Return type:

str

property latest_committed_offset_token: str | None

Get the latest committed offset token for the channel.

Returns:

The latest committed offset token, or None if no commits yet

Return type:

Optional[str]

class StreamingIngestChannel(channel: snowflake.ingest.streaming._python_ffi.PyChannel, *, _internal: bool = False)

A channel for streaming data ingestion into Snowflake using the Snowflake Ingest SDK.

The channel is used to ingest data into Snowflake tables in a streaming fashion. Each channel is associated with a specific account/database/schema/pipe combination and is created by calling open_channel() and closed by calling close().

The channel provides methods for appending single rows or batches of rows into Snowflake, with support for offset tokens to track ingestion progress and enable replay capabilities in case of failures.

Note

This class should not be instantiated directly. Use open_channel() to create channel instances.

append_row(row: Dict[str, Any], offset_token: str | None = None) None

Append a single row into the channel.

Parameters:
  • row

    Dictionary representing the row data to append with keys as column names and values as column values. Values can be of the following types:

    • None: null values

    • bool: boolean values (True, False)

    • int: integer values

    • float: floating-point values

    • str: string values

    • bytes: byte strings

    • bytearray: mutable byte arrays

    • tuple: tuples of values

    • list: lists of values

    • dict: nested dictionaries

    • set: sets of values

    • frozenset: immutable sets of values

    • datetime.datetime: datetime objects

    • datetime.date: date objects

    • datetime.time: time objects

    • decimal.Decimal: decimal values for precise numeric operations

  • offset_token – Optional offset token, used to track the ingestion progress and replay ingestion in case of failures. It could be null if user don’t plan on replaying or can’t replay.

Raises:
  • ValueError, TypeError – If the row cannot be serialized to JSON

  • StreamingIngestError – If the row appending fails

append_rows(rows: List[Dict[str, Any]], start_offset_token: str | None = None, end_offset_token: str | None = None) None

Append multiple rows into the channel.

Parameters:
  • rows

    List of dictionaries representing the row data to append. Each dictionary’s values can be of the following types:

    • None: null values

    • bool: boolean values (True, False)

    • int: integer values

    • float: floating-point values

    • str: string values

    • bytes: byte strings

    • bytearray: mutable byte arrays

    • tuple: tuples of values

    • list: lists of values

    • dict: nested dictionaries

    • set: sets of values

    • frozenset: immutable sets of values

    • datetime.datetime: datetime objects

    • datetime.date: date objects

    • datetime.time: time objects

    • decimal.Decimal: decimal values for precise numeric operations

  • start_offset_token – Optional start offset token of the batch/row-set.

  • end_offset_token – Optional end offset token of the batch/row-set.

Raises:
  • ValueError, TypeError – If the rows cannot be serialized to JSON

  • StreamingIngestError – If the rows appending fails

get_latest_committed_offset_token() str | None

Get the latest committed offset token for the channel.

Returns:

The latest committed offset token for the channel, or None if the channel is brand new.

Return type:

Optional[str]

Raises:

StreamingIngestError – If getting the latest committed offset token fails

get_channel_status() ChannelStatus

Get the status of the channel.

Returns:

The status of the channel.

Return type:

ChannelStatus

Raises:

StreamingIngestError – If getting the channel status fails

close(drop: bool = False, wait_for_flush: bool = True, timeout_seconds: int | None = None) None

Close the channel.

Parameters:
  • drop – Whether to drop the channel, defaults to False

  • wait_for_flush – Whether to wait for the flush to complete. Default is True.

  • timeout_seconds – The timeout in seconds for the flush, None means no timeout. Default is None.

Raises:

StreamingIngestError – If closing the channel fails

is_closed() bool

Check if the channel is closed.

Returns:

True if the channel is closed, False otherwise

Return type:

bool

property db_name: str

Get the database name.

Raises:

ValueError – If channel is already closed

property channel_name: str

Get the channel name.

Raises:

ValueError – If channel is already closed

property schema_name: str

Get the schema name.

Raises:

ValueError – If channel is already closed

property pipe_name: str

Get the pipe name.

Raises:

ValueError – If channel is already closed

class StreamingIngestClient(client_name: str, db_name: str, schema_name: str, pipe_name: str, profile_json: str | None = None, properties: Dict[str, Any] | None = None)

A client that is the starting point for using the Streaming Ingest client APIs.

A single client maps to exactly one account/database/schema/pipe in Snowflake; however, multiple clients can point to the same account/database/schema/pipe. Each client contains information for Snowflake authentication and authorization, and it is used to create one or more StreamingIngestChannel instances for data ingestion.

The client manages the lifecycle of streaming ingest channels and handles the underlying communication with Snowflake services for authentication, channel management, and data transmission.

open_channel(channel_name: str, offset_token: str | None = None) Tuple[StreamingIngestChannel, ChannelStatus]

Open a channel with the given name.

Parameters:
  • channel_name – Name of the channel to open

  • offset_token – Optional offset token

Returns:

(StreamingIngestChannel, ChannelStatus)

Return type:

tuple

Raises:

StreamingIngestError – If opening the channel fails

close(wait_for_flush: bool = True, timeout_seconds: int | None = None) None

Close the client.

Parameters:
  • wait_for_flush – Whether to wait for the flush to complete, defaults to True

  • timeout_seconds – Optional timeout in seconds for the flush operation, defaults to 60 seconds

Raises:

StreamingIngestError – If closing the client fails

is_closed() bool

Check if the client is closed.

Raises:

StreamingIngestError – If checking the client status fails

get_latest_committed_offset_tokens(channel_names: List[str]) Dict[str, str | None]

Get the latest committed offset tokens for a list of channels.

Parameters:

channel_names – List of channel names

Returns:

A dictionary mapping channel names to their latest committed offset tokens.

Value is None if the channel is brand new or does not exist.

Return type:

Dict[str, Optional[str]]

Raises:

StreamingIngestError – If getting the latest committed offset tokens fails

get_channel_statuses(channel_names: List[str]) Dict[str, ChannelStatus]

Get the statuses of a list of channels.

Parameters:

channel_names – List of channel names

Returns:

A dictionary mapping channel names to their statuses.

Return type:

Dict[str, ChannelStatus]

Raises:

StreamingIngestError – If getting the channel statuses fails

drop_channel(channel_name: str) None

Drop a channel.

Parameters:

channel_name – Name of the channel to drop

Raises:

StreamingIngestError – If dropping the channel fails

initiate_flush() None

Initiate a flush of the client.

Raises:

StreamingIngestError – If initiating the flush fails

property client_name: str

Get the client name.

Raises:

ValueError – If client is already closed

property db_name: str

Get the database name.

Raises:

ValueError – If client is already closed

property schema_name: str

Get the schema name.

Raises:

ValueError – If client is already closed

property pipe_name: str

Get the pipe name.

Raises:

ValueError – If client is already closed

exception StreamingIngestError(code_name: str, message: str)

Bases: Exception

A class for all streaming ingest errors.

code_name
message