import streamlit as st
import requests
import json
import pandas as pd
from datetime import datetime
# Configuration de la page
st.set_page_config(
page_title="OpenSanctions API Search",
page_icon="🔍",
layout="wide"
)
# Titre principal
st.title("🔍 Recherche base de donnée")
st.markdown("### Recherche en temps réel dans la base de données Canadiennes et Internationales")
# Configuration de l'API
API_KEY = "c63ffd50bf236645ff762d55320b9e4e"
BASE_URL = "https://api.opensanctions.org"
def search_opensanctions(query, dataset="default", limit=20):
"""
Effectue une recherche dans OpenSanctions avec la bonne authentification
"""
try:
# URL de l'endpoint de recherche
url = f"{BASE_URL}/search/{dataset}"
# Headers avec authentification selon la documentation OpenSanctions
headers = {
"Authorization": f"ApiKey {API_KEY}",
"Content-Type": "application/json"
}
# Paramètres de requête
params = {
"q": query,
"limit": limit
}
# Exécuter la requête
response = requests.get(url, headers=headers, params=params)
if response.status_code == 200:
return response.json()
else:
st.error(f"Erreur API: {response.status_code} - {response.text}")
return None
except Exception as e:
st.error(f"Erreur lors de la requête: {str(e)}")
return None
def match_opensanctions(query_data, dataset="default"):
"""
Utilise l'endpoint de matching pour des recherches précises
"""
try:
url = f"{BASE_URL}/match/{dataset}"
headers = {
"Authorization": f"ApiKey {API_KEY}",
"Content-Type": "application/json"
}
# Structure pour le matching
batch = {"queries": {"q1": query_data}}
response = requests.post(url, headers=headers, json=batch)
if response.status_code == 200:
return response.json()
else:
st.error(f"Erreur API Match: {response.status_code} - {response.text}")
return None
except Exception as e:
st.error(f"Erreur lors du matching: {str(e)}")
return None
# Interface utilisateur
st.sidebar.header("Configuration")
# Choix du dataset
dataset_options = {
"default": "Base complète (défaut)",
"sanctions": "Sanctions uniquement",
"peps": "Personnes politiquement exposées",
"ca_dfatd_sema_sanctions": "Sanctions canadiennes",
"us_ofac_sdn": "Liste OFAC USA",
"eu_fsf": "Sanctions Union Européenne"
}
selected_dataset = st.sidebar.selectbox(
"Dataset à interroger:",
options=list(dataset_options.keys()),
format_func=lambda x: dataset_options[x]
)
# Mode de recherche
search_mode = st.sidebar.radio(
"Mode de recherche:",
["Recherche textuelle", "Matching structuré"]
)
# Interface principale
if search_mode == "Recherche textuelle":
st.header("Recherche textuelle simple")
search_query = st.text_input(
"Nom à rechercher:",
placeholder="Ex: Vladimir Putin, Bashar Assad, OFAC..."
)
limit = st.slider("Nombre de résultats max:", 1, 100, 20)
if st.button("🔍 Rechercher", type="primary"):
if search_query:
with st.spinner("Recherche en cours..."):
results = search_opensanctions(search_query, selected_dataset, limit)
if results and "results" in results:
st.success(f"Trouvé {len(results['results'])} résultat(s)")
# Affichage des résultats
for i, result in enumerate(results["results"]):
with st.expander(f"Résultat {i+1}: {result.get('caption', 'N/A')}", expanded=i==0):
# Informations de base
col1, col2 = st.columns(2)
with col1:
st.write(f"**ID:** {result.get('id', 'N/A')}")
st.write(f"**Type:** {result.get('schema', 'N/A')}")
st.write(f"**Score:** {result.get('score', 'N/A')}")
with col2:
datasets = result.get('datasets', [])
st.write(f"**Sources:** {', '.join(datasets)}")
countries = result.get('countries', [])
if countries:
st.write(f"**Pays:** {', '.join(countries)}")
# Propriétés détaillées
properties = result.get('properties', {})
if properties:
st.write("**Propriétés détaillées:**")
for key, values in properties.items():
if values: # Seulement si il y a des valeurs
values_str = ", ".join(str(v) for v in values)
st.write(f"- **{key}:** {values_str}")
# Données brutes en JSON
if st.checkbox(f"Voir données JSON brutes {i+1}", key=f"json_{i}"):
st.json(result)
# Statistiques globales
st.subheader("Statistiques de la recherche")
if "facets" in results:
facets = results["facets"]
col1, col2, col3 = st.columns(3)
with col1:
if "countries" in facets:
st.write("**Pays les plus fréquents:**")
for country in facets["countries"][:5]:
st.write(f"- {country['name']}: {country['count']}")
with col2:
if "datasets" in facets:
st.write("**Datasets les plus représentés:**")
for dataset in facets["datasets"][:5]:
st.write(f"- {dataset['name']}: {dataset['count']}")
with col3:
if "topics" in facets:
st.write("**Topics principaux:**")
for topic in facets["topics"][:5]:
st.write(f"- {topic['name']}: {topic['count']}")
else:
st.info("Aucun résultat trouvé")
else:
st.warning("Veuillez entrer un terme de recherche")
else: # Matching structuré
st.header("Matching structuré (précis)")
entity_type = st.selectbox(
"Type d'entité:",
["Person", "Company", "Organization", "Vessel"]
)
if entity_type == "Person":
col1, col2 = st.columns(2)
with col1:
first_name = st.text_input("Prénom:")
last_name = st.text_input("Nom de famille:")
birth_date = st.text_input("Date de naissance (YYYY ou YYYY-MM-DD):")
with col2:
nationality = st.text_input("Nationalité:")
father_name = st.text_input("Nom du père:")
mother_name = st.text_input("Nom de la mère:")
if st.button("🎯 Rechercher avec Matching", type="primary"):
# Construire la requête structurée
query_data = {"schema": "Person", "properties": {}}
if first_name:
query_data["properties"]["firstName"] = [first_name]
if last_name:
query_data["properties"]["lastName"] = [last_name]
if birth_date:
query_data["properties"]["birthDate"] = [birth_date]
if nationality:
query_data["properties"]["nationality"] = [nationality]
if father_name:
query_data["properties"]["fatherName"] = [father_name]
if mother_name:
query_data["properties"]["motherName"] = [mother_name]
if query_data["properties"]:
with st.spinner("Matching en cours..."):
results = match_opensanctions(query_data, selected_dataset)
if results and "responses" in results:
response = results["responses"]["q1"]
matches = response.get("results", [])
if matches:
st.success(f"Trouvé {len(matches)} correspondance(s)")
for i, match in enumerate(matches):
score = match.get("score", 0)
# Couleur selon le score
if score >= 0.95:
color = "🔴" # Rouge - correspondance très forte
elif score >= 0.8:
color = "🟠" # Orange - correspondance forte
elif score >= 0.6:
color = "🟡" # Jaune - correspondance modérée
else:
color = "⚪" # Blanc - correspondance faible
with st.expander(f"{color} Match {i+1}: {match.get('caption', 'N/A')} (Score: {score:.2f})", expanded=i==0):
# Afficher toutes les propriétés
properties = match.get("properties", {})
if properties:
for key, values in properties.items():
if values:
values_str = ", ".join(str(v) for v in values)
st.write(f"**{key}:** {values_str}")
# Datasets sources
datasets = match.get("datasets", [])
if datasets:
st.write(f"**Sources:** {', '.join(datasets)}")
# Données complètes
if st.checkbox(f"Voir toutes les données {i+1}", key=f"match_json_{i}"):
st.json(match)
else:
st.info("Aucune correspondance trouvée")
else:
st.error("Erreur dans la réponse de l'API")
else:
st.warning("Veuillez remplir au moins un champ")
else: # Company/Organization/Vessel
name = st.text_input("Nom de l'entité:")
jurisdiction = st.text_input("Juridiction/Pays:")
if st.button("🎯 Rechercher avec Matching", type="primary"):
query_data = {
"schema": entity_type,
"properties": {}
}
if name:
query_data["properties"]["name"] = [name]
if jurisdiction:
query_data["properties"]["jurisdiction"] = [jurisdiction]
if query_data["properties"]:
with st.spinner("Matching en cours..."):
results = match_opensanctions(query_data, selected_dataset)
if results and "responses" in results:
response = results["responses"]["q1"]
matches = response.get("results", [])
if matches:
st.success(f"Trouvé {len(matches)} correspondance(s)")
for i, match in enumerate(matches):
score = match.get("score", 0)
with st.expander(f"Match {i+1}: {match.get('caption', 'N/A')} (Score: {score:.2f})", expanded=i==0):
properties = match.get("properties", {})
if properties:
for key, values in properties.items():
if values:
values_str = ", ".join(str(v) for v in values)
st.write(f"**{key}:** {values_str}")
datasets = match.get("datasets", [])
if datasets:
st.write(f"**Sources:** {', '.join(datasets)}")
if st.checkbox(f"Voir données JSON {i+1}", key=f"comp_json_{i}"):
st.json(match)
else:
st.info("Aucune correspondance trouvée")
else:
st.warning("Veuillez remplir au moins le nom")
# Informations dans la sidebar
st.sidebar.markdown("---")
st.sidebar.header("Informations")
st.sidebar.markdown("""
**Cette application utilise:**
- OpenSanctions API officielle
- Plus de 298 base de donnée international
- Authentification ApiKey
- Endpoints /search et /match
- Base de donnée Sanction Canada
- Base de donnée Fraude économique Canadienne
**Types de recherche:**
1. **Textuelle:** Recherche simple par mots-clés
2. **Matching:** Recherche structurée avec correspondance précise
**Datasets disponibles:**
- default: Toutes les données
- sanctions: Sanctions seulement
- peps: Personnes politiquement exposées
- ca_dfatd_sema_sanctions: Canada
""")
st.sidebar.markdown(f"**Heure:** {datetime.now().strftime('%H:%M:%S')}")