# Copyright (c) Microsoft. All rights reserved. # Licensed under the MIT license. See LICENSE.md file in the project root # for full license information. # ============================================================================== from argparse import ArgumentParser import gym import numpy as np from cntk.core import Value from cntk.initializer import he_uniform from cntk.layers import Sequential, Convolution2D, Dense, default_options from cntk.layers.typing import Signature, Tensor from cntk.learners import adam, learning_parameter_schedule, momentum_schedule from cntk.logging import TensorBoardProgressWriter from cntk.ops import abs, argmax, element_select, less, relu, reduce_max, reduce_sum, square from cntk.ops.functions import CloneMethod, Function from cntk.train import Trainer class ReplayMemory(object): """ ReplayMemory keeps track of the environment dynamic. We store all the transitions (s(t), action, s(t+1), reward, done). The replay memory allows us to efficiently sample minibatches from it, and generate the correct state representation (w.r.t the number of previous frames needed). """ def __init__(self, size, sample_shape, history_length=4): self._pos = 0 self._count = 0 self._max_size = size self._history_length = max(1, history_length) self._state_shape = sample_shape self._states = np.zeros((size,) + sample_shape, dtype=np.float32) self._actions = np.zeros(size, dtype=np.uint8) self._rewards = np.zeros(size, dtype=np.float32) self._terminals = np.zeros(size, dtype=np.float32) def __len__(self): """ Returns the number of items currently present in the memory Returns: Int >= 0 """ return self._count def append(self, state, action, reward, done): """ Appends the specified transition to the memory. Attributes: state (Tensor[sample_shape]): The state to append action (int): An integer representing the action done reward (float): An integer representing the reward received for doing this action done (bool): A boolean specifying if this state is a terminal (episode has finished) """ assert state.shape == self._state_shape, \ 'Invalid state shape (required: %s, got: %s)' % (self._state_shape, state.shape) self._states[self._pos] = state self._actions[self._pos] = action self._rewards[self._pos] = reward self._terminals[self._pos] = done self._count = max(self._count, self._pos + 1) self._pos = (self._pos + 1) % self._max_size def sample(self, size): """ Generate size random integers mapping indices in the memory. The returned indices can be retrieved using #get_state(). See the method #minibatch() if you want to retrieve samples directly. Attributes: size (int): The minibatch size Returns: Indexes of the sampled states ([int]) """ # Local variable access is faster in loops count, pos, history_len, terminals = self._count - 1, self._pos, \ self._history_length, self._terminals indexes = [] while len(indexes) < size: index = np.random.randint(history_len, count) if index not in indexes: # if not wrapping over current pointer, # then check if there is terminal state wrapped inside if not (index >= pos > index - history_len): if not terminals[(index - history_len):index].any(): indexes.append(index) return indexes def minibatch(self, size): """ Generate a minibatch with the number of samples specified by the size parameter. Attributes: size (int): Minibatch size Returns: tuple: Tensor[minibatch_size, input_shape...], [int], [float], [bool] """ indexes = self.sample(size) pre_states = np.array([self.get_state(index) for index in indexes], dtype=np.float32) post_states = np.array([self.get_state(index + 1) for index in indexes], dtype=np.float32) actions = self._actions[indexes] rewards = self._rewards[indexes] dones = self._terminals[indexes] return pre_states, actions, post_states, rewards, dones def get_state(self, index): """ Return the specified state with the replay memory. A state consists of the last `history_length` perceptions. Attributes: index (int): State's index Returns: State at specified index (Tensor[history_length, input_shape...]) """ if self._count == 0: raise IndexError('Empty Memory') index %= self._count history_length = self._history_length # If index > history_length, take from a slice if index >= history_length: return self._states[(index - (history_length - 1)):index + 1, ...] else: indexes = np.arange(index - history_length + 1, index + 1) return self._states.take(indexes, mode='wrap', axis=0) class History(object): """ Accumulator keeping track of the N previous frames to be used by the agent for evaluation """ def __init__(self, shape): self._buffer = np.zeros(shape, dtype=np.float32) @property def value(self): """ Underlying buffer with N previous states stacked along first axis Returns: Tensor[shape] """ return self._buffer def append(self, state): """ Append state to the history Attributes: state (Tensor) : The state to append to the memory """ self._buffer[:-1] = self._buffer[1:] self._buffer[-1] = state def reset(self): """ Reset the memory. Underlying buffer set all indexes to 0 """ self._buffer.fill(0) class LinearEpsilonAnnealingExplorer(object): """ Exploration policy using Linear Epsilon Greedy Attributes: start (float): start value end (float): end value steps (int): number of steps between start and end """ def __init__(self, start, end, steps): self._start = start self._stop = end self._steps = steps self._step_size = (end - start) / steps def __call__(self, num_actions): """ Select a random action out of `num_actions` possibilities. Attributes: num_actions (int): Number of actions available """ return np.random.choice(num_actions) def _epsilon(self, step): """ Compute the epsilon parameter according to the specified step Attributes: step (int) """ if step < 0: return self._start elif step > self._steps: return self._stop else: return self._step_size * step + self._start def is_exploring(self, step): """ Commodity method indicating if the agent should explore Attributes: step (int) : Current step Returns: bool : True if exploring, False otherwise """ return np.random.rand() < self._epsilon(step) def huber_loss(y, y_hat, delta): """ Compute the Huber Loss as part of the model graph Huber Loss is more robust to outliers. It is defined as: if |y - y_hat| < delta : 0.5 * (y - y_hat)**2 else : delta * |y - y_hat| - 0.5 * delta**2 Attributes: y (Tensor[-1, 1]): Target value y_hat(Tensor[-1, 1]): Estimated value delta (float): Outliers threshold Returns: CNTK Graph Node """ half_delta_squared = 0.5 * delta * delta error = y - y_hat abs_error = abs(error) less_than = 0.5 * square(error) more_than = (delta * abs_error) - half_delta_squared loss_per_sample = element_select(less(abs_error, delta), less_than, more_than) return reduce_sum(loss_per_sample, name='loss') class DeepQAgent(object): """ Implementation of Deep Q Neural Network agent like in: Nature 518. "Human-level control through deep reinforcement learning" (Mnih & al. 2015) """ def __init__(self, input_shape, nb_actions, gamma=0.99, explorer=LinearEpsilonAnnealingExplorer(1, 0.1, 1000000), learning_rate=0.00025, momentum=0.95, minibatch_size=32, memory_size=500000, train_after=200000, train_interval=4, target_update_interval=10000, monitor=True): self.input_shape = input_shape self.nb_actions = nb_actions self.gamma = gamma self._train_after = train_after self._train_interval = train_interval self._target_update_interval = target_update_interval self._explorer = explorer self._minibatch_size = minibatch_size self._history = History(input_shape) self._memory = ReplayMemory(memory_size, input_shape[1:], 4) self._num_actions_taken = 0 # Metrics accumulator self._episode_rewards, self._episode_q_means, self._episode_q_stddev = [], [], [] # Action Value model (used by agent to interact with the environment) with default_options(activation=relu, init=he_uniform()): self._action_value_net = Sequential([ Convolution2D((8, 8), 16, strides=4), Convolution2D((4, 4), 32, strides=2), Convolution2D((3, 3), 32, strides=1), Dense(256, init=he_uniform(scale=0.01)), Dense(nb_actions, activation=None, init=he_uniform(scale=0.01)) ]) self._action_value_net.update_signature(Tensor[input_shape]) # Target model used to compute the target Q-values in training, updated # less frequently for increased stability. self._target_net = self._action_value_net.clone(CloneMethod.freeze) # Function computing Q-values targets as part of the computation graph @Function @Signature(post_states=Tensor[input_shape], rewards=Tensor[()], terminals=Tensor[()]) def compute_q_targets(post_states, rewards, terminals): return element_select( terminals, rewards, gamma * reduce_max(self._target_net(post_states), axis=0) + rewards, ) # Define the loss, using Huber Loss (more robust to outliers) @Function @Signature(pre_states=Tensor[input_shape], actions=Tensor[nb_actions], post_states=Tensor[input_shape], rewards=Tensor[()], terminals=Tensor[()]) def criterion(pre_states, actions, post_states, rewards, terminals): # Compute the q_targets q_targets = compute_q_targets(post_states, rewards, terminals) # actions is a 1-hot encoding of the action done by the agent q_acted = reduce_sum(self._action_value_net(pre_states) * actions, axis=0) # Define training criterion as the Huber Loss function return huber_loss(q_targets, q_acted, 1.0) # Adam based SGD lr_schedule = learning_parameter_schedule(learning_rate) m_schedule = momentum_schedule(momentum) vm_schedule = momentum_schedule(0.999) l_sgd = adam(self._action_value_net.parameters, lr_schedule, momentum=m_schedule, variance_momentum=vm_schedule) self._metrics_writer = TensorBoardProgressWriter(freq=1, log_dir='metrics', model=criterion) if monitor else None self._learner = l_sgd self._trainer = Trainer(criterion, (criterion, None), l_sgd, self._metrics_writer) def act(self, state): """ This allows the agent to select the next action to perform in regard of the current state of the environment. It follows the terminology used in the Nature paper. Attributes: state (Tensor[input_shape]): The current environment state Returns: Int >= 0 : Next action to do """ # Append the state to the short term memory (ie. History) self._history.append(state) # If policy requires agent to explore, sample random action if self._explorer.is_exploring(self._num_actions_taken): action = self._explorer(self.nb_actions) else: # Use the network to output the best action env_with_history = self._history.value q_values = self._action_value_net.eval( # Append batch axis with only one sample to evaluate env_with_history.reshape((1,) + env_with_history.shape) ) self._episode_q_means.append(np.mean(q_values)) self._episode_q_stddev.append(np.std(q_values)) # Return the value maximizing the expected reward action = q_values.argmax() # Keep track of interval action counter self._num_actions_taken += 1 return action def observe(self, old_state, action, reward, done): """ This allows the agent to observe the output of doing the action it selected through act() on the old_state Attributes: old_state (Tensor[input_shape]): Previous environment state action (int): Action done by the agent reward (float): Reward for doing this action in the old_state environment done (bool): Indicate if the action has terminated the environment """ self._episode_rewards.append(reward) # If done, reset short term memory (ie. History) if done: # Plot the metrics through Tensorboard and reset buffers if self._metrics_writer is not None: self._plot_metrics() self._episode_rewards, self._episode_q_means, self._episode_q_stddev = [], [], [] # Reset the short term memory self._history.reset() # Append to long term memory self._memory.append(old_state, action, reward, done) def train(self): """ This allows the agent to train itself to better understand the environment dynamics. The agent will compute the expected reward for the state(t+1) and update the expected reward at step t according to this. The target expectation is computed through the Target Network, which is a more stable version of the Action Value Network for increasing training stability. The Target Network is a frozen copy of the Action Value Network updated as regular intervals. """ agent_step = self._num_actions_taken if agent_step >= self._train_after: if (agent_step % self._train_interval) == 0: pre_states, actions, post_states, rewards, terminals = self._memory.minibatch(self._minibatch_size) self._trainer.train_minibatch( self._trainer.loss_function.argument_map( pre_states=pre_states, actions=Value.one_hot(actions.reshape(-1, 1).tolist(), self.nb_actions), post_states=post_states, rewards=rewards, terminals=terminals ) ) # Update the Target Network if needed if (agent_step % self._target_update_interval) == 0: self._target_net = self._action_value_net.clone(CloneMethod.freeze) def _plot_metrics(self): """Plot current buffers accumulated values to visualize agent learning """ if len(self._episode_q_means) > 0: mean_q = np.asscalar(np.mean(self._episode_q_means)) self._metrics_writer.write_value('Mean Q per ep.', mean_q, self._num_actions_taken) if len(self._episode_q_stddev) > 0: std_q = np.asscalar(np.mean(self._episode_q_stddev)) self._metrics_writer.write_value('Mean Std Q per ep.', std_q, self._num_actions_taken) self._metrics_writer.write_value('Sum rewards per ep.', sum(self._episode_rewards), self._num_actions_taken) def as_ale_input(environment): """Convert the Atari environment RGB output (210, 160, 3) to an ALE one (84, 84). We first convert the image to a gray scale image, and resize it. Attributes: environment (Tensor[input_shape]): Environment to be converted Returns: Tensor[84, 84] : Environment converted """ from PIL import Image return np.array(Image.fromarray(environment).convert('L').resize((84, 84))) if __name__ == '__main__': parser = ArgumentParser() parser.add_argument('-e', '--epoch', default=100, type=int, help='Number of epochs to run (epoch = 250k actions') parser.add_argument('-p', '--plot', action='store_true', default=False, help='Flag for enabling Tensorboard') parser.add_argument('env', default='Pong-v3', type=str, metavar='N', nargs='?', help='Gym Atari environment to run') args = parser.parse_args() # 1. Make environment: env = gym.make(args.env) # 2. Make agent agent = DeepQAgent((4, 84, 84), env.action_space.n, monitor=args.plot) # Train current_step = 0 max_steps = args.epoch * 250000 current_state = as_ale_input(env.reset()) while current_step < max_steps: action = agent.act(current_state) new_state, reward, done, _ = env.step(action) new_state = as_ale_input(new_state) # Clipping reward for training stability reward = np.clip(reward, -1, 1) agent.observe(current_state, action, reward, done) agent.train() current_state = new_state if done: current_state = as_ale_input(env.reset()) current_step += 1