Asynchrone externe Funktionen erstellen

Dieses Dokument beschreibt asynchrone externe Funktionen.

Unter diesem Thema:

Einführung in asynchrone Remotedienste

Ein Remotedienst kann synchron oder asynchron arbeiten.

Ein synchroner Remotedienst empfängt eine HTTP-POST-Anforderung, verarbeitet die Anforderung und gibt das Ergebnis zurück. Je nachdem, wie lange es dauert, die Daten zu verarbeiten, kann es zu einer erheblichen Verzögerung zwischen dem Zeitpunkt des Eingangs der Anforderung und der Rückgabe der Ergebnisse kommen.

Ein asynchroner Remotedienst empfängt eine HTTP-POST-Anforderung und gibt eine (in der Regel fast sofortige) Bestätigung zurück, dass die Anforderung empfangen wurde. Der Aufrufer (Snowflake) führt dann eine Abrufschleife (Polling) aus, in der er eine oder mehrere HTTP-GET-Anforderungen ausgibt (normalerweise mit einer erheblichen Verzögerung zwischen den einzelnen Anforderungen), um den Status der asynchronen Verarbeitung zu überprüfen. Ein GET sendet keine Daten im Hauptteil der Anforderung, sondern enthält den gleichen Header wie das ursprüngliche POST.

Asynchrone Remotedienste sind nützlich, wenn ein Remotedienst die in Komponenten wie dem Proxydienst (z. B. Amazon API Gateway) integrierten Timeouts überschreitet.

Ein Remotedienst ist nicht unbedingt rein synchron oder rein asynchron. Ein Remotedienst kann zu verschiedenen Zeiten synchron und asynchron arbeiten, abhängig von Faktoren wie der Datenmenge in der Anforderung, der Anzahl anderer Anfragen, die verarbeitet werden, usw.

Das folgende Diagramm veranschaulicht die synchrone und asynchrone Verarbeitung. Der obere Pfad ist synchron. Der untere Pfad (der eine oder mehrere HTTP-GET-Anforderungen enthält) ist asynchron.

Illustration of the HTTP POST and GET requests for asynchronous external functions.

Asynchrones Verhalten wird von der Person, die den Remotedienst schreibt, (und von Snowflake) implementiert. SQL-Anweisungen sind für asynchrone Remotedienste die gleichen wie für synchrone Remotedienste.

Die Snowflake-Implementierung von externen Funktionen ist im Allgemeinen sowohl mit synchronen als auch asynchronen Funktionsbibliotheken von Drittanbietern kompatibel.

Wenn Sie Ihren eigenen Remotedienst schreiben und dieser für die asynchronen Verarbeitung in Snowflake kompatibel sein soll, muss dieser sich wie folgt verhalten:

  • Wenn er anfangs einen HTTP-POST-Befehl für einen bestimmten Batch von Zeilen empfängt, gibt der Remotedienst den HTTP-Code 202 zurück („Processing…“).

  • Wenn der Remotedienst nach dem POST, aber bevor die Ausgabe fertig ist, irgendwelche HTTP-GET-Anforderungen empfängt, gibt der Remotedienst den HTTP-Code 202 zurück.

  • Nachdem der Remotedienst alle Ausgabezeilen generiert hat, wartet er auf den nächsten HTTP-GET-Befehl mit derselben Batch-ID und gibt dann die empfangenen Zeilen zusammen mit dem HTTP-Code 200 zurück („Successful completion…“).

Kurz gesagt, für jeden empfangenen Batch gibt der Remotedienst „202“ zurück, bis die Ergebnisse bereit sind, woraufhin der nächste GET-Befehl die Ergebnisse und einen HTTP-Code 200 erhält.

Für jeden Batch verwendet Snowflake den asynchronen Remotedienst wie folgt:

  1. Snowflake sendet einen HTTP-POST-Befehl, der die zu verarbeitenden Daten enthält, zusammen mit einer eindeutigen Batch-ID.

  2. Wenn Snowflake einen HTTP-Code 202 als Antwort erhält, führt Snowflake so lange eine Schleife aus, bis einer der folgenden Punkte zutrifft:

    • Snowflake empfängt die Daten und einen HTTP-Code 200.

    • Das interne Timeout von Snowflake ist erreicht.

    • Snowflake erhält einen Fehler (z. B. HTTP-Antwortcode 5XX).

    In jeder Iteration der Schleife führt Snowflake eine Verzögerung aus und gibt dann einen HTTP-GET-Befehl aus, der die gleiche Batch-ID enthält wie die Batch-ID des entsprechenden HTTP-POST-Befehls, sodass der Remotedienst Informationen für den richtigen Batch zurückgeben kann.

    Die Verzögerung innerhalb der Schleife beginnt kurz, wird aber mit jeder empfangenen 202-Antwort länger, bis das Timeout von Snowflake erreicht ist.

  3. Wenn das Timeout von Snowflake erreicht wird, bevor 200 zurückgegeben wird, bricht Snowflake die SQL-Abfrage ab.

    Gegenwärtig beträgt das Timeout von Snowflake 10 Minuten (600 Sekunden) und ist nicht vom Benutzer konfigurierbar. Dieser Timeout-Wert könnte sich in Zukunft ändern.

