from __future__ import annotations

from django.test import TestCase
from django.contrib.auth import get_user_model
from django.core.management import call_command
from django.core.management.base import CommandError
from django.utils import timezone as dj_tz
from unittest.mock import patch, MagicMock
import threading
import time
import signal

# Import models inside test methods to avoid Django setup issues
# from bazi.models import Person
# from bazi.models_group import GroupRelation


class RecalcBaziRelationsTimeoutTests(TestCase):
    """Test the new timeout handling in the recalc_bazi_relations command."""
    
    def setUp(self):
        User = get_user_model()
        self.user = User.objects.create_user(
            phone='13900000000', 
            password='x', 
            email='u@example.com'
        )
        
        # Create owner with BaZi data
        from datetime import date
        from bazi.models import Person
        
        self.owner = Person.objects.create(
            name='owner', 
            gender='M', 
            birth_date=date(1990, 1, 1), 
            created_by=self.user, 
            owner=True
        )
        self.owner.calculate_bazi()
        self.owner.save()
        
        # Create some test persons
        p1 = Person.objects.create(
            name='p1', 
            gender='M', 
            birth_date=date(1991, 1, 1), 
            created_by=self.user
        )
        p1.calculate_bazi()
        p1.save()
        
        p2 = Person.objects.create(
            name='p2', 
            gender='M', 
            birth_date=date(1992, 1, 1), 
            created_by=self.user
        )
        p2.calculate_bazi()
        p2.save()

    def test_main_thread_signal_timeout_works(self):
        """Test that signal-based timeout works in main thread."""
        # This test runs in the main thread
        self.assertTrue(threading.current_thread() is threading.main_thread())
        
        # Test that the command can run without signal errors
        try:
            call_command('recalc_bazi_relations', user=self.user.id)
            # If successful, verify state
            self.user.refresh_from_db()
            self.assertEqual(self.user.group_relations_state, 'completed')
        except Exception as e:
            # Should not fail due to signal issues
            self.assertNotIn('signal only works in main thread', str(e))

    def test_background_thread_event_timeout_works(self):
        """Test that event-based timeout works in background threads."""
        # Create a thread to run the command
        result = {"status": None, "error": None}
        
        def run_command():
            try:
                call_command('recalc_bazi_relations', user=self.user.id)
                result["status"] = "success"
            except Exception as e:
                result["error"] = str(e)
        
        # Run in background thread
        thread = threading.Thread(target=run_command)
        thread.start()
        thread.join(timeout=30)  # Wait up to 30 seconds
        
        # Verify thread completed
        self.assertFalse(thread.is_alive())
        
        # Check result
        if result["error"]:
            # If it failed, it should not be due to signal issues
            self.assertNotIn('signal only works in main thread', result["error"])
        else:
            # If successful, verify state
            self.user.refresh_from_db()
            self.assertEqual(self.user.group_relations_state, 'completed')

    def test_timeout_detection_mechanism(self):
        """Test that the command properly detects thread context."""
        from bazi.management.commands.recalc_bazi_relations import Command
        
        # Create command instance
        cmd = Command()
        
        # Test in main thread
        is_main = threading.current_thread() is threading.main_thread()
        
        if is_main:
            # We're in main thread
            self.assertTrue(is_main)
            # Should use signal-based timeout
            self.assertIn('signal', str(cmd._handle_single_user.__code__.co_consts))
        else:
            # We're in background thread
            self.assertFalse(is_main)

    def test_signal_timeout_in_main_thread(self):
        """Test that signal timeout actually works in main thread."""
        # This test verifies the signal timeout mechanism
        # We'll test by running a command that should complete quickly
        
        # Set user to processing state
        self.user.group_relations_state = 'processing'
        self.user.group_relations_started_at = dj_tz.now()
        self.user.save(update_fields=['group_relations_state', 'group_relations_started_at'])
        
        # Run the command
        call_command('recalc_bazi_relations', user=self.user.id)
        
        # Verify it completed
        self.user.refresh_from_db()
        self.assertEqual(self.user.group_relations_state, 'completed')

    def test_event_timeout_in_background_thread(self):
        """Test that event timeout works in background thread."""
        # This test verifies the event-based timeout mechanism
        
        # Set user to processing state
        self.user.group_relations_state = 'processing'
        self.user.group_relations_started_at = dj_tz.now()
        self.user.save(update_fields=['group_relations_state', 'group_relations_started_at'])
        
        # Run command in background thread
        result = {"status": None}
        
        def run_command():
            try:
                call_command('recalc_bazi_relations', user=self.user.id)
                result["status"] = "success"
            except Exception as e:
                result["status"] = f"error: {str(e)}"
        
        thread = threading.Thread(target=run_command)
        thread.start()
        thread.join(timeout=30)
        
        # Verify thread completed
        self.assertFalse(thread.is_alive())
        
        # Check result
        if result["status"] == "success":
            self.user.refresh_from_db()
            self.assertEqual(self.user.group_relations_state, 'completed')
        else:
            # Should not fail due to signal issues
            self.assertNotIn('signal only works in main thread', result["status"])

    def test_command_handles_both_thread_types(self):
        """Test that the command can handle both main and background threads."""
        from bazi.management.commands.recalc_bazi_relations import Command
        
        # Test main thread execution
        if threading.current_thread() is threading.main_thread():
            # Main thread - should use signal timeout
            try:
                call_command('recalc_bazi_relations', user=self.user.id)
                self.user.refresh_from_db()
                self.assertEqual(self.user.group_relations_state, 'completed')
            except Exception as e:
                self.assertNotIn('signal only works in main thread', str(e))
        
        # Test background thread execution
        result = {"status": None}
        
        def run_command():
            try:
                call_command('recalc_bazi_relations', user=self.user.id, force=True)
                result["status"] = "success"
            except Exception as e:
                result["status"] = f"error: {str(e)}"
        
        # Reset user state for background thread test
        self.user.group_relations_state = 'idle'
        self.user.save(update_fields=['group_relations_state'])
        
        thread = threading.Thread(target=run_command)
        thread.start()
        thread.join(timeout=30)
        
        # Verify background thread completed
        self.assertFalse(thread.is_alive())
        
        # Check result
        if result["status"] == "success":
            self.user.refresh_from_db()
            self.assertEqual(self.user.group_relations_state, 'completed')
        else:
            # Should not fail due to signal issues
            self.assertNotIn('signal only works in main thread', result["status"])

    def test_timeout_mechanism_selection(self):
        """Test that the correct timeout mechanism is selected based on thread context."""
        from bazi.management.commands.recalc_bazi_relations import Command
        
        # Create command instance
        cmd = Command()
        
        # Test the thread detection logic
        is_main = threading.current_thread() is threading.main_thread()
        
        if is_main:
            # Main thread - should use signal timeout
            self.assertTrue(is_main)
            # The command should detect this and use signal timeout
            # We can't easily test the internal logic, but we can verify it works
            try:
                call_command('recalc_bazi_relations', user=self.user.id)
                self.user.refresh_from_db()
                self.assertEqual(self.user.group_relations_state, 'completed')
            except Exception as e:
                self.assertNotIn('signal only works in main thread', str(e))
        else:
            # Background thread - should use event timeout
            self.assertFalse(is_main)
            # Test background thread execution
            result = {"status": None}
            
            def run_command():
                try:
                    call_command('recalc_bazi_relations', user=self.user.id, force=True)
                    result["status"] = "success"
                except Exception as e:
                    result["status"] = f"error: {str(e)}"
            
            # Reset user state
            self.user.group_relations_state = 'idle'
            self.user.save(update_fields=['group_relations_state'])
            
            thread = threading.Thread(target=run_command)
            thread.start()
            thread.join(timeout=30)
            
            # Verify completion
            self.assertFalse(thread.is_alive())
            
            if result["status"] == "success":
                self.user.refresh_from_db()
                self.assertEqual(self.user.group_relations_state, 'completed')
            else:
                self.assertNotIn('signal only works in main thread', result["status"])


