导航

    • 登录
    • 搜索
    • 版块
    • 产品
    • 教程
    • 论坛
    • 淘宝
    1. 主页
    2. kns2
    K
    • 举报资料
    • 资料
    • 关注
    • 粉丝
    • 屏蔽
    • 帖子
    • 楼层
    • 最佳
    • 群组

    kns2

    @kns2

    0
    声望
    2
    楼层
    48
    资料浏览
    0
    粉丝
    0
    关注
    注册时间 最后登录

    kns2 关注

    kns2 发布的帖子

    • dht11的温湿度无法传输,注释掉之后图像和sgp30数据又可以传输
      import sensor
      import time
      import network
      import socket
      from machine import I2C,Pin
      
      
      # ---- dht11传感器类 ----
      class DHT11:
          DHT_OK = 0
          DHT_ERR_CHECKSUM = -1
          DHT_ERR_TIMEOUT = -2
      
          def __init__(self, pin):
              self.pin = Pin(pin, Pin.OPEN_DRAIN, Pin.PULL_UP)
              self.last_read = 0
              self.min_interval = 2000
      
          def _read_raw(self):
              self.pin.init(Pin.OUT_OD, pull=None)
              self.pin(0)
              time.sleep_ms(18)
              self.pin(1)
              time.sleep_us(20)
      
              self.pin.init(Pin.IN, Pin.PULL_UP)
              timeout = 1000
              while self.pin() == 1:
                  if (timeout := timeout - 1) <= 0:
                      return None, self.DHT_ERR_TIMEOUT
      
              self._wait_pulse(0, 80)
              self._wait_pulse(1, 80)
      
              data = bytearray(5)
              for i in range(40):
                  if not self._wait_pulse(0, 50):
                      return None, self.DHT_ERR_TIMEOUT
      
                  t = self._wait_pulse(1, 100)
                  data[i//8] = (data[i//8] << 1) | (0x01 if t > 30 else 0x00)
      
              if data[4] != ((data[0] + data[1] + data[2] + data[3]) & 0xFF):
                  return data, self.DHT_ERR_CHECKSUM
      
              return data, self.DHT_OK
      
          def _wait_pulse(self, level, max_wait_us):
              start = time.ticks_us()
              while self.pin() == (not level):
                  if time.ticks_diff(time.ticks_us(), start) > max_wait_us:
                      return 0
              start = time.ticks_us()
              while self.pin() == level:
                  if time.ticks_diff(time.ticks_us(), start) > max_wait_us:
                      return 0
              return time.ticks_diff(time.ticks_us(), start)
      
          def read(self):
              if time.ticks_diff(time.ticks_ms(), self.last_read) < self.min_interval:
                  time.sleep_ms(self.min_interval - time.ticks_diff(time.ticks_ms(), self.last_read))
      
              data, err = self._read_raw()
              self.last_read = time.ticks_ms()
      
              if err != self.DHT_OK:
                  return {'temp': None, 'humi': None, 'error': err}
      
              humidity = data[0] + data[1] * 0.1
              temperature = data[2] + data[3] * 0.1
              return {'temp': temperature, 'humi': humidity, 'error': self.DHT_OK}
      
      
      # ---- SGP30传感器类 ----
      class SGP30:
          def __init__(self, i2c):
              self.i2c = i2c
              self.addr = 0x58
              self._send_cmd([0x20, 0x03])
              time.sleep_ms(500)
      
          def measure_air_quality(self):
              self._send_cmd([0x20, 0x08])
              time.sleep_ms(12)
              data = self.i2c.readfrom(self.addr, 6)
              return self._parse_measurement(data)
      
          def _crc8(self, data):
              crc = 0xFF
              for byte in data:
                  crc ^= byte
                  for _ in range(8):
                      if crc & 0x80:
                          crc = (crc << 1) ^ 0x31
                      else:
                          crc <<= 1
                      crc &= 0xFF
              return crc
      
          def _parse_measurement(self, data):
              co2 = (data[0] << 8) | data[1]
              crc_co2 = data[2]
              tvoc = (data[3] << 8) | data[4]
              crc_tvoc = data[5]
      
              if self._crc8(bytes(data[0:2])) != crc_co2:
                  co2 = None
              if self._crc8(bytes(data[3:5])) != crc_tvoc:
                  tvoc = None
              return (co2, tvoc)
      
          def _send_cmd(self, cmd):
              self.i2c.writeto(self.addr, bytes(cmd))
      
      # ---- 初始化硬件 ----
      # 摄像头初始化
      sensor.reset()
      sensor.set_pixformat(sensor.RGB565)
      sensor.set_framesize(sensor.QQVGA)
      sensor.set_auto_exposure(False, exposure_us=30000)
      sensor.set_auto_gain(False)
      sensor.set_auto_whitebal(False)
      sensor.skip_frames(time=5000)
      
      # SGP30初始化
      i2c = I2C(scl='P4', sda='P5', freq=100000)
      sgp30 = SGP30(i2c)
      
      # DHT11初始化(新增)
      dht = DHT11(pin="P7")  # 使用P7引脚
      
      # ---- 网络配置 ----
      SSID = "xiaotang"
      PASSWORD = "12345678"
      target_ip = "192.168.75.220"
      
      # 图像传输配置
      sock_img = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
      img_port = 8080
      MAX_CHUNK_SIZE = 1000
      
      # 传感器数据配置
      sock_sgp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
      sgp_port = 8081
      last_sgp_time = time.ticks_ms()
      SGP_INTERVAL = 2000  # 2秒采样间隔
      
      # DHT数据传输配置
      sock_dht = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
      dht_port = 8082 # 新增8082端口
      last_dht_time = time.ticks_ms()
      
      # ---- WiFi连接 ----
      wlan = network.WLAN(network.STA_IF)
      wlan.active(True)
      wlan.connect(SSID, PASSWORD)
      
      start_time = time.ticks_ms()
      while not wlan.isconnected():
          if time.ticks_diff(time.ticks_ms(), start_time) > 10000:
              raise RuntimeError("WiFi连接超时")
          print("连接中...")
          time.sleep_ms(500)
      print("IP:", wlan.ifconfig()[0])
      
      # ---- 主循环 ----
      while True:
          try:
              
              # 处理sgp30传感器数据
              current_time = time.ticks_ms()
              if time.ticks_diff(current_time, last_sgp_time) >= SGP_INTERVAL:
                  co2, tvoc = sgp30.measure_air_quality()
                  data = "CO2: {}, TVOC: {}".format(co2 if co2 is not None else "NaN",
                                                  tvoc if tvoc is not None else "NaN")
                  sock_sgp.sendto(data.encode(), (target_ip, sgp_port))
                  last_sgp_time = current_time
                  print("传感器数据已发送:", data)
              
              # 新增DHT11处理逻辑
              #current_time = time.ticks_ms()
              if time.ticks_diff(current_time, last_dht_time) >= 5000:
                  dht_data = dht.read()
                  temp = dht_data['temp'] if dht_data['error'] == DHT11.DHT_OK else "NaN"
                  humi = dht_data['humi'] if dht_data['error'] == DHT11.DHT_OK else "NaN"
      
                  dht_msg = f"DHT: {temp:.1f}C, {humi:.1f}%".encode()
                  sock_dht.sendto(dht_msg, (target_ip, dht_port))
                  last_dht_time = current_time
                  print("DHT数据已发送:", dht_msg)
              
              # 处理图像数据
              img = sensor.snapshot()
              img_bytes = img.compress(quality=50)
      
              if img_bytes:
                  img_bytes = bytes(img_bytes)
                  for i in range(0, len(img_bytes), MAX_CHUNK_SIZE):
                      sock_img.sendto(img_bytes[i:i+MAX_CHUNK_SIZE], (target_ip, img_port))
                  sock_img.sendto(b"END", (target_ip, img_port))
                  print("图像帧已发送,大小:", len(img_bytes))
              else:
                  print("图像压缩失败")
              
          except Exception as e:
              print("运行异常:", e)
              # 网络重连
              wlan.disconnect()
              wlan.connect(SSID, PASSWORD)
              while not wlan.isconnected():
                  time.sleep_ms(500)
              sock_img = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
              sock_sgp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
              sock_dht = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
              print("网络已重启")
      
      # 理论上不会执行到这里
      sock_img.close()
      sock_sgp.close()
      sock_dht.close()
      
      
      发布在 OpenMV Cam
      K
      kns2
    • wifi模块可连接手机热点,但是连接不上电脑,但之前是可以连接的

      ``openmv端代码`
      import network
      import sensor
      import time
      import socket
      from pyb import UART

      初始化摄像头

      sensor.reset()
      sensor.set_pixformat(sensor.RGB565) # 或sensor.GRAYSCALE
      sensor.set_framesize(sensor.QVGA) # 分辨率320x240
      sensor.skip_frames(time=2000)

      WiFi配置(STA模式连接到路由器)

      SSID = "Honor"
      PASSWORD = "12345678"

      初始化WiFi模块

      wlan = network.WLAN(network.STA_IF)
      wlan.active(True)
      wlan.connect(SSID, PASSWORD)

      等待连接成功

      while not wlan.isconnected():
      print("连接中...")
      time.sleep_ms(500)

      print("连接成功!IP地址:", wlan.ifconfig()[0])

      创建TCP服务器

      server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
      server.bind(("0.0.0.0", 8080)) # 绑定所有IP,端口8080
      server.listen(2) # 允许1个客户端连接

      print("等待客户端连接...")
      client, addr = server.accept()
      print("客户端已连接:", addr)

      发送图像数据

      while True:
      img = sensor.snapshot()
      img_bytes = img.compress(quality=50).to_bytes() # JPEG压缩(质量50%)

      # 发送数据长度(4字节)
      header = len(img_bytes).to_bytes(4, 'little')
      client.send(header)
      
      # 发送图像数据
      client.send(img_bytes)
      
      import socket
      import cv2
      import numpy as np
      
      # 配置连接参数
      HOST = "192.168.201.39"  # 替换为OpenMV的IP
      PORT = 8080
      
      # 创建TCP客户端
      client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
      client.connect((HOST, PORT))
      
      while True:
          # 读取4字节的帧长度
          header = client.recv(4)
          if not header:
              break
          length = int.from_bytes(header, 'little')
      
          # 读取完整图像数据
          data = b''
          while len(data) < length:
              packet = client.recv(length - len(data))
              if not packet:
                  break
              data += packet
      
          # 将字节转换为图像
          img = cv2.imdecode(np.frombuffer(data, dtype=np.uint8), cv2.IMREAD_COLOR)
      
          # 显示图像
          if img is not None:
              cv2.imshow("OpenMV Stream", img)
      
          # 按'q'退出
          if cv2.waitKey(1) & 0xFF == ord('q'):
              break
      
      # 清理资源
      client.close()
      cv2.destroyAllWindows()
      
      发布在 OpenMV Cam
      K
      kns2