| import threading |
| from collections import deque |
| import gradio as gr |
| import paho.mqtt.client as mqtt |
| import json |
| import pandas as pd |
| from datetime import datetime |
| import plotly.express as px |
|
|
| |
# Rolling window of the most recent sensor readings; once 100 entries are
# held, appending a new one discards the oldest.
data_buffer = deque(maxlen=100)

# Single MQTT client shared by connect_mqtt() and the callbacks.
client = mqtt.Client()

# Default broker settings. connect_mqtt() overwrites the broker, username and
# password with whatever is submitted from the UI.
# SECURITY NOTE(review): a credential is committed here in plain text --
# rotate it and load it from the environment or a secrets store instead.
MQTT_BROKER = "b6bdb89571144b3d8e5ca4bbe666ddb5.s1.eu.hivemq.cloud"
MQTT_PORT = 8883
MQTT_TOPIC = "sensors/bme680/data"
MQTT_USERNAME = "LuthiraMQ"
MQTT_PASSWORD = "jLVx8y9v83gmgERTr0AP"

# Human-readable status strings rendered by get_status().
connection_status = "❌ Not connected"
data_status = "⌛ Waiting for data..."

# Held by get_status() and update_data_status() while they touch the status
# strings above.
status_lock = threading.Lock()
|
|
def get_status():
    """Build the Markdown status summary shown in the UI.

    Returns:
        A two-line Markdown string with the current MQTT connection status
        and the current data-reception status.
    """
    # Copy both values while holding the lock, then format outside it.
    with status_lock:
        mqtt_state, feed_state = connection_status, data_status
    return f"**MQTT Connection:** {mqtt_state}\n**Data Status:** {feed_state}"
|
|
def update_data_status(new_status):
    """Replace the data-reception status text.

    Args:
        new_status: The text to store in the module-level ``data_status``.
    """
    global data_status
    # Same lock that get_status() takes when reading the value.
    with status_lock:
        data_status = new_status
|
|
def on_connect(client, userdata, flags, rc):
    """Handle the result of an MQTT connection attempt.

    When ``rc`` is 0 the client subscribes to ``MQTT_TOPIC``. In either case
    the module-level ``connection_status`` text is updated.

    Args:
        client: The MQTT client the callback fired for.
        userdata: Unused.
        flags: Unused.
        rc: Connection result code; 0 is treated as success.
    """
    global connection_status
    if rc == 0:
        client.subscribe(MQTT_TOPIC)
        new_status = "✅ Connected"
    else:
        new_status = f"❌ Connection failed (code {rc})"
    # Write under the same lock get_status() reads under, matching how
    # update_data_status() handles data_status.
    with status_lock:
        connection_status = new_status
|
|
def on_message(client, userdata, msg):
    """Parse an incoming MQTT message and record it as a sensor reading.

    The payload is expected to be a JSON object with the keys
    "temperature", "humidity", "pressure" and "gas". A missing key is
    recorded as 0. Each accepted reading is timestamped with the local
    time, appended to ``data_buffer``, and restarts the inactivity timer.

    Args:
        client: Unused.
        userdata: Unused.
        msg: Received message; only ``msg.payload`` (bytes) is read.
    """
    try:
        payload = json.loads(msg.payload.decode())
        if not isinstance(payload, dict):
            # Valid JSON that is not an object (list, number, ...) would
            # otherwise fail later with an unhelpful AttributeError.
            raise ValueError(
                f"expected a JSON object, got {type(payload).__name__}"
            )

        values = {
            field: float(payload.get(field, 0))
            for field in ("temperature", "humidity", "pressure", "gas")
        }
        data_buffer.append({"timestamp": datetime.now(), **values})

        update_data_status("✅ Receiving data")
        reset_inactivity_timer()
    except Exception as e:
        # Deliberately broad: a single bad message is reported and skipped
        # instead of raising out of the client callback.
        print(f"Error processing message: {e}")
|
|
def connect_mqtt(broker, username, password):
    """Start a TLS connection to the MQTT broker with the given credentials.

    The supplied values replace the module-level ``MQTT_BROKER``,
    ``MQTT_USERNAME`` and ``MQTT_PASSWORD``. The callbacks are attached, the
    connection is opened on ``MQTT_PORT`` with a 60 s keepalive, and the
    client's background loop is started.

    Args:
        broker: Broker hostname; surrounding whitespace is ignored.
        username: MQTT username.
        password: MQTT password.

    Returns:
        "Connection initiated..." when the attempt was started, otherwise a
        "Connection error: ..." message describing the failure.
    """
    global MQTT_BROKER, MQTT_USERNAME, MQTT_PASSWORD, connection_status
    try:
        # A hostname pasted with stray whitespace would fail to resolve.
        MQTT_BROKER = broker.strip()
        MQTT_USERNAME = username
        MQTT_PASSWORD = password

        client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)

        # Only configure TLS the first time, so a repeated "Connect" click
        # does not configure it again.
        # NOTE(review): relies on paho's private ``_ssl`` attribute --
        # confirm it exists in the installed paho-mqtt version.
        if not client._ssl:
            client.tls_set()

        client.on_connect = on_connect
        client.on_message = on_message
        client.connect(MQTT_BROKER, MQTT_PORT, 60)
        client.loop_start()
    except Exception as e:
        # Write under the lock that get_status() reads under.
        with status_lock:
            connection_status = f"❌ Connection error: {e}"
        return f"Connection error: {e}"
    return "Connection initiated..."
