Integrate Ray AIR with Feast feature store

# !pip install feast==0.20.1 "ray[air]>=1.13" xgboost_ray

In this example, we showcase how to use Ray AIR with the Feast feature store, using historical features to train a model and online features for inference.

The task is adapted from the Feast credit scoring tutorial. We train an XGBoost model and run a prediction on an incoming loan request to see whether it is approved or rejected.

Let’s first set up our workspace and prepare the data to work with.

! wget --no-check-certificate https://github.com/ray-project/air-sample-data/raw/main/air-feast-example.zip
! unzip air-feast-example.zip 
%cd air-feast-example

Archive:  air-feast-example.zip
   creating: air-feast-example/
   creating: air-feast-example/feature_repo/
  inflating: air-feast-example/feature_repo/.DS_Store  
 extracting: air-feast-example/feature_repo/__init__.py  
  inflating: air-feast-example/feature_repo/features.py  
   creating: air-feast-example/feature_repo/data/
  inflating: air-feast-example/feature_repo/data/.DS_Store  
  inflating: air-feast-example/feature_repo/data/credit_history_sample.csv  
  inflating: air-feast-example/feature_repo/data/zipcode_table_sample.csv  
  inflating: air-feast-example/feature_repo/data/credit_history.parquet  
  inflating: air-feast-example/feature_repo/data/zipcode_table.parquet  
  inflating: air-feast-example/feature_repo/feature_store.yaml  
  inflating: air-feast-example/.DS_Store  
   creating: air-feast-example/data/
  inflating: air-feast-example/data/loan_table.parquet  
  inflating: air-feast-example/data/loan_table_sample.csv  
/home/ray/Desktop/workspace/ray/doc/source/ray-air/examples/air-feast-example
! ls
data  feature_repo

There is already a feature repository set up in feature_repo/. It isn’t necessary to create a new feature repository, but it can be done using the following command: feast init -t local feature_repo.

Now let’s take a look at the schema in the Feast feature store, which is defined by feature_repo/features.py. It contains two feature views, zipcode_features and credit_history, both backed by Parquet files: feature_repo/data/zipcode_table.parquet and feature_repo/data/credit_history.parquet.

!pygmentize feature_repo/features.py
from datetime import timedelta

from feast import (Entity, Field, FeatureView, FileSource, ValueType)
from feast.types import Float32, Int64, String


zipcode = Entity(name="zipcode", value_type=Int64)

zipcode_source = FileSource(
    path="feature_repo/data/zipcode_table.parquet",
    timestamp_field="event_timestamp",
    created_timestamp_column="created_timestamp",
)

zipcode_features = FeatureView(
    name="zipcode_features",
    entities=["zipcode"],
    ttl=timedelta(days=3650),
    schema=[
        Field(name="city", dtype=String),
        Field(name="state", dtype=String),
        Field(name="location_type", dtype=String),
        Field(name="tax_returns_filed", dtype=Int64),
        Field(name="population", dtype=Int64),
        Field(name="total_wages", dtype=Int64),
    ],
    source=zipcode_source,
)

dob_ssn = Entity(
    name="dob_ssn",
    value_type=ValueType.STRING,
    description="Date of birth and last four digits of social security number",
)

credit_history_source = FileSource(
    path="feature_repo/data/credit_history.parquet",
    timestamp_field="event_timestamp",
    created_timestamp_column="created_timestamp",
)

credit_history = FeatureView(
    name="credit_history",
    entities=["dob_ssn"],
    ttl=timedelta(days=90),
    schema=[
        Field(name="credit_card_due", dtype=Int64),
        Field(name="mortgage_due", dtype=Int64),
        Field(name="student_loan_due", dtype=Int64),
        Field(name="vehicle_loan_due", dtype=Int64),
        Field(name="hard_pulls", dtype=Int64),
        Field(name="missed_payments_2y", dtype=Int64),
        Field(name="missed_payments_1y", dtype=Int64),
        Field(name="missed_payments_6m", dtype=Int64),
        Field(name="bankruptcies", dtype=Int64),
    ],
    source=credit_history_source,
)

Deploy the feature store defined above by running feast apply from within the feature_repo/ folder.

! cd feature_repo && feast apply
Created entity zipcode
Created entity dob_ssn
Created feature view credit_history
Created feature view zipcode_features

Created sqlite table feature_repo_credit_history
Created sqlite table feature_repo_zipcode_features
import feast
fs = feast.FeatureStore(repo_path="feature_repo")

Generate training data

In addition to the features in Feast, we have labeled training data at data/loan_table.parquet. At training time, the loan table is passed to Feast as an entity dataframe. Feast performs a point-in-time join against the credit_history and zipcode_features feature views, attaching to each loan row the feature values that were valid as of its event_timestamp, and returns the augmented training data.

import pandas as pd
loan_df = pd.read_parquet("data/loan_table.parquet")
display(loan_df)
       loan_id        dob_ssn  zipcode  person_age  person_income  person_home_ownership  person_emp_length      loan_intent  loan_amnt  loan_int_rate  loan_status                   event_timestamp                 created_timestamp
    0    10000  19530219_5179    76104          22          59000                   RENT              123.0         PERSONAL      35000          16.02            1  2021-08-25 20:34:41.361000+00:00  2021-08-25 20:34:41.361000+00:00
    1    10001  19520816_8737    70380          21           9600                    OWN                5.0        EDUCATION       1000          11.14            0  2021-08-25 20:16:20.128000+00:00  2021-08-25 20:16:20.128000+00:00
    2    10002  19860413_2537    97039          25           9600               MORTGAGE                1.0          MEDICAL       5500          12.87            1  2021-08-25 19:57:58.896000+00:00  2021-08-25 19:57:58.896000+00:00
    3    10003  19760701_8090    63785          23          65500                   RENT                4.0          MEDICAL      35000          15.23            1  2021-08-25 19:39:37.663000+00:00  2021-08-25 19:39:37.663000+00:00
    4    10004  19830125_8297    82223          24          54400                   RENT                8.0          MEDICAL      35000          14.27            1  2021-08-25 19:21:16.430000+00:00  2021-08-25 19:21:16.430000+00:00
  ...      ...            ...      ...         ...            ...                    ...                ...              ...        ...            ...          ...                               ...                               ...
28633    38633  19491126_1487    43205          57          53000               MORTGAGE                1.0         PERSONAL       5800          13.16            0  2020-08-25 21:48:06.292000+00:00  2020-08-25 21:48:06.292000+00:00
28634    38634  19681208_6537    24872          54         120000               MORTGAGE                4.0         PERSONAL      17625           7.49            0  2020-08-25 21:29:45.059000+00:00  2020-08-25 21:29:45.059000+00:00
28635    38635  19880422_2592    68826          65          76000                   RENT                3.0  HOMEIMPROVEMENT      35000          10.99            1  2020-08-25 21:11:23.826000+00:00  2020-08-25 21:11:23.826000+00:00
28636    38636  19901017_6108    92014          56         150000               MORTGAGE                5.0         PERSONAL      15000          11.48            0  2020-08-25 20:53:02.594000+00:00  2020-08-25 20:53:02.594000+00:00
28637    38637  19960703_3449    69033          66          42000                   RENT                2.0          MEDICAL       6475           9.99            0  2020-08-25 20:34:41.361000+00:00  2020-08-25 20:34:41.361000+00:00

28638 rows × 13 columns

feast_features = [
    "zipcode_features:city",
    "zipcode_features:state",
    "zipcode_features:location_type",
    "zipcode_features:tax_returns_filed",
    "zipcode_features:population",
    "zipcode_features:total_wages",
    "credit_history:credit_card_due",
    "credit_history:mortgage_due",
    "credit_history:student_loan_due",
    "credit_history:vehicle_loan_due",
    "credit_history:hard_pulls",
    "credit_history:missed_payments_2y",
    "credit_history:missed_payments_1y",
    "credit_history:missed_payments_6m",
    "credit_history:bankruptcies",
]

loan_w_offline_feature = fs.get_historical_features(
    entity_df=loan_df, features=feast_features
).to_df()

# Drop some unnecessary columns for simplicity
loan_w_offline_feature = loan_w_offline_feature.drop(["event_timestamp", "created_timestamp__", "loan_id", "zipcode", "dob_ssn"], axis=1)
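
To make the join above more concrete, here is a minimal pure-Python sketch of a point-in-time lookup with a TTL. It is a simplified illustration, not Feast's implementation: the function name point_in_time_lookup, the "key" field, and the sample rows are hypothetical, and the 90-day TTL simply mirrors the ttl set on the credit_history feature view.

```python
from datetime import datetime, timedelta


def point_in_time_lookup(feature_rows, entity_key, event_time, ttl):
    """Return the newest feature row for entity_key at or before event_time.

    Rows older than event_time - ttl count as expired. Returns None when no
    row qualifies.
    """
    candidates = [
        row
        for row in feature_rows
        if row["key"] == entity_key
        and event_time - ttl <= row["event_timestamp"] <= event_time
    ]
    if not candidates:
        return None
    return max(candidates, key=lambda row: row["event_timestamp"])


# Hypothetical credit history rows for one applicant (values are made up).
history = [
    {"key": "A", "event_timestamp": datetime(2021, 1, 1), "hard_pulls": 1},
    {"key": "A", "event_timestamp": datetime(2021, 6, 1), "hard_pulls": 3},
    {"key": "A", "event_timestamp": datetime(2021, 9, 1), "hard_pulls": 7},
]
ttl = timedelta(days=90)

# A loan dated 2021-08-25 sees the June row, never the later September row.
match = point_in_time_lookup(history, "A", datetime(2021, 8, 25), ttl)

# A loan dated 2021-04-15 only has the January row, which is past the TTL.
expired = point_in_time_lookup(history, "A", datetime(2021, 4, 15), ttl)
```

Excluding rows newer than the loan's timestamp is what keeps future information from leaking into the training data.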

Now let’s take a look at the training data as it is augmented by Feast.

display(loan_w_offline_feature)
         person_age  person_income  person_home_ownership  person_emp_length        loan_intent  loan_amnt  loan_int_rate  loan_status              city  state  ...  total_wages  credit_card_due  mortgage_due  student_loan_due  vehicle_loan_due  hard_pulls  missed_payments_2y  missed_payments_1y  missed_payments_6m  bankruptcies
1358886          55          24543                   RENT                3.0            VENTURE       4000          13.92            0           SLIDELL     LA  ...    315061217             1777        690650             46372             10439           5                   1                   2                   1             0
1358815          58          20000                   RENT                0.0          EDUCATION       4000           9.99            0          CHOUTEAU     OK  ...     59412230             1791        462670             19421              3583           8                   7                   1                   0             2
1353348          64          24000                   RENT                1.0            MEDICAL       3000           6.99            0          BISMARCK     ND  ...    469621263             5917       1780959             11835             27910           8                   3                   2                   1             0
1354200          55          34000                   RENT                0.0  DEBTCONSOLIDATION      12000           6.92            1     SANTA BARBARA     CA  ...     24537583             8091        364271             30248             22640           2                   7                   3                   0             0
1354271          51          74628               MORTGAGE                3.0           PERSONAL       3000          13.49            0  HUNTINGTON BEACH     CA  ...     19749601             3679       1659968             37582             20284           0                   1                   0                   0             0
    ...         ...            ...                    ...                ...                ...        ...            ...          ...               ...    ...  ...          ...              ...           ...               ...               ...         ...                 ...                 ...                 ...           ...
 674285          23          74000                   RENT                3.0            MEDICAL      25000          10.36            1         MANSFIELD     MO  ...     33180988             5176       1089963             44642              2877           1                   6                   1                   0             0
 668250          21         200000               MORTGAGE                2.0  DEBTCONSOLIDATION      25000          13.99            0         SALISBURY     MD  ...    470634058             5297       1288915             22471             22630           0                   5                   2                   1             0
 668321          24         200000               MORTGAGE                3.0            VENTURE      24000           7.49            0            STRUNK     KY  ...     10067358             6549         22399             11806             13005           0                   1                   0                   0             0
 670025          23         215000               MORTGAGE                7.0            MEDICAL      35000          14.79            0          HAWTHORN     PA  ...      5956835             9079        876038              4556             21588           0                   1                   0                   0             0
2034006          22          59000                   RENT              123.0           PERSONAL      35000          16.02            1        FORT WORTH     TX  ...    142325465             8419         91803             22328             15078           0                   1                   0                   0             0

28638 rows × 23 columns

# Convert into Train and Validation datasets.
import ray

loan_ds = ray.data.from_pandas(loan_w_offline_feature)
train_ds, validation_ds = loan_ds.split_proportionately([0.8])
2022-09-12 19:25:14,018	INFO worker.py:1508 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265 
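
The call split_proportionately([0.8]) above yields two datasets: roughly 80% of the rows for training and the remainder for validation. The sketch below shows that idea on a plain Python list. The helper split_by_proportions is hypothetical, and its rounding (truncating with int) is an assumption of this sketch, so the exact boundary may differ from what Ray computes.

```python
def split_by_proportions(rows, proportions):
    """Split rows into len(proportions) + 1 consecutive chunks.

    Each proportion claims that share of the rows, in order. Whatever is left
    over becomes the final chunk.
    """
    splits = []
    start = 0
    cumulative = 0.0
    for proportion in proportions:
        cumulative += proportion
        end = int(len(rows) * cumulative)
        splits.append(rows[start:end])
        start = end
    splits.append(rows[start:])
    return splits


rows = list(range(10))
train_rows, validation_rows = split_by_proportions(rows, [0.8])
```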

Define Preprocessors

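A preprocessor performs last-mile processing on Ray Datasets before the data is fed into the model for training. Here we impute missing categorical values with the most frequent value and then ordinal-encode the categorical columns.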

categorical_features = [
    "person_home_ownership",
    "loan_intent",
    "city",
    "state",
    "location_type",
]

from ray.data.preprocessors import Chain, OrdinalEncoder, SimpleImputer

imputer = SimpleImputer(categorical_features, strategy="most_frequent")
encoder = OrdinalEncoder(columns=categorical_features)
chained_preprocessor = Chain(imputer, encoder)
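
As a rough picture of what the chained preprocessor does to a single categorical column, the following pure-Python sketch fills missing values with the most frequent one and then maps categories to integers. It is not Ray's implementation. The helper names are hypothetical, and assigning integer codes in sorted order is an assumption of this sketch.

```python
from collections import Counter


def fit_most_frequent(values):
    """Return the most common non-missing value in a column."""
    counts = Counter(value for value in values if value is not None)
    return counts.most_common(1)[0][0]


def fit_ordinal(values):
    """Map each distinct value to an integer code, assigned in sorted order."""
    return {value: code for code, value in enumerate(sorted(set(values)))}


# Made-up person_home_ownership column with one missing entry.
column = ["RENT", None, "OWN", "RENT", "MORTGAGE"]

# Step 1: impute missing entries with the most frequent value.
fill_value = fit_most_frequent(column)
imputed = [fill_value if value is None else value for value in column]

# Step 2: encode the categories as integers.
mapping = fit_ordinal(imputed)
encoded = [mapping[value] for value in imputed]
```

The order matters: imputing first means the encoder never has to assign a code to a missing value.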

Train XGBoost model using Ray AIR Trainer

Ray AIR provides a variety of Trainers that are integrated with popular machine learning frameworks. Calling trainer.fit() runs the training job on Ray, distributed across the number of workers set in the ScalingConfig. The returned result exposes a Ray AIR Checkpoint, which carries the workload over from training to prediction. Let’s take a look!

LABEL = "loan_status"
CHECKPOINT_PATH = "checkpoint"
NUM_WORKERS = 1  # Change this based on the resources in the cluster.


from ray.train.xgboost import XGBoostTrainer
from ray.air.config import ScalingConfig
params = {
    "tree_method": "approx",
    "objective": "binary:logistic",
    "eval_metric": ["logloss", "error"],
}

trainer = XGBoostTrainer(
    scaling_config=ScalingConfig(
        num_workers=NUM_WORKERS,
        use_gpu=False,
    ),
    label_column=LABEL,
    params=params,
    datasets={"train": train_ds, "validation": validation_ds},
    preprocessor=chained_preprocessor,
    num_boost_round=100,
)
checkpoint = trainer.fit().checkpoint
# This saves the checkpoint onto disk
checkpoint.to_directory(CHECKPOINT_PATH)

Tune Status

Current time:2022-09-12 19:25:28
Running for: 00:00:09.09
Memory: 12.3/62.7 GiB

System Info

Using FIFO scheduling algorithm.
Resources requested: 0/24 CPUs, 0/0 GPUs, 0.0/32.5 GiB heap, 0.0/16.25 GiB objects

Trial Status

Trial name                  status      loc                   iter  total time (s)  train-logloss  train-error  validation-logloss
XGBoostTrainer_4f411_00000  TERMINATED  10.108.96.251:348845   101         7.67137      0.0578837    0.0127019            0.225994
(XGBoostTrainer pid=348845) /home/ray/.pyenv/versions/mambaforge/envs/ray/lib/python3.9/site-packages/xgboost_ray/main.py:431: UserWarning: `num_actors` in `ray_params` is smaller than 2 (1). XGBoost will NOT be distributed!
(XGBoostTrainer pid=348845)   warnings.warn(
(_RemoteRayXGBoostActor pid=348922) [19:25:23] task [xgboost.ray]:140319682474864 got new rank 0

Trial Progress

Trial name: XGBoostTrainer_4f411_00000
date: 2022-09-12_19-25-28
done: True
episodes_total: (empty)
experiment_id: 83cacc5068a84efc8998c269bc054088
experiment_tag: 0
hostname: corvus
iterations_since_restore: 101
node_ip: 10.108.96.251
pid: 348845
time_since_restore: 7.67137
time_this_iter_s: 1.01445
time_total_s: 7.67137
timestamp: 1663035928
timesteps_since_restore: 0
timesteps_total: (empty)
train-error: 0.0127019
train-logloss: 0.0578837
training_iteration: 101
trial_id: 4f411_00000
validation-error: 0.0825768
validation-logloss: 0.225994
warmup_time: 0.00293422
2022-09-12 19:25:28,422	INFO tune.py:762 -- Total run time: 9.86 seconds (9.09 seconds for the tuning loop).
'checkpoint'
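
The logloss and error columns in the trial tables above come from the eval_metric entries in params. As a reading aid, here is a small sketch of how these two metrics are commonly defined for binary classification: logloss as the mean negative log-likelihood, and error as the fraction of rows misclassified at a 0.5 threshold. The labels and probabilities are made up, and the functions are illustrative rather than XGBoost's own code.

```python
import math


def logloss(labels, probabilities, eps=1e-15):
    """Mean negative log-likelihood of binary labels under the predictions."""
    total = 0.0
    for label, probability in zip(labels, probabilities):
        # Clip so that log(0) can never occur.
        probability = min(max(probability, eps), 1 - eps)
        total -= label * math.log(probability)
        total -= (1 - label) * math.log(1 - probability)
    return total / len(labels)


def error_rate(labels, probabilities, threshold=0.5):
    """Fraction of rows whose thresholded prediction disagrees with the label."""
    wrong = sum(
        1
        for label, probability in zip(labels, probabilities)
        if (probability > threshold) != bool(label)
    )
    return wrong / len(labels)


# Made-up labels and predicted probabilities of loan_status == 1.
labels = [1, 0, 1, 0]
probabilities = [0.9, 0.2, 0.4, 0.1]

sample_logloss = logloss(labels, probabilities)
sample_error = error_rate(labels, probabilities)  # one of four rows is wrong
```

Read this way, the gap between train-error and validation-error in the tables above indicates that the model fits the training split more closely than the validation split.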

Inference

From the Checkpoint object obtained in the previous section, we can construct a Ray AIR Predictor that encapsulates everything needed for inference.

Using the Predictor takes a single call: Predictor.predict().

from ray.air.checkpoint import Checkpoint
from ray.train.xgboost import XGBoostPredictor
predictor = XGBoostPredictor.from_checkpoint(Checkpoint.from_directory(CHECKPOINT_PATH))
import numpy as np
# Now let's run a prediction.
loan_request_dict = {
    "zipcode": [76104],
    "dob_ssn": ["19630621_4278"],
    "person_age": [133],
    "person_income": [59000],
    "person_home_ownership": ["RENT"],
    "person_emp_length": [123.0],
    "loan_intent": ["PERSONAL"],
    "loan_amnt": [35000],
    "loan_int_rate": [16.02],
}

# Now augment the request with online features.
zipcode = loan_request_dict["zipcode"][0]
dob_ssn = loan_request_dict["dob_ssn"][0]
online_features = fs.get_online_features(
    entity_rows=[{"zipcode": zipcode, "dob_ssn": dob_ssn}],
    features=feast_features,
).to_dict()
loan_request_dict.update(online_features)
loan_request_df = pd.DataFrame.from_dict(loan_request_dict)
loan_request_df = loan_request_df.drop(["zipcode", "dob_ssn"], axis=1)
display(loan_request_df)
   person_age  person_income  person_home_ownership  person_emp_length  loan_intent  loan_amnt  loan_int_rate  location_type  city  population  ...  total_wages  hard_pulls  bankruptcies  missed_payments_1y  mortgage_due  credit_card_due  missed_payments_2y  missed_payments_6m  student_loan_due  vehicle_loan_due
0         133          59000                   RENT              123.0     PERSONAL      35000          16.02           None  None        None  ...         None        None          None                None          None             None                None                None              None              None

1 rows × 22 columns
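
Every Feast feature shown in the output above is None, meaning the online lookup returned no values for this entity. The source does not say why. Two plausible causes, both assumptions, are that the online store was never populated (this walkthrough does not run a materialization step such as feast materialize-incremental) or that the requested dob_ssn and zipcode have no rows in the feature data. The hypothetical helper below shows one way to detect this before predicting. The sample dictionary only mimics the shape of the to_dict() result used above.

```python
def missing_online_features(online_features, entity_keys):
    """Return the names of features whose returned values are all None."""
    return sorted(
        name
        for name, values in online_features.items()
        if name not in entity_keys and all(value is None for value in values)
    )


# Hypothetical response: entity keys are echoed back, features came back empty.
online = {
    "zipcode": [76104],
    "dob_ssn": ["19630621_4278"],
    "city": [None],
    "hard_pulls": [None],
}

missing = missing_online_features(online, entity_keys={"zipcode", "dob_ssn"})
if missing:
    print(f"No online values for: {missing}")
```

When features are missing, the model scores the request using only the fields supplied in the request itself, so a check like this helps in interpreting the prediction.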

# Run through our predictor using `Predictor.predict()` API.
loan_result = np.round(predictor.predict(loan_request_df)["predictions"][0])

if loan_result == 0:
    print("Loan approved!")
elif loan_result == 1:
    print("Loan rejected!")
Loan rejected!
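
The np.round call above turns the predicted value into a 0/1 label, which amounts to a cutoff at 0.5. Assuming the prediction is the probability of loan_status == 1 (which the binary:logistic objective suggests), the same decision can be written with an explicit threshold that is easy to adjust. The function loan_decision is a hypothetical helper, not part of Ray AIR.

```python
def loan_decision(probability, threshold=0.5):
    """Map the predicted probability of loan_status == 1 to a message."""
    if probability > threshold:
        return "Loan rejected!"
    return "Loan approved!"


# Made-up probabilities for illustration.
high_risk = loan_decision(0.93)
low_risk = loan_decision(0.12)
# A stricter cutoff rejects more borderline requests.
borderline = loan_decision(0.40, threshold=0.3)
```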