[Python] 실시간 Data 처리기(window os 환경)
요즘 신호data를 쌓고 있습니다.
그런데 아무래도 window 환경이라
data 수집 후 직접 축적된 data를 다시 가져와서 처리하는 과정이 굉장히 번거로워서
미루고 미루던 실시간 처리기를 만들어 봤습니다..
급하게 만들어서 부족한 부분이 있겠지만, 여기 올리면 조언도 좀 받을 수 있을거 같아서 올리기로 했어요.
(Test 해봤을 때는 잘동작되네요!)
python multiprocessing을 사용하여
raw data save process와 저장된 data를 불러와서 처리하는 process를 생성했습니다.
때문에 data 처리를 하려면 두 프로세스가 data directory를 공유해야합니다.
그런데 막상 문자열을 공유해야하다보니 생각한대로 되지않아, 조금 지저분한 방법을 사용했네요..!
먼저 file_dir value를 공유하려 했는데 한프로세스에서만 적용이되고
다른 프로세서에서 적용이 안돼서 뭐 다른 문제일 수 도 있겠지만,
dict에 문자열(경로) 할당하고 dict를 multiprocessing Queue에 넣었습니다.
1. rawdata 저장 경로 매일 Update 해주기
from ctypes import c_wchar_p
base_dir = "D:/cDAQ/Python/"
file_dir = multiprocessing.Value(c_wchar_p, base_dir)
q_name = multiprocessing.Queue()
def make_dirname(file_dir,date,make_value,base_dir,q_name,filedir_dict):
if make_value== 0:
filedir_dict.clear()
file_dir.value = base_dir + date
filedir_dict['dir'] = file_dir.value
q_name.put(filedir_dict) # 여기서 q 에 넣으므로 q 안에 경로 값을 공유할 수 있게되었네요!
make_value= 1
return make_value
else:
if file_dir.value[-10:] != date: #날짜 확인해서 날짜 바뀔 때 다시 경로 Update
name_make_value = 0
make_dirname(file_dir,date,name_make_value,base_dir,q_name,filedir_dict)
return name_make_value
def file_save_process(q_packet,file_dir,base_dir, Fs, q_name):
p = psutil.Process(os.getpid())
p.nice(psutil.HIGH_PRIORITY_CLASS)
name_make_value = 0
filedir_dict = dict()
while True:
current_date = datetime.datetime.now().strftime("%Y-%m-%d")
make_value=make_dirname(file_dir,current_date,make_value,base_dir,q_name,filedir_dict)
########## 중략 #########
자 ! 여기까지 진행 했을 때 날이 바뀔때 마다 변경된 경로에 저장될 뿐아니라,
다른 프로세스에서도 경로이름 공유가 가능해졌습니다.
2. 공유 된 경로에 저장된 data 처리하기
def factor_check_process(q_name,Fs,readable):
p = psutil.Process(os.getpid())
p.nice(psutil.HIGH_PRIORITY_CLASS)
global valid_factor
cnt = 1
f_range_env = np.array([1, 50, 1000])
while True:
try:
if readable.value == 1:
if q_name.empty() is False:
dir = q_name.get()
if 'dir' in dir:
data_dir = dir['dir']
print(data_dir)
else:
data_list = glob.glob(os.path.join(data_dir, 'sample_*.csv'))
end_num = len(data_list)
if end_num==0:
cnt = 1
if (cnt == end_num):
continue
if end_num>=2 :
sig, constant_sig, elapsed = data_preprocess(data_list, cnt)
signal_process = Analysis(sig, constant_sig, elapsed,Fs)
...... 생략
q_name에 값이 들어오게 되면 경로를 공유 받게 되고 그 이후로 파일 list에서 저장된 파일을 처리하게 됩니다.
위 코드는 아직 미흡하고 비효율 적인 부분이 많아서.. 시간날 때 다른 방식으로 바꾸어 보려고 합니다.
그래서 다음에는 유효한 signal 주기가 시작될때 list에 넣어놓고 주기가 끝나면 바로 처리해보는 방식으로 바꿔보겠습니다.
이번시간에는 code 공부 했다고 생각해야겠어요..