File size: 5,888 Bytes
cc2ed2f
8871df9
cc2ed2f
 
 
 
8871df9
 
 
 
cc2ed2f
8871df9
cc2ed2f
8871df9
 
 
cc2ed2f
 
8871df9
 
 
cc2ed2f
8871df9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
cc2ed2f
8871df9
cc2ed2f
8871df9
 
 
 
 
 
 
cc2ed2f
8871df9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
cc2ed2f
 
 
8871df9
 
 
cc2ed2f
 
8871df9
 
 
 
 
 
 
 
 
 
 
 
 
 
cc2ed2f
8871df9
 
 
cc2ed2f
 
8871df9
 
 
 
 
 
 
 
 
 
 
 
cc2ed2f
8871df9
 
 
 
 
 
 
 
 
 
 
cc2ed2f
 
 
8871df9
 
 
 
 
 
 
 
 
 
 
 
 
 
cc2ed2f
 
 
8871df9
 
 
 
 
 
 
 
 
cc2ed2f
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
"""SqlExecutor — выполняет SQL-запрос на подключённой БД.

Для SQLite соединение открывается через URI с ``mode=ro&immutable=1`` —
это обеспечивает read-only без копирования файла и режет любые попытки
выполнить DDL/DML на уровне драйвера. Для PostgreSQL/MySQL отдельный
guardrail остаётся на стороне API (см. is_select_only в postprocess.py).
"""

from __future__ import annotations

import logging
import sqlite3
from dataclasses import dataclass
from pathlib import Path
from urllib.parse import urlparse

logger = logging.getLogger(__name__)


@dataclass
class QueryResult:
    """Результат выполнения SQL-запроса."""
    columns: list[str]
    rows: list[list]
    row_count: int
    sql: str
    error: str | None = None

    @property
    def success(self) -> bool:
        return self.error is None

    def to_dict(self) -> dict:
        return {
            "columns": self.columns,
            "rows": self.rows,
            "row_count": self.row_count,
            "sql": self.sql,
            "error": self.error,
        }

    def to_markdown_table(self) -> str:
        if self.error:
            return f"Ошибка: {self.error}"
        if not self.rows:
            return "(пустой результат)"
        header = " | ".join(self.columns)
        sep = " | ".join(["---"] * len(self.columns))
        rows = "\n".join(" | ".join(str(v) for v in row) for row in self.rows)
        return f"{header}\n{sep}\n{rows}"


class SqlExecutor:
    """Выполняет SQL на подключённой БД."""

    MAX_ROWS = 500

    def __init__(self, connection_string: str):
        self.connection_string = connection_string.strip()
        self._db_type = self._detect_type(self.connection_string)

    def run(self, sql: str) -> QueryResult:
        try:
            if self._db_type == "sqlite":
                return self._run_sqlite(sql)
            elif self._db_type == "postgresql":
                return self._run_postgres(sql)
            elif self._db_type == "mysql":
                return self._run_mysql(sql)
            else:
                return QueryResult(columns=[], rows=[], row_count=0, sql=sql,
                                   error=f"Неизвестный тип БД: {self._db_type}")
        except Exception as e:  # noqa: BLE001
            logger.warning("Ошибка выполнения SQL: %s", e)
            return QueryResult(columns=[], rows=[], row_count=0, sql=sql, error=str(e))

    def _run_sqlite(self, sql: str) -> QueryResult:
        path = self._sqlite_path()
        conn = sqlite3.connect(self._sqlite_uri(path), uri=True)
        conn.text_factory = lambda b: b.decode("utf-8", errors="replace")
        try:
            cur = conn.cursor()
            cur.execute(sql)
            cols = [d[0] for d in (cur.description or [])]
            rows = [list(r) for r in cur.fetchmany(self.MAX_ROWS)]
            return QueryResult(columns=cols, rows=rows, row_count=len(rows), sql=sql)
        finally:
            conn.close()

    def _run_postgres(self, sql: str) -> QueryResult:
        try:
            import psycopg2  # type: ignore
        except ImportError as e:
            raise ImportError("Установи psycopg2: pip install psycopg2-binary") from e

        conn = psycopg2.connect(self.connection_string)
        try:
            # Транзакция в режиме READ ONLY — guardrail драйверного уровня.
            conn.set_session(readonly=True, autocommit=False)
            cur = conn.cursor()
            cur.execute(sql)
            cols = [d[0] for d in (cur.description or [])]
            rows = [list(r) for r in cur.fetchmany(self.MAX_ROWS)]
            return QueryResult(columns=cols, rows=rows, row_count=len(rows), sql=sql)
        finally:
            conn.close()

    def _run_mysql(self, sql: str) -> QueryResult:
        try:
            import pymysql  # type: ignore
        except ImportError as e:
            raise ImportError("Установи pymysql: pip install pymysql") from e

        parsed = urlparse(self.connection_string)
        conn = pymysql.connect(
            host=parsed.hostname,
            port=parsed.port or 3306,
            user=parsed.username,
            password=parsed.password,
            database=parsed.path.lstrip("/"),
        )
        try:
            cur = conn.cursor()
            # MySQL не имеет «глобального» read-only флага в драйвере,
            # но мы можем стартовать read-only-транзакцию.
            cur.execute("START TRANSACTION READ ONLY")
            cur.execute(sql)
            cols = [d[0] for d in (cur.description or [])]
            rows = [list(r) for r in cur.fetchmany(self.MAX_ROWS)]
            return QueryResult(columns=cols, rows=rows, row_count=len(rows), sql=sql)
        finally:
            conn.close()

    def _sqlite_path(self) -> Path:
        cs = self.connection_string
        if cs.startswith("sqlite:///"):
            return Path(cs[10:])
        return Path(cs)

    @staticmethod
    def _sqlite_uri(path: Path) -> str:
        """Read-only URI для SQLite с игнорированием journal/WAL."""
        return f"file:{path}?mode=ro&immutable=1"

    @staticmethod
    def _detect_type(cs: str) -> str:
        if cs.startswith("sqlite") or cs.endswith(".sqlite") or cs.endswith(".db"):
            return "sqlite"
        if cs.startswith("postgresql") or cs.startswith("postgres"):
            return "postgresql"
        if cs.startswith("mysql"):
            return "mysql"
        raise ValueError(f"Не удалось определить тип БД: {cs}")