Beispiel für einen asynchronen Remotedienst für AWS

Unter diesem Thema wird ein Beispiel für eine asynchrone AWS Lambda-Funktion (Remotedienst) bereitgestellt. Sie können diese Beispielfunktion erstellen, indem Sie die gleichen Schritte wie unter Schritt 1: Remotedienst (AWS Lambda-Funktion) in der Management Console erstellen beschrieben ausführen.

Unter diesem Thema:

Übersicht zum Code

Dieser Abschnitt der Dokumentation enthält Informationen zum Erstellen einer asynchronen externen Funktion auf AWS. (Bevor Sie Ihre erste asynchrone externe Funktion implementieren, sollten Sie zunächst den Konzeptionellen Überblick zu asynchronen externen Funktionen lesen.)

Auf AWS müssen asynchrone Remotedienste die folgenden Einschränkungen überwinden:

  • Da es sich bei HTTP POST und GET um separate Anforderungen handelt, muss der Remotedienst Informationen über den von der POST-Anforderung gestarteten Workflow aufbewahren, damit der Status später von der GET-Anforderung abgefragt werden kann.

    Typischerweise ruft jeder HTTP POST und HTTP GET eine separate Instanz der Handlerfunktion(en) in einem separaten Prozess oder Thread auf. Die einzelnen Instanzen teilen sich den Speicher nicht. Damit der GET-Handler den Status oder die verarbeiteten Daten lesen kann, muss der GET-Handler auf eine gemeinsame Speicherressource zugreifen, die auf AWS verfügbar ist.

  • Die einzige Möglichkeit für den POST-Handler, den anfänglichen HTTP 202-Antwortcode zu senden, ist über eine return-Anweisung (oder eine gleichwertige Anweisung), die die Ausführung des Handlers beendet. Daher muss der POST-Handler vor der Rückgabe von HTTP 202 einen unabhängigen Prozess (oder Thread) starten, um die eigentliche Datenverarbeitung des Remotedienstes durchzuführen. Dieser unabhängige Prozess benötigt typischerweise Zugriff auf den Speicher, der für den GET-Handler sichtbar ist.

Um diese Einschränkung für einen asynchronen Remotedienst zu überwinden, können beispielsweise drei Prozesse (oder Threads) und ein gemeinsamer Speicher verwendet werden:

Illustration of processes for an Asynchronous Remote Service

Im folgenden Modell haben die Prozesse die folgenden Aufgaben:

  • Der HTTP-POST-Handler:

    • Liest die Eingangsdaten. Bei einer Lambda-Funktion wird der Textbereich des Eingabeparameters event der Handler-Funktion ausgelesen.

    • Liest die Batch-ID. Bei einer Lambda-Funktion wird die Kopfzeile des Eingabeparameters event gelesen.

    • Startet den Datenverarbeitungsprozess und übergibt ihm die Daten und die Batch-ID. Die Daten werden normalerweise beim Aufruf übergeben, sie können aber auch durch Schreiben in einen externen Speicher übergeben werden.

    • Erfasst die Batch-ID im gemeinsamen Speicher, auf den sowohl der Datenverarbeitungsprozess als auch der HTTP-GET-Handler-Prozess zugreifen können.

    • Erfasst bei Bedarf, dass die Verarbeitung dieser Batches noch nicht abgeschlossen ist.

    • Gibt HTTP 202 zurück, wenn kein Fehler erkannt wurde.

  • Der Datenverarbeitungscode:

    • Liest die Eingangsdaten.

    • Verarbeitet die Daten.

    • Stellt das Ergebnis für den GET-Handler zur Verfügung (entweder durch Schreiben der Ergebnisdaten in den gemeinsamen Speicher oder durch Bereitstellung einer API, über den die Ergebnisse abgefragt werden können).

    • Aktualisiert in der Regel den Status dieses Batches (z. B. von IN_PROGRESS in SUCCESS), um anzuzeigen, dass die Ergebnisse zum Lesen bereit sind.

    • Beendet den Prozess. Optional kann dieser Prozess einen Fehlerindikator zurückgeben. Snowflake sieht dies nicht direkt (Snowflake sieht nur den HTTP-Rückgabecode von POST-Handler und GET-Handler), aber die Rückgabe eines Fehlerindikators aus dem Datenverarbeitungsprozess könnte bei der Fehlersuche hilfreich sein.

  • Der GET-Handler:

    • Liest die Batch-ID. Bei einer Lambda-Funktion wird die Kopfzeile des Eingabeparameters event gelesen.

    • Liest den Speicher, um den aktuellen Status dieses Batches zu erhalten (z. B. IN_PROGRESS oder SUCCESS).

    • Wenn die Verarbeitung noch nicht abgeschlossen ist, dann wird 202 zurückgegeben.

    • Wenn die Verarbeitung erfolgreich abgeschlossen ist, dann:

      • Liest die Ergebnisse.

      • Bereinigt den Speicher.

      • Gibt die Ergebnisse zusammen mit HTTP-Code 200 zurück.

    • Wenn der gespeicherte Status einen Fehler anzeigt, dann:

      • Bereinigt den Speicher.

      • Gibt einen Fehlercode zurück.

    Beachten Sie, dass der GET-Handler für einen Batch möglicherweise mehrfach aufgerufen wird, wenn die Verarbeitung so lange dauert, dass mehrere HTTP-GET-Anforderungen gesendet werden.

