1import json
2import logging
3import subprocess
4import shlex
5import re
6import platform
7from pathlib import Path
8from hashlib import file_digest
9from functools import cached_property
10from dataclasses import dataclass, KW_ONLY
11from collections.abc import Sequence, Mapping
12from typing import Any, Literal, Annotated
13
14import tyro
15
16from .types import DictPacket, DictLayers
17
18
[docs]
@dataclass(frozen=True)
class TsharkOutput:
    """Output of a tshark network traffic dump, together with some metadata about it.

    Attributes:
        list_packets: sequence of packets (each one is expected to expose its layers
            under ``packet['_source']['layers']``, see `list_layers`).
        metadata: mapping of metadata describing the dump.

    Raises:
        TypeError: if `list_packets` is not a sequence.
    """

    list_packets: Sequence[DictPacket]
    metadata: Mapping[str, Any]

    def __post_init__(self) -> None:
        # Explicit check rather than `assert`, so the validation is not stripped under `python -O`
        if not isinstance(self.list_packets, Sequence):
            raise TypeError(type(self.list_packets))

    @property
    def list_layers(self) -> Sequence[DictLayers]:
        """Extract layers: for each packet, it extracts the layers from the `_source` key."""
        return [packet['_source']['layers'] for packet in self.list_packets]
35
36
# Module-level logger, explicitly named after the package path
LOGGER = logging.getLogger("pcapng_utils.tshark")

# Default hosts file used for tshark name resolution: `/etc/hosts` when it exists, else None.
# A private helper name is used (instead of a walrus target) so that no throwaway
# public global leaks into the module namespace.
_ETC_HOSTS = Path("/etc/hosts")
DEFAULT_HOSTS_FILE = _ETC_HOSTS if _ETC_HOSTS.is_file() else None

