《使用Micropython控制ESP32-CAM摄像头——物联网开发笔记(57)》

一、目的

        这一节我们学习如何使用我们的ESP32开发板来控制ESP32-CAM摄像头。

二、环境

        ESP32 + ESP32-CAM摄像头 + Thonny IDE + 几根杜邦线

        ESP32-CAM是一个开发板,它上面集成了Camera摄像头模块,可以直接用Micropython进行操作。从MicroPython官网下载的固件 没有Camera库,我们需要重新刷带Camera摄像头库的固件。固件大家可以从这里下载:

链接: https://pan.baidu.com/s/1FtwZfstPkn4Rsm9sRaQlIg 提取码: q1gg 复制这段内容后打开百度网盘手机App,操作更方便哦。

        购买ESP32模块时,部分商家的配套购买的下载模块粗暴地将IO0下拉,无法对模块进行串口调试,也需要手动按下rst进行复位,而且只能对esp32-cam模块下载,功能单一。建议大家从以下网址购买,以免踩坑:
https://item.taobao.com/item.htm?spm=a1z10.1-c-s.w4004-24500067207.10.fab823f4mCgBW4&id=687614728872

三、固件刷新方法

        在开始的时候,我已经讲解过:

物联网开发笔记(17)- 使用Micropython开发ESP32开发板开发环境准备_魔都飘雪的博客-CSDN博客_micropython开发esp32

这里再和大家一起复习一下:首先将ESP32 开发板通过USB线连接到电脑上,然后安装驱动,最后打开我们的Thonny IDE编辑器:

烧录完成,移除USB线,换上我们的另一块通信地板,重新接上USB线,打开Thonny IED,验证是否成功烧录:

 四、实验代码1 – 拍照

'''
注意:
此程序可以运行1次,如果再次运行会出现OSError: Camera Init Failed,这个问题后面研究看看;临时的解决办法把板子的USB数据线拔掉,重新链接即可
此程序运行后的图片,需要下载到电脑上才能浏览,不支持在线浏览(不过,后面的课程会添加网络实时显示拍摄的画面,所以本节课不要想那么多,能看到图片就行)
'''

import camera


# 初始化摄像头
camera.init(0, format=camera.JPEG, fb_location=camera.PSRAM)

# 拍摄一张图片
buf = camera.capture()  # 大小是640x480

# 保存图片到文件
with open("第一张图片.png", "wb") as f:
    f.write(buf)  # buf中的数据就是图片的数据,所以直接写入到文件就行了
    print("拍照已完成,点击Thonny左侧【MicroPython设备】右侧的三,\n然后看到‘刷新’,点击刷新会看到 图片,\n然后右击图片名称,选择下载到电脑的路径即可...")

大家运行代码,按照如下方法,下载到电脑上就可以查看。

 不过这个代码,有个bug就是再次运行时,会报错“OSError: Camera Init Failed”

那么怎么解决呢?请往下看。

五、实验代码2 – 解决再次运行报错问题

        在使用摄像头后,调用camera.deinit释放

import camera


# 初始化摄像头
try:
    camera.init(0, format=camera.JPEG)  # 使用try捕获初始化异常
except Exception as e:
    camera.deinit()  # 如果捕获到异常,则释放。
    camera.init(0, format=camera.JPEG)

# 拍摄一张图片
buf = camera.capture()  # 大小是640x480

# 保存图片到文件
with open("第二张图片.png", "wb") as f:
    f.write(buf)  # buf中的数据就是图片的数据,所以直接写入到文件就行了
    print("拍照已完成,点击Thonny左侧【MicroPython设备】右侧的三,\n然后看到‘刷新’,点击刷新会看到 图片,\n然后右击图片名称,选择下载到电脑的路径即可...")

camera.deinit()  # 在使用摄像头后,调用camera.deinit释放

五、测试代码3 – 实时显示摄像头画面

        我们在电脑端运行一段代码,作为服务器端。ESP32 camera作为客户端。

电脑端:

import socket
import cv2
import io
from PIL import Image
import numpy as np

# 创建套接字
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
s.bind(("0.0.0.0", 9090))  # 绑定你电脑上的所有IP地址,如果电脑有多个IP地址,数据都接收

