想象一下,你打算在一个房间里自动移动几个机器人。您需要每个机器人的精确位置和方向(方向正面指向)。显然,像GPS这样的户外系统不起作用,而且你的预算也很少。你是做什么工作的?
经过一些研究,易于应用的解决方案,我和我的学生决定视觉跟踪我们的机器人。我们在天花板上放了一台摄像机,连续播放下面机器人的视频。剩下的工作是捕捉视频流的帧,搜索内部感兴趣的对象,并为调查结果提供服务。
虽然下面我描述的解决方案适用于所有质量合理的 USB 摄像机和任何计算机,但我们使用了树莓派 3B+ 与树莓相机,以满足我们的预算限制。
使用 OpenCV 跟踪对象
我们最初打算用对象识别算法跟踪我们的机器人,但很快决定节省训练适当系统的时间,而将标记贴在我们的机器人上。开源图像处理库OpenCV附带了一个易于使用的标记检测框架,称为Aruco。此外,如果您对 C/C++编程感到不自在,OpenCV 还包括 Python 绑定。
OpenCV本身是作为pip的包,即使在无头捆绑 opencv-python-headless
。除了 OpenCV 之外,您还需要安装到 OpenCV opencv-contrib-python-headless
的第三方贡献,其中包括 Aruco 库。
阿德里安·罗斯布罗克写了一篇关于安装OpenCV的好帖子,有皮点和一些怪癖。
检测视频流中的标记
我们必须按帧捕获视频帧并搜索这些帧中的标记。虽然 Pi 摄像机可用作 USB 摄像机,但有经过优化的 Python 库 picamera
来获取图像或流式传输视频和捕获帧。
camera = PiCamera()
camera.resolution = RESOLUTION # (1008,1008)
camera.framerate = FRAMERATE # 30
rawCapture = PiRGBArray(camera, size=RESOLUTION)
try:
for capture in camera.capture_continuous(rawCapture, format="bgr", use_video_port=True):
frame = capture.array
markers = find_markers(frame)
# ... do something with the found markers here
rawCapture.truncate(0)
except:
logging.error("Capturing stopped with an error.");
finally:
上述代码连续从 Pi 摄像机循环捕获帧,并将每个帧传递给我们的 find_markers
功能。我们包装了一些日志记录和错误处理。
标记位置
对于我们的机器人导航,我们需要知道机器人(它的标记)在哪里,相对于相机的视点,以及机器人的去向。因此,我们希望 find_markers
返回一组包含ID、位置(x,y) 和标题(0° 等于北)的分体。
Aruco 支持具有不同分辨率的标记集,如 4×4 或 6×6 网格。有一个相当不错的 OpenCV教程解释 Arcuco 标记,其中还显示了如何创建标记我/阿鲁科根/”rel=”不跟随”目标=”_blank”=标记生成器。
Aruco 提供 detectMarkers
搜索图像中标记的功能。该函数返回每个检测到的标记的多维角数组,以及另一个带有标记标识的数组。在下面的第 15-20 行中,我们从右上角和左下角计算每个标记的中心。生成的数组具有一些额外的维度,因此我们使用 NumPy squeze
之前来减小角数组的尺寸。
def find_markers(frame):
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
corners, ids, rejectedImgPoints = cv2.aruco.detectMarkers(gray, DICTIONARY, parameters=PARAMETERS)
rvecs, tvecs, _objPoints = cv2.aruco.estimatePoseSingleMarkers(corners, MARKER_EDGE, CAMERA_MATRIX, DIST_COEFFS)
result = set()
if ids is None:
return result
for i in range(0, len(ids)):
try:
id = str(ids[i][0])
marker = np.squeeze(corners[i])
x1,y1 = marker[0]
x2,y2 = marker[2]
x = int((x1 + x2)/2)
y = int((y1 + y2)/2)
bearing = calc_heading(rvecs[i][0])
result.add((id, x, y, bearing))
except Exception:
traceback.print_exc()
return result
您可能已经注意到一些常量,我们设置如下:
DICTIONARY = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_4X4_250) # our marker dictionary
PARAMETERS = cv2.aruco.DetectorParameters_create() # default params
MARKER_EDGE = 0.05 # not really relevant for our use case; our marker size is 5cm x 5cm
标记标题
检测标记的标题更具挑战性。可能有一个更好的方法来查找基于角的标记的方向;然而,我们使用的是Aruco函数 estimatePoseSingleMarkers
,它尊重相机镜头的失真。因此,该函数需要一个摄像机矩阵和一个具有失真系数的矢量。尽管有一个很好的教程,如何做到这一点,找到相机的相机矩阵和失真系数基于OpenCV的相机校准被证明是最困难的工作。(您可能会发现马克·赫德利·琼斯的棋盘收藏很有用。
这些是我们在多次尝试我们的 pi 相机(镜头 1/4″、光圈 f/1.8、焦距 3.6mm、视角对角 75.7°)后获得的值:
CAMERA_MATRIX = np.array([[2.01976721e+03, 0.00000000e+00, 1.32299009e+03],[0.00000000e+00, 2.00413989e+03, 8.60071427e+02],[0.00000000e+00, 0.00000000e+00, 1.00000000e+00]])
DIST_COEFFS = np.array([-0.63653859, 0.2963752, 0.03228459, 0.0028887, 0.82956323])
# IMPORTANT: this will only work for you if your camera has the same technical specifations
姿势检测返回所有标记的旋转和平移矢量数组。我们使用 函数 calc_heading
从旋转矢量以度数形式获取每个标记的实际标题。旋转矢量在三维中描述标记的旋转。假设我们从上面看标记,我们只对围绕 z 轴的旋转感兴趣。线 2-5 在将旋转矢量转换为旋转矩阵后计算弧度中所有轴的角度
def angles_from_rvec(rvec):
r_mat, _jacobian = cv2.Rodrigues(rvec);
a = math.atan2(r_mat[2][1], r_mat[2][2])
b = math.atan2(-r_mat[2][0], math.sqrt(math.pow(r_mat[2][1],2) + math.pow(r_mat[2][2],2)))
c = math.atan2(r_mat[1][0], r_mat[0][0])
return [a,b,c];
def calc_heading(rvec):
angles = angles_from_rvec(rvec);
degree_angle = math.degrees(angles[2]);
if degree_angle < 0:
degree_angle = 360 + degree_angle
return degree_angle;
完整跟踪服务
现在,我们有所有部分来组装标记跟踪。我们打算稍后同时将跟踪运行到 Web 套接字服务器。捕获循环是阻塞的,因此我们必须将所有内容放入单独的线程中。Pi Camera 不能由两个进程同时使用,因此我们还必须限制 ThreadPoolExecuter
单个辅助角色访问摄像机的线程数(第 100 行、110 行)。使用 asyncio
模块中的事件循环,我们正确连接所有内容。请注意,每个线程将有一个事件循环。基循环始终存在,但我们必须为捕获线程创建一个新循环(下面第 71-74 行)。我们的服务通过调用异步回调函数 on_track
来通知找到的标记。
import asyncio
import cv2
import concurrent
import logging
import math
import numpy as np
from picamera.array import PiRGBArray
from picamera import PiCamera
RESOLUTION = (1088,1088)
FRAMERATE = 30
DICTIONARY = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_4X4_250)
# pi cam
CAMERA_MATRIX = np.array([[2.01976721e+03, 0.00000000e+00, 1.32299009e+03],[0.00000000e+00, 2.00413989e+03, 8.60071427e+02],[0.00000000e+00, 0.00000000e+00, 1.00000000e+00]])
DIST_COEFFS = np.array([-0.63653859, 0.2963752, 0.03228459, 0.0028887, 0.82956323])
PARAMETERS = cv2.aruco.DetectorParameters_create()
MARKER_EDGE = 0.05
def angles_from_rvec(rvec):
r_mat, _jacobian = cv2.Rodrigues(rvec)
a = math.atan2(r_mat[2][1], r_mat[2][2])
b = math.atan2(-r_mat[2][0], math.sqrt(math.pow(r_mat[2][1],2) + math.pow(r_mat[2][2],2)))
c = math.atan2(r_mat[1][0], r_mat[0][0])
return [a,b,c]
def calc_heading(rvec):
angles = angles_from_rvec(rvec)
degree_angle = math.degrees(angles[2])
if degree_angle < 0:
degree_angle = 360 + degree_angle
return degree_angle
def find_markers(frame):
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
corners, ids, rejectedImgPoints = cv2.aruco.detectMarkers(gray, DICTIONARY, parameters=PARAMETERS)
rvecs, tvecs, _objPoints = cv2.aruco.estimatePoseSingleMarkers(corners, MARKER_EDGE, CAMERA_MATRIX, DIST_COEFFS)
result = set()
if ids is None:
return result
for i in range(0, len(ids)):
try:
id = str(ids[i][0])
marker = np.squeeze(corners[i])
x0, y0 = marker[0]
x2, y2 = marker[2]
x = int((x0 + x2)/2)
y = int((y0 + y2)/2)
heading = calc_heading(rvecs[i][0])
result.add((id, x, y, heading))
except Exception:
traceback.print_exc()
return result
class PositioningSystem:
def __init__(self, on_update):
self._on_update = on_update
def start(self):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(self._capture())
loop.run_forever()
async def _capture(self):
camera = PiCamera()
camera.resolution = RESOLUTION
camera.framerate = FRAMERATE
rawCapture = PiRGBArray(camera, size=RESOLUTION)
asyncio.sleep(0.1)
logging.info("Start capturing from pi camera.")
try:
for capture in camera.capture_continuous(rawCapture, format="bgr", use_video_port=True):
frame = capture
截流(0)
用于标记中的标记:
等待自我._on_update(标记)
除了:
日志记录.错误(”捕获停止,出错。
最后:
摄像机.close()
• 创建有限的线程池。
执行器 = 并发.futures.ThreadPool 执行器(max_workers=1)
异步定义轨道(on_track):
ps = 定位系统(on_track)
循环 = 异步.get_event_loop()
循环.run_in_executor(执行器,ps.start)
返回 ps
positioning.py
连接到 Web 套接字
与 Web 套接字服务器的连接是直接的。我们从这里举了一个例子。WebSocket 服务器等待客户端连接,并 on_connect
为每个已建立的连接调用回调行 15-24。WebSocket 仅用于将位置更新推送到客户端,我们接收传入消息,但在此示例中,我们不做出反应。相反,我们将套接字放入一个集中,以便以后能够访问它。
import asyncio
import websockets
import logging
import json
import positioning
PORT = 5678
LOG_LEVEL = "INFO"
logging.basicConfig(level=LOG_LEVEL)
sockets = set()
loop = asyncio.get_event_loop()
async def on_connect(socket, path):
logging.info("Socket connected...")
sockets.add(socket)
try:
while True:
message = await socket.recv()
logging.warning('Ignoring received message: {}', message)
except:
sockets.remove(socket)
logging.info("Socket disconnected (maybe in response to closing handshake)...")
async def on_track(position):
id, x, y, bearing = position
message = {
"id": id,
"x":x,
"y":y,
"bearing":bearing
}
for socket in sockets:
await socket.send(json.dumps(message))
if __name__ == "__main__":
logging.debug("Starting capture loop...")
start_positioning = positioning.track(on_track)
logging.debug("Starting websocket...")
start_server = websockets.serve(on_connect, port=PORT)
loop.run_until_complete(start_positioning)
loop.run_until_complete(start_server)
logging.info("Started ... ")
loop.run_forever()
tracking.py
一旦新的标记位置可用, on_track
跟踪系统就会调用功能行 30-38。在这里,我们只需遍迭代所有打开的套接字,并将位置更新作为 JSON 格式的字符串发送。就是这样。
学习和最后发言
在树莓派 (3B+) 上从 Python 运行 OpenCV 图像处理不是个好主意,如果性能至关重要。树莓 PI 4 可能很快就会出现不同。到目前为止,处理分辨率为每秒 1000 x 1000 像素的 10 帧是我们的最高采样率。令人惊讶的是,Pi 和 USB 摄像机并没有表现出太大的差异。此外,标记检测结果也非常容易出错。机器人的光或振动的变化使得检测不可靠。因此,我们开始实施一些对策,例如以采样率为代价对误差求平均值。最后,跟踪的结果相当不错,但机器人位置的更新间隔约为500ms,这需要我们的机器人去真的,非常慢。
因此,我们在机器人上用陀螺仪和加速度计来补充我们的设置。当机器人移动时,我们使用传感器。当机器人静止时,我们会检测到它的绝对位置。这对我们的应用程序是一个技巧。
我愿意接受有关如何改进代码和解决方案的所有建议。如果你是一个学生,你想做更多的这个和类似的项目,然后来柏林,并满足我们在柏林的Designakademie