Exemple de service à distance asynchrone pour AWS

Cette rubrique contient un exemple de fonction AWS Lambda asynchrone (service distant). Vous pouvez créer cet exemple de fonction en suivant les mêmes étapes que celles décrites dans Étape 1 : Créez le service distant (fonctionAWS Lambda) dans la console de gestion.

Dans ce chapitre :

Aperçu du code

Cette section de la documentation fournit des informations sur la création d’une fonction externe asynchrone sur AWS. (Avant de mettre en œuvre votre première fonction externe asynchrone, vous pouvez lire l” aperçu conceptuel des fonctions externes asynchrones).

Sur AWS, les services à distance asynchrones doivent surmonter les restrictions suivantes :

  • Comme les requêtes HTTP POST et GET sont des requêtes distinctes, le service distant doit conserver des informations sur le workflow lancé par la requête POST afin que l’état puisse être interrogé ultérieurement par la requête GET.

    En général, chaque requête HTTP POST et HTTP GET appelle une instance distincte de la ou des fonctions de gestionnaire dans un processus ou un chemin distinct. Les différentes instances ne partagent pas leur mémoire. Pour que le gestionnaire GET puisse lire le statut ou les données traitées, le gestionnaire GET doit accéder à une ressource de stockage partagée qui est disponible sur AWS.

  • La seule façon pour le gestionnaire POST d’envoyer le code de réponse initial HTTP 202 est d’utiliser une instruction return (ou un équivalent), qui met fin à l’exécution du gestionnaire. Par conséquent, avant de renvoyer HTTP 202, le gestionnaire POST doit lancer un processus indépendant (ou thread) pour effectuer le travail de traitement des données du service à distance. Ce processus indépendant nécessite généralement un accès au stockage qui est visible pour le gestionnaire GET.

Un moyen pour un service distant asynchrone de surmonter ces restrictions est d’utiliser 3 processus (ou chemins) et un stockage partagé :

Illustration of processes for an Asynchronous Remote Service

Dans ce modèle, les processus ont les responsabilités suivantes :

  • Le gestionnaire HTTP POST :

    • Lit les données d’entrée. Dans une fonction Lambda, elle est lue à partir du corps du paramètre d’entrée event de la fonction de gestionnaire.

    • Lit l’ID de lot. Dans une fonction Lambda, ceci est lu à partir de l’en-tête du paramètre d’entrée event .

    • Lance le processus de traitement des données, et lui transmet les données et l’ID de lot. Les données sont généralement transmises pendant l’appel, mais elles peuvent être transmises en les écrivant sur un stockage externe.

    • Enregistre l’ID de lot dans un stockage partagé auquel peuvent accéder à la fois le processus de traitement des données et le processus du gestionnaire HTTP GET.

    • Si nécessaire, enregistre que le traitement de ce lot n’est pas encore terminé.

    • Retourne HTTP 202 si aucune erreur n’a été détectée.

  • Le code de traitement des données :

    • Lit les données d’entrée.

    • Traite les données.

    • Met le résultat à la disposition du gestionnaire GET (soit en écrivant les données de résultat sur un stockage partagé, soit en fournissant une API permettant d’interroger les résultats).

    • Généralement, il met à jour le statut de ce lot (par exemple de IN_PROGRESS à SUCCESS) pour indiquer que les résultats sont prêts à être lus.

    • Quitte. En option, ce processus peut renvoyer un indicateur d’erreur. Snowflake ne le voit pas directement (Snowflake ne voit que les codes de retour HTTP du gestionnaire POST et du gestionnaire GET), mais le renvoi d’un indicateur d’erreur du processus de traitement des données pourrait aider au débogage.

  • Le gestionnaire GET :

    • Lit l’ID de lot. Dans une fonction Lambda, ceci est lu à partir de l’en-tête du paramètre d’entrée event .

    • Lit le stockage pour obtenir le statut actuel de ce lot (par exemple IN_PROGRESS ou SUCCESS).

    • Si le traitement est toujours en cours, alors retournez 202.

    • Si le traitement s’est terminé avec succès, alors :

      • Lisez les résultats.

      • Nettoyez le stockage.

      • Renvoyez les résultats avec le code HTTP 200.

    • Si le statut stocké indique une erreur, alors :

      • Nettoyez le stockage.

      • Renvoyez un code d’erreur.

    Notez que le gestionnaire GET peut être appelé plusieurs fois pour un lot si le traitement prend suffisamment de temps pour que plusieurs requêtes HTTP GET soient envoyées.