Bemerkung

Die Häufigkeit, mit der Abfragen zu Timeouts führen, hängt teilweise auch von der Skalierbarkeit des Remotedienstes ab. Wenn Ihr Remotedienst häufig Timeouts produziert, finden Sie unter Skalierbarkeit eine Diskussion zu diesem Thema.

Erstellen einer asynchronen Funktion auf AWS

Auf AWS müssen asynchrone Remotedienste die folgenden Einschränkungen überwinden:

  • Da es sich bei HTTP POST und GET um separate Anforderungen handelt, muss der Remotedienst Informationen über den von der POST-Anforderung gestarteten Workflow aufbewahren, damit der Status später von der GET-Anforderung abgefragt werden kann.

    Typischerweise ruft jeder HTTP POST und HTTP GET eine separate Instanz der Handlerfunktion(en) in einem separaten Prozess oder Thread auf. Die einzelnen Instanzen teilen sich den Speicher nicht. Damit der GET-Handler den Status oder die verarbeiteten Daten lesen kann, muss der GET-Handler auf eine gemeinsame Speicherressource zugreifen, die auf AWS verfügbar ist.

  • Die einzige Möglichkeit für den POST-Handler, den anfänglichen HTTP 202-Antwortcode zu senden, ist über eine return-Anweisung (oder eine gleichwertige Anweisung), die die Ausführung des Handlers beendet. Daher muss der POST-Handler vor der Rückgabe von HTTP 202 einen unabhängigen Prozess (oder Thread) starten, um die eigentliche Datenverarbeitung des Remotedienstes durchzuführen. Dieser unabhängige Prozess benötigt typischerweise Zugriff auf den Speicher, der für den GET-Handler sichtbar ist.

Um diese Einschränkung für einen asynchronen Remotedienst zu überwinden, können beispielsweise drei Prozesse (oder Threads) und ein gemeinsamer Speicher verwendet werden:

Illustration of processes for an Asynchronous Remote Service

Im folgenden Modell haben die Prozesse die folgenden Aufgaben:

  • Der HTTP-POST-Handler:

    • Liest die Eingangsdaten. Bei einer Lambda-Funktion wird der Textbereich des Eingabeparameters event der Handler-Funktion ausgelesen.

    • Liest die Batch-ID. Bei einer Lambda-Funktion wird die Kopfzeile des Eingabeparameters event gelesen.

    • Startet den Datenverarbeitungsprozess und übergibt ihm die Daten und die Batch-ID. Die Daten werden normalerweise beim Aufruf übergeben, sie können aber auch durch Schreiben in einen externen Speicher übergeben werden.

    • Erfasst die Batch-ID im gemeinsamen Speicher, auf den sowohl der Datenverarbeitungsprozess als auch der HTTP-GET-Handler-Prozess zugreifen können.

    • Erfasst bei Bedarf, dass die Verarbeitung dieser Batches noch nicht abgeschlossen ist.

    • Gibt HTTP 202 zurück, wenn kein Fehler erkannt wurde.

  • Der Datenverarbeitungscode:

    • Liest die Eingangsdaten.

    • Verarbeitet die Daten.

    • Stellt das Ergebnis für den GET-Handler zur Verfügung (entweder durch Schreiben der Ergebnisdaten in den gemeinsamen Speicher oder durch Bereitstellung einer API, über den die Ergebnisse abgefragt werden können).

    • Aktualisiert in der Regel den Status dieses Batches (z. B. von IN_PROGRESS in SUCCESS), um anzuzeigen, dass die Ergebnisse zum Lesen bereit sind.

    • Beendet den Prozess. Optional kann dieser Prozess einen Fehlerindikator zurückgeben. Snowflake sieht dies nicht direkt (Snowflake sieht nur den HTTP-Rückgabecode von POST-Handler und GET-Handler), aber die Rückgabe eines Fehlerindikators aus dem Datenverarbeitungsprozess könnte bei der Fehlersuche hilfreich sein.

  • Der GET-Handler:

    • Liest die Batch-ID. Bei einer Lambda-Funktion wird die Kopfzeile des Eingabeparameters event gelesen.

    • Liest den Speicher, um den aktuellen Status dieses Batches zu erhalten (z. B. IN_PROGRESS oder SUCCESS).

    • Wenn die Verarbeitung noch nicht abgeschlossen ist, dann wird 202 zurückgegeben.

    • Wenn die Verarbeitung erfolgreich abgeschlossen ist, dann:

      • Liest die Ergebnisse.

      • Bereinigt den Speicher.

      • Gibt die Ergebnisse zusammen mit HTTP-Code 200 zurück.

    • Wenn der gespeicherte Status einen Fehler anzeigt, dann:

      • Bereinigt den Speicher.

      • Gibt einen Fehlercode zurück.

    Beachten Sie, dass der GET-Handler für einen Batch möglicherweise mehrfach aufgerufen wird, wenn die Verarbeitung so lange dauert, dass mehrere HTTP-GET-Anforderungen gesendet werden.

