-
Notifications
You must be signed in to change notification settings - Fork 55
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
6 changed files
with
698 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,145 @@ | ||
from machine import UART, Pin | ||
import time | ||
|
||
class GPS(): | ||
def __init__(self,uart_num=1, pin_tx=Pin.P14, pin_rx=Pin.P13): | ||
self.__uart_port = UART(uart_num, baudrate=9600, tx=pin_tx, rx=pin_rx) | ||
self.GNSS_RX_Buffer = "" | ||
self.GNSS_Buffer = "" | ||
self.UTC_Time = "" | ||
self.latitude = "" | ||
self.N_S = "" | ||
self.longitude = "" | ||
self.E_W = "" | ||
self.speed_to_groud = "" | ||
self.speed_to_groud_kh = 0 | ||
self.course_over_ground = "" | ||
self.date = "" | ||
self.isDecodeData = False | ||
self.isParseData = False | ||
self.DataIsUseful = False # 是否有有效数据信息 | ||
|
||
def GNSS_Read(self): | ||
self.GNSS_RX_Buffer = self.__uart_port.read(1200) | ||
while type(self.GNSS_RX_Buffer)!=bytes: | ||
self.GNSS_RX_Buffer = self.__uart_port.read(1200) | ||
|
||
# 由于电路连接不可靠等原因,有时接收到的信息含有非 ASCII 字符,引发 UnicodeError | ||
# AttributeError 引发原因未知 | ||
while self.isDecodeData==False: | ||
try: | ||
self.GNSS_Buffer = self.GNSS_RX_Buffer.decode('ASCII') | ||
self.isDecodeData = True | ||
except UnicodeError: | ||
self.GNSS_RX_Buffer = self.__uart_port.read(1200) | ||
except AttributeError: | ||
self.GNSS_RX_Buffer = self.__uart_port.read(1200) | ||
|
||
# 测试模块工作是否正常的代码,实际应用请注释掉以避免输出信息过多 | ||
#print("This is GNSS_RX_Buffer, origin data from uart:") | ||
#print(self.GNSS_RX_Buffer) | ||
#print("This is GNSS_Buffer, after decoding:") | ||
#print(self.GNSS_Buffer) | ||
|
||
# 模块输出的信息很多,为了简便起见我们只选择 RMC 最小定位信息,此部分代码用于从 GNSS_Buffer 中截取 RMC 部分 | ||
GNSS_BufferHead = self.GNSS_Buffer.find("$GPRMC,") | ||
if GNSS_BufferHead == -1: | ||
GNSS_BufferHead = self.GNSS_Buffer.find("GNRMC,") | ||
#print(GNSS_BufferHead) | ||
if GNSS_BufferHead == -1: | ||
print("Cannot read the GPS , RMC imformation") | ||
self.isDecodeData = False | ||
else: | ||
GNSS_BufferTail = self.GNSS_Buffer[GNSS_BufferHead:].find("\r\n") | ||
#print(GNSS_BufferTail) | ||
if GNSS_BufferTail == -1: | ||
print("Not end with newline") | ||
self.isDecodeData = False | ||
|
||
self.GNSS_Buffer=self.GNSS_Buffer[GNSS_BufferHead:GNSS_BufferHead+GNSS_BufferTail] | ||
|
||
# 测试模块工作是否正常的代码,实际应用请注释掉以避免输出信息过多 | ||
# print("This is GNSS_RX_Buffer, origin data from uart:") | ||
# print(self.GNSS_RX_Buffer) | ||
# print("This is GNSS_Buffer, including RMC info:") | ||
# print(self.GNSS_Buffer) | ||
|
||
def GNSS_Parese(self): | ||
if(self.isDecodeData == True): | ||
self.isDecodeData= False | ||
print("*****************************************") | ||
|
||
temp = self.GNSS_Buffer.split(',') | ||
|
||
try: | ||
# RMC 信息中自带的标识符, A 代表有效, V 代表无效 | ||
if temp[2] == 'A': | ||
self.DataIsUseful = True | ||
else: | ||
self.DataIsUseful = False | ||
|
||
if temp[1] == "": | ||
self.UTC_Time = "-1" | ||
else: | ||
self.UTC_Time = temp[1] | ||
self.UTC_Time_Beijing = str(int(self.UTC_Time[0:2])+8)+':'+self.UTC_Time[2:4]+':'+self.UTC_Time[4:6] | ||
self.UTC_Time = self.UTC_Time[0:2]+':'+self.UTC_Time[2:4]+':'+self.UTC_Time[4:6] | ||
|
||
if temp[3] == "": | ||
self.latitude = "-1" | ||
else: | ||
self.latitude = temp[3] | ||
self.latitude = self.latitude[0:2]+' degree '+self.latitude[2:]+'\'' | ||
|
||
self.N_S = temp[4] | ||
|
||
if temp[5] == "": | ||
self.longitude = "-1" | ||
else: | ||
self.longitude = temp[5] | ||
#self.longitude = self.longitude[0:3]+'°'+self.latitude[3:] | ||
self.longitude = self.longitude[0:3]+' degree '+self.longitude[3:]+'\'' | ||
|
||
self.E_W = temp[6] | ||
|
||
# RMC 中的速度以节为单位 | ||
if temp[7] != "": | ||
try: | ||
self.speed_to_groud = float(temp[7]) | ||
self.speed_to_groud_kh = self.speed_to_groud*1.852 | ||
except ValueError: | ||
pass | ||
else: | ||
self.speed_to_groud = -1 | ||
self.speed_to_groud_kh = -1 | ||
|
||
if temp[8] != "": | ||
try: | ||
self.course_over_ground = float(temp[8]) | ||
except ValueError: | ||
pass | ||
else: | ||
self.course_over_ground = -1 | ||
|
||
if temp[9] == "": | ||
self.date = "-1" | ||
else: | ||
self.date = temp[9] | ||
self.date = self.date[4:6]+'.'+self.date[2:4]+'.'+self.date[0:2] | ||
|
||
self.isParseData = True | ||
except IndexError: | ||
pass | ||
|
||
def print_GNSS_info(self): | ||
if(self.isParseData): | ||
self.isParseData = False | ||
if(self.DataIsUseful): | ||
print("维度: %s %s",self.latitude,self.N_S) | ||
print("经度: %s %s",self.longitude,self.E_W) | ||
print("日期: %s",self.date,end=' ') | ||
print("UTC时间: %s",self.UTC_Time) | ||
print("卫星对地速度: %f km/h",self.speed_to_groud_kh) | ||
# print("course_over_groud: %f°",self.course_over_ground) | ||
else: | ||
print(" !") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,178 @@ | ||
# 25 samples per second (in algorithm.h) | ||
SAMPLE_FREQ = 25 | ||
#计算HR时取4个样本的移动平均值 | ||
MA_SIZE = 4 | ||
#采样频率 | ||
BUFFER_SIZE = 100 | ||
|
||
#取平均值 | ||
|
||
def get_mean(ls): | ||
return sum(ls)/len(ls) | ||
|
||
def calc_hr_and_spo2(ir_data, red_data): | ||
|
||
ir_mean = int(get_mean(ir_data)) | ||
x = [] | ||
for k in ir_data: | ||
x.append((k-ir_mean)*-1) | ||
m = len(x) - MA_SIZE | ||
for i in range(m): | ||
x[i] = sum(x[i:i+MA_SIZE]) / MA_SIZE | ||
|
||
#计算阈值 | ||
n_th = int(get_mean(x)) | ||
n_th = 30 if n_th < 30 else n_th #允许的最小值 | ||
n_th = 60 if n_th > 60 else n_th #允许的最大值 | ||
|
||
ir_valley_locs, n_peaks = find_peaks(x, BUFFER_SIZE, n_th, 4, 15) | ||
peak_interval_sum = 0 | ||
if n_peaks >= 2: | ||
for i in range(1, n_peaks): | ||
peak_interval_sum += (ir_valley_locs[i] - ir_valley_locs[i-1]) | ||
peak_interval_sum = int(peak_interval_sum / (n_peaks - 1)) | ||
hr = int(SAMPLE_FREQ * 60 / peak_interval_sum) | ||
hr_valid = True | ||
else: | ||
hr = -999 #错误值 | ||
hr_valid = False | ||
|
||
# ---------spo2 血氧浓度--------- | ||
exact_ir_valley_locs_count = n_peaks | ||
for i in range(exact_ir_valley_locs_count): | ||
if ir_valley_locs[i] > BUFFER_SIZE: | ||
spo2 = -999 | ||
spo2_valid = False | ||
return hr, hr_valid, spo2, spo2_valid | ||
|
||
i_ratio_count = 0 | ||
ratio = [] | ||
|
||
# find max between two valley locations 在两个山谷位置之间找到最大值 | ||
# and use ratio between AC component of Ir and Red DC component of Ir and Red for SpO2 以及Ir的AC分量与Ir和Red的Red DC分量对于SpO2的使用比率 | ||
red_dc_max_index = -1 | ||
ir_dc_max_index = -1 | ||
for k in range(exact_ir_valley_locs_count-1): | ||
red_dc_max = -16777216 | ||
ir_dc_max = -16777216 | ||
if ir_valley_locs[k+1] - ir_valley_locs[k] > 3: | ||
for i in range(ir_valley_locs[k], ir_valley_locs[k+1]): | ||
if ir_data[i] > ir_dc_max: | ||
ir_dc_max = ir_data[i] | ||
ir_dc_max_index = i | ||
if red_data[i] > red_dc_max: | ||
red_dc_max = red_data[i] | ||
red_dc_max_index = i | ||
|
||
red_ac = int((red_data[ir_valley_locs[k+1]] - red_data[ir_valley_locs[k]]) * (red_dc_max_index - ir_valley_locs[k])) | ||
red_ac = red_data[ir_valley_locs[k]] + int(red_ac / (ir_valley_locs[k+1] - ir_valley_locs[k])) | ||
red_ac = red_data[red_dc_max_index] - red_ac # subtract linear DC components from raw 从原始中减去线性直流分量 | ||
|
||
ir_ac = int((ir_data[ir_valley_locs[k+1]] - ir_data[ir_valley_locs[k]]) * (ir_dc_max_index - ir_valley_locs[k])) | ||
ir_ac = ir_data[ir_valley_locs[k]] + int(ir_ac / (ir_valley_locs[k+1] - ir_valley_locs[k])) | ||
ir_ac = ir_data[ir_dc_max_index] - ir_ac # subtract linear DC components from raw | ||
|
||
nume = red_ac * ir_dc_max | ||
denom = ir_ac * red_dc_max | ||
if (denom > 0 and i_ratio_count < 5) and nume != 0: | ||
# original cpp implementation uses overflow intentionally. | ||
# but at 64-bit OS, Pyhthon 3.X uses 64-bit int and nume*100/denom does not trigger overflow | ||
# so using bit operation ( &0xffffffff ) is needed | ||
ratio.append(int(((nume * 100) & 0xffffffff) / denom)) | ||
i_ratio_count += 1 | ||
|
||
# choose median value since PPG signal may vary from beat to beat | ||
ratio = sorted(ratio) # sort to ascending order | ||
mid_index = int(i_ratio_count / 2) | ||
|
||
ratio_ave = 0 | ||
if mid_index > 1: | ||
ratio_ave = int((ratio[mid_index-1] + ratio[mid_index])/2) | ||
else: | ||
if len(ratio) != 0: | ||
ratio_ave = ratio[mid_index] | ||
|
||
# why 184? | ||
# print("ratio average: ", ratio_ave) | ||
if ratio_ave > 2 and ratio_ave < 184: | ||
# -45.060 * ratioAverage * ratioAverage / 10000 + 30.354 * ratioAverage / 100 + 94.845 | ||
spo2 = -45.060 * (ratio_ave**2) / 10000.0 + 30.054 * ratio_ave / 100.0 + 94.845 | ||
spo2_valid = True | ||
else: | ||
spo2 = -999 | ||
spo2_valid = False | ||
|
||
return hr, hr_valid, spo2, spo2_valid | ||
|
||
|
||
def find_peaks(x, size, min_height, min_dist, max_num): | ||
""" | ||
Find at most MAX_NUM peaks above MIN_HEIGHT separated by at least MIN_DISTANCE | ||
""" | ||
ir_valley_locs, n_peaks = find_peaks_above_min_height(x, size, min_height, max_num) | ||
ir_valley_locs, n_peaks = remove_close_peaks(n_peaks, ir_valley_locs, x, min_dist) | ||
|
||
n_peaks = min([n_peaks, max_num]) | ||
|
||
return ir_valley_locs, n_peaks | ||
|
||
|
||
def find_peaks_above_min_height(x, size, min_height, max_num): | ||
""" | ||
Find all peaks above MIN_HEIGHT | ||
""" | ||
|
||
i = 0 | ||
n_peaks = 0 | ||
ir_valley_locs = [] # [0 for i in range(max_num)] | ||
while i < size - 1: | ||
if x[i] > min_height and x[i] > x[i-1]: # find the left edge of potential peaks | ||
n_width = 1 | ||
# original condition i+n_width < size may cause IndexError | ||
# so I changed the condition to i+n_width < size - 1 | ||
while i + n_width < size - 1 and x[i] == x[i+n_width]: # find flat peaks | ||
n_width += 1 | ||
if x[i] > x[i+n_width] and n_peaks < max_num: # find the right edge of peaks | ||
# ir_valley_locs[n_peaks] = i | ||
ir_valley_locs.append(i) | ||
n_peaks += 1 # original uses post increment | ||
i += n_width + 1 | ||
else: | ||
i += n_width | ||
else: | ||
i += 1 | ||
|
||
return ir_valley_locs, n_peaks | ||
|
||
|
||
def remove_close_peaks(n_peaks, ir_valley_locs, x, min_dist): | ||
""" | ||
Remove peaks separated by less than MIN_DISTANCE | ||
""" | ||
|
||
# should be equal to maxim_sort_indices_descend | ||
# order peaks from large to small | ||
# should ignore index:0 | ||
sorted_indices = sorted(ir_valley_locs, key=lambda i: x[i]) | ||
sorted_indices.reverse() | ||
|
||
# this "for" loop expression does not check finish condition | ||
# for i in range(-1, n_peaks): | ||
i = -1 | ||
while i < n_peaks: | ||
old_n_peaks = n_peaks | ||
n_peaks = i + 1 | ||
# this "for" loop expression does not check finish condition | ||
# for j in (i + 1, old_n_peaks): | ||
j = i + 1 | ||
while j < old_n_peaks: | ||
n_dist = (sorted_indices[j] - sorted_indices[i]) if i != -1 else (sorted_indices[j] + 1) # lag-zero peak of autocorr is at index -1 | ||
if n_dist > min_dist or n_dist < -1 * min_dist: | ||
sorted_indices[n_peaks] = sorted_indices[j] | ||
n_peaks += 1 # original uses post increment | ||
j += 1 | ||
i += 1 | ||
|
||
sorted_indices[:n_peaks] = sorted(sorted_indices[:n_peaks]) | ||
|
||
return sorted_indices, n_peaks |
Oops, something went wrong.