Source code for mongoose.models.network_alert

 1# SPDX-FileCopyrightText: 2026 Defensive Lab Agency
 2# SPDX-FileContributor: u039b <git@0x39b.fr>
 3#
 4# SPDX-License-Identifier: GPL-3.0-or-later
 5
 6import time
 7from base64 import b64encode
 8from datetime import datetime
 9from uuid import uuid4
10
11import sqlalchemy as sa
12from pydantic import BaseModel, Field, validator
13
14from mongoose.models.base import Base
15
16
class NetworkAlert(BaseModel):
    """Validated representation of a single network alert.

    The input keys ``dest_ip``, ``dest_port`` and ``proto`` are mapped onto
    the ``dst_ip``, ``dst_port`` and ``protocol`` fields through validation
    aliases. ``id``, ``time`` and ``timestamp`` are generated when omitted.
    """

    class Config:
        # Allow the model to be populated from an object's attributes.
        from_attributes = True

    # Random hex identifier, generated per instance and frozen afterwards.
    id: str = Field(default_factory=lambda: uuid4().hex, frozen=True)
    # Defaults to the (naive, local) creation time of the model instance.
    time: datetime = Field(default_factory=datetime.now)
    # Inside the lambda, ``time`` resolves to the imported module: names bound
    # in a class body are not visible from nested function scopes, so the
    # ``time`` field declared above does not shadow it here.
    timestamp: float = Field(default_factory=lambda: int(time.time()))
    community_id: str = ""
    flow_id: int
    src_ip: str
    src_port: int
    dst_ip: str = Field(validation_alias="dest_ip")
    dst_port: int = Field(validation_alias="dest_port")
    protocol: str = Field(validation_alias="proto")
    app_proto: str = ""
    rule: str = ""
    action: str
    gid: int
    signature_id: int
    rev: int
    signature: str
    category: str
    severity: int
    enrichment: dict = Field(default_factory=dict)
    extra: dict = Field(default_factory=dict)

    @property
    def community_id_b64(self) -> str:
        """Return ``community_id`` encoded as UTF-8 and then as Base64 text."""
        return b64encode(bytes(self.community_id, "utf-8")).decode("utf-8")

    # NOTE(review): ``validator`` is the Pydantic v1-style decorator, which
    # Pydantic v2 keeps only as a deprecated shim; consider ``field_validator``.
    @validator("timestamp", pre=True)
    def timestamp_validator(cls, v):
        """Normalise the raw ``timestamp`` input to a POSIX timestamp.

        Runs before Pydantic's own type coercion (``pre=True``).

        * ISO 8601 strings are parsed with ``datetime.fromisoformat``; a
          malformed string raises ``ValueError``, which Pydantic reports as a
          validation error.
        * ``datetime`` instances are converted with ``.timestamp()``.
        * Anything else (for example a numeric epoch value) is returned
          untouched so the regular ``float`` validation handles it, instead
          of failing with ``TypeError`` inside ``fromisoformat``.
        """
        if isinstance(v, str):
            return datetime.fromisoformat(v).timestamp()
        if isinstance(v, datetime):
            return v.timestamp()
        return v

    @validator("app_proto", pre=True)
    def app_proto_validator(cls, v):
        """Upper-case the application protocol name.

        Empty values and the literal ``"failed"`` are returned unchanged.
        Non-string input is also returned unchanged so that the ``str`` field
        validation rejects it, rather than ``.upper()`` raising
        ``AttributeError``.
        """
        if isinstance(v, str) and v and v != "failed":
            return v.upper()
        return v
56 57
class NetworkAlertTable(Base):
    """SQLAlchemy declarative mapping for the ``network_alert`` table.

    The columns carry the same names as the fields of ``NetworkAlert``, plus
    a stored ``community_id_b64`` column (a computed property on the model).
    """

    __tablename__ = "network_alert"

    # --- identity and timing ---
    id = sa.Column(sa.String, primary_key=True)
    time = sa.Column(sa.DateTime)
    timestamp = sa.Column(sa.Float)

    # --- flow identification (indexed columns are the lookup keys) ---
    community_id = sa.Column(sa.String, index=True)
    community_id_b64 = sa.Column(sa.String)
    # NOTE(review): ``sa.Integer`` is 32-bit on some backends; confirm that
    # the flow_id values stored here always fit.
    flow_id = sa.Column(sa.Integer)
    src_ip = sa.Column(sa.String, index=True)
    src_port = sa.Column(sa.Integer)
    dst_ip = sa.Column(sa.String, index=True)
    dst_port = sa.Column(sa.Integer)
    protocol = sa.Column(sa.String)
    app_proto = sa.Column(sa.String)

    # --- rule / signature details ---
    rule = sa.Column(sa.String)
    action = sa.Column(sa.String)
    gid = sa.Column(sa.Integer)
    signature_id = sa.Column(sa.Integer)
    rev = sa.Column(sa.Integer)
    signature = sa.Column(sa.String)
    category = sa.Column(sa.String)
    severity = sa.Column(sa.Integer)

    # --- free-form JSON payloads ---
    enrichment = sa.Column(sa.JSON)
    extra = sa.Column(sa.JSON)