ray.data.read_mongo(uri: str, database: str, collection: str, *, pipeline: List[Dict] | None = None, schema: pymongoarrow.api.Schema | None = None, parallelism: int = -1, ray_remote_args: Dict[str, Any] = None, concurrency: int | None = None, override_num_blocks: int | None = None, **mongo_args) -> Dataset

Create a Dataset from a MongoDB database.

The source data is specified by the uri, database, and collection of the MongoDB deployment. The dataset is created from the results of executing pipeline against the collection. If pipeline is None, the entire collection is read.

To read from MongoDB in parallel, the pipeline is executed on partitions of the collection, with one Ray read task per partition. Partitions are created so that documents are distributed as evenly as possible across the requested number of partitions. The number of partitions can be set through this interface or chosen automatically if unspecified (see the override_num_blocks argument below, which replaces the deprecated parallelism argument).
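The partitioning idea can be illustrated with a small, self-contained sketch. It is not Ray's implementation: it assumes the document ids are already available as a sorted list, and the helper names `even_id_ranges` and `partition_pipelines` are hypothetical. It only shows how a collection might be cut into near-equal `_id` ranges, with each range turned into a `$match` stage placed in front of the user's pipeline.

```python
from typing import Any, Dict, List, Optional, Sequence, Tuple


def even_id_ranges(sorted_ids: Sequence[int], num_partitions: int) -> List[Tuple[int, int]]:
    """Split sorted ids into contiguous, near-equal inclusive (min_id, max_id) ranges."""
    if num_partitions < 1:
        raise ValueError("num_partitions must be at least 1")
    n = len(sorted_ids)
    # Never create more partitions than there are documents.
    k = min(num_partitions, n)
    ranges: List[Tuple[int, int]] = []
    start = 0
    for i in range(k):
        # The first (n % k) partitions receive one extra document.
        size = n // k + (1 if i < n % k else 0)
        end = start + size
        ranges.append((sorted_ids[start], sorted_ids[end - 1]))
        start = end
    return ranges


def partition_pipelines(
    sorted_ids: Sequence[int],
    num_partitions: int,
    pipeline: Optional[List[Dict[str, Any]]] = None,
) -> List[List[Dict[str, Any]]]:
    """Return one pipeline per partition: an _id range filter, then the user stages."""
    user_stages = list(pipeline or [])
    return [
        [{"$match": {"_id": {"$gte": lo, "$lte": hi}}}] + user_stages
        for lo, hi in even_id_ranges(sorted_ids, num_partitions)
    ]
```

In this sketch each returned pipeline corresponds to the work of one read task, so ten documents split three ways yield ranges holding four, three, and three documents.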


>>> import pyarrow as pa
>>> import ray
>>> from pymongoarrow.api import Schema
>>> ds = ray.data.read_mongo(
...     uri="mongodb://username:password@mongodb0.example.com:27017/?authSource=admin", # noqa: E501
...     database="my_db",
...     collection="my_collection",
...     pipeline=[{"$match": {"col2": {"$gte": 0, "$lt": 100}}}, {"$sort": {"sort_field": 1}}], # noqa: E501
...     schema=Schema({"col1": pa.string(), "col2": pa.int64()}),
...     override_num_blocks=10,
... )

Parameters:

  • uri – The URI of the MongoDB deployment to read from. For the URI format, see the MongoDB docs.

  • database – The name of the database hosted in the MongoDB deployment. The database must exist; otherwise, a ValueError is raised.

  • collection – The name of the collection in the database. The collection must exist; otherwise, a ValueError is raised.

  • pipeline – A MongoDB aggregation pipeline that is executed on the given collection; its results are used to create the Dataset. If None, the entire collection is read.

  • schema – The schema used to read the collection. If None, it is inferred from the results of pipeline.

  • parallelism – This argument is deprecated. Use the override_num_blocks argument instead.

  • ray_remote_args – kwargs passed to remote() in the read tasks.

  • concurrency – The maximum number of Ray tasks to run concurrently. This doesn’t change the total number of tasks run or the total number of output blocks. By default, concurrency is decided dynamically based on the available resources.

  • override_num_blocks – Override the number of output blocks from all read tasks. By default, the number of output blocks is dynamically decided based on input data size and available resources. You shouldn’t manually set this value in most cases.

  • mongo_args – kwargs passed to aggregate_arrow_all() in pymongoarrow when producing Arrow-formatted results.
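As a rough illustration of how these arguments fit together, the sketch below builds a pipeline with a small helper and passes it to read_mongo along with ray_remote_args and concurrency. The helper names `range_pipeline` and `read_range` are hypothetical, and the values `num_cpus=1` and `concurrency=4` are arbitrary assumptions chosen for the example, not recommendations.

```python
from typing import Any, Dict, List, Optional


def range_pipeline(
    field: str, low: int, high: int, sort_field: Optional[str] = None
) -> List[Dict[str, Any]]:
    """Build a pipeline keeping documents with low <= field < high, optionally sorted."""
    if low >= high:
        raise ValueError("low must be less than high")
    stages: List[Dict[str, Any]] = [{"$match": {field: {"$gte": low, "$lt": high}}}]
    if sort_field is not None:
        # $sort takes a document mapping field names to 1 (ascending) or -1 (descending).
        stages.append({"$sort": {sort_field: 1}})
    return stages


def read_range(uri: str, database: str, collection: str, field: str, low: int, high: int):
    """Read the matching documents into a Dataset (requires Ray and a reachable MongoDB)."""
    # Imported lazily so range_pipeline can be used without Ray installed.
    import ray

    return ray.data.read_mongo(
        uri=uri,
        database=database,
        collection=collection,
        pipeline=range_pipeline(field, low, high),
        ray_remote_args={"num_cpus": 1},  # assumed value: one CPU per read task
        concurrency=4,  # assumed value: at most four read tasks at a time
    )
```

Keeping the pipeline construction in a plain function makes it easy to check the stages before any read task runs.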


Returns:

A Dataset producing rows from the results of executing the pipeline on the specified MongoDB collection.


PublicAPI (alpha): This API is in alpha and may change before becoming stable.