Integrate Ray AIR with Feast feature store#
# !pip install feast==0.20.1 ray[air]>=1.13 xgboost_ray
In this example, we showcase how to use Ray AIR with the Feast feature store, leveraging both historical features for training a model and online features for inference.
The task is adapted from the Feast credit scoring tutorial. We train an XGBoost model and run a prediction on an incoming loan request to see whether it is approved or rejected.
Let’s first set up our workspace and prepare the data to work with.
! wget --no-check-certificate https://github.com/ray-project/air-sample-data/raw/main/air-feast-example.zip
! unzip air-feast-example.zip
%cd air-feast-example
--2022-09-12 19:24:21-- https://github.com/ray-project/air-sample-data/raw/main/air-feast-example.zip
Loaded CA certificate '/etc/ssl/certs/ca-certificates.crt'
Resolving github.com (github.com)... 192.30.255.113
Connecting to github.com (github.com)|192.30.255.113|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://raw.githubusercontent.com/ray-project/air-sample-data/main/air-feast-example.zip [following]
--2022-09-12 19:24:21-- https://raw.githubusercontent.com/ray-project/air-sample-data/main/air-feast-example.zip
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.111.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 23715107 (23M) [application/zip]
Saving to: ‘air-feast-example.zip’
air-feast-example.z 100%[===================>] 22.62M 8.79MB/s in 2.6s
2022-09-12 19:24:25 (8.79 MB/s) - ‘air-feast-example.zip’ saved [23715107/23715107]
Archive: air-feast-example.zip
creating: air-feast-example/
creating: air-feast-example/feature_repo/
inflating: air-feast-example/feature_repo/.DS_Store
extracting: air-feast-example/feature_repo/__init__.py
inflating: air-feast-example/feature_repo/features.py
creating: air-feast-example/feature_repo/data/
inflating: air-feast-example/feature_repo/data/.DS_Store
inflating: air-feast-example/feature_repo/data/credit_history_sample.csv
inflating: air-feast-example/feature_repo/data/zipcode_table_sample.csv
inflating: air-feast-example/feature_repo/data/credit_history.parquet
inflating: air-feast-example/feature_repo/data/zipcode_table.parquet
inflating: air-feast-example/feature_repo/feature_store.yaml
inflating: air-feast-example/.DS_Store
creating: air-feast-example/data/
inflating: air-feast-example/data/loan_table.parquet
inflating: air-feast-example/data/loan_table_sample.csv
/home/ray/Desktop/workspace/ray/doc/source/ray-air/examples/air-feast-example
! ls
data feature_repo
There is already a feature repository set up in feature_repo/. It isn’t necessary to create a new feature repository, but it can be done using the following command: feast init -t local feature_repo.
Now let’s take a look at the schema in the Feast feature store, which is defined by feature_repo/features.py. There are two feature views: zipcode_features and credit_history, both generated from Parquet files (feature_repo/data/zipcode_table.parquet and feature_repo/data/credit_history.parquet).
!pygmentize feature_repo/features.py
from datetime import timedelta

from feast import (Entity, Field, FeatureView, FileSource, ValueType)
from feast.types import Float32, Int64, String

zipcode = Entity(name="zipcode", value_type=Int64)

zipcode_source = FileSource(
    path="feature_repo/data/zipcode_table.parquet",
    timestamp_field="event_timestamp",
    created_timestamp_column="created_timestamp",
)

zipcode_features = FeatureView(
    name="zipcode_features",
    entities=["zipcode"],
    ttl=timedelta(days=3650),
    schema=[
        Field(name="city", dtype=String),
        Field(name="state", dtype=String),
        Field(name="location_type", dtype=String),
        Field(name="tax_returns_filed", dtype=Int64),
        Field(name="population", dtype=Int64),
        Field(name="total_wages", dtype=Int64),
    ],
    source=zipcode_source,
)

dob_ssn = Entity(
    name="dob_ssn",
    value_type=ValueType.STRING,
    description="Date of birth and last four digits of social security number",
)

credit_history_source = FileSource(
    path="feature_repo/data/credit_history.parquet",
    timestamp_field="event_timestamp",
    created_timestamp_column="created_timestamp",
)

credit_history = FeatureView(
    name="credit_history",
    entities=["dob_ssn"],
    ttl=timedelta(days=90),
    schema=[
        Field(name="credit_card_due", dtype=Int64),
        Field(name="mortgage_due", dtype=Int64),
        Field(name="student_loan_due", dtype=Int64),
        Field(name="vehicle_loan_due", dtype=Int64),
        Field(name="hard_pulls", dtype=Int64),
        Field(name="missed_payments_2y", dtype=Int64),
        Field(name="missed_payments_1y", dtype=Int64),
        Field(name="missed_payments_6m", dtype=Int64),
        Field(name="bankruptcies", dtype=Int64),
    ],
    source=credit_history_source,
)
Deploy the feature store defined above by running feast apply from within the feature_repo/ folder.
! cd feature_repo && feast apply
Created entity zipcode
Created entity dob_ssn
Created feature view credit_history
Created feature view zipcode_features
Created sqlite table feature_repo_credit_history
Created sqlite table feature_repo_zipcode_features
import feast
fs = feast.FeatureStore(repo_path="feature_repo")
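As an optional sanity check (not part of the original tutorial flow), you can ask the feature store what feast apply registered; list_entities() and list_feature_views() are standard FeatureStore methods:
# Optional: confirm what `feast apply` registered in the feature store.
for entity in fs.list_entities():
    print("entity:", entity.name)
for feature_view in fs.list_feature_views():
    print("feature view:", feature_view.name)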
Generate training data#
On top of the features in Feast, we also have labeled training data at data/loan_table.parquet. At training time, the loan table is passed into Feast as an entity dataframe for training data generation. Feast intelligently joins the credit_history and zipcode_features tables to create the relevant feature vectors that augment the training data.
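To make the entity dataframe requirement concrete, here is a minimal hypothetical sketch (not part of the tutorial data): Feast only needs the entity key columns (zipcode, dob_ssn) and an event_timestamp column to perform the point-in-time join; any other columns, such as the loan attributes and the label, are simply passed through.
from datetime import datetime, timezone
import pandas as pd

# Hypothetical minimal entity dataframe: entity keys plus event_timestamp.
minimal_entity_df = pd.DataFrame(
    {
        "zipcode": [76104, 70380],
        "dob_ssn": ["19530219_5179", "19520816_8737"],
        "event_timestamp": [datetime(2021, 8, 25, tzinfo=timezone.utc)] * 2,
    }
)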
import pandas as pd
loan_df = pd.read_parquet("data/loan_table.parquet")
display(loan_df)
 | loan_id | dob_ssn | zipcode | person_age | person_income | person_home_ownership | person_emp_length | loan_intent | loan_amnt | loan_int_rate | loan_status | event_timestamp | created_timestamp
---|---|---|---|---|---|---|---|---|---|---|---|---|---
0 | 10000 | 19530219_5179 | 76104 | 22 | 59000 | RENT | 123.0 | PERSONAL | 35000 | 16.02 | 1 | 2021-08-25 20:34:41.361000+00:00 | 2021-08-25 20:34:41.361000+00:00 |
1 | 10001 | 19520816_8737 | 70380 | 21 | 9600 | OWN | 5.0 | EDUCATION | 1000 | 11.14 | 0 | 2021-08-25 20:16:20.128000+00:00 | 2021-08-25 20:16:20.128000+00:00 |
2 | 10002 | 19860413_2537 | 97039 | 25 | 9600 | MORTGAGE | 1.0 | MEDICAL | 5500 | 12.87 | 1 | 2021-08-25 19:57:58.896000+00:00 | 2021-08-25 19:57:58.896000+00:00 |
3 | 10003 | 19760701_8090 | 63785 | 23 | 65500 | RENT | 4.0 | MEDICAL | 35000 | 15.23 | 1 | 2021-08-25 19:39:37.663000+00:00 | 2021-08-25 19:39:37.663000+00:00 |
4 | 10004 | 19830125_8297 | 82223 | 24 | 54400 | RENT | 8.0 | MEDICAL | 35000 | 14.27 | 1 | 2021-08-25 19:21:16.430000+00:00 | 2021-08-25 19:21:16.430000+00:00 |
... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
28633 | 38633 | 19491126_1487 | 43205 | 57 | 53000 | MORTGAGE | 1.0 | PERSONAL | 5800 | 13.16 | 0 | 2020-08-25 21:48:06.292000+00:00 | 2020-08-25 21:48:06.292000+00:00 |
28634 | 38634 | 19681208_6537 | 24872 | 54 | 120000 | MORTGAGE | 4.0 | PERSONAL | 17625 | 7.49 | 0 | 2020-08-25 21:29:45.059000+00:00 | 2020-08-25 21:29:45.059000+00:00 |
28635 | 38635 | 19880422_2592 | 68826 | 65 | 76000 | RENT | 3.0 | HOMEIMPROVEMENT | 35000 | 10.99 | 1 | 2020-08-25 21:11:23.826000+00:00 | 2020-08-25 21:11:23.826000+00:00 |
28636 | 38636 | 19901017_6108 | 92014 | 56 | 150000 | MORTGAGE | 5.0 | PERSONAL | 15000 | 11.48 | 0 | 2020-08-25 20:53:02.594000+00:00 | 2020-08-25 20:53:02.594000+00:00 |
28637 | 38637 | 19960703_3449 | 69033 | 66 | 42000 | RENT | 2.0 | MEDICAL | 6475 | 9.99 | 0 | 2020-08-25 20:34:41.361000+00:00 | 2020-08-25 20:34:41.361000+00:00 |
28638 rows × 13 columns
feast_features = [
    "zipcode_features:city",
    "zipcode_features:state",
    "zipcode_features:location_type",
    "zipcode_features:tax_returns_filed",
    "zipcode_features:population",
    "zipcode_features:total_wages",
    "credit_history:credit_card_due",
    "credit_history:mortgage_due",
    "credit_history:student_loan_due",
    "credit_history:vehicle_loan_due",
    "credit_history:hard_pulls",
    "credit_history:missed_payments_2y",
    "credit_history:missed_payments_1y",
    "credit_history:missed_payments_6m",
    "credit_history:bankruptcies",
]
loan_w_offline_feature = fs.get_historical_features(
    entity_df=loan_df, features=feast_features
).to_df()
# Drop some unnecessary columns for simplicity
loan_w_offline_feature = loan_w_offline_feature.drop(["event_timestamp", "created_timestamp__", "loan_id", "zipcode", "dob_ssn"], axis=1)
Now let’s take a look at the training data as it is augmented by Feast.
display(loan_w_offline_feature)
 | person_age | person_income | person_home_ownership | person_emp_length | loan_intent | loan_amnt | loan_int_rate | loan_status | city | state | ... | total_wages | credit_card_due | mortgage_due | student_loan_due | vehicle_loan_due | hard_pulls | missed_payments_2y | missed_payments_1y | missed_payments_6m | bankruptcies
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---
1358886 | 55 | 24543 | RENT | 3.0 | VENTURE | 4000 | 13.92 | 0 | SLIDELL | LA | ... | 315061217 | 1777 | 690650 | 46372 | 10439 | 5 | 1 | 2 | 1 | 0 |
1358815 | 58 | 20000 | RENT | 0.0 | EDUCATION | 4000 | 9.99 | 0 | CHOUTEAU | OK | ... | 59412230 | 1791 | 462670 | 19421 | 3583 | 8 | 7 | 1 | 0 | 2 |
1353348 | 64 | 24000 | RENT | 1.0 | MEDICAL | 3000 | 6.99 | 0 | BISMARCK | ND | ... | 469621263 | 5917 | 1780959 | 11835 | 27910 | 8 | 3 | 2 | 1 | 0 |
1354200 | 55 | 34000 | RENT | 0.0 | DEBTCONSOLIDATION | 12000 | 6.92 | 1 | SANTA BARBARA | CA | ... | 24537583 | 8091 | 364271 | 30248 | 22640 | 2 | 7 | 3 | 0 | 0 |
1354271 | 51 | 74628 | MORTGAGE | 3.0 | PERSONAL | 3000 | 13.49 | 0 | HUNTINGTON BEACH | CA | ... | 19749601 | 3679 | 1659968 | 37582 | 20284 | 0 | 1 | 0 | 0 | 0 |
... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
674285 | 23 | 74000 | RENT | 3.0 | MEDICAL | 25000 | 10.36 | 1 | MANSFIELD | MO | ... | 33180988 | 5176 | 1089963 | 44642 | 2877 | 1 | 6 | 1 | 0 | 0 |
668250 | 21 | 200000 | MORTGAGE | 2.0 | DEBTCONSOLIDATION | 25000 | 13.99 | 0 | SALISBURY | MD | ... | 470634058 | 5297 | 1288915 | 22471 | 22630 | 0 | 5 | 2 | 1 | 0 |
668321 | 24 | 200000 | MORTGAGE | 3.0 | VENTURE | 24000 | 7.49 | 0 | STRUNK | KY | ... | 10067358 | 6549 | 22399 | 11806 | 13005 | 0 | 1 | 0 | 0 | 0 |
670025 | 23 | 215000 | MORTGAGE | 7.0 | MEDICAL | 35000 | 14.79 | 0 | HAWTHORN | PA | ... | 5956835 | 9079 | 876038 | 4556 | 21588 | 0 | 1 | 0 | 0 | 0 |
2034006 | 22 | 59000 | RENT | 123.0 | PERSONAL | 35000 | 16.02 | 1 | FORT WORTH | TX | ... | 142325465 | 8419 | 91803 | 22328 | 15078 | 0 | 1 | 0 | 0 | 0 |
28638 rows × 23 columns
# Convert into Train and Validation datasets.
import ray
loan_ds = ray.data.from_pandas(loan_w_offline_feature)
train_ds, validation_ds = loan_ds.split_proportionately([0.8])
2022-09-12 19:25:14,018 INFO worker.py:1508 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265
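As a quick optional check (not in the original notebook), you can confirm the roughly 80/20 split with Dataset.count():
# Optional: verify the sizes of the train/validation split.
print("train rows:", train_ds.count())
print("validation rows:", validation_ds.count())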
Define Preprocessors#
A Preprocessor performs last-mile processing on Ray Datasets before they are fed into the model for training.
categorical_features = [
    "person_home_ownership",
    "loan_intent",
    "city",
    "state",
    "location_type",
]
from ray.data.preprocessors import Chain, OrdinalEncoder, SimpleImputer
imputer = SimpleImputer(categorical_features, strategy="most_frequent")
encoder = OrdinalEncoder(columns=categorical_features)
chained_preprocessor = Chain(imputer, encoder)
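The Trainer below fits and applies this preprocessor automatically, so the following is purely an optional inspection step; it assumes the standard Preprocessor.fit_transform() API:
# Optional: fit the chained preprocessor on the training split and peek at
# a few encoded rows. The Trainer repeats this step internally.
encoded_preview = chained_preprocessor.fit_transform(train_ds)
print(encoded_preview.take(3))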
Train XGBoost model using Ray AIR Trainer#
Ray AIR provides a variety of Trainers that are integrated with popular machine learning frameworks. You can train a distributed model at scale on Ray with the intuitive trainer.fit() API. The output is a Ray AIR Checkpoint, which seamlessly transfers the workload from training to prediction. Let’s take a look!
LABEL = "loan_status"
CHECKPOINT_PATH = "checkpoint"
NUM_WORKERS = 1 # Change this based on the resources in the cluster.
from ray.train.xgboost import XGBoostTrainer
from ray.air.config import ScalingConfig
params = {
    "tree_method": "approx",
    "objective": "binary:logistic",
    "eval_metric": ["logloss", "error"],
}

trainer = XGBoostTrainer(
    scaling_config=ScalingConfig(
        num_workers=NUM_WORKERS,
        use_gpu=0,
    ),
    label_column=LABEL,
    params=params,
    datasets={"train": train_ds, "validation": validation_ds},
    preprocessor=chained_preprocessor,
    num_boost_round=100,
)
checkpoint = trainer.fit().checkpoint
# This saves the checkpoint onto disk
checkpoint.to_directory(CHECKPOINT_PATH)
Tune Status
Current time: | 2022-09-12 19:25:28 |
Running for: | 00:00:09.09 |
Memory: | 12.3/62.7 GiB |
System Info
Using FIFO scheduling algorithm. Resources requested: 0/24 CPUs, 0/0 GPUs, 0.0/32.5 GiB heap, 0.0/16.25 GiB objects
Trial Status
Trial name | status | loc | iter | total time (s) | train-logloss | train-error | validation-logloss |
---|---|---|---|---|---|---|---|
XGBoostTrainer_4f411_00000 | TERMINATED | 10.108.96.251:348845 | 101 | 7.67137 | 0.0578837 | 0.0127019 | 0.225994 |
(XGBoostTrainer pid=348845) /home/ray/.pyenv/versions/mambaforge/envs/ray/lib/python3.9/site-packages/xgboost_ray/main.py:431: UserWarning: `num_actors` in `ray_params` is smaller than 2 (1). XGBoost will NOT be distributed!
(XGBoostTrainer pid=348845) warnings.warn(
(_RemoteRayXGBoostActor pid=348922) [19:25:23] task [xgboost.ray]:140319682474864 got new rank 0
Trial Progress
Trial name | date | done | episodes_total | experiment_id | experiment_tag | hostname | iterations_since_restore | node_ip | pid | time_since_restore | time_this_iter_s | time_total_s | timestamp | timesteps_since_restore | timesteps_total | train-error | train-logloss | training_iteration | trial_id | validation-error | validation-logloss | warmup_time |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
XGBoostTrainer_4f411_00000 | 2022-09-12_19-25-28 | True | | 83cacc5068a84efc8998c269bc054088 | 0 | corvus | 101 | 10.108.96.251 | 348845 | 7.67137 | 1.01445 | 7.67137 | 1663035928 | 0 | | 0.0127019 | 0.0578837 | 101 | 4f411_00000 | 0.0825768 | 0.225994 | 0.00293422 |
2022-09-12 19:25:28,422 INFO tune.py:762 -- Total run time: 9.86 seconds (9.09 seconds for the tuning loop).
'checkpoint'
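If you also want the final training metrics in code, keep the full Result object that trainer.fit() returns instead of taking .checkpoint directly; an optional variant (not in the original notebook) of the training cell above:
# Variant of the cell above: keep the Result so that the final metrics
# (the same values shown in the trial table) can be inspected in code.
result = trainer.fit()
print(result.metrics["train-logloss"], result.metrics["validation-logloss"])
checkpoint = result.checkpoint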
Inference#
Now from the Checkpoint object we obtained from last session, we can construct a Ray AIR Predictor that encapsulates everything needed for inference.
The API for using Predictor is also very intuitive - simply call Predictor.predict()
.
from ray.air.checkpoint import Checkpoint
from ray.train.xgboost import XGBoostPredictor
predictor = XGBoostPredictor.from_checkpoint(Checkpoint.from_directory(CHECKPOINT_PATH))
import numpy as np
# Now let's do some prediction.
loan_request_dict = {
    "zipcode": [76104],
    "dob_ssn": ["19630621_4278"],
    "person_age": [133],
    "person_income": [59000],
    "person_home_ownership": ["RENT"],
    "person_emp_length": [123.0],
    "loan_intent": ["PERSONAL"],
    "loan_amnt": [35000],
    "loan_int_rate": [16.02],
}
# Now augment the request with online features.
zipcode = loan_request_dict["zipcode"][0]
dob_ssn = loan_request_dict["dob_ssn"][0]
online_features = fs.get_online_features(
    entity_rows=[{"zipcode": zipcode, "dob_ssn": dob_ssn}],
    features=feast_features,
).to_dict()
loan_request_dict.update(online_features)
loan_request_df = pd.DataFrame.from_dict(loan_request_dict)
loan_request_df = loan_request_df.drop(["zipcode", "dob_ssn"], axis=1)
display(loan_request_df)
 | person_age | person_income | person_home_ownership | person_emp_length | loan_intent | loan_amnt | loan_int_rate | location_type | city | population | ... | total_wages | hard_pulls | bankruptcies | missed_payments_1y | mortgage_due | credit_card_due | missed_payments_2y | missed_payments_6m | student_loan_due | vehicle_loan_due
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---
0 | 133 | 59000 | RENT | 123.0 | PERSONAL | 35000 | 16.02 | None | None | None | ... | None | None | None | None | None | None | None | None | None | None |
1 rows × 22 columns
# Run the request through our predictor using the `Predictor.predict()` API.
loan_result = np.round(predictor.predict(loan_request_df)["predictions"][0])
if loan_result == 0:
    print("Loan approved!")
elif loan_result == 1:
    print("Loan rejected!")
Loan rejected!
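The same checkpoint can also be used for offline scoring of a whole Ray Dataset. Here is a minimal sketch (not part of the original tutorial) that scores the held-out validation split, assuming the BatchPredictor utility and Dataset.drop_columns() available in Ray AIR 2.x:
from ray.train.batch_predictor import BatchPredictor

# Score the entire validation split with the same checkpoint.
# The label column is dropped because it is not a model input.
batch_predictor = BatchPredictor.from_checkpoint(
    Checkpoint.from_directory(CHECKPOINT_PATH), XGBoostPredictor
)
batch_predictions = batch_predictor.predict(validation_ds.drop_columns([LABEL]))
print(batch_predictions.take(3))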