ray.data.DataIterator.to_tf#

DataIterator.to_tf(feature_columns: str | List[str], label_columns: str | List[str], *, additional_columns: str | None | List[str] = None, prefetch_batches: int = 1, batch_size: int = 1, drop_last: bool = False, local_shuffle_buffer_size: int | None = None, local_shuffle_seed: int | None = None, feature_type_spec: tf.TypeSpec | Dict[str, tf.TypeSpec] = None, label_type_spec: tf.TypeSpec | Dict[str, tf.TypeSpec] = None, additional_type_spec: tf.TypeSpec | None | Dict[str, tf.TypeSpec] = None) tf.data.Dataset[source]#

Return a TF Dataset over this dataset.

Warning

If your dataset contains ragged tensors, this method errors. To prevent errors, resize your tensors.

Examples

>>> import ray
>>> ds = ray.data.read_csv(
...     "s3://anonymous@air-example-data/iris.csv"
... )
>>> it = ds.iterator(); it
DataIterator(Dataset(
   num_rows=?,
   schema={
      sepal length (cm): double,
      sepal width (cm): double,
      petal length (cm): double,
      petal width (cm): double,
      target: int64
   }
))

If your model accepts a single tensor as input, specify a single feature column.

>>> it.to_tf(feature_columns="sepal length (cm)", label_columns="target")
<_OptionsDataset element_spec=(TensorSpec(shape=(None,), dtype=tf.float64, name='sepal length (cm)'), TensorSpec(shape=(None,), dtype=tf.int64, name='target'))>

If your model accepts a dictionary as input, specify a list of feature columns.

>>> it.to_tf(["sepal length (cm)", "sepal width (cm)"], "target")
<_OptionsDataset element_spec=({'sepal length (cm)': TensorSpec(shape=(None,), dtype=tf.float64, name='sepal length (cm)'), 'sepal width (cm)': TensorSpec(shape=(None,), dtype=tf.float64, name='sepal width (cm)')}, TensorSpec(shape=(None,), dtype=tf.int64, name='target'))>

If your dataset contains multiple features but your model accepts a single tensor as input, combine features with Concatenator.

>>> from ray.data.preprocessors import Concatenator
>>> columns_to_concat = ["sepal length (cm)", "sepal width (cm)", "petal length (cm)", "petal width (cm)"]
>>> preprocessor = Concatenator(columns=columns_to_concat, output_column_name="features")
>>> it = preprocessor.transform(ds).iterator()
>>> it
DataIterator(Concatenator
+- Dataset(
      num_rows=?,
      schema={
         sepal length (cm): double,
         sepal width (cm): double,
         petal length (cm): double,
         petal width (cm): double,
         target: int64
      }
   ))
>>> it.to_tf("features", "target")
<_OptionsDataset element_spec=(TensorSpec(shape=(None, 4), dtype=tf.float64, name='features'), TensorSpec(shape=(None,), dtype=tf.int64, name='target'))>

If your model accepts different types, shapes, or names of tensors as input, specify the type spec. If type specs are not specified, they are automatically inferred from the schema of the iterator.

>>> import tensorflow as tf
>>> it.to_tf(
...     feature_columns="features",
...     label_columns="target",
...     feature_type_spec=tf.TensorSpec(shape=(None, 4), dtype=tf.float32, name="features"),
...     label_type_spec=tf.TensorSpec(shape=(None,), dtype=tf.float32, name="label")
... )
<_OptionsDataset element_spec=(TensorSpec(shape=(None, 4), dtype=tf.float32, name='features'), TensorSpec(shape=(None,), dtype=tf.float32, name='label'))>

If your model accepts additional metadata aside from features and label, specify a single additional column or a list of additional columns. A common use case is to include sample weights in the data samples and train a tf.keras.Model with tf.keras.Model.fit.

>>> ds = ds.add_column("sample weights", lambda df: 1)
>>> it = ds.iterator()
>>> it.to_tf(feature_columns="sepal length (cm)", label_columns="target", additional_columns="sample weights")
<_OptionsDataset element_spec=(TensorSpec(shape=(None,), dtype=tf.float64, name='sepal length (cm)'), TensorSpec(shape=(None,), dtype=tf.int64, name='target'), TensorSpec(shape=(None,), dtype=tf.int64, name='sample weights'))>

If your model accepts different types, shapes, or names for the additional metadata, specify the type spec of the additional column.

>>> it.to_tf(
...     feature_columns="sepal length (cm)",
...     label_columns="target",
...     additional_columns="sample weights",
...     additional_type_spec=tf.TensorSpec(shape=(None,), dtype=tf.float32, name="weight")
... )
<_OptionsDataset element_spec=(TensorSpec(shape=(None,), dtype=tf.float64, name='sepal length (cm)'), TensorSpec(shape=(None,), dtype=tf.int64, name='target'), TensorSpec(shape=(None,), dtype=tf.float32, name='weight'))>
Parameters:
  • feature_columns – Columns that correspond to model inputs. If this is a string, the input data is a tensor. If this is a list, the input data is a dict that maps column names to their tensor representation.

  • label_columns – Columns that correspond to model targets. If this is a string, the target data is a tensor. If this is a list, the target data is a dict that maps column names to their tensor representation.

  • additional_columns – Columns that correspond to sample weights or other metadata. If this is a string, the weight data is a tensor. If this is a list, the weight data is a dict that maps column names to their tensor representation.

  • prefetch_batches – The number of batches to fetch ahead of the current batch to fetch. If set to greater than 0, a separate threadpool will be used to fetch the objects to the local node, format the batches, and apply the collate_fn. Defaults to 1.

  • batch_size – Record batch size. Defaults to 1.

  • drop_last – Set to True to drop the last incomplete batch, if the dataset size is not divisible by the batch size. If False and the size of dataset is not divisible by the batch size, then the last batch will be smaller. Defaults to False.

  • local_shuffle_buffer_size – If non-None, the data will be randomly shuffled using a local in-memory shuffle buffer, and this value will serve as the minimum number of rows that must be in the local in-memory shuffle buffer in order to yield a batch. When there are no more rows to add to the buffer, the remaining rows in the buffer will be drained. This buffer size must be greater than or equal to batch_size, and therefore batch_size must also be specified when using local shuffling.

  • local_shuffle_seed – The seed to use for the local random shuffle.

  • feature_type_spec – The tf.TypeSpec of feature_columns. If there is only one column, specify a tf.TypeSpec. If there are multiple columns, specify a dict that maps column names to their tf.TypeSpec. Default is None to automatically infer the type of each column.

  • label_type_spec – The tf.TypeSpec of label_columns. If there is only one column, specify a tf.TypeSpec. If there are multiple columns, specify a dict that maps column names to their tf.TypeSpec. Default is None to automatically infer the type of each column.

  • additional_type_spec – The tf.TypeSpec of additional_columns. If there is only one column, specify a tf.TypeSpec. If there are multiple columns, specify a dict that maps column names to their tf.TypeSpec. Default is None to automatically infer the type of each column.

Returns:

A tf.data.Dataset that yields inputs and targets.

PublicAPI (beta): This API is in beta and may change before becoming stable.