ray.data.read_mongo
- ray.data.read_mongo(uri: str, database: str, collection: str, *, pipeline: List[Dict] | None = None, schema: pymongoarrow.api.Schema | None = None, parallelism: int = -1, ray_remote_args: Dict[str, Any] = None, concurrency: int | None = None, override_num_blocks: int | None = None, **mongo_args) -> Dataset
Create a Dataset from a MongoDB database.
The data to read is specified by the uri, database, and collection of the MongoDB deployment. The dataset is created from the results of executing pipeline against the collection. If pipeline is None, the entire collection is read.
Tip
For more details about these MongoDB concepts, see the following:
- URI: https://www.mongodb.com/docs/manual/reference/connection-string/
- Database and Collection: https://www.mongodb.com/docs/manual/core/databases-and-collections/
- Pipeline: https://www.mongodb.com/docs/manual/core/aggregation-pipeline/
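The pipeline argument is a plain Python list of stage documents, each a dict keyed by a single stage operator such as $match or $sort (see the aggregation pipeline docs linked above). The sketch below builds a filter-then-sort pipeline; the helper build_range_pipeline and the field names are hypothetical, chosen only for illustration. Note that $sort takes a document mapping each field to 1 (ascending) or -1 (descending), not a bare field name.

```python
from typing import Any, Dict, List


def build_range_pipeline(field: str, low: int, high: int, sort_field: str) -> List[Dict[str, Any]]:
    """Hypothetical helper: keep documents with low <= field < high, sorted ascending."""
    return [
        # $match filters documents, like a query filter.
        {"$match": {field: {"$gte": low, "$lt": high}}},
        # $sort expects {field: 1} (ascending) or {field: -1} (descending).
        {"$sort": {sort_field: 1}},
    ]


pipeline = build_range_pipeline("col2", 0, 100, "sort_field")
print(pipeline)
# [{'$match': {'col2': {'$gte': 0, '$lt': 100}}}, {'$sort': {'sort_field': 1}}]
```

The resulting list can be passed directly as the pipeline argument of read_mongo.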
To read from MongoDB in parallel, the pipeline is executed on partitions of the collection, with one Ray read task handling each partition. Partitions are created in an attempt to distribute the documents evenly across the requested number of partitions. The number of partitions is determined by the requested parallelism, which you can set with the override_num_blocks argument (the parallelism argument is deprecated) or leave unspecified to have it chosen automatically.
Examples
```python
import pyarrow as pa
import ray
from pymongoarrow.api import Schema

ds = ray.data.read_mongo(
    uri="mongodb://username:password@mongodb0.example.com:27017/?authSource=admin",
    database="my_db",
    collection="my_collection",
    # $sort takes a document of {field: 1 | -1}, not a bare field name.
    pipeline=[{"$match": {"col2": {"$gte": 0, "$lt": 100}}}, {"$sort": {"sort_field": 1}}],
    schema=Schema({"col1": pa.string(), "col2": pa.int64()}),
    override_num_blocks=10,
)
```
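The partitioning described above can be sketched in plain Python. This is a simplified illustration under stated assumptions, not Ray's internal implementation: it assumes each partition is a contiguous range of _id values holding a roughly equal share of the documents, and that each read task runs the user pipeline after a $match stage restricting _id to its range. The helpers split_id_ranges and partition_pipelines are hypothetical, and the _id values are supplied as an in-memory sequence rather than queried from MongoDB.

```python
from typing import Any, Dict, List, Sequence


def split_id_ranges(ids: Sequence[Any], num_partitions: int) -> List[Dict[str, Any]]:
    """Split _id values into at most num_partitions contiguous ranges of near-equal size."""
    if num_partitions < 1:
        raise ValueError("num_partitions must be at least 1")
    sorted_ids = sorted(ids)
    n = len(sorted_ids)
    k = min(num_partitions, n)  # never create empty partitions
    ranges = []
    for i in range(k):
        start = i * n // k
        end = (i + 1) * n // k  # exclusive end index into sorted_ids
        ranges.append({"min_id": sorted_ids[start], "max_id": sorted_ids[end - 1]})
    return ranges


def partition_pipelines(
    ids: Sequence[Any], num_partitions: int, pipeline: List[Dict[str, Any]]
) -> List[List[Dict[str, Any]]]:
    """Build one pipeline per read task: an _id range $match followed by the user pipeline."""
    return [
        [{"$match": {"_id": {"$gte": r["min_id"], "$lte": r["max_id"]}}}] + list(pipeline)
        for r in split_id_ranges(ids, num_partitions)
    ]


per_task = partition_pipelines(range(10), 3, [{"$match": {"col2": {"$gte": 0}}}])
for p in per_task:
    print(p[0])
# {'$match': {'_id': {'$gte': 0, '$lte': 2}}}
# {'$match': {'_id': {'$gte': 3, '$lte': 5}}}
# {'$match': {'_id': {'$gte': 6, '$lte': 9}}}
```

Because each task in this sketch runs the pipeline on its own partition, stages such as $sort or $group act within a partition rather than across the whole collection.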
- Parameters:
uri – The URI of the source MongoDB deployment to read from. For the URI format, see the MongoDB connection string docs.
database – The name of the database hosted in the MongoDB deployment. This database must exist; otherwise, a ValueError is raised.
collection – The name of the collection in the database. This collection must exist; otherwise, a ValueError is raised.
pipeline – A MongoDB aggregation pipeline that is executed on the given collection; its results are used to create the Dataset. If None, the entire collection is read.
schema – The schema used to read the collection. If None, it is inferred from the results of pipeline.
parallelism – This argument is deprecated. Use the override_num_blocks argument instead.
ray_remote_args – kwargs passed to ray.remote() in the read tasks.
concurrency – The maximum number of Ray tasks to run concurrently. Set this to control the number of tasks to run concurrently. This doesn’t change the total number of tasks run or the total number of output blocks. By default, concurrency is dynamically decided based on the available resources.
override_num_blocks – Override the number of output blocks from all read tasks. By default, the number of output blocks is dynamically decided based on input data size and available resources. You shouldn’t manually set this value in most cases.
mongo_args – kwargs passed to aggregate_arrow_all() in pymongoarrow when producing Arrow-formatted results.
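The distinction between concurrency and the number of output blocks can be illustrated with a standard-library analogy. This sketch uses concurrent.futures, not Ray's scheduler: the thread pool's max_workers stands in for concurrency, and the number of submitted tasks stands in for the number of read tasks (output blocks). The functions run_read_tasks and read_partition are hypothetical.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from typing import List, Tuple


def run_read_tasks(num_blocks: int, concurrency: int) -> Tuple[List[int], int]:
    """Run num_blocks fake read tasks with at most `concurrency` in flight.

    Returns the produced blocks and the peak number of tasks observed
    running at the same time.
    """
    lock = threading.Lock()
    running = 0
    peak = 0

    def read_partition(i: int) -> int:
        nonlocal running, peak
        with lock:
            running += 1
            peak = max(peak, running)
        time.sleep(0.01)  # stand-in for reading one partition
        with lock:
            running -= 1
        return i  # stand-in for one output block

    # max_workers caps how many tasks run at once, not how many run in total.
    with ThreadPoolExecutor(max_workers=concurrency) as pool:
        blocks = list(pool.map(read_partition, range(num_blocks)))
    return blocks, peak


blocks, peak = run_read_tasks(num_blocks=10, concurrency=3)
print(len(blocks), peak)
```

Lowering the cap only stretches the same ten tasks over more time; every task still runs and every block is still produced.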
- Returns:
Dataset producing rows from the results of executing the pipeline on the specified MongoDB collection.
- Raises:
ValueError – if database doesn’t exist.
ValueError – if collection doesn’t exist.
PublicAPI (alpha): This API is in alpha and may change before becoming stable.