Création de fonctions externes asynchrones

Ce document décrit les fonctions externes asynchrones.

Dans ce chapitre :

Introduction aux services à distance asynchrones

Un service à distance peut fonctionner de manière synchrone ou asynchrone.

Un service distant synchrone reçoit une requête HTTP POST, traite la requête et renvoie le résultat. En fonction du temps nécessaire au traitement des données, il peut y avoir un délai important entre le moment où la requête est reçue et celui où les résultats sont renvoyés.

Un service distant asynchrone reçoit une requête HTTP POST et renvoie un accusé de réception (généralement presque immédiat) de la requête. L’appelant (Snowflake) exécute alors une boucle d’interrogation dans laquelle il émet une ou plusieurs requêtes HTTP GET (généralement avec un délai important entre chaque requête) pour vérifier le statut du traitement asynchrone. Un GET n’envoie aucune donnée dans le corps de la requête, mais contient les mêmes en-têtes que le POST original.

Les services à distance asynchrones sont utiles lorsqu’un service à distance dépasse les délais d’attente intégrés dans des composants tels que le service proxy (par ex. Amazon API Gateway).

Un service à distance n’est pas nécessairement purement synchrone ou purement asynchrone. Un service à distance peut fonctionner de manière synchrone et asynchrone à différents moments, en fonction de facteurs tels que la quantité de données contenues dans la requête, le nombre d’autres requêtes en cours de traitement, etc.

Le schéma ci-dessous illustre le traitement synchrone et asynchrone. Le chemin supérieur est synchrone. Le chemin inférieur (qui comprend une ou plusieurs requêtes HTTP GET) est asynchrone.

Illustration of the HTTP POST and GET requests for asynchronous external functions.

Le comportement asynchrone est mis en œuvre par la personne qui écrit le service à distance (et par Snowflake). Les instructions SQL sont les mêmes pour les services à distance asynchrones que pour les services à distance synchrones.

L’implémentation des fonctions externes de Snowflake est généralement compatible avec les bibliothèques de fonctions tierces synchrones et asynchrones.

Si vous écrivez votre propre service à distance et que vous voulez le rendre compatible avec le traitement asynchrone de Snowflake, écrivez-le de façon à ce qu’il se comporte comme suit :

  • Lorsqu’il reçoit initialement un HTTP POST pour un lot spécifique de lignes, le service à distance renvoie le code HTTP 202 (« Traitement en cours… »).

  • Si le service à distance reçoit des requêtes HTTP GET après le POST mais avant que la sortie ne soit prête, le service à distance renvoie le code HTTP 202.

  • Une fois que le service à distance a généré toutes les lignes de sortie, il attend le prochain HTTP GET avec le même ID de lot, puis renvoie les lignes reçues, avec le code HTTP 200 (« Achèvement réussi… »).

En bref, pour chaque lot reçu, le service à distance renvoie 202 jusqu’à ce que les résultats soient prêts, après quoi le GET suivant reçoit les résultats et un HTTP 200.

Pour chaque lot, Snowflake fonctionne avec le service à distance asynchrone comme suit :

  1. Snowflake envoie un HTTP POST qui contient les données à traiter, ainsi qu’un ID de lot unique.

  2. Si Snowflake reçoit une réponse HTTP 202, alors Snowflake fait une boucle jusqu’à ce que l’un des éléments suivants soit vrai :

    • Snowflake reçoit les données et un HTTP 200.

    • Le délai d’expiration interne de Snowflake est atteint.

    • Snowflake reçoit une erreur (par exemple, le code de réponse HTTP 5XX).

    Dans chaque itération de la boucle, Snowflake retarde, puis émet un HTTP GET qui contient le même ID de lot que l’ID de lot du HTTP POST correspondant, de sorte que le service à distance puisse renvoyer les informations pour le lot correct.

    Le délai à l’intérieur de la boucle commence court mais s’allonge pour chaque réponse 202 reçue jusqu’à ce que le délai d’expiration de Snowflake soit atteint.

  3. Si le délai de Snowflake est atteint avant que le code 200 ne soit renvoyé, alors Snowflake abandonne la requête SQL.

    Actuellement, le délai d’attente de Snowflake est de 10 minutes (600 secondes) et n’est pas configurable par l’utilisateur. Ce délai pourrait changer à l’avenir.

