Source code for mongoose.store.history

 1import logging
 2from datetime import datetime, timedelta
 3from typing import Type
 4
 5from sqlalchemy import delete, func, select
 6from sqlalchemy.orm import Session
 7
 8from mongoose.models import Base
 9from mongoose.models.configuration import HistoryConfiguration
10
11logger = logging.getLogger(__name__)
12
13
[docs] 14class SqliteHistoryManager: 15 """Manages the size and duration of records in the SQLite database.""" 16
[docs] 17 def __init__(self, session_factory, config: HistoryConfiguration): 18 """Initialize the SqliteHistoryManager. 19 20 Args: 21 session_factory: A sessionmaker instance. 22 config: History limiting configuration. 23 """ 24 self.Session = session_factory 25 self.config = config
26
[docs] 27 def cleanup(self, table_class: Type[Base]): 28 """Perform cleanup for the specified table based on configuration. 29 30 Args: 31 table_class: The SQLAlchemy model class (table) to clean up. 32 """ 33 if not self.config or not self.config.enable: 34 return 35 36 try: 37 if self.config.max_duration_days: 38 self._cleanup_by_duration(table_class) 39 40 if self.config.max_records: 41 self._cleanup_by_size(table_class) 42 except Exception as e: 43 logger.error(f"Error during database cleanup for {table_class.__tablename__}: {e}")
44 45 def _cleanup_by_duration(self, table_class: Type[Base]): 46 """Remove records older than the configured duration. 47 48 Args: 49 table_class: The SQLAlchemy model class (table) to clean up. 50 """ 51 cutoff_date = datetime.now() - timedelta(days=self.config.max_duration_days) 52 53 with self.Session() as session: 54 # We assume the table has a 'time' column which is a DateTime 55 stmt = delete(table_class).where(table_class.time < cutoff_date) 56 result = session.execute(stmt) 57 session.commit() 58 59 if result.rowcount > 0: 60 logger.info( 61 f"Removed {result.rowcount} records from {table_class.__tablename__} " 62 f"older than {self.config.max_duration_days} days" 63 ) 64 65 def _cleanup_by_size(self, table_class: Type[Base]): 66 """Remove older records if the table exceeds the maximum number of records. 67 68 Args: 69 table_class: The SQLAlchemy model class (table) to clean up. 70 """ 71 with self.Session() as session: 72 count = session.query(func.count(table_class.id)).scalar() 73 74 if count > self.config.max_records: 75 to_delete = count - self.config.max_records 76 77 # Find the IDs of the oldest records to delete 78 # We order by time (or id if time is same) to remove older records first 79 oldest_ids_query = ( 80 select(table_class.id).order_by(table_class.time.asc(), table_class.id.asc()).limit(to_delete) 81 ) 82 oldest_ids = session.execute(oldest_ids_query).scalars().all() 83 84 if oldest_ids: 85 delete_stmt = delete(table_class).where(table_class.id.in_(oldest_ids)) 86 session.execute(delete_stmt) 87 session.commit() 88 logger.info( 89 f"Removed {len(oldest_ids)} oldest records from {table_class.__tablename__} " 90 f"to respect max_records limit of {self.config.max_records}" 91 )