# Command Injection via subprocess.run(shell=True)

Language: Python
Severity: Critical
CWE: CWE-78

## Source
3

## Flow
3-4-5, 3-7-8

## Sink
5, 8

## Vulnerable Code
```python
import subprocess

def provision_iot_device(device_mac, firmware_url):  # INTENTIONALLY VULNERABLE sample: flashes firmware, then requests a reboot
    sanitized_mac = device_mac.replace(':', '')  # only strips colons; every other character (including shell metacharacters) passes through
    flash_cmd = f"curl -s {firmware_url} | dd of=/dev/iot_{sanitized_mac} bs=4M"  # both caller-supplied values are interpolated into shell text
    result = subprocess.run(flash_cmd, shell=True, capture_output=True, text=True)  # sink: the string is interpreted by a shell
    if result.returncode == 0:  # exit status of the shell command as a whole
        reboot_sequence = f"echo 'REBOOT' > /sys/devices/iot/{sanitized_mac}/control"  # sanitized_mac is interpolated into shell text again
        subprocess.run(reboot_sequence, shell=True)  # sink: second shell invocation; its result is not checked
        return {"status": "provisioned", "device": sanitized_mac}  # reported even if the reboot write failed
    return {"status": "failed", "error": result.stderr}  # stderr is text because text=True was passed
```

## Explanation

The function accepts untrusted input via the `firmware_url` and `device_mac` parameters and interpolates both into shell command strings executed with `subprocess.run(shell=True)`. The only processing applied to `device_mac` is colon removal, which does not neutralize shell metacharacters, and `firmware_url` is not sanitized at all. An attacker can therefore inject arbitrary shell commands through either parameter using command separators (`;`, `&&`), pipes, or command substitution (`$(...)`, backticks) — for example, `firmware_url="http://x; rm -rf /"`.

## Remediation

The fix eliminates `shell=True` by invoking `subprocess.run` with argument lists: curl and dd run as separate subprocess calls, and curl's captured output is passed to dd's standard input from Python rather than through a shell pipe. Input validation is added for both the MAC address (strict regex) and the firmware URL (scheme validation and a character whitelist). The reboot command is replaced with a direct file write instead of a shell `echo` redirect.

## Secure Code
```python
import subprocess
import re
from urllib.parse import urlparse


def validate_mac_address(device_mac):
    """Validate a colon-separated MAC address and return it without colons.

    Args:
        device_mac: MAC address as six two-digit hex groups separated by
            colons, e.g. ``"AA:BB:CC:DD:EE:FF"``.

    Returns:
        The twelve hex digits with the colons removed; letter case is
        preserved.

    Raises:
        ValueError: If ``device_mac`` is not a string or does not match the
            expected format exactly.
    """
    # re.fullmatch is used instead of re.match with a trailing "$": "$" also
    # matches just before a final newline, which would let
    # "AA:BB:CC:DD:EE:FF\n" through and carry the newline into file paths.
    pattern = r'(?:[0-9A-Fa-f]{2}:){5}[0-9A-Fa-f]{2}'
    if not isinstance(device_mac, str) or not re.fullmatch(pattern, device_mac):
        raise ValueError(f"Invalid MAC address format: {device_mac}")
    return device_mac.replace(':', '')


def validate_firmware_url(firmware_url):
    """Validate that a firmware URL is a well-formed HTTP(S) URL.

    Args:
        firmware_url: The URL string to check.

    Returns:
        ``firmware_url`` unchanged when it passes every check.

    Raises:
        ValueError: If the value is not a string, the scheme is not ``http``
            or ``https``, no hostname is present, or the URL contains a
            character outside the allowed set.
    """
    if not isinstance(firmware_url, str):
        raise ValueError("Invalid URL: expected a string.")
    parsed = urlparse(firmware_url)
    if parsed.scheme not in ('http', 'https'):
        raise ValueError(f"Invalid URL scheme: {parsed.scheme}. Only http and https are allowed.")
    if not parsed.hostname:
        raise ValueError("Invalid URL: no hostname found.")
    # The whole string must consist of allowed characters. re.fullmatch is
    # used rather than re.match with "$" because "$" also matches just before
    # a trailing newline, which would let "http://host/fw\n" pass unchanged.
    allowed_url_pattern = r'https?://[A-Za-z0-9._~:/?#\[\]@!$&\'()*+,;=%-]+'
    if not re.fullmatch(allowed_url_pattern, firmware_url):
        raise ValueError(f"URL contains disallowed characters: {firmware_url}")
    return firmware_url


def provision_iot_device(device_mac, firmware_url):
    """Provision an IoT device by flashing firmware and requesting a reboot.

    The firmware is downloaded completely before anything is written, so a
    failed or empty download never touches the device.

    Args:
        device_mac: Colon-separated MAC address identifying the device.
        firmware_url: HTTP(S) URL of the firmware image.

    Returns:
        ``{"status": "provisioned", "device": <mac without colons>}`` on
        success, otherwise ``{"status": "failed", "error": <message>}``.

    Raises:
        ValueError: Propagated from the validators when either argument is
            malformed.
    """
    sanitized_mac = validate_mac_address(device_mac)
    validated_url = validate_firmware_url(firmware_url)

    device_path = f"/dev/iot_{sanitized_mac}"
    control_path = f"/sys/devices/iot/{sanitized_mac}/control"

    download_failed = {"status": "failed", "error": "Failed to download firmware"}
    curl_cmd = [
        "curl",
        "-s",
        # Without --fail an HTTP 4xx/5xx error page exits 0 and would be
        # flashed as if it were firmware.
        "--fail",
        # The URL whitelist permits "[" and "]"; stop curl from expanding
        # them as glob ranges into multiple requests.
        "--globoff",
        # --max-redirs only has an effect together with --location; without
        # it a redirect response body would be flashed.
        "--location",
        "--max-redirs", "3",
        # Pin the initial request and any redirect target to HTTP(S).
        "--proto", "=http,https",
        "--proto-redir", "=http,https",
        validated_url,
    ]
    try:
        curl_process = subprocess.run(curl_cmd, capture_output=True)
    except OSError:
        # e.g. the curl binary is missing or not executable
        return download_failed

    # An empty body is never a valid image; refuse to flash it.
    if curl_process.returncode != 0 or not curl_process.stdout:
        return download_failed

    try:
        dd_process = subprocess.run(
            ["dd", f"of={device_path}", "bs=4M"],
            input=curl_process.stdout,
            capture_output=True,
        )
    except OSError as e:
        return {"status": "failed", "error": f"Failed to write firmware: {e}"}

    if dd_process.returncode != 0:
        return {"status": "failed", "error": dd_process.stderr.decode('utf-8', errors='replace')}

    try:
        with open(control_path, 'w') as control_file:
            # The shell version used `echo`, which appends a newline; write
            # the same bytes so the control interface sees identical input.
            control_file.write('REBOOT\n')
    except OSError as e:
        return {"status": "failed", "error": f"Failed to send reboot command: {e}"}
    return {"status": "provisioned", "device": sanitized_mac}
```
