Stockage des événements IoT — contrat SQL¶
Statut : contrat SQL figé, sans branchement base de données. Ce ticket (
IOT-STORAGE-EVENTS-001) définit la table cible, l'ordre canonique des colonnes, et fournit les fonctions pures qui sérialisent uneMeasurementen(sql, params). La migration versionnée et l'insertion réelle sont traitées par les tickets suivants (IOT-STORAGE-MIGRATION-001,IOT-STORAGE-REPOSITORY-001).
Objectif¶
Poser le contrat de stockage avant d'écrire le repository qui
exécute le SQL. Le subscriber de IOT-MQTT-SUBSCRIBER-001 produit déjà
des Measurement valides ; il manquait juste un consommateur officiel
côté storage, factorisable, testable hors base.
Module concerné¶
API exposée :
| Symbole | Type | Rôle |
|---|---|---|
TABLE_NAME |
str |
nom canonique de la table SQL ("iot_events") |
COLUMNS |
tuple[str, ...] |
ordre canonique des colonnes (sans id) |
INSERT_IOT_EVENT_SQL |
str |
requête INSERT avec placeholders ? |
serialize_measurement_for_storage(measurement, *, received_at=None) |
dict[str, object] |
dict prêt à insertion |
build_insert_iot_event_sql(measurement, *, received_at=None) |
tuple[str, tuple] |
(sql, params) |
Schéma SQL cible¶
La migration versionnée est livrée comme ressource embarquée du
package depuis IOT-PACKAGE-DATA-MIGRATIONS-001 :
Elle est déclarée dans pyproject.toml via
[tool.setuptools.package-data] pour voyager avec la distribution
PyPI. Lecture côté code Python :
from importlib import resources
migration = (
resources.files("forge_mvc_iot")
/ "migrations"
/ "20260528120000_create_iot_events.sql"
)
content = migration.read_text(encoding="utf-8")
La migration utilise CREATE TABLE IF NOT EXISTS (idempotente) et
reproduit exactement le schéma ci-dessous. Le contrat Python
(COLUMNS) reste la source de vérité : le test
tests/test_iot_storage_migration_001.py::TestMigrationMirrorsPythonContract
vérifie que les colonnes du DDL correspondent à l'ordre canonique
Python.
L'application réelle de la migration (forge migration:apply après
copie dans mvc/migrations/ du projet utilisateur) reste à la charge
de l'utilisateur et sera automatisée dans un ticket dédié.
CREATE TABLE IF NOT EXISTS iot_events (
id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
site VARCHAR(64) NOT NULL,
device_id VARCHAR(64) NOT NULL,
kind VARCHAR(64) NOT NULL,
value DOUBLE NOT NULL,
unit VARCHAR(32) NOT NULL,
timestamp VARCHAR(40) NOT NULL, -- ISO 8601 UTC, suffixe Z
metadata_json TEXT NULL, -- JSON sérialisé ou NULL
received_at DATETIME(6) NOT NULL,
PRIMARY KEY (id),
INDEX idx_iot_events_site_device (site, device_id),
INDEX idx_iot_events_received_at (received_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
Notes :
valueenDOUBLEcouvre lesintetfloatdu contrat MQTT.timestampreste enVARCHAR(40)pour préserver la chaîne ISO 8601 reçue dans le payload. Le consommateur convertit enDATETIMEquand il en a besoin (préserver la valeur d'origine évite les conversions silencieuses de fuseau et la perte de microsecondes).metadata_jsonenTEXT NULL— JSON sérialisé viajson.dumps(..., sort_keys=True, ensure_ascii=False)ouNULL.received_atenDATETIME(6)côté serveur (microsecondes).- Deux index minimaux : couple
(site, device_id)pour filtrer par capteur,received_atpour les fenêtres temporelles.
API Python¶
serialize_measurement_for_storage¶
from datetime import UTC, datetime
from forge_mvc_iot.mqtt.contract import Measurement
from forge_mvc_iot.storage.events import serialize_measurement_for_storage
m = Measurement(
site="atelier",
device_id="esp32-001",
kind="temperature",
value=22.4,
unit="°C",
timestamp="2026-05-28T10:00:00Z",
metadata={"room": "atelier", "sensor": "dht22"},
)
row = serialize_measurement_for_storage(
m,
received_at=datetime(2026, 5, 28, 10, 0, 5, tzinfo=UTC),
)
# {
# 'site': 'atelier',
# 'device_id': 'esp32-001',
# 'kind': 'temperature',
# 'value': 22.4,
# 'unit': '°C',
# 'timestamp': '2026-05-28T10:00:00Z',
# 'metadata_json': '{"room": "atelier", "sensor": "dht22"}',
# 'received_at': datetime(2026, 5, 28, 10, 0, 5, tzinfo=UTC),
# }
Si received_at n'est pas fourni, datetime.now(UTC) est utilisé.
Si measurement.metadata est None, metadata_json est None (pas
la chaîne "null") — c'est cette valeur qui sera passée au connecteur
SQL pour produire un vrai NULL.
build_insert_iot_event_sql¶
from forge_mvc_iot.storage.events import build_insert_iot_event_sql
sql, params = build_insert_iot_event_sql(m)
# sql = "INSERT INTO iot_events (site, device_id, kind, value, unit, timestamp, metadata_json, received_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)"
# params = ('atelier', 'esp32-001', 'temperature', 22.4, '°C',
# '2026-05-28T10:00:00Z',
# '{"room": "atelier", "sensor": "dht22"}',
# datetime(..., tzinfo=UTC))
Le tuple params suit exactement l'ordre de COLUMNS. Cette paire
(sql, params) est conçue pour être consommée plus tard par :
# Code à venir dans IOT-STORAGE-REPOSITORY-001 :
from core.database.db import execute
execute(sql, params)
Décisions verrouillées¶
- Le module reste pur. Pas d'import de
core.database.db, pas de connexion, pas de migration appliquée. La fonctionbuild_insert_iot_event_sqlproduit du SQL textuel + paramètres : c'est l'appelant qui exécutera la requête. - Le SQL est visible.
INSERT_IOT_EVENT_SQLest une chaîne Python lisible, conforme à la charte v2 §5 « Garder SQL visible ». - Placeholders
?. Style qmark, cohérent avec le reste de Forge (voir le starter Contacts etcore.database.db.execute). received_attoujours UTC. Pas de fuseau implicite.datetime.now(UTC)par défaut, ou injection explicite via le paramètre.metadata→ JSON. Sérialisation déterministe (sort_keys=True) — utile pour les tests, le diff, et la réindexation future.idexclu des colonnes. Généré par la base, jamais inséré explicitement.
Tests sans base¶
Le module est testé entièrement hors ligne :
from forge_mvc_iot.mqtt.contract import Measurement
from forge_mvc_iot.storage.events import (
INSERT_IOT_EVENT_SQL, build_insert_iot_event_sql,
)
m = Measurement(
site="atelier", device_id="esp32-001",
kind="temperature", value=22.4, unit="°C",
timestamp="2026-05-28T10:00:00Z",
metadata=None,
)
sql, params = build_insert_iot_event_sql(m)
assert sql == INSERT_IOT_EVENT_SQL
assert params[0] == "atelier"
assert params[6] is None # metadata_json
Aucun MariaDB n'est requis. La validation runtime du schéma viendra avec la migration appliquée.
Repository d'insertion¶
Le repository est la première couche qui exécute réellement le SQL.
Il ne crée pas la table — il suppose que la migration iot_events a
déjà été appliquée.
Module : forge_mvc_iot.storage.repository — exporté par
forge_mvc_iot.storage.
from forge_mvc_iot.storage import IotEventRepository
repo = IotEventRepository() # adapter par défaut : core.database.db
result = repo.insert(measurement) # exécute INSERT_IOT_EVENT_SQL via db.execute(...)
Adapter injectable¶
Le repository accepte n'importe quel objet exposant
execute(sql, params). Par défaut, il utilise core.database.db —
qui gère le pool de connexions, le commit et le rollback
automatiquement (voir core/database/db.py).
Pour les tests, on injecte un MagicMock (aucun MariaDB requis) :
from unittest.mock import MagicMock
from forge_mvc_iot.storage import IotEventRepository
adapter = MagicMock()
repo = IotEventRepository(db_adapter=adapter)
repo.insert(measurement)
adapter.execute.assert_called_once()
Pour récupérer le lastrowid plutôt que le rowcount, on enveloppe
soi-même core.database.db.insert :
from core.database import db
class _InsertAdapter:
def execute(self, sql, params):
return db.insert(sql, params) # retourne lastrowid
repo = IotEventRepository(db_adapter=_InsertAdapter())
new_id = repo.insert(measurement)
Flux complet (subscriber → repository → base)¶
subscriber.handle_message(topic, payload)
│
│ parse_message(topic, payload)
▼
Measurement
│
│ repo.insert(measurement, received_at=...)
▼
build_insert_iot_event_sql(...) → (sql, params)
│
│ db_adapter.execute(sql, params)
▼
iot_events
Le branchement automatique
on_measurement=repo.insertreste un exemple documentaire. Le subscriber ne tire pas le repository dans son code ; c'est le code applicatif qui décide explicitement de connecter les deux :
Comportement¶
- Les erreurs SQL sont propagées telles quelles — le repository n'intercepte rien silencieusement.
- Aucun
commitourollbackmanuel — c'est l'adapter (par défaut Forge) qui s'en charge. - Le
idn'est pas retourné par défaut (puisquedb.executerenvoierowcount). Utiliser le pattern d'adapter ci-dessus pour récupérerlastrowid.
Méthodes de lecture¶
Trois méthodes pour interroger la table, toutes paramétrées et toutes
testables via le même adapter (fetch_one/fetch_all) :
repo.list_recent(limit=100)
repo.find_by_device("atelier", "esp32-001", limit=100)
repo.count_by_device("atelier", "esp32-001")
Chaque ligne retournée par list_recent et find_by_device est un
dict prêt à consommer :
{
"id": 42,
"site": "atelier",
"device_id": "esp32-001",
"kind": "temperature",
"value": 22.4,
"unit": "°C",
"timestamp": "2026-05-28T10:00:00Z",
"metadata": {"room": "atelier", "sensor": "dht22"}, # ou None
"received_at": datetime(2026, 5, 28, 10, 0, 5, tzinfo=UTC),
}
Notes :
metadata_json(interne stockage) est automatiquement parsé enmetadata(dict ouNone). Le consommateur n'a jamais à voir le JSON sérialisé.received_atreste undatetimetel que retourné par le connecteur MariaDB (UTC). La conversion en chaîne JSON-friendly sera le travail de la future API HTTP.- L'ordre est
received_at DESC— les événements les plus récents en premier.
Limites et validation¶
limitdoit être unintstrict dans1..MAX_LIMIT(par défautMAX_LIMIT = 1000). Hors plage →ValueError. Type incorrect →TypeError.True/Falsesont refusés bien qu'ils héritent deint.- Pas de pagination par offset à ce ticket — un appel qui voudrait
paginer au-delà de
MAX_LIMITest probablement le signe qu'il faut un autre endpoint (agrégat, filtre temporel, etc.). - Aucun filtre temporel exposé pour l'instant (
since=,until=). Sera ajouté avec l'API HTTP si nécessaire.
SQL exposé¶
Les requêtes de lecture sont publiques pour rester lisibles (charte v2 §5) :
from forge_mvc_iot.storage import (
SELECT_IOT_EVENTS_RECENT_SQL,
SELECT_IOT_EVENTS_BY_DEVICE_SQL,
COUNT_IOT_EVENTS_BY_DEVICE_SQL,
)
Toutes utilisent les placeholders qmark ? (cohérent avec le reste
de Forge) et la table iot_events.
Hors périmètre de ce ticket¶
- Pas d'API HTTP — lecture JSON par
IOT-HTTP-API-001. - Pas de CLI, pas de dashboard, pas d'intégration Forge Design.
- Pas de rétention long terme, pas d'agrégation, pas de downsampling, pas d'alertes.
Ces points feront chacun l'objet d'un ticket dédié — voir Architecture Forge IoT.