2025-11-17 15:15:29 +01:00

911 lines
27 KiB
Python

"""Minimal Flask API with token-based authentication middleware."""
from __future__ import annotations

import hmac
import json
import os
from copy import deepcopy
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Iterable, List, Mapping, MutableMapping
from uuid import UUID, uuid4

import psycopg
import requests
from deepmerge import always_merger
from dotenv import load_dotenv
from flask import Flask, abort, jsonify, request, g
from flask_cors import CORS
from psycopg import sql
from psycopg.rows import dict_row
from psycopg.types.json import Json
# Load a local .env file (if present) before any setting is read below.
load_dotenv()
# Shared secret every request must present as a bearer token.
API_TOKEN = os.getenv("API_TOKEN")
# Key sent in the ``x-api-key`` header of Fluximmo requests.
FLUXIMMO_API_KEY = os.getenv("FLUXIMMO_API_KEY")
# Endpoint queried by POST /scrapers/count; overridable through the environment.
FLUXIMMO_COUNT_URL = os.getenv(
    "FLUXIMMO_COUNT_URL",
    "https://analytics.fluximmo.io/properties/count",
)
# Database settings that must all be present (non-empty) in the environment.
REQUIRED_DB_SETTINGS = {
    setting: os.getenv(setting, "")
    for setting in ("DB_NAME", "DB_HOST", "DB_PORT", "DB_USERNAME", "DB_PASSWORD")
}
missing_db_settings = [
    setting for setting, configured in REQUIRED_DB_SETTINGS.items() if not configured
]
# Fail at import time, naming every absent setting at once.
if missing_db_settings:
    missing = ", ".join(missing_db_settings)
    raise RuntimeError(
        f"Database configuration missing for: {missing}. Did you configure the .env file?"
    )
DB_NAME, DB_HOST, DB_USERNAME, DB_PASSWORD = (
    REQUIRED_DB_SETTINGS["DB_NAME"],
    REQUIRED_DB_SETTINGS["DB_HOST"],
    REQUIRED_DB_SETTINGS["DB_USERNAME"],
    REQUIRED_DB_SETTINGS["DB_PASSWORD"],
)
# The port is the only setting that needs converting; reject non-numeric text.
try:
    DB_PORT = int(REQUIRED_DB_SETTINGS["DB_PORT"])
except ValueError as exc:
    raise RuntimeError("DB_PORT must be an integer") from exc
# Table names, each overridable through an environment variable.
USER_TABLE = os.getenv("DB_TABLE_USERS", "auth_user")
INVESTMENT_PROFILE_TABLE = os.getenv(
    "DB_TABLE_INVESTMENT_PROFILES", "users_investmentprofile"
)
SCRAPER_TABLE = os.getenv("DB_TABLE_SCRAPERS", "scraper")
# Refuse to start without the secrets the request handlers depend on.
if not API_TOKEN:
    raise RuntimeError(
        "API_TOKEN missing from environment. Did you configure the .env file?"
    )
if not FLUXIMMO_API_KEY:
    raise RuntimeError(
        "FLUXIMMO_API_KEY missing from environment. Did you configure the .env file?"
    )
app = Flask(__name__)
# CORS is enabled for every route and every origin, with credentials allowed.
# NOTE(review): wildcard origins combined with supports_credentials=True is
# very permissive -- confirm this is intended for production.
CORS(app, resources={r"/*": {"origins": "*"}}, supports_credentials=True)
def get_db_connection() -> psycopg.Connection:
    """Open a new PostgreSQL connection from the module-level ``DB_*`` settings.

    Rows come back as dictionaries (``dict_row``) and the connection is
    switched to autocommit before it is returned.
    """
    connect_options = {
        "dbname": DB_NAME,
        "user": DB_USERNAME,
        "password": DB_PASSWORD,
        "host": DB_HOST,
        "port": DB_PORT,
        "row_factory": dict_row,
    }
    conn = psycopg.connect(**connect_options)
    conn.autocommit = True
    return conn
def get_db() -> psycopg.Connection:
    """Return the connection cached on ``flask.g``, opening it on first use.

    Aborts with HTTP 503 when opening the connection raises
    ``psycopg.OperationalError``.
    """
    if "db_connection" in g:
        return g.db_connection
    try:
        g.db_connection = get_db_connection()
    except psycopg.OperationalError:
        abort(503, description="Database connection failed")
    return g.db_connection
@app.teardown_appcontext
def close_db_connection(_: BaseException | None) -> None:
    """Close the connection cached on ``flask.g``, if one was opened.

    Registered as an app-context teardown handler; the exception argument
    is ignored.
    """
    cached_connection = g.pop("db_connection", None)
    if cached_connection is None:
        return
    cached_connection.close()
