Integrate Ray AIR with Feast feature store

# !pip install feast==0.20.1 "ray[air]>=1.13" xgboost_ray

This example shows how to use Ray AIR with the Feast feature store, using historical features to train a model and online features for inference.

The task is adapted from the Feast credit scoring tutorial. We train an XGBoost model and run a prediction on an incoming loan request to see whether it is approved or rejected.

Let’s first set up our workspace and prepare the data to work with.

import os
WORKING_DIR = os.path.expanduser("~/ray-air-feast-example/")
%env WORKING_DIR=$WORKING_DIR
! mkdir -p $WORKING_DIR
! wget --no-check-certificate https://github.com/ray-project/air-sample-data/raw/main/air-feast-example.zip
! unzip air-feast-example.zip 
! mv air-feast-example/* $WORKING_DIR
%cd $WORKING_DIR
--2022-06-02 14:22:50--  https://github.com/ray-project/air-sample-data/raw/main/air-feast-example.zip
Resolving github.com (github.com)... 192.30.255.113
Connecting to github.com (github.com)|192.30.255.113|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://raw.githubusercontent.com/ray-project/air-sample-data/main/air-feast-example.zip [following]
--2022-06-02 14:22:50--  https://raw.githubusercontent.com/ray-project/air-sample-data/main/air-feast-example.zip
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.110.133, 185.199.111.133, 185.199.108.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.110.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 23715107 (23M) [application/zip]
Saving to: ‘air-feast-example.zip’

air-feast-example.z 100%[===================>]  22.62M   114MB/s    in 0.2s    

2022-06-02 14:22:51 (114 MB/s) - ‘air-feast-example.zip’ saved [23715107/23715107]

Archive:  air-feast-example.zip
   creating: air-feast-example/
   creating: air-feast-example/feature_repo/
  inflating: air-feast-example/feature_repo/.DS_Store  
 extracting: air-feast-example/feature_repo/__init__.py  
  inflating: air-feast-example/feature_repo/features.py  
   creating: air-feast-example/feature_repo/data/
  inflating: air-feast-example/feature_repo/data/.DS_Store  
  inflating: air-feast-example/feature_repo/data/credit_history_sample.csv  
  inflating: air-feast-example/feature_repo/data/zipcode_table_sample.csv  
  inflating: air-feast-example/feature_repo/data/credit_history.parquet  
  inflating: air-feast-example/feature_repo/data/zipcode_table.parquet  
  inflating: air-feast-example/feature_repo/feature_store.yaml  
  inflating: air-feast-example/.DS_Store  
   creating: air-feast-example/data/
  inflating: air-feast-example/data/loan_table.parquet  
  inflating: air-feast-example/data/loan_table_sample.csv  
! ls
data  feature_repo
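
The shell commands above rely on wget, unzip and mv being available. As a rough, portable alternative for the unpacking step, the following sketch uses only the Python standard library. The helper name extract_example is hypothetical, and the sketch assumes the archive has already been downloaded and comes from a trusted source (it does not guard against malicious member paths).

```python
import zipfile
from pathlib import Path


def extract_example(zip_path, dest_dir, top_level="air-feast-example/"):
    """Extract the archive into dest_dir, dropping the top-level folder
    and skipping macOS .DS_Store files. (Hypothetical helper.)"""
    dest = Path(dest_dir)
    extracted = []
    with zipfile.ZipFile(zip_path) as archive:
        for member in archive.infolist():
            name = member.filename
            if member.is_dir() or name.endswith(".DS_Store"):
                continue
            # Strip the leading folder so files land directly in dest_dir,
            # mirroring the `mv air-feast-example/* $WORKING_DIR` step.
            relative = name[len(top_level):] if name.startswith(top_level) else name
            target = dest / relative
            target.parent.mkdir(parents=True, exist_ok=True)
            target.write_bytes(archive.read(member))
            extracted.append(relative)
    return sorted(extracted)
```

Calling extract_example("air-feast-example.zip", WORKING_DIR) would leave data/ and feature_repo/ directly under the working directory, as in the listing above.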

There is already a feature repository set up in feature_repo/. It isn’t necessary to create a new feature repository, but it can be done using the following command: feast init -t local feature_repo.

Now let’s take a look at the schema of the Feast feature store, which is defined in feature_repo/features.py. It contains two feature views, zipcode_features and credit_history, each backed by a Parquet file: feature_repo/data/zipcode_table.parquet and feature_repo/data/credit_history.parquet, respectively.

!pygmentize feature_repo/features.py
from datetime import timedelta

from feast import (Entity, Field, FeatureView, FileSource, ValueType)
from feast.types import Float32, Int64, String


zipcode = Entity(name="zipcode", value_type=Int64)

zipcode_source = FileSource(
    path="feature_repo/data/zipcode_table.parquet",
    timestamp_field="event_timestamp",
    created_timestamp_column="created_timestamp",
)

zipcode_features = FeatureView(
    name="zipcode_features",
    entities=["zipcode"],
    ttl=timedelta(days=3650),
    schema=[
        Field(name="city", dtype=String),
        Field(name="state", dtype=String),
        Field(name="location_type", dtype=String),
        Field(name="tax_returns_filed", dtype=Int64),
        Field(name="population", dtype=Int64),
        Field(name="total_wages", dtype=Int64),
    ],
    source=zipcode_source,
)

dob_ssn = Entity(
    name="dob_ssn",
    value_type=ValueType.STRING,
    description="Date of birth and last four digits of social security number",
)

credit_history_source = FileSource(
    path="feature_repo/data/credit_history.parquet",
    timestamp_field="event_timestamp",
    created_timestamp_column="created_timestamp",
)

credit_history = FeatureView(
    name="credit_history",
    entities=["dob_ssn"],
    ttl=timedelta(days=90),
    schema=[
        Field(name="credit_card_due", dtype=Int64),
        Field(name="mortgage_due", dtype=Int64),
        Field(name="student_loan_due", dtype=Int64),
        Field(name="vehicle_loan_due", dtype=Int64),
        Field(name="hard_pulls", dtype=Int64),
        Field(name="missed_payments_2y", dtype=Int64),
        Field(name="missed_payments_1y", dtype=Int64),
        Field(name="missed_payments_6m", dtype=Int64),
        Field(name="bankruptcies", dtype=Int64),
    ],
    source=credit_history_source,
)
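
Features are requested from Feast by strings of the form "view_name:field_name", built from the feature view names and field names defined above. As a small sketch, the references can be derived from a plain mapping instead of being typed by hand. The helper feature_refs and the views dict are illustrative names, not part of Feast.

```python
def feature_refs(views):
    """Build 'view_name:field_name' strings from a mapping of
    feature view name -> list of field names. (Illustrative helper.)"""
    return [f"{view}:{field}" for view, fields in views.items() for field in fields]


# Field names copied from feature_repo/features.py.
views = {
    "zipcode_features": [
        "city", "state", "location_type",
        "tax_returns_filed", "population", "total_wages",
    ],
    "credit_history": [
        "credit_card_due", "mortgage_due", "student_loan_due",
        "vehicle_loan_due", "hard_pulls", "missed_payments_2y",
        "missed_payments_1y", "missed_payments_6m", "bankruptcies",
    ],
}

refs = feature_refs(views)
print(len(refs))  # 15
print(refs[0])    # zipcode_features:city
```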

Deploy the feature store defined above by running feast apply from within the feature_repo/ folder.

! cd feature_repo && feast apply
import feast
fs = feast.FeatureStore(repo_path="feature_repo")

Generate training data

On top of the features in Feast, we also have labeled training data at data/loan_table.parquet. At training time, the loan table is passed to Feast as an entity dataframe. Feast joins the credit_history and zipcode_features feature views onto it by entity key and event timestamp, producing a feature vector for each loan.

import pandas as pd
loan_df = pd.read_parquet("data/loan_table.parquet")
display(loan_df)
       loan_id  dob_ssn        zipcode  person_age  person_income  person_home_ownership  person_emp_length  loan_intent      loan_amnt  loan_int_rate  loan_status  event_timestamp                   created_timestamp
0      10000    19530219_5179  76104    22          59000          RENT                   123.0              PERSONAL         35000      16.02          1            2021-08-25 20:34:41.361000+00:00  2021-08-25 20:34:41.361000+00:00
1      10001    19520816_8737  70380    21          9600           OWN                    5.0                EDUCATION        1000       11.14          0            2021-08-25 20:16:20.128000+00:00  2021-08-25 20:16:20.128000+00:00
2      10002    19860413_2537  97039    25          9600           MORTGAGE               1.0                MEDICAL          5500       12.87          1            2021-08-25 19:57:58.896000+00:00  2021-08-25 19:57:58.896000+00:00
3      10003    19760701_8090  63785    23          65500          RENT                   4.0                MEDICAL          35000      15.23          1            2021-08-25 19:39:37.663000+00:00  2021-08-25 19:39:37.663000+00:00
4      10004    19830125_8297  82223    24          54400          RENT                   8.0                MEDICAL          35000      14.27          1            2021-08-25 19:21:16.430000+00:00  2021-08-25 19:21:16.430000+00:00
...    ...      ...            ...      ...         ...            ...                    ...                ...              ...        ...            ...          ...                               ...
28633  38633    19491126_1487  43205    57          53000          MORTGAGE               1.0                PERSONAL         5800       13.16          0            2020-08-25 21:48:06.292000+00:00  2020-08-25 21:48:06.292000+00:00
28634  38634    19681208_6537  24872    54          120000         MORTGAGE               4.0                PERSONAL         17625      7.49           0            2020-08-25 21:29:45.059000+00:00  2020-08-25 21:29:45.059000+00:00
28635  38635    19880422_2592  68826    65          76000          RENT                   3.0                HOMEIMPROVEMENT  35000      10.99          1            2020-08-25 21:11:23.826000+00:00  2020-08-25 21:11:23.826000+00:00
28636  38636    19901017_6108  92014    56          150000         MORTGAGE               5.0                PERSONAL         15000      11.48          0            2020-08-25 20:53:02.594000+00:00  2020-08-25 20:53:02.594000+00:00
28637  38637    19960703_3449  69033    66          42000          RENT                   2.0                MEDICAL          6475       9.99           0            2020-08-25 20:34:41.361000+00:00  2020-08-25 20:34:41.361000+00:00

28638 rows × 13 columns

feast_features = [
    "zipcode_features:city",
    "zipcode_features:state",
    "zipcode_features:location_type",
    "zipcode_features:tax_returns_filed",
    "zipcode_features:population",
    "zipcode_features:total_wages",
    "credit_history:credit_card_due",
    "credit_history:mortgage_due",
    "credit_history:student_loan_due",
    "credit_history:vehicle_loan_due",
    "credit_history:hard_pulls",
    "credit_history:missed_payments_2y",
    "credit_history:missed_payments_1y",
    "credit_history:missed_payments_6m",
    "credit_history:bankruptcies",
]

loan_w_offline_feature = fs.get_historical_features(
    entity_df=loan_df, features=feast_features
).to_df()

# Drop some unnecessary columns for simplicity
loan_w_offline_feature = loan_w_offline_feature.drop(["event_timestamp", "created_timestamp__", "loan_id", "zipcode", "dob_ssn"], axis=1)
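
To make the historical retrieval more concrete, here is a minimal pure-Python sketch of a point-in-time lookup with a TTL: for each loan, pick the most recent feature row that is not newer than the loan's event timestamp and not older than the TTL window. This illustrates the idea only and is not Feast's implementation; the function name, the row layout and the sample values are all made up.

```python
from datetime import datetime, timedelta


def point_in_time_lookup(feature_rows, entity_key, event_time, ttl):
    """Return the latest feature row for entity_key whose timestamp is
    not after event_time and not older than event_time - ttl."""
    best = None
    for row in feature_rows:
        if row["key"] != entity_key:
            continue
        ts = row["event_timestamp"]
        if ts > event_time or ts < event_time - ttl:
            continue  # from the future, or expired under the TTL
        if best is None or ts > best["event_timestamp"]:
            best = row
    return best


# Hypothetical credit history rows for one applicant.
history = [
    {"key": "A", "event_timestamp": datetime(2021, 1, 1), "hard_pulls": 1},
    {"key": "A", "event_timestamp": datetime(2021, 6, 1), "hard_pulls": 3},
    {"key": "A", "event_timestamp": datetime(2021, 9, 1), "hard_pulls": 7},
]

loan_time = datetime(2021, 8, 25)
match = point_in_time_lookup(history, "A", loan_time, ttl=timedelta(days=90))
print(match["hard_pulls"])  # 3
```

The September row is ignored because it postdates the loan, which is what keeps future information out of the training data. With a shorter TTL, such as 30 days, the lookup returns None and the feature would be missing for that loan.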

Now let’s take a look at the training data as it is augmented by Feast.

display(loan_w_offline_feature)
         person_age  person_income  person_home_ownership  person_emp_length  loan_intent        loan_amnt  loan_int_rate  loan_status  city              state  ...  total_wages  credit_card_due  mortgage_due  student_loan_due  vehicle_loan_due  hard_pulls  missed_payments_2y  missed_payments_1y  missed_payments_6m  bankruptcies
1358886  55          24543          RENT                   3.0                VENTURE            4000       13.92          0            SLIDELL           LA     ...  315061217    1777             690650        46372             10439             5           1                   2                   1                   0
1358815  58          20000          RENT                   0.0                EDUCATION          4000       9.99           0            CHOUTEAU          OK     ...  59412230     1791             462670        19421             3583              8           7                   1                   0                   2
1353348  64          24000          RENT                   1.0                MEDICAL            3000       6.99           0            BISMARCK          ND     ...  469621263    5917             1780959       11835             27910             8           3                   2                   1                   0
1354200  55          34000          RENT                   0.0                DEBTCONSOLIDATION  12000      6.92           1            SANTA BARBARA     CA     ...  24537583     8091             364271        30248             22640             2           7                   3                   0                   0
1354271  51          74628          MORTGAGE               3.0                PERSONAL           3000       13.49          0            HUNTINGTON BEACH  CA     ...  19749601     3679             1659968       37582             20284             0           1                   0                   0                   0
...      ...         ...            ...                    ...                ...                ...        ...            ...          ...               ...    ...  ...          ...              ...           ...               ...               ...         ...                 ...                 ...                 ...
674285   23          74000          RENT                   3.0                MEDICAL            25000      10.36          1            MANSFIELD         MO     ...  33180988     5176             1089963       44642             2877              1           6                   1                   0                   0
668250   21          200000         MORTGAGE               2.0                DEBTCONSOLIDATION  25000      13.99          0            SALISBURY         MD     ...  470634058    5297             1288915       22471             22630             0           5                   2                   1                   0
668321   24          200000         MORTGAGE               3.0                VENTURE            24000      7.49           0            STRUNK            KY     ...  10067358     6549             22399         11806             13005             0           1                   0                   0                   0
670025   23          215000         MORTGAGE               7.0                MEDICAL            35000      14.79          0            HAWTHORN          PA     ...  5956835      9079             876038        4556              21588             0           1                   0                   0                   0
2034006  22          59000          RENT                   123.0              PERSONAL           35000      16.02          1            FORT WORTH        TX     ...  142325465    8419             91803         22328             15078             0           1                   0                   0                   0

28638 rows × 23 columns

# Convert into Train and Validation datasets.
import ray

loan_ds = ray.data.from_pandas(loan_w_offline_feature)
train_ds, validation_ds = loan_ds.split_proportionately([0.8])
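
As a quick sanity check on the split, the arithmetic can be sketched in plain Python. The helper split_sizes is hypothetical, and rounding each proportion down is an assumption; for this dataset it agrees with the N=22,910 training rows reported in the training log further down.

```python
def split_sizes(num_rows, proportions):
    """Sizes of len(proportions) + 1 consecutive splits; each listed
    proportion is rounded down and the last split takes the remainder."""
    sizes = [int(num_rows * p) for p in proportions]
    sizes.append(num_rows - sum(sizes))
    return sizes


print(split_sizes(28638, [0.8]))  # [22910, 5728]
```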

Define Preprocessors

A preprocessor performs last-mile processing on a Ray Dataset before the data is fed to the trainer.

categorical_features = [
    "person_home_ownership",
    "loan_intent",
    "city",
    "state",
    "location_type",
]

from ray.ml.preprocessors import Chain, OrdinalEncoder, SimpleImputer

imputer = SimpleImputer(categorical_features, strategy="most_frequent")
encoder = OrdinalEncoder(columns=categorical_features)
chained_preprocessor = Chain(imputer, encoder)
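
To illustrate what the chain does to a single categorical column, here is a small pure-Python sketch: fill missing values with the most frequent category, then map each category to an integer. It mirrors the idea rather than the Ray implementation. The helper names are made up, and assigning indices in sorted order is an assumption of this sketch.

```python
from collections import Counter


def fit_most_frequent(values):
    """Most common non-missing value, used to fill gaps."""
    return Counter(v for v in values if v is not None).most_common(1)[0][0]


def fit_ordinal(values):
    """Map each distinct category to an integer index (sorted order)."""
    return {category: index for index, category in enumerate(sorted(set(values)))}


# Toy person_home_ownership column with one missing entry.
column = ["RENT", "OWN", None, "MORTGAGE", "RENT"]

fill_value = fit_most_frequent(column)
imputed = [fill_value if v is None else v for v in column]
mapping = fit_ordinal(imputed)
encoded = [mapping[v] for v in imputed]

print(imputed)  # ['RENT', 'OWN', 'RENT', 'MORTGAGE', 'RENT']
print(mapping)  # {'MORTGAGE': 0, 'OWN': 1, 'RENT': 2}
print(encoded)  # [2, 1, 2, 0, 2]
```

The order matters: imputing first means the encoder never sees a missing category.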

Train XGBoost model using Ray AIR Trainer

Ray AIR provides a variety of Trainers that are integrated with popular machine learning frameworks. You can train a distributed model at scale with Ray by calling trainer.fit(). The result exposes a Ray AIR Checkpoint, which carries the trained model from training over to prediction. Let’s take a look!

LABEL = "loan_status"
CHECKPOINT_PATH = "checkpoint"
NUM_WORKERS = 1  # Change this based on the resources in the cluster.


from ray.ml.train.integrations.xgboost import XGBoostTrainer

params = {
    "tree_method": "approx",
    "objective": "binary:logistic",
    "eval_metric": ["logloss", "error"],
}

trainer = XGBoostTrainer(
    # With the ray.ml namespace used throughout this example, the scaling
    # configuration is passed as a plain dict.
    scaling_config={
        "num_workers": NUM_WORKERS,
        "use_gpu": False,
    },
    label_column=LABEL,
    params=params,
    datasets={"train": train_ds, "validation": validation_ds},
    preprocessor=chained_preprocessor,
    num_boost_round=100,
)
checkpoint = trainer.fit().checkpoint
# This saves the checkpoint onto disk
checkpoint.to_directory(CHECKPOINT_PATH)
/home/ray/anaconda3/lib/python3.8/site-packages/xgboost_ray/main.py:131: DeprecationWarning: distutils Version classes are deprecated. Use packaging.version instead.
  XGBOOST_LOOSE_VERSION = LooseVersion(xgboost_version)
E0602 14:26:17.861773834    4511 fork_posix.cc:76]           Other threads are currently calling into gRPC, skipping fork() handlers
/home/ray/anaconda3/lib/python3.8/site-packages/tqdm/auto.py:22: TqdmWarning: IProgress not found. Please update jupyter and ipywidgets. See https://ipywidgets.readthedocs.io/en/stable/user_install.html
  from .autonotebook import tqdm as notebook_tqdm
== Status ==
Current time: 2022-06-02 14:26:33 (running for 00:00:14.95)
Memory usage on this node: 3.5/30.6 GiB
Using FIFO scheduling algorithm.
Resources requested: 0/8 CPUs, 0/0 GPUs, 0.0/18.04 GiB heap, 0.0/9.02 GiB objects
Result logdir: /home/ray/ray_results/XGBoostTrainer_2022-06-02_14-26-17
Number of trials: 1/1 (1 TERMINATED)
Trial name                  status      loc                 iter  total time (s)  train-logloss  train-error  validation-logloss
XGBoostTrainer_a3a2c_00000  TERMINATED  172.31.71.98:12634  100   11.9561         0.0578837      0.0127019    0.225994


(GBDTTrainable pid=12634) 2022-06-02 14:26:23,018	INFO main.py:980 -- [RayXGBoost] Created 1 new actors (1 total actors). Waiting until actors are ready for training.
(GBDTTrainable pid=12634) 2022-06-02 14:26:25,230	INFO main.py:1025 -- [RayXGBoost] Starting XGBoost training.
(GBDTTrainable pid=12634) E0602 14:26:25.231635524   12691 fork_posix.cc:76]           Other threads are currently calling into gRPC, skipping fork() handlers
(_RemoteRayXGBoostActor pid=12769) [14:26:25] task [xgboost.ray]:139712002042896 got new rank 0
Result for XGBoostTrainer_a3a2c_00000:
  date: 2022-06-02_14-26-26
  done: false
  experiment_id: 14b63d641b8e4583b3551b1af113ec6d
  hostname: ip-172-31-71-98
  iterations_since_restore: 1
  node_ip: 172.31.71.98
  pid: 12634
  should_checkpoint: true
  time_since_restore: 5.432286262512207
  time_this_iter_s: 5.432286262512207
  time_total_s: 5.432286262512207
  timestamp: 1654205186
  timesteps_since_restore: 0
  train-error: 0.09502400698384984
  train-logloss: 0.5147884634112437
  training_iteration: 1
  trial_id: a3a2c_00000
  validation-error: 0.1627094972067039
  validation-logloss: 0.5557328870197414
  warmup_time: 0.004194974899291992
  
Result for XGBoostTrainer_a3a2c_00000:
  date: 2022-06-02_14-26-31
  done: false
  experiment_id: 14b63d641b8e4583b3551b1af113ec6d
  hostname: ip-172-31-71-98
  iterations_since_restore: 81
  node_ip: 172.31.71.98
  pid: 12634
  should_checkpoint: true
  time_since_restore: 10.460175275802612
  time_this_iter_s: 0.03925156593322754
  time_total_s: 10.460175275802612
  timestamp: 1654205191
  timesteps_since_restore: 0
  train-error: 0.01802706241815801
  train-logloss: 0.07058723671627426
  training_iteration: 81
  trial_id: a3a2c_00000
  validation-error: 0.0824022346368715
  validation-logloss: 0.22556984905196217
  warmup_time: 0.004194974899291992
  
(GBDTTrainable pid=12634) 2022-06-02 14:26:32,801	INFO main.py:1516 -- [RayXGBoost] Finished XGBoost training on training data with total N=22,910 in 9.81 seconds (7.57 pure XGBoost training time).
Result for XGBoostTrainer_a3a2c_00000:
  date: 2022-06-02_14-26-32
  done: true
  experiment_id: 14b63d641b8e4583b3551b1af113ec6d
  experiment_tag: '0'
  hostname: ip-172-31-71-98
  iterations_since_restore: 100
  node_ip: 172.31.71.98
  pid: 12634
  should_checkpoint: true
  time_since_restore: 11.956074953079224
  time_this_iter_s: 0.022900104522705078
  time_total_s: 11.956074953079224
  timestamp: 1654205192
  timesteps_since_restore: 0
  train-error: 0.01270187690964644
  train-logloss: 0.05788368908741939
  training_iteration: 100
  trial_id: a3a2c_00000
  validation-error: 0.0825768156424581
  validation-logloss: 0.22599449588356768
  warmup_time: 0.004194974899291992
  
2022-06-02 14:26:33,481	INFO tune.py:752 -- Total run time: 15.62 seconds (14.94 seconds for the tuning loop).
'checkpoint'
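
The log above reports two metrics per dataset, logloss and error, as requested through eval_metric. The following sketch shows how these are commonly defined for binary classification: mean negative log-likelihood, and the fraction of misclassified rows at a 0.5 threshold. The labels and probabilities are toy values, not taken from the run above.

```python
import math


def binary_logloss(labels, probabilities):
    """Mean negative log-likelihood of the true labels."""
    total = 0.0
    for y, p in zip(labels, probabilities):
        total += -(y * math.log(p) + (1 - y) * math.log(1 - p))
    return total / len(labels)


def binary_error(labels, probabilities, threshold=0.5):
    """Fraction of rows whose thresholded prediction disagrees with the label."""
    wrong = sum(
        1 for y, p in zip(labels, probabilities) if (p > threshold) != (y == 1)
    )
    return wrong / len(labels)


# Toy data: the third row is a positive that the model scores below 0.5.
labels = [1, 0, 1, 0]
probs = [0.9, 0.2, 0.4, 0.1]

print(round(binary_logloss(labels, probs), 4))  # 0.3375
print(binary_error(labels, probs))              # 0.25
```

Read this way, the final validation-error of about 0.083 in the log means that roughly 8% of validation loans are misclassified.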

Inference

From the Checkpoint object obtained in the previous section, we can now construct a Ray AIR Predictor that encapsulates everything needed for inference.

The API for using Predictor is also very intuitive - simply call Predictor.predict().

from ray.ml.checkpoint import Checkpoint
from ray.ml.predictors.integrations.xgboost import XGBoostPredictor
predictor = XGBoostPredictor.from_checkpoint(Checkpoint.from_directory(CHECKPOINT_PATH))
import numpy as np
# Now let's do some prediction.
loan_request_dict = {
    "zipcode": [76104],
    "dob_ssn": ["19630621_4278"],
    "person_age": [133],
    "person_income": [59000],
    "person_home_ownership": ["RENT"],
    "person_emp_length": [123.0],
    "loan_intent": ["PERSONAL"],
    "loan_amnt": [35000],
    "loan_int_rate": [16.02],
}

# Now augment the request with online features.
zipcode = loan_request_dict["zipcode"][0]
dob_ssn = loan_request_dict["dob_ssn"][0]
online_features =  fs.get_online_features(
    entity_rows=[{"zipcode": zipcode, "dob_ssn": dob_ssn}],
    features=feast_features,
).to_dict()
loan_request_dict.update(online_features)
# Use the builtin float: the np.float alias is deprecated since NumPy 1.20 and removed in NumPy 1.24.
loan_request_df = pd.DataFrame.from_dict(loan_request_dict, dtype=float)
loan_request_df = loan_request_df.drop(["zipcode", "dob_ssn"], axis=1)
display(loan_request_df)
/tmp/ipykernel_4511/2153939661.py:23: FutureWarning: Could not cast to float64, falling back to object. This behavior is deprecated. In a future version, when a dtype is passed to 'DataFrame', either all columns will be cast to that dtype, or a TypeError will be raised.
  loan_request_df = pd.DataFrame.from_dict(loan_request_dict, dtype=float)
   person_age  person_income  person_home_ownership  person_emp_length  loan_intent  loan_amnt  loan_int_rate  state  population  location_type  ...  tax_returns_filed  student_loan_due  missed_payments_1y  hard_pulls  mortgage_due  bankruptcies  credit_card_due  missed_payments_2y  missed_payments_6m  vehicle_loan_due
0  133.0       59000.0        RENT                   123.0              PERSONAL     35000.0    16.02          NaN    NaN         NaN            ...  NaN                NaN               NaN                 NaN         NaN           NaN           NaN              NaN                 NaN                 NaN

1 rows × 22 columns
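
Every Feast-derived column in the table above is NaN, which suggests the online lookup returned no values for this entity key. A small guard can make that visible before the request reaches the predictor. The sketch below assumes a dict shaped like the one merged into loan_request_dict (feature name mapped to a one-element list); the helper name and the sample response are hypothetical.

```python
import math


def missing_features(feature_dict):
    """Names of features whose first value is None or NaN."""
    missing = []
    for name, values in feature_dict.items():
        value = values[0]
        if value is None or (isinstance(value, float) and math.isnan(value)):
            missing.append(name)
    return sorted(missing)


# Hypothetical response: name -> list of values, one per entity row.
response = {
    "zipcode": [76104],
    "dob_ssn": ["19630621_4278"],
    "city": [None],
    "hard_pulls": [float("nan")],
    "state": ["TX"],
}

print(missing_features(response))  # ['city', 'hard_pulls']
```

Whether to reject such a request, fall back to defaults, or let the model handle the missing values is a design decision left to the application.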

# Run the request through our predictor using the `Predictor.predict()` API.
loan_result = np.round(predictor.predict(loan_request_df)["predictions"][0])

if loan_result == 0:
    print("Loan approved!")
elif loan_result == 1:
    print("Loan rejected!")
Loan rejected!
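
With the binary:logistic objective the model outputs a probability, and rounding it amounts to a 0.5 threshold on the probability of loan_status being 1, which the code above treats as a rejection. As a sketch, the same decision can be written with an explicit, adjustable threshold. The function decide is a hypothetical helper, and the probabilities below are made up.

```python
def decide(probability, threshold=0.5):
    """Map the predicted probability of loan_status == 1 to a decision.
    Values strictly above the threshold count as a rejection."""
    if not 0.0 <= probability <= 1.0:
        raise ValueError("probability must be between 0 and 1")
    return "Loan rejected!" if probability > threshold else "Loan approved!"


print(decide(0.83))  # Loan rejected!
print(decide(0.12))  # Loan approved!
print(decide(0.5))   # Loan approved!
```

A probability of exactly 0.5 is approved here, matching np.round, which rounds halves to the nearest even number. Raising or lowering the threshold trades off wrongly rejected loans against wrongly approved ones.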