In the previous post we completed the hardware build and chassis control of the neural-network self-driving car. Once the car joins the network over WiFi it can already be driven remotely, making it a wireless RC car. But that is not enough: in this installment we will build a neural network controller for it and give it a mind of its own.
In this post we will build and train a neural network from scratch and use it to control the car's motion.
To train a neural network, the first thing we need is a training dataset. For this project the collection strategy is as follows: drive the car around the track manually from a PC; while driving, the road images captured by the onboard camera are streamed to the PC, which bundles each frame with the steering command active at that moment and stores the pairs as a dataset.
To make this possible, we need a video streaming server on the PYNQ-Z2 board so that the onboard camera's frames reach the PC in real time. For this project I use a mature open-source tool, mjpg-streamer: download the source, compile and install it, then run its startup script "start.sh" to launch the streaming server.
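Before writing any client code, it is worth confirming from the PC that the server is reachable. A minimal sketch (assuming the board resolves as "pynq" and the default port 8080; mjpg-streamer's HTTP output plugin also serves a single frame via "?action=snapshot"):

from urllib.request import urlopen

# fetch a single JPEG frame from mjpg-streamer's snapshot action
with urlopen('http://pynq:8080/?action=snapshot') as resp:
    data = resp.read()

# a valid JPEG starts with the bytes FF D8
print('Got %d bytes, JPEG: %s' % (len(data), data[:2] == b'\xff\xd8'))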
Any computer on the same subnet can then fetch the stream from "pynq:8080/?action=stream". The host receives the stream and uses OpenCV to decode it back into images for further processing.
The host-side code for receiving the video stream is as follows:
import re
from urllib.request import urlopen
import cv2
import numpy as np

# mjpg-streamer URL
url = 'http://192.168.1.103:8080/?action=stream'
stream = urlopen(url)

# Read the boundary message and discard
stream.readline()
sz = 0
rdbuffer = None
clen_re = re.compile(rb'Content-Length: (\d+)\r\n')

# Read each frame
# TODO: This is hardcoded to mjpg-streamer's behavior
while True:

    stream.readline()  # content type

    try:  # content length
        m = clen_re.match(stream.readline())
        clen = int(m.group(1))
    except (AttributeError, ValueError):  # header missing: stream ended
        break

    stream.readline()  # timestamp
    stream.readline()  # empty line

    # Reallocate buffer if necessary
    if clen > sz:
        sz = clen * 2
        rdbuffer = bytearray(sz)
        rdview = memoryview(rdbuffer)

    # Read frame into the preallocated buffer
    stream.readinto(rdview[:clen])

    stream.readline()  # endline
    stream.readline()  # boundary

    # decode the JPEG bytes from a uint8 array
    # (this line will need to be different when using OpenCV 2.x)
    img = cv2.imdecode(np.frombuffer(rdbuffer, count=clen, dtype=np.uint8),
                       flags=cv2.IMREAD_COLOR)
    # Show image
    cv2.imshow('Image', img)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

cv2.destroyAllWindows()
Next, we need code to drive the car remotely, i.e. a host-side control program. In this installment the chassis is controlled over a serial link: sending characters such as 'F', 'B', 'L', 'R' makes the chassis move forward, move backward, turn left, turn right, and so on. The PYNQ-Z2 board runs the ser2net program, which receives these characters over the wireless network and forwards them to the chassis controller over the serial port.
The code for remotely driving the chassis is sketched below:
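(A minimal sketch of the idea; the same keyboard-to-socket loop reappears inside the data collector below. It assumes ser2net on the car listens on TCP port 8484, the port the collector uses, and that a small pygame window has focus so key events are captured.)

import socket
import pygame
from pygame.locals import KEYDOWN, KEYUP

# ser2net forwards whatever arrives on this TCP port to the chassis serial port
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(('192.168.1.103', 8484))

pygame.init()
pygame.display.set_mode((100, 100))  # a window is needed to receive key events

running = True
while running:
    for event in pygame.event.get():
        if event.type == KEYDOWN:
            keys = pygame.key.get_pressed()
            if keys[pygame.K_UP]:
                s.send(b'F')      # forward
            elif keys[pygame.K_DOWN]:
                s.send(b'B')      # reverse
            elif keys[pygame.K_LEFT]:
                s.send(b'L')      # turn left
            elif keys[pygame.K_RIGHT]:
                s.send(b'R')      # turn right
            elif keys[pygame.K_q]:
                s.send(b'S')      # stop, then quit
                running = False
        elif event.type == KEYUP:
            s.send(b'S')          # stop when the key is released

s.close()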
Finally, we merge the two programs above and add the dataset-recording code, which completes the data collection program. The dataset is assembled with Python's NumPy library and saved as an ".npz" file.
Note that to keep the network small and the compute load low, the original 640*480 color image is converted to a 320*240 grayscale image, and a region of interest is then applied so that only the most informative lower half, a 320*120 region, is kept, shrinking the input further. Because the track imagery is relatively simple, even this heavily compressed image still carries enough information to steer the network on a simple road.
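To make the dimensions concrete, the preprocessing amounts to the following sketch (using one of the full-resolution grayscale frames the collector saves):

import cv2
import numpy as np

frame = cv2.imread('training_images/frame00001.jpg', cv2.IMREAD_GRAYSCALE)  # 640x480

shrink = cv2.resize(frame, (320, 240), interpolation=cv2.INTER_AREA)  # 320x240
roi = shrink[120:240, :]                            # lower half: 320x120
sample = roi.reshape(1, 38400).astype(np.float32)   # 320*120 = 38400 features
print(sample.shape)                                 # (1, 38400)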
The data collection source code is as follows:
import numpy as np
import cv2
import pygame
from pygame.locals import *
import socket
import time
import os
from urllib.request import urlopen
import re

# Single-character commands understood by the chassis controller
MOTION_STOP = b'S'
MOTION_FORWARD = b'F'
MOTION_REVERSE = b'B'
MOTION_LEFT = b'L'
MOTION_RIGHT = b'R'
MOTION_REVERSE_LEFT = b'H'
MOTION_REVERSE_RIGHT = b'J'
MOTION_FORWARD_LEFT = b'G'
MOTION_FORWARD_RIGHT = b'I'
SPEED_LEFT_FAST = b'5'
SPEED_RIGHT_FAST = b'5'
SPEED_LEFT_SLOW = b'1'
SPEED_RIGHT_SLOW = b'1'

class CollectTrainingData(object):

    def __init__(self):
        self.send_inst = True
        # create one-hot labels: row 0 = left, 1 = right, 2 = forward, 3 = reverse
        self.k = np.zeros((4, 4), 'float')
        for i in range(4):
            self.k[i, i] = 1
        # ser2net address on the car
        self.address = ('192.168.1.103', 8484)
        self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.s.connect(self.address)
        pygame.init()
        window_size = Rect(0, 0, 100, 100)  # set the window size
        screen = pygame.display.set_mode(window_size.size)  # the window captures key events
        self.collect_image()

    def collect_image(self):
        saved_frame = 0
        total_frame = 0
        # collect images for training
        print('Start collecting images...')
        e1 = cv2.getTickCount()
        image_array = np.zeros((1, 38400))
        label_array = np.zeros((1, 4), 'float')
        # mjpg-streamer URL
        url = 'http://192.168.1.101:1010/?action=stream'
        stream = urlopen(url)

        # Read the boundary message and discard
        stream.readline()
        sz = 0
        rdbuffer = None
        clen_re = re.compile(rb'Content-Length: (\d+)\r\n')

        if not os.path.exists('training_images'):
            os.makedirs('training_images')

        # stream video frames one by one
        try:
            frame = 1
            while self.send_inst:
                stream.readline()  # content type

                try:  # content length
                    m = clen_re.match(stream.readline())
                    clen = int(m.group(1))
                except (AttributeError, ValueError):
                    break

                stream.readline()  # timestamp
                stream.readline()  # empty line

                # Reallocate buffer if necessary
                if clen > sz:
                    sz = clen * 2
                    rdbuffer = bytearray(sz)
                    rdview = memoryview(rdbuffer)

                # Read frame into the preallocated buffer
                stream.readinto(rdview[:clen])

                stream.readline()  # endline
                stream.readline()  # boundary

                # decode the JPEG directly to grayscale
                # (this line will need to be different when using OpenCV 2.x)
                image = cv2.imdecode(np.frombuffer(rdbuffer, count=clen, dtype=np.uint8),
                                     flags=cv2.IMREAD_GRAYSCALE)

                # shrink image to 320x240
                shrink = cv2.resize(image, (320, 240), interpolation=cv2.INTER_AREA)

                # select the lower half of the image: a 320x120 region
                roi = shrink[120:240, :]

                # save streamed images
                cv2.imwrite('training_images/frame{:>05}.jpg'.format(frame), image)
                cv2.imshow('image', image)
                cv2.imshow('image_gray', roi)
                cv2.waitKey(1)  # let the HighGUI windows refresh

                # reshape the ROI into one row of 320*120 = 38400 features
                temp_array = roi.reshape(1, 38400).astype(np.float32)

                frame += 1
                total_frame += 1

                # get input from the human driver
                for event in pygame.event.get():
                    if event.type == KEYDOWN:
                        key_input = pygame.key.get_pressed()
                        # complex orders
                        if key_input[pygame.K_UP] and key_input[pygame.K_RIGHT]:
                            print("Forward Right")
                            image_array = np.vstack((image_array, temp_array))
                            label_array = np.vstack((label_array, self.k[1]))
                            saved_frame += 1
                            self.s.send(MOTION_FORWARD_RIGHT)
                        elif key_input[pygame.K_UP] and key_input[pygame.K_LEFT]:
                            print("Forward Left")
                            image_array = np.vstack((image_array, temp_array))
                            label_array = np.vstack((label_array, self.k[0]))
                            saved_frame += 1
                            self.s.send(MOTION_FORWARD_LEFT)
                        elif key_input[pygame.K_DOWN] and key_input[pygame.K_RIGHT]:
                            print("Reverse Right")
                            self.s.send(MOTION_REVERSE_RIGHT)
                        elif key_input[pygame.K_DOWN] and key_input[pygame.K_LEFT]:
                            print("Reverse Left")
                            self.s.send(MOTION_REVERSE_LEFT)
                        # simple orders
                        elif key_input[pygame.K_UP]:
                            print("Forward")
                            saved_frame += 1
                            image_array = np.vstack((image_array, temp_array))
                            label_array = np.vstack((label_array, self.k[2]))
                            self.s.send(MOTION_FORWARD)
                        elif key_input[pygame.K_DOWN]:
                            print("Reverse")
                            saved_frame += 1
                            image_array = np.vstack((image_array, temp_array))
                            label_array = np.vstack((label_array, self.k[3]))
                            self.s.send(MOTION_REVERSE)
                        elif key_input[pygame.K_RIGHT]:
                            print("Right")
                            image_array = np.vstack((image_array, temp_array))
                            label_array = np.vstack((label_array, self.k[1]))
                            saved_frame += 1
                            self.s.send(MOTION_RIGHT)
                        elif key_input[pygame.K_LEFT]:
                            print("Left")
                            image_array = np.vstack((image_array, temp_array))
                            label_array = np.vstack((label_array, self.k[0]))
                            saved_frame += 1
                            self.s.send(MOTION_LEFT)
                        elif key_input[pygame.K_x] or key_input[pygame.K_q]:
                            print('exit')
                            self.send_inst = False
                            self.s.send(MOTION_STOP)
                            break
                    elif event.type == pygame.KEYUP:
                        self.s.send(MOTION_STOP)

            # drop the placeholder first row, keep the saved frames and labels
            train = image_array[1:, :]
            train_labels = label_array[1:, :]

            # save training data as a numpy file
            file_name = str(int(time.time()))
            directory = "training_data"
            if not os.path.exists(directory):
                os.makedirs(directory)
            try:
                np.savez(directory + '/' + file_name + '.npz', train=train, train_labels=train_labels)
            except IOError as e:
                print(e)

            e2 = cv2.getTickCount()
            # calculate streaming duration
            time0 = (e2 - e1) / cv2.getTickFrequency()
            print('Streaming duration:', time0)
            print(train.shape)
            print(train_labels.shape)
            print('Total frame:', total_frame)
            print('Saved frame:', saved_frame)
            print('Dropped frame:', total_frame - saved_frame)
        finally:
            self.s.close()

if __name__ == '__main__':
    CollectTrainingData()
Drive the car around the track in a disciplined way with the collection program for a few minutes; that yields enough data for training.
With the dataset complete, the next step is to build and train the neural network.
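Before training, it is worth sanity-checking a recorded file. A minimal sketch (the file name is the Unix timestamp chosen by the collector, so substitute your own):

import numpy as np

with np.load('training_data/1546300800.npz') as data:  # substitute your own file
    train = data['train']
    train_labels = data['train_labels']

print(train.shape)               # (N, 38400): one flattened 320x120 ROI per row
print(train_labels.shape)        # (N, 4): one-hot labels
print(train_labels.sum(axis=0))  # frames per class: left, right, forward, reverse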
For the first-stage implementation of the "neural-network self-driving car", I build and train the network with the ml machine-learning module of the OpenCV framework preinstalled in the PYNQ-Z2 board image.
In this application the neural network is used for classification: its input is the raw image data from the dataset, and the output-layer classes correspond to the four chassis motions.
Since the recognition task is relatively simple, we use a single-hidden-layer 38400 -> 32 -> 4 network, implemented in OpenCV with the ANN_MLP functions of the ml module.
The BP (backpropagation) network is set up with the parameters shown in the program:
import cv2
import numpy as np
import glob
import os

print('Loading training data...')
e0 = cv2.getTickCount()

# load training data
image_array = np.zeros((1, 38400))
label_array = np.zeros((1, 4), 'float')
training_data = glob.glob('training_data/*.npz')
for single_npz in training_data:
    with np.load(single_npz) as data:
        print(data.files)
        train_temp = data['train']
        train_labels_temp = data['train_labels']
        print(train_temp.shape)
        print(train_labels_temp.shape)
        image_array = np.vstack((image_array, train_temp))
        label_array = np.vstack((label_array, train_labels_temp))

train = image_array[1:, :]
train_labels = label_array[1:, :]
print(train.shape)
print(train_labels.shape)

e00 = cv2.getTickCount()
time0 = (e00 - e0) / cv2.getTickFrequency()
print('Loading image duration:', time0)

# set start time
e1 = cv2.getTickCount()

# create MLP: 38400 inputs -> 32 hidden -> 4 outputs
layer_sizes = np.int32([38400, 32, 4])
model = cv2.ml.ANN_MLP_create()
model.setLayerSizes(layer_sizes)
model.setTrainMethod(cv2.ml.ANN_MLP_BACKPROP)
model.setBackpropMomentumScale(0.0)
model.setBackpropWeightScale(0.001)
model.setTermCriteria((cv2.TERM_CRITERIA_COUNT, 20, 0.01))
model.setActivationFunction(cv2.ml.ANN_MLP_SIGMOID_SYM, 2, 1)

print('Training MLP ...')
# note: in OpenCV 3.x train() returns a bool, not the iteration count
model.train(np.float32(train), cv2.ml.ROW_SAMPLE, np.float32(train_labels))

# set end time
e2 = cv2.getTickCount()
time = (e2 - e1) / cv2.getTickFrequency()
print('Training duration:', time)

# save the trained parameters
if not os.path.exists('mlp_xml'):
    os.makedirs('mlp_xml')
model.save('mlp_xml/mlp.xml')

# evaluate on the training set (predict expects float32 input)
ret, resp = model.predict(np.float32(train))
prediction = resp.argmax(-1)
print('Prediction:', prediction)
true_labels = train_labels.argmax(-1)
print('True labels:', true_labels)

print('Testing...')
train_rate = np.mean(prediction == true_labels)
print('Train rate: %f%%' % (train_rate * 100))
When training finishes, the program saves the trained model, which the self-driving program can then load directly for neural network inference.
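Loading and querying the saved model takes only a few lines. A minimal sketch, with a placeholder input standing in for a preprocessed (1, 38400) float32 row:

import cv2
import numpy as np

model = cv2.ml.ANN_MLP_load('mlp_xml/mlp.xml')

sample = np.zeros((1, 38400), np.float32)  # placeholder for a real preprocessed frame
ret, resp = model.predict(sample)
direction = resp.argmax(-1)  # 0 = left, 1 = right, 2 = forward, 3 = reverse
print(direction)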
Next, we also need the traffic-sign recognition program. It uses OpenCV cascade classifiers and, together with the neural network controller, decides the car's motion.
The self-driving program is as follows:
import threading
import cv2
import numpy as np
import socket
import re
from urllib.request import urlopen
import time

# Single-character commands understood by the chassis controller
MOTION_STOP = b'S'
MOTION_FORWARD = b'F'
MOTION_REVERSE = b'B'
MOTION_LEFT = b'L'
MOTION_RIGHT = b'R'
MOTION_REVERSE_LEFT = b'H'
MOTION_REVERSE_RIGHT = b'J'
MOTION_FORWARD_LEFT = b'G'
MOTION_FORWARD_RIGHT = b'I'
SPEED_LEFT_FAST = b'5'
SPEED_RIGHT_FAST = b'5'
SPEED_LEFT_SLOW = b'1'
SPEED_RIGHT_SLOW = b'1'

class ObjectDetection(object):

    def __init__(self):
        self.red_light = False
        self.green_light = False
        self.yellow_light = False
        self.stop_sign = False

    def detect(self, cascade_classifier, gray_image, image):
        # y camera coordinate of the target point 'P'
        v = 0
        # minimum brightness contrast to validate a traffic light state
        threshold = 150

        # detection
        cascade_obj = cascade_classifier.detectMultiScale(
            gray_image,
            scaleFactor=1.1,
            minNeighbors=5,
            minSize=(30, 30)
        )

        # draw a rectangle around the objects
        for (x_pos, y_pos, width, height) in cascade_obj:
            cv2.rectangle(image, (x_pos + 5, y_pos + 5),
                          (x_pos + width - 5, y_pos + height - 5), (255, 255, 255), 2)
            v = y_pos + height - 5
            # stop sign: detectMultiScale preserves the cascade's base-window
            # aspect ratio, so the square stop-sign cascade yields square boxes
            if width / height == 1:
                cv2.putText(image, 'STOP', (x_pos, y_pos - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)
                if width > 190:
                    self.stop_sign = True
                    print("STOP Sign")
            # traffic lights
            else:
                self.stop_sign = False
                roi = gray_image[y_pos + 10:y_pos + height - 10, x_pos + 10:x_pos + width - 10]
                mask = cv2.GaussianBlur(roi, (25, 25), 0)
                (minVal, maxVal, minLoc, maxLoc) = cv2.minMaxLoc(mask)

                # check if a light is on
                if maxVal - minVal > threshold:
                    cv2.circle(roi, maxLoc, 5, (255, 0, 0), 2)

                    # Red light: bright spot in the upper part of the box
                    if 1.0 / 8 * (height - 30) < maxLoc[1] < 4.0 / 8 * (height - 30):
                        cv2.putText(image, 'Red', (x_pos + 5, y_pos - 5),
                                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 2)
                        if width > 70:
                            self.red_light = True
                            print("Red Light")
                    # Green light: bright spot in the lower part of the box
                    elif 5.5 / 8 * (height - 30) < maxLoc[1] < height - 30:
                        cv2.putText(image, 'Green', (x_pos + 5, y_pos - 10),
                                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
                        if width > 70:
                            self.green_light = True
                            print("Green Light")
                    else:
                        self.red_light = False
                        self.green_light = False
        return v

class RCControl(object):

    def __init__(self):
        # ser2net address on the car
        self.address = ('192.168.1.103', 8484)
        self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.s.connect(self.address)
        self.s.send(SPEED_LEFT_FAST)
        time.sleep(0.01)
        self.s.send(SPEED_RIGHT_FAST)
        time.sleep(0.01)

    def steer(self, prediction):
        if prediction == 2:
            self.s.send(MOTION_FORWARD)
            print("Forward")
        elif prediction == 0:
            self.s.send(MOTION_LEFT)
            print("Left")
        elif prediction == 1:
            self.s.send(MOTION_RIGHT)
            print("Right")
        else:
            self.stop()

    def stop(self):
        self.s.send(SPEED_LEFT_FAST)
        time.sleep(0.01)
        self.s.send(SPEED_RIGHT_FAST)
        time.sleep(0.01)
        self.s.send(MOTION_STOP)
        print("Stop")

class VideoStream_Thread(threading.Thread):

    def __init__(self):
        super(VideoStream_Thread, self).__init__()
        self.url = 'http://192.168.1.103:8080/?action=stream'
        self.stream = urlopen(self.url)
        # Read the boundary message and discard
        self.stream.readline()
        self.sz = 0
        self.rdbuffer = None
        self.clen_re = re.compile(rb'Content-Length: (\d+)\r\n')
        self.stop_cascade = cv2.CascadeClassifier('cascade_xml/stop_sign.xml')
        self.light_cascade = cv2.CascadeClassifier('cascade_xml/traffic_light.xml')
        self.rc_car = RCControl()
        self.obj_detection = ObjectDetection()
        self.model = cv2.ml.ANN_MLP_load('mlp_xml/mlp.xml')
        print("model loaded")

    def predict(self, samples):
        ret, resp = self.model.predict(samples)
        return resp.argmax(-1)

    def run(self):
        # stream video frames one by one
        try:
            while True:
                self.stream.readline()  # content type
                try:  # content length
                    m = self.clen_re.match(self.stream.readline())
                    clen = int(m.group(1))
                except (AttributeError, ValueError):
                    break
                self.stream.readline()  # timestamp
                self.stream.readline()  # empty line

                # Reallocate buffer if necessary
                if clen > self.sz:
                    self.sz = clen * 2
                    self.rdbuffer = bytearray(self.sz)
                    rdview = memoryview(self.rdbuffer)

                # Read frame into the preallocated buffer
                self.stream.readinto(rdview[:clen])

                self.stream.readline()  # endline
                self.stream.readline()  # boundary

                # decode the JPEG twice: color for display, grayscale for detection
                # (these lines will need to be different when using OpenCV 2.x)
                image = cv2.imdecode(np.frombuffer(self.rdbuffer, count=clen, dtype=np.uint8),
                                     flags=cv2.IMREAD_COLOR)
                gray = cv2.imdecode(np.frombuffer(self.rdbuffer, count=clen, dtype=np.uint8),
                                    flags=cv2.IMREAD_GRAYSCALE)

                # shrink image to 320x240, then keep the lower 320x120 half
                shrink = cv2.resize(gray, (320, 240), interpolation=cv2.INTER_AREA)
                half_gray = shrink[120:240, :]

                # object detection
                v_param1 = self.obj_detection.detect(self.stop_cascade, gray, image)
                v_param2 = self.obj_detection.detect(self.light_cascade, gray, image)
                cv2.imshow('image', image)
                cv2.imshow('ANN_image', half_gray)

                # reshape the ROI into one row of 38400 features
                image_array = half_gray.reshape(1, 38400).astype(np.float32)

                # neural network makes its prediction
                prediction = self.predict(image_array)

                # a stop sign or red light overrides the network
                if self.obj_detection.stop_sign or self.obj_detection.red_light:
                    self.rc_car.stop()
                    if cv2.waitKey(1) & 0xFF == ord('q'):
                        time.sleep(0.2)
                        break
                    continue

                self.rc_car.steer(prediction)
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    self.rc_car.stop()
                    time.sleep(0.2)
                    break

            cv2.destroyAllWindows()
        finally:
            self.rc_car.s.close()
            print("Connection closed on thread 1")

if __name__ == '__main__':
    # start the video/driving thread explicitly (the original ThreadServer class
    # body started it at import time as a side effect)
    video_thread = VideoStream_Thread()
    video_thread.start()
Run the self-driving program on the PC and the car drives itself around the track. With minor changes the program can also run directly on the PYNQ-Z2 board, but since the image processing then runs on the onboard dual-core ARM Cortex-A9 with 512 MB of DDR memory, the control frame rate drops somewhat.
With that, the first-stage implementation of the neural-network self-driving car is complete.