def _get_json_body() -> MutableMapping[str, Any]:
    """Return the request body parsed as a JSON object.

    Aborts with HTTP 400 when the body is missing, is not valid JSON, or is
    JSON of another shape (array, scalar).
    """
    body = request.get_json(silent=True)
    if isinstance(body, MutableMapping):
        return body
    abort(400, description="Request body must be a JSON object")
def _parse_bool(value: Any, field_name: str) -> bool:
    """Interpret *value* as a boolean or abort with HTTP 400.

    Accepted inputs: real booleans, the case-insensitive strings
    true/1/yes/y and false/0/no/n (surrounding whitespace ignored), and the
    numbers 0 and 1.
    """
    if isinstance(value, bool):
        return value
    if isinstance(value, str):
        token = value.strip().lower()
        vocabulary = (
            (("true", "1", "yes", "y"), True),
            (("false", "0", "no", "n"), False),
        )
        for spellings, meaning in vocabulary:
            if token in spellings:
                return meaning
    elif isinstance(value, (int, float)) and value in (0, 1):
        return bool(value)
    abort(400, description=f"Field '{field_name}' must be a boolean value")
def _parse_datetime(value: Any, field_name: str) -> datetime | None:
    """Convert *value* to a UTC-aware ``datetime``.

    ``None`` passes through unchanged. ``datetime`` objects and ISO 8601
    strings (a ``Z`` suffix is accepted) are supported; anything else aborts
    with HTTP 400. Naive values are taken to be UTC; aware values are
    converted to UTC.
    """
    if value is None:
        return None
    error_message = f"Field '{field_name}' must be a valid ISO 8601 datetime"
    if isinstance(value, datetime):
        parsed = value
    elif isinstance(value, str):
        try:
            # Rewrite "Z" as an explicit offset before handing over to fromisoformat().
            parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
        except ValueError:
            abort(400, description=error_message)
    else:
        abort(400, description=error_message)
    if parsed.tzinfo is None:
        return parsed.replace(tzinfo=timezone.utc)
    return parsed.astimezone(timezone.utc)
def _parse_string(value: Any, field_name: str, *, allow_empty: bool = False) -> str:
    """Validate that *value* is a string, aborting with HTTP 400 otherwise.

    With ``allow_empty`` the string is returned exactly as received.
    Without it, surrounding whitespace is stripped and an empty result is
    rejected.
    """
    if not isinstance(value, str):
        abort(400, description=f"Field '{field_name}' must be a string")
    if allow_empty:
        return value
    trimmed = value.strip()
    if not trimmed:
        abort(400, description=f"Field '{field_name}' cannot be empty")
    return trimmed
def _isoformat(dt: datetime | None) -> str | None:
    """Render *dt* as an ISO 8601 UTC string ending in ``Z``.

    ``None`` passes through. Naive datetimes are treated as UTC.
    """
    if dt is None:
        return None
    aware = dt if dt.tzinfo is not None else dt.replace(tzinfo=timezone.utc)
    rendered = aware.astimezone(timezone.utc).isoformat()
    return rendered.replace("+00:00", "Z")
def _ensure_json_compatible(value: Any, field_name: str) -> Any:
    """Prepare a JSON-typed request value for use as a query parameter.

    ``None`` and scalars (str, int, float, bool) are returned unchanged;
    dicts and lists are wrapped in psycopg's ``Json``. Any other type aborts
    with HTTP 400.
    """
    if value is None:
        return None
    if isinstance(value, (str, int, float, bool)):
        return value
    if isinstance(value, (dict, list)):
        return Json(value)
    abort(400, description=f"Field '{field_name}' must be valid JSON data")
def _parse_int(value: Any, field_name: str) -> int:
    """Coerce *value* to a plain ``int`` or abort with HTTP 400.

    Accepts integers and base-10 integer strings. ``bool`` is a subclass of
    ``int``, so JSON ``true``/``false`` also pass the integer check; they are
    normalised to ``1``/``0`` so the caller really gets an ``int`` (and an
    integer, not a boolean, is bound as the query parameter).
    """
    if isinstance(value, bool):
        return int(value)
    if isinstance(value, int):
        return value
    if isinstance(value, str):
        try:
            return int(value, 10)
        except ValueError:
            abort(400, description=f"Field '{field_name}' must be an integer")
    abort(400, description=f"Field '{field_name}' must be an integer")
def _parse_optional_int(value: Any, field_name: str) -> int | None:
    """Like ``_parse_int`` but lets ``None`` through.

    Any failure raised while parsing (including the HTTP 400 abort issued by
    ``_parse_int``) is replaced with an HTTP 400 whose message mentions that
    null is also accepted.
    """
    if value is not None:
        try:
            return _parse_int(value, field_name)
        except Exception:
            abort(400, description=f"Field '{field_name}' must be an integer or null")
    return None