Il existe de nombreuses variations possibles sur ce modèle. Par exemple :

  • L’ID de lot et le statut pourraient être écrits au début du processus de traitement des données plutôt qu’à la fin du processus POST.

  • Le traitement des données pourrait être effectué dans une fonction distincte (par exemple, une fonction Lambda distincte) ou même comme un service complètement distinct.

  • Le code de traitement des données ne doit pas nécessairement être écrit sur un stockage partagé. Au lieu de cela, les données traitées pourraient être mises à disposition d’une autre manière. Par exemple, une API pourrait accepter l’ID de lot comme paramètre et renvoyer les données.

Le code de mise en œuvre doit tenir compte de la possibilité que le traitement prenne trop de temps ou échoue, et tout résultat partiel doit donc être nettoyé pour éviter de gaspiller de l’espace de stockage.

Le mécanisme de stockage doit pouvoir être partagé entre plusieurs processus (ou chemins). Parmi les mécanismes de stockage possibles, on peut citer :

Le code de chacun des 3 processus ci-dessus peut être écrit comme 3 fonctions Lambda séparées (une pour le gestionnaire POST, une pour la fonction de traitement des données et une pour le gestionnaire GET), ou comme une fonction unique qui peut être appelée de différentes manières.

L’exemple de code Python ci-dessous est une fonction Lambda unique qui peut être appelée séparément pour les processus POST, le traitement des données et GET.

Exemple de code

Ce code montre un exemple de requête avec sortie. Dans cet exemple, l’accent est mis sur les trois processus et leur interaction, et non sur le mécanisme de stockage partagé (DynamoDB) ou la transformation des données (analyse des sentiments). Le code est structuré de manière à faciliter le remplacement du mécanisme de stockage et de transformation des données par d’autres.

Par souci de simplicité, voici un exemple :

  • Code en dur certaines valeurs importantes (par exemple, la région AWS).

  • Suppose l’existence de certaines ressources (par exemple, la table Jobs dans Dynamo).

import json
import time
import boto3

HTTP_METHOD_STRING = "httpMethod"
HEADERS_STRING = "headers"
BATCH_ID_STRING = "sf-external-function-query-batch-id"
DATA_STRING = "data"
REGION_NAME = "us-east-2"

TABLE_NAME = "Jobs"
IN_PROGRESS_STATUS = "IN_PROGRESS"
SUCCESS_STATUS = "SUCCESS"

def lambda_handler(event, context):
    # this is called from either the GET or POST
    if (HTTP_METHOD_STRING in event):
        method = event[HTTP_METHOD_STRING]
        if method == "POST":
            return initiate(event, context)
        elif method == "GET":
            return poll(event, context)
        else:
            return create_response(400, "Function called from invalid method")

    # if not called from GET or POST, then this lambda was called to
    # process data
    else:
        return process_data(event, context)


# Reads batch_ID and data from the request, marks the batch_ID as being processed, and
# starts the processing service.
def initiate(event, context):
    batch_id = event[HEADERS_STRING][BATCH_ID_STRING]
    data = json.loads(event["body"])[DATA_STRING]

    lambda_name = context.function_name

    write_to_storage(batch_id, IN_PROGRESS_STATUS, "NULL")
    lambda_response = invoke_process_lambda(batch_id, data, lambda_name)

    # lambda response returns 202, because we are invoking it with
    # InvocationType = 'Event'
    if lambda_response["StatusCode"] != 202:
        response = create_response(400, "Error in inititate: processing lambda not started")
    else:
        response = {
            'statusCode': lambda_response["StatusCode"]
        }

    return response


# Processes the data passed to it from the POST handler. In this example,
# the processing is to perform sentiment analysis on text.
def process_data(event, context):
    data = event[DATA_STRING]
    batch_id = event[BATCH_ID_STRING]

    def process_data_impl(data):
        comprehend = boto3.client(service_name='comprehend', region_name=REGION_NAME)
        # create return rows
        ret = []
        for i in range(len(data)):
            text = data[i][1]
            sentiment_response = comprehend.detect_sentiment(Text=text, LanguageCode='en')
            sentiment_score = json.dumps(sentiment_response['SentimentScore'])
            ret.append([i, sentiment_score])
        return ret

    processed_data = process_data_impl(data)
    write_to_storage(batch_id, SUCCESS_STATUS, processed_data)

    return create_response(200, "No errors in process")


