嘿!您似乎在 United States,您想使用我们的 English 网站吗?
Switch to English site
Skip to main content

NVIDIA Jetson Nano应用-多线程平行处理,以项目-「你带口罩了吗?」为例

作者

张嘉钧

难度

普通

思路分析

原先使用主线程运行影像辨识以及IFTTT进行实时监控,光是执行影像辨识就会要等待推论的时间,而后如果要传送至IFTTT则又有一个传送的等待Request时间,如此便会影响到While循环里的实时影像,这边有很多种方法可以改善,最快且较为简单的解决方式是将实时影像放到另一个线程中去运行,这样显示实时影像与推论的线程是同步进行的,实时影像就不会因此被推论以及等待网页的时间给延迟,只需要专心处理实时影像的部分即可。

01_method2_e5dd02916374532eeabcadb794ec5f7f08411c19.png

平行运算中的多线程

在Python的平行运算中有分两种,一个是Multi-Thread另一个是Multi-Process;Process ( 进程 ) 跟Thread ( 线程 ) 其实大家平常都会听到,在购买计算机的时候常常会听到几核几绪 ( 例如 : 四核八绪 ) 就是类似的概念,几个观念重点介绍:

  1. 每个CPU都只能运行一个Process,每个Process彼此之间是独立的。
  2. 每个Process可以有多个Thread运行,彼此共享内存、变量。

由于Thread无法回传值所以要使用Queue ( 队列 ) 去储存数据,那这部分我就不多作介绍因为网络上已经有很多相关的参考了,不过,这边我没有使用queue的方式去撰写程序。

增加实时影像的线程到程序中

我使用class的方式去写因为可以直接省略queue去储存、取得变量,算是一个偷吃步的小技巧,因为我这边除了读取帧之外就只有回传的动作,应该不会导致抢资源或同步的问题。

客制化的实时影像对象

为了符合我们的需求,我客制了一个类别提供了几个所需的功能,首先在initialize的部分,比较特别的地方在我使用了 isStop的参数用来中断线程并且宣告了t为实时影像线程的对象。

# 客制化的影像撷取程序
class CustomVideoCapture():

    # 初始化 默认的摄影机装置为 0
    def __init__(self, dev=0):

        self.cap = cv2.VideoCapture(dev)
        self.ret = ''
        self.frame = []
        self.win_title = 'Modified with set_title()'
        self.info = ''
        self.fps = 0
        self.fps_time = 0

        self.isStop = False
        self.t = threading.Thread(target=self.video, name='stream')

接着先宣告了一些可以从外部控制线程的函式,像是 start_stream就是开启线程;stop_stream关闭线程;get_current_frame就是取得当前的画面,使用get_current_frame可以让外部直接获取线程更新的画面,算是一个使用Thread运行OpenCV常用的方法;最后还提供了一个set_title可以修改窗口的名称:

    # 可以透过这个函式 开启 Thread 
    def start_stream(self):
        self.t.start()
    
    # 关闭 Thread 与 Camera
    def stop_stream(self):
        self.isStop = True
        self.cap.release()
        cv2.destroyAllWindows()

    # 取得最近一次的帧
    def get_current_frame(self):
        return self.ret, self.frame

    def get_fps(self):
        return self.fps

    # 设定显示窗口的名称
    def set_title(self, txt):
        self.win_title = txt

最后宣告了多线程要运作的函式,由于要不断更新画面所以使用while,透过isStop控制是否跳出循环,其中做的事情就是取得当前影像,设定要印上去的信息并显示出来,当按下q的时候会退出循环并且使用stop_stream终止循环:

    # Thread主要运行的函式
    def video(self):
        try:
            while(not self.isStop):
                self.fps_time = time.time()
                self.ret, self.frame = self.cap.read()

                if self.info is not '':
                    cv2.putText(self.frame, self.info, (10,40), cv2.FONT_HERSHEY_SIMPLEX, 1, (0,0,255), 2)
                
                cv2.imshow(self.win_title, self.frame)
                
                if cv2.waitKey(1) == ord('q'):
                    break
                
                self.fps = int(1/(time.time() - self.fps_time))

            self.stop_stream()
        except:
            self.stop_stream()

我建立了一个tools.py存放所有会用到的副函式 ( 包含上述的客制化影像类别 ),这边开始介绍其他副函式,preprocess专门在处理输入前的数据,针对该数据进行缩放、正规化、转换成含有批次大小的格式:

