Usando o Snowflake Connector para Kafka com tabelas Apache Iceberg™

A partir da versão 3.0.0, o Snowflake Connector para Kafka pode ingerir dados em uma tabela Apache Iceberg™ gerenciada pelo Snowflake.

Requisitos e limitações

Antes de configurar o conector Kafka para ingestão de tabelas Iceberg, observe os seguintes requisitos e limitações:

  • A ingestão de tabelas Iceberg requer a versão 3.0.0 ou posterior do conector Kafka.

  • A ingestão de tabelas Iceberg é compatível com o conector Kafka com o Snowpipe Streaming. Ele não é aceito pelo conector Kafka com o Snowpipe.

  • A ingestão da tabela Iceberg não é compatível quando snowflake.streaming.enable.single.buffer está definido como false.

  • É necessário criar uma tabela Iceberg antes de executar o conector. Para obter mais informações, consulte Configuração e instalação neste tópico.

Limitações da evolução do esquema

A evolução do esquema Iceberg é totalmente compatível com formatos de dados esquematizados, como AVRO ou Protobuf.

Para JSON simples sem um esquema, o conector considera os seguintes tipos de mensagem inválidos e os envia para filas de letras mortas (DLQ):

  • Mensagens com uma nova coluna se o valor correspondente for nulo ou []

  • Mensagens com um novo campo em um objeto estruturado se o valor correspondente for nulo ou []

Para alterar manualmente o esquema da tabela para que o conector possa ingerir esses tipos de mensagem, use uma instrução ALTER TABLE.

Configuração e instalação

Para configurar o conector Kafka para a ingestão de tabelas Iceberg, siga as etapas normais de configuração para um conector baseado em Snowpipe Streaming com algumas diferenças observadas nas seções a seguir.

Concessão de uso em um volume externo

É necessário conceder o privilégio USAGE no volume externo associado à tabela Iceberg à sua função no conector Kafka.

Por exemplo, se a tabela Iceberg usar o volume externo kafka_external_volume e o conector usar a função kafka_connector_role, execute a seguinte instrução:

USE ROLE ACCOUNTADMIN;
GRANT USAGE ON EXTERNAL VOLUME kafka_external_volume TO ROLE kafka_connector_role;
Copy

Criação de uma tabela Iceberg para ingestão

Antes de executar o conector, é necessário criar uma tabela Iceberg. O esquema inicial da tabela depende das configurações do conector snowflake.enable.schematization.

Se você ativar a esquematização, poderá criar uma tabela com uma coluna nomeada record_metadata:

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    record_metadata OBJECT()
  )
  EXTERNAL_VOLUME = 'my_volume'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'my_location/my_iceberg_table';
Copy

O conector cria automaticamente a coluna record_content e altera o esquema da coluna record_metadata.

Se você não ativar a esquematização, poderá criar uma tabela com uma coluna nomeada record_content de um tipo que corresponda ao conteúdo real da mensagem do Kafka. O conector cria automaticamente a coluna record_metadata.

Ao criar uma tabela Iceberg, é possível usar tipos de dados Iceberg ou tipos Snowflake compatíveis. O tipo VARIANT semiestruturado não é compatível. Em vez disso, use um OBJECT ou MAP estruturado.

Por exemplo, considere a seguinte mensagem:

{
    "id": 1,
    "name": "Steve",
    "body_temperature": 36.6,
    "approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
    "animals_possessed":
    {
        "dogs": true,
        "cats": false
    },
    "date_added": "2024-10-15"
}
Copy

Para criar uma tabela Iceberg para a mensagem de exemplo, use a seguinte instrução:

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    record_content OBJECT(
        id INT,
        body_temperature FLOAT,
        name STRING,
        approved_coffee_types ARRAY(STRING),
        animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN),
        date_added DATE
    )
  )
  EXTERNAL_VOLUME = 'my_volume'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'my_location/my_iceberg_table';
Copy

Nota

Os nomes de campo dentro de estruturas aninhadas, como dogs ou cats, diferenciam maiúsculas de minúsculas.

Propriedades de configuração

snowflake.streaming.iceberg.enabled

Especifica se o conector ingere dados em uma tabela Iceberg. O conector falhará se essa propriedade não corresponder ao tipo de tabela real.

Valores:

  • true

  • false

Padrão:

false