« Discussion:SplitCDR » : différence entre les versions
Page créée avec « import os import csv from datetime import timedelta # Fonction pour convertir la durée en format HH:MM:SS def convert_to_hms(duration): duration_seconds = int(duration) hours, remainder = divmod(duration_seconds, 3600) minutes, seconds = divmod(remainder, 60) return f"{hours}h{minutes}m{seconds}s" # Fonction pour convertir les kilooctets en gigaoctets et arrondir au centième def convert_to_gb(data_volume): return round(float(data_volume) /... » |
mAucun résumé des modifications |
||
Ligne 1 : | Ligne 1 : | ||
<pre> | |||
import os | import os | ||
import csv | import csv | ||
Ligne 104 : | Ligne 105 : | ||
# Écrire les données agrégées dans un fichier CSV | # Écrire les données agrégées dans un fichier CSV | ||
write_aggregated_data(client_data, file_name) | write_aggregated_data(client_data, file_name) | ||
</pre> |
Dernière version du 28 mars 2024 à 15:13
import os import csv from datetime import timedelta # Fonction pour convertir la durée en format HH:MM:SS def convert_to_hms(duration): duration_seconds = int(duration) hours, remainder = divmod(duration_seconds, 3600) minutes, seconds = divmod(remainder, 60) return f"{hours}h{minutes}m{seconds}s" # Fonction pour convertir les kilooctets en gigaoctets et arrondir au centième def convert_to_gb(data_volume): return round(float(data_volume) / 1000000, 2) # 1 gigaoctet = 1 000 000 kilooctets # Fonction pour agréger les données par client et caller def aggregate_data(data): client_data = {} for row in data: client_name = row['client_name'] caller = row['caller'] if client_name not in client_data: client_data[client_name] = {} if caller not in client_data[client_name]: client_data[client_name][caller] = { 'caller': caller, 'duration': 0.0, 'price': 0.0, 'sva_service_cost': 0.0, 'data_volume': 0.0, 'categories': {} # Dictionnaire pour stocker les résultats par catégorie } client_data[client_name][caller]['duration'] += float(row['duration']) if row['duration'] else 0.0 client_data[client_name][caller]['price'] += float(row['price']) if row['price'] else 0.0 client_data[client_name][caller]['sva_service_cost'] += float(row['sva_service_cost']) if row['sva_service_cost'] else 0.0 client_data[client_name][caller]['data_volume'] += float(row['data_volume']) if row['data_volume'] else 0.0 # Mettre à jour les résultats par catégorie category = row['category'] client_data[client_name][caller]['categories'][category] = client_data[client_name][caller]['categories'].get(category, 0) + 1 return client_data # Fonction pour écrire les données agrégées dans un fichier CSV def write_aggregated_data(client_data, file_name): for client_name, client_info in client_data.items(): output_dir = os.path.splitext(file_name)[0] + "_split" os.makedirs(output_dir, exist_ok=True) # Écrire le fichier résultant pour chaque client output_file = os.path.join(output_dir, f"{client_name}.csv") with open(output_file, 'w', newline='', encoding='utf-8') as file: writer = csv.writer(file, delimiter=';') # Récupérer toutes les colonnes possibles all_columns = set() for caller_info in client_info.values(): all_columns.update(caller_info['categories'].keys()) # Supprimer les colonnes avec des données non significatives (toutes les valeurs à zéro) significant_columns = [col for col in all_columns if any(caller_info['categories'].get(col, 0) != 0 for caller_info in client_info.values())] # Écrire l'en-tête avec les colonnes significatives header = ['caller', 'duration', 'price', 'sva_service_cost', 'data_volume'] + list(significant_columns) writer.writerow(header) # Écrire les données agrégées pour chaque caller du client for caller, caller_info in client_info.items(): caller_info['duration'] = convert_to_hms(caller_info['duration']) caller_info['data_volume'] = convert_to_gb(caller_info['data_volume']) row = [ caller_info['caller'], caller_info['duration'], caller_info['price'], caller_info['sva_service_cost'], caller_info['data_volume'] ] # Ajouter les résultats par catégorie à la ligne for category in significant_columns: row.append(caller_info['categories'].get(category, 0)) writer.writerow(row) print("Les fichiers traités ont été enregistrés avec succès dans le répertoire:", output_dir) # Recherche de fichiers CSV dans le même répertoire que le script csv_files = [file for file in os.listdir() if file.endswith('.csv')] if not csv_files: print("Aucun fichier CSV trouvé dans le répertoire.") exit() for file_name in csv_files: # Lire le fichier CSV en spécifiant le séparateur data = [] with open(file_name, 'r', newline='', encoding='utf-8') as file: reader = csv.DictReader(file, delimiter=';') for row in reader: # Exclure les lignes dont le "caller" ne commence pas par '06' ou '07' ou contient un "master_number" if row['caller'].startswith(('06', '07')) and not row['master_number']: data.append(row) # Agréger les données par client et caller client_data = aggregate_data(data) # Écrire les données agrégées dans un fichier CSV write_aggregated_data(client_data, file_name)