# SSRF via requests.get on User-Controlled URL

Language: Python
Severity: High
CWE: CWE-918

## Source
8

## Flow
8-11

## Sink
11

## Vulnerable Code
```python
import requests
from flask import Flask, request, jsonify

app = Flask(__name__)

@app.route('/api/iot/device-status', methods=['POST'])
def fetch_device_telemetry():
    device_endpoint = request.json.get('telemetry_url')
    auth_token = request.json.get('device_token', '')
    headers = {'Authorization': f'Bearer {auth_token}'}
    response = requests.get(device_endpoint, headers=headers, timeout=5)
    telemetry_data = response.json()
    return jsonify({'status': 'success', 'device_data': telemetry_data, 'response_code': response.status_code})
```

## Explanation

The application accepts a user-controlled 'telemetry_url' parameter and directly passes it to requests.get() without validation. This allows attackers to make the server send HTTP requests to arbitrary URLs, including internal network resources, cloud metadata endpoints, or other services not directly accessible to the attacker.

## Remediation

The fix implements multiple layers of SSRF protection: URL validation enforces an allowlist of permitted device domains and HTTPS-only scheme, DNS resolution checking blocks requests to private/internal IP ranges (including cloud metadata endpoints like 169.254.x.x), and redirects are disabled to prevent redirect-based bypasses. Additionally, proper error handling prevents information leakage.

## Secure Code
```python
import requests
import ipaddress
from urllib.parse import urlparse
from flask import Flask, request, jsonify

app = Flask(__name__)

# Allowlist of permitted device hostname patterns or domains
ALLOWED_DEVICE_DOMAINS = [
    'devices.example.com',
    'iot.example.com',
    'sensors.example.com',
]

ALLOWED_SCHEMES = ['https']

BLOCKED_IP_RANGES = [
    ipaddress.ip_network('127.0.0.0/8'),
    ipaddress.ip_network('10.0.0.0/8'),
    ipaddress.ip_network('172.16.0.0/12'),
    ipaddress.ip_network('192.168.0.0/16'),
    ipaddress.ip_network('169.254.0.0/16'),
    ipaddress.ip_network('0.0.0.0/8'),
    ipaddress.ip_network('::1/128'),
    ipaddress.ip_network('fc00::/7'),
    ipaddress.ip_network('fe80::/10'),
]


def is_ip_blocked(hostname):
    """Check if a hostname resolves to a blocked IP range."""
    import socket
    try:
        addr_info = socket.getaddrinfo(hostname, None)
        for entry in addr_info:
            ip = ipaddress.ip_address(entry[4][0])
            for blocked_range in BLOCKED_IP_RANGES:
                if ip in blocked_range:
                    return True
    except (socket.gaierror, ValueError):
        return True
    return False


def validate_url(url):
    """Validate that the URL is safe to request."""
    if not url or not isinstance(url, str):
        return False, "Invalid or missing URL"

    try:
        parsed = urlparse(url)
    except Exception:
        return False, "Malformed URL"

    # Check scheme
    if parsed.scheme not in ALLOWED_SCHEMES:
        return False, f"URL scheme '{parsed.scheme}' is not allowed. Only {ALLOWED_SCHEMES} permitted."

    # Extract hostname
    hostname = parsed.hostname
    if not hostname:
        return False, "No hostname found in URL"

    # Check against allowed domains
    domain_allowed = False
    for allowed_domain in ALLOWED_DEVICE_DOMAINS:
        if hostname == allowed_domain or hostname.endswith('.' + allowed_domain):
            domain_allowed = True
            break

    if not domain_allowed:
        return False, f"Hostname '{hostname}' is not in the allowed device domains"

    # Check that hostname doesn't resolve to a private/internal IP
    if is_ip_blocked(hostname):
        return False, "URL resolves to a blocked internal IP address"

    # Disallow non-standard ports
    if parsed.port and parsed.port not in (443,):
        return False, f"Port {parsed.port} is not allowed"

    return True, None


@app.route('/api/iot/device-status', methods=['POST'])
def fetch_device_telemetry():
    if not request.json:
        return jsonify({'status': 'error', 'message': 'Request body must be JSON'}), 400

    device_endpoint = request.json.get('telemetry_url')
    auth_token = request.json.get('device_token', '')

    # Validate the URL before making any request
    is_valid, error_message = validate_url(device_endpoint)
    if not is_valid:
        return jsonify({'status': 'error', 'message': error_message}), 400

    headers = {'Authorization': f'Bearer {auth_token}'}

    try:
        response = requests.get(device_endpoint, headers=headers, timeout=5, allow_redirects=False)
        telemetry_data = response.json()
        return jsonify({'status': 'success', 'device_data': telemetry_data, 'response_code': response.status_code})
    except requests.exceptions.RequestException as e:
        return jsonify({'status': 'error', 'message': 'Failed to reach device endpoint'}), 502
    except ValueError:
        return jsonify({'status': 'error', 'message': 'Device returned invalid JSON'}), 502
```
