Building Tool (Agentless) Plugins

Tool plugins execute directly from the teamserver against remote services. No agent deployment required — ideal for initial access, lateral movement, and service enumeration over any protocol.

How Tool Plugins Work

A tool plugin (called an “agentless module” internally) is a Python class that:

  1. Declares its protocol, supported operations, and option schema in metadata()
  2. Implements execute() to run against a list of targets and return per-target results
  3. Receives decrypted credentials from the credential store
  4. Optionally contributes discovered credentials back to the store

The server calls execute() with fully-resolved credentials and proxy config. Your module handles all networking directly.

Base Class

# tantoc2/src/tantoc2/server/agentless_base.py

@dataclass
class AgentlessMetadata:
    name: str
    description: str
    author: str
    protocol: str                           # "ssh", "smb", "ldap", etc.
    connection_params: dict[str, OptionSchema]   # target connection fields
    operations: list[str]                   # supported operation names
    options_schema: dict[str, OptionSchema] # per-operation options
    mitre_attack: list[str] = field(default_factory=list)
    dependencies: list[str] = field(default_factory=list)  # auto-installed


@dataclass
class AgentlessTarget:
    host: str
    port: int | None = None
    credential_id: str | None = None        # references credential store


@dataclass
class AgentlessResult:
    target: AgentlessTarget
    success: bool
    data: dict[str, Any] = field(default_factory=dict)
    credentials: list[ExtractedCredential] = field(default_factory=list)
    error: str | None = None
    raw_output: str | None = None


class AgentlessModuleBase(PluginBase):
    @classmethod
    def plugin_type(cls) -> str:
        return "agentless_module"

    @classmethod
    def plugin_name(cls) -> str:
        return cls.metadata().name   # derived automatically

    @classmethod
    @abstractmethod
    def metadata(cls) -> AgentlessMetadata: ...

    @abstractmethod
    def execute(
        self,
        operation: str,
        targets: list[AgentlessTarget],
        options: dict[str, Any],
        credentials: dict[str, dict[str, str]] | None = None,
        proxy: dict[str, Any] | None = None,
    ) -> list[AgentlessResult]: ...

OptionSchema

Both connection_params and options_schema use OptionSchema from module_base:

# tantoc2/src/tantoc2/server/module_base.py

@dataclass
class OptionSchema:
    name: str
    type: str        # "str", "int", "bool", "float"
    description: str
    required: bool = False
    default: Any = None
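
The fields above are enough to fill in defaults and reject bad input before a module runs. The following is a minimal sketch of that idea, not the server's actual validation code: the OptionSchema class here is a local stand-in that mirrors the documented fields, and apply_schema is a hypothetical helper name.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class OptionSchema:
    """Local stand-in mirroring the documented fields; a real plugin imports it."""
    name: str
    type: str  # "str", "int", "bool", "float"
    description: str
    required: bool = False
    default: Any = None


# Mapping from the documented type strings to Python types.
_PY_TYPES = {"str": str, "int": int, "bool": bool, "float": float}


def apply_schema(schema: dict[str, OptionSchema], options: dict[str, Any]) -> dict[str, Any]:
    """Hypothetical helper: fill defaults, then check required and typed options."""
    resolved: dict[str, Any] = {}
    for key, spec in schema.items():
        if key in options:
            value = options[key]
        elif spec.required:
            raise ValueError(f"missing required option: {key}")
        else:
            value = spec.default
        expected = _PY_TYPES[spec.type]
        if value is not None:
            # bool is a subclass of int in Python, so reject it for non-bool types.
            is_bool = isinstance(value, bool)
            if not isinstance(value, expected) or (is_bool and expected is not bool):
                raise TypeError(f"option {key!r} must be of type {spec.type}")
        resolved[key] = value
    return resolved


if __name__ == "__main__":
    schema = {
        "command": OptionSchema("command", "str", "Command to execute", required=True),
        "use_ssl": OptionSchema("use_ssl", "bool", "Use HTTPS", default=False),
    }
    print(apply_schema(schema, {"command": "whoami"}))
```

A helper like this is mainly useful in unit tests, where it lets you exercise a module with the same defaults an operator would get.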

Step-by-Step: Writing a Tool Plugin

Step 1: Create the project structure

tantoc2-tool-winrm/
  pyproject.toml
  src/
    tantoc2_tool_winrm/
      __init__.py
      winrm_exec.py
  tests/
    test_winrm_exec.py

Step 2: Write pyproject.toml

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[project]
name = "tantoc2-tool-winrm"
version = "0.1.0"
description = "TantoC2 WinRM remote execution tool"
requires-python = ">=3.11"
dependencies = [
    "tantoc2",
    "pywinrm>=0.4",
]

[project.optional-dependencies]
proxy = ["PySocks"]
dev = ["pytest>=8.0", "pytest-cov>=5.0"]

[project.entry-points."tantoc2.agentless_modules"]
winrm = "tantoc2_tool_winrm.winrm_exec:WinRMExecModule"

[tool.hatch.build.targets.wheel]
packages = ["src/tantoc2_tool_winrm"]

Step 3: Implement AgentlessModuleBase

from __future__ import annotations

import logging
from typing import Any

from tantoc2.server.agentless_base import (
    AgentlessMetadata,
    AgentlessModuleBase,
    AgentlessResult,
    AgentlessTarget,
)
from tantoc2.server.module_base import ExtractedCredential, OptionSchema

logger = logging.getLogger(__name__)


class WinRMExecModule(AgentlessModuleBase):
    """Execute commands on Windows hosts via WinRM."""

    @classmethod
    def metadata(cls) -> AgentlessMetadata:
        return AgentlessMetadata(
            name="winrm",
            description="Execute commands on Windows hosts via WinRM/PS Remoting",
            author="Your Name",
            protocol="winrm",
            connection_params={
                "host": OptionSchema(
                    name="host",
                    type="str",
                    description="Target hostname or IP",
                    required=True,
                ),
                "port": OptionSchema(
                    name="port",
                    type="int",
                    description="WinRM port (default 5985 HTTP, 5986 HTTPS)",
                    required=False,
                    default=5985,
                ),
            },
            operations=["exec", "ps"],
            options_schema={
                "command": OptionSchema(
                    name="command",
                    type="str",
                    description="Command to execute",
                    required=True,
                ),
                "use_ssl": OptionSchema(
                    name="use_ssl",
                    type="bool",
                    description="Use HTTPS (port 5986)",
                    required=False,
                    default=False,
                ),
            },
            mitre_attack=["T1021.006"],
            dependencies=["pywinrm>=0.4"],
        )

    def execute(
        self,
        operation: str,
        targets: list[AgentlessTarget],
        options: dict[str, Any],
        credentials: dict[str, dict[str, str]] | None = None,
        proxy: dict[str, Any] | None = None,
    ) -> list[AgentlessResult]:
        """Execute against each target, returning one result per target."""
        results: list[AgentlessResult] = []
        for target in targets:
            try:
                result = self._execute_single(operation, target, options, credentials, proxy)
                results.append(result)
            except Exception as exc:
                logger.exception("WinRM failed for %s: %s", target.host, exc)
                results.append(AgentlessResult(
                    target=target,
                    success=False,
                    error=str(exc),
                ))
        return results

    def _execute_single(
        self,
        operation: str,
        target: AgentlessTarget,
        options: dict[str, Any],
        credentials: dict[str, dict[str, str]] | None,
        proxy: dict[str, Any] | None,
    ) -> AgentlessResult:
        import winrm

        port = target.port or (5986 if options.get("use_ssl") else 5985)
        scheme = "https" if options.get("use_ssl") else "http"

        # Resolve credentials
        username, password = None, None
        if target.credential_id and credentials:
            cred = credentials.get(target.credential_id, {})
            username = cred.get("username")
            password = cred.get("secret")

        if not username:
            return AgentlessResult(
                target=target,
                success=False,
                error="No credentials provided",
            )

        # Connect
        session = winrm.Session(
            f"{scheme}://{target.host}:{port}/wsman",
            auth=(username, password),
        )

        command = options["command"]

        if operation == "ps":
            response = session.run_ps(command)
        else:  # exec
            response = session.run_cmd(command)

        success = response.status_code == 0
        return AgentlessResult(
            target=target,
            success=success,
            data={
                "exit_code": response.status_code,
                "stdout": response.std_out.decode("utf-8", errors="replace"),
                "stderr": response.std_err.decode("utf-8", errors="replace"),
            },
            raw_output=response.std_out.decode("utf-8", errors="replace"),
            error=response.std_err.decode("utf-8", errors="replace") if not success else None,
        )

The execute() Contract

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| operation | str | One of the strings in metadata().operations. Validated before calling. |
| targets | list[AgentlessTarget] | Each has host, optional port, optional credential_id. |
| options | dict[str, Any] | Validated per options_schema. Required options are already checked. |
| credentials | dict[str, dict[str, str]] \| None | Maps credential_id → {"username", "secret", "cred_type", "domain", ...}. Secrets are decrypted by the manager before this call. |
| proxy | dict[str, Any] \| None | Proxy configuration (see Proxy Support below). |

Return value

Return exactly one AgentlessResult per target — even on failure. The manager expects len(results) == len(targets). Never raise from execute() itself; catch exceptions per-target.

def execute(self, operation, targets, options, credentials, proxy):
    results = []
    for target in targets:
        try:
            results.append(self._run(operation, target, options, credentials, proxy))
        except Exception as exc:
            results.append(AgentlessResult(
                target=target, success=False, error=str(exc)
            ))
    return results
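
The one-result-per-target rule is easy to assert in unit tests. Below is a small sketch of such a check. The two dataclasses are trimmed local stand-ins for the documented types, and contract_violations is a hypothetical helper, not part of TantoC2.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class AgentlessTarget:
    """Stand-in with the documented fields."""
    host: str
    port: Optional[int] = None
    credential_id: Optional[str] = None


@dataclass
class AgentlessResult:
    """Stand-in trimmed to the fields this check needs."""
    target: AgentlessTarget
    success: bool
    data: dict[str, Any] = field(default_factory=dict)
    error: Optional[str] = None


def contract_violations(targets, results) -> list[str]:
    """Hypothetical test helper: describe each way results break the contract."""
    problems = []
    if len(results) != len(targets):
        problems.append(f"expected {len(targets)} results, got {len(results)}")
    for target in targets:
        # Compare by identity so two targets with equal fields stay distinct.
        count = sum(1 for result in results if result.target is target)
        if count != 1:
            problems.append(f"{target.host}: {count} results instead of 1")
    return problems
```

In a test you would call execute() on your module and assert that contract_violations(targets, results) returns an empty list.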

Credential Integration

Reading Credentials

When a target has a credential_id, the manager decrypts the credential and passes it in credentials:

if target.credential_id and credentials:
    cred = credentials[target.credential_id]
    username = cred["username"]
    secret = cred["secret"]       # plaintext password, hash, or key PEM
    cred_type = cred["cred_type"] # "plaintext", "hash", "ssh_key", "api_key", etc.
    domain = cred.get("domain")   # may be None
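
The lookup above raises KeyError if the ID is missing from the mapping. A more defensive variant might look like the sketch below, which works on plain dicts shaped like the documented credentials argument. Both function names, and the keyword names returned by auth_kwargs, are illustrative assumptions rather than TantoC2 APIs.

```python
from typing import Any, Optional


def resolve_credential(
    credential_id: Optional[str],
    credentials: Optional[dict[str, dict[str, Any]]],
) -> Optional[dict[str, Any]]:
    """Hypothetical helper: return the credential dict, or None if unavailable."""
    if not credential_id or not credentials:
        return None
    # .get() avoids a KeyError when the ID is not in the mapping.
    return credentials.get(credential_id)


def auth_kwargs(cred: dict[str, Any]) -> dict[str, Any]:
    """Choose connection arguments from cred_type (argument names are illustrative)."""
    cred_type = cred.get("cred_type", "plaintext")
    if cred_type == "plaintext":
        return {"username": cred.get("username"), "password": cred["secret"]}
    if cred_type == "ssh_key":
        return {"username": cred.get("username"), "key_pem": cred["secret"]}
    # Fail loudly instead of sending a hash or token as if it were a password.
    raise ValueError(f"unsupported cred_type: {cred_type}")
```

Rejecting unsupported credential types up front gives the operator a clear per-target error rather than a confusing authentication failure from the remote service.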

Contributing Credentials

Return ExtractedCredential objects in results to populate the credential store:

from tantoc2.server.module_base import ExtractedCredential

AgentlessResult(
    target=target,
    success=True,
    data={"users": found_users},
    credentials=[
        ExtractedCredential(
            cred_type="plaintext",
            username="administrator",
            secret="P@ssw0rd",
            domain="CORP",
            source_host=target.host,
            notes="Extracted from /etc/shadow",
        ),
        ExtractedCredential(
            cred_type="hash",
            username="serviceaccount",
            secret="aad3b435b51404eeaad3b435b51404ee:31d6cfe0d16ae931b73c59d7e0c089c0",
            domain="CORP",
            source_host=target.host,
        ),
    ],
)

cred_type values: plaintext, hash, ticket, token, ssh_key, api_key, certificate.
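
Before returning extracted credentials, it can help to check cred_type against the list above and drop exact duplicates. The sketch below assumes nothing about how the credential store treats unknown types or duplicates. ExtractedCredential is a local stand-in whose field names come from the example above (the defaults are assumptions), and unique_credentials is a hypothetical helper.

```python
from dataclasses import dataclass
from typing import Optional

# Values listed in the documentation above.
CRED_TYPES = frozenset(
    {"plaintext", "hash", "ticket", "token", "ssh_key", "api_key", "certificate"}
)


@dataclass(frozen=True)
class ExtractedCredential:
    """Stand-in; frozen so instances are hashable and can be de-duplicated."""
    cred_type: str
    username: str
    secret: str
    domain: Optional[str] = None
    source_host: Optional[str] = None
    notes: Optional[str] = None


def unique_credentials(found: list[ExtractedCredential]) -> list[ExtractedCredential]:
    """Hypothetical helper: validate cred_type and drop exact duplicates, keeping order."""
    for cred in found:
        if cred.cred_type not in CRED_TYPES:
            raise ValueError(f"unknown cred_type: {cred.cred_type}")
    # dict.fromkeys keeps the first occurrence of each equal credential.
    return list(dict.fromkeys(found))
```

Two entries count as duplicates here only when every field matches, including source_host and notes.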

Proxy Support

When a proxy is configured, the proxy dict contains:

{
    "proxy_type": "socks5",     # "socks4", "socks5", or "ssh_tunnel"
    "host": "10.0.0.1",
    "port": 1080,
    "username": "user",          # optional
    "credential_id": "abc123",   # optional — look up in credentials
}
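
When the proxy dict carries a credential_id, the proxy secret has to be looked up in the credentials mapping. A minimal sketch of that lookup follows. resolve_proxy_auth is a hypothetical helper, and letting an explicit proxy "username" take precedence over the stored one is an assumption, not documented behavior.

```python
from typing import Any, Optional


def resolve_proxy_auth(
    proxy: Optional[dict[str, Any]],
    credentials: Optional[dict[str, dict[str, Any]]],
) -> tuple[Optional[str], Optional[str]]:
    """Hypothetical helper: return (username, password); either may be None."""
    if not proxy:
        return (None, None)
    username = proxy.get("username")
    password = None
    cred_id = proxy.get("credential_id")
    if cred_id and credentials:
        cred = credentials.get(cred_id, {})
        # Assumption: an explicit proxy username wins over the stored one.
        username = username or cred.get("username")
        password = cred.get("secret")
    return (username, password)
```

The returned pair can then be passed to whatever proxy client the module uses.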

Route your connections through the proxy using PySocks:

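def _create_proxy_socket(
    self, proxy: dict, dest_host: str, dest_port: int, timeout: float = 10.0
):
    """Return a socket connected to the destination through a SOCKS proxy."""
    try:
        import socks
    except ImportError:
        raise ImportError("PySocks is required for proxy support") from None

    # Only SOCKS proxies are handled here; "ssh_tunnel" needs its own code path.
    socks_types = {"socks4": socks.SOCKS4, "socks5": socks.SOCKS5}
    requested = proxy.get("proxy_type", "socks5")
    if requested not in socks_types:
        raise ValueError(f"Unsupported proxy_type for SOCKS: {requested}")

    sock = socks.socksocket()
    # The 10-second default is illustrative; it keeps a dead proxy from hanging execute().
    sock.settimeout(timeout)
    sock.set_proxy(
        socks_types[requested],
        proxy["host"],
        proxy["port"],
        username=proxy.get("username"),
    )
    sock.connect((dest_host, dest_port))
    return sock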

Make proxy support optional — declare PySocks as an optional dependency:

[project.optional-dependencies]
proxy = ["PySocks"]

Reference Implementation: SSH Tool

Source: tools/ssh/src/tantoc2_tool_ssh/ssh_command.py

tantoc2-tool-ssh/
  pyproject.toml              # entry-point: tantoc2.agentless_modules → ssh
  src/tantoc2_tool_ssh/
    ssh_command.py            # SSHCommandModule

Key points from the SSH reference:

class SSHCommandModule(AgentlessModuleBase):
    @classmethod
    def metadata(cls) -> AgentlessMetadata:
        # Abridged excerpt: description, author, connection_params, and
        # options_schema are required fields and are omitted here.
        return AgentlessMetadata(
            name="ssh",
            protocol="ssh",
            operations=["exec", "upload", "download"],
            mitre_attack=["T1021.004"],
            dependencies=[],           # paramiko declared in pyproject.toml
        )

    def _execute_single(self, operation, target, options, credentials, proxy):
        # Abridged excerpt: cred, port, and client are set up earlier in the method.

        # Credential resolution
        pkey, password = None, None
        cred_type = cred.get("cred_type", "plaintext")
        if cred_type == "ssh_key":
            pkey = self._load_private_key(cred["secret"])   # PEM → paramiko key
        else:
            password = cred["secret"]

        # Optional SOCKS proxy
        sock = None
        if proxy:
            sock = self._create_proxy_socket(proxy, target.host, port)

        client.connect(..., sock=sock)

The _load_private_key method tries RSA, Ed25519, ECDSA, and DSS key types:

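@staticmethod
def _load_private_key(key_data: str):
    import io

    import paramiko

    # Look classes up by name: DSSKey is not present in every paramiko release.
    names = ("RSAKey", "Ed25519Key", "ECDSAKey", "DSSKey")
    key_classes = [getattr(paramiko, n) for n in names if hasattr(paramiko, n)]

    # Try each key type in turn and return the first one that parses.
    for key_cls in key_classes:
        try:
            return key_cls.from_private_key(io.StringIO(key_data))
        except (paramiko.SSHException, ValueError):
            continue
    raise ValueError("Unable to parse SSH private key")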

Testing

Unit Tests

import pytest
from unittest.mock import patch, MagicMock
from tantoc2_tool_winrm.winrm_exec import WinRMExecModule
from tantoc2.server.agentless_base import AgentlessTarget


def test_metadata():
    meta = WinRMExecModule.metadata()
    assert meta.name == "winrm"
    assert "exec" in meta.operations
    assert "ps" in meta.operations
    assert meta.protocol == "winrm"


def test_execute_missing_credentials():
    module = WinRMExecModule()
    target = AgentlessTarget(host="10.0.0.1", credential_id=None)
    results = module.execute("exec", [target], {"command": "whoami"})
    assert len(results) == 1
    assert not results[0].success
    assert "No credentials" in results[0].error


def test_execute_handles_exception():
    module = WinRMExecModule()
    target = AgentlessTarget(host="10.0.0.1", credential_id="cred-1")
    credentials = {"cred-1": {"username": "admin", "secret": "wrong", "cred_type": "plaintext"}}

    with patch("winrm.Session") as mock_session:
        mock_session.side_effect = Exception("Connection refused")
        results = module.execute("exec", [target], {"command": "whoami"}, credentials)

    assert len(results) == 1
    assert not results[0].success
    assert "Connection refused" in results[0].error

Integration Testing

hatch run dev:pip install -e ./tantoc2-tool-winrm/
# In the running teamserver CLI:
tantoc2> tools list
# Should show: winrm
tantoc2> tools run winrm exec --host 10.0.0.1 --credential-id <id> --command whoami

Deployment

Method 1: Standalone Package (Recommended)

hatch build
pip install dist/tantoc2_tool_winrm-0.1.0-py3-none-any.whl
# Or drop the .whl in the plugin inbox

Method 2: File Drop

cp winrm_exec.py tantoc2/plugins/agentless/
tantoc2> tools refresh

Common Pitfalls

Raising from execute() — the manager does not catch exceptions from execute(). Always wrap per-target logic in a try/except and return AgentlessResult(success=False, error=str(exc)).

Returning wrong number of results — return exactly one result per target. Missing results cause the manager to skip auto-credential extraction for those targets.

Blocking forever — set socket timeouts. A hung execute() blocks the entire tools execution queue.

dependencies mismatch: declare runtime dependencies in pyproject.toml so that pip installs them, and also list them in AgentlessMetadata.dependencies if the module may be deployed by file drop, because the metadata list is what gets auto-installed when a module is discovered without pip. The two lists do not need to be identical.

Accessing secret before checking credentials: credentials is None when no credential IDs are on any target. Always check before subscripting.
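
For the "Blocking forever" pitfall above, the simplest safeguard for raw TCP code is to pass a timeout when the socket is created. The sketch below uses only the standard library. connect_with_timeout is a hypothetical helper name and the 10-second default is an arbitrary illustration; libraries such as pywinrm or paramiko have their own timeout settings, which this does not cover.

```python
import socket


def connect_with_timeout(host: str, port: int, timeout: float = 10.0) -> socket.socket:
    """Open a TCP connection whose blocking calls give up after `timeout` seconds."""
    # create_connection applies the timeout to the connect attempt and
    # leaves it set on the returned socket for later send/recv calls.
    return socket.create_connection((host, port), timeout=timeout)


if __name__ == "__main__":
    # Demo against a local listener that never sends data.
    server = socket.socket()
    server.bind(("127.0.0.1", 0))
    server.listen(1)
    client = connect_with_timeout("127.0.0.1", server.getsockname()[1], timeout=0.5)
    try:
        client.recv(1)  # no data arrives, so this raises instead of hanging
    except OSError as exc:
        print(f"gave up waiting: {exc!r}")
    finally:
        client.close()
        server.close()
```

Inside execute(), a timeout raised this way is caught by the per-target try/except and reported as a failed AgentlessResult instead of stalling the queue.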

See Plugin Packaging for the full packaging reference.