def _parse_uuid(value: Any, field_name: str) -> UUID:
    """Return *value* as a ``UUID``, aborting with HTTP 400 when it is not one.

    ``UUID`` instances are returned as-is; strings are parsed with the
    ``UUID`` constructor.
    """
    if isinstance(value, UUID):
        return value
    error_message = f"Field '{field_name}' must be a valid UUID"
    if not isinstance(value, str):
        abort(400, description=error_message)
    try:
        return UUID(value)
    except ValueError:
        abort(400, description=error_message)
def _load_scraper_params(value: Any) -> Dict[str, Any]:
    """Normalise scraper ``params`` into a fresh dictionary.

    Accepts ``None`` (empty dict), a dict (deep-copied so the caller's data
    is never mutated), a psycopg ``Json`` wrapper around either of the
    accepted shapes, or a string containing a JSON object. Anything else
    aborts with HTTP 400.
    """
    if value is None:
        return {}
    if isinstance(value, Json):
        # psycopg 3 keeps the wrapped Python object in ``.obj``.
        value = value.obj
    if isinstance(value, dict):
        return deepcopy(value)
    if isinstance(value, str):
        try:
            parsed = json.loads(value)
        except json.JSONDecodeError:
            abort(400, description="Field 'params' must contain valid JSON")
        if isinstance(parsed, dict):
            return parsed
        abort(400, description="Field 'params' must decode to an object")
    abort(400, description="Field 'params' must be a JSON object or string")
def _validate_property_types(value: str | None) -> str | None:
    """Validate and normalise a comma-separated list of property types.

    Each entry is trimmed and lower-cased, then checked against the allowed
    set (immeuble, appartement, maison). Returns the normalised entries
    joined with ``,`` so the returned value is exactly what was validated,
    or ``None`` when *value* is ``None``, blank, or made of separators only.
    Aborts with HTTP 400 listing the offending entries otherwise.
    """
    if value is None:
        return None
    valid_types = {"immeuble", "appartement", "maison"}
    types = [token.strip().lower() for token in value.split(",") if token.strip()]
    if not types:
        # Blank input or nothing but commas/whitespace: no types at all.
        return None
    invalid_types = [t for t in types if t not in valid_types]
    if invalid_types:
        abort(
            400,
            description=f"Invalid property types: {', '.join(invalid_types)}. "
            f"Allowed values: {', '.join(sorted(valid_types))}"
        )
    return ",".join(types)
def build_scraper_params(
    params: Dict[str, Any],
    first_seen_days: int | None,
    last_seen_days: int | None,
) -> Dict[str, Any]:
    """Merge relative-date filters into a copy of *params*.

    ``first_seen_days`` becomes ``meta.firstSeenAt.min`` and
    ``last_seen_days`` becomes ``meta.lastSeenAt.max``, each set to "now minus
    N days" in UTC, truncated to the second and formatted with a ``Z``
    suffix. *params* itself is left untouched; the merge is done with
    ``always_merger`` on a deep copy.
    """
    now = datetime.now(timezone.utc)

    def _days_ago(days: int) -> str:
        moment = (now - timedelta(days=days)).replace(microsecond=0)
        return moment.isoformat().replace("+00:00", "Z")

    meta: Dict[str, Any] = {}
    if first_seen_days is not None:
        meta["firstSeenAt"] = {"min": _days_ago(first_seen_days)}
    if last_seen_days is not None:
        meta["lastSeenAt"] = {"max": _days_ago(last_seen_days)}
    # Only contribute a "meta" key when at least one filter was requested.
    dynamic_params: Dict[str, Any] = {"meta": meta} if meta else {}
    return always_merger.merge(deepcopy(params), dynamic_params)
def _require_bearer_token(header_value: str | None) -> str:
    """Extract the token from an ``Authorization: Bearer <token>`` header.

    Aborts with HTTP 401 when the header is absent or empty, or when it is
    not exactly two whitespace-separated parts starting with ``bearer``
    (case-insensitive).
    """
    if not header_value:
        abort(401, description="Missing bearer token")
    pieces = header_value.strip().split()
    is_bearer = len(pieces) == 2 and pieces[0].lower() == "bearer"
    if not is_bearer:
        abort(401, description="Authorization header must be 'Bearer <token>'")
    _, raw_token = pieces
    token = raw_token.strip()
    if not token:
        abort(401, description="Authorization header must include a token")
    return token
def _serialize_row(
    row: Mapping[str, Any], *, datetime_fields: Iterable[str] | None = None
) -> Dict[str, Any]:
    """Copy a database row into a JSON-friendly dictionary.

    Every name in *datetime_fields* is rendered with ``_isoformat`` (a name
    absent from the row ends up as ``None``). psycopg ``Json`` wrappers are
    unwrapped and ``UUID`` values become strings. *row* is not modified.
    """
    result: Dict[str, Any] = dict(row)
    for field in datetime_fields or ():
        result[field] = _isoformat(result.get(field))
    for field, value in list(result.items()):
        if isinstance(value, Json):
            # psycopg 3 keeps the wrapped Python object in ``.obj``.
            result[field] = value.obj
        elif isinstance(value, UUID):
            result[field] = str(value)
    return result
