-
[Python] 실시간 Data 처리기-2 (window os 환경)2) Tech 2022. 3. 16. 10:39반응형
지난 포스팅(technical-support.tistory.com/110) 에서는
파일 저장 Process와 처리 Process를 나누어서 파일이 저장됐을 때를 확인하고
저장된 파일을 처리하는 방식으로 진행 했었습니다.
그러다 보니 파일 개수에 의존하게 되고, 만약 코드 실행 마지막 파일에 경우 처리 하지 못하는 코드였고
그래서 실시간으로 들어오는 유효한 data를 list에 쌓아 놓고
모두 들어왔을때 쌓인 list를 처리하는 방식으로 수정했습니다!!.
하지만,
그렇게 하기위해선 송신부에서 '유효 or 유효 X' 와 같은 제어 시점을 알려주는 값을 보내줘야 했습니다.
1. 수신부 packet size value 추가
def packet_recv_process(PORT, transmit_ready, connected, q_packet,readable,packet_size): server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server_socket.bind(('', PORT)) server_socket.listen(1) client_socket, addr = server_socket.accept() print('Connected by', addr) while True: try: if transmit_ready==False: transmit_ready = True check_msg = client_socket.recv(1024) print('Received from', addr, check_msg.decode()) if readable.value == 0: readable.value = 1 data = 'OK' client_socket.sendall(data.encode()) else: packet = recv_msg(client_socket) packet_size.value = len(packet) if packet == None: break if packet_size.value > 1: # 유효 하지 않을 때는 크기가 1인 packet을 보낸다. q_packet.put(packet) # 유효할 때 만 packet을 형성해서 보냄 else: packet.clear()
위 코드에서 보시면 packet_size라는 multiprocess Value 값을 할당해서
수신부에서 받는 packet 사이즈를 확인합니다.
packet 크기가 1 보다 클 때만 data 저장 및 처리를 위한 q_packet에 넣어주게 되죠!
이렇게 size value를 공유하게 되면 size가 1보다 클때는 list에 넣어서 쌓아두고,
size가 1 일때 list에 쌓인 data를 처리해주면,
굳이 파일에 접근하지 않고 바로 처리가 가능해집니다.
2. Data list에 쌓고 처리하기
def file_save_process(q_packet,file_dir,base_dir, Fs, q_name,packet_size): p = psutil.Process(os.getpid()) p.nice(psutil.HIGH_PRIORITY_CLASS) name_make_value = 0 filedir_dict = dict() sig_list = list() f_range_env = np.array([1, 50, 1000]) while True: current_date = datetime.datetime.now().strftime("%Y-%m-%d") name_make_value = make_dirname(file_dir,current_date,name_make_value,base_dir,q_name,filedir_dict) try: if q_packet.empty() is False: packet_message = q_packet.get() createFolder(file_dir.value) file_name = packet_message[0:15].decode() fn= file_dir.value + f'/sample_{file_name}.csv' size = int(chr(packet_message[15])) sample_rate = int(packet_message[16:20].decode()) Fs.value = sample_rate time_stamp = packet_message[20:43].decode() signal_bytes = packet_message[43:] signal_bytes_to_array = np.frombuffer(signal_bytes, dtype=np.float64).reshape(size,-1) data_write(time_stamp,signal_bytes_to_array, fn) # raw data save sig_list.append(signal_bytes_to_array) if packet_size.value == 1: if len(sig_list)>0: all_sig = np.array(sig_list).reshape(-1,500) print(all_sig.shape) sig_list[:] = [] sig = [all_sig[i::size].flatten() for i in range(size)] sig = np.array(sig) ### 생략 ####
위와 같이 packet_size value 가 1 이고 data를 쌓는 list의 크기가 0보다 크다면, 처리하도록 변경했습니다.
손봐야할 부분이 많지만 좋은 연습이 되었던 코드였습니다.
감사합니다!
반응형'2) Tech' 카테고리의 다른 글
Linux에서 Anaconda conda activate이 실행 되지 않을 때 (2) 2021.08.10 [Python] 실시간 Data 처리기(window os 환경) (0) 2021.01.08 [Python] FFT 0Hz Peak 제거 하기, DC 성분 제거하기 (0) 2020.11.26 [Python] USB Serial Communication(USB 시리얼 통신) (0) 2020.11.24 [Pytorch Tip!] 파이토치 GPU 사용 설정 (0) 2020.09.23