Data Analyse – ETL – DataViz, Automatisés

Cet article vise à vous guider à travers les étapes suivantes :

Objectif

  • Générer et stocker des données dans une base MongoDB.
  • Développer un pipeline ETL (Extraction, Transformation, Loading) pour automatiser l’exploration des données.
  • Créer des tableaux de bord avec des outils comme Dataiku, Power BI et Tableau.
  • Utiliser les données générées comme base pour développer un algorithme de prévision de stock.

Les données générées serviront de base pour développer l’algorithme de prévision de stock.

  1. Acquisition de données

Installation et Configuration de MongoDB

Avant toute chose, installez et configurez MongoDB sur votre machine. Ensuite, choisissez un environnement de développement (par exemple, VS Code) pour écrire votre code.

Programmation Orientée Objet (POO)

Pour ce projet, nous adoptons une approche orientée objet (POO), une pratique clé du MLOps. La POO permet une meilleure modularité, réutilisabilité et maintenabilité du code en structurant les données sous forme de classes et d’objets.

Simulation des Données avec Python

Nous générons un jeu de données contenant des informations sur les secteurs d’activité, produits, fournisseurs, ventes, stocks et événements spéciaux.

Extrait du code de génération des données :

#import des librairies
import pandas as pd
import numpy as np
import random
from datetime import datetime, timedelta

# Définition des secteurs d'activité et les produits
secteurs = ['Retail', 'Pharmacie', 'L\'Oréal']
produits_retail = ['T-shirt', 'Jeans', 'Sac à main', 'Chaussures', 'Smartphone']
produits_pharmacie = ['Aspirine', 'Vitamines', 'Gel désinfectant', 'Masque facial', 'Cicatridine']
produits_loreal = ['Shampooing', 'Crème anti-rides', 'Rouge à lèvres', 'Fond de teint', 'Eau micellaire']

# Définition des fournisseurs et leurs localisations fictives pour chaque secteur d'activité
fournisseurs_retail = [
    ("Fournisseur A", "France"),
    ("Fournisseur B", "États-Unis"),
    ("Fournisseur C", "Chine"),
    ("Fournisseur D", "Inde"),
    ("Fournisseur E", "Brésil")
]

..............................

fournisseurs_loreal = [
    ("Fournisseur L'Oréal A", "France"),
    ("Fournisseur L'Oréal B", "États-Unis"),
    ("Fournisseur L'Oréal C", "Brésil"),
    ("Fournisseur L'Oréal D", "Mexique"),
    ("Fournisseur L'Oréal E", "Corée du Sud")
]

# Fonction pour générer des événements spéciaux
def evenement_special(date):
    if date.month == 12 and date.day == 25:
        return 'Noël'
    elif date.month == 11 and date.day in [26, 27, 28]:  # Black Friday
        return 'Black Friday'
    else:
        return ''

# Génération des données
dates = pd.date_range(start="2021-01-01", end="2024-12-31", freq="D")
data = []

for date in dates:
    jour_semaine = date.strftime('%A')
    for secteur in secteurs:
        if secteur == 'Retail':
            produits = produits_retail
            fournisseur, localisation = random.choice(fournisseurs_retail)
        elif secteur == 'Pharmacie':
            produits = produits_pharmacie
            fournisseur, localisation = random.choice(fournisseurs_pharmacie)
        elif secteur == 'L\'Oréal':
            produits = produits_loreal
            fournisseur, localisation = random.choice(fournisseurs_loreal)

        for produit in produits:
            # Variations selon le jour de la semaine
             ..............................
            # Ajoute des données à la liste
            data.append([date, secteur, produit, ventes, stock, prix, promo_type, promo_qty, saison, jour_semaine, evenement, categorie, poids, fournisseur, localisation])

# Conversion en DataFrame
df = pd.DataFrame(data, columns=[
    "Date", "Secteur", "Produit", .....
])

2. Stockage des Données dans MongoDB

Connexion et Insertion des Données

from pymongo import MongoClient
import pandas as pd

# Connexion à MongoDB
client = MongoClient("mongodb://localhost:27017/")
db = client["stocks_data"]
collection = db["stock_data_collection"]

# Chargement du fichier CSV
df = pd.read_csv("jeu_de_donnees_journalieres.csv")