def _fetch_one(
    query: sql.Composed, params: Mapping[str, Any]
) -> Mapping[str, Any] | None:
    """Run *query* on the request's connection and return the first row, if any."""
    with get_db().cursor() as cursor:
        cursor.execute(query, params)
        first_row = cursor.fetchone()
    return first_row
def _fetch_all(
    query: sql.Composed, params: Mapping[str, Any] | None = None
) -> List[Mapping[str, Any]]:
    """Run *query* on the request's connection and return every row.

    A missing or empty *params* is sent as an empty mapping.
    """
    bound_params = params if params else {}
    with get_db().cursor() as cursor:
        cursor.execute(query, bound_params)
        all_rows = cursor.fetchall()
    return all_rows
def _execute(query: sql.Composed, params: Mapping[str, Any]) -> None:
    """Run *query* on the request's connection, discarding any result."""
    with get_db().cursor() as cursor:
        cursor.execute(query, params)
def _columns_sql(columns: Iterable[str]) -> sql.Composed:
    """Build a comma-separated list of quoted column identifiers."""
    separator = sql.SQL(", ")
    return separator.join(map(sql.Identifier, columns))
def _placeholders(columns: Iterable[str]) -> sql.Composed:
    """Build a comma-separated list of named placeholders, one per column."""
    separator = sql.SQL(", ")
    return separator.join(map(sql.Placeholder, columns))
def _insert_row(
    table: str, data: Mapping[str, Any], returning: Iterable[str]
) -> Mapping[str, Any]:
    """Insert one row into *table* and return the *returning* columns.

    Dict and list values are wrapped in psycopg's ``Json`` before being bound.

    Raises:
        ValueError: *data* is empty.
        RuntimeError: the statement returned no row.
    """
    if not data:
        raise ValueError("Cannot insert without data")
    adapted_data: Dict[str, Any] = {
        key: Json(value) if isinstance(value, (dict, list)) else value
        for key, value in data.items()
    }
    column_names = list(data.keys())
    statement = sql.SQL(
        "INSERT INTO {table} ({columns}) VALUES ({values}) RETURNING {returning}"
    )
    query = statement.format(
        table=sql.Identifier(table),
        columns=_columns_sql(column_names),
        values=_placeholders(column_names),
        returning=_columns_sql(returning),
    )
    inserted = _fetch_one(query, adapted_data)
    if inserted is not None:
        return inserted
    raise RuntimeError("Insert statement did not return a row")
def _update_row(
    table: str,
    identifier_column: str,
    identifier_value: Any,
    data: Mapping[str, Any],
    returning: Iterable[str],
) -> Mapping[str, Any] | None:
    """Update the row whose *identifier_column* equals *identifier_value*.

    Returns the *returning* columns of the updated row, or ``None`` when no
    row matched. Dict and list values are wrapped in psycopg's ``Json``.
    The identifier is bound under the placeholder name ``identifier``.

    Raises:
        ValueError: *data* is empty.
    """
    if not data:
        raise ValueError("Cannot update without data")
    set_clauses = [
        sql.SQL("{column} = {placeholder}").format(
            column=sql.Identifier(column_name),
            placeholder=sql.Placeholder(column_name),
        )
        for column_name in data.keys()
    ]
    statement = sql.SQL(
        "UPDATE {table} SET {assignments} WHERE {identifier_column} = {identifier} "
        "RETURNING {returning}"
    )
    query = statement.format(
        table=sql.Identifier(table),
        assignments=sql.SQL(", ").join(set_clauses),
        identifier_column=sql.Identifier(identifier_column),
        identifier=sql.Placeholder("identifier"),
        returning=_columns_sql(returning),
    )
    bound = {**data, "identifier": identifier_value}
    params: Dict[str, Any] = {
        key: Json(value) if isinstance(value, (dict, list)) else value
        for key, value in bound.items()
    }
    return _fetch_one(query, params)
def _delete_row(table: str, identifier_column: str, identifier_value: Any) -> bool:
    """Delete rows whose *identifier_column* equals *identifier_value*.

    Returns ``True`` when at least one row was deleted.
    """
    statement = sql.SQL(
        "DELETE FROM {table} WHERE {identifier_column} = {identifier}"
    )
    query = statement.format(
        table=sql.Identifier(table),
        identifier_column=sql.Identifier(identifier_column),
        identifier=sql.Placeholder("identifier"),
    )
    with get_db().cursor() as cursor:
        cursor.execute(query, {"identifier": identifier_value})
        deleted_count = cursor.rowcount
    return deleted_count > 0
def _abort_for_integrity_error(exc: psycopg.IntegrityError) -> None:
    """Abort with HTTP 409 for a constraint violation.

    The response description is the server's ``message_detail`` diagnostic
    when present, otherwise a generic message.
    """
    diagnostics = getattr(exc, "diag", None)
    detail = getattr(diagnostics, "message_detail", None)
    description = detail if detail else "Database constraint violation"
    abort(409, description=description)
