ray.data.read_mongo

ray.data.read_mongo(uri: str, database: str, collection: str, *, pipeline: Optional[List[Dict]] = None, schema: Optional[pymongoarrow.api.Schema] = None, parallelism: int = -1, ray_remote_args: Dict[str, Any] = None, **mongo_args) → ray.data.dataset.Dataset

Create a Dataset from a MongoDB database.

The data to read is specified by the uri, database, and collection arguments. The Dataset is created from the results of executing pipeline against the collection. If pipeline is None, the entire collection is read.

To read from MongoDB in parallel, the pipeline is executed on partitions of the collection, with one Ray read task handling each partition. Partitions are created so that documents are distributed as evenly as possible across the requested number of partitions. The number of partitions is determined by parallelism, which can be set explicitly or chosen automatically if unspecified (see the parallelism argument below).
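The partitioning idea can be pictured with the pure-Python sketch below. It is illustrative only and is not Ray's implementation: it assumes the `_id` values are already known and sorted, splits them into at most `parallelism` contiguous ranges of near-equal size, and prepends a `$match` on each `_id` range to the user pipeline so that every read task sees a disjoint slice of the collection. The helper names `split_into_ranges` and `partition_pipelines` are hypothetical. (On the server side, MongoDB's `$bucketAuto` stage is one way to compute evenly sized bucket boundaries without fetching every `_id`.)

```python
from typing import Any, Dict, List, Sequence, Tuple


def split_into_ranges(sorted_ids: Sequence[Any], parallelism: int) -> List[Tuple[Any, Any]]:
    """Hypothetical helper: split sorted _id values into at most `parallelism`
    contiguous ranges whose sizes differ by at most one.

    Each range is returned as (min_id, max_id), both inclusive.
    """
    if parallelism < 1:
        raise ValueError("parallelism must be >= 1")
    n = len(sorted_ids)
    # Never create more partitions than there are documents.
    num_parts = min(parallelism, n)
    ranges = []
    for i in range(num_parts):
        # Integer arithmetic spreads the remainder across partitions.
        start = i * n // num_parts
        end = (i + 1) * n // num_parts
        ranges.append((sorted_ids[start], sorted_ids[end - 1]))
    return ranges


def partition_pipelines(
    ranges: List[Tuple[Any, Any]], pipeline: List[Dict[str, Any]]
) -> List[List[Dict[str, Any]]]:
    """Hypothetical helper: build one pipeline per range, i.e. an _id range
    filter followed by the user's own stages."""
    return [
        [{"$match": {"_id": {"$gte": lo, "$lte": hi}}}, *pipeline]
        for lo, hi in ranges
    ]
```

For example, `split_into_ranges(list(range(10)), 3)` yields three ranges of sizes 3, 3, and 4. Note that in this sketch any stage that needs the whole collection, such as `$group` or a global `$sort`, would run per partition rather than globally.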

Examples

>>> import ray
>>> import pyarrow as pa
>>> from pymongoarrow.api import Schema
>>> ds = ray.data.read_mongo(
...     uri="mongodb://username:password@mongodb0.example.com:27017/?authSource=admin",
...     database="my_db",
...     collection="my_collection",
...     pipeline=[{"$match": {"col2": {"$gte": 0, "$lt": 100}}}, {"$sort": {"sort_field": 1}}],
...     schema=Schema({"col1": pa.string(), "col2": pa.int64()}),
...     parallelism=10,
... )

Parameters
  • uri – The URI of the source MongoDB where the dataset is read from. For the URI format, see details in the MongoDB docs.

  • database – The name of the database hosted in the MongoDB. This database must exist; otherwise, a ValueError is raised.

  • collection – The name of the collection in the database. This collection must exist; otherwise, a ValueError is raised.

  • pipeline – A MongoDB aggregation pipeline, which is executed on the given collection and whose results are used to create the Dataset. If None, the entire collection is read.

  • schema – The schema used to read the collection. If None, it is inferred from the results of the pipeline.

  • parallelism – The requested parallelism of the read. Defaults to -1, which automatically determines the optimal parallelism for your configuration. You should not need to manually set this value in most cases. For details on how the parallelism is automatically determined and guidance on how to tune it, see Tuning read parallelism.

  • ray_remote_args – kwargs passed to remote() in the read tasks.

  • mongo_args – kwargs passed to pymongoarrow's aggregate_arrow_all() when producing Arrow-formatted results.

Returns

Dataset producing rows from the results of executing the pipeline on the specified MongoDB collection.

Raises
  • ValueError – If the database doesn’t exist.

  • ValueError – If the collection doesn’t exist.
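If you want to surface these errors before launching a read, a pre-flight check along the following lines may help. This is a minimal sketch, not Ray's own validation code: `check_database_and_collection` is a hypothetical helper, and it assumes `client` behaves like a `pymongo.MongoClient`, which provides `list_database_names()` and whose `client[name]` returns a database with `list_collection_names()`.

```python
from typing import Any


def check_database_and_collection(client: Any, database: str, collection: str) -> None:
    """Hypothetical pre-flight check mirroring the documented ValueErrors.

    `client` is assumed to behave like pymongo.MongoClient.
    """
    # Fail fast if the database is missing.
    if database not in client.list_database_names():
        raise ValueError(f"Database {database!r} does not exist.")
    # Then check that the collection exists inside that database.
    if collection not in client[database].list_collection_names():
        raise ValueError(f"Collection {collection!r} does not exist in database {database!r}.")
```

With pymongo installed, you might call it as `check_database_and_collection(MongoClient(uri), "my_db", "my_collection")` before calling ray.data.read_mongo.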

PublicAPI (alpha): This API is in alpha and may change before becoming stable.