
How Workflows Get Scheduled via Plugins in Flyte

Reading open source code has long been a recommended way for software engineers to learn. However, in my past 8 years of career, I didn't do a good job of that. After working at a startup for 1 year, I accidentally fostered the habit of reading open source code XD. In this post, I would like to share one open source project I have been learning recently, and I hope you will enjoy the journey as well.

I have been working on ML pipelining for a long time in Meta Ads. However, I didn't have a comprehensive understanding across the entire stack, especially of how the underlying infra schedules a model training job and executes it over a fleet of machines. Recently, I was exposed to an open source project, Flyte, which is an orchestrator for ML pipelines built on top of Kubernetes. I think this is a good opportunity for me to gain some deep understanding in this area.

I have always been a believer in "Learning by Doing". My ultimate goal in studying this open source code is to implement a simplified version of an ML pipeline orchestrator on my own. Next, let's see what problem we are going to discuss in this post.

Problem

In Flyte, we can use something called a Plugin for distributed training, e.g. the PyTorch Plugin. In this post, we will discuss how these plugins get invoked, so that the distributed training job we defined can actually be executed. I will simplify the discussion and focus only on the main flow; other important topics such as storage, pipeline definition and compilation, availability and scalability are deferred to later posts.

High level architecture

FlytePropeller Design

The key component responsible for scheduling and monitoring workflows in Flyte is called FlytePropeller. It tries to push a FlyteWorkflow, which is defined as a Custom Resource Definition in k8s, to its desired state by leveraging the k8s reconcile mechanism. The official Flyte documentation provides a pretty good high-level overview of FlytePropeller's design; here is a list of the core components (a rough sketch of how they fit together follows the list):

  • Controller: overall brain of FlytePropeller
  • WorkQueue/WorkerPool: where workers live and take jobs to process, a classic design in job scheduling systems
  • WorkflowExecutor: responsible for high-level workflow operations, such as tracking the status of the workflow
  • NodeExecutor: responsible for processing the nodes within the workflow and deciding the actions to take
  • NodeHandler: different types of handlers for executing different types of nodes in the workflow, e.g. TaskHandler for executing Plugins and WorkflowHandler for executing embedded sub-workflows
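
To make these relationships a bit more concrete, here is a rough Go sketch of how the pieces fit together. The interfaces and names below are my own simplification, not Flyte's actual types:

```go
package sketch

import "context"

// NodeHandler executes a single kind of node, e.g. a task backed by a plugin.
type NodeHandler interface {
	Handle(ctx context.Context, nodeID string) error
}

// NodeExecutor walks the workflow DAG and dispatches each node to a handler.
type NodeExecutor interface {
	RecursiveNodeHandler(ctx context.Context, startNodeID string) error
}

// WorkflowExecutor drives one FlyteWorkflow object towards its desired state.
type WorkflowExecutor interface {
	HandleFlyteWorkflow(ctx context.Context, workflowKey string) error
}

// Handler is what each worker in the pool invokes for one item off the queue;
// in FlytePropeller this role is played by the PropellerHandler.
type Handler interface {
	Handle(ctx context.Context, namespace, name string) error
}
```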

Knowing what to do is one thing, and knowing how to do it is another! Next, let's jump into the code to see how these components work internally and how the logic defined within a Plugin gets invoked.

Components Deep Dive

Controller

Let's start our journey with the controller. The controller is the starter for FlytePropeller and is responsible for initializing the other components:

  • In the New function of the controller, we create the workqueue here.
  • Then, we create the workerpool here. Note that the workerpool takes the workqueue we just created as part of its initialization (because workers need to consume jobs from the queue), along with a PropellerHandler. A simplified sketch of this wiring follows the list.
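
Here is a minimal, self-contained sketch of that wiring. The types and signatures are mine (a plain channel stands in for the real rate-limited workqueue), not the actual FlytePropeller code:

```go
package sketch

import "context"

// Handler plays the role of the PropellerHandler that gets passed into the pool.
type Handler interface {
	Handle(ctx context.Context, key string) error
}

// WorkerPool pairs the queue of workflow keys with the handler that processes them.
type WorkerPool struct {
	queue   chan string // stand-in for client-go's rate-limited workqueue
	handler Handler
}

// Controller owns both, mirroring the order described above: the queue is
// created first, then the worker pool that consumes from it.
type Controller struct {
	queue chan string
	pool  *WorkerPool
}

func NewController(handler Handler, queueSize int) *Controller {
	queue := make(chan string, queueSize)
	pool := &WorkerPool{queue: queue, handler: handler}
	return &Controller{queue: queue, pool: pool}
}
```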

At this point, all the key components mentioned in the high-level architecture are ready. We will go deeper into each of them to understand how they get invoked.

Besides the New function, there is also a run function which plays a critical role in launching the controller. It launches things such as the workerpool, gc and the metrics monitors. The run function is called within another function, Run. One interesting part of Run is that it leverages the leader election functionality provided by k8s and only lets the leader trigger run. We will discuss this topic in more detail in a future post.
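
For reference, this is roughly what the leader election pattern looks like with the generic client-go API; Flyte's actual wiring has its own configuration, and the lock name, namespace and durations below are purely illustrative:

```go
package sketch

import (
	"context"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
)

// runAsLeader wraps the given run function so that it only executes while this
// replica holds the lease.
func runAsLeader(ctx context.Context, client kubernetes.Interface, id string, run func(ctx context.Context)) {
	lock := &resourcelock.LeaseLock{
		LeaseMeta: metav1.ObjectMeta{Name: "propeller-sketch-leader", Namespace: "flyte"},
		Client:    client.CoordinationV1(),
		LockConfig: resourcelock.ResourceLockConfig{
			Identity: id, // typically the pod name
		},
	}
	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
		Lock:            lock,
		ReleaseOnCancel: true,
		LeaseDuration:   15 * time.Second,
		RenewDeadline:   10 * time.Second,
		RetryPeriod:     2 * time.Second,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: run,                       // became leader: start the control loop
			OnStoppedLeading: func() { /* lost the lease; stop doing work */ },
		},
	})
}
```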

Since the controller launches the workerpool, let's move on to the workerpool and workqueue to understand how these two components work.

WorkerPool/WorkQueue

The workerpool is essentially composed of the workqueue and several workers, each of which is actually a goroutine (this is also why Flyte can be pretty scalable on a single CPU; we will discuss this in the future). The Run function in the workerpool is the most critical function and is the one invoked by the controller. The main logic is the for loop here, where we launch multiple goroutines and each goroutine calls the runWorker function. The runWorker function is relatively simple: just an endless loop calling the processNextWorkItem function. processNextWorkItem gets an item from the workqueue and then invokes the PropellerHandler we previously passed in during initialization. As we can see, the key processing logic resides within the PropellerHandler's Handle function, which is defined as part of the interface here. Let's move on and see how this Handle works.
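
To visualize the loop, here is a simplified sketch (repeating the minimal types from the controller sketch so the snippet stands alone; a plain channel again replaces the real rate-limited workqueue):

```go
package sketch

import (
	"context"
	"fmt"
	"sync"
)

// Handler mirrors the role of the PropellerHandler: it receives a workflow key
// and runs the reconcile logic for it.
type Handler interface {
	Handle(ctx context.Context, key string) error
}

// WorkerPool is a simplified stand-in for FlytePropeller's worker pool:
// a shared queue of workflow keys consumed by N goroutines.
type WorkerPool struct {
	queue   chan string
	handler Handler
}

// Run launches the requested number of workers and waits for them to finish.
func (p *WorkerPool) Run(ctx context.Context, workers int) {
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() { // each worker is just a goroutine, as in the real pool
			defer wg.Done()
			p.runWorker(ctx)
		}()
	}
	wg.Wait()
}

// runWorker loops forever, pulling the next item until the queue closes.
func (p *WorkerPool) runWorker(ctx context.Context) {
	for p.processNextWorkItem(ctx) {
	}
}

// processNextWorkItem takes one key off the queue and hands it to the handler.
func (p *WorkerPool) processNextWorkItem(ctx context.Context) bool {
	key, ok := <-p.queue
	if !ok {
		return false
	}
	if err := p.handler.Handle(ctx, key); err != nil {
		fmt.Printf("failed to handle %s: %v\n", key, err)
		// the real implementation would re-enqueue with rate limiting here
	}
	return true
}
```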

PropellerHandler

The Handle function defined on the Propeller struct is the entry point of the reconcile process. (Here Propeller implements the Handler interface, so it can be used as a Handler even though there is no explicit inheritance; this is how interface implementation works in Golang.) The key logic is within this for loop, where we call the streak function up to a maximum number of times. The streak function tries to apply a single mutation to the workflow and returns the mutated workflow upon success, or no update if it fails. The workflow here is the FlyteWorkflow CRD, and the mutation is done via TryMutateWorkflow. TryMutateWorkflow calls the workflowExecutor's HandleFlyteWorkflow function to see whether we can reconcile the workflow towards its desired state. We leave out other details, such as how failures and aborted workflows are handled. From the code in PropellerHandler, we can observe that the handler only does some high-level logic, and the actual workflow processing is delegated to the workflowExecutor. Now, let's move to the workflowExecutor.
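
Here is a heavily simplified sketch of that streak loop, with my own stand-in types; the real streak function carries a lot more context (metrics, deep copies of the CRD, abort handling):

```go
package sketch

import "context"

// FlyteWorkflow stands in for the real CRD object.
type FlyteWorkflow struct {
	Name  string
	Phase string
}

// WorkflowExecutor is the component the handler delegates the real work to.
type WorkflowExecutor interface {
	HandleFlyteWorkflow(ctx context.Context, w *FlyteWorkflow) error
}

// Propeller is a stripped-down stand-in for the PropellerHandler.
type Propeller struct {
	executor   WorkflowExecutor
	maxStreaks int
}

// Handle performs up to maxStreaks consecutive mutations in one reconcile
// round, stopping early once a streak makes no further progress.
func (p *Propeller) Handle(ctx context.Context, w *FlyteWorkflow) error {
	for streak := 0; streak < p.maxStreaks; streak++ {
		mutated, err := p.tryMutateWorkflow(ctx, w)
		if err != nil {
			return err
		}
		if mutated == nil { // no progress made, stop streaking
			return nil
		}
		w = mutated // keep going with the freshly mutated copy
	}
	return nil
}

// tryMutateWorkflow runs the executor on a copy and returns it if it changed.
func (p *Propeller) tryMutateWorkflow(ctx context.Context, w *FlyteWorkflow) (*FlyteWorkflow, error) {
	copyW := *w
	if err := p.executor.HandleFlyteWorkflow(ctx, &copyW); err != nil {
		return nil, err
	}
	if copyW == *w { // nothing changed
		return nil, nil
	}
	return &copyW, nil
}
```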

WorkflowExecutor

The HandleFlyteWorkflow function called within the PropellerHandler is a router function. It invokes the actual logic functions based on the status of the workflow. For example, if the workflow is in WorkflowPhaseRunning, it invokes the handleRunningWorkflow function. A common pattern in these functions is that they set up the context, invoke the nodeExecutor's RecursiveNodeHandler function to get the new status, and then update the status. The new status is passed back and used to transition the workflow's status (which is the reconcile process). Notice that the FlyteWorkflow is passed as the executors.DAGStructure and executors.NodeLookup parameters, as well as the startNode.

Different operations are taken based on the new status RecursiveNodeHandler passes back. For example, if the new status is partially complete, the workflow is enqueued again and a running status is returned. A simplified sketch of this pattern is shown below.
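
The sketch uses stand-in phases and types rather than Flyte's real ones, but shows the shape of the pattern: delegate to the node executor, then translate the node-level outcome into a workflow phase or a re-enqueue:

```go
package sketch

import "context"

type NodePhase int

const (
	NodePhasePending NodePhase = iota
	NodePhaseComplete
	NodePhasePartiallyComplete
	NodePhaseFailed
)

type WorkflowPhase int

const (
	WorkflowPhaseRunning WorkflowPhase = iota
	WorkflowPhaseSucceeding
	WorkflowPhaseFailing
)

type FlyteWorkflow struct{ Phase WorkflowPhase }

// NodeExecutor is what the workflow executor delegates node processing to.
type NodeExecutor interface {
	RecursiveNodeHandler(ctx context.Context, w *FlyteWorkflow, startNodeID string) (NodePhase, error)
}

type workflowExecutor struct {
	nodeExec NodeExecutor
	enqueue  func(w *FlyteWorkflow) // re-enqueue the workflow for another round
}

// handleRunningWorkflow shows the common pattern: run the node executor from
// the start node, then transition the workflow based on the node-level result.
func (e *workflowExecutor) handleRunningWorkflow(ctx context.Context, w *FlyteWorkflow) error {
	phase, err := e.nodeExec.RecursiveNodeHandler(ctx, w, "start-node")
	if err != nil {
		return err
	}
	switch phase {
	case NodePhaseComplete:
		w.Phase = WorkflowPhaseSucceeding // all nodes done, move the workflow forward
	case NodePhaseFailed:
		w.Phase = WorkflowPhaseFailing
	case NodePhasePartiallyComplete:
		e.enqueue(w) // some nodes still running: come back for another round
	}
	return nil
}
```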

The WorkflowExecutor handles workflow-level operations and decides what actions to take. In an ML pipeline, a workflow is usually composed of several nodes, and these nodes encapsulate the actual computation. Let's take a look at the nodeExecutor, which is responsible for handling this part.

NodeExecutor

The RecursiveNodeHandler function is one of the most important functions in the NodeExecutor. It is the entry point for executing a node within a workflow. It uses an actor model and a modified version of DFS to traverse the DAG and execute non-blocked nodes. Based on the status of the start node passed in, it applies different logic to proceed. For example, if the node status is already succeeded, skipped or recovered, it invokes the handleDownstream function; if the node is in a status that can be handled, the key logic happens here: first, based on the node's kind, a dedicated handler is retrieved from the nodeHandlerFactory; then the HandleNode function is invoked to execute the node.

handleDownstream is where the aforementioned modified DFS is implemented. The logic is relatively straightforward: starting from the input node, we retrieve all downstream nodes; then we iterate over each of them and invoke the RecursiveNodeHandler function on it, with itself as the new start node; we keep track of the statuses to check whether all downstream nodes have been processed, and return the aggregated status accordingly. A simplified sketch of this traversal is shown below.
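
The sketch uses my own stand-in types for the DAG, node phases and handler factory, but mirrors the two branches described above: finished nodes recurse into their children, runnable nodes are dispatched to the handler for their kind:

```go
package sketch

import "context"

type NodePhase int

const (
	NodePhaseNotYetStarted NodePhase = iota
	NodePhaseRunning
	NodePhaseSucceeded
	NodePhaseFailed
)

type Node struct {
	ID    string
	Kind  string // e.g. "task", "workflow"
	Phase NodePhase
}

// DAG gives access to the workflow structure, similar in spirit to executors.DAGStructure.
type DAG interface {
	Downstream(nodeID string) []*Node
}

type NodeHandler interface {
	Handle(ctx context.Context, n *Node) error
}

type HandlerFactory interface {
	GetHandler(kind string) NodeHandler
}

type NodeExecutor struct {
	dag     DAG
	factory HandlerFactory
}

// RecursiveNodeHandler: finished nodes recurse into their children, runnable
// nodes get dispatched to the handler for their kind.
func (e *NodeExecutor) RecursiveNodeHandler(ctx context.Context, n *Node) (NodePhase, error) {
	switch n.Phase {
	case NodePhaseSucceeded:
		return e.handleDownstream(ctx, n)
	case NodePhaseNotYetStarted, NodePhaseRunning:
		h := e.factory.GetHandler(n.Kind)
		if err := h.Handle(ctx, n); err != nil {
			return NodePhaseFailed, err
		}
		return n.Phase, nil
	default:
		return n.Phase, nil
	}
}

// handleDownstream is the "modified DFS": visit every child, recurse, and
// aggregate the children's phases into a single phase for this subtree.
func (e *NodeExecutor) handleDownstream(ctx context.Context, n *Node) (NodePhase, error) {
	allDone := true
	for _, child := range e.dag.Downstream(n.ID) {
		phase, err := e.RecursiveNodeHandler(ctx, child)
		if err != nil {
			return NodePhaseFailed, err
		}
		if phase != NodePhaseSucceeded {
			allDone = false
		}
	}
	if allDone {
		return NodePhaseSucceeded, nil
	}
	return NodePhaseRunning, nil
}
```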

The HandleNode function of the nodeExecutor is also a router function, where different processing functions are invoked based on the status of the current node. The most important ones are handleQueuedOrRunningNode and handleNotYetStartedNode (a simplified sketch follows the list):

  • In handleNotYetStartedNode, the most critical logic is the call to preExecute, where we check whether the node can be queued for further processing. The check is relatively simple: we verify whether the upstream nodes are all in the succeeded status or not
  • In handleQueuedOrRunningNode, we first check whether there is a cached result for the current handler, and trigger the execute function if there is no cache hit. The core part of the execute function is to trigger the Handle function of the input NodeHandler, which is obtained in RecursiveNodeHandler and passed down the stack here. What a long journey!
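
The sketch below captures these two branches in simplified form; the types and the cache check are stand-ins, not Flyte's real signatures:

```go
package sketch

import "context"

type NodePhase int

const (
	NodePhaseNotYetStarted NodePhase = iota
	NodePhaseQueued
	NodePhaseRunning
	NodePhaseSucceeded
)

type Node struct {
	ID       string
	Phase    NodePhase
	Upstream []*Node
}

type NodeHandler interface {
	Handle(ctx context.Context, n *Node) error
}

type nodeExecutor struct {
	cacheLookup func(n *Node) bool // stand-in for the catalog/cache check
}

// HandleNode routes on the node's own phase.
func (e *nodeExecutor) HandleNode(ctx context.Context, n *Node, h NodeHandler) error {
	switch n.Phase {
	case NodePhaseNotYetStarted:
		return e.handleNotYetStartedNode(ctx, n)
	case NodePhaseQueued, NodePhaseRunning:
		return e.handleQueuedOrRunningNode(ctx, n, h)
	default:
		return nil
	}
}

// handleNotYetStartedNode: the preExecute-style check, i.e. only queue the
// node once every upstream node has succeeded.
func (e *nodeExecutor) handleNotYetStartedNode(ctx context.Context, n *Node) error {
	for _, up := range n.Upstream {
		if up.Phase != NodePhaseSucceeded {
			return nil // an upstream dependency is not done yet, stay put
		}
	}
	n.Phase = NodePhaseQueued
	return nil
}

// handleQueuedOrRunningNode: skip execution on a cache hit, otherwise hand the
// node to the handler that was resolved in RecursiveNodeHandler.
func (e *nodeExecutor) handleQueuedOrRunningNode(ctx context.Context, n *Node, h NodeHandler) error {
	if e.cacheLookup != nil && e.cacheLookup(n) {
		n.Phase = NodePhaseSucceeded // cached result, no need to run anything
		return nil
	}
	return h.Handle(ctx, n)
}
```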

Now we have hit the lowest layer of FlytePropeller's architecture. Next, we need to dive into the NodeHandler to understand how the Handle function is implemented (here we will focus on how the handler fulfills the operations we need for distributed training).

NodeHandler

From the section above, we know that we retrieve the node handler from the nodeHandlerFactory in RecursiveNodeHandler, through the GetHandler function. Here is a step-by-step explanation of how the logic defined within plugins gets triggered (a simplified sketch of the final dispatch follows the list):

  • The GetHandler function returns a node handler based on the type of the node. Most training jobs are defined via @task, which is of the Task type in Flyte
  • Here is the setup of the node handler for the Task type. In Flyte, every Task is treated as a dynamic node and handled through the dynamic node handler. However, a task node handler is still passed into the dynamic node handler
  • In the dynamic node handler's Handle function, by default we call handleParentNode, and in this function we call the TaskNodeHandler interface's Handle function
  • The logic of the task node handler's Handle function is pretty complex. First, it tries to find the Plugin based on the task type; then, if there is no cache hit on the result, it invokes the plugin
  • In the WranglePluginsAndGenerateFinalList function, we get all plugins related to k8s through the PluginRegistryIface interface; in the task node handler, there is a data member pluginRegistry of this type, and its construction is here, where we call the PluginRegistry function from the pluginMachinery module
  • Each k8s plugin is wrapped within a PluginEntry, which is further wrapped into an object created by NewPluginManagerWithBackOff
  • All k8s plugins use the RegisterK8sPlugin function within the pluginmachinery.register module to register themselves into the system. For example, the PyTorch Plugin is registered here. However, none of these plugins actually provides a Handle function, which is supposed to be called by the node handler. What happened?
  • Actually, the Handle function is implemented within the PluginManager. Since the PluginManager implements all the interfaces defined in pluginCore.plugin, we can treat the PluginManager as a plugin to invoke (this is a classic Strategy design pattern, where the PluginManager defines the main logic expressed via several step functions, and composition is used to fulfill these step functions with different implementations)
  • Within the Handle function in the PluginManager, we check the current status; if the status is not started, we call launchResource, otherwise we call getResource and checkResourcePhase to obtain the new transition information
    • In the launchResource function, we call the BuildResource function defined in the plugin. This function constructs a kubeflow job. Then a create request is made via the kubeClient to create this resource
    • In checkResourcePhase, we call GetTaskPhase to get the current status of the job
    • This is the point where Flyte leverages kubeflow and k8s to request resources and start the training job; both kubeflow and k8s are huge topics, and I plan to discuss them in more detail in separate posts
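
To tie it together, here is a simplified sketch of the PluginManager dispatch described above. The interfaces are stand-ins for the real plugin contract (BuildResource and GetTaskPhase exist in Flyte's k8s plugin interface, but with richer signatures), and the kube client is a placeholder:

```go
package sketch

import "context"

// Phase is a simplified stand-in for the plugin phase reported back to the
// node executor.
type Phase int

const (
	PhaseNotStarted Phase = iota
	PhaseRunning
	PhaseSucceeded
)

// TaskSpec and Resource are placeholders; in Flyte these are the task template
// and a Kubernetes object (e.g. a kubeflow PyTorchJob).
type TaskSpec struct{ Type string }
type Resource struct{ Name string }

// K8sPlugin is a stripped-down version of the contract a k8s-backed plugin
// (such as the PyTorch plugin) fulfills: build the resource to create, and
// translate the live resource's state into a task phase.
type K8sPlugin interface {
	BuildResource(ctx context.Context, task TaskSpec) (Resource, error)
	GetTaskPhase(ctx context.Context, resource Resource) (Phase, error)
}

// KubeClient is a placeholder for the client used to create and get resources.
type KubeClient interface {
	Create(ctx context.Context, r Resource) error
	Get(ctx context.Context, name string) (Resource, error)
}

// PluginManager wraps a concrete plugin and supplies the generic Handle logic,
// so individual plugins never implement Handle themselves.
type PluginManager struct {
	plugin K8sPlugin
	client KubeClient
}

// Handle: if the task has not started, build and create the resource;
// otherwise fetch the resource and ask the plugin what phase it is in.
func (m *PluginManager) Handle(ctx context.Context, task TaskSpec, name string, current Phase) (Phase, error) {
	if current == PhaseNotStarted {
		return m.launchResource(ctx, task)
	}
	return m.checkResourcePhase(ctx, name)
}

func (m *PluginManager) launchResource(ctx context.Context, task TaskSpec) (Phase, error) {
	res, err := m.plugin.BuildResource(ctx, task) // e.g. build a PyTorchJob spec
	if err != nil {
		return PhaseNotStarted, err
	}
	if err := m.client.Create(ctx, res); err != nil {
		return PhaseNotStarted, err
	}
	return PhaseRunning, nil
}

func (m *PluginManager) checkResourcePhase(ctx context.Context, name string) (Phase, error) {
	res, err := m.client.Get(ctx, name)
	if err != nil {
		return PhaseRunning, err
	}
	return m.plugin.GetTaskPhase(ctx, res) // plugin interprets the job's status
}
```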

Here we reach the end of our journey; the remaining work is delegated to k8s. What a complex flow!

Summary

In this post, we focused our discussion on how Flyte invokes a distributed training job defined through a plugin. We saw some common practices adopted in the design, such as using a queue and multiple workers for scalability, separating the workflow executor from the node executor following the single responsibility principle, and a factory design for extensibility.

In the next post, we will focus on the storage used in Flyte, which is another critical component, as we need to store the status of workflows, nodes and even intermediate results, as well as leverage caching to speed up execution by avoiding duplicated computation. Once we have a better understanding of the storage part, we can start to evaluate the availability, scalability and persistence of Flyte.

