Async Deployment

Understanding Prefect Async

Why use async in Prefect?

Several of our customers may use the same API. With the original deployment strategy, the API ran on a single deployment that worked through every customer's credentials synchronously. The next customer could not start until all API calls for the prior customer had completed, so some customers might not start until hours after the initial start.

Async fixes this by using a trigger deployment for each project. The trigger deployment rotates through all customers who have credentials for a specified API and starts a separate flow run for each of them.

For example: Customer 1 and Customer 2 have catapult credentials. The Prefect trigger deployment starts at 6am. It grabs all the credentials and rotates through them looking for customers who have catapult credentials, which yields Customer 1 and Customer 2. It then grabs the deployment ID and sends a start-flow-run API request for each customer, passing the deployment ID and that customer's creds as the payload. Both Customer 1 and Customer 2 now start around 6am, avoiding the synchronous delay.

Prefect API Utilization

Prefect does not have asynchronous deployments that accept multiple customer credentials, so we had to use the Prefect API directly. The Prefect API is accessed through the Prefect Python library, and all Prefect API calls must be made within an async function. Two calls get us to the desired end goal: read_deployments (to grab the deployment ID) and create_flow_run_from_deployment (to start a flow run from the deployment, using the deployment ID and the credentials payload).

Understanding Prefect Async in Code

Credentials Object

The credentials object is the input value to the main flow in our project deployment. This means every call that requires credentials must receive them as a parameter, starting from the main function. The result is a waterfall effect of variable passing. In our current version there is no way around this.

Understanding prefect_trigger_function.py integration

deploy_flow() func

import asyncio

from prefect import flow

# credential_management, slack_notification, and async_main come from the project's own modules.

@flow(name='Catapult-Flow-Trigger')
def deploy_flow():
    try:
        # Change per project: replace 'catapult' with the project's API name.
        input_creds = credential_management.customer_schemas('catapult')
        # Start the async chain; async_main awaits the Prefect API helpers.
        asyncio.run(async_main(input_creds))

    except Exception as e:
        slack_notification.slack_notifier('Something went wrong within the prefect_trigger_function.deploy_flow function!', 'prefect', True, 'catapult-async', e)

This is the main flow that will be referenced in our project trigger deployment. All async functionality starts here. The first step instantiates a variable called input_creds by calling the customer_schemas(api) function. This grabs all credentials for a specified API along with some metadata, such as the customer number and the API name.

*Notice you must change the string parameter given to the customer_schemas function call for each project.*

input_creds response structure:

{"F#":{"creds":{"auth_field1":"auth_cred1","auth_field2":"auth_cred2"},"api":"api_name"}}

The asyncio.run line starts an async function run. This cascades into the other functions that are defined with async.

credential_management.customer_schemas(api)

This function is responsible for grabbing all of the customer credentials for a particular API. This is a helper function to the prefect async functions.

I thought it best to cover both the customer_schemas(api) function and its helper function creds_object_construct().

The instantiation of the customer_creds variable in customer_schemas(api) calls the creds_object_construct() function. This stores all of the customer credentials, regardless of API, in that variable.

The next line instantiates the variable customer_inputs. This will house all customer credentials for the API given as the parameter.

The for loop goes through all customer credentials and checks whether each customer has credentials for the API given as the parameter. If they do, their credentials are appended to the list along with the metadata of their customer number and the API.
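The steps above can be sketched as follows. This is a minimal illustration, not the project's actual code: the shape returned by creds_object_construct() (a dict keyed by customer number, mapping API names to credential fields) and the sample data are assumptions. The prose mentions a list, while the documented input_creds structure is keyed by customer number; this sketch follows the documented structure.

```python
def creds_object_construct():
    """Hypothetical stand-in: returns every customer's credentials for every API."""
    return {
        "F1": {"catapult": {"auth_field1": "a1", "auth_field2": "a2"}},
        "F2": {"catapult": {"auth_field1": "b1", "auth_field2": "b2"}},
        "F3": {"other_api": {"auth_field1": "c1"}},
    }


def customer_schemas(api):
    """Collect the credentials of every customer that has credentials for `api`."""
    customer_creds = creds_object_construct()  # all customers, all APIs
    customer_inputs = {}
    for customer_number, apis in customer_creds.items():
        if api in apis:
            # Keep only this API's credentials, plus the API name as metadata.
            customer_inputs[customer_number] = {"creds": apis[api], "api": api}
    return customer_inputs
```

With the sample data, customer_schemas("catapult") keeps F1 and F2 and skips F3, which only has credentials for a different API.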

async_main(input_creds)

This function is responsible for calling all of the async functions. It utilizes await calls to grab necessary data.

The instantiation of the variable deployment_id calls the function get_deployment_id(deployment_name), which returns the deployment ID of a specified deployment in Prefect Cloud.

The next line calls the run_deployment(deployment_id, input_creds) function. This function is responsible for starting flow runs from the specified deployment, one for each set of customer credentials for the specified API.
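A minimal sketch of this orchestration is shown below. The two helpers are stand-ins so the example runs on its own; in the project they call the Prefect API. The DEPLOYMENT_NAME value is hypothetical.

```python
import asyncio

# Hypothetical deployment name; the real value depends on the project.
DEPLOYMENT_NAME = "catapult-deployment"


async def get_deployment_id(deployment_name):
    """Stand-in: the real helper looks the ID up through the Prefect API."""
    return f"id-for-{deployment_name}"


async def run_deployment(deployment_id, input_creds):
    """Stand-in: the real helper starts one flow run per customer."""
    return [(deployment_id, customer) for customer in input_creds]


async def async_main(input_creds):
    # Each await hands control back to the event loop until the helper finishes.
    deployment_id = await get_deployment_id(DEPLOYMENT_NAME)
    return await run_deployment(deployment_id, input_creds)
```

Calling asyncio.run(async_main(input_creds)) from the synchronous trigger flow drives the whole chain, which is how deploy_flow() enters the async code.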

get_deployment_id(deployment_name)

This function is responsible for grabbing the deployment ID for a specified deployment. The parameter deployment_name is the name of a deployment you are trying to get the ID for.

The first and second lines within the try statement make the call to the Prefect API. The read_deployments() API call grabs all deployments within a Prefect Cloud workspace. These deployment objects are then stored in the variable deployments.

The for loop goes through each deployment object. The if statement checks whether the name of the deployment is equal to the name passed in as the parameter. If it matches, the deployment_id variable is instantiated, populated with the ID of the deployment object, and returned.
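The lookup can be sketched roughly as below. It assumes the Prefect 2 client, where get_client() is used as an async context manager and read_deployments() returns objects with name and id attributes. The split into find_deployment_id plus a thin wrapper, and the FakeClient test double, are illustrative choices rather than the project's actual layout.

```python
import asyncio
from types import SimpleNamespace


async def find_deployment_id(client, deployment_name):
    """Return the ID of the first deployment whose name matches, or None."""
    deployments = await client.read_deployments()
    for deployment in deployments:
        if deployment.name == deployment_name:
            return deployment.id
    return None


async def get_deployment_id(deployment_name):
    # Imported lazily so find_deployment_id can be exercised without Prefect installed.
    from prefect import get_client

    async with get_client() as client:
        return await find_deployment_id(client, deployment_name)


class FakeClient:
    """Hypothetical test double exposing only read_deployments()."""

    async def read_deployments(self):
        return [
            SimpleNamespace(name="other-deployment", id="111"),
            SimpleNamespace(name="catapult-deployment", id="222"),
        ]
```

Deployment names are only unique within a flow, so matching on the name alone returns the first deployment found with that name.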

run_deployment(deployment_id, input_creds)

This function is responsible for asynchronously deploying the Prefect flow runs. It takes in the deployment_id parameter passed from the get_deployment_id(deployment_name) function and the input_creds parameter from the customer_schemas(api) call.

The first call of the function generates the client object, which is what interacts with the Prefect Cloud API.

Then the for loop is called. This for loop iterates through all the customer credential objects. The first step in the loop instantiates a variable current_customer_creds, which houses the credentials and API for the current customer. The next line adds the 'customer' key, storing the F# as its value.

current_customer_creds structure:

{"customer":"F#","creds":{"auth_field1":"auth_cred1","auth_field2":"auth_cred2"},"api":"api_name"}

Next, the variable input is instantiated as a key/value pair, with 'input' as the key and the current_customer_creds variable as the value. This is necessary to pass it to the input parameter of the main flow in the project deployment that is being triggered.

Next, the variable flow_run is instantiated. This calls the create_flow_run_from_deployment(deployment_id, parameters=input) API call, which triggers the project deployment with the current customer's credentials. The value stored in the variable is the response from the call.

All the subsequent lines are notifications regarding the flow run that was just started.
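The loop described above might look like the sketch below. It assumes input_creds is a dict keyed by customer number, as in the documented structure, and that the Prefect 2 client's create_flow_run_from_deployment accepts a parameters dict. The start_flow_runs helper and FakeClient are hypothetical, the parameters dict is named parameters here rather than input to avoid shadowing the Python builtin, and the notification lines are omitted.

```python
import asyncio
from types import SimpleNamespace


async def start_flow_runs(client, deployment_id, input_creds):
    """Create one flow run per customer and return the API responses."""
    flow_runs = []
    for customer_number, customer_entry in input_creds.items():
        current_customer_creds = dict(customer_entry)  # copy of creds and api
        current_customer_creds["customer"] = customer_number
        # The key must match the main flow's parameter name, 'input'.
        parameters = {"input": current_customer_creds}
        flow_run = await client.create_flow_run_from_deployment(
            deployment_id, parameters=parameters
        )
        flow_runs.append(flow_run)
    return flow_runs


async def run_deployment(deployment_id, input_creds):
    # Imported lazily so start_flow_runs can be exercised without Prefect installed.
    from prefect import get_client

    async with get_client() as client:
        return await start_flow_runs(client, deployment_id, input_creds)


class FakeClient:
    """Hypothetical test double that records each request."""

    def __init__(self):
        self.calls = []

    async def create_flow_run_from_deployment(self, deployment_id, parameters=None):
        self.calls.append((deployment_id, parameters))
        return SimpleNamespace(id=f"run-{len(self.calls)}")
```

The API call only creates the flow run; it does not wait for the run to finish. That is why the loop moves on to the next customer quickly and all customers start at roughly the same time.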

Passing Credentials from Main Flow in project Deployment

As stated, with this async implementation the credentials of a customer are passed as a parameter to the main flow. For a subflow or task of the main flow to access the credentials, they must be passed to it as a parameter as well.

In the catapult project's main flow, the flow takes the parameter input. The variable credentials_obj is then instantiated with the value of the input parameter. Just like the structure example above, the credentials_obj variable has a structure of:

{"customer":"F#","creds":{"auth_field1":"auth_cred1","auth_field2":"auth_cred2"},"api":"api_name"}

Notice that all subsequent function calls take credentials_obj as a parameter. I hope to find a better solution for this in the future, but for now this is how the credentials must be passed to accomplish async functionality.
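The waterfall of credential passing can be illustrated with the sketch below. Plain functions stand in for the Prefect flow and tasks so the example runs without Prefect; in the project these would carry the @flow and @task decorators. The names main_flow, fetch_data, and load_data are hypothetical.

```python
def fetch_data(credentials_obj):
    """Hypothetical task: would call the API using the customer's credentials."""
    return f"fetched {credentials_obj['api']} data for {credentials_obj['customer']}"


def load_data(data, credentials_obj):
    """Hypothetical task: would store the data for the same customer."""
    return f"{data}; loaded for {credentials_obj['customer']}"


def main_flow(input):
    # 'input' shadows the builtin, but it must match the key used in the
    # parameters payload sent by the trigger deployment.
    credentials_obj = input
    data = fetch_data(credentials_obj)
    # Every downstream call receives credentials_obj explicitly.
    return load_data(data, credentials_obj)
```

Because nothing is read from shared state, each flow run works only with the credentials it was started with.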
