想象一下,你打算在一个房间里自动移动几个机器人。您需要每个机器人的精确位置和方向(方向正面指向)。显然,像GPS这样的户外系统不起作用,而且你的预算也很少。你是做什么工作的?

经过一些研究,易于应用的解决方案,我和我的学生决定视觉跟踪我们的机器人。我们在天花板上放了一台摄像机,连续播放下面机器人的视频。剩下的工作是捕捉视频流的帧,搜索内部感兴趣的对象,并为调查结果提供服务。

虽然下面我描述的解决方案适用于所有质量合理的 USB 摄像机和任何计算机,但我们使用了树莓派 3B+ 与树莓相机,以满足我们的预算限制。

使用 OpenCV 跟踪对象

我们最初打算用对象识别算法跟踪我们的机器人,但很快决定节省训练适当系统的时间,而将标记贴在我们的机器人上。开源图像处理库OpenCV附带了一个易于使用的标记检测框架,称为Aruco。此外,如果您对 C/C++编程感到不自在,OpenCV 还包括 Python 绑定。

OpenCV本身是作为pip的包,即使在无头捆绑 opencv-python-headless 。除了 OpenCV 之外,您还需要安装到 OpenCV opencv-contrib-python-headless 的第三方贡献,其中包括 Aruco 库。

阿德里安·罗斯布罗克写了一篇关于安装OpenCV的好帖子,有皮点和一些怪癖。

检测视频流中的标记

我们必须按帧捕获视频帧并搜索这些帧中的标记。虽然 Pi 摄像机可用作 USB 摄像机,但有经过优化的 Python 库 picamera 来获取图像或流式传输视频和捕获帧。

    camera = PiCamera()
    camera.resolution = RESOLUTION # (1008,1008)
    camera.framerate = FRAMERATE # 30

    rawCapture = PiRGBArray(camera, size=RESOLUTION)

    try:
        for capture in camera.capture_continuous(rawCapture, format="bgr", use_video_port=True):
            frame = capture.array
            markers = find_markers(frame)

            # ... do something with the found markers here

            rawCapture.truncate(0) 
    except:
        logging.error("Capturing stopped with an error.");
    finally:
        

上述代码连续从 Pi 摄像机循环捕获帧,并将每个帧传递给我们的 find_markers 功能。我们包装了一些日志记录和错误处理。

标记位置

对于我们的机器人导航,我们需要知道机器人(它的标记)在哪里,相对于相机的视点,以及机器人的去向。因此,我们希望 find_markers 返回一组包含ID、位置(x,y) 和标题(0° 等于北)的分体。

Aruco 支持具有不同分辨率的标记集,如 4×4 或 6×6 网格。有一个相当不错的 OpenCV教程解释 Arcuco 标记,其中还显示了如何创建标记我/阿鲁科根/”rel=”不跟随”目标=”_blank”=标记生成器。

Aruco 提供 detectMarkers 搜索图像中标记的功能。该函数返回每个检测到的标记的多维角数组,以及另一个带有标记标识的数组。在下面的第 15-20 行中,我们从右上角和左下角计算每个标记的中心。生成的数组具有一些额外的维度,因此我们使用 NumPy squeze 之前来减小角数组的尺寸。

    def find_markers(frame):
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

        corners, ids, rejectedImgPoints = cv2.aruco.detectMarkers(gray, DICTIONARY, parameters=PARAMETERS)
        rvecs, tvecs, _objPoints = cv2.aruco.estimatePoseSingleMarkers(corners, MARKER_EDGE, CAMERA_MATRIX, DIST_COEFFS)

        result = set()
        if ids is None:
            return result

        for i in range(0, len(ids)):
            try:
                id = str(ids[i][0])

                marker = np.squeeze(corners[i])

                x1,y1 = marker[0]
                x2,y2 = marker[2]
                x = int((x1 + x2)/2)
                y = int((y1 + y2)/2)

                bearing = calc_heading(rvecs[i][0])
                result.add((id, x, y, bearing))
            except Exception:
                traceback.print_exc()

        return result

您可能已经注意到一些常量,我们设置如下:

DICTIONARY = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_4X4_250) # our marker dictionary
PARAMETERS = cv2.aruco.DetectorParameters_create() # default params
MARKER_EDGE = 0.05 # not really relevant for our use case; our marker size is 5cm x 5cm

标记标题

