Transport plugins define how listeners accept and handle connections from agents. Build a custom transport to implement any C2 channel — DNS, named pipes, WebSocket, or domain fronting.
How Transports Work
#
A transport plugin does exactly two things:
- Receive raw bytes from an agent and call the
on_message callback - Queue response bytes to send back when the callback returns them
The transport never inspects or modifies wire content. All encryption, routing, and decoding happen in the MessagePipeline. Your transport is just a delivery mechanism.
Base Class
# 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
| # tantoc2/src/tantoc2/server/transports_module.py
@dataclass
class TransportConfig:
host: str = "0.0.0.0"
port: int = 443
tls_enabled: bool = False
tls_cert_file: str | None = None
tls_key_file: str | None = None
options: dict[str, Any] = field(default_factory=dict)
class TransportBase(PluginBase):
@classmethod
def plugin_type(cls) -> str:
return "transport"
@abstractmethod
def __init__(self, config: TransportConfig, on_message: Callable[..., Any]) -> None:
"""Initialize with config and the pipeline callback."""
@abstractmethod
def start(self) -> None:
"""Start listening. Must return immediately (spawn a background thread)."""
@abstractmethod
def stop(self) -> None:
"""Stop listening and release the port."""
@abstractmethod
def is_running(self) -> bool:
"""Whether the transport is actively accepting connections."""
@abstractmethod
def send(self, client_id: str, data: bytes) -> None:
"""Queue response bytes for a specific client."""
|
The on_message callback signature:
1
2
3
4
5
6
7
8
9
10
| def on_message(client_id: str, data: bytes) -> bytes | None:
"""
Args:
client_id: Your transport-assigned identifier for this client.
Must be consistent across calls for the same logical connection.
data: Raw wire bytes starting at byte 0 of the TantoC2 wire header
(4-byte magic + 16-byte session token + encrypted payload).
Returns:
Response bytes to send back, or None if no response.
"""
|
Step-by-Step: Writing a Transport
#Step 1: Create the project structure
#1
2
3
4
5
6
7
8
| tantoc2-transport-dns/
pyproject.toml
src/
tantoc2_transport_dns/
__init__.py
dns_transport.py
tests/
test_dns_transport.py
|
Step 2: Write pyproject.toml
#Copy this template and fill in your plugin name:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
| [build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "tantoc2-transport-dns"
version = "0.1.0"
description = "TantoC2 DNS tunnel transport"
requires-python = ">=3.11"
dependencies = [
"tantoc2",
"dnslib>=0.9", # your protocol library
]
[project.optional-dependencies]
dev = ["pytest>=8.0", "pytest-cov>=5.0"]
[project.entry-points."tantoc2.transports"]
dns = "tantoc2_transport_dns.dns_transport:DNSTransport"
[tool.hatch.build.targets.wheel]
packages = ["src/tantoc2_transport_dns"]
|
The entry point key (dns) must match plugin_name().
Step 3: Implement TransportBase
# 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
| from __future__ import annotations
import logging
import threading
from typing import Any
from collections.abc import Callable
from tantoc2.server.transports_module import TransportBase, TransportConfig
logger = logging.getLogger(__name__)
class DNSTransport(TransportBase):
"""DNS tunnel transport — encodes agent traffic in TXT queries."""
@classmethod
def plugin_name(cls) -> str:
return "dns"
def __init__(
self,
config: TransportConfig,
on_message: Callable[..., Any],
) -> None:
self._config = config
self._on_message = on_message
self._running = False
self._thread: threading.Thread | None = None
self._lock = threading.Lock()
# Per-client reassembly buffers and response queues
self._buffers: dict[str, list[bytes]] = {}
self._response_queues: dict[str, list[bytes]] = {}
def start(self) -> None:
"""Start the DNS server in a daemon thread."""
if self._running:
return
self._running = True
self._thread = threading.Thread(
target=self._serve, daemon=True, name="tantoc2-dns-transport"
)
self._thread.start()
logger.info("DNS transport started on %s:%d", self._config.host, self._config.port)
def stop(self) -> None:
"""Signal the serve loop to exit and wait for the thread."""
if not self._running:
return
self._running = False
if self._thread is not None:
self._thread.join(timeout=5.0)
self._thread = None
logger.info("DNS transport stopped")
def is_running(self) -> bool:
return self._running
def send(self, client_id: str, data: bytes) -> None:
"""Queue response data for client_id (returned on next query)."""
with self._lock:
self._response_queues.setdefault(client_id, []).append(data)
def _serve(self) -> None:
"""Main DNS server loop (runs in background thread)."""
import socket
import dnslib
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind((self._config.host, self._config.port))
sock.settimeout(1.0) # Allow periodic _running check
while self._running:
try:
raw, addr = sock.recvfrom(4096)
except socket.timeout:
continue
except OSError:
break
client_id = f"{addr[0]}:{addr[1]}"
try:
self._handle_query(raw, addr, client_id, sock, dnslib)
except Exception:
logger.exception("Error handling DNS query from %s", client_id)
sock.close()
def _handle_query(self, raw, addr, client_id, sock, dnslib):
"""Decode a DNS query, call on_message, send a response."""
request = dnslib.DNSRecord.parse(raw)
qname = str(request.q.qname)
# Decode agent data from subdomain hex encoding
# e.g., "deadbeef01020304.c2.example.com"
parts = qname.split(".")
if not parts:
return
try:
payload_hex = parts[0]
agent_data = bytes.fromhex(payload_hex)
except ValueError:
return
# Deliver to pipeline
response_data = self._on_message(client_id, agent_data)
# Store in queue for next query
if response_data:
with self._lock:
self._response_queues.setdefault(client_id, []).append(response_data)
# Build DNS reply with queued data in TXT record
with self._lock:
queue = self._response_queues.pop(client_id, [])
encoded = b"".join(queue).hex() if queue else "00"
reply = dnslib.DNSRecord(
dnslib.DNSHeader(id=request.header.id, qr=1, aa=1, ra=1),
q=request.q,
)
reply.add_answer(dnslib.RR(
rname=request.q.qname,
rtype=dnslib.QTYPE.TXT,
rdata=dnslib.TXT(encoded),
))
sock.sendto(reply.pack(), addr)
|
Step 4: Handle TLS (if needed)
#Use the built-in helper for self-signed certificates:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| from tantoc2.server.transports_module import generate_self_signed_cert
def _configure_tls(self, server):
import ssl
if self._config.tls_enabled:
if self._config.tls_cert_file and self._config.tls_key_file:
cert_path = self._config.tls_cert_file
key_path = self._config.tls_key_file
else:
cert_path, key_path = generate_self_signed_cert()
logger.info("Generated self-signed certificate for TLS listener")
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
ctx.load_cert_chain(certfile=cert_path, keyfile=key_path)
server.socket = ctx.wrap_socket(server.socket, server_side=True)
|
generate_self_signed_cert() creates a 2048-bit RSA certificate valid for 365 days and writes it to temp files. It returns (cert_path, key_path).
Step 5: Write tests
# 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
| import threading
import time
import pytest
from unittest.mock import MagicMock
from tantoc2_transport_dns.dns_transport import DNSTransport
from tantoc2.server.transports_module import TransportConfig
@pytest.fixture
def on_message():
mock = MagicMock(return_value=b"\xde\xad\xc2\x01" + b"\x00" * 16 + b"response")
return mock
def test_transport_starts_and_stops(on_message):
config = TransportConfig(host="127.0.0.1", port=15353)
transport = DNSTransport(config, on_message)
transport.start()
assert transport.is_running()
transport.stop()
assert not transport.is_running()
def test_send_queues_data(on_message):
config = TransportConfig(host="127.0.0.1", port=15354)
transport = DNSTransport(config, on_message)
transport.start()
transport.send("127.0.0.1:9999", b"response_data")
with transport._lock:
assert b"response_data" in transport._response_queues.get("127.0.0.1:9999", [])
transport.stop()
def test_double_start_is_idempotent(on_message):
config = TransportConfig(host="127.0.0.1", port=15355)
transport = DNSTransport(config, on_message)
transport.start()
thread_before = transport._thread
transport.start() # second start — should be a no-op
assert transport._thread is thread_before
transport.stop()
|
Reference Implementations
#Study these working transports before writing your own:
HTTP Transport (tantoc2-transport-http)
#Source: transports/http/src/tantoc2_transport_http/http_transport.py
- Stateless:
POST / receives agent bytes, response queued in dict, returned inline GET / is a health check (200 OK)- Uses
http.server.HTTPServer in a daemon thread - TLS: wraps the HTTPServer socket with
ssl.SSLContext - Per-client response queue keyed by
"IP:port"
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| # Core of the HTTP transport
class _Handler(BaseHTTPRequestHandler):
def do_POST(self) -> None:
content_length = int(self.headers.get("Content-Length", 0))
body = self.rfile.read(content_length)
client_id = f"{self.client_address[0]}:{self.client_address[1]}"
# Call into the pipeline
transport._on_message(client_id, body)
# Return queued response
with transport._lock:
queue = transport._response_queues.pop(client_id, [])
response_data = b"".join(queue)
self.send_response(200)
self.send_header("Content-Type", "application/octet-stream")
self.send_header("Content-Length", str(len(response_data)))
self.end_headers()
self.wfile.write(response_data)
|
TCP Transport (tantoc2-transport-tcp)
#Source: transports/tcp/src/tantoc2_transport_tcp/tcp_transport.py
- Persistent connection with per-client read loop
- Length-prefixed framing:
[4-byte big-endian length][payload] - Uses
socketserver.TCPServer with allow_reuse_address = True - Response queued by
send(), dequeued and sent after each message
1
2
3
4
5
6
7
8
9
10
11
12
13
| # Length-prefixed framing
raw_len = self.rfile.read(4)
msg_len = struct.unpack("!I", raw_len)[0]
data = self.rfile.read(msg_len)
transport._on_message(client_id, data)
with transport._lock:
queue = transport._response_queues.pop(client_id, [])
response = b"".join(queue)
self.wfile.write(struct.pack("!I", len(response)))
self.wfile.write(response)
self.wfile.flush()
|
Design Patterns
#Stateless Protocols (HTTP, DNS)
#The response to each request is assembled from the queue and returned inline:
1
2
3
4
5
| def handle_request(self, data: bytes, client_id: str) -> bytes:
self._on_message(client_id, data)
with self._lock:
queue = self._response_queues.pop(client_id, [])
return b"".join(queue)
|
Persistent Connections (TCP, WebSocket)
#Each connection gets a dedicated read loop in a thread:
1
2
3
4
5
6
7
8
9
10
11
12
| def handle_client(self, conn, addr):
client_id = f"{addr[0]}:{addr[1]}"
while self._running:
data = self._read_framed(conn)
if not data:
break
self._on_message(client_id, data)
with self._lock:
queue = self._response_queues.pop(client_id, [])
response = b"".join(queue)
if response:
self._write_framed(conn, response)
|
Covert/Fragmented Protocols (DNS, ICMP)
#Buffer fragments until a complete message is assembled:
1
2
3
4
5
6
7
8
9
10
11
| def handle_fragment(self, fragment: bytes, client_id: str, seq: int, total: int):
with self._lock:
self._fragments[client_id][seq] = fragment
if len(self._fragments[client_id]) == total:
# Reassemble
full = b"".join(
self._fragments[client_id][i] for i in range(total)
)
del self._fragments[client_id]
# Deliver outside lock
self._on_message(client_id, full)
|
Key Requirements
#start() Must Return Immediately
#The ListenerManager calls start() from the main thread. Start a daemon thread and return:
1
2
3
4
5
6
| def start(self) -> None:
self._running = True
self._thread = threading.Thread(
target=self._serve, daemon=True, name="tantoc2-my-transport"
)
self._thread.start()
|
Port Release on stop()
#stop() must release the socket immediately. The operator may want to restart the listener on the same port seconds later:
1
2
3
4
5
6
7
8
| def stop(self) -> None:
self._running = False
if self._server:
self._server.shutdown() # Signal the serve loop
if self._thread:
self._thread.join(timeout=5.0)
self._server = None
self._thread = None
|
Thread-Safe Response Queues
#Multiple agents may connect simultaneously. Protect shared state with a lock:
1
2
3
4
| # Always use the lock when touching _response_queues
def send(self, client_id: str, data: bytes) -> None:
with self._lock:
self._response_queues.setdefault(client_id, []).append(data)
|
Call on_message() outside of any lock to prevent deadlocks — the pipeline may call send() recursively.
Consistent client_id
#The pipeline uses client_id to match sessions across requests. For stateless protocols derive it from the remote address. For persistent connections use the connection object identity:
1
2
3
4
5
| # HTTP / DNS (stateless)
client_id = f"{remote_addr[0]}:{remote_addr[1]}"
# TCP (persistent — addr stays fixed for the connection duration)
client_id = f"{addr[0]}:{addr[1]}"
|
Deployment
#Standalone Package (Recommended)
#1
2
3
4
5
6
| # Install in the teamserver environment
pip install ./tantoc2-transport-dns
# Or build a wheel and drop in the inbox
hatch build
cp dist/tantoc2_transport_dns-0.1.0-py3-none-any.whl /path/to/plugin_inbox/
|
File Drop
#1
| cp dns_transport.py tantoc2/plugins/transports/
|
Then trigger discovery:
1
| tantoc2> plugins refresh
|
Using Your Transport
#1
2
3
| tantoc2> listeners create dns --name my-dns --port 53
tantoc2> listeners start my-dns
tantoc2> listeners list
|
Common Pitfalls
#Forgetting to call on_message from outside a lock — the pipeline’s response path calls send(), which acquires the same lock, causing a deadlock.
start() blocking — any synchronous work in start() blocks the server’s startup. Always delegate to a background thread.
Not joining the thread in stop() — orphaned threads hold sockets open. Always thread.join(timeout=...) in stop().
Mutable default in TransportConfig.options — TransportConfig uses field(default_factory=dict) for options. Your transport-specific config lives in config.options as a plain dict. Validate it in __init__.
Forgetting idempotency — check if self._running: return at the start of both start() and the top of stop() when not running.
See Plugin Packaging for the full packaging and distribution reference.