# SSRF via Untrusted requests.get() URL Fetching

Language: Python
Severity: Critical
CWE: CWE-918

## Source
8

## Flow
8-12

## Sink
12

## Vulnerable Code
```python
import requests
from flask import Flask, request, jsonify

app = Flask(__name__)

@app.route('/api/iot/device/health', methods=['POST'])
def check_device_health():  # Intentionally vulnerable example: fetches a caller-chosen URL and echoes the result
    device_callback = request.json.get('callback_url')  # SOURCE: attacker-controlled URL, never validated
    device_id = request.json.get('device_id', 'unknown')
    timeout_val = request.json.get('timeout', 5)  # caller-supplied timeout is used as-is
    headers = {'X-Device-ID': device_id, 'User-Agent': 'IoT-Health-Monitor/2.1'}
    try:
        health_response = requests.get(device_callback, headers=headers, timeout=timeout_val)  # SINK: server-side request to the unvalidated URL (SSRF)
        return jsonify({'status': 'success', 'device_id': device_id, 'health_data': health_response.text, 'status_code': health_response.status_code})  # full upstream body is returned to the caller
    except requests.exceptions.RequestException as e:
        return jsonify({'status': 'error', 'device_id': device_id, 'message': str(e)}), 500  # exception text is returned to the caller
```

## Explanation

The application accepts a user-controlled 'callback_url' parameter without validation and directly passes it to requests.get(), enabling Server-Side Request Forgery (SSRF). An attacker can force the server to make requests to internal resources, cloud metadata services, or arbitrary external URLs, potentially exposing sensitive data or bypassing network security controls.

## Remediation

The fix validates the user-provided callback URL before making the request: it checks the scheme (HTTPS only) and the port (allowlisted ports only), then resolves the hostname and rejects the URL if any resolved address is private, loopback, link-local, multicast, unspecified, or reserved. Additionally, redirects are disabled to prevent redirect-based SSRF bypasses, the timeout is sanitized, response data is truncated to limit data exfiltration, and request failures return a generic message instead of the raw exception text.

