Skip to main content
  1. Documentation/
  2. Developer Guide/

Building Transport Plugins

Table of Contents
Transport plugins define how listeners accept and handle connections from agents. Build a custom transport to implement any C2 channel — DNS, named pipes, WebSocket, or domain fronting.

How Transports Work
#

Transport Data Flow

A transport plugin does exactly two things:

  1. Receive raw bytes from an agent and call the on_message callback
  2. Queue response bytes to send back when the callback returns them

The transport never inspects or modifies wire content. All encryption, routing, and decoding happen in the MessagePipeline. Your transport is just a delivery mechanism.

Base Class
#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# tantoc2/src/tantoc2/server/transports_module.py

@dataclass
class TransportConfig:
    host: str = "0.0.0.0"
    port: int = 443
    tls_enabled: bool = False
    tls_cert_file: str | None = None
    tls_key_file: str | None = None
    options: dict[str, Any] = field(default_factory=dict)


class TransportBase(PluginBase):
    @classmethod
    def plugin_type(cls) -> str:
        return "transport"

    @abstractmethod
    def __init__(self, config: TransportConfig, on_message: Callable[..., Any]) -> None:
        """Initialize with config and the pipeline callback."""

    @abstractmethod
    def start(self) -> None:
        """Start listening. Must return immediately (spawn a background thread)."""

    @abstractmethod
    def stop(self) -> None:
        """Stop listening and release the port."""

    @abstractmethod
    def is_running(self) -> bool:
        """Whether the transport is actively accepting connections."""

    @abstractmethod
    def send(self, client_id: str, data: bytes) -> None:
        """Queue response bytes for a specific client."""

The on_message callback signature:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
def on_message(client_id: str, data: bytes) -> bytes | None:
    """
    Args:
        client_id: Your transport-assigned identifier for this client.
                   Must be consistent across calls for the same logical connection.
        data:      Raw wire bytes starting at byte 0 of the TantoC2 wire header
                   (4-byte magic + 16-byte session token + encrypted payload).
    Returns:
        Response bytes to send back, or None if no response.
    """

Step-by-Step: Writing a Transport
#

Step 1: Create the project structure
#

1
2
3
4
5
6
7
8
tantoc2-transport-dns/
  pyproject.toml
  src/
    tantoc2_transport_dns/
      __init__.py
      dns_transport.py
  tests/
    test_dns_transport.py

Step 2: Write pyproject.toml
#

Copy this template and fill in your plugin name:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[project]
name = "tantoc2-transport-dns"
version = "0.1.0"
description = "TantoC2 DNS tunnel transport"
requires-python = ">=3.11"
dependencies = [
    "tantoc2",
    "dnslib>=0.9",          # your protocol library
]

[project.optional-dependencies]
dev = ["pytest>=8.0", "pytest-cov>=5.0"]

[project.entry-points."tantoc2.transports"]
dns = "tantoc2_transport_dns.dns_transport:DNSTransport"

[tool.hatch.build.targets.wheel]
packages = ["src/tantoc2_transport_dns"]

The entry point key (dns) must match plugin_name().

Step 3: Implement TransportBase
#

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
from __future__ import annotations

import logging
import threading
from typing import Any
from collections.abc import Callable

from tantoc2.server.transports_module import TransportBase, TransportConfig

logger = logging.getLogger(__name__)