Note

La fréquence à laquelle les requêtes atteignent les délais d’attente dépend en partie de l’évolutivité du service à distance. Si votre service à distance expire fréquemment, voir aussi la discussion de Évolutivité.

Création d’une fonction asynchrone sur AWS

Sur AWS, les services à distance asynchrones doivent surmonter les restrictions suivantes :

  • Comme les requêtes HTTP POST et GET sont des requêtes distinctes, le service distant doit conserver des informations sur le workflow lancé par la requête POST afin que l’état puisse être interrogé ultérieurement par la requête GET.

    En général, chaque requête HTTP POST et HTTP GET appelle une instance distincte de la ou des fonctions de gestionnaire dans un processus ou un chemin distinct. Les différentes instances ne partagent pas leur mémoire. Pour que le gestionnaire GET puisse lire le statut ou les données traitées, le gestionnaire GET doit accéder à une ressource de stockage partagée qui est disponible sur AWS.

  • La seule façon pour le gestionnaire POST d’envoyer le code de réponse initial HTTP 202 est d’utiliser une instruction return (ou un équivalent), qui met fin à l’exécution du gestionnaire. Par conséquent, avant de renvoyer HTTP 202, le gestionnaire POST doit lancer un processus indépendant (ou thread) pour effectuer le travail de traitement des données du service à distance. Ce processus indépendant nécessite généralement un accès au stockage qui est visible pour le gestionnaire GET.

Un moyen pour un service distant asynchrone de surmonter ces restrictions est d’utiliser 3 processus (ou chemins) et un stockage partagé :

Illustration of processes for an Asynchronous Remote Service

Dans ce modèle, les processus ont les responsabilités suivantes :

  • Le gestionnaire HTTP POST :

    • Lit les données d’entrée. Dans une fonction Lambda, elle est lue à partir du corps du paramètre d’entrée event de la fonction de gestionnaire.

    • Lit l’ID de lot. Dans une fonction Lambda, ceci est lu à partir de l’en-tête du paramètre d’entrée event .

    • Lance le processus de traitement des données, et lui transmet les données et l’ID de lot. Les données sont généralement transmises pendant l’appel, mais elles peuvent être transmises en les écrivant sur un stockage externe.

    • Enregistre l’ID de lot dans un stockage partagé auquel peuvent accéder à la fois le processus de traitement des données et le processus du gestionnaire HTTP GET.

    • Si nécessaire, enregistre que le traitement de ce lot n’est pas encore terminé.

    • Retourne HTTP 202 si aucune erreur n’a été détectée.

  • Le code de traitement des données :

    • Lit les données d’entrée.

    • Traite les données.

    • Met le résultat à la disposition du gestionnaire GET (soit en écrivant les données de résultat sur un stockage partagé, soit en fournissant une API permettant d’interroger les résultats).

    • Généralement, il met à jour le statut de ce lot (par exemple de « IN_PROGRESS » à « SUCCESS ») pour indiquer que les résultats sont prêts à être lus.

    • Quitte. En option, ce processus peut renvoyer un indicateur d’erreur. Snowflake ne le voit pas directement (Snowflake ne voit que les codes de retour HTTP du gestionnaire POST et du gestionnaire GET), mais le renvoi d’un indicateur d’erreur du processus de traitement des données pourrait aider au débogage.

  • Le gestionnaire GET :

    • Lit l’ID de lot. Dans une fonction Lambda, ceci est lu à partir de l’en-tête du paramètre d’entrée event .

    • Lit le stockage pour obtenir le statut actuel de ce lot (par exemple IN_PROGRESS ouSUCCESS).

    • Si le traitement est toujours en cours, alors retournez 202.

    • Si le traitement s’est terminé avec succès, alors :

      • Lisez les résultats.

      • Nettoyez le stockage.

      • Renvoyez les résultats avec le code HTTP 200.

    • Si le statut stocké indique une erreur, alors :

      • Nettoyez le stockage.

      • Renvoyez un code d’erreur.

    Notez que le gestionnaire GET peut être appelé plusieurs fois pour un lot si le traitement prend suffisamment de temps pour que plusieurs requêtes HTTP GET soient envoyées.