|
|
| |
# Most recently scheduled inactivity timer; None until
# reset_inactivity_timer() runs for the first time.
inactivity_timer = None


def reset_inactivity_timer(timeout=5.0):
    """Restart the countdown that flags the data feed as idle.

    Any previously scheduled timer is cancelled and a new one is started
    that calls ``on_inactivity`` after ``timeout`` seconds.

    Args:
        timeout: Seconds without a call to this function before
            ``on_inactivity`` fires. Defaults to 5.0.
    """
    global inactivity_timer
    if inactivity_timer is not None:
        inactivity_timer.cancel()
    inactivity_timer = threading.Timer(timeout, on_inactivity)
    # Daemon thread, so a pending timer does not hold up interpreter exit.
    inactivity_timer.daemon = True
    inactivity_timer.start()
|
|
def on_inactivity():
    """Mark the data feed as idle.

    Passed to the inactivity timer as its callback; sets the data status
    back to the waiting message.
    """
    update_data_status("⌛ Waiting for data...")
|
|
def _sensor_figure(column, title, y_label):
    """Build a Plotly line figure of one buffered sensor column over time.

    Args:
        column: Key of the reading to plot (e.g. "temperature").
        title: Figure title used when readings are available.
        y_label: Y-axis title.

    Returns:
        A Plotly figure; an empty one titled "Waiting for Data..." when the
        buffer holds no readings.
    """
    # Snapshot once so the emptiness check and the plot use the same data.
    readings = list(data_buffer)
    if readings:
        fig = px.line(
            pd.DataFrame(readings), x="timestamp", y=column, title=title
        )
    else:
        fig = px.line(title="Waiting for Data...")
    fig.update_layout(
        xaxis=dict(type="date", tickformat="%H:%M:%S", title="Time"),
        yaxis=dict(title=y_label),
    )
    return fig


def update_graph():
    """Return a Plotly figure of temperature over time from the data buffer."""
    return _sensor_figure(
        "temperature", "Temperature Over Time", "Temperature (°C)"
    )


def update_humidity_graph():
    """Return a Plotly figure of humidity over time from the data buffer."""
    return _sensor_figure("humidity", "Humidity Over Time", "Humidity (%)")
|
|
def update_metrics():
    """Format the newest sensor reading as Markdown.

    Returns:
        "No sensor data available yet." when the buffer is empty; otherwise
        a Markdown list of the latest temperature, humidity, pressure and
        gas values followed by the output of ``get_status()``.
    """
    if not data_buffer:
        return "No sensor data available yet."
    # The newest reading is the last buffer entry; no DataFrame is needed
    # just to read it.
    latest = data_buffer[-1]
    return (
        f"**Latest Sensor Readings:** \n"
        f"- Temperature: {latest['temperature']:.2f} °C \n"
        f"- Humidity: {latest['humidity']:.2f} % \n"
        f"- Pressure: {latest['pressure']:.2f} hPa \n"
        f"- Gas: {latest['gas']:.2f} ohms \n\n"
        f"{get_status()}"
    )
|
|
| |
# Dashboard layout: connection form, live status, two graphs and the latest
# readings. Components given a callable plus ``every`` re-run that callable
# on the stated interval (seconds).
with gr.Blocks() as app:
    gr.Markdown("# Temperature Sensor Monitor")
    gr.Markdown("## Connect to MQTT")

    with gr.Row():
        # Defaults come from the module-level settings so the values are
        # defined in exactly one place.
        broker_input = gr.Textbox(label="MQTT Broker", value=MQTT_BROKER)
        username_input = gr.Textbox(label="MQTT Username", value=MQTT_USERNAME)
        # SECURITY NOTE(review): pre-filling the password presumably ships it
        # to every browser that loads the page -- confirm and consider
        # leaving this field empty.
        password_input = gr.Textbox(
            label="MQTT Password",
            value=MQTT_PASSWORD,
            type="password",
        )

    connect_button = gr.Button("Connect")
    connection_status_md = gr.Markdown(get_status, every=10)

    gr.Markdown("### Temperature Graph")
    temp_plot = gr.Plot(update_graph, every=2.5)

    gr.Markdown("### Humidity Graph")
    humidity_plot = gr.Plot(update_humidity_graph, every=2.5)

    gr.Markdown("### Latest Sensor Readings")
    sensor_metrics_md = gr.Markdown(update_metrics, every=2.5)

    # The message returned by connect_mqtt() is written into the status
    # panel.
    connect_button.click(
        fn=connect_mqtt,
        inputs=[broker_input, username_input, password_input],
        outputs=[connection_status_md],
    )


if __name__ == "__main__":
    app.launch()
|
|