# Default tshark executable per platform; on any other platform fall back to plain `tshark`
DEFAULT_TSHARK_CMD = {
    "Linux": "/usr/bin/tshark",
    "Darwin": "/Applications/Wireshark.app/Contents/MacOS/tshark",
}.get(platform.system(), "tshark")
45
46
[docs]
@dataclass(frozen=True)
class Tshark:
    """
    A class to interact with tshark for loading and parsing network traffic data from a PCAPNG file.

    **tshark** is a command-line tool for capturing and analyzing network traffic.
    It is part of the Wireshark suite and provides similar functionality to the Wireshark GUI in a terminal environment.

    - Packet capture and analysis: `tshark` can capture live network traffic and analyze packets from capture files (e.g., PCAP, PCAPNG).
    - Protocol decoding: It supports decoding a wide range of network protocols, providing detailed information about each packet.
    - Filtering: `tshark` allows filtering packets using display filters to focus on specific traffic.
    - Statistics: It can generate various statistics about the captured traffic, such as protocol hierarchy,
      endpoint statistics, and conversation lists.
    - Exporting data: `tshark` can export packet data to different formats, including JSON, CSV, and plain text.
    - Decryption: `tshark` supports decryption of encrypted traffic using SSL/TLS keys provided in an SSLKEYLOG file.

    `tshark` can convert PCAPNG files to JSON format using the `-T json` option.
    This allows for easy parsing and analysis of network traffic data in a structured format.

    **Useful commands**:

    - Capture live traffic: `tshark -i <interface>`
    - Read from a PCAP file: `tshark -r <file.[pcap|pcapng]>`
    - Display packet details: `tshark -V`
    - Filter packets: `tshark -Y <filter>`
    - Export to JSON: `tshark -r <file.[pcap|pcapng]> -T json`
    - Decrypt SSL/TLS traffic: `tshark -r <file.[pcap|pcapng]> -o "tls.keylog_file: <keylog_file>"`
    - Inject the TLS secrets: `editcap --inject-secrets tls,<keylog_file> <file.pcap> <output.pcapng>`
    """

    tshark_cmd: Annotated[str, tyro.conf.arg(name='tshark', aliases=['-c'], metavar='CMD')] = DEFAULT_TSHARK_CMD
    """Path/command for tshark executable"""

    _: KW_ONLY

    hash_algo: Annotated[str, tyro.conf.arg(metavar='ALGO')] = 'sha1'
    """Hash algorithm to generate digest of input .pcapng"""

    name_resolution: Annotated[Literal[False] | str, tyro.conf.arg(metavar='FLAGS|False')] = 'nds'
    """Name resolution flags, as documented in tshark manual under -N flag;
    by default we avoid using any external DNS resolver"""

    hosts_file: Path | None = DEFAULT_HOSTS_FILE
    """Hosts file for tshark name resolution - only used when name resolution contains 'n'"""

    display_filter: Annotated[str, tyro.conf.arg(aliases=['-Y'])] = "http || http2 || websocket"
    """Display filter (documented in tshark manual under -Y flag)"""

    protocol_match_filter: Annotated[str, tyro.conf.arg(aliases=['-J'])] = "http http2 websocket"
    """Protocol match filter (documented in tshark manual under -J flag), in addition to base protocols"""

    tcp_reassemble_out_of_order: bool = True
    """Whether to allow or not to reassemble out-of-order TCP segments"""

    timeout: Annotated[float, tyro.conf.arg(metavar='SECONDS')] = 60.0
    """Timeout in seconds for tshark command completion"""

    @cached_property
    def _tshark_cmd_split(self) -> list[str]:
        """The `tshark_cmd` string split into an argv list with shell-like rules."""
        return shlex.split(self.tshark_cmd)

    @cached_property
    def version(self) -> str:
        """
        Version of tshark, as reported by `<tshark_cmd> --version` (computed once, then cached).

        Raises:
            RuntimeError: If the command exits with a non-zero status,
                or if no version line could be found in its output.
            NotImplementedError: If the version found is not a 4.x one.
            subprocess.TimeoutExpired: If the command does not complete within `timeout` seconds.
        """
        proc = subprocess.run(
            [*self._tshark_cmd_split, '--version'], text=True, capture_output=True, timeout=self.timeout
        )
        if proc.returncode != 0:
            raise RuntimeError(proc.stderr)
        VERSION_LINE_PREFIX = 'TShark (Wireshark) '
        # The version line is searched among all lines, since it may not be the very first one;
        # a default is given to `next` so that a missing line does not leak a bare StopIteration
        version_first_line = next(
            (line for line in proc.stdout.splitlines() if line.startswith(VERSION_LINE_PREFIX)),
            None,
        )
        if version_first_line is None:
            raise RuntimeError(
                f"Could not find a line starting with {VERSION_LINE_PREFIX!r} "
                f"in the output of: {self.tshark_cmd} --version"
            )
        version = version_first_line.removeprefix(VERSION_LINE_PREFIX).removesuffix('.')
        if not version.startswith("4."):
            raise NotImplementedError(f"Unsupported tshark version (expected v4.x): {version}")
        return version

    @cached_property
    def less_than_v4_4(self) -> bool:
        """Whether the tshark version is 4.0.x, 4.1.x, 4.2.x or 4.3.x."""
        # no version parsing for now
        return any(self.version.startswith(f"4.{minor}.") for minor in range(4))

    def get_command(self, pcapng_file: Path) -> list[str]:
        """Get full command to be executed"""
        name_resolution_flags: list[str] = []
        if not self.name_resolution:
            name_resolution_flags.append('-n')
        else:
            name_resolution = self.name_resolution
            if 's' in name_resolution and self.less_than_v4_4:
                # name resolution from SNI only supported in tshark >=4.4
                name_resolution = name_resolution.replace('s', '')
            name_resolution_flags += ['-N', name_resolution]
            if 'n' in name_resolution and self.hosts_file:
                name_resolution_flags += ['-H', self.hosts_file.as_posix()]
        return [
            *self._tshark_cmd_split,
            '-r', pcapng_file.resolve().as_posix(),
            '-2',  # two passes (can't read from stdin in this case)
            '-x',  # output raw fields as well
            '-T', 'json',
            '--no-duplicate-keys',  # merge json keys
            *name_resolution_flags,
            '-Y', self.display_filter,
            '-J', f'frame ip ipv6 tcp {self.protocol_match_filter}',  # do not export data of useless layers
            '--enable-protocol', 'communityid',
            '-o', f'tcp.reassemble_out_of_order:{str(self.tcp_reassemble_out_of_order).upper()}',
        ]

    def load_traffic(self, pcapng_file: Path) -> TsharkOutput:
        """
        Loads network traffic data from the provided pcapng file using tshark.

        This method runs the tshark command to read the pcapng file and parse the output as JSON.
        The parsed traffic data is then returned, together with some metadata
        (tshark version and digest of the input file).

        Raises:
            RuntimeError: If the tshark command exits with a non-zero status
                (the message holds what tshark wrote on its standard error & output).
            subprocess.TimeoutExpired: If the tshark command does not complete within `timeout` seconds.
            json.JSONDecodeError: If the tshark output could not be parsed as JSON.

        Note that no HTTP3 traffic is expected since it is rejected by Pirogue.
        """
        with pcapng_file.open('rb') as fp:
            metadata = {
                'tshark_version': self.version,
                f'input_{self.hash_algo}': file_digest(fp, self.hash_algo).hexdigest(),
            }
        cmd = self.get_command(pcapng_file)
        LOGGER.debug("Command for tshark %s: %s", self.version, cmd)
        proc = subprocess.run(cmd, capture_output=True, timeout=self.timeout)
        if proc.returncode != 0:
            # Lenient decoding: undecodable bytes must not mask the actual tshark failure
            err = [stream.decode(errors='replace') for stream in (proc.stderr, proc.stdout) if stream]
            raise RuntimeError("\n".join(err))
        # We remove any leading/trailing information between actual tshark output
        # (e.g. from OCI container prologue)
        out = proc.stdout.strip()
        if not out.startswith(b"["):
            out = re.sub(rb"^[^\[]+\[", b"[", out)
        if not out.endswith(b"]"):
            out = re.sub(rb"\][^\]]+$", b"]", out)
        list_packets = json.loads(out)
        return TsharkOutput(list_packets, metadata)
193
194
[docs]
def cli_dump_tshark_layers_as_json() -> None:
    """
    Command-line entry point: run tshark on a .pcapng and dump the layers of each packet as JSON.

    The metadata is pretty-printed on standard error, whereas the JSON goes to standard output
    (which may thus be redirected to a .json file to inspect tshark intermediate output).
    """
    import sys
    from pprint import pprint

    # Same options as `Tshark`, plus the path of the capture to process
    @dataclass(frozen=True, kw_only=True)
    class TsharkCli(Tshark):
        pcapng_file: Annotated[Path, tyro.conf.arg(aliases=["-i"], metavar="PATH")]
        """Path to input .pcapng"""

    # Reuse documentation of the parent class for the command-line interface
    TsharkCli.__doc__ = Tshark.__doc__

    cli_args = tyro.cli(TsharkCli, config=(tyro.conf.DisallowNone,))
    traffic = cli_args.load_traffic(cli_args.pcapng_file)

    pprint(traffic.metadata, stream=sys.stderr, indent=2, width=100)
    layers_as_json = json.dumps(traffic.list_layers, ensure_ascii=False, allow_nan=False, indent=2)
    print(layers_as_json)


# Allow running this module directly as a script
if __name__ == "__main__":
    cli_dump_tshark_layers_as_json()