# DQN_Code
# Source listing: 226 lines (190 loc), 9.24 KB.
# (Repository web-page chrome and the line-number gutter 1-226 that preceded
# the code were not part of the program and are reduced to this header.)
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import gym
import math
import matplotlib.pyplot as plt
import matplotlib.pyplot as plt1
import matplotlib.pyplot as plt2
import matplotlib.pyplot as plt3
import matplotlib.pyplot as plt4
import matplotlib.pyplot as plt5
from matplotlib.font_manager import FontProperties
import torchvision.models as models
import time
# Number of training episodes per run.
episode = 5000
# Number of points on the training curves; episode statistics are averaged
# over windows of episode / sample episodes before being plotted.
sample = 100
# Number of rows in the statistics buffers. It must be at least the number of
# runs performed by the training loop, which indexes the buffers as
# buffer[run][point].
n_runs = 1
# Per-run curves: episode index, mean cumulative reward, mean step count and
# mean loss for every plotted point.
plt_episode = np.zeros((n_runs, sample))
plt_rewards = np.zeros((n_runs, sample))
plt_steps = np.zeros((n_runs, sample))
plt_loss = np.zeros((n_runs, sample))
# Running sum of training losses: DQN.learn adds to it and the training loop
# reads and resets it.
global_loss = 0
# Use the GPU when one is available.
# NOTE(review): `device` is not passed to any network or tensor in the visible
# code, so everything still runs on the CPU -- confirm whether that is intended.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(torch.cuda.is_available())
# Q-network definition.
class Net(nn.Module):
    """Fully connected Q-network with two sigmoid hidden layers.

    Args:
        n_states: size of the input (state) vector.
        n_actions: number of outputs, one Q-value per action.
        n_hidden: width of the first hidden layer.
        n_hidden2: width of the second hidden layer.
    """

    def __init__(self, n_states, n_actions, n_hidden, n_hidden2):
        super().__init__()
        self.fc1 = nn.Linear(n_states, n_hidden)
        self.fc2 = nn.Linear(n_hidden, n_hidden2)
        self.out = nn.Linear(n_hidden2, n_actions)

    def forward(self, x):
        """Return the action values for the state(s) ``x``.

        ``x`` is fed through fc1 -> sigmoid -> fc2 -> sigmoid -> out; the
        output layer is linear (no activation).
        """
        h1 = torch.sigmoid(self.fc1(x))   # input layer -> hidden layer 1
        h2 = torch.sigmoid(self.fc2(h1))  # hidden layer 1 -> hidden layer 2
        actions_value = self.out(h2)      # hidden layer 2 -> one value per action
        return actions_value