while True:
    data, IP = s.recvfrom(100000)  # 接收数据,设置数据包大小。图片大小100K
    bytes_stream = io.BytesIO(data)  # 收到数据后转为字节流
    image = Image.open(bytes_stream)  # 打开这个图片
    img = np.asarray(image)  # 转化为数组格式
    img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)  # ESP32采集的是RGB格式,要转换为BGR(opencv的格式)
    cv2.imshow("ESP32 Capture Image", img)  # 显示图片
    if cv2.waitKey(1) == ord("q"):  # 按键盘上的q键,退出
        break

ESP-32 Camera端:

import socket
import network
import camera
import time


# 连接wifi
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
if not wlan.isconnected():
    print('connecting to network...')
    wlan.connect('WIFI名字', 'WIFI密码')
    
    while not wlan.isconnected():
        pass
print('网络配置:', wlan.ifconfig())
 
 
# 摄像头初始化
try:
    camera.init(0, format=camera.JPEG)
except Exception as e:
    camera.deinit()
    camera.init(0, format=camera.JPEG)


# 其他设置:
# 上翻下翻
camera.flip(0)
#左/右
camera.mirror(1)

# 分辨率
camera.framesize(camera.FRAME_HVGA)
# 选项如下:
# FRAME_96X96 FRAME_QQVGA FRAME_QCIF FRAME_HQVGA FRAME_240X240
# FRAME_QVGA FRAME_CIF FRAME_HVGA FRAME_VGA FRAME_SVGA
# FRAME_XGA FRAME_HD FRAME_SXGA FRAME_UXGA FRAME_FHD
# FRAME_P_HD FRAME_P_3MP FRAME_QXGA FRAME_QHD FRAME_WQXGA
# FRAME_P_FHD FRAME_QSXGA
# 有关详细信息,请查看此链接:https://bit.ly/2YOzizz

# 特效
camera.speffect(camera.EFFECT_NONE)
#选项如下:
# 效果\无(默认)效果\负效果\ BW效果\红色效果\绿色效果\蓝色效果\复古效果
# EFFECT_NONE (default) EFFECT_NEG \EFFECT_BW\ EFFECT_RED\ EFFECT_GREEN\ EFFECT_BLUE\ EFFECT_RETRO

# 白平衡
camera.whitebalance(camera.WB_HOME)
#选项如下:
# WB_NONE (default) WB_SUNNY WB_CLOUDY WB_OFFICE WB_HOME

# 饱和
camera.saturation(0)
#-2,2(默认为0). -2灰度
# -2,2 (default 0). -2 grayscale 

# 亮度
camera.brightness(0)
#-2,2(默认为0). 2亮度
# -2,2 (default 0). 2 brightness

# 对比度
camera.contrast(0)
#-2,2(默认为0).2高对比度
#-2,2 (default 0). 2 highcontrast

# 质量
camera.quality(10)
#10-63数字越小质量越高

# socket UDP 的创建
s = socket.socket(socket.AF_INET,socket.SOCK_DGRAM,0)

try:
    while True:
        buf = camera.capture()  # 获取图像数据
        s.sendto(buf, ("192.168.0.101", 9090))  # 向服务器发送图像数据。这个IP地址是电脑的IP地址
        time.sleep(0.1)
except:
    pass
finally:
    camera.deinit()

我们先运行电脑端的代码,再运行ESP32 camera端的代码:

 然后我们就会看到,弹出来一个摄像头的窗口:

 然后我们在电脑上选中摄像头窗口,按q键,摄像头窗口就会自动退出,同时电脑端程序也就停止运行了。如果你觉得拍摄的画面有卡顿,你可以将下面这个时间设置小点:

 六、测试代码4 – 录制视屏

        将ESP32-CAM拍摄的画面存储到PC端,运行程序后会在当前程序路径下生成以时间戳为名字的.avi视频文件。在测试代码3的基础上,增加3行代码即可:

import socket
import cv2
import io
from PIL import Image
import numpy as np
import time

s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
s.bind(("0.0.0.0", 9090))

# 设置视频的编码解码方式avi
video_type = cv2.VideoWriter_fourcc(*'XVID')  # 视频存储的格式  # 增加的代码
# 保存的位置,以及编码解码方式,帧率,视频帧大小
mp4_file = cv2.VideoWriter('%s.avi' % str(time.time()), video_type, 5, (480, 320))  # 增加的代码

