ray.data.Dataset.join
- Dataset.join(ds: Dataset, join_type: str, num_partitions: int, on: Tuple[str] = ('id',), right_on: Tuple[str] | None = None, left_suffix: str | None = None, right_suffix: str | None = None, *, partition_size_hint: int | None = None, aggregator_ray_remote_args: Dict[str, Any] | None = None, validate_schemas: bool = False) -> Dataset
Join Datasets on join keys.
- Parameters:
ds – Other dataset to join against
join_type – The kind of join that should be performed, one of (“inner”, “left_outer”, “right_outer”, “full_outer”)
num_partitions – Total number of partitions the input sequences are split into, with each partition joined independently. Increasing the number of partitions reduces the size of each individual partition, and hence the memory required to join it. Note that this is also the total number of blocks produced by executing the join.
on – The columns from the left operand that will be used as keys for the join operation.
right_on – The columns from the right operand that will be used as keys for the join operation. When None, on is assumed to be the list of key columns for the right dataset as well.
left_suffix – (Optional) Suffix to be appended for columns of the left operand.
right_suffix – (Optional) Suffix to be appended for columns of the right operand.
partition_size_hint – (Optional) Hint to the joining operator about the estimated average size of an individual partition (in bytes). It is used to estimate the total dataset size and to tune the memory requirements of the individual joining workers, which helps prevent OOMs when joining very large datasets.
aggregator_ray_remote_args – (Optional) Parameter overriding ray.remote args passed when constructing joining (aggregator) workers.
validate_schemas – (Optional) Controls whether the provided configuration is validated against the input schemas (defaults to False, since obtaining schemas could be prohibitively expensive).
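The role of num_partitions described above can be sketched in plain Python. This is only an illustration of a hash-partitioned inner join, not Ray's implementation: the helper names (partition_rows, join_partition, partitioned_inner_join) are hypothetical, rows are plain dicts, and suffix handling is ignored.

```python
from collections import defaultdict


def partition_rows(rows, key_cols, num_partitions):
    """Split rows into num_partitions buckets by hashing the join key."""
    partitions = [[] for _ in range(num_partitions)]
    for row in rows:
        key = tuple(row[c] for c in key_cols)
        partitions[hash(key) % num_partitions].append(row)
    return partitions


def join_partition(left_rows, right_rows, on, right_on):
    """Inner-join one pair of co-partitioned buckets using a hash table."""
    index = defaultdict(list)
    for r in right_rows:
        index[tuple(r[c] for c in right_on)].append(r)
    out = []
    for l in left_rows:
        for r in index.get(tuple(l[c] for c in on), []):
            # Illustration only: on a column-name clash the right value wins.
            out.append({**l, **r})
    return out


def partitioned_inner_join(left, right, num_partitions, on=("id",), right_on=None):
    """Return one output block (list of rows) per partition."""
    right_on = right_on or on  # mirror the documented default for right_on
    left_parts = partition_rows(left, on, num_partitions)
    right_parts = partition_rows(right, right_on, num_partitions)
    # Rows with equal keys land in the same partition index on both sides,
    # so each pair of buckets can be joined independently.
    return [
        join_partition(lp, rp, on, right_on)
        for lp, rp in zip(left_parts, right_parts)
    ]


doubles = [{"id": i, "double": i * 2} for i in range(4)]
squares = [{"id": i, "square": i ** 2} for i in range(4)]
blocks = partitioned_inner_join(doubles, squares, num_partitions=2)
```

In this sketch the number of output blocks equals num_partitions, and each bucket holds only a fraction of the rows, which is why more partitions mean less memory per join task.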
- Returns:
A Dataset that holds rows of the input left Dataset joined with the right Dataset based on join type and keys.
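To make the four join_type values concrete, here is a small reference join over lists of dicts. It assumes the conventional SQL semantics, with unmatched rows padded with None; this page does not spell out how Ray represents missing values, so treat that detail as an assumption. The function name reference_join is hypothetical and the code is unrelated to Ray's internals.

```python
def key_of(row, cols):
    """Extract the join key of a row as a tuple."""
    return tuple(row[c] for c in cols)


def reference_join(left, right, join_type, on=("id",)):
    """Nested-loop join over lists of dicts (assumed SQL-style semantics)."""
    if join_type not in {"inner", "left_outer", "right_outer", "full_outer"}:
        raise ValueError(f"unsupported join_type: {join_type!r}")

    left_cols = {c for row in left for c in row}
    right_cols = {c for row in right for c in row}
    out = []
    matched_right = set()

    for l in left:
        hits = [i for i, r in enumerate(right) if key_of(r, on) == key_of(l, on)]
        matched_right.update(hits)
        for i in hits:
            out.append({**l, **right[i]})
        # Keep unmatched left rows for left and full outer joins.
        if not hits and join_type in ("left_outer", "full_outer"):
            out.append({**{c: None for c in right_cols}, **l})

    # Keep unmatched right rows for right and full outer joins.
    if join_type in ("right_outer", "full_outer"):
        for i, r in enumerate(right):
            if i not in matched_right:
                out.append({**{c: None for c in left_cols}, **r})
    return out


left = [{"id": 0, "double": 0}, {"id": 1, "double": 2}]
right = [{"id": 1, "square": 1}, {"id": 2, "square": 4}]
inner_rows = reference_join(left, right, "inner")
full_rows = reference_join(left, right, "full_outer")
```

With these inputs only id 1 appears on both sides, so the inner join yields one row while the full outer join yields three.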
Note
This operation requires all inputs to be materialized in the object store before it can execute.
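Since each partition is joined independently, one rough way to pick num_partitions is to divide an estimated input size by a per-partition size you are comfortable holding in memory. The helper below is a hypothetical back-of-the-envelope calculation, not part of Ray, and the 128 MiB default target is an arbitrary assumption to adjust for your cluster.

```python
import math


def suggest_num_partitions(estimated_total_bytes,
                           target_partition_bytes=128 * 1024 * 1024):
    """Heuristic: enough partitions that each stays near the target size."""
    if estimated_total_bytes < 0 or target_partition_bytes <= 0:
        raise ValueError("sizes must be non-negative, target must be positive")
    # Always return at least one partition, even for tiny inputs.
    return max(1, math.ceil(estimated_total_bytes / target_partition_bytes))


# Combined input size of roughly 10 GiB, using the assumed 128 MiB target.
n = suggest_num_partitions(10 * 1024 ** 3)
```

The resulting value could then be passed as num_partitions, with the caveat that skewed keys can make some partitions much larger than the average.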
Examples:
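import ray

# Left dataset: each id paired with its double.
doubles_ds = ray.data.range(4).map(
    lambda row: {"id": row["id"], "double": int(row["id"]) * 2}
)

# Right dataset: each id paired with its square.
squares_ds = ray.data.range(4).map(
    lambda row: {"id": row["id"], "square": int(row["id"]) ** 2}
)

# Inner join on the shared "id" column, split into 2 partitions.
joined_ds = doubles_ds.join(
    squares_ds,
    join_type="inner",
    num_partitions=2,
    on=("id",),
)

# Sort by id, since the order of joined rows is not guaranteed.
print(sorted(joined_ds.take_all(), key=lambda item: item["id"]))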
[ {'id': 0, 'double': 0, 'square': 0}, {'id': 1, 'double': 2, 'square': 1}, {'id': 2, 'double': 4, 'square': 4}, {'id': 3, 'double': 6, 'square': 9} ]