class DNSTransport(TransportBase):
    """DNS tunnel transport — encodes agent traffic in TXT queries."""

    @classmethod
    def plugin_name(cls) -> str:
        return "dns"

    def __init__(
        self,
        config: TransportConfig,
        on_message: Callable[..., Any],
    ) -> None:
        self._config = config
        self._on_message = on_message
        self._running = False
        self._thread: threading.Thread | None = None
        self._lock = threading.Lock()
        # Per-client reassembly buffers and response queues
        self._buffers: dict[str, list[bytes]] = {}
        self._response_queues: dict[str, list[bytes]] = {}

    def start(self) -> None:
        """Start the DNS server in a daemon thread."""
        if self._running:
            return
        self._running = True
        self._thread = threading.Thread(
            target=self._serve, daemon=True, name="tantoc2-dns-transport"
        )
        self._thread.start()
        logger.info("DNS transport started on %s:%d", self._config.host, self._config.port)

    def stop(self) -> None:
        """Signal the serve loop to exit and wait for the thread."""
        if not self._running:
            return
        self._running = False
        if self._thread is not None:
            self._thread.join(timeout=5.0)
        self._thread = None
        logger.info("DNS transport stopped")

    def is_running(self) -> bool:
        return self._running

    def send(self, client_id: str, data: bytes) -> None:
        """Queue response data for client_id (returned on next query)."""
        with self._lock:
            self._response_queues.setdefault(client_id, []).append(data)

    def _serve(self) -> None:
        """Main DNS server loop (runs in background thread)."""
        import socket
        import dnslib

        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        sock.bind((self._config.host, self._config.port))
        sock.settimeout(1.0)  # Allow periodic _running check

        while self._running:
            try:
                raw, addr = sock.recvfrom(4096)
            except socket.timeout:
                continue
            except OSError:
                break

            client_id = f"{addr[0]}:{addr[1]}"
            try:
                self._handle_query(raw, addr, client_id, sock, dnslib)
            except Exception:
                logger.exception("Error handling DNS query from %s", client_id)

        sock.close()

    def _handle_query(self, raw, addr, client_id, sock, dnslib):
        """Decode a DNS query, call on_message, send a response."""
        request = dnslib.DNSRecord.parse(raw)
        qname = str(request.q.qname)

        # Decode agent data from subdomain hex encoding
        # e.g., "deadbeef01020304.c2.example.com"
        parts = qname.split(".")
        if not parts:
            return
        try:
            payload_hex = parts[0]
            agent_data = bytes.fromhex(payload_hex)
        except ValueError:
            return

        # Deliver to pipeline
        response_data = self._on_message(client_id, agent_data)

        # Store in queue for next query
        if response_data:
            with self._lock:
                self._response_queues.setdefault(client_id, []).append(response_data)

        # Build DNS reply with queued data in TXT record
        with self._lock:
            queue = self._response_queues.pop(client_id, [])
        encoded = b"".join(queue).hex() if queue else "00"

        reply = dnslib.DNSRecord(
            dnslib.DNSHeader(id=request.header.id, qr=1, aa=1, ra=1),
            q=request.q,
        )
        reply.add_answer(dnslib.RR(
            rname=request.q.qname,
            rtype=dnslib.QTYPE.TXT,
            rdata=dnslib.TXT(encoded),
        ))
        sock.sendto(reply.pack(), addr)

Step 4: Handle TLS (if needed)
#

Use the built-in helper for self-signed certificates:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
from tantoc2.server.transports_module import generate_self_signed_cert

def _configure_tls(self, server):
    import ssl
    if self._config.tls_enabled:
        if self._config.tls_cert_file and self._config.tls_key_file:
            cert_path = self._config.tls_cert_file
            key_path = self._config.tls_key_file
        else:
            cert_path, key_path = generate_self_signed_cert()
            logger.info("Generated self-signed certificate for TLS listener")

        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        ctx.load_cert_chain(certfile=cert_path, keyfile=key_path)
        server.socket = ctx.wrap_socket(server.socket, server_side=True)

generate_self_signed_cert() creates a 2048-bit RSA certificate valid for 365 days and writes it to temp files. It returns (cert_path, key_path).

Step 5: Write tests
#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import threading
import time
import pytest
from unittest.mock import MagicMock

from tantoc2_transport_dns.dns_transport import DNSTransport
from tantoc2.server.transports_module import TransportConfig


@pytest.fixture
def on_message():
    mock = MagicMock(return_value=b"\xde\xad\xc2\x01" + b"\x00" * 16 + b"response")
    return mock


def test_transport_starts_and_stops(on_message):
    config = TransportConfig(host="127.0.0.1", port=15353)
    transport = DNSTransport(config, on_message)
    transport.start()
    assert transport.is_running()
    transport.stop()
    assert not transport.is_running()


def test_send_queues_data(on_message):
    config = TransportConfig(host="127.0.0.1", port=15354)
    transport = DNSTransport(config, on_message)
    transport.start()
    transport.send("127.0.0.1:9999", b"response_data")
    with transport._lock:
        assert b"response_data" in transport._response_queues.get("127.0.0.1:9999", [])
    transport.stop()


def test_double_start_is_idempotent(on_message):
    config = TransportConfig(host="127.0.0.1", port=15355)
    transport = DNSTransport(config, on_message)
    transport.start()
    thread_before = transport._thread
    transport.start()  # second start — should be a no-op
    assert transport._thread is thread_before
    transport.stop()

Reference Implementations
#

Study these working transports before writing your own:

HTTP Transport (tantoc2-transport-http)
#

Source: transports/http/src/tantoc2_transport_http/http_transport.py

  • Stateless: POST / receives agent bytes, response queued in dict, returned inline
  • GET / is a health check (200 OK)
  • Uses http.server.HTTPServer in a daemon thread
  • TLS: wraps the HTTPServer socket with ssl.SSLContext
  • Per-client response queue keyed by "IP:port"
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
# Core of the HTTP transport
class _Handler(BaseHTTPRequestHandler):
    def do_POST(self) -> None:
        content_length = int(self.headers.get("Content-Length", 0))
        body = self.rfile.read(content_length)
        client_id = f"{self.client_address[0]}:{self.client_address[1]}"

        # Call into the pipeline
        transport._on_message(client_id, body)

        # Return queued response
        with transport._lock:
            queue = transport._response_queues.pop(client_id, [])
        response_data = b"".join(queue)

        self.send_response(200)
        self.send_header("Content-Type", "application/octet-stream")
        self.send_header("Content-Length", str(len(response_data)))
        self.end_headers()
        self.wfile.write(response_data)

