1# SPDX-FileCopyrightText: 2026 Defensive Lab Agency
2# SPDX-FileContributor: u039b <git@0x39b.fr>
3#
4# SPDX-License-Identifier: GPL-3.0-or-later
5
6import time
7from base64 import b64encode
8from datetime import datetime
9from uuid import uuid4
10
11import sqlalchemy as sa
12from pydantic import BaseModel, Field, validator
13
14from mongoose.models.base import Base
15
16
[docs]
class NetworkAlert(BaseModel):
    """Validated representation of a single network alert.

    Several fields are populated from differently named input keys through
    ``validation_alias`` (``dest_ip``, ``dest_port`` and ``proto``).
    NOTE(review): the field names and aliases look like Suricata EVE alert
    records -- confirm against the producer of this data.
    """

    class Config:
        """Pydantic configuration for :class:`NetworkAlert`."""

        # Allow the model to be validated from objects exposing attributes,
        # not only from mappings.
        from_attributes = True

    # Random hexadecimal identifier generated per instance; frozen after
    # creation.
    id: str = Field(default_factory=lambda: uuid4().hex, frozen=True)
    # Defaults to the local wall-clock time at instantiation (naive datetime).
    time: datetime = Field(default_factory=datetime.now)
    # Defaults to the current POSIX time truncated to whole seconds. The
    # lambda resolves ``time`` in the module globals (the ``time`` module),
    # not the ``time`` field declared just above: class scope is not visible
    # from nested functions.
    timestamp: float = Field(default_factory=lambda: int(time.time()))
    community_id: str = ""
    flow_id: int
    src_ip: str
    src_port: int
    dst_ip: str = Field(validation_alias="dest_ip")
    dst_port: int = Field(validation_alias="dest_port")
    protocol: str = Field(validation_alias="proto")
    app_proto: str = ""
    rule: str = ""
    action: str
    gid: int
    signature_id: int
    rev: int
    signature: str
    category: str
    severity: int
    enrichment: dict = Field(default_factory=dict)
    extra: dict = Field(default_factory=dict)

    @property
    def community_id_b64(self) -> str:
        """Return ``community_id`` encoded as a base64 text string."""
        raw = self.community_id.encode("utf-8")
        return b64encode(raw).decode("utf-8")

    @validator("timestamp", pre=True)
    def timestamp_validator(cls, v):
        """Normalise an incoming timestamp to POSIX seconds.

        Args:
            v: Raw value supplied for ``timestamp``.

        Returns:
            The POSIX timestamp as a float when ``v`` is an ISO 8601 string
            or a ``datetime``; otherwise ``v`` unchanged, leaving it to
            pydantic's own ``float`` validation.

        Raises:
            ValueError: If ``v`` is a string that
                ``datetime.fromisoformat`` cannot parse.
        """
        if isinstance(v, datetime):
            return v.timestamp()
        if isinstance(v, str):
            return datetime.fromisoformat(v).timestamp()
        # Numbers (and anything else) are handed to the regular field
        # validation instead of crashing in ``fromisoformat`` with a
        # ``TypeError``.
        return v

    @validator("app_proto", pre=True)
    def app_proto_validator(cls, v):
        """Upper-case the application protocol name.

        Empty values and the literal ``"failed"`` are returned untouched.
        Non-string values are also returned untouched so that the regular
        field validation reports them, rather than raising
        ``AttributeError`` on ``.upper()``.
        """
        if isinstance(v, str) and v and v != "failed":
            return v.upper()
        return v
56
57
[docs]
class NetworkAlertTable(Base):
    """Column declarations for the ``network_alert`` table.

    The columns carry the same names as the fields of ``NetworkAlert``, plus
    ``community_id_b64``, which ``NetworkAlert`` exposes as a computed
    property rather than a field.
    NOTE(review): ``Base`` is imported from ``mongoose.models.base`` and is
    presumably the project's SQLAlchemy declarative base -- confirm.
    """

    __tablename__ = "network_alert"

    # Identity and timing.
    id = sa.Column(sa.String, primary_key=True)
    time = sa.Column(sa.DateTime)
    timestamp = sa.Column(sa.Float)

    # Flow identification; the community id and both IP addresses are indexed.
    community_id = sa.Column(sa.String, index=True)
    community_id_b64 = sa.Column(sa.String)
    flow_id = sa.Column(sa.Integer)
    src_ip = sa.Column(sa.String, index=True)
    src_port = sa.Column(sa.Integer)
    dst_ip = sa.Column(sa.String, index=True)
    dst_port = sa.Column(sa.Integer)
    protocol = sa.Column(sa.String)
    app_proto = sa.Column(sa.String)

    # Rule and signature details.
    rule = sa.Column(sa.String)
    action = sa.Column(sa.String)
    gid = sa.Column(sa.Integer)
    signature_id = sa.Column(sa.Integer)
    rev = sa.Column(sa.Integer)
    signature = sa.Column(sa.String)
    category = sa.Column(sa.String)
    severity = sa.Column(sa.Integer)

    # Free-form structured data stored as JSON.
    enrichment = sa.Column(sa.JSON)
    extra = sa.Column(sa.JSON)
82 extra = sa.Column(sa.JSON)