# 用于数据前处理的程序
def preprocess(frame, resize=(224, 224), norm=True):
    '''
    设定格式 ( 1, 224, 224, 3)、缩放大、正规化、放入数据并回传正确格式的数据
    '''
    input_format = np.ndarray(shape=(1, 224, 224, 3), dtype=np.float32)
    frame_resize = cv2.resize(frame, resize)
    frame_norm =  ((frame_resize.astype(np.float32) / 127.0) - 1) if norm else frame_resize
    input_format[0]=frame_norm
    return input_format

load_model_folder则是加载模型与卷标,这边写成只需要输入存放模型与卷标的目录路径即可,两者须放置在一起,程序会靠扩展名去判断:

 # 读取 模型 与 卷标
def load_model_folder(trg_dir) -> "'trg_dir' is the path include model file and labels file. return (model, label).":

    model_type = [ 'trt','engine','h5']
    label_type = [ 'txt']

    for f in os.listdir(trg_dir):
        extension = f.split('.')[-1]
        
        if extension in model_type:
            model_dir = os.path.join(trg_dir, f)
        elif extension in label_type:
            lable_dir = os.path.join(trg_dir, f)

    return get_model(model_dir), get_label(lable_dir)

刚刚输出的时候有用到两个副函式 get_model、get_label,分别去取得模型与卷标文件的对象:

# 读取模型
def get_model(model_dir) -> "support keras and tensorrt model":
    
    if model_dir.split('.')[-1] == 'h5':
        print('Load Keras Model')
        model = tf.keras.models.load_model(model_dir)
    else:
        print('Load TensorRT Engine')
        model = load_engine(model_dir)
        
    return model
    

# 读取标签
def get_label(lable_dir) -> 'return dict of labels':

    label = {}

    with open(lable_dir) as f:    
        for line in f.readlines():
            idx, name = line.strip().split(' ')
            label[int(idx)]=name

    return label

# 读取TensorRT模型
def load_engine(engine_path):

    if trt_found:

        TRT_LOGGER = trt.Logger(trt.Logger.WARNING)
        trt_runtime = trt.Runtime(TRT_LOGGER)
        
        with open(engine_path, 'rb') as f:
            engine_data = f.read()
        engine = trt_runtime.deserialize_cuda_engine(engine_data)

        return engine
    else:
        print("Can not load load_engine because there is no tensorrt module")
        exit(1)

接着是解析预测结果的副函式,通常我们会取得到一组预测的信心指数,我们需要针对这组数据去解析出最大数值是在哪一个位置,而该位置又属于哪一个类别:

# 解析输出信息
def parse_output(preds, label) -> 'return ( class id, class name, probobility) ':
    
    preds = preds[0] if len(preds.shape)==4 else preds
    trg_id = np.argmax(preds)
    trg_name = label[trg_id]
    trg_prob = preds[trg_id]
    return ( trg_id, trg_name, trg_prob)

截至目前为止的程序,我都将其放在tools.py里,后续只要做import的动作即可将这些功能导入。

最后来到主程序的部分,这部分须要涵盖IFTTT以及Inference,流程大致如下:

1.取得模型与卷标、开启实时影像的线程:

# 取得模型与卷标
model, label = load_model_folder('keras_models')

# 设定影像撷取
vid = CustomVideoCapture()
vid.set_title('{sys} - {framework}'.format(sys='Jetson Nano', framework='Tensorflow'))
vid.start_stream()

2.设定辨识的参数,主要用于控制几秒辨识一次 ( t_delay ),与上次辨识结果不同才进行传送 ( pre_id ):

 # 设定几秒辨识一次,降低运行负担
t_check = 0
t_delay = 2
t_start = 0
# 储存上一次辨识的结果,如果改变才传送,防止ifttt负担太大
pre_id = -1

3.设定IFTTT的参数:

# 设定「Line讯息」信息
event = 'jetsonnano_line'
key = 'i3_S_gIAsOty30yvIg4vg'
status = {
    0:['是本人', '确定有做好防疫工作'],
    1:['是本人', '注意,已成为防疫破口'], 
    2:['离开位置', ''], 
    3:['非本人', '注意您的财产']
    }

4.使用While不断进行实时的辨识与LINE监控,这边设定了如果大于预设的delay时间则进行辨识:

# 开始实时辨识
t_start = time.time()
while(not vid.isStop):

    # 计算时间如果大于预设延迟时间则进行辨识与发送
    t_check = time.time() - t_start

    if (t_check >= t_delay) or ( not vid.fps):

        # 取得当前图片
        ret, frame = vid.get_current_frame()

        # 如果没有帧则重新执行
        if not ret: continue

