Exemple de service à distance asynchrone pour AWS¶
Cette rubrique contient un exemple de fonction AWS Lambda asynchrone (service distant). Vous pouvez créer cet exemple de fonction en suivant les mêmes étapes que celles décrites dans Étape 1 : Créez le service distant (fonctionAWS Lambda) dans la console de gestion.
Dans ce chapitre :
Aperçu du code¶
Cette section de la documentation fournit des informations sur la création d’une fonction externe asynchrone sur AWS. (Avant de mettre en œuvre votre première fonction externe asynchrone, vous pouvez lire l” aperçu conceptuel des fonctions externes asynchrones).
Sur AWS, les services à distance asynchrones doivent surmonter les restrictions suivantes :
Comme les requêtes HTTP POST et GET sont des requêtes distinctes, le service distant doit conserver des informations sur le workflow lancé par la requête POST afin que l’état puisse être interrogé ultérieurement par la requête GET.
En général, chaque requête HTTP POST et HTTP GET appelle une instance distincte de la ou des fonctions de gestionnaire dans un processus ou un chemin distinct. Les différentes instances ne partagent pas leur mémoire. Pour que le gestionnaire GET puisse lire le statut ou les données traitées, le gestionnaire GET doit accéder à une ressource de stockage partagée qui est disponible sur AWS.
La seule façon pour le gestionnaire POST d’envoyer le code de réponse initial HTTP 202 est d’utiliser une instruction
return
(ou un équivalent), qui met fin à l’exécution du gestionnaire. Par conséquent, avant de renvoyer HTTP 202, le gestionnaire POST doit lancer un processus indépendant (ou thread) pour effectuer le travail de traitement des données du service à distance. Ce processus indépendant nécessite généralement un accès au stockage qui est visible pour le gestionnaire GET.
Un moyen pour un service distant asynchrone de surmonter ces restrictions est d’utiliser 3 processus (ou chemins) et un stockage partagé :
Dans ce modèle, les processus ont les responsabilités suivantes :
Le gestionnaire HTTP POST :
Lit les données d’entrée. Dans une fonction Lambda, elle est lue à partir du corps du paramètre d’entrée
event
de la fonction de gestionnaire.Lit l’ID de lot. Dans une fonction Lambda, ceci est lu à partir de l’en-tête du paramètre d’entrée
event
.Lance le processus de traitement des données, et lui transmet les données et l’ID de lot. Les données sont généralement transmises pendant l’appel, mais elles peuvent être transmises en les écrivant sur un stockage externe.
Enregistre l’ID de lot dans un stockage partagé auquel peuvent accéder à la fois le processus de traitement des données et le processus du gestionnaire HTTP GET.
Si nécessaire, enregistre que le traitement de ce lot n’est pas encore terminé.
Retourne HTTP 202 si aucune erreur n’a été détectée.
Le code de traitement des données :
Lit les données d’entrée.
Traite les données.
Met le résultat à la disposition du gestionnaire GET (soit en écrivant les données de résultat sur un stockage partagé, soit en fournissant une API permettant d’interroger les résultats).
Généralement, il met à jour le statut de ce lot (par exemple de
IN_PROGRESS
àSUCCESS
) pour indiquer que les résultats sont prêts à être lus.Quitte. En option, ce processus peut renvoyer un indicateur d’erreur. Snowflake ne le voit pas directement (Snowflake ne voit que les codes de retour HTTP du gestionnaire POST et du gestionnaire GET), mais le renvoi d’un indicateur d’erreur du processus de traitement des données pourrait aider au débogage.
Le gestionnaire GET :
Lit l’ID de lot. Dans une fonction Lambda, ceci est lu à partir de l’en-tête du paramètre d’entrée
event
.Lit le stockage pour obtenir le statut actuel de ce lot (par exemple
IN_PROGRESS
ouSUCCESS
).Si le traitement est toujours en cours, alors retournez 202.
Si le traitement s’est terminé avec succès, alors :
Lisez les résultats.
Nettoyez le stockage.
Renvoyez les résultats avec le code HTTP 200.
Si le statut stocké indique une erreur, alors :
Nettoyez le stockage.
Renvoyez un code d’erreur.
Notez que le gestionnaire GET peut être appelé plusieurs fois pour un lot si le traitement prend suffisamment de temps pour que plusieurs requêtes HTTP GET soient envoyées.
Il existe de nombreuses variations possibles sur ce modèle. Par exemple :
L’ID de lot et le statut pourraient être écrits au début du processus de traitement des données plutôt qu’à la fin du processus POST.
Le traitement des données pourrait être effectué dans une fonction distincte (par exemple, une fonction Lambda distincte) ou même comme un service complètement distinct.
Le code de traitement des données ne doit pas nécessairement être écrit sur un stockage partagé. Au lieu de cela, les données traitées pourraient être mises à disposition d’une autre manière. Par exemple, une API pourrait accepter l’ID de lot comme paramètre et renvoyer les données.
Le code de mise en œuvre doit tenir compte de la possibilité que le traitement prenne trop de temps ou échoue, et tout résultat partiel doit donc être nettoyé pour éviter de gaspiller de l’espace de stockage.
Le mécanisme de stockage doit pouvoir être partagé entre plusieurs processus (ou chemins). Parmi les mécanismes de stockage possibles, on peut citer :
Les mécanismes de stockage fournis par AWS, tels que :
Espace disque (par exemple Système de fichiers élastiques d’Amazon (EFS) ).
Un serveur de base de données local disponible via AWS (par exemple Amazon DynamoDB ).
Stockage qui est à l’extérieur de AWS mais accessible depuis AWS.
Le code de chacun des 3 processus ci-dessus peut être écrit comme 3 fonctions Lambda séparées (une pour le gestionnaire POST, une pour la fonction de traitement des données et une pour le gestionnaire GET), ou comme une fonction unique qui peut être appelée de différentes manières.
L’exemple de code Python ci-dessous est une fonction Lambda unique qui peut être appelée séparément pour les processus POST, le traitement des données et GET.
Exemple de code¶
Ce code montre un exemple de requête avec sortie. Dans cet exemple, l’accent est mis sur les trois processus et leur interaction, et non sur le mécanisme de stockage partagé (DynamoDB) ou la transformation des données (analyse des sentiments). Le code est structuré de manière à faciliter le remplacement du mécanisme de stockage et de transformation des données par d’autres.
Par souci de simplicité, voici un exemple :
Code en dur certaines valeurs importantes (par exemple, la région AWS).
Suppose l’existence de certaines ressources (par exemple, la table Jobs dans Dynamo).
import json
import time
import boto3
HTTP_METHOD_STRING = "httpMethod"
HEADERS_STRING = "headers"
BATCH_ID_STRING = "sf-external-function-query-batch-id"
DATA_STRING = "data"
REGION_NAME = "us-east-2"
TABLE_NAME = "Jobs"
IN_PROGRESS_STATUS = "IN_PROGRESS"
SUCCESS_STATUS = "SUCCESS"
def lambda_handler(event, context):
# this is called from either the GET or POST
if (HTTP_METHOD_STRING in event):
method = event[HTTP_METHOD_STRING]
if method == "POST":
return initiate(event, context)
elif method == "GET":
return poll(event, context)
else:
return create_response(400, "Function called from invalid method")
# if not called from GET or POST, then this lambda was called to
# process data
else:
return process_data(event, context)
# Reads batch_ID and data from the request, marks the batch_ID as being processed, and
# starts the processing service.
def initiate(event, context):
batch_id = event[HEADERS_STRING][BATCH_ID_STRING]
data = json.loads(event["body"])[DATA_STRING]
lambda_name = context.function_name
write_to_storage(batch_id, IN_PROGRESS_STATUS, "NULL")
lambda_response = invoke_process_lambda(batch_id, data, lambda_name)
# lambda response returns 202, because we are invoking it with
# InvocationType = 'Event'
if lambda_response["StatusCode"] != 202:
response = create_response(400, "Error in inititate: processing lambda not started")
else:
response = {
'statusCode': lambda_response["StatusCode"]
}
return response
# Processes the data passed to it from the POST handler. In this example,
# the processing is to perform sentiment analysis on text.
def process_data(event, context):
data = event[DATA_STRING]
batch_id = event[BATCH_ID_STRING]
def process_data_impl(data):
comprehend = boto3.client(service_name='comprehend', region_name=REGION_NAME)
# create return rows
ret = []
for i in range(len(data)):
text = data[i][1]
sentiment_response = comprehend.detect_sentiment(Text=text, LanguageCode='en')
sentiment_score = json.dumps(sentiment_response['SentimentScore'])
ret.append([i, sentiment_score])
return ret
processed_data = process_data_impl(data)
write_to_storage(batch_id, SUCCESS_STATUS, processed_data)
return create_response(200, "No errors in process")
# Repeatedly checks on the status of the batch_ID, and returns the result after the
# processing has been completed.
def poll(event, context):
batch_id = event[HEADERS_STRING][BATCH_ID_STRING]
processed_data = read_data_from_storage(batch_id)
def parse_processed_data(response):
# in this case, the response is the response from DynamoDB
response_metadata = response['ResponseMetadata']
status_code = response_metadata['HTTPStatusCode']
# Take action depending on item status
item = response['Item']
job_status = item['status']
if job_status == SUCCESS_STATUS:
# the row number is stored at index 0 as a Decimal object,
# we need to convert it into a normal int to be serialized to JSON
data = [[int(row[0]), row[1]] for row in item['data']]
return {
'statusCode': 200,
'body': json.dumps({
'data': data
})
}
elif job_status == IN_PROGRESS_STATUS:
return {
'statusCode': 202,
"body": "{}"
}
else:
return create_response(500, "Error in poll: Unknown item status.")
return parse_processed_data(processed_data)
def create_response(code, msg):
return {
'statusCode': code,
'body': msg
}
def invoke_process_lambda(batch_id, data, lambda_name):
# Create payload to be sent to processing lambda
invoke_payload = json.dumps({
BATCH_ID_STRING: batch_id,
DATA_STRING: data
})
# Invoke processing lambda asynchronously by using InvocationType='Event'.
# This allows the processing to continue while the POST handler returns HTTP 202.
lambda_client = boto3.client('lambda', region_name=REGION_NAME,)
lambda_response = lambda_client.invoke(
FunctionName=lambda_name,
InvocationType='Event',
Payload=invoke_payload
)
# returns 202 on success if InvocationType = 'Event'
return lambda_response
def write_to_storage(batch_id, status, data):
# we assume that the table has already been created
client = boto3.resource('dynamodb')
table = client.Table(TABLE_NAME)
# Put in progress item in table
item_to_store = {
'batch_id': batch_id,
'status': status,
'data': data,
'timestamp': "{}".format(time.time())
}
db_response = table.put_item(
Item=item_to_store
)
def read_data_from_storage(batch_id):
# we assume that the table has already been created
client = boto3.resource('dynamodb')
table = client.Table(TABLE_NAME)
response = table.get_item(Key={'batch_id': batch_id},
ConsistentRead=True)
return response
Exemple d’appel et de sortie¶
Voici un exemple d’appel à la fonction externe asynchrone, ainsi qu’un exemple de sortie, avec les résultats de l’analyse des sentiments :
create table test_tb(a string);
insert into test_tb values
('hello world'),
('I am happy');
select ext_func_async(a) from test_tb;
Row | EXT_FUNC_ASYNC(A)
0 | {"Positive": 0.47589144110679626, "Negative": 0.07314028590917587, "Neutral": 0.4493273198604584, "Mixed": 0.0016409909585490823}
1 | {"Positive": 0.9954453706741333, "Negative": 0.00039307220140472054, "Neutral": 0.002452891319990158, "Mixed": 0.0017087293090298772}
Notes sur l’exemple de code¶
La fonction de traitement des données est appelée par un appel :
lambda_response = lambda_client.invoke( ... InvocationType='Event', ... )
Le InvocationType doit être un « événement », comme indiqué ci-dessus, car le 2e processus (ou chemin) doit être asynchrone et
Event
est le seul type d’appel non bloquant disponible par la méthodeinvoke()
.La fonction de traitement des données renvoie un code HTTP 200. Toutefois, ce code HTTP 200 n’est pas renvoyé directement à Snowflake. Snowflake ne voit aucun HTTP 200 jusqu’à ce qu’un GET interroge le statut et constate que la fonction de traitement des données a terminé le traitement de ce lot avec succès.