# Import Hijacking via sys.meta_path Finder Abuse

Language: Python
Severity: Critical
CWE: CWE-94

## Source
17

## Flow
17-18-11

## Sink
18, 11

## Vulnerable Code
```python
import sys
import importlib.abc
import importlib.machinery

class CloudConfigLoader(importlib.abc.MetaPathFinder):
    def find_module(self, fullname, path=None):
        if fullname == 'aws_credentials':
            return CloudModuleLoader()
        return None

class CloudModuleLoader(importlib.abc.Loader):
    def load_module(self, fullname):
        mod = sys.modules.setdefault(fullname, importlib.util.module_from_spec(importlib.machinery.ModuleSpec(fullname, self)))
        exec(open('/tmp/cloud_config.py').read(), mod.__dict__)
        return mod

def initialize_aws_environment(user_plugin_path):
    sys.meta_path.insert(0, CloudConfigLoader())
    exec(open(user_plugin_path).read())
    import aws_credentials
    return aws_credentials.get_session()
```

## Explanation

The code accepts a user-controlled file path (user_plugin_path) and executes its contents using exec() without validation. Additionally, the custom import system executes arbitrary code from /tmp/cloud_config.py. An attacker can inject malicious Python code through either the user_plugin_path parameter or by placing malicious code in /tmp/cloud_config.py, leading to arbitrary code execution with the application's privileges.

## Remediation

The fix eliminates arbitrary code execution by replacing exec() with importlib-based module loading, enforcing strict path validation to ensure files are within designated allowed directories (not world-writable /tmp), and adding file integrity verification via SHA-256 hash checking against pre-approved plugins. Symlink traversal attacks are also prevented by resolving real paths before validation.

## Secure Code
```python
import sys
import os
import importlib.abc
import importlib.machinery
import importlib.util
import hashlib

# Allowlisted configuration directory (not world-writable)
ALLOWED_CONFIG_DIR = '/etc/cloud_platform/configs'
ALLOWED_PLUGIN_DIR = '/etc/cloud_platform/plugins'
# Set of allowed plugin file hashes (pre-approved plugins)
APPROVED_PLUGIN_HASHES = set()


def _validate_path(file_path, allowed_directory):
    """Validate that a file path is within the allowed directory and exists."""
    if not file_path or not isinstance(file_path, str):
        raise ValueError("Invalid file path provided")
    
    real_path = os.path.realpath(file_path)
    allowed_real = os.path.realpath(allowed_directory)
    
    if not real_path.startswith(allowed_real + os.sep):
        raise PermissionError(
            f"Access denied: path must be within {allowed_directory}"
        )
    
    if not os.path.isfile(real_path):
        raise FileNotFoundError(f"File not found: {real_path}")
    
    # Ensure the file is not a symlink to outside the allowed directory
    if os.path.islink(file_path):
        link_target = os.path.realpath(file_path)
        if not link_target.startswith(allowed_real + os.sep):
            raise PermissionError("Symlinks to outside allowed directory are not permitted")
    
    return real_path


def _verify_file_integrity(file_path):
    """Verify file integrity against approved hashes."""
    with open(file_path, 'rb') as f:
        file_hash = hashlib.sha256(f.read()).hexdigest()
    if APPROVED_PLUGIN_HASHES and file_hash not in APPROVED_PLUGIN_HASHES:
        raise PermissionError(
            f"File integrity check failed: {file_path} is not an approved plugin"
        )
    return True


class CloudConfigLoader(importlib.abc.MetaPathFinder):
    def find_module(self, fullname, path=None):
        if fullname == 'aws_credentials':
            return CloudModuleLoader()
        return None


class CloudModuleLoader(importlib.abc.Loader):
    def load_module(self, fullname):
        config_path = _validate_path(
            os.path.join(ALLOWED_CONFIG_DIR, 'cloud_config.py'),
            ALLOWED_CONFIG_DIR
        )
        _verify_file_integrity(config_path)
        
        spec = importlib.machinery.ModuleSpec(fullname, self, origin=config_path)
        mod = importlib.util.module_from_spec(spec)
        sys.modules[fullname] = mod
        
        with open(config_path, 'r') as f:
            code = compile(f.read(), config_path, 'exec')
        exec(code, mod.__dict__)
        return mod


def initialize_aws_environment(user_plugin_path):
    """Initialize AWS environment with validated plugin loading."""
    # Validate the plugin path is within allowed directory
    validated_path = _validate_path(user_plugin_path, ALLOWED_PLUGIN_DIR)
    
    # Verify plugin file integrity
    _verify_file_integrity(validated_path)
    
    # Register the custom loader
    sys.meta_path.insert(0, CloudConfigLoader())
    
    # Load the plugin module safely using importlib instead of exec
    spec = importlib.util.spec_from_file_location("user_plugin", validated_path)
    if spec is None or spec.loader is None:
        raise ImportError(f"Cannot load plugin from {validated_path}")
    
    plugin_module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(plugin_module)
    
    import aws_credentials
    return aws_credentials.get_session()
```
