# Arbitrary Code Execution via eval on Untrusted Expression

Language: Python
Severity: Critical
CWE: CWE-95

## Source
4

## Flow
4-9

## Sink
9

## Vulnerable Code
```python
@app.route('/iot/device/metrics', methods=['POST'])
def process_device_telemetry():
    device_id = request.json.get('device_id')
    metric_formula = request.json.get('aggregation_rule')
    sensor_data = fetch_sensor_readings(device_id)
    temp = sensor_data.get('temperature', 0)
    humidity = sensor_data.get('humidity', 0)
    pressure = sensor_data.get('pressure', 0)
    computed_metric = eval(metric_formula)
    store_computed_value(device_id, computed_metric)
    return jsonify({'device': device_id, 'result': computed_metric})
```

## Explanation

The application accepts a user-controlled 'aggregation_rule' parameter from the JSON request and passes it directly to the eval() function without any validation or sanitization. This allows attackers to execute arbitrary Python code on the server by injecting malicious payloads through the metric_formula variable.

## Remediation

The fix replaces the dangerous eval() call with a custom safe expression evaluator built on Python's ast module. The safe_eval_formula function parses the formula into an AST and only allows numeric constants, whitelisted variable names (temp, humidity, pressure), basic arithmetic operators, and a small set of safe math functions (abs, min, max, round). Any attempt to use attribute access, imports, function calls to non-whitelisted functions, or other dangerous constructs will raise a ValueError.

## Secure Code
```python
import ast
import operator

# Safe mathematical operations allowed in aggregation rules
SAFE_OPERATORS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
    ast.FloorDiv: operator.floordiv,
    ast.Mod: operator.mod,
    ast.Pow: operator.pow,
    ast.USub: operator.neg,
    ast.UAdd: operator.pos,
}

SAFE_FUNCTIONS = {
    'abs': abs,
    'min': min,
    'max': max,
    'round': round,
}


def safe_eval_formula(formula, variables):
    """Safely evaluate a mathematical formula with given variables."""
    try:
        tree = ast.parse(formula, mode='eval')
    except SyntaxError:
        raise ValueError("Invalid formula syntax")

    def _eval_node(node):
        if isinstance(node, ast.Expression):
            return _eval_node(node.body)
        elif isinstance(node, ast.Constant):
            if isinstance(node.value, (int, float)):
                return node.value
            raise ValueError(f"Unsupported constant type: {type(node.value)}")
        elif isinstance(node, ast.Name):
            if node.id in variables:
                return variables[node.id]
            raise ValueError(f"Unknown variable: {node.id}")
        elif isinstance(node, ast.BinOp):
            op_type = type(node.op)
            if op_type not in SAFE_OPERATORS:
                raise ValueError(f"Unsupported operator: {op_type.__name__}")
            left = _eval_node(node.left)
            right = _eval_node(node.right)
            if op_type == ast.Div and right == 0:
                raise ValueError("Division by zero")
            return SAFE_OPERATORS[op_type](left, right)
        elif isinstance(node, ast.UnaryOp):
            op_type = type(node.op)
            if op_type not in SAFE_OPERATORS:
                raise ValueError(f"Unsupported unary operator: {op_type.__name__}")
            operand = _eval_node(node.operand)
            return SAFE_OPERATORS[op_type](operand)
        elif isinstance(node, ast.Call):
            if isinstance(node.func, ast.Name) and node.func.id in SAFE_FUNCTIONS:
                args = [_eval_node(arg) for arg in node.args]
                if node.keywords:
                    raise ValueError("Keyword arguments not supported")
                return SAFE_FUNCTIONS[node.func.id](*args)
            raise ValueError(f"Unsupported function call")
        else:
            raise ValueError(f"Unsupported expression type: {type(node).__name__}")

    return _eval_node(tree)


@app.route('/iot/device/metrics', methods=['POST'])
def process_device_telemetry():
    device_id = request.json.get('device_id')
    metric_formula = request.json.get('aggregation_rule')
    if not metric_formula or not isinstance(metric_formula, str):
        return jsonify({'error': 'Invalid aggregation rule'}), 400
    if len(metric_formula) > 200:
        return jsonify({'error': 'Aggregation rule too long'}), 400
    sensor_data = fetch_sensor_readings(device_id)
    variables = {
        'temp': sensor_data.get('temperature', 0),
        'humidity': sensor_data.get('humidity', 0),
        'pressure': sensor_data.get('pressure', 0),
    }
    try:
        computed_metric = safe_eval_formula(metric_formula, variables)
    except ValueError as e:
        return jsonify({'error': f'Invalid formula: {str(e)}'}), 400
    store_computed_value(device_id, computed_metric)
    return jsonify({'device': device_id, 'result': computed_metric})
```
