Amostra de serviço remoto assíncrono para a AWS¶
Este tópico contém uma amostra assíncrona da Função AWS Lambda (serviço remoto). Você pode criar esta função de amostra seguindo as mesmas etapas descritas em Etapa 1: criar o serviço remoto (função AWS Lambda) no console de gerenciamento.
Neste tópico:
Visão geral do código¶
Esta seção da documentação fornece informações sobre a criação de uma função externa assíncrona na AWS. (Antes de implementar sua primeira função externa assíncrona, você pode ler a visão geral conceptual das funções externas assíncronas).
Na AWS, os serviços remotos assíncronos devem superar as seguintes restrições:
Como HTTP POST e GET são solicitações separadas, o serviço remoto deve manter informações sobre o fluxo de trabalho iniciado pela solicitação POST para que o estado possa ser consultado posteriormente pela solicitação GET.
Geralmente, cada HTTP POST e HTTP GET invoca uma instância separada da(s) função(ões) do manipulador em um processo ou thread separado. As instâncias separadas não compartilham memória. Para que o manipulador GET possa ler o status ou os dados processados, o manipulador GET deve acessar um recurso de armazenamento compartilhado que está disponível na AWS.
A única maneira do manipulador POST enviar o código de resposta HTTP 202 inicial é por meio de uma instrução
return
(ou equivalente), que encerra a execução do manipulador. Portanto, antes de retornar HTTP 202, o manipulador de POST deve iniciar um processo (ou thread) independente para fazer o trabalho real de processamento de dados do serviço remoto. Este processo independente normalmente precisa de acesso ao armazenamento que é visível para o manipulador GET.
Uma maneira de um serviço remoto assíncrono superar essas restrições é usar 3 processos (ou threads) e armazenamento compartilhado:
Nesse modelo, os processos têm as seguintes responsabilidades:
O manipulador HTTP POST:
Lê os dados de entrada. Em uma função Lambda, isto é lido a partir do corpo do parâmetro de entrada de
event
da função do manipulador.Lê a ID de lote. Em uma função Lambda, isto é lido a partir do cabeçalho do parâmetro de entrada de
event
.Inicia o processo de processamento de dados, e passa os dados e a ID de lote. Os dados são geralmente passados durante a chamada, mas podem ser passados gravando-os para armazenamento externo.
Registra a ID de lote em um armazenamento compartilhado que o processo de processamento de dados e o processo do manipulador HTTP GET podem acessar.
Se necessário, registra que o processamento desse lote ainda não terminou.
Retorna HTTP 202 se nenhum erro for detectado.
O código de processamento de dados:
Lê os dados de entrada.
Processa os dados.
Torna o resultado disponível para o manipulador GET (gravando os dados do resultado para o armazenamento compartilhado, ou fornecendo uma API por meio da qual é possível consultar os resultados).
Normalmente, atualiza o status desse lote (por exemplo, de
IN_PROGRESS
paraSUCCESS
) para indicar que os resultados estão prontos para serem lidos.Sai. Opcionalmente, esse processo pode retornar um indicador de erro. O Snowflake não vê isso diretamente (o Snowflake vê apenas os códigos de retorno HTTP do manipulador POST e manipulador GET), mas retornar um indicador de erro do processo de processamento de dados pode ajudar durante a depuração.
O manipulador GET:
Lê a ID de lote. Em uma função Lambda, isto é lido a partir do cabeçalho do parâmetro de entrada de
event
.Lê o armazenamento para obter o status atual desse lote (por exemplo,
IN_PROGRESS
ouSUCCESS
).Se o processamento ainda estiver em andamento, retorna 202.
Se o processamento tiver terminado com sucesso, então:
Lê os resultados.
Limpa o armazenamento.
Retorna os resultados junto com o código HTTP 200.
Se o status armazenado indicar um erro, então:
Limpa o armazenamento.
Retorna um código de erro.
Observe que o manipulador GET pode ser chamado várias vezes para um lote se o processamento demorar o suficiente para que múltiplas solicitações HTTP GET sejam enviadas.
Há muitas variações possíveis nesse modelo. Por exemplo:
A ID de lote e o status podem ser gravados no início do processo de processamento de dados, e não no final do processo POST.
O processamento de dados pode ser feito em uma função separada (por exemplo, uma função Lambda separada) ou mesmo como um serviço completamente separado.
O código de processamento de dados não precisa necessariamente ser gravado para o armazenamento compartilhado. Em vez disso, os dados processados podem ser disponibilizados de outra forma. Por exemplo, uma API pode aceitar a ID de lote como um parâmetro e retornar os dados.
O código de implementação deve levar em conta a possibilidade de que o processamento demore muito tempo ou falhe e, portanto, qualquer resultado parcial deve ser limpo para evitar o desperdício de espaço de armazenamento.
O mecanismo de armazenamento deve ser compartilhável por meio de múltiplos processos (ou threads). Os possíveis mecanismos de armazenamento incluem:
Mecanismos de armazenamento fornecidos pela AWS, como:
Espaço em disco (por exemplo, Amazon Elastic File System (EFS) ).
Um servidor de banco de dados local disponível pela AWS (por exemplo, Amazon DynamoDB ).
Armazenamento que está fora da AWS, mas acessível a partir da AWS.
O código para cada um dos processos 3 acima pode ser gravado como 3 funções Lambda separadas (uma para o manipulador POST, uma para a função de processamento de dados e uma para o manipulador GET), ou como uma única função que pode ser invocada de diferentes maneiras.
A amostra de código Python abaixo é uma única função Lambda que pode ser chamada separadamente para processos POST, processamento de dados e processos GET.
Código da amostra¶
Este código mostra um exemplo de consulta com saída. O foco deste exemplo está nos três processos e como eles interagem, não no mecanismo de armazenamento compartilhado (DynamoDB) ou na transformação de dados (análise de sentimento). O código é estruturado para facilitar a substituição do mecanismo de armazenamento de exemplo e a transformação de dados em outros diferentes.
Para simplificar, este exemplo:
Codifica alguns valores importantes (por exemplo, a região AWS).
Considera a existência de alguns recursos (por exemplo, a tabela Jobs no Dynamo).
import json
import time
import boto3
HTTP_METHOD_STRING = "httpMethod"
HEADERS_STRING = "headers"
BATCH_ID_STRING = "sf-external-function-query-batch-id"
DATA_STRING = "data"
REGION_NAME = "us-east-2"
TABLE_NAME = "Jobs"
IN_PROGRESS_STATUS = "IN_PROGRESS"
SUCCESS_STATUS = "SUCCESS"
def lambda_handler(event, context):
# this is called from either the GET or POST
if (HTTP_METHOD_STRING in event):
method = event[HTTP_METHOD_STRING]
if method == "POST":
return initiate(event, context)
elif method == "GET":
return poll(event, context)
else:
return create_response(400, "Function called from invalid method")
# if not called from GET or POST, then this lambda was called to
# process data
else:
return process_data(event, context)
# Reads batch_ID and data from the request, marks the batch_ID as being processed, and
# starts the processing service.
def initiate(event, context):
batch_id = event[HEADERS_STRING][BATCH_ID_STRING]
data = json.loads(event["body"])[DATA_STRING]
lambda_name = context.function_name
write_to_storage(batch_id, IN_PROGRESS_STATUS, "NULL")
lambda_response = invoke_process_lambda(batch_id, data, lambda_name)
# lambda response returns 202, because we are invoking it with
# InvocationType = 'Event'
if lambda_response["StatusCode"] != 202:
response = create_response(400, "Error in inititate: processing lambda not started")
else:
response = {
'statusCode': lambda_response["StatusCode"]
}
return response
# Processes the data passed to it from the POST handler. In this example,
# the processing is to perform sentiment analysis on text.
def process_data(event, context):
data = event[DATA_STRING]
batch_id = event[BATCH_ID_STRING]
def process_data_impl(data):
comprehend = boto3.client(service_name='comprehend', region_name=REGION_NAME)
# create return rows
ret = []
for i in range(len(data)):
text = data[i][1]
sentiment_response = comprehend.detect_sentiment(Text=text, LanguageCode='en')
sentiment_score = json.dumps(sentiment_response['SentimentScore'])
ret.append([i, sentiment_score])
return ret
processed_data = process_data_impl(data)
write_to_storage(batch_id, SUCCESS_STATUS, processed_data)
return create_response(200, "No errors in process")
# Repeatedly checks on the status of the batch_ID, and returns the result after the
# processing has been completed.
def poll(event, context):
batch_id = event[HEADERS_STRING][BATCH_ID_STRING]
processed_data = read_data_from_storage(batch_id)
def parse_processed_data(response):
# in this case, the response is the response from DynamoDB
response_metadata = response['ResponseMetadata']
status_code = response_metadata['HTTPStatusCode']
# Take action depending on item status
item = response['Item']
job_status = item['status']
if job_status == SUCCESS_STATUS:
# the row number is stored at index 0 as a Decimal object,
# we need to convert it into a normal int to be serialized to JSON
data = [[int(row[0]), row[1]] for row in item['data']]
return {
'statusCode': 200,
'body': json.dumps({
'data': data
})
}
elif job_status == IN_PROGRESS_STATUS:
return {
'statusCode': 202,
"body": "{}"
}
else:
return create_response(500, "Error in poll: Unknown item status.")
return parse_processed_data(processed_data)
def create_response(code, msg):
return {
'statusCode': code,
'body': msg
}
def invoke_process_lambda(batch_id, data, lambda_name):
# Create payload to be sent to processing lambda
invoke_payload = json.dumps({
BATCH_ID_STRING: batch_id,
DATA_STRING: data
})
# Invoke processing lambda asynchronously by using InvocationType='Event'.
# This allows the processing to continue while the POST handler returns HTTP 202.
lambda_client = boto3.client('lambda', region_name=REGION_NAME,)
lambda_response = lambda_client.invoke(
FunctionName=lambda_name,
InvocationType='Event',
Payload=invoke_payload
)
# returns 202 on success if InvocationType = 'Event'
return lambda_response
def write_to_storage(batch_id, status, data):
# we assume that the table has already been created
client = boto3.resource('dynamodb')
table = client.Table(TABLE_NAME)
# Put in progress item in table
item_to_store = {
'batch_id': batch_id,
'status': status,
'data': data,
'timestamp': "{}".format(time.time())
}
db_response = table.put_item(
Item=item_to_store
)
def read_data_from_storage(batch_id):
# we assume that the table has already been created
client = boto3.resource('dynamodb')
table = client.Table(TABLE_NAME)
response = table.get_item(Key={'batch_id': batch_id},
ConsistentRead=True)
return response
Amostra de chamada e saída¶
Aqui está uma chamada de amostra para a função externa assíncrona, juntamente com a saída de amostra, incluindo os resultados da análise de sentimento:
create table test_tb(a string);
insert into test_tb values
('hello world'),
('I am happy');
select ext_func_async(a) from test_tb;
Row | EXT_FUNC_ASYNC(A)
0 | {"Positive": 0.47589144110679626, "Negative": 0.07314028590917587, "Neutral": 0.4493273198604584, "Mixed": 0.0016409909585490823}
1 | {"Positive": 0.9954453706741333, "Negative": 0.00039307220140472054, "Neutral": 0.002452891319990158, "Mixed": 0.0017087293090298772}
Notas sobre o código da amostra¶
A função de processamento de dados é invocada por chamada:
lambda_response = lambda_client.invoke( ... InvocationType='Event', ... )
O InvocationType deve ser ‘Event’, como mostrado acima, porque o segundo processo (ou thread) deve ser assíncrono e
Event
é o único tipo de chamada sem bloqueio disponível pelo métodoinvoke()
.A função de processamento de dados retorna um código HTTP 200. Entretanto, esse código HTTP 200 não é retornado diretamente ao Snowflake. O Snowflake não vê HTTP 200 até que GET faça a sondagem do status e veja que a função de processamento de dados terminou o processamento desse lote com sucesso.