NVIDIA Jetson Nano應用-多執行緒平行處理,以專案「你帶口罩了嗎?」為例

*本文由RS components 贊助發表,轉載自DesignSpark部落格原文連結

作者/攝影 張嘉鈞
難度

★★☆☆☆(普通)

材料表

RK-NVIDIA® Jetson Nano™ Developer Kit B01 套件

思路分析

原先使用主線程運行影像辨識以及IFTTT進行即時監控,光是執行影像辨識就會要等待推論的時間,而後如果要傳送至IFTTT則又有一個傳送的等待Request時間,如此便會影響到While迴圈裡的即時影像,這邊有很多種方法可以改善,最快且較為簡單的解決方式是將即時影像放到另一個線程中去運行,這樣顯示即時影像與推論的線程是同步進行的,即時影像就不會因此被推論以及等待網頁的時間給延遲,只需要專心處理即時影像的部分即可。

平行運算中的多執行緒

在Python的平行運算中有分兩種,一個是Multi-Thread另一個是Multi-Process;Process (進程 ) 跟Thread (執行緒 ) 其實大家平常都會聽到,在購買電腦的時候常常會聽到幾核幾緒( 例如 : 四核八緒 )就是類似的概念,幾個觀念重點介紹:

  1. 每個CPU都只能運行一個Process,每個Process彼此之間是獨立的。
  2. 每個Process可以有多個Thread運行,彼此共享記憶體、變數。

由於Thread無法回傳值所以要使用Queue (佇列)去儲存資料,那這部分我就不多作介紹因為網路上已經有很多相關的參考了,不過,這邊我沒有使用queue的方式去撰寫程式。

增加即時影像的線程到程式中

我使用class的方式去寫因為可以直接省略queue去儲存、取得變數,算是一個偷吃步的小技巧,因為我這邊除了讀取幀之外就只有回傳的動作,應該不會導致搶資源或同步的問題。

客製化的即時影像物件

為了符合我們的需求,我客製了一個類別提供了幾個所需的功能,首先在initialize的部分,比較特別的地方在我使用了 isStop的參數用來中斷線程並且宣告了t為即時影像線程的物件。

# 客製化的影像擷取程式

class CustomVideoCapture():


# 初始化 預設的攝影機裝置為 0

def __init__(self, dev=0):

self.cap = cv2.VideoCapture(dev)
self.ret = ''
self.frame = []
self.win_title = 'Modified with set_title()'
self.info = ''
self.fps = 0
self.fps_time = 0

self.isStop = False
self.t = threading.Thread(target=self.video, name='stream')

接著先宣告了一些可以從外部控制線程的函式,像是 start_stream就是開啟線程;stop_stream關閉線程;get_current_frame就是取得當前的畫面,使用get_current_frame可以讓外部直接獲取線程更新的畫面,算是一個使用Thread運行OpenCV常用的方法;最後還提供了一個set_title可以修改視窗的名稱:

# 可以透過這個函式 開啟 Thread 

def start_stream(self):
self.t.start()


# 關閉 Thread 與 Camera

def stop_stream(self):
self.isStop = True
self.cap.release()
cv2.destroyAllWindows()


# 取得最近一次的幀

def get_current_frame(self):
return self.ret, self.frame

def get_fps(self):
return self.fps


# 設定顯示視窗的名稱

def set_title(self, txt):
self.win_title = txt

最後宣告了多線程要運作的函式,由於要不斷更新畫面所以使用while,透過isStop控制是否跳出迴圈,其中做的事情就是取得當前影像,設定要印上去的資訊並顯示出來,當按下q的時候會退出迴圈並且使用stop_stream終止迴圈:

# Thread主要運行的函式

def video(self):
try:
while(not self.isStop):
self.fps_time = time.time()
self.ret, self.frame = self.cap.read()

if self.info is not '':
cv2.putText(self.frame, self.info, (10,40), cv2.FONT_HERSHEY_SIMPLEX, 1, (0,0,255), 2)

cv2.imshow(self.win_title, self.frame)

if cv2.waitKey(1) == ord('q'):
break

self.fps = int(1/(time.time() - self.fps_time))

self.stop_stream()
except:
self.stop_stream()

我建立了一個tools.py存放所有會用到的副函式 ( 包含上述的客製化影像類別 ),這邊開始介紹其他副函式,preprocess專門在處理輸入前的資料,針對該資料進行縮放、正規化、轉換成含有批次大小的格式:

# 用於資料前處理的程式

