1import time
2from base64 import b64encode
3from datetime import datetime
4from uuid import uuid4
5
6import sqlalchemy as sa
7from pydantic import BaseModel, Field, validator
8
9from mongoose.models.base import Base
10
11
[docs]
class NetworkAlert(BaseModel):
    """Validated record for a single network alert.

    Required inputs are ``flow_id``, ``src_ip``, ``src_port``, ``dest_ip``,
    ``dest_port``, ``proto``, ``action``, ``gid``, ``signature_id``, ``rev``,
    ``signature``, ``category`` and ``severity``.  The three aliased inputs
    (``dest_ip``, ``dest_port``, ``proto``) are stored on the model as
    ``dst_ip``, ``dst_port`` and ``protocol``.

    NOTE(review): the field set resembles an IDS "alert" event record;
    confirm the upstream producer before relying on that.
    """

    class Config:
        # Allow validation from objects exposing attributes, not only dicts.
        from_attributes = True

    # Random 32-character hex identifier; cannot be reassigned after creation.
    id: str = Field(default_factory=lambda: uuid4().hex, frozen=True)
    # Creation time as a naive local datetime (``datetime.now`` without tz).
    time: datetime = Field(default_factory=datetime.now)
    # Default: current epoch time truncated to whole seconds.  The lambda
    # resolves ``time`` at call time from module globals, so it reaches the
    # ``time`` module rather than the ``time`` field declared just above.
    timestamp: float = Field(default_factory=lambda: int(time.time()))
    community_id: str = ""
    flow_id: int
    src_ip: str
    src_port: int
    dst_ip: str = Field(validation_alias="dest_ip")
    dst_port: int = Field(validation_alias="dest_port")
    protocol: str = Field(validation_alias="proto")
    app_proto: str = ""
    rule: str = ""
    action: str
    gid: int
    signature_id: int
    rev: int
    signature: str
    category: str
    severity: int
    enrichment: dict = Field(default_factory=dict)
    extra: dict = Field(default_factory=dict)

    @property
    def community_id_b64(self) -> str:
        """Return ``community_id`` (UTF-8 encoded) as base64 text."""
        return b64encode(bytes(self.community_id, "utf-8")).decode("utf-8")

    @validator("timestamp", pre=True)
    def timestamp_validator(cls, v):
        """Normalise an incoming ``timestamp`` to epoch seconds.

        * ``str`` is parsed with ``datetime.fromisoformat``; a malformed
          string raises ``ValueError``, which pydantic reports as a
          validation error.
        * ``datetime`` is converted with ``.timestamp()``.
        * Anything else (e.g. an epoch ``int``/``float``) is returned
          untouched so the regular ``float`` field validation handles it.
          Calling ``fromisoformat`` on such values would raise ``TypeError``.
        """
        if isinstance(v, datetime):
            return v.timestamp()
        if isinstance(v, str):
            return datetime.fromisoformat(v).timestamp()
        return v

    @validator("app_proto", pre=True)
    def app_proto_validator(cls, v):
        """Upper-case ``app_proto`` unless it is empty or the literal ``"failed"``.

        Non-string values are returned unchanged and left to the ``str``
        field validation, instead of failing on a missing ``.upper()``.
        """
        if isinstance(v, str) and v and v != "failed":
            return v.upper()
        return v
51
52
[docs]
class NetworkAlertTable(Base):
    """Declarative mapping for rows of the ``network_alert`` table.

    Column names mirror the attributes of ``NetworkAlert``; in addition
    ``community_id_b64`` is stored as its own column.
    """

    __tablename__ = "network_alert"

    # --- identity and timing ---
    id = sa.Column(sa.String(), primary_key=True)
    time = sa.Column(sa.DateTime())
    timestamp = sa.Column(sa.Float())

    # --- flow identification (indexed lookups on community id and endpoints) ---
    community_id = sa.Column(sa.String(), index=True)
    community_id_b64 = sa.Column(sa.String())
    flow_id = sa.Column(sa.Integer())
    src_ip = sa.Column(sa.String(), index=True)
    src_port = sa.Column(sa.Integer())
    dst_ip = sa.Column(sa.String(), index=True)
    dst_port = sa.Column(sa.Integer())
    protocol = sa.Column(sa.String())
    app_proto = sa.Column(sa.String())

    # --- rule / signature details ---
    rule = sa.Column(sa.String())
    action = sa.Column(sa.String())
    gid = sa.Column(sa.Integer())
    signature_id = sa.Column(sa.Integer())
    rev = sa.Column(sa.Integer())
    signature = sa.Column(sa.String())
    category = sa.Column(sa.String())
    severity = sa.Column(sa.Integer())

    # --- free-form JSON payloads ---
    enrichment = sa.Column(sa.JSON())
    extra = sa.Column(sa.JSON())