5.进行推论以及取得辨识结果,最后设定显示在实时影像上的信息:

        # 进行处理与推论
        data = preprocess(frame, resize=(224,224), norm=True)
        prediction = model(data)[0]
        
        # 解析 辨识结果
        trg_id, trg_class, trg_prob =parse_output(prediction, label)

        # 设定显示信息
        vid.info = '{} : {:.3f} , FPS {}'.format(trg_class, trg_prob, vid.get_fps())

6.如果辨识结果与上次的不同,则回传给LINE:

        if pre_id != trg_id:
            
            ifttt.send_to_webhook(event, 
                                    key, 
                                    '环境变动', 
                                    status[trg_id][0], 
                                    status[trg_id][1] if status[trg_id][1] else '')
            pre_id = trg_id

        # 更新 time
        t_start = time.time()

7.最后在While的外部需要确认一下Thread是否都有关闭了,写多线程很常遇到的问题就是开了线程,但是忘记关闭导致资源被用完,所以做个DoubleCheck会是不错的选择:

# 跳出 while 循环需要检查多线程是否已经关闭
time.sleep(1)
print('-'*30)
print(f'影像串流的线程是否已关闭 : {not vid.t.is_alive()}')
print('离开程序')

完整主程序如下:

#%%
import cv2
import threading
import os, time, random
import ifttt
import numpy as np
import tensorflow as tf
import platform as plt
from tools import CustomVideoCapture, preprocess, load_model_folder, parse_output
import time

# 取得模型与卷标
model, label = load_model_folder('keras_models')

# 设定影像撷取
vid = CustomVideoCapture()
vid.set_title('{sys} - {framework}'.format(sys='Jetson Nano', framework='Tensorflow'))
vid.start_stream()

# 设定几秒辨识一次,降低运行负担
t_check = 0
t_delay = 2
t_start = 0
# 储存上一次辨识的结果,如果改变才传送,防止ifttt负担太大
pre_id = -1

# 设定「Line讯息」信息
event = 'jetsonnano_line'
key = 'i3_S_gIAsOty30yvIg4vg'
status = {
    0:['是本人', '确定有做好防疫工作'],
    1:['是本人', '注意,已成为防疫破口'], 
    2:['离开位置', ''], 
    3:['非本人', '注意您的财产']
    }
#%%

                             
# 开始实时辨识
t_start = time.time()
while(not vid.isStop):

    # 计算时间如果大于预设延迟时间则进行辨识与发送
    t_check = time.time() - t_start

    if (t_check >= t_delay) or ( not vid.fps):

        # 取得当前图片
        ret, frame = vid.get_current_frame()

        # 如果没有帧则重新执行
        if not ret: continue

        # 进行处理与推论
        data = preprocess(frame, resize=(224,224), norm=True)
        prediction = model(data)[0]
        
        # 解析 辨识结果
        trg_id, trg_class, trg_prob =parse_output(prediction, label)

        # 设定显示信息
        vid.info = '{} : {:.3f} , FPS {}'.format(trg_class, trg_prob, vid.get_fps())

        # 如果与上次辨识不同,则将辨识到的结果传送至Line
        if pre_id != trg_id:
            
            ifttt.send_to_webhook(event, 
                                    key, 
                                    '环境变动', 
                                    status[trg_id][0], 
                                    status[trg_id][1] if status[trg_id][1] else '')
            pre_id = trg_id

        # 更新 time
        t_start = time.time()

# 跳出 while 循环需要检查多线程是否已经关闭
time.sleep(1)
print('-'*30)
print(f'影像串流的线程是否已关闭 : {not vid.t.is_alive()}')
print('离开程序')

可以发现使用Thread来运行影像就完全不会受到IFTTT的影响,FPS都可以维持在30甚至以上,而主线程只需要关注于辨识以及传送数据给IFTTT即可。

使用TensorRT引擎加速推论

刚刚使用了Thread来改善IFTTT传送卡顿的问题,我们也可以针对AI推论来做改善,我们使用Jetson Nano最大的优势就在于可以使用TensorRT引擎加速处理,所以这边教大家怎么从Teachable Machine下载模型并转换成TensorRT引擎。

概略介绍

TensorRT是一个支持NVIDIA CUDA核心的加速引擎,透过对神经网络模型进行重构与数据缩减来达到加速的目的,在Jetson Nano中使用TensorRT绝对是做AI Inference的首选,那如何将神经网络模型转换成TensorRT去运行呢?

1.需要先将模型转换成 Onnx 的通用格式

2.接着在转换成 TensorRT 引擎可运作的格式

在Jetson Nano中已经带有TensorRT转换的工具,但是怎么将模型转换成Onnx还需要安装额外的工具,所以我们先来安装一下tf2onnx这个套件吧。