def preprocess(frame, resize=(224, 224), norm=True):
'''
設定格式 ( 1, 224, 224, 3)、縮放大、正規化、放入資料並回傳正確格式的資料
'''
input_format = np.ndarray(shape=(1, 224, 224, 3), dtype=np.float32)
frame_resize = cv2.resize(frame, resize)
frame_norm = ((frame_resize.astype(np.float32) / 127.0) - 1) if norm else frame_resize
input_format[0]=frame_norm
return input_format

load_model_folder則是載入模型與標籤,這邊寫成只需要輸入存放模型與標籤的目錄路徑即可,兩者須放置在一起,程式會靠副檔名去判斷:

# 讀取 模型 與 標籤

def load_model_folder(trg_dir) -> "'trg_dir' is the path include model file and labels file. return (model, label).":

model_type = [ 'trt','engine','h5']
label_type = [ 'txt']

for f in os.listdir(trg_dir):
extension = f.split('.')[-1]

if extension in model_type:
model_dir = os.path.join(trg_dir, f)
elif extension in label_type:
lable_dir = os.path.join(trg_dir, f)

return get_model(model_dir), get_label(lable_dir)

剛剛輸出的時候有用到兩個副函式 get_model、get_label,分別去取得模型與標籤檔的物件:

# 讀取模型

def get_model(model_dir) -> "support keras and tensorrt model":

if model_dir.split('.')[-1] == 'h5':
print('Load Keras Model')
model = tf.keras.models.load_model(model_dir)
else:
print('Load TensorRT Engine')
model = load_engine(model_dir)

return model


# 讀取標籤

def get_label(lable_dir) -> 'return dict of labels':

label = {}

with open(lable_dir) as f:
for line in f.readlines():
idx, name = line.strip().split(' ')
label[int(idx)]=name

return label

# 讀取TensorRT模型

def load_engine(engine_path):

if trt_found:

TRT_LOGGER = trt.Logger(trt.Logger.WARNING)
trt_runtime = trt.Runtime(TRT_LOGGER)

with open(engine_path, 'rb') as f:
engine_data = f.read()
engine = trt_runtime.deserialize_cuda_engine(engine_data)

return engine
else:
print("Can not load load_engine because there is no tensorrt module")
exit(1)

接著是解析預測結果的副函式,通常我們會取得到一組預測的信心指數,我們需要針對這組數據去解析出最大數值是在哪一個位置,而該位置又屬於哪一個類別:

# 解析輸出資訊

def parse_output(preds, label) -> 'return ( class id, class name, probobility) ':

preds = preds[0] if len(preds.shape)==4 else preds
trg_id = np.argmax(preds)
trg_name = label[trg_id]
trg_prob = preds[trg_id]
return ( trg_id, trg_name, trg_prob)

截至目前為止的程式,我都將其放在tools.py裡,後續只要做import的動作即可將這些功能導入。

最後來到主程式的部分,這部分須要涵蓋IFTTT以及Inference,流程大致如下:

1.取得模型與標籤、開啟即時影像的線程:

# 取得模型與標籤

model, label = load_model_folder('keras_models')

# 設定影像擷取

vid = CustomVideoCapture()
vid.set_title('{sys} - {framework}'.format(sys='Jetson Nano', framework='Tensorflow'))
vid.start_stream()

2.設定辨識的參數,主要用於控制幾秒辨識一次 ( t_delay ),與上次辨識結果不同才進行傳送 ( pre_id ):

# 設定幾秒辨識一次,降低運行負擔

t_check = 0
t_delay = 2
t_start = 0
# 儲存上一次辨識的結果,如果改變才傳送,防止ifttt負擔太大

pre_id = -1

3.設定IFTTT的參數:

# 設定「Line訊息」資訊

event = 'jetsonnano_line'
key = 'i3_S_gIAsOty30yvIg4vg'
status = {
0:['是本人', '確定有做好防疫工作'],
1:['是本人', '注意,已成為防疫破口'],
2:['離開位置', ''],
3:['非本人', '注意您的財產']
}

4.使用While不斷進行即時的辨識與LINE監控,這邊設定了如果大於預設的delay時間則進行辨識:

# 開始即時辨識

t_start = time.time()
while(not vid.isStop):


# 計算時間如果大於預設延遲時間則進行辨識與發送

t_check = time.time() - t_start

if (t_check >= t_delay) or ( not vid.fps):


# 取得當前圖片

ret, frame = vid.get_current_frame()


