嘿!您似乎在 United States,您想使用我们的 English 网站吗?
Switch to English site
Skip to main content

NVIDIA Jetson Nano應用-多執行緒平行處理,以專案「你帶口罩了嗎?」為例

作者

張嘉鈞

難度

普通

思路分析

原先使用主線程運行影像辨識以及IFTTT進行即時監控,光是執行影像辨識就會要等待推論的時間,而後如果要傳送至IFTTT則又有一個傳送的等待Request時間,如此便會影響到While迴圈裡的即時影像,這邊有很多種方法可以改善,最快且較為簡單的解決方式是將即時影像放到另一個線程中去運行,這樣顯示即時影像與推論的線程是同步進行的,即時影像就不會因此被推論以及等待網頁的時間給延遲,只需要專心處理即時影像的部分即可。

01_method_e5dd02916374532eeabcadb794ec5f7f08411c19.png

平行運算中的多執行緒

在Python的平行運算中有分兩種,一個是Multi-Thread另一個是Multi-Process;Process ( 進程 ) 跟Thread ( 執行緒 ) 其實大家平常都會聽到,在購買電腦的時候常常會聽到幾核幾緒 ( 例如 : 四核八緒 ) 就是類似的概念,幾個觀念重點介紹:

  1. 每個CPU都只能運行一個Process,每個Process彼此之間是獨立的。
  2. 每個Process可以有多個Thread運行,彼此共享記憶體、變數。

由於Thread無法回傳值所以要使用Queue ( 佇列 ) 去儲存資料,那這部分我就不多作介紹因為網路上已經有很多相關的參考了,不過,這邊我沒有使用queue的方式去撰寫程式。

增加即時影像的線程到程式中

我使用class的方式去寫因為可以直接省略queue去儲存、取得變數,算是一個偷吃步的小技巧,因為我這邊除了讀取幀之外就只有回傳的動作,應該不會導致搶資源或同步的問題。

客製化的即時影像物件

為了符合我們的需求,我客製了一個類別提供了幾個所需的功能,首先在initialize的部分,比較特別的地方在我使用了 isStop的參數用來中斷線程並且宣告了t為即時影像線程的物件。

# 客製化的影像擷取程式
class CustomVideoCapture():

    # 初始化 預設的攝影機裝置為 0
    def __init__(self, dev=0):

        self.cap = cv2.VideoCapture(dev)
        self.ret = ''
        self.frame = []
        self.win_title = 'Modified with set_title()'
        self.info = ''
        self.fps = 0
        self.fps_time = 0

        self.isStop = False
        self.t = threading.Thread(target=self.video, name='stream')

接著先宣告了一些可以從外部控制線程的函式,像是 start_stream就是開啟線程;stop_stream關閉線程;get_current_frame就是取得當前的畫面,使用get_current_frame可以讓外部直接獲取線程更新的畫面,算是一個使用Thread運行OpenCV常用的方法;最後還提供了一個set_title可以修改視窗的名稱:

    # 可以透過這個函式 開啟 Thread 
    def start_stream(self):
        self.t.start()
    
    # 關閉 Thread 與 Camera
    def stop_stream(self):
        self.isStop = True
        self.cap.release()
        cv2.destroyAllWindows()

    # 取得最近一次的幀
    def get_current_frame(self):
        return self.ret, self.frame

    def get_fps(self):
        return self.fps

    # 設定顯示視窗的名稱
    def set_title(self, txt):
        self.win_title = txt

最後宣告了多線程要運作的函式,由於要不斷更新畫面所以使用while,透過isStop控制是否跳出迴圈,其中做的事情就是取得當前影像,設定要印上去的資訊並顯示出來,當按下q的時候會退出迴圈並且使用stop_stream終止迴圈:

    # Thread主要運行的函式
    def video(self):
        try:
            while(not self.isStop):
                self.fps_time = time.time()
                self.ret, self.frame = self.cap.read()

                if self.info is not '':
                    cv2.putText(self.frame, self.info, (10,40), cv2.FONT_HERSHEY_SIMPLEX, 1, (0,0,255), 2)
                
                cv2.imshow(self.win_title, self.frame)
                
                if cv2.waitKey(1) == ord('q'):
                    break
                
                self.fps = int(1/(time.time() - self.fps_time))

            self.stop_stream()
        except:
            self.stop_stream()