Von diesem Modell gibt es viele mögliche Varianten. Beispiel:

  • Die Batch-ID und der Status könnten am Anfang des Datenverarbeitungsprozesses geschrieben werden und nicht am Ende des POST-Prozesses.

  • Die Datenverarbeitung kann in einer separaten Funktion (z. B. einer separaten Lambda-Funktion) oder sogar als komplett separater Dienst erfolgen.

  • Der Datenverarbeitungscode muss nicht unbedingt in den gemeinsamen Speicher schreiben. Stattdessen können die verarbeiteten Daten auf andere Weise zur Verfügung gestellt werden. Eine API kann z. B. die Batch-ID als Parameter annehmen und die Daten zurückgeben.

Der Implementierungscode sollte die Möglichkeit in Betracht ziehen, dass die Verarbeitung zu lange dauert oder fehlschlägt, und daher müssen alle Teilergebnisse bereinigt werden, um nicht unnötig Speicherplatz zu verbrauchen.

Der Speichermechanismus muss über mehrere Prozesse (oder Threads) hinweg gemeinsam genutzt werden können. Mögliche Speichermechanismen sind:

  • Von AWS bereitgestellte Speichermechanismen, wie z. B.:

  • Speicher, der sich außerhalb von AWS befindet, aber von AWS aus zugänglich ist.

Der Code für jeden der obigen drei Prozesse kann als separate Lambda-Funktion geschrieben werden (eine für den POST-Handler, eine für die Datenverarbeitungsfunktion und eine für den GET-Handler), oder als eine einzige Funktion, die auf verschiedene Weise aufgerufen werden kann.

Der folgende Python-Beispielcode ist eine einzelne Lambda-Funktion, die separat für den POST-, den Datenverarbeitungs- und den GET-Prozess aufgerufen werden kann.

Beispielcode

Dieser Code zeigt eine Beispielabfrage mit Ausgabe. Der Fokus in diesem Beispiel liegt auf den drei Prozessen und wie sie interagieren, nicht auf dem gemeinsamen Speichermechanismus (DynamoDB) oder die Datentransformation (Standpunktanalyse). Der Code ist so strukturiert, dass es einfach ist, den Beispielspeichermechanismus und die Datentransformation durch andere zu ersetzen.

Der Einfachheit halber folgendes Beispiel:

  • Einige wichtige Werte (z. B. die AWS-Region) werden hart kodiert.

  • Setzt das Vorhandensein einiger Ressourcen voraus (z. B. die Tabelle „Jobs“ in Dynamo).

import json
import time
import boto3

HTTP_METHOD_STRING = "httpMethod"
HEADERS_STRING = "headers"
BATCH_ID_STRING = "sf-external-function-query-batch-id"
DATA_STRING = "data"
REGION_NAME = "us-east-2"

TABLE_NAME = "Jobs"
IN_PROGRESS_STATUS = "IN_PROGRESS"
SUCCESS_STATUS = "SUCCESS"

def lambda_handler(event, context):
    # this is called from either the GET or POST
    if (HTTP_METHOD_STRING in event):
        method = event[HTTP_METHOD_STRING]
        if method == "POST":
            return initiate(event, context)
        elif method == "GET":
            return poll(event, context)
        else:
            return create_response(400, "Function called from invalid method")

    # if not called from GET or POST, then this lambda was called to
    # process data
    else:
        return process_data(event, context)


# Reads batch_ID and data from the request, marks the batch_ID as being processed, and
# starts the processing service.
def initiate(event, context):
    batch_id = event[HEADERS_STRING][BATCH_ID_STRING]
    data = json.loads(event["body"])[DATA_STRING]

    lambda_name = context.function_name

    write_to_storage(batch_id, IN_PROGRESS_STATUS, "NULL")
    lambda_response = invoke_process_lambda(batch_id, data, lambda_name)

    # lambda response returns 202, because we are invoking it with
    # InvocationType = 'Event'
    if lambda_response["StatusCode"] != 202:
        response = create_response(400, "Error in inititate: processing lambda not started")
    else:
        response = {
            'statusCode': lambda_response["StatusCode"]
        }

    return response


