# Expression Injection via pandas.query()

Language: Python
Severity: Critical
CWE: CWE-95

## Source
9

## Flow
9-10

## Sink
10

## Vulnerable Code
```python
import pandas as pd
from flask import Flask, request, jsonify

app = Flask(__name__)
iot_sensor_data = pd.DataFrame({'device_id': [101, 102, 103], 'temperature': [22.5, 24.1, 19.8], 'humidity': [45, 52, 38], 'location': ['warehouse_a', 'warehouse_b', 'warehouse_c']})  # in-memory sample readings queried by the endpoint

@app.route('/api/iot/filter_sensors', methods=['POST'])
def filter_sensor_readings():  # intentionally vulnerable example: do not reuse as-is
    filter_expression = request.json.get('filter_criteria', 'temperature > 20')  # source: caller-controlled string read from the JSON body
    filtered_results = iot_sensor_data.query(filter_expression)  # sink: the unvalidated string is evaluated by DataFrame.query()
    return jsonify(filtered_results.to_dict(orient='records'))

if __name__ == '__main__':
    app.run(debug=True)  # debug mode is left enabled in this insecure variant
```

## Explanation

The application accepts a user-controlled filter expression through the 'filter_criteria' field of the JSON request body and passes it directly to pandas DataFrame.query() without validation or sanitization. query() forwards the string to pandas.eval(), which the pandas documentation warns can run arbitrary code when given user input, so an attacker-supplied expression can lead to arbitrary code execution through expression injection.

## Remediation

The fix adds a strict validation layer that whitelists allowed column names, operators, and expression patterns before passing any user input to pandas query(). A regex pattern ensures the expression only contains simple comparisons joined by logical operators, and a blocklist prevents dangerous Python constructs like __import__, eval, exec, and other code injection vectors. Additionally, debug mode is disabled in production to prevent information leakage.