环境版本

JetPack

4.4.1

Python

3.6.9

pip

21.0

tensorflow

2.3.1+nv20.12

onnx

1.8.1

安装 tf2onnx并将模型转换成onnx

首先需要将tensorflow的模型转换成onnx,我们将使用tf2onnx这个套件,在安装之前需要先确保onnx已经被安装了,这边提供相依套件以onnx的安装命令:

$ sudo apt-get install protobuf-compiler libprotoc-dev # onnx 相依套件
$ pip3 install onnx
$ pip install onnxruntime

升级numpy (可有可无):

$ python3 -m pip install -U numpy --no-cache-dir --no-binary numpy

安装tf2onnx:

$ pip3 install tf2onnx

宣告OpenBLAS的核心架构,在JetsonNano上少了这步应该会报错误讯息” Illegal instruction(core dumped)”:

$ nano ~/.bashrc
export OPENBLAS_CORETYPE=ARMV8
$ source ~/.bashrc	

安装完之后可以回到上次教学的Teachable Machine,这次要下载的文件格式必须选择成TensorFlow > Savemodel,如下图所示:

03_TM1_456e1988ca869dd6d2539fbfbc9edb41fd431903.png

Savemodel是Tensorflow模型「串行化」的格式,由于Onnx的格式也是串行化的,所以在一开始就转换成Savemodel在后续转换Onnx比较不容易出错。我们可以使用执行下列指令转换成onnx模型:

$ python3 -m tf2onnx.convert --saved-model ./savemodel --output ./test_opset_default.onnx

透过Jetson Nano内建工具转换成TensorRT

接着可以使用JetsonNano的原生工具 (trtexec) 转换成TensorRT:

$ /usr/src/tensorrt/bin/trtexec --onnx=/home/dlinano/TM2/test_opset_default.onnx --saveEngine=/home/dlinano/TM2/test.trt --shapes=input0:1x3x224x224

同时需要安装pycuda,安装步骤当中有一个nvcc是用来确认是否有抓到cuda,若没有加入环境变量则会报错,同时也无法安装pycuda:

$ nano ~/.bashrc
export PATH=${PATH}:/usr/local/cuda/bin
export LD_LIBRARY_PATH=${LD_LIBRARY_PATH}:/usr/local/cuda/lib64
$ source ~/.bashrc
$ nvcc -V
$ pip3 install pycuda

由于我们会使用到tensorrt提供的范例common.py,所以先直接复制一份:

$ cp /usr/src/tensorrt/samples/python/common.py ./common.py

经过繁琐的操作后,终于可以运行程序了:

$ python3 tm_tensorrt.py

这个程序比照上一篇的方法所撰写,可以注意到FPS相较于之前的推论程序都高非常多,已经可以到顺跑的程度了。

04_TensorRT_tm21_e4b1bf91ed3a3ee81ae3fb8dab521393f2d5b388.png

程序讲解

导入函式库以及设定TRT的基本参数

import cv2
import tensorrt as trt
import numpy as np
import common
import platform as plt
import time
from tools import preprocess, load_model_folder

TRT_LOGGER = trt.Logger(trt.Logger.WARNING)
trt_runtime = trt.Runtime(TRT_LOGGER)

先取得 TensorRT引擎,透过先前撰写好的副函式 ( load_model_folder ) 来取得 engine、label;再导入之前我们需要预先定义好buffer给TensorRT;接着解析TensorRT对象取得该「执行文本」:

 load trt engine
print('取得TRT引擎与卷标')
engine, label = load_model_folder('tensorrt_engine')

# allocate buffers
print('分配 buffers 给 TensorRT 所须的物件')
inputs, outputs, bindings, stream = common.allocate_buffers(engine)

print('创建执行文本 ( context )')
context = engine.create_execution_context()

接着我们使用与上一篇雷同的OpenCV程序完成实时影像辨识,最大的区别在于TensorRT引擎导入数据的方法与推论的方法:

print('开启实时影像')
fps = -1
cap = cv2.VideoCapture(0, cv2.CAP_GSTREAMER)

while(True):

    t_start = time.time()

    # 读取图片
    ret, frame = cap.read()

    # 将图片进行前处理并放入输入数据中
    inputs[0].host = preprocess(frame)

    # 进行 Inference
    trt_outputs = common.do_inference(context, bindings=bindings, inputs=inputs, outputs=outputs, stream=stream)
    
    # 解析输出数据
    trg_idx, trg_class, trg_prob = parse_output(trt_outputs[0], label)

    # 设定显示数据
    info = '{} : {:.3f} , FPS {}'.format(trg_class, trg_prob, fps)

    # 将显示数据绘制在图片上
    cv2.putText(frame, info, (10,40), cv2.FONT_HERSHEY_SIMPLEX, 1, (0,0,255), 2)
    cv2.imshow('TensorRT', frame)

    if cv2.waitKey(1) == ord('q'):
        break
    # 更新FPS与时间点
    fps = int(1/(time.time()-t_start))
    t_start = time.time()

