Source code for pcapng_utils.tshark.wrapper

  1import json
  2import logging
  3import subprocess
  4import shlex
  5import re
  6import platform
  7from pathlib import Path
  8from hashlib import file_digest
  9from functools import cached_property
 10from dataclasses import dataclass, KW_ONLY
 11from collections.abc import Sequence, Mapping
 12from typing import Any, Literal, Annotated
 13
 14import tyro
 15
 16from .types import DictPacket, DictLayers
 17
 18
@dataclass(frozen=True)
class TsharkOutput:
    """Packets dumped by tshark for some network traffic, together with some metadata about the dump."""

    # Packets as decoded from the tshark JSON output
    list_packets: Sequence[DictPacket]
    # Free-form information describing how/from what the packets were produced
    metadata: Mapping[str, Any]

    def __post_init__(self) -> None:
        # Sanity check only: the failure message is the actual type received
        packets = self.list_packets
        assert isinstance(packets, Sequence), type(packets)

    @property
    def list_layers(self) -> Sequence[DictLayers]:
        """Return, for each packet (in order), the layers stored under its `_source` key."""
        layers: list[DictLayers] = []
        for packet in self.list_packets:
            layers.append(packet['_source']['layers'])
        return layers
LOGGER = logging.getLogger("pcapng_utils.tshark")

# Only default to the system hosts file when it actually exists on this machine.
# NOTE: `h` stays bound as a module-level name.
h = Path("/etc/hosts")
DEFAULT_HOSTS_FILE = h if h.is_file() else None

# Platform-specific location of the tshark executable;
# on any other platform we fall back on a bare `tshark` command.
DEFAULT_TSHARK_CMD = {
    "Linux": "/usr/bin/tshark",
    "Darwin": "/Applications/Wireshark.app/Contents/MacOS/tshark",
}.get(platform.system(), "tshark")