# Insertion des données
data_dict = df.to_dict("records")
collection.insert_many(data_dict)
print("Données insérées avec succès dans MongoDB !")

Lecture des Données depuis MongoDB :

data = list(collection.find({}, {"_id": 0}))  # Exclure l'ID MongoDB
df = pd.DataFrame(data)
print(df.head(10))  # Afficher les premières lignes

Automatisons de l’interaction avec la base MongoDB

Maintenant que les données sont disponibles, je vais créer des fonctions pour interagir avec la base.

Le script Python database.py ci-dessous contient les fonctions nécessaires pour interagir avec la base de données MongoDB, y compris la connexion, l’insertion de données et la récupération de données. Il définit une classe Database qui facilite l’interaction avec une base de données MongoDB en utilisant la bibliothèque pymongo.

# app/utils/database.py

# import de la librairie
from pymongo import MongoClient

class Database:
    def __init__(self, uri, db_name):
        """
        Initialisation de la classe Database
        :uri: URI de connexion à MongoDB
        :db_name: Nom de la base de données
        """
        self.client = MongoClient(uri)
        self.db = self.client[db_name]

    def get_collection(self, collection_name):
        """
        Récupération d'une collection MongoDB
        :collection_name: Nom de la collection
        :return: Objet de collection MongoDB
        """
        return self.db[collection_name]

    ..............................

    def fetch_data(self, collection_name, query=None):
        """
        Récupération des données d'une collection MongoDB
        :collection_name: Nom de la collection
        :query: Filtre de recherche 
        :return: Liste de documents correspondant au filtre
        """
        collection = self.get_collection(collection_name)
        return list(collection.find(query or {}))

    def close_connection(self):
        """
        Fermeture de la connexion à la base de données MongoDB
        """
        self.client.close()

3. Développement d’un Pipeline ETL

L’objectif est d’extraire les données de MongoDB, les transformer (nettoyage, calculs, normalisation) puis les recharger dans une collection transformée.

Implémentation du Pipeline ETL :

import pandas as pd  # Importer pandas
from app.utils.database import Database # Importer la classe Database

def etl_process(uri, db_name, collection_name, new_collection_name=None):
    # 1. EXTRACT : Chargement des données depuis MongoDB
    print(f"Chargement des données depuis MongoDB ({db_name}.{collection_name})...")
    
    # Création d'une instance de la classe Database
    db = Database(uri, db_name)
    
    # Récupération de la collection
    collection = db.get_collection(collection_name)
    
    # Récupération des données depuis MongoDB
    data = list(collection.find({}, {"_id": 0}))  # Exclure le champ "_id"
    print(f"Nombre de documents chargés : {len(data)}")
    
    # 2. TRANSFORM : Nettoyage et structuration des données
    print("Transformation des données...")
    df = pd.DataFrame(data)
    
    # Renommage des colonnes pour plus de cohérence
    df.rename(columns=lambda x: x.strip().replace("é", "é").replace("à ", "à"), inplace=True)
    
    # Conversion de la colonne Date en format datetime
    df["Date"] = pd.to_datetime(df["Date"], errors="coerce")

    # Remplissage des valeurs manquantes
    df["Type de promotion"].fillna("Aucune", inplace=True)
    df["Quantité promo"].fillna(0, inplace=True)
    df["Événement spécial"].fillna("Aucun", inplace=True)

    # Suppression des lignes où les données essentielles sont manquantes
    df.dropna(subset=["Date", "Produit", "Ventes", "Stock"], inplace=True)

    # Ajout des colonnes calculées (par exemple, marge)
    df["Marge"] = df["Prix"] * 0.3  # Exemple : 30% du prix comme marge

    print(f"Données après transformation : {df.shape[0]} lignes.")

    # Suppression de la colonne '_id' si elle existe dans le DataFrame
    df = df.drop(columns=['_id'], errors='ignore')

    # 3. LOAD : Insertion dans une nouvelle collection MongoDB (si spécifié) ou préparer le DataFrame pour des analyses
    if new_collection_name:
        print(f"Insertion des données transformées dans la nouvelle collection MongoDB ({new_collection_name})...")
        new_collection = db.get_collection(new_collection_name)
        new_collection.insert_many(df.to_dict("records"))
        print("Données insérées avec succès dans MongoDB.")
    else:
        print("Aucune nouvelle collection spécifiée. Utilisation des données pour analyse.")
    
    # DataFrame pour analyses ou exportation
    return df