Il existe de nombreuses variations possibles sur ce modèle. Par exemple :

  • L’ID de lot et le statut pourraient être écrits au début du processus de traitement des données plutôt qu’à la fin du processus POST.

  • Le traitement des données pourrait être effectué dans une fonction distincte (par exemple, une fonction Lambda distincte) ou même comme un service complètement distinct.

  • Le code de traitement des données ne doit pas nécessairement être écrit sur un stockage partagé. Au lieu de cela, les données traitées pourraient être mises à disposition d’une autre manière. Par exemple, une API pourrait accepter l’ID de lot comme paramètre et renvoyer les données.

Le code de mise en œuvre doit tenir compte de la possibilité que le traitement prenne trop de temps ou échoue, et tout résultat partiel doit donc être nettoyé pour éviter de gaspiller de l’espace de stockage.

Le mécanisme de stockage doit pouvoir être partagé entre plusieurs processus (ou chemins). Parmi les mécanismes de stockage possibles, on peut citer :

Le code de chacun des 3 processus ci-dessus peut être écrit comme 3 fonctions Lambda séparées (une pour le gestionnaire POST, une pour la fonction de traitement des données et une pour le gestionnaire GET), ou comme une fonction unique qui peut être appelée de différentes manières.

L’exemple de code Python ci-dessous est une fonction Lambda unique qui peut être appelée séparément pour les processus POST, le traitement des données et GET.

Ce code montre un exemple de requête avec sortie. Dans cet exemple, l’accent est mis sur les trois processus et leur interaction, et non sur le mécanisme de stockage partagé (DynamoDB) ou la transformation des données (analyse des sentiments). Le code est structuré de manière à faciliter le remplacement du mécanisme de stockage et de transformation des données par d’autres.

Par souci de simplicité, voici un exemple :

  • Code en dur certaines valeurs importantes (par exemple, la région AWS).

  • Suppose l’existence de certaines ressources (par exemple, la table « Jobs » dans Dynamo).

import json
import time
import boto3

HTTP_METHOD_STRING = "httpMethod"
HEADERS_STRING = "headers"
BATCH_ID_STRING = "sf-external-function-query-batch-id"
DATA_STRING = "data"
REGION_NAME = "us-east-2"

TABLE_NAME = "Jobs"
IN_PROGRESS_STATUS = "IN_PROGRESS"
SUCCESS_STATUS = "SUCCESS"

def lambda_handler(event, context):
    # this is called from either the GET or POST
    if (HTTP_METHOD_STRING in event):
        method = event[HTTP_METHOD_STRING]
        if method == "POST":
            return initiate(event, context)
        elif method == "GET":
            return poll(event, context)
        else:
            return create_response(400, "Function called from invalid method")

    # if not called from GET or POST, then this lambda was called to
    # process data
    else:
        return process_data(event, context)


# Reads batch_ID and data from the request, marks the batch_ID as being processed, and
# starts the processing service.
def initiate(event, context):
    batch_id = event[HEADERS_STRING][BATCH_ID_STRING]
    data = json.loads(event["body"])[DATA_STRING]

    lambda_name = context.function_name

    write_to_storage(batch_id, IN_PROGRESS_STATUS, "NULL")
    lambda_response = invoke_process_lambda(batch_id, data, lambda_name)

    # lambda response returns 202, because we are invoking it with
    # InvocationType = 'Event'
    if lambda_response["StatusCode"] != 202:
        response = create_response(400, "Error in inititate: processing lambda not started")
    else:
        response = {
            'statusCode': lambda_response["StatusCode"]
        }

    return response


# Processes the data passed to it from the POST handler. In this example,
# the processing is to perform sentiment analysis on text.
def process_data(event, context):
    data = event[DATA_STRING]
    batch_id = event[BATCH_ID_STRING]

    def process_data_impl(data):
        comprehend = boto3.client(service_name='comprehend', region_name=REGION_NAME)
        # create return rows
        ret = []
        for i in range(len(data)):
            text = data[i][1]
            sentiment_response = comprehend.detect_sentiment(Text=text, LanguageCode='en')
            sentiment_score = json.dumps(sentiment_response['SentimentScore'])
            ret.append([i, sentiment_score])
        return ret

    processed_data = process_data_impl(data)
    write_to_storage(batch_id, SUCCESS_STATUS, processed_data)

    return create_response(200, "No errors in process")