while True:
    data, IP = s.recvfrom(100000)
    bytes_stream = io.BytesIO(data)
    image = Image.open(bytes_stream)
    img = np.asarray(image)
    img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)  # ESP32采集的是RGB格式,要转换为BGR(opencv的格式)
    cv2.imshow("ESP32 Capture Image", img)
    ret = mp4_file.write(img)  # 增加的代码

    print(ret)
    if cv2.waitKey(1) == ord("q"):
        break

同样的,我们先运行电脑端代码,再运行ESP32-camera端的代码。

我们拍摄一段时间后,按q退出,然后在电脑端,会发现一个AVI格式的视屏文件,我们打开就可以看到我们所拍摄到的画面。

七、测试代码5 – PC端摄像头操作界面

        我们制作界面使用的工具 — PySide:商用免费,所以本程序使用的PySide!

 代码中用到的image文件夹里是界面上的一些图标,大家下载下来,将image文件夹放在和代码同一级的目录下即可:

import os
import socket
import sys
import winreg
import time
import io

import cv2
from PIL import Image
import numpy as np
from PySide6.QtCore import QTimer, QThread, QSettings
from PySide6.QtGui import QIcon, Qt, QImage, QPixmap
from PySide6.QtWidgets import QApplication, QWidget, QLabel, QPushButton, QGroupBox, QHBoxLayout, QStyle, QVBoxLayout, QStackedLayout, QLineEdit, QComboBox, QMessageBox, QFileDialog

CAPTURE_IMAGE_DATA = None
SAVE_VIDEO_PATH = ""


class CaptureThread(QThread):
    def __init__(self, ip, port):
        super().__init__()
        self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
        self.udp_socket.bind((ip, port))
        # 设置运行的标志
        self.run_flag = True
        self.record_flag = False

    def run(self):
        global CAPTURE_IMAGE_DATA

        while self.run_flag:
            data, ip = self.udp_socket.recvfrom(100000)
            bytes_stream = io.BytesIO(data)
            image = Image.open(bytes_stream)
            img = np.asarray(image)

            if self.record_flag:  # 开始录制视频
                # 转换数据格式,便于存储视频文件
                img_2 = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)  # ESP32采集的是RGB格式,要转换为BGR(opencv的格式)
                self.mp4_file.write(img_2)

            # PySide显示不需要转换,所以直接用img
            temp_image = QImage(img.flatten(), 480, 320, QImage.Format_RGB888)
            temp_pixmap = QPixmap.fromImage(temp_image)
            CAPTURE_IMAGE_DATA = temp_pixmap  # 暂时存储udp接收到的1帧视频画面

        # 结束后 关闭套接字
        self.udp_socket.close()

    def stop_run(self):
        self.run_flag = False
        self.record_flag = False
        try:
            self.mp4_file.release()
        except Exception as ret:
            pass

    def stop_record(self):
        self.mp4_file.release()
        self.record_flag = False

    def start_record(self):
        # 设置视频的编码解码方式avi
        video_type = cv2.VideoWriter_fourcc(*'XVID')  # 视频存储的格式
        # 保存的位置,以及编码解码方式,帧率,视频帧大小
        file_name = "{}.avi".format(time.time())
        file_path_name = os.path.join(SAVE_VIDEO_PATH, file_name)
        self.mp4_file = cv2.VideoWriter(file_path_name, video_type, 5, (480, 320))
        self.record_flag = True


class ShowCaptureVideoWidget(QWidget):
    def __init__(self):
        super().__init__()

        layout = QHBoxLayout()

        # 用来显示画面的QLabel
        self.video_label = QLabel("选择顶部的操作按钮...")
        self.video_label.setAlignment(Qt.AlignHCenter | Qt.AlignVCenter)
        self.video_label.setScaledContents(True)
        layout.addWidget(self.video_label)

        self.setLayout(layout)


