Joining datasets#
Note
This feature was released in Ray 2.46. It is experimental, and some things might not work as expected.
Ray Data allows multiple Dataset
instances to be joined using different join types (inner, left/right/full outer, etc.) based
on the provided key columns, as in the following example:
import ray

doubles_ds = ray.data.range(4).map(
    lambda row: {"id": row["id"], "double": int(row["id"]) * 2}
)

squares_ds = ray.data.range(4).map(
    lambda row: {"id": row["id"], "square": int(row["id"]) ** 2}
)

doubles_and_squares_ds = doubles_ds.join(
    squares_ds,
    join_type="inner",
    num_partitions=2,
    on=("id",),
)
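The resulting dataset contains one row per matching id, with the columns from both sides (column order may vary). A quick way to inspect it, sorting by id since row order isn't guaranteed:

print(sorted(doubles_and_squares_ds.take_all(), key=lambda row: row["id"]))
# [{'id': 0, 'double': 0, 'square': 0}, {'id': 1, 'double': 2, 'square': 1},
#  {'id': 2, 'double': 4, 'square': 4}, {'id': 3, 'double': 6, 'square': 9}]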
Internally, joins are currently powered by the hash-shuffle backend.
Configuring Joins#
Joins are generally memory-intensive operations: they require accurate memory accounting and projection, and are therefore sensitive to skews and imbalances in the dataset.
Ray Data provides the following levers to tune the performance of joins for your workload:
num_partitions: (required) The number of partitions both incoming datasets are hash-partitioned into. Check out the Configuring number of partitions section for guidance on how to tune this value.

partition_size_hint: (optional) A hint to the join operator about the estimated average size of an individual partition (in bytes). If not specified, defaults to DataContext.target_max_block_size (128 MiB by default). See the sketch after this list for one way to estimate it.
- Ideally, num_partitions * partition_size_hint should approximate the actual dataset size, i.e., partition_size_hint can be estimated as the dataset size divided by num_partitions (assuming relatively evenly sized partitions).
- However, when the dataset partitioning is expected to be heavily skewed, partition_size_hint should instead approximate the largest partition to prevent Out-of-Memory (OOM) errors.
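For example, a minimal sketch of estimating partition_size_hint from the (materialized) dataset size, assuming relatively evenly sized partitions and that partition_size_hint is passed directly to join as shown:

import ray

left = ray.data.range(1_000_000).materialize()
right = ray.data.range(1_000_000).map(
    lambda row: {"id": row["id"], "bucket": row["id"] % 7}
)

num_partitions = 16
# Approximate the average partition size as dataset size / num_partitions.
partition_size_hint = left.size_bytes() // num_partitions

joined = left.join(
    right,
    join_type="inner",
    num_partitions=num_partitions,
    on=("id",),
    partition_size_hint=partition_size_hint,
)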
Note
Be mindful that, by default, Ray reserves only 30% of a node's memory for its Object Store. Setting this to at least 50% is recommended for all Ray Data workloads, and especially for ones using joins.
To configure the Object Store to use 50% of memory, add the following to your image:
RAY_DEFAULT_OBJECT_STORE_MEMORY_PROPORTION=0.5
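Alternatively, on a single-node or local development cluster, the object store can be sized explicitly when starting Ray; the 16 GiB value below is only an illustrative assumption, to be adjusted to roughly half of the node's memory:

import ray

# Explicitly allocate ~16 GiB to the object store instead of relying on the
# default memory proportion.
ray.init(object_store_memory=16 * 1024**3)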
Configuring number of partitions#
The number of partitions (also referred to as blocks) controls an important trade-off between the size of the batch of rows handled by each individual task and the memory requirements of the operation performed on it.
Rule of thumb: keep partitions large, but not so large that they cause Out-of-Memory (OOM) errors (see the sketch below).
It's important not to "oversize" partitions for joins, as joined partitions that are too large to fit in memory lead to OOM errors.
It's also important not to create too many small partitions, as passing around a large number of small objects adds overhead.
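For example, a minimal sketch of deriving num_partitions from the dataset size and a target partition size; the ~256 MiB target is an assumption for illustration, not an official recommendation:

import ray

ds = ray.data.range(10_000_000).materialize()

# Aim for partitions of roughly 256 MiB so each joined partition comfortably
# fits in memory; clamp to at least 2 partitions.
target_partition_bytes = 256 * 1024 * 1024
num_partitions = max(2, ds.size_bytes() // target_partition_bytes)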
Configuring number of Aggregators#
"Aggregators" are worker actors that perform the actual joins/aggregations/shuffling: they receive individual partition chunks from the incoming blocks and then "aggregate" them as required by the given operation.
The following are important considerations for configuring the number of aggregators in your pool:
- The pool size defaults to 64, or to num_partitions when there are fewer than 64 partitions.
- Individual aggregators might be assigned more than one partition (partitions are split evenly among the aggregators in round-robin fashion).
- Aggregators are stateful components that hold their state (partitions) in memory during shuffling.
Note
Rule of thumb: avoid setting `num_partitions` much larger than the number of aggregators, as this might create bottlenecks.
- Setting DataContext.max_hash_shuffle_aggregators caps the number of aggregators.
- Setting it to a large enough value effectively allocates one partition per aggregator (when max_hash_shuffle_aggregators >= num_partitions), as shown in the sketch below.
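For example, a minimal sketch of capping the aggregator pool so that each aggregator handles exactly one partition; the dataset and pool sizes below are illustrative assumptions:

import ray
from ray.data import DataContext

# Cap the aggregator pool; with max_hash_shuffle_aggregators >= num_partitions,
# each aggregator handles exactly one partition.
ctx = DataContext.get_current()
ctx.max_hash_shuffle_aggregators = 32

left = ray.data.range(100_000)
right = ray.data.range(100_000).map(lambda row: {"id": row["id"], "neg": -row["id"]})

joined = left.join(
    right,
    join_type="inner",
    num_partitions=32,
    on=("id",),
)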