micpy init

This commit is contained in:
derlole
2025-04-23 12:50:14 +00:00
parent cd7d958d87
commit fce872b7fe
3 changed files with 91 additions and 39 deletions

View File

@@ -0,0 +1,45 @@
import network
import time
import gc
gc.collect()
# (SSID: Passwort)
known_networks = {
"Vodafone-9454": "AchXHta93YCTgC3M",
"no": "",
"placeholder1": "placeholder1",
"placeholder2": "placeholder2"
}
def connect_wifi():
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
# Suche nach verfügbaren Netzwerken
print("Scanne nach WLAN-Netzwerken...")
available_networks = wlan.scan()
print(f"Gefundene Netzwerke: {(available_networks)}")
for net in available_networks:
ssid = net[0].decode('utf-8') if isinstance(net[0], bytes) else net[0]
if ssid in known_networks:
password = known_networks[ssid]
print(f"Versuche Verbindung mit '{ssid}'...")
wlan.connect(ssid, password)
while not wlan.isconnected():
time.sleep(1)
print(".", end="")
if wlan.isconnected():
print(f"\nVerbunden mit '{ssid}'")
print("IP-Adresse:", wlan.ifconfig()[0])
return True
else:
print(f"\n❌ Verbindung mit '{ssid}' fehlgeschlagen.")
print("Kein bekanntes Netzwerk verfügbar oder Verbindung fehlgeschlagen.")
return False
# Verbindung herstellen beim Boot
connect_wifi()

View File

@@ -0,0 +1,44 @@
import urequests #this is not an issue, because micropython has its own urequests module already
import json
import socket
# one time dns resolve, damit der arme ESP und nicht wegkocht.
def resolve_ip(hostname):
try:
print(f"🌐 Resolvieren von '{hostname}' ...")
addr_info = socket.getaddrinfo(hostname, 80)
ip = addr_info[0][4][0]
print(f"🔎 IP-Adresse gefunden: {ip}")
return ip
except Exception as e:
print("❌ Fehler beim Resolvieren:", e)
return None
# beispiel-fetch von der hauptadresse (muss evtl in ne roop und nen thread)
def fetch_json(ip, host, path):
url = f"http://{ip}{path}"
try:
print(f"Sende Anfrage an {url} mit Host '{host}' ...")
headers = {"Host": host}
response = urequests.get(url, headers=headers)
if response.status_code == 200:
print("✅ Antwort erhalten:")
print(response.text)
data = json.loads(response.text)
print("📦 JSON geladen:", data)
else:
print(f"❌ Fehler: Status-Code {response.status_code}")
response.close()
except Exception as e:
print("⚠️ Fehler bei der Anfrage (äußerer Block):", e)
# --- Hauptteil ---
host = "lires.de"
path = "/unsecure/esp/"
ip = resolve_ip(host)
if ip:
fetch_json(ip, host, path)

View File

@@ -1,44 +1,9 @@
# import eventlet
# eventlet.monkey_patch() # Das sollte ganz oben sein, um sicherzustellen, dass eventlet alles patcht
# from flask import Flask, render_template
# from flask_socketio import SocketIO
# import paho.mqtt.client as mqtt
# from routes.unsecure_routes import unsecure
# app = Flask(__name__, static_url_path='/unsecure/static')
# socketio = SocketIO(app, cors_allowed_origins="*")
# # MQTT Setup
# mqtt_client = mqtt.Client(protocol=mqtt.MQTTv5)
# def on_mqtt_message(client, userdata, msg):
# payload = msg.payload.decode()
# print(f"[MQTT] {msg.topic}: {payload}")
# # Sende die erhaltenen MQTT-Daten über WebSocket an alle verbundenen Clients
# socketio.emit("esp_update", {"topic": msg.topic, "payload": payload})
# mqtt_client.on_message = on_mqtt_message
# mqtt_client.connect("localhost", 1883) # MQTT-Broker-Adresse
# mqtt_client.subscribe("lires/esp1/status")
# mqtt_client.loop_start()
# @socketio.on("send_to_esp")
# def handle_send(data):
# mqtt_client.publish("lires/esp1/control", data)
# app.register_blueprint(unsecure)
# if __name__ == '__main__':
# socketio.run(app, host="0.0.0.0", port=3060)
from flask import Flask from flask import Flask
from flask_socketio import SocketIO from flask_socketio import SocketIO
from routes.unsecure_routes import unsecure from routes.unsecure_routes import unsecure
from routes.esp_routes import esp from routes.esp_routes import esp
import threading
import time
app = Flask(__name__, static_url_path='/unsecure/static') app = Flask(__name__, static_url_path='/unsecure/static')
@@ -51,8 +16,6 @@ app.register_blueprint(unsecure)
app.register_blueprint(esp) app.register_blueprint(esp)
# Simuliere regelmäßige Daten-Updates # Simuliere regelmäßige Daten-Updates
import threading
import time
def send_data(): def send_data():
counter = 0 counter = 0