# Server-Side Request Forgery via requests.get on User-Controlled URL

Language: Python
Severity: High
CWE: CWE-918

## Source
4

## Flow
4-7-8

## Sink
8

## Vulnerable Code
```python
import requests
import json

def fetch_iot_device_telemetry(device_endpoint, auth_token):  # SOURCE: device_endpoint is caller-supplied and never validated
    headers = {'Authorization': f'Bearer {auth_token}', 'Content-Type': 'application/json'}
    try:
        telemetry_url = f"{device_endpoint}/api/v2/telemetry"  # FLOW: the unvalidated endpoint becomes the request URL
        response = requests.get(telemetry_url, headers=headers, timeout=5)  # SINK: request, including the bearer token header, is sent to that URL
        if response.status_code == 200:
            return {'status': 'success', 'data': response.json()}
        return {'status': 'error', 'message': 'Device unreachable'}
    except requests.exceptions.RequestException as e:
        return {'status': 'error', 'message': str(e)}  # raw exception text is returned to the caller
```

## Explanation

The function accepts a user-controlled 'device_endpoint' parameter without validation and directly uses it to construct a URL for requests.get(). This allows attackers to make the server send HTTP requests to arbitrary internal or external destinations, potentially accessing internal services, cloud metadata endpoints, or exfiltrating authentication tokens.

## Remediation

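The fix adds a URL validation function that checks the scheme (HTTPS only), resolves the hostname to IP addresses, and verifies that none of the resolved IPs belong to private, loopback, or link-local networks. It also restricts allowed ports, disables automatic redirects to prevent redirect-based SSRF bypasses, and rejects any endpoint that fails validation before making the HTTP request. Note that the hostname is resolved once during validation and again by `requests.get()`, so this check alone does not stop DNS rebinding; where that matters, connect to the validated IP address or restrict endpoints to an allowlist of known device hosts.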

## Secure Code
```python
import requests
import json
from urllib.parse import urlparse
import ipaddress
import socket

ALLOWED_SCHEMES = {'https'}

# Explicit deny-list of address ranges that must never be reached through a
# caller-supplied URL. The ipaddress classification flags are checked as well
# (see _is_blocked_address); the list also names ranges such as the shared
# address space 100.64.0.0/10, which ``is_private`` does not report.
BLOCKED_NETWORKS = [
    ipaddress.ip_network('127.0.0.0/8'),      # IPv4 loopback
    ipaddress.ip_network('10.0.0.0/8'),       # RFC 1918 private
    ipaddress.ip_network('172.16.0.0/12'),    # RFC 1918 private
    ipaddress.ip_network('192.168.0.0/16'),   # RFC 1918 private
    ipaddress.ip_network('169.254.0.0/16'),   # IPv4 link-local (cloud metadata)
    ipaddress.ip_network('0.0.0.0/8'),        # "this network"
    ipaddress.ip_network('100.64.0.0/10'),    # shared address space (CGNAT)
    ipaddress.ip_network('192.0.0.0/24'),     # IETF protocol assignments
    ipaddress.ip_network('198.18.0.0/15'),    # benchmarking
    ipaddress.ip_network('224.0.0.0/4'),      # IPv4 multicast
    ipaddress.ip_network('240.0.0.0/4'),      # reserved, includes broadcast
    ipaddress.ip_network('::/128'),           # IPv6 unspecified
    ipaddress.ip_network('::1/128'),          # IPv6 loopback
    ipaddress.ip_network('::ffff:0:0/96'),    # IPv4-mapped IPv6, e.g. ::ffff:127.0.0.1
    ipaddress.ip_network('fc00::/7'),         # IPv6 unique local
    ipaddress.ip_network('fe80::/10'),        # IPv6 link-local
    ipaddress.ip_network('ff00::/8'),         # IPv6 multicast
]

ALLOWED_PORTS = {443, 8443}


def _is_blocked_address(ip):
    """Return True if *ip* falls in a range that must not be contacted."""
    # An address of one IP version is never "in" a network of the other
    # version, so mixing IPv4 and IPv6 entries in the list is safe.
    if any(ip in network for network in BLOCKED_NETWORKS):
        return True
    return (
        ip.is_private
        or ip.is_loopback
        or ip.is_link_local
        or ip.is_multicast
        or ip.is_reserved
        or ip.is_unspecified
    )


def is_safe_url(url):
    """Validate that the URL does not point to internal/private resources.

    The URL is accepted only if all of the following hold:

    * its scheme is in ``ALLOWED_SCHEMES``;
    * it has a hostname;
    * its port (explicit, or the scheme default) is in ``ALLOWED_PORTS``;
    * the hostname resolves, and none of the resolved addresses is blocked.

    Every failure, including a malformed URL or an unresolvable hostname,
    yields ``False`` rather than an exception.

    Note: the hostname is resolved here only for validation. A client that
    resolves the name again when it connects may receive a different answer
    (DNS rebinding), so this check narrows the attack surface but does not
    pin the connection to the validated address.

    Args:
        url: The absolute URL to validate.

    Returns:
        ``True`` if the URL passed every check, ``False`` otherwise.
    """
    try:
        parsed = urlparse(url)
        scheme = parsed.scheme
        hostname = parsed.hostname
        # Accessing .port raises ValueError for a non-numeric or
        # out-of-range port, so it must stay inside the try block.
        port = parsed.port
    except (AttributeError, TypeError, ValueError):
        return False

    if scheme not in ALLOWED_SCHEMES:
        return False

    if not hostname:
        return False

    # Compare against None so that an explicit ":0" is not mistaken for
    # "no port given" and silently replaced by the default.
    if port is None:
        port = 443 if scheme == 'https' else 80
    if port not in ALLOWED_PORTS:
        return False

    try:
        resolved_ips = socket.getaddrinfo(hostname, port, proto=socket.IPPROTO_TCP)
    except (OSError, ValueError):
        # socket.gaierror is an OSError; hostnames that cannot be encoded
        # (e.g. over-long labels) raise UnicodeError, a ValueError subclass.
        return False

    # Fail closed if the resolver returned nothing to check.
    if not resolved_ips:
        return False

    for entry in resolved_ips:
        ip_str = entry[4][0]
        try:
            ip = ipaddress.ip_address(ip_str)
        except ValueError:
            return False
        # One blocked address is enough to reject: the client may connect
        # to any of the returned records.
        if _is_blocked_address(ip):
            return False

    return True


def fetch_iot_device_telemetry(device_endpoint, auth_token):
    """Fetch telemetry from a device's ``/api/v2/telemetry`` endpoint.

    The target URL is checked with ``is_safe_url`` before any request is
    made, and redirects are not followed, so a validated host cannot bounce
    the request to a target that was never validated.

    NOTE(review): the bearer token is sent to any host that passes
    validation. If device hosts are known in advance, an allowlist would
    keep the token from reaching arbitrary public servers.

    Args:
        device_endpoint: Base URL of the device, e.g. ``https://host:8443``.
        auth_token: Token sent as ``Authorization: Bearer <token>``.

    Returns:
        ``{'status': 'success', 'data': <parsed JSON>}`` on an HTTP 200
        response with a JSON body, otherwise
        ``{'status': 'error', 'message': <reason>}``.
    """
    headers = {'Authorization': f'Bearer {auth_token}', 'Content-Type': 'application/json'}
    try:
        telemetry_url = f"{device_endpoint}/api/v2/telemetry"

        if not is_safe_url(telemetry_url):
            return {'status': 'error', 'message': 'Invalid or blocked device endpoint'}

        response = requests.get(telemetry_url, headers=headers, timeout=5, allow_redirects=False)
        if response.status_code != 200:
            # With redirects disabled, 3xx responses also land here.
            return {'status': 'error', 'message': 'Device unreachable'}

        try:
            payload = response.json()
        except ValueError:
            # A 200 response with a non-JSON body raises a ValueError
            # subclass; handle it here so it never escapes to the caller.
            return {'status': 'error', 'message': 'Invalid response from device'}
        return {'status': 'success', 'data': payload}
    except requests.exceptions.RequestException as e:
        return {'status': 'error', 'message': str(e)}
```
