Amostra de serviço remoto assíncrono para a AWS

Este tópico contém uma amostra assíncrona da Função AWS Lambda (serviço remoto). Você pode criar esta função de amostra seguindo as mesmas etapas descritas em Etapa 1: criar o serviço remoto (função AWS Lambda) no console de gerenciamento.

Neste tópico:

Visão geral do código

Esta seção da documentação fornece informações sobre a criação de uma função externa assíncrona na AWS. (Antes de implementar sua primeira função externa assíncrona, você pode ler a visão geral conceptual das funções externas assíncronas).

Na AWS, os serviços remotos assíncronos devem superar as seguintes restrições:

  • Como HTTP POST e GET são solicitações separadas, o serviço remoto deve manter informações sobre o fluxo de trabalho iniciado pela solicitação POST para que o estado possa ser consultado posteriormente pela solicitação GET.

    Geralmente, cada HTTP POST e HTTP GET invoca uma instância separada da(s) função(ões) do manipulador em um processo ou thread separado. As instâncias separadas não compartilham memória. Para que o manipulador GET possa ler o status ou os dados processados, o manipulador GET deve acessar um recurso de armazenamento compartilhado que está disponível na AWS.

  • A única maneira do manipulador POST enviar o código de resposta HTTP 202 inicial é por meio de uma instrução return (ou equivalente), que encerra a execução do manipulador. Portanto, antes de retornar HTTP 202, o manipulador de POST deve iniciar um processo (ou thread) independente para fazer o trabalho real de processamento de dados do serviço remoto. Este processo independente normalmente precisa de acesso ao armazenamento que é visível para o manipulador GET.

Uma maneira de um serviço remoto assíncrono superar essas restrições é usar 3 processos (ou threads) e armazenamento compartilhado:

Illustration of processes for an Asynchronous Remote Service

Nesse modelo, os processos têm as seguintes responsabilidades:

  • O manipulador HTTP POST:

    • Lê os dados de entrada. Em uma função Lambda, isto é lido a partir do corpo do parâmetro de entrada de event da função do manipulador.

    • Lê a ID de lote. Em uma função Lambda, isto é lido a partir do cabeçalho do parâmetro de entrada de event.

    • Inicia o processo de processamento de dados, e passa os dados e a ID de lote. Os dados são geralmente passados durante a chamada, mas podem ser passados gravando-os para armazenamento externo.

    • Registra a ID de lote em um armazenamento compartilhado que o processo de processamento de dados e o processo do manipulador HTTP GET podem acessar.

    • Se necessário, registra que o processamento desse lote ainda não terminou.

    • Retorna HTTP 202 se nenhum erro for detectado.

  • O código de processamento de dados:

    • Lê os dados de entrada.

    • Processa os dados.

    • Torna o resultado disponível para o manipulador GET (gravando os dados do resultado para o armazenamento compartilhado, ou fornecendo uma API por meio da qual é possível consultar os resultados).

    • Normalmente, atualiza o status desse lote (por exemplo, de IN_PROGRESS para SUCCESS) para indicar que os resultados estão prontos para serem lidos.

    • Sai. Opcionalmente, esse processo pode retornar um indicador de erro. O Snowflake não vê isso diretamente (o Snowflake vê apenas os códigos de retorno HTTP do manipulador POST e manipulador GET), mas retornar um indicador de erro do processo de processamento de dados pode ajudar durante a depuração.

  • O manipulador GET:

    • Lê a ID de lote. Em uma função Lambda, isto é lido a partir do cabeçalho do parâmetro de entrada de event.

    • Lê o armazenamento para obter o status atual desse lote (por exemplo, IN_PROGRESS ou SUCCESS).

    • Se o processamento ainda estiver em andamento, retorna 202.

    • Se o processamento tiver terminado com sucesso, então:

      • Lê os resultados.

      • Limpa o armazenamento.

      • Retorna os resultados junto com o código HTTP 200.

    • Se o status armazenado indicar um erro, então:

      • Limpa o armazenamento.

      • Retorna um código de erro.

    Observe que o manipulador GET pode ser chamado várias vezes para um lote se o processamento demorar o suficiente para que múltiplas solicitações HTTP GET sejam enviadas.

