Subscriber MQTT Forge IoT¶
Statut : implémentation initiale du subscriber, branchée sur paho-mqtt. Le module produit des objets
Measurementparsés ; il n'écrit encore rien en base et n'expose aucune route HTTP — ces étapes appartiennent aux tickets suivants (IOT-STORAGE-EVENTS-001,IOT-HTTP-API-001).
Objectif¶
Brancher Forge sur un broker MQTT pour recevoir les messages des capteurs, strictement selon le contrat figé par IOT-MQTT-CONTRACT-001, et délivrer chaque mesure valide à un callback applicatif.
Architecture du module¶
Le code est volontairement scindé en deux modules :
| Module | Rôle | Dépend de paho-mqtt ? |
|---|---|---|
forge_mvc_iot.mqtt.contract |
parsing et validation pure (topic + payload), construction de Measurement |
non — testable hors ligne |
forge_mvc_iot.mqtt.subscriber |
pont vers paho.mqtt.client.Client, callbacks, abonnement, cycle de vie |
oui |
Cette séparation permet de :
- tester l'intégralité du contrat sans installer ou simuler un broker ;
- réutiliser
parse_messageailleurs (par exemple dans un test d'API HTTP qui injecte directement une mesure) ; - changer plus tard de bibliothèque MQTT sans toucher au contrat.
API publique¶
contract.parse_message(topic, payload)¶
Parse un message complet et retourne une Measurement.
from forge_mvc_iot.mqtt.contract import parse_message
m = parse_message(
"forge/atelier/esp32-001/telemetry",
b'{"kind":"temperature","value":22.4,"unit":"°C",'
b'"timestamp":"2026-05-28T10:00:00Z"}',
)
# Measurement(site='atelier', device_id='esp32-001', kind='temperature',
# value=22.4, unit='°C', timestamp='2026-05-28T10:00:00Z',
# metadata=None)
payload accepte str ou bytes.
contract.Measurement¶
Dataclass immuable (frozen=True) :
@dataclass(frozen=True)
class Measurement:
site: str
device_id: str
kind: str
value: int | float
unit: str
timestamp: str # ISO 8601 UTC, suffixe Z
metadata: dict[str, str] | None
Le timestamp reste en chaîne ISO pour rester proche du payload reçu :
les consommateurs convertissent en datetime quand ils en ont besoin.
contract.ContractError¶
Exception levée pour toute violation du contrat. Porte un attribut
code issu de la taxonomie stable :
| Constante | Valeur |
|---|---|
CODE_TOPIC_PATTERN |
"TOPIC_PATTERN" |
CODE_PAYLOAD_PARSE |
"PAYLOAD_PARSE" |
CODE_PAYLOAD_FIELD_MISSING |
"PAYLOAD_FIELD_MISSING" |
CODE_PAYLOAD_FIELD_TYPE |
"PAYLOAD_FIELD_TYPE" |
CODE_PAYLOAD_VALUE_FORMAT |
"PAYLOAD_VALUE_FORMAT" |
Ces codes sont identiques à ceux documentés dans Contrat MQTT — Erreurs.
from forge_mvc_iot.mqtt.contract import ContractError, parse_message
try:
parse_message("bad", b"{}")
except ContractError as exc:
print(exc.code) # 'TOPIC_PATTERN'
print(exc.message) # 'Topic invalide : ...'
subscriber.MqttSubscriber¶
Pont avec paho-mqtt.
from forge_mvc_iot.config import load_iot_config
from forge_mvc_iot.mqtt.subscriber import MqttSubscriber
cfg = load_iot_config() # lit os.environ
def on_measurement(m):
print(f"{m.site}/{m.device_id} → {m.kind}={m.value}{m.unit}")
sub = MqttSubscriber(cfg, on_measurement=on_measurement)
sub.connect()
sub.loop_forever()
Paramètres du constructeur¶
| Paramètre | Type | Rôle |
|---|---|---|
config |
IotConfig |
configuration chargée par load_iot_config |
on_measurement |
Callable[[Measurement], None] |
obligatoire — appelé pour chaque mesure valide |
on_contract_error |
Callable[[ContractError, str, bytes], None] \| None |
optionnel — appelé pour chaque violation de contrat. Si None, l'erreur est uniquement loguée. |
client_factory |
Callable[[IotConfig], Any] \| None |
optionnel — fabrique de client MQTT, utile pour les tests. Par défaut, instancie un paho.mqtt.client.Client configuré pour CallbackAPIVersion.VERSION2. |
Méthodes de cycle de vie¶
| Méthode | Effet |
|---|---|
connect() |
ouvre la connexion vers le broker |
disconnect() |
ferme la connexion |
loop_forever() |
boucle réseau bloquante |
loop_start() |
boucle réseau en thread séparé |
loop_stop() |
arrête la boucle démarrée par loop_start |
handle_message(topic, payload) |
point d'entrée testable — parse + dispatch |
Exemple complet — script de réception¶
import logging
from forge_mvc_iot.config import load_iot_config
from forge_mvc_iot.mqtt.contract import ContractError, Measurement
from forge_mvc_iot.mqtt.subscriber import MqttSubscriber
logging.basicConfig(level=logging.INFO)
log = logging.getLogger("forge-iot-demo")
def on_measurement(m: Measurement) -> None:
log.info(
"%s/%s — %s = %s %s @ %s",
m.site, m.device_id, m.kind, m.value, m.unit, m.timestamp,
)
def on_contract_error(exc: ContractError, topic: str, payload: bytes) -> None:
log.warning("Refusé [%s] sur %r : %s", exc.code, topic, exc.message)
def main() -> None:
cfg = load_iot_config()
sub = MqttSubscriber(cfg, on_measurement, on_contract_error)
sub.connect()
sub.loop_forever()
if __name__ == "__main__":
main()
Avec un Mosquitto local qui tourne, on peut publier depuis un autre terminal :
mosquitto_pub \
-h localhost \
-t forge/atelier/esp32-001/telemetry \
-m '{"kind":"temperature","value":22.4,"unit":"°C","timestamp":"2026-05-28T10:00:00Z"}'
Le script affiche :
Tester sans broker¶
Tout le contrat est testable sans paho :
from forge_mvc_iot.mqtt.contract import parse_message
m = parse_message("forge/atelier/esp32-001/telemetry", b'{...}')
assert m.kind == "temperature"
Le subscriber lui-même est testable en injectant un client_factory :
from unittest.mock import MagicMock
from forge_mvc_iot.config import IotConfig
from forge_mvc_iot.mqtt.subscriber import MqttSubscriber
cfg = IotConfig(
mqtt_host="localhost", mqtt_port=1883,
mqtt_topic="forge/+/+/telemetry",
mqtt_client_id="t",
mqtt_username=None, mqtt_password=None,
)
sub = MqttSubscriber(
cfg,
on_measurement=lambda m: print(m),
client_factory=lambda c: MagicMock(),
)
sub.handle_message(
"forge/atelier/esp32-001/telemetry",
b'{"kind":"temperature","value":22.4,"unit":"°C","timestamp":"2026-05-28T10:00:00Z"}',
)
C'est le pattern utilisé par la suite de tests
tests/test_iot_mqtt_subscriber_001.py.
Comportement face aux erreurs¶
Pour chaque message reçu, le subscriber tente
parse_message(topic, payload) :
- Succès →
on_measurement(Measurement)est appelé. ContractError:- un message
WARNINGest logué surforge_mvc_iot.mqtt.subscriberavec le code d'erreur et le topic en cause ; - si
on_contract_errorest fourni, il est appelé avec(exc, topic, payload); - le subscriber ne se déconnecte pas — un message invalide ne casse pas la session MQTT.
Les erreurs de connexion broker (broker injoignable, identifiants
refusés) restent gérées par paho-mqtt. La reconnexion automatique se
fait via loop_forever() / loop_start(). Un raffinement (backoff,
politique de reconnexion personnalisée) sera abordé dans
IOT-DOCTOR-001.
Décisions verrouillées¶
siteetdevice_idviennent du topic. Tout champsiteoudevice_idprésent dans le payload est ignoré silencieusement (couvert par les tests).- Codes d'erreur stables. La taxonomie est figée — un futur test
du subscriber peut s'appuyer sur
exc.code == "PAYLOAD_PARSE"sans craindre un renommage. - Pas d'imports paho dans
contract.py. La séparation est testée :parse_messagereste utilisable même si paho-mqtt n'est pas installé.
Hors périmètre de ce ticket¶
- pas de stockage SQL des mesures (futur
IOT-STORAGE-EVENTS-001) ; - pas de migration ;
- pas de route HTTP exposant les mesures (futur
IOT-HTTP-API-001) ; - pas de commande
forge iot:*(futurIOT-DOCTOR-001) ; - pas de dashboard, pas de Forge Design ;
- pas de TLS / ACL Mosquitto ;
- pas de downlink Forge → capteur.
Voir Architecture Forge IoT pour la liste complète des jalons.