# Columns selected and returned by the /users endpoints. ``password`` is
# written on create/update but is not part of this tuple, so it is never
# returned.
USER_RESPONSE_FIELDS = (
    "id",
    "username",
    "first_name",
    "last_name",
    "email",
    "is_superuser",
    "is_staff",
    "is_active",
    "date_joined",
    "last_login",
)
# User columns rendered as ISO 8601 strings in responses.
USER_DATETIME_FIELDS = ("date_joined", "last_login")
# User columns parsed with _parse_bool on update.
USER_BOOL_FIELDS = ("is_superuser", "is_staff", "is_active")
# Columns selected and returned by the /profiles endpoints.
PROFILE_RESPONSE_FIELDS = (
    "profile_id",
    "name",
    "description",
    "criteria",
    "created_at",
    "is_active",
)
# Profile columns rendered as ISO 8601 strings in responses.
PROFILE_DATETIME_FIELDS = ("created_at",)
# Columns selected and returned by the /scrapers endpoints.
SCRAPER_RESPONSE_FIELDS = (
    "id",
    "params",
    "last_seen_days",
    "first_seen_days",
    "frequency",
    "task_name",
    "enabled",
    "property_types",
    "page_size",
    "max_pages",
    "enrich_llm",
    "only_match",
    "once",
)
# Scraper fields parsed with _parse_bool on create/update.
SCRAPER_BOOL_FIELDS = (
    "once",
)
# Scraper fields parsed with _parse_int on create/update.
# NOTE(review): enabled / enrich_llm / only_match are presumably 0/1 integer
# flags in the table -- confirm against the schema.
SCRAPER_INT_FIELDS = (
    "last_seen_days",
    "first_seen_days",
    "page_size",
    "max_pages",
    "enabled",
    "enrich_llm",
    "only_match",
)
@app.before_request
def enforce_bearer_token() -> None:
    """Reject any request that does not carry the configured bearer token.

    ``OPTIONS`` requests (e.g. CORS preflight) and the ``static`` endpoint
    are let through without authentication. Everything else must send
    ``Authorization: Bearer <API_TOKEN>`` or is aborted with HTTP 401.
    """
    if request.method == "OPTIONS":
        return
    # Allow Flask internals without auth.
    if request.endpoint == "static":
        return
    provided_token = _require_bearer_token(request.headers.get("Authorization"))
    # Constant-time comparison so response timing does not reveal how much of
    # the token matched. Bytes are compared because compare_digest() rejects
    # non-ASCII ``str`` arguments.
    tokens_match = hmac.compare_digest(
        provided_token.encode("utf-8"), API_TOKEN.encode("utf-8")
    )
    if not tokens_match:
        abort(401, description="Invalid bearer token")
@app.get("/profiles")
def get_profiles():
    """Return every investment profile as a JSON array, newest first."""
    query = sql.SQL("SELECT {columns} FROM {table} ORDER BY created_at DESC").format(
        columns=_columns_sql(PROFILE_RESPONSE_FIELDS),
        table=sql.Identifier(INVESTMENT_PROFILE_TABLE),
    )
    profiles = [
        _serialize_row(record, datetime_fields=PROFILE_DATETIME_FIELDS)
        for record in _fetch_all(query)
    ]
    return jsonify(profiles)
@app.get("/profiles/<profile_id>")
def get_profile(profile_id: str):
    """Return one investment profile; 400 on a malformed id, 404 when absent."""
    profile_uuid = _parse_uuid(profile_id, "profile_id")
    query = sql.SQL(
        "SELECT {columns} FROM {table} WHERE profile_id = {identifier}"
    ).format(
        columns=_columns_sql(PROFILE_RESPONSE_FIELDS),
        table=sql.Identifier(INVESTMENT_PROFILE_TABLE),
        identifier=sql.Placeholder("profile_id"),
    )
    record = _fetch_one(query, {"profile_id": profile_uuid})
    if record is None:
        abort(404, description="Profile not found")
    return jsonify(_serialize_row(record, datetime_fields=PROFILE_DATETIME_FIELDS))
