Skip to main content
  • Home
  • Development
  • Documentation
  • Donate
  • Operational login
  • Browse the archive

swh logo
SoftwareHeritage
Software
Heritage
Archive
Features
  • Search

  • Downloads

  • Save code now

  • Add forge now

  • Help

https://github.com/yuminghuang1995/RL3DPToolpathPlanner
15 April 2025, 06:47:10 UTC
  • Code
  • Branches (1)
  • Releases (0)
  • Visits
    • Branches
    • Releases
    • HEAD
    • refs/heads/main
    No releases to show
  • 7e7d097
  • /
  • dqn.py
Raw File Download Save again
Take a new snapshot of a software origin

If the archived software origin currently browsed is not synchronized with its upstream version (for instance when new commits have been issued), you can explicitly request Software Heritage to take a new snapshot of it.

Use the form below to proceed. Once a request has been submitted and accepted, it will be processed as soon as possible. You can then check its processing state by visiting this dedicated page.
swh spinner

Processing "take a new snapshot" request ...

To reference or cite the objects present in the Software Heritage archive, permalinks based on SoftWare Hash IDentifiers (SWHIDs) must be used.
Select below a type of object currently browsed in order to display its associated SWHID and permalink.

  • content
  • directory
  • revision
  • snapshot
origin badgecontent badge
swh:1:cnt:ebe03543c52867e485a1f5642b484be893948acc
origin badgedirectory badge
swh:1:dir:7e7d0976cb84dfac80530dc9db150561cf9ef388
origin badgerevision badge
swh:1:rev:b8704633dab0de26d0d81291c9e46a57f19d2cb5
origin badgesnapshot badge
swh:1:snp:127c589bbb43a926c129e29c25a3793468ce12dd

This interface enables the generation of software citations, provided that the root directory of browsed objects contains a citation.cff or codemeta.json file.
Select below a type of object currently browsed in order to generate citations for them.

  • content
  • directory
  • revision
  • snapshot
(requires biblatex-software package)
Generating citation ...
(requires biblatex-software package)
Generating citation ...
(requires biblatex-software package)
Generating citation ...
(requires biblatex-software package)
Generating citation ...
Tip revision: b8704633dab0de26d0d81291c9e46a57f19d2cb5 authored by yuminghuang1995 on 13 April 2025, 12:43:55 UTC
Update README.md
Tip revision: b870463
dqn.py
import random
import numpy as np
import collections
import torch
import torch.nn.functional as F
import torch.nn as nn
import math
import functions as func
import os
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"


