Source code for mongoose.models.network_alert

 1import time
 2from base64 import b64encode
 3from datetime import datetime
 4from uuid import uuid4
 5
 6import sqlalchemy as sa
 7from pydantic import BaseModel, Field, validator
 8
 9from mongoose.models.base import Base
10
11
class NetworkAlert(BaseModel):
    """Validated representation of a single network alert.

    ``dst_ip``, ``dst_port`` and ``protocol`` are read from the input keys
    ``dest_ip``, ``dest_port`` and ``proto`` respectively (validation
    aliases). ``id``, ``time`` and ``timestamp`` are generated when the
    caller does not supply them.
    """

    class Config:
        # Allow validation from attribute-bearing objects, not only mappings.
        from_attributes = True

    # Random 32-character hex identifier; frozen so it cannot be reassigned.
    id: str = Field(default_factory=lambda: uuid4().hex, frozen=True)
    # Naive local wall-clock time at model creation when not provided.
    time: datetime = Field(default_factory=datetime.now)
    # Inside the lambda ``time`` resolves to the module-level ``time`` import,
    # not the field above: class scope is not visible from nested functions.
    # The default is truncated to whole seconds by ``int``.
    timestamp: float = Field(default_factory=lambda: int(time.time()))
    community_id: str = ""
    flow_id: int
    src_ip: str
    src_port: int
    dst_ip: str = Field(validation_alias="dest_ip")
    dst_port: int = Field(validation_alias="dest_port")
    protocol: str = Field(validation_alias="proto")
    app_proto: str = ""
    rule: str = ""
    action: str
    gid: int
    signature_id: int
    rev: int
    signature: str
    category: str
    severity: int
    enrichment: dict = Field(default_factory=dict)
    extra: dict = Field(default_factory=dict)

    @property
    def community_id_b64(self) -> str:
        """Return ``community_id`` UTF-8 encoded and then base64 encoded."""
        raw = bytes(self.community_id, "utf-8")
        return b64encode(raw).decode("utf-8")

    @validator("timestamp", pre=True)
    def timestamp_validator(cls, v):
        """Normalise an incoming ``timestamp`` to POSIX seconds.

        Runs before pydantic's own float validation.

        * ``str`` values are parsed with ``datetime.fromisoformat``; a string
          that is not valid ISO format raises ``ValueError``, which pydantic
          reports as a validation error.
        * ``datetime`` values are converted with ``datetime.timestamp``.
        * Anything else (e.g. an ``int``/``float`` epoch) is returned
          unchanged so the regular ``float`` validation handles it.

        Previously every value was handed to ``datetime.fromisoformat``,
        which raises ``TypeError`` for non-string input such as a numeric
        epoch.
        """
        if isinstance(v, str):
            return datetime.fromisoformat(v).timestamp()
        if isinstance(v, datetime):
            return v.timestamp()
        return v

    @validator("app_proto", pre=True)
    def app_proto_validator(cls, v):
        """Upper-case ``app_proto``, except for falsy values and ``"failed"``.

        Falsy values and the literal ``"failed"`` are returned as given.
        """
        if not v or v == "failed":
            return v
        return v.upper()
class NetworkAlertTable(Base):
    """SQLAlchemy declarative mapping for the ``network_alert`` table.

    The columns carry the same names as the fields of ``NetworkAlert`` plus
    a ``community_id_b64`` column. ``id`` is the primary key; the
    ``community_id``, ``src_ip`` and ``dst_ip`` columns are indexed.
    """

    __tablename__ = "network_alert"

    # Identity and timing.
    id = sa.Column(sa.String, primary_key=True)
    time = sa.Column(sa.DateTime)
    timestamp = sa.Column(sa.Float)

    # Flow identification.
    community_id = sa.Column(sa.String, index=True)
    # NOTE(review): presumably filled from NetworkAlert.community_id_b64;
    # confirm against the code that writes rows.
    community_id_b64 = sa.Column(sa.String)
    flow_id = sa.Column(sa.Integer)

    # Endpoints and protocols.
    src_ip = sa.Column(sa.String, index=True)
    src_port = sa.Column(sa.Integer)
    dst_ip = sa.Column(sa.String, index=True)
    dst_port = sa.Column(sa.Integer)
    protocol = sa.Column(sa.String)
    app_proto = sa.Column(sa.String)

    # Rule / signature details.
    rule = sa.Column(sa.String)
    action = sa.Column(sa.String)
    gid = sa.Column(sa.Integer)
    signature_id = sa.Column(sa.Integer)
    rev = sa.Column(sa.Integer)
    signature = sa.Column(sa.String)
    category = sa.Column(sa.String)
    severity = sa.Column(sa.Integer)

    # Free-form JSON payloads.
    enrichment = sa.Column(sa.JSON)
    extra = sa.Column(sa.JSON)