4. Visualisation et Analyse des Données

Création de Graphiques avec Matplotlib et Seaborn

Nous allons visualiser les tendances de ventes, les distributions et détecter d’éventuelles anomalies.

Exemple de Visualisation des Ventes :

# app/dataviz/dataviz.py

import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

class DataExplorer:
    def __init__(self, data):
        self.data = data

    def preprocess_data(self):
        self.data['Date'] = pd.to_datetime(self.data['Date'], errors='coerce')
        self.data['Jour de la semaine'] = self.data['Jour de la semaine'].astype('category')
        self.data['Saison'] = self.data['Saison'].astype('category')
        self.data.dropna(subset=['Date', 'Produit', 'Ventes', 'Stock'], inplace=True)
        self.data['Mois'] = self.data['Date'].dt.month
        self.data['Année'] = self.data['Date'].dt.year
        print("Données préparées et prêtes à l'analyse.")

    def plot_sales_trends(self):
        plt.figure(figsize=(12, 6))
        sns.lineplot(x='Date', y='Ventes', data=self.data, hue='Produit')
        plt.title("Tendance des ventes par produit")
        plt.xlabel('Date')
        plt.ylabel('Ventes')
        plt.xticks(rotation=45)
        plt.show()

    ..............................

    def plot_anomalies(self, anomalies):
        plt.figure(figsize=(12, 6))
        sns.scatterplot(x='Date', y='Ventes', data=anomalies, color='red', label='Anomalies')
        sns.lineplot(x='Date', y='Ventes', data=self.data, label='Ventes')
        plt.title("Ventes avec anomalies détectées")
        plt.xlabel('Date')
        plt.ylabel('Ventes')
        plt.legend()
        plt.show()

Détection d’Anomalies dans les Ventes :

def detect_anomalies(df):
    mean_sales = df["Ventes"].mean()
    std_sales = df["Ventes"].std()
    anomalies = df[(df["Ventes"] > mean_sales + 2 * std_sales) | 
                   (df["Ventes"] < mean_sales - 2 * std_sales)]
    print(f"Nombre d'anomalies détectées : {len(anomalies)}")
    return anomalies

anomalies = detect_anomalies(df_transformed)

5. Lancement du Processus Complet

Un script run_etl_and_viz.py permet d’exécuter l’ensemble du processus ETL et de visualiser les données transformées.

from etl.etl_process import etl_process
from etl.dataviz import DataExplorer

# Paramètres MongoDB
uri = "mongodb://localhost:27017/"
db_name = "stocks_data"
collection_name = "stock_data_collection"  # Collection source
new_collection_name = "stock_data_collection_transformed"  # Collection où insérer les données transformées (optionnel)

# Lancement du processus ETL
df_transformed = etl_process(uri, db_name, collection_name, new_collection_name)

# Exploration et visualisation des données avec DataExplorer
data_explorer = DataExplorer(df_transformed)
data_explorer.preprocess_data()

# Visualisation des tendances des ventes
data_explorer.plot_sales_trends()

# Visualisation des ventes par saison
data_explorer.plot_sales_by_season()

# Visualisation de la distribution des ventes
data_explorer.plot_sales_distribution()

# Visualisation des ventes par jour de la semaine
data_explorer.plot_sales_by_weekday()

# Détection et visualisation des anomalies
anomalies = data_explorer.detect_anomalies()
data_explorer.plot_anomalies(anomalies)

Comment lancer l’ETL ?

C’est simple, il suffit de suivre les étapes suivantes :

  1. S’assurer que l’environnement est configuré
  2. Installer l’environnement virtuel via le code : python -m venv venv
  3. Activer l’environnement virtuel (sous windows) : venv\Scripts\activate
  4. Installez les dépendances : pip install -r requirements.txt
  5. Lancer la viz : python run_etl_and_viz.py

Je te propose cette petite vidéo pour visualiser les sorties de l’ETL :

Conclusion

Ce projet met en place une chaîne de traitement automatisée des données depuis leur génération, leur stockage dans MongoDB, jusqu’à leur nettoyage et visualisation.

Tags: