python读取海康RGBD感知相机并解析图像数据
python读取海康RGBD感知相机
情景:
相机:MV-EB435i
海康提供的C++ SDK比较完善,但是python的比较粗糙,给的demo只能得到数据帧
需求:
基于海康提供的python SDK,进一步开发读取RGB和Depth图,并转换成后续任务需要的numpy数组形式
相机分析:
可以使用HiViewer先调试相机,确认相机读取RGBD没问题:下载地址
这些参数可以跟着相机的指南挑一挑,调到结果比较好为止,不过我发现下次打开HiViewer的时候这些参数又会回到默认,所以我猜用sdk读图的时候应该也都是默认的,这里参数改不改可能也没啥影响。只要读图是正常的就行了,你读到的深度图可能不是灰色的,在上面渲染设置里调一下就行了。
还有就是这个灰度图,要拍的东西隔相机稍微远一点才能有,如果离的比较近就是直接全黑,value=0;而且有时候灰度图很不稳定,还很乱,不知道拍的什么东西,可以尝试重新连接相机。把场景搞远一点。
SDK分析:
SDK就在HiViewer的安装目录里面,\HiViewer\Development\Samples\Python这里面
直接把Python文件夹拷贝到你的工作目录。
python的SDK实在是太简陋了,给了轮询读图和回调读图两种,这里我用轮询的代码展示。
# -- coding: utf-8 --
import threading
import msvcrt
import ctypes
import time
import os
from ctypes import *
from Mv3dRgbdImport.Mv3dRgbdDefine import *
from Mv3dRgbdImport.Mv3dRgbdApi import *
from Mv3dRgbdImport.Mv3dRgbdDefine import DeviceType_Ethernet, DeviceType_USB, DeviceType_Ethernet_Vir, DeviceType_USB_Vir, MV3D_RGBD_FLOAT_EXPOSURETIME, \
ParamType_Float, ParamType_Int, ParamType_Enum, CoordinateType_Depth, MV3D_RGBD_FLOAT_Z_UNIT
g_bExit = False
def work_thread(camera=0,pdata=0,nDataSize=0):
while True:
stFrameData=MV3D_RGBD_FRAME_DATA()
ret=camera.MV3D_RGBD_FetchFrame(pointer(stFrameData), 5000)
if ret==0:
for i in range(0, stFrameData.nImageCount):
print("MV3D_RGBD_FetchFrame[%d]:nFrameNum[%d],nDataLen[%d],nWidth[%d],nHeight[%d]" % (
i, stFrameData.stImageData[i].nFrameNum, stFrameData.stImageData[i].nDataLen, stFrameData.stImageData[i].nWidth, stFrameData.stImageData[i].nHeight))
else:
print("no data[0x%x]" % ret)
if g_bExit == True:
break
if __name__ == "__main__":
nDeviceNum=ctypes.c_uint(0)
nDeviceNum_p=byref(nDeviceNum)
# ch:获取设备数量 | en:Get device number
ret=Mv3dRgbd.MV3D_RGBD_GetDeviceNumber(DeviceType_Ethernet | DeviceType_USB | DeviceType_Ethernet_Vir | DeviceType_USB_Vir, nDeviceNum_p)
if ret!=0:
print("MV3D_RGBD_GetDeviceNumber fail! ret[0x%x]" % ret)
os.system('pause')
sys.exit()
if nDeviceNum==0:
print("find no device!")
os.system('pause')
sys.exit()
print("Find devices numbers:", nDeviceNum.value)
stDeviceList = MV3D_RGBD_DEVICE_INFO_LIST()
net = Mv3dRgbd.MV3D_RGBD_GetDeviceList(DeviceType_Ethernet | DeviceType_USB | DeviceType_Ethernet_Vir | DeviceType_USB_Vir, pointer(stDeviceList.DeviceInfo[0]), 20, nDeviceNum_p)
for i in range(0, nDeviceNum.value):
print("\ndevice: [%d]" % i)
strModeName = ""
for per in stDeviceList.DeviceInfo[i].chModelName:
strModeName = strModeName + chr(per)
print("device model name: %s" % strModeName)
strSerialNumber = ""
for per in stDeviceList.DeviceInfo[i].chSerialNumber:
strSerialNumber = strSerialNumber + chr(per)
print("device SerialNumber: %s" % strSerialNumber)
# ch:创建相机示例 | en:Create a camera instance
camera=Mv3dRgbd()
nConnectionNum = input("please input the number of the device to connect:")
if int(nConnectionNum) >= nDeviceNum.value:
print("intput error!")
os.system('pause')
sys.exit()
# ch:打开设备 | en:Open device
ret = camera.MV3D_RGBD_OpenDevice(pointer(stDeviceList.DeviceInfo[int(nConnectionNum)]))
if ret != 0:
print ("MV3D_RGBD_OpenDevice fail! ret[0x%x]" % ret)
os.system('pause')
sys.exit()
# ch:开始取流 | en:Start grabbing
ret=camera.MV3D_RGBD_Start()
if ret != 0:
print ("start fail! ret[0x%x]" % ret)
camera.MV3D_RGBD_CloseDevice()
os.system('pause')
sys.exit()
# ch:获取图像线程 | en:Get image thread
try:
hthreadhandle=threading.Thread(target=work_thread,args=(camera,None,None))
hthreadhandle.start()
except:
print("error: unable to start thread")
os.system('pause')
g_bExit = True
hthreadhandle.join()
# ch:停止取流 | en:Stop grabbing
ret=camera.MV3D_RGBD_Stop()
if ret != 0:
print ("stop fail! ret[0x%x]" % ret)
os.system('pause')
sys.exit()
# ch:销毁句柄 | en:Destroy the device handle
ret=camera.MV3D_RGBD_CloseDevice()
if ret != 0:
print ("CloseDevice fail! ret[0x%x]" % ret)
os.system('pause')
sys.exit()
sys.exit()
运行这个demo,主要流程就是获取设备,会有两个,第一个就是我们要用的设备,input输入0就行了,另一个是虚拟的不管他,如果只有一个虚拟的,那就是设备还没有连上,相机连电脑之后得有个几秒钟才能连接上。
如果你读到了两个,input输入0之后提示下面这个打开设备失败,说明设备被占用了还没有释放,可能是HiViewer占用了,你要在HiViewer断开链接;也有可能是你运行的上一个python demo占用的,但是如果你没有让他正常运行结束,他就没有运行到后面停止取流和关闭链接销毁句柄的代码,所以设备还在被占用,你可以重启python内核重试就行了。
正常的来说你输入0之后相机就会一直取流了,控制台会输出这样的内容:
MV3D_RGBD_FetchFrame[0]:nFrameNum[1],nDataLen[1843200],nWidth[1280],nHeight[720]
MV3D_RGBD_FetchFrame[1]:nFrameNum[1],nDataLen[1843200],nWidth[1280],nHeight[720]
这其实就是一帧的内容,0是Depth图,1是RGB图。
这一段是来自于demo里面的work_thread函数,相机开始取流之后就开启了一个并行的线程,运行这个work_thread,我们看一下这个函数:
def work_thread(camera=0,pdata=0,nDataSize=0):
while True:
stFrameData=MV3D_RGBD_FRAME_DATA()
ret=camera.MV3D_RGBD_FetchFrame(pointer(stFrameData), 5000)
if ret==0:
for i in range(0, stFrameData.nImageCount):
print("MV3D_RGBD_FetchFrame[%d]:nFrameNum[%d],nDataLen[%d],nWidth[%d],nHeight[%d]" % (
i, stFrameData.stImageData[i].nFrameNum, stFrameData.stImageData[i].nDataLen, stFrameData.stImageData[i].nWidth, stFrameData.stImageData[i].nHeight))
else:
print("no data[0x%x]" % ret)
if g_bExit == True:
break
MV3D_RGBD_FetchFrame就是读取当前帧,如果ret返回0就表示ok,会把数据存在stFrameData里面,这个stFrameData是一个MV3D_RGBD_FRAME_DATA的当前这一帧的图像帧数据:
# ch:图像帧数据 | en:Frame Data
_MV3D_RGBD_FRAME_DATA_._fields_ = [
('nImageCount', c_uint), # 图像个数,表示stImage数组的有效个数
('stImageData', MV3D_RGBD_IMAGE_DATA * MV3D_RGBD_MAX_IMAGE_COUNT), # 图像数组,每一个代表一种类型的图像
('nValidInfo', c_uint), # 帧有效信息:0(帧有效),1 << 0(丢包),1 << 1(触发标识符无效)
('nReserved', c_byte * 12), # 保留字节
]
四个成员,这个nImageCount图像个数就是2,一张RGB和一张Depth;stImageData图像数据就是装了RGB和Depth,都是用MV3D_RGBD_IMAGE_DATA类型封装的图像,这个类型长这样:
_MV3D_RGBD_IMAGE_DATA_._fields_ = [
('enImageType', Mv3dRgbdImageType), #~chinese 图像格式 ~english Image format
('nWidth', c_uint), #~chinese 图像宽 ~english Image width
('nHeight', c_uint), #~chinese 图像高 ~english Image height
('pData', POINTER(c_ubyte)), #~chinese 相机输出的图像数据 ~english Image data, which is outputted by the camera
('nDataLen', c_uint), #~chinese 图像数据长度(字节) ~english Image data length (bytes)
('nFrameNum', c_uint), #~chinese 帧号,代表第几帧图像 ~english Frame number, which indicates the frame sequence
('nTimeStamp', int64_t), #~chinese 设备上报的时间戳 (设备上电从0开始,规则详见设备手册)
#~english Timestamp uploaded by the device. It starts from 0 when the device is powered on. Refer to the device user manual for detailed rules
('bIsRectified', c_uint), #~chinese 是否校正 ~english Correction flag
('enStreamType', Mv3dRgbdStreamType), #~chinese 流类型,用于区分图像(图像格式相同时) ~english Data stream type, used to distinguish data in the same image format
('enCoordinateType', Mv3dRgbdCoordinateType), #~chinese 坐标系类型 ~english Coordinates type
('nReserved', c_byte * 4), #~chinese 保留字节 ~english Reserved
]
看备注能看出来里面都存了些什么,我们主要关注两个,enImageType表示存的图像具体是什么格式的,pData表示这个图像data。
所以我们只需要拿到pData,已经格式,通过他是什么格式把pData转换出来就行了。
我们可以找到图像格式的说明:
# Mv3dRgbdImageType
# ch:图像格式 | en:Image Format
ImageType_Undefined = MV3D_RGBD_UNDEFINED #~chinese 未定义 ~english Undefined
ImageType_Mono8 = 0x01080001 # Mono8
ImageType_Mono16 = 0x01100007 # Mono16
ImageType_Depth = 0x011000B8 # C16
ImageType_YUV422 = 0x02100032 # YUV422
ImageType_YUV420SP_NV12 = 0x020C8001 # YUV420SP_NV12
ImageType_YUV420SP_NV21 = 0x020C8002 # YUV420SP_NV21
ImageType_RGB8_Planar = 0x02180021 # RGB8_Planar
ImageType_PointCloud = 0x026000C0 # ABC32f
ImageType_Jpeg = 0x80180001 # Jpeg
ImageType_Rgbd = 0x82283007 # RGBD
可以试试在work_thread里打印这个图像帧的enImageType值,会发现每一个图像帧的两个图片,第一张图片是17825976,第二张图片是34603058,print出来是10进制的,转换成16进制对比上面这些,会发现第一张图就是ImageType_Depth,第二张是ImageType_YUV422,所以第一张是深度图,第二张是RGB图,并且分别是C16和YUV422格式保存的。
接下来就把pData进行转换就行了,但是这个pData,虽然逻辑是C16和YUV422格式的,但是你拿到的stFrameData.stImageData[i].pData实际上是lp_c_ubyte类型的,并且shape是(1843200,)所以还得先把lp_c_ubyte类型转换成numpy,得到(1843200,)的numpy,而我们要的图像是1280×720的,而这个(1843200,)实际上可以换作是2x1280x720,很头晕但是没事。
得到Depth图:
先转Depth的,我不保证我的做法是对的,但是我的结果看起来是没问题的。
Depth的1843200我尝试分成2x1280x720之后取第一个1280×720和第二个1280×720,分别得到下面这两个:
并惊奇的发现,他们俩居然可以拼起来成为一张不太正确,但是和正确的Depth很像的图。
后来我尝试把1843200直接分成1280*2 x 720,然后再直接用cv2.INTER_AREA把它resize成1280×720,没想到得到了一张很合理的Depth图,然后保存成gray格式的,就ok了。
得到RGB图:
既然是YUV442的格式,就很明确了,先提取YUV分量,然后y、u、v分量拼起来用cv2提供的接口直接转成RGB就行了。具体就是:
提取y、u、v分量的函数:
# 从 YUV422 数据中提取 Y、U、V 分量
def extract_yuv422(np_data, rows, cols):
"""
:param np_data: YUV422 格式的 numpy 数据
:param rows: 图像的行数
:param cols: 图像的列数
:return: Y, U, V 分量
"""
# YUV422 每像素 2 字节,因此数据大小应为 rows * cols * 2
assert len(np_data) == rows * cols * 2, "数据大小与分辨率不匹配!"
# 提取 Y、U、V 分量
y = np_data[0::2].reshape((rows, cols)) # Y 分量
u = np_data[1::4].reshape((rows, cols // 2)) # U 分量
v = np_data[3::4].reshape((rows, cols // 2)) # V 分量
# 对 U 和 V 进行插值或扩展,使其宽度与 Y 匹配
u_expanded = cv2.resize(u, (cols, rows), interpolation=cv2.INTER_CUBIC)
v_expanded = cv2.resize(v, (cols, rows), interpolation=cv2.INTER_CUBIC)
return y, u_expanded, v_expanded
# return y,u,v
为了后面能拼起来,这里我对u和v插值resize到和y一样的。
然后拼起来转成RGB:
yuv_image = np.stack((y, u, v), axis=-1)
rgb_image = cv2.cvtColor(yuv_image, cv2.COLOR_YUV2RGB)
至此,RGBD图就都拿到了,numpy格式,可以直接show、save或者用作后续模型预测。
成果展示:
这个黑边用HiViewer读也有,不知道这么搞的,不过问题不大。
最后附上修改好的work_thread()函数:
def work_thread(camera=0,pdata=0,nDataSize=0):
total = 1
while total>0:
total = total-1
stFrameData=MV3D_RGBD_FRAME_DATA()
ret=camera.MV3D_RGBD_FetchFrame(pointer(stFrameData), 5000)
if ret==0:
for i in range(0, stFrameData.nImageCount):
print('------------------------------------------')
# stFrameData.stImageData[i]这是一张图MV3D_RGBD_IMAGE_DATA,0101RGBD
print("MV3D_RGBD_FetchFrame[%d]:nFrameNum[%d],nDataLen[%d],nWidth[%d],nHeight[%d]" % (
i, stFrameData.stImageData[i].nFrameNum, stFrameData.stImageData[i].nDataLen, stFrameData.stImageData[i].nWidth, stFrameData.stImageData[i].nHeight))
# print('流类型:',stFrameData.stImageData[i].enStreamType)
print('矫正:', stFrameData.stImageData[i].bIsRectified)
print('图像数据:', stFrameData.stImageData[i].pData)
data = stFrameData.stImageData[i].pData
rows = 720
cols = 1280
size = 1280 * 720 * 2
if(i==0):
nHeight = rows
nWidth = cols
pData = data
# 使用 ctypes 将 LP_c_ubyte 指针转换为 NumPy 数组
np_data = lp_c_ubyte_to_numpy(data, stFrameData.stImageData[i].nDataLen)
# 将数据类型转换为 16 位无符号整数,如果需要的话(根据原代码使用 CV_16UC1)
# image_data = image_data.astype(np.uint16)
np_data = np_data.astype(np.uint16)
# mCvmat = image_data # 直接使用 NumPy 数组
# print(np_data.shape)
# depth_image = mCvmat.reshape((720, 1280))
np_data = np_data.reshape((720, 2*1280))
np_data = cv2.resize(np_data, (1280, 720), interpolation=cv2.INTER_AREA)
# print(depth_image.shape)
# plt.imsave('dasdas.jpg',depth_image)
plt.imsave('depth_image.png',np_data, cmap='gray')
# plt.imshow(depth_image, cmap='gray')
# plt.colorbar()
# plt.show()
if(i==1):
# 转换为 numpy 数据
np_data = lp_c_ubyte_to_numpy(data, size)
print(np_data.shape)
# 提取 YUV 分量
y, u, v = extract_yuv422(np_data, rows, cols)
print("U component stats:", np.min(u), np.max(u), np.mean(u))
print("V component stats:", np.min(v), np.max(v), np.mean(v))
print(u.shape)
print(v.shape)
# cv2直接转换 YUV 到 RGB(这个接口用的是420不是422,但是结果也ok)
yuv_image = np.stack((y, u, v), axis=-1)
rgb_image = cv2.cvtColor(yuv_image, cv2.COLOR_YUV2RGB)
bgr_image = cv2.cvtColor(rgb_image, cv2.COLOR_RGB2BGR)
# 手动转换为 RGB(试的方法很慢)
# print(y.shape[:2])
# rgb_image = yuv422_to_rgb(y, u, v)
plt.imsave('rgb_image.png',rgb_image)
print("RGB stats:", np.min(rgb_image), np.max(rgb_image), np.mean(rgb_image))
# 显示图像
# cv2.imshow("U Component", u)
# cv2.imshow("V Component", v)
# cv2.imshow("Y Component", y) # 显示 Y 分量
# cv2.imshow("RGB Image", bgr_image) # 显示 RGB 图像
# cv2.waitKey(0)
# cv2.destroyAllWindows()
print('图像格式:', stFrameData.stImageData[i].enImageType)
# 01分别是:
'''
ImageType_Depth = 0x011000B8
ImageType_YUV422 = 0x02100032
'''
else:
print("no data[0x%x]" % ret)
if g_bExit == True:
break
# 停止取流
ret=camera.MV3D_RGBD_Stop()
# ch:销毁句柄 | en:Destroy the device handle
ret=camera.MV3D_RGBD_CloseDevice()
记得每次运行完如果报错了,设备会一直被占用,重启python内核就行。或者在try catch里停止取流,销毁句柄。
作者:我认为可以!