Python监控摄像头控制学习指南:ONVIF协议详解及代码实践

摄像头用的是海康(DS系列)的智能球型摄像头,摄像头是已经激活的。
a.实现电脑浏览器访问摄像头
1.先用网线把摄像头和电脑直连(也可以摄像头和路由器直连)。
2.接下来确认摄像头的ip地址
可以用海康官方提供的软件:https://www.hikvision.com/cn/support/tools/hitools/clea8b3e4ea7da90a9/

3.修改电脑的ip和摄像头ip到同频
以下是海康官方给的操作方案:

→ 电脑IP:192.168.0.5 ,摄像机IP:192.168.1.64 则表示不在同一网段

→ 电脑IP:192.168.1.5 ,摄像机IP:192.168.1.64则表示在同一网段

ca05dea2b7fe45238a161265e928e892.png

4.浏览器登录
直接在浏览器输入摄像头的ip,进入登录页面,输入账号密码
ff76d836a2974752b0d2637f42e33c9a.png

5.下载插件并安装
插件下载在登录后页面处
045d33d7caa541539da595eb12004484.png

7ca207af1c144c3a8735c51942cd25f0.png

安装完成后就可以成功看到
830cd7e037234a36ab969210e3d77d10.png

b.python控制摄像头(用的onvif协议)

1.先安装好cv和onvif

conda activate xxx
pip install onvif_zeep
pip install opencv-python

2.确认端口
a08a28d23a51456399bf8f4dbbba0127.png

3.在浏览器配置摄像头
fec678413743455ba1d9b3ffa5ea7283.png
4.运行代码
效果是实时监控的同时,每三秒在预置点1和预置点2中转换
修改自己的ip地址,端口,账号,密码就可以使用了

import time
import requests
import threading
import zeep
from onvif import ONVIFCamera
from requests.auth import HTTPDigestAuth
import cv2

def zeep_pythonvalue(self, xmlvalue):
    return xmlvalue


class Onvif_hik(object):

    def __init__(self, ip: str, username: str, password: str):
        self.ip = ip
        self.username = username
        self.password = password
        self.save_path = "./{}T{}.jpg".format(self.ip, str(time.time()))  # 截图保存路径
        self.content_cam()
        self.exit_flag = False  # 用于退出视频流显示的标志

    def content_cam(self):
        """
        链接相机地址
        :return:
        """
        try:
            self.mycam = ONVIFCamera(self.ip, 80, self.username, self.password)
            self.media = self.mycam.create_media_service()  # 创建媒体服务
            # 得到目标概要文件
            zeep.xsd.simple.AnySimpleType.pythonvalue = zeep_pythonvalue
            self.media_profile = self.media.GetProfiles()[0]  # 获取配置信息
            self.ptz = self.mycam.create_ptz_service()  # 创建控制台服务
            return True
        except Exception as e:
            return False

    def Snapshot(self):
        """
        截图
        :return:
        """
        res = self.media.GetSnapshotUri({'ProfileToken': self.media_profile.token})

        response = requests.get(res.Uri, auth=HTTPDigestAuth(self.username, self.password))
        with open(self.save_path, 'wb') as f:  # 保存截图
            f.write(response.content)

    def get_presets(self):
        """
        获取预置点列表
        :return:预置点列表--所有的预置点
        """
        presets = self.ptz.GetPresets({'ProfileToken': self.media_profile.token})  # 获取所有预置点,返回值:list
        return presets

    def goto_preset(self, presets_token: int):
        """
        移动到指定预置点
        :param presets_token: 目的位置的token,获取预置点返回值中
        :return:
        """
        try:
            params = self.ptz.create_type('GotoPreset')
            params.ProfileToken = self.media_profile.token
            params.PresetToken = presets_token
            self.ptz.GotoPreset(params)
        except Exception as e:
            print(e)

    def zoom(self, zoom: str, timeout: int = 1):
        """
        变焦
        :param zoom: 1为拉近或-1为远离 
        :param timeout: 生效时间
        :return:
        """
        request = self.ptz.create_type('ContinuousMove')
        request.ProfileToken = self.media_profile.token
        request.Velocity = {"Zoom": zoom}
        self.ptz.ContinuousMove(request)
        time.sleep(timeout)
        self.ptz.Stop({'ProfileToken': request.ProfileToken})

    def get_status(self):
        """
        获取当前预置点的信息
        :return:
        """
        params = self.ptz.create_type('GetStatus')
        params.ProfileToken = self.media_profile.token
        res = self.ptz.GetStatus(params)
        return res

    def show_video(self):
        """
        获取并展示摄像头的实时视频流
        :return:
        """
        # 获取视频流的 RTSP URL
        video_url = self.media.GetStreamUri({'StreamSetup': {'Stream': 'RTP-Unicast', 'Transport': {'Protocol': 'RTSP'}}, 'ProfileToken': self.media_profile.token})
        
        # 将用户名和密码嵌入 RTSP URL 中
        rtsp_url_with_credentials = f"rtsp://{self.username}:{self.password}@{self.ip}:554{video_url.Uri[video_url.Uri.find('/'):]}"

        # 打开视频流
        cap = cv2.VideoCapture(rtsp_url_with_credentials)


        if not cap.isOpened():
            print("无法打开视频流")
            return

        # 持续显示视频流
        while not self.exit_flag:
            ret, frame = cap.read()
            if not ret:
                print("无法获取视频帧")
                break

            cv2.imshow("Camera Stream", frame)

            # 按 'q' 键退出
            if cv2.waitKey(1) & 0xFF == ord(' '):  
                self.exit_flag = True  # 设置退出标志
                break

        # 释放资源
        cap.release()
        cv2.destroyAllWindows()

    def move_between_presets(self, time_interval_1=10, time_interval_2=10):
        """
        在指定时间间隔内移动到预置点1和预置点2
        :param time_interval_1: 移动到预置点1的时间(秒)
        :param time_interval_2: 移动到预置点2的时间(秒)
        :return:
        """
        while True:
            # 移动到预置点1
            print("移动到预置点1...")
            self.goto_preset(1)  # 假设预置点1的 token 是 1
            time.sleep(time_interval_1)

            # 移动到预置点2
            print("移动到预置点2...")
            self.goto_preset(2)  # 假设预置点2的 token 是 2
            time.sleep(time_interval_2)
            if self.exit_flag:
                break

if __name__ == '__main__':
    o = Onvif_hik(ip="ip地址", username="账号", password="密码")
    #o.show_video()  # 展示实时视频流
    #o.move_between_presets(time_interval_1=3, time_interval_2=3)  # 每个预置点停留 10 秒    
    video_thread = threading.Thread(target=o.show_video)  # 展示视频流的线程
    preset_thread = threading.Thread(target=o.move_between_presets, args=(3, 3))  # 切换预置点的线程

    # 启动线程
    video_thread.start()
    preset_thread.start()

    # 等待线程完成
    video_thread.join()
    preset_thread.join()

    print("程序已退出")

相关参考资料:
实测python-onvif协议控制摄像头[梧桐凰]_onvif python-CSDN博客

 

 

作者:www.02

物联沃分享整理
物联沃-IOTWORD物联网 » Python监控摄像头控制学习指南:ONVIF协议详解及代码实践

发表回复