我建立了一個tools.py存放所有會用到的副函式 ( 包含上述的客製化影像類別 ),這邊開始介紹其他副函式,preprocess專門在處理輸入前的資料,針對該資料進行縮放、正規化、轉換成含有批次大小的格式:

# 用於資料前處理的程式
def preprocess(frame, resize=(224, 224), norm=True):
    '''
    設定格式 ( 1, 224, 224, 3)、縮放大、正規化、放入資料並回傳正確格式的資料
    '''
    input_format = np.ndarray(shape=(1, 224, 224, 3), dtype=np.float32)
    frame_resize = cv2.resize(frame, resize)
    frame_norm =  ((frame_resize.astype(np.float32) / 127.0) - 1) if norm else frame_resize
    input_format[0]=frame_norm
    return input_format

load_model_folder則是載入模型與標籤,這邊寫成只需要輸入存放模型與標籤的目錄路徑即可,兩者須放置在一起,程式會靠副檔名去判斷:

 # 讀取 模型 與 標籤
def load_model_folder(trg_dir) -> "'trg_dir' is the path include model file and labels file. return (model, label).":

    model_type = [ 'trt','engine','h5']
    label_type = [ 'txt']

    for f in os.listdir(trg_dir):
        extension = f.split('.')[-1]
        
        if extension in model_type:
            model_dir = os.path.join(trg_dir, f)
        elif extension in label_type:
            lable_dir = os.path.join(trg_dir, f)

    return get_model(model_dir), get_label(lable_dir)

剛剛輸出的時候有用到兩個副函式 get_model、get_label,分別去取得模型與標籤檔的物件:

# 讀取模型
def get_model(model_dir) -> "support keras and tensorrt model":
    
    if model_dir.split('.')[-1] == 'h5':
        print('Load Keras Model')
        model = tf.keras.models.load_model(model_dir)
    else:
        print('Load TensorRT Engine')
        model = load_engine(model_dir)
        
    return model
    

# 讀取標籤
def get_label(lable_dir) -> 'return dict of labels':

    label = {}

    with open(lable_dir) as f:    
        for line in f.readlines():
            idx, name = line.strip().split(' ')
            label[int(idx)]=name

    return label

# 讀取TensorRT模型
def load_engine(engine_path):

    if trt_found:

        TRT_LOGGER = trt.Logger(trt.Logger.WARNING)
        trt_runtime = trt.Runtime(TRT_LOGGER)
        
        with open(engine_path, 'rb') as f:
            engine_data = f.read()
        engine = trt_runtime.deserialize_cuda_engine(engine_data)

        return engine
    else:
        print("Can not load load_engine because there is no tensorrt module")
        exit(1)

接著是解析預測結果的副函式,通常我們會取得到一組預測的信心指數,我們需要針對這組數據去解析出最大數值是在哪一個位置,而該位置又屬於哪一個類別:

# 解析輸出資訊
def parse_output(preds, label) -> 'return ( class id, class name, probobility) ':
    
    preds = preds[0] if len(preds.shape)==4 else preds
    trg_id = np.argmax(preds)
    trg_name = label[trg_id]
    trg_prob = preds[trg_id]
    return ( trg_id, trg_name, trg_prob)

截至目前為止的程式,我都將其放在tools.py裡,後續只要做import的動作即可將這些功能導入。

最後來到主程式的部分,這部分須要涵蓋IFTTT以及Inference,流程大致如下:

1.取得模型與標籤、開啟即時影像的線程:

# 取得模型與標籤
model, label = load_model_folder('keras_models')

