201 lines
7.0 KiB
Python
201 lines
7.0 KiB
Python
import numpy as np
|
||
import pandas as pd
|
||
from matplotlib import pyplot as plt
|
||
import argparse
|
||
import sys
|
||
|
||
def absoluteOrientationMatrix(D, M):
|
||
dmean = D.mean(1)
|
||
mmean = M.mean(1)
|
||
|
||
print("dmean is: ", dmean[:, np.newaxis])
|
||
print("mmean is: ", mmean[:, np.newaxis])
|
||
|
||
D_centr = D - dmean[:, np.newaxis]
|
||
M_centr = M - mmean[:, np.newaxis]
|
||
|
||
print("D_centr is: ", D_centr)
|
||
print("M_centr is: ", M_centr)
|
||
|
||
# Comment with reference (1).
|
||
# If you don't understand what's written here, plese refer to the links and the code below
|
||
H = np.zeros(shape=(3, 3))
|
||
for column in range(D.shape[1]):
|
||
H += np.outer(D_centr[:, column], M_centr[:, column])
|
||
# print("H is: ", H)
|
||
print("H is: ", H.T)
|
||
|
||
u, s, vh = np.linalg.svd(H.T)
|
||
#R = vh @ u.T
|
||
I = np.identity(3)
|
||
if(np.linalg.det(u) * np.linalg.det(vh)<0):
|
||
I[2,2] = -1
|
||
R = u @ I @ vh
|
||
R = R.T
|
||
|
||
# Output of rotmodel has to be the vector.
|
||
# Comment with reference (2).
|
||
rotmodel = R @ M_centr
|
||
# print(rotmodel)
|
||
dots = 0.0
|
||
norms = 0.0
|
||
|
||
for column in range(D_centr.shape[1]):
|
||
dots += np.dot(D_centr[:,column].transpose(),rotmodel[:,column])
|
||
normi = np.linalg.norm(M_centr[:,column])
|
||
norms += normi*normi
|
||
|
||
# s has to be the scalar.
|
||
s = float(dots/norms)
|
||
|
||
T = dmean - R @ mmean
|
||
T_GT = dmean - s * R @ mmean
|
||
|
||
print("The Rotation matrix is: \n", R, "\n")
|
||
print("The Translation matrix is: \n", T, "\n")
|
||
print("The scale factor is: ", s)
|
||
|
||
return R, T, T_GT, s
|
||
|
||
|
||
def find_closest_times(res, gt, difference, time_measure="nano"):
|
||
'''
|
||
:param res:
|
||
:param gt:
|
||
:param difference:
|
||
:param time_measure:
|
||
:return:
|
||
'''
|
||
|
||
gt_adapted = pd.DataFrame({"time[s]":[0], "tx":[0], "ty":[0], "tz":[0], "qx":[0], "qy":[0], "qz":[0], "qw":[0]})
|
||
gt_adapted = gt_adapted.drop(0)
|
||
|
||
if time_measure == "milli":
|
||
res["time[s]"] = res["time[s]"] * 1000000
|
||
elif time_measure == "s":
|
||
res["time[s]"] = res["time[s]"] * 1000000000
|
||
elif time_measure == "nano":
|
||
pass
|
||
|
||
print(res["time[s]"])
|
||
print()
|
||
print(gt["# timestamp[ns]"])
|
||
|
||
res = res.rename(columns={"x":"tx", "y":"ty", "z":"tz"})
|
||
res_initial = res.copy()
|
||
res = res_initial[0:0]
|
||
|
||
# Последний момент времени в res неадкватный, поэтому уберем его:
|
||
# res = res.drop(len(res)-1)
|
||
all_gt_times = gt["# timestamp[ns]"].to_numpy()
|
||
|
||
for i in range(len(res_initial)):
|
||
time_res = res_initial.iloc[i][0]
|
||
compar_res = np.where(all_gt_times < time_res)
|
||
# print(compar_res[0])
|
||
if not compar_res[0].shape[0] == 0:
|
||
gt_time = gt.iloc[compar_res[0][-1]][0]
|
||
# print(abs(gt_time - time_res) < difference)
|
||
if ( (not compar_res[0].shape[0] == 0) and (abs(gt_time - time_res) < difference) ):
|
||
index = compar_res[0][-1]
|
||
else:
|
||
continue
|
||
|
||
# print(index)
|
||
closest_row = gt.iloc[index]
|
||
closest_row = closest_row.rename({ "# timestamp[ns]":"time[s]" })
|
||
closest_row = closest_row.rename(i)
|
||
# print(closest_row)
|
||
gt_adapted = gt_adapted.append(closest_row)
|
||
res = res.append(res_initial.iloc[i])
|
||
|
||
return res, gt_adapted
|
||
|
||
|
||
def ATE_error(D, M):
|
||
diff = D.T - M.T
|
||
print("alignmentGT resuls is: \n", diff)
|
||
trans_error1 = np.sqrt(np.sum(np.multiply(diff,diff), 0))
|
||
error1 = np.sqrt(np.dot(trans_error1, trans_error1)/len(trans_error1))
|
||
return error1
|
||
|
||
|
||
def plot_and_save(M, D, name):
|
||
plot = plt.figure(figsize=(10,10))
|
||
ax1 = plot.add_subplot(1, 1, 1)
|
||
ax1.grid()
|
||
ax1.set_xlabel('x [m]')
|
||
ax1.set_ylabel('y [m]')
|
||
#ax1.set_aspect("equal")
|
||
ax1.plot(M[:, 0], M[:, 1], label="estimated")
|
||
ax1.plot(D[:, 0], D[:, 1], label="ground truth")
|
||
plot.legend()
|
||
plot.savefig(name)
|
||
|
||
|
||
def read_file(filename, sep_=",", switcher=0):
|
||
file = pd.read_csv(filename, sep=sep_)
|
||
if switcher == "1":
|
||
file = file.rename(columns={ "#timestamp [ns]":"# timestamp[ns]", " p_RS_R_x [m]":"tx", " p_RS_R_y [m]":"ty",
|
||
" p_RS_R_z [m]":"tz", " q_RS_w []":"qw", " q_RS_x []":"qx", " q_RS_y []":"qy",
|
||
" q_RS_z []":"qz" })
|
||
return file
|
||
|
||
|
||
if __name__ == "__main__":
|
||
|
||
parser = argparse.ArgumentParser(description='''This script computes the absolute trajectory error from the ground truth trajectory and the estimated trajectory. ''')
|
||
parser.add_argument("first_file", help='ground truth trajectory (format: # timestamp[ns],tx,ty,tz,qw,qx,qy,qz) or (format: #timestamp [ns], p_RS_R_x [m], p_RS_R_y [m], p_RS_R_z [m], q_RS_w [], q_RS_x [], q_RS_y [], q_RS_z []). Use --switch_gt=0 to use first format or --switch_gt=1 to use second format')
|
||
parser.add_argument("second_file", help="estimated trajectory (format: time[s] x y z qx qy qz qw)")
|
||
parser.add_argument("--switch_gt", help="Use --switch_gt=0 to use first format or --switch_gt=1 to use second format", default=0)
|
||
parser.add_argument("--slam_name", help="the name of the slam used for the processing. ORB-SLAM3 by default", default="orb_slam3")
|
||
parser.add_argument("--savefile_folder", help="absolute or relative path to save the file", default=".")
|
||
parser.add_argument("--res_measure", help="time measurement of the resulting text file.", default="nano")
|
||
parser.add_argument("--separator", help="separator used in the estimated values file ", default=" ")
|
||
parser.add_argument("--sensor", help="lol no help. ahahhah")
|
||
|
||
args = parser.parse_args()
|
||
first_file = args.first_file
|
||
switcher = args.switch_gt
|
||
second_file = args.second_file
|
||
|
||
gt = read_file(first_file, switcher=switcher)
|
||
# TODO: Adapt for sep = ","
|
||
sep = args.separator
|
||
res = read_file(second_file, sep_=sep)
|
||
|
||
tmp = gt["qw"]
|
||
del gt["qw"]
|
||
gt["qw"] = tmp
|
||
gt
|
||
|
||
# by default is 20 mlns of ns
|
||
difference = 20000000
|
||
time_measure = args.res_measure
|
||
if time_measure:
|
||
res, gt_adapted = find_closest_times(res, gt, difference, time_measure)
|
||
else:
|
||
res, gt_adapted = find_closest_times(res, gt, difference)
|
||
|
||
D = gt_adapted[["tx", "ty", "tz"]].to_numpy()
|
||
M = res[["tx", "ty", "tz"]].to_numpy()
|
||
|
||
R, T, T_GT, s = absoluteOrientationMatrix(D.T, M.T)
|
||
M_GT_aligned = (s * R @ M.T).T + T_GT
|
||
M_aligned = (R @ M.T).T + T
|
||
|
||
error_GT = ATE_error(D, M_GT_aligned)
|
||
error = ATE_error(D, M_aligned)
|
||
|
||
folder = args.savefile_folder
|
||
f = open(folder + "/" + args.slam_name + "_" + args.sensor + "_errors.txt", "w")
|
||
f.write("RMSError using scaling parameter: " + str(error_GT) + " m.")
|
||
f.write("\n")
|
||
f.write("RMSError without scaling parameter: " + str(error) + " m.")
|
||
f.close()
|
||
|
||
plot_and_save(M_aligned, D, folder + "/" + args.slam_name + "_" + args.sensor + "_trajectory.png")
|
||
plot_and_save(M_GT_aligned, D, folder + "/" + args.slam_name + "_" + args.sensor + "_trajectory_GT.png")
|
||
|
||
print("End of processing. Success! Check the created plots and files with data")
|