Trabalhando com o Snowflake High Performance connector for Kafka

Este tópico descreve como o conector funciona com tabelas e canais e como configurá-lo com esses elementos.

Como o conector funciona com tabelas e canais

O conector trata cada registro do Kafka como uma linha a ser inserida em uma tabela do Snowflake. Por exemplo, se você tem um tópico do Kafka com o conteúdo da mensagem estruturado como neste JSON:

{
  "order_id": 12345,
  "customer_name": "John",
  "order_total": 100.00,
  "isPaid": true
}
Copy

Por padrão, você não precisa criar uma tabela ou um canal antes que a ingestão comece. O conector cria uma tabela com colunas correspondentes às chaves do JSON e recorre ao canal padrão chamado {tableName}-STREAMING, que mapeará automaticamente as chaves de primeiro nível do conteúdo do registro para as colunas da tabela correspondentes por nome (sem distinção entre maiúsculas e minúsculas). Você também pode criar sua própria tabela com colunas que correspondam às chaves do JSON. O conector tenta corresponder as chaves de primeiro nível do conteúdo do registro às colunas da tabela por nome. Se as chaves do JSON não corresponderem às colunas da tabela, o conector vai ignorá-las.

CREATE TABLE ORDERS (
  record_metadata VARIANT,
  order_id NUMBER,
  customer_name VARCHAR,
  order_total NUMBER,
  ispaid BOOLEAN
);
Copy

Se você criar o próprio canal, poderá definir a lógica de transformação de dados na instrução COPY INTO do canal. Você pode renomear as colunas e converter os tipos de dados conforme necessário. Por exemplo:

CREATE TABLE ORDERS (
  order_id VARCHAR,
  customer_name VARCHAR,
  order_total VARCHAR,
  ispaid VARCHAR
);
Copy
CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
  SELECT
  $1:order_id::STRING,
  $1:customer_name,
  $1:order_total::STRING,
  $1:isPaid::STRING
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
Copy

ou

CREATE TABLE ORDERS (
 topic VARCHAR,
 partition VARCHAR,
 order_id VARCHAR,
 customer_name VARCHAR,
 order_total VARCHAR,
 ispaid VARCHAR
);
Copy
CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
  SELECT
  $1:RECORD_METADATA.topic::STRING AS topic,
  $1:RECORD_METADATA.partition::STRING AS partition,
  $1['order_id']::STRING AS order_id,
  $1['customer_name']::STRING as customer_name,
  CONCAT($1['order_total']::STRING, ' USD') AS order_total,
  $1['isPaid']::STRING AS ispaid
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
Copy

Quando você define o próprio canal, as colunas da tabela de destino não precisam corresponder às chaves do JSON. Você pode renomear as colunas para os nomes desejados e converter os tipos de dados se necessário.

Nomes de tópicos, tabelas e canais

Dependendo das definições de configuração, o conector usará nomes diferentes para a tabela de destino. O nome da tabela de destino é sempre derivado do nome do tópico.

Como o conector mapeia nomes de tópicos para a tabela de destino

O conector Kafka oferece dois modos para mapear nomes de tópicos do Kafka para nomes de tabelas do Snowflake:

  • Mapeamento estático: o conector deriva nomes de tabelas de destino usando apenas o nome do tópico do Kafka.

  • Modo de mapeamento explícito de tópico para tabela: você especifica mapeamentos personalizados entre tópicos e tabelas usando o parâmetro de configuração snowflake.topic2table.map.

Mapeamento estático

Se você não configurar o parâmetro snowflake.topic2table.map, o conector sempre vai derivar os nomes das tabelas do nome do tópico.

Geração de nome de tabela:

O conector deriva o nome da tabela de destino do nome do tópico usando as seguintes regras:

  1. Se o nome do tópico for um identificador Snowflake válido, o conector o usará como o nome da tabela de destino, convertido em maiúsculas.

  2. Se o nome do tópico contiver caracteres inválidos, o conector:

    • Substituirá os caracteres inválidos por sublinhados;

    • Adicionará um sublinhado seguido por um código hash para garantir a exclusividade;

    • Por exemplo, o tópico my-topic.data torna-se MY_TOPIC_DATA_<hash>.

Determinação do nome do canal:

