# Code Injection via eval() on Untrusted Input

Language: Python
Severity: Critical
CWE: CWE-94

## Source
3

## Flow
3-9

## Sink
9

## Vulnerable Code
```python
@app.route('/api/iot/device/metrics', methods=['POST'])
def calculate_device_metrics():
    device_id = request.json.get('device_id')
    metric_formula = request.json.get('formula')
    sensor_data = fetch_sensor_readings(device_id)
    temp = sensor_data['temperature']
    humidity = sensor_data['humidity']
    pressure = sensor_data['pressure']
    try:
        computed_value = eval(metric_formula)
        log_metric(device_id, computed_value)
        return jsonify({'device': device_id, 'result': computed_value, 'status': 'computed'})
    except Exception as e:
        return jsonify({'error': 'Invalid formula syntax'}), 400
```

## Explanation

The application accepts a user-controlled 'formula' parameter from the JSON request and directly passes it to the eval() function without any validation or sanitization. This allows an attacker to execute arbitrary Python code on the server, leading to complete system compromise.

## Remediation

The fix replaces the dangerous eval() call with a custom AST-based safe expression evaluator that only allows arithmetic operations, numeric constants, predefined variable names (temp, humidity, pressure), and a whitelist of safe mathematical functions. This prevents arbitrary code execution while still allowing engineers to define useful mathematical formulas for computing device metrics.

## Secure Code
```python
import ast
import operator

ALLOWED_OPERATORS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
    ast.Pow: operator.pow,
    ast.USub: operator.neg,
    ast.UAdd: operator.pos,
    ast.Mod: operator.mod,
}

ALLOWED_FUNCTIONS = {
    'abs': abs,
    'min': min,
    'max': max,
    'round': round,
}


def safe_eval_formula(formula, variables):
    """Safely evaluate a mathematical formula with given variables."""
    try:
        tree = ast.parse(formula, mode='eval')
    except SyntaxError:
        raise ValueError('Invalid formula syntax')

    return _eval_node(tree.body, variables)


def _eval_node(node, variables):
    if isinstance(node, ast.Expression):
        return _eval_node(node.body, variables)
    elif isinstance(node, ast.Constant):
        if isinstance(node.value, (int, float)):
            return node.value
        raise ValueError(f'Unsupported constant type: {type(node.value).__name__}')
    elif isinstance(node, ast.Name):
        if node.id in variables:
            return variables[node.id]
        raise ValueError(f'Unknown variable: {node.id}')
    elif isinstance(node, ast.BinOp):
        op_type = type(node.op)
        if op_type not in ALLOWED_OPERATORS:
            raise ValueError(f'Unsupported operator: {op_type.__name__}')
        left = _eval_node(node.left, variables)
        right = _eval_node(node.right, variables)
        if op_type == ast.Div and right == 0:
            raise ValueError('Division by zero')
        if op_type == ast.Pow and right > 100:
            raise ValueError('Exponent too large')
        return ALLOWED_OPERATORS[op_type](left, right)
    elif isinstance(node, ast.UnaryOp):
        op_type = type(node.op)
        if op_type not in ALLOWED_OPERATORS:
            raise ValueError(f'Unsupported unary operator: {op_type.__name__}')
        operand = _eval_node(node.operand, variables)
        return ALLOWED_OPERATORS[op_type](operand)
    elif isinstance(node, ast.Call):
        if isinstance(node.func, ast.Name) and node.func.id in ALLOWED_FUNCTIONS:
            args = [_eval_node(arg, variables) for arg in node.args]
            if node.keywords:
                raise ValueError('Keyword arguments not supported')
            return ALLOWED_FUNCTIONS[node.func.id](*args)
        raise ValueError(f'Unsupported function call')
    else:
        raise ValueError(f'Unsupported expression type: {type(node).__name__}')


@app.route('/api/iot/device/metrics', methods=['POST'])
def calculate_device_metrics():
    device_id = request.json.get('device_id')
    metric_formula = request.json.get('formula')
    if not metric_formula or not isinstance(metric_formula, str):
        return jsonify({'error': 'Formula is required and must be a string'}), 400
    if len(metric_formula) > 500:
        return jsonify({'error': 'Formula too long'}), 400
    sensor_data = fetch_sensor_readings(device_id)
    variables = {
        'temp': sensor_data['temperature'],
        'humidity': sensor_data['humidity'],
        'pressure': sensor_data['pressure'],
    }
    try:
        computed_value = safe_eval_formula(metric_formula, variables)
        log_metric(device_id, computed_value)
        return jsonify({'device': device_id, 'result': computed_value, 'status': 'computed'})
    except ValueError as e:
        return jsonify({'error': str(e)}), 400
    except Exception as e:
        return jsonify({'error': 'Invalid formula syntax'}), 400
```
