ray.util.queue.Queue#

class ray.util.queue.Queue(maxsize: int = 0, actor_options: Optional[Dict] = None)[source]#

Bases: object

A first-in, first-out queue implementation on Ray.

The behavior and use cases are similar to those of the asyncio.Queue class.

Features both sync and async put and get methods. Provides the option to block until space is available when calling put on a full queue, or to block until items are available when calling get on an empty queue.

Optionally supports batched put and get operations to minimize serialization overhead.

Parameters
  • maxsize (optional, int) – maximum size of the queue. If zero, size is unbounded.

  • actor_options (optional, Dict) – Dictionary of options to pass into the QueueActor during creation. These are directly passed into QueueActor.options(…). This could be useful if you need to pass in custom resource requirements, for example.

Examples

>>> from ray.util.queue import Queue
>>> q = Queue() 
>>> items = list(range(10)) 
>>> for item in items: 
...     q.put(item) 
>>> for item in items: 
...     assert item == q.get() 
>>> # Create Queue with the underlying actor reserving 1 CPU.
>>> q = Queue(actor_options={"num_cpus": 1}) 

PublicAPI (beta): This API is in beta and may change before becoming stable.

size() int[source]#

The size of the queue.

qsize() int[source]#

The size of the queue.

empty() bool[source]#

Whether the queue is empty.

full() bool[source]#

Whether the queue is full.

put(item: Any, block: bool = True, timeout: Optional[float] = None) None[source]#

Adds an item to the queue.

If block is True and the queue is full, blocks until the queue is no longer full or until timeout.

There is no guarantee of order if multiple producers put to the same full queue.

Raises
  • Full – if the queue is full and blocking is False.

  • Full – if the queue is full, blocking is True, and it timed out.

  • ValueError – if timeout is negative.

async put_async(item: Any, block: bool = True, timeout: Optional[float] = None) None[source]#

Adds an item to the queue.

If block is True and the queue is full, blocks until the queue is no longer full or until timeout.

There is no guarantee of order if multiple producers put to the same full queue.

Raises
  • Full – if the queue is full and blocking is False.

  • Full – if the queue is full, blocking is True, and it timed out.

  • ValueError – if timeout is negative.

get(block: bool = True, timeout: Optional[float] = None) Any[source]#

Gets an item from the queue.

If block is True and the queue is empty, blocks until the queue is no longer empty or until timeout.

There is no guarantee of order if multiple consumers get from the same empty queue.

Returns

The next item in the queue.

Raises
  • Empty – if the queue is empty and blocking is False.

  • Empty – if the queue is empty, blocking is True, and it timed out.

  • ValueError – if timeout is negative.

async get_async(block: bool = True, timeout: Optional[float] = None) Any[source]#

Gets an item from the queue.

There is no guarantee of order if multiple consumers get from the same empty queue.

Returns

The next item in the queue.

Raises
  • Empty – if the queue is empty and blocking is False.

  • Empty – if the queue is empty, blocking is True, and it timed out.

  • ValueError – if timeout is negative.

put_nowait(item: Any) None[source]#

Equivalent to put(item, block=False).

Raises

Full – if the queue is full.

put_nowait_batch(items: collections.abc.Iterable) None[source]#

Takes in a list of items and puts them into the queue in order.

Raises

Full – if the items will not fit in the queue

get_nowait() Any[source]#

Equivalent to get(block=False).

Raises

Empty – if the queue is empty.

get_nowait_batch(num_items: int) List[Any][source]#

Gets items from the queue and returns them in a list in order.

Raises

Empty – if the queue does not contain the desired number of items

shutdown(force: bool = False, grace_period_s: int = 5) None[source]#

Terminates the underlying QueueActor.

All of the resources reserved by the queue will be released.

Parameters
  • force – If True, forcefully kill the actor, causing an immediate failure. If False, graceful actor termination will be attempted first, before falling back to a forceful kill.

  • grace_period_s – If force is False, how long in seconds to wait for graceful termination before falling back to forceful kill.