Installation et configuration du connecteur Kafka

Le connecteur Kafka est fourni sous forme de fichier JAR (exécutable Java).

Snowflake fournit deux versions du connecteur :

  • Une version de la version du package Confluent de Kafka.

  • Une version du package logiciel open source (OSS) Apache Kafka.

Les instructions de ce chapitre spécifient les étapes qui ne s’appliquent qu’à l’une ou l’autre version du connecteur.

Dans ce chapitre :

Configuration du contrôle d’accès pour les objets Snowflake

Privilèges requis

La création et la gestion des objets Snowflake utilisés par le connecteur Kafka nécessitent un rôle avec les privilèges minimum suivants :

Objet

Privilège

Remarques

Base de données

USAGE

Schéma

USAGE . CREATE TABLE . CREATE STAGE . CREATE PIPE

Une fois les objets de niveau schéma créés, les privilèges CREATE objet peuvent être révoqués.

Table

INSERT . SELECT

Requis uniquement lors de l’utilisation du connecteur Kafka pour ingérer des données dans une table existante. . Si le connecteur crée une nouvelle table cible pour les enregistrements du sujet Kafka, le rôle par défaut de l’utilisateur spécifié dans le fichier de configuration Kafka devient le propriétaire de la table (c’est-à-dire qu’il dispose du privilège OWNERSHIP sur la table).

Zone de préparation

READ . WRITE

Requis uniquement lors de l’utilisation du connecteur Kafka pour mettre en zone de préparation les fichiers de données de Kafka vers une zone de préparation interne existante (non recommandée). . Si le connecteur crée une nouvelle zone de préparation pour stocker temporairement les fichiers de données consommés dans le sujet Kafka, le rôle par défaut de l’utilisateur spécifié dans le fichier de configuration Kafka devient le propriétaire de la zone de préparation (c.-à-d. qu’il dispose du privilège OWNERSHIP sur la zone de préparation).

Snowflake vous recommande de créer un utilisateur séparé (avec CREATE USER) et un rôle (avec CREATE ROLE) distincts pour chaque instance de Kafka afin que les privilèges d’accès puissent être révoqués individuellement si nécessaire. Le rôle doit être attribué en tant que rôle par défaut pour l’utilisateur.

Création d’un rôle pour utiliser le connecteur Kafka

Le script suivant crée un rôle personnalisé à utiliser par le connecteur Kafka (par exemple, KAFKA_CONNECTOR_ROLE_1). Tout rôle pouvant octroyer des privilèges (par exemple, SECURITYADMIN ou un rôle disposant du privilège MANAGE GRANTS) peut attribuer ce rôle personnalisé à tout utilisateur pour autoriser le connecteur Kafka à créer les objets Snowflake requis et à insérer des données dans des tables. Le script référence une base de données existante et un schéma (kafka_db.kafka_schema) et un utilisateur (kafka_connector_user_1) spécifiques :

-- Use a role that can create and manage roles and privileges:
USE ROLE securityadmin;

-- Create a Snowflake role with the privileges to work with the connector
CREATE ROLE kafka_connector_role_1;

-- Grant privileges on the database:
GRANT USAGE ON DATABASE kafka_db TO ROLE kafka_connector_role_1;

-- Grant privileges on the schema:
GRANT USAGE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1;
GRANT CREATE TABLE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1;
GRANT CREATE STAGE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1;
GRANT CREATE PIPE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1;

-- Only required if the Kafka connector will load data into an existing table:
GRANT SELECT, INSERT ON TABLE existing_table1 TO ROLE kafka_connector_role_1;

-- Only required if the Kafka connector will stage data files in an existing internal stage: (not recommended):
GRANT READ, WRITE ON STAGE existing_stage1 TO ROLE kafka_connector_role_1;

-- Grant the custom role to an existing user:
GRANT ROLE kafka_connector_role_1 TO USER kafka_connector_user_1;

-- Set the custom role as the default role for the user:
ALTER USER kafka_connector_user_1 SET DEFAULT_ROLE = kafka_connector_role_1;

