ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Python] 실시간 Data 처리기-2 (window os 환경)
    2) Tech 2022. 3. 16. 10:39
    반응형

    지난 포스팅(technical-support.tistory.com/110) 에서는

    파일 저장 Process와 처리 Process를 나누어서 파일이 저장됐을 때를 확인하고

    저장된 파일을 처리하는 방식으로 진행 했었습니다.

     

    그러다 보니 파일 개수에 의존하게 되고, 만약 코드 실행 마지막 파일에 경우 처리 하지 못하는 코드였고

     

    그래서 실시간으로 들어오는 유효한 data를 list에 쌓아 놓고

    모두 들어왔을때 쌓인 list를 처리하는 방식으로 수정했습니다!!.

     

    하지만,

    그렇게 하기위해선 송신부에서 '유효 or 유효 X' 와 같은 제어 시점을 알려주는 값을 보내줘야 했습니다.

    1. 수신부 packet size value 추가

    def packet_recv_process(PORT, transmit_ready, connected, q_packet,readable,packet_size):
        server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server_socket.bind(('', PORT))
        server_socket.listen(1)
        client_socket, addr = server_socket.accept()
        print('Connected by', addr)
    
        while True:
            try:
                if transmit_ready==False:
                    transmit_ready = True
                    check_msg = client_socket.recv(1024)
                    print('Received from', addr, check_msg.decode())
                    if readable.value == 0:
                        readable.value = 1
                    data = 'OK'
                    client_socket.sendall(data.encode())
                else:
                    packet = recv_msg(client_socket)
                    packet_size.value = len(packet)
    
                    if packet == None:
                        break
    
                    if packet_size.value > 1:  # 유효 하지 않을 때는 크기가 1인 packet을 보낸다.
                        q_packet.put(packet)   # 유효할 때 만 packet을 형성해서 보냄
                    else:
                        packet.clear()

    위 코드에서 보시면 packet_size라는 multiprocess Value 값을 할당해서

    수신부에서 받는 packet 사이즈를 확인합니다.

     

    packet 크기가 1 보다 클 때만 data 저장 및 처리를 위한 q_packet에 넣어주게 되죠!

     

     

     

    이렇게 size value를 공유하게 되면 size가 1보다 클때는 list에 넣어서 쌓아두고,

    size가 1 일때 list에 쌓인 data를 처리해주면,

     

    굳이 파일에 접근하지 않고 바로 처리가 가능해집니다.

     

    2. Data list에 쌓고 처리하기

    def file_save_process(q_packet,file_dir,base_dir, Fs, q_name,packet_size):
    
        p = psutil.Process(os.getpid())
        p.nice(psutil.HIGH_PRIORITY_CLASS)
        name_make_value = 0
        filedir_dict = dict()
        sig_list = list()
        f_range_env = np.array([1, 50, 1000])
        while True:
            current_date = datetime.datetime.now().strftime("%Y-%m-%d")
            name_make_value = make_dirname(file_dir,current_date,name_make_value,base_dir,q_name,filedir_dict)
    
            try:
                if q_packet.empty() is False:
                    packet_message = q_packet.get()
                    createFolder(file_dir.value)
                    file_name = packet_message[0:15].decode()
                    fn= file_dir.value + f'/sample_{file_name}.csv'
                    size = int(chr(packet_message[15]))
                    sample_rate = int(packet_message[16:20].decode())
                    Fs.value = sample_rate
                    time_stamp = packet_message[20:43].decode()
                    signal_bytes = packet_message[43:]
                    signal_bytes_to_array = np.frombuffer(signal_bytes, dtype=np.float64).reshape(size,-1)
                    data_write(time_stamp,signal_bytes_to_array, fn) # raw data save
                    sig_list.append(signal_bytes_to_array)
    
                if packet_size.value == 1:
                    if len(sig_list)>0:
                        all_sig = np.array(sig_list).reshape(-1,500)
                        print(all_sig.shape)
                        sig_list[:] = []
                        sig = [all_sig[i::size].flatten() for i in range(size)]
                        sig = np.array(sig)
                        ### 생략 ####

     

    위와 같이 packet_size value 가 1 이고 data를 쌓는 list의 크기가 0보다 크다면, 처리하도록 변경했습니다. 

     

    손봐야할 부분이 많지만 좋은 연습이 되었던 코드였습니다.

     

    감사합니다!

    반응형

    댓글

Designed by Tistory.