Set up the Openflow Connector for Snowflake to Kafka¶
Note
The connector is subject to the Connector Terms.
This topic describes the steps to set up the Openflow Connector for Snowflake to Kafka.
Prerequisites¶
Ensure that you have reviewed About Openflow Connector for Snowflake to Kafka.
Ensure that you have set up Openflow.
Create a Snowflake stream that will be queried for changes.
Create a Kafka topic that will receive CDC messages from the Snowflake stream.
Set up Snowflake account¶
As a Snowflake account administrator, perform the following tasks:
Create the database, source table, and the stream object that the connector will use for reading CDC events. For example:
create database stream_db;
use database stream_db;
create table stream_source (user_id varchar, data varchar);
create stream stream_on_table on table stream_source;
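Optionally, you can verify that the stream captures changes before connecting it to the flow. This is only an illustrative sanity check with arbitrary sample values, not a required step:

insert into stream_source (user_id, data) values ('user_1', 'sample payload');  -- arbitrary sample row
select * from stream_on_table;  -- returns the change row plus METADATA$ACTION, METADATA$ISUPDATE, METADATA$ROW_ID

Querying a stream with SELECT does not advance its offset, so this check does not consume the captured changes.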
Create a new role or use an existing role, and grant it the SELECT privilege on the stream and on the stream's source object. The connector also needs the USAGE privilege on the database and schema that contain the stream and its source object. For example:
create role stream_reader;
grant usage on database stream_db to role stream_reader;
grant usage on schema stream_db.public to role stream_reader;
grant select on table stream_source to role stream_reader;
grant select on stream stream_on_table to role stream_reader;
Create a new Snowflake service user of type SERVICE. For example:
create user stream_user type = service;
Grant the role you created earlier to the Snowflake service user. For example:
grant role stream_reader to user stream_user;
Configure key-pair authentication for the Snowflake service user from step 3.
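As a minimal sketch of this step, assuming an RSA key pair has already been generated as described in the Snowflake key-pair authentication documentation, register the public key on the service user; the key value below is a placeholder:

alter user stream_user set rsa_public_key = 'MIIBIjANBgkqh...';  -- placeholder; use the public key generated for this user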
Snowflake strongly recommends this step: configure a secrets manager supported by Openflow, for example AWS, Azure, or HashiCorp, and store the public and private keys in the secret store. Note, however, that the private key generated in step 4 can also be used directly as a parameter in the connector configuration; in that case, the private key is stored in the Openflow runtime configuration.
Note
If, for any reason, you do not want to use a secrets manager, you are responsible for safeguarding the public key and private key files used for key-pair authentication according to the security policies of your organization.
Once the secrets manager is configured, determine how you will authenticate to it. On AWS, it is recommended that you use the EC2 instance role associated with Openflow, because this way no other secrets have to be persisted.
In Openflow, configure a Parameter Provider associated with this secrets manager from the hamburger menu in the upper right. Navigate to Controller Settings » Parameter Provider and then fetch your parameter values.
At this point, all credentials can be referenced with the associated parameter paths, and no sensitive values need to be persisted within Openflow.
Designate a warehouse for the connector to use. One connector replicates a single table to a single Kafka topic, so for this kind of processing you can select the smallest warehouse size.
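For example, an extra-small warehouse dedicated to the connector might look like the following; the warehouse name is illustrative:

-- illustrative name; the smallest size is typically sufficient for single-table replication
create warehouse if not exists stream_wh warehouse_size = 'XSMALL' auto_suspend = 60 auto_resume = true;
grant usage on warehouse stream_wh to role stream_reader;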
Set up the connector¶
As a data engineer, perform the following tasks to install and configure a connector:
Navigate to the Openflow Overview page. In the Featured connectors section, select View more connectors.
On the Openflow connectors page, find and choose the connector variant that matches the kind of Kafka broker instance the connector will communicate with:
mTLS version: Choose this connector if you are using the SSL (mutual TLS) security protocol, or if you are using the SASL_SSL protocol and connecting to a broker that uses self-signed certificates.
SASL version: Choose this connector if you are using any other security protocol.
Select Add to runtime.
In the Select runtime dialog, select your runtime from the Available runtimes drop-down list.
Select Add.
Authenticate to the deployment with your Snowflake account credentials and select Allow when prompted to allow the runtime application to access your Snowflake account. The connector installation process takes a few minutes to complete.
Authenticate to the runtime with your Snowflake account credentials.
The Openflow canvas appears with the connector process group added to it.
Right-click on the imported process group and select Parameters.
Populate the required parameter values as described in Flow parameters.
Flow parameters¶
This section describes the flow parameters that you can configure, grouped into the following parameter contexts:
Kafka Sink Source Parameters¶
Parameter | Description | Required
---|---|---
Snowflake Account Identifier | Snowflake account name, formatted as [organization-name]-[account-name], where data will be persisted. Example: myorganization-myaccount | Yes
Snowflake Authentication Strategy | Strategy of authentication to Snowflake. Possible values: | Yes
Source Database | The source database. This database should contain the Snowflake stream object that will be consumed. | Yes
Snowflake Private Key Password | The password associated with the Snowflake private key. Not required when the private key is not secured with a password. | No
Snowflake Role | Snowflake role used during query execution. | Yes
Snowflake Username | Username used to connect to the Snowflake instance. | Yes
Snowflake Warehouse | Snowflake warehouse used to run queries. | Yes
Snowflake Private Key | The RSA private key used for authentication. The RSA key must be formatted according to PKCS8 standards and have standard PEM headers and footers. Note that either Snowflake Private Key File or Snowflake Private Key must be defined. | Yes
Snowflake Private Key File | The file that contains the RSA private key used for authentication to Snowflake, formatted according to PKCS8 standards and having standard PEM headers and footers. The header line starts with -----BEGIN PRIVATE KEY-----. | No
Source Schema | The source schema. This schema should contain the Snowflake stream object that will be consumed. | Yes
Kafka Sink Destination Parameters¶
Parameter | Description | Required
---|---|---
Kafka Bootstrap Servers | A comma-separated list of Kafka brokers to send data to. | Yes
Kafka SASL Mechanism | SASL mechanism used for authentication. Corresponds to the Kafka client sasl.mechanism property. | Yes
Kafka SASL Username | The username used to authenticate to Kafka. | Yes
Kafka SASL Password | The password used to authenticate to Kafka. | Yes
Kafka Security Protocol | Security protocol used to communicate with brokers. Corresponds to the Kafka client security.protocol property. | Yes
Kafka Topic | The Kafka topic to which CDC messages from the Snowflake stream are sent. | Yes
Kafka Message Key Field | The database column name whose value is used as the Kafka message key. If not specified, the message key is not set. The value of this parameter is case-sensitive. | No
Kafka Keystore Filename | The full path to a keystore that stores the client key and certificate for the mTLS authentication method. Required for mTLS authentication and when the security protocol is SSL. | No
Kafka Keystore Type | The type of the keystore. Required for mTLS authentication. Possible values: | No
Kafka Keystore Password | The password used to secure the keystore file. | No
Kafka Key Password | The password for the private key stored in the keystore. Required for mTLS authentication. | No
Kafka Truststore Filename | The full path to a truststore that stores broker certificates. The client uses the certificates from this truststore to verify the broker's identity. | No
Kafka Truststore Type | The type of the truststore file. Possible values: | No
Kafka Truststore Password | The password for the truststore file. | No
Kafka Sink Ingestion Parameters¶
Parameter | Description | Required
---|---|---
Snowflake FQN Stream Name | Fully qualified Snowflake stream name. | Yes
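For example, with the objects created in the Set up Snowflake account section above, the fully qualified stream name would be:

stream_db.public.stream_on_table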