Pour plus d’informations sur la création de rôles personnalisés et de hiérarchies de rôles, voir Configuration du contrôle d’accès.

Prérequis d’installation

  • Le connecteur Kafka prend en charge les versions de package suivantes :

    Paquet

    Version du connecteur Snowflake Kafka

    Prise en charge des packages

    Apache Kafka

    0.5.3 - 1.2.2

    Apache Kafka 2.0.x - 2.4.x

    1.2.3 (ou supérieur)

    Apache Kafka 2.0.x - 2.5.x

    Confluent

    0.5.3 - 1.2.2

    Confluent 5.0.x - 5.4.x

    1.2.3 (ou supérieur)

    Confluent 5.0.x - 5.5.x

  • Le connecteur Kafka est conçu pour être utilisé avec l’API Kafka Connect 2.0.0 suivante. Nous vous recommandons vivement d’utiliser une version de l’API Kafka Connect entre la 2.0.0 et la 2.3.0. Les versions antérieures ne sont pas compatibles avec le connecteur, et les versions plus récentes n’ont pas été testées.

  • Si vous utilisez le format Avro pour l’acquisition de données :

  • Configurez Kafka avec la durée de conservation des données ou la limite de stockage de votre choix.

  • Installez et configurez le cluster Kafka Connect.

    Chaque nœud de cluster Kafka Connect doit inclure suffisamment de RAM pour le connecteur Kafka. La quantité minimale recommandée est de 5 MB par partition Kafka. Cela s’ajoute aux RAM requises pour toute autre tâche effectuée par Kafka Connect.

  • Nous vous recommandons vivement d’exécuter votre instance Kafka Connect dans la même région du fournisseur de cloud que votre compte Snowflake. Cela n’est pas strictement nécessaire, mais améliore généralement le débit.

Pour obtenir la liste des systèmes d’exploitation pris en charge par les clients Snowflake, voir Prise en charge par les systèmes d’exploitation.

Installation du connecteur

Cette section fournit des instructions pour l’installation et la configuration du connecteur Kafka pour Confluent.

Installation du connecteur pour Confluent

Télécharger les fichiers du connecteur Kafka

Téléchargez le fichier JAR du connecteur Kafka à l’un des emplacements suivants :

Hub Confluent

https://www.confluent.io/hub/

Le package inclut toutes les dépendances requises pour utiliser une clé privée chiffrée ou non chiffrée pour l’authentification par paire de clés. Pour obtenir des informations, voir Utilisation de l’authentification par paire de clés dans cette rubrique.

Maven Central Repository

https://mvnrepository.com/artifact/com.snowflake

Le fichier JAR ne nécessite aucune dépendance supplémentaire pour utiliser une clé privée non chiffrée pour l’authentification par paire de clés. Pour utiliser une clé privée chiffrée, téléchargez la bibliothèque de cryptographie Bouncy Castle (un fichier JAR). Snowflake utilise Bouncy Castle pour déchiffrer les clés privées RSA chiffrées utilisées pour se connecter :

Téléchargez ces fichiers dans le même dossier local que le fichier JAR du connecteur Kafka.

Le code source du connecteur est disponible sur https://github.com/snowflakedb/snowflake-kafka-connector.

Installation du connecteur Kafka

Installez le connecteur Kafka en suivant les instructions fournies pour installer d’autres connecteurs :

Installation du connecteur pour Apache Kafka Open Source

Cette section fournit des instructions pour l’installation et la configuration du connecteur Kafka pour Apache Kafka Open Source.

Installer Apache Kafka

  1. Téléchargez le package Kafka sur son site officiel : https://kafka.apache.org/downloads

  2. Dans une fenêtre de terminal, accédez au répertoire dans lequel vous avez téléchargé le fichier de package.

  3. Exécutez la commande suivante pour décompresser le fichier kafka_<version_scala>-<version_kafka>.tgz :

    tar xzvf kafka_<scala_version>-<kafka_version>.tgz
    

