# Code Injection via eval() on Untrusted Input

Language: Python
Severity: Critical
CWE: CWE-94

## Source
4

## Flow
4-7

## Sink
7

## Vulnerable Code
```python
@app.route('/iot/device/configure', methods=['POST'])
def apply_device_config():
    device_id = request.json.get('device_id')
    threshold_expr = request.json.get('threshold_expression')  # SOURCE: string taken verbatim from the request body
    sensor_data = iot_db.get_latest_reading(device_id)
    try:
        computed_threshold = eval(threshold_expr, {'sensor': sensor_data, 'math': __import__('math')})  # SINK: eval() of untrusted input; the globals dict has no '__builtins__' key, so builtins stay reachable
        if sensor_data['temperature'] > computed_threshold:
            trigger_alert(device_id)
        return jsonify({'status': 'configured', 'threshold': computed_threshold})
    except Exception as e:
        return jsonify({'error': str(e)}), 400  # the raw exception text is returned to the client
```

## Explanation

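The application accepts the user-controlled input 'threshold_expression' from a POST request and passes it directly to eval() without any sanitization. Although a custom globals dict is supplied, it does not define '__builtins__', so Python inserts a reference to the full builtins module automatically and every built-in function (including `__import__`) remains accessible. Even a namespace with '__builtins__' removed would not be safe, because object introspection (for example walking `().__class__.__mro__`) can still reach dangerous objects. This allows attackers to execute arbitrary Python code by leveraging builtins or object introspection, potentially compromising the entire system.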

## Remediation

The fix replaces the dangerous eval() call with a custom AST-based safe expression evaluator that only allows arithmetic operations, a whitelist of math functions, and access to predefined sensor variables. The evaluator walks the AST node by node and rejects any construct that is not explicitly permitted, so user input is never executed as Python code; only the whitelisted operations can run. Additional input validation limits expression length, ensures required fields are present, and verifies the result is numeric.

## Secure Code
```python
import ast
import math
import numbers
import operator

# Safe mathematical expression evaluator
#
# Maps each permitted AST operator node type to the function that implements
# it. An operator whose node type is missing from this table is rejected by
# _eval_node.
SAFE_OPERATORS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
    ast.FloorDiv: operator.floordiv,
    ast.Mod: operator.mod,
    ast.Pow: operator.pow,
    ast.USub: operator.neg,
    ast.UAdd: operator.pos,
}

# Names an expression may reference in addition to the caller's variables:
# whitelisted callables plus the numeric constants ``pi`` and ``e``.
SAFE_MATH_FUNCTIONS = {
    'abs': abs,
    'round': round,
    'min': min,
    'max': max,
    'sqrt': math.sqrt,
    'log': math.log,
    'log10': math.log10,
    'ceil': math.ceil,
    'floor': math.floor,
    'sin': math.sin,
    'cos': math.cos,
    'tan': math.tan,
    'pi': math.pi,
    'e': math.e,
}

# Largest right-hand operand accepted by ``**``.
_MAX_EXPONENT = 100

# Upper bound, in bits, on any integer produced during evaluation. Without
# it, chained powers such as ``((10 ** 100) ** 100) ** 100`` stay under
# _MAX_EXPONENT at every step yet build arbitrarily large integers.
_MAX_INT_BITS = 4096


def safe_eval_expr(expr, variables):
    """Safely evaluate a mathematical expression with given variables.

    The expression is parsed with ``ast.parse`` and interpreted by
    ``_eval_node``; it is never compiled or executed as Python code.

    Args:
        expr: Source text of a single Python expression.
        variables: Mapping of names the expression may reference. These take
            precedence over the names in SAFE_MATH_FUNCTIONS.

    Returns:
        Whatever the expression evaluates to. This is normally a number, but
        a bare name such as ``sensor`` yields the bound object itself, so
        callers that need a number must check the result type.

    Raises:
        ValueError: If the expression cannot be parsed, uses a construct that
            is not whitelisted, or fails while being evaluated (division by
            zero, overflow, wrong argument count, excessive nesting, ...).
    """
    try:
        tree = ast.parse(expr, mode='eval')
    except (SyntaxError, TypeError, ValueError,
            MemoryError, RecursionError) as err:
        # Besides SyntaxError, malformed or pathologically nested input can
        # surface as any of these; report them all the same way.
        raise ValueError("Invalid expression syntax") from err

    try:
        return _eval_node(tree.body, variables)
    except RecursionError as err:
        raise ValueError("Expression too deeply nested") from err
    except (TypeError, ArithmeticError) as err:
        # Raised by the whitelisted operators/functions themselves, e.g.
        # ``1 / 0``, ``min()`` or ``10.0 ** 400``.
        raise ValueError(f"Invalid expression: {err}") from err


def _require_number(value, role):
    """Return *value* unchanged if it is numeric, else raise ValueError.

    Keeps strings, dicts and other containers out of arithmetic so that,
    for example, ``sensor['name'] * 10 ** 9`` cannot allocate a huge string.
    """
    if not isinstance(value, numbers.Number):
        raise ValueError(f"{role} must be a number")
    return value


def _check_int_size(value):
    """Return *value* unchanged unless it is an int wider than _MAX_INT_BITS."""
    if isinstance(value, int) and value.bit_length() > _MAX_INT_BITS:
        raise ValueError("Result too large")
    return value


def _eval_node(node, variables):
    """Recursively evaluate one AST node of a whitelisted expression.

    Supported nodes: numeric constants, names (looked up in *variables*
    first, then in SAFE_MATH_FUNCTIONS), binary and unary operators listed
    in SAFE_OPERATORS, positional-only calls to whitelisted functions, and
    attribute / constant-subscript access on dict values.

    Raises:
        ValueError: For any node, name, operator, operand or call target
            that is not permitted, or when a size limit is exceeded.

    TypeError and ArithmeticError raised by the underlying operator or
    function are not caught here; safe_eval_expr converts them.
    """
    if isinstance(node, ast.Expression):
        return _eval_node(node.body, variables)

    if isinstance(node, ast.Constant):
        value = node.value
        # bool is a subclass of int, so it has to be excluded explicitly.
        if isinstance(value, (int, float)) and not isinstance(value, bool):
            return value
        raise ValueError(f"Unsupported constant type: {type(value)}")

    if isinstance(node, ast.Name):
        name = node.id
        if name in variables:
            return variables[name]
        if name in SAFE_MATH_FUNCTIONS:
            return SAFE_MATH_FUNCTIONS[name]
        raise ValueError(f"Unknown variable or function: {name}")

    if isinstance(node, ast.BinOp):
        op_type = type(node.op)
        if op_type not in SAFE_OPERATORS:
            raise ValueError(f"Unsupported operator: {op_type.__name__}")
        left = _require_number(_eval_node(node.left, variables), "Operand")
        right = _require_number(_eval_node(node.right, variables), "Operand")
        if op_type is ast.Pow:
            if right > _MAX_EXPONENT:
                raise ValueError("Exponent too large")
            # Estimate the size of an integer power before computing it.
            if (isinstance(left, int) and isinstance(right, int)
                    and right > 0
                    and left.bit_length() * right > _MAX_INT_BITS):
                raise ValueError("Result too large")
        return _check_int_size(SAFE_OPERATORS[op_type](left, right))

    if isinstance(node, ast.UnaryOp):
        op_type = type(node.op)
        if op_type not in SAFE_OPERATORS:
            raise ValueError(f"Unsupported unary operator: {op_type.__name__}")
        operand = _require_number(_eval_node(node.operand, variables), "Operand")
        return SAFE_OPERATORS[op_type](operand)

    if isinstance(node, ast.Call):
        func = _eval_node(node.func, variables)
        # Identity comparison against the callable entries only: the float
        # constants pi/e (or a value that merely equals one of them) must
        # not pass as a call target.
        if not callable(func) or not any(
            func is allowed for allowed in SAFE_MATH_FUNCTIONS.values()
        ):
            raise ValueError("Unsupported function call")
        if node.keywords:
            raise ValueError("Keyword arguments not supported")
        args = [
            _require_number(_eval_node(arg, variables), "Function argument")
            for arg in node.args
        ]
        return _check_int_size(func(*args))

    if isinstance(node, ast.Attribute):
        value = _eval_node(node.value, variables)
        # ``sensor.temperature`` is a key lookup on a dict, never getattr().
        if not isinstance(value, dict):
            raise ValueError("Attribute access not allowed on non-dict objects")
        attr = node.attr
        if attr in value:
            return value[attr]
        raise ValueError(f"Unknown attribute: {attr}")

    if isinstance(node, ast.Subscript):
        value = _eval_node(node.value, variables)
        if not isinstance(value, dict):
            raise ValueError("Subscript access not allowed on non-dict objects")
        # Only literal keys are accepted; computed keys and slices are not.
        if isinstance(node.slice, ast.Constant):
            key = node.slice.value
            if key in value:
                return value[key]
        raise ValueError("Invalid subscript access")

    raise ValueError(f"Unsupported expression type: {type(node).__name__}")


@app.route('/iot/device/configure', methods=['POST'])
def apply_device_config():
    """Evaluate a threshold expression for a device and alert if exceeded.

    Reads ``device_id`` and ``threshold_expression`` from the JSON request
    body, evaluates the expression with ``safe_eval_expr`` against the
    device's latest reading, and calls ``trigger_alert`` when the reading's
    temperature is above the computed threshold.

    Returns:
        A JSON response with ``status`` and ``threshold`` on success;
        a 400 JSON error for a malformed request or invalid expression;
        a 404 JSON error when no temperature reading is available.
    """
    payload = request.json
    # A JSON body of ``null``, a list or a scalar has no .get().
    if not isinstance(payload, dict):
        return jsonify({'error': 'device_id and threshold_expression are required'}), 400

    device_id = payload.get('device_id')
    threshold_expr = payload.get('threshold_expression')

    if not device_id or not threshold_expr:
        return jsonify({'error': 'device_id and threshold_expression are required'}), 400

    # JSON numbers, lists, etc. must not reach len() or the parser.
    if not isinstance(threshold_expr, str):
        return jsonify({'error': 'threshold_expression must be a string'}), 400

    if len(threshold_expr) > 200:
        return jsonify({'error': 'Expression too long'}), 400

    sensor_data = iot_db.get_latest_reading(device_id)
    # The alert decision needs a real temperature; report its absence as
    # such instead of as an expression error.
    temperature = sensor_data.get('temperature') if sensor_data else None
    if temperature is None:
        return jsonify({'error': 'No temperature reading available for device'}), 404

    variables = {
        'sensor': sensor_data,
        'temperature': temperature,
        'humidity': sensor_data.get('humidity', 0),
        'pressure': sensor_data.get('pressure', 0),
    }

    # Only the evaluation of user input is guarded, so a failure inside
    # trigger_alert() is not misreported as a bad expression.
    try:
        computed_threshold = safe_eval_expr(threshold_expr, variables)
    except ValueError as e:
        return jsonify({'error': str(e)}), 400
    except Exception:
        # Any other failure while evaluating user input is a client error.
        return jsonify({'error': 'Invalid expression'}), 400

    # bool is an int subclass, and NaN/Infinity are presumably not
    # representable in standard JSON; none of them is a usable threshold.
    is_number = (
        isinstance(computed_threshold, (int, float))
        and not isinstance(computed_threshold, bool)
    )
    if not is_number or (
        isinstance(computed_threshold, float)
        and not math.isfinite(computed_threshold)
    ):
        return jsonify({'error': 'Expression must evaluate to a number'}), 400

    if temperature > computed_threshold:
        trigger_alert(device_id)
    return jsonify({'status': 'configured', 'threshold': computed_threshold})
```