# Processes the data passed to it from the POST handler. In this example,
# the processing is to perform sentiment analysis on text.
def process_data(event, context):
    data = event[DATA_STRING]
    batch_id = event[BATCH_ID_STRING]

    def process_data_impl(data):
        comprehend = boto3.client(service_name='comprehend', region_name=REGION_NAME)
        # create return rows
        ret = []
        for i in range(len(data)):
            text = data[i][1]
            sentiment_response = comprehend.detect_sentiment(Text=text, LanguageCode='en')
            sentiment_score = json.dumps(sentiment_response['SentimentScore'])
            ret.append([i, sentiment_score])
        return ret

    processed_data = process_data_impl(data)
    write_to_storage(batch_id, SUCCESS_STATUS, processed_data)

    return create_response(200, "No errors in process")


# Repeatedly checks on the status of the batch_ID, and returns the result after the
# processing has been completed.
def poll(event, context):
    batch_id = event[HEADERS_STRING][BATCH_ID_STRING]
    processed_data = read_data_from_storage(batch_id)

    def parse_processed_data(response):
        # in this case, the response is the response from DynamoDB
        response_metadata = response['ResponseMetadata']
        status_code = response_metadata['HTTPStatusCode']

        # Take action depending on item status
        item = response['Item']
        job_status = item['status']
        if job_status == SUCCESS_STATUS:
            # the row number is stored at index 0 as a Decimal object,
            # we need to convert it into a normal int to be serialized to JSON
            data = [[int(row[0]), row[1]] for row in item['data']]
            return {
                'statusCode': 200,
                'body': json.dumps({
                    'data': data
                })
            }
        elif job_status == IN_PROGRESS_STATUS:
            return {
                'statusCode': 202,
                "body": "{}"
            }
        else:
            return create_response(500, "Error in poll: Unknown item status.")

    return parse_processed_data(processed_data)


def create_response(code, msg):
    return {
        'statusCode': code,
        'body': msg
    }


def invoke_process_lambda(batch_id, data, lambda_name):
    # Create payload to be sent to processing lambda
    invoke_payload = json.dumps({
        BATCH_ID_STRING: batch_id,
        DATA_STRING: data
    })

    # Invoke processing lambda asynchronously by using InvocationType='Event'.
    # This allows the processing to continue while the POST handler returns HTTP 202.
    lambda_client = boto3.client('lambda', region_name=REGION_NAME,)
    lambda_response = lambda_client.invoke(
        FunctionName=lambda_name,
        InvocationType='Event',
        Payload=invoke_payload
    )
    # returns 202 on success if InvocationType = 'Event'
    return lambda_response


def write_to_storage(batch_id, status, data):
    # we assume that the table has already been created
    client = boto3.resource('dynamodb')
    table = client.Table(TABLE_NAME)

    # Put in progress item in table
    item_to_store = {
        'batch_id': batch_id,
        'status': status,
        'data': data,
        'timestamp': "{}".format(time.time())
    }
    db_response = table.put_item(
        Item=item_to_store
    )


def read_data_from_storage(batch_id):
    # we assume that the table has already been created
    client = boto3.resource('dynamodb')
    table = client.Table(TABLE_NAME)

    response = table.get_item(Key={'batch_id': batch_id},
                          ConsistentRead=True)
    return response
Copy

Beispielaufruf und Ausgabe

Hier ist ein Beispielaufruf der asynchronen externen Funktion zusammen mit einer Beispielausgabe, einschließlich der Ergebnisse der Standpunktanalyse:

create table test_tb(a string);
insert into test_tb values
    ('hello world'),
    ('I am happy');
select ext_func_async(a) from test_tb;

Row | EXT_FUNC_ASYNC(A)
0   | {"Positive": 0.47589144110679626, "Negative": 0.07314028590917587, "Neutral": 0.4493273198604584, "Mixed": 0.0016409909585490823}
1   | {"Positive": 0.9954453706741333, "Negative": 0.00039307220140472054, "Neutral": 0.002452891319990158, "Mixed": 0.0017087293090298772}
Copy

Hinweise zum Beispielcode

  • Die Datenverarbeitungsfunktion wird durch folgenden Aufruf gestartet:

    lambda_response = lambda_client.invoke(
        ...
        InvocationType='Event',
        ...
    )
    
    Copy

    Der InvocationType sollte, wie oben gezeigt, „Event“ (Ereignis) sein, weil der zweite Prozess (oder Thread) asynchron sein muss und Event der einzige Typ von nicht blockierendem Aufruf ist, der durch die invoke()-Methode verfügbar ist.

  • Die Datenverarbeitungsfunktion gibt einen HTTP 200-Code zurück. Dieser HTTP 200-Code wird jedoch nicht direkt an Snowflake zurückgegeben. Snowflake sieht keinen HTTP-Code 200, bis ein GET den Status abfragt und sieht, dass die Datenverarbeitungsfunktion die Verarbeitung dieses Batches erfolgreich abgeschlossen hat.