Installer le JDK

Installez et configurez le kit de développement Java (JDK). Snowflake teste avec l’édition standard (SE) de JDK. L’édition entreprise (EE) devrait être compatible mais n’a pas été testée.

Si vous avez déjà terminé cette étape, vous pouvez ignorer cette section.

  1. Téléchargez le JDK à l’adresse https://www.oracle.com/technetwork/java/javase/downloads/index.html.

  2. Installez ou décompressez le JDK.

  3. En suivant les instructions pour votre système d’exploitation, définissez la variable d’environnement JAVA_HOME pour qu’elle pointe vers le répertoire contenant le JDK.

Télécharger les fichiers JAR du connecteur Kafka

  1. Téléchargez le fichier JAR du connecteur Kafka à partir du référentiel central Maven :

    https://mvnrepository.com/artifact/com.snowflake

  2. Le fichier JAR ne nécessite aucune dépendance supplémentaire pour utiliser une clé privée non chiffrée pour l’authentification par paire de clés. Pour utiliser une clé privée chiffrée, téléchargez la bibliothèque de cryptographie Bouncy Castle (un fichier JAR). Snowflake utilise Bouncy Castle pour déchiffrer les clés privées RSA chiffrées utilisées pour se connecter :

  3. Si vos données Kafka sont diffusées au format Apache Avro, téléchargez le fichier JAR Avro :

    https://mvnrepository.com/artifact/org.apache.avro/avro

Le code source du connecteur est disponible sur https://github.com/snowflakedb/snowflake-kafka-connector.

Installation du connecteur Kafka

Copiez les fichiers JAR que vous avez téléchargés dans Télécharger les fichiers JAR du connecteur Kafka dans le dossier <kafka_dir>/libs.

Configuration du connecteur Kafka

Le connecteur est configuré en créant un fichier spécifiant des paramètres tels que les identifiants de connexion Snowflake, les noms des sujets, les noms des tables Snowflake, etc.

Important

L’infrastructure Kafka Connect diffuse les paramètres de configuration du connecteur Kafka du nœud maître aux nœuds de travail. Les paramètres de configuration incluent des informations sensibles (en particulier le nom d’utilisateur et la clé privée Snowflake). Assurez-vous de sécuriser le canal de communication entre les nœuds Kafka Connect. Pour obtenir des instructions, consultez la documentation de votre logiciel Apache Kafka.

Chaque fichier de configuration spécifie les sujets et les tables correspondantes pour une base de données et un schéma dans cette base de données. Remarque : un connecteur peut intégrer des messages de n’importe quel nombre de sujets, mais les tables correspondantes doivent toutes figurer dans une base de données et un schéma uniques.

Cette section fournit des instructions pour les modes distribué et autonome.

Pour une description des champs de configuration, voir Propriétés de configuration de Kafka.

Important

Comme le fichier de configuration contient généralement des informations relatives à la sécurité, telles que la clé privée, définissez les privilèges de lecture / écriture de manière appropriée sur le fichier pour limiter l’accès.

En outre, envisagez de stocker le fichier de configuration dans un emplacement externe sécurisé ou dans un service de gestion de clés. Pour plus d’informations, voir Externaliser les secrets (dans ce chapitre).

Mode distribué

Créez un fichier de configuration Kafka, par exemple <rep_kafka>/config/connect-distributed.properties. Remplissez le fichier avec toutes les informations de configuration du connecteur.

Exemple de fichier de configuration

{
  "name":"XYZCompanySensorData",
  "config":{
    "connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector",
    "tasks.max":"8",
    "topics":"topic1,topic2",
    "snowflake.topic2table.map": "topic1:table1,topic2:table2",
    "buffer.count.records":"10000",
    "buffer.flush.time":"60",
    "buffer.size.bytes":"5000000",
    "snowflake.url.name":"myaccount.us-west-2.snowflakecomputing.com:443",
    "snowflake.user.name":"jane.smith",
    "snowflake.private.key":"xyz123",
    "snowflake.private.key.passphrase":"jkladu098jfd089adsq4r",
    "snowflake.database.name":"mydb",
    "snowflake.schema.name":"myschema",
    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
    "value.converter":"com.snowflake.kafka.connector.records.SnowflakeAvroConverter",
    "value.converter.schema.registry.url":"http://localhost:8081",
    "value.converter.basic.auth.credentials.source":"USER_INFO",
    "value.converter.basic.auth.user.info":"jane.smith:MyStrongPassword"
  }
}