Há muitas variações possíveis nesse modelo. Por exemplo:

  • A ID de lote e o status podem ser gravados no início do processo de processamento de dados, e não no final do processo POST.

  • O processamento de dados pode ser feito em uma função separada (por exemplo, uma função Lambda separada) ou mesmo como um serviço completamente separado.

  • O código de processamento de dados não precisa necessariamente ser gravado para o armazenamento compartilhado. Em vez disso, os dados processados podem ser disponibilizados de outra forma. Por exemplo, uma API pode aceitar a ID de lote como um parâmetro e retornar os dados.

O código de implementação deve levar em conta a possibilidade de que o processamento demore muito tempo ou falhe e, portanto, qualquer resultado parcial deve ser limpo para evitar o desperdício de espaço de armazenamento.

O mecanismo de armazenamento deve ser compartilhável por meio de múltiplos processos (ou threads). Os possíveis mecanismos de armazenamento incluem:

  • Mecanismos de armazenamento fornecidos pela AWS, como:

  • Armazenamento que está fora da AWS, mas acessível a partir da AWS.

O código para cada um dos processos 3 acima pode ser gravado como 3 funções Lambda separadas (uma para o manipulador POST, uma para a função de processamento de dados e uma para o manipulador GET), ou como uma única função que pode ser invocada de diferentes maneiras.

A amostra de código Python abaixo é uma única função Lambda que pode ser chamada separadamente para processos POST, processamento de dados e processos GET.

Código da amostra

Este código mostra um exemplo de consulta com saída. O foco deste exemplo está nos três processos e como eles interagem, não no mecanismo de armazenamento compartilhado (DynamoDB) ou na transformação de dados (análise de sentimento). O código é estruturado para facilitar a substituição do mecanismo de armazenamento de exemplo e a transformação de dados em outros diferentes.

Para simplificar, este exemplo:

  • Codifica alguns valores importantes (por exemplo, a região AWS).

  • Considera a existência de alguns recursos (por exemplo, a tabela Jobs no Dynamo).

import json
import time
import boto3

HTTP_METHOD_STRING = "httpMethod"
HEADERS_STRING = "headers"
BATCH_ID_STRING = "sf-external-function-query-batch-id"
DATA_STRING = "data"
REGION_NAME = "us-east-2"

TABLE_NAME = "Jobs"
IN_PROGRESS_STATUS = "IN_PROGRESS"
SUCCESS_STATUS = "SUCCESS"

def lambda_handler(event, context):
    # this is called from either the GET or POST
    if (HTTP_METHOD_STRING in event):
        method = event[HTTP_METHOD_STRING]
        if method == "POST":
            return initiate(event, context)
        elif method == "GET":
            return poll(event, context)
        else:
            return create_response(400, "Function called from invalid method")

    # if not called from GET or POST, then this lambda was called to
    # process data
    else:
        return process_data(event, context)


# Reads batch_ID and data from the request, marks the batch_ID as being processed, and
# starts the processing service.
def initiate(event, context):
    batch_id = event[HEADERS_STRING][BATCH_ID_STRING]
    data = json.loads(event["body"])[DATA_STRING]

    lambda_name = context.function_name

    write_to_storage(batch_id, IN_PROGRESS_STATUS, "NULL")
    lambda_response = invoke_process_lambda(batch_id, data, lambda_name)

    # lambda response returns 202, because we are invoking it with
    # InvocationType = 'Event'
    if lambda_response["StatusCode"] != 202:
        response = create_response(400, "Error in inititate: processing lambda not started")
    else:
        response = {
            'statusCode': lambda_response["StatusCode"]
        }

    return response


# Processes the data passed to it from the POST handler. In this example,
# the processing is to perform sentiment analysis on text.
def process_data(event, context):
    data = event[DATA_STRING]
    batch_id = event[BATCH_ID_STRING]

    def process_data_impl(data):
        comprehend = boto3.client(service_name='comprehend', region_name=REGION_NAME)
        # create return rows
        ret = []
        for i in range(len(data)):
            text = data[i][1]
            sentiment_response = comprehend.detect_sentiment(Text=text, LanguageCode='en')
            sentiment_score = json.dumps(sentiment_response['SentimentScore'])
            ret.append([i, sentiment_score])
        return ret

    processed_data = process_data_impl(data)
    write_to_storage(batch_id, SUCCESS_STATUS, processed_data)

    return create_response(200, "No errors in process")


