"""SearXNG async client.""" from __future__ import annotations import hashlib import json from typing import Any import httpx from pydantic import ValidationError from src.models.schemas import SearchRequest, SearchResponse, SearchResult class SearXNGError(Exception): """Base exception for SearXNG errors.""" pass class SearXNGClient: """Async client for SearXNG meta-search engine.""" def __init__( self, base_url: str = "http://localhost:8080", timeout: float = 10.0, max_results: int = 10 ): self.base_url = base_url.rstrip("/") self.timeout = timeout self.max_results = max_results self._client: httpx.AsyncClient | None = None async def __aenter__(self) -> SearXNGClient: self._client = httpx.AsyncClient(timeout=self.timeout) return self async def __aexit__(self, *args: Any) -> None: if self._client: await self._client.aclose() def _get_client(self) -> httpx.AsyncClient: if self._client is None: raise SearXNGError("Client not initialized. Use async context manager.") return self._client def _build_url(self, params: dict[str, Any]) -> str: """Build SearXNG search URL with parameters.""" from urllib.parse import quote_plus query_parts = [] for k, v in params.items(): if isinstance(v, list): # Join list values with comma encoded_v = quote_plus(",".join(str(x) for x in v)) else: encoded_v = quote_plus(str(v)) query_parts.append(f"{k}={encoded_v}") query_string = "&".join(query_parts) return f"{self.base_url}/search?{query_string}" async def search(self, request: SearchRequest, time_range: str | None = None) -> SearchResponse: """Execute search query against SearXNG. Args: request: SearchRequest with query, engines, page time_range: Optional time filter (day, week, month, year) Returns: SearchResponse with results Raises: SearXNGError: If request fails or response is invalid """ params = { "q": request.q, "format": "json", "engines": ",".join(request.engines), "pageno": request.page, } # Add time_range if specified (SearXNG supports: day, week, month, year) if time_range: params["time_range"] = time_range url = self._build_url(params) client = self._get_client() try: response = await client.get(url) response.raise_for_status() data = response.json() except httpx.HTTPStatusError as e: raise SearXNGError(f"HTTP error {e.response.status_code}: {e.response.text}") from e except httpx.RequestError as e: raise SearXNGError(f"Request failed: {e}") from e except json.JSONDecodeError as e: raise SearXNGError(f"Invalid JSON response: {e}") from e return self._parse_response(data, request) def _parse_response(self, data: dict[str, Any], request: SearchRequest) -> SearchResponse: """Parse SearXNG JSON response into SearchResponse.""" results = [] for item in data.get("results", [])[:self.max_results]: try: result = SearchResult( title=item.get("title", ""), url=item.get("url", ""), content=item.get("content") or item.get("snippet"), source=item.get("engine", "unknown"), score=item.get("score"), published=item.get("publishedDate") ) results.append(result) except ValidationError: # Skip invalid results continue return SearchResponse( query=request.q, results=results, total=data.get("number_of_results", len(results)), page=request.page, metadata={ "engines": data.get("engines", []), "response_time": data.get("response_time"), } ) async def health_check(self) -> bool: """Check if SearXNG is reachable. Returns: True if healthy, False otherwise """ try: client = self._get_client() response = await client.get(f"{self.base_url}/healthz", timeout=5.0) return response.status_code == 200 except Exception: return False