# Code Injection via eval() on Untrusted Expression

Language: Python
Severity: Critical
CWE: CWE-95

## Source
3

## Flow
3-9

## Sink
9

## Vulnerable Code
```python
@app.route('/api/iot/telemetry/filter', methods=['POST'])
def filter_sensor_data():
    device_id = request.json.get('device_id')
    filter_expr = request.json.get('filter_expression')
    telemetry_data = fetch_device_telemetry(device_id)
    filtered_results = []
    for reading in telemetry_data:
        temp = reading['temperature']
        humidity = reading['humidity']
        pressure = reading['pressure']
        if eval(filter_expr):
            filtered_results.append(reading)
    return jsonify({'device': device_id, 'filtered_data': filtered_results, 'count': len(filtered_results)})
```

## Explanation

The application accepts an unsanitized filter expression from user input (filter_expression) and directly passes it to the eval() function. This allows attackers to execute arbitrary Python code on the server by injecting malicious expressions that access system functions, modules, or data beyond the intended sensor filtering logic.

## Remediation

The fix implements a two-layer defense: first, it validates the filter expression using AST parsing to ensure it only contains allowed variable names (temp, humidity, pressure) and disallows function calls, attribute access, and imports. Second, it executes eval() with a restricted namespace that removes all builtins and only exposes the three sensor variables, preventing access to dangerous functions even if validation is bypassed.

## Secure Code
```python
@app.route('/api/iot/telemetry/filter', methods=['POST'])
def filter_sensor_data():
    device_id = request.json.get('device_id')
    filter_expr = request.json.get('filter_expression')
    telemetry_data = fetch_device_telemetry(device_id)
    filtered_results = []

    # Validate the filter expression using AST parsing
    import ast

    ALLOWED_NAMES = {'temp', 'humidity', 'pressure', 'True', 'False', 'None'}
    ALLOWED_COMPARISONS = (ast.Compare, ast.BoolOp, ast.UnaryOp, ast.BinOp, ast.Num, ast.Name, ast.Constant)

    def validate_expression(expr_str):
        """Validate that the expression only contains safe comparison operations."""
        try:
            tree = ast.parse(expr_str, mode='eval')
        except SyntaxError:
            return False
        for node in ast.walk(tree):
            if isinstance(node, ast.Name):
                if node.id not in ALLOWED_NAMES:
                    return False
            elif isinstance(node, ast.Call):
                return False
            elif isinstance(node, ast.Attribute):
                return False
            elif isinstance(node, (ast.Import, ast.ImportFrom)):
                return False
        return True

    if not filter_expr or not isinstance(filter_expr, str):
        return jsonify({'error': 'Invalid filter expression'}), 400

    if not validate_expression(filter_expr):
        return jsonify({'error': 'Filter expression contains disallowed operations. Only comparisons using temp, humidity, and pressure are permitted.'}), 400

    for reading in telemetry_data:
        temp = reading['temperature']
        humidity = reading['humidity']
        pressure = reading['pressure']
        # Use a restricted namespace with no builtins to prevent access to dangerous functions
        safe_globals = {"__builtins__": {}}
        safe_locals = {"temp": temp, "humidity": humidity, "pressure": pressure}
        try:
            result = eval(filter_expr, safe_globals, safe_locals)
            if result:
                filtered_results.append(reading)
        except Exception:
            return jsonify({'error': 'Error evaluating filter expression'}), 400

    return jsonify({'device': device_id, 'filtered_data': filtered_results, 'count': len(filtered_results)})
```