# Repeatedly checks on the status of the batch_ID, and returns the result after the
# processing has been completed.
def poll(event, context):
    batch_id = event[HEADERS_STRING][BATCH_ID_STRING]
    processed_data = read_data_from_storage(batch_id)

    def parse_processed_data(response):
        # in this case, the response is the response from DynamoDB
        response_metadata = response['ResponseMetadata']
        status_code = response_metadata['HTTPStatusCode']

        # Take action depending on item status
        item = response['Item']
        job_status = item['status']
        if job_status == SUCCESS_STATUS:
            # the row number is stored at index 0 as a Decimal object,
            # we need to convert it into a normal int to be serialized to JSON
            data = [[int(row[0]), row[1]] for row in item['data']]
            return {
                'statusCode': 200,
                'body': json.dumps({
                    'data': data
                })
            }
        elif job_status == IN_PROGRESS_STATUS:
            return {
                'statusCode': 202,
                "body": "{}"
            }
        else:
            return create_response(500, "Error in poll: Unknown item status.")

    return parse_processed_data(processed_data)


def create_response(code, msg):
    return {
        'statusCode': code,
        'body': msg
    }


def invoke_process_lambda(batch_id, data, lambda_name):
    # Create payload to be sent to processing lambda
    invoke_payload = json.dumps({
        BATCH_ID_STRING: batch_id,
        DATA_STRING: data
    })

    # Invoke processing lambda asynchronously by using InvocationType='Event'.
    # This allows the processing to continue while the POST handler returns HTTP 202.
    lambda_client = boto3.client('lambda', region_name=REGION_NAME,)
    lambda_response = lambda_client.invoke(
        FunctionName=lambda_name,
        InvocationType='Event',
        Payload=invoke_payload
    )
    # returns 202 on success if InvocationType = 'Event'
    return lambda_response


def write_to_storage(batch_id, status, data):
    # we assume that the table has already been created
    client = boto3.resource('dynamodb')
    table = client.Table(TABLE_NAME)

    # Put in progress item in table
    item_to_store = {
        'batch_id': batch_id,
        'status': status,
        'data': data,
        'timestamp': "{}".format(time.time())
    }
    db_response = table.put_item(
        Item=item_to_store
    )


def read_data_from_storage(batch_id):
    # we assume that the table has already been created
    client = boto3.resource('dynamodb')
    table = client.Table(TABLE_NAME)

    response = table.get_item(Key={'batch_id': batch_id},
                          ConsistentRead=True)
    return response

Voici un exemple d’appel à la fonction externe asynchrone, ainsi qu’un exemple de sortie, y compris les résultats de l’analyse des sentiments :

create table test_tb(a string);
insert into test_tb values
    ('hello world'),
    ('I am happy');
select ext_func_async(a) from test_tb;

Row | EXT_FUNC_ASYNC(A)
0   | {"Positive": 0.47589144110679626, "Negative": 0.07314028590917587, "Neutral": 0.4493273198604584, "Mixed": 0.0016409909585490823}
1   | {"Positive": 0.9954453706741333, "Negative": 0.00039307220140472054, "Neutral": 0.002452891319990158, "Mixed": 0.0017087293090298772}

Notes sur l’exemple de code :

  • La fonction de traitement des données est appelée par un appel :

    lambda_response = lambda_client.invoke(
        ...
        InvocationType='Event',
        ...
    )
    

    Le InvocationType doit être un « événement », comme indiqué ci-dessus, car le 2e processus (ou chemin) doit être asynchrone et Event est le seul type d’appel non bloquant disponible par la méthode invoke() .

  • La fonction de traitement des données renvoie un code HTTP 200. Toutefois, ce code HTTP 200 n’est pas renvoyé directement à Snowflake. Snowflake ne voit aucun HTTP 200 jusqu’à ce qu’un GET interroge le statut et constate que la fonction de traitement des données a terminé le traitement de ce lot avec succès.