Integrate Ray AIR with Feast feature store
# !pip install feast==0.20.1 "ray[air]>=1.13" xgboost_ray
In this example, we show how to use Ray AIR with the Feast feature store, using historical features to train a model and online features for inference.
The task is adapted from the Feast credit scoring tutorial. We train an XGBoost model and run a prediction on an incoming loan request to see whether it is approved or rejected.
Let’s first set up our workspace and prepare the data to work with.
import os
WORKING_DIR = os.path.expanduser("~/ray-air-feast-example/")
%env WORKING_DIR=$WORKING_DIR
! mkdir -p $WORKING_DIR
! wget --no-check-certificate https://github.com/ray-project/air-sample-data/raw/main/air-feast-example.zip
! unzip air-feast-example.zip
! mv air-feast-example/* $WORKING_DIR
%cd $WORKING_DIR
--2022-06-02 14:22:50-- https://github.com/ray-project/air-sample-data/raw/main/air-feast-example.zip
Resolving github.com (github.com)... 192.30.255.113
Connecting to github.com (github.com)|192.30.255.113|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://raw.githubusercontent.com/ray-project/air-sample-data/main/air-feast-example.zip [following]
--2022-06-02 14:22:50-- https://raw.githubusercontent.com/ray-project/air-sample-data/main/air-feast-example.zip
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.110.133, 185.199.111.133, 185.199.108.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.110.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 23715107 (23M) [application/zip]
Saving to: ‘air-feast-example.zip’
air-feast-example.z 100%[===================>] 22.62M 114MB/s in 0.2s
2022-06-02 14:22:51 (114 MB/s) - ‘air-feast-example.zip’ saved [23715107/23715107]
Archive: air-feast-example.zip
creating: air-feast-example/
creating: air-feast-example/feature_repo/
inflating: air-feast-example/feature_repo/.DS_Store
extracting: air-feast-example/feature_repo/__init__.py
inflating: air-feast-example/feature_repo/features.py
creating: air-feast-example/feature_repo/data/
inflating: air-feast-example/feature_repo/data/.DS_Store
inflating: air-feast-example/feature_repo/data/credit_history_sample.csv
inflating: air-feast-example/feature_repo/data/zipcode_table_sample.csv
inflating: air-feast-example/feature_repo/data/credit_history.parquet
inflating: air-feast-example/feature_repo/data/zipcode_table.parquet
inflating: air-feast-example/feature_repo/feature_store.yaml
inflating: air-feast-example/.DS_Store
creating: air-feast-example/data/
inflating: air-feast-example/data/loan_table.parquet
inflating: air-feast-example/data/loan_table_sample.csv
! ls
data feature_repo
There is already a feature repository set up in feature_repo/. It isn’t necessary to create a new feature repository, but one can be created with the command feast init -t local feature_repo if needed.
Now let’s take a look at the schema in the Feast feature store, which is defined by feature_repo/features.py. There are two feature views, zipcode_features and credit_history, both generated from parquet files: feature_repo/data/zipcode_table.parquet and feature_repo/data/credit_history.parquet.
!pygmentize feature_repo/features.py
from datetime import timedelta

from feast import (Entity, Field, FeatureView, FileSource, ValueType)
from feast.types import Float32, Int64, String

zipcode = Entity(name="zipcode", value_type=Int64)

zipcode_source = FileSource(
    path="feature_repo/data/zipcode_table.parquet",
    timestamp_field="event_timestamp",
    created_timestamp_column="created_timestamp",
)

zipcode_features = FeatureView(
    name="zipcode_features",
    entities=["zipcode"],
    ttl=timedelta(days=3650),
    schema=[
        Field(name="city", dtype=String),
        Field(name="state", dtype=String),
        Field(name="location_type", dtype=String),
        Field(name="tax_returns_filed", dtype=Int64),
        Field(name="population", dtype=Int64),
        Field(name="total_wages", dtype=Int64),
    ],
    source=zipcode_source,
)

dob_ssn = Entity(
    name="dob_ssn",
    value_type=ValueType.STRING,
    description="Date of birth and last four digits of social security number",
)

credit_history_source = FileSource(
    path="feature_repo/data/credit_history.parquet",
    timestamp_field="event_timestamp",
    created_timestamp_column="created_timestamp",
)

credit_history = FeatureView(
    name="credit_history",
    entities=["dob_ssn"],
    ttl=timedelta(days=90),
    schema=[
        Field(name="credit_card_due", dtype=Int64),
        Field(name="mortgage_due", dtype=Int64),
        Field(name="student_loan_due", dtype=Int64),
        Field(name="vehicle_loan_due", dtype=Int64),
        Field(name="hard_pulls", dtype=Int64),
        Field(name="missed_payments_2y", dtype=Int64),
        Field(name="missed_payments_1y", dtype=Int64),
        Field(name="missed_payments_6m", dtype=Int64),
        Field(name="bankruptcies", dtype=Int64),
    ],
    source=credit_history_source,
)
Deploy the feature store defined above by running feast apply from within the feature_repo/ folder.
! (cd feature_repo && feast apply)
Feast is an open source project that collects anonymized error reporting and usage statistics. To opt out or learn more see https://docs.feast.dev/reference/usage
Created entity dob_ssn
Created entity zipcode
Created feature view credit_history
Created feature view zipcode_features
Created sqlite table feature_repo_credit_history
Created sqlite table feature_repo_zipcode_features
import feast
fs = feast.FeatureStore(repo_path="feature_repo")
Generate training data
On top of the features in Feast, we also have labeled training data at data/loan_table.parquet. At training time, the loan table is passed into Feast as an entity dataframe for training data generation. Feast joins the credit_history and zipcode_features tables onto it by entity key and event timestamp to create feature vectors that augment the training data.
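The join described here is point-in-time: for each row of the entity dataframe, only feature values recorded at or before that row's event timestamp, and no older than the feature view's ttl, are eligible. The following is a minimal plain-Python sketch of that idea, not Feast's implementation. The function name point_in_time_lookup, the sample rows, and the inclusive boundary handling are assumptions made for illustration.

```python
from datetime import datetime, timedelta


def point_in_time_lookup(feature_rows, entity_key, entity_ts, ttl):
    """Return the latest feature row for entity_key whose timestamp lies in
    [entity_ts - ttl, entity_ts], or None if no row qualifies."""
    candidates = [
        row
        for row in feature_rows
        if row["key"] == entity_key
        and entity_ts - ttl <= row["event_timestamp"] <= entity_ts
    ]
    if not candidates:
        return None
    return max(candidates, key=lambda row: row["event_timestamp"])


# Hypothetical feature history for a single dob_ssn key.
history = [
    {"key": "19530219_5179", "event_timestamp": datetime(2021, 5, 1), "hard_pulls": 9},
    {"key": "19530219_5179", "event_timestamp": datetime(2021, 8, 1), "hard_pulls": 3},
    {"key": "19530219_5179", "event_timestamp": datetime(2021, 9, 1), "hard_pulls": 7},
]

loan_ts = datetime(2021, 8, 25)
row = point_in_time_lookup(history, "19530219_5179", loan_ts, ttl=timedelta(days=90))
print(row["hard_pulls"])  # 3: May is older than the ttl, September is in the future
```

The September row is skipped because it would leak information from after the loan request, which is the main reason to generate training data through the feature store rather than with a plain key join.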
import pandas as pd
loan_df = pd.read_parquet("data/loan_table.parquet")
display(loan_df)
| | loan_id | dob_ssn | zipcode | person_age | person_income | person_home_ownership | person_emp_length | loan_intent | loan_amnt | loan_int_rate | loan_status | event_timestamp | created_timestamp |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 10000 | 19530219_5179 | 76104 | 22 | 59000 | RENT | 123.0 | PERSONAL | 35000 | 16.02 | 1 | 2021-08-25 20:34:41.361000+00:00 | 2021-08-25 20:34:41.361000+00:00 |
1 | 10001 | 19520816_8737 | 70380 | 21 | 9600 | OWN | 5.0 | EDUCATION | 1000 | 11.14 | 0 | 2021-08-25 20:16:20.128000+00:00 | 2021-08-25 20:16:20.128000+00:00 |
2 | 10002 | 19860413_2537 | 97039 | 25 | 9600 | MORTGAGE | 1.0 | MEDICAL | 5500 | 12.87 | 1 | 2021-08-25 19:57:58.896000+00:00 | 2021-08-25 19:57:58.896000+00:00 |
3 | 10003 | 19760701_8090 | 63785 | 23 | 65500 | RENT | 4.0 | MEDICAL | 35000 | 15.23 | 1 | 2021-08-25 19:39:37.663000+00:00 | 2021-08-25 19:39:37.663000+00:00 |
4 | 10004 | 19830125_8297 | 82223 | 24 | 54400 | RENT | 8.0 | MEDICAL | 35000 | 14.27 | 1 | 2021-08-25 19:21:16.430000+00:00 | 2021-08-25 19:21:16.430000+00:00 |
... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
28633 | 38633 | 19491126_1487 | 43205 | 57 | 53000 | MORTGAGE | 1.0 | PERSONAL | 5800 | 13.16 | 0 | 2020-08-25 21:48:06.292000+00:00 | 2020-08-25 21:48:06.292000+00:00 |
28634 | 38634 | 19681208_6537 | 24872 | 54 | 120000 | MORTGAGE | 4.0 | PERSONAL | 17625 | 7.49 | 0 | 2020-08-25 21:29:45.059000+00:00 | 2020-08-25 21:29:45.059000+00:00 |
28635 | 38635 | 19880422_2592 | 68826 | 65 | 76000 | RENT | 3.0 | HOMEIMPROVEMENT | 35000 | 10.99 | 1 | 2020-08-25 21:11:23.826000+00:00 | 2020-08-25 21:11:23.826000+00:00 |
28636 | 38636 | 19901017_6108 | 92014 | 56 | 150000 | MORTGAGE | 5.0 | PERSONAL | 15000 | 11.48 | 0 | 2020-08-25 20:53:02.594000+00:00 | 2020-08-25 20:53:02.594000+00:00 |
28637 | 38637 | 19960703_3449 | 69033 | 66 | 42000 | RENT | 2.0 | MEDICAL | 6475 | 9.99 | 0 | 2020-08-25 20:34:41.361000+00:00 | 2020-08-25 20:34:41.361000+00:00 |
28638 rows × 13 columns
feast_features = [
    "zipcode_features:city",
    "zipcode_features:state",
    "zipcode_features:location_type",
    "zipcode_features:tax_returns_filed",
    "zipcode_features:population",
    "zipcode_features:total_wages",
    "credit_history:credit_card_due",
    "credit_history:mortgage_due",
    "credit_history:student_loan_due",
    "credit_history:vehicle_loan_due",
    "credit_history:hard_pulls",
    "credit_history:missed_payments_2y",
    "credit_history:missed_payments_1y",
    "credit_history:missed_payments_6m",
    "credit_history:bankruptcies",
]
loan_w_offline_feature = fs.get_historical_features(
    entity_df=loan_df, features=feast_features
).to_df()
# Drop some unnecessary columns for simplicity
loan_w_offline_feature = loan_w_offline_feature.drop(["event_timestamp", "created_timestamp__", "loan_id", "zipcode", "dob_ssn"], axis=1)
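Each feature reference above follows the pattern feature_view:feature_name. As a small sketch, the same list can be built from a mapping of view names to field names, which makes it easier to keep in step with features.py. The helper name build_feature_refs and the variable names are hypothetical.

```python
def build_feature_refs(views):
    """Flatten {view_name: [feature, ...]} into 'view:feature' strings."""
    return [
        f"{view}:{feature}"
        for view, features in views.items()
        for feature in features
    ]


# Field names copied from the two feature views in features.py.
views = {
    "zipcode_features": [
        "city", "state", "location_type",
        "tax_returns_filed", "population", "total_wages",
    ],
    "credit_history": [
        "credit_card_due", "mortgage_due", "student_loan_due",
        "vehicle_loan_due", "hard_pulls", "missed_payments_2y",
        "missed_payments_1y", "missed_payments_6m", "bankruptcies",
    ],
}

feature_refs = build_feature_refs(views)
print(len(feature_refs))  # 15
```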
Now let’s take a look at the training data as it is augmented by Feast.
display(loan_w_offline_feature)
| | person_age | person_income | person_home_ownership | person_emp_length | loan_intent | loan_amnt | loan_int_rate | loan_status | city | state | ... | total_wages | credit_card_due | mortgage_due | student_loan_due | vehicle_loan_due | hard_pulls | missed_payments_2y | missed_payments_1y | missed_payments_6m | bankruptcies |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
1358886 | 55 | 24543 | RENT | 3.0 | VENTURE | 4000 | 13.92 | 0 | SLIDELL | LA | ... | 315061217 | 1777 | 690650 | 46372 | 10439 | 5 | 1 | 2 | 1 | 0 |
1358815 | 58 | 20000 | RENT | 0.0 | EDUCATION | 4000 | 9.99 | 0 | CHOUTEAU | OK | ... | 59412230 | 1791 | 462670 | 19421 | 3583 | 8 | 7 | 1 | 0 | 2 |
1353348 | 64 | 24000 | RENT | 1.0 | MEDICAL | 3000 | 6.99 | 0 | BISMARCK | ND | ... | 469621263 | 5917 | 1780959 | 11835 | 27910 | 8 | 3 | 2 | 1 | 0 |
1354200 | 55 | 34000 | RENT | 0.0 | DEBTCONSOLIDATION | 12000 | 6.92 | 1 | SANTA BARBARA | CA | ... | 24537583 | 8091 | 364271 | 30248 | 22640 | 2 | 7 | 3 | 0 | 0 |
1354271 | 51 | 74628 | MORTGAGE | 3.0 | PERSONAL | 3000 | 13.49 | 0 | HUNTINGTON BEACH | CA | ... | 19749601 | 3679 | 1659968 | 37582 | 20284 | 0 | 1 | 0 | 0 | 0 |
... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
674285 | 23 | 74000 | RENT | 3.0 | MEDICAL | 25000 | 10.36 | 1 | MANSFIELD | MO | ... | 33180988 | 5176 | 1089963 | 44642 | 2877 | 1 | 6 | 1 | 0 | 0 |
668250 | 21 | 200000 | MORTGAGE | 2.0 | DEBTCONSOLIDATION | 25000 | 13.99 | 0 | SALISBURY | MD | ... | 470634058 | 5297 | 1288915 | 22471 | 22630 | 0 | 5 | 2 | 1 | 0 |
668321 | 24 | 200000 | MORTGAGE | 3.0 | VENTURE | 24000 | 7.49 | 0 | STRUNK | KY | ... | 10067358 | 6549 | 22399 | 11806 | 13005 | 0 | 1 | 0 | 0 | 0 |
670025 | 23 | 215000 | MORTGAGE | 7.0 | MEDICAL | 35000 | 14.79 | 0 | HAWTHORN | PA | ... | 5956835 | 9079 | 876038 | 4556 | 21588 | 0 | 1 | 0 | 0 | 0 |
2034006 | 22 | 59000 | RENT | 123.0 | PERSONAL | 35000 | 16.02 | 1 | FORT WORTH | TX | ... | 142325465 | 8419 | 91803 | 22328 | 15078 | 0 | 1 | 0 | 0 | 0 |
28638 rows × 23 columns
# Convert into Train and Validation datasets.
import ray
loan_ds = ray.data.from_pandas(loan_w_offline_feature)
train_ds, validation_ds = loan_ds.split_proportionately([0.8])
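As a quick sanity check on the 80/20 split, the expected row counts can be worked out by hand. This sketch assumes the split point is the row count times the fraction, rounded down, which is consistent with the N=22,910 reported in the training log further down. The helper name split_sizes is hypothetical.

```python
def split_sizes(num_rows, train_fraction):
    """Return (train_rows, validation_rows) for a single split point."""
    train_rows = int(num_rows * train_fraction)  # round down (assumption)
    return train_rows, num_rows - train_rows


train_rows, validation_rows = split_sizes(28638, 0.8)
print(train_rows, validation_rows)  # 22910 5728
```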
Define Preprocessors
Preprocessors do last-mile processing on Ray Datasets before the data is fed into the model for training.
categorical_features = [
    "person_home_ownership",
    "loan_intent",
    "city",
    "state",
    "location_type",
]
from ray.ml.preprocessors import Chain, OrdinalEncoder, SimpleImputer
imputer = SimpleImputer(categorical_features, strategy="most_frequent")
encoder = OrdinalEncoder(columns=categorical_features)
chained_preprocessor = Chain(imputer, encoder)
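To make the two chained steps concrete, here is a plain-Python sketch of what most-frequent imputation followed by ordinal encoding does to a single categorical column. It illustrates the idea only and is not Ray's implementation. The helper names are hypothetical, and assigning integer codes in sorted order is an assumption of this sketch.

```python
from collections import Counter


def fit_most_frequent(values):
    """Return the most common non-missing value in a column."""
    return Counter(v for v in values if v is not None).most_common(1)[0][0]


def fit_ordinal(values):
    """Map each distinct category to an integer code, in sorted order."""
    return {category: index for index, category in enumerate(sorted(set(values)))}


column = ["RENT", "OWN", None, "RENT", "MORTGAGE"]

# Step 1: fill the missing entry with the most frequent category.
fill_value = fit_most_frequent(column)
imputed = [fill_value if v is None else v for v in column]

# Step 2: replace each category with its integer code.
mapping = fit_ordinal(imputed)
encoded = [mapping[v] for v in imputed]
print(encoded)  # [2, 1, 2, 2, 0]
```

Imputation runs first so that the encoder never sees a missing value, which is why the order inside Chain matters.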
Train XGBoost model using Ray AIR Trainer
Ray AIR provides a variety of Trainers that are integrated with popular machine learning frameworks. You can train a distributed model at scale with Ray by calling trainer.fit(). The result includes a Ray AIR Checkpoint, which carries the workload over from training to prediction. Let’s take a look!
LABEL = "loan_status"
CHECKPOINT_PATH = "checkpoint"
NUM_WORKERS = 1 # Change this based on the resources in the cluster.
from ray.ml.train.integrations.xgboost import XGBoostTrainer
params = {
    "tree_method": "approx",
    "objective": "binary:logistic",
    "eval_metric": ["logloss", "error"],
}
trainer = XGBoostTrainer(
    scaling_config={
        "num_workers": NUM_WORKERS,
        "use_gpu": 0,
    },
    label_column=LABEL,
    params=params,
    datasets={"train": train_ds, "validation": validation_ds},
    preprocessor=chained_preprocessor,
    num_boost_round=100,
)
checkpoint = trainer.fit().checkpoint
# This saves the checkpoint onto disk
checkpoint.to_directory(CHECKPOINT_PATH)
/home/ray/anaconda3/lib/python3.8/site-packages/xgboost_ray/main.py:131: DeprecationWarning: distutils Version classes are deprecated. Use packaging.version instead.
XGBOOST_LOOSE_VERSION = LooseVersion(xgboost_version)
E0602 14:26:17.861773834 4511 fork_posix.cc:76] Other threads are currently calling into gRPC, skipping fork() handlers
/home/ray/anaconda3/lib/python3.8/site-packages/tqdm/auto.py:22: TqdmWarning: IProgress not found. Please update jupyter and ipywidgets. See https://ipywidgets.readthedocs.io/en/stable/user_install.html
from .autonotebook import tqdm as notebook_tqdm
Current time: 2022-06-02 14:26:33 (running for 00:00:14.95)
Memory usage on this node: 3.5/30.6 GiB
Using FIFO scheduling algorithm.
Resources requested: 0/8 CPUs, 0/0 GPUs, 0.0/18.04 GiB heap, 0.0/9.02 GiB objects
Result logdir: /home/ray/ray_results/XGBoostTrainer_2022-06-02_14-26-17
Number of trials: 1/1 (1 TERMINATED)
Trial name | status | loc | iter | total time (s) | train-logloss | train-error | validation-logloss |
---|---|---|---|---|---|---|---|
XGBoostTrainer_a3a2c_00000 | TERMINATED | 172.31.71.98:12634 | 100 | 11.9561 | 0.0578837 | 0.0127019 | 0.225994 |
(GBDTTrainable pid=12634) 2022-06-02 14:26:23,018 INFO main.py:980 -- [RayXGBoost] Created 1 new actors (1 total actors). Waiting until actors are ready for training.
(GBDTTrainable pid=12634) 2022-06-02 14:26:25,230 INFO main.py:1025 -- [RayXGBoost] Starting XGBoost training.
(GBDTTrainable pid=12634) E0602 14:26:25.231635524 12691 fork_posix.cc:76] Other threads are currently calling into gRPC, skipping fork() handlers
(_RemoteRayXGBoostActor pid=12769) [14:26:25] task [xgboost.ray]:139712002042896 got new rank 0
Result for XGBoostTrainer_a3a2c_00000:
date: 2022-06-02_14-26-26
done: false
experiment_id: 14b63d641b8e4583b3551b1af113ec6d
hostname: ip-172-31-71-98
iterations_since_restore: 1
node_ip: 172.31.71.98
pid: 12634
should_checkpoint: true
time_since_restore: 5.432286262512207
time_this_iter_s: 5.432286262512207
time_total_s: 5.432286262512207
timestamp: 1654205186
timesteps_since_restore: 0
train-error: 0.09502400698384984
train-logloss: 0.5147884634112437
training_iteration: 1
trial_id: a3a2c_00000
validation-error: 0.1627094972067039
validation-logloss: 0.5557328870197414
warmup_time: 0.004194974899291992
Result for XGBoostTrainer_a3a2c_00000:
date: 2022-06-02_14-26-31
done: false
experiment_id: 14b63d641b8e4583b3551b1af113ec6d
hostname: ip-172-31-71-98
iterations_since_restore: 81
node_ip: 172.31.71.98
pid: 12634
should_checkpoint: true
time_since_restore: 10.460175275802612
time_this_iter_s: 0.03925156593322754
time_total_s: 10.460175275802612
timestamp: 1654205191
timesteps_since_restore: 0
train-error: 0.01802706241815801
train-logloss: 0.07058723671627426
training_iteration: 81
trial_id: a3a2c_00000
validation-error: 0.0824022346368715
validation-logloss: 0.22556984905196217
warmup_time: 0.004194974899291992
(GBDTTrainable pid=12634) 2022-06-02 14:26:32,801 INFO main.py:1516 -- [RayXGBoost] Finished XGBoost training on training data with total N=22,910 in 9.81 seconds (7.57 pure XGBoost training time).
Result for XGBoostTrainer_a3a2c_00000:
date: 2022-06-02_14-26-32
done: true
experiment_id: 14b63d641b8e4583b3551b1af113ec6d
experiment_tag: '0'
hostname: ip-172-31-71-98
iterations_since_restore: 100
node_ip: 172.31.71.98
pid: 12634
should_checkpoint: true
time_since_restore: 11.956074953079224
time_this_iter_s: 0.022900104522705078
time_total_s: 11.956074953079224
timestamp: 1654205192
timesteps_since_restore: 0
train-error: 0.01270187690964644
train-logloss: 0.05788368908741939
training_iteration: 100
trial_id: a3a2c_00000
validation-error: 0.0825768156424581
validation-logloss: 0.22599449588356768
warmup_time: 0.004194974899291992
2022-06-02 14:26:33,481 INFO tune.py:752 -- Total run time: 15.62 seconds (14.94 seconds for the tuning loop).
'checkpoint'
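The log above reports two metrics per dataset, logloss and error, matching the eval_metric setting. As a rough illustration of what those numbers measure for a binary classifier, here is a plain-Python sketch. The function names and sample values are hypothetical, and the 0.5 cut-off for error follows XGBoost's documented default for binary classification.

```python
import math


def logloss(labels, probabilities):
    """Mean negative log-likelihood of the true labels."""
    eps = 1e-15  # keep log() away from 0
    total = 0.0
    for y, p in zip(labels, probabilities):
        p = min(max(p, eps), 1 - eps)
        total += -(y * math.log(p) + (1 - y) * math.log(1 - p))
    return total / len(labels)


def error_rate(labels, probabilities, threshold=0.5):
    """Fraction of rows where the thresholded prediction is wrong."""
    wrong = sum(
        1 for y, p in zip(labels, probabilities) if (p > threshold) != bool(y)
    )
    return wrong / len(labels)


labels = [1, 0, 1, 0]
probabilities = [0.9, 0.2, 0.4, 0.6]
print(logloss(labels, probabilities))
print(error_rate(labels, probabilities))  # 0.5
```

In the run above, training error keeps falling while validation error levels off around 0.08 between iterations 81 and 100, so the later rounds mostly fit the training set.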
Inference
Now, from the Checkpoint object we obtained in the previous section, we can construct a Ray AIR Predictor that encapsulates everything needed for inference.
The Predictor API is simple to use: call Predictor.predict().
from ray.ml.checkpoint import Checkpoint
from ray.ml.predictors.integrations.xgboost import XGBoostPredictor
predictor = XGBoostPredictor.from_checkpoint(Checkpoint.from_directory(CHECKPOINT_PATH))
import numpy as np
# Now let's make a prediction.
loan_request_dict = {
    "zipcode": [76104],
    "dob_ssn": ["19630621_4278"],
    "person_age": [133],
    "person_income": [59000],
    "person_home_ownership": ["RENT"],
    "person_emp_length": [123.0],
    "loan_intent": ["PERSONAL"],
    "loan_amnt": [35000],
    "loan_int_rate": [16.02],
}
# Now augment the request with online features.
zipcode = loan_request_dict["zipcode"][0]
dob_ssn = loan_request_dict["dob_ssn"][0]
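online_features = fs.get_online_features(
    entity_rows=[{"zipcode": zipcode, "dob_ssn": dob_ssn}],
    features=feast_features,
).to_dict()
loan_request_dict.update(online_features)
# np.float is a deprecated alias for the builtin float (removed in NumPy 1.24);
# on newer NumPy versions, pass dtype=float instead.
loan_request_df = pd.DataFrame.from_dict(loan_request_dict, dtype=np.float)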
loan_request_df = loan_request_df.drop(["zipcode", "dob_ssn"], axis=1)
display(loan_request_df)
/tmp/ipykernel_4511/2153939661.py:23: DeprecationWarning: `np.float` is a deprecated alias for the builtin `float`. To silence this warning, use `float` by itself. Doing this will not modify any behavior and is safe. If you specifically wanted the numpy scalar type, use `np.float64` here.
Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
loan_request_df = pd.DataFrame.from_dict(loan_request_dict, dtype=np.float)
/tmp/ipykernel_4511/2153939661.py:23: FutureWarning: Could not cast to float64, falling back to object. This behavior is deprecated. In a future version, when a dtype is passed to 'DataFrame', either all columns will be cast to that dtype, or a TypeError will be raised.
loan_request_df = pd.DataFrame.from_dict(loan_request_dict, dtype=np.float)
| | person_age | person_income | person_home_ownership | person_emp_length | loan_intent | loan_amnt | loan_int_rate | state | population | location_type | ... | tax_returns_filed | student_loan_due | missed_payments_1y | hard_pulls | mortgage_due | bankruptcies | credit_card_due | missed_payments_2y | missed_payments_6m | vehicle_loan_due |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 133.0 | 59000.0 | RENT | 123.0 | PERSONAL | 35000.0 | 16.02 | NaN | NaN | NaN | ... | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN |
1 rows × 22 columns
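Every Feast-derived column in the row above is NaN, which means the online store returned no values for this zipcode and dob_ssn. One possible cause is that the online store was not populated before the lookup, since no materialization step (for example feast materialize-incremental) appears in this walkthrough; an unknown entity key or an expired ttl would produce the same result. A minimal plain-Python sketch for catching this before predicting follows. The helper name missing_features and the sample response are hypothetical, and the dict-of-lists shape mirrors how the to_dict() result is used above.

```python
def missing_features(online_features, entity_keys):
    """Return the names of non-entity features whose first value is None."""
    return sorted(
        name
        for name, values in online_features.items()
        if name not in entity_keys and values[0] is None
    )


# Hypothetical response shaped like the dict merged into the request above.
response = {
    "zipcode": [76104],
    "dob_ssn": ["19630621_4278"],
    "city": [None],
    "hard_pulls": [None],
}

missing = missing_features(response, entity_keys={"zipcode", "dob_ssn"})
print(missing)  # ['city', 'hard_pulls']
```

A check like this lets the caller decide whether to fall back to defaults or decline to score a request, instead of silently predicting on a row where the feature store contributed nothing.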
# Run through our predictor using `Predictor.predict()` API.
loan_result = np.round(predictor.predict(loan_request_df)["predictions"][0])
if loan_result == 0:
    print("Loan approved!")
elif loan_result == 1:
    print("Loan rejected!")
Loan rejected!
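Rounding the predicted probability amounts to applying a 0.5 cut-off, with label 1 treated as a rejection in the code above. A small sketch with an explicit threshold makes that cut-off visible and adjustable. The function name decide and the sample probabilities are hypothetical.

```python
def decide(probability, threshold=0.5):
    """Map a predicted probability of loan_status == 1 to a decision."""
    return "rejected" if probability > threshold else "approved"


print(decide(0.93))  # rejected
print(decide(0.12))  # approved
```

A lender that wants fewer risky approvals could lower the threshold, at the cost of rejecting more applicants who would have repaid.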