# DQN agent: holds the evaluation (online) network and the target network.
class DQN(object):
    """Deep Q-learning agent with a ring-buffer replay memory.

    The agent owns two ``Net`` instances: ``eval_net`` is trained on every
    ``learn`` call, ``target_net`` supplies the bootstrap values and is
    overwritten with ``eval_net``'s weights every ``target_replace_iter``
    learning steps.

    ``learn`` adds each loss value to the module-level ``global_loss``
    accumulator, which must exist before ``learn`` is called.

    Args:
        n_states: length of a state vector.
        n_actions: number of discrete actions.
        n_hidden, n_hidden2: hidden layer widths passed to ``Net``.
        batch_size: number of transitions sampled per learning step.
        lr: learning rate of the Adam optimizer.
        epsilon: stored on the instance; ``choose_action`` uses the
            exponential schedule below instead of this value.
        gamma: reward discount factor.
        target_replace_iter: learning steps between target-network syncs.
        memory_capacity: number of transitions the replay memory holds.
    """

    # Exploration schedule used by choose_action:
    #   eps = EPS_END + (EPS_START - EPS_END) * exp(-i_episode / EPS_DECAY)
    EPS_START = 0.9
    EPS_END = 0.05
    EPS_DECAY = 500

    def __init__(self, n_states, n_actions, n_hidden, n_hidden2, batch_size, lr, epsilon, gamma, target_replace_iter, memory_capacity):
        # Build the two networks: evaluation (trained) and target (bootstrap).
        self.eval_net = Net(n_states, n_actions, n_hidden, n_hidden2)
        self.target_net = Net(n_states, n_actions, n_hidden, n_hidden2)
        # Start both networks from the same weights; learn() only re-syncs
        # them once every target_replace_iter steps.
        self.target_net.load_state_dict(self.eval_net.state_dict())
        # Replay memory, one transition per row:
        # [state (n_states), action, reward, next_state (n_states)].
        self.memory = np.zeros((memory_capacity, n_states * 2 + 2))
        # Adam optimizer over the evaluation network's parameters.
        self.optimizer = torch.optim.Adam(self.eval_net.parameters(), lr=lr)
        # Mean squared error between predicted and target Q-values.
        self.loss_func = nn.MSELoss()
        self.memory_counter = 0      # total number of transitions stored so far
        self.learn_step_counter = 0  # learning steps, drives target-network sync
        self.n_states = n_states
        self.n_actions = n_actions
        self.n_hidden = n_hidden
        self.n_hidden2 = n_hidden2
        self.batch_size = batch_size
        self.lr = lr
        self.epsilon = epsilon
        self.gamma = gamma
        self.target_replace_iter = target_replace_iter
        self.memory_capacity = memory_capacity

    def choose_action(self, state, i_episode):
        """Pick an action for ``state`` with an epsilon-greedy policy.

        The exploration probability decays exponentially with ``i_episode``
        from ``EPS_START`` towards ``EPS_END``.

        Returns:
            The chosen action index as a Python ``int``.
        """
        eps_threshold = self.EPS_END + (self.EPS_START - self.EPS_END) * math.exp(-1. * i_episode / self.EPS_DECAY)
        if np.random.uniform() < eps_threshold:
            # Explore: uniformly random action.
            return int(np.random.randint(0, self.n_actions))
        # Exploit: action with the highest predicted value. No gradient is
        # needed for action selection, so skip building the autograd graph.
        x = torch.as_tensor(np.asarray(state, dtype=np.float32)).unsqueeze(0)
        with torch.no_grad():
            actions_value = self.eval_net(x)
        return int(actions_value.argmax(dim=1).item())

    def store_transition(self, state, action, reward, next_state):
        """Write one transition into the replay memory.

        Once the memory is full the oldest rows are overwritten (ring buffer).
        """
        transition = np.hstack((state, [action, reward], next_state))
        index = self.memory_counter % self.memory_capacity
        self.memory[index, :] = transition
        self.memory_counter += 1

    def learn(self):
        """Run one Q-learning update on a random batch from the replay memory.

        Only rows that have actually been written are sampled (with
        replacement). The loss value is added to the module-level
        ``global_loss`` accumulator.

        Returns:
            The batch loss as a ``float``, or ``None`` when the memory is
            still empty and no update was performed.
        """
        global global_loss
        filled = min(self.memory_counter, self.memory_capacity)
        if filled == 0:
            return None
        sample_index = np.random.choice(filled, self.batch_size)
        batch = self.memory[sample_index, :]
        n = self.n_states
        b_state = torch.as_tensor(batch[:, :n], dtype=torch.float32)
        b_action = torch.as_tensor(batch[:, n:n + 1].astype(np.int64))
        b_reward = torch.as_tensor(batch[:, n + 1:n + 2], dtype=torch.float32)
        b_next_state = torch.as_tensor(batch[:, -n:], dtype=torch.float32)

        # Q(s, a) of the actions that were actually taken.
        q_eval = self.eval_net(b_state).gather(1, b_action)
        # Bootstrap values come from the target network and are never
        # backpropagated through.
        with torch.no_grad():
            q_next = self.target_net(b_next_state)
        # NOTE(review): the memory stores no terminal flag, so the target
        # bootstraps from next_state even for the last transition of an episode.
        q_target = b_reward + self.gamma * q_next.max(1, keepdim=True)[0]
        loss = self.loss_func(q_eval, q_target)

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # Accumulate a plain number; adding the tensor itself would keep
        # autograd state alive across learning steps.
        loss_value = loss.item()
        global_loss += loss_value

        # Periodically copy the evaluation network into the target network.
        self.learn_step_counter += 1
        if self.learn_step_counter % self.target_replace_iter == 0:
            self.target_net.load_state_dict(self.eval_net.state_dict())
        return loss_value
