1import logging
2from datetime import datetime, timedelta
3from typing import Type
4
5from sqlalchemy import delete, func, select
6from sqlalchemy.orm import Session
7
8from mongoose.models import Base
9from mongoose.models.configuration import HistoryConfiguration
10
11logger = logging.getLogger(__name__)
12
13
[docs]
14class SqliteHistoryManager:
15 """Manages the size and duration of records in the SQLite database."""
16
[docs]
17 def __init__(self, session_factory, config: HistoryConfiguration):
18 """Initialize the SqliteHistoryManager.
19
20 Args:
21 session_factory: A sessionmaker instance.
22 config: History limiting configuration.
23 """
24 self.Session = session_factory
25 self.config = config
26
[docs]
27 def cleanup(self, table_class: Type[Base]):
28 """Perform cleanup for the specified table based on configuration.
29
30 Args:
31 table_class: The SQLAlchemy model class (table) to clean up.
32 """
33 if not self.config or not self.config.enable:
34 return
35
36 try:
37 if self.config.max_duration_days:
38 self._cleanup_by_duration(table_class)
39
40 if self.config.max_records:
41 self._cleanup_by_size(table_class)
42 except Exception as e:
43 logger.error(f"Error during database cleanup for {table_class.__tablename__}: {e}")
44
45 def _cleanup_by_duration(self, table_class: Type[Base]):
46 """Remove records older than the configured duration.
47
48 Args:
49 table_class: The SQLAlchemy model class (table) to clean up.
50 """
51 cutoff_date = datetime.now() - timedelta(days=self.config.max_duration_days)
52
53 with self.Session() as session:
54 # We assume the table has a 'time' column which is a DateTime
55 stmt = delete(table_class).where(table_class.time < cutoff_date)
56 result = session.execute(stmt)
57 session.commit()
58
59 if result.rowcount > 0:
60 logger.info(
61 f"Removed {result.rowcount} records from {table_class.__tablename__} "
62 f"older than {self.config.max_duration_days} days"
63 )
64
65 def _cleanup_by_size(self, table_class: Type[Base]):
66 """Remove older records if the table exceeds the maximum number of records.
67
68 Args:
69 table_class: The SQLAlchemy model class (table) to clean up.
70 """
71 with self.Session() as session:
72 count = session.query(func.count(table_class.id)).scalar()
73
74 if count > self.config.max_records:
75 to_delete = count - self.config.max_records
76
77 # Find the IDs of the oldest records to delete
78 # We order by time (or id if time is same) to remove older records first
79 oldest_ids_query = (
80 select(table_class.id).order_by(table_class.time.asc(), table_class.id.asc()).limit(to_delete)
81 )
82 oldest_ids = session.execute(oldest_ids_query).scalars().all()
83
84 if oldest_ids:
85 delete_stmt = delete(table_class).where(table_class.id.in_(oldest_ids))
86 session.execute(delete_stmt)
87 session.commit()
88 logger.info(
89 f"Removed {len(oldest_ids)} oldest records from {table_class.__tablename__} "
90 f"to respect max_records limit of {self.config.max_records}"
91 )