• OpenMV VSCode 扩展发布了,在插件市场直接搜索OpenMV就可以安装
  • 如果有产品硬件故障问题,比如无法开机,论坛很难解决。可以直接找售后维修
  • 发帖子之前,请确认看过所有的视频教程,https://singtown.com/learn/ 和所有的上手教程http://book.openmv.cc/
  • 每一个新的提问,单独发一个新帖子
  • 帖子需要目的,你要做什么?
  • 如果涉及代码,需要报错提示全部代码文本,请注意不要贴代码图片
  • 必看:玩转星瞳论坛了解一下图片上传,代码格式等问题。
  • dht11的温湿度无法传输,注释掉之后图像和sgp30数据又可以传输



    • import sensor
      import time
      import network
      import socket
      from machine import I2C,Pin
      
      
      # ---- dht11传感器类 ----
      class DHT11:
          DHT_OK = 0
          DHT_ERR_CHECKSUM = -1
          DHT_ERR_TIMEOUT = -2
      
          def __init__(self, pin):
              self.pin = Pin(pin, Pin.OPEN_DRAIN, Pin.PULL_UP)
              self.last_read = 0
              self.min_interval = 2000
      
          def _read_raw(self):
              self.pin.init(Pin.OUT_OD, pull=None)
              self.pin(0)
              time.sleep_ms(18)
              self.pin(1)
              time.sleep_us(20)
      
              self.pin.init(Pin.IN, Pin.PULL_UP)
              timeout = 1000
              while self.pin() == 1:
                  if (timeout := timeout - 1) <= 0:
                      return None, self.DHT_ERR_TIMEOUT
      
              self._wait_pulse(0, 80)
              self._wait_pulse(1, 80)
      
              data = bytearray(5)
              for i in range(40):
                  if not self._wait_pulse(0, 50):
                      return None, self.DHT_ERR_TIMEOUT
      
                  t = self._wait_pulse(1, 100)
                  data[i//8] = (data[i//8] << 1) | (0x01 if t > 30 else 0x00)
      
              if data[4] != ((data[0] + data[1] + data[2] + data[3]) & 0xFF):
                  return data, self.DHT_ERR_CHECKSUM
      
              return data, self.DHT_OK
      
          def _wait_pulse(self, level, max_wait_us):
              start = time.ticks_us()
              while self.pin() == (not level):
                  if time.ticks_diff(time.ticks_us(), start) > max_wait_us:
                      return 0
              start = time.ticks_us()
              while self.pin() == level:
                  if time.ticks_diff(time.ticks_us(), start) > max_wait_us:
                      return 0
              return time.ticks_diff(time.ticks_us(), start)
      
          def read(self):
              if time.ticks_diff(time.ticks_ms(), self.last_read) < self.min_interval:
                  time.sleep_ms(self.min_interval - time.ticks_diff(time.ticks_ms(), self.last_read))
      
              data, err = self._read_raw()
              self.last_read = time.ticks_ms()
      
              if err != self.DHT_OK:
                  return {'temp': None, 'humi': None, 'error': err}
      
              humidity = data[0] + data[1] * 0.1
              temperature = data[2] + data[3] * 0.1
              return {'temp': temperature, 'humi': humidity, 'error': self.DHT_OK}
      
      
      # ---- SGP30传感器类 ----
      class SGP30:
          def __init__(self, i2c):
              self.i2c = i2c
              self.addr = 0x58
              self._send_cmd([0x20, 0x03])
              time.sleep_ms(500)
      
          def measure_air_quality(self):
              self._send_cmd([0x20, 0x08])
              time.sleep_ms(12)
              data = self.i2c.readfrom(self.addr, 6)
              return self._parse_measurement(data)
      
          def _crc8(self, data):
              crc = 0xFF
              for byte in data:
                  crc ^= byte
                  for _ in range(8):
                      if crc & 0x80:
                          crc = (crc << 1) ^ 0x31
                      else:
                          crc <<= 1
                      crc &= 0xFF
              return crc
      
          def _parse_measurement(self, data):
              co2 = (data[0] << 8) | data[1]
              crc_co2 = data[2]
              tvoc = (data[3] << 8) | data[4]
              crc_tvoc = data[5]
      
              if self._crc8(bytes(data[0:2])) != crc_co2:
                  co2 = None
              if self._crc8(bytes(data[3:5])) != crc_tvoc:
                  tvoc = None
              return (co2, tvoc)
      
          def _send_cmd(self, cmd):
              self.i2c.writeto(self.addr, bytes(cmd))
      
      # ---- 初始化硬件 ----
      # 摄像头初始化
      sensor.reset()
      sensor.set_pixformat(sensor.RGB565)
      sensor.set_framesize(sensor.QQVGA)
      sensor.set_auto_exposure(False, exposure_us=30000)
      sensor.set_auto_gain(False)
      sensor.set_auto_whitebal(False)
      sensor.skip_frames(time=5000)
      
      # SGP30初始化
      i2c = I2C(scl='P4', sda='P5', freq=100000)
      sgp30 = SGP30(i2c)
      
      # DHT11初始化(新增)
      dht = DHT11(pin="P7")  # 使用P7引脚
      
      # ---- 网络配置 ----
      SSID = "xiaotang"
      PASSWORD = "12345678"
      target_ip = "192.168.75.220"
      
      # 图像传输配置
      sock_img = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
      img_port = 8080
      MAX_CHUNK_SIZE = 1000
      
      # 传感器数据配置
      sock_sgp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
      sgp_port = 8081
      last_sgp_time = time.ticks_ms()
      SGP_INTERVAL = 2000  # 2秒采样间隔
      
      # DHT数据传输配置
      sock_dht = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
      dht_port = 8082 # 新增8082端口
      last_dht_time = time.ticks_ms()
      
      # ---- WiFi连接 ----
      wlan = network.WLAN(network.STA_IF)
      wlan.active(True)
      wlan.connect(SSID, PASSWORD)
      
      start_time = time.ticks_ms()
      while not wlan.isconnected():
          if time.ticks_diff(time.ticks_ms(), start_time) > 10000:
              raise RuntimeError("WiFi连接超时")
          print("连接中...")
          time.sleep_ms(500)
      print("IP:", wlan.ifconfig()[0])
      
      # ---- 主循环 ----
      while True:
          try:
              
              # 处理sgp30传感器数据
              current_time = time.ticks_ms()
              if time.ticks_diff(current_time, last_sgp_time) >= SGP_INTERVAL:
                  co2, tvoc = sgp30.measure_air_quality()
                  data = "CO2: {}, TVOC: {}".format(co2 if co2 is not None else "NaN",
                                                  tvoc if tvoc is not None else "NaN")
                  sock_sgp.sendto(data.encode(), (target_ip, sgp_port))
                  last_sgp_time = current_time
                  print("传感器数据已发送:", data)
              
              # 新增DHT11处理逻辑
              #current_time = time.ticks_ms()
              if time.ticks_diff(current_time, last_dht_time) >= 5000:
                  dht_data = dht.read()
                  temp = dht_data['temp'] if dht_data['error'] == DHT11.DHT_OK else "NaN"
                  humi = dht_data['humi'] if dht_data['error'] == DHT11.DHT_OK else "NaN"
      
                  dht_msg = f"DHT: {temp:.1f}C, {humi:.1f}%".encode()
                  sock_dht.sendto(dht_msg, (target_ip, dht_port))
                  last_dht_time = current_time
                  print("DHT数据已发送:", dht_msg)
              
              # 处理图像数据
              img = sensor.snapshot()
              img_bytes = img.compress(quality=50)
      
              if img_bytes:
                  img_bytes = bytes(img_bytes)
                  for i in range(0, len(img_bytes), MAX_CHUNK_SIZE):
                      sock_img.sendto(img_bytes[i:i+MAX_CHUNK_SIZE], (target_ip, img_port))
                  sock_img.sendto(b"END", (target_ip, img_port))
                  print("图像帧已发送,大小:", len(img_bytes))
              else:
                  print("图像压缩失败")
              
          except Exception as e:
              print("运行异常:", e)
              # 网络重连
              wlan.disconnect()
              wlan.connect(SSID, PASSWORD)
              while not wlan.isconnected():
                  time.sleep_ms(500)
              sock_img = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
              sock_sgp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
              sock_dht = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
              print("网络已重启")
      
      # 理论上不会执行到这里
      sock_img.close()
      sock_sgp.close()
      sock_dht.close()