ray.data.read_mongo(uri: str, database: str, collection: str, *, pipeline: Optional[List[Dict]] = None, schema: Optional[pymongoarrow.api.Schema] = None, parallelism: int = -1, ray_remote_args: Dict[str, Any] = None, **mongo_args) → ray.data.dataset.Dataset[ray.data._internal.arrow_block.ArrowRow]

Create an Arrow dataset from MongoDB.

The data to read is identified by the uri, database, and collection of the MongoDB deployment. The dataset is created from the results of executing pipeline against the collection. If pipeline is None, the entire collection is read.

For more details about these MongoDB concepts, see:

  • URI: https://www.mongodb.com/docs/manual/reference/connection-string/

  • Database and Collection: https://www.mongodb.com/docs/manual/core/databases-and-collections/

  • Pipeline: https://www.mongodb.com/docs/manual/core/aggregation-pipeline/

To read from MongoDB in parallel, the pipeline is executed on partitions of the collection, with one Ray read task per partition. Partitions are created so as to distribute the documents as evenly as possible across the requested number of partitions. That number is set by parallelism, which can be passed explicitly or, if left unspecified, is chosen automatically (see the parallelism argument below).
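
The partitioning idea can be pictured with a small, self-contained sketch. It is not Ray's actual implementation: it assumes partitions are contiguous ranges of a sortable key (here `_id`), and the helper names `split_into_ranges` and `pipeline_for_partition` are hypothetical. Under that assumption, each read task would run the user's pipeline behind a `$match` stage restricted to its own range.

```python
from typing import Any, Dict, List, Optional, Tuple


def split_into_ranges(sorted_ids: List[int], parallelism: int) -> List[Tuple[int, int]]:
    """Split sorted ids into at most `parallelism` contiguous, near-equal ranges.

    Each returned range is (min_id, max_id), both bounds inclusive.
    Hypothetical helper for illustration only.
    """
    if parallelism < 1:
        raise ValueError("parallelism must be >= 1")
    n = len(sorted_ids)
    # Never create more partitions than there are documents.
    num_parts = min(parallelism, n)
    ranges: List[Tuple[int, int]] = []
    start = 0
    for i in range(num_parts):
        # The first (n % num_parts) partitions take one extra document.
        size = n // num_parts + (1 if i < n % num_parts else 0)
        ranges.append((sorted_ids[start], sorted_ids[start + size - 1]))
        start += size
    return ranges


def pipeline_for_partition(
    id_range: Tuple[int, int],
    pipeline: Optional[List[Dict[str, Any]]] = None,
) -> List[Dict[str, Any]]:
    """Prepend a $match on the partition's _id range to the user pipeline."""
    lo, hi = id_range
    match_stage = {"$match": {"_id": {"$gte": lo, "$lte": hi}}}
    # A pipeline of None means "read everything in this partition".
    return [match_stage] + list(pipeline or [])


if __name__ == "__main__":
    for r in split_into_ranges(list(range(10)), parallelism=3):
        print(r, pipeline_for_partition(r, [{"$match": {"col2": {"$gte": 0}}}]))
```

In this sketch, a `$sort` stage in the user pipeline would order documents only within each partition, not across the whole collection.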


>>> import ray
>>> import pyarrow as pa
>>> from pymongoarrow.api import Schema
>>> ds = ray.data.read_mongo(
...     uri="mongodb://username:password@mongodb0.example.com:27017/?authSource=admin", # noqa: E501
...     database="my_db",
...     collection="my_collection",
...     # $sort takes a document; ascending order (1) is assumed here.
...     pipeline=[{"$match": {"col2": {"$gte": 0, "$lt": 100}}}, {"$sort": {"sort_field": 1}}], # noqa: E501
...     schema=Schema({"col1": pa.string(), "col2": pa.int64()}),
...     parallelism=10,
... )

  • uri – The URI of the source MongoDB where the dataset will be read from. For the URI format, see details in https://www.mongodb.com/docs/manual/reference/connection-string/.

  • database – The name of the database hosted in the MongoDB. This database must exist; otherwise, a ValueError is raised.

  • collection – The name of the collection in the database. This collection must exist; otherwise, a ValueError is raised.

  • pipeline – A MongoDB pipeline, which is executed on the given collection and whose results are used to create the Dataset. If None, the entire collection is read.

  • schema – The schema used to read the collection. If None, it’ll be inferred from the results of pipeline.

  • parallelism – The requested parallelism of the read. If -1, it will be automatically chosen based on the available cluster resources and estimated in-memory data size.

  • ray_remote_args – kwargs passed to ray.remote in the read tasks.

  • mongo_args – kwargs passed to aggregate_arrow_all() in pymongoarrow when producing Arrow-formatted results.
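
As an illustration of how the uri and pipeline arguments might be assembled, here is a minimal sketch. The helpers `build_mongo_uri` and `range_pipeline` are hypothetical and not part of Ray or pymongoarrow; the host, credentials, and field names are placeholders. The sketch percent-encodes the username and password, which the MongoDB connection-string format requires when they contain characters such as `@`, `:`, or `/`.

```python
from typing import Any, Dict, List
from urllib.parse import quote_plus


def build_mongo_uri(
    username: str,
    password: str,
    host: str,
    port: int = 27017,
    auth_source: str = "admin",
) -> str:
    """Build a mongodb:// URI with percent-encoded credentials.

    Hypothetical helper; the defaults mirror the example URI above.
    """
    user = quote_plus(username)
    pwd = quote_plus(password)
    return f"mongodb://{user}:{pwd}@{host}:{port}/?authSource={auth_source}"


def range_pipeline(
    field: str, lower: int, upper: int, sort_field: str
) -> List[Dict[str, Any]]:
    """Keep documents with lower <= field < upper, sorted ascending by sort_field."""
    return [
        {"$match": {field: {"$gte": lower, "$lt": upper}}},
        # $sort takes a document mapping field names to 1 (asc) or -1 (desc).
        {"$sort": {sort_field: 1}},
    ]


if __name__ == "__main__":
    print(build_mongo_uri("username", "p@ss:word/1", "mongodb0.example.com"))
    print(range_pipeline("col2", 0, 100, "sort_field"))
```

The two results could then be passed as `uri=` and `pipeline=` to `ray.data.read_mongo`, together with `database` and `collection`.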


Returns a Dataset holding Arrow records from the results of executing the pipeline on the specified MongoDB collection.

PublicAPI (alpha): This API is in alpha and may change before becoming stable.