TCP Transport (tantoc2-transport-tcp)
#

Source: transports/tcp/src/tantoc2_transport_tcp/tcp_transport.py

  • Persistent connection with per-client read loop
  • Length-prefixed framing: [4-byte big-endian length][payload]
  • Uses socketserver.TCPServer with allow_reuse_address = True
  • Response queued by send(), dequeued and sent after each message
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
# Length-prefixed framing
raw_len = self.rfile.read(4)
msg_len = struct.unpack("!I", raw_len)[0]
data = self.rfile.read(msg_len)

transport._on_message(client_id, data)

with transport._lock:
    queue = transport._response_queues.pop(client_id, [])
response = b"".join(queue)
self.wfile.write(struct.pack("!I", len(response)))
self.wfile.write(response)
self.wfile.flush()

Design Patterns
#

Stateless Protocols (HTTP, DNS)
#

The response to each request is assembled from the queue and returned inline:

1
2
3
4
5
def handle_request(self, data: bytes, client_id: str) -> bytes:
    self._on_message(client_id, data)
    with self._lock:
        queue = self._response_queues.pop(client_id, [])
    return b"".join(queue)

Persistent Connections (TCP, WebSocket)
#

Each connection gets a dedicated read loop in a thread:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
def handle_client(self, conn, addr):
    client_id = f"{addr[0]}:{addr[1]}"
    while self._running:
        data = self._read_framed(conn)
        if not data:
            break
        self._on_message(client_id, data)
        with self._lock:
            queue = self._response_queues.pop(client_id, [])
        response = b"".join(queue)
        if response:
            self._write_framed(conn, response)

Covert/Fragmented Protocols (DNS, ICMP)
#

Buffer fragments until a complete message is assembled:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
def handle_fragment(self, fragment: bytes, client_id: str, seq: int, total: int):
    with self._lock:
        self._fragments[client_id][seq] = fragment
        if len(self._fragments[client_id]) == total:
            # Reassemble
            full = b"".join(
                self._fragments[client_id][i] for i in range(total)
            )
            del self._fragments[client_id]
    # Deliver outside lock
    self._on_message(client_id, full)

Key Requirements
#

start() Must Return Immediately
#

The ListenerManager calls start() from the main thread. Start a daemon thread and return:

1
2
3
4
5
6
def start(self) -> None:
    self._running = True
    self._thread = threading.Thread(
        target=self._serve, daemon=True, name="tantoc2-my-transport"
    )
    self._thread.start()

Port Release on stop()
#

stop() must release the socket immediately. The operator may want to restart the listener on the same port seconds later:

1
2
3
4
5
6
7
8
def stop(self) -> None:
    self._running = False
    if self._server:
        self._server.shutdown()   # Signal the serve loop
    if self._thread:
        self._thread.join(timeout=5.0)
    self._server = None
    self._thread = None

Thread-Safe Response Queues
#

Multiple agents may connect simultaneously. Protect shared state with a lock:

1
2
3
4
# Always use the lock when touching _response_queues
def send(self, client_id: str, data: bytes) -> None:
    with self._lock:
        self._response_queues.setdefault(client_id, []).append(data)

Call on_message() outside of any lock to prevent deadlocks — the pipeline may call send() recursively.

Consistent client_id
#

The pipeline uses client_id to match sessions across requests. For stateless protocols derive it from the remote address. For persistent connections use the connection object identity:

1
2
3
4
5
# HTTP / DNS (stateless)
client_id = f"{remote_addr[0]}:{remote_addr[1]}"

# TCP (persistent — addr stays fixed for the connection duration)
client_id = f"{addr[0]}:{addr[1]}"

Deployment
#

Standalone Package (Recommended)#

1
2
3
4
5
6
# Install in the teamserver environment
pip install ./tantoc2-transport-dns

# Or build a wheel and drop in the inbox
hatch build
cp dist/tantoc2_transport_dns-0.1.0-py3-none-any.whl /path/to/plugin_inbox/

File Drop
#

1
cp dns_transport.py tantoc2/plugins/transports/

Then trigger discovery:

1
tantoc2> plugins refresh

Using Your Transport
#

1
2
3
tantoc2> listeners create dns --name my-dns --port 53
tantoc2> listeners start my-dns
tantoc2> listeners list

Common Pitfalls
#

Forgetting to call on_message from outside a lock — the pipeline’s response path calls send(), which acquires the same lock, causing a deadlock.

start() blocking — any synchronous work in start() blocks the server’s startup. Always delegate to a background thread.

Not joining the thread in stop() — orphaned threads hold sockets open. Always thread.join(timeout=...) in stop().

Mutable default in TransportConfig.optionsTransportConfig uses field(default_factory=dict) for options. Your transport-specific config lives in config.options as a plain dict. Validate it in __init__.

Forgetting idempotency — check if self._running: return at the start of both start() and the top of stop() when not running.

See Plugin Packaging for the full packaging and distribution reference.