https://github.com/yuminghuang1995/RL3DPToolpathPlanner
Tip revision: b8704633dab0de26d0d81291c9e46a57f19d2cb5 (Update README.md), authored by yuminghuang1995 on 13 April 2025, 12:43:55 UTC
functions.py
from __future__ import division
import numpy as np
import torch
import matplotlib.pyplot as plt
import math
import networkx as nx
from matplotlib.collections import LineCollection
from scipy.spatial import KDTree
import random
import os
import pickle
import sys
import igl
def beam_fea(G_new, adjacency_matrix_flow_, node_dict, boundary_nodes_array, draw):
    lines = get_merged_diff_ga(G_new, adjacency_matrix_flow_)
    lines = np.array(lines, dtype=int)
    needed_nodes = set(lines.flatten())
    old_to_new = {old: new for new, old in enumerate(sorted(needed_nodes))}
    total_force = calculate_global_force_fea(node_dict, lines, boundary_nodes_array, A=1.0e-6, rho=1250.0, g=9.8)
    total_force_dict = {row[0]: row[1:] for row in total_force}
    total_force_new = []
    for old, new in old_to_new.items():
        if old in total_force_dict:
            total_force_new.append(np.append(new, total_force_dict[old]))
    total_force_new = np.array(total_force_new)
    g_coord = np.array([node_dict[old] for old in sorted(needed_nodes)])
    g_num = np.vectorize(old_to_new.get)(lines)
    boundary_nodes_array_fea = [old_to_new[node] for node in boundary_nodes_array if node in old_to_new]
    total_deformation = beam_fea_calculate(g_num, g_coord, total_force_new, boundary_nodes_array_fea, draw)
    return total_deformation

def beam_fea_graph(lines, node_dict, boundary_nodes_array, draw):
    needed_nodes = set(lines.flatten())
    old_to_new = {old: new for new, old in enumerate(sorted(needed_nodes))}
    total_force = calculate_global_force_fea(node_dict, lines, boundary_nodes_array, A=3.14e-6, rho=1250.0, g=9.8)
    total_force_dict = {row[0]: row[1:] for row in total_force}
    total_force_new = []
    for old, new in old_to_new.items():
        if old in total_force_dict:
            total_force_new.append(np.append(new, total_force_dict[old]))
    total_force_new = np.array(total_force_new)
    g_coord = np.array([node_dict[old] for old in sorted(needed_nodes)])
    g_num = np.vectorize(old_to_new.get)(lines)
    boundary_nodes_array_fea = [old_to_new[node] for node in boundary_nodes_array if node in old_to_new]
    total_deformation = beam_fea_calculate(g_num, g_coord, total_force_new, boundary_nodes_array_fea, draw)
    return total_deformation

def calculate_global_stiffness_matrix(coordinates, lines, E=10**6, A=0.000004):
    K_global = np.zeros((3 * len(coordinates), 3 * len(coordinates)))
    for line in lines:
        point1 = np.array(coordinates[line[0]])
        point2 = np.array(coordinates[line[1]])
        L = np.linalg.norm(point2 - point1) * 0.001
        direction = (point2 - point1) * 0.001 / L
        K_local = E * A / L * np.outer(direction, direction)
        T = np.zeros((3, len(coordinates) * 3))
        T[:, line[0] * 3:line[0] * 3 + 3] = np.eye(3)
        T[:, line[1] * 3:line[1] * 3 + 3] = -np.eye(3)
        K_global += T.T @ K_local @ T
    return K_global

def calculate_global_force_vector(coordinates, lines, boundary_nodes_array, A=0.000004, rho=1000.0, g=9.8):
    F_global = np.zeros(3 * len(coordinates))
    for line in lines:
        point1 = np.array(coordinates[line[0]])
        point2 = np.array(coordinates[line[1]])
        L = np.linalg.norm(point2 - point1) * 0.001
        F_self = rho * A * L * g
        F_global[line[0] * 3 + 2] -= F_self / 2
        F_global[line[1] * 3 + 2] -= F_self / 2
    for node in boundary_nodes_array:
        F_global[node * 3:node * 3 + 3] = 0
    return F_global

def calculate_displacement(coordinates, lines, boundary_nodes_array, E=10**4, A=0.0016, rho=1250.0, g=9.81):
    K = calculate_global_stiffness_matrix(coordinates, lines, E, A)
    F = calculate_global_force_vector(coordinates, lines, boundary_nodes_array, A, rho, g)
    constrained_dof = []
    for node in boundary_nodes_array:
        constrained_dof.extend([node * 3, node * 3 + 1, node * 3 + 2])
    all_dof = list(range(3 * len(coordinates)))
    free_dof = list(set(all_dof) - set(constrained_dof))
    K_free = K[np.ix_(free_dof, free_dof)]
    F_free = F[free_dof]
    U_free = np.dot(np.linalg.pinv(K_free), F_free)
    U_global = np.zeros(3 * len(coordinates))
    for i in range(len(free_dof)):
        U_global[free_dof[i]] = U_free[i] * 1000
    for dof in constrained_dof:
        U_global[dof] = 0
    U_total = 0
    U_max = 0
    for i in range(len(coordinates)):
        displacement = np.linalg.norm(U_global[i * 3:i * 3 + 3])
        U_total += displacement
        if displacement > U_max:
            U_max = displacement
    return U_max, U_global
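
# A minimal usage sketch of the pin-jointed truss solver above, under assumed
# toy inputs (this helper is illustrative and not part of the original module):
# two bars meet at an apex node while both base nodes are fixed, so only the
# apex deflects under self-weight.
def _example_calculate_displacement():
    coordinates = {0: [0.0, 0.0, 0.0], 1: [100.0, 0.0, 0.0], 2: [50.0, 0.0, 80.0]}
    lines = [(0, 2), (1, 2)]       # two bars meeting at apex node 2
    boundary_nodes_array = [0, 1]  # base nodes fully constrained
    u_max, u_global = calculate_displacement(coordinates, lines, boundary_nodes_array)
    return u_max, u_global
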
def get_merged_diff_ga(G_new, adjacency_matrix_flow_):
    edges_from_matrix = set()
    size = adjacency_matrix_flow_.shape[0]
    for i in range(size):
        for j in range(i + 1, size):
            if adjacency_matrix_flow_[i][j] != 0:
                edges_from_matrix.add((i, j))
    edges_from_G_new = set()
    for edge in G_new.edges():
        edges_from_G_new.add(tuple(sorted(edge)))
    diff_edges = edges_from_G_new - edges_from_matrix
    return list(diff_edges)

def calculate_global_force_fea(coordinates, lines, boundary_nodes_array, A=0.0001, rho=1250.0, g=9.8):
    F_global = np.zeros((len(coordinates), 4))
    F_global[:, 0] = np.arange(0, len(coordinates))
    for line in lines:
        point1 = np.array(coordinates[line[0]])
        point2 = np.array(coordinates[line[1]])
        L = np.linalg.norm(point2 - point1) * 0.001
        F_self = rho * A * L * g
        F_global[line[0], 3] -= F_self / 2
        F_global[line[1], 3] -= F_self / 2
    mask = np.ones(len(coordinates), dtype=bool)
    if len(boundary_nodes_array) != 0:
        mask[np.array(boundary_nodes_array)] = False
    F_global = F_global[mask]
    return F_global

def compute_centroid(node_list, coordinates):
    sum_x = sum_y = sum_z = 0
    for node in node_list:
        pos = coordinates[node]
        sum_x += pos[0]
        sum_y += pos[1]
        sum_z += pos[2]
    n = len(node_list)
    return np.array([sum_x / n, sum_y / n, sum_z / n])

def load_data(filename, dataset_path):
    with open(os.path.join(dataset_path, filename + ".pkl"), 'rb') as f:
        new_adjacency_matrix, new_seq = pickle.load(f)
    return new_adjacency_matrix, new_seq

def get_max_index(dataset_path):
    max_index = -1
    for filename in os.listdir(dataset_path):
        if filename.endswith(".pkl"):
            index = int(filename.split('.')[0])
            max_index = max(max_index, index)
    return max_index

def save_data(new_adjacency_matrix, new_seq, dataset_path, env_name, index, i):
    similar_seq = judge_similarity(new_adjacency_matrix, dataset_path)
    if similar_seq:
        return
    new_index = get_max_index(dataset_path) + 1
    with open(os.path.join(dataset_path, str(new_index) + ".pkl"), 'wb') as f:
        pickle.dump((new_adjacency_matrix, new_seq), f)

def judge_similarity(input_adjacency_matrix, dataset_path):
    if not os.path.exists(dataset_path):
        os.makedirs(dataset_path)
    for filename in os.listdir(dataset_path):
        if filename.endswith(".pkl"):
            with open(os.path.join(dataset_path, filename), 'rb') as f:
                new_adjacency_matrix, new_seq = pickle.load(f)
            if new_adjacency_matrix.shape == input_adjacency_matrix.shape \
                    and np.allclose(input_adjacency_matrix, new_adjacency_matrix, rtol=0.02):
                return new_seq
    return []

def get_new_boundary_nodes(boundary_nodes_array, node_mapping):
    boundary_nodes_array_new = []
    for node_new, node in node_mapping.items():
        if node in boundary_nodes_array:
            boundary_nodes_array_new.append(node_new)
    return boundary_nodes_array_new

def distance_coord(coord1, coord2):
    return math.sqrt((coord1[0] - coord2[0]) ** 2 + (coord1[1] - coord2[1]) ** 2)

def update_matrix(matrix, idx1, idx2):
    if matrix[idx1][idx2] < 0 or matrix[idx2][idx1] < 0:
        matrix[idx1][idx2] = -matrix[idx1][idx2]
        matrix[idx2][idx1] = -matrix[idx2][idx1]
    else:
        matrix[idx1][idx2] = 0
        matrix[idx2][idx1] = 0
    return matrix
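
# Sign-convention sketch for update_matrix with assumed toy values (an
# illustrative helper, not part of the original module): a negative entry is
# flipped positive on first traversal, and a non-negative entry is zeroed.
def _example_update_matrix():
    m = np.array([[0.0, -2.0], [-2.0, 0.0]])
    m = update_matrix(m, 0, 1)  # -2 -> +2
    m = update_matrix(m, 0, 1)  # +2 -> 0
    return m
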
def update_new_adjacency_matrix(new_adj_matrix, init_adj_matrix, new_state, state, idx1, idx2):
    if init_adj_matrix[state[idx1]][state[idx2]] < 0 and new_adj_matrix[new_state[idx1]][new_state[idx2]] == 0:
        new_adj_matrix[new_state[idx1]][new_state[idx2]] = -init_adj_matrix[state[idx1]][state[idx2]]
        new_adj_matrix[new_state[idx2]][new_state[idx1]] = -init_adj_matrix[state[idx2]][state[idx1]]
    else:
        new_adj_matrix[new_state[idx1]][new_state[idx2]] = init_adj_matrix[state[idx1]][state[idx2]]
        new_adj_matrix[new_state[idx2]][new_state[idx1]] = init_adj_matrix[state[idx2]][state[idx1]]
    return new_adj_matrix

def calculate_angle(node1, node2, node3):
    node1 = np.array(node1)
    node2 = np.array(node2)
    node3 = np.array(node3)
    vec1 = node2 - node1
    vec2 = node3 - node2
    if np.linalg.norm(vec1) == 0 or np.linalg.norm(vec2) == 0:
        cos_angle = 1
    else:
        cos_angle = np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2))
    if cos_angle > 1:
        angle_ = 0.0
    elif cos_angle < -1:
        angle_ = 3.14159
    else:
        angle_ = np.arccos(cos_angle)
    if np.isnan(angle_):
        angle_ = 3.14159
    return np.degrees(angle_)
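
# Worked example for calculate_angle with illustrative coordinates (the helper
# name is ours, not the repo's): it returns the turning angle in degrees
# between segment node1->node2 and segment node2->node3.
def _example_calculate_angle():
    straight = calculate_angle([0, 0, 0], [1, 0, 0], [2, 0, 0])  # 0.0 degrees
    corner = calculate_angle([0, 0, 0], [1, 0, 0], [1, 1, 0])    # 90.0 degrees
    return straight, corner
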
def calculate_angle_vector(vector1, vector2):
    unit_vector1 = vector1 / np.linalg.norm(vector1)
    unit_vector2 = vector2 / np.linalg.norm(vector2)
    dot_product = np.dot(unit_vector1, unit_vector2)
    dot_product = np.clip(dot_product, -1.0, 1.0)
    angle = np.arccos(dot_product)
    if np.isnan(angle):
        angle = np.pi
    return angle

def calculate_angle_along_stress(node2, node3, vec1):
    node2 = np.array(node2)
    node3 = np.array(node3)
    vec2 = node3 - node2
    if np.linalg.norm(vec1) == 0 or np.linalg.norm(vec2) == 0:
        cos_angle = 1
    else:
        cos_angle = np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2))
    if cos_angle > 1:
        angle_ = 0.0
    elif cos_angle < -1:
        angle_ = 3.14159
    else:
        angle_ = np.arccos(cos_angle)
    if np.isnan(angle_):
        angle_ = 3.14159
    angle_ = np.degrees(angle_)
    if angle_ > 90:
        angle_ = 180 - angle_
    return angle_

def draw_graph(env_name, G, node_dict, next_state_, adjacency_matrix_flow_, i=100000, index=100000, output=True, draw=True, subgraph=False, mode='Euler', show='PLA3D', block=0):
    node_colors = []
    if mode == 'Euler':
        sorted_keys = sorted(node_dict.keys())
        node_colors = ["red" if n == next_state_[-1] else "lightblue" for n in sorted_keys]
    if mode == 'Tsp':
        heat_values = [G.nodes[node]['heat'] for node in G.nodes()]
        if np.max(heat_values) == np.min(heat_values):
            norm_heat_values = np.zeros(len(heat_values))
        else:
            norm_heat_values = (heat_values - np.min(heat_values)) / (np.max(heat_values) - np.min(heat_values))
        color_mapping = {0: 'lightblue', 1: 'cyan', 2: 'lightgreen', 3: 'yellow', 4: 'orange', 5: 'red'}
        node_colors = [color_mapping[int(value * 5)] for value in norm_heat_values]
    edge_colors = {}
    if show == 'PLA3D':
        for u, v in G.edges():
            if adjacency_matrix_flow_[u, v] == 0:
                edge_colors[(u, v)] = "black"
            elif adjacency_matrix_flow_[u, v] > 0:
                edge_colors[(u, v)] = "lightgray"
            else:
                edge_colors[(u, v)] = "none"
        fig = plt.figure(figsize=(10, 10), dpi=150)
        ax = fig.add_subplot(111, projection='3d')
        if subgraph:
            for node, coordinates in node_dict.items():
                ax.text(coordinates[0], coordinates[1], coordinates[2], str(node), fontsize=4)
        for edge in G.edges():
            xs, ys, zs = zip(*[(node_dict[node][0], node_dict[node][1], node_dict[node][2]) for node in edge])
            ax.plot(xs, ys, zs, color=edge_colors.get(edge, 'gray'), lw=1)
        ax.set_xlabel('X')
        ax.set_ylabel('Y')
        ax.set_zlabel('Z')
        ax.grid(False)
        ax.xaxis.pane.fill = False
        ax.yaxis.pane.fill = False
        ax.zaxis.pane.fill = False
        ax.xaxis.pane.set_edgecolor('w')
        ax.yaxis.pane.set_edgecolor('w')
        ax.zaxis.pane.set_edgecolor('w')
        ax.set_box_aspect([1, 1, 1])
        ax.axis('off')
    else:
        for u, v in G.edges():
            if adjacency_matrix_flow_[u, v] == 0:
                edge_colors[(u, v)] = "black"
            elif adjacency_matrix_flow_[u, v] > 0:
                edge_colors[(u, v)] = "blue"
            else:
                edge_colors[(u, v)] = "lightgray"
        if mode == 'Tsp':
            temp_dpi = 1200
        else:
            temp_dpi = 200
        fig, ax = plt.subplots(dpi=temp_dpi)
        x_coords = [coordinates[0] for coordinates in node_dict.values()]
        y_coords = [coordinates[1] for coordinates in node_dict.values()]
        colors = [node_colors[node] for node in node_dict.keys()]
        if mode == 'Tsp':
            ax.scatter(x_coords, y_coords, s=0.01, c='none')
        else:
            ax.scatter(x_coords, y_coords, s=2, c=colors)
        if subgraph:
            for node, coordinates in node_dict.items():
                ax.text(coordinates[0], coordinates[1], str(node), fontsize=4)
        lines = []
        colors = []
        for edge in G.edges():
            xs, ys = zip(*[(node_dict[node][0], node_dict[node][1]) for node in edge])
            lines.append(list(zip(xs, ys)))
            colors.append(edge_colors.get(edge, 'lightgray'))
        if mode == 'Tsp':
            lc = LineCollection(lines, colors=colors, linewidths=0.35)
        else:
            lc = LineCollection(lines, colors=colors, linewidths=1.4)
        ax.add_collection(lc)
        ax.axis('off')
        ax.set_aspect('equal')
    if output:
        if mode == 'Tsp':
            output_file = f"./figure/{env_name}_{block}_{index}_{i}.png"
        else:
            output_file = f"./figure/{env_name}_{index}_{i}.png"
        plt.savefig(output_file)
    if draw:
        plt.show(block=False)
    plt.close()
def transform_faces(faces, node_mapping):
    reversed_mapping = {v: k for k, v in node_mapping.items()}
    faces_new = []
    for face in faces:
        # Keep only faces whose vertices are all present in the subgraph mapping.
        if all(node in reversed_mapping for node in face):
            new_face = [reversed_mapping[node] for node in face]
            faces_new.append(new_face)
    faces_new_np = np.array(faces_new, dtype=int)
    return faces_new_np
def lscm_parameterization_libigl(G, faces, new_state_):
    max_node_id = max(G.nodes)
    vertices = np.zeros((max_node_id + 1, 3))
    for n in G.nodes:
        vertices[n] = G.nodes[n]['pos']
    faces = np.array(faces, dtype=np.int32)
    if new_state_[1] != new_state_[2]:
        b = np.array([new_state_[1], new_state_[2]], dtype=np.int32)
    else:
        target_node = new_state_[2]
        min_distance = float('inf')
        nearest_node = None
        for neighbor in G.neighbors(target_node):
            pos_target = np.array(G.nodes[target_node]['pos'])
            pos_neighbor = np.array(G.nodes[neighbor]['pos'])
            distance = np.linalg.norm(pos_target - pos_neighbor)
            if distance < min_distance:
                min_distance = distance
                nearest_node = neighbor
        if nearest_node is None:
            raise ValueError("No neighbors found or no valid nearest node found")
        b = np.array([nearest_node, target_node], dtype=np.int32)
    bc = np.array([[-1.0, 0.0], [0.0, 0.0]])
    success, u = igl.lscm(vertices, faces, b, bc)
    if not success:
        raise ValueError("LSCM parameterization failed")
    return u
def reorder_nodes(G, uv_coordinates):
    uv_coordinates = np.array(uv_coordinates)
    # lexsort uses the last key as the primary key: sort by y first, then by x.
    indices_sorted = np.lexsort((uv_coordinates[:, 0], uv_coordinates[:, 1]))
    old_to_new_mapping = {old_index: new_index for new_index, old_index in enumerate(indices_sorted)}
    G_new = nx.relabel_nodes(G, old_to_new_mapping)
    return G_new, old_to_new_mapping
def combine_mappings(new_temp_to_new_mapping, old_to_new_temp_mapping_):
    final_mapping = {}
    for temp_index, new_index in new_temp_to_new_mapping.items():
        if temp_index in old_to_new_temp_mapping_:
            orig_index = old_to_new_temp_mapping_[temp_index]
            final_mapping[new_index] = orig_index
        else:
            print(f"Warning: No original mapping found for temp index {temp_index}")
    return final_mapping

def create_standard_graph(coordinates_, state_, subgraph):
    node1 = coordinates_[state_[-2]]
    node2 = coordinates_[state_[-1]]
    if node1 == node2:
        orig_vector = np.array([1, 0, 0])
    else:
        orig_vector = np.array(node2) - np.array(node1)
    angles = []
    for node in subgraph.nodes:
        if node == state_[-1]:
            continue
        node_coord = coordinates_[node]
        new_vector = np.array(node_coord) - np.array(node2)
        angle = calculate_angle_vector(orig_vector, new_vector)
        vector_magnitude = np.linalg.norm(new_vector)
        angles.append((node, angle, vector_magnitude))
    sorted_nodes = sorted(angles, key=lambda x: (x[1], x[2]))
    node_mapping_ = {0: state_[-1]}
    for index, (orig_node, angle, magnitude) in enumerate(sorted_nodes):
        node_mapping_[index + 1] = orig_node
    return node_mapping_

def align_graph(G, state):
    pos = nx.get_node_attributes(G, 'pos')
    if state[1] == state[2] or state[0] == state[1]:
        min_distance = float('inf')
        closest_node = None
        origin_pos = np.array(pos[state[2]])
        for node, node_pos in pos.items():
            if node != state[2]:
                distance = np.linalg.norm(np.array(node_pos) - origin_pos)
                if distance < min_distance:
                    min_distance = distance
                    closest_node = node
        index1 = closest_node
    else:
        index1 = state[1]
    origin = np.array(pos[state[2]])
    for node in pos:
        pos[node] = np.array(pos[node]) - origin
    target = np.array(pos[index1])
    angle = np.arctan2(target[1], target[0])
    rotation_matrix = np.array([
        [np.cos(angle), np.sin(angle), 0],
        [-np.sin(angle), np.cos(angle), 0],
        [0, 0, 1]
    ])
    for node in pos:
        pos[node] = rotation_matrix @ np.array(pos[node]).reshape(-1, 1)
        pos[node] = pos[node].flatten()
    if pos[index1][0] > 0:
        for node in pos:
            pos[node][0] *= -1
    result = np.array([pos[node] for node in sorted(G.nodes())])
    return result

def find_k_hop_neighbors(G, start_node, k):
    neighbors = {start_node}
    for _ in range(k):
        next_neighbors = set()
        for node in neighbors:
            next_neighbors.update(G.neighbors(node))
        neighbors.update(next_neighbors)
    return neighbors
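
# Quick sketch of find_k_hop_neighbors on an assumed toy path graph 0-1-2-3-4
# (illustrative helper only): the 2-hop neighbourhood of node 0 is {0, 1, 2}.
def _example_find_k_hop_neighbors():
    G = nx.path_graph(5)
    return find_k_hop_neighbors(G, 0, 2)  # -> {0, 1, 2}
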
def create_new_graph(G_orig, coordinates_, edges_, adjacency_matrix_, state_, init_adjacency_matrix_, radius_, state_dim, heat_radius, faces, env_name, index, mode='Tsp', material='PLA3D'):
    k_hop_neighbors = find_k_hop_neighbors(G_orig, state_[-1], radius_)
    subgraph = G_orig.subgraph(k_hop_neighbors)
    if material == 'PLA3D':
        old_to_new_temp_mapping_ = {index: orig_index for index, orig_index in enumerate(subgraph.nodes)}
        G_new_temp = nx.relabel_nodes(subgraph, {v: k for k, v in old_to_new_temp_mapping_.items()}, copy=True)
        faces_new = transform_faces(faces, old_to_new_temp_mapping_)
        new_state_temp = np.zeros_like(state_)
        for ii, old_index in enumerate(state_):
            new_index_ = next((k for k, v in old_to_new_temp_mapping_.items() if v == old_index), None)
            if new_index_ is not None:
                new_state_temp[ii] = new_index_
        uv_coordinates = lscm_parameterization_libigl(G_new_temp, faces_new, new_state_temp)
        G_new_, new_temp_to_new_mapping = reorder_nodes(G_new_temp, uv_coordinates)
        node_mapping_ = combine_mappings(new_temp_to_new_mapping, old_to_new_temp_mapping_)
    elif material == 'CCF':
        old_to_new_temp_mapping_ = {index: orig_index for index, orig_index in enumerate(subgraph.nodes)}
        G_new_temp = nx.relabel_nodes(subgraph, {v: k for k, v in old_to_new_temp_mapping_.items()}, copy=True)
        new_state_temp = np.zeros_like(state_)
        for ii, old_index in enumerate(state_):
            new_index_ = next((k for k, v in old_to_new_temp_mapping_.items() if v == old_index), None)
            if new_index_ is not None:
                new_state_temp[ii] = new_index_
        aligned_positions = align_graph(G_new_temp, new_state_temp)
        G_new_, new_temp_to_new_mapping = reorder_nodes(G_new_temp, aligned_positions)
        node_mapping_ = combine_mappings(new_temp_to_new_mapping, old_to_new_temp_mapping_)
    else:
        node_mapping_ = {index: orig_index for index, orig_index in enumerate(subgraph.nodes)}
        G_new_ = nx.relabel_nodes(subgraph, {v: k for k, v in node_mapping_.items()}, copy=True)
    for new_index, orig_index in node_mapping_.items():
        G_new_.nodes[new_index]['pos'] = coordinates_[orig_index]
        G_new_.nodes[new_index]['heat'] = G_orig.nodes[orig_index]['heat']
        G_new_.nodes[new_index]['p_heat'] = G_orig.nodes[orig_index]['p_heat']
        G_new_.nodes[new_index]['p_p_heat'] = G_orig.nodes[orig_index]['p_p_heat']
    node_dict_ = nx.get_node_attributes(G_new_, 'pos')
    num_nodes = len(node_dict_)
    coords_array = np.zeros((3, num_nodes))
    for index, coord in node_dict_.items():
        coords_array[:, index] = coord
    new_state_ = np.zeros_like(state_)
    for ii, old_index in enumerate(state_):
        new_index_ = next((k for k, v in node_mapping_.items() if v == old_index), None)
        if new_index_ is not None:
            new_state_[ii] = new_index_
    heat_array = np.zeros((1, num_nodes))
    p_heat_array = np.zeros((1, num_nodes))
    p_p_heat_array = np.zeros((1, num_nodes))
    for idx, (node, attrs) in enumerate(G_new_.nodes(data=True)):
        heat_array[0, idx] = attrs['heat']
        p_heat_array[0, idx] = attrs['p_heat']
        p_p_heat_array[0, idx] = attrs['p_p_heat']
    coords_array_repeated = np.vstack((coords_array, heat_array))
    if mode == 'Tsp':
        pre_coords_array = np.vstack((coords_array, p_heat_array))
        pre_pre_coords_array = np.vstack((coords_array, p_p_heat_array))
        coords_array = np.stack((pre_pre_coords_array, pre_coords_array, coords_array_repeated), axis=0)
    else:
        coords_array = np.repeat(coords_array_repeated[np.newaxis, :, :], state_dim, axis=0)
    new_adjacency_matrix_ = np.zeros((len(node_mapping_), len(node_mapping_)), dtype=float)
    for new_index_a, old_index_a in node_mapping_.items():
        for new_index_b, old_index_b in node_mapping_.items():
            new_adjacency_matrix_[new_index_a, new_index_b] = adjacency_matrix_[old_index_a, old_index_b]
    for i in range(-2, -state_dim - 1, -1):
        new_adjacency_matrix_ = update_new_adjacency_matrix(new_adjacency_matrix_, init_adjacency_matrix_, new_state_, state_, i, i + 1)
    new_state_adjacency_ = [new_adjacency_matrix_.copy()]
    for i in range(-state_dim, -1, 1):
        new_adjacency_matrix_ = update_matrix(new_adjacency_matrix_, new_state_[i], new_state_[i + 1])
        new_state_adjacency_.append(new_adjacency_matrix_.copy())
    new_state_adjacency_ = np.array(new_state_adjacency_)
    return G_new_, new_state_, new_adjacency_matrix_, new_state_adjacency_, node_mapping_, node_dict_, coords_array

def create_current_state(lst, state_dim, init_adjacency_matrix, mode='Euler', material='CCF'):
    arr = np.zeros(state_dim, dtype=int)
    for i in range(min(3, len(lst))):
        arr[-(i + 1)] = lst[-(i + 1)]
    if mode == 'Tsp' or material == 'PLA3D':
        if len(lst) > 1 and init_adjacency_matrix[lst[-1]][lst[-2]] == 0:
            for ii in range(len(arr)):
                arr[ii] = lst[-1]
    return arr
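
# State-encoding sketch for create_current_state with assumed toy inputs
# (illustrative helper only): the last (up to three) visited nodes are
# right-aligned into a zero-padded vector of width state_dim.
def _example_create_current_state():
    init_adj = np.ones((5, 5))
    return create_current_state([3, 4, 2], state_dim=4, init_adjacency_matrix=init_adj)
    # -> array([0, 3, 4, 2])
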
def create_adj_matrix(adjacency_matrix, beam_seq, mode='Euler'):
    for i in range(len(beam_seq) - 1):
        if adjacency_matrix[beam_seq[i]][beam_seq[i + 1]] < 0:
            adjacency_matrix[beam_seq[i]][beam_seq[i + 1]] = -adjacency_matrix[beam_seq[i]][beam_seq[i + 1]]
            adjacency_matrix[beam_seq[i + 1]][beam_seq[i]] = -adjacency_matrix[beam_seq[i + 1]][beam_seq[i]]
        else:
            adjacency_matrix[beam_seq[i]][beam_seq[i + 1]] = 0
            adjacency_matrix[beam_seq[i + 1]][beam_seq[i]] = 0
    if mode == 'Tsp' and len(beam_seq) > 1:
        if adjacency_matrix[beam_seq[i]][beam_seq[i - 1]] == 0:
            adjacency_matrix[beam_seq[i]][beam_seq[i - 1]] = 1000
            adjacency_matrix[beam_seq[i - 1]][beam_seq[i]] = 1000
    return adjacency_matrix

def o2n_seq_trans(old_seq, node_mapping):
    new_seq = old_seq.copy()
    for ii, old_index in enumerate(old_seq):
        new_index_ = next((k for k, v in node_mapping.items() if v == old_index), None)
        if new_index_ is not None:
            new_seq[ii] = new_index_
    return new_seq

def n2o_seq_trans(new_seq, node_mapping):
    old_seq = new_seq.copy()
    for ii, new_index in enumerate(new_seq):
        old_seq[ii] = node_mapping[new_index]
    return old_seq

def remove_duplicates_and_keep_max(input_list):
    input_list.sort(key=lambda x: x[1], reverse=True)
    result_dict = {}
    for item in input_list:
        key = tuple(item[0])
        if key not in result_dict:
            result_dict[key] = item
    result = list(result_dict.values())
    return result

def adjacency_matrix_to_graph(matrix):
    graph = nx.from_numpy_array(np.array(matrix), create_using=nx.Graph())
    return graph

def calculate_maximum_common_subgraph_similarity(matrix1, matrix2):
    graph1 = adjacency_matrix_to_graph(matrix1)
    graph2 = adjacency_matrix_to_graph(matrix2)
    gm = nx.algorithms.isomorphism.GraphMatcher(graph1, graph2)
    mcs = max(gm.subgraph_isomorphisms_iter(), key=lambda x: len(x))
    similarity = len(mcs) / min(len(graph1.nodes), len(graph2.nodes))
    return similarity

def find_most_similar_graph(target_matrix, other_matrices):
    max_similarity = -1
    most_similar_index = None
    for i, matrix in enumerate(other_matrices):
        similarity = calculate_maximum_common_subgraph_similarity(target_matrix, matrix)
        if similarity > max_similarity:
            max_similarity = similarity
            most_similar_index = i
    return most_similar_index, max_similarity

def dfs_cycles(G, source, target, visited, depth, max_depth, cycles):
    visited[source] = True
    if depth == max_depth:
        if target in G[source]:
            cycles.append(tuple(visited.keys()))
    else:
        for neighbor in G[source]:
            if neighbor not in visited:
                dfs_cycles(G, neighbor, target, visited.copy(), depth + 1, max_depth, cycles)

def find_cycles_of_length(G, length):
    cycles = []
    for node in G.nodes:
        dfs_cycles(G, node, node, {}, 0, length - 1, cycles)
    cycles = list(set(cycles))
    for i in range(len(cycles)):
        cycle = cycles[i]
        if len(cycle) == 3:
            A, B, C = cycle
            pos_A = np.array(G.nodes[A]['pos'])
            pos_B = np.array(G.nodes[B]['pos'])
            pos_C = np.array(G.nodes[C]['pos'])
            AB = pos_B - pos_A
            AC = pos_C - pos_A
            cross_product = np.cross(AB, AC)
            if cross_product[2] < 0:
                cycles[i] = (A, C, B)
    return cycles

def remove_duplicate_cycles(cycles):
    unique_cycle_sets = set()
    unique_cycles = []
    for cycle in cycles:
        cycle_set = frozenset(cycle)
        if cycle_set not in unique_cycle_sets:
            unique_cycle_sets.add(cycle_set)
            unique_cycles.append(cycle)
    return unique_cycles

def remove_nested_cycles(cycles_length_3, cycles_length_4):
    nested_cycles = set()
    for cycle3_a in cycles_length_3:
        for cycle3_b in cycles_length_3:
            if cycle3_a == cycle3_b:
                continue
            for cycle4 in cycles_length_4:
                if set(cycle3_a).issubset(cycle4) and set(cycle3_b).issubset(cycle4):
                    nested_cycles.add(cycle4)
                    break
    filtered_cycles_length_4 = [cycle4 for cycle4 in cycles_length_4 if cycle4 not in nested_cycles]
    return cycles_length_3 + filtered_cycles_length_4

def find_boundary_edges(G):
    cycles = find_faces(G)
    edge_count = {}
    for cycle in cycles:
        for i in range(len(cycle)):
            edge = frozenset((cycle[i], cycle[(i + 1) % len(cycle)]))
            if edge in edge_count:
                edge_count[edge] += 1
            else:
                edge_count[edge] = 1
    boundary_nodes = set()
    boundary_edges_list = []
    for edge, count in edge_count.items():
        if count == 1:
            boundary_nodes.update(edge)
            boundary_edges_list.append(list(edge))
    boundary_edges = np.array(boundary_edges_list)
    boundary_nodes = np.array(list(boundary_nodes))
    return boundary_edges, boundary_nodes

def find_faces(G):
    cycles_length_3 = find_cycles_of_length(G, 3)
    cycles_length_4 = find_cycles_of_length(G, 4)
    unique_cycles_length_3 = remove_duplicate_cycles(cycles_length_3)
    unique_cycles_length_4 = remove_duplicate_cycles(cycles_length_4)
    cycles = remove_nested_cycles(unique_cycles_length_3, unique_cycles_length_4)
    return cycles

def find_boundary_nodes(G):
    cycles = find_faces(G)
    edge_count = {}
    for cycle in cycles:
        for i in range(len(cycle)):
            edge = frozenset((cycle[i], cycle[(i + 1) % len(cycle)]))
            if edge in edge_count:
                edge_count[edge] += 1
            else:
                edge_count[edge] = 1
    boundary_nodes = set()
    for edge, count in edge_count.items():
        if count == 1:
            boundary_nodes.update(edge)
    single_degree_nodes = {node for node in G.nodes() if G.degree(node) == 1}
    boundary_nodes.update(single_degree_nodes)
    return boundary_nodes
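
# Boundary-detection sketch on an assumed single-triangle mesh (illustrative
# helper only): each edge belongs to exactly one face, so every node is
# reported as a boundary node.
def _example_find_boundary_nodes():
    G = nx.Graph()
    G.add_node(0, pos=(0.0, 0.0, 0.0))
    G.add_node(1, pos=(1.0, 0.0, 0.0))
    G.add_node(2, pos=(0.0, 1.0, 0.0))
    G.add_edges_from([(0, 1), (1, 2), (2, 0)])
    return find_boundary_nodes(G)  # -> {0, 1, 2}
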
def find_fix_nodes(G, G_new, adjacency_matrix, new_adjacency_matrix, node_mapping):
    boundary_nodes_new = list(G_new.nodes())
    new_counts = count_zeros_in_adjacent_nodes(G_new, new_adjacency_matrix, boundary_nodes_new)
    old_counts = count_zeros_in_adjacent_nodes(G, adjacency_matrix, [node_mapping[node] for node in boundary_nodes_new])
    difference_nodes = [node for node in boundary_nodes_new if new_counts[node] != old_counts[node_mapping[node]]]
    return difference_nodes

def count_zeros_in_adjacent_nodes(G_new, matrix, rows):
    counts = {}
    for row in rows:
        adjacent_nodes = list(G_new.adj[row])
        count = 0
        for node in adjacent_nodes:
            if matrix[row, node] == 0:
                count += 1
        counts[row] = count
    return counts

def find_smallest_z_set(G):
    z_values = {}
    for node in G.nodes():
        z_values[node] = G.nodes[node]['pos'][-1]
    nodes_with_z_smaller_than_one = {node for node, z_value in z_values.items() if z_value < 1}
    return nodes_with_z_smaller_than_one

def load_checkpoint():
    checkpoint = torch.load('checkpoint/model2_0.pt')
    adj_matrix = checkpoint['adj_matrix']
    print(adj_matrix)
    return adj_matrix

def calculate_distance(pos1, pos2):
    pos1_np, pos2_np = np.array(pos1), np.array(pos2)
    return np.linalg.norm(pos1_np - pos2_np)

def create_heat_field(beam_seq, G_orig, heat_radius, mode='Tsp'):
    nodes = G_orig.nodes
    for node in nodes:
        nodes[node]['heat'] = 0
        nodes[node]['p_heat'] = 0
        nodes[node]['p_p_heat'] = 0
    if mode == 'Tsp':
        ini_temp = 0
        node_positions = [nodes[node]['pos'] for node in nodes]
        kdtree = KDTree(node_positions)
        for i, center_node in enumerate(beam_seq[-8:], start=1):
            for node in nodes:
                nodes[node]['heat'] = max(nodes[node]['heat'] * 0.52, 0)
            center_node_pos = nodes[center_node]['pos']
            indices = kdtree.query_ball_point(center_node_pos, heat_radius + 0.01)
            for index in indices:
                node = list(nodes)[index]
                distance_ = calculate_distance(center_node_pos, nodes[node]['pos'])
                heat_value = heat_radius * (1 - (distance_ / heat_radius) ** 0.8) + ini_temp
                nodes[node]['heat'] = min(nodes[node]['heat'] + heat_value, heat_radius)
            if len(beam_seq) < 3:
                for node in nodes:
                    nodes[node]['p_heat'] = 0
                    nodes[node]['p_p_heat'] = 0
            if i == len(beam_seq[-8:]) - 2:
                for node in nodes:
                    nodes[node]['p_p_heat'] = nodes[node]['heat']
            elif i == len(beam_seq[-8:]) - 1:
                for node in nodes:
                    nodes[node]['p_heat'] = nodes[node]['heat']
    return G_orig

def update_heat_field(next_state, coords_array, heat_radius, mode='Tsp'):
    next_coords_array = np.copy(coords_array)
    if mode == 'Tsp':
        new_heat_values = np.maximum(coords_array[3] * 0.52, 0)
        next_coords_array[3] = new_heat_values
        center_node_idx = next_state
        center_node_coords = next_coords_array[:3, center_node_idx]
        tree = KDTree(next_coords_array[:3].T)
        in_radius_indices = tree.query_ball_point(center_node_coords, heat_radius + 0.01)
        distances = np.sqrt(np.sum((next_coords_array[:3, in_radius_indices] - center_node_coords.reshape(3, 1)) ** 2, axis=0))
        new_heat_values = np.maximum(heat_radius * (1 - (distances / heat_radius) ** 0.8), 0)
        next_coords_array[3, in_radius_indices] += new_heat_values
        next_coords_array[3, in_radius_indices] = np.minimum(next_coords_array[3, in_radius_indices], heat_radius)
    return next_coords_array
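
# Heat-field update sketch under the assumed array layout used above (rows are
# x, y, z, heat; illustrative helper only): the old field decays by 0.52, then
# heat is re-deposited around the node just visited and capped at heat_radius.
def _example_update_heat_field():
    coords = np.zeros((4, 3))
    coords[0] = [0.0, 1.0, 2.0]  # three nodes spaced along the x axis
    return update_heat_field(next_state=0, coords_array=coords, heat_radius=1.5)
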
def create_new_graph_with_grid_nodes_block(G, divisions=3, interval=0.1, max_nodes=4000):
    pos = nx.get_node_attributes(G, 'pos')
    min_x, max_x = float('inf'), float('-inf')
    min_y, max_y = float('inf'), float('-inf')
    for _, position in pos.items():
        x, y, _ = position
        min_x = min(min_x, x)
        max_x = max(max_x, x)
        min_y = min(min_y, y)
        max_y = max(max_y, y)
    width = max_x - min_x
    height = max_y - min_y
    grid_width = width / divisions
    grid_height = height / divisions
    G_new_list = [nx.Graph() for _ in range(divisions * divisions)]
    new_node_id = 0
    for x in np.arange(min_x, max_x + interval, interval):
        for y in np.arange(min_y, max_y + interval, interval):
            if x > max_x or y > max_y:
                continue
            grid_x = int((x - min_x) // grid_width)
            grid_y = int((y - min_y) // grid_height)
            grid_x = min(divisions - 1, grid_x)
            grid_y = min(divisions - 1, grid_y)
            grid_index = grid_y * divisions + grid_x
            G_new_list[grid_index].add_node(new_node_id, pos=(x, y, 0))
            new_node_id += 1
    G_new_list_final = []
    for G_new in G_new_list:
        num_nodes = G_new.number_of_nodes()
        if num_nodes > max_nodes:
            G_sub_list = create_new_graph_with_grid_nodes_block(G_new, divisions=2, interval=interval, max_nodes=4000)
            G_new_list_final.extend(G_sub_list)
        else:
            G_new_list_final.append(G_new)
    return G_new_list_final

def traverse_matrix_euler(matrix, row_index):
    if not any(matrix[row_index, :]):
        return
    columns = np.where(matrix[row_index, :] != 0)[0]
    matrix[row_index, :] = 0
    matrix[:, row_index] = 0
    for col_index in columns:
        traverse_matrix_euler(matrix, col_index)

def recursive_search(G, node, node_dict):
    neighbors = [n for n in G.neighbors(node) if node_dict[n] == 0]
    for neighbor in neighbors:
        node_dict[neighbor] = 1
        recursive_search(G, neighbor, node_dict)

def angle_between(v1, v2):
    dot_product = np.dot(v1, v2)
    norms = np.linalg.norm(v1) * np.linalg.norm(v2)
    cos_theta = dot_product / norms
    cos_theta = np.clip(cos_theta, -1.0, 1.0)
    angle = np.arccos(cos_theta)
    return np.degrees(angle)

def is_within_sector(v_base, v1, v2):
    angle_v1 = angle_between(v_base, v1)
    angle_v2 = angle_between(v_base, v2)
    if angle_v2 >= angle_v1 - 0.01:
        return False
    cross_v1 = np.cross(v_base, v1)
    cross_v2 = np.cross(v_base, v2)
    if angle_between(cross_v1, cross_v2) < 89:
        return True
    else:
        return False

def anti_self_locking_subgraph(G, G_new, adjacency_matrix_, node_mapping_, state_, beam_seq, new_adjacency_matrix_,
                               new_state_, coordinates, heat_radius, init_adjacency_matrix_, rays, train_mode, mode='Tsp', material='CCF'):
    optional_action = np.array([])
    lifting = False
    lines = get_merged_diff(init_adjacency_matrix_, adjacency_matrix_)
    if mode == 'Tsp':
        optional_action = np.where(np.array(adjacency_matrix_[state_[-1]]) != 0)[0]
        row_col_array = np.concatenate(np.argwhere(adjacency_matrix_ > 0).T)
        optional_action = np.setdiff1d(optional_action, row_col_array)
        if optional_action.size == 0:
            lifting = True
            candidate_node = np.array(list(G.nodes()))
            unique_nodes = np.setdiff1d(candidate_node, row_col_array)
            distances = [nx.shortest_path_length(G, state_[-1], node) for node in unique_nodes]
            sorted_nodes = [node for _, node in sorted(zip(distances, unique_nodes), reverse=True)]
            for index in sorted_nodes:
                connectivity = True
                optional_action_jump = np.where(np.array(adjacency_matrix_[index]) != 0)[0]
                optional_action_jump = np.setdiff1d(optional_action_jump, row_col_array)
                if len(optional_action_jump) == 2:
                    angle = calculate_angle(coordinates[optional_action_jump[0]], coordinates[index], coordinates[optional_action_jump[1]])
                    if angle < 1:
                        connectivity = False
                if connectivity:
                    optional_action = np.append(optional_action, index)
                    break
        new_optional_action_ = np.zeros_like(optional_action)
        if lifting:
            new_optional_action_ = optional_action
        else:
            for ii, old_index in enumerate(optional_action):
                new_index_ = next((k for k, v in node_mapping_.items() if v == old_index), None)
                if new_index_ is not None:
                    new_optional_action_[ii] = new_index_
        optional_action = new_optional_action_
    if mode == 'Euler':
        optional_action = np.where(np.array(adjacency_matrix_[state_[-1]]) != 0)[0]
        if material == 'CCF':
            for row_index in optional_action:
                adjacency_matrix_next = adjacency_matrix_.copy()
                if adjacency_matrix_next[state_[-1]][row_index] < 0 or adjacency_matrix_next[row_index][state_[-1]] < 0:
                    adjacency_matrix_next[state_[-1]][row_index] = -adjacency_matrix_next[state_[-1]][row_index]
                    adjacency_matrix_next[row_index][state_[-1]] = -adjacency_matrix_next[row_index][state_[-1]]
                else:
                    adjacency_matrix_next[state_[-1]][row_index] = 0.0
                    adjacency_matrix_next[row_index][state_[-1]] = 0.0
                connectivity = True
                if not connectivity:
                    optional_action = optional_action[optional_action != row_index]
        if material == 'PLA3D':
            collision_check = False
            optional_action_temp = optional_action.copy()
            for action in optional_action:
                temp_lines = lines.copy()
                if coordinates[state_[-1]][2] > coordinates[action][2]:
                    temp_lines.append((state_[-1], action))
                if collision_check:
                    collision, _, _ = collision_check_simulation(coordinates, state_[-1], action, rays, temp_lines, norm_output=False)
                else:
                    collision = False
                if collision:
                    optional_action_temp = optional_action_temp[optional_action_temp != action]
            optional_action = optional_action_temp
            if optional_action.size == 0:
                lifting = True
                row_col_array = np.concatenate(np.argwhere(adjacency_matrix_ > 0).T)
                unique_nodes = np.unique(row_col_array)
                distances = [nx.shortest_path_length(G, state_[-1], node) for node in unique_nodes]
                sorted_nodes = [node for _, node in sorted(zip(distances, unique_nodes))]
                for row_index in sorted_nodes:
                    append_node = False
                    optional_action_jump = np.where(np.array(adjacency_matrix_[row_index]) != 0)[0]
                    for index in optional_action_jump:
                        temp_lines = lines.copy()
                        if coordinates[row_index][2] > coordinates[index][2]:
                            temp_lines.append((row_index, index))
                        if collision_check:
                            collision, _, _ = collision_check_simulation(coordinates, row_index, index, rays, temp_lines, norm_output=False)
                        else:
                            collision = False
                        if row_index in beam_seq and not collision:
                            append_node = True
                            break
                    if append_node:
                        optional_action = np.append(optional_action, row_index)
                        break
        new_optional_action_ = np.zeros_like(optional_action)
        if lifting:
            new_optional_action_ = optional_action
        else:
            for ii, old_index in enumerate(optional_action):
                new_index_ = next((k for k, v in node_mapping_.items() if v == old_index), None)
                if new_index_ is not None:
                    new_optional_action_[ii] = new_index_
        optional_action = new_optional_action_
    return optional_action, lifting, lines

def aabb_tree(nodes, lines, coordinates):
    x_min = min(nodes[0][0], nodes[-1][0]) - 180
    x_max = max(nodes[0][0], nodes[-1][0]) + 180
    y_min = min(nodes[0][1], nodes[-1][1]) - 180
    y_max = max(nodes[0][1], nodes[-1][1]) + 180
    z_min = min(nodes[0][2], nodes[-1][2]) - 30
    z_max = max(nodes[0][2], nodes[-1][2]) + 180
    filtered_lines = []
    for line in lines:
        x1, y1, z1 = coordinates[line[0]]
        x2, y2, z2 = coordinates[line[1]]
        if (x_min <= x1 <= x_max and y_min <= y1 <= y_max and z_min <= z1 <= z_max and
                x_min <= x2 <= x_max and y_min <= y2 <= y_max and z_min <= z2 <= z_max):
            filtered_lines.append(line)
    return filtered_lines

def collision_check_simulation(coordinates, current, action, rays, lines, interval=5, norm_output=True, specified_vector=np.array([0., 0., 1.])):
    current_coords = np.array(coordinates[current])
    action_coords = np.array(coordinates[action])
    distance = np.linalg.norm(current_coords - action_coords)
    if distance <= interval:
        nodes = [current_coords, action_coords]
    else:
        num_of_new_nodes = int(distance / interval)
        new_nodes = [current_coords + i * interval * (action_coords - current_coords) / distance for i in range(1, num_of_new_nodes)]
        nodes = [current_coords] + new_nodes + [action_coords]
    lines = aabb_tree(nodes, lines, coordinates)
    radius = 32
    height_cylinder = 150
    height_cone = 41
    node_rays = [[] for _ in nodes]
    node_index = 0
    for node in nodes:
        for ray_dir in rays:
            cone_center = node - height_cone * ray_dir
            cylinder_center = node - (height_cylinder + height_cone) * ray_dir
            collision = False
            for line in lines:
                line_nodes = sample_line_nodes(line, coordinates, step=5)
                for P in line_nodes:
                    if if_in_cone(P, cone_center, ray_dir, radius, height_cone):
                        collision = True
                        break
                    if if_in_cylinder(P, cylinder_center, ray_dir, radius, height_cylinder):
                        collision = True
                        break
                if collision:
                    break
            if not collision:
                node_rays[node_index].append(ray_dir)
        node_index = node_index + 1
    all_rays = [ray for rays in node_rays for ray in rays]
    duplicate_rays = []
    threshold = 1e-6
    for i, ray_i in enumerate(all_rays):
        if any(np.linalg.norm(dup - ray_i) < threshold for dup in duplicate_rays):
            continue
        count = sum(1 for ray_j in all_rays if np.linalg.norm(ray_i - ray_j) < threshold)
        if count == len(node_rays):
            duplicate_rays.append(ray_i)
    final_collision = True
    normal = None
    min_angle = 0
    if len(duplicate_rays) != 0:
        final_collision = False
        if norm_output:
            neg_duplicate_rays = [-1 * ray for ray in duplicate_rays]
            angles = [np.arccos(np.clip(np.dot(ray, specified_vector) / (np.linalg.norm(ray) * np.linalg.norm(specified_vector)), -1, 1)) for ray in neg_duplicate_rays]
            min_angle_index = np.argmin(angles)
            normal = neg_duplicate_rays[min_angle_index]
            normal = normal.tolist()
            min_angle = angles[min_angle_index]
    return final_collision, normal, min_angle

def generate_rays(degrees):
    rays = []
    init_vector = np.array([1, 0, 0])
    for elev in range(-35, -65, -13):
        rot_mat_y = np.array([
            [math.cos(math.radians(elev)), 0, -math.sin(math.radians(elev))],
            [0, 1, 0],
            [math.sin(math.radians(elev)), 0, math.cos(math.radians(elev))]
        ])
        curr_vector = np.matmul(rot_mat_y, init_vector)
        for azim in range(0, 360, 30):
            rot_mat_z = np.array([
                [math.cos(math.radians(azim)), -math.sin(math.radians(azim)), 0],
                [math.sin(math.radians(azim)), math.cos(math.radians(azim)), 0],
                [0, 0, 1]
            ])
            ray_dir = np.matmul(rot_mat_z, curr_vector)
            rays.append(ray_dir)
    rays.append(np.array([0, 0, -1]))
    return rays
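
# Sanity-check sketch for generate_rays (illustrative helper only): three
# elevation steps times twelve azimuth steps, plus the straight-down ray,
# yields 37 unit directions. Note the `degrees` argument is unused in the
# current implementation.
def _example_generate_rays():
    rays = generate_rays(degrees=None)
    assert len(rays) == 37
    return rays
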
def sample_line_nodes(line, coordinates, step=2):
    start, end = line
    start_coords = np.array(coordinates[start])
    end_coords = np.array(coordinates[end])
    length = np.linalg.norm(end_coords - start_coords)
    nodes_count = int(length / step)
    new_nodes_coords = [start_coords + (end_coords - start_coords) * i / nodes_count for i in range(1, nodes_count)]
    nodes = [start_coords] + new_nodes_coords + [end_coords]
    return nodes

def get_merged_diff(init_adjacency_matrix_, adjacency_matrix_):
    diff = np.where(init_adjacency_matrix_ != adjacency_matrix_)
    lines = list(zip(diff[0], diff[1]))
    merged_lines = set()
    for line in lines:
        line = tuple(sorted(line))
        merged_lines.add(line)
    return list(merged_lines)

def collision_checking(coordinates, adjacency_matrix_, vector_z, current, action, optional_action, beam_seq, rays):
    collision = False
    optional_action_lo = optional_action[optional_action != action]
    for other_action in optional_action_lo:
        vector1 = np.array(coordinates[action]) - np.array(coordinates[current])
        vector2 = np.array(coordinates[other_action]) - np.array(coordinates[current])
        if is_within_sector(vector_z, vector1, vector2):
            collision = True
    if len(beam_seq) > 1:
        optional_action_right = np.where(np.array(adjacency_matrix_[action]) != 0)[0]
        optional_action_ro = optional_action_right[optional_action_right != current]
        for other_action in optional_action_ro:
            vector1 = np.array(coordinates[current]) - np.array(coordinates[action])
            vector2 = np.array(coordinates[other_action]) - np.array(coordinates[action])
            if is_within_sector(vector_z, vector1, vector2):
                collision = True
    return collision

def if_in_cylinder(P, cylinder_center, ray_dir, radius, height):
    ray_dir = ray_dir / np.linalg.norm(ray_dir)
    P_prime = P - cylinder_center
    distance_along_ray = np.dot(P_prime, ray_dir)
    if distance_along_ray < 0 or distance_along_ray > height - 0.01:
        return False
    P_proj = distance_along_ray * ray_dir
    distance_to_ray = np.linalg.norm(P_proj - P_prime)
    return distance_to_ray < radius - 0.01

def if_in_cone(P, node, ray_dir, radius, height):
    ray_dir = ray_dir / np.linalg.norm(ray_dir)
    P_prime = P - node
    distance_along_ray = np.dot(P_prime, ray_dir)
    if distance_along_ray < 0 or distance_along_ray > height - 0.01:
        return False
    P_proj = distance_along_ray * ray_dir
    distance_to_ray = np.linalg.norm(P_proj - P_prime)
    cone_radius_at_P = (height - distance_along_ray) * radius / height
    return distance_to_ray < cone_radius_at_P - 0.01
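
# Containment sketch for the collision primitive above, with assumed geometry
# (illustrative helper only): a point on the axis, 20 units from the base of a
# cone that tapers from radius 32 at the base to zero at height 41, is inside.
def _example_if_in_cone():
    point = np.array([0.0, 0.0, 20.0])
    base_center = np.array([0.0, 0.0, 0.0])
    axis_dir = np.array([0.0, 0.0, 1.0])
    return if_in_cone(point, base_center, axis_dir, radius=32, height=41)  # -> True
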
def choose_start_nodes(G, boundary_nodes_array, material):
    if material == 'PLA3D':
        selected_node = np.random.choice(boundary_nodes_array)
        neighbors = list(G.neighbors(selected_node))
        boundary_neighbors = [node for node in neighbors if node in boundary_nodes_array]
        selected_boundary_neighbor = np.random.choice(boundary_neighbors)
        start_nodes = [selected_node, selected_boundary_neighbor]
    elif material == 'CCF':
        available_nodes = list(set(G.nodes) - set(boundary_nodes_array))
        random_node = random.choice(available_nodes)
        start_nodes = [random_node]
    else:
        start_nodes = [0]
    return start_nodes

def normalize_vector(v):
    norm = np.linalg.norm(v)
    if norm == 0:
        return v
    return v / norm

def calculate_mid_vector(v1, v2):
    v1_norm = normalize_vector(v1)
    v2_norm = normalize_vector(v2)
    return normalize_vector(v1_norm + v2_norm)

def calculate_offset_point(point, mid_vector, offset_distance):
    return point + mid_vector * offset_distance

def generate_beam_sequence(env_name, existing_coordinates):
    parts = env_name.split('-')
    common_name = '-'.join(parts[:-1]) + '_wireframe'
    common_name_2 = parts[-1]
    best_path_file = f'data/{parts[0]}/{common_name}.txt'
    node_coordinates_file = f'data/{parts[0]}/Node{common_name}.txt'
    second_node_coordinates_file = f'data/{parts[0]}/Node{parts[0]}-{common_name_2}.txt'

    def extract_best_path(file_path):
        with open(file_path, 'r') as file:
            for line in file:
                if line.startswith("Best path:"):
                    best_path_str = line.split(":")[1].strip()
                    best_path_ = eval(best_path_str)
                    return best_path_
        return None

    def extract_coordinates(file_path):
        coordinates = {}
        with open(file_path, 'r') as file:
            for i, line in enumerate(file):
                coords = [float(x) for x in line.split()]
                coordinates[i] = coords
        return coordinates

    def compare_coordinates(best_path_, node_coordinates_, existing_coordinates_):
        beam_seq_ = []
        close_nodes = []
        for index in best_path_:
            if index in node_coordinates_:
                x, y, z = node_coordinates_[index]
                for key, value in existing_coordinates_.items():
                    if all(abs(a - b) <= 0.01 for a, b in zip(value, [x, y, z])):
                        close_nodes.append(key)
        if close_nodes:
            beam_seq_.append([close_nodes, 0.0])
        return beam_seq_

    def add_min_z_coordinate_node(beam_seq_, second_node_coordinates_, existing_coordinates_):
        if beam_seq_:
            min_z_node = min(second_node_coordinates_, key=lambda k: second_node_coordinates_[k][2])
            min_z_coords = second_node_coordinates_[min_z_node]
            corresponding_node = next((key for key, value in existing_coordinates_.items() if all(abs(a - b) <= 0.01 for a, b in zip(value, min_z_coords))), None)
            if corresponding_node is not None:
                beam_seq_[0][0].append(corresponding_node)

    best_path = extract_best_path(best_path_file)
    node_coordinates = extract_coordinates(node_coordinates_file)
    second_node_coordinates = extract_coordinates(second_node_coordinates_file)
    beam_seq = compare_coordinates(best_path, node_coordinates, existing_coordinates)
    add_min_z_coordinate_node(beam_seq, second_node_coordinates, existing_coordinates)
    return beam_seq

def calculate_total_max_deformation(node_file, path_file):
    def read_coordinates(filename):
        coordinates_ = {}
        with open(filename, 'r') as file:
            for index, line in enumerate(file):
                x, y, z = map(float, line.split())
                coordinates_[index] = [x, y, z]
        return coordinates_

    def read_path(filename):
        path_ = []
        with open(filename, 'r') as file:
            for line in file:
                if line.startswith("Best path:"):
                    path_ = line.split(':')[1].strip().strip('[]').split(', ')
                    path_ = list(map(int, path_))
                    break
        return [(path_[i], path_[i + 1]) for i in range(len(path_) - 1)]

    coordinates = read_coordinates(node_file)
    path = read_path(path_file)
    boundary_nodes_array = np.array([index for index, coord in coordinates.items() if coord[2] < 1])
    lines = []
    max_deformation = 0
    lines_max_deformation = []
    index = 0
    max_index = 0
    for edge in path:
        if edge not in lines:
            lines.append(edge)
            current_lines = np.array(lines)
            current_deformation = beam_fea_graph(current_lines, coordinates, boundary_nodes_array, draw=False)
            if current_deformation > max_deformation:
                max_deformation = current_deformation
                lines_max_deformation = list(lines)
                max_index = index
        index += 1
    lines_max_deformation_array = np.array(lines_max_deformation)
    beam_fea_graph(lines_max_deformation_array, coordinates, boundary_nodes_array, draw=False)
    return max_deformation, max_index

def save_average_to_file(return_list, env_name, savept):
    average = round(sum(return_list[:100]) / 100, 3)
    file_suffix = "P" if savept else "I"
    file_name = f"checkpoint/100_average_{env_name}_{file_suffix}.txt"
    with open(file_name, "a") as file:
        file.write(f"{average}\n")

def update_progress(beam_seq, total_nodes, bar_length=50):
    unique_nodes = len(set(node for path in beam_seq for node in path[0]))
    progress = unique_nodes / total_nodes
    arrow = int(round(progress * bar_length - 1)) if unique_nodes != total_nodes else bar_length
    spaces = bar_length - arrow
    progress_bar = 'Progress: [{0}{1}] {2}%'.format('>' * arrow, ' ' * spaces, round(progress * 100, 2))
    sys.stdout.write("\r" + progress_bar)
    sys.stdout.flush()

def update_progress_post(ii, total, bar_length=50):
    """Update the progress bar based on the iteration index and total length."""
    progress = ii / total
    arrow = int(round(progress * bar_length - 1)) if ii < total else bar_length
    spaces = bar_length - arrow
    progress_bar = 'Progress: [{0}{1}] {2}%'.format('>' * arrow, ' ' * spaces, round(progress * 100, 2))
    sys.stdout.write("\r" + progress_bar)
    sys.stdout.flush()

def final_progress(bar_length=50):
    """Display the progress bar at 100% completion and move to a new line."""
    progress_bar = 'Progress: [{0}] {1}%'.format('>' * bar_length, 100.0)
    print("\r" + progress_bar)

def find_most_similar_checkpoint(current_adj_matrix, checkpoint_dir="checkpoint"):
    max_similarity = -1
    most_similar_checkpoint_path = None
    normalized_matrix = normalize_matrix(current_adj_matrix)
    for filename in os.listdir(checkpoint_dir):
        if filename.endswith(".pt"):
            filepath = os.path.join(checkpoint_dir, filename)
            checkpoint = torch.load(filepath, map_location=torch.device('cpu'))
            adj_matrix = checkpoint['adj_matrix']
            if adj_matrix.shape != normalized_matrix.shape:
                larger_shape = tuple(max(s1, s2) for s1, s2 in zip(adj_matrix.shape, normalized_matrix.shape))
                adj_matrix = adjust_and_pad_matrix(adj_matrix, larger_shape)
                normalized_matrix = adjust_and_pad_matrix(normalized_matrix, larger_shape)
            similarity = calculate_similarity(normalized_matrix, adj_matrix)
            if similarity > max_similarity:
                max_similarity = similarity
                most_similar_checkpoint_path = filepath
    return most_similar_checkpoint_path, max_similarity

def normalize_matrix(matrix):
    max_val = matrix.max()
    if max_val > 0:
        normalized_matrix = matrix / max_val
    else:
        normalized_matrix = matrix
    return normalized_matrix

def calculate_similarity(matrix_a, matrix_b):
    if not isinstance(matrix_a, torch.Tensor):
        matrix_a = torch.tensor(matrix_a, dtype=torch.float32)
    if not isinstance(matrix_b, torch.Tensor):
        matrix_b = torch.tensor(matrix_b, dtype=torch.float32)
    diff = torch.norm(matrix_a - matrix_b, p='fro') * 0.2
    similarity = 1 / (1 + diff)
    return similarity.item()
def adjust_and_pad_matrix(matrix, target_shape):
    padding = [(0, max(0, t - c)) for t, c in zip(target_shape, matrix.shape)]
    if isinstance(matrix, np.ndarray):
        return np.pad(matrix, padding, mode='constant', constant_values=0)
    elif isinstance(matrix, torch.Tensor):
        # torch.nn.functional.pad expects pad sizes for the last dimension first;
        # build the tuple from `padding` in reverse so any rank is handled
        # (the original hard-coded three dimensions and failed on 2-D matrices).
        pad = tuple(p for dims in reversed(padding) for p in (0, dims[1]))
        return torch.nn.functional.pad(matrix, pad, "constant", 0)
    else:
        raise TypeError("Unsupported type. The input must be either a numpy array or a torch tensor.")
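
# Padding sketch for adjust_and_pad_matrix with an assumed toy input
# (illustrative helper only): a 2x2 array is zero-padded up to 3x4.
def _example_adjust_and_pad_matrix():
    m = np.ones((2, 2))
    return adjust_and_pad_matrix(m, (3, 4)).shape  # -> (3, 4)
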
def extract_faces_from_obj(env_name):
    file_name = f"data/{env_name}.obj"
    faces = []
    try:
        with open(file_name, 'r') as file:
            for line in file:
                if line.startswith('f '):
                    face_indices = line.strip().split()[1:]
                    face_indices = [int(index) - 1 for index in face_indices]
                    faces.append(face_indices)
        faces_np = np.array(faces, dtype=int)
        return faces_np
    except FileNotFoundError:
        print(f"file {file_name} not found.")
        return None
    except Exception as e:
        print(f"read error: {e}")
        return None
def pad_matrix(matrix, target_dim):
    current_depth, current_height, current_width = matrix.shape
    padding_height = max(0, target_dim - current_height)
    padding_width = max(0, target_dim - current_width)
    new_height = current_height + padding_height
    new_width = current_width + padding_width
    new_shape = (current_depth, new_height, new_width)
    padded_matrix = np.zeros(new_shape, dtype=matrix.dtype)
    padded_matrix[:, :current_height, :current_width] = matrix
    return padded_matrix

def compute_edges_length(V, Edge):
    diff = V[Edge[:, 1], :] - V[Edge[:, 0], :]
    return np.sqrt(np.sum(diff ** 2, axis=1))

def beam_fea_calculate(g_num, g_coord, total_force_new, boundary_nodes_array_fea, draw, index=0):
    E = 2636
    rho = 1250
    A = 3.14e-6
    G = 1419
    r = 0.001
    G = G * 2.22
    E = E + 700
    V = g_coord * 0.001
    Edge = g_num
    le = compute_edges_length(V, Edge)
    F = np.zeros(6 * V.shape[0])
    for force in total_force_new:
        F[int(force[0]) * 6 + 2] = force[-1]
    F = F * 0.001
    Iz = np.pi * r ** 4 / 4
    Iy = np.pi * r ** 4 / 4
    J = np.pi * r ** 4 / 2
    K = np.zeros((6 * V.shape[0], 6 * V.shape[0]))
    for i in range(Edge.shape[0]):
        v1 = Edge[i, 0]
        v2 = Edge[i, 1]
        DOF = np.hstack((np.arange(6 * v1, 6 * (v1 + 1)), np.arange(6 * v2, 6 * (v2 + 1))))
        kk = np.zeros((12, 12))
        kk[0, 0] = E * A / le[i]
        kk[6, 0] = -kk[0, 0]
        kk[0, 6] = -E * A / le[i]
        kk[6, 6] = -kk[0, 6]
        kk[1, 1] = 12 * E * Iz / le[i] ** 3
        kk[7, 1] = -kk[1, 1]
        kk[1, 5] = 6 * E * Iz / le[i] ** 2
        kk[7, 5] = -kk[1, 5]
        kk[1, 7] = -12 * E * Iz / le[i] ** 3
        kk[7, 7] = -kk[1, 7]
        kk[1, 11] = 6 * E * Iz / le[i] ** 2
        kk[7, 11] = -kk[1, 11]
        kk[2, 2] = 12 * E * Iy / le[i] ** 3
        kk[8, 2] = -kk[2, 2]
        kk[2, 4] = -6 * E * Iy / le[i] ** 2
        kk[8, 4] = -kk[2, 4]
        kk[2, 8] = -12 * E * Iy / le[i] ** 3
        kk[8, 8] = -kk[2, 8]
        kk[2, 10] = -6 * E * Iy / le[i] ** 2
        kk[8, 10] = -kk[2, 10]
        kk[3, 3] = G * J / le[i]
        kk[9, 3] = -kk[3, 3]
        kk[3, 9] = -G * J / le[i]
        kk[9, 9] = -kk[3, 9]
        kk[4, 2] = -6 * E * Iy / le[i] ** 2
        kk[10, 2] = kk[4, 2]
        kk[4, 4] = 4 * E * Iy / le[i]
        kk[10, 4] = 2 * E * Iy / le[i]
        kk[4, 8] = 6 * E * Iy / le[i] ** 2
        kk[10, 8] = kk[4, 8]
        kk[4, 10] = 2 * E * Iy / le[i]
        kk[10, 10] = 4 * E * Iy / le[i]
        kk[5, 1] = 6 * E * Iz / le[i] ** 2
        kk[11, 1] = kk[5, 1]
        kk[5, 5] = 4 * E * Iz / le[i]
        kk[11, 5] = 2 * E * Iz / le[i]
        kk[5, 7] = -6 * E * Iz / le[i] ** 2
        kk[11, 7] = kk[5, 7]
        kk[5, 11] = 2 * E * Iz / le[i]
        kk[11, 11] = 4 * E * Iz / le[i]
        l = (V[v2, 0] - V[v1, 0]) / le[i]
        m = (V[v2, 1] - V[v1, 1]) / le[i]
        n = (V[v2, 2] - V[v1, 2]) / le[i]
        D = np.sqrt(l ** 2 + m ** 2)
        if D == 0:
            if n > 0:
                R = np.array([[0, 0, 1], [0, 1, 0], [-1, 0, 0]])
            else:
                R = np.array([[0, 0, -1], [0, 1, 0], [1, 0, 0]])
        else:
            R = np.array([[l, m, n], [-m / D, l / D, 0], [-l * n / D, -m * n / D, D]])
        RR = np.zeros((12, 12))
        RR[:3, :3] = R
        RR[3:6, 3:6] = R
        RR[6:9, 6:9] = R
        RR[9:, 9:] = R
        K[np.ix_(DOF, DOF)] += RR.T @ kk @ RR
    U = np.zeros(6 * V.shape[0])
    fixed_node = boundary_nodes_array_fea
    fixed_free_dof = np.unique(np.hstack([np.arange(6 * node, 6 * (node + 1)) for node in fixed_node]))
    all_free_dof = np.arange(6 * V.shape[0])
    free_dof = np.setdiff1d(all_free_dof, fixed_free_dof)
    U[free_dof] = np.linalg.solve(K[np.ix_(free_dof, free_dof)], F[free_dof])
    U = U.reshape(-1, 6)
    s = U[:, :3]
    VV = V + s
    if draw:
        distances = np.linalg.norm(V - VV, axis=1)
        color_factor = 100 * distances / 8
        color_factor = np.clip(color_factor, 0, 1)
        fig = plt.figure(dpi=500)
        ax = fig.add_subplot(111, projection='3d')
        for i in range(Edge.shape[0]):
            ax.plot(*V[Edge[i, :], :].T, color='grey', linewidth=1.4, alpha=0.9)
        for i in range(V.shape[0]):
            ax.scatter(*V[i], color='grey', s=2, alpha=0.9)
        ax.axis('off')
        output_dir = 'FEA_simu/ori'
        if not os.path.exists(output_dir):
            os.makedirs(output_dir)
        file_name = f"{output_dir}/plot_{index}.png"
        elev = 30
        azim = 0
        ax.view_init(elev=elev, azim=azim)
        ax.set_xlim([0, 0.1])
        ax.set_ylim([0, 0.1])
        ax.set_zlim([0, 0.1])
        plt.savefig(file_name)
        plt.clf()
    max_deformation_length = 0
    for i in range(VV.shape[0]):
        deformation_length = np.linalg.norm(V[i] - VV[i]) * 100
        if deformation_length > max_deformation_length:
            max_deformation_length = deformation_length
    return max_deformation_length
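
# Hedged smoke test: exercise a few pure helpers with assumed toy inputs when
# the module is run directly. None of these values come from the repo's data.
if __name__ == "__main__":
    print(calculate_angle([0, 0, 0], [1, 0, 0], [1, 1, 0]))  # ~90.0
    print(normalize_vector(np.array([3.0, 4.0, 0.0])))       # [0.6 0.8 0. ]
    print(distance_coord([0, 0], [3, 4]))                    # 5.0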