O conector determina qual canal usar com base na seguinte lógica:

  1. O conector verifica se existe um canal com o mesmo nome da tabela de destino.

  2. Se existir um canal criado pelo usuário com esse nome, o conector usará esse canal (modo de canal definido pelo usuário).

  3. Caso contrário, o conector usará o canal padrão chamado {tableName}-STREAMING.

Nota

O Snowflake recomenda escolher nomes de tópicos que sigam as regras de nomes dos identificadores do Snowflake para garantir nomes de tabelas previsíveis.

Explicando o RECORD_METADATA

O conector preenche a estrutura de RECORD_METADATA com os metadados sobre o registro do Kafka. Esses metadados são enviados por meio da fonte de dados Snowpipe Streaming para o Snowflake, onde ficam disponíveis em transformações de canal usando o acessador $1:RECORD_METADATA. A estrutura de RECORD_METADATA está disponível nos modos de canal tanto definido pelo usuário quanto padrão. O conteúdo pode ser salvo na coluna do tipo VARIANT, ou os campos individuais podem ser extraídos e salvos em colunas separadas.

Exemplo de canal com transformações e metadados:

CREATE PIPE ORDERS AS
COPY INTO ORDERS_TABLE
FROM (
  SELECT
    $1:order_id::NUMBER,
    $1:customer_name,
    $1:order_total,
    $1:RECORD_METADATA.topic AS source_topic,
    $1:RECORD_METADATA.offset::NUMBER AS kafka_offset,
    $1:RECORD_METADATA.SnowflakeConnectorPushTime::BIGINT AS ingestion_time
  FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
Copy

Neste exemplo:

  • O canal extrai campos específicos da mensagem do Kafka (order_id, customer_name, order_total)

  • Ele também captura campos de metadados (tópico, deslocamento e carimbo de data/hora da ingestão)

  • Os valores podem ser convertidos e/ou transformados conforme necessário

Como os campos de metadados são preenchidos

O conector preenche automaticamente os campos de metadados com base nas propriedades do registro do Kafka e na configuração do conector. Você pode controlar quais campos de metadados são incluídos usando estes parâmetros de configuração:

  • snowflake.metadata.topic (padrão: true): inclui o nome do tópico

  • snowflake.metadata.offset.and.partition (padrão: true): inclui deslocamento e partição

  • snowflake.metadata.createtime (padrão: true): inclui o carimbo de data/hora do registro do Kafka

  • snowflake.metadata.all (padrão: true): inclui todos os metadados disponíveis

Quando snowflake.metadata.all=true (padrão), todos os campos de metadados são preenchidos. A definição de sinalizadores de metadados individuais como false exclui esses campos específicos da estrutura de RECORD_METADATA.

Nota

O campo SnowflakeConnectorPushTime está sempre disponível e representa a hora em que o conector enviou por push o registro para o buffer de ingestão. Isso é útil para calcular a latência de ingestão de ponta a ponta.

Por padrão, a estrutura de RECORD_METADATA contém as seguintes informações:

Campo

Tipo de dados

Descrição

topic

Cadeia de caracteres

O nome do tópico Kafka que deu origem ao registro.

partition

Cadeia de caracteres

O número da partição dentro do tópico. (Note que esta é a partição do Kafka, não a micropartição do Snowflake).

offset

number

O offset dessa partição.

CreateTime / . LogAppendTime

number

Esse é o carimbo de data/hora associado à mensagem no tópico do Kafka. O valor é em milissegundos desde a meia-noite de 1.º de janeiro de 1970, UTC. Para obter mais informações, consulte a documentação do ProducerRecord do Kafka.

SnowflakeConnectorPushTime

number

Um carimbo de data/hora em que um registro foi enviado para um buffer Ingest SDK. O valor é o número de milissegundos desde a meia-noite de 1º de janeiro de 1970 UTC. Para obter mais informações, consulte Estimativa de latência de ingestão.

key

Cadeia de caracteres

Se a mensagem for uma KeyedMessage do Kafka, esta é a chave da mensagem. Para que o conector armazene a chave em RECORD_METADATA, o parâmetro key.converter em Propriedades de configuração do Kafka deve ser definido como org.apache.kafka.connect.storage.StringConverter; caso contrário, o conector ignora as chaves.

headers

Objeto

Um cabeçalho é um par chave-valor associado ao registro e definido pelo usuário. Cada registro pode ter 0, 1 ou vários cabeçalhos.

A quantidade de metadados gravados na coluna RECORD_METADATA é configurável usando propriedades opcionais de configuração do Kafka.

Os nomes e valores dos campos diferenciam maiúsculas e minúsculas.

Como os registros do Kafka são convertidos antes da ingestão

Antes de cada linha ser passada ao Snowpipe Streaming, o conector converte o valor do registro do Kafka Connect em um Map<cadeia de caracteres, objeto>, com chaves que devem corresponder aos nomes das colunas de destino (ou podem ser transformadas dentro de um canal definido pelo usuário). Cadeias de caracteres, matrizes de bytes ou números primitivos devem ser delimitados (por exemplo, usando uma SMT HoistField) para que o conector receba um objeto estruturado. O conversor aplica as seguintes regras:

  • Os valores nulos são tratados como marcas de exclusão. Eles são ignorados quando behavior.on.null.values=IGNORE, caso contrário, são ingeridos como objetos JSON vazios.

  • Os campos numéricos e boolianos são passados como estão. Valores decimais com precisão maior que 38 são serializados como cadeias de caracteres para permanecer dentro dos limites de NUMBER do Snowflake.

  • As cargas úteis byte[] e ByteBuffer são cadeias de caracteres codificadas em Base64, portanto armazene-as em colunas VARIANT ou VARCHAR.

  • As matrizes permanecem como matrizes, e os objetos aninhados permanecem como mapas aninhados. Declare as colunas VARIANT quando você recorrer ao canal padrão para descarregar os dados aninhados como estão.

  • Mapas com chaves que não são cadeia de caracteres são emitidos como matrizes de pares [key, value] porque os nomes das colunas do Snowflake devem ser texto.

  • Os cabeçalhos e as chaves dos registros são copiados para RECORD_METADATA sempre que os sinalizadores de metadados relevantes estiverem habilitados.

Se você precisar que todo o corpo da mensagem seja preservado como uma única coluna, delimite-o em um novo campo de nível superior usando SMTs. Consulte Coluna RECORD_CONTENT legada para ver o padrão de transformação.

Modo de canal definido pelo usuário ou padrão

O conector oferece suporte a dois modos para gerenciar a ingestão de dados:

Modo de canal definido pelo usuário

Neste modo, você tem controle completo sobre a transformação de dados e o mapeamento de colunas.

Quando usar este modo:

  • Você precisa de nomes de colunas personalizados que sejam diferentes dos nomes de campo JSON

  • Você precisa aplicar transformações de dados (conversão de tipo, mascaramento, filtragem)

  • Você quer controle completo sobre como os dados são mapeados para colunas

Modo de canal padrão

Neste modo, o conector usa um canal padrão chamado {tableName}-STREAMING e mapeia os campos de registro do Kafka para colunas de tabelas correspondentes por nome (sem distinção entre maiúsculas e minúsculas).

Quando usar este modo:

  • Os nomes da sua chave de registro do Kafka correspondem aos nomes das colunas desejadas

  • Você não precisa de transformações de dados personalizadas

  • Você quer uma configuração simples

Mapeando chaves de registro do Kafka para colunas de tabela com modo de canal padrão

Ao usar o modo de canal padrão, o conector usa o canal padrão chamado {tableName}-STREAMING e mapeia as chaves de primeiro nível do conteúdo diretamente para as colunas da tabela usando uma correspondência sem distinção entre maiúsculas e minúsculas.

Usando o modo de canal padrão – exemplo

Exemplo 1:

Considere a seguinte carga útil de conteúdo do registro do Kafka:

{
  "city": "New York",
  "age": 30,
  "married": true,
  "has cat": true,
  "@&$#* includes special characters": true,
  "skills": ["sitting", "standing", "eating"],
  "family": {"son": "Jack", "daughter": "Anna"}
}
Copy

Você cria uma tabela com colunas que correspondem às chaves JSON (sem distinção entre maiúsculas e minúsculas, incluindo caracteres especiais):

CREATE TABLE PERSON_DATA (
  record_metadata VARIANT,
  city VARCHAR,
  age NUMBER,
  married BOOLEAN,
  "has cat" BOOLEAN,
  "!@&$#* includes special characters" BOOLEAN,
  skills VARIANT,
  family VARIANT
);
Copy

Comportamento de correspondência:

  • "city" (kafka) → city ou CITY ou City (coluna) – sem distinção entre maiúsculas e minúsculas

  • "has cat" (kafka) → "has cat" (coluna) – deve ser delimitado entre aspas devido ao espaço

  • "!@&$#* includes special characters" (kafka) → "!@&$#* includes special characters" (coluna) – caracteres especiais preservados

  • Objetos aninhados como skills e family mapeiam para colunas VARIANT automaticamente

Usando o modo de canal definido pelo usuário – exemplos

Este exemplo mostra como configurar e usar canais definidos pelo usuário com transformações de dados personalizadas.

Exemplo 1:

Crie uma tabela com o esquema desejado:

CREATE TABLE ORDERS (
  order_id NUMBER,
  customer_name VARCHAR,
  order_total NUMBER,
  order_date TIMESTAMP_NTZ,
  source_topic VARCHAR
);
Copy

Crie um canal que transforma os registros recebidos do Kafka para corresponder ao esquema da sua tabela:

CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
  SELECT
    $1:order_id::NUMBER,
    $1:customer_name,
    $1:order_total::NUMBER,
    $1:order_date::TIMESTAMP_NTZ,
    $1:RECORD_METADATA.topic
  FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
Copy

Observe que o nome do canal (ORDERS) corresponde ao nome da tabela (ORDERS). A definição do canal extrai campos da carga útil JSON usando a sintaxe $1:field_name e os mapeia para as colunas da tabela.

Nota

Você pode acessar os campos JSON aninhados, e campos com caracteres especiais, usando notação de colchetes, como $1['field name'] ou $1['has cat'].

Configure o mapeamento do tópico para a tabela:

snowflake.topic2table.map=kafka-orders-topic:ORDERS
Copy

Esta configuração mapeia o tópico do Kafka kafka-orders-topic para a tabela e o canal preexistentes chamados ORDERS.

Exemplo 2:

Quando você precisa acessar chaves no conteúdo que não têm nomes convencionais, use a seguinte sintaxe:

  • Campos simples: $1:field_name

  • Campos com espaços ou caracteres especiais: $1['field name'] ou $1['has cat']

  • Campos com caracteres unicode: $1[' @&$#* has Łułósżź']

  • Campos aninhados: $1:parent.child ou $1:parent['child field']

Considere esta carga útil JSON do Kafka:

{
  "city": "New York",
  "age": 30,
  "married": true,
  "has cat": true,
  " @&$#* has Łułósżź": true,
  "skills": ["sitting", "standing", "eating"],
  "family": {"son": "Jack", "daughter": "Anna"}
}
Copy

Crie uma tabela de destino com os nomes de coluna escolhidos:

CREATE TABLE PERSON_DATA (
  city VARCHAR,
  age NUMBER,
  married BOOLEAN,
  has_cat BOOLEAN,
  weird_field_name BOOLEAN,
  skills VARIANT,
  family VARIANT
);
Copy

Em seguida, crie um canal com o mesmo nome que define o mapeamento:

CREATE PIPE PERSON_DATA AS
COPY INTO PERSON_DATA
FROM (
  SELECT
    $1:city,
    $1:age,
    $1:married,
    $1['has cat'] AS has_cat,
    $1[' @&$#* has Łułósżź'] AS weird_field_name,
    $1:skills,
    $1:family
  FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
Copy

Pontos principais:

  • Você controla os nomes das colunas (por exemplo, renomear "has cat" para has_cat)

  • Você pode converter tipos de dados conforme necessário (por exemplo, $1:age::NUMBER)

  • Você pode incluir ou excluir campos conforme desejado

  • Você pode adicionar campos de metadados (por exemplo, $1:RECORD_METADATA.topic)

  • Colunas VARIANT processam estruturas JSON aninhadas automaticamente

Exemplo 3: Com tabelas interativas

As tabelas interativas são um tipo especial de tabela do Snowflake otimizada para consultas de baixa latência e alta simultaneidade. Você pode saber mais sobre tabelas interativas na documentação de tabelas interativas.

  1. Crie uma tabela interativa:

    CREATE INTERACTIVE TABLE REALTIME_METRICS (
      metric_name VARCHAR,
      metric_value NUMBER,
      source_topic VARCHAR,
      timestamp TIMESTAMP_NTZ
    ) AS (SELECT
          $1:M_NAME::VARCHAR,
          $1:M_VALUE::NUMBER,
          $1:RECORD_METADATA.topic::VARCHAR,
          $1:RECORD_METADATA.timestamp::TIMESTAMP_NTZ
          from TABLE(DATA_SOURCE(TYPE => 'STREAMING')));
    
    Copy
  2. Configure o mapeamento do tópico para a tabela:

    snowflake.topic2table.map=metrics-topic:REALTIME_METRICS
    
    Copy

Considerações importantes:

  • As tabelas interativas têm limitações e restrições de consulta específicas. Revise a documentação de tabelas interativas antes de usá-las com o conector.

  • Para tabelas interativas, as transformações necessárias devem ser tratadas na definição da tabela.

  • Warehouses interativos são necessários para consultar tabelas interativas de forma eficiente.

Mapeamento explícito de tópico para tabela

Quando você configura o parâmetro snowflake.topic2table.map, o conector opera no modo de mapeamento explícito. Esse modo permite que você:

  • Mapeie vários tópicos do Kafka para uma única tabela do Snowflake;

  • Use nomes de tabelas personalizados diferentes dos nomes dos tópicos;

  • Aplique padrões regex para corresponder a vários tópicos.

Formato de configuração:

O parâmetro snowflake.topic2table.map aceita uma lista separada por vírgulas de mapeamentos de tópico para tabela no formato:

topic1:table1,topic2:table2,topic3:table3
Copy

Exemplos de configurações:

Mapeamento direto do tópico

snowflake.topic2table.map=orders:ORDER_TABLE,customers:CUSTOMER_TABLE
Copy

Correspondência de padrão regex

snowflake.topic2table.map=.*_cat:CAT_TABLE,.*_dog:DOG_TABLE
Copy

Esta configuração mapeia todos os tópicos que terminam com _cat (como orange_cat, calico_cat) para a tabela CAT_TABLE, e todos os tópicos que terminam com _dog para a tabela DOG_TABLE.

Muitos tópicos para uma tabela

snowflake.topic2table.map=topic1:shared_table,topic2:shared_table,topic3:other_table
Copy

Esta configuração mapeia tanto topic1 quanto topic2 para shared_table, enquanto topic3 é mapeado para other_table.

Importante

  • Os padrões regex no mapeamento não podem se sobrepor. Cada tópico deve corresponder no máximo a um padrão.

  • Os nomes das tabelas no mapeamento devem ser identificadores válidos do Snowflake com pelo menos 2 caracteres, começando com uma letra ou sublinhado.

  • Você pode mapear vários tópicos para uma única tabela.

Coluna RECORD_CONTENT legada

Nas versões mais antigas do conector (3.x e anteriores), quando o recurso de esquematização era desabilitado, o conector criava uma tabela de destino com duas colunas: RECORD_CONTENT e RECORD_METADATA. A coluna RECORD_CONTENT continha todo o conteúdo da mensagem do Kafka em uma coluna do tipo VARIANT. A coluna RECORD_METADATA continua compatível, mas a coluna RECORD_CONTENT não é mais criada pelo conector. A mesma funcionalidade pode ser obtida usando transformações SMT (consulte os exemplos mais adiante nesta seção). A chave RECORD_CONTENT também não está mais disponível em transformações de PIPE. Por exemplo, esta definição de PIPE não funcionará por padrão:

Nota

Essa definição de canal não funcionará sem transformações SMT adicionais.

CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
  SELECT
    $1:RECORD_CONTENT
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
Copy

Se você precisa de todo o conteúdo da mensagem do Kafka salvo em uma única coluna, ou de um manipulador para todo o conteúdo da mensagem do Kafka em uma transformação de PIPE, pode usar a seguinte transformação SMT que delimita todo o conteúdo da mensagem do Kafka no campo personalizado desejado:

transforms=wrapKafkaMessageContent
transforms.wrapKafkaMessageContent.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.wrapKafkaMessageContent.field=your_top_level_field_name
Copy

Essa transformação delimita todo o conteúdo da mensagem do Kafka em um campo personalizado chamado your_top_level_field_name. Em seguida, você pode acessar todo o conteúdo da mensagem do Kafka usando o acessador $1:your_top_level_field_name em sua transformação de PIPE.

CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
  SELECT
    $1:your_top_level_field_name
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
Copy

Uma outra opção se você quiser salvar todos os metadados e o conteúdo em uma única tabela usando o canal padrão é não criar um canal personalizado; em vez disso, crie apenas uma tabela com duas colunas: RECORD_CONTENT e your_top_level_field_name.

CREATE TABLE ORDERS (
  record_metadata VARIANT,
  your_top_level_field_name VARIANT
);
Copy

Para ler mais sobre a transformação HoistField$Value, consulte a Documentação do Kafka.

Aviso

Salvar todo o conteúdo e os metadados da mensagem do Kafka em uma tabela pode ter um impacto negativo no custo de ingestão, na velocidade do pipeline e na latência. Se você precisa do melhor desempenho possível, considere salvar apenas os dados necessários se estiverem acessíveis no nível superior do conteúdo do registro do Kafka ou usar transformações SMT para extrair os dados dos campos profundamente aninhados para campos de nível superior.

Tratamento de erros de canais de streaming e filas de mensagens mortas

O conector inspeciona o status do canal Snowpipe Streaming antes de confirmar os deslocamentos no Kafka. Se o conector detectar que a propriedade rowsErrorCount no canal aumentou desde que o conector foi iniciado, ele emitirá um erro fatal (ERROR_5030) quando errors.tolerance=none para que os problemas de dados não passem despercebidos. Para permitir que a ingestão continue durante a triagem das linhas inválidas, defina errors.tolerance=all.

errors.tolerance=all
Copy

Evolução do esquema

Para tabelas com ENABLE_SCHEMA_EVOLUTION=TRUE, o conector evolui automaticamente o respectivo esquema com base nos registros Kafka recebidos. Todas as tabelas criadas pelo conector são definidas por padrão como ENABLE_SCHEMA_EVOLUTION=TRUE.

A evolução de esquema é limitada às seguintes operações:

  • Adicionar novas colunas O conector adicionará novas colunas à tabela se os registros Kafka recebidos contiverem novos campos que não estejam presentes na tabela.

  • Descartar a restrição NOT NULL das colunas com dados ausentes nos registros inseridos

Usar o conector com tabelas Apache Iceberg™

O conector pode ingerir dados em tabelas Apache Iceberg™ gerenciadas pelo Snowflake, mas é necessário atender aos seguintes requisitos:

  • Você deve ter recebido o privilégio USAGE no volume externo associado à sua tabela Apache Iceberg™.

  • Você deve criar uma tabela Apache Iceberg™ antes de executar o conector.

Concessão de uso em um volume externo

Para conceder o privilégio USAGE no volume externo associado à tabela Apache Iceberg™ à sua função para o conector Kafka, execute a seguinte instrução:

Por exemplo, se a tabela Iceberg usar o volume externo kafka_external_volume e o conector usar a função kafka_connector_role, execute a seguinte instrução:

USE ROLE ACCOUNTADMIN;
GRANT USAGE ON EXTERNAL VOLUME kafka_external_volume TO ROLE kafka_connector_role;
Copy

Criar uma tabela Apache Iceberg™ para ingestão

O conector não cria tabelas Iceberg automaticamente e não oferece suporte à evolução de esquema. Antes de executar o conector, você deve criar uma tabela Iceberg manualmente.

Ao criar uma tabela Iceberg, é possível usar tipos de dados Iceberg (incluindo VARIANT) ou tipos compatíveis com o Snowflake.

Por exemplo, considere a seguinte mensagem:

{
    "id": 1,
    "name": "Steve",
    "body_temperature": 36.6,
    "approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
    "animals_possessed":
    {
        "dogs": true,
        "cats": false
    },
    "options":
    {
        "can_walk": true,
        "can_talk": false
    },
    "date_added": "2024-10-15"
}
Copy

Para criar uma tabela Iceberg para a mensagem de exemplo, use uma das seguintes instruções:

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    id number(38,0),
    name varchar,
    body_temperature number(4,2),
    approved_coffee_types array(varchar),
    animals_possessed variant,
    options object(can_walk boolean, can_talk boolean),
    date_added date
  )
  EXTERNAL_VOLUME = 'my_volume'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'my_location/my_iceberg_table'
  ICEBERG_VERSION = 3;
Copy
CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    id INT,
    name string,
    body_temperature float,
    approved_coffee_types array(string),
    animals_possessed variant,
    date_added date,
    options object(can_walk boolean, can_talk boolean),
    )
  EXTERNAL_VOLUME = 'my_volume'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'my_location/my_iceberg_table'
  ICEBERG_VERSION = 3;
Copy

Nota

Os nomes de campo dentro de estruturas aninhadas, como dogs ou cats, diferenciam maiúsculas de minúsculas.

Próximos passos

Configurar tarefas.