"""Minimal Flask API with token-based authentication middleware."""
from __future__ import annotations
import os
import json
from copy import deepcopy
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Iterable, List, Mapping, MutableMapping
from uuid import UUID, uuid4
import psycopg
from psycopg import sql
from psycopg.rows import dict_row
from psycopg.types.json import Json
from deepmerge import always_merger
from dotenv import load_dotenv
from flask import Flask, abort, jsonify, request, g
from werkzeug.exceptions import HTTPException
from flask_cors import CORS
import requests
from google import genai
from google.genai import types
from schema_docs import SCRAPER_SCHEMA_DOC, PROFILE_SCHEMA_DOC
load_dotenv()
API_TOKEN = os.getenv("API_TOKEN")
FLUXIMMO_API_KEY = os.getenv("FLUXIMMO_API_KEY")
FLUXIMMO_COUNT_URL = os.getenv(
"FLUXIMMO_COUNT_URL",
"https://api.fluximmo.io/v2/protected/analytics/property/count",
)
GOOGLE_CREDENTIALS_PATH = os.getenv("GOOGLE_CREDENTIALS_PATH")
GOOGLE_PROJECT_ID = os.getenv("GOOGLE_PROJECT_ID")
GOOGLE_LOCATION = os.getenv("GOOGLE_LOCATION", "europe-west1")
REQUIRED_DB_SETTINGS = {
"DB_NAME": os.getenv("DB_NAME", ""),
"DB_HOST": os.getenv("DB_HOST", ""),
"DB_PORT": os.getenv("DB_PORT", ""),
"DB_USERNAME": os.getenv("DB_USERNAME", ""),
"DB_PASSWORD": os.getenv("DB_PASSWORD", ""),
}
missing_db_settings = [
name for name, value in REQUIRED_DB_SETTINGS.items() if not value
]
if missing_db_settings:
missing = ", ".join(missing_db_settings)
raise RuntimeError(
f"Database configuration missing for: {missing}. Did you configure the .env file?"
)
DB_NAME = REQUIRED_DB_SETTINGS["DB_NAME"]
DB_HOST = REQUIRED_DB_SETTINGS["DB_HOST"]
DB_USERNAME = REQUIRED_DB_SETTINGS["DB_USERNAME"]
DB_PASSWORD = REQUIRED_DB_SETTINGS["DB_PASSWORD"]
try:
DB_PORT = int(REQUIRED_DB_SETTINGS["DB_PORT"])
except ValueError as exc:
raise RuntimeError("DB_PORT must be an integer") from exc
USER_TABLE = os.getenv("DB_TABLE_USERS", "auth_user")
INVESTMENT_PROFILE_TABLE = os.getenv(
"DB_TABLE_INVESTMENT_PROFILES", "users_investmentprofile"
)
SCRAPER_TABLE = os.getenv("DB_TABLE_SCRAPERS", "scraper")
SUBSCRIPTION_TABLE = os.getenv(
"DB_TABLE_SUBSCRIPTIONS", "engagements_investorprofilesubscription"
)
PROPERTY_MATCH_TABLE = os.getenv(
"DB_TABLE_PROPERTY_MATCHES", "engagements_investorpropertymatch"
)
if not API_TOKEN:
raise RuntimeError(
"API_TOKEN missing from environment. Did you configure the .env file?"
)
if not FLUXIMMO_API_KEY:
raise RuntimeError(
"FLUXIMMO_API_KEY missing from environment. Did you configure the .env file?"
)
app = Flask(__name__)
CORS(app, resources={r"/*": {"origins": "*"}}, supports_credentials=True)
def get_db_connection() -> psycopg.Connection:
connection = psycopg.connect(
dbname=DB_NAME,
user=DB_USERNAME,
password=DB_PASSWORD,
host=DB_HOST,
port=DB_PORT,
row_factory=dict_row,
)
connection.autocommit = True
return connection
def get_db() -> psycopg.Connection:
if "db_connection" not in g:
try:
g.db_connection = get_db_connection()
except psycopg.OperationalError:
abort(503, description="Database connection failed")
return g.db_connection
@app.teardown_appcontext
def close_db_connection(_: BaseException | None) -> None:
db_connection = g.pop("db_connection", None)
if db_connection is not None:
db_connection.close()
def _get_json_body() -> MutableMapping[str, Any]:
payload = request.get_json(silent=True)
if not isinstance(payload, MutableMapping):
abort(400, description="Request body must be a JSON object")
return payload
def _parse_bool(value: Any, field_name: str) -> bool:
if isinstance(value, bool):
return value
if isinstance(value, str):
lowered = value.strip().lower()
if lowered in {"true", "1", "yes", "y"}:
return True
if lowered in {"false", "0", "no", "n"}:
return False
if isinstance(value, (int, float)):
if value in {0, 1}:
return bool(value)
abort(400, description=f"Field '{field_name}' must be a boolean value")
def _parse_datetime(value: Any, field_name: str) -> datetime | None:
if value is None:
return None
if isinstance(value, datetime):
dt = value
elif isinstance(value, str):
try:
normalized = value.replace("Z", "+00:00")
dt = datetime.fromisoformat(normalized)
except ValueError:
abort(
400,
description=f"Field '{field_name}' must be a valid ISO 8601 datetime",
)
else:
abort(
400, description=f"Field '{field_name}' must be a valid ISO 8601 datetime"
)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
else:
dt = dt.astimezone(timezone.utc)
return dt
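# Illustrative behavior of _parse_datetime (input values are hypothetical):
#   "2024-01-02T03:04:05Z"  -> datetime(2024, 1, 2, 3, 4, 5, tzinfo=timezone.utc)
#   "2024-01-02T03:04:05"   -> same instant; naive inputs are assumed to be UTC
#   None                    -> None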
def _parse_string(value: Any, field_name: str, *, allow_empty: bool = False) -> str:
if not isinstance(value, str):
abort(400, description=f"Field '{field_name}' must be a string")
stripped = value.strip()
if not allow_empty and not stripped:
abort(400, description=f"Field '{field_name}' cannot be empty")
return stripped if not allow_empty else value
def _isoformat(dt: datetime | None) -> str | None:
if dt is None:
return None
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt.astimezone(timezone.utc).isoformat().replace("+00:00", "Z")
def _ensure_json_compatible(value: Any, field_name: str) -> Any:
if value is None:
return None
if isinstance(value, (dict, list)):
return Json(value)
if isinstance(value, (str, int, float, bool)):
return value
abort(400, description=f"Field '{field_name}' must be valid JSON data")
def _parse_int(value: Any, field_name: str) -> int:
if isinstance(value, int):
return value
if isinstance(value, str):
try:
return int(value, 10)
except ValueError:
abort(400, description=f"Field '{field_name}' must be an integer")
abort(400, description=f"Field '{field_name}' must be an integer")
def _parse_optional_int(value: Any, field_name: str) -> int | None:
if value is None:
return None
try:
return _parse_int(value, field_name)
    except HTTPException:
        abort(400, description=f"Field '{field_name}' must be an integer or null")
def _parse_uuid(value: Any, field_name: str) -> UUID:
if isinstance(value, UUID):
return value
if isinstance(value, str):
try:
return UUID(value)
except ValueError:
abort(400, description=f"Field '{field_name}' must be a valid UUID")
abort(400, description=f"Field '{field_name}' must be a valid UUID")
def _load_scraper_params(value: Any) -> Dict[str, Any]:
if value is None:
return {}
if isinstance(value, Json):
value = value.data
if isinstance(value, dict):
return deepcopy(value)
if isinstance(value, str):
try:
parsed = json.loads(value)
except json.JSONDecodeError:
abort(400, description="Field 'params' must contain valid JSON")
if isinstance(parsed, dict):
return parsed
abort(400, description="Field 'params' must decode to an object")
abort(400, description="Field 'params' must be a JSON object or string")
def _validate_property_types(value: str | None) -> str | None:
"""Valide que les types de propriétés sont dans la liste autorisée."""
if value is None or value.strip() == "":
return None
valid_types = {"immeuble", "appartement", "maison"}
types = [t.strip().lower() for t in value.split(",") if t.strip()]
invalid_types = [t for t in types if t not in valid_types]
if invalid_types:
abort(
400,
description=f"Invalid property types: {', '.join(invalid_types)}. "
f"Allowed values: {', '.join(sorted(valid_types))}",
)
return value.strip()
def _cleanup_empty_values(data: Any) -> Any:
"""Recursively remove empty lists, empty dicts, and None values from data structure."""
if isinstance(data, dict):
cleaned = {}
for key, value in data.items():
cleaned_value = _cleanup_empty_values(value)
# Only include non-empty values
if (
cleaned_value is not None
and cleaned_value != []
and cleaned_value != {}
):
cleaned[key] = cleaned_value
return cleaned if cleaned else None
elif isinstance(data, list):
cleaned = [_cleanup_empty_values(item) for item in data]
# Remove None values from list
cleaned = [item for item in cleaned if item is not None]
return cleaned if cleaned else None
else:
return data
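# A quick sketch of the pruning (example values only):
#   _cleanup_empty_values({"price": {"min": None}, "tags": [], "rooms": 3})
#       -> {"rooms": 3}
# None, [] and {} are pruned recursively; a container that becomes empty
# collapses to None so its parent prunes it in turn.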
def build_scraper_params(
params: Dict[str, Any],
first_seen_days: int | None,
last_seen_days: int | None,
) -> Dict[str, Any]:
now = datetime.now(timezone.utc)
dynamic_params: Dict[str, Any] = {"meta": {}}
if first_seen_days is not None:
dynamic_params["meta"]["firstSeenAt"] = {
"min": (
(now - timedelta(days=first_seen_days))
.replace(microsecond=0)
.isoformat()
.replace("+00:00", "Z")
)
}
if last_seen_days is not None:
dynamic_params["meta"]["lastSeenAt"] = {
"max": (
(now - timedelta(days=last_seen_days))
.replace(microsecond=0)
.isoformat()
.replace("+00:00", "Z")
)
}
if not dynamic_params["meta"]:
dynamic_params.pop("meta")
base = deepcopy(params)
merged = always_merger.merge(base, dynamic_params)
# Clean up empty values before returning
cleaned = _cleanup_empty_values(merged)
return cleaned if cleaned else {}
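# Sketch of a typical merge (the timestamp below is illustrative; the real
# value is computed from the current time at call):
#   build_scraper_params({"price": {"max": 200000}}, first_seen_days=7, last_seen_days=None)
#       -> {"price": {"max": 200000},
#           "meta": {"firstSeenAt": {"min": "2025-01-01T00:00:00Z"}}}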
def _require_bearer_token(header_value: str | None) -> str:
if not header_value:
abort(401, description="Missing bearer token")
parts = header_value.strip().split()
if len(parts) != 2 or parts[0].lower() != "bearer":
abort(401, description="Authorization header must be 'Bearer <token>'")
token = parts[1].strip()
if not token:
abort(401, description="Authorization header must include a token")
return token
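# The only accepted header shape (the token value here is a placeholder):
#   Authorization: Bearer 3f1c9ab0...
# A missing header, a different scheme, or extra parts all yield a 401.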
def _serialize_row(
row: Mapping[str, Any], *, datetime_fields: Iterable[str] | None = None
) -> Dict[str, Any]:
result: Dict[str, Any] = dict(row)
for field in datetime_fields or ():
result[field] = _isoformat(result.get(field))
for field, value in list(result.items()):
if isinstance(value, Json):
result[field] = value.data
if isinstance(value, UUID):
result[field] = str(value)
return result
def _fetch_one(
query: sql.Composed, params: Mapping[str, Any]
) -> Mapping[str, Any] | None:
conn = get_db()
with conn.cursor() as cur:
cur.execute(query, params)
return cur.fetchone()
def _fetch_all(
query: sql.Composed, params: Mapping[str, Any] | None = None
) -> List[Mapping[str, Any]]:
conn = get_db()
with conn.cursor() as cur:
cur.execute(query, params or {})
return cur.fetchall()
def _execute(query: sql.Composed, params: Mapping[str, Any]) -> None:
conn = get_db()
with conn.cursor() as cur:
cur.execute(query, params)
def _columns_sql(columns: Iterable[str]) -> sql.Composed:
return sql.SQL(", ").join(sql.Identifier(column) for column in columns)
def _placeholders(columns: Iterable[str]) -> sql.Composed:
return sql.SQL(", ").join(sql.Placeholder(column) for column in columns)
def _insert_row(
table: str, data: Mapping[str, Any], returning: Iterable[str]
) -> Mapping[str, Any]:
if not data:
raise ValueError("Cannot insert without data")
adapted_data: Dict[str, Any] = {}
for key, value in data.items():
if isinstance(value, (dict, list)):
adapted_data[key] = Json(value)
else:
adapted_data[key] = value
query = sql.SQL(
"INSERT INTO {table} ({columns}) VALUES ({values}) RETURNING {returning}"
).format(
table=sql.Identifier(table),
columns=_columns_sql(data.keys()),
values=_placeholders(data.keys()),
returning=_columns_sql(returning),
)
row = _fetch_one(query, adapted_data)
if row is None:
raise RuntimeError("Insert statement did not return a row")
return row
def _update_row(
table: str,
identifier_column: str,
identifier_value: Any,
data: Mapping[str, Any],
returning: Iterable[str],
) -> Mapping[str, Any] | None:
if not data:
raise ValueError("Cannot update without data")
assignments = sql.SQL(", ").join(
sql.SQL("{column} = {placeholder}").format(
column=sql.Identifier(column),
placeholder=sql.Placeholder(column),
)
for column in data.keys()
)
query = sql.SQL(
"UPDATE {table} SET {assignments} WHERE {identifier_column} = {identifier} "
"RETURNING {returning}"
).format(
table=sql.Identifier(table),
assignments=assignments,
identifier_column=sql.Identifier(identifier_column),
identifier=sql.Placeholder("identifier"),
returning=_columns_sql(returning),
)
params: Dict[str, Any] = dict(data)
params["identifier"] = identifier_value
for key, value in list(params.items()):
if isinstance(value, (dict, list)):
params[key] = Json(value)
return _fetch_one(query, params)
def _delete_row(table: str, identifier_column: str, identifier_value: Any) -> bool:
query = sql.SQL(
"DELETE FROM {table} WHERE {identifier_column} = {identifier}"
).format(
table=sql.Identifier(table),
identifier_column=sql.Identifier(identifier_column),
identifier=sql.Placeholder("identifier"),
)
conn = get_db()
with conn.cursor() as cur:
cur.execute(query, {"identifier": identifier_value})
return cur.rowcount > 0
def _abort_for_integrity_error(exc: psycopg.IntegrityError) -> None:
detail = getattr(getattr(exc, "diag", None), "message_detail", None)
abort(409, description=detail or "Database constraint violation")
def _hash_django_password(password: str, iterations: int = 260000) -> str:
"""Hash a password using Django's PBKDF2 format: pbkdf2_sha256$<iterations>$<salt>$<hash>"""
import hashlib
import base64
import secrets
# Generate random salt
salt = secrets.token_urlsafe(12)
# Hash the password
hash_bytes = hashlib.pbkdf2_hmac(
"sha256", password.encode("utf-8"), salt.encode("utf-8"), iterations
)
# Encode to base64
hash_b64 = base64.b64encode(hash_bytes).decode("ascii")
return f"pbkdf2_sha256${iterations}${salt}${hash_b64}"
def _get_user_profiles(user_id: int) -> List[Dict[str, Any]]:
"""Fetch all profiles subscribed by a user."""
query = sql.SQL(
"""
SELECT {profile_columns}
FROM {profile_table}
INNER JOIN {subscription_table}
ON {profile_table}.profile_id = {subscription_table}.investment_profile_id
WHERE {subscription_table}.investor_id = {user_id}
ORDER BY {subscription_table}.subscribed_at DESC
"""
).format(
profile_columns=_columns_sql(PROFILE_RESPONSE_FIELDS),
profile_table=sql.Identifier(INVESTMENT_PROFILE_TABLE),
subscription_table=sql.Identifier(SUBSCRIPTION_TABLE),
user_id=sql.Placeholder("user_id"),
)
profiles = _fetch_all(query, {"user_id": user_id})
return [
_serialize_row(p, datetime_fields=PROFILE_DATETIME_FIELDS) for p in profiles
]
USER_RESPONSE_FIELDS = (
"id",
"username",
"first_name",
"last_name",
"email",
"is_superuser",
"is_staff",
"is_active",
"date_joined",
"last_login",
)
USER_DATETIME_FIELDS = ("date_joined", "last_login")
USER_BOOL_FIELDS = ("is_superuser", "is_staff", "is_active")
PROFILE_RESPONSE_FIELDS = (
"profile_id",
"name",
"description",
"criteria",
"created_at",
"is_active",
)
PROFILE_DATETIME_FIELDS = ("created_at",)
SCRAPER_RESPONSE_FIELDS = (
"id",
"params",
"last_seen_days",
"first_seen_days",
"frequency",
"task_name",
"enabled",
"property_types",
"page_size",
"max_pages",
"enrich_llm",
"only_match",
"once",
)
SCRAPER_BOOL_FIELDS = ("once",)
SCRAPER_INT_FIELDS = (
"last_seen_days",
"first_seen_days",
"page_size",
"max_pages",
"enabled",
"enrich_llm",
"only_match",
)
@app.before_request
def enforce_bearer_token() -> None:
if request.method == "OPTIONS":
return
# Allow Flask internals without auth.
if request.endpoint == "static":
return
provided_token = _require_bearer_token(request.headers.get("Authorization"))
if provided_token != API_TOKEN:
abort(401, description="Invalid bearer token")
@app.get("/profiles")
def get_profiles():
rows = _fetch_all(
sql.SQL("SELECT {columns} FROM {table} ORDER BY created_at DESC").format(
columns=_columns_sql(PROFILE_RESPONSE_FIELDS),
table=sql.Identifier(INVESTMENT_PROFILE_TABLE),
)
)
payload = [
_serialize_row(row, datetime_fields=PROFILE_DATETIME_FIELDS) for row in rows
]
return jsonify(payload)
@app.get("/profiles/<profile_id>")
def get_profile(profile_id: str):
profile_uuid = _parse_uuid(profile_id, "profile_id")
row = _fetch_one(
sql.SQL("SELECT {columns} FROM {table} WHERE profile_id = {identifier}").format(
columns=_columns_sql(PROFILE_RESPONSE_FIELDS),
table=sql.Identifier(INVESTMENT_PROFILE_TABLE),
identifier=sql.Placeholder("profile_id"),
),
{"profile_id": profile_uuid},
)
if row is None:
abort(404, description="Profile not found")
return jsonify(_serialize_row(row, datetime_fields=PROFILE_DATETIME_FIELDS))
@app.post("/profiles")
def create_profile():
payload = _get_json_body()
profile_uuid = uuid4()
name = _parse_string(payload.get("name"), "name")
description_value = payload.get("description")
description = (
None
if description_value is None
else _parse_string(description_value, "description", allow_empty=True)
)
criteria_raw = payload.get("criteria")
if criteria_raw is None:
abort(400, description="Field 'criteria' is required")
criteria = _ensure_json_compatible(criteria_raw, "criteria")
created_at_value = payload.get("created_at")
created_at = (
datetime.now(timezone.utc)
if created_at_value is None
else _parse_datetime(created_at_value, "created_at")
)
is_active = _parse_bool(payload.get("is_active", True), "is_active")
data = {
"profile_id": profile_uuid,
"name": name,
"description": description,
"criteria": criteria,
"created_at": created_at,
"is_active": is_active,
}
try:
row = _insert_row(INVESTMENT_PROFILE_TABLE, data, PROFILE_RESPONSE_FIELDS)
except psycopg.IntegrityError as exc:
_abort_for_integrity_error(exc)
return (
jsonify(_serialize_row(row, datetime_fields=PROFILE_DATETIME_FIELDS)),
201,
)
@app.put("/profiles/<profile_id>")
def update_profile(profile_id: str):
profile_uuid = _parse_uuid(profile_id, "profile_id")
payload = _get_json_body()
updates: Dict[str, Any] = {}
if "name" in payload:
updates["name"] = _parse_string(payload["name"], "name")
if "description" in payload:
description_value = payload["description"]
updates["description"] = (
None
if description_value is None
else _parse_string(description_value, "description", allow_empty=True)
)
if "criteria" in payload:
updates["criteria"] = _ensure_json_compatible(payload["criteria"], "criteria")
if "created_at" in payload:
updates["created_at"] = _parse_datetime(payload["created_at"], "created_at")
if "is_active" in payload:
updates["is_active"] = _parse_bool(payload["is_active"], "is_active")
if not updates:
abort(400, description="No updatable fields provided")
try:
row = _update_row(
INVESTMENT_PROFILE_TABLE,
"profile_id",
profile_uuid,
updates,
PROFILE_RESPONSE_FIELDS,
)
except psycopg.IntegrityError as exc:
_abort_for_integrity_error(exc)
if row is None:
abort(404, description="Profile not found")
return jsonify(_serialize_row(row, datetime_fields=PROFILE_DATETIME_FIELDS))
@app.delete("/profiles/<profile_id>")
def delete_profile(profile_id: str):
profile_uuid = _parse_uuid(profile_id, "profile_id")
# Delete all property matches for this profile first
delete_matches_query = sql.SQL(
"DELETE FROM {table} WHERE investment_profile_id = {profile_id}"
).format(
table=sql.Identifier(PROPERTY_MATCH_TABLE),
profile_id=sql.Placeholder("profile_id"),
)
_execute(delete_matches_query, {"profile_id": profile_uuid})
# Delete all subscriptions for this profile
delete_subscriptions_query = sql.SQL(
"DELETE FROM {table} WHERE investment_profile_id = {profile_id}"
).format(
table=sql.Identifier(SUBSCRIPTION_TABLE),
profile_id=sql.Placeholder("profile_id"),
)
_execute(delete_subscriptions_query, {"profile_id": profile_uuid})
# Now delete the profile
deleted = _delete_row(INVESTMENT_PROFILE_TABLE, "profile_id", profile_uuid)
if not deleted:
abort(404, description="Profile not found")
return "", 204
@app.get("/users")
def get_users():
rows = _fetch_all(
sql.SQL("SELECT {columns} FROM {table} ORDER BY id").format(
columns=_columns_sql(USER_RESPONSE_FIELDS),
table=sql.Identifier(USER_TABLE),
)
)
payload = []
for row in rows:
user = _serialize_row(row, datetime_fields=USER_DATETIME_FIELDS)
user["profiles"] = _get_user_profiles(row["id"])
payload.append(user)
return jsonify(payload)
@app.get("/users/<int:user_id>")
def get_user(user_id: int):
row = _fetch_one(
sql.SQL("SELECT {columns} FROM {table} WHERE id = {identifier}").format(
columns=_columns_sql(USER_RESPONSE_FIELDS),
table=sql.Identifier(USER_TABLE),
identifier=sql.Placeholder("user_id"),
),
{"user_id": user_id},
)
if row is None:
abort(404, description="User not found")
user = _serialize_row(row, datetime_fields=USER_DATETIME_FIELDS)
user["profiles"] = _get_user_profiles(user_id)
return jsonify(user)
@app.post("/users")
def create_user():
payload = _get_json_body()
user_data: Dict[str, Any] = {}
raw_password = _parse_string(payload.get("password"), "password")
user_data["password"] = _hash_django_password(raw_password)
user_data["username"] = _parse_string(payload.get("username"), "username")
user_data["first_name"] = _parse_string(payload.get("first_name"), "first_name")
user_data["last_name"] = _parse_string(payload.get("last_name"), "last_name")
user_data["email"] = _parse_string(payload.get("email"), "email")
user_data["is_superuser"] = _parse_bool(
payload.get("is_superuser", False), "is_superuser"
)
user_data["is_staff"] = _parse_bool(payload.get("is_staff", False), "is_staff")
user_data["is_active"] = _parse_bool(payload.get("is_active", True), "is_active")
user_data["date_joined"] = _parse_datetime(
payload.get("date_joined"), "date_joined"
)
if user_data["date_joined"] is None:
user_data["date_joined"] = datetime.now(timezone.utc)
user_data["last_login"] = _parse_datetime(payload.get("last_login"), "last_login")
# Parse profile_ids if provided
profile_ids: List[UUID] = []
if "profile_ids" in payload:
raw_profile_ids = payload["profile_ids"]
if not isinstance(raw_profile_ids, list):
abort(400, description="Field 'profile_ids' must be a list")
for idx, pid in enumerate(raw_profile_ids):
profile_ids.append(_parse_uuid(pid, f"profile_ids[{idx}]"))
try:
row = _insert_row(USER_TABLE, user_data, USER_RESPONSE_FIELDS)
except psycopg.IntegrityError as exc:
_abort_for_integrity_error(exc)
user_id = row["id"]
# Create profile subscriptions
for profile_id in profile_ids:
subscription_data = {
"subscription_id": uuid4(),
"investor_id": user_id,
"investment_profile_id": profile_id,
"subscribed_at": datetime.now(timezone.utc),
}
try:
_insert_row(
SUBSCRIPTION_TABLE,
subscription_data,
(
"subscription_id",
"investor_id",
"investment_profile_id",
"subscribed_at",
),
)
except psycopg.IntegrityError as exc:
_abort_for_integrity_error(exc)
# Fetch profiles for response
profiles = []
if profile_ids:
profiles_query = sql.SQL(
"SELECT {columns} FROM {table} WHERE profile_id = ANY({profile_ids})"
).format(
columns=_columns_sql(PROFILE_RESPONSE_FIELDS),
table=sql.Identifier(INVESTMENT_PROFILE_TABLE),
profile_ids=sql.Placeholder("profile_ids"),
)
profiles = _fetch_all(profiles_query, {"profile_ids": profile_ids})
response = _serialize_row(row, datetime_fields=USER_DATETIME_FIELDS)
response["profiles"] = [
_serialize_row(p, datetime_fields=PROFILE_DATETIME_FIELDS) for p in profiles
]
return jsonify(response), 201
@app.put("/users/<int:user_id>")
def update_user(user_id: int):
payload = _get_json_body()
updates: Dict[str, Any] = {}
if "password" in payload:
raw_password = _parse_string(payload["password"], "password")
updates["password"] = _hash_django_password(raw_password)
if "username" in payload:
updates["username"] = _parse_string(payload["username"], "username")
if "first_name" in payload:
updates["first_name"] = _parse_string(payload["first_name"], "first_name")
if "last_name" in payload:
updates["last_name"] = _parse_string(payload["last_name"], "last_name")
if "email" in payload:
updates["email"] = _parse_string(payload["email"], "email")
for field in USER_BOOL_FIELDS:
if field in payload:
updates[field] = _parse_bool(payload[field], field)
if "date_joined" in payload:
updates["date_joined"] = _parse_datetime(payload["date_joined"], "date_joined")
if "last_login" in payload:
updates["last_login"] = _parse_datetime(payload["last_login"], "last_login")
# Handle profile_ids update
update_profiles = False
profile_ids: List[UUID] = []
if "profile_ids" in payload:
update_profiles = True
raw_profile_ids = payload["profile_ids"]
if not isinstance(raw_profile_ids, list):
abort(400, description="Field 'profile_ids' must be a list")
for idx, pid in enumerate(raw_profile_ids):
profile_ids.append(_parse_uuid(pid, f"profile_ids[{idx}]"))
if not updates and not update_profiles:
abort(400, description="No updatable fields provided")
if updates:
try:
row = _update_row(USER_TABLE, "id", user_id, updates, USER_RESPONSE_FIELDS)
except psycopg.IntegrityError as exc:
_abort_for_integrity_error(exc)
if row is None:
abort(404, description="User not found")
else:
# Verify user exists
row = _fetch_one(
sql.SQL("SELECT {columns} FROM {table} WHERE id = {identifier}").format(
columns=_columns_sql(USER_RESPONSE_FIELDS),
table=sql.Identifier(USER_TABLE),
identifier=sql.Placeholder("user_id"),
),
{"user_id": user_id},
)
if row is None:
abort(404, description="User not found")
# Update profile subscriptions if requested
if update_profiles:
# Delete existing subscriptions
delete_query = sql.SQL(
"DELETE FROM {table} WHERE investor_id = {user_id}"
).format(
table=sql.Identifier(SUBSCRIPTION_TABLE),
user_id=sql.Placeholder("user_id"),
)
_execute(delete_query, {"user_id": user_id})
# Create new subscriptions
for profile_id in profile_ids:
subscription_data = {
"subscription_id": uuid4(),
"investor_id": user_id,
"investment_profile_id": profile_id,
"subscribed_at": datetime.now(timezone.utc),
}
try:
_insert_row(
SUBSCRIPTION_TABLE,
subscription_data,
(
"subscription_id",
"investor_id",
"investment_profile_id",
"subscribed_at",
),
)
except psycopg.IntegrityError as exc:
_abort_for_integrity_error(exc)
user = _serialize_row(row, datetime_fields=USER_DATETIME_FIELDS)
user["profiles"] = _get_user_profiles(user_id)
return jsonify(user)
@app.delete("/users/<int:user_id>")
def delete_user(user_id: int):
# Delete all profile subscriptions for this user first
delete_subscriptions_query = sql.SQL(
"DELETE FROM {table} WHERE investor_id = {user_id}"
).format(
table=sql.Identifier(SUBSCRIPTION_TABLE),
user_id=sql.Placeholder("user_id"),
)
_execute(delete_subscriptions_query, {"user_id": user_id})
# Now delete the user
deleted = _delete_row(USER_TABLE, "id", user_id)
if not deleted:
abort(404, description="User not found")
return "", 204
@app.get("/scrapers")
def get_scrapers():
rows = _fetch_all(
sql.SQL("SELECT {columns} FROM {table} ORDER BY id").format(
columns=_columns_sql(SCRAPER_RESPONSE_FIELDS),
table=sql.Identifier(SCRAPER_TABLE),
)
)
return jsonify([dict(row) for row in rows])
@app.get("/scrapers/<scraper_id>")
def get_scraper(scraper_id: str):
row = _fetch_one(
sql.SQL("SELECT {columns} FROM {table} WHERE id = {identifier}").format(
columns=_columns_sql(SCRAPER_RESPONSE_FIELDS),
table=sql.Identifier(SCRAPER_TABLE),
identifier=sql.Placeholder("scraper_id"),
),
{"scraper_id": _parse_string(scraper_id, "id")},
)
if row is None:
abort(404, description="Scraper not found")
return jsonify(dict(row))
@app.post("/scrapers")
def create_scraper():
payload = _get_json_body()
scraper_id = str(uuid4())
data: Dict[str, Any] = {"id": scraper_id}
for field in ("params", "frequency", "task_name"):
if field in payload:
value = payload[field]
data[field] = (
None if value is None else _parse_string(value, field, allow_empty=True)
)
    # Special-case validation for property_types
if "property_types" in payload:
value = payload["property_types"]
parsed_value = (
None
if value is None
else _parse_string(value, "property_types", allow_empty=True)
)
data["property_types"] = _validate_property_types(parsed_value)
for field in SCRAPER_INT_FIELDS:
if field in payload:
value = payload[field]
data[field] = None if value is None else _parse_int(value, field)
for field in SCRAPER_BOOL_FIELDS:
if field in payload:
value = payload[field]
data[field] = None if value is None else _parse_bool(value, field)
try:
row = _insert_row(SCRAPER_TABLE, data, SCRAPER_RESPONSE_FIELDS)
except psycopg.IntegrityError as exc:
_abort_for_integrity_error(exc)
return jsonify(dict(row)), 201
@app.put("/scrapers/<scraper_id>")
def update_scraper(scraper_id: str):
payload = _get_json_body()
updates: Dict[str, Any] = {}
for field in ("params", "frequency", "task_name"):
if field in payload:
value = payload[field]
updates[field] = (
None if value is None else _parse_string(value, field, allow_empty=True)
)
    # Special-case validation for property_types
if "property_types" in payload:
value = payload["property_types"]
parsed_value = (
None
if value is None
else _parse_string(value, "property_types", allow_empty=True)
)
updates["property_types"] = _validate_property_types(parsed_value)
for field in SCRAPER_INT_FIELDS:
if field in payload:
value = payload[field]
updates[field] = None if value is None else _parse_int(value, field)
for field in SCRAPER_BOOL_FIELDS:
if field in payload:
value = payload[field]
updates[field] = None if value is None else _parse_bool(value, field)
if not updates:
abort(400, description="No updatable fields provided")
try:
row = _update_row(
SCRAPER_TABLE,
"id",
_parse_string(scraper_id, "id"),
updates,
SCRAPER_RESPONSE_FIELDS,
)
except psycopg.IntegrityError as exc:
_abort_for_integrity_error(exc)
if row is None:
abort(404, description="Scraper not found")
return jsonify(dict(row))
@app.delete("/scrapers/<scraper_id>")
def delete_scraper(scraper_id: str):
deleted = _delete_row(SCRAPER_TABLE, "id", _parse_string(scraper_id, "id"))
if not deleted:
abort(404, description="Scraper not found")
return "", 204
@app.post("/scrapers/count")
def count_scraper_properties():
payload = _get_json_body()
print(f"[COUNT] Received payload: {payload}")
base_params = _load_scraper_params(payload.get("params"))
print(f"[COUNT] Base params: {base_params}")
first_seen_days = _parse_optional_int(
payload.get("first_seen_days"), "first_seen_days"
)
last_seen_days = _parse_optional_int(
payload.get("last_seen_days"), "last_seen_days"
)
print(
f"[COUNT] first_seen_days: {first_seen_days}, last_seen_days: {last_seen_days}"
)
query_filters = build_scraper_params(base_params, first_seen_days, last_seen_days)
print(f"[COUNT] Query filters after build: {query_filters}")
flux_payload = {"query": {"filterProperty": query_filters}}
print(f"[COUNT] Fluximmo payload: {json.dumps(flux_payload, indent=2)}")
headers = {
"x-api-key": FLUXIMMO_API_KEY,
"Content-Type": "application/json",
}
try:
response = requests.post(
FLUXIMMO_COUNT_URL, json=flux_payload, headers=headers, timeout=15
)
print(f"[COUNT] Fluximmo response status: {response.status_code}")
print(f"[COUNT] Fluximmo response body: {response.text}")
except requests.RequestException as e:
print(f"[COUNT] Request exception: {e}")
abort(502, description="Fluximmo request failed")
if response.status_code >= 400:
try:
detail = response.json()
except ValueError:
detail = response.text
abort(502, description=f"Fluximmo error: {detail}")
try:
response_data = response.json()
except ValueError:
abort(502, description="Fluximmo response was not JSON")
count = response_data.get("data", {}).get("count")
if count is None:
abort(502, description="Fluximmo response missing count")
try:
count_value = int(count)
except (TypeError, ValueError):
abort(502, description="Fluximmo count is not an integer")
return jsonify({"count": count_value})
def _get_genai_client():
"""Get or create a Gemini client using Vertex AI."""
if not GOOGLE_PROJECT_ID:
abort(503, description="GOOGLE_PROJECT_ID not configured")
# Set credentials path if provided
if GOOGLE_CREDENTIALS_PATH:
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = GOOGLE_CREDENTIALS_PATH
client = genai.Client(
vertexai=True,
project=GOOGLE_PROJECT_ID,
location=GOOGLE_LOCATION,
)
return client
def _call_gemini(system_prompt: str, user_prompt: str) -> str:
"""Call Gemini API via Vertex AI and return the generated text."""
try:
client = _get_genai_client()
response = client.models.generate_content(
model="gemini-2.5-flash",
contents=user_prompt,
config=types.GenerateContentConfig(
system_instruction=system_prompt,
temperature=0.2,
top_k=40,
top_p=0.95,
max_output_tokens=8192,
),
)
print(
f"[GEMINI] Response: {response.text[:500] if response.text else 'No text'}"
)
if not response.text:
abort(502, description="Gemini returned no content")
return response.text
    except HTTPException:
        # Let aborts raised above (e.g. the 502/503 descriptions) propagate
        # with their original status instead of being re-wrapped.
        raise
    except Exception as e:
        print(f"[GEMINI] Error: {e}")
        abort(502, description=f"Gemini request failed: {e}")
def _extract_json_from_response(text: str) -> Dict[str, Any]:
"""Extract JSON from Gemini response, handling markdown code blocks."""
# Remove markdown code blocks if present
text = text.strip()
if text.startswith("```json"):
text = text[7:]
elif text.startswith("```"):
text = text[3:]
if text.endswith("```"):
text = text[:-3]
text = text.strip()
try:
return json.loads(text)
except json.JSONDecodeError as e:
abort(400, description=f"Failed to parse Gemini response as JSON: {e}")
@app.post("/ai/generate-scraper")
def generate_scraper():
"""Generate scraper JSON from natural language prompt using Gemini."""
payload = _get_json_body()
user_prompt = payload.get("prompt")
if not user_prompt:
abort(400, description="Field 'prompt' is required")
generated_text = _call_gemini(SCRAPER_SCHEMA_DOC, user_prompt)
result = _extract_json_from_response(generated_text)
return jsonify({"params": result})
@app.post("/ai/generate-profile")
def generate_profile():
"""Generate profile JSON from natural language prompt using Gemini."""
payload = _get_json_body()
user_prompt = payload.get("prompt")
if not user_prompt:
abort(400, description="Field 'prompt' is required")
generated_text = _call_gemini(PROFILE_SCHEMA_DOC, user_prompt)
result = _extract_json_from_response(generated_text)
# Return the result directly at root level (not wrapped in criteria)
return jsonify(result)
if __name__ == "__main__":
app.run(host="0.0.0.0", port=int(os.getenv("PORT", "3000")), debug=False)