检测标记的标题更具挑战性。可能有一个更好的方法来查找基于角的标记的方向;然而,我们使用的是Aruco函数 estimatePoseSingleMarkers ,它尊重相机镜头的失真。因此,该函数需要一个摄像机矩阵和一个具有失真系数的矢量。尽管有一个很好的教程,如何做到这一点,找到相机的相机矩阵和失真系数基于OpenCV的相机校准被证明是最困难的工作。(您可能会发现马克·赫德利·琼斯的棋盘收藏很有用。

这些是我们在多次尝试我们的 pi 相机(镜头 1/4″、光圈 f/1.8、焦距 3.6mm、视角对角 75.7°)后获得的值:

CAMERA_MATRIX = np.array([[2.01976721e+03, 0.00000000e+00, 1.32299009e+03],[0.00000000e+00, 2.00413989e+03, 8.60071427e+02],[0.00000000e+00, 0.00000000e+00, 1.00000000e+00]])
DIST_COEFFS = np.array([-0.63653859,  0.2963752,  0.03228459, 0.0028887, 0.82956323])
# IMPORTANT: this will only work for you if your camera has the same technical specifations

姿势检测返回所有标记的旋转和平移矢量数组。我们使用 函数 calc_heading 从旋转矢量以度数形式获取每个标记的实际标题。旋转矢量在三维中描述标记的旋转。假设我们从上面看标记,我们只对围绕 z 轴的旋转感兴趣。线 2-5 在将旋转矢量转换为旋转矩阵后计算弧度中所有轴的角度

    def angles_from_rvec(rvec):
        r_mat, _jacobian = cv2.Rodrigues(rvec);
        a = math.atan2(r_mat[2][1], r_mat[2][2])
        b = math.atan2(-r_mat[2][0], math.sqrt(math.pow(r_mat[2][1],2) + math.pow(r_mat[2][2],2)))
        c = math.atan2(r_mat[1][0], r_mat[0][0])
        return [a,b,c];

    def calc_heading(rvec):
        angles = angles_from_rvec(rvec);
        degree_angle =  math.degrees(angles[2]);
        if degree_angle < 0:
            degree_angle = 360 + degree_angle
        return degree_angle;

完整跟踪服务

现在,我们有所有部分来组装标记跟踪。我们打算稍后同时将跟踪运行到 Web 套接字服务器。捕获循环是阻塞的,因此我们必须将所有内容放入单独的线程中。Pi Camera 不能由两个进程同时使用,因此我们还必须限制 ThreadPoolExecuter 单个辅助角色访问摄像机的线程数(第 100 行、110 行)。使用 asyncio 模块中的事件循环,我们正确连接所有内容。请注意,每个线程将有一个事件循环。基循环始终存在,但我们必须为捕获线程创建一个新循环(下面第 71-74 行)。我们的服务通过调用异步回调函数 on_track 来通知找到的标记。

import asyncio
import cv2
import concurrent
import logging
import math
import numpy as np

from picamera.array import PiRGBArray
from picamera import PiCamera

RESOLUTION = (1088,1088)
FRAMERATE = 30

DICTIONARY = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_4X4_250)

# pi cam
CAMERA_MATRIX = np.array([[2.01976721e+03, 0.00000000e+00, 1.32299009e+03],[0.00000000e+00, 2.00413989e+03, 8.60071427e+02],[0.00000000e+00, 0.00000000e+00, 1.00000000e+00]])
DIST_COEFFS = np.array([-0.63653859,  0.2963752,  0.03228459, 0.0028887, 0.82956323])

PARAMETERS =  cv2.aruco.DetectorParameters_create()
MARKER_EDGE = 0.05

def angles_from_rvec(rvec):
    r_mat, _jacobian = cv2.Rodrigues(rvec)
    a = math.atan2(r_mat[2][1], r_mat[2][2])
    b = math.atan2(-r_mat[2][0], math.sqrt(math.pow(r_mat[2][1],2) + math.pow(r_mat[2][2],2)))
    c = math.atan2(r_mat[1][0], r_mat[0][0])
    return [a,b,c]

def calc_heading(rvec):
    angles = angles_from_rvec(rvec)
    degree_angle =  math.degrees(angles[2])
    if degree_angle < 0:
        degree_angle = 360 + degree_angle
    return degree_angle