# 設定影像擷取
vid = CustomVideoCapture()
vid.set_title('{sys} - {framework}'.format(sys='Jetson Nano', framework='Tensorflow'))
vid.start_stream()

2.設定辨識的參數,主要用於控制幾秒辨識一次 ( t_delay ),與上次辨識結果不同才進行傳送 ( pre_id ):

 # 設定幾秒辨識一次,降低運行負擔
t_check = 0
t_delay = 2
t_start = 0
# 儲存上一次辨識的結果,如果改變才傳送,防止ifttt負擔太大
pre_id = -1

3.設定IFTTT的參數:

# 設定「Line訊息」資訊
event = 'jetsonnano_line'
key = 'i3_S_gIAsOty30yvIg4vg'
status = {
    0:['是本人', '確定有做好防疫工作'],
    1:['是本人', '注意,已成為防疫破口'], 
    2:['離開位置', ''], 
    3:['非本人', '注意您的財產']
    }

4.使用While不斷進行即時的辨識與LINE監控,這邊設定了如果大於預設的delay時間則進行辨識:

# 開始即時辨識
t_start = time.time()
while(not vid.isStop):

    # 計算時間如果大於預設延遲時間則進行辨識與發送
    t_check = time.time() - t_start

    if (t_check >= t_delay) or ( not vid.fps):

        # 取得當前圖片
        ret, frame = vid.get_current_frame()

        # 如果沒有幀則重新執行
        if not ret: continue

5.進行推論以及取得辨識結果,最後設定顯示在即時影像上的資訊:

        # 進行處理與推論
        data = preprocess(frame, resize=(224,224), norm=True)
        prediction = model(data)[0]
        
        # 解析 辨識結果
        trg_id, trg_class, trg_prob =parse_output(prediction, label)

        # 設定顯示資訊
        vid.info = '{} : {:.3f} , FPS {}'.format(trg_class, trg_prob, vid.get_fps())

6.如果辨識結果與上次的不同,則回傳給LINE:

        if pre_id != trg_id:
            
            ifttt.send_to_webhook(event, 
                                    key, 
                                    '環境變動', 
                                    status[trg_id][0], 
                                    status[trg_id][1] if status[trg_id][1] else '')
            pre_id = trg_id

        # 更新 time
        t_start = time.time()

7.最後在While的外部需要確認一下Thread是否都有關閉了,寫多線程很常遇到的問題就是開了線程,但是忘記關閉導致資源被用完,所以做個DoubleCheck會是不錯的選擇:

# 跳出 while 迴圈需要檢查多線程是否已經關閉
time.sleep(1)
print('-'*30)
print(f'影像串流的線程是否已關閉 : {not vid.t.is_alive()}')
print('離開程式')

完整主程式如下:

#%%
import cv2
import threading
import os, time, random
import ifttt
import numpy as np
import tensorflow as tf
import platform as plt
from tools import CustomVideoCapture, preprocess, load_model_folder, parse_output
import time

# 取得模型與標籤
model, label = load_model_folder('keras_models')

# 設定影像擷取
vid = CustomVideoCapture()
vid.set_title('{sys} - {framework}'.format(sys='Jetson Nano', framework='Tensorflow'))
vid.start_stream()

# 設定幾秒辨識一次,降低運行負擔
t_check = 0
t_delay = 2
t_start = 0
# 儲存上一次辨識的結果,如果改變才傳送,防止ifttt負擔太大
pre_id = -1

# 設定「Line訊息」資訊
event = 'jetsonnano_line'
key = 'i3_S_gIAsOty30yvIg4vg'
status = {
    0:['是本人', '確定有做好防疫工作'],
    1:['是本人', '注意,已成為防疫破口'], 
    2:['離開位置', ''], 
    3:['非本人', '注意您的財產']
    }