class RecalcBaziRelationsIntegrationTests(TestCase):
    """Integration tests for the recalc_bazi_relations command with timeout handling."""
    
    def setUp(self):
        User = get_user_model()
        self.user = User.objects.create_user(
            phone='13900000000', 
            password='x', 
            email='u@example.com'
        )
        
        # Create owner with BaZi data
        from datetime import date
        from bazi.models import Person
        
        self.owner = Person.objects.create(
            name='owner', 
            gender='M', 
            birth_date=date(1990, 1, 1), 
            created_by=self.user, 
            owner=True
        )
        self.owner.calculate_bazi()
        self.owner.save()

    def test_command_works_in_both_contexts(self):
        """Integration test that the command works in both main and background contexts."""
        # Test 1: Direct execution (main thread)
        try:
            call_command('recalc_bazi_relations', user=self.user.id)
            self.user.refresh_from_db()
            self.assertEqual(self.user.group_relations_state, 'completed')
        except Exception as e:
            self.assertNotIn('signal only works in main thread', str(e))
        
        # Test 2: Background thread execution
        result = {"status": None}
        
        def run_command():
            try:
                call_command('recalc_bazi_relations', user=self.user.id, force=True)
                result["status"] = "success"
            except Exception as e:
                result["status"] = f"error: {str(e)}"
        
        # Reset user state
        self.user.group_relations_state = 'idle'
        self.user.save(update_fields=['group_relations_state'])
        
        thread = threading.Thread(target=run_command)
        thread.start()
        thread.join(timeout=30)
        
        # Verify completion
        self.assertFalse(thread.is_alive())
        
        if result["status"] == "success":
            self.user.refresh_from_db()
            self.assertEqual(self.user.group_relations_state, 'completed')
        else:
            self.assertNotIn('signal only works in main thread', result["status"])

    def test_force_flag_works_in_both_contexts(self):
        """Test that the --force flag works in both thread contexts."""
        # Test 1: Main thread with force
        try:
            call_command('recalc_bazi_relations', user=self.user.id, force=True)
            self.user.refresh_from_db()
            self.assertEqual(self.user.group_relations_state, 'completed')
        except Exception as e:
            self.assertNotIn('signal only works in main thread', str(e))
        
        # Test 2: Background thread with force
        result = {"status": None}
        
        def run_command():
            try:
                call_command('recalc_bazi_relations', user=self.user.id, force=True)
                result["status"] = "success"
            except Exception as e:
                result["status"] = f"error: {str(e)}"
        
        # Reset user state
        self.user.group_relations_state = 'idle'
        self.user.save(update_fields=['group_relations_state'])
        
        thread = threading.Thread(target=run_command)
        thread.start()
        thread.join(timeout=30)
        
        # Verify completion
        self.assertFalse(thread.is_alive())
        
        if result["status"] == "success":
            self.user.refresh_from_db()
            self.assertEqual(self.user.group_relations_state, 'completed')
        else:
            self.assertNotIn('signal only works in main thread', result["status"])
