Source code for ray.rllib.env.wrappers.pettingzoo_env

from ray.rllib.env.multi_agent_env import MultiAgentEnv


class PettingZooEnv(MultiAgentEnv):
    """An interface to the PettingZoo MARL environment library.

    See: https://github.com/PettingZoo-Team/PettingZoo

    Inherits from MultiAgentEnv and exposes a given AEC
    (actor-environment-cycle) game from the PettingZoo project via the
    MultiAgentEnv public API.

    Note that the wrapper has some important limitations:

    1. All agents have the same action_spaces and observation_spaces.
       Note: If, within your AEC game, agents do not have homogeneous
       action / observation spaces, apply SuperSuit wrappers to pad them:
       https://github.com/PettingZoo-Team/SuperSuit#built-in-multi-agent-only-functions
    2. Environments are positive-sum games (-> agents are expected to
       cooperate to maximize reward). This isn't a hard restriction; it's
       just that standard algorithms aren't expected to work well in
       highly competitive games.

    Examples:
        >>> from pettingzoo.butterfly import prison_v2
        >>> env = PettingZooEnv(prison_v2.env())
        >>> obs = env.reset()
        >>> # Only returns the observation for the agent that should be
        >>> # stepping next.
        >>> print(obs)
        {
            'prisoner_0': array([[[0, 0, 0],
                [0, 0, 0],
                [0, 0, 0],
                ...,
                [0, 0, 0],
                [0, 0, 0],
                [0, 0, 0]]], dtype=uint8)
        }
        >>> obs, rewards, dones, infos = env.step({
        ...     "prisoner_0": 1,
        ... })
        >>> # Only returns the observation, reward, info, etc. for
        >>> # the agent whose turn is next.
        >>> print(obs)
        {
            'prisoner_1': array([[[0, 0, 0],
                [0, 0, 0],
                [0, 0, 0],
                ...,
                [0, 0, 0],
                [0, 0, 0],
                [0, 0, 0]]], dtype=uint8)
        }
        >>> print(rewards)
        {
            'prisoner_1': 0
        }
        >>> print(dones)
        {
            'prisoner_1': False, '__all__': False
        }
        >>> print(infos)
        {
            'prisoner_1': {'map_tuple': (1, 0)}
        }
    """

    def __init__(self, env):
        self.env = env
        # Agent idx list.
        self.agents = self.env.possible_agents

        # Get dictionaries of obs_spaces and act_spaces.
        self.observation_spaces = self.env.observation_spaces
        self.action_spaces = self.env.action_spaces

        # Get first observation space, assuming all agents have equal space.
        self.observation_space = self.observation_spaces[self.agents[0]]

        # Get first action space, assuming all agents have equal space.
        self.action_space = self.action_spaces[self.agents[0]]

        assert all(obs_space == self.observation_space
                   for obs_space
                   in self.env.observation_spaces.values()), \
            "Observation spaces for all agents must be identical. Perhaps " \
            "SuperSuit's pad_observations wrapper can help (usage: " \
            "`supersuit.aec_wrappers.pad_observations(env)`)."

        assert all(act_space == self.action_space
                   for act_space in self.env.action_spaces.values()), \
            "Action spaces for all agents must be identical. Perhaps " \
            "SuperSuit's pad_action_space wrapper can help (usage: " \
            "`supersuit.aec_wrappers.pad_action_space(env)`)."

        self.reset()
    def reset(self):
        self.env.reset()
        # Return only the observation of the agent that acts first.
        return {
            self.env.agent_selection: self.env.observe(
                self.env.agent_selection)
        }
    def step(self, action):
        # Step the currently selected agent.
        self.env.step(action[self.env.agent_selection])
        obs_d = {}
        rew_d = {}
        done_d = {}
        info_d = {}
        while self.env.agents:
            obs, rew, done, info = self.env.last()
            a = self.env.agent_selection
            obs_d[a] = obs
            rew_d[a] = rew
            done_d[a] = done
            info_d[a] = info
            if self.env.dones[self.env.agent_selection]:
                # Done agents must be stepped with a None action to be
                # removed from the env; then move on to the next agent.
                self.env.step(None)
            else:
                break

        all_done = not self.env.agents
        done_d["__all__"] = all_done

        return obs_d, rew_d, done_d, info_d
    def close(self):
        self.env.close()

    def seed(self, seed=None):
        self.env.seed(seed)

    def render(self, mode="human"):
        return self.env.render(mode)
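
# A minimal usage sketch (illustrative, not part of the original module):
# AEC games are typically hooked into RLlib by registering an env creator
# via `ray.tune.registry.register_env`. The env name "prison" is arbitrary;
# `prison_v2` matches the docstring example above.
#
#     from pettingzoo.butterfly import prison_v2
#     from ray.tune.registry import register_env
#
#     register_env("prison", lambda config: PettingZooEnv(prison_v2.env()))
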
class ParallelPettingZooEnv(MultiAgentEnv):
    """Wraps a PettingZoo "parallel" game, in which all agents step at once.

    Subject to the same homogeneous-space limitations as PettingZooEnv.
    """

    def __init__(self, env):
        self.par_env = env
        # Agent idx list.
        self.agents = self.par_env.possible_agents

        # Get dictionaries of obs_spaces and act_spaces.
        self.observation_spaces = self.par_env.observation_spaces
        self.action_spaces = self.par_env.action_spaces

        # Get first observation space, assuming all agents have equal space.
        self.observation_space = self.observation_spaces[self.agents[0]]

        # Get first action space, assuming all agents have equal space.
        self.action_space = self.action_spaces[self.agents[0]]

        assert all(obs_space == self.observation_space
                   for obs_space
                   in self.par_env.observation_spaces.values()), \
            "Observation spaces for all agents must be identical. Perhaps " \
            "SuperSuit's pad_observations wrapper can help (usage: " \
            "`supersuit.aec_wrappers.pad_observations(env)`)."

        assert all(act_space == self.action_space
                   for act_space in self.par_env.action_spaces.values()), \
            "Action spaces for all agents must be identical. Perhaps " \
            "SuperSuit's pad_action_space wrapper can help (usage: " \
            "`supersuit.aec_wrappers.pad_action_space(env)`)."

        self.reset()

    def reset(self):
        return self.par_env.reset()

    def step(self, action_dict):
        obss, rews, dones, infos = self.par_env.step(action_dict)
        # RLlib's "__all__" key signals that the episode is over for
        # every agent.
        dones["__all__"] = all(dones.values())
        return obss, rews, dones, infos

    def close(self):
        self.par_env.close()

    def seed(self, seed=None):
        self.par_env.seed(seed)

    def render(self, mode="human"):
        return self.par_env.render(mode)
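
# A minimal smoke-test sketch (an assumption, not part of the original
# module): steps a wrapped parallel game with random actions until all
# agents are done. Assumes `pistonball_v4` ships a `parallel_env()` factory;
# any parallel PettingZoo game with homogeneous spaces would work here.
if __name__ == "__main__":
    from pettingzoo.butterfly import pistonball_v4

    env = ParallelPettingZooEnv(pistonball_v4.parallel_env())
    obs = env.reset()
    dones = {"__all__": False}
    while not dones["__all__"]:
        # In a parallel env, all live agents (the keys of `obs`) act at once.
        actions = {agent: env.action_space.sample() for agent in obs}
        obs, rewards, dones, infos = env.step(actions)
    env.close()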