# 如果沒有幀則重新執行

if not ret: continue

5.進行推論以及取得辨識結果,最後設定顯示在即時影像上的資訊:

# 進行處理與推論

data = preprocess(frame, resize=(224,224), norm=True)
prediction = model(data)[0]


# 解析 辨識結果

trg_id, trg_class, trg_prob =parse_output(prediction, label)


# 設定顯示資訊

vid.info = '{} : {:.3f} , FPS {}'.format(trg_class, trg_prob, vid.get_fps())

6.如果辨識結果與上次的不同,則回傳給LINE:

        if pre_id != trg_id:

ifttt.send_to_webhook(event,
key,
'環境變動',
status[trg_id][0],
status[trg_id][1] if status[trg_id][1] else '')
pre_id = trg_id


# 更新 time

t_start = time.time()

7.最後在While的外部需要確認一下Thread是否都有關閉了,寫多線程很常遇到的問題就是開了線程,但是忘記關閉導致資源被用完,所以做個DoubleCheck會是不錯的選擇:

# 跳出 while 迴圈需要檢查多線程是否已經關閉

time.sleep(1)
print('-'*30)
print(f'影像串流的線程是否已關閉 : {not vid.t.is_alive()}')
print('離開程式')

完整主程式如下:

#%%

import cv2
import threading
import os, time, random
import ifttt
import numpy as np
import tensorflow as tf
import platform as plt
from tools import CustomVideoCapture, preprocess, load_model_folder, parse_output
import time

# 取得模型與標籤

model, label = load_model_folder('keras_models')

# 設定影像擷取

vid = CustomVideoCapture()
vid.set_title('{sys} - {framework}'.format(sys='Jetson Nano', framework='Tensorflow'))
vid.start_stream()

# 設定幾秒辨識一次,降低運行負擔

t_check = 0
t_delay = 2
t_start = 0
# 儲存上一次辨識的結果,如果改變才傳送,防止ifttt負擔太大

pre_id = -1

# 設定「Line訊息」資訊

event = 'jetsonnano_line'
key = 'i3_S_gIAsOty30yvIg4vg'
status = {
0:['是本人', '確定有做好防疫工作'],
1:['是本人', '注意,已成為防疫破口'],
2:['離開位置', ''],
3:['非本人', '注意您的財產']
}
#%%



# 開始即時辨識

t_start = time.time()
while(not vid.isStop):


# 計算時間如果大於預設延遲時間則進行辨識與發送

t_check = time.time() - t_start

if (t_check >= t_delay) or ( not vid.fps):


# 取得當前圖片

ret, frame = vid.get_current_frame()


# 如果沒有幀則重新執行

if not ret: continue


# 進行處理與推論

data = preprocess(frame, resize=(224,224), norm=True)
prediction = model(data)[0]


# 解析 辨識結果

trg_id, trg_class, trg_prob =parse_output(prediction, label)


# 設定顯示資訊

vid.info = '{} : {:.3f} , FPS {}'.format(trg_class, trg_prob, vid.get_fps())


# 如果與上次辨識不同,則將辨識到的結果傳送至Line

if pre_id != trg_id:

ifttt.send_to_webhook(event,
key,
'環境變動',
status[trg_id][0],
status[trg_id][1] if status[trg_id][1] else '')
pre_id = trg_id


# 更新 time

t_start = time.time()

# 跳出 while 迴圈需要檢查多線程是否已經關閉

time.sleep(1)
print('-'*30)
print(f'影像串流的線程是否已關閉 : {not vid.t.is_alive()}')
print('離開程式')

可以發現使用Thread來運行影像就完全不會受到IFTTT的影響,FPS都可以維持在30甚至以上,而主線程只需要關注於辨識以及傳送資料給IFTTT即可。

使用TensorRT引擎加速推論

剛剛使用了Thread來改善IFTTT傳送卡頓的問題,我們也可以針對AI推論來做改善,我們使用Jetson Nano最大的優勢就在於可以使用TensorRT引擎加速處理,所以這邊教大家怎麼從Teachable Machine下載模型並轉換成TensorRT引擎。

概略介紹

TensorRT是一個支援NVIDIA CUDA核心的加速引擎,透過對神經網路模型進行重構與資料縮減來達到加速的目的,在Jetson Nano中使用TensorRT絕對是做AI Inference的首選,那如何將神經網路模型轉換成TensorRT去運行呢?

1.需要先將模型轉換成 Onnx 的通用格式

