- Introduction
- Business Goal and Machine Learning Solution
- Data Exploration
- Feature Engineering
- Data Preprocessing and the Data Pipeline
- Machine Learning Model Design and Selection
- Machine Learning Model Training and Development
- Machine Learning Model Evaluation
- Security and Privacy Considerations
- Deployment
This whitepaper details the development, training, and deployment of a machine learning model using the Chicago taxi trips dataset. The document outlines the end-to-end machine learning pipeline implemented using TensorFlow, Vertex AI, and Kubeflow.
The primary business goal is to predict taxi fare prices based on various trip attributes like duration, distance, pickup and drop-off locations, timestamp, and payment methods. This predictive capability can help improve service efficiency, pricing strategies, and customer satisfaction.
The machine learning use case is supervised regression, where the model forecasts continuous outcomes (taxi fares) based on input features.
The expected outcome of this machine learning model is to provide accurate fare predictions, enabling better insight into pricing and service strategies for taxi companies, ultimately leading to improved operational efficiency and customer experience.
Data exploration was performed using BigQuery for querying the dataset, and TensorFlow Transform (tft) for analyzing the data. Additionally, data visualization tools like Pandas and Matplotlib were used for further exploration.
- Feature Importance: Identified key features such as trip duration, trip distance, and pickup/drop-off locations that significantly influence fare prediction.
- Data Cleaning: Decided to filter out instances with missing or anomalous values to improve model robustness.
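As an illustration of this exploration step, the following is a minimal sketch using the BigQuery client library together with Pandas and Matplotlib. It samples the public Chicago taxi trips table; the column aliases, filters, and row limit are illustrative assumptions rather than the exact queries used in the project.

```python
from google.cloud import bigquery
import matplotlib.pyplot as plt

client = bigquery.Client()  # assumes application-default credentials are configured

# Pull a small sample from the public Chicago taxi trips table for exploration.
query = """
    SELECT
        trip_seconds AS TripSeconds,
        trip_miles   AS TripMiles,
        fare         AS Fare,
        payment_type AS PaymentType,
        company      AS Company
    FROM `bigquery-public-data.chicago_taxi_trips.taxi_trips`
    WHERE fare IS NOT NULL AND trip_miles > 0
    LIMIT 10000
"""
df = client.query(query).to_dataframe()

print(df.describe())                      # summary statistics for the numeric columns
df.plot.scatter(x='TripMiles', y='Fare')  # visual check of the distance/fare relationship
plt.show()
```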
The feature store was configured using TensorFlow Transform (tft), which enabled efficient feature transformation and engineering.
Feature engineering is a crucial step in preparing the dataset for machine learning. It involves transforming raw data into meaningful features that improve the performance of the machine learning model. In this project, we performed several feature engineering tasks to convert the raw features into a suitable format for the model.
- Handling Missing Values: Decided to filter out instances with missing or anomalous values to improve model robustness.
- One-Hot Encoding: We applied one-hot encoding to categorical features to convert them into a numerical format. This process involves creating binary vectors that represent the presence of each category. Categorical features were divided into two groups:
  - Categorical Numerical Features
  - Categorical String Features
- Normalization: We normalized the numerical features to ensure that their wide range of values does not bias the model. Normalization scales the data to have a mean of 0 and a standard deviation of 1.
- Feature Selection: We focused on the most relevant features for the business use case, dismissing identifier features such as the unique trip key and taxi ID, as they do not add business value to the prediction and may not be present at prediction time. This decision helps in simplifying the model and ensuring it generalizes well to unseen data.
Features were selected based on their relevance to fare prediction. Numerical features such as trip duration and distance directly relate to fare calculation, while categorical features provide additional context.
NUMERICAL_FEATURES = ['TripSeconds', 'TripMiles']
CATEGORICAL_NUMERICAL_FEATURES = ['PickupCommunityArea', 'DropoffCommunityArea']
CATEGORICAL_STRING_FEATURES = ['TripStartTimestamp', 'TripEndTimestamp', 'PaymentType', 'Company']
LABEL_KEY = 'Fare'
The feature engineering process involved handling missing values, one-hot encoding categorical features, normalizing numerical features, and performing feature selection. These steps were essential in preparing the data for the machine learning model, ensuring that the features are in a suitable format for training. By transforming the raw data into meaningful features and selecting the most relevant ones, we enhanced the model's ability to accurately predict taxi fares, thereby improving service efficiency and customer satisfaction.
The data preprocessing pipeline is designed to transform raw data into a format suitable for model training and serving. This pipeline ensures that the data is cleaned, transformed, and standardized, making it ready for the machine learning model. The preprocessing steps are encapsulated in a callable API to enable seamless integration with the production environment where the model will be served.
The data ingestion step loads the raw data into the pipeline using the BigQueryExampleGen component. This component reads data from BigQuery and splits it into training, evaluation, and testing sets. The code snippet for this component is stored in dataset_bucket_demo1/components/data_ingestion.py.
from tfx.v1.extensions.google_cloud_big_query import BigQueryExampleGen

def create_example_gen(query: str):
    # Reads rows from BigQuery using the given SQL query and emits tf.Example records.
    return BigQueryExampleGen(query=query)
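By default, BigQueryExampleGen produces a two-way train/eval split. A hedged sketch of how a three-way split matching the 80/10/10 proportions described in the training section could be configured is shown below; the helper name and hash-bucket ratios are our own illustration.

```python
from tfx import v1 as tfx
from tfx.v1.extensions.google_cloud_big_query import BigQueryExampleGen

def create_example_gen_with_splits(query: str):
    # Hash-partition the query results into 80% train, 10% eval, 10% test,
    # mirroring the split described in the model training section.
    output_config = tfx.proto.Output(
        split_config=tfx.proto.SplitConfig(splits=[
            tfx.proto.SplitConfig.Split(name='train', hash_buckets=8),
            tfx.proto.SplitConfig.Split(name='eval', hash_buckets=1),
            tfx.proto.SplitConfig.Split(name='test', hash_buckets=1),
        ]))
    return BigQueryExampleGen(query=query, output_config=output_config)
```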
Data validation is performed using the StatisticsGen, SchemaGen, and ExampleValidator components. These components generate statistics, infer the schema, and validate the dataset against the schema to detect any anomalies or inconsistencies. The code snippet for this component is stored in dataset_bucket_demo1/components/data_validation.py.
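A minimal sketch of how these three components could be wired together is shown below; the component variable names (for example, example_gen from the ingestion step) are illustrative.

```python
from tfx import v1 as tfx

# Compute dataset statistics over the examples emitted by BigQueryExampleGen.
statistics_gen = tfx.components.StatisticsGen(
    examples=example_gen.outputs['examples'])

# Infer a schema (types, ranges, expected categories) from those statistics.
schema_gen = tfx.components.SchemaGen(
    statistics=statistics_gen.outputs['statistics'],
    infer_feature_shape=True)

# Flag examples that violate the inferred schema, such as missing values,
# unexpected categories, or out-of-range numbers.
example_validator = tfx.components.ExampleValidator(
    statistics=statistics_gen.outputs['statistics'],
    schema=schema_gen.outputs['schema'])
```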
The data transformation step applies feature engineering techniques such as one-hot encoding for categorical features and normalization for numerical features. This is accomplished using the Transform component, which ensures that the same transformations are applied during both training and serving. The code snippet for this component is stored in dataset_bucket_demo1/components/data_transformation.py.
import tensorflow as tf
import tensorflow_transform as tft

def preprocessing_fn(inputs):
    """tf.Transform callback: maps raw features to transformed features."""
    outputs = {}
    # Numerical features: z-score normalization to mean 0 and standard deviation 1.
    for key in NUMERICAL_FEATURES:
        outputs[t_name(key)] = tft.scale_to_z_score(inputs[key])
    # Categorical string features: vocabulary lookup followed by one-hot encoding.
    for key in CATEGORICAL_STRING_FEATURES:
        outputs[t_name(key)] = _make_one_hot(inputs[key], key)
    # Categorical numerical features: convert to string first, then one-hot encode.
    for key in CATEGORICAL_NUMERICAL_FEATURES:
        outputs[t_name(key)] = _make_one_hot(
            tf.strings.strip(tf.strings.as_string(inputs[key])), key)
    # Label: cast the fare to float32 for regression.
    outputs[LABEL_KEY] = tf.cast(inputs[LABEL_KEY], tf.float32)
    return outputs
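The t_name and _make_one_hot helpers referenced above are defined elsewhere in data_transformation.py and are not reproduced here. The following is a hedged sketch of how they could be implemented with TensorFlow Transform; the '_xf' suffix and the exact vocabulary handling are assumptions.

```python
import tensorflow as tf
import tensorflow_transform as tft

def t_name(key):
    # Suffix transformed feature names to distinguish them from raw inputs.
    return key + '_xf'

def _make_one_hot(x, key):
    # Learn a vocabulary over the feature during the tft analysis phase,
    # map each value to an integer index, then one-hot encode it.
    integerized = tft.compute_and_apply_vocabulary(x, vocab_filename=key)
    depth = tft.experimental.get_vocabulary_size_by_name(key) + 1
    one_hot = tf.one_hot(integerized, depth=tf.cast(depth, tf.int32),
                         on_value=1.0, off_value=0.0)
    return tf.reshape(one_hot, [-1, depth])
```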
The preprocessing steps are encapsulated in a function called preprocessing_fn, which is part of the data_transformation.py module. The Transform component calls this function to apply the necessary transformations, and it ensures that the same preprocessing logic is applied during both training and serving, maintaining consistency and accuracy.
The preprocessed data is fed into the machine learning model by the Trainer component, and the served production model accesses the preprocessing function through the TFX Transform component. This integration ensures that the model receives data in the correct format, both during training and when making predictions in production.
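A minimal sketch of how the Transform and Trainer components could be wired together in the pipeline is shown below. The module-file constants are placeholder GCS paths for the files mentioned above, the upstream component variables come from the earlier sketches, and the step counts are illustrative figures taken from the model training section.

```python
from tfx import v1 as tfx

# Placeholder GCS paths for the module files referenced above.
DATA_TRANSFORMATION_MODULE = 'gs://dataset_bucket_demo1/components/data_transformation.py'
MODEL_TRAINER_MODULE = 'gs://dataset_bucket_demo1/components/model_trainer.py'

# Apply preprocessing_fn to the ingested examples; the resulting transform
# graph is saved so that exactly the same logic runs at serving time.
transform = tfx.components.Transform(
    examples=example_gen.outputs['examples'],
    schema=schema_gen.outputs['schema'],
    module_file=DATA_TRANSFORMATION_MODULE)

# Train on the transformed examples, keeping a handle to the transform graph
# so the serving signature can preprocess raw requests identically.
trainer = tfx.components.Trainer(
    module_file=MODEL_TRAINER_MODULE,
    examples=transform.outputs['transformed_examples'],
    transform_graph=transform.outputs['transform_graph'],
    schema=schema_gen.outputs['schema'],
    train_args=tfx.proto.TrainArgs(num_steps=50000),
    eval_args=tfx.proto.EvalArgs(num_steps=10000))
```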
The data preprocessing pipeline involves multiple steps, including data ingestion, validation, and transformation. These steps are encapsulated in a callable API, enabling seamless integration with the production environment. By ensuring consistent data preprocessing during both training and serving, the pipeline contributes to the accuracy and reliability of the machine learning model.
For this project, we selected a deep feed-forward neural network (DNN) model for predicting taxi fares. The decision to use a DNN was based on several criteria aimed at achieving high accuracy and robustness in predictions.
- Data Characteristics: The dataset contains a mix of categorical and numerical features, and a deep network can learn useful representations and interactions from them.
- Predictive Performance: Deep neural networks can capture complex, non-linear patterns and interactions in the data, which is essential for accurately predicting fares.
- Scalability: The model needs to handle large volumes of data efficiently. DNNs train efficiently in mini-batches on accelerators, making them well-suited for this requirement.
- Previous Success: Deep neural networks have been successfully applied to similar tabular regression use cases, providing a strong justification for their selection in this project.
- Ease of Integration: The model should be easily integrable into the existing TFX pipeline, allowing for seamless data preprocessing, training, evaluation, and serving.
The model design and training process involves defining the CNN architecture, compiling the model, and training it using the transformed dataset. Key aspects of the training process include:
- Model Architecture: The model consists of multiple dense layers to capture the complex relationships between the input features.
  - Input layers are created based on the transformed feature specifications.
  - Dense layers with ReLU activation functions are used to introduce non-linearity and learn complex patterns.
  - The output layer is a single neuron that predicts the taxi fare.
- Optimizer and Learning Rate: The Adam optimizer is used for training the model due to its efficiency and adaptability in handling sparse gradients. An exponential decay schedule is applied to the learning rate, starting at 0.1 and decaying by a factor of 0.9 every 1000 steps. This helps in stabilizing the training process and improving convergence.
- Early Stopping: Early stopping is implemented to monitor the validation loss and stop training if the model's performance does not improve for 5 consecutive epochs. This prevents overfitting and saves computational resources.
- Training Steps: The model is trained for a maximum of 50,000 steps with an evaluation step every 10,000 steps. This ensures that the model is adequately trained and evaluated periodically.
- Callbacks: TensorBoard callbacks are used to monitor the training process and log metrics for visualization.
The code snippet for model design and training can be found in dataset_bucket_demo1/components/model_trainer.py.
import tensorflow as tf
import tensorflow_transform as tft
from tensorflow_transform import TFTransformOutput

_LABEL_KEY = 'Fare'
_FEATURE_KEYS = [
    "TripSeconds", "TripMiles", "PickupCommunityArea", "DropoffCommunityArea",
    "TripStartTimestamp", "TripEndTimestamp", "PaymentType", "Company"
]
_BATCH_SIZE = 32

def input_fn(file_pattern, tf_transform_output, batch_size=200):
    # Build a batched dataset of transformed features, keyed by the label.
    transformed_feature_spec = tf_transform_output.transformed_feature_spec().copy()
    dataset = tf.data.experimental.make_batched_features_dataset(
        file_pattern=file_pattern,
        batch_size=batch_size,
        features=transformed_feature_spec,
        reader=lambda filenames: tf.data.TFRecordDataset(filenames, compression_type='GZIP'),
        label_key=_LABEL_KEY,
    )
    return dataset

def _build_keras_model(tf_transform_output: TFTransformOutput) -> tf.keras.Model:
    # One Keras input per transformed feature (the label is excluded).
    feature_spec = tf_transform_output.transformed_feature_spec().copy()
    feature_spec.pop(_LABEL_KEY)
    inputs = {
        key: tf.keras.layers.Input(shape=spec.shape or [1], name=key, dtype=spec.dtype)
        for key, spec in feature_spec.items()
    }
    flattened_inputs = [tf.keras.layers.Flatten()(x) for x in inputs.values()]
    x = tf.keras.layers.Concatenate()(flattened_inputs)
    # Stack of dense layers to learn non-linear feature interactions.
    for units in [512, 256, 128, 64, 32]:
        x = tf.keras.layers.Dense(units, activation='relu')(x)
    # Single output neuron: the predicted fare.
    output = tf.keras.layers.Dense(1)(x)
    return tf.keras.Model(inputs=inputs, outputs=output)

def run_fn(fn_args):
    # TFX Trainer entry point: trains the model and saves it for serving.
    tf_transform_output = TFTransformOutput(fn_args.transform_output)
    train_dataset = input_fn(fn_args.train_files, tf_transform_output, batch_size=_BATCH_SIZE)
    eval_dataset = input_fn(fn_args.eval_files, tf_transform_output, batch_size=_BATCH_SIZE)
    model = _build_keras_model(tf_transform_output)
    model.compile(optimizer=tf.keras.optimizers.Adam(),
                  loss='mean_squared_error',
                  metrics=['mean_absolute_error'])
    model.fit(train_dataset,
              steps_per_epoch=fn_args.train_steps,
              validation_data=eval_dataset,
              validation_steps=fn_args.eval_steps)
    model.save(fn_args.serving_model_dir, save_format='tf')
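The snippet above compiles the model with a default Adam optimizer and fits it without callbacks. A hedged sketch of how the exponential-decay learning rate schedule, early stopping, and TensorBoard logging described earlier could be added inside run_fn is shown below; the TensorBoard log directory attribute is an assumption.

```python
# Exponential decay: start at 0.1 and multiply by 0.9 every 1,000 steps.
lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
    initial_learning_rate=0.1, decay_steps=1000, decay_rate=0.9)

model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=lr_schedule),
              loss='mean_squared_error',
              metrics=['mean_absolute_error'])

callbacks = [
    # Stop training if validation loss does not improve for 5 consecutive epochs.
    tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=5,
                                     restore_best_weights=True),
    # Log metrics for visualization in TensorBoard (log directory is assumed).
    tf.keras.callbacks.TensorBoard(log_dir=fn_args.model_run_dir),
]

model.fit(train_dataset,
          steps_per_epoch=fn_args.train_steps,
          validation_data=eval_dataset,
          validation_steps=fn_args.eval_steps,
          callbacks=callbacks)
```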
The dataset was split into training (80%), validation (10%), and testing (10%) sets, ensuring a representative distribution of trips.
Training was implemented using TFX components: ExampleGen, SchemaGen, Transform, and Trainer within a Kubeflow pipeline. The Trainer component was configured to utilize Google Cloud AI Platform for scalable training.
Primary evaluation metrics were Root Mean Squared Error (RMSE) and Mean Absolute Error (MAE), with early stopping criteria based on validation loss.
Hyperparameters like learning rate and batch size were tuned using random search, optimizing for the best validation performance.
The model's bias and variance were assessed through cross-validated error analysis, and model complexity was tuned to balance the bias-variance tradeoff effectively.
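As an illustration of the random-search tuning mentioned above, the following is a minimal sketch using the Keras Tuner library. The wrapper function, search space, step counts, and trial budget are assumptions rather than the project's actual tuning setup; batch size would be tuned in the input pipeline and is not shown.

```python
import keras_tuner as kt
import tensorflow as tf

def model_builder(hp):
    # Hypothetical wrapper around _build_keras_model that exposes the learning
    # rate as a tunable hyperparameter (tf_transform_output is assumed to be
    # available from the surrounding trainer code).
    model = _build_keras_model(tf_transform_output)
    learning_rate = hp.Choice('learning_rate', values=[1e-1, 1e-2, 1e-3])
    model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
                  loss='mean_squared_error',
                  metrics=['mean_absolute_error'])
    return model

tuner = kt.RandomSearch(model_builder,
                        objective='val_loss',
                        max_trials=20,  # illustrative trial budget
                        directory='tuning',
                        project_name='chicago_taxi_fare')
tuner.search(train_dataset,
             validation_data=eval_dataset,
             steps_per_epoch=1000, validation_steps=100, epochs=10)
best_hps = tuner.get_best_hyperparameters(num_trials=1)[0]
```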
After training and optimizing the machine learning model, it is crucial to evaluate its performance on an independent test dataset. This ensures that the model generalizes well to new, unseen data, which reflects the distribution it is expected to encounter in a production environment.
The evaluation process involves several steps:
- Evaluation Configuration: An evaluation configuration is set up to specify the evaluation metrics and slicing specifications. For this project, the primary metric used is RMSE.
- Model Resolver: A model resolver is used to ensure that the latest blessed model is selected as the baseline for comparison during evaluation. This allows for a continuous improvement cycle by comparing new models against the best-performing previously deployed models.
- Evaluator Component: The Evaluator component of TFX is used to assess the model's performance on the independent test dataset. This component computes the specified metrics and generates detailed evaluation reports.
- Independent Test Dataset: The model is evaluated on an independent test dataset that reflects the distribution of data expected in a production environment. This dataset is kept separate from the training and validation datasets to provide an unbiased assessment of the model's performance.
The primary evaluation metric for this project is RMSE. RMSE measures the average magnitude of the errors between the predicted and actual fare amounts, providing a clear indication of the model's predictive accuracy.
The evaluation results are derived from the Evaluator component and provide insights into how well the model performs on the independent test dataset. The key metric, RMSE, is used to measure prediction accuracy. If the model beats the defined threshold on this key metric, the Evaluator "blesses" the model, and the Pusher component registers and deploys it to a Vertex AI endpoint. The code for the Evaluator and Pusher is stored in dataset_bucket_demo1/components/model_evaluator_and_pusher.py.
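A hedged sketch of how the evaluation configuration, model resolver, and Evaluator component described above could be assembled is shown below; the RMSE threshold value is an assumption, and the upstream component variables come from the earlier sketches.

```python
from tfx import v1 as tfx
import tensorflow_model_analysis as tfma

# Bless the candidate only if RMSE stays under an absolute threshold (assumed
# value) and does not regress relative to the latest blessed baseline model.
eval_config = tfma.EvalConfig(
    model_specs=[tfma.ModelSpec(label_key='Fare')],
    slicing_specs=[tfma.SlicingSpec()],  # overall slice; add feature slices as needed
    metrics_specs=[tfma.MetricsSpec(metrics=[
        tfma.MetricConfig(
            class_name='RootMeanSquaredError',
            threshold=tfma.MetricThreshold(
                value_threshold=tfma.GenericValueThreshold(upper_bound={'value': 5.0}),
                change_threshold=tfma.GenericChangeThreshold(
                    direction=tfma.MetricDirection.LOWER_IS_BETTER,
                    absolute={'value': 0.0})))
    ])])

# Resolve the latest blessed model to use as the comparison baseline.
model_resolver = tfx.dsl.Resolver(
    strategy_class=tfx.dsl.experimental.LatestBlessedModelStrategy,
    model=tfx.dsl.Channel(type=tfx.types.standard_artifacts.Model),
    model_blessing=tfx.dsl.Channel(type=tfx.types.standard_artifacts.ModelBlessing),
).with_id('latest_blessed_model_resolver')

evaluator = tfx.components.Evaluator(
    examples=example_gen.outputs['examples'],
    model=trainer.outputs['model'],
    baseline_model=model_resolver.outputs['model'],
    eval_config=eval_config)
```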
Sensitive data was securely stored in Google Cloud Storage and BigQuery, with appropriate access controls and encryption measures in place.
Sensitive fields were anonymized where necessary. De-identification techniques, such as masking and aggregation, were applied to minimize privacy risks.
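As an illustration of these de-identification techniques, a hypothetical ingestion query is sketched below: the taxi identifier is masked with a salted hash, and locations are kept only at community-area granularity. The salt and column selection are assumptions.

```python
# Hypothetical de-identified ingestion query (illustrative only).
DEIDENTIFIED_QUERY = """
    SELECT
        TO_HEX(SHA256(CONCAT(taxi_id, 'static-salt'))) AS taxi_id_masked,  -- masking
        pickup_community_area,                                             -- aggregation
        dropoff_community_area,
        trip_seconds,
        trip_miles,
        fare
    FROM `bigquery-public-data.chicago_taxi_trips.taxi_trips`
"""
```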
The trained and evaluated model was deployed using Vertex AI, making it accessible for real-time predictions via a REST API.
The deployed model is exposed as a callable API endpoint, allowing for fare predictions based on new trip data.
The deployment setup allows model customization and retraining as new data becomes available, ensuring the model remains up-to-date and accurate.
The serving model is exported with the Transform graph attached, so the serving signature applies the same preprocessing to raw requests as was used during training:

def export_serving_model(tf_transform_output, model, output_dir):
    # Attach the tf.Transform feature-transformation layer so raw request
    # features are preprocessed exactly as they were during training.
    model.tft_layer = tf_transform_output.transform_features_layer()
    signatures = {'serving_default': _get_tf_examples_serving_signature(model, tf_transform_output)}
    model.save(output_dir, save_format='tf', signatures=signatures)
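As an illustration of calling the deployed endpoint, the following is a minimal client-side sketch using the Vertex AI SDK. The project, region, endpoint ID, and trip values are placeholders, and the exact request payload format depends on the exported serving signature; the signature above expects serialized tf.Example records, assumed here to use an 'examples' input key.

```python
import base64
import tensorflow as tf
from google.cloud import aiplatform

aiplatform.init(project='my-gcp-project', location='us-central1')  # placeholders
endpoint = aiplatform.Endpoint(endpoint_name='1234567890')          # deployed endpoint ID

# Build one raw trip as a tf.Example, using the raw feature names from this document.
example = tf.train.Example(features=tf.train.Features(feature={
    'TripSeconds': tf.train.Feature(int64_list=tf.train.Int64List(value=[900])),
    'TripMiles': tf.train.Feature(float_list=tf.train.FloatList(value=[4.2])),
    'PickupCommunityArea': tf.train.Feature(int64_list=tf.train.Int64List(value=[8])),
    'DropoffCommunityArea': tf.train.Feature(int64_list=tf.train.Int64List(value=[32])),
    'TripStartTimestamp': tf.train.Feature(bytes_list=tf.train.BytesList(value=[b'2023-05-01 08:15:00'])),
    'TripEndTimestamp': tf.train.Feature(bytes_list=tf.train.BytesList(value=[b'2023-05-01 08:30:00'])),
    'PaymentType': tf.train.Feature(bytes_list=tf.train.BytesList(value=[b'Credit Card'])),
    'Company': tf.train.Feature(bytes_list=tf.train.BytesList(value=[b'Flash Cab'])),
}))

# The serialized example is base64-encoded for the JSON prediction request;
# the 'examples' key assumes that name is used by the serving signature.
instance = {'examples': {'b64': base64.b64encode(example.SerializeToString()).decode('utf-8')}}
response = endpoint.predict(instances=[instance])
print(response.predictions)  # predicted fare
```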