最后离开的时候一样要做确认的动作:

cap.release()
cv2.destroyAllWindows()
print('离开程序')

三种框架比较

既然都做到TensorRT加速了,我们还是得来比较一下速度差距(仅供参考):

05_compare1_eac925a164dd70a107aa9fc10c81d14cb5b03691.png

可以注意到Tensorflow的速度最慢但是准确度最高;Tensorflow Lite则是牺牲准确度换取高效能的表现;而TensorRT就更优秀了,优化的时候保留更多准确度,效能也能有效提高。

TensorRT结合Thread与IFTTT

建构的方式与上述雷同,所以就直接提供完整程序:

import cv2
import tensorrt as trt
import numpy as np	
import common
import platform as plt
import time
import ifttt
import threading
from tools import CustomVideoCapture, preprocess, load_model_folder, parse_output

TRT_LOGGER = trt.Logger(trt.Logger.WARNING)
trt_runtime = trt.Runtime(TRT_LOGGER)

def main():

    pre_idx = -1

    print('取得TRT引擎与卷标')
    engine, label = load_model_folder('tensorrt_engine')

    print('分配 buffers 给 TensorRT 所须的物件')
    inputs, outputs, bindings, stream = common.allocate_buffers(engine)

    print('创建执行文本 ( context )')
    context = engine.create_execution_context()

    print('设定实时影像参数')
    vid = CustomVideoCapture()
    vid.set_title('{sys} - {framework}'.format(sys='Jetson Nano', framework='TensorRT'))
    vid.start_stream()
    
    # 设定几秒辨识一次,为了配合 ifttt 的延迟通知
    t_check = 0
    t_delay = 1
    t_start = 0

    # 储存上一次辨识的结果,如果改变才传送,防止ifttt负担太大
    pre_id = -1

    # 设定「Line讯息」信息
    print('设定IFTTT参数')
    event = 'jetsonnano_line'
    key = 'i3_S_gIAsOty30yvIg4vg'
    status = {
        0:['是本人', '确定有做好防疫工作'],
        1:['是本人', '注意,已成为防疫破口'], 
        2:['离开位置', ''], 
        3:['非本人', '注意您的财产']
        }

    t_start = time.time()
    
    while(not vid.isStop):

        # 计算时间如果大于预设延迟时间则进行辨识与发送
        t_check = time.time()-t_start
        if t_check >= t_delay:
            
            ret, frame = vid.get_current_frame()
            if not ret: continue

            inputs[0].host = preprocess(frame, resize=(224, 224), norm=True)

            infer_time = time.time()
            
            # with engine.create_execution_context() as context:
            trt_outputs = common.do_inference(context, bindings=bindings, inputs=inputs, outputs=outputs, stream=stream)
            
            infer_time = time.time() - infer_time
            
            preds = trt_outputs[0]

            trg_id, trg_class, trg_prob = parse_output(preds, label)

            vid.info = '{} : {:.3f} , FPS : {:.3f}'.format(trg_class, trg_prob, vid.get_fps())

            if pre_id != trg_id:
                
                ifttt.send_to_webhook(event, 
                                      key, 
                                      '环境变动', 
                                      status[trg_id][0], 
                                      status[trg_id][1] if status[trg_id][1] else '')
                pre_id = trg_id

            t_start = time.time()
    
    # 跳出 while 循环需要检查多线程是否已经关闭
    time.sleep(1)
    print('-'*30, '\n')
    print(f'影像串流的线程是否已关闭 : {not vid.t.is_alive()}')

if __name__ == '__main__':
    
    main()

结语

这次我们使用了两种方式来进行改造、加速,其实透过Thread就能有不错的成果了,但是TensorRT又能再减少一些负担,让 AI辨识与Line的监控讯息可以变得更加确实、快速。

相关文章

Onnx-tensorrt Github

Program/Process/Thread 差异

CAVEDU Education is devoted into robotics education and maker movement since 2008, and is intensively active in teaching fundamental knowledge and skills. We had published many books for readers in all ages, topics including Deep Learning, edge computing, App Inventor, IoT and robotics. Please check CAVEDU's website for more information: http://www.cavedu.com, http://www.appinventor.tw
DesignSpark Electrical Logolinkedin