openmv用串口驱动esp8266-01s,通过发送AT指令无法完成发送数据的功能?
-
#所有的AT指令都在串口助手中发送个esp8266-01s完成过测试,都是可以用的。
#可以完成初始化,但无法完成发送数据的功能
ESP8266.py:
import pyb
from pyb import UART
import timeclass ESP8266MQTT:
def init(self, uart_port=3, baudrate=115200):
self.uart = UART(uart_port, baudrate)
self.last_send_time = 0def clear_uart_buffer(self): """清空串口接收缓冲区""" while self.uart.any(): self.uart.read() def send_at_command(self, command, expected="OK", timeout=10000): # 增加超时时间到10秒 """ 发送AT指令并检查响应 :param command: AT指令字符串 :param expected: 期望的响应关键字 :param timeout: 超时时间(毫秒) :return: True表示成功,False表示失败 """ self.uart.write(command + "\r\n") start_time = pyb.millis() response = b"" # 使用字节类型存储响应 while pyb.elapsed_millis(start_time) < timeout: if self.uart.any(): # 读取所有可用数据 data = self.uart.read(self.uart.any()) if data: response += data # 打印原始字节数据(调试用) print("Raw response:", data) # 尝试解码为字符串 try: decoded_str = data.decode('utf-8', errors='ignore').strip() if decoded_str: print("<-", decoded_str) # 检查期望响应 if expected in decoded_str: return True if "ERROR" in decoded_str or "FAIL" in decoded_str: return False except: # 即使解码失败,也检查字节序列中是否包含期望响应 if expected.encode('utf-8') in data: return True if b"ERROR" in data or b"FAIL" in data: return False pyb.delay(100) # 增加延迟减少CPU占用 # 最终检查整个响应 try: full_response = response.decode('utf-8', errors='ignore') if expected in full_response: return True except: pass print("Timeout waiting for response") return False def init_esp8266(self): """初始化ESP8266模块""" print("Initializing ESP8266...") self.clear_uart_buffer() # 增加复位后的延迟 print("Sending reset command...") self.uart.write("AT+RST\r\n") pyb.delay(5000) # 等待5秒让模块重启 # 清空启动过程中的输出 self.clear_uart_buffer() pyb.delay(1000) # 使用元组存储指令配置 commands = [ ("ATE0", "OK", 2000), # 禁用回显 ("AT+CWMODE=1", "OK", 3000), ("AT+CWDHCP=1,1", "OK", 3000), ('AT+CWJAP="xuhan","12345678"', "GOT IP", 10000), # 连接WiFi需要更长时间 ( 'AT+MQTTUSERCFG=0,1,"ESP8266","jX4NF30ETx",' '"version=2018-10-31&res=products%2FjX4NF30ETx%2Fdevices%2FESP8266' '&et=1956499200&method=sha1&sign=fRnCjL7FCoaoXPCY51aQU%2Fho95E%3D",0,0,""', "OK", 5000 ), ('AT+MQTTCONN=0,"mqtts.heclouds.com",1883,1', "MQTTCONNECTED", 10000), ('AT+MQTTSUB=0,"$sys/jX4NF30ETx/ESP8266/thing/property/post/reply",1', "OK", 3000), ('AT+MQTTSUB=0,"$sys/jX4NF30ETx/ESP8266/thing/property/set",1', "OK", 3000) ] for cmd, expected, tmo in commands: print("->", cmd) if not self.send_at_command(cmd, expected, tmo): print(f"Failed: {cmd}") return False pyb.delay(500) # 增加指令间延迟 print("ESP8266 initialized successfully") return True def send_data(self, empty_space, occupied_space, plate_number, province_number): """发送数据到MQTT服务器""" # 使用三重引号字符串和显式格式化,避免转义问题 command_template = ( 'AT+MQTTPUB=0,"$sys/jX4NF30ETx/ESP8266/thing/property/post",' '"{{\\"id\\":\\"123\\",\\"params\\":{{\\"{key}\\":{{\\"value\\":{value}}}}}}",0,0' ) commands = [ command_template.format( key="DetectEmptySpace", value=empty_space ), command_template.format( key="DetectOccupiedSpace", value=occupied_space ), command_template.format( key="CarPlateNumber", value='\\"{}\\"'.format(plate_number) # 字符串值需要额外转义 ), command_template.format( key="ProvinceNumber", value=province_number ) ] for cmd in commands: print("->", cmd) if not self.send_at_command(cmd, "OK", 5000): print("Data send failed") return False pyb.delay(500) # 增加指令间延迟 return True
main.py:
import pyb
import time
from ESP8266 import ESP8266MQTT创建ESP8266MQTT实例
esp = ESP8266MQTT(uart_port=3, baudrate=115200)
初始化ESP8266
init_success = False
for i in range(3): # 最多尝试3次
if esp.init_esp8266():
print("ESP8266 initialized successfully")
init_success = True
break
else:
print(f"Initialization attempt {i+1} failed")
pyb.delay(3000) # 等待3秒再重试if not init_success:
print("ESP8266 initialization failed after 3 attempts")
# 这里可以添加错误处理,如重启设备或进入安全模式创建定时器(4秒周期)
使用整数参数避免类型错误
tim = pyb.Timer(4, prescaler=7999, period=19999) # 4秒周期: (7999+1)*(19999+1)/80,000,000 ≈ 4秒
定时器中断处理函数
def send_data_callback(timer):
try:
print("\nSending sensor data...")# 使用固定值测试 empty_space = 1 occupied_space = 2 plate_number = "BZ111" province_number = 6 # 发送数据 if esp.send_data(empty_space, occupied_space, plate_number, province_number): print("Data sent successfully") else: print("Data send failed") except Exception as e: print("Error in send_data_callback:", e)
注册定时器回调
tim.callback(send_data_callback)
主循环
print("Entering main loop...")
while True:
pyb.delay(1000)