def find_markers(frame):
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

    corners, ids, rejectedImgPoints = cv2.aruco.detectMarkers(gray, DICTIONARY, parameters=PARAMETERS)
    rvecs, tvecs, _objPoints = cv2.aruco.estimatePoseSingleMarkers(corners, MARKER_EDGE, CAMERA_MATRIX, DIST_COEFFS)

    result = set()
    if ids is None:
        return result

    for i in range(0, len(ids)):
        try:
            id = str(ids[i][0])

            marker = np.squeeze(corners[i])

            x0, y0 = marker[0]
            x2, y2 = marker[2]
            x = int((x0 + x2)/2)
            y = int((y0 + y2)/2)

            heading = calc_heading(rvecs[i][0])
            result.add((id, x, y, heading))
        except Exception:
            traceback.print_exc()

    return result

class PositioningSystem:
    def __init__(self, on_update):
        self._on_update = on_update

    def start(self):
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.run_until_complete(self._capture())
        loop.run_forever()

    async def _capture(self):

        camera = PiCamera()
        camera.resolution = RESOLUTION
        camera.framerate = FRAMERATE

        rawCapture = PiRGBArray(camera, size=RESOLUTION)

        asyncio.sleep(0.1)

        logging.info("Start capturing from pi camera.")
        try:
            for capture in camera.capture_continuous(rawCapture, format="bgr", use_video_port=True):

                frame = capture

截流(0)

用于标记中的标记:
等待自我._on_update(标记)
除了:
日志记录.错误(”捕获停止,出错。
最后:
摄像机.close()

• 创建有限的线程池。
执行器 = 并发.futures.ThreadPool 执行器(max_workers=1)

异步定义轨道(on_track):
ps = 定位系统(on_track)

循环 = 异步.get_event_loop()
循环.run_in_executor(执行器,ps.start)
返回 ps

positioning.py

连接到 Web 套接字

与 Web 套接字服务器的连接是直接的。我们从这里举了一个例子。WebSocket 服务器等待客户端连接,并 on_connect 为每个已建立的连接调用回调行 15-24。WebSocket 仅用于将位置更新推送到客户端,我们接收传入消息,但在此示例中,我们不做出反应。相反,我们将套接字放入一个集中,以便以后能够访问它。

import asyncio
import websockets
import logging
import json
import positioning

PORT = 5678
LOG_LEVEL = "INFO"

logging.basicConfig(level=LOG_LEVEL)

sockets = set()
loop = asyncio.get_event_loop()

async def on_connect(socket, path):
    logging.info("Socket connected...")
    sockets.add(socket)
    try:
        while True:
            message = await socket.recv()
            logging.warning('Ignoring received message: {}', message)
    except:
        sockets.remove(socket)
        logging.info("Socket disconnected (maybe in response to closing handshake)...")

async def on_track(position):
    id, x, y, bearing = position
    message = {
        "id": id,
        "x":x,
        "y":y,
        "bearing":bearing
    }

    for socket in sockets:
        await socket.send(json.dumps(message))

if __name__ == "__main__":
    logging.debug("Starting capture loop...")
    start_positioning = positioning.track(on_track)
    logging.debug("Starting websocket...")
    start_server = websockets.serve(on_connect, port=PORT)

    loop.run_until_complete(start_positioning)
    loop.run_until_complete(start_server)

    logging.info("Started ... ")
    loop.run_forever()

tracking.py

一旦新的标记位置可用, on_track 跟踪系统就会调用功能行 30-38。在这里,我们只需遍迭代所有打开的套接字,并将位置更新作为 JSON 格式的字符串发送。就是这样。

学习和最后发言

在树莓派 (3B+) 上从 Python 运行 OpenCV 图像处理不是个好主意,如果性能至关重要。树莓 PI 4 可能很快就会出现不同。到目前为止,处理分辨率为每秒 1000 x 1000 像素的 10 帧是我们的最高采样率。令人惊讶的是,Pi 和 USB 摄像机并没有表现出太大的差异。此外,标记检测结果也非常容易出错。机器人的光或振动的变化使得检测不可靠。因此,我们开始实施一些对策,例如以采样率为代价对误差求平均值。最后,跟踪的结果相当不错,但机器人位置的更新间隔约为500ms,这需要我们的机器人去真的,非常慢。

因此,我们在机器人上用陀螺仪和加速度计来补充我们的设置。当机器人移动时,我们使用传感器。当机器人静止时,我们会检测到它的绝对位置。这对我们的应用程序是一个技巧。

我愿意接受有关如何改进代码和解决方案的所有建议。如果你是一个学生,你想做更多的这个和类似的项目,然后来柏林,并满足我们在柏林的Designakademie

Comments are closed.