• OpenMV VSCode 扩展发布了,在插件市场直接搜索OpenMV就可以安装
  • 如果有产品硬件故障问题,比如无法开机,论坛很难解决。可以直接找售后维修
  • 发帖子之前,请确认看过所有的视频教程,https://singtown.com/learn/ 和所有的上手教程http://book.openmv.cc/
  • 每一个新的提问,单独发一个新帖子
  • 帖子需要目的,你要做什么?
  • 如果涉及代码,需要报错提示全部代码文本,请注意不要贴代码图片
  • 必看:玩转星瞳论坛了解一下图片上传,代码格式等问题。
  • openmv用串口驱动esp8266-01s,通过发送AT指令无法完成发送数据的功能?



    • #所有的AT指令都在串口助手中发送个esp8266-01s完成过测试,都是可以用的。
      #可以完成初始化,但无法完成发送数据的功能
      ESP8266.py
      import pyb
      from pyb import UART
      import time

      class ESP8266MQTT:
      def init(self, uart_port=3, baudrate=115200):
      self.uart = UART(uart_port, baudrate)
      self.last_send_time = 0

      def clear_uart_buffer(self):
          """清空串口接收缓冲区"""
          while self.uart.any():
              self.uart.read()
      
      def send_at_command(self, command, expected="OK", timeout=10000):  # 增加超时时间到10秒
          """
          发送AT指令并检查响应
          :param command: AT指令字符串
          :param expected: 期望的响应关键字
          :param timeout: 超时时间(毫秒)
          :return: True表示成功,False表示失败
          """
          self.uart.write(command + "\r\n")
          start_time = pyb.millis()
          response = b""  # 使用字节类型存储响应
      
          while pyb.elapsed_millis(start_time) < timeout:
              if self.uart.any():
                  # 读取所有可用数据
                  data = self.uart.read(self.uart.any())
                  if data:
                      response += data
                      # 打印原始字节数据(调试用)
                      print("Raw response:", data)
      
                      # 尝试解码为字符串
                      try:
                          decoded_str = data.decode('utf-8', errors='ignore').strip()
                          if decoded_str:
                              print("<-", decoded_str)
      
                              # 检查期望响应
                              if expected in decoded_str:
                                  return True
                              if "ERROR" in decoded_str or "FAIL" in decoded_str:
                                  return False
                      except:
                          # 即使解码失败,也检查字节序列中是否包含期望响应
                          if expected.encode('utf-8') in data:
                              return True
                          if b"ERROR" in data or b"FAIL" in data:
                              return False
              pyb.delay(100)  # 增加延迟减少CPU占用
      
          # 最终检查整个响应
          try:
              full_response = response.decode('utf-8', errors='ignore')
              if expected in full_response:
                  return True
          except:
              pass
      
          print("Timeout waiting for response")
          return False
      
      def init_esp8266(self):
          """初始化ESP8266模块"""
          print("Initializing ESP8266...")
          self.clear_uart_buffer()
      
          # 增加复位后的延迟
          print("Sending reset command...")
          self.uart.write("AT+RST\r\n")
          pyb.delay(5000)  # 等待5秒让模块重启
      
          # 清空启动过程中的输出
          self.clear_uart_buffer()
          pyb.delay(1000)
      
          # 使用元组存储指令配置
          commands = [
              ("ATE0", "OK", 2000),  # 禁用回显
              ("AT+CWMODE=1", "OK", 3000),
              ("AT+CWDHCP=1,1", "OK", 3000),
              ('AT+CWJAP="xuhan","12345678"', "GOT IP", 10000),  # 连接WiFi需要更长时间
              (
                  'AT+MQTTUSERCFG=0,1,"ESP8266","jX4NF30ETx",'
                  '"version=2018-10-31&res=products%2FjX4NF30ETx%2Fdevices%2FESP8266'
                  '&et=1956499200&method=sha1&sign=fRnCjL7FCoaoXPCY51aQU%2Fho95E%3D",0,0,""',
                  "OK", 5000
              ),
              ('AT+MQTTCONN=0,"mqtts.heclouds.com",1883,1', "MQTTCONNECTED", 10000),
              ('AT+MQTTSUB=0,"$sys/jX4NF30ETx/ESP8266/thing/property/post/reply",1', "OK", 3000),
              ('AT+MQTTSUB=0,"$sys/jX4NF30ETx/ESP8266/thing/property/set",1', "OK", 3000)
          ]
      
          for cmd, expected, tmo in commands:
              print("->", cmd)
              if not self.send_at_command(cmd, expected, tmo):
                  print(f"Failed: {cmd}")
                  return False
              pyb.delay(500)  # 增加指令间延迟
      
          print("ESP8266 initialized successfully")
          return True
      
      def send_data(self, empty_space, occupied_space, plate_number, province_number):
          """发送数据到MQTT服务器"""
          # 使用三重引号字符串和显式格式化,避免转义问题
          command_template = (
              'AT+MQTTPUB=0,"$sys/jX4NF30ETx/ESP8266/thing/property/post",'
              '"{{\\"id\\":\\"123\\",\\"params\\":{{\\"{key}\\":{{\\"value\\":{value}}}}}}",0,0'
          )
      
          commands = [
              command_template.format(
                  key="DetectEmptySpace",
                  value=empty_space
              ),
              command_template.format(
                  key="DetectOccupiedSpace",
                  value=occupied_space
              ),
              command_template.format(
                  key="CarPlateNumber",
                  value='\\"{}\\"'.format(plate_number)  # 字符串值需要额外转义
              ),
              command_template.format(
                  key="ProvinceNumber",
                  value=province_number
              )
          ]
      
          for cmd in commands:
              print("->", cmd)
              if not self.send_at_command(cmd, "OK", 5000):
                  print("Data send failed")
                  return False
              pyb.delay(500)  # 增加指令间延迟
      
          return True
      

      main.py:
      import pyb
      import time
      from ESP8266 import ESP8266MQTT

      创建ESP8266MQTT实例

      esp = ESP8266MQTT(uart_port=3, baudrate=115200)

      初始化ESP8266

      init_success = False
      for i in range(3): # 最多尝试3次
      if esp.init_esp8266():
      print("ESP8266 initialized successfully")
      init_success = True
      break
      else:
      print(f"Initialization attempt {i+1} failed")
      pyb.delay(3000) # 等待3秒再重试

      if not init_success:
      print("ESP8266 initialization failed after 3 attempts")
      # 这里可以添加错误处理,如重启设备或进入安全模式

      创建定时器(4秒周期)

      使用整数参数避免类型错误

      tim = pyb.Timer(4, prescaler=7999, period=19999) # 4秒周期: (7999+1)*(19999+1)/80,000,000 ≈ 4秒

      定时器中断处理函数

      def send_data_callback(timer):
      try:
      print("\nSending sensor data...")

          # 使用固定值测试
          empty_space = 1
          occupied_space = 2
          plate_number = "BZ111"
          province_number = 6
      
          # 发送数据
          if esp.send_data(empty_space, occupied_space, plate_number, province_number):
              print("Data sent successfully")
          else:
              print("Data send failed")
      
      except Exception as e:
          print("Error in send_data_callback:", e)
      

      注册定时器回调

      tim.callback(send_data_callback)

      主循环

      print("Entering main loop...")
      while True:
      pyb.delay(1000)