class VideoWindow(QWidget):
    def __init__(self):
        super().__init__()
        self.setWindowTitle("远程摄像头监控 v2022.10.13.001")
        self.setWindowIcon(QIcon('./images/logo.png'))
        self.resize(777, 555)

        # 选择本电脑IP
        camera_label = QLabel("选择本电脑IP:")

        # ip列表
        # 获取本地电脑的ip地址列表
        hostname, alias_list, ip_addr_list = socket.gethostbyname_ex(socket.gethostname())
        # print(hostname)  # DESKTOP
        # print(alias_list)  # []
        # print(ip_addr_list)  # ['192.168.47.1', '192.168.208.1', '192.168.31.53']
        ip_addr_list.insert(0, "0.0.0.0")
        self.combox = QComboBox()
        self.combox.addItems(ip_addr_list)
        self.ip_addr_list = ip_addr_list

        # 本地端口
        port_label = QLabel("本地端口:")
        self.port_edit = QLineEdit("9090")

        g_1 = QGroupBox("监听信息")
        g_1.setFixedHeight(60)
        g_1_h_layout = QHBoxLayout()
        g_1_h_layout.addWidget(camera_label)
        g_1_h_layout.addWidget(self.combox)
        g_1_h_layout.addWidget(port_label)
        g_1_h_layout.addWidget(self.port_edit)
        g_1.setLayout(g_1_h_layout)

        # 启动显示
        self.camera_open_close_btn = QPushButton(QIcon("./images/shexiangtou.png"), "启动显示")
        self.camera_open_close_btn.clicked.connect(self.camera_open_close)

        self.record_video_btn = QPushButton(QIcon("./images/record.png"), "开始录制")
        self.record_video_btn.clicked.connect(self.recorde_video)

        save_video_path_setting_btn = QPushButton(QIcon("./images/folder.png"), "设置保存路径")
        save_video_path_setting_btn.clicked.connect(self.save_video_path_setting)

        g_2 = QGroupBox("功能操作")
        g_2.setFixedHeight(60)
        g_2_h_layout = QHBoxLayout()
        g_2_h_layout.addWidget(self.camera_open_close_btn)
        g_2_h_layout.addWidget(self.record_video_btn)
        g_2_h_layout.addWidget(save_video_path_setting_btn)
        g_2.setLayout(g_2_h_layout)

        # --------- 整体布局 ---------
        h_layout = QHBoxLayout()
        h_layout.addWidget(g_1)
        h_layout.addWidget(g_2)
        h_layout.addStretch(1)

        v_layout = QVBoxLayout()
        v_layout.addLayout(h_layout)

        # 创建底部的显示区域
        self.stacked_layout_capture_view = ShowCaptureVideoWidget()
        v_layout.addWidget(self.stacked_layout_capture_view)

        self.setLayout(v_layout)

        # 定时刷新视频画面
        self.timer = QTimer()
        self.timer.timeout.connect(self.show_video_image)
        self.load_time = 0
        self.load_time_all = 0

    def camera_open_close(self):
        """启动创建socket线程,用来接收显示数据"""
        if self.camera_open_close_btn.text() == "启动显示":
            ip = self.combox.currentText()
            try:
                port = int(self.port_edit.text())
            except Exception as ret:
                QMessageBox.about(self, '警告', '端口设置错误!!!')
                return

            self.thread = CaptureThread(ip, port)
            self.thread.daemon = True
            self.thread.start()
            self.timer.start(100)  # 设置计时间隔并启动
            self.camera_open_close_btn.setText("关闭显示")
        else:
            self.camera_open_close_btn.setText("启动显示")
            self.timer.stop()
            self.stacked_layout_capture_view.video_label.clear()
            self.thread.stop_run()
            self.record_video_btn.setText("开始录制")

    def show_video_image(self):
        if CAPTURE_IMAGE_DATA:
            self.stacked_layout_capture_view.video_label.setPixmap(CAPTURE_IMAGE_DATA)
        else:
            if time.time() - self.load_time >= 1:
                self.load_time = time.time()
                self.load_time_all += 1
                self.stacked_layout_capture_view.video_label.setText("摄像头加载中...{}".format(self.load_time_all))

    @staticmethod
    def get_desktop():
        key = winreg.OpenKey(winreg.HKEY_CURRENT_USER, r'Software\Microsoft\Windows\CurrentVersion\Explorer\Shell Folders')
        return winreg.QueryValueEx(key, "Desktop")[0]

    def save_video_path_setting(self):
        """视频保存路径"""
        global SAVE_VIDEO_PATH
        if SAVE_VIDEO_PATH:
            last_path = QSettings().value("LastFilePath")
        else:
            last_path = self.get_desktop()

        path_name = QFileDialog.getExistingDirectory(self, '请选择保存视频的路径', last_path)
        if not path_name:
            return

        SAVE_VIDEO_PATH = path_name

    def recorde_video(self):
        """录制视频"""
        if self.camera_open_close_btn.text() == "启动显示":
            QMessageBox.about(self, '警告', '请先启动显示,然后再开始录制!!!')
            return

        if not SAVE_VIDEO_PATH:
            QMessageBox.about(self, '警告', '请先配置视频保存路径!!!')
            return

        if self.record_video_btn.text() == "开始录制":
            self.record_video_btn.setText("停止录制")
            self.thread.start_record()
        else:
            self.record_video_btn.setText("开始录制")
            self.thread.stop_record()


