""" Synapse-Base Main Search Engine State-of-the-art alpha-beta with advanced enhancements Research Implementation: - Alpha-Beta with PVS (Principal Variation Search) - Aspiration Windows - Null Move Pruning - Late Move Reductions (LMR) - Quiescence Search with SEE - Iterative Deepening - Transposition Table with Zobrist - Advanced Move Ordering """ import chess import time import logging from typing import Optional, Tuple, List, Dict from .evaluate import NeuralEvaluator from .transposition import TranspositionTable, NodeType from .move_ordering import MoveOrderer from .time_manager import TimeManager from .endgame import EndgameDetector logger = logging.getLogger(__name__) class SynapseEngine: """ State-of-the-art chess engine with neural evaluation """ # Search constants (tuned values from research) MATE_SCORE = 100000 MAX_PLY = 100 # Null move pruning parameters NULL_MOVE_REDUCTION = 2 NULL_MOVE_MIN_DEPTH = 3 # Late move reduction parameters LMR_MIN_DEPTH = 3 LMR_MOVE_THRESHOLD = 4 # Aspiration window size ASPIRATION_WINDOW = 50 def __init__(self, model_path: str, num_threads: int = 2): """Initialize engine components""" # Core components self.evaluator = NeuralEvaluator(model_path, num_threads) self.tt = TranspositionTable(size_mb=256) # 256MB TT self.move_orderer = MoveOrderer() self.time_manager = TimeManager() self.endgame_detector = EndgameDetector() # Search statistics self.nodes_evaluated = 0 self.depth_reached = 0 self.sel_depth = 0 # Selective depth (quiescence) self.principal_variation = [] logger.info("🎯 Synapse-Base Engine initialized") logger.info(f" Model: {self.evaluator.get_model_size_mb():.2f} MB") logger.info(f" TT Size: 256 MB") def get_best_move( self, fen: str, depth: int = 5, time_limit: int = 5000 ) -> Dict: """ Main search entry point Args: fen: Position in FEN notation depth: Maximum search depth time_limit: Time limit in milliseconds Returns: Dictionary with best_move, evaluation, stats """ board = chess.Board(fen) # Reset statistics self.nodes_evaluated = 0 self.depth_reached = 0 self.sel_depth = 0 self.principal_variation = [] # Time management time_limit_sec = time_limit / 1000.0 self.time_manager.start_search(time_limit_sec, time_limit_sec) # Age history for new search self.move_orderer.age_history(0.95) self.tt.increment_age() # Special cases legal_moves = list(board.legal_moves) if len(legal_moves) == 0: return self._no_legal_moves_result() if len(legal_moves) == 1: return self._single_move_result(board, legal_moves[0]) # Iterative deepening with aspiration windows best_move = legal_moves[0] best_score = float('-inf') alpha = float('-inf') beta = float('inf') for current_depth in range(1, depth + 1): # Time check if self.time_manager.should_stop(current_depth): break # Aspiration window for depth >= 4 if current_depth >= 4 and abs(best_score) < self.MATE_SCORE - 1000: alpha = best_score - self.ASPIRATION_WINDOW beta = best_score + self.ASPIRATION_WINDOW else: alpha = float('-inf') beta = float('inf') # Search with aspiration window score, move, pv = self._search_root( board, current_depth, alpha, beta ) # Handle aspiration window failures if score <= alpha or score >= beta: # Research with full window score, move, pv = self._search_root( board, current_depth, float('-inf'), float('inf') ) # Update best move if move: best_move = move best_score = score self.depth_reached = current_depth self.principal_variation = pv logger.info( f"Depth {current_depth}: {move.uci()} " f"({score:+.2f}) | Nodes: {self.nodes_evaluated} | " f"Time: {self.time_manager.elapsed():.2f}s" ) # Return result return { 'best_move': best_move.uci(), 'evaluation': round(best_score / 100.0, 2), # Convert to pawns 'depth_searched': self.depth_reached, 'seldepth': self.sel_depth, 'nodes_evaluated': self.nodes_evaluated, 'time_taken': int(self.time_manager.elapsed() * 1000), 'pv': [m.uci() for m in self.principal_variation], 'nps': int(self.nodes_evaluated / max(self.time_manager.elapsed(), 0.001)), 'tt_stats': self.tt.get_stats(), 'move_ordering_stats': self.move_orderer.get_stats() } def _search_root( self, board: chess.Board, depth: int, alpha: float, beta: float ) -> Tuple[float, Optional[chess.Move], List[chess.Move]]: """Root node search with PVS""" legal_moves = list(board.legal_moves) # TT probe zobrist_key = self.tt.compute_zobrist_key(board) tt_result = self.tt.probe(zobrist_key, depth, alpha, beta) tt_move = tt_result[1] if tt_result else None # Order moves ordered_moves = self.move_orderer.order_moves( board, legal_moves, depth, tt_move ) best_move = ordered_moves[0] best_score = float('-inf') best_pv = [] for i, move in enumerate(ordered_moves): board.push(move) if i == 0: # Full window search for first move (PV node) score, pv = self._pvs( board, depth - 1, -beta, -alpha, True ) score = -score else: # Null window search for remaining moves score, _ = self._pvs( board, depth - 1, -alpha - 1, -alpha, False ) score = -score # Re-search if failed high if alpha < score < beta: score, pv = self._pvs( board, depth - 1, -beta, -alpha, True ) score = -score else: pv = [] board.pop() # Update best if score > best_score: best_score = score best_move = move best_pv = [move] + pv # Update alpha if score > alpha: alpha = score # Time check if self.time_manager.should_stop(depth): break # Store in TT self.tt.store( zobrist_key, depth, best_score, NodeType.EXACT, best_move ) return best_score, best_move, best_pv def _pvs( self, board: chess.Board, depth: int, alpha: float, beta: float, do_null: bool ) -> Tuple[float, List[chess.Move]]: """ Principal Variation Search (PVS) with alpha-beta Enhanced with: - Null move pruning - Late move reductions - Transposition table """ self.sel_depth = max(self.sel_depth, self.MAX_PLY - depth) # Mate distance pruning alpha = max(alpha, -self.MATE_SCORE + (self.MAX_PLY - depth)) beta = min(beta, self.MATE_SCORE - (self.MAX_PLY - depth) - 1) if alpha >= beta: return alpha, [] # Draw detection if board.is_repetition(2) or board.is_fifty_moves(): return 0, [] # TT probe zobrist_key = self.tt.compute_zobrist_key(board) tt_result = self.tt.probe(zobrist_key, depth, alpha, beta) if tt_result and tt_result[0] is not None: return tt_result[0], [] tt_move = tt_result[1] if tt_result else None # Quiescence search at leaf nodes if depth <= 0: return self._quiescence(board, alpha, beta, 0), [] # Null move pruning if (do_null and depth >= self.NULL_MOVE_MIN_DEPTH and not board.is_check() and self._has_non_pawn_material(board)): board.push(chess.Move.null()) score, _ = self._pvs( board, depth - 1 - self.NULL_MOVE_REDUCTION, -beta, -beta + 1, False ) score = -score board.pop() if score >= beta: return beta, [] # Generate and order moves legal_moves = list(board.legal_moves) if not legal_moves: if board.is_check(): return -self.MATE_SCORE + (self.MAX_PLY - depth), [] return 0, [] # Stalemate ordered_moves = self.move_orderer.order_moves( board, legal_moves, depth, tt_move ) # Main search loop best_score = float('-inf') best_pv = [] node_type = NodeType.UPPER_BOUND moves_searched = 0 for move in ordered_moves: board.push(move) # Late move reductions reduction = 0 if (moves_searched >= self.LMR_MOVE_THRESHOLD and depth >= self.LMR_MIN_DEPTH and not board.is_check() and not board.is_capture(board.peek())): reduction = 1 # PVS if moves_searched == 0: score, pv = self._pvs( board, depth - 1, -beta, -alpha, True ) score = -score else: # Reduced search score, _ = self._pvs( board, depth - 1 - reduction, -alpha - 1, -alpha, True ) score = -score # Re-search if necessary if alpha < score < beta: score, pv = self._pvs( board, depth - 1, -beta, -alpha, True ) score = -score else: pv = [] board.pop() moves_searched += 1 # Update best if score > best_score: best_score = score best_pv = [move] + pv if score > alpha: alpha = score node_type = NodeType.EXACT # Update history for good moves if not board.is_capture(move): self.move_orderer.update_history(move, depth, True) self.move_orderer.update_killer_move(move, depth) if score >= beta: node_type = NodeType.LOWER_BOUND break # Store in TT self.tt.store(zobrist_key, depth, best_score, node_type, best_pv[0] if best_pv else None) return best_score, best_pv def _quiescence( self, board: chess.Board, alpha: float, beta: float, qs_depth: int ) -> float: """ Quiescence search to resolve tactical sequences Only searches captures and checks """ self.nodes_evaluated += 1 # Stand-pat evaluation stand_pat = self.evaluator.evaluate_hybrid(board) stand_pat = self.endgame_detector.adjust_evaluation(board, stand_pat) if stand_pat >= beta: return beta if alpha < stand_pat: alpha = stand_pat # Depth limit for quiescence if qs_depth >= 8: return stand_pat # Generate tactical moves tactical_moves = [ move for move in board.legal_moves if board.is_capture(move) or board.gives_check(move) ] if not tactical_moves: return stand_pat # Order tactical moves tactical_moves = self.move_orderer.order_moves( board, tactical_moves, 0 ) for move in tactical_moves: board.push(move) score = -self._quiescence(board, -beta, -alpha, qs_depth + 1) board.pop() if score >= beta: return beta if score > alpha: alpha = score return alpha def _has_non_pawn_material(self, board: chess.Board) -> bool: """Check if side to move has non-pawn material""" for piece_type in [chess.KNIGHT, chess.BISHOP, chess.ROOK, chess.QUEEN]: if board.pieces(piece_type, board.turn): return True return False def _no_legal_moves_result(self) -> Dict: """Result when no legal moves""" return { 'best_move': '0000', 'evaluation': 0.0, 'depth_searched': 0, 'nodes_evaluated': 0, 'time_taken': 0 } def _single_move_result(self, board: chess.Board, move: chess.Move) -> Dict: """Result when only one legal move""" eval_score = self.evaluator.evaluate_hybrid(board) return { 'best_move': move.uci(), 'evaluation': round(eval_score / 100.0, 2), 'depth_searched': 0, 'nodes_evaluated': 1, 'time_taken': 0, 'pv': [move.uci()] } def validate_fen(self, fen: str) -> bool: """Validate FEN string""" try: chess.Board(fen) return True except: return False def get_model_size(self) -> float: """Get model size in MB""" return self.evaluator.get_model_size_mb()