Von diesem Modell gibt es viele mögliche Varianten. Beispiel:

  • Die Batch-ID und der Status könnten am Anfang des Datenverarbeitungsprozesses geschrieben werden und nicht am Ende des POST-Prozesses.

  • Die Datenverarbeitung kann in einer separaten Funktion (z. B. einer separaten Lambda-Funktion) oder sogar als komplett separater Dienst erfolgen.

  • Der Datenverarbeitungscode muss nicht unbedingt in den gemeinsamen Speicher schreiben. Stattdessen können die verarbeiteten Daten auf andere Weise zur Verfügung gestellt werden. Eine API kann z. B. die Batch-ID als Parameter annehmen und die Daten zurückgeben.

Der Implementierungscode sollte die Möglichkeit in Betracht ziehen, dass die Verarbeitung zu lange dauert oder fehlschlägt, und daher müssen alle Teilergebnisse bereinigt werden, um nicht unnötig Speicherplatz zu verbrauchen.

Der Speichermechanismus muss über mehrere Prozesse (oder Threads) hinweg gemeinsam genutzt werden können. Mögliche Speichermechanismen sind:

  • Von AWS bereitgestellte Speichermechanismen, wie z. B.:

  • Speicher, der sich außerhalb von AWS befindet, aber von AWS aus zugänglich ist.

Der Code für jeden der obigen drei Prozesse kann als separate Lambda-Funktion geschrieben werden (eine für den POST-Handler, eine für die Datenverarbeitungsfunktion und eine für den GET-Handler), oder als eine einzige Funktion, die auf verschiedene Weise aufgerufen werden kann.

Der folgende Python-Beispielcode ist eine einzelne Lambda-Funktion, die separat für den POST-, den Datenverarbeitungs- und den GET-Prozess aufgerufen werden kann.

Dieser Code zeigt eine Beispielabfrage mit Ausgabe. Der Fokus in diesem Beispiel liegt auf den drei Prozessen und wie sie interagieren, nicht auf dem gemeinsamen Speichermechanismus (DynamoDB) oder die Datentransformation (Standpunktanalyse). Der Code ist so strukturiert, dass es einfach ist, den Beispielspeichermechanismus und die Datentransformation durch andere zu ersetzen.

Der Einfachheit halber folgendes Beispiel:

  • Einige wichtige Werte (z. B. die AWS-Region) werden hart kodiert.

  • Setzt das Vorhandensein einiger Ressourcen voraus (z. B. die Tabelle „Jobs“ in Dynamo).

import json
import time
import boto3

HTTP_METHOD_STRING = "httpMethod"
HEADERS_STRING = "headers"
BATCH_ID_STRING = "sf-external-function-query-batch-id"
DATA_STRING = "data"
REGION_NAME = "us-east-2"

TABLE_NAME = "Jobs"
IN_PROGRESS_STATUS = "IN_PROGRESS"
SUCCESS_STATUS = "SUCCESS"

def lambda_handler(event, context):
    # this is called from either the GET or POST
    if (HTTP_METHOD_STRING in event):
        method = event[HTTP_METHOD_STRING]
        if method == "POST":
            return initiate(event, context)
        elif method == "GET":
            return poll(event, context)
        else:
            return create_response(400, "Function called from invalid method")

    # if not called from GET or POST, then this lambda was called to
    # process data
    else:
        return process_data(event, context)


# Reads batch_ID and data from the request, marks the batch_ID as being processed, and
# starts the processing service.
def initiate(event, context):
    batch_id = event[HEADERS_STRING][BATCH_ID_STRING]
    data = json.loads(event["body"])[DATA_STRING]

    lambda_name = context.function_name

    write_to_storage(batch_id, IN_PROGRESS_STATUS, "NULL")
    lambda_response = invoke_process_lambda(batch_id, data, lambda_name)

    # lambda response returns 202, because we are invoking it with
    # InvocationType = 'Event'
    if lambda_response["StatusCode"] != 202:
        response = create_response(400, "Error in inititate: processing lambda not started")
    else:
        response = {
            'statusCode': lambda_response["StatusCode"]
        }

    return response