## Secure Code
```python
import requests
from flask import Flask, request, jsonify
from urllib.parse import urlparse
import ipaddress
import socket

app = Flask(__name__)

# Allowlist of permitted schemes and ports for IoT device callbacks.
# Only HTTPS is accepted, so every port listed below is reached over TLS.
ALLOWED_SCHEMES = {'https'}
ALLOWED_PORTS = {443, 8443, 8080}
# Optional: allowlist of known device subnets (external only).
# NOTE(review): nothing in the visible code reads ALLOWED_DEVICE_SUBNETS, so it
# currently has no effect on validation. 192.0.2.0/24 is the RFC 5737
# documentation range (TEST-NET-1) and is only a placeholder.
ALLOWED_DEVICE_SUBNETS = [
    ipaddress.ip_network('192.0.2.0/24'),  # Example: replace with actual device subnet
]

def is_private_or_reserved(ip_str):
    """Return True if ``ip_str`` is an address the server must not contact.

    An address is rejected when it is not globally routable, or when it is
    private, loopback, link-local, multicast, reserved, or unspecified.

    Args:
        ip_str: Textual IPv4 or IPv6 address, e.g. the first element of a
            ``socket.getaddrinfo`` sockaddr tuple.

    Returns:
        True if the address is unsafe or cannot be parsed; False only for a
        well-formed, globally routable unicast address.
    """
    try:
        ip = ipaddress.ip_address(ip_str)
    except ValueError:
        # Fail closed: anything that is not a parseable IP is treated as unsafe.
        return True

    return (
        # ``is_global`` additionally excludes ranges that are neither
        # "private" nor "reserved" in ipaddress terms, such as the
        # 100.64.0.0/10 shared address space used inside provider networks.
        not ip.is_global or
        ip.is_private or
        ip.is_loopback or
        ip.is_link_local or
        ip.is_multicast or
        ip.is_reserved or
        ip.is_unspecified
    )

def validate_callback_url(url):
    """Validate that the callback URL is safe to fetch.

    The URL must be a non-empty string, use a scheme in ``ALLOWED_SCHEMES``,
    contain a hostname, target a port in ``ALLOWED_PORTS`` (the scheme's
    default port is used when none is given), and resolve only to addresses
    that ``is_private_or_reserved`` accepts.

    Args:
        url: The caller-supplied callback URL.

    Returns:
        A ``(is_valid, message)`` tuple. ``message`` explains the rejection
        when ``is_valid`` is False.

    NOTE(review): the hostname is resolved here and resolved again by the HTTP
    client when the request is made, so a DNS answer that changes between the
    two lookups is not detected by this check.
    """
    if not url or not isinstance(url, str):
        return False, "Invalid or missing callback URL"

    try:
        parsed = urlparse(url)
    except ValueError:
        return False, "Malformed URL"

    # Check scheme
    if parsed.scheme not in ALLOWED_SCHEMES:
        return False, f"Scheme '{parsed.scheme}' not allowed. Only HTTPS is permitted."

    # Extract hostname
    hostname = parsed.hostname
    if not hostname:
        return False, "No hostname found in URL"

    # Check port. ``parsed.port`` is computed lazily and raises ValueError for
    # non-numeric or out-of-range ports, so it must be read inside a try.
    try:
        explicit_port = parsed.port
    except ValueError:
        return False, "Malformed URL"

    # Compare against None so that an explicit ":0" is not mistaken for
    # "no port given" and silently validated as the default port.
    if explicit_port is None:
        port = 443 if parsed.scheme == 'https' else 80
    else:
        port = explicit_port
    if port not in ALLOWED_PORTS:
        return False, f"Port {port} is not in the allowed port list"

    # Resolve hostname to IP and check for internal addresses
    try:
        resolved_ips = socket.getaddrinfo(hostname, port, proto=socket.IPPROTO_TCP)
    except (socket.gaierror, UnicodeError):
        # UnicodeError covers hostnames that cannot be IDNA-encoded.
        return False, "Unable to resolve hostname"

    # Every resolved address must be safe; one internal record is enough to reject.
    if any(is_private_or_reserved(sockaddr[0]) for *_, sockaddr in resolved_ips):
        return False, "URL resolves to a private or reserved IP address"

    return True, "URL is valid"

@app.route('/api/iot/device/health', methods=['POST'])
def check_device_health():
    """Fetch a device's health endpoint after validating the callback URL.

    Expects a JSON object with ``callback_url`` and optionally ``device_id``
    (default ``'unknown'``) and ``timeout`` (seconds, 1-10, default 5).

    Returns:
        200 with ``status``, ``device_id``, ``health_data`` (at most 4096
        bytes of the upstream body, decoded as text) and ``status_code``;
        400 when the body, ``device_id`` or callback URL is invalid;
        500 with a generic message when the upstream request fails.

    NOTE(review): ``validate_callback_url`` resolves the hostname separately
    from the request made here, so the address actually contacted is not
    pinned to the one that was validated.
    """
    max_body_bytes = 4096

    # silent=True yields None instead of raising on a missing or malformed body;
    # the dict check rejects JSON arrays/scalars, which have no ``.get``.
    payload = request.get_json(silent=True)
    if not payload or not isinstance(payload, dict):
        return jsonify({'status': 'error', 'message': 'Request body must be JSON'}), 400

    device_callback = payload.get('callback_url')
    device_id = payload.get('device_id', 'unknown')
    timeout_val = payload.get('timeout', 5)

    # device_id is sent as an HTTP header value, so it must be a string of
    # printable ASCII (this also rules out CR/LF header injection).
    if not isinstance(device_id, str) or not (device_id.isascii() and device_id.isprintable()):
        return jsonify({'status': 'error', 'message': 'Invalid device_id'}), 400

    # Sanitize timeout to prevent abuse. bool is a subclass of int and must be
    # excluded explicitly; the chained comparison is False for NaN, so NaN
    # also falls back to the default.
    if (
        isinstance(timeout_val, bool)
        or not isinstance(timeout_val, (int, float))
        or not 1 <= timeout_val <= 10
    ):
        timeout_val = 5

    # Validate the callback URL before making the request
    is_valid, validation_message = validate_callback_url(device_callback)
    if not is_valid:
        return jsonify({'status': 'error', 'device_id': device_id, 'message': f'Invalid callback URL: {validation_message}'}), 400

    headers = {'X-Device-ID': device_id, 'User-Agent': 'IoT-Health-Monitor/2.1'}

    try:
        # stream=True defers the body download so that only a bounded prefix
        # is read, instead of buffering an arbitrarily large response.
        with requests.get(
            device_callback,
            headers=headers,
            timeout=timeout_val,
            allow_redirects=False,  # Prevent redirect-based SSRF bypass
            stream=True,
        ) as health_response:
            body = bytearray()
            for chunk in health_response.iter_content(chunk_size=1024):
                body.extend(chunk)
                if len(body) >= max_body_bytes:
                    break
            status_code = health_response.status_code
            encoding = health_response.encoding or 'utf-8'
    except requests.exceptions.RequestException:
        # Do not echo exception details; they can reveal internal network state.
        return jsonify({'status': 'error', 'device_id': device_id, 'message': 'Failed to reach device'}), 500

    raw_body = bytes(body[:max_body_bytes])  # Limit response size
    try:
        health_data = raw_body.decode(encoding, errors='replace')
    except LookupError:
        # The upstream declared a charset Python does not know.
        health_data = raw_body.decode('utf-8', errors='replace')

    return jsonify({
        'status': 'success',
        'device_id': device_id,
        'health_data': health_data,
        'status_code': status_code
    })
```