@app.post("/profiles")
def create_profile():
    """Create an investment profile from the JSON body and return it (201).

    ``name`` and ``criteria`` are required. ``description`` is optional,
    ``created_at`` defaults to the current UTC time and ``is_active`` to
    true. The profile id is generated server-side. Constraint violations
    produce HTTP 409.
    """
    body = _get_json_body()
    new_profile_id = uuid4()
    name = _parse_string(body.get("name"), "name")

    description = body.get("description")
    if description is not None:
        description = _parse_string(description, "description", allow_empty=True)

    raw_criteria = body.get("criteria")
    if raw_criteria is None:
        abort(400, description="Field 'criteria' is required")
    criteria = _ensure_json_compatible(raw_criteria, "criteria")

    raw_created_at = body.get("created_at")
    if raw_created_at is None:
        created_at = datetime.now(timezone.utc)
    else:
        created_at = _parse_datetime(raw_created_at, "created_at")

    is_active = _parse_bool(body.get("is_active", True), "is_active")

    record = {
        "profile_id": new_profile_id,
        "name": name,
        "description": description,
        "criteria": criteria,
        "created_at": created_at,
        "is_active": is_active,
    }
    try:
        row = _insert_row(INVESTMENT_PROFILE_TABLE, record, PROFILE_RESPONSE_FIELDS)
    except psycopg.IntegrityError as exc:
        _abort_for_integrity_error(exc)
    response = jsonify(_serialize_row(row, datetime_fields=PROFILE_DATETIME_FIELDS))
    return response, 201
@app.put("/profiles/<profile_id>")
def update_profile(profile_id: str):
    """Apply a partial update to an investment profile and return the result.

    Only the fields present in the JSON body are changed. Responds 400 when
    no updatable field is supplied, 404 when the profile does not exist and
    409 on a constraint violation.
    """
    profile_uuid = _parse_uuid(profile_id, "profile_id")
    body = _get_json_body()
    changes: Dict[str, Any] = {}

    if "name" in body:
        changes["name"] = _parse_string(body["name"], "name")
    if "description" in body:
        new_description = body["description"]
        if new_description is not None:
            new_description = _parse_string(
                new_description, "description", allow_empty=True
            )
        changes["description"] = new_description
    if "criteria" in body:
        changes["criteria"] = _ensure_json_compatible(body["criteria"], "criteria")
    if "created_at" in body:
        changes["created_at"] = _parse_datetime(body["created_at"], "created_at")
    if "is_active" in body:
        changes["is_active"] = _parse_bool(body["is_active"], "is_active")

    if not changes:
        abort(400, description="No updatable fields provided")

    try:
        row = _update_row(
            INVESTMENT_PROFILE_TABLE,
            "profile_id",
            profile_uuid,
            changes,
            PROFILE_RESPONSE_FIELDS,
        )
    except psycopg.IntegrityError as exc:
        _abort_for_integrity_error(exc)
    if row is None:
        abort(404, description="Profile not found")
    return jsonify(_serialize_row(row, datetime_fields=PROFILE_DATETIME_FIELDS))
@app.delete("/profiles/<profile_id>")
def delete_profile(profile_id: str):
    """Delete an investment profile; 204 on success, 404 when it does not exist."""
    profile_uuid = _parse_uuid(profile_id, "profile_id")
    if _delete_row(INVESTMENT_PROFILE_TABLE, "profile_id", profile_uuid):
        return "", 204
    abort(404, description="Profile not found")
@app.get("/users")
def get_users():
    """Return every user as a JSON array, ordered by id."""
    query = sql.SQL("SELECT {columns} FROM {table} ORDER BY id").format(
        columns=_columns_sql(USER_RESPONSE_FIELDS),
        table=sql.Identifier(USER_TABLE),
    )
    users = [
        _serialize_row(record, datetime_fields=USER_DATETIME_FIELDS)
        for record in _fetch_all(query)
    ]
    return jsonify(users)
@app.get("/users/<int:user_id>")
def get_user(user_id: int):
    """Return one user by id, or respond 404 when it does not exist."""
    query = sql.SQL("SELECT {columns} FROM {table} WHERE id = {identifier}").format(
        columns=_columns_sql(USER_RESPONSE_FIELDS),
        table=sql.Identifier(USER_TABLE),
        identifier=sql.Placeholder("user_id"),
    )
    record = _fetch_one(query, {"user_id": user_id})
    if record is None:
        abort(404, description="User not found")
    return jsonify(_serialize_row(record, datetime_fields=USER_DATETIME_FIELDS))
@app.post("/users")
def create_user():
    """Create a user from the JSON body and return it (201).

    ``password``, ``username``, ``first_name``, ``last_name`` and ``email``
    are required non-empty strings. The flags default to
    ``is_superuser=False``, ``is_staff=False`` and ``is_active=True``;
    ``date_joined`` defaults to the current UTC time and ``last_login`` to
    null. Constraint violations produce HTTP 409.
    """
    body = _get_json_body()
    user_data: Dict[str, Any] = {}

    # NOTE(review): the password is stored exactly as received (after
    # trimming) -- confirm callers send an already-hashed value.
    for text_field in ("password", "username", "first_name", "last_name", "email"):
        user_data[text_field] = _parse_string(body.get(text_field), text_field)

    flag_defaults = (("is_superuser", False), ("is_staff", False), ("is_active", True))
    for flag, default in flag_defaults:
        user_data[flag] = _parse_bool(body.get(flag, default), flag)

    date_joined = _parse_datetime(body.get("date_joined"), "date_joined")
    if date_joined is None:
        date_joined = datetime.now(timezone.utc)
    user_data["date_joined"] = date_joined
    user_data["last_login"] = _parse_datetime(body.get("last_login"), "last_login")

    try:
        row = _insert_row(USER_TABLE, user_data, USER_RESPONSE_FIELDS)
    except psycopg.IntegrityError as exc:
        _abort_for_integrity_error(exc)
    response = jsonify(_serialize_row(row, datetime_fields=USER_DATETIME_FIELDS))
    return response, 201