2.接著在轉換成 TensorRT 引擎可運作的格式

在Jetson Nano中已經帶有TensorRT轉換的工具,但是怎麼將模型轉換成Onnx還需要安裝額外的工具,所以我們先來安裝一下tf2onnx這個套件吧。

環境版本

JetPack 4.4.1
Python 3.6.9
pip 21.0
tensorflow 2.3.1+nv20.12
onnx 1.8.1

安裝 tf2onnx並將模型轉換成onnx

首先需要將tensorflow的模型轉換成onnx,我們將使用tf2onnx這個套件,在安裝之前需要先確保onnx已經被安裝了,這邊提供相依套件以onnx的安裝命令:

$ sudo apt-get install protobuf-compiler libprotoc-dev 
# onnx 相依套件

$ pip3 install onnx
$ pip install onnxruntime

升級numpy (可有可無):

$ python3 -m pip install -U numpy --no-cache-dir --no-binary numpy

安裝tf2onnx:

$ pip3 install tf2onnx

宣告OpenBLAS的核心架構,在JetsonNano上少了這步應該會報錯誤訊息” Illegal instruction(core dumped)”:

$ nano ~/.bashrc
export OPENBLAS_CORETYPE=ARMV8
$ source ~/.bashrc

安裝完之後可以回到上次教學的Teachable Machine,這次要下載的檔案格式必須選擇成TensorFlow > Savemodel,如下圖所示:

Savemodel是Tensorflow模型「序列化」的格式,由於Onnx的格式也是序列化的,所以在一開始就轉換成Savemodel在後續轉換Onnx比較不容易出錯。我們可以使用執行下列指令轉換成onnx模型:

$ python3 -m tf2onnx.convert --saved-model ./savemodel --output ./test_opset_default.onnx

透過Jetson Nano內建工具轉換成TensorRT

接著可以使用JetsonNano的原生工具 (trtexec) 轉換成TensorRT:

$ /usr/src/tensorrt/bin/trtexec --onnx=/home/dlinano/TM2/test_opset_default.onnx --saveEngine=/home/dlinano/TM2/test.trt --shapes=input0:1x3x224x224

同時需要安裝pycuda,安裝步驟當中有一個nvcc是用來確認是否有抓到cuda,若沒有加入環境變數則會報錯,同時也無法安裝pycuda:

$ nano ~/.bashrc
export PATH=${PATH}:/usr/local/cuda/bin
export LD_LIBRARY_PATH=${LD_LIBRARY_PATH}:/usr/local/cuda/lib64
$ source ~/.bashrc
$ nvcc -V
$ pip3 install pycuda

由於我們會使用到tensorrt提供的範例common.py,所以先直接複製一份:

$ cp /usr/src/tensorrt/samples/python/common.py ./common.py

經過繁瑣的操作後,終於可以運行程式了:

$ python3 tm_tensorrt.py

這個程式比照上一篇的方法所撰寫,可以注意到FPS相較於之前的推論程式都高非常多,已經可以到順跑的程度了。

程式講解

導入函式庫以及設定TRT的基本參數

import cv2
import tensorrt as trt
import numpy as np
import common
import platform as plt
import time
from tools import preprocess, load_model_folder

TRT_LOGGER = trt.Logger(trt.Logger.WARNING)
trt_runtime = trt.Runtime(TRT_LOGGER)

先取得 TensorRT引擎,透過先前撰寫好的副函式 ( load_model_folder ) 來取得 engine、label;再導入之前我們需要預先定義好buffer給TensorRT;接著解析TensorRT物件取得該「執行文本」:

 load trt engine
print('取得TRT引擎與標籤')
engine, label = load_model_folder('tensorrt_engine')

# allocate buffers

print('分配 buffers 給 TensorRT 所須的物件')
inputs, outputs, bindings, stream = common.allocate_buffers(engine)

print('創建執行文本 ( context )')
context = engine.create_execution_context()

接著我們使用與上一篇雷同的OpenCV程式完成即時影像辨識,最大的區別在於TensorRT引擎導入資料的方法與推論的方法:

print('開啟即時影像')
fps = -1
cap = cv2.VideoCapture(0, cv2.CAP_GSTREAMER)

while(True):

t_start = time.time()


# 讀取圖片

ret, frame = cap.read()


# 將圖片進行前處理並放入輸入資料中

inputs[0].host = preprocess(frame)


# 進行 Inference

trt_outputs = common.do_inference(context, bindings=bindings, inputs=inputs, outputs=outputs, stream=stream)


