# SSRF via requests.get on Unvalidated Webhook URL

Language: Python
Severity: High
CWE: CWE-918

## Source
8

## Flow
8-11

## Sink
11

## Vulnerable Code
```python
import requests
from flask import Flask, request, jsonify

app = Flask(__name__)

@app.route('/api/iot/device/healthcheck', methods=['POST'])
def trigger_device_health_probe():
    device_callback = request.json.get('callback_endpoint')
    device_id = request.json.get('device_id', 'unknown')
    probe_data = {'device_id': device_id, 'timestamp': '2024-01-15T10:30:00Z'}
    try:
        response = requests.get(device_callback, params=probe_data, timeout=5)
        return jsonify({'status': 'probe_sent', 'response_code': response.status_code, 'body': response.text[:200]})
    except Exception as e:
        return jsonify({'status': 'probe_failed', 'error': str(e)}), 500
```

## Explanation

The application accepts a user-controlled 'callback_endpoint' URL without validation and directly passes it to requests.get(), enabling Server-Side Request Forgery (SSRF). An attacker can force the server to make requests to internal network resources, cloud metadata endpoints, or other restricted services that are inaccessible from the external network.

## Remediation

The fix validates the callback URL before making any outbound request by checking the URL scheme (only HTTPS allowed), resolving the hostname to its IP addresses, and verifying none of them fall within private/internal network ranges (RFC 1918, link-local, loopback, cloud metadata). Additionally, redirects are disabled to prevent redirect-based SSRF bypasses, the response body is no longer leaked back to the caller, and an optional domain allowlist provides defense-in-depth.

## Secure Code
```python
import requests
import ipaddress
from urllib.parse import urlparse
from flask import Flask, request, jsonify

app = Flask(__name__)

ALLOWED_SCHEMES = {'https'}
BLOCKED_NETWORKS = [
    ipaddress.ip_network('10.0.0.0/8'),
    ipaddress.ip_network('172.16.0.0/12'),
    ipaddress.ip_network('192.168.0.0/16'),
    ipaddress.ip_network('127.0.0.0/8'),
    ipaddress.ip_network('169.254.0.0/16'),
    ipaddress.ip_network('0.0.0.0/8'),
    ipaddress.ip_network('::1/128'),
    ipaddress.ip_network('fc00::/7'),
    ipaddress.ip_network('fe80::/10'),
]

ALLOWED_CALLBACK_DOMAINS = []  # Configure with trusted device callback domains e.g. ['devices.example.com']


def is_safe_url(url):
    """Validate that the URL is safe to request (not targeting internal resources)."""
    try:
        parsed = urlparse(url)
    except Exception:
        return False, "Invalid URL format"

    # Check scheme
    if parsed.scheme not in ALLOWED_SCHEMES:
        return False, f"Scheme '{parsed.scheme}' not allowed. Only {ALLOWED_SCHEMES} permitted."

    # Check that hostname exists
    hostname = parsed.hostname
    if not hostname:
        return False, "No hostname found in URL"

    # If allowlist is configured, enforce it
    if ALLOWED_CALLBACK_DOMAINS:
        if not any(hostname == domain or hostname.endswith('.' + domain) for domain in ALLOWED_CALLBACK_DOMAINS):
            return False, f"Hostname '{hostname}' is not in the allowed callback domains"

    # Resolve hostname and check against blocked networks
    import socket
    try:
        addr_infos = socket.getaddrinfo(hostname, parsed.port or 443)
    except socket.gaierror:
        return False, f"Unable to resolve hostname '{hostname}'"

    for addr_info in addr_infos:
        ip = ipaddress.ip_address(addr_info[4][0])
        for blocked_network in BLOCKED_NETWORKS:
            if ip in blocked_network:
                return False, f"Resolved IP {ip} is in a blocked network range"

    return True, None


@app.route('/api/iot/device/healthcheck', methods=['POST'])
def trigger_device_health_probe():
    device_callback = request.json.get('callback_endpoint')
    device_id = request.json.get('device_id', 'unknown')

    if not device_callback or not isinstance(device_callback, str):
        return jsonify({'status': 'error', 'message': 'Invalid or missing callback_endpoint'}), 400

    # Validate the callback URL before making the request
    is_safe, rejection_reason = is_safe_url(device_callback)
    if not is_safe:
        return jsonify({'status': 'rejected', 'message': f'Callback URL rejected: {rejection_reason}'}), 400

    probe_data = {'device_id': device_id, 'timestamp': '2024-01-15T10:30:00Z'}
    try:
        response = requests.get(device_callback, params=probe_data, timeout=5, allow_redirects=False)
        return jsonify({'status': 'probe_sent', 'response_code': response.status_code})
    except Exception as e:
        return jsonify({'status': 'probe_failed', 'error': 'Health probe request failed'}), 500
```