@app.put("/users/<int:user_id>")
def update_user(user_id: int):
    """Apply a partial update to a user and return the result.

    Only the fields present in the JSON body are changed. Responds 400 when
    no updatable field is supplied, 404 when the user does not exist and 409
    on a constraint violation.
    """
    body = _get_json_body()
    changes: Dict[str, Any] = {}

    # NOTE(review): as on creation, a new password is stored as received --
    # confirm callers send an already-hashed value.
    for text_field in ("password", "username", "first_name", "last_name", "email"):
        if text_field in body:
            changes[text_field] = _parse_string(body[text_field], text_field)
    for flag in USER_BOOL_FIELDS:
        if flag in body:
            changes[flag] = _parse_bool(body[flag], flag)
    for moment_field in ("date_joined", "last_login"):
        if moment_field in body:
            changes[moment_field] = _parse_datetime(body[moment_field], moment_field)

    if not changes:
        abort(400, description="No updatable fields provided")

    try:
        row = _update_row(USER_TABLE, "id", user_id, changes, USER_RESPONSE_FIELDS)
    except psycopg.IntegrityError as exc:
        _abort_for_integrity_error(exc)
    if row is None:
        abort(404, description="User not found")
    return jsonify(_serialize_row(row, datetime_fields=USER_DATETIME_FIELDS))
@app.delete("/users/<int:user_id>")
def delete_user(user_id: int):
    """Delete a user; 204 on success, 404 when it does not exist."""
    if _delete_row(USER_TABLE, "id", user_id):
        return "", 204
    abort(404, description="User not found")
@app.get("/scrapers")
def get_scrapers():
    """Return every scraper configuration as a JSON array, ordered by id."""
    query = sql.SQL("SELECT {columns} FROM {table} ORDER BY id").format(
        columns=_columns_sql(SCRAPER_RESPONSE_FIELDS),
        table=sql.Identifier(SCRAPER_TABLE),
    )
    scrapers = [dict(record) for record in _fetch_all(query)]
    return jsonify(scrapers)
@app.get("/scrapers/<scraper_id>")
def get_scraper(scraper_id: str):
    """Return one scraper configuration; 400 on a blank id, 404 when absent."""
    query = sql.SQL("SELECT {columns} FROM {table} WHERE id = {identifier}").format(
        columns=_columns_sql(SCRAPER_RESPONSE_FIELDS),
        table=sql.Identifier(SCRAPER_TABLE),
        identifier=sql.Placeholder("scraper_id"),
    )
    lookup = {"scraper_id": _parse_string(scraper_id, "id")}
    record = _fetch_one(query, lookup)
    if record is None:
        abort(404, description="Scraper not found")
    return jsonify(dict(record))
@app.post("/scrapers")
def create_scraper():
    """Create a scraper configuration from the JSON body and return it (201).

    The id is generated server-side. Every other field is optional and only
    written when present in the body; an explicit null is stored as NULL.
    Constraint violations produce HTTP 409.
    """
    body = _get_json_body()
    record: Dict[str, Any] = {"id": str(uuid4())}

    for text_field in ("params", "frequency", "task_name"):
        if text_field not in body:
            continue
        raw_text = body[text_field]
        record[text_field] = (
            _parse_string(raw_text, text_field, allow_empty=True)
            if raw_text is not None
            else None
        )

    # property_types gets extra validation against the allowed type list.
    if "property_types" in body:
        raw_types = body["property_types"]
        if raw_types is not None:
            raw_types = _parse_string(raw_types, "property_types", allow_empty=True)
        record["property_types"] = _validate_property_types(raw_types)

    for int_field in SCRAPER_INT_FIELDS:
        if int_field not in body:
            continue
        raw_int = body[int_field]
        record[int_field] = _parse_int(raw_int, int_field) if raw_int is not None else None

    for bool_field in SCRAPER_BOOL_FIELDS:
        if bool_field not in body:
            continue
        raw_flag = body[bool_field]
        record[bool_field] = (
            _parse_bool(raw_flag, bool_field) if raw_flag is not None else None
        )

    try:
        row = _insert_row(SCRAPER_TABLE, record, SCRAPER_RESPONSE_FIELDS)
    except psycopg.IntegrityError as exc:
        _abort_for_integrity_error(exc)
    return jsonify(dict(row)), 201