#%%

                             
# 開始即時辨識
t_start = time.time()
while(not vid.isStop):

    # 計算時間如果大於預設延遲時間則進行辨識與發送
    t_check = time.time() - t_start

    if (t_check >= t_delay) or ( not vid.fps):

        # 取得當前圖片
        ret, frame = vid.get_current_frame()

        # 如果沒有幀則重新執行
        if not ret: continue

        # 進行處理與推論
        data = preprocess(frame, resize=(224,224), norm=True)
        prediction = model(data)[0]
        
        # 解析 辨識結果
        trg_id, trg_class, trg_prob =parse_output(prediction, label)

        # 設定顯示資訊
        vid.info = '{} : {:.3f} , FPS {}'.format(trg_class, trg_prob, vid.get_fps())

        # 如果與上次辨識不同,則將辨識到的結果傳送至Line
        if pre_id != trg_id:
            
            ifttt.send_to_webhook(event, 
                                    key, 
                                    '環境變動', 
                                    status[trg_id][0], 
                                    status[trg_id][1] if status[trg_id][1] else '')
            pre_id = trg_id

        # 更新 time
        t_start = time.time()

# 跳出 while 迴圈需要檢查多線程是否已經關閉
time.sleep(1)
print('-'*30)
print(f'影像串流的線程是否已關閉 : {not vid.t.is_alive()}')
print('離開程式')

可以發現使用Thread來運行影像就完全不會受到IFTTT的影響,FPS都可以維持在30甚至以上,而主線程只需要關注於辨識以及傳送資料給IFTTT即可。

使用TensorRT引擎加速推論

剛剛使用了Thread來改善IFTTT傳送卡頓的問題,我們也可以針對AI推論來做改善,我們使用Jetson Nano最大的優勢就在於可以使用TensorRT引擎加速處理,所以這邊教大家怎麼從Teachable Machine下載模型並轉換成TensorRT引擎。

概略介紹

TensorRT是一個支援NVIDIA CUDA核心的加速引擎,透過對神經網路模型進行重構與資料縮減來達到加速的目的,在Jetson Nano中使用TensorRT絕對是做AI Inference的首選,那如何將神經網路模型轉換成TensorRT去運行呢?

1.需要先將模型轉換成 Onnx 的通用格式

2.接著在轉換成 TensorRT 引擎可運作的格式

在Jetson Nano中已經帶有TensorRT轉換的工具,但是怎麼將模型轉換成Onnx還需要安裝額外的工具,所以我們先來安裝一下tf2onnx這個套件吧。

環境版本

JetPack

4.4.1

Python

3.6.9

pip

21.0

tensorflow

2.3.1+nv20.12

onnx

1.8.1

安裝 tf2onnx並將模型轉換成onnx

首先需要將tensorflow的模型轉換成onnx,我們將使用tf2onnx這個套件,在安裝之前需要先確保onnx已經被安裝了,這邊提供相依套件以onnx的安裝命令:

$ sudo apt-get install protobuf-compiler libprotoc-dev # onnx 相依套件
$ pip3 install onnx
$ pip install onnxruntime

升級numpy (可有可無):

$ python3 -m pip install -U numpy --no-cache-dir --no-binary numpy

安裝tf2onnx:

$ pip3 install tf2onnx

宣告OpenBLAS的核心架構,在JetsonNano上少了這步應該會報錯誤訊息” Illegal instruction(core dumped)”:

$ nano ~/.bashrc
export OPENBLAS_CORETYPE=ARMV8
$ source ~/.bashrc	

安裝完之後可以回到上次教學的Teachable Machine,這次要下載的檔案格式必須選擇成TensorFlow > Savemodel,如下圖所示:

03_TM_456e1988ca869dd6d2539fbfbc9edb41fd431903.png

Savemodel是Tensorflow模型「序列化」的格式,由於Onnx的格式也是序列化的,所以在一開始就轉換成Savemodel在後續轉換Onnx比較不容易出錯。我們可以使用執行下列指令轉換成onnx模型:

$ python3 -m tf2onnx.convert --saved-model ./savemodel --output ./test_opset_default.onnx

透過Jetson Nano內建工具轉換成TensorRT

接著可以使用JetsonNano的原生工具 (trtexec) 轉換成TensorRT:

$ /usr/src/tensorrt/bin/trtexec --onnx=/home/dlinano/TM2/test_opset_default.onnx --saveEngine=/home/dlinano/TM2/test.trt --shapes=input0:1x3x224x224

同時需要安裝pycuda,安裝步驟當中有一個nvcc是用來確認是否有抓到cuda,若沒有加入環境變數則會報錯,同時也無法安裝pycuda:

$ nano ~/.bashrc
export PATH=${PATH}:/usr/local/cuda/bin
export LD_LIBRARY_PATH=${LD_LIBRARY_PATH}:/usr/local/cuda/lib64
$ source ~/.bashrc
$ nvcc -V
$ pip3 install pycuda

由於我們會使用到tensorrt提供的範例common.py,所以先直接複製一份:

$ cp /usr/src/tensorrt/samples/python/common.py ./common.py

經過繁瑣的操作後,終於可以運行程式了:

$ python3 tm_tensorrt.py

這個程式比照上一篇的方法所撰寫,可以注意到FPS相較於之前的推論程式都高非常多,已經可以到順跑的程度了。

04_TensorRT_tm2_e4b1bf91ed3a3ee81ae3fb8dab521393f2d5b388.png

程式講解

導入函式庫以及設定TRT的基本參數

import cv2
import tensorrt as trt
import numpy as np
import common
import platform as plt
import time
from tools import preprocess, load_model_folder

TRT_LOGGER = trt.Logger(trt.Logger.WARNING)
trt_runtime = trt.Runtime(TRT_LOGGER)

先取得 TensorRT引擎,透過先前撰寫好的副函式 ( load_model_folder ) 來取得 engine、label;再導入之前我們需要預先定義好buffer給TensorRT;接著解析TensorRT物件取得該「執行文本」:

 load trt engine
print('取得TRT引擎與標籤')
engine, label = load_model_folder('tensorrt_engine')

# allocate buffers
print('分配 buffers 給 TensorRT 所須的物件')
inputs, outputs, bindings, stream = common.allocate_buffers(engine)

print('創建執行文本 ( context )')
context = engine.create_execution_context()

接著我們使用與上一篇雷同的OpenCV程式完成即時影像辨識,最大的區別在於TensorRT引擎導入資料的方法與推論的方法:

print('開啟即時影像')
fps = -1
cap = cv2.VideoCapture(0, cv2.CAP_GSTREAMER)

while(True):

    t_start = time.time()

    # 讀取圖片
    ret, frame = cap.read()

    # 將圖片進行前處理並放入輸入資料中
    inputs[0].host = preprocess(frame)

    # 進行 Inference
    trt_outputs = common.do_inference(context, bindings=bindings, inputs=inputs, outputs=outputs, stream=stream)
    
    # 解析輸出資料
    trg_idx, trg_class, trg_prob = parse_output(trt_outputs[0], label)

    # 設定顯示資料
    info = '{} : {:.3f} , FPS {}'.format(trg_class, trg_prob, fps)

    # 將顯示資料繪製在圖片上
    cv2.putText(frame, info, (10,40), cv2.FONT_HERSHEY_SIMPLEX, 1, (0,0,255), 2)
    cv2.imshow('TensorRT', frame)

    if cv2.waitKey(1) == ord('q'):
        break
    # 更新FPS與時間點
    fps = int(1/(time.time()-t_start))
    t_start = time.time()

最後離開的時候一樣要做確認的動作:

cap.release()
cv2.destroyAllWindows()
print('離開程式')

三種框架比較

既然都做到TensorRT加速了,我們還是得來比較一下速度差距(僅供參考):