# 解析輸出資料

trg_idx, trg_class, trg_prob = parse_output(trt_outputs[0], label)


# 設定顯示資料

info = '{} : {:.3f} , FPS {}'.format(trg_class, trg_prob, fps)


# 將顯示資料繪製在圖片上

cv2.putText(frame, info, (10,40), cv2.FONT_HERSHEY_SIMPLEX, 1, (0,0,255), 2)
cv2.imshow('TensorRT', frame)

if cv2.waitKey(1) == ord('q'):
break

# 更新FPS與時間點

fps = int(1/(time.time()-t_start))
t_start = time.time()

最後離開的時候一樣要做確認的動作:

cap.release()
cv2.destroyAllWindows()
print('離開程式')

三種框架比較

既然都做到TensorRT加速了,我們還是得來比較一下速度差距(僅供參考):

可以注意到Tensorflow的速度最慢但是準確度最高;Tensorflow Lite則是犧牲準確度換取高效能的表現;而TensorRT就更優秀了,優化的時候保留更多準確度,效能也能有效提高。

TensorRT結合Thread與IFTTT

建構的方式與上述雷同,所以就直接提供完整程式:

import cv2
import tensorrt as trt
import numpy as np
import common
import platform as plt
import time
import ifttt
import threading
from tools import CustomVideoCapture, preprocess, load_model_folder, parse_output

TRT_LOGGER = trt.Logger(trt.Logger.WARNING)
trt_runtime = trt.Runtime(TRT_LOGGER)

def main():

pre_idx = -1

print('取得TRT引擎與標籤')
engine, label = load_model_folder('tensorrt_engine')

print('分配 buffers 給 TensorRT 所須的物件')
inputs, outputs, bindings, stream = common.allocate_buffers(engine)

print('創建執行文本 ( context )')
context = engine.create_execution_context()

print('設定即時影像參數')
vid = CustomVideoCapture()
vid.set_title('{sys} - {framework}'.format(sys='Jetson Nano', framework='TensorRT'))
vid.start_stream()


# 設定幾秒辨識一次,為了配合 ifttt 的延遲通知

t_check = 0
t_delay = 1
t_start = 0


# 儲存上一次辨識的結果,如果改變才傳送,防止ifttt負擔太大

pre_id = -1


# 設定「Line訊息」資訊

print('設定IFTTT參數')
event = 'jetsonnano_line'
key = 'i3_S_gIAsOty30yvIg4vg'
status = {
0:['是本人', '確定有做好防疫工作'],
1:['是本人', '注意,已成為防疫破口'],
2:['離開位置', ''],
3:['非本人', '注意您的財產']
}

t_start = time.time()

while(not vid.isStop):


# 計算時間如果大於預設延遲時間則進行辨識與發送

t_check = time.time()-t_start
if t_check >= t_delay:

ret, frame = vid.get_current_frame()
if not ret: continue

inputs[0].host = preprocess(frame, resize=(224, 224), norm=True)

infer_time = time.time()


# with engine.create_execution_context() as context:

trt_outputs = common.do_inference(context, bindings=bindings, inputs=inputs, outputs=outputs, stream=stream)

infer_time = time.time() - infer_time

preds = trt_outputs[0]

trg_id, trg_class, trg_prob = parse_output(preds, label)

vid.info = '{} : {:.3f} , FPS : {:.3f}'.format(trg_class, trg_prob, vid.get_fps())

if pre_id != trg_id:

ifttt.send_to_webhook(event,
key,
'環境變動',
status[trg_id][0],
status[trg_id][1] if status[trg_id][1] else '')
pre_id = trg_id

t_start = time.time()


# 跳出 while 迴圈需要檢查多線程是否已經關閉

time.sleep(1)
print('-'*30, '\n')
print(f'影像串流的線程是否已關閉 : {not vid.t.is_alive()}')

if __name__ == '__main__':

main()

結語

這次我們使用了兩種方式來進行改造、加速,其實透過Thread就能有不錯的成果了,但是TensorRT又能再減少一些負擔,讓 AI辨識與Line的監控訊息可以變得更加確實、快速。

相關文章

Onnx-tensorrt Github

Program/Process/Thread 差異

 

*本文由RS components 贊助發表,轉載自DesignSpark部落格原文連結(本篇文章完整範例程式請至原文下載)

發佈留言

發佈留言必須填寫的電子郵件地址不會公開。 必填欄位標示為 *