Objectif
- Générer et stocker des données dans une base MongoDB.
- Développer un pipeline ETL (Extraction, Transformation, Loading) pour automatiser l’exploration des données.
- Créer des tableaux de bord avec des outils comme Dataiku, Power BI et Tableau.
- Utiliser les données générées comme base pour développer un algorithme de prévision de stock.
Les données générées serviront de base pour développer l’algorithme de prévision de stock.
- Acquisition de données
Installation et Configuration de MongoDB
Avant toute chose, installez et configurez MongoDB sur votre machine. Ensuite, choisissez un environnement de développement (par exemple, VS Code) pour écrire votre code.
Programmation Orientée Objet (POO)
Pour ce projet, nous adoptons une approche orientée objet (POO), une pratique clé du MLOps. La POO permet une meilleure modularité, réutilisabilité et maintenabilité du code en structurant les données sous forme de classes et d’objets.
Simulation des Données avec Python
Nous générons un jeu de données contenant des informations sur les secteurs d’activité, produits, fournisseurs, ventes, stocks et événements spéciaux.
Extrait du code de génération des données :
#import des librairies
import pandas as pd
import numpy as np
import random
from datetime import datetime, timedelta
# Définition des secteurs d'activité et les produits
secteurs = ['Retail', 'Pharmacie', 'L\'Oréal']
produits_retail = ['T-shirt', 'Jeans', 'Sac à main', 'Chaussures', 'Smartphone']
produits_pharmacie = ['Aspirine', 'Vitamines', 'Gel désinfectant', 'Masque facial', 'Cicatridine']
produits_loreal = ['Shampooing', 'Crème anti-rides', 'Rouge à lèvres', 'Fond de teint', 'Eau micellaire']
# Définition des fournisseurs et leurs localisations fictives pour chaque secteur d'activité
fournisseurs_retail = [
("Fournisseur A", "France"),
("Fournisseur B", "États-Unis"),
("Fournisseur C", "Chine"),
("Fournisseur D", "Inde"),
("Fournisseur E", "Brésil")
]
..............................
fournisseurs_loreal = [
("Fournisseur L'Oréal A", "France"),
("Fournisseur L'Oréal B", "États-Unis"),
("Fournisseur L'Oréal C", "Brésil"),
("Fournisseur L'Oréal D", "Mexique"),
("Fournisseur L'Oréal E", "Corée du Sud")
]
# Fonction pour générer des événements spéciaux
def evenement_special(date):
if date.month == 12 and date.day == 25:
return 'Noël'
elif date.month == 11 and date.day in [26, 27, 28]: # Black Friday
return 'Black Friday'
else:
return ''
# Génération des données
dates = pd.date_range(start="2021-01-01", end="2024-12-31", freq="D")
data = []
for date in dates:
jour_semaine = date.strftime('%A')
for secteur in secteurs:
if secteur == 'Retail':
produits = produits_retail
fournisseur, localisation = random.choice(fournisseurs_retail)
elif secteur == 'Pharmacie':
produits = produits_pharmacie
fournisseur, localisation = random.choice(fournisseurs_pharmacie)
elif secteur == 'L\'Oréal':
produits = produits_loreal
fournisseur, localisation = random.choice(fournisseurs_loreal)
for produit in produits:
# Variations selon le jour de la semaine
..............................
# Ajoute des données à la liste
data.append([date, secteur, produit, ventes, stock, prix, promo_type, promo_qty, saison, jour_semaine, evenement, categorie, poids, fournisseur, localisation])
# Conversion en DataFrame
df = pd.DataFrame(data, columns=[
"Date", "Secteur", "Produit", .....
])
2. Stockage des Données dans MongoDB
Connexion et Insertion des Données
from pymongo import MongoClient
import pandas as pd
# Connexion à MongoDB
client = MongoClient("mongodb://localhost:27017/")
db = client["stocks_data"]
collection = db["stock_data_collection"]
# Chargement du fichier CSV
df = pd.read_csv("jeu_de_donnees_journalieres.csv")
# Insertion des données
data_dict = df.to_dict("records")
collection.insert_many(data_dict)
print("Données insérées avec succès dans MongoDB !")
Lecture des Données depuis MongoDB :
data = list(collection.find({}, {"_id": 0})) # Exclure l'ID MongoDB
df = pd.DataFrame(data)
print(df.head(10)) # Afficher les premières lignes
Automatisons de l’interaction avec la base MongoDB
Maintenant que les données sont disponibles, je vais créer des fonctions pour interagir avec la base.
Le script Python database.py ci-dessous contient les fonctions nécessaires pour interagir avec la base de données MongoDB, y compris la connexion, l’insertion de données et la récupération de données. Il définit une classe Database qui facilite l’interaction avec une base de données MongoDB en utilisant la bibliothèque pymongo.
# app/utils/database.py
# import de la librairie
from pymongo import MongoClient
class Database:
def __init__(self, uri, db_name):
"""
Initialisation de la classe Database
:uri: URI de connexion à MongoDB
:db_name: Nom de la base de données
"""
self.client = MongoClient(uri)
self.db = self.client[db_name]
def get_collection(self, collection_name):
"""
Récupération d'une collection MongoDB
:collection_name: Nom de la collection
:return: Objet de collection MongoDB
"""
return self.db[collection_name]
..............................
def fetch_data(self, collection_name, query=None):
"""
Récupération des données d'une collection MongoDB
:collection_name: Nom de la collection
:query: Filtre de recherche
:return: Liste de documents correspondant au filtre
"""
collection = self.get_collection(collection_name)
return list(collection.find(query or {}))
def close_connection(self):
"""
Fermeture de la connexion à la base de données MongoDB
"""
self.client.close()
3. Développement d’un Pipeline ETL
L’objectif est d’extraire les données de MongoDB, les transformer (nettoyage, calculs, normalisation) puis les recharger dans une collection transformée.
Implémentation du Pipeline ETL :
import pandas as pd # Importer pandas
from app.utils.database import Database # Importer la classe Database
def etl_process(uri, db_name, collection_name, new_collection_name=None):
# 1. EXTRACT : Chargement des données depuis MongoDB
print(f"Chargement des données depuis MongoDB ({db_name}.{collection_name})...")
# Création d'une instance de la classe Database
db = Database(uri, db_name)
# Récupération de la collection
collection = db.get_collection(collection_name)
# Récupération des données depuis MongoDB
data = list(collection.find({}, {"_id": 0})) # Exclure le champ "_id"
print(f"Nombre de documents chargés : {len(data)}")
# 2. TRANSFORM : Nettoyage et structuration des données
print("Transformation des données...")
df = pd.DataFrame(data)
# Renommage des colonnes pour plus de cohérence
df.rename(columns=lambda x: x.strip().replace("é", "é").replace("à ", "à"), inplace=True)
# Conversion de la colonne Date en format datetime
df["Date"] = pd.to_datetime(df["Date"], errors="coerce")
# Remplissage des valeurs manquantes
df["Type de promotion"].fillna("Aucune", inplace=True)
df["Quantité promo"].fillna(0, inplace=True)
df["Événement spécial"].fillna("Aucun", inplace=True)
# Suppression des lignes où les données essentielles sont manquantes
df.dropna(subset=["Date", "Produit", "Ventes", "Stock"], inplace=True)
# Ajout des colonnes calculées (par exemple, marge)
df["Marge"] = df["Prix"] * 0.3 # Exemple : 30% du prix comme marge
print(f"Données après transformation : {df.shape[0]} lignes.")
# Suppression de la colonne '_id' si elle existe dans le DataFrame
df = df.drop(columns=['_id'], errors='ignore')
# 3. LOAD : Insertion dans une nouvelle collection MongoDB (si spécifié) ou préparer le DataFrame pour des analyses
if new_collection_name:
print(f"Insertion des données transformées dans la nouvelle collection MongoDB ({new_collection_name})...")
new_collection = db.get_collection(new_collection_name)
new_collection.insert_many(df.to_dict("records"))
print("Données insérées avec succès dans MongoDB.")
else:
print("Aucune nouvelle collection spécifiée. Utilisation des données pour analyse.")
# DataFrame pour analyses ou exportation
return df
4. Visualisation et Analyse des Données
Création de Graphiques avec Matplotlib et Seaborn
Nous allons visualiser les tendances de ventes, les distributions et détecter d’éventuelles anomalies.
Exemple de Visualisation des Ventes :
# app/dataviz/dataviz.py
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
class DataExplorer:
def __init__(self, data):
self.data = data
def preprocess_data(self):
self.data['Date'] = pd.to_datetime(self.data['Date'], errors='coerce')
self.data['Jour de la semaine'] = self.data['Jour de la semaine'].astype('category')
self.data['Saison'] = self.data['Saison'].astype('category')
self.data.dropna(subset=['Date', 'Produit', 'Ventes', 'Stock'], inplace=True)
self.data['Mois'] = self.data['Date'].dt.month
self.data['Année'] = self.data['Date'].dt.year
print("Données préparées et prêtes à l'analyse.")
def plot_sales_trends(self):
plt.figure(figsize=(12, 6))
sns.lineplot(x='Date', y='Ventes', data=self.data, hue='Produit')
plt.title("Tendance des ventes par produit")
plt.xlabel('Date')
plt.ylabel('Ventes')
plt.xticks(rotation=45)
plt.show()
..............................
def plot_anomalies(self, anomalies):
plt.figure(figsize=(12, 6))
sns.scatterplot(x='Date', y='Ventes', data=anomalies, color='red', label='Anomalies')
sns.lineplot(x='Date', y='Ventes', data=self.data, label='Ventes')
plt.title("Ventes avec anomalies détectées")
plt.xlabel('Date')
plt.ylabel('Ventes')
plt.legend()
plt.show()
Détection d’Anomalies dans les Ventes :
def detect_anomalies(df):
mean_sales = df["Ventes"].mean()
std_sales = df["Ventes"].std()
anomalies = df[(df["Ventes"] > mean_sales + 2 * std_sales) |
(df["Ventes"] < mean_sales - 2 * std_sales)]
print(f"Nombre d'anomalies détectées : {len(anomalies)}")
return anomalies
anomalies = detect_anomalies(df_transformed)
5. Lancement du Processus Complet
Un script run_etl_and_viz.py permet d’exécuter l’ensemble du processus ETL et de visualiser les données transformées.
from etl.etl_process import etl_process
from etl.dataviz import DataExplorer
# Paramètres MongoDB
uri = "mongodb://localhost:27017/"
db_name = "stocks_data"
collection_name = "stock_data_collection" # Collection source
new_collection_name = "stock_data_collection_transformed" # Collection où insérer les données transformées (optionnel)
# Lancement du processus ETL
df_transformed = etl_process(uri, db_name, collection_name, new_collection_name)
# Exploration et visualisation des données avec DataExplorer
data_explorer = DataExplorer(df_transformed)
data_explorer.preprocess_data()
# Visualisation des tendances des ventes
data_explorer.plot_sales_trends()
# Visualisation des ventes par saison
data_explorer.plot_sales_by_season()
# Visualisation de la distribution des ventes
data_explorer.plot_sales_distribution()
# Visualisation des ventes par jour de la semaine
data_explorer.plot_sales_by_weekday()
# Détection et visualisation des anomalies
anomalies = data_explorer.detect_anomalies()
data_explorer.plot_anomalies(anomalies)
Comment lancer l’ETL ?
C’est simple, il suffit de suivre les étapes suivantes :
- S’assurer que l’environnement est configuré
- Installer l’environnement virtuel via le code :
python -m venv venv - Activer l’environnement virtuel (sous windows) :
venv\Scripts\activate - Installez les dépendances :
pip install -r requirements.txt - Lancer la viz :
python run_etl_and_viz.py
Je te propose cette petite vidéo pour visualiser les sorties de l’ETL :
Conclusion
Ce projet met en place une chaîne de traitement automatisée des données depuis leur génération, leur stockage dans MongoDB, jusqu’à leur nettoyage et visualisation.