Mode autonome

Créez un fichier de configuration, par exemple <rep_kafka>/config/connect-standalone.properties. Remplissez le fichier avec toutes les informations de configuration du connecteur.

Exemple de fichier de configuration

connector.class=com.snowflake.kafka.connector.SnowflakeSinkConnector
tasks.max=8
topics=topic1,topic2
snowflake.topic2table.map= topic1:table1,topic2:table2
buffer.count.records=10000
buffer.flush.time=60
buffer.size.bytes=5000000
snowflake.url.name=myaccount.us-west-2.snowflakecomputing.com:443
snowflake.user.name=jane.smith
snowflake.private.key=xyz123
snowflake.private.key.passphrase=jkladu098jfd089adsq4r
snowflake.database.name=mydb
snowflake.schema.name=myschema
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=com.snowflake.kafka.connector.records.SnowflakeAvroConverter
value.converter.schema.registry.url=http://localhost:8081
value.converter.basic.auth.credentials.source=USER_INFO
value.converter.basic.auth.user.info=jane.smith:MyStrongPassword

Propriétés de configuration de Kafka

Les propriétés suivantes peuvent être définies dans le fichier de configuration Kafka :

Propriétés requises

name

Nom de l’application. Cela doit être unique pour tous les connecteurs Kafka utilisés par le client. Ce nom doit être un identificateur non indiqué entre Snowflake et valide. Pour plus d’informations sur les identificateurs valides, voir Exigences relatives à l’identificateur.

connector.class

com.snowflake.kafka.connector.SnowflakeSinkConnector

topics

Liste de sujets séparés par des virgules. Par défaut, Snowflake suppose que le nom de la table est identique à celui du sujet. Si le nom de la table est différent du nom du sujet, utilisez le paramètre facultatif topic2table.map (ci-dessous) pour spécifier le mappage entre le nom du sujet et le nom de la table. Ce nom de table doit être un identificateur non spécifié Snowflake et valide. Pour plus d’informations sur les noms de table valides, voir Exigences relatives à l’identificateur.

Note

:emph:`` topics ou topics.regex est requis ; pas les deux.

topics.regex

Il s’agit d’une expression régulière (« regex ») qui spécifie les sujets contenant les messages à charger dans les tables Snowflake. Le connecteur charge les données de tout nom de sujet correspondant à l’expression régulière. L’expression régulière doit respecter les règles applicables aux expressions régulières Java (c.-à-d. être compatible avec java.util.regex.Pattern). Le fichier de configuration doit contenir topics ou topics.regex, pas les deux.

snowflake.url.name

L’URL pour accéder à votre compte Snowflake, sous la forme de https://<nom_compte>.<id_région>.snowflakecomputing.com:443. Notez que le https:// et le numéro de port sont facultatifs. L’ID de région n’est pas utilisé si votre compte est dans la région AWS US Ouest et que vous n’utilisez pas AWS PrivateLink. Pour plus d’informations sur les noms de compte Snowflake et les noms de région, voir Régions prises en charge.

snowflake.user.name

Nom de connexion de l’utilisateur pour le compte Snowflake.

snowflake.private.key

Clé privée pour authentifier l’utilisateur. Incluez uniquement la clé, pas l’en-tête ni le pied de page. Si la clé est divisée sur plusieurs lignes, supprimez les sauts de ligne. Vous pouvez fournir une clé non chiffrée ou une clé chiffrée et fournir le paramètre snowflake.private.key.passphrase pour permettre à Snowflake de déchiffrer la clé. Utilisez ce paramètre si et seulement si la valeur du paramètre snowflake.private.key est chiffrée. Cela déchiffre les clés privées chiffrées conformément aux instructions de Utilisation de l’authentification par paire de clés (dans cette rubrique).