# Processes the data passed to it from the POST handler. In this example,
# the processing is to perform sentiment analysis on text.
def process_data(event, context):
    data = event[DATA_STRING]
    batch_id = event[BATCH_ID_STRING]

    def process_data_impl(data):
        comprehend = boto3.client(service_name='comprehend', region_name=REGION_NAME)
        # create return rows
        ret = []
        for i in range(len(data)):
            text = data[i][1]
            sentiment_response = comprehend.detect_sentiment(Text=text, LanguageCode='en')
            sentiment_score = json.dumps(sentiment_response['SentimentScore'])
            ret.append([i, sentiment_score])
        return ret

    processed_data = process_data_impl(data)
    write_to_storage(batch_id, SUCCESS_STATUS, processed_data)

    return create_response(200, "No errors in process")


# Repeatedly checks on the status of the batch_ID, and returns the result after the
# processing has been completed.
def poll(event, context):
    batch_id = event[HEADERS_STRING][BATCH_ID_STRING]
    processed_data = read_data_from_storage(batch_id)

    def parse_processed_data(response):
        # in this case, the response is the response from DynamoDB
        response_metadata = response['ResponseMetadata']
        status_code = response_metadata['HTTPStatusCode']

        # Take action depending on item status
        item = response['Item']
        job_status = item['status']
        if job_status == SUCCESS_STATUS:
            # the row number is stored at index 0 as a Decimal object,
            # we need to convert it into a normal int to be serialized to JSON
            data = [[int(row[0]), row[1]] for row in item['data']]
            return {
                'statusCode': 200,
                'body': json.dumps({
                    'data': data
                })
            }
        elif job_status == IN_PROGRESS_STATUS:
            return {
                'statusCode': 202,
                "body": "{}"
            }
        else:
            return create_response(500, "Error in poll: Unknown item status.")

    return parse_processed_data(processed_data)


def create_response(code, msg):
    return {
        'statusCode': code,
        'body': msg
    }


def invoke_process_lambda(batch_id, data, lambda_name):
    # Create payload to be sent to processing lambda
    invoke_payload = json.dumps({
        BATCH_ID_STRING: batch_id,
        DATA_STRING: data
    })

    # Invoke processing lambda asynchronously by using InvocationType='Event'.
    # This allows the processing to continue while the POST handler returns HTTP 202.
    lambda_client = boto3.client('lambda', region_name=REGION_NAME,)
    lambda_response = lambda_client.invoke(
        FunctionName=lambda_name,
        InvocationType='Event',
        Payload=invoke_payload
    )
    # returns 202 on success if InvocationType = 'Event'
    return lambda_response


def write_to_storage(batch_id, status, data):
    # we assume that the table has already been created
    client = boto3.resource('dynamodb')
    table = client.Table(TABLE_NAME)

    # Put in progress item in table
    item_to_store = {
        'batch_id': batch_id,
        'status': status,
        'data': data,
        'timestamp': "{}".format(time.time())
    }
    db_response = table.put_item(
        Item=item_to_store
    )


def read_data_from_storage(batch_id):
    # we assume that the table has already been created
    client = boto3.resource('dynamodb')
    table = client.Table(TABLE_NAME)

    response = table.get_item(Key={'batch_id': batch_id},
                          ConsistentRead=True)
    return response

Hier ist ein Beispielaufruf der asynchronen externen Funktion zusammen mit einer Beispielausgabe, einschließlich der Ergebnisse der Standpunktanalyse:

create table test_tb(a string);
insert into test_tb values
    ('hello world'),
    ('I am happy');
select ext_func_async(a) from test_tb;

Row | EXT_FUNC_ASYNC(A)
0   | {"Positive": 0.47589144110679626, "Negative": 0.07314028590917587, "Neutral": 0.4493273198604584, "Mixed": 0.0016409909585490823}
1   | {"Positive": 0.9954453706741333, "Negative": 0.00039307220140472054, "Neutral": 0.002452891319990158, "Mixed": 0.0017087293090298772}

Hinweise zum Beispielcode:

  • Die Datenverarbeitungsfunktion wird durch Aufruf gestartet:

    lambda_response = lambda_client.invoke(
        ...
        InvocationType='Event',
        ...
    )
    

    Der InvocationType sollte, wie oben gezeigt, „Event“ (Ereignis) sein, weil der zweite Prozess (oder Thread) asynchron sein muss und Event der einzige Typ von nicht blockierendem Aufruf ist, der durch die invoke()-Methode verfügbar ist.

  • Die Datenverarbeitungsfunktion gibt einen HTTP 200-Code zurück. Dieser HTTP 200-Code wird jedoch nicht direkt an Snowflake zurückgegeben. Snowflake sieht keinen HTTP-Code 200, bis ein GET den Status abfragt und sieht, dass die Datenverarbeitungsfunktion die Verarbeitung dieses Batches erfolgreich abgeschlossen hat.