para los que quieran hacer búsquedas en sus logs de sala.
necesitan saber ejecutar archivos Python y manejar mínimamente la consola cmd; claro, a menos que mark haga de las suyas.
Importamos las librerías necesarias.
import os
import re
import tempfile
from typing import List, Tuple
from concurrent.futures import ThreadPoolExecutor, as_completed
import logging
import sys
import subprocess
import time
Configuración del logging.
logging.basicConfig(level=logging.INFO, format='%(message)s')
Definimos una función para obtener los archivos .txt en un directorio.
def obtener_archivos_txt(directorio: str = '.') -> List[str]:
"""
Obtiene una lista de archivos .txt en el directorio especificado.
Args:
directorio (str): Ruta del directorio donde buscar archivos .txt.
Returns:
List[str]: Lista de nombres de archivos .txt encontrados.
"""
try:
# Listamos los archivos que terminan con '.txt' y los ordenamos por fecha de modificación.
archivos = [f for f in os.listdir(directorio) if f.lower().endswith('.txt')]
archivos.sort(key=lambda x: os.path.getmtime(os.path.join(directorio, x)))
return archivos
except Exception as e:
logging.error(f"Error al obtener los archivos: {e}")
return []
Definimos una función para procesar cada archivo y buscar el patrón.
def process_file(filename: str, patron: re.Pattern, max_matches: int, encoding: str) -> List[Tuple[str, int, str]]:
"""
Busca el patrón en un archivo y devuelve una lista de coincidencias.
Args:
filename (str): Nombre del archivo a procesar.
patron (re.Pattern): Patrón de expresión regular compilado.
max_matches (int): Número máximo de coincidencias por archivo.
encoding (str): Codificación del archivo.
Returns:
List[Tuple[str, int, str]]: Lista de tuplas con nombre de archivo, número de línea y contenido.
"""
matches = []
try:
# Abrimos el archivo en modo lectura con la codificación especificada.
with open(filename, 'r', encoding=encoding, errors='ignore') as file:
for line_number, line in enumerate(file, start=1):
# Buscamos el patrón en cada línea.
if patron.search(line):
matches.append((filename, line_number, line.strip()))
# Si alcanzamos el máximo de coincidencias, terminamos.
if max_matches and len(matches) >= max_matches:
break
except Exception as e:
logging.error(f"Error al leer el archivo {filename}: {e}")
return matches
Definimos una función para guardar los resultados en un archivo temporal.
def guardar_resultados(resultados: List[str]) -> str:
"""
Guarda los resultados en un archivo temporal y devuelve la ruta del archivo.
Args:
resultados (List[str]): Lista de resultados a guardar.
Returns:
str: Ruta del archivo temporal donde se guardaron los resultados.
"""
try:
# Creamos un archivo temporal.
temp_file = tempfile.NamedTemporaryFile(mode='w+', delete=False, encoding='utf-8',
suffix='.txt', prefix='resultados_')
with temp_file as f:
# Escribimos cada resultado en una nueva línea.
for linea in resultados:
f.write(linea + '\n')
return temp_file.name
except Exception as e:
logging.error(f"Error al guardar los resultados: {e}")
raise
Definimos una función para mostrar los resultados al usuario.
def mostrar_resultados(ruta_archivo: str):
"""
Abre el archivo de resultados con el editor de texto predeterminado y lo elimina al cerrarlo.
Args:
ruta_archivo (str): Ruta del archivo de resultados.
"""
logging.info(f"Resultados guardados en: {ruta_archivo}")
try:
# Abrimos el archivo con el programa predeterminado según el sistema operativo.
if sys.platform.startswith('darwin'):
subprocess.call(['open', ruta_archivo])
- elif os.name
'nt':
os.startfile(ruta_archivo)
- elif os.name
'posix':
subprocess.call(['xdg-open', ruta_archivo])
else:
logging.error("No se pudo abrir el archivo de resultados automáticamente.")
return
# Esperamos a que el usuario cierre el archivo y presione Enter.
input("Presiona Enter después de cerrar el archivo de resultados para finalizar.")
# Eliminamos el archivo temporal.
os.remove(ruta_archivo)
logging.info("El archivo de resultados ha sido eliminado.")
except Exception as e:
logging.error(f"Error al manejar el archivo de resultados: {e}")
Definimos la función principal que coordina todo el proceso.
def main():
"""
Función principal que coordina la búsqueda en los archivos.
"""
# Establecemos el directorio actual.
directorio = '.'
# Solicitamos al usuario la expresión regular a buscar.
search_term = input("Introduce la expresión regular a buscar: ").strip()
if not search_term:
logging.error("No se ingresó ninguna expresión regular.")
return
# Solicitamos al usuario el número máximo de coincidencias por archivo (opcional).
try:
max_matches_input = input("Introduce el número máximo de coincidencias por archivo (deja en blanco para sin límite): ").strip()
max_matches = int(max_matches_input) if max_matches_input else None
except ValueError:
logging.error("Entrada inválida para el número máximo de coincidencias.")
return
# La codificación es siempre 'utf-8'.
encoding = 'utf-8'
# Compilamos el patrón de búsqueda.
try:
pattern = re.compile(search_term, re.IGNORECASE)
except re.error as e:
logging.error(f"Expresión regular inválida: {e}")
return
# Obtenemos la lista de archivos .txt en el directorio.
files = obtener_archivos_txt(directorio)
if not files:
logging.info("No se encontraron archivos .txt en el directorio especificado.")
return
total_files = len(files)
results = []
# Preparamos los argumentos para el procesamiento en paralelo.
args_list = [
(os.path.join(directorio, filename), pattern, max_matches, encoding)
for filename in files
]
# Definimos los puntos de progreso para mostrar al usuario.
progress_percentages = [2, 10, 20, 40, 70, 90]
progress_thresholds = [max(1, int(total_files * p / 100)) for p in progress_percentages]
progress_thresholds = sorted(set(progress_thresholds)) # Eliminamos duplicados y ordenamos.
files_processed = 0
# Iniciamos el temporizador para medir la duración de la búsqueda.
start_time = time.time()
# Procesamos los archivos en paralelo utilizando ThreadPoolExecutor.
with ThreadPoolExecutor() as executor:
# Enviamos las tareas al pool de threads.
futures = {executor.submit(process_file, *args): args[0] for args in args_list}
for future in as_completed(futures):
# Obtenemos los resultados de cada futuro.
matches = future.result()
for filename, line_number, content in matches:
result = f"{content} - {filename} (Línea {line_number})"
results.append(result)
files_processed += 1
# Mostramos el progreso al usuario en los puntos definidos.
while progress_thresholds and files_processed >= progress_thresholds[0]:
percent = int(100 * files_processed / total_files)
logging.info(f"Realizando búsqueda en archivo {files_processed} de {total_files} ({percent}%)")
progress_thresholds.pop(0)
# Detenemos el temporizador y calculamos la duración.
end_time = time.time()
duration = end_time - start_time
minutes, seconds = divmod(duration, 60)
logging.info(f"La búsqueda tomó {int(minutes)} minutos y {int(seconds)} segundos.")
# Si no se encontraron resultados, informamos al usuario.
if not results:
logging.info("No se encontraron coincidencias.")
return
# Guardamos y mostramos los resultados.
try:
results_file_path = guardar_resultados(results)
mostrar_resultados(results_file_path)
except Exception as e:
logging.error(f"Ocurrió un error al procesar los resultados: {e}")
Ejecutamos la función principal si el script es ejecutado directamente.
if __name__ == "__main__":
main()