Note

Voir également snowflake.private.key.passphrase dans Propriétés facultatives (dans cette rubrique).

snowflake.database.name

Nom de la base de données contenant la table dans laquelle insérer des lignes.

snowflake.schema.name

Nom du schéma contenant la table dans laquelle insérer des lignes.

header.converter

Obligatoire uniquement si les enregistrements sont formatés en Avro et incluent un en-tête. La valeur par défaut est "org.apache.kafka.connect.storage.StringConverter".

key.converter

Il s’agit du convertisseur de clé de l’enregistrement Kafka, (par exemple "org.apache.kafka.connect.storage.StringConverter"). Ce connecteur n’est pas utilisé par le connecteur Kafka, mais il est requis par la plate-forme Kafka Connect.

Voir Limitations du connecteur Kafka pour les limitations actuelles.

value.converter

Si les enregistrements sont au format JSON, il doit s’agir de "com.snowflake.kafka.connector.records.SnowflakeJsonConverter".

Si les enregistrements sont formatés dans Avro et utilisent le service de registre de schéma de Kafka, il doit s’agir de "com.snowflake.kafka.connector.records.SnowflakeAvroConverter".

Si les enregistrements sont formatés dans Avro et contiennent le schéma (et n’ont donc pas besoin du service de registre de schéma de Kafka), il doit s’agir de "com.snowflake.kafka.connector.records.SnowflakeAvroConverterWithoutSchemaRegistry".

Voir Limitations du connecteur Kafka pour les limitations actuelles.

Propriétés facultatives

snowflake.private.key.passphrase

Si la valeur de ce paramètre n’est pas vide, Kafka utilise cette expression pour tenter de déchiffrer la clé privée.

tasks.max

Nombre de tâches, généralement égal au nombre de cœurs CPU sur les nœuds de travail du cluster Kafka Connect. Ce nombre peut être inférieur ou supérieur. Cependant, Snowflake ne recommande pas de le régler plus haut.

snowflake.topic2table.map

Ce paramètre facultatif permet à un utilisateur de spécifier quels sujets doivent être mappés à quelles tables. Chaque sujet et son nom de table doivent être séparés par le signe deux-points (voir exemple ci-dessous). Ce nom de table doit être un identificateur non spécifié Snowflake et valide. Pour plus d’informations sur les noms de table valides, voir Exigences relatives à l’identificateur.

buffer.count.records

Nombre d’enregistrements mis en mémoire tampon par partition Kafka avant leur acquisition dans Snowflake. La valeur par défaut est 10000.

buffer.flush.time

Nombre de secondes entre les vidages de la mémoire tampon, où le vidage provient du cache de la mémoire de Kafka vers la zone de préparation interne. La valeur par défaut est 120 secondes.

buffer.size.bytes

Taille cumulée en octets des enregistrements mis en mémoire tampon par partition Kafka avant d’être ingérés dans Snowflake en tant que fichiers de données. La valeur par défaut est 5000000 (5 MB).

Les enregistrements sont compressés lorsqu’ils sont écrits dans des fichiers de données. Par conséquent, la taille des enregistrements dans la mémoire tampon peut être supérieure à la taille des fichiers de données créés à partir des enregistrements.

value.converter.schema.registry.url

Si le format est Avro et que vous utilisez un service de registre de schéma, il doit s’agir de l’URL du service de registre de schéma. Sinon, ce champ devrait être vide.

value.converter.break.on.schema.registry.error

Si vous chargez des données Avro à partir du service de registre de schéma, cette propriété détermine si le connecteur Kafka doit cesser de consommer des enregistrements s’il rencontre une erreur lors de la récupération de l’ID de schéma. La valeur par défaut est false. Définissez la valeur sur true pour activer ce comportement.

Pris en charge par la version du connecteur Kafka 1.4.2 (et supérieure).

jvm.proxy.host

Pour permettre au connecteur Snowflake Kafka d’accéder à Snowflake via un serveur proxy, définissez ce paramètre pour spécifier l’hôte de ce serveur proxy.

jvm.proxy.port

Pour permettre au connecteur Snowflake Kafka d’accéder à Snowflake via un serveur proxy, définissez ce paramètre pour spécifier le port de ce serveur proxy.

jvm.proxy.username

Nom d’utilisateur qui s’authentifie auprès du serveur proxy.

Pris en charge par la version du connecteur Kafka 1.4.4 (et supérieure).

jvm.proxy.password

Mot de passe du nom d’utilisateur qui s’authentifie auprès du serveur proxy.

Pris en charge par la version du connecteur Kafka 1.4.4 (et supérieure).

value.converter.basic.auth.credentials.source

Si vous utilisez le format de données Avro et avez besoin d’un accès sécurisé au registre de schémas Kafka, définissez ce paramètre sur la chaîne « USER_INFO », puis définissez le paramètre value.converter.basic.auth.user.info décrit ci-dessous. Sinon, omettez ce paramètre.

value.converter.basic.auth.user.info

Si vous utilisez le format de données Avro et avez besoin d’un accès sécurisé au registre de schémas Kafka, définissez ce paramètre sur la chaîne « <ID_utilisateur>:<motdepasse> », puis définissez le paramètre value.converter.basic.auth.credentials.source décrit ci-dessus. Sinon, omettez ce paramètre.

snowflake.metadata.createtime

Si la valeur est définie sur FALSE, la valeur de la propriété CreateTime est omise des métadonnées dans la colonne RECORD_METADATA. La valeur par défaut est TRUE.

Pris en charge par le connecteur Kafka 1.2.0 (et supérieur).

snowflake.metadata.topic

Si la valeur est définie sur FALSE, la valeur de la propriété topic est omise des métadonnées dans la colonne RECORD_METADATA. La valeur par défaut est TRUE.

Pris en charge par le connecteur Kafka 1.2.0 (et supérieur).

snowflake.metadata.offset.and.partition

Si la valeur est définie sur FALSE, les valeurs de propriété Offset et Partition sont omises des métadonnées de la colonne RECORD_METADATA. La valeur par défaut est TRUE.

Pris en charge par le connecteur Kafka 1.2.0 (et supérieur).

snowflake.metadata.all

Si la valeur est définie sur FALSE, les métadonnées de la colonne RECORD_METADATA sont complètement vides. La valeur par défaut est TRUE.

Pris en charge par le connecteur Kafka 1.2.0 (et supérieur).

Utilisation de l’authentification par paire de clés

Le connecteur Kafka repose sur l’authentification par paire de clés plutôt que par l’authentification typique du nom d’utilisateur/mot de passe. Cette méthode d’authentification nécessite une paire de clés de 2048 bits (minimum) RSA. Générez la paire de clés publiques-privées via OpenSSL. La clé publique est attribuée à l’utilisateur Snowflake défini dans le fichier de configuration.

Pour configurer la paire de clés publiques/privées :

  1. Depuis la ligne de commande d’une fenêtre de terminal, générez une clé privée.

    Vous pouvez générer une version chiffrée ou non chiffrée de la clé privée.

    Note

    Le connecteur Kafka prend en charge des algorithmes de chiffrement validés pour répondre aux exigences de la norme Federal Information Processing Standard (140-2) (FIPS 140-2). Pour plus d’informations sur la norme FIPS 140-2, voir https://csrc.nist.gov/publications/detail/fips/140/2/final.

    Pour générer une version non chiffrée, utilisez la commande suivante :

    $ openssl genrsa -out rsa_key.pem 2048
    

    Pour générer une version chiffrée, utilisez la commande suivante :

    $ openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 <algorithm> -inform PEM -out rsa_key.p8
    

    <algorithme> est un algorithme de chiffrement conforme à FIPS 140-2.

    Par exemple, pour spécifier AES 256 comme algorithme de chiffrement :

    $ openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 aes256 -inform PEM -out rsa_key.p8
    

    Si vous générez une version chiffrée de la clé privée, enregistrez la phrase secrète. Plus tard, vous spécifierez la phrase secrète dans la propriété snowflake.private.key.passphrase du fichier de configuration de Kafka.

    Exemple de clé privée PEM

    -----BEGIN ENCRYPTED PRIVATE KEY-----
    MIIE6TAbBgkqhkiG9w0BBQMwDgQILYPyCppzOwECAggABIIEyLiGSpeeGSe3xHP1
    wHLjfCYycUPennlX2bd8yX8xOxGSGfvB+99+PmSlex0FmY9ov1J8H1H9Y3lMWXbL
    ...
    -----END ENCRYPTED PRIVATE KEY-----
    
  2. Depuis la ligne de commande, générez la clé publique en faisant référence à la clé privée :

    En supposant que la clé privée soit chiffrée et contenue dans le fichier nommé rsa_key.p8, utilisez la commande suivante :

    $ openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
    

    Exemple de clé publique PEM

    -----BEGIN PUBLIC KEY-----
    MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAy+Fw2qv4Roud3l6tjPH4
    zxybHjmZ5rhtCz9jppCV8UTWvEXxa88IGRIHbJ/PwKW/mR8LXdfI7l/9vCMXX4mk
    ...
    -----END PUBLIC KEY-----
    
  3. Copiez les fichiers de clés publiques et privées dans un répertoire local en vue de leur stockage. Enregistrez le chemin d’accès aux fichiers. Notez que la clé privée est stockée au format PKCS#8 (Public Key Cryptography Standards) et est chiffrée à l’aide de la phrase de chiffrement que vous avez spécifiée à l’étape précédente ; toutefois, le fichier doit toujours être protégé contre tout accès non autorisé au moyen du mécanisme d’autorisation de fichier fourni par votre système d’exploitation. Il est de votre responsabilité de sécuriser le fichier lorsqu’il n’est pas utilisé.

  4. Connectez-vous à Snowflake. Attribuez la clé publique à l’utilisateur Snowflake en utilisant ALTER USER. Par exemple :

    ALTER USER jsmith SET RSA_PUBLIC_KEY='MIIBIjANBgkqh...';
    

    Note

    • Seuls les administrateurs de sécurité (c’est-à-dire les utilisateurs ayant le rôle SECURITYADMIN) ou ayant un rôle supérieur peuvent modifier un utilisateur.

    • Excluez l’en-tête et le pied de page de la clé publique dans l’instruction SQL.

    Vérifiez l’empreinte de la clé publique de l’utilisateur en utilisant DESCRIBE USER :

    DESC USER jsmith;
    +-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------+
    | property                      | value                                               | default | description                                                                   |
    |-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------|
    | NAME                          | JSMITH                                              | null    | Name                                                                          |
    ...
    ...
    | RSA_PUBLIC_KEY_FP             | SHA256:nvnONUsfiuycCLMXIEWG4eTp4FjhVUZQUQbNpbSHXiA= | null    | Fingerprint of user's RSA public key.                                         |
    | RSA_PUBLIC_KEY_2_FP           | null                                                | null    | Fingerprint of user's second RSA public key.                                  |
    +-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------+
    

    Note

    La propriété RSA_PUBLIC_KEY_2_FP est décrite dans Rotation de clé (dans ce chapitre).

  5. Copiez et collez la clé privée complète dans le champ snowflake.private.key du fichier de configuration. Sauvegardez le fichier.

Externaliser les secrets

Snowflake recommande fortement d’externaliser des secrets tels que la clé privée et de les stocker sous une forme chiffrée ou dans un service de gestion de clés tel que AWS Key Management Service (KMS), Microsoft Azure Key Vault, ou HashiCorp Vault. Pour ce faire, utilisez une implémentation ConfigProvider sur votre cluster Kafka Connect.

Pour plus d’informations, voir la description de ce service par Confluent : https://docs.confluent.io/current/connect/security.html#externalizing-secrets.

Rotation de clé

Snowflake accepte plusieurs clés actives pour permettre une rotation ininterrompue. Faites pivoter et remplacez vos clés publiques et privées en fonction du calendrier d’expiration que vous suivez en interne.

Actuellement, vous pouvez utiliser les paramètres RSA_PUBLIC_KEY et RSA_PUBLIC_KEY_2 pour ALTER USER afin d’associer jusqu’à 2 clés publiques à un seul utilisateur.

Pour faire tourner vos clés :

  1. Effectuez les étapes de la section Utilisation de l’authentification par paire de clés pour :

    • Générer un nouvel ensemble de clés privées et publiques.

    • Attribuer la clé publique à l’utilisateur. Définir la valeur de la clé publique sur RSA_PUBLIC_KEY ou RSA_PUBLIC_KEY_2 (la valeur de la clé qui n’est pas actuellement utilisée). Par exemple :

      alter user jsmith set rsa_public_key_2='JERUEHtcve...';
      
  2. Mettez à jour le code pour vous connecter à Snowflake. Spécifiez la nouvelle clé privée.

    Snowflake vérifie la bonne clé publique active pour l’authentification sur la base de la clé privée soumise avec vos informations de connexion.

  3. Retirez l’ancienne clé publique du profil utilisateur. Par exemple :

    alter user jsmith unset rsa_public_key;
    

Démarrer Kafka

Démarrez Kafka en suivant les instructions fournies dans la documentation tierce Confluent ou Apache Kafka.

Démarrer le connecteur Kafka

Vous pouvez démarrer le connecteur Kafka en mode distribué ou en mode autonome. Les instructions pour chacun sont indiquées ci-dessous :

Mode distribué

À partir d’une fenêtre de terminal, exécutez la commande suivante :

curl -X POST -H "Content-Type: application/json" --data @<config_file>.json http://localhost:8083/connectors

Mode autonome

À partir d’une fenêtre de terminal, exécutez la commande suivante :

<kafka_dir>/bin/connect-standalone.sh <kafka_dir>/<path>/connect-standalone.properties <kafka_dir>/config/connect-standalone.properties

Test et utilisation du connecteur Kafka

Nous vous recommandons de tester le connecteur Kafka avec une petite quantité de données avant de l’utiliser dans un système de production. Le processus de test est en principe le même que le processus d’utilisation du connecteur :

  1. Vérifiez que Kafka et Kafka Connect sont en cours d’exécution.

  2. Vérifiez que vous avez créé le sujet Kafka approprié.

  3. Créez un éditeur de message (ou utilisez un existant). Assurez-vous que les messages publiés dans le sujet ont le bon format (JSON ou Avro).

  4. Créez un fichier de configuration spécifiant le sujet auquel vous souhaitez vous abonner et la table Snowflake dans laquelle écrire. Voir aussi l’étape suivante.

  5. (Facultatif) Créez une table dans laquelle écrire des données. Cette étape est facultative. Si vous ne créez pas la table, le connecteur Kafka la crée pour vous. Si vous ne prévoyez pas d’utiliser le connecteur pour ajouter des données à une table existante non vide, alors nous vous recommandons de laisser le connecteur la créer pour vous afin de réduire les risques de non-correspondance de schéma.

  6. Attribuez les privilèges minimum requis sur les objets Snowflake (base de données, schéma, table cible, etc.) au rôle qui sera utilisé pour ingérer des données.

  7. Publiez un exemple de jeu de données dans le sujet Kafka configuré.

  8. Attendez quelques minutes que les données se propagent dans le système, puis consultez la table Snowflake pour vérifier que les enregistrements ont été insérés.

Astuce

Envisagez de vérifier votre connexion réseau à Snowflake en utilisant SnowCD avant de charger des données dans Snowflake dans vos environnements de test et de production.