A complete implementation
This chapter covers
- Implementing the data ingestion component with TensorFlow
- Defining the machine learning models and submitting distributed model training jobs
- Implementing a single-instance model server as well as replicated model servers
- Building an efficient end-to-end workflow for our machine learning system
In the previous chapter of the book, we learned the basics of the four core technologies that we will use in our project: TensorFlow, Kubernetes, Kubeflow, and Argo Workflows. We learned that TensorFlow performs data processing, model building, and model evaluation. We also learned the basic concepts of Kubernetes and started our local Kubernetes cluster, which we will use as our core distributed infrastructure. In addition, we successfully submitted distributed model training jobs to the local Kubernetes cluster using Kubeflow. At the end of the last chapter, we learned how to use Argo Workflows to construct and submit a basic “hello world” workflow and a complex DAG-structured workflow.
In this chapter, we’ll implement the end-to-end machine learning system with the architecture we designed in chapter 7. We will implement each component completely, incorporating the patterns discussed previously. To build the different components of the distributed machine learning workflow, we’ll use the frameworks and technologies introduced in chapter 8: TensorFlow, Kubernetes, Kubeflow, Docker, and Argo Workflows.
9.1 Data ingestion
The first component in our end-to-end workflow is data ingestion. We’ll be using the Fashion-MNIST dataset introduced in section 2.2 to build the data ingestion component. Figure 9.1 shows this component in the dark box on the left of the end-to-end workflow.
Figure 9.1 The data ingestion component (dark box) in the end-to-end machine learning system. The workflow is triggered and first checks whether the data has been updated recently.
Recall that this dataset consists of a training set of 60,000 examples and a test set of 10,000 examples. Each example is a 28 × 28 grayscale image of a Zalando article, associated with a label from 10 classes. In addition, the Fashion-MNIST dataset is designed to serve as a direct drop-in replacement for the original MNIST dataset for benchmarking machine learning algorithms. It shares the same image size and structure of training and testing splits. Figure 9.2 is a screenshot of the collection of images for all 10 classes (T-shirt/top, trouser, pullover, dress, coat, sandal, shirt, sneaker, bag, and ankle boot) from Fashion-MNIST, where each class takes three rows in the screenshot.
Figure 9.3 is a closer look at the first few example images in the training set together with their corresponding labels in text above each of the images.
In section 9.1.1, we’ll go through the implementation of a single-node data pipeline that ingests the Fashion-MNIST dataset. Furthermore, section 9.1.2 will cover the implementation of the distributed data pipeline to prepare the data for our distributed model training in section 9.2.
Figure 9.2 A screenshot of the collection of images from the Fashion-MNIST dataset for all 10 classes (T-shirt/top, trouser, pullover, dress, coat, sandal, shirt, sneaker, bag, and ankle boot). Every three rows show example images from one class; for example, the top three rows are images of T-shirts.
Figure 9.3 A closer look at the first few example images in the training set with their corresponding labels in text
9.1.1 Single-node data pipeline
Let’s first take a look at how to build a single-node data pipeline that works locally on your laptop without using a local Kubernetes cluster. The best way for a machine learning program written in TensorFlow to consume data is through the methods in the tf.data module. The tf.data API allows users to build complex input pipelines easily. For example, the pipeline for an image model might aggregate data from files in various file systems, apply random transformations to each image, and create batches from the images for model training.
The tf.data API makes it possible to handle large amounts of data, read from different data formats, and perform complex transformations. It provides the tf.data.Dataset abstraction, which represents a sequence of elements in which each element consists of one or more components. Let’s use the image pipeline to illustrate this. An element in an image input pipeline might be a single training example, with a pair of tensor components representing the image and its label.
The following listing provides the code snippet to load the Fashion-MNIST dataset into a tf.data.Dataset object and perform the necessary preprocessing steps to prepare it for model training:
1. Scale the dataset from the range (0, 255] to (0., 1.].
2. Cast the image multidimensional arrays into the float32 type that our model can accept.
3. Select the training data, cache it in memory to speed up training, and shuffle it with a buffer size of 10,000.
Listing 9.1 Loading the Fashion-MNIST dataset
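Since the listing body isn’t reproduced here, the following is a minimal sketch of such a loading function, consistent with the description above; the function name and the with_info/as_supervised flags are assumptions.

```python
import tensorflow as tf
import tensorflow_datasets as tfds


def make_datasets_unbatched():
    # Scale the images from [0, 255] to [0, 1] and cast them to float32
    # so the model can consume them.
    def scale(image, label):
        return tf.cast(image, tf.float32) / 255.0, label

    datasets, _ = tfds.load(name="fashion_mnist", with_info=True,
                            as_supervised=True)
    # Select the training split, cache it in memory to speed up training,
    # and shuffle it with a buffer size of 10,000.
    return datasets["train"].map(scale).cache().shuffle(10000)
```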
Note that we imported the tensorflow_datasets module. TensorFlow Datasets is a collection of datasets for various tasks, such as image classification, object detection, and document summarization, and it can be used with TensorFlow and other Python machine learning frameworks.
The tf.data.Dataset object is a shuffled dataset where each element consists of the images and their labels with the shape and data type information as in the following listing.
Listing 9.2 Inspecting the tf.data object
9.1.2 Distributed data pipeline
Now let’s look at how we can consume our dataset in a distributed fashion. We’ll be using tf.distribute.MultiWorkerMirroredStrategy for distributed training in the next section. Let’s assume we have instantiated a strategy object. We will instantiate our dataset inside the strategy’s scope via Python’s with syntax using the same function we previously defined for the single-node use case.
We will need to tweak a few configurations to build our distributed input pipeline. First, we create repeated batches of data where the total batch size equals the batch size per replica times the number of replicas over which gradients are aggregated. This ensures that each model training worker has enough records for every batch. In other words, the number of replicas in sync equals the number of devices taking part in the gradient all-reduce operation during model training. For instance, when a user or the training code calls next() on the distributed data iterator, a per-replica batch size of data is returned on each replica. The rebatched dataset cardinality will always be a multiple of the number of replicas.
In addition, we want to configure tf.data to enable automatic data sharding. Since the dataset is in the distributed scope, the input dataset will be sharded automatically in multiworker training mode. More specifically, each dataset will be created on the CPU device of the worker, and each set of workers will train the model on a subset of the entire dataset when tf.data.experimental.AutoShardPolicy is set to AutoShardPolicy.DATA. One benefit is that during each model training step, a global batch size of non-overlapping dataset elements will be processed by each worker. Each worker will process the whole dataset and discard the portion that is not for itself. Note that for this mode to partition the dataset elements correctly, the dataset needs to produce elements in a deterministic order, which should already be guaranteed by the TensorFlow Datasets library we use.
Listing 9.3 Configuring distributed data pipeline
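A minimal sketch of such a configuration, assuming the strategy object mentioned above and the make_datasets_unbatched() helper from the previous sketch; the per-replica batch size of 64 is an assumption.

```python
import tensorflow as tf

# Hypothetical per-replica batch size; the global batch size is the
# per-replica batch size multiplied by the number of replicas in sync.
BATCH_SIZE_PER_REPLICA = 64
global_batch_size = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync

with strategy.scope():
    # Reuse the single-node dataset function and create repeated batches
    # sized for all replicas.
    dataset = make_datasets_unbatched().batch(global_batch_size).repeat()

    # Enable automatic data sharding so that each worker trains on a
    # non-overlapping subset of the dataset.
    options = tf.data.Options()
    options.experimental_distribute.auto_shard_policy = \
        tf.data.experimental.AutoShardPolicy.DATA
    dataset = dataset.with_options(options)
```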
9.2 Model training
We went through the implementation of the data ingestion component for both single-node and distributed data pipelines and discussed how we can shard the dataset properly across different workers so that it works with distributed model training. In this section, let’s dive into the implementation details for our model training component. An architecture diagram of the model training component can be found in figure 9.4.
Figure 9.4 A diagram of the model training component in the overall architecture. Three model training steps, which train three different models (CNN, CNN with dropout, and CNN with batch normalization) that compete with each other for better statistical performance, are followed by a model selection step that picks the top model to be used in the following two separate model serving steps.
We will learn how to define those three models with TensorFlow in section 9.2.1 and execute the distributed model training jobs with Kubeflow in section 9.2.2. In section 9.2.3, we will implement the model selection step that picks the top model that will be used in the model serving component in our end-to-end machine learning workflow.
9.2.1 Model definition and single-node training
Next, we’ll look at the TensorFlow code to define and initialize the first model, the convolutional neural network (CNN) model with three convolutional layers that we introduced in previous chapters. We initialize the model with Sequential(), meaning we’ll add the layers sequentially. The first layer is the input layer, where we specify the shape of the input data from the pipeline we defined previously. Note that we also explicitly give a name to the input layer so we can pass the correct key in our inference inputs, which we will discuss in more depth in section 9.3.
After adding the input layer, three convolutional layers, followed by max-pooling layers and dense layers, are added to the sequential model. We’ll then print out a summary of the model architecture and compile the model with Adam as its optimizer, accuracy as the metric we use to evaluate the model, and sparse categorical crossentropy as the loss function.
Listing 9.4 Defining the basic CNN model
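The listing body isn’t shown here; the sketch below follows the description above. The exact filter and unit counts are assumptions, although they are consistent with the roughly 93,000 trainable parameters mentioned later, and the input-layer name image_bytes anticipates the serving key discussed in section 9.3.

```python
import tensorflow as tf
from tensorflow.keras import layers, models


def build_and_compile_cnn_model():
    print("Training CNN model")
    model = models.Sequential()
    # Name the input layer so inference requests can reference the
    # correct key (see section 9.3).
    model.add(layers.Input(shape=(28, 28, 1), name="image_bytes"))
    model.add(layers.Conv2D(32, (3, 3), activation="relu"))
    model.add(layers.MaxPooling2D((2, 2)))
    model.add(layers.Conv2D(64, (3, 3), activation="relu"))
    model.add(layers.MaxPooling2D((2, 2)))
    model.add(layers.Conv2D(64, (3, 3), activation="relu"))
    model.add(layers.Flatten())
    model.add(layers.Dense(64, activation="relu"))
    model.add(layers.Dense(10, activation="softmax"))

    model.summary()
    model.compile(optimizer="adam",
                  loss="sparse_categorical_crossentropy",
                  metrics=["accuracy"])
    return model
```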
We’ve successfully defined our basic CNN model. Next, we define two models based on the CNN model. One adds a batch normalization layer to force the pre-activations to have zero mean and unit standard deviation for every neuron (activation) in a particular layer. The other model has an additional dropout layer where half of the hidden units will be dropped randomly to reduce the complexity of the model and speed up computation. The rest of the code is the same as the basic CNN model.
Listing 9.5 Defining the variations of the basic CNN model
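A sketch of the two variations, assuming the same layer stack as the basic model; the exact placement of the BatchNormalization and Dropout layers is an assumption.

```python
def build_and_compile_cnn_model_with_batch_norm():
    print("Training CNN model with batch normalization")
    model = models.Sequential()
    model.add(layers.Input(shape=(28, 28, 1), name="image_bytes"))
    model.add(layers.Conv2D(32, (3, 3), activation="relu"))
    # Normalize activations to zero mean and unit standard deviation.
    model.add(layers.BatchNormalization())
    model.add(layers.MaxPooling2D((2, 2)))
    model.add(layers.Conv2D(64, (3, 3), activation="relu"))
    model.add(layers.MaxPooling2D((2, 2)))
    model.add(layers.Conv2D(64, (3, 3), activation="relu"))
    model.add(layers.Flatten())
    model.add(layers.Dense(64, activation="relu"))
    model.add(layers.Dense(10, activation="softmax"))
    model.summary()
    model.compile(optimizer="adam",
                  loss="sparse_categorical_crossentropy",
                  metrics=["accuracy"])
    return model


def build_and_compile_cnn_model_with_dropout():
    print("Training CNN model with dropout")
    model = models.Sequential()
    model.add(layers.Input(shape=(28, 28, 1), name="image_bytes"))
    model.add(layers.Conv2D(32, (3, 3), activation="relu"))
    model.add(layers.MaxPooling2D((2, 2)))
    model.add(layers.Conv2D(64, (3, 3), activation="relu"))
    model.add(layers.MaxPooling2D((2, 2)))
    # Randomly drop half of the hidden units to reduce model complexity.
    model.add(layers.Dropout(0.5))
    model.add(layers.Conv2D(64, (3, 3), activation="relu"))
    model.add(layers.Flatten())
    model.add(layers.Dense(64, activation="relu"))
    model.add(layers.Dense(10, activation="softmax"))
    model.summary()
    model.compile(optimizer="adam",
                  loss="sparse_categorical_crossentropy",
                  metrics=["accuracy"])
    return model
```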
Once the models are defined, we can train them locally on our laptops. Let’s use the basic CNN model as an example. We will create four callbacks that will be executed during model training:
1. PrintLR—Callback to print the learning rate at the end of each epoch
2. TensorBoard—Callback to start the interactive TensorBoard visualization to monitor the training progress and model architecture
3. ModelCheckpoint—Callback to save model weights for model inference later
4. LearningRateScheduler—Callback to decay the learning rate at the end of each epoch
Once these callbacks are defined, we’ll pass them to the fit() method for training. The fit() method trains the model for a specified number of epochs and steps per epoch. Note that the numbers here are for demonstration purposes only, to speed up our local experiments, and may not produce a model of sufficient quality for real-world applications.
Listing 9.6 Model training with callbacks
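A sketch of the four callbacks and the fit() call, assuming the dataset and model-building function from the earlier sketches; the checkpoint directory, decay schedule, and epoch/step counts are assumptions (and optimizer.lr may be named learning_rate in newer Keras versions).

```python
import tensorflow as tf

checkpoint_dir = "./training_checkpoints"          # assumed local path
checkpoint_prefix = checkpoint_dir + "/ckpt_{epoch}"


def decay(epoch):
    # A simple schedule that decays the learning rate as training progresses.
    if epoch < 3:
        return 1e-3
    elif epoch < 7:
        return 1e-4
    return 1e-5


class PrintLR(tf.keras.callbacks.Callback):
    # Print the learning rate at the end of each epoch.
    def on_epoch_end(self, epoch, logs=None):
        print("\nLearning rate for epoch {} is {}".format(
            epoch + 1, self.model.optimizer.lr.numpy()))


callbacks = [
    tf.keras.callbacks.TensorBoard(log_dir="./logs"),
    tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_prefix,
                                       save_weights_only=True),
    tf.keras.callbacks.LearningRateScheduler(decay),
    PrintLR(),
]

model = build_and_compile_cnn_model()
# Small epoch/step counts for quick local experiments only.
model.fit(dataset, epochs=3, steps_per_epoch=70, callbacks=callbacks)
```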
We’ll see the model training progress like the following in the logs:
Based on this summary, 93,000 parameters will be trained during the process. The shape and the number of parameters in each layer can also be found in the summary.
9.2.2 Distributed model training
Now that we’ve defined our models and can train them locally on a single machine, the next step is to insert the distributed training logic into the code so that we can run model training with multiple workers using the collective communication pattern that we introduced in the book. We’ll use the tf.distribute module, which contains MultiWorkerMirroredStrategy. It’s a distribution strategy for synchronous training on multiple workers. It creates copies of all variables in the model’s layers on each device across all workers and uses a distributed collective implementation (e.g., all-reduce) so multiple workers can work together to speed up training. If your machines don’t have appropriate GPUs, you can switch communication_options to a different implementation. Since we want to ensure the distributed training can run on machines that might not have GPUs, we’ll use CollectiveCommunication.AUTO so that the best available implementation is picked automatically.
Once we define our distributed training strategy, we’ll initiate our distributed input data pipeline (as mentioned previously in section 9.1.2) and the model inside the strategy scope. Note that defining the model inside the strategy scope is required so that TensorFlow knows how to replicate the variables in the model’s layers to each worker according to the strategy. Here we define different model types (basic CNN, CNN with dropout, and CNN with batch normalization) based on the command-line arguments we pass to this Python script. We’ll get to the rest of the flags soon. Once the data pipeline and the model are defined inside the scope, we can use fit() to train the model outside the distribution strategy scope.
Listing 9.7 Distributed model training logic
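A sketch of that logic, assuming args, callbacks, and the model-building functions are defined elsewhere in the script; build_distributed_dataset() is a hypothetical helper wrapping the input pipeline from listing 9.3.

```python
import tensorflow as tf

# AUTO lets TensorFlow pick the best collective implementation
# available on the hardware (GPU or CPU).
strategy = tf.distribute.MultiWorkerMirroredStrategy(
    communication_options=tf.distribute.experimental.CommunicationOptions(
        implementation=tf.distribute.experimental.CollectiveCommunication.AUTO))

with strategy.scope():
    # Build the distributed input pipeline (listing 9.3) and the model
    # inside the strategy scope so variables are mirrored to all workers.
    dataset = build_distributed_dataset(strategy)   # hypothetical helper
    if args.model_type == "cnn":
        multi_worker_model = build_and_compile_cnn_model()
    elif args.model_type == "dropout":
        multi_worker_model = build_and_compile_cnn_model_with_dropout()
    else:
        multi_worker_model = build_and_compile_cnn_model_with_batch_norm()

# fit() itself is called outside the strategy scope.
multi_worker_model.fit(dataset, epochs=3, steps_per_epoch=70,
                       callbacks=callbacks)
```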
Once the model training is finished via the fit() function, we want to save the model. One common mistake users make is saving the model on all the workers, which may not save the completed model correctly and wastes computational resources and storage. The correct way to fix this problem is to save the model only on the chief worker. We can inspect the environment variable TF_CONFIG, which contains the cluster information, such as the task type and index, to see whether a worker is the chief. We also want to save the model to a unique path on each worker to avoid unexpected errors.
Listing 9.8 Saving a model with a chief worker
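A sketch of this pattern, assuming a TFJob with only workers, where worker 0 acts as the chief; the temporary path naming is an assumption.

```python
import json
import os


def is_chief(task_type, task_id):
    # With a TFJob that only has workers, worker 0 acts as the chief.
    return task_type is None or (task_type == "worker" and task_id == 0)


# TF_CONFIG holds the cluster spec plus this worker's task type and index.
tf_config = json.loads(os.environ.get("TF_CONFIG", "{}"))
task = tf_config.get("task", {})
task_type, task_id = task.get("type"), task.get("index", 0)

# Every worker writes to a unique path, but only the chief's copy is the
# one consumed by the model serving component.
if is_chief(task_type, task_id):
    model_path = args.saved_model_dir
else:
    model_path = "{}/worker_tmp_{}".format(args.saved_model_dir, task_id)

multi_worker_model.save(model_path)
```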
So far, we’ve seen two command-line flags already—namely, saved_model_dir and model_type. Listing 9.9 provides the rest of the main function that parses those command-line arguments. In addition to those two arguments, there’s a checkpoint_dir argument that we will use to save our model in the TensorFlow SavedModel format, which can easily be consumed by our model serving component. We will discuss that in detail in section 9.3. We also disable the progress bar for the TensorFlow Datasets module to reduce the logs we will see.
Listing 9.9 Entry point main function
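A sketch of such an entry point; the main() wrapper around the training logic and the default flag values are assumptions.

```python
import argparse

import tensorflow_datasets as tfds

if __name__ == "__main__":
    # Reduce log noise from the TensorFlow Datasets download progress bar.
    tfds.disable_progress_bar()

    parser = argparse.ArgumentParser()
    parser.add_argument("--saved_model_dir", type=str, required=True,
                        help="Directory for the exported SavedModel")
    parser.add_argument("--checkpoint_dir", type=str, required=True,
                        help="Directory for model checkpoints")
    parser.add_argument("--model_type", type=str, default="cnn",
                        help="One of: cnn, dropout, batch_norm")
    args = parser.parse_args()

    main(args)   # hypothetical wrapper around the training logic above
```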
We’ve just finished writing our Python script that contains the distributed model training logic. Let’s containerize it and build the image used to run distributed training in our local Kubernetes cluster. In our Dockerfile, we’ll use the Python 3.9 base image, install the TensorFlow and TensorFlow Datasets modules via pip, and copy our multi-worker distributed training Python script into the image.
Listing 9.10 Containerization
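A minimal sketch of such a Dockerfile; the script filename is an assumption, and version pinning is left out.

```dockerfile
FROM python:3.9
RUN pip install tensorflow tensorflow_datasets
# The script name below is an assumption.
COPY multi-worker-distributed-training.py /
```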
We then build the image from the Dockerfile we just defined. We also need to import the image into the k3d cluster, since our cluster does not have access to our local image registry. We then set the current namespace to “kubeflow”. Please read chapter 8 and follow the instructions to install the required components we need for this project.
Listing 9.11 Building and importing the docker image
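A sketch of those commands; the image tag and cluster name are assumptions.

```bash
# Build the training image (name/tag are assumptions).
docker build -f Dockerfile -t kubeflow/multi-worker-strategy:v0.1 .

# Import the image into the k3d cluster so Pods can use it
# (replace "distml" with your cluster name).
k3d image import kubeflow/multi-worker-strategy:v0.1 --cluster distml

# Work in the kubeflow namespace from now on.
kubectl config set-context --current --namespace=kubeflow
```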
Once the worker Pods are completed, all files in those Pods will be recycled. Since we are running distributed model training across multiple workers in Kubernetes Pods, all the model checkpoints will be lost, and we won’t have a trained model for model serving. To address this problem, we’ll use PersistentVolume (PV) and PersistentVolumeClaim (PVC).
A PV is a piece of storage in the cluster that has been provisioned by an administrator or provisioned dynamically. It is a resource in the cluster, just like a node is a cluster resource. PVs are volume plugins like Volumes but have a life cycle independent of any individual Pod that uses them. In other words, PVs will persist even after the Pods that use them are completed or deleted.
A PVC is a request for storage by a user. It is similar to a Pod: Pods consume node resources, and PVCs consume PV resources. Pods can request specific levels of resources (CPU and memory), while claims can request a specific size and access modes (e.g., they can be mounted ReadWriteOnce, ReadOnlyMany, or ReadWriteMany).
Let’s create a PVC to submit a request for storage that will be used in our worker Pods to store the trained model. Here we only submit a request for 1 Gi storage with ReadWriteOnce access mode.
Listing 9.12 Persistent volume claim
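A sketch of such a claim; the PVC name is an assumption and is reused in later manifests.

```yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: strategy-volume        # name is an assumption
spec:
  accessModes: [ "ReadWriteOnce" ]
  resources:
    requests:
      storage: 1Gi
```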
Next, we’ll create the PVC.
Listing 9.13 Creating the PVC
Next, let’s define the TFJob spec we introduced in chapter 7 with the image we just built that contains the distributed training script. We pass the necessary command arguments to the container to train the basic CNN model. The volumes field in the Worker spec specifies the name of the persistent volume claim that we just created, and the volumeMounts field in the containers spec specifies where the volume is mounted inside the container. The model will be saved in the /trained_model folder inside the volume.
Listing 9.14 Distributed model training job definition
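A sketch of such a TFJob; the job name, replica count, image tag, and model paths are assumptions, and the container must be named tensorflow for the training operator to recognize it.

```yaml
apiVersion: kubeflow.org/v1
kind: TFJob
metadata:
  name: multi-worker-training           # name is an assumption
spec:
  tfReplicaSpecs:
    Worker:
      replicas: 2
      restartPolicy: Never
      template:
        spec:
          containers:
          - name: tensorflow            # TFJob expects this container name
            image: kubeflow/multi-worker-strategy:v0.1
            imagePullPolicy: IfNotPresent
            command: ["python", "/multi-worker-distributed-training.py",
                      "--saved_model_dir", "/trained_model/saved_model_versions/1/",
                      "--checkpoint_dir", "/trained_model/checkpoint",
                      "--model_type", "cnn"]
            volumeMounts:
            - name: training
              mountPath: /trained_model
          volumes:
          - name: training
            persistentVolumeClaim:
              claimName: strategy-volume
```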
Then we can submit this TFJob to our cluster to start our distributed model training.
Listing 9.15 Submitting TFJob
Once the worker Pods are completed, we’ll notice the following logs from the Pods that indicate we trained the model in a distributed fashion and the workers communicated with each other successfully:
9.2.3 Model selection
So far, we’ve implemented our distributed model training component. We’ll eventually train three different models, as mentioned in section 9.2.1, and then pick the top model for model serving. Let’s assume that we have trained those models successfully by submitting three different TFJobs with different model types.
Next, we write the Python code that loads the testing data and the trained models and then evaluates their performance. We load each trained model from its folder with the keras.models.load_model() function and execute model.evaluate(), which returns the loss and accuracy. Once we find the model with the highest accuracy, we copy it to a new version in a different folder—namely, 4—which will be used by our model serving component.
Listing 9.16 Model evaluation
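A sketch of that evaluation script, assuming the models were saved under /trained_model/saved_model_versions/1 through 3 on the mounted volume; the paths and batch size are assumptions.

```python
import shutil

import tensorflow as tf
import tensorflow_datasets as tfds
from tensorflow import keras


def scale(image, label):
    # Same preprocessing as during training.
    return tf.cast(image, tf.float32) / 255.0, label


datasets, _ = tfds.load(name="fashion_mnist", with_info=True,
                        as_supervised=True)
test_dataset = datasets["test"].map(scale).batch(64)

best_model_path = ""
best_accuracy = 0.0

# Evaluate each trained model variant and keep the most accurate one.
for i in range(1, 4):
    model_path = "/trained_model/saved_model_versions/" + str(i)
    model = keras.models.load_model(model_path)
    _, accuracy = model.evaluate(test_dataset)
    if accuracy > best_accuracy:
        best_accuracy = accuracy
        best_model_path = model_path

# Copy the best model to version 4, which the serving component picks up.
destination = "/trained_model/saved_model_versions/4"
shutil.copytree(best_model_path, destination)
print("Best model (accuracy {}) copied to {}".format(best_accuracy, destination))
```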
Note that the latest version, 4, in the trained_model/saved_model_versions folder will be picked up by our serving component. We will talk about that in the next section.
We then add this Python script to our Dockerfile, rebuild the container image, and create a Pod that runs the model selection component. The following is the YAML file that configures the model selection Pod.
Listing 9.17 Model selection Pod definition
When inspecting the logs, we see the third model has the highest accuracy, so we will copy it to a new version to be used for the model serving component:
9.3 Model serving
Now that we have implemented distributed model training as well as model selection among the trained models, the next component to implement is model serving. The model serving component is essential to the end-user experience since its results are shown to our users directly, and if it’s not performant enough, our users will notice immediately. Figure 9.5 shows the model serving component in the overall architecture.
Figure 9.5 The model serving component (dark boxes) in the end-to-end machine learning system. The results from the two model serving steps are aggregated via a result aggregation step and presented to users.
In figure 9.5, the model serving components are shown as the two dark boxes between the model selection and result aggregation steps. Let’s first implement our single-server model inference component in section 9.3.1 and then make it more scalable and performant in section 9.3.2.
9.3.1 Single-server model inference
The model inference Python code is very similar to the model evaluation code. The only difference is that we use the model.predict() method instead of evaluate() after we load the trained model. This is an excellent way to test whether the trained model can make predictions as expected.
Listing 9.18 Model prediction
Alternatively, you can start a TensorFlow Serving (https://github.com/tensorflow/serving) server locally like in the following listing once it’s installed.
Listing 9.19 TensorFlow Serving command
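A sketch of such a command, assuming the TensorFlow Serving binary is installed locally; the port, model name, and base path are assumptions.

```bash
tensorflow_model_server \
  --rest_api_port=8501 \
  --model_name=fashion-mnist \
  --model_base_path=/trained_model/saved_model_versions
```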
This seems straightforward and works well if we are only experimenting locally. However, there are more performant ways to build our model serving component that will pave our path to running distributed model serving that incorporates the replicated model server pattern that we introduced in previous chapters.
Before we dive into a better solution, let’s make sure our trained model can work with our prediction inputs, which will be a JSON-structured list of image bytes with the key “instances” and “image_bytes”, like the following:
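A hedged example of what such a payload might look like; TensorFlow Serving’s REST API represents binary strings as base64 under a "b64" key, and the actual image bytes are omitted here.

```json
{
  "instances": [
    {
      "image_bytes": {
        "b64": "<base64-encoded image bytes>"
      }
    }
  ]
}
```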
Now is the time to modify our distributed model training code to make sure the model has the correct serving signature that’s compatible with our supplied inputs. We define a preprocessing function that does the following:
1. Decodes the images from bytes
2. Resizes the images to 28 × 28, which is compatible with our model architecture
3. Casts the images to tf.uint8
4. Defines the input signature with string type and the key image_bytes
Once the preprocessing function is defined, we can define the serving signature via tf.TensorSpec() and then pass it to tf.saved_model.save() method to save the model that is compatible with our input format and preprocess it before TensorFlow Serving makes inference calls.
Listing 9.20 Model serving signature definitions
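A sketch of those definitions, assuming multi_worker_model and model_path are defined as in the earlier training sketches; the extra scaling to [0, 1] inside the serving function is an assumption intended to match the training-time preprocessing.

```python
import tensorflow as tf


def _preprocess(bytes_input):
    # Decode the raw image bytes, resize to 28 x 28, and cast to uint8.
    decoded = tf.io.decode_jpeg(bytes_input, channels=1)
    resized = tf.image.resize(decoded, size=(28, 28))
    return tf.cast(resized, dtype=tf.uint8)


def _get_serve_image_fn(model):
    # Accept a batch of serialized image bytes under the key "image_bytes".
    @tf.function(input_signature=[
        tf.TensorSpec(shape=[None], dtype=tf.string, name="image_bytes")])
    def serve_image_fn(bytes_inputs):
        decoded_images = tf.map_fn(_preprocess, bytes_inputs,
                                   fn_output_signature=tf.uint8)
        # Scale to [0, 1] to match the training-time preprocessing (assumption).
        images = tf.cast(decoded_images, tf.float32) / 255.0
        return model(images)

    return serve_image_fn


signatures = {
    "serving_default":
        _get_serve_image_fn(multi_worker_model).get_concrete_function()
}
tf.saved_model.save(multi_worker_model, model_path, signatures=signatures)
```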
Once the distributed model training script is modified, we can rebuild our container image and retrain our model from scratch, following the instructions in section 9.2.2.
Next, we will use KServe, as we mentioned in the technologies overview, to create an inference service. Listing 9.21 provides the YAML to define the KServe inference service. We need to specify the model format so that KServe knows what to use for serving the model (e.g., TensorFlow Serving). In addition, we need to supply the URI to the trained model. In this case, we can specify the PVC name and the path to the trained model, following the format pvc://<pvc-name>/<path-to-model>.
Listing 9.21 Inference service definition
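A sketch of such an InferenceService; the service name and the path within the PVC are assumptions that follow the earlier sketches.

```yaml
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: tf-mnist                      # name is an assumption
spec:
  predictor:
    model:
      modelFormat:
        name: tensorflow
      storageUri: "pvc://strategy-volume/saved_model_versions"
```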
Let’s install KServe and create our inference service!
Listing 9.22 Installing KServe and creating the inference service
We can check its status to make sure it’s ready for serving.
Listing 9.23 Getting the details of the inference service
Once the service is created, we port-forward it to our local machine so that we can send requests to it locally.
Listing 9.24 Port-forwarding the inference service
You should be able to see the following if the port-forwarding is successful:
Let’s open another terminal and execute the following Python script to send a sample inference request to our model serving service and print out the response text.
Listing 9.25 Using Python to send an inference request
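A sketch of such a request; the local port matches the port-forwarding step above, while the input file name and the Host header value (normally taken from the InferenceService status URL) are assumptions.

```python
import requests

input_path = "./inference-input.json"   # the JSON payload shown earlier

with open(input_path) as f:
    data = f.read()

response = requests.post(
    "http://localhost:8080/v1/models/tf-mnist:predict",
    headers={"Host": "tf-mnist.kubeflow.example.com"},  # assumed hostname
    data=data,
)
print(response.text)
```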
The response from our KServe model serving service, which includes the predicted probabilities for each class in the Fashion-MNIST dataset, is as follows:
Alternatively, we can use curl to send requests.
Listing 9.26 Using curl to send an inference request
The output probabilities should be the same as the ones we just saw:
As mentioned previously, even though we specified the entire directory that contains the trained model in the KServe InferenceService spec, the model serving service, which utilizes TensorFlow Serving, will pick the latest version, 4, from that folder, which is the best model we selected in section 9.2.3. We can observe that from the logs of the serving Pod.
Listing 9.27 Inspecting the model server logs
Here are the logs:
9.3.2 Replicated model servers
In the previous section, we successfully deployed our model serving service in our local Kubernetes cluster. This might be sufficient for running local serving experiments, but it’s far from ideal for production systems that serve real-world model serving traffic. The current model serving service is a single Kubernetes Pod, whose allocated computational resources are limited and requested in advance. When the number of model serving requests increases, the single-instance model server can no longer support the workload and may run out of computational resources.
To address this problem, we need multiple instances of model servers to handle a larger amount of dynamic model serving requests. Fortunately, KServe can autoscale based on the average number of in-flight requests per Pod, using the Knative Serving autoscaler.
The following listing provides the inference service spec with autoscaling enabled. The scaleTarget field specifies the integer target value of the metric type the autoscaler watches for. In addition, the scaleMetric field defines the scaling metric type watched by the autoscaler. The possible metrics are concurrency, RPS, CPU, and memory. Here we only allow one concurrent request to be processed by each inference service instance. In other words, when there are more requests, we will start a new inference service Pod to handle each additional request.
Listing 9.28 Replicated model inference services
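A sketch of the autoscaling-enabled spec, reusing the assumed service name and storage URI from the earlier sketch.

```yaml
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: tf-mnist
spec:
  predictor:
    # Scale out whenever a Pod has more than one in-flight request.
    scaleTarget: 1
    scaleMetric: concurrency
    model:
      modelFormat:
        name: tensorflow
      storageUri: "pvc://strategy-volume/saved_model_versions"
```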
When there are no requests, we should see only one inference service Pod up and running. Next, let’s send traffic for 30 seconds, maintaining five in-flight requests. We use the same service hostname and ingress address, as well as the same inference input and trained model. Note that we are using hey, a tiny program that sends some load to a web application. Follow the instructions at https://github.com/rakyll/hey to install it before executing the following command.
Listing 9.29 Sending traffic to test the load
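A sketch of such a load test; the environment variables are obtained as in the earlier port-forwarding/curl steps, and the input file name is an assumption.

```bash
# -z: duration, -c: concurrent workers, -D: request body file.
hey -z 30s -c 5 -m POST \
  -host ${SERVICE_HOSTNAME} \
  -D ./inference-input.json \
  http://${INGRESS_HOST}:${INGRESS_PORT}/v1/models/tf-mnist:predict
```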
The following is the expected output from the command, which includes a summary of how the inference service handled the requests. For example, the service processed 230,160 bytes of inference inputs at 95.7483 requests per second. You can also find a response-time histogram and a latency distribution that might be useful:
As expected, we see five running inference service Pods processing the requests concurrently, where each Pod handles only one request.
Listing 9.30 Getting the list of model server Pods
Once the hey command is completed, we will only see one running Pod.
Listing 9.31 Getting the list of model server Pods again
9.4 The end-to-end workflow
We have implemented all the components in the previous sections. Now it’s time to put things together! In this section, we’ll define an end-to-end workflow using Argo Workflows that includes the components we just implemented. Please revisit the previous sections if you are still unfamiliar with any of the components, and refresh your knowledge of basic Argo Workflows from chapter 8.
Here’s a recap of what the end-to-end workflow we will implement looks like. Figure 9.6 is a diagram of the end-to-end workflow that we are building. The diagram includes two model serving steps for illustration purposes, but we will only implement one step in our Argo workflow. It will autoscale to more instances based on request traffic, as mentioned in section 9.3.2.
In the next sections, we will define the entire workflow by connecting the steps sequentially with Argo and then optimize the workflow for future executions by implementing step memoization.
Figure 9.6 An architecture diagram of the end-to-end machine learning system we are building. The workflow is triggered and checks whether the data has been updated recently; three model training steps train different models; a model selection step picks the top model to be used in the two model serving steps; and the results from the model serving steps are aggregated via a result aggregation step to present to users.
9.4.1 Sequential steps
First, let’s look at the entry point templates and the main steps involved in the workflow. The entry point template name is tfjob-wf, which consists of the following steps (for simplicity, each step uses a template with the same name):
1. data-ingestion-step contains the data ingestion step, which we will use to download and preprocess the dataset before model training.
2. distributed-tf-training-steps is a step group that consists of multiple substeps, where each substep represents a distributed model training step for a specific model type.
3. model-selection-step is a step that selects the top model from among the different models we have trained in previous steps.
4. create-model-serving-service creates the model serving service via KServe.
Listing 9.32 Workflow entry point templates
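A sketch of the workflow skeleton described above; the workflow name prefix and volume name are assumptions, and the individual step templates are sketched in the following listings.

```yaml
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: tfjob-wf-
  namespace: kubeflow
spec:
  entrypoint: tfjob-wf
  # Delete Pods once they succeed to free up cluster resources.
  podGC:
    strategy: OnPodSuccess
  volumes:
  - name: model
    persistentVolumeClaim:
      claimName: strategy-volume
  templates:
  - name: tfjob-wf
    steps:
    - - name: data-ingestion-step
        template: data-ingestion-step
    - - name: distributed-tf-training-steps
        template: distributed-tf-training-steps
    - - name: model-selection-step
        template: model-selection-step
    - - name: create-model-serving-service
        template: create-model-serving-service
```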
Note that we specify the podGC strategy as OnPodSuccess: since we’ll be creating a lot of Pods for different steps in our local k3s cluster with limited computational resources, deleting Pods right after they succeed frees up resources for the subsequent steps. The OnPodCompletion strategy is also available; it deletes Pods on completion regardless of whether they failed or succeeded. We won’t use that since we want to keep failed Pods to debug what went wrong.
In addition, we also specify our volumes and PVC to ensure we can persist any files that will be used in the steps. We can save the downloaded dataset into the persistent volume for model training and then persist the trained model for the subsequent model serving step.
The first step, the data ingestion step, is very straightforward. It only specifies the container image and the data ingestion Python script to execute. The Python script is a one-liner that calls tfds.load(name='fashion_mnist') to download the dataset to the container’s local storage, which will be mounted to our persistent volume.
Listing 9.33 Data ingestion step
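A sketch of that template, which slots into the templates list of the workflow sketched after listing 9.32; the image tag and mount path are assumptions, and for the data to persist across runs the download path must live on the mounted volume.

```yaml
- name: data-ingestion-step
  container:
    image: kubeflow/multi-worker-strategy:v0.1
    imagePullPolicy: IfNotPresent
    # One-liner that downloads the Fashion-MNIST dataset.
    command: ["python", "-c",
              "import tensorflow_datasets as tfds; tfds.load(name='fashion_mnist')"]
    volumeMounts:
    - name: model
      mountPath: /trained_model
```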
The next step is a step group that consists of multiple substeps, where each substep represents a distributed model training step for a specific model type (e.g., basic CNN, CNN with dropout, and CNN with batch norm). The following listing provides the template that defines all the substeps. Because the distributed training steps for the different models belong to the same step group, they will be executed in parallel.
Listing 9.34 Distributed training step groups
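A sketch of that step group; the substep and template names are assumptions.

```yaml
- name: distributed-tf-training-steps
  steps:
  # Substeps in the same group run in parallel.
  - - name: cnn-model-training
      template: cnn-model-training
    - name: cnn-model-with-dropout-training
      template: cnn-model-with-dropout-training
    - name: cnn-model-with-batch-norm-training
      template: cnn-model-with-batch-norm-training
```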
Let’s use the first substep, which runs distributed model training for the basic CNN model, as an example. The main content of this step template is the resource field, which includes the following:
- The custom resource definition (CRD) or manifest to take action upon. In our case, we create a TFJob as part of this step.
- The conditions that indicate whether the CRD was created successfully. In our case, we ask Argo to watch the fields status.replicaStatuses.Worker.succeeded and status.replicaStatuses.Worker.failed.

Inside the container spec in the TFJob definition, we specify the model type and save the trained model to a different folder so it’s easy to pick and save the best model for model serving in subsequent steps. We also want to make sure to attach the persistent volumes so the trained model can be persisted.
Listing 9.35 CNN model training step
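A sketch of that resource template, again slotting into the workflow's templates list; the template name, success/failure condition values, and paths are assumptions consistent with the TFJob sketched earlier.

```yaml
- name: cnn-model-training
  resource:
    action: create
    # Argo watches these fields to decide whether the step succeeded.
    successCondition: status.replicaStatuses.Worker.succeeded = 2
    failureCondition: status.replicaStatuses.Worker.failed > 0
    manifest: |
      apiVersion: kubeflow.org/v1
      kind: TFJob
      metadata:
        generateName: multi-worker-training-
      spec:
        tfReplicaSpecs:
          Worker:
            replicas: 2
            restartPolicy: Never
            template:
              spec:
                containers:
                - name: tensorflow
                  image: kubeflow/multi-worker-strategy:v0.1
                  imagePullPolicy: IfNotPresent
                  command: ["python", "/multi-worker-distributed-training.py",
                            "--saved_model_dir", "/trained_model/saved_model_versions/1/",
                            "--checkpoint_dir", "/trained_model/checkpoint",
                            "--model_type", "cnn"]
                  volumeMounts:
                  - name: training
                    mountPath: /trained_model
                volumes:
                - name: training
                  persistentVolumeClaim:
                    claimName: strategy-volume
```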
For the rest of the substeps in distributed-tf-training-steps, the spec is very similar, except the saved model directory and model type arguments are different. The next step is model selection, for which we will supply the same container image but execute the model selection Python script we implemented earlier.
Listing 9.36 Model selection step
Make sure these additional scripts are included in your Dockerfile and that you have rebuilt the image and re-imported it to your local Kubernetes cluster.
Once the model selection step is implemented, the last step in the workflow is the model serving step that starts a KServe model inference service. It’s a resource template similar to the model training steps but with KServe’s InferenceService CRD and a success condition that applies to this specific CRD.
Listing 9.37 The model serving step
Let’s submit this workflow now!
Listing 9.38 Submitting the end-to-end workflow
Once the data ingestion step is completed, the associated Pod will be deleted. When we list the Pods again while it’s executing the distributed model training steps, we’ll see the Pods with names prefixed by tfjob-wf-f4bql-cnn-model-, which are the Pods responsible for monitoring the status of distributed model training for different model types. In addition, each model training for each model type contains two workers with the name matching the pattern multi-worker-training--worker-.
Listing 9.39 Getting the list of Pods
Once the remaining steps are completed, and the model serving has started successfully, the workflow should have a Succeeded status. We’ve just finished the execution of the end-to-end workflow.
9.4.2 Step memoization
To speed up future executions of workflows, we can utilize caching and skip certain steps that have run recently. In our case, the data ingestion step can be skipped since we don’t have to download the same dataset again and again.
Let’s first take a look at the logs from our data ingestion step:
The dataset has been downloaded to a path in the container. If the path is mounted to our persistent volume, it will be available for any future workflow runs. Let’s use the step memoization feature provided by Argo Workflows to optimize our workflow.
Inside the step template, we supply the memoize field with the cache key and the maximum age of the cache. When a step is completed, a cache entry is saved. When this step runs again in a new workflow, it checks whether the cache was created within the past hour. If so, the step is skipped, and the workflow proceeds to execute the subsequent steps. For our application, the dataset does not change, so, theoretically, the cache could always be used; we specify one hour here for demonstration purposes only. In real-world applications, you may want to adjust that according to how frequently the data is updated.
Listing 9.40 Memoization for the data ingestion step
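A sketch of the memoized version of the data ingestion template; the cache key and ConfigMap name are assumptions.

```yaml
- name: data-ingestion-step
  memoize:
    key: "data-ingestion"               # cache key is an assumption
    maxAge: "1h"
    cache:
      configMap:
        name: data-ingestion-cache      # ConfigMap that stores the cache
  container:
    image: kubeflow/multi-worker-strategy:v0.1
    imagePullPolicy: IfNotPresent
    command: ["python", "-c",
              "import tensorflow_datasets as tfds; tfds.load(name='fashion_mnist')"]
```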
Let’s run the workflow for the first time and pay attention to the Memoization Status field in the workflow’s node status. The cache is not hit because this is the first time the step is run.
Listing 9.41 Checking the node statuses of the workflow
If we run the same workflow again within one hour, we will notice that the step is skipped (indicated by hit: true in the Memoization Status field):
In addition, note that the Finished At and Started At timestamps are the same. That is, this step is completed instantly without having to re-execute from scratch.
All caches in Argo Workflows are saved in Kubernetes ConfigMap objects. The cache contains the node ID, the step outputs, the cache creation timestamp, and the timestamp when the cache was last hit.
Listing 9.42 Checking the details of the configmap
Summary
- The data ingestion component implements a distributed input pipeline for the Fashion-MNIST dataset with TensorFlow that makes it easy to integrate with distributed model training.
- Machine learning models and distributed model training logic can be defined in TensorFlow and then executed in a distributed fashion in the Kubernetes cluster with the help of Kubeflow.
- Both the single-instance model server and the replicated model servers can be implemented via KServe. The autoscaling functionality of KServe can automatically create additional model serving Pods to handle an increasing number of model serving requests.
- We implemented our end-to-end workflow, which includes all the components of our system, in Argo Workflows and used step memoization to avoid the time-consuming and redundant data ingestion step.