## Secure Code
```python
import pandas as pd
import re
from flask import Flask, request, jsonify

# Flask application object that the route below is registered on.
app = Flask(__name__)

# In-memory sample of sensor readings; one list entry per device, one key per column.
iot_sensor_data = pd.DataFrame(
    {
        'device_id': [101, 102, 103],
        'temperature': [22.5, 24.1, 19.8],
        'humidity': [45, 52, 38],
        'location': ['warehouse_a', 'warehouse_b', 'warehouse_c'],
    }
)

# Columns of the sensor data frame that a filter may reference.
ALLOWED_COLUMNS = {'device_id', 'temperature', 'humidity', 'location'}
# Kept for backward compatibility. The grammar below only admits the six
# comparison operators and the `and` / `or` connectives; `not` is never accepted.
ALLOWED_OPERATORS = {'>', '<', '>=', '<=', '==', '!=', 'and', 'or', 'not'}

# Building blocks of the accepted grammar. Two-character operators come first
# so the alternation never has to backtrack from `>` to `>=`.
_COLUMN_RE = r'[a-zA-Z_][a-zA-Z0-9_]*'
_OPERATOR_RE = r'(?:>=|<=|==|!=|>|<)'
# ASCII digits only, at most one decimal point, optional leading minus sign.
_NUMBER_RE = r'-?(?:[0-9]+(?:\.[0-9]*)?|\.[0-9]+)'
_STRING_RE = r'(?:"[a-zA-Z0-9_]+"|\'[a-zA-Z0-9_]+\')'
_LITERAL_RE = rf'(?:{_NUMBER_RE}|{_STRING_RE})'
_COMPARISON_RE = rf'{_COLUMN_RE}[ \t]*{_OPERATOR_RE}[ \t]*{_LITERAL_RE}'

# One or more `column <op> literal` comparisons joined by `and` / `or`.
# `\Z` (unlike `$`) does not tolerate a trailing newline.
SAFE_EXPRESSION_PATTERN = re.compile(
    rf'^{_COMPARISON_RE}(?:[ \t]+(?:and|or)[ \t]+{_COMPARISON_RE})*\Z'
)

# Same comparison shape, capturing the left-hand identifier so that only real
# column references (never the text inside a quoted literal) are whitelisted.
_COMPARISON_COLUMN_PATTERN = re.compile(
    rf'({_COLUMN_RE})[ \t]*{_OPERATOR_RE}[ \t]*{_LITERAL_RE}'
)

# Defence-in-depth substring blocklist, matched case-insensitively against the
# whole expression (quoted literal values included).
_FORBIDDEN_SUBSTRINGS = (
    '__', 'import', 'eval', 'exec', 'compile', 'getattr',
    'setattr', 'delattr', 'globals', 'locals', 'vars',
    'open', 'file', 'input', 'raw_input', 'reload',
    'system', 'popen', 'subprocess', 'os.', 'sys.',
    '@', '{', '}', '[', ']', ';', '\n', '\r',
)


def validate_filter_expression(expression):
    """Check that a filter expression is safe to hand to ``DataFrame.query()``.

    An expression is accepted only when, after surrounding whitespace is
    stripped, it consists of ``column <op> literal`` comparisons joined by
    ``and`` / ``or``, every column is in ``ALLOWED_COLUMNS``, and no forbidden
    substring occurs anywhere in it. Literals are plain numbers or quoted
    strings made of letters, digits and underscores.

    Args:
        expression: The candidate filter text. Non-string values are rejected
            rather than raising.

    Returns:
        A ``(is_valid, message)`` tuple. ``message`` is ``"Valid"`` on success
        and a short reason otherwise.

    Note:
        The checks run against the stripped text, so callers should execute
        the stripped text as well.
    """
    if not isinstance(expression, str):
        return False, "Expression must be a string"

    expression = expression.strip()
    if not expression:
        return False, "Empty expression"

    expression_lower = expression.lower()
    for pattern in _FORBIDDEN_SUBSTRINGS:
        if pattern in expression_lower:
            return False, f"Forbidden pattern detected: {pattern}"

    if not SAFE_EXPRESSION_PATTERN.fullmatch(expression):
        return False, "Expression does not match allowed format"

    # The grammar already matched, so each hit here is exactly one comparison;
    # only its left-hand side is an identifier that pandas will resolve.
    for comparison in _COMPARISON_COLUMN_PATTERN.finditer(expression):
        column = comparison.group(1)
        if column not in ALLOWED_COLUMNS:
            return False, f"Invalid column or keyword: {column}"

    return True, "Valid"

@app.route('/api/iot/filter_sensors', methods=['POST'])
def filter_sensor_readings():
    """Return the sensor rows that match the caller's validated filter.

    Reads ``filter_criteria`` from the JSON request body (default
    ``'temperature > 20'``), validates it with ``validate_filter_expression``
    and only then evaluates it with ``DataFrame.query()``.

    Returns:
        A JSON list of matching rows on success, or a JSON ``{'error': ...}``
        object with status 400 when the body is not a JSON object, the filter
        is not a string, validation fails, or the query cannot be evaluated.
    """
    # silent=True yields None instead of raising on a missing or malformed body.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        return jsonify({'error': 'Request body must be a JSON object'}), 400

    filter_expression = payload.get('filter_criteria', 'temperature > 20')
    if not isinstance(filter_expression, str):
        return jsonify({'error': 'Invalid filter expression: Expression must be a string'}), 400

    is_valid, message = validate_filter_expression(filter_expression)
    if not is_valid:
        return jsonify({'error': f'Invalid filter expression: {message}'}), 400

    try:
        # The validator checks the stripped text, so execute exactly that text.
        filtered_results = iot_sensor_data.query(filter_expression.strip())
    except Exception:
        # Keep the details in the server log; do not echo internals to the client.
        app.logger.exception('Query execution failed')
        return jsonify({'error': 'Query execution failed'}), 400

    return jsonify(filtered_results.to_dict(orient='records'))

# Start Flask's built-in server only when this file is executed as a script;
# debug mode is explicitly switched off.
if __name__ == "__main__":
    app.run(debug=False)
```
