*本文由RS components 贊助發表,轉載自DesignSpark 部落格原文連結
思路分析
原先使用主線程運行影像辨識以及IFTTT進行即時監控,光是執行影像辨識就會要等待推論的時間,而後如果要傳送至IFTTT則又有一個傳送的等待Request時間,如此便會影響到While迴圈裡的即時影像,這邊有很多種方法可以改善,最快且較為簡單的解決方式是將即時影像放到另一個線程中去運行,這樣顯示即時影像與推論的線程是同步進行的,即時影像就不會因此被推論以及等待網頁的時間給延遲,只需要專心處理即時影像的部分即可。
平行運算中的多執行緒
在Python的平行運算中有分兩種,一個是Multi-Thread另一個是Multi-Process;Process (進程 ) 跟Thread (執行緒 ) 其實大家平常都會聽到,在購買電腦的時候常常會聽到幾核幾緒( 例如 : 四核八緒 )就是類似的概念,幾個觀念重點介紹:
每個CPU都只能運行一個Process,每個Process彼此之間是獨立的。
每個Process可以有多個Thread運行,彼此共享記憶體、變數。
由於Thread無法回傳值所以要使用Queue (佇列)去儲存資料,那這部分我就不多作介紹因為網路上已經有很多相關的參考了,不過,這邊我沒有使用queue的方式去撰寫程式。
增加即時影像的線程到程式中
我使用class的方式去寫因為可以直接省略queue去儲存、取得變數,算是一個偷吃步的小技巧,因為我這邊除了讀取幀之外就只有回傳的動作,應該不會導致搶資源或同步的問題。
客製化的即時影像物件
為了符合我們的需求,我客製了一個類別提供了幾個所需的功能,首先在initialize的部分,比較特別的地方在我使用了 isStop的參數用來中斷線程並且宣告了t為即時影像線程的物件。
# 客製化的影像擷取程式 class CustomVideoCapture(): # 初始化 預設的攝影機裝置為 0 def __init__(self, dev=0): self.cap = cv2.VideoCapture(dev) self.ret = '' self.frame = [] self.win_title = 'Modified with set_title()' self.info = '' self.fps = 0 self.fps_time = 0 self.isStop = False self.t = threading.Thread(target=self.video, name='stream')
接著先宣告了一些可以從外部控制線程的函式,像是 start_stream就是開啟線程;stop_stream關閉線程;get_current_frame就是取得當前的畫面,使用get_current_frame可以讓外部直接獲取線程更新的畫面,算是一個使用Thread運行OpenCV常用的方法;最後還提供了一個set_title可以修改視窗的名稱:
# 可以透過這個函式 開啟 Thread def start_stream(self): self.t.start() # 關閉 Thread 與 Camera def stop_stream(self): self.isStop = True self.cap.release() cv2.destroyAllWindows() # 取得最近一次的幀 def get_current_frame(self): return self.ret, self.frame def get_fps(self): return self.fps # 設定顯示視窗的名稱 def set_title(self, txt): self.win_title = txt
最後宣告了多線程要運作的函式,由於要不斷更新畫面所以使用while,透過isStop控制是否跳出迴圈,其中做的事情就是取得當前影像,設定要印上去的資訊並顯示出來,當按下q的時候會退出迴圈並且使用stop_stream終止迴圈:
# Thread主要運行的函式 def video(self): try: while(not self.isStop): self.fps_time = time.time() self.ret, self.frame = self.cap.read() if self.info is not '': cv2.putText(self.frame, self.info, (10,40), cv2.FONT_HERSHEY_SIMPLEX, 1, (0,0,255), 2) cv2.imshow(self.win_title, self.frame) if cv2.waitKey(1) == ord('q'): break self.fps = int(1/(time.time() - self.fps_time)) self.stop_stream() except: self.stop_stream()
我建立了一個tools.py存放所有會用到的副函式 ( 包含上述的客製化影像類別 ),這邊開始介紹其他副函式,preprocess專門在處理輸入前的資料,針對該資料進行縮放、正規化、轉換成含有批次大小的格式:
# 用於資料前處理的程式 def preprocess(frame, resize=(224, 224), norm=True): ''' 設定格式 ( 1, 224, 224, 3)、縮放大、正規化、放入資料並回傳正確格式的資料 ''' input_format = np.ndarray(shape=(1, 224, 224, 3), dtype=np.float32) frame_resize = cv2.resize(frame, resize) frame_norm = ((frame_resize.astype(np.float32) / 127.0) - 1) if norm else frame_resize input_format[0]=frame_norm return input_format
load_model_folder則是載入模型與標籤,這邊寫成只需要輸入存放模型與標籤的目錄路徑即可,兩者須放置在一起,程式會靠副檔名去判斷:
# 讀取 模型 與 標籤 def load_model_folder(trg_dir) -> "'trg_dir' is the path include model file and labels file. return (model, label).": model_type = [ 'trt','engine','h5'] label_type = [ 'txt'] for f in os.listdir(trg_dir): extension = f.split('.')[-1] if extension in model_type: model_dir = os.path.join(trg_dir, f) elif extension in label_type: lable_dir = os.path.join(trg_dir, f) return get_model(model_dir), get_label(lable_dir)
剛剛輸出的時候有用到兩個副函式 get_model、get_label,分別去取得模型與標籤檔的物件:
# 讀取模型 def get_model(model_dir) -> "support keras and tensorrt model": if model_dir.split('.')[-1] == 'h5': print('Load Keras Model') model = tf.keras.models.load_model(model_dir) else: print('Load TensorRT Engine') model = load_engine(model_dir) return model # 讀取標籤 def get_label(lable_dir) -> 'return dict of labels': label = {} with open(lable_dir) as f: for line in f.readlines(): idx, name = line.strip().split(' ') label[int(idx)]=name return label # 讀取TensorRT模型 def load_engine(engine_path): if trt_found: TRT_LOGGER = trt.Logger(trt.Logger.WARNING) trt_runtime = trt.Runtime(TRT_LOGGER) with open(engine_path, 'rb') as f: engine_data = f.read() engine = trt_runtime.deserialize_cuda_engine(engine_data) return engine else: print("Can not load load_engine because there is no tensorrt module") exit(1)
接著是解析預測結果的副函式,通常我們會取得到一組預測的信心指數,我們需要針對這組數據去解析出最大數值是在哪一個位置,而該位置又屬於哪一個類別:
# 解析輸出資訊 def parse_output(preds, label) -> 'return ( class id, class name, probobility) ': preds = preds[0] if len(preds.shape)==4 else preds trg_id = np.argmax(preds) trg_name = label[trg_id] trg_prob = preds[trg_id] return ( trg_id, trg_name, trg_prob)
截至目前為止的程式,我都將其放在tools.py裡,後續只要做import的動作即可將這些功能導入。
最後來到主程式的部分,這部分須要涵蓋IFTTT以及Inference,流程大致如下:
1.取得模型與標籤、開啟即時影像的線程:
# 取得模型與標籤 model, label = load_model_folder('keras_models') # 設定影像擷取 vid = CustomVideoCapture() vid.set_title('{sys} - {framework}'.format(sys='Jetson Nano', framework='Tensorflow')) vid.start_stream()
2.設定辨識的參數,主要用於控制幾秒辨識一次 ( t_delay ),與上次辨識結果不同才進行傳送 ( pre_id ):
# 設定幾秒辨識一次,降低運行負擔 t_check = 0 t_delay = 2 t_start = 0 # 儲存上一次辨識的結果,如果改變才傳送,防止ifttt負擔太大 pre_id = -1
3.設定IFTTT的參數:
# 設定「Line訊息」資訊 event = 'jetsonnano_line' key = 'i3_S_gIAsOty30yvIg4vg' status = { 0:['是本人', '確定有做好防疫工作'], 1:['是本人', '注意,已成為防疫破口'], 2:['離開位置', ''], 3:['非本人', '注意您的財產'] }
4.使用While不斷進行即時的辨識與LINE監控,這邊設定了如果大於預設的delay時間則進行辨識:
# 開始即時辨識 t_start = time.time() while(not vid.isStop): # 計算時間如果大於預設延遲時間則進行辨識與發送 t_check = time.time() - t_start if (t_check >= t_delay) or ( not vid.fps): # 取得當前圖片 ret, frame = vid.get_current_frame() # 如果沒有幀則重新執行 if not ret: continue
5.進行推論以及取得辨識結果,最後設定顯示在即時影像上的資訊:
# 進行處理與推論 data = preprocess(frame, resize=(224,224), norm=True) prediction = model(data)[0] # 解析 辨識結果 trg_id, trg_class, trg_prob =parse_output(prediction, label) # 設定顯示資訊 vid.info = '{} : {:.3f} , FPS {}'.format(trg_class, trg_prob, vid.get_fps())
6.如果辨識結果與上次的不同,則回傳給LINE:
if pre_id != trg_id: ifttt.send_to_webhook(event, key, '環境變動', status[trg_id][0], status[trg_id][1] if status[trg_id][1] else '') pre_id = trg_id # 更新 time t_start = time.time()
7.最後在While的外部需要確認一下Thread是否都有關閉了,寫多線程很常遇到的問題就是開了線程,但是忘記關閉導致資源被用完,所以做個DoubleCheck會是不錯的選擇:
# 跳出 while 迴圈需要檢查多線程是否已經關閉 time.sleep(1) print('-'*30) print(f'影像串流的線程是否已關閉 : {not vid.t.is_alive()}') print('離開程式')
完整主程式如下:
#%% import cv2 import threading import os, time, random import ifttt import numpy as np import tensorflow as tf import platform as plt from tools import CustomVideoCapture, preprocess, load_model_folder, parse_output import time # 取得模型與標籤 model, label = load_model_folder('keras_models') # 設定影像擷取 vid = CustomVideoCapture() vid.set_title('{sys} - {framework}'.format(sys='Jetson Nano', framework='Tensorflow')) vid.start_stream() # 設定幾秒辨識一次,降低運行負擔 t_check = 0 t_delay = 2 t_start = 0 # 儲存上一次辨識的結果,如果改變才傳送,防止ifttt負擔太大 pre_id = -1 # 設定「Line訊息」資訊 event = 'jetsonnano_line' key = 'i3_S_gIAsOty30yvIg4vg' status = { 0:['是本人', '確定有做好防疫工作'], 1:['是本人', '注意,已成為防疫破口'], 2:['離開位置', ''], 3:['非本人', '注意您的財產'] } #%% # 開始即時辨識 t_start = time.time() while(not vid.isStop): # 計算時間如果大於預設延遲時間則進行辨識與發送 t_check = time.time() - t_start if (t_check >= t_delay) or ( not vid.fps): # 取得當前圖片 ret, frame = vid.get_current_frame() # 如果沒有幀則重新執行 if not ret: continue # 進行處理與推論 data = preprocess(frame, resize=(224,224), norm=True) prediction = model(data)[0] # 解析 辨識結果 trg_id, trg_class, trg_prob =parse_output(prediction, label) # 設定顯示資訊 vid.info = '{} : {:.3f} , FPS {}'.format(trg_class, trg_prob, vid.get_fps()) # 如果與上次辨識不同,則將辨識到的結果傳送至Line if pre_id != trg_id: ifttt.send_to_webhook(event, key, '環境變動', status[trg_id][0], status[trg_id][1] if status[trg_id][1] else '') pre_id = trg_id # 更新 time t_start = time.time() # 跳出 while 迴圈需要檢查多線程是否已經關閉 time.sleep(1) print('-'*30) print(f'影像串流的線程是否已關閉 : {not vid.t.is_alive()}') print('離開程式')
可以發現使用Thread來運行影像就完全不會受到IFTTT的影響,FPS都可以維持在30甚至以上,而主線程只需要關注於辨識以及傳送資料給IFTTT即可。
VIDEO
使用TensorRT引擎加速推論
剛剛使用了Thread來改善IFTTT傳送卡頓的問題,我們也可以針對AI推論來做改善,我們使用Jetson Nano最大的優勢就在於可以使用TensorRT引擎加速處理,所以這邊教大家怎麼從Teachable Machine下載模型並轉換成TensorRT引擎。
概略介紹
TensorRT是一個支援NVIDIA CUDA核心的加速引擎,透過對神經網路模型進行重構與資料縮減來達到加速的目的,在Jetson Nano中使用TensorRT絕對是做AI Inference的首選,那如何將神經網路模型轉換成TensorRT去運行呢?
1.需要先將模型轉換成 Onnx 的通用格式
2.接著在轉換成 TensorRT 引擎可運作的格式
在Jetson Nano中已經帶有TensorRT轉換的工具,但是怎麼將模型轉換成Onnx還需要安裝額外的工具,所以我們先來安裝一下tf2onnx這個套件吧。
環境版本
JetPack
4.4.1
Python
3.6.9
pip
21.0
tensorflow
2.3.1+nv20.12
onnx
1.8.1
安裝 tf2onnx並將模型轉換成onnx
首先需要將tensorflow的模型轉換成onnx,我們將使用tf2onnx這個套件,在安裝之前需要先確保onnx已經被安裝了,這邊提供相依套件以onnx的安裝命令:
$ sudo apt-get install protobuf-compiler libprotoc-dev # onnx 相依套件 $ pip3 install onnx $ pip install onnxruntime
升級numpy (可有可無):
$ python3 -m pip install -U numpy --no-cache-dir --no-binary numpy
安裝tf2onnx:
宣告OpenBLAS的核心架構,在JetsonNano上少了這步應該會報錯誤訊息” Illegal instruction(core dumped) ”:
$ nano ~/.bashrc export OPENBLAS_CORETYPE=ARMV8 $ source ~/.bashrc
安裝完之後可以回到上次教學的Teachable Machine,這次要下載的檔案格式必須選擇成TensorFlow > Savemodel,如下圖所示:
Savemodel是Tensorflow模型「序列化」的格式,由於Onnx的格式也是序列化的,所以在一開始就轉換成Savemodel在後續轉換Onnx比較不容易出錯。我們可以使用執行下列指令轉換成onnx模型:
$ python3 -m tf2onnx.convert --saved-model ./savemodel --output ./test_opset_default.onnx
透過Jetson Nano內建工具轉換成TensorRT
接著可以使用JetsonNano的原生工具 (trtexec) 轉換成TensorRT:
$ /usr/src/tensorrt/bin/trtexec --onnx=/home/dlinano/TM2/test_opset_default.onnx --saveEngine=/home/dlinano/TM2/test.trt --shapes=input0:1x3x224x224
同時需要安裝pycuda,安裝步驟當中有一個nvcc是用來確認是否有抓到cuda,若沒有加入環境變數則會報錯,同時也無法安裝pycuda:
$ nano ~/.bashrc export PATH=${PATH}:/usr/local/cuda/bin export LD_LIBRARY_PATH=${LD_LIBRARY_PATH}:/usr/local/cuda/lib64 $ source ~/.bashrc $ nvcc -V $ pip3 install pycuda
由於我們會使用到tensorrt提供的範例common.py,所以先直接複製一份:
$ cp /usr/src/tensorrt/samples/python/common.py ./common.py
經過繁瑣的操作後,終於可以運行程式了:
這個程式比照上一篇的方法所撰寫,可以注意到FPS相較於之前的推論程式都高非常多,已經可以到順跑的程度了。
程式講解
導入函式庫以及設定TRT的基本參數
import cv2 import tensorrt as trt import numpy as np import common import platform as plt import time from tools import preprocess, load_model_folder TRT_LOGGER = trt.Logger(trt.Logger.WARNING) trt_runtime = trt.Runtime(TRT_LOGGER)
先取得 TensorRT引擎,透過先前撰寫好的副函式 ( load_model_folder ) 來取得 engine、label;再導入之前我們需要預先定義好buffer給TensorRT;接著解析TensorRT物件取得該「執行文本」:
load trt engine print('取得TRT引擎與標籤') engine, label = load_model_folder('tensorrt_engine') # allocate buffers print('分配 buffers 給 TensorRT 所須的物件') inputs, outputs, bindings, stream = common.allocate_buffers(engine) print('創建執行文本 ( context )') context = engine.create_execution_context()
接著我們使用與上一篇雷同的OpenCV程式完成即時影像辨識,最大的區別在於TensorRT引擎導入資料的方法與推論的方法:
print('開啟即時影像') fps = -1 cap = cv2.VideoCapture(0, cv2.CAP_GSTREAMER) while(True): t_start = time.time() # 讀取圖片 ret, frame = cap.read() # 將圖片進行前處理並放入輸入資料中 inputs[0].host = preprocess(frame) # 進行 Inference trt_outputs = common.do_inference(context, bindings=bindings, inputs=inputs, outputs=outputs, stream=stream) # 解析輸出資料 trg_idx, trg_class, trg_prob = parse_output(trt_outputs[0], label) # 設定顯示資料 info = '{} : {:.3f} , FPS {}'.format(trg_class, trg_prob, fps) # 將顯示資料繪製在圖片上 cv2.putText(frame, info, (10,40), cv2.FONT_HERSHEY_SIMPLEX, 1, (0,0,255), 2) cv2.imshow('TensorRT', frame) if cv2.waitKey(1) == ord('q'): break # 更新FPS與時間點 fps = int(1/(time.time()-t_start)) t_start = time.time()
最後離開的時候一樣要做確認的動作:
cap.release() cv2.destroyAllWindows() print('離開程式')
三種框架比較
既然都做到TensorRT加速了,我們還是得來比較一下速度差距(僅供參考):
可以注意到Tensorflow的速度最慢但是準確度最高;Tensorflow Lite則是犧牲準確度換取高效能的表現;而TensorRT就更優秀了,優化的時候保留更多準確度,效能也能有效提高。
TensorRT結合Thread與IFTTT
建構的方式與上述雷同,所以就直接提供完整程式:
import cv2 import tensorrt as trt import numpy as np import common import platform as plt import time import ifttt import threading from tools import CustomVideoCapture, preprocess, load_model_folder, parse_output TRT_LOGGER = trt.Logger(trt.Logger.WARNING) trt_runtime = trt.Runtime(TRT_LOGGER) def main(): pre_idx = -1 print('取得TRT引擎與標籤') engine, label = load_model_folder('tensorrt_engine') print('分配 buffers 給 TensorRT 所須的物件') inputs, outputs, bindings, stream = common.allocate_buffers(engine) print('創建執行文本 ( context )') context = engine.create_execution_context() print('設定即時影像參數') vid = CustomVideoCapture() vid.set_title('{sys} - {framework}'.format(sys='Jetson Nano', framework='TensorRT')) vid.start_stream() # 設定幾秒辨識一次,為了配合 ifttt 的延遲通知 t_check = 0 t_delay = 1 t_start = 0 # 儲存上一次辨識的結果,如果改變才傳送,防止ifttt負擔太大 pre_id = -1 # 設定「Line訊息」資訊 print('設定IFTTT參數') event = 'jetsonnano_line' key = 'i3_S_gIAsOty30yvIg4vg' status = { 0:['是本人', '確定有做好防疫工作'], 1:['是本人', '注意,已成為防疫破口'], 2:['離開位置', ''], 3:['非本人', '注意您的財產'] } t_start = time.time() while(not vid.isStop): # 計算時間如果大於預設延遲時間則進行辨識與發送 t_check = time.time()-t_start if t_check >= t_delay: ret, frame = vid.get_current_frame() if not ret: continue inputs[0].host = preprocess(frame, resize=(224, 224), norm=True) infer_time = time.time() # with engine.create_execution_context() as context: trt_outputs = common.do_inference(context, bindings=bindings, inputs=inputs, outputs=outputs, stream=stream) infer_time = time.time() - infer_time preds = trt_outputs[0] trg_id, trg_class, trg_prob = parse_output(preds, label) vid.info = '{} : {:.3f} , FPS : {:.3f}'.format(trg_class, trg_prob, vid.get_fps()) if pre_id != trg_id: ifttt.send_to_webhook(event, key, '環境變動', status[trg_id][0], status[trg_id][1] if status[trg_id][1] else '') pre_id = trg_id t_start = time.time() # 跳出 while 迴圈需要檢查多線程是否已經關閉 time.sleep(1) print('-'*30, '\n') print(f'影像串流的線程是否已關閉 : {not vid.t.is_alive()}') if __name__ == '__main__': main()
結語
這次我們使用了兩種方式來進行改造、加速,其實透過Thread就能有不錯的成果了,但是TensorRT又能再減少一些負擔,讓 AI辨識與Line的監控訊息可以變得更加確實、快速。
相關文章
Onnx-tensorrt Github
Program/Process/Thread 差異
*本文由RS components 贊助發表,轉載自DesignSpark 部落格原文連結 (本篇文章完整範例程式請至原文下載)
Post Views: 332