# Author: 超米 / 2025-04-30 / no comments yet / 1 footprint
import paho.mqtt.client as mqtt
import time
import os
import base64
from picamera import PiCamera
import Adafruit_DHT
import RPi.GPIO as GPIO
# MQTT configuration
MQTT_BROKER = "47.92.216.73"
MQTT_PORT = 1883
TOPIC_DATA = "dht11"
TOPIC_CAMERA = "camera"
TOPIC_CONTROL = "set_led" # MQTT topic added for controlling the LED
# DHT11 configuration
DHT_SENSOR = Adafruit_DHT.DHT11
DHT_PIN = 4 # change to match the GPIO pin actually wired to the sensor
# Camera configuration
CAMERA_RESOLUTION = (640, 480)
CAMERA_INTERVAL = 10 # seconds slept at the end of each loop iteration in main()
# LED configuration
LED_PIN = 17 # change to match the GPIO pin actually wired to the LED (main() selects BCM numbering)
def on_connect(client, userdata, flags, rc):
    """Handle the result of an MQTT connection attempt.

    On success (``rc == 0``) the client subscribes to the LED control topic
    so that ``on_message`` starts receiving LED commands. On failure the
    return code is reported on stdout.

    Args:
        client: The MQTT client instance that invoked this callback.
        userdata: User data registered with the client (unused).
        flags: Connection flags supplied by the broker (unused).
        rc: Connection result code; 0 means the connection succeeded.
    """
    if rc == 0:
        print("Connected to MQTT Broker!")
        # Subscribe here so the subscription is made on every successful connect.
        client.subscribe(TOPIC_CONTROL)
    else:
        # The original passed rc as a second print() argument, which printed
        # the literal "%d" instead of interpolating the code.
        print(f"Failed to connect, return code {rc}")
def on_message(client, userdata, message):
    """Switch the LED according to a message received on the control topic.

    A payload of ``"1"`` drives ``LED_PIN`` high and ``"0"`` drives it low.
    Messages on other topics and any other payload are ignored.

    Args:
        client: The MQTT client instance that invoked this callback (unused).
        userdata: User data registered with the client (unused).
        message: The received message; its ``topic`` and ``payload`` are read.
    """
    if message.topic != TOPIC_CONTROL:
        return

    # Decode leniently: a payload that is not valid UTF-8 must not raise from
    # inside the callback; it simply fails to match a known command below.
    # Surrounding whitespace (e.g. a trailing newline) is tolerated.
    led_state = message.payload.decode("utf-8", errors="replace").strip()

    if led_state == "1":
        GPIO.output(LED_PIN, GPIO.HIGH)
        print("LED turned ON")
    elif led_state == "0":
        GPIO.output(LED_PIN, GPIO.LOW)
        print("LED turned OFF")
def capture_image():
    """Capture one still image with the Pi camera and return its file path.

    The camera is opened for each call, configured with
    ``CAMERA_RESOLUTION``, given two seconds to warm up, and then a JPEG is
    written to ``/tmp/camera_image.jpg`` (overwriting any previous capture).

    Returns:
        The path of the captured image file.
    """
    image_path = "/tmp/camera_image.jpg"
    # The context manager closes the camera even if the capture fails, so a
    # single failed capture does not leave the device held open.
    with PiCamera() as camera:
        camera.resolution = CAMERA_RESOLUTION
        camera.start_preview()
        try:
            time.sleep(2)  # let the camera warm up before capturing
            camera.capture(image_path)
        finally:
            camera.stop_preview()
    return image_path
def main():
    """Run the sensor/camera publishing loop until interrupted.

    Sets up the LED pin (initially off), connects to the MQTT broker and
    starts its background network loop, then repeatedly:

    1. reads the DHT sensor and publishes the reading to ``TOPIC_DATA``;
    2. captures an image and publishes it base64-encoded to ``TOPIC_CAMERA``;
    3. sleeps for ``CAMERA_INTERVAL`` seconds.

    Ctrl+C ends the loop. On any exit path the MQTT loop is stopped and the
    GPIO pins are released.
    """
    client = mqtt.Client()
    client.on_connect = on_connect
    client.on_message = on_message

    GPIO.setmode(GPIO.BCM)
    try:
        GPIO.setup(LED_PIN, GPIO.OUT)  # initialise the LED pin
        GPIO.output(LED_PIN, GPIO.LOW)  # LED is off by default

        # Connecting inside the try block ensures the GPIO pins are released
        # even when the broker cannot be reached.
        client.connect(MQTT_BROKER, MQTT_PORT, 60)
        client.loop_start()

        while True:
            # Read temperature and humidity.
            humidity, temperature = Adafruit_DHT.read_retry(DHT_SENSOR, DHT_PIN)
            if humidity is not None and temperature is not None:
                data = f"温度: {temperature:.1f}°C, 湿度: {humidity:.1f}%"
                client.publish(TOPIC_DATA, data)
                print(f"Published: {data}")
            else:
                print("Failed to retrieve data from DHT sensor")

            # Capture an image and send it as base64 text.
            image_path = capture_image()
            with open(image_path, "rb") as image_file:
                encoded_image = base64.b64encode(image_file.read()).decode("utf-8")
            client.publish(TOPIC_CAMERA, encoded_image)
            print("Published camera image")

            time.sleep(CAMERA_INTERVAL)
    except KeyboardInterrupt:
        print("Exiting...")
    finally:
        # Stop MQTT first so no LED command can arrive after the pins have
        # been released; GPIO cleanup runs even if the MQTT shutdown fails.
        try:
            client.loop_stop()
            client.disconnect()
        finally:
            GPIO.cleanup()
# Run the publishing loop only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
# 独特见解 (blog footer text: "unique insights")