def train(env_name, coordinates, node_mapping, lines, rays, G, G_new, new_adjacency_matrix, new_state, new_state_adjacency, new_state_dim, new_node_dict, anti_lock_optional_action,
          new_angle_limit, beam_seq_i, max_edge_pass_, ori_i, ori_index, new_coords_array, boundary_nodes_array, heat_radius, calc_mode, material, max_edge_length, savept=False):

    class ReplayBuffer:
        """Fixed-capacity FIFO experience-replay buffer of transitions."""

        def __init__(self, capacity):
            # deque with maxlen silently evicts the oldest transition when full
            self.buffer = collections.deque(maxlen=capacity)

        def add(self, state_, state_adjacency_, action_, action_indexes_, reward_, next_state_, next_state_adjacency_, coords_array_, next_coords_array_, done_):
            """Append one transition tuple to the buffer."""
            transition = (state_, state_adjacency_, action_, action_indexes_, reward_,
                          next_state_, next_state_adjacency_, coords_array_, next_coords_array_, done_)
            self.buffer.append(transition)

        def sample(self, batch_size_):
            """Draw `batch_size_` transitions uniformly at random and unzip them column-wise."""
            batch = random.sample(self.buffer, batch_size_)
            (states, states_adj, actions, action_idx, rewards,
             next_states, next_states_adj, coords, next_coords, dones) = zip(*batch)
            # Array-like columns are stacked into numpy arrays; scalars stay as tuples.
            return (np.array(states), np.array(states_adj), actions, action_idx, rewards,
                    np.array(next_states), np.array(next_states_adj),
                    np.array(coords), np.array(next_coords), dones)

        def size(self):
            """Number of transitions currently stored."""
            return len(self.buffer)

    class E2EBlock(nn.Module):
        """Edge-to-edge block (BrainNetCNN-style cross filter).

        Applies a (1, d) row convolution and a (d, 1) column convolution over a
        connectivity-matrix input and sums the two feature maps after resizing
        them to a common spatial size.
        """

        def __init__(self, in_planes, planes, example, bias=False):
            super(E2EBlock, self).__init__()
            # Kernel length d is inferred from the width of the example tensor (N, C, H, W).
            self.d = example.size(3)
            self.cnn1 = nn.Conv2d(in_planes, planes, (1, self.d), bias=bias)  # 1 x d row filter
            self.cnn2 = nn.Conv2d(in_planes, planes, (self.d, 1), bias=bias)  # d x 1 column filter

        def forward(self, x):
            # Collapse channels to a single averaged plane and move to GPU.
            # NOTE(review): hard .cuda() — this module requires a CUDA device.
            # NOTE(review): the channel mean leaves 1 input channel, so invoking this
            # with in_planes > 1 convolutions would fail — confirm intended usage.
            x = x.mean(dim=1, keepdim=True).cuda()

            # Pad (centered) or resample the width so cnn1's (1, d) kernel fits exactly.
            current_width = x.size(3)
            if current_width < self.d:
                padding = (self.d - current_width) // 2
                x1 = F.pad(x, (padding, padding, 0, 0))
            elif current_width > self.d:
                x1 = F.interpolate(x, size=(x.size(2), self.d), mode='bilinear', align_corners=False)
            else:
                x1 = x

            a = self.cnn1(x1)
            b = self.cnn2(x)

            # The two convolutions shrink different axes; resize both outputs to the
            # larger common (H, W) so they can be summed element-wise.
            desired_height = max(a.size(2), b.size(2))
            desired_width = max(a.size(3), b.size(3))

            a = F.interpolate(a, size=(desired_height, desired_width), mode='bilinear', align_corners=False)
            b = F.interpolate(b, size=(desired_height, desired_width), mode='bilinear', align_corners=False)

            return a + b


    class BrainNetCNN(torch.nn.Module):
        """BrainNetCNN-style network over adjacency-matrix inputs.

        Layers are created lazily in initialize_layers() so their kernel sizes
        can be derived from an example input tensor; Qnet below reuses some of
        these sub-layers directly rather than calling this forward().
        """

        def __init__(self):
            super(BrainNetCNN, self).__init__()
            # Placeholders until initialize_layers() is called with an example tensor.
            self.e2econv1 = None
            self.e2econv2 = None
            self.E2N = None
            self.N2G = None
            self.dense1 = None
            self.dense2 = None
            self.dense3 = None

        def initialize_layers(self, example):
            # d = width of the example (N, C, H, W) tensor; sizes all kernels.
            d = example.size(3)
            self.e2econv1 = E2EBlock(1, 32, example, bias=True)
            self.e2econv2 = E2EBlock(32, 64, example, bias=True)
            self.E2N = torch.nn.Conv2d(64, 1, (1, d))    # edge-to-node reduction
            self.N2G = torch.nn.Conv2d(1, 256, (d, 1))   # node-to-graph reduction
            self.dense1 = torch.nn.Linear(256, 128)
            self.dense2 = torch.nn.Linear(128, 30)
            self.dense3 = torch.nn.Linear(30, 2)

        def forward(self, x):
            # NOTE(review): hard .cuda() — requires a CUDA device.
            x = x.cuda()
            out = F.leaky_relu(self.e2econv1(x), negative_slope=0.33)
            out = F.leaky_relu(self.e2econv2(out), negative_slope=0.33)
            out = F.leaky_relu(self.E2N(out), negative_slope=0.33)
            out = F.dropout(F.leaky_relu(self.N2G(out), negative_slope=0.33), p=0.5)
            out = out.view(out.size(0), -1)  # flatten to (batch, features)
            out = F.dropout(F.leaky_relu(self.dense1(out), negative_slope=0.33), p=0.5)
            out = F.dropout(F.leaky_relu(self.dense2(out), negative_slope=0.33), p=0.5)
            out = F.leaky_relu(self.dense3(out), negative_slope=0.33)
            return out

    class Qnet(torch.nn.Module):
        """Dueling Q-network over stacked adjacency-matrix observations.

        Borrows convolutional sub-layers from a lazily-initialized BrainNetCNN
        (sized by the closure variable max_sa_size) and ends in a dueling head:
        Q(s, a) = V(s) + A(s, a) - mean_a A(s, a).
        """

        def __init__(self, state_dim_, hidden_dim_, action_dim_):
            super(Qnet, self).__init__()

            # Build BrainNetCNN layers from a dummy tensor shaped like the real
            # observation: (1, state_dim, max_sa_size + 4, max_sa_size); the +4
            # rows correspond to the appended coordinate array.
            self.brain_net_cnn = BrainNetCNN()
            x_dummy = torch.zeros(1, state_dim_, max_sa_size + 4, max_sa_size)
            self.brain_net_cnn.initialize_layers(x_dummy)

            self.conv1 = self.brain_net_cnn.e2econv1
            # conv2 is kept but not used in forward() below.
            self.conv2 = self.brain_net_cnn.e2econv2
            self.channel_match = nn.Conv2d(32, 64, kernel_size=1)  # 32 -> 64 channels for conv3
            self.conv3 = self.brain_net_cnn.E2N
            self.ReLu = torch.nn.ReLU()
            self.Sigmoid = torch.nn.Sigmoid()

            # Deeper head for longer toolpaths (more edge passes).
            if max_edge_pass_ <= 4:
                self.fc1 = nn.Linear(244, 256)
                self.fc2 = torch.nn.Linear(256, 128)
                self.fc_A = torch.nn.Linear(128, action_dim_)  # advantage stream
                self.fc_V = torch.nn.Linear(128, 1)            # value stream
            else:
                self.fc1 = nn.Linear(244, 256)
                self.fc2 = torch.nn.Linear(256, 256)
                self.fc_A = torch.nn.Linear(256, action_dim_)
                self.fc_V = torch.nn.Linear(256, 1)

        def forward(self, x):
            batch_size_ = x.size(0)
            # Resample observations to the fixed spatial size the convs were built for.
            expected_dim = max_sa_size + 4
            if x.size(2) != expected_dim or x.size(3) != max_sa_size:
                x = F.interpolate(x, size=(expected_dim, max_sa_size), mode='bilinear', align_corners=False)
            x = self.conv1(x)
            x = self.ReLu(x)
            x = self.channel_match(x)
            x = self.conv3(x)
            x = x.view(batch_size_, -1)
            x = self.ReLu(x)
            # Pad with zeros or truncate so the flattened features match fc1's
            # hard-coded input width of 244.
            required_features = 244
            current_features = x.size(1)
            if current_features != required_features:
                if current_features < required_features:
                    padding_size = required_features - current_features
                    x = F.pad(x, (0, padding_size))
                else:
                    x = x[:, :required_features]

            x = self.fc1(x)
            x = self.ReLu(x)
            x = self.fc2(x)
            x = self.ReLu(x)

            # Dueling aggregation: subtract the mean advantage for identifiability.
            A = self.fc_A(x)
            V = self.fc_V(x)
            x = (V + A - A.mean(1, keepdim=True))
            return x


    class DQN:
        """DQN agent with epsilon-greedy action selection, a periodically synced
        target network, and checkpoints selected/pruned by adjacency-matrix
        similarity (via helpers in the `func` module).
        """

        def __init__(self, state_dim_, hidden_dim_, action_dim_, learning_rate, gamma_, epsilon_, target_update_, device_):
            self.action_dim = action_dim_
            # Online and target networks; both forced onto CUDA.
            self.q_net = Qnet(state_dim_, hidden_dim_, self.action_dim).to('cuda')
            self.target_q_net = Qnet(state_dim_, hidden_dim_, self.action_dim).to('cuda')
            self.optimizer = torch.optim.AdamW(self.q_net.parameters(), lr=learning_rate)
            self.gamma = gamma_                  # discount factor
            self.epsilon = epsilon_              # exploration probability
            self.target_update = target_update_  # sync target net every N updates
            self.count = 0                       # number of update() calls so far
            self.device = device_
            self.loss = 0
            self.loaded_checkpoint_path = None
            self.loaded_adj_matrix = None
            self.max_checkpoints = 10            # prune above this many checkpoint files

        def take_action(self, state_, state_adjacency_, pre_pre_action_, adjacency_matrix_flow_, edge_pass_, coords_array_, greedy=False):
            """Pick the next node: epsilon-greedy over nodes reachable from the
            current node (state_[-1]); in 'Tsp' mode already-visited nodes are
            excluded, and the very first step is restricted to
            anti_lock_optional_action (closure variable)."""
            optional_action = np.where(np.array(adjacency_matrix_flow_[state_[-1]]) != 0)[0]
            if calc_mode == 'Tsp':
                # Nodes already on the path keep positive flow entries; drop them.
                row_col_set = np.concatenate(np.argwhere(adjacency_matrix_flow_ > 0).T)
                optional_action = optional_action[~np.isin(optional_action, row_col_set)]

            if edge_pass_ == 0:
                optional_action = anti_lock_optional_action

            if np.random.random() < self.epsilon and not greedy:
                # Explore: uniform choice among the legal actions.
                action_ = np.random.choice(optional_action)
                action_index_ = action_
            else:
                # Exploit: evaluate Q-values for the stacked adjacency + coords
                # observation and take the best legal action.
                state_adjacency_add_ = np.concatenate((state_adjacency_, coords_array_), axis=1)
                state_adjacency_add_ = torch.as_tensor(np.array(state_adjacency_add_), dtype=torch.float).to(self.device)
                state_adjacency_add_ = state_adjacency_add_.unsqueeze(0)
                q_values = self.q_net(state_adjacency_add_)[0]
                selected_q_values = q_values[optional_action]
                max_index = torch.argmax(selected_q_values).item()
                action_ = optional_action[max_index]
                action_index_ = action_

            return action_, action_index_

        def update(self, transition_dict_):
            """One TD(0) gradient step on a sampled batch; syncs the target
            network every self.target_update calls."""
            states_adjacency = np.concatenate((transition_dict_['states_adjacency'], transition_dict_['coor_array']), axis=2)
            states_adjacency = torch.tensor(states_adjacency).to(self.device)
            action_indexes = torch.tensor(transition_dict_['action_indexes']).view(-1, 1).to(self.device)
            rewards = torch.tensor(transition_dict_['rewards'], dtype=torch.float).view(-1, 1).to(self.device)
            next_states_adjacency = np.concatenate((transition_dict_['next_states_adjacency'], transition_dict_['next_coor_array']), axis=2)
            next_states_adjacency = torch.tensor(next_states_adjacency).to(self.device)
            dones = torch.tensor(transition_dict_['dones'], dtype=torch.float).view(-1, 1).to(self.device)
            # Q(s, a) for the taken actions; max_a' Q_target(s', a') for the target.
            q_values = self.q_net(states_adjacency.float()).gather(1, action_indexes)
            max_next_q_values, _ = self.target_q_net(next_states_adjacency.float()).max(dim=1, keepdim=True)

            q_targets = rewards + self.gamma * max_next_q_values * (1 - dones)
            dqn_loss = torch.mean(F.mse_loss(q_values, q_targets))

            self.optimizer.zero_grad()
            dqn_loss.backward()
            self.optimizer.step()
            self.loss = dqn_loss
            if self.count % self.target_update == 0:
                self.target_q_net.load_state_dict(self.q_net.state_dict())
            self.count += 1

        def save_checkpoint(self, new_state_adjacency_):
            """Save model/optimizer state plus a normalized, padded copy of the
            adjacency matrix (used later for similarity matching); prune the
            most redundant checkpoint when over self.max_checkpoints."""
            existing_indices = [int(filename.split('_')[-1].split('.')[0]) for filename in os.listdir("checkpoint") if
                                filename.startswith("model_")]
            # max(... + [-1]) + 1 yields 0 for an empty directory.
            next_index = max(existing_indices + [-1]) + 1
            filepath = f"checkpoint/model_{next_index}.pt"

            adj_matrix_to_save = new_state_adjacency_
            adj_matrix_to_save = func.normalize_matrix(adj_matrix_to_save)
            adj_matrix_to_save = func.pad_matrix(adj_matrix_to_save, max_sa_size)
            torch.save({
                'model_state_dict': self.q_net.state_dict(),
                'optimizer_state_dict': self.optimizer.state_dict(),
                'loss': self.loss,
                'adj_matrix': adj_matrix_to_save,
            }, filepath)

            if len(existing_indices) + 1 > self.max_checkpoints:
                self._delete_similar_checkpoint()


        def load_checkpoint(self, current_adj_matrix):
            """Load the checkpoint whose stored adjacency matrix is most similar
            to current_adj_matrix, if any checkpoints exist."""
            checkpoint_files = [f for f in os.listdir("checkpoint") if f.endswith('.pt')]
            num_checkpoint_files = len(checkpoint_files)
            current_adj_matrix_norm = func.normalize_matrix(current_adj_matrix)

            current_adj_matrix_norm = func.pad_matrix(current_adj_matrix_norm, max_sa_size)
            # NOTE(review): the similarity search runs before the empty-directory
            # early return below, so it is wasted work when no checkpoints exist.
            most_similar_checkpoint_path, similarity = func.find_most_similar_checkpoint(current_adj_matrix_norm, checkpoint_dir="checkpoint")

            if num_checkpoint_files < 1:
                self.loaded_checkpoint_path = None
                return

            if most_similar_checkpoint_path is not None:
                checkpoint = torch.load(most_similar_checkpoint_path)
                self.q_net.load_state_dict(checkpoint['model_state_dict'])
                self.optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
                self.loaded_checkpoint_path = most_similar_checkpoint_path
                self.loaded_adj_matrix = checkpoint['adj_matrix']
            else:
                self.loaded_checkpoint_path = None

        @staticmethod
        def _delete_similar_checkpoint():
            """Delete the checkpoint most redundant with the rest: the one whose
            two highest pairwise similarities to other checkpoints sum largest."""
            checkpoints = []
            for filename in os.listdir("checkpoint"):
                if filename.endswith(".pt"):
                    path = os.path.join("checkpoint", filename)
                    data = torch.load(path)
                    checkpoints.append((filename, data['adj_matrix']))

            max_combined_similarity = 0
            most_similar_file = None

            for ii in range(len(checkpoints)):
                current_file, current_adj = checkpoints[ii]
                max_similarity = 0
                second_max_similarity = 0

                # Track the top-2 similarities of this checkpoint to all others.
                for j in range(len(checkpoints)):
                    if ii != j:
                        similarity = func.calculate_similarity(current_adj, checkpoints[j][1])
                        if similarity > max_similarity:
                            second_max_similarity = max_similarity
                            max_similarity = similarity
                        elif similarity > second_max_similarity:
                            second_max_similarity = similarity

                combined_similarity = max_similarity + second_max_similarity
                if combined_similarity > max_combined_similarity:
                    max_combined_similarity = combined_similarity
                    most_similar_file = current_file

            if most_similar_file:
                os.remove(os.path.join("checkpoint", most_similar_file))

        @staticmethod
        def normalize_matrix(matrix):
            # NOTE(review): appears unused within this class (save/load use
            # func.normalize_matrix instead) — confirm before removing.
            return matrix / matrix.sum()


    def damping(edge_pass_):
        """Reward-decay factor for the edge_pass_-th traversed edge.

        Gaussian_damp (closure flag) selects a Gaussian decay with a 0.1 floor;
        otherwise an inverse-square decay clamped at 1/16 from the 4th edge on.
        """
        if Gaussian_damp:
            sigma = 0.865
            return 0.9 * math.exp(-(edge_pass_ ** 2) / (2 * sigma ** 2)) + 0.1
        if edge_pass_ < 4:
            return 1 / (edge_pass_ + 1) ** 2
        return 0.0625

    def cal_reward(angle_, edge_pass_, double_pass_, lifting_):
        """Reward for a turn of angle_ degrees, minus traversal penalties.

        A smooth sigmoid-shaped curve favors angles below ~60 degrees, gives
        partial credit between ~60 and ~120, and little beyond; double_pass_
        and lifting_ are subtracted as penalties. edge_pass_ is accepted for
        interface compatibility but does not affect the result.
        """
        def sigmoid(x):
            return 1 / (1 + np.exp(-x))

        low = (angle_ - 60) / 30    # transition centered at 60 degrees
        high = (angle_ - 120) / 30  # transition centered at 120 degrees

        base = sigmoid(-10 * low) + 0.9 * (sigmoid(10 * low) - sigmoid(10 * high))
        return base - double_pass_ - lifting_


    class Environment:
        """Toolpath-planning environment over the graph's adjacency matrix.

        State is a fixed-length window of recently visited node indices;
        observations are stacks of adjacency-matrix snapshots. step() consumes
        the traversed edge from the flow matrix and computes a material- and
        mode-specific reward via helpers from `func` and the enclosing
        closure (calc_mode, material, boundary_nodes_array, ...).
        """

        def __init__(self, new_adjacency_matrix_, new_node_dict_, new_state_, new_state_adjacency_, new_coords_array_):
            self.adjacency_matrix = new_adjacency_matrix_
            self.node_dict = new_node_dict_            # node index -> coordinates
            self.state = new_state_                    # initial node-index window
            self.state_adjacency = new_state_adjacency_  # initial adjacency stack
            self.coords_array = new_coords_array_

        def obser_space(self):
            """Number of undirected edges (each appears twice in the matrix)."""
            return int(np.count_nonzero(self.adjacency_matrix) / 2)

        def action_space(self):
            """One action per node in the graph."""
            return len(self.node_dict)

        def reset(self):
            """Return independent copies of the initial episode quantities."""
            state_ = self.state.copy()
            adjacency_matrix_flow_ = self.adjacency_matrix.copy()
            state_adjacency_ = self.state_adjacency.copy()
            coords_array_ = self.coords_array.copy()
            return state_, adjacency_matrix_flow_, state_adjacency_, coords_array_

        def step(self, action_, state_, state_adjacency_, edge_pass_, adjacency_matrix_flow_, coords_array_, temp_path_, temp_lines_):
            """Traverse the edge (state_[-1] -> action_) and return
            (next_state, next_state_adjacency, reward, done, angle,
            next_coords_array, next_lines)."""
            # Consume the traversed edge: negative entries mark a remaining
            # second pass and flip positive; positive entries are zeroed out.
            # BUG FIX: the original indexed the closure variable `state` (set in
            # the training loop) instead of the `state_` parameter; it only
            # worked because callers happened to pass the same object.
            if adjacency_matrix_flow_[state_[-1]][action_] < 0 or adjacency_matrix_flow_[action_][state_[-1]] < 0:
                adjacency_matrix_flow_[state_[-1]][action_] = -adjacency_matrix_flow_[state_[-1]][action_]
                adjacency_matrix_flow_[action_][state_[-1]] = -adjacency_matrix_flow_[action_][state_[-1]]
            else:
                adjacency_matrix_flow_[state_[-1]][action_] = 0.0
                adjacency_matrix_flow_[action_][state_[-1]] = 0.0

            # Slide the node window and the adjacency-snapshot stack by one.
            next_state_ = state_.copy()
            next_state_ = np.append(next_state_[1:], action_)
            next_state_adjacency_ = state_adjacency_.copy()
            adjacency_matrix_flow_ = np.expand_dims(adjacency_matrix_flow_, axis=0)
            next_state_adjacency_ = np.concatenate([next_state_adjacency_[1:], adjacency_matrix_flow_], axis=0)
            adjacency_matrix_flow_ = np.squeeze(adjacency_matrix_flow_, axis=0)
            next_lines_ = temp_lines_ + [(node_mapping[next_state_[-2]], node_mapping[next_state_[-1]])]

            done_ = False
            reward_ = 0
            angle_ = 0
            # Coordinates of the last three nodes of the updated window.
            node1_ = self.node_dict[next_state_[len(next_state_) - 3]]
            node2_ = self.node_dict[next_state_[len(next_state_) - 2]]
            node3_ = self.node_dict[next_state_[len(next_state_) - 1]]

            if calc_mode == 'Euler':
                # Rewards are memoized per visited path prefix.
                if tuple(temp_path_) in path_reward_dict:
                    reward_ = path_reward_dict[tuple(temp_path_)]
                else:
                    if material == 'CCF':
                        # Turning angle at the middle node of the last segment pair.
                        angle_ = func.calculate_angle(node1_, node2_, node3_)

                        # Penalty for consuming the final pass of an edge.
                        double_pass = 0
                        if adjacency_matrix_flow_[next_state_[-2]][next_state_[-1]] == 0:
                            if distance_reward:
                                double_pass = func.calculate_distance(node1_, node2_) / max_edge_length
                            else:
                                double_pass = 1.0

                        # Penalty for immediately backtracking (lighter on boundary nodes).
                        lifting = 0
                        if (next_state_[-2] in boundary_nodes_array) and next_state_[-1] == next_state_[-3]:
                            lifting = 0.5
                        if (next_state_[-2] not in boundary_nodes_array) and next_state_[-1] == next_state_[-3]:
                            lifting = 1

                        reward_ = cal_reward(angle_, edge_pass_, double_pass, lifting)

                        damping_ = damping(edge_pass_)
                        reward_ = reward_ * damping_

                    if material == 'PLA3D':
                        damping_ = damping(edge_pass_)
                        # Structural (FEA) quality of the partially built frame.
                        reward_ = func.beam_fea(G_new, adjacency_matrix_flow_, self.node_dict, boundary_nodes_array, draw=False)
                        reward_ = -reward_ * 10

                        # Small penalty for raising the gravity center.
                        gravity_center = 0.1 * ((node3_[2] + node2_[2]) / 2 - (node2_[2] + node1_[2]) / 2)
                        reward_ = reward_ - gravity_center

                        collision_punish = 0
                        if collision_reward:
                            collision, _, min_angle = func.collision_check_simulation(coordinates, node_mapping[next_state_[-2]], node_mapping[next_state_[-1]], rays, next_lines_, norm_output=False)
                            if collision:
                                collision_punish = 1000
                            else:
                                collision_punish = min_angle
                        reward_ = reward_ - collision_punish

                        reward_ = reward_ * damping_

                    path_reward_dict[tuple(temp_path_)] = reward_

                # Episode ends when all edges are consumed, the current node is
                # isolated, or the per-episode edge budget is exhausted.
                if np.sum(adjacency_matrix_flow_ != 0) == 0:
                    done_ = True
                if np.sum(adjacency_matrix_flow_ != 0) != 0 and np.sum(adjacency_matrix_flow_[action_] != 0) == 0:
                    done_ = True
                if edge_pass_ >= (max_edge_pass - 1):
                    done_ = True

            if calc_mode == 'Tsp':
                if tuple(temp_path_) in path_reward_dict:
                    reward_ = path_reward_dict[tuple(temp_path_)]
                else:
                    if material == 'Metal':
                        damping_ = damping(edge_pass_)

                        if np.count_nonzero(next_state_ == 0) >= 2:
                            angle_ = 0
                        else:
                            angle_ = func.calculate_angle(node1_, node2_, node3_)

                        # Prefer nodes whose accumulated heat is low.
                        reward_ = heat_radius - coords_array_[-1][3][next_state_[-1]]

                        reward_ = damping_ * reward_
                        unique_elements = np.unique(state_)
                        if len(unique_elements) == 1:
                            reward_ = reward_ - 1.0 * heat_radius * damping_

                    if material == 'Clay':
                        node2_ = self.node_dict[next_state_[len(next_state_) - 2]]
                        node3_ = self.node_dict[next_state_[len(next_state_) - 1]]
                        # Alignment of the new segment with a fixed stress direction.
                        angle_ = func.calculate_angle_along_stress(node2_, node3_, np.array([10, 0, 0]))
                        if edge_pass_ == 0:
                            damping_ = 1
                        elif edge_pass_ == 1:
                            damping_ = 0.01
                        elif edge_pass_ == 2:
                            damping_ = 0.0
                        else:
                            damping_ = 0.1

                        if angle_ > angle_limit:
                            reward_ = 0 * damping_
                        elif 58 < angle_ <= angle_limit:
                            reward_ = 0.9 * damping_
                        else:
                            reward_ = 1 * damping_

                    if material == 'Tsp':
                        # Classic TSP: negative travel distance.
                        reward_ = -func.calculate_distance(node2_, node3_)

                    path_reward_dict[tuple(temp_path_)] = reward_

                # Termination: edge budget, all nodes visited, or no legal move left.
                if edge_pass_ >= (max_edge_pass - 1):
                    done_ = True
                row_col_array = np.concatenate(np.argwhere(adjacency_matrix_flow_ > 0).T)
                row_col_set = set(row_col_array)
                if all_integers.issubset(row_col_set):
                    done_ = True
                optional_action = np.where(np.array(adjacency_matrix_flow_[next_state_[-1]]) != 0)[0]
                optional_action = optional_action[~np.isin(optional_action, row_col_array)]
                if optional_action.shape == (0,):
                    done_ = True

            # Propagate the heat field and slide the coordinate-array stack.
            single_next_coords_array_ = func.update_heat_field(next_state_[-1], coords_array_[-1], heat_radius, calc_mode)
            next_coords_array_ = np.concatenate((coords_array_[1:], single_next_coords_array_[None]), axis=0)

            return next_state_, next_state_adjacency_, reward_, done_, angle_, next_coords_array_, next_lines_

    # ---- hyper-parameters and derived sizes -------------------------------
    Iter = 1
    if material == 'CCF':
        action_dim = max_edge_pass_ * 35
    else:
        action_dim = max_edge_pass_ * 40

    lr = 5e-4
    num_episodes = 500
    hidden_dim = 256
    gamma = 0.98            # discount factor
    epsilon = 0.5           # exploration probability
    target_update = 10      # target-net sync period (in update() calls)
    buffer_size = 500
    minimal_size = 2        # start learning once the buffer exceeds this
    batch_size = 3
    collision_reward = False
    distance_reward = False
    Gaussian_damp = False
    # Observation side length; must match action_dim above.
    if material == 'CCF':
        max_sa_size = max_edge_pass_ * 35
    else:
        max_sa_size = max_edge_pass_ * 40

    state_dim = new_state_dim
    angle_limit = new_angle_limit
    max_edge_pass = max_edge_pass_
    device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

    replay_buffer = ReplayBuffer(buffer_size)

    env = Environment(new_adjacency_matrix, new_node_dict, new_state, new_state_adjacency, new_coords_array)

    agent = DQN(state_dim, hidden_dim, action_dim, lr, gamma, epsilon, target_update, device)
    # Per-episode statistics (recorded but not returned).
    return_list = []
    edge_pass_list = []
    ave_angle_list = []
    limit_angle_list = []

    counter = 0               # episodes since the best return last improved
    best_return = -100
    all_candidates = list()   # [first_action, episode_return, first_reward] per episode
    all_integers = set(range(len(new_adjacency_matrix[0])))  # all node indices (Tsp termination)
    path_reward_dict = {}     # memoized reward per path prefix
    new_sequence = [[[0], 0.0]]  # fallback result if no greedy rollout happens

    if savept:
        # Warm-start from the most similar saved checkpoint.
        agent.load_checkpoint(new_state_adjacency)

    for i in range(Iter):  # NOTE(review): Iter == 1, and `i` is unused below
        for i_episode in range(int(num_episodes)):
            edge_pass = 0
            episode_return = 0
            total_angle = 0
            limit_angle_num = 0
            temp_path = []
            state, adjacency_matrix_flow, state_adjacency, coords_array = env.reset()
            pre_action = 0
            pre_pre_action = 0
            done = False
            first_reward = 0
            temp_lines = lines.copy()

            # Minimum episode count (scaled by max_edge_pass_) before the
            # early-stop greedy evaluation below may trigger.
            if material == 'CCF':
                m = 40
            elif material == 'PLA3D':
                m = 40
            else:
                m = 30

            # Early stop: after enough episodes without improvement, do one
            # greedy step and return the best first action found so far.
            if counter >= 5 * max_edge_pass_ and i_episode >= m * max_edge_pass_:

                sequences = func.remove_duplicates_and_keep_max(all_candidates)
                sequences = sorted(sequences, key=lambda tup: tup[1], reverse=True)
                action, action_index = agent.take_action(state, state_adjacency, pre_pre_action, adjacency_matrix_flow, edge_pass, coords_array, greedy=True)
                next_state, next_state_adjacency, reward, done, angle, next_coords_array, next_lines = env.step(action, state, state_adjacency, edge_pass, adjacency_matrix_flow,
                                                                                                                coords_array, temp_path, temp_lines)
                # Prefer the recorded best candidate unless the greedy step
                # reproduces its reward exactly.
                if reward != sequences[0][1]:
                    new_sequence = [[sequences[0][0], sequences[0][1]]]
                else:
                    new_sequence = [[[action], reward]]

                break

            # ---- standard epsilon-greedy rollout --------------------------
            while not done:
                action, action_index = agent.take_action(state, state_adjacency, pre_pre_action, adjacency_matrix_flow, edge_pass, coords_array, greedy=False)
                temp_path.append(action)
                next_state, next_state_adjacency, reward, done, angle, next_coords_array, next_lines = env.step(action, state, state_adjacency, edge_pass, adjacency_matrix_flow, coords_array, temp_path, temp_lines)
                replay_buffer.add(state, state_adjacency, action, action_index, reward, next_state, next_state_adjacency, coords_array, next_coords_array, done)

                state = next_state
                state_adjacency = next_state_adjacency
                pre_pre_action = pre_action
                pre_action = action
                coords_array = next_coords_array
                temp_lines = next_lines
                episode_return += reward

                total_angle += angle
                if angle > angle_limit:
                    limit_angle_num += 1

                if edge_pass == 0:
                    first_reward = episode_return

                edge_pass += 1

                if done:
                    # Track stagnation for the early-stop trigger above.
                    if episode_return > best_return:
                        best_return = episode_return

                        counter = 0

                    else:
                        counter += 1

                    candidate = [[int(temp_path[0])], episode_return, first_reward]

                    all_candidates.append(candidate)

                # Learn once the buffer has more than minimal_size transitions.
                if replay_buffer.size() > minimal_size:
                    b_s, b_sa, b_a, b_i, b_r, b_ns, b_nsa, b_ca, b_nca, b_d = replay_buffer.sample(batch_size)
                    transition_dict = {
                        'states': b_s,
                        'states_adjacency': b_sa,
                        'actions': b_a,
                        'action_indexes': b_i,
                        'next_states': b_ns,
                        'next_states_adjacency': b_nsa,
                        'rewards': b_r,
                        'coor_array': b_ca,
                        'next_coor_array': b_nca,
                        'dones': b_d
                    }
                    agent.update(transition_dict)

            return_list.append(episode_return)
            edge_pass_list.append(edge_pass)
            # NOTE(review): divides by edge_pass — assumes at least one step per
            # episode; a zero-step episode would raise ZeroDivisionError.
            ave_angle_list.append(total_angle / edge_pass)
            limit_angle_list.append(limit_angle_num / edge_pass)

        if savept:
            agent.save_checkpoint(new_state_adjacency)

    return new_sequence
















back to top

Software Heritage — Copyright (C) 2015–2026, The Software Heritage developers. License: GNU AGPLv3+.
The source code of Software Heritage itself is available on our development forge.
The source code files archived by Software Heritage are available under their own copyright and licenses.
Terms of use: Archive access, API — Content policy — Contact — JavaScript license information — Web API