@app.put("/scrapers/<scraper_id>")
def update_scraper(scraper_id: str):
    """Apply a partial update to a scraper configuration and return the result.

    Only the fields present in the JSON body are changed; an explicit null
    is stored as NULL. Responds 400 when no updatable field is supplied, 404
    when the scraper does not exist and 409 on a constraint violation.
    """
    body = _get_json_body()
    changes: Dict[str, Any] = {}

    for text_field in ("params", "frequency", "task_name"):
        if text_field not in body:
            continue
        raw_text = body[text_field]
        changes[text_field] = (
            _parse_string(raw_text, text_field, allow_empty=True)
            if raw_text is not None
            else None
        )

    # property_types gets extra validation against the allowed type list.
    if "property_types" in body:
        raw_types = body["property_types"]
        if raw_types is not None:
            raw_types = _parse_string(raw_types, "property_types", allow_empty=True)
        changes["property_types"] = _validate_property_types(raw_types)

    for int_field in SCRAPER_INT_FIELDS:
        if int_field not in body:
            continue
        raw_int = body[int_field]
        changes[int_field] = _parse_int(raw_int, int_field) if raw_int is not None else None

    for bool_field in SCRAPER_BOOL_FIELDS:
        if bool_field not in body:
            continue
        raw_flag = body[bool_field]
        changes[bool_field] = (
            _parse_bool(raw_flag, bool_field) if raw_flag is not None else None
        )

    if not changes:
        abort(400, description="No updatable fields provided")

    # The id is validated only after the body, matching the order in which
    # validation errors are reported.
    identifier = _parse_string(scraper_id, "id")
    try:
        row = _update_row(
            SCRAPER_TABLE, "id", identifier, changes, SCRAPER_RESPONSE_FIELDS
        )
    except psycopg.IntegrityError as exc:
        _abort_for_integrity_error(exc)
    if row is None:
        abort(404, description="Scraper not found")
    return jsonify(dict(row))
@app.delete("/scrapers/<scraper_id>")
def delete_scraper(scraper_id: str):
    """Delete a scraper configuration; 204 on success, 404 when it does not exist."""
    identifier = _parse_string(scraper_id, "id")
    if _delete_row(SCRAPER_TABLE, "id", identifier):
        return "", 204
    abort(404, description="Scraper not found")
@app.post("/scrapers/count")
def count_scraper_properties():
    """Ask Fluximmo how many properties match a scraper configuration.

    The JSON body may contain ``params`` (object or JSON string) and the
    optional integers ``first_seen_days`` / ``last_seen_days``, which are
    turned into date filters by ``build_scraper_params``. Responds with
    ``{"count": <int>}``.

    Invalid input yields HTTP 400 (including day offsets too large for date
    arithmetic). Any upstream problem -- network failure, error status,
    non-JSON body, unexpected JSON shape, missing or non-integer count --
    yields HTTP 502.
    """
    payload = _get_json_body()
    base_params = _load_scraper_params(payload.get("params"))
    first_seen_days = _parse_optional_int(
        payload.get("first_seen_days"), "first_seen_days"
    )
    last_seen_days = _parse_optional_int(
        payload.get("last_seen_days"), "last_seen_days"
    )
    try:
        query_filters = build_scraper_params(
            base_params, first_seen_days, last_seen_days
        )
    except OverflowError:
        # timedelta/datetime arithmetic overflows for absurdly large offsets.
        abort(
            400,
            description="Fields 'first_seen_days' and 'last_seen_days' are out of range",
        )
    flux_payload = {"query": query_filters}
    headers = {
        "x-api-key": FLUXIMMO_API_KEY,
        "Content-Type": "application/json",
    }
    try:
        response = requests.post(
            FLUXIMMO_COUNT_URL, json=flux_payload, headers=headers, timeout=15
        )
    except requests.RequestException:
        abort(502, description="Fluximmo request failed")
    if response.status_code >= 400:
        try:
            detail = response.json()
        except ValueError:
            detail = response.text
        abort(502, description=f"Fluximmo error: {detail}")
    try:
        response_data = response.json()
    except ValueError:
        abort(502, description="Fluximmo response was not JSON")
    # The body is expected to look like {"data": {"count": N}}; treat any
    # other shape (list, null "data", ...) as a missing count instead of
    # crashing with AttributeError.
    data_section = (
        response_data.get("data") if isinstance(response_data, dict) else None
    )
    count = data_section.get("count") if isinstance(data_section, dict) else None
    if count is None:
        abort(502, description="Fluximmo response missing count")
    try:
        count_value = int(count)
    except (TypeError, ValueError, OverflowError):
        abort(502, description="Fluximmo count is not an integer")
    return jsonify({"count": count_value})
# When executed as a script, start Flask's built-in server on all interfaces,
# on the port given by $PORT (default 3000), with debug mode off.
if __name__ == "__main__":
    app.run(host="0.0.0.0", port=int(os.getenv("PORT", "3000")), debug=False)