dds recording_xxx.dat 数据生成 imu timestamp diff 解析图
工具脚本:
import sqlite3
import numpy as np
import matplotlib.pyplot as plt
import sys
import struct
import os
time_list = []
def parse_sqlite(file_list, file_dir):
for file in file_list:
dat_path = file_dir + '/' + file
parse_sqlite_timelist(dat_path)
plot_time_ellipses(file_dir)
time_list = []
def plot_time_ellipses(file_dir):
output_dir = file_dir + '/Imu_KPI_Output'
if not os.path.exists(output_dir):
print("create output dir:", output_dir)
os.mkdir(output_dir)
plt.close()
fig = plt.figure(figsize=(20, 10))
ellipses = []
last_time = 0
for time in time_list:
if last_time > 0:
elps = time - last_time
if elps > 19 or elps < 0:
print("Error elps >> " + str(elps) + " ms" )
last_time = time
continue
ellipses.append(elps)
last_time = time
x = range(len(ellipses))
x = [a*0.01 for a in x]
y = ellipses
print("ellipses max = ", max(ellipses), "min =", min(ellipses))
print("len = " , len(x))#, '; x=', x)
print("len = " , len(y))#, '; y=', y)
plt.title("imu time ellipses")
# # 使用 matplotlib 来绘制点
plt.plot(x, y, '.', markersize=0.5)
# fig = plt.figure(figsize=(20, 10))
image_name = output_dir + '/Imu_TimeStamp_Ellipses.png'
fig.savefig(image_name, dpi=144)
# plt.savefig(image_name)
print("save plot >>", image_name)
plt.close(fig)
# plt.show()
def parse_sqlite_timelist(file_name):
print('parse dat file >>', file_name)
conn = sqlite3.connect(file_name)
cursor = conn.cursor()
cursor.execute('select rti_cdr_sample from [ImuSysRxTopic@128]')
values = cursor.fetchall()
# print(values[0][0])
delta_s_list = []
last_real_time = 0
for r in values:
for c in r:
# hex_str = 'raw_hex = '
# for j in c:
# hex_str += "%02x" % j + " "
# print(hex_str + "; size =", len(c))
# time_hex = ''
# for i in range(4,12,):
# time_hex += "%02x" % c[i]
# print(i, "%02x" % c[i])
# print(time_hex)
time_hex = ''
time_ns = 0
for i in range(11,3,-1):
time_hex += "%02x" % c[i]
# print(i, "%02x" % c[i])
time_ns = int(time_hex, 16)
time_ms = time_ns/1e6
# print(time_hex , ">>", time_ns , ">>", time_ms)
delta_s_hex = ''
delta_s_float = 0.
for i in range(1, 5):
# print(-i, "%02x" % c[-i])
delta_s_hex += "%02x" % c[-i]
delta_s_float = struct.unpack('!f', bytes.fromhex(delta_s_hex))[0]
# print(delta_s_hex, ">>", delta_s_float , "s")
cur_real_time = time_ms + delta_s_float*1000
print("timestamp =", time_ms , "ms; delta =", delta_s_float, "s >> real_time =", time_ms + delta_s_float*1000 , "ms", "diff @ ", cur_real_time - last_real_time)
time_list.append(cur_real_time)
last_real_time = cur_real_time
# time_str = ''
# for i in range(0,8):
# time_str += r[4+i]
# print(time_str)
# first_time = values[0][0]
# for sample in values:
# time_list.append((sample[0] - first_time) * 1e-06 * 1.0) # ns -> ms
# time_length = (values[-1][0] - first_time) * 1e-09 * 1.0 # sec
# print('data time length = ' + str(time_length) + 's')
if __name__ == '__main__':
dat_list = []
if len(sys.argv) > 1:
print(len(sys.argv))
if sys.argv[1]:
dat_dir = sys.argv[1]
else :
print('please input dat_dir in argv[1]')
sys.exit(1)
print("parse dir:", dat_dir)
# get dat list
print("\n1. get dat_list")
ow = os.walk(dat_dir)
for path,dir_list,file_list in ow:
for file_name in file_list:
if ('recording' in file_name) and ('recording_0_' not in file_name) and ('.dat' in file_name):
print(">> Add dat >>", file_name)
dat_list.append(file_name)
# sort dat list
if len(dat_list) > 1:
print("\n2. sort dat_list")
dat_list.sort(key=lambda x:int(x[10:12].replace('_', '')))
for d in dat_list:
print(d[10:12].replace('_', '') , ">>", d)
parse_sqlite(dat_list, dat_dir)
使用方法:
执行命令:
|
python3 sqlite_ImuSysT_reader.py [dat所在文件夹]
|
生成解析结果: dat所在文件夹/Imu_KPI_Output/Imu_TimeStamp_Ellipses.png
结果如图: (两帧之间时差, 期望值是10ms)
过滤了一些异常数值: (过滤 时差小于0,或大于19的数值)
工具原理:
- 对'dat所在文件夹'中,所有的数据进行排序,
- 使用sqlite库批量解析所有dat数据, 获取timelist
- 遍历timelist, 计算time diff , 并绘制plot图