ray.data.DataIterator.to_tf
- DataIterator.to_tf(feature_columns: str | List[str], label_columns: str | List[str], *, additional_columns: str | None | List[str] = None, prefetch_batches: int = 1, batch_size: int = 1, drop_last: bool = False, local_shuffle_buffer_size: int | None = None, local_shuffle_seed: int | None = None, feature_type_spec: tf.TypeSpec | Dict[str, tf.TypeSpec] = None, label_type_spec: tf.TypeSpec | Dict[str, tf.TypeSpec] = None, additional_type_spec: tf.TypeSpec | None | Dict[str, tf.TypeSpec] = None) -> tf.data.Dataset
Return a TF Dataset over this dataset.
Warning
If your dataset contains ragged tensors, this method errors. To prevent errors, resize your tensors.
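The warning does not say how to resize. One plausible reading is to pad short rows and truncate long ones so that every row has the same length. The sketch below illustrates that idea in plain Python. The helper name pad_or_truncate, its parameters, and the sample data are hypothetical and not part of Ray; in a real pipeline a function like this would presumably be applied to the dataset before calling to_tf.

```python
from typing import List, Sequence


def pad_or_truncate(
    rows: Sequence[Sequence[float]], length: int, pad_value: float = 0.0
) -> List[List[float]]:
    """Return a copy of `rows` where every row has exactly `length` elements.

    Hypothetical helper for illustration only; it is not a Ray API.
    """
    resized = []
    for row in rows:
        # Truncate rows that are too long.
        clipped = list(row[:length])
        # Pad rows that are too short.
        clipped.extend([pad_value] * (length - len(clipped)))
        resized.append(clipped)
    return resized


# Rows of different lengths stand in for a ragged tensor column.
ragged = [[1.0, 2.0, 3.0], [4.0], [5.0, 6.0]]
fixed = pad_or_truncate(ragged, length=2)
print(fixed)
```

After resizing, every row has the same length, so the column can be represented as a regular tensor of shape (None, 2).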
Examples
>>> import ray
>>> ds = ray.data.read_csv(
...     "s3://anonymous@air-example-data/iris.csv"
... )
>>> it = ds.iterator(); it
DataIterator(Dataset(
   num_rows=?,
   schema={
      sepal length (cm): double,
      sepal width (cm): double,
      petal length (cm): double,
      petal width (cm): double,
      target: int64
   }
))
If your model accepts a single tensor as input, specify a single feature column.
>>> it.to_tf(feature_columns="sepal length (cm)", label_columns="target")
<_OptionsDataset element_spec=(TensorSpec(shape=(None,), dtype=tf.float64, name='sepal length (cm)'), TensorSpec(shape=(None,), dtype=tf.int64, name='target'))>
If your model accepts a dictionary as input, specify a list of feature columns.
>>> it.to_tf(["sepal length (cm)", "sepal width (cm)"], "target")
<_OptionsDataset element_spec=({'sepal length (cm)': TensorSpec(shape=(None,), dtype=tf.float64, name='sepal length (cm)'), 'sepal width (cm)': TensorSpec(shape=(None,), dtype=tf.float64, name='sepal width (cm)')}, TensorSpec(shape=(None,), dtype=tf.int64, name='target'))>
If your dataset contains multiple features but your model accepts a single tensor as input, combine features with Concatenator.
>>> from ray.data.preprocessors import Concatenator
>>> columns_to_concat = ["sepal length (cm)", "sepal width (cm)", "petal length (cm)", "petal width (cm)"]
>>> preprocessor = Concatenator(columns=columns_to_concat, output_column_name="features")
>>> it = preprocessor.transform(ds).iterator()
>>> it
DataIterator(Concatenator
+- Dataset(
      num_rows=?,
      schema={
         sepal length (cm): double,
         sepal width (cm): double,
         petal length (cm): double,
         petal width (cm): double,
         target: int64
      }
   ))
>>> it.to_tf("features", "target")
<_OptionsDataset element_spec=(TensorSpec(shape=(None, 4), dtype=tf.float64, name='features'), TensorSpec(shape=(None,), dtype=tf.int64, name='target'))>
If your model accepts different types, shapes, or names of tensors as input, specify the type spec. If type specs are not specified, they are automatically inferred from the schema of the iterator.
>>> import tensorflow as tf
>>> it.to_tf(
...     feature_columns="features",
...     label_columns="target",
...     feature_type_spec=tf.TensorSpec(shape=(None, 4), dtype=tf.float32, name="features"),
...     label_type_spec=tf.TensorSpec(shape=(None,), dtype=tf.float32, name="label")
... )
<_OptionsDataset element_spec=(TensorSpec(shape=(None, 4), dtype=tf.float32, name='features'), TensorSpec(shape=(None,), dtype=tf.float32, name='label'))>
If your model accepts additional metadata aside from features and label, specify a single additional column or a list of additional columns. A common use case is to include sample weights in the data samples and train a tf.keras.Model with tf.keras.Model.fit.
>>> import pandas as pd
>>> ds = ds.add_column("sample weights", lambda df: pd.Series([1] * len(df)))
>>> it = ds.iterator()
>>> it.to_tf(feature_columns="sepal length (cm)", label_columns="target", additional_columns="sample weights")
<_OptionsDataset element_spec=(TensorSpec(shape=(None,), dtype=tf.float64, name='sepal length (cm)'), TensorSpec(shape=(None,), dtype=tf.int64, name='target'), TensorSpec(shape=(None,), dtype=tf.int64, name='sample weights'))>
If your model accepts different types, shapes, or names for the additional metadata, specify the type spec of the additional column.
>>> it.to_tf(
...     feature_columns="sepal length (cm)",
...     label_columns="target",
...     additional_columns="sample weights",
...     additional_type_spec=tf.TensorSpec(shape=(None,), dtype=tf.float32, name="weight")
... )
<_OptionsDataset element_spec=(TensorSpec(shape=(None,), dtype=tf.float64, name='sepal length (cm)'), TensorSpec(shape=(None,), dtype=tf.int64, name='target'), TensorSpec(shape=(None,), dtype=tf.float32, name='weight'))>
- Parameters:
feature_columns – Columns that correspond to model inputs. If this is a string, the input data is a tensor. If this is a list, the input data is a dict that maps column names to their tensor representation.
label_columns – Columns that correspond to model targets. If this is a string, the target data is a tensor. If this is a list, the target data is a dict that maps column names to their tensor representation.
additional_columns – Columns that correspond to sample weights or other metadata. If this is a string, the weight data is a tensor. If this is a list, the weight data is a dict that maps column names to their tensor representation.
prefetch_batches – The number of batches to fetch ahead of the current batch. If set to greater than 0, a separate threadpool is used to fetch the objects to the local node, format the batches, and apply the collate_fn. Defaults to 1.
batch_size – Record batch size. Defaults to 1.
drop_last – Set to True to drop the last incomplete batch, if the dataset size is not divisible by the batch size. If False and the size of dataset is not divisible by the batch size, then the last batch will be smaller. Defaults to False.
local_shuffle_buffer_size – If non-None, the data will be randomly shuffled using a local in-memory shuffle buffer, and this value will serve as the minimum number of rows that must be in the local in-memory shuffle buffer in order to yield a batch. When there are no more rows to add to the buffer, the remaining rows in the buffer will be drained. This buffer size must be greater than or equal to batch_size, and therefore batch_size must also be specified when using local shuffling.
local_shuffle_seed – The seed to use for the local random shuffle.
feature_type_spec – The tf.TypeSpec of feature_columns. If there is only one column, specify a tf.TypeSpec. If there are multiple columns, specify a dict that maps column names to their tf.TypeSpec. Default is None to automatically infer the type of each column.
label_type_spec – The tf.TypeSpec of label_columns. If there is only one column, specify a tf.TypeSpec. If there are multiple columns, specify a dict that maps column names to their tf.TypeSpec. Default is None to automatically infer the type of each column.
additional_type_spec – The tf.TypeSpec of additional_columns. If there is only one column, specify a tf.TypeSpec. If there are multiple columns, specify a dict that maps column names to their tf.TypeSpec. Default is None to automatically infer the type of each column.
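To make the interaction between batch_size, drop_last, local_shuffle_buffer_size, and local_shuffle_seed more concrete, here is a minimal pure-Python sketch of the behavior described above. It is an illustration under simplifying assumptions, not Ray's implementation: the function name iter_batches_sketch is hypothetical, rows are plain Python values, and the buffer is shuffled in full before each batch is taken.

```python
import random
from typing import Iterable, Iterator, List, Optional, TypeVar

T = TypeVar("T")


def iter_batches_sketch(
    rows: Iterable[T],
    batch_size: int,
    drop_last: bool = False,
    local_shuffle_buffer_size: Optional[int] = None,
    local_shuffle_seed: Optional[int] = None,
) -> Iterator[List[T]]:
    """Yield batches from `rows`, optionally shuffling through a local buffer.

    Hypothetical illustration of the documented semantics; not a Ray API.
    """
    shuffle = local_shuffle_buffer_size is not None
    if shuffle and local_shuffle_buffer_size < batch_size:
        raise ValueError("local_shuffle_buffer_size must be >= batch_size")

    rng = random.Random(local_shuffle_seed)
    # Minimum number of buffered rows required before a batch is yielded.
    min_rows = local_shuffle_buffer_size if shuffle else batch_size
    buffer: List[T] = []

    def take_batch(n: int) -> List[T]:
        if shuffle:
            rng.shuffle(buffer)
        batch = buffer[:n]
        del buffer[:n]
        return batch

    for row in rows:
        buffer.append(row)
        while len(buffer) >= min_rows:
            yield take_batch(batch_size)

    # No more rows to add: drain whatever is left in the buffer.
    while len(buffer) >= batch_size:
        yield take_batch(batch_size)
    if buffer and not drop_last:
        # The last batch is smaller than batch_size.
        yield take_batch(len(buffer))


sizes_keep = [len(b) for b in iter_batches_sketch(range(10), batch_size=4)]
sizes_drop = [len(b) for b in iter_batches_sketch(range(10), batch_size=4, drop_last=True)]
print(sizes_keep, sizes_drop)
```

With 10 rows and a batch size of 4, keeping the last batch gives batch sizes 4, 4, and 2, while drop_last=True gives only the two full batches. Passing the same seed twice produces the same shuffled order in this sketch.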
- Returns:
A tf.data.Dataset that yields inputs and targets.
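The element_spec outputs in the examples suggest that each element is a tuple of (features, labels), with a third entry when additional_columns is given, and that a string column selects a single tensor while a list selects a dict. The sketch below mirrors that structure in plain Python, with lists standing in for tensors. The names select and to_element are hypothetical helpers for illustration, not Ray or TensorFlow APIs.

```python
from typing import Dict, List, Optional, Union

Columns = Union[str, List[str]]
Batch = Dict[str, list]


def select(batch: Batch, columns: Columns):
    """Return one column for a string, or a dict of columns for a list."""
    if isinstance(columns, str):
        return batch[columns]
    return {name: batch[name] for name in columns}


def to_element(
    batch: Batch,
    feature_columns: Columns,
    label_columns: Columns,
    additional_columns: Optional[Columns] = None,
) -> tuple:
    """Assemble a (features, labels[, additional]) tuple from a batch."""
    features = select(batch, feature_columns)
    labels = select(batch, label_columns)
    if additional_columns is None:
        return (features, labels)
    return (features, labels, select(batch, additional_columns))


# A toy batch; plain lists stand in for tensors.
batch = {"a": [1.0, 2.0], "b": [3.0, 4.0], "y": [0, 1], "w": [1, 1]}
single = to_element(batch, "a", "y")
multi = to_element(batch, ["a", "b"], "y")
weighted = to_element(batch, "a", "y", "w")
print(single, multi, weighted)
```

A three-entry tuple of this form matches the (inputs, targets, sample_weights) layout that tf.keras.Model.fit accepts when it is given a dataset.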