ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Python] 실시간 Data 처리기(window os 환경)
    2) Tech 2021. 1. 8. 16:46
    반응형

    요즘 신호data를 쌓고 있습니다.

     

    그런데 아무래도 window 환경이라

    data 수집 후 직접 축적된 data를 다시 가져와서 처리하는 과정이 굉장히 번거로워서

    미루고 미루던 실시간 처리기를 만들어 봤습니다..

     

    급하게 만들어서 부족한 부분이 있겠지만, 여기 올리면 조언도 좀 받을 수 있을거 같아서 올리기로 했어요.

    (Test 해봤을 때는 잘동작되네요!)

     

    python multiprocessing을 사용하여

    raw data save process와 저장된 data를 불러와서 처리하는 process를 생성했습니다.

     

    때문에 data 처리를 하려면 두 프로세스가 data directory를 공유해야합니다.

    그런데 막상 문자열을 공유해야하다보니 생각한대로 되지않아, 조금 지저분한 방법을 사용했네요..!

     

    먼저 file_dir value를 공유하려 했는데 한프로세스에서만 적용이되고

    다른 프로세서에서 적용이 안돼서 뭐 다른 문제일 수 도 있겠지만, 

    dict에 문자열(경로) 할당하고 dict를 multiprocessing Queue에 넣었습니다.

     

    1. rawdata 저장 경로 매일 Update 해주기

    from ctypes import c_wchar_p
    
    base_dir = "D:/cDAQ/Python/"
    file_dir = multiprocessing.Value(c_wchar_p, base_dir) 
    q_name = multiprocessing.Queue()
    
    def make_dirname(file_dir,date,make_value,base_dir,q_name,filedir_dict):
    
        if make_value== 0:
             filedir_dict.clear()
             file_dir.value = base_dir + date
             filedir_dict['dir'] = file_dir.value
             q_name.put(filedir_dict) # 여기서 q 에 넣으므로 q 안에 경로 값을 공유할 수 있게되었네요!
             make_value= 1
        return make_value
    
        else:
             if file_dir.value[-10:] != date: #날짜 확인해서 날짜 바뀔 때 다시 경로 Update 
                  name_make_value = 0
                  make_dirname(file_dir,date,name_make_value,base_dir,q_name,filedir_dict)
             return name_make_value
    
    def file_save_process(q_packet,file_dir,base_dir, Fs, q_name):
        p = psutil.Process(os.getpid())
        p.nice(psutil.HIGH_PRIORITY_CLASS)
        name_make_value = 0
        filedir_dict = dict()
    
        while True:
              current_date = datetime.datetime.now().strftime("%Y-%m-%d")
              make_value=make_dirname(file_dir,current_date,make_value,base_dir,q_name,filedir_dict)
              
              ########## 중략 #########

    자 ! 여기까지 진행 했을 때 날이 바뀔때 마다 변경된 경로에 저장될 뿐아니라,

     

    다른 프로세스에서도 경로이름 공유가 가능해졌습니다.

     

    2. 공유 된 경로에 저장된 data 처리하기

    def factor_check_process(q_name,Fs,readable):
         p = psutil.Process(os.getpid())
         p.nice(psutil.HIGH_PRIORITY_CLASS)
         global valid_factor
         cnt = 1
         f_range_env = np.array([1, 50, 1000])
    
         while True:
              try:
                  if readable.value == 1:
                      if q_name.empty() is False:
                          dir = q_name.get()
                          if 'dir' in dir:
                             data_dir = dir['dir']
                             print(data_dir)
                      else:
                            data_list = glob.glob(os.path.join(data_dir, 'sample_*.csv'))
                            end_num = len(data_list)
                            if end_num==0:
                                cnt = 1
                            if (cnt == end_num):
                                continue
     
                            if end_num>=2 :
                                sig, constant_sig, elapsed = data_preprocess(data_list, cnt)
                                signal_process = Analysis(sig, constant_sig, elapsed,Fs)
                                ...... 생략

    q_name에 값이 들어오게 되면 경로를 공유 받게 되고 그 이후로 파일 list에서 저장된 파일을 처리하게 됩니다.

     

    위 코드는 아직 미흡하고 비효율 적인 부분이 많아서.. 시간날 때 다른 방식으로 바꾸어 보려고 합니다.

     

    그래서 다음에는 유효한 signal 주기가 시작될때 list에 넣어놓고 주기가 끝나면 바로 처리해보는 방식으로 바꿔보겠습니다.

     

    이번시간에는 code 공부 했다고 생각해야겠어요..

     

     

    반응형

    댓글

Designed by Tistory.