# Code Injection via eval() on Untrusted Expression

Language: Python
Severity: Critical
CWE: CWE-95

## Source
4

## Flow
4-10

## Sink
10

## Vulnerable Code
```python
@app.route('/iot/device/metrics', methods=['POST'])
def calculate_device_metrics():
    device_id = request.json.get('device_id')
    metric_formula = request.json.get('formula')
    sensor_temp = float(request.json.get('temperature', 0))
    sensor_humid = float(request.json.get('humidity', 0))
    sensor_pressure = float(request.json.get('pressure', 0))
    context = {'temp': sensor_temp, 'humid': sensor_humid, 'press': sensor_pressure}
    try:
        computed_value = eval(metric_formula, {'__builtins__': {}}, context)
        db.store_metric(device_id, computed_value)
        return jsonify({'device': device_id, 'result': computed_value})
    except Exception as e:
        return jsonify({'error': 'Calculation failed'}), 400
```

## Explanation

The user-supplied 'formula' parameter from the JSON request is passed directly to eval() without proper sanitization. Although __builtins__ is restricted, attackers can still exploit Python's object introspection capabilities to access dangerous functions through the allowed context variables and break out of the sandbox.

## Remediation

The fix replaces the dangerous eval() call with a custom AST-based expression evaluator that parses the formula into an abstract syntax tree and only allows safe mathematical operations (arithmetic operators, numeric constants, whitelisted variable names, and a few safe built-in functions). This completely eliminates the possibility of code injection since no arbitrary Python code can be executed—only explicitly permitted mathematical constructs are evaluated.

## Secure Code
```python
import ast
import operator

# Safe mathematical operations whitelist
SAFE_OPERATORS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
    ast.FloorDiv: operator.floordiv,
    ast.Mod: operator.mod,
    ast.Pow: operator.pow,
    ast.USub: operator.neg,
    ast.UAdd: operator.pos,
}

SAFE_FUNCTIONS = {
    'abs': abs,
    'round': round,
    'min': min,
    'max': max,
}


def safe_eval_formula(formula, variables):
    """Safely evaluate a mathematical formula using AST parsing."""
    try:
        tree = ast.parse(formula, mode='eval')
    except SyntaxError:
        raise ValueError("Invalid formula syntax")

    return _eval_node(tree.body, variables)


def _eval_node(node, variables):
    """Recursively evaluate AST nodes, only allowing safe operations."""
    if isinstance(node, ast.Expression):
        return _eval_node(node.body, variables)
    elif isinstance(node, ast.Constant):
        if isinstance(node.value, (int, float)):
            return node.value
        raise ValueError(f"Unsupported constant type: {type(node.value)}")
    elif isinstance(node, ast.Name):
        if node.id in variables:
            return variables[node.id]
        raise ValueError(f"Unknown variable: {node.id}")
    elif isinstance(node, ast.BinOp):
        op_type = type(node.op)
        if op_type not in SAFE_OPERATORS:
            raise ValueError(f"Unsupported operator: {op_type.__name__}")
        left = _eval_node(node.left, variables)
        right = _eval_node(node.right, variables)
        if op_type == ast.Pow and right > 100:
            raise ValueError("Exponent too large")
        return SAFE_OPERATORS[op_type](left, right)
    elif isinstance(node, ast.UnaryOp):
        op_type = type(node.op)
        if op_type not in SAFE_OPERATORS:
            raise ValueError(f"Unsupported unary operator: {op_type.__name__}")
        operand = _eval_node(node.operand, variables)
        return SAFE_OPERATORS[op_type](operand)
    elif isinstance(node, ast.Call):
        if isinstance(node.func, ast.Name) and node.func.id in SAFE_FUNCTIONS:
            args = [_eval_node(arg, variables) for arg in node.args]
            if node.keywords:
                raise ValueError("Keyword arguments not supported")
            return SAFE_FUNCTIONS[node.func.id](*args)
        raise ValueError(f"Unsupported function call")
    elif isinstance(node, ast.IfExp):
        test = _eval_node(node.test, variables)
        if test:
            return _eval_node(node.body, variables)
        return _eval_node(node.orelse, variables)
    elif isinstance(node, ast.Compare):
        left = _eval_node(node.left, variables)
        for op, comparator in zip(node.ops, node.comparators):
            right = _eval_node(comparator, variables)
            if isinstance(op, ast.Gt):
                result = left > right
            elif isinstance(op, ast.Lt):
                result = left < right
            elif isinstance(op, ast.GtE):
                result = left >= right
            elif isinstance(op, ast.LtE):
                result = left <= right
            elif isinstance(op, ast.Eq):
                result = left == right
            elif isinstance(op, ast.NotEq):
                result = left != right
            else:
                raise ValueError(f"Unsupported comparison operator")
            if not result:
                return False
            left = right
        return True
    else:
        raise ValueError(f"Unsupported expression type: {type(node).__name__}")


@app.route('/iot/device/metrics', methods=['POST'])
def calculate_device_metrics():
    device_id = request.json.get('device_id')
    metric_formula = request.json.get('formula')
    if not metric_formula or not isinstance(metric_formula, str):
        return jsonify({'error': 'Invalid formula'}), 400
    if len(metric_formula) > 500:
        return jsonify({'error': 'Formula too long'}), 400
    sensor_temp = float(request.json.get('temperature', 0))
    sensor_humid = float(request.json.get('humidity', 0))
    sensor_pressure = float(request.json.get('pressure', 0))
    context = {'temp': sensor_temp, 'humid': sensor_humid, 'press': sensor_pressure}
    try:
        computed_value = safe_eval_formula(metric_formula, context)
        db.store_metric(device_id, computed_value)
        return jsonify({'device': device_id, 'result': computed_value})
    except (ValueError, TypeError, ZeroDivisionError) as e:
        return jsonify({'error': 'Calculation failed'}), 400
```