# Training script: trains a DQN agent on the 'PepperMap-v6' environment, saves
# the target network's weights and plots reward / steps / loss curves.

# The statistics buffers are indexed as buffer[run][point] below, so make sure
# each one is at least 2-D (a 1-D buffer becomes a single row).
plt_episode, plt_rewards, plt_steps, plt_loss = (
    np.atleast_2d(buf) for buf in (plt_episode, plt_rewards, plt_steps, plt_loss)
)

for run in range(1):
    time_begin = time.time()
    env = gym.make('PepperMap-v6')
    env = env.unwrapped  # access the raw environment without gym's wrappers

    # Environment parameters
    n_actions = 9    # number of discrete actions
    n_states = 6     # length of the state vector
    n_hidden = 150   # units in the first hidden layer
    n_hidden2 = 75   # units in the second hidden layer
    batch_size = 128
    lr = 0.001                 # learning rate
    epsilon = 0.1              # epsilon-greedy exploration factor passed to DQN
    gamma = 0.9                # reward discount factor
    target_replace_iter = 100  # learning steps between target-network syncs
    memory_capacity = 3000     # replay memory size (transitions)
    n_episodes = episode

    # Episodes averaged into one plotted point. Integer arithmetic keeps the
    # window test and the point index exact.
    window = max(1, n_episodes // sample)

    # Create DQN
    dqn = DQN(n_states, n_actions, n_hidden, n_hidden2, batch_size, lr, epsilon, gamma, target_replace_iter, memory_capacity)

    average = [0, 0, 0]  # running sums over the current window: [reward, step, loss]

    # Collect experience
    for i_episode in range(n_episodes):
        t = 0        # steps taken in this episode
        rewards = 0  # cumulative reward of this episode
        state = env.reset()
        while True:
            env.render()  # visualise the environment; remove to speed training up
            action = dqn.choose_action(state, i_episode)
            next_state, reward, done, info = env.step(action)
            dqn.store_transition(state, action, reward, next_state)
            rewards += reward

            # Start learning only once the replay memory has been filled.
            if dqn.memory_counter > memory_capacity:
                dqn.learn()

            t = t + 1
            state = next_state

            # The episode ends when the environment reports done or after
            # more than 300 steps.
            if done or t > 300:
                average[0] += rewards
                average[1] += t
                # float() accepts both a plain number and a 0-d tensor.
                average[2] += float(global_loss) / t
                # t already counts every step taken in this episode.
                print('Episode {} finished after {} timesteps, total rewards {}'.format(i_episode, t, rewards))
                if (i_episode + 1) % window == 0:
                    n = i_episode // window
                    # Guard against running past the buffers when n_episodes
                    # is not a multiple of sample.
                    if n < sample:
                        plt_episode[run][n] = i_episode
                        plt_rewards[run][n] = average[0] / window
                        plt_steps[run][n] = average[1] / window
                        plt_loss[run][n] = average[2] / window
                    average = [0, 0, 0]
                # Reset the accumulator so the next episode's mean loss
                # starts from zero.
                global_loss = 0
                break
    env.close()

    # Save the trained parameters.
    tag = "5000_E500_ru_" + str(n_hidden) + "x" + str(n_hidden2) + "_v6_test1_" + str(run)
    torch.save(dqn.target_net.state_dict(), tag + ".pth")

    # Plot the training curves once training has finished.
    plt.figure(figsize=(12, 8))
    plt.subplots_adjust(top=0.95, bottom=0.06, right=0.93, hspace=0.375)
    curves = (
        (plt_rewards[run], 'Cumulative reward'),
        (plt_steps[run], 'steps'),
        (plt_loss[run], 'loss'),
    )
    for row, (series, label) in enumerate(curves, start=1):
        plt.subplot(3, 1, row)
        plt.plot(plt_episode[run], series)
        plt.grid(True)
        plt.ylabel(label)
        plt.xlabel('episode')
    # No legend: none of the curves carries a label.
    #plt.show()
    plt.savefig(tag, dpi=300, facecolor='white')