【小贪】Kinect V2相机标定、图像采集和处理教程(Python)

完整项目链接:GitHub – xfliu1998/kinectProject

Kinect V2相机标定

使用张正友标定法对Kinect V2相机标定。原理及参考代码见以下链接:

  • 单目相机标定实现–张正友标定法:https://blog.csdn.net/weixin_43763292/article/details/128546103?spm=1001.2014.3001.5506
  • python利用opencv进行相机标定(完全版):https://blog.csdn.net/dgut_guangdian/article/details/107467070?spm=1001.2014.3001.5506
  • Kinect2.0-Python调用-PyKinect2:https://blog.csdn.net/zcz0101/article/details/115718427?spm=1001.2014.3001.5506
    1. 导入需要的包
    import os
    import time
    import datetime
    import glob as gb
    import h5py
    import keyboard
    import cv2
    import numpy as np
    from utils.calibration import Calibrator
    from utils.kinect import Kinect
    
    1. 拍摄不同角度的标定图。取照要求:不同角度不同位置拍摄(10-20)张标定图,棋盘格的视野在整张图的1/4~2/3左右。以下函数默认拍摄20张图,按空格键拍摄照片,按q键结束拍摄,拍的标定图会保存到指定路径。
    def get_chess_image(image_num=20):
        out_path = './data/chess/'
        if not os.path.exists(out_path):
            os.makedirs(out_path)
        camera = cv2.VideoCapture(0)
        i = 0
        while 1:
            (grabbed, img) = camera.read()
            cv2.imshow('img', img)
            if cv2.waitKey(1) & keyboard.is_pressed('space'):  # press space to save an image
                i += 1
                firename = str(f'{out_path}img{str(i)}.jpg')
                cv2.imwrite(firename, img)
                print('write: ', firename)
            if cv2.waitKey(1) & 0xFF == ord('q') or i == image_num:  # press q to finish
                break
    
    1. 对相机标定。函数将相机的内参和畸变系数输出至指定路径,同时可以获得每张标定图的外参矩阵。函数需要用到标定类,代码在utils包中。
    def camera_calibrator(shape_inner_corner=(11, 8), size_grid=0.025):
        '''
        :param shape_inner_corner: checkerboard size = 12*9
        :param size_grid: the length of the sides of the checkerboard = 25mm
        '''
        chess_dir = "./data/chess"
        out_path = "./data/dedistortion/chess"
        if not os.path.exists(out_path):
            os.makedirs(out_path)
        # create calibrator
        calibrator = Calibrator(chess_dir, shape_inner_corner, size_grid)
        # calibrate the camera
        mat_intri, coff_dis = calibrator.calibrate_camera()
        np.save('./data/intrinsic_matrix.npy', mat_intri)
        np.save('./data/distortion_cofficients.npy', coff_dis)
        print("intrinsic matrix: \n {}".format(mat_intri))
        print("distortion cofficients: \n {}".format(coff_dis))  # (k_1, k_2, p_1, p_2, k_3)
        # dedistortion
        calibrator.dedistortion(chess_dir, out_path)
        return mat_intri, coff_dis
    
    import os
    import glob
    import cv2
    import numpy as np
    
    
    class Calibrator(object):
        def __init__(self, img_dir, shape_inner_corner, size_grid, visualization=True):
            """
            --parameters--
            img_dir: the directory that save images for calibration, str
            shape_inner_corner: the shape of inner corner, Array of int, (h, w)
            size_grid: the real size of a grid in calibrator, float
            visualization: whether visualization, bool
            """
            self.img_dir = img_dir
            self.shape_inner_corner = shape_inner_corner
            self.size_grid = size_grid
            self.visualization = visualization
            self.mat_intri = None   # intrinsic matrix
            self.coff_dis = None    # cofficients of distortion
    
            # create the conner in world space
            w, h = shape_inner_corner
            # cp_int: save the coordinate of corner points in world space in 'int' form. like (0,0,0), ...., (10,7,0)
            cp_int = np.zeros((w * h, 3), np.float32)
            cp_int[:, :2] = np.mgrid[0:w, 0:h].T.reshape(-1, 2)
            # cp_world: corner point in world space, save the coordinate of corner points in world space
            self.cp_world = cp_int * size_grid
    
            # images
            self.img_paths = []
            for extension in ["jpg", "png", "jpeg"]:
                self.img_paths += glob.glob(os.path.join(img_dir, "*.{}".format(extension)))
            assert len(self.img_paths), "No images for calibration found!"
    
        def calibrate_camera(self):
            w, h = self.shape_inner_corner
            # criteria: only for subpix calibration, which is not used here
            # criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 30, 0.001)
            points_world = []   # the points in world space
            points_pixel = []   # the points in pixel space (relevant to points_world)
            for img_path in self.img_paths:
                img = cv2.imread(img_path)
                gray_img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
                # find the corners, cp_img: corner points in pixel space
                ret, cp_img = cv2.findChessboardCorners(gray_img, (w, h), None)
                # if ret is True, save
                if ret:
                    # cv2.cornerSubPix(gray_img, cp_img, (11,11), (-1,-1), criteria)
                    points_world.append(self.cp_world)
                    points_pixel.append(cp_img)
                    # view the corners
                    if self.visualization:
                        cv2.drawChessboardCorners(img, (w, h), cp_img, ret)
                        _, img_name = os.path.split(img_path)
                        cv2.imwrite(os.path.join(self.img_dir, f"dedistortion/coner_detect/{img_name}"), img)
                        cv2.imshow('FoundCorners', img)
                        cv2.waitKey(500)
    
            # calibrate the camera
            ret, mat_intri, coff_dis, v_rot, v_trans = cv2.calibrateCamera(points_world, points_pixel, gray_img.shape[::-1], None, None)
            # print("ret: {}".format(ret))
            # print("intrinsic matrix: \n {}".format(mat_intri))
            # # in the form of (k_1, k_2, p_1, p_2, k_3)
            # print("distortion cofficients: \n {}".format(coff_dis))
            # print("rotation vectors: \n {}".format(v_rot))
            # print("translation vectors: \n {}".format(v_trans))
    
            # calculate the error of reproject
            total_error = 0
            for i in range(len(points_world)):
                points_pixel_repro, _ = cv2.projectPoints(points_world[i], v_rot[i], v_trans[i], mat_intri, coff_dis)
                error = cv2.norm(points_pixel[i], points_pixel_repro, cv2.NORM_L2) / len(points_pixel_repro)
                total_error += error
            # print("Average error of reproject: {}".format(total_error / len(points_world)))
    
            self.mat_intri = mat_intri
            self.coff_dis = coff_dis
            return mat_intri, coff_dis
    
        def dedistortion(self, img_dir, save_dir):
            if not os.path.exists(save_dir):
                os.makedirs(save_dir)
            # if not calibrated, calibrate first
            if self.mat_intri is None:
                assert self.coff_dis is None
                self.calibrate_camera()
    
            img_paths = []
            for extension in ["jpg", "png", "jpeg"]:
                img_paths += glob.glob(os.path.join(img_dir, "*.{}".format(extension)))
    
            for img_path in img_paths:
                _, img_name = os.path.split(img_path)
                img = cv2.imread(img_path)
                h, w = img.shape[0], img.shape[1]
                newcameramtx, roi = cv2.getOptimalNewCameraMatrix(self.mat_intri, self.coff_dis, (w, h), 1, (w, h))
                dst = cv2.undistort(img, self.mat_intri, self.coff_dis, None, newcameramtx)
                # clip the data
                # x, y, w, h = roi
                # dst = dst[y:y+h, x:x+w]
                cv2.imwrite(os.path.join(save_dir, img_name), dst)
            # print("Dedistorted images have been saved to: {}".format(save_dir))
    

    Kinect V2相机获取图像及图像预处理

    1. 使用Kinect V2相机拍摄RGB-D数据流。设定拍摄目标的名字name,运行函数后延时1s开始拍摄,按ESC键结束拍摄。拍摄的RGB-D数据首先保存到指定路径下的h5文件中,同时输出拍摄的时间。拍摄Kinect图像需要用到Kinect类,代码在utils包中。
    def capture_image(name):
        file_name = './data/h5/' + name + '.h5'
        if os.path.exists(file_name):
            os.remove(file_name)
        open(file_name, "x")
        f = h5py.File(file_name, 'a')
        i = 1
        kinect = Kinect()
    
        time.sleep(1)
        s = time.time()
        while 1:
            data = kinect.get_the_data_of_color_depth_infrared_image()
            # save data
            f.create_dataset(str(i), data=data[0])
            f.create_dataset(str(i+1), data=data[1])
            i += 2
            cv2.imshow('kinect', data[0])
            cv2.waitKey(1)
            if keyboard.is_pressed('esc'):  # press ESC to exit
                break
        print('record time: %f s' % (time.time() - s))
        return file_name
    
    from pykinect2 import PyKinectV2
    from pykinect2.PyKinectV2 import *
    from pykinect2 import PyKinectRuntime
    import numpy as np
    import matplotlib.pyplot as plt
    import cv2 as cv
    import ctypes
    import math
    import time
    import copy
    import keyboard
    
    
    class Kinect(object):
    
        def __init__(self):
            self._kinect = PyKinectRuntime.PyKinectRuntime(PyKinectV2.FrameSourceTypes_Color | PyKinectV2.FrameSourceTypes_Depth | PyKinectV2.FrameSourceTypes_Infrared)
            self.color_frame = None
            self.depth_frame = None
            self.infrared_frame = None
            self.w_color = 1920
            self.h_color = 1080
            self.w_depth = 512
            self.h_depth = 424
            self.csp_type = _ColorSpacePoint * np.int(self.w_color * self.h_color)
            self.csp = ctypes.cast(self.csp_type(), ctypes.POINTER(_DepthSpacePoint))
            self.color = None
            self.depth = None
            self.infrared = None
            self.first_time = True
    
        # Copying this image directly in python is not as efficient as getting another frame directly from C++
        """Get the latest color data"""
        def get_the_last_color(self):
            if self._kinect.has_new_color_frame():
                # the obtained image data is 2d and needs to be converted to the desired format
                frame = self._kinect.get_last_color_frame()
                # return 4 channels, and one channel is not registered
                gbra = frame.reshape([self._kinect.color_frame_desc.Height, self._kinect.color_frame_desc.Width, 4])
                # remove color image data, the default is that the mirror image needs to be flipped
                color_frame = gbra[..., :3][:, ::-1, :]
                return color_frame
    
        """Get the latest depth data"""
        def get_the_last_depth(self):
            if self._kinect.has_new_depth_frame():
                frame = self._kinect.get_last_depth_frame()
                depth_frame_all = frame.reshape([self._kinect.depth_frame_desc.Height, self._kinect.depth_frame_desc.Width])
                self.depth_frame = depth_frame_all[:, ::-1]
                return self.depth_frame
    
        """Get the latest infrared data"""
        def get_the_last_infrared(self, Infrared_threshold = 16000):
            if self._kinect.has_new_infrared_frame():
                frame = self._kinect.get_last_infrared_frame()
                image_infrared_all = frame.reshape([self._kinect.infrared_frame_desc.Height, self._kinect.infrared_frame_desc.Width])
                # image_infrared_all[image_infrared_all > Infrared_threshold] = 0
                # image_infrared_all = image_infrared_all / Infrared_threshold * 255
                self.infrared_frame = image_infrared_all[:, ::-1]
                return self.infrared_frame
    
        """Match the depth pixels into the color image"""
        def map_depth_points_to_color_points(self, depth_points):
            depth_points = [self.map_depth_point_to_color_point(x) for x in depth_points]
            return depth_points
    
        def map_depth_point_to_color_point(self, depth_point):
            global valid
            depth_point_to_color = copy.deepcopy(depth_point)
            n = 0
            while 1:
                self.get_the_last_depth()
                self.get_the_last_color()
                color_point = self._kinect._mapper.MapDepthPointToColorSpace(_DepthSpacePoint(511 - depth_point_to_color[1], depth_point_to_color[0]), self.depth_frame[depth_point_to_color[0], 511 - depth_point_to_color[1]])
                if math.isinf(float(color_point.y)):
                    n += 1
                    if n >= 10000:
                        color_point = [0, 0]
                        break
                else:
                    color_point = [np.int0(color_point.y), 1920 - np.int0(color_point.x)]  # image coordinates, human eye Angle
                    valid += 1
                    print('valid number:', valid)
                    break
            return color_point
    
        """Map an array of color pixels into a depth image"""
        def map_color_points_to_depth_points(self, color_points):
            self.get_the_last_depth()
            self.get_the_last_color()
            self._kinect._mapper.MapColorFrameToDepthSpace(ctypes.c_uint(512 * 424), self._kinect._depth_frame_data, ctypes.c_uint(1920 * 1080), self.csp)
            depth_points = [self.map_color_point_to_depth_point(x, True) for x in color_points]
            return depth_points
    
        def map_color_point_to_depth_point(self, color_point, if_call_flg=False):
            n = 0
            color_point_to_depth = copy.deepcopy(color_point)
            color_point_to_depth[1] = 1920 - color_point_to_depth[1]
            while 1:
                self.get_the_last_depth()
                self.get_the_last_color()
                if not if_call_flg:
                    self._kinect._mapper.MapColorFrameToDepthSpace(ctypes.c_uint(512 * 424), self._kinect._depth_frame_data, ctypes.c_uint(1920 * 1080), self.csp)
                if math.isinf(float(self.csp[color_point_to_depth[0] * 1920 + color_point_to_depth[1] - 1].y)) \
                        or np.isnan(self.csp[color_point_to_depth[0] * 1920 + color_point_to_depth[1] - 1].y):
                    n += 1
                    if n >= 10000:
                        print('Color mapping depth, invalid points')
                        depth_point = [0, 0]
                        break
                else:
                    try:
                        depth_point = [np.int0(self.csp[color_point_to_depth[0] * 1920 + color_point_to_depth[1] - 1].y),
                                       np.int0(self.csp[color_point_to_depth[0] * 1920 + color_point_to_depth[1] - 1].x)]
                    except OverflowError as e:
                        print('Color mapping depth, invalid points')
                        depth_point = [0, 0]
                    break
            depth_point[1] = 512 - depth_point[1]
            return depth_point
    
        """Get the latest color and depth images as well as infrared images"""
        def get_the_data_of_color_depth_infrared_image(self):
            time_s = time.time()
            if self.first_time:
                while 1:
                    n = 0
                    self.color = self.get_the_last_color()
                    n += 1
    
                    self.depth = self.get_the_last_depth()
                    n += 1
    
                    if self._kinect.has_new_infrared_frame():
                        frame = self._kinect.get_last_infrared_frame()
                        image_infrared_all = frame.reshape([self._kinect.depth_frame_desc.Height, self._kinect.depth_frame_desc.Width])
                        self.infrared = image_infrared_all[:, ::-1]
                        n += 1
    
                    t = time.time() - time_s
                    if n == 3:
                        self.first_time = False
                        break
                    elif t > 5:
                        print('No image data is obtained, please check that the Kinect2 connection is normal')
                        break
            else:
                if self._kinect.has_new_color_frame():
                    frame = self._kinect.get_last_color_frame()
                    gbra = frame.reshape([self._kinect.color_frame_desc.Height, self._kinect.color_frame_desc.Width, 4])
                    gbr = gbra[:, :, :3][:, ::-1, :]
                    self.color = gbr
    
                if self._kinect.has_new_depth_frame():
                    frame = self._kinect.get_last_depth_frame()
                    image_depth_all = frame.reshape([self._kinect.depth_frame_desc.Height, self._kinect.depth_frame_desc.Width])
                    depth = image_depth_all[:, ::-1]
                    self.depth = depth
    
                if self._kinect.has_new_infrared_frame():
                    frame = self._kinect.get_last_infrared_frame()
                    image_infrared_all = frame.reshape([self._kinect.depth_frame_desc.Height, self._kinect.depth_frame_desc.Width])
                    self.infrared = image_infrared_all[:, ::-1]
    
            return self.color, self.depth, self.infrared
    
    
    if __name__ == '__main__':
        i = 1
        kinect = Kinect()
        s = time.time()
    
        while 1:
            data = kinect.get_the_data_of_color_depth_infrared_image()
            img = data[0]
            mat_intri = np.load('./data/intrinsic_matrix.npy')
            coff_dis = np.load('./data/distortion_cofficients.npy')
            h, w = img.shape[0], img.shape[1]
            newcameramtx, roi = cv.getOptimalNewCameraMatrix(mat_intri, coff_dis, (w, h), 1, (w, h))
            dst = cv.undistort(img, mat_intri, coff_dis, None, newcameramtx)
            dst = cv.cvtColor(dst, cv.COLOR_BGR2RGB)
            plt.imshow(dst/255)
            plt.show()
    
            """
            # store the mapping matrix in an npy file
            color_points = np.zeros((512 * 424, 2), dtype=np.int)  # valid number: 207662
            k = 0
            for i in range(424):
                for j in range(512):
                    color_points[k] = [i, j]
                    k += 1
            depth_map_color = kinect.map_depth_points_to_color_points(color_points)
    
            # turn to 0 that is not in the mapping range
            depth_map_color[..., 0] = np.where(depth_map_color[..., 0] >= 1080, 0, depth_map_color[..., 0])
            depth_map_color[..., 0] = np.where(depth_map_color[..., 0] < 0, 0, depth_map_color[..., 0])
            depth_map_color[..., 1] = np.where(depth_map_color[..., 1] >= 1920, 0, depth_map_color[..., 1])
            depth_map_color[..., 1] = np.where(depth_map_color[..., 1] < 0, 0, depth_map_color[..., 1])
            
            # interpolated fill 0 values
            zeros = np.array(list(set(np.where(depth_map_color == 0)[0])))
            for zero in zeros:
                if zero < 40 * 512 or zero > 360 * 512:
                    continue
                j = 1
                while depth_map_color[zero - j].any() == 0 or depth_map_color[zero + j].any() == 0:
                    j += 1
                depth_map_color[zero][0] = (depth_map_color[zero - j][0] + depth_map_color[zero + j][0]) // 2
                depth_map_color[zero][1] = (depth_map_color[zero - j][1] + depth_map_color[zero + j][1]) // 2
            np.save('full_depth_map_color.npy', full_depth_map_color)
            """
    
            depth_map_color = np.load('./data/full_depth_map_color.npy')   # (424*512, 2)
            full_depth_map_color = depth_map_color
            map_color = dst[full_depth_map_color[..., 0], full_depth_map_color[..., 1]]  # (424*512, 2)
            map_color = map_color.reshape((424, 512, 3))
            plt.imshow(map_color/255)
            plt.show()
    
            if keyboard.is_pressed('esc'):
                break
    
    1. 彩色图去畸变。利用上文计算的相机参数对每一张彩色图去除畸变,同时保存去畸变后的彩色图。需要用到以下两个函数:
    def dedistortion(mat_intri, coff_dis, img):
        h, w = img.shape[0], img.shape[1]
        newcameramtx, roi = cv2.getOptimalNewCameraMatrix(mat_intri, coff_dis, (w, h), 1, (w, h))
        dst = cv2.undistort(img, mat_intri, coff_dis, None, newcameramtx)
        return dst
    
    def save_image(data, name, type, dir='test'):
        global num
        idx = str(num).zfill(6)
        if dir == 'raw':
            color_path = './data/' + name + '/color'
            depth_path = './data/' + name + '/depth'
        else:
            color_path = "./test_data/" + name + '/color'
            depth_path = "./test_data/" + name + '/depth'
        if not os.path.exists(color_path):
            os.makedirs(color_path)
        if not os.path.exists(depth_path):
            os.makedirs(depth_path)
        if type == 'color':
            cv2.imwrite(color_path + '/color-' + idx + '.png', data)
        else:
            cv2.imwrite(depth_path + '/depth-' + idx + '.png', data)
            if dir == 'test':
                num += 1
    
    1. 深度图彩色图对齐。kinect相机的颜色相机和深度传感器之间存在距离,导致深度图和颜色图像素不是一一对应,需要对齐,使用以下函数对齐color和depth,对齐后将图像中心裁剪为尺寸(480, 360)。但是对齐后的颜色图可以会出现对齐错误(对齐npy文件存储在data包中,获取方式见kinect.py的main函数)。
    def center_crop(img,  crop_size):
        tw, th = crop_size
        h, w = img.shape[0], img.shape[1]
        if len(img.shape) == 2:
            crop_img = img[(h - th) // 2:(h + th) // 2, (w - tw) // 2:(w + tw) // 2]
        else:
            crop_img = img[(h - th) // 2:(h + th) // 2, (w - tw) // 2:(w + tw) // 2, :]
        return crop_img
    
    def match_color_depth(color, depth):
        # crop+resize is worse
        full_depth_map_color = np.load('data/full_depth_map_color.npy')
        map_color = color[full_depth_map_color[..., 0], full_depth_map_color[..., 1]]  # (424*512, 2)
        map_color = map_color.reshape((424, 512, 3))
        # 512 * 424
        color = center_crop(map_color, (480, 360))
        depth = center_crop(depth, (480, 360))
        # plt.subplot(1, 2, 1)
        # plt.imshow(color)
        # plt.subplot(1, 2, 2)
        # plt.imshow(depth)
        # plt.show()
        return color, depth
    
    1. 输出color视频。将校正后的color图像流输出为指定路径下的视频。
    def trans_video(image_path, video_name, fps, res, type):
        img_path = gb.glob(image_path + "/*.png")
        videoWriter = cv2.VideoWriter(video_name, cv2.VideoWriter_fourcc(*'DIVX'), fps, res)
        for path in img_path:
            img = cv2.imread(path)
            img = cv2.resize(img, res)
            videoWriter.write(img)
        print('transform ' + type + ' video done!')
    
    def save_video(name):
        currentdate = datetime.datetime.now()
        file_name = str(currentdate.day) + "." + str(currentdate.month) + "." + str(currentdate.hour) + "." + str(currentdate.minute)
        color_path = './data/' + name + '/color'
        # depth_path = './data/' + name + '/depth'
        video_path = './data/' + name + '/video'
        if not os.path.exists(video_path):
            os.makedirs(video_path)
        trans_video(color_path, video_path + '/color-' + file_name + '.avi', 30, (1920, 1080), 'color')
        # trans_video(depth_path, depth_path + '/depth-' + file_name + '.avi', 30, (512, 424), 'depth')
    
    1. 完整调用。调用以上程序的主程序如下:
    if __name__ == '__main__':
        # 1. shooting calibration images
        get_chess_image()
    
        # 2. camera calibration
        mat_intri, coff_dis = camera_calibrator()
        # mat_intri = np.load('./data/intrinsic_matrix.npy')
        # coff_dis = np.load('./data/distortion_cofficients.npy')
    
        # 3. capture object images to save h5 file
        name = 'object'
        file_name = capture_image(name)
    
        f = h5py.File(file_name, 'r')
        num = 0
        for i in range(1, len(f.keys()), 2):
            color = f[str(i)][:]
            depth = f[str(i + 1)][:]
    
            # 4. data process: dedistortion; match color and depth images; save color/depth images
            dedistortion_color = dedistortion(mat_intri, coff_dis, color)
            save_image(dedistortion_color, name, 'color', 'raw')
            save_image(depth, name, 'depth', 'raw')
            new_color, new_depth = match_color_depth(dedistortion_color, depth)
            save_image(new_color, name, 'color', 'test')
            save_image(new_depth, name, 'depth', 'test')
        f.close()
        print('image save done!')
    
        # 5. convert to video
        save_video(name)
    

    作者:贪钱算法还我头发

    物联沃分享整理
    物联沃-IOTWORD物联网 » 【小贪】Kinect V2相机标定、图像采集和处理教程(Python)

    发表回复