Introduction
Once, while reading the book "Reinforcement Learning: An Introduction", I decided to back up my theoretical knowledge with some practice, but I had no desire to balance yet another pole, teach an agent to play chess, or reinvent the wheel in some other way.
At the same time, the book contains an interesting example of optimizing a customer queue, which, on the one hand, is not too complicated to implement or understand and, on the other hand, is interesting enough that a variation of it could be applied in real life.
By slightly modifying this example, I arrived at the idea discussed below.
Formulation of the problem
So, imagine the following picture:
We have at our disposal a bakery that produces, say, 6 tons of raspberry pies every day and delivers them to three stores.
How should we distribute the pies so that as few of them as possible expire (the shelf life of a pie is three days)? After all, we have only three trucks, with capacities of 1, 2 and 3 tons; since the stores are located far from each other, it only makes sense to send a single truck to each of them, and only once a day, right after the pies are baked; and, because the business has just started, we do not know the purchasing power of any of the stores.
Let's agree that in every store the customers always take the freshest pies, the ones baked most recently; if a raspberry pie has not been purchased within three days, the store staff gets rid of it.
We pretend not to know what the demand for pies will be on a particular day in a particular store; in our simulation, however, the daily demand of the three stores is set to 3 ± 0.1, 1 ± 0.1 and 2 ± 0.1 tons respectively.
Obviously, the most profitable option for us is to send three tons of pies to the first store, one ton to the second and two tons to the third.
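Just to double-check this intuition, here is a tiny standalone simulation (it is not part of the project code and, for simplicity, it ignores the shelf-life mechanics): it ships the "ideal" allocation (3, 1, 2) every day and measures how badly it misses the noisy demand.

import random

shipment = [3, 1, 2]        # tons sent to each store per day
mean_demand = [3, 1, 2]     # mean daily demand of each store

stock = [0.0, 0.0, 0.0]
total_shortage = 0.0
for day in range(30):
    for i in range(3):
        stock[i] += shipment[i]                                   # morning delivery
        stock[i] -= mean_demand[i] + random.uniform(-0.1, 0.1)    # daily sales
        if stock[i] < 0:                                          # the store ran out of pies
            total_shortage += -stock[i]
            stock[i] = 0.0

print('Total shortage over 30 days:', round(total_shortage, 3))
# With shipments equal to the mean demand, the shortage stays tiny
# (at most about 0.1 ton per store per day).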
To solve this problem, we will use a custom gym environment and Deep Q-Learning (a Keras implementation).
Custom environment
The state of the environment is described by three non-negative real numbers: the stock of pies left in each of the three stores on the current day. The agent's actions are integers from 0 to 5 inclusive, indexing the permutations of (1, 2, 3), i.e. which truck goes to which store. Clearly, the most beneficial action is the one with index 4, the permutation (3, 1, 2). We treat the problem as episodic, with 30 days in one episode.
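For reference, here is how the action indices map to deliveries (itertools.permutations enumerates the permutations in lexicographic order, so index 4 is indeed (3, 1, 2)):

import itertools

# Action index -> how many tons go to the first, second and third store
print(list(enumerate(itertools.permutations([1, 2, 3]))))
# [(0, (1, 2, 3)), (1, (1, 3, 2)), (2, (2, 1, 3)),
#  (3, (2, 3, 1)), (4, (3, 1, 2)), (5, (3, 2, 1))]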
import gym
from gym import error, spaces, utils
from gym.utils import seeding
import itertools
import random
import time
class ShopsEnv(gym.Env):
    metadata = {'render.modes': ['human']}

    # Initialize the environment: empty shops, zero reward,
    # the day counter at the start of the episode
    def __init__(self):
        self.state = [0, 0, 0]  # current leftover stock in each of the three shops
        self.next_state = [0, 0, 0]  # stock after today's delivery and sales
        self.done = False  # whether the episode is over
        self.actions = list(itertools.permutations([1, 2, 3]))  # all possible truck-to-shop assignments
        self.reward = 0  # reward for the last step
        self.time_tracker = 0  # day counter within the episode
        self.remembered_states = []  # leftovers of previous days, used to write off expired pies

        # Seed the random generator with the current time
        t = int(time.time() * 1000.0)
        random.seed(((t & 0xff000000) >> 24) +
                    ((t & 0x00ff0000) >> 8) +
                    ((t & 0x0000ff00) << 8) +
                    ((t & 0x000000ff) << 24))
    # Take one step (one day) given the index of the chosen action
    def step(self, action_num):
        # If the episode is already over, return the situation unchanged
        if self.done:
            return [self.state, self.reward, self.done, self.next_state]
        else:
            # Move to the state prepared at the previous step
            self.state = self.next_state

            # Remember today's starting stock (needed to write off expired pies later)
            self.remembered_states.append(self.state)

            # Advance the day counter
            self.time_tracker += 1

            # Turn the action index into a concrete delivery (tons per shop)
            action = self.actions[action_num]

            # Add the delivered pies to what is already in the shops
            self.next_state = [x + y for x, y in zip(action, self.state)]

            # Subtract the (noisy) daily demand of each shop
            self.next_state[0] -= (3 + random.uniform(-0.1, 0.1))
            self.next_state[1] -= (1 + random.uniform(-0.1, 0.1))
            self.next_state[2] -= (2 + random.uniform(-0.1, 0.1))

            # Reward: if any shop ran out of pies, the penalty equals the total shortage;
            # otherwise the agent gets +1
            if any([x < 0 for x in self.next_state]):
                self.reward = sum([x for x in self.next_state if x < 0])
            else:
                self.reward = 1

            # Write off expired pies: whatever was already lying in the shops
            # three days ago has reached the end of its shelf life and is thrown away
            if self.time_tracker >= 3:
                remembered_state = self.remembered_states.pop(0)
                self.next_state = [max(x - y, 0) for x, y in zip(self.next_state, remembered_state)]
            else:
                self.next_state = [max(x, 0) for x in self.next_state]

            # The episode lasts 30 days
            self.done = self.time_tracker == 30

            # Return the observation, the reward, the done flag and the prepared next state
            return [self.state, self.reward, self.done, self.next_state]
    # Reset the environment to its initial configuration
    def reset(self):
        # Empty shops, zero reward, day counter back to the start
        self.state = [0, 0, 0]
        self.next_state = [0, 0, 0]
        self.done = False
        self.reward = 0
        self.time_tracker = 0
        self.remembered_states = []

        # Re-seed the random generator with the current time
        t = int(time.time() * 1000.0)
        random.seed(((t & 0xff000000) >> 24) +
                    ((t & 0x00ff0000) >> 8) +
                    ((t & 0x0000ff00) << 8) +
                    ((t & 0x000000ff) << 24))

        # Return the initial observation
        return self.state
    # Render the current situation:
    # print how many pies are left in each shop
    def render(self, mode='human', close=False):
        print('-' * 20)
        print('First shop')
        print('Pies:', self.state[0])
        print('Second shop')
        print('Pies:', self.state[1])
        print('Third shop')
        print('Pies:', self.state[2])
        print('-' * 20)
        print('')
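One detail that is easy to miss: gym.make('shops-v0') (used below) only works if the environment is registered inside the gym_shops package. The article does not show the package layout, so the snippet below is just a typical registration with an assumed module path:

# gym_shops/__init__.py -- assumed package layout, not shown in the article
from gym.envs.registration import register

register(
    id='shops-v0',                          # the id passed to gym.make()
    entry_point='gym_shops.envs:ShopsEnv',  # assumed path to the ShopsEnv class
)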
Major imports
import numpy as np  # array handling
import pandas as pd  # data manipulation
import gym  # the RL environment framework
import gym_shops  # our custom environment
from tqdm import tqdm  # progress bars

# for visualization
import matplotlib.pyplot as plt
import seaborn as sns
from IPython.display import clear_output
sns.set_color_codes()

# for the agent
from collections import deque
from keras.models import Sequential
from keras.layers import Dense
from keras.optimizers import Adam
import random  # for epsilon-greedy exploration
Defining an agent
class DQLAgent():
    def __init__(self, env):
        # Sizes of the state and action spaces
        self.state_size = 3  # leftover stock in the three shops
        self.action_size = 6  # six possible truck-to-shop assignments

        # Hyperparameters used in replay()
        self.gamma = 0.99
        self.learning_rate = 0.01

        # Hyperparameters used in adaptiveEGreedy()
        self.epsilon = 0.99
        self.epsilon_decay = 0.99
        self.epsilon_min = 0.0001

        self.memory = deque(maxlen = 5000)  # keep at most 5000 transitions, older ones are dropped

        # The neural network (NN) that approximates the Q-function
        self.model = self.build_model()
    # Build the network for Deep Q-Learning
    def build_model(self):
        model = Sequential()
        model.add(Dense(10, input_dim = self.state_size, activation = 'sigmoid'))  # input layer
        model.add(Dense(50, activation = 'sigmoid'))  # hidden layer
        model.add(Dense(10, activation = 'sigmoid'))  # hidden layer
        model.add(Dense(self.action_size, activation = 'sigmoid'))  # output layer: one Q-value per action
        model.compile(loss = 'mse', optimizer = Adam(lr = self.learning_rate))
        return model
    # Store a transition in the replay memory
    def remember(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))

    # Choose an action for the given state
    def act(self, state):
        # Draw a number between 0 and 1 and compare it with epsilon:
        # with probability epsilon take a random action (exploration)
        if random.uniform(0, 1) <= self.epsilon:
            return random.choice(range(6))
        else:
            # Otherwise act greedily with respect to the predicted Q-values (exploitation)
            act_values = self.model.predict(state)
            return np.argmax(act_values[0])
    # Train the network on a random minibatch from the replay memory
    def replay(self, batch_size):
        # Not enough experience collected yet - skip training
        if len(self.memory) < batch_size:
            return
        minibatch = random.sample(self.memory, batch_size)  # sample batch_size transitions

        # Move the Q-value of the taken action towards the bootstrapped target
        for state, action, reward, next_state, done in minibatch:
            if done:  # terminal transition - no future reward
                target = reward
            else:
                # target = R(s,a) + gamma * max Q'(s',a')
                # the max Q' value is taken over the actions available in s'
                target = reward + self.gamma * np.amax(self.model.predict(next_state)[0])
            train_target = self.model.predict(state)  # s --> NN --> Q(s,a) = train_target
            train_target[0][action] = target
            self.model.fit(state, train_target, verbose = 0)

    # Decay the exploration rate so that over time
    # epsilon shrinks and the agent exploits what it has learned
    def adaptiveEGreedy(self):
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay
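As a quick sanity check (not part of the original code), the freshly built network should map a single three-number state to six Q-values, one per action:

# The constructor never actually uses the env argument, so None is fine here
agent = DQLAgent(env=None)
q_values = agent.model.predict(np.zeros((1, 3)))
print(q_values.shape)  # (1, 6): one Q-value per action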
Train the agent
# Create the gym environment and the agent
env = gym.make('shops-v0')
agent = DQLAgent(env)

# Training settings
batch_size = 100
episodes = 1000

# Training loop over episodes
progress_bar = tqdm(range(episodes), position=0, leave=True)
for e in progress_bar:
    # Reset the environment at the start of the episode
    state = env.reset()
    state = np.reshape(state, [1, 3])

    # Day counter, ids of the taken actions and the accumulated reward
    time = 0
    taken_actions = []
    sum_rewards = 0

    # Run one episode
    while True:
        # Choose an action
        action = agent.act(state)

        # Remember which action was taken
        taken_actions.append(action)

        # Apply the action in the environment
        next_state, reward, done, _ = env.step(action)
        next_state = np.reshape(next_state, [1, 3])

        # Accumulate the reward
        sum_rewards += reward

        # Store the transition in the replay memory
        agent.remember(state, action, reward, next_state, done)

        # Move on to the next state
        state = next_state

        # Train the network on a minibatch (replay)
        agent.replay(batch_size)

        # Decay epsilon
        agent.adaptiveEGreedy()

        # Advance the day counter
        time += 1

        # Show the running statistics of the episode
        progress_bar.set_postfix_str(s='mean reward: {}, time: {}, epsilon: {}'.format(round(sum_rewards/time, 3), time, round(agent.epsilon, 3)), refresh=True)

        # End of the episode
        if done:
            # Plot the distribution of the actions taken during the episode
            clear_output(wait=True)
            sns.distplot(taken_actions, color="y")
            plt.title('Episode: ' + str(e))
            plt.xlabel('Action number')
            plt.ylabel('Occurrence in %')
            plt.show()
            break
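The article itself does not save the trained agent, but if you want to reuse it without retraining, the underlying Keras model can be persisted in the usual way (the file name here is just an example):

# Not in the original article: save the trained Q-network for later reuse
agent.model.save('dql_shops_model.h5')

# ...and load it back when needed
from keras.models import load_model
restored_model = load_model('dql_shops_model.h5')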
Testing the agent
import time

trained_model = agent  # the agent trained above
state = env.reset()  # reset the environment
state = np.reshape(state, [1, 3])

# Step counter, action log and reward accumulator
time_t = 0
MAX_EPISOD_LENGTH = 1000  # safety cap on the episode length
taken_actions = []
mean_reward = 0

# Run one test episode
progress_bar = tqdm(range(MAX_EPISOD_LENGTH), position=0, leave=True)
for time_t in progress_bar:
    # Act with the trained model and step the environment
    action = trained_model.act(state)
    next_state, reward, done, _ = env.step(action)
    next_state = np.reshape(next_state, [1, 3])
    state = next_state
    taken_actions.append(action)

    # Visualize the current situation in the shops
    clear_output(wait=True)
    env.render()
    progress_bar.set_postfix_str(s='time: {}'.format(time_t), refresh=True)
    print('Reward:', round(env.reward, 3))
    time.sleep(0.5)

    mean_reward += env.reward

    if done:
        break

# Plot the distribution of the actions taken during the test episode
sns.distplot(taken_actions, color='y')
plt.title('Test episode - mean reward: ' + str(round(mean_reward/(time_t+1), 3)))
plt.xlabel('Action number')
plt.ylabel('Occurrence in %')
plt.show()
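For comparison (this is not in the original article), it is easy to run the same environment with the fixed "ideal" policy, i.e. always taking action 4 = (3, 1, 2), and see how close the agent's mean reward gets to this baseline:

# Baseline: always send (3, 1, 2), the allocation matching the mean demand
state = env.reset()
baseline_reward = 0
steps = 0
while True:
    _, reward, done, _ = env.step(4)
    baseline_reward += reward
    steps += 1
    if done:
        break
print('Baseline mean reward:', round(baseline_reward / steps, 3))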
Conclusion
As you can see, the agent quickly figured out how to act most profitably.
In general, there is still plenty of room for experimentation: you can increase the number of stores, diversify the actions, or simply tune the hyperparameters of the training model - and that is only the beginning of the list.