if __name__ == "__main__":
    app = QApplication(sys.argv)
    video_window = VideoWindow()
    video_window.show()
    app.exec()

八、测试代码6 – SD卡存储

        我们将SD插入到开发板的卡槽中。可以将要存储的数据存放到SD中。

  • 下面代码可以将SD卡,挂载到一个叫做sd的文件夹
  • 为了测试挂载正常,程序在这个SD卡中创建一个test.txt文件,再写入测试数据。
  • import uos
    from machine import SDCard
    
    
    # 挂载sd卡到sd文件夹
    try:
        uos.mount(SDCard(), "/sd")
    except Exception as ret:
        print("挂载失败...", ret)
    else:
        print("挂载成功...")
        
    
    with open("/sd/test.txt", "w") as f:
        for i in range(1, 101):
            f.write(str(i)+"\n")
    
    print("已经将1 2 3....100写入到sd卡中的text.txt文件")
    
    

    八、FTDI下载器USB-TTL固件下载

            在网上看到说使用FTDI下载器USB转TTL下载器也可以烧录固件,接线方法如下,

    https://detail.tmall.com/item.htm?abbucket=8&id=599897405655&ns=1&spm=a230r.1.14.3.9b0a2889fSojPZ

     

     我没下载成功,大家下载成功的评论区留言。

    八、TF卡购买

    再送大家一个拍照和挂载TF卡的程序:我使用2G的TF卡没挂载成功,后来买了4G的TF挂载成功了,看到网上说16G的也可以挂载,不知道行不行,你可以去试验一下。此处分享下我买的4G TF卡某宝链接:

    https://detail.tmall.com/item.htm?_u=ap01rch6315&id=613649800295&spm=a1z09.2.0.0.863c2e8d47eS7h&skuId=4321704732307

    import camera
    
    # 初始化
    camera.init(0, format=camera.JPEG)
    # 分辨率(不设置分辨率默认为200万像素)
    camera.framesize(camera.FRAME_240X240)
    '''
    分辨率参数:
    camera.FRAME_96X96,分辨率96X96
    camera.FRAME_240X240, 分辨率240X240
    camera.FRAME_QVGA, 分辨率320X240
    camera.FRAME_VGA, 分辨率640X480
    camera.FRAME_SVGA, 分辨率800X600
    camera.FRAME_HD, 分辨率1280X720
    '''
    # 设置特殊模式
    # EFFECT_NONE(默认)EFFECT_NEG偏紫色 EFFECT_BW黑白 EFFECT_RED红 EFFECT_GREEN绿 EFFECT_BLUE蓝 EFFECT_RETRO怀旧风格
    camera.speffect(camera.EFFECT_NONE)
    # 设置白平衡
    # WB_NONE(默认) WB_SUNNY WB_CLOUDY WB_OFFICE WB_HOME
    camera.whitebalance(camera.WB_NONE)
    # 设置饱和度,-2 到 2(默认) -2 grayscale
    camera.saturation(0)
    # 设置亮度, -2 到 2(默认) 2 brightness
    camera.brightness(0)
    # 设置对比度, -2 到 2(默认) 2 brighcontrast
    camera.contrast(0)
    # 设置图片质量,10到63,数字越小图片质量越高
    camera.quality(10)
    # 照片以二进制数据保存在buf这个变量中
    buf = camera.capture()
    with open("dd.jpg", "wb") as f:
        f.write(buf)  # buf中的数据就是图片的数据,所以直接写入到文件就行了
    # 释放摄像头
    camera.deinit()
    
    # 如果你有SD卡,按照下面进行挂载
    import uos
    from machine import SDCard
    uos.mount(SDCard(),'/sd')
    uos.chdir('sd')
    uos.listdir()
    

    物联沃分享整理
    物联沃-IOTWORD物联网 » 《使用Micropython控制ESP32-CAM摄像头——物联网开发笔记(57)》

    发表评论