@dataclass(frozen=True)
class Tshark:
    """
    A class to interact with tshark for loading and parsing network traffic data from a PCAPNG file.

    **tshark** is a command-line tool for capturing and analyzing network traffic.
    It is part of the Wireshark suite and provides similar functionality to the Wireshark GUI in a terminal environment.

    - Packet capture and analysis: `tshark` can capture live network traffic and analyze packets from capture files (e.g., PCAP, PCAPNG).
    - Protocol decoding: It supports decoding a wide range of network protocols, providing detailed information about each packet.
    - Filtering: `tshark` allows filtering packets using display filters to focus on specific traffic.
    - Statistics: It can generate various statistics about the captured traffic, such as protocol hierarchy,
      endpoint statistics, and conversation lists.
    - Exporting data: `tshark` can export packet data to different formats, including JSON, CSV, and plain text.
    - Decryption: `tshark` supports decryption of encrypted traffic using SSL/TLS keys provided in an SSLKEYLOG file.

    `tshark` can convert PCAPNG files to JSON format using the `-T json` option.
    This allows for easy parsing and analysis of network traffic data in a structured format.

    **Useful commands**:

    - Capture live traffic: `tshark -i <interface>`
    - Read from a PCAP file: `tshark -r <file.[pcap|pcapng]>`
    - Display packet details: `tshark -V`
    - Filter packets: `tshark -Y <filter>`
    - Export to JSON: `tshark -r <file.[pcap|pcapng]> -T json`
    - Decrypt SSL/TLS traffic: `tshark -r <file.[pcap|pcapng]> -o "ssl.keys_list: <key_file>"`
    - Inject the TLS secrets: `editcap --inject-secrets tls,<keylog_file> <file.pcap> <output.pcapng>`
    """

    tshark_cmd: Annotated[str, tyro.conf.arg(name='tshark', aliases=['-c'], metavar='CMD')] = DEFAULT_TSHARK_CMD
    """Path/command for tshark executable"""

    _: KW_ONLY

    hash_algo: Annotated[str, tyro.conf.arg(metavar='ALGO')] = 'sha1'
    """Hash algorithm to generate digest of input .pcapng"""

    name_resolution: Annotated[Literal[False] | str, tyro.conf.arg(metavar='FLAGS|False')] = 'nds'
    """Name resolution flags, as documented in tshark manual under -N flag;
    by default we avoid using any external DNS resolver"""

    hosts_file: Path | None = DEFAULT_HOSTS_FILE
    """Hosts file for tshark name resolution - only used when name resolution contains 'n'"""

    display_filter: Annotated[str, tyro.conf.arg(aliases=['-Y'])] = "http || http2 || websocket"
    """Display filter (documented in tshark manual under -Y flag)"""

    protocol_match_filter: Annotated[str, tyro.conf.arg(aliases=['-J'])] = "http http2 websocket"
    """Protocol match filter (documented in tshark manual under -J flag), in addition to base protocols"""

    tcp_reassemble_out_of_order: bool = True
    """Whether to allow or not to reassemble out-of-order TCP segments"""

    timeout: Annotated[float, tyro.conf.arg(metavar='SECONDS')] = 60.0
    """Timeout in seconds for tshark command completion"""

    @cached_property
    def _tshark_cmd_split(self) -> list[str]:
        # `tshark_cmd` may be a full command line (executable + arguments): split it shell-style
        return shlex.split(self.tshark_cmd)

    @cached_property
    def version(self) -> str:
        """
        Version of tshark, as reported by `<tshark_cmd> --version` (computed once, then cached).

        Raises:
            RuntimeError: If the command exits with a non-zero code,
                or if no version line can be found in its output.
            NotImplementedError: If the version found is not a 4.x one.
            subprocess.TimeoutExpired: If the command does not complete within `timeout` seconds.
        """
        proc = subprocess.run(
            [*self._tshark_cmd_split, '--version'], text=True, capture_output=True, timeout=self.timeout
        )
        if proc.returncode != 0:
            raise RuntimeError(proc.stderr)
        VERSION_LINE_PREFIX = 'TShark (Wireshark) '
        # Use a default so that a missing version line does not surface as a bare `StopIteration`
        version_first_line = next(
            (line for line in proc.stdout.splitlines() if line.startswith(VERSION_LINE_PREFIX)),
            None,
        )
        if version_first_line is None:
            raise RuntimeError(
                f"Could not find a line starting with {VERSION_LINE_PREFIX!r} in tshark output: {proc.stdout!r}"
            )
        version = version_first_line.removeprefix(VERSION_LINE_PREFIX).removesuffix('.')
        if not version.startswith("4."):
            raise NotImplementedError(f"Unsupported tshark version (expected v4.x): {version}")
        return version

    @cached_property
    def less_than_v4_4(self) -> bool:
        """Whether tshark version is one of 4.0.x, 4.1.x, 4.2.x or 4.3.x."""
        # no version parsing for now: the trailing dot prevents e.g. "4.10." from matching "4.1."
        return any(self.version.startswith(f"4.{minor}.") for minor in range(4))

    def get_command(self, pcapng_file: Path) -> list[str]:
        """
        Get full command to be executed

        Args:
            pcapng_file: Capture file to read (passed to tshark as a resolved path).

        Returns:
            The tshark executable and all its arguments, as a list suitable for `subprocess.run`.
        """
        name_resolution_flags = list[str]()
        if not self.name_resolution:
            name_resolution_flags.append('-n')
        else:
            name_resolution = self.name_resolution
            if 's' in name_resolution and self.less_than_v4_4:
                # name resolution from SNI only supported in tshark >=4.4
                name_resolution = name_resolution.replace('s', '')
            name_resolution_flags += ['-N', name_resolution]
            if 'n' in name_resolution and self.hosts_file:
                name_resolution_flags += ['-H', self.hosts_file.as_posix()]
        return [
            *self._tshark_cmd_split,
            '-r', pcapng_file.resolve().as_posix(),
            '-2',  # two passes (can't read from stdin in this case)
            '-x',  # output raw fields as well
            '-T', 'json',
            '--no-duplicate-keys',  # merge json keys
            *name_resolution_flags,
            '-Y', self.display_filter,
            '-J', f'frame ip ipv6 tcp {self.protocol_match_filter}',  # do not export data of useless layers
            '--enable-protocol', 'communityid',
            '-o', f'tcp.reassemble_out_of_order:{str(self.tcp_reassemble_out_of_order).upper()}',
        ]

    def load_traffic(self, pcapng_file: Path) -> TsharkOutput:
        """
        Loads network traffic data from the provided pcapng file using tshark.

        This method runs the tshark command to read the pcapng file and parse the output as JSON.
        The parsed traffic data is then returned, together with some metadata
        (tshark version and digest of the input file).

        Raises:
            RuntimeError: If the tshark command exits with a non-zero code
                (the message holds what tshark wrote on stderr and stdout).
            subprocess.TimeoutExpired: If the tshark command does not complete within `timeout` seconds.
            json.JSONDecodeError: If the (cleaned) tshark output is not valid JSON.

        Note that no HTTP3 traffic is expected since it is rejected by Pirogue.
        """
        with pcapng_file.open('rb') as fp:
            metadata = {
                'tshark_version': self.version,
                f'input_{self.hash_algo}': file_digest(fp, self.hash_algo).hexdigest(),
            }
        cmd = self.get_command(pcapng_file)
        LOGGER.debug("Command for tshark %s: %s", self.version, cmd)
        proc = subprocess.run(cmd, capture_output=True, timeout=self.timeout)
        if proc.returncode != 0:
            # Decode leniently: a stray non-UTF-8 byte must not hide the actual tshark error
            err = [stream.decode(errors='replace') for stream in (proc.stderr, proc.stdout) if stream]
            if not err:
                err.append(f"tshark exited with code {proc.returncode} without any output")
            raise RuntimeError("\n".join(err))
        # We remove any leading/trailing information between actual tshark output
        # (e.g. from OCI container prologue)
        out = proc.stdout.strip()
        if not out.startswith(b"["):
            out = re.sub(rb"^[^\[]+\[", b"[", out)
        if not out.endswith(b"]"):
            out = re.sub(rb"\][^\]]+$", b"]", out)
        list_packets = json.loads(out)
        return TsharkOutput(list_packets, metadata)
193 194
def cli_dump_tshark_layers_as_json() -> None:
    """Print tshark layers of a .pcapng as JSON on standard output (may be redirected to a .json for inspection)"""

    import sys
    from pprint import pprint

    @dataclass(frozen=True, kw_only=True)
    class TsharkCli(Tshark):
        pcapng_file: Annotated[Path, tyro.conf.arg(aliases=["-i"], metavar="PATH")]
        """Path to input .pcapng"""

    # The CLI class carries the very same documentation as `Tshark`
    # (presumably so that tyro displays it as the command help - to be confirmed)
    TsharkCli.__doc__ = Tshark.__doc__

    cli_args = tyro.cli(TsharkCli, config=(tyro.conf.DisallowNone,))
    traffic = cli_args.load_traffic(cli_args.pcapng_file)

    # Metadata is written on stderr, so that stdout only holds the JSON document
    pprint(traffic.metadata, stream=sys.stderr, indent=2, width=100)
    layers_json = json.dumps(traffic.list_layers, ensure_ascii=False, allow_nan=False, indent=2)
    print(layers_json)


# Run the JSON dump CLI when this module is executed as a script
if __name__ == "__main__":
    cli_dump_tshark_layers_as_json()