可以注意到Tensorflow的速度最慢但是準確度最高;Tensorflow Lite則是犧牲準確度換取高效能的表現;而TensorRT就更優秀了,優化的時候保留更多準確度,效能也能有效提高。

05_compare_eac925a164dd70a107aa9fc10c81d14cb5b03691.png

TensorRT結合Thread與IFTTT

建構的方式與上述雷同,所以就直接提供完整程式:

import cv2
import tensorrt as trt
import numpy as np	
import common
import platform as plt
import time
import ifttt
import threading
from tools import CustomVideoCapture, preprocess, load_model_folder, parse_output

TRT_LOGGER = trt.Logger(trt.Logger.WARNING)
trt_runtime = trt.Runtime(TRT_LOGGER)

def main():

    pre_idx = -1

    print('取得TRT引擎與標籤')
    engine, label = load_model_folder('tensorrt_engine')

    print('分配 buffers 給 TensorRT 所須的物件')
    inputs, outputs, bindings, stream = common.allocate_buffers(engine)

    print('創建執行文本 ( context )')
    context = engine.create_execution_context()

    print('設定即時影像參數')
    vid = CustomVideoCapture()
    vid.set_title('{sys} - {framework}'.format(sys='Jetson Nano', framework='TensorRT'))
    vid.start_stream()
    
    # 設定幾秒辨識一次,為了配合 ifttt 的延遲通知
    t_check = 0
    t_delay = 1
    t_start = 0

    # 儲存上一次辨識的結果,如果改變才傳送,防止ifttt負擔太大
    pre_id = -1

    # 設定「Line訊息」資訊
    print('設定IFTTT參數')
    event = 'jetsonnano_line'
    key = 'i3_S_gIAsOty30yvIg4vg'
    status = {
        0:['是本人', '確定有做好防疫工作'],
        1:['是本人', '注意,已成為防疫破口'], 
        2:['離開位置', ''], 
        3:['非本人', '注意您的財產']
        }

    t_start = time.time()
    
    while(not vid.isStop):

        # 計算時間如果大於預設延遲時間則進行辨識與發送
        t_check = time.time()-t_start
        if t_check >= t_delay:
            
            ret, frame = vid.get_current_frame()
            if not ret: continue

            inputs[0].host = preprocess(frame, resize=(224, 224), norm=True)

            infer_time = time.time()
            
            # with engine.create_execution_context() as context:
            trt_outputs = common.do_inference(context, bindings=bindings, inputs=inputs, outputs=outputs, stream=stream)
            
            infer_time = time.time() - infer_time
            
            preds = trt_outputs[0]

            trg_id, trg_class, trg_prob = parse_output(preds, label)

            vid.info = '{} : {:.3f} , FPS : {:.3f}'.format(trg_class, trg_prob, vid.get_fps())

            if pre_id != trg_id:
                
                ifttt.send_to_webhook(event, 
                                      key, 
                                      '環境變動', 
                                      status[trg_id][0], 
                                      status[trg_id][1] if status[trg_id][1] else '')
                pre_id = trg_id

            t_start = time.time()
    
    # 跳出 while 迴圈需要檢查多線程是否已經關閉
    time.sleep(1)
    print('-'*30, '\n')
    print(f'影像串流的線程是否已關閉 : {not vid.t.is_alive()}')

if __name__ == '__main__':
    
    main()

結語

這次我們使用了兩種方式來進行改造、加速,其實透過Thread就能有不錯的成果了,但是TensorRT又能再減少一些負擔,讓 AI辨識與Line的監控訊息可以變得更加確實、快速。

相關文章

Onnx-tensorrt Github

Program/Process/Thread 差異

CAVEDU Education is devoted into robotics education and maker movement since 2008, and is intensively active in teaching fundamental knowledge and skills. We had published many books for readers in all ages, topics including Deep Learning, edge computing, App Inventor, IoT and robotics. Please check CAVEDU's website for more information: http://www.cavedu.com, http://www.appinventor.tw
DesignSpark Electrical Logolinkedin