# Repeatedly checks on the status of the batch_ID, and returns the result after the
# processing has been completed.
def poll(event, context):
    batch_id = event[HEADERS_STRING][BATCH_ID_STRING]
    processed_data = read_data_from_storage(batch_id)

    def parse_processed_data(response):
        # in this case, the response is the response from DynamoDB
        response_metadata = response['ResponseMetadata']
        status_code = response_metadata['HTTPStatusCode']

        # Take action depending on item status
        item = response['Item']
        job_status = item['status']
        if job_status == SUCCESS_STATUS:
            # the row number is stored at index 0 as a Decimal object,
            # we need to convert it into a normal int to be serialized to JSON
            data = [[int(row[0]), row[1]] for row in item['data']]
            return {
                'statusCode': 200,
                'body': json.dumps({
                    'data': data
                })
            }
        elif job_status == IN_PROGRESS_STATUS:
            return {
                'statusCode': 202,
                "body": "{}"
            }
        else:
            return create_response(500, "Error in poll: Unknown item status.")

    return parse_processed_data(processed_data)


def create_response(code, msg):
    return {
        'statusCode': code,
        'body': msg
    }


def invoke_process_lambda(batch_id, data, lambda_name):
    # Create payload to be sent to processing lambda
    invoke_payload = json.dumps({
        BATCH_ID_STRING: batch_id,
        DATA_STRING: data
    })

    # Invoke processing lambda asynchronously by using InvocationType='Event'.
    # This allows the processing to continue while the POST handler returns HTTP 202.
    lambda_client = boto3.client('lambda', region_name=REGION_NAME,)
    lambda_response = lambda_client.invoke(
        FunctionName=lambda_name,
        InvocationType='Event',
        Payload=invoke_payload
    )
    # returns 202 on success if InvocationType = 'Event'
    return lambda_response


def write_to_storage(batch_id, status, data):
    # we assume that the table has already been created
    client = boto3.resource('dynamodb')
    table = client.Table(TABLE_NAME)

    # Put in progress item in table
    item_to_store = {
        'batch_id': batch_id,
        'status': status,
        'data': data,
        'timestamp': "{}".format(time.time())
    }
    db_response = table.put_item(
        Item=item_to_store
    )


def read_data_from_storage(batch_id):
    # we assume that the table has already been created
    client = boto3.resource('dynamodb')
    table = client.Table(TABLE_NAME)

    response = table.get_item(Key={'batch_id': batch_id},
                          ConsistentRead=True)
    return response
Copy

Amostra de chamada e saída

Aqui está uma chamada de amostra para a função externa assíncrona, juntamente com a saída de amostra, incluindo os resultados da análise de sentimento:

create table test_tb(a string);
insert into test_tb values
    ('hello world'),
    ('I am happy');
select ext_func_async(a) from test_tb;

Row | EXT_FUNC_ASYNC(A)
0   | {"Positive": 0.47589144110679626, "Negative": 0.07314028590917587, "Neutral": 0.4493273198604584, "Mixed": 0.0016409909585490823}
1   | {"Positive": 0.9954453706741333, "Negative": 0.00039307220140472054, "Neutral": 0.002452891319990158, "Mixed": 0.0017087293090298772}
Copy

Notas sobre o código da amostra

  • A função de processamento de dados é invocada por chamada:

    lambda_response = lambda_client.invoke(
        ...
        InvocationType='Event',
        ...
    )
    
    Copy

    O InvocationType deve ser ‘Event’, como mostrado acima, porque o segundo processo (ou thread) deve ser assíncrono e Event é o único tipo de chamada sem bloqueio disponível pelo método invoke().

  • A função de processamento de dados retorna um código HTTP 200. Entretanto, esse código HTTP 200 não é retornado diretamente ao Snowflake. O Snowflake não vê HTTP 200 até que GET faça a sondagem do status e veja que a função de processamento de dados terminou o processamento desse lote com sucesso.