# Repeatedly checks on the status of the batch_ID, and returns the result after the
# processing has been completed.
def poll(event, context):
    batch_id = event[HEADERS_STRING][BATCH_ID_STRING]
    processed_data = read_data_from_storage(batch_id)

    def parse_processed_data(response):
        # in this case, the response is the response from DynamoDB
        response_metadata = response['ResponseMetadata']
        status_code = response_metadata['HTTPStatusCode']

        # Take action depending on item status
        item = response['Item']
        job_status = item['status']
        if job_status == SUCCESS_STATUS:
            # the row number is stored at index 0 as a Decimal object,
            # we need to convert it into a normal int to be serialized to JSON
            data = [[int(row[0]), row[1]] for row in item['data']]
            return {
                'statusCode': 200,
                'body': json.dumps({
                    'data': data
                })
            }
        elif job_status == IN_PROGRESS_STATUS:
            return {
                'statusCode': 202,
                "body": "{}"
            }
        else:
            return create_response(500, "Error in poll: Unknown item status.")

    return parse_processed_data(processed_data)


def create_response(code, msg):
    return {
        'statusCode': code,
        'body': msg
    }


def invoke_process_lambda(batch_id, data, lambda_name):
    # Create payload to be sent to processing lambda
    invoke_payload = json.dumps({
        BATCH_ID_STRING: batch_id,
        DATA_STRING: data
    })

    # Invoke processing lambda asynchronously by using InvocationType='Event'.
    # This allows the processing to continue while the POST handler returns HTTP 202.
    lambda_client = boto3.client('lambda', region_name=REGION_NAME,)
    lambda_response = lambda_client.invoke(
        FunctionName=lambda_name,
        InvocationType='Event',
        Payload=invoke_payload
    )
    # returns 202 on success if InvocationType = 'Event'
    return lambda_response


def write_to_storage(batch_id, status, data):
    # we assume that the table has already been created
    client = boto3.resource('dynamodb')
    table = client.Table(TABLE_NAME)

    # Put in progress item in table
    item_to_store = {
        'batch_id': batch_id,
        'status': status,
        'data': data,
        'timestamp': "{}".format(time.time())
    }
    db_response = table.put_item(
        Item=item_to_store
    )


def read_data_from_storage(batch_id):
    # we assume that the table has already been created
    client = boto3.resource('dynamodb')
    table = client.Table(TABLE_NAME)

    response = table.get_item(Key={'batch_id': batch_id},
                          ConsistentRead=True)
    return response
Copy

Exemple d’appel et de sortie

Voici un exemple d’appel à la fonction externe asynchrone, ainsi qu’un exemple de sortie, avec les résultats de l’analyse des sentiments :

create table test_tb(a string);
insert into test_tb values
    ('hello world'),
    ('I am happy');
select ext_func_async(a) from test_tb;

Row | EXT_FUNC_ASYNC(A)
0   | {"Positive": 0.47589144110679626, "Negative": 0.07314028590917587, "Neutral": 0.4493273198604584, "Mixed": 0.0016409909585490823}
1   | {"Positive": 0.9954453706741333, "Negative": 0.00039307220140472054, "Neutral": 0.002452891319990158, "Mixed": 0.0017087293090298772}
Copy

Notes sur l’exemple de code

  • La fonction de traitement des données est appelée par un appel :

    lambda_response = lambda_client.invoke(
        ...
        InvocationType='Event',
        ...
    )
    
    Copy

    Le InvocationType doit être un « événement », comme indiqué ci-dessus, car le 2e processus (ou chemin) doit être asynchrone et Event est le seul type d’appel non bloquant disponible par la méthode invoke() .

  • La fonction de traitement des données renvoie un code HTTP 200. Toutefois, ce code HTTP 200 n’est pas renvoyé directement à Snowflake. Snowflake ne voit aucun HTTP 200 jusqu’à ce qu’un GET interroge le statut et constate que la fonction de traitement des données a terminé le traitement de ce lot avec succès.