#所有的AT指令都在串口助手中发送个esp8266-01s完成过测试,都是可以用的。
#可以完成初始化,但无法完成发送数据的功能
ESP8266.py:
import pyb
from pyb import UART
import time
class ESP8266MQTT:
def init(self, uart_port=3, baudrate=115200):
self.uart = UART(uart_port, baudrate)
self.last_send_time = 0
def clear_uart_buffer(self):
"""清空串口接收缓冲区"""
while self.uart.any():
self.uart.read()
def send_at_command(self, command, expected="OK", timeout=10000): # 增加超时时间到10秒
"""
发送AT指令并检查响应
:param command: AT指令字符串
:param expected: 期望的响应关键字
:param timeout: 超时时间(毫秒)
:return: True表示成功,False表示失败
"""
self.uart.write(command + "\r\n")
start_time = pyb.millis()
response = b"" # 使用字节类型存储响应
while pyb.elapsed_millis(start_time) < timeout:
if self.uart.any():
# 读取所有可用数据
data = self.uart.read(self.uart.any())
if data:
response += data
# 打印原始字节数据(调试用)
print("Raw response:", data)
# 尝试解码为字符串
try:
decoded_str = data.decode('utf-8', errors='ignore').strip()
if decoded_str:
print("<-", decoded_str)
# 检查期望响应
if expected in decoded_str:
return True
if "ERROR" in decoded_str or "FAIL" in decoded_str:
return False
except:
# 即使解码失败,也检查字节序列中是否包含期望响应
if expected.encode('utf-8') in data:
return True
if b"ERROR" in data or b"FAIL" in data:
return False
pyb.delay(100) # 增加延迟减少CPU占用
# 最终检查整个响应
try:
full_response = response.decode('utf-8', errors='ignore')
if expected in full_response:
return True
except:
pass
print("Timeout waiting for response")
return False
def init_esp8266(self):
"""初始化ESP8266模块"""
print("Initializing ESP8266...")
self.clear_uart_buffer()
# 增加复位后的延迟
print("Sending reset command...")
self.uart.write("AT+RST\r\n")
pyb.delay(5000) # 等待5秒让模块重启
# 清空启动过程中的输出
self.clear_uart_buffer()
pyb.delay(1000)
# 使用元组存储指令配置
commands = [
("ATE0", "OK", 2000), # 禁用回显
("AT+CWMODE=1", "OK", 3000),
("AT+CWDHCP=1,1", "OK", 3000),
('AT+CWJAP="xuhan","12345678"', "GOT IP", 10000), # 连接WiFi需要更长时间
(
'AT+MQTTUSERCFG=0,1,"ESP8266","jX4NF30ETx",'
'"version=2018-10-31&res=products%2FjX4NF30ETx%2Fdevices%2FESP8266'
'&et=1956499200&method=sha1&sign=fRnCjL7FCoaoXPCY51aQU%2Fho95E%3D",0,0,""',
"OK", 5000
),
('AT+MQTTCONN=0,"mqtts.heclouds.com",1883,1', "MQTTCONNECTED", 10000),
('AT+MQTTSUB=0,"$sys/jX4NF30ETx/ESP8266/thing/property/post/reply",1', "OK", 3000),
('AT+MQTTSUB=0,"$sys/jX4NF30ETx/ESP8266/thing/property/set",1', "OK", 3000)
]
for cmd, expected, tmo in commands:
print("->", cmd)
if not self.send_at_command(cmd, expected, tmo):
print(f"Failed: {cmd}")
return False
pyb.delay(500) # 增加指令间延迟
print("ESP8266 initialized successfully")
return True
def send_data(self, empty_space, occupied_space, plate_number, province_number):
"""发送数据到MQTT服务器"""
# 使用三重引号字符串和显式格式化,避免转义问题
command_template = (
'AT+MQTTPUB=0,"$sys/jX4NF30ETx/ESP8266/thing/property/post",'
'"{{\\"id\\":\\"123\\",\\"params\\":{{\\"{key}\\":{{\\"value\\":{value}}}}}}",0,0'
)
commands = [
command_template.format(
key="DetectEmptySpace",
value=empty_space
),
command_template.format(
key="DetectOccupiedSpace",
value=occupied_space
),
command_template.format(
key="CarPlateNumber",
value='\\"{}\\"'.format(plate_number) # 字符串值需要额外转义
),
command_template.format(
key="ProvinceNumber",
value=province_number
)
]
for cmd in commands:
print("->", cmd)
if not self.send_at_command(cmd, "OK", 5000):
print("Data send failed")
return False
pyb.delay(500) # 增加指令间延迟
return True
main.py:
import pyb
import time
from ESP8266 import ESP8266MQTT
创建ESP8266MQTT实例
esp = ESP8266MQTT(uart_port=3, baudrate=115200)
初始化ESP8266
init_success = False
for i in range(3): # 最多尝试3次
if esp.init_esp8266():
print("ESP8266 initialized successfully")
init_success = True
break
else:
print(f"Initialization attempt {i+1} failed")
pyb.delay(3000) # 等待3秒再重试
if not init_success:
print("ESP8266 initialization failed after 3 attempts")
# 这里可以添加错误处理,如重启设备或进入安全模式
创建定时器(4秒周期)
使用整数参数避免类型错误
tim = pyb.Timer(4, prescaler=7999, period=19999) # 4秒周期: (7999+1)*(19999+1)/80,000,000 ≈ 4秒
定时器中断处理函数
def send_data_callback(timer):
try:
print("\nSending sensor data...")
# 使用固定值测试
empty_space = 1
occupied_space = 2
plate_number = "BZ111"
province_number = 6
# 发送数据
if esp.send_data(empty_space, occupied_space, plate_number, province_number):
print("Data sent successfully")
else:
print("Data send failed")
except Exception as e:
print("Error in send_data_callback:", e)
注册定时器回调
tim.callback(send_data_callback)
主循环
print("Entering main loop...")
while True:
pyb.delay(1000)