import sensor
import time
import network
import socket
from machine import I2C,Pin
# ---- dht11传感器类 ----
class DHT11:
    """Bit-banged driver for a DHT11 temperature/humidity sensor on a single pin.

    ``read()`` is the public entry point. It returns a dict with the keys
    ``'temp'``, ``'humi'`` and ``'error'``, where ``'error'`` is one of the
    ``DHT_*`` status constants below.
    """

    DHT_OK = 0
    DHT_ERR_CHECKSUM = -1
    DHT_ERR_TIMEOUT = -2

    def __init__(self, pin):
        """Configure *pin* (any identifier accepted by ``machine.Pin``) as the data line."""
        self.pin = Pin(pin, Pin.OPEN_DRAIN, Pin.PULL_UP)
        # ticks_ms() timestamp of the most recent read attempt.
        self.last_read = 0
        # Minimum spacing enforced between two reads, in milliseconds.
        self.min_interval = 2000

    def _read_raw(self):
        """Run one transfer and return ``(data, status)``.

        ``data`` is the 5-byte frame on success or checksum failure, and
        ``None`` when the sensor did not answer in time. ``status`` is one of
        the ``DHT_*`` constants.
        """
        # Start signal: drive the line low for 18 ms, release it, then listen.
        self.pin.init(Pin.OUT_OD, pull=None)
        self.pin(0)
        time.sleep_ms(18)
        self.pin(1)
        time.sleep_us(20)
        self.pin.init(Pin.IN, Pin.PULL_UP)

        # Wait (bounded by an iteration count) for the sensor to pull the line low.
        timeout = 1000
        while self.pin() == 1:
            if (timeout := timeout - 1) <= 0:
                return None, self.DHT_ERR_TIMEOUT

        # Consume the sensor's low/high response pulses. Their results are
        # deliberately not checked: the measurement starts part-way through
        # the pulse and a marginal overrun here is absorbed by the next wait.
        self._wait_pulse(0, 80)
        self._wait_pulse(1, 80)

        data = bytearray(5)
        for i in range(40):
            # Each bit is a low gap followed by a high pulse whose length
            # carries the bit value.
            if self._wait_pulse(0, 50) <= 0:
                return None, self.DHT_ERR_TIMEOUT
            t = self._wait_pulse(1, 100)
            if t < 0:
                # The line got stuck: report it instead of silently decoding
                # the missing pulse as a 0 bit.
                return None, self.DHT_ERR_TIMEOUT
            # A high pulse longer than 30 us is decoded as 1, otherwise 0.
            data[i // 8] = (data[i // 8] << 1) | (0x01 if t > 30 else 0x00)

        # Byte 4 is the low 8 bits of the sum of bytes 0..3.
        if data[4] != ((data[0] + data[1] + data[2] + data[3]) & 0xFF):
            return data, self.DHT_ERR_CHECKSUM
        return data, self.DHT_OK

    def _wait_pulse(self, level, max_wait_us):
        """Wait for the pin to reach *level*, then time how long it stays there.

        Returns the pulse length in microseconds, or ``-1`` when either the
        wait for *level* or the pulse itself exceeded *max_wait_us*. A
        negative sentinel keeps a timeout distinguishable from a very short
        pulse.
        """
        start = time.ticks_us()
        while self.pin() != level:
            if time.ticks_diff(time.ticks_us(), start) > max_wait_us:
                return -1
        start = time.ticks_us()
        while self.pin() == level:
            if time.ticks_diff(time.ticks_us(), start) > max_wait_us:
                return -1
        return time.ticks_diff(time.ticks_us(), start)

    def read(self):
        """Read the sensor, waiting first if the previous read was too recent.

        Returns ``{'temp': float, 'humi': float, 'error': DHT_OK}`` on success
        and ``{'temp': None, 'humi': None, 'error': <status>}`` on failure.
        """
        # Sample the clock once so the test and the sleep length agree. The
        # lower bound skips the wait when the tick counter has wrapped, which
        # would otherwise turn a negative difference into a very long sleep.
        elapsed = time.ticks_diff(time.ticks_ms(), self.last_read)
        if 0 <= elapsed < self.min_interval:
            time.sleep_ms(self.min_interval - elapsed)

        data, err = self._read_raw()
        self.last_read = time.ticks_ms()
        if err != self.DHT_OK:
            return {'temp': None, 'humi': None, 'error': err}

        # Bytes 0/1 hold the integer/tenths parts of humidity, bytes 2/3 of temperature.
        humidity = data[0] + data[1] * 0.1
        temperature = data[2] + data[3] * 0.1
        return {'temp': temperature, 'humi': humidity, 'error': self.DHT_OK}
# ---- SGP30传感器类 ----
class SGP30:
    """Small I2C driver for an SGP30 gas sensor at address 0x58."""

    def __init__(self, i2c):
        """Keep the bus handle, send command 0x2003 and wait 500 ms.

        NOTE(review): 0x2003 is presumably the sensor's air-quality init
        command; confirm against the datasheet.
        """
        self.i2c = i2c
        self.addr = 0x58
        self._send_cmd([0x20, 0x03])
        time.sleep_ms(500)

    def measure_air_quality(self):
        """Send command 0x2008, wait 12 ms, and return ``(co2, tvoc)``.

        Either element is ``None`` when its checksum does not match.
        """
        self._send_cmd([0x20, 0x08])
        time.sleep_ms(12)
        return self._parse_measurement(self.i2c.readfrom(self.addr, 6))

    def _crc8(self, data):
        """Return the CRC-8 of *data* (polynomial 0x31, initial value 0xFF)."""
        remainder = 0xFF
        for octet in data:
            remainder ^= octet
            for _ in range(8):
                shifted = remainder << 1
                if remainder & 0x80:
                    shifted ^= 0x31
                remainder = shifted & 0xFF
        return remainder

    def _parse_measurement(self, data):
        """Decode a 6-byte reply made of two (MSB, LSB, CRC) groups.

        Returns a ``(co2, tvoc)`` tuple; a word whose CRC byte does not match
        is replaced by ``None``.
        """
        words = []
        for offset in (0, 3):
            value = (data[offset] << 8) | data[offset + 1]
            expected_crc = data[offset + 2]
            if self._crc8(bytes(data[offset:offset + 2])) != expected_crc:
                value = None
            words.append(value)
        return tuple(words)

    def _send_cmd(self, cmd):
        """Write the command bytes in *cmd* to the sensor."""
        payload = bytes(cmd)
        self.i2c.writeto(self.addr, payload)
# ---- Hardware initialisation ----
# Camera: RGB565 at QQVGA with auto exposure, gain and white balance switched
# off, then skip frames for 5000 ms before the first capture.
sensor.reset()
sensor.set_pixformat(sensor.RGB565)
sensor.set_framesize(sensor.QQVGA)
sensor.set_auto_exposure(False, exposure_us=30000)
sensor.set_auto_gain(False)
sensor.set_auto_whitebal(False)
sensor.skip_frames(time=5000)

# SGP30 on the I2C bus (SCL=P4, SDA=P5, 100 kHz).
i2c = I2C(scl='P4', sda='P5', freq=100000)
sgp30 = SGP30(i2c)

# DHT11 data line on pin P7.
dht = DHT11(pin="P7")

# ---- Network configuration ----
SSID = "xiaotang"
PASSWORD = "12345678"
target_ip = "192.168.75.220"

# Image stream: UDP port 8080, sent in chunks of at most MAX_CHUNK_SIZE bytes.
sock_img = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
img_port = 8080
MAX_CHUNK_SIZE = 1000

# SGP30 readings: UDP port 8081, sampled every SGP_INTERVAL milliseconds.
sock_sgp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sgp_port = 8081
last_sgp_time = time.ticks_ms()
SGP_INTERVAL = 2000

# DHT11 readings: UDP port 8082.
sock_dht = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
dht_port = 8082
last_dht_time = time.ticks_ms()

# ---- WiFi connection ----
# Poll every 500 ms and give up once more than 10 s have passed.
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(SSID, PASSWORD)
start_time = time.ticks_ms()
while True:
    if wlan.isconnected():
        break
    waited_ms = time.ticks_diff(time.ticks_ms(), start_time)
    if waited_ms > 10000:
        raise RuntimeError("WiFi连接超时")
    print("连接中...")
    time.sleep_ms(500)
print("IP:", wlan.ifconfig()[0])
# ---- Main loop ----
# Each iteration sends, over UDP to target_ip:
#   * an SGP30 reading when SGP_INTERVAL ms have passed since the last one,
#   * a DHT11 reading when 5000 ms have passed since the last one,
#   * one compressed camera frame, split into MAX_CHUNK_SIZE-byte datagrams
#     and terminated by a b"END" datagram.
# Any exception triggers a WiFi reconnect and fresh sockets.


def _close_quietly(*socks):
    """Close every socket in *socks*, ignoring OSError from dead or closed ones."""
    for s in socks:
        try:
            s.close()
        except OSError:
            # Best effort: the socket is being discarded anyway.
            pass


try:
    while True:
        try:
            current_time = time.ticks_ms()

            # SGP30 air-quality reading.
            if time.ticks_diff(current_time, last_sgp_time) >= SGP_INTERVAL:
                co2, tvoc = sgp30.measure_air_quality()
                data = "CO2: {}, TVOC: {}".format(co2 if co2 is not None else "NaN",
                                                  tvoc if tvoc is not None else "NaN")
                sock_sgp.sendto(data.encode(), (target_ip, sgp_port))
                last_sgp_time = current_time
                print("传感器数据已发送:", data)

            # DHT11 temperature/humidity reading.
            if time.ticks_diff(current_time, last_dht_time) >= 5000:
                dht_data = dht.read()
                if dht_data['error'] == DHT11.DHT_OK:
                    temp = "{:.1f}".format(dht_data['temp'])
                    humi = "{:.1f}".format(dht_data['humi'])
                else:
                    # A failed read has no numbers to format. Applying ".1f"
                    # to the "NaN" placeholder would raise ValueError and be
                    # mistaken for a network failure by the handler below.
                    temp = humi = "NaN"
                dht_msg = f"DHT: {temp}C, {humi}%".encode()
                sock_dht.sendto(dht_msg, (target_ip, dht_port))
                last_dht_time = current_time
                print("DHT数据已发送:", dht_msg)

            # Camera frame.
            img = sensor.snapshot()
            img_bytes = img.compress(quality=50)
            if img_bytes:
                img_bytes = bytes(img_bytes)
                for i in range(0, len(img_bytes), MAX_CHUNK_SIZE):
                    sock_img.sendto(img_bytes[i:i + MAX_CHUNK_SIZE], (target_ip, img_port))
                sock_img.sendto(b"END", (target_ip, img_port))
                print("图像帧已发送,大小:", len(img_bytes))
            else:
                print("图像压缩失败")
        except Exception as e:
            print("运行异常:", e)
            # Release the old sockets before replacing them so repeated
            # failures do not leak one set of sockets per reconnect.
            _close_quietly(sock_img, sock_sgp, sock_dht)
            # Restart the WiFi link and wait until it is back.
            wlan.disconnect()
            wlan.connect(SSID, PASSWORD)
            while not wlan.isconnected():
                time.sleep_ms(500)
            sock_img = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            sock_sgp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            sock_dht = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            print("网络已重启")
finally:
    # Reached on KeyboardInterrupt or when the reconnect path itself fails.
    _close_quietly(sock_img, sock_sgp, sock_dht)