import panel as pn
import os
import io
import PubmedModule
from datetime import datetime
import re
import yaml
import pandas as pd
from io import StringIO, BytesIO
from lxml import etree
import tempfile
import openpyxl
import bokeh
from bokeh.settings import settings
import seaborn as sns
import matplotlib.pyplot as plt
import plotly.express as px
# Download button for the PubMed query results. It starts disabled with no
# payload; run_fetch later assigns its callback, filename and enabled state.
fetch_download_button = pn.widgets.FileDownload(
    label="Download PubMed Results",
    button_type="success",
    filename="pubmed_results.xlsx",
    file=None,
    disabled=True,
    width=300
)
# Panel setup: page template, custom stylesheet, then the plotly/tabulator
# extensions used by the plots and tables below.
pn.extension(template="fast")
# In the ".pn-widget" rule the "!important" flag is written inside the
# declaration (before the ";"); placed after the semicolon it is not valid CSS.
pn.extension(raw_css=[
    """
/* General Tabulator styling */
.tabulator {
font-family: "Segoe UI", Tahoma, Geneva, Verdana, sans-serif !important;
font-size: 14px !important;
border: 1px solid #ddd;
background-color: #fff;
border-radius: 4px;
overflow-x: auto !important; /* Enable horizontal scrolling for the table */
overflow-y: auto !important; /* Ensure vertical scrolling works */
width: 100% !important; /* Full width */
max-width: 100% !important; /* Prevent overflow */
}
/* Header styling */
.tabulator .tabulator-header {
background-color: #f4f6f6;
border-bottom: 2px solid #bbb;
font-weight: 600;
color: #333;
}
.tabulator .tabulator-col .tabulator-col-title {
white-space: normal !important;
text-align: left;
padding: 8px;
}
/* Ensure all cells wrap text by default */
.tabulator .tabulator-cell {
padding: 6px 8px;
border-right: 1px solid #eee;
white-space: normal !important;
word-break: break-word !important;
line-height: 1.5 !important;
overflow: hidden !important;
text-overflow: clip !important;
}
/* Specific styling for Abstract column */
.tabulator .tabulator-cell.wrap-cell {
white-space: normal !important;
word-break: break-word !important;
line-height: 1.5 !important;
overflow-y: visible !important;
overflow-x: hidden !important; /* Wrap text, but don’t scroll within cells */
max-height: none !important;
height: auto !important;
}
/* Ensure tableholder allows scrolling */
.tabulator-tableholder {
overflow-x: auto !important; /* Enable horizontal scrolling */
overflow-y: auto !important; /* Ensure vertical scrolling */
width: 100% !important; /* Full width */
max-width: 100% !important; /* Prevent overflow */
}
.tabulator-table {
width: 100% !important; /* Allow table to adjust width based on content */
max-width: 100% !important; /* Prevent overflow */
}
/* Row styling */
.tabulator .tabulator-row:nth-child(even) {
background-color: #f9f9f9;
}
.tabulator .tabulator-row:hover {
background-color: #e6f3fa;
}
/* Custom tabs (unchanged) */
.bk-tabs-header {
background-color: #f4f6f6;
border-bottom: 2px solid #bbb;
padding: 8px 10px;
font-family: "Segoe UI", sans-serif;
font-weight: 600;
font-size: 16px;
}
.bk-tab {
margin-right: 8px;
padding: 6px 14px;
background-color: #d6eaf8;
color: #1b4f72;
border-radius: 6px 6px 0 0;
transition: all 0.3s ease-in-out;
border: 1px solid #aed6f1;
}
.bk-tab:hover {
background-color: #aed6f1;
cursor: pointer;
color: #154360;
}
.bk-tab.bk-active {
background-color: #2874a6 !important;
color: white !important;
border-bottom: 2px solid white;
}
.bk-tabs {
border: none;
}
/* Widget styling */
.pn-widget {
margin-bottom: 10px;
width: 100% !important;
max-width: 100% !important;
}
/* Button styling */
.pn-button {
font-family: "Segoe UI", sans-serif;
font-weight: 500;
}
/* Responsive adjustments for smaller screens */
@media screen and (max-width: 768px) {
.tabulator {
font-size: 12px !important; /* Smaller font for mobile */
}
.tabulator .tabulator-cell {
padding: 4px 6px; /* Reduce padding for smaller screens */
}
.pn-widget {
width: 100% !important;
}
}
"""
])
pn.extension()
pn.extension('plotly', 'tabulator')
# Set server configurations for large files
# NOTE(review): confirm that the installed Bokeh/Panel version honours these
# attributes; nothing in this file reads them back.
settings.max_file_size = "2GB"  # upload cap is set to the string "2GB"
settings.websocket_max_message_size = 200 * 1024 * 1024  # 200 MiB WebSocket message limit
# settings.websocket_max_message_size = 100_000_000 # 100 MB
settings.websocket_ping_timeout = 30000  # NOTE(review): unit is not visible here (ms vs s) - confirm
# Load the predefined queries from the YAML file. The encoding is pinned so
# the file is decoded the same way on every platform instead of depending on
# the locale's default codec.
with open("search_queries.yaml", "r", encoding="utf-8") as f:
    full_yaml = yaml.safe_load(f)
# Mapping whose keys populate the "Predefined Query" selector and whose values
# are used as the query text.
search_query = full_yaml["search_queries"]

# Module-level state shared between the callbacks below.
full_df = None                         # currently loaded / fetched dataset
filtered_searched_df = pd.DataFrame()  # last keyword-search result (column-less when reset)
cached_file_value = None
cached_file_name = None
# QUERY WIDGETS
# Metadata columns offered for display. "Index" is not listed here but is part
# of the default display set below.
all_column = ["PubMed_ID", "Title", "Abstract", "Authors", "Journal",
              "DOI", "Publication_Date", "Keywords", "Article_Types"]
# Columns that are always shown in the result tables.
default_column_to_show = ["Index", "PubMed_ID", "Title"]
# Choice between a predefined query (from the YAML file) and free-text input.
mode_selector = pn.widgets.RadioBoxGroup(name="Query Mode", options=["Predefined", "Custom"])
query_selector = pn.widgets.Select(name="Predefined Query", options=list(search_query.keys()))
custom_query_input = pn.widgets.TextAreaInput(name="Custom PubMed Query", placeholder="Paste your PubMed query here...", height=200, width=800)
# Credentials passed to PubmedModule by run_fetch.
email_input = pn.widgets.TextInput(name="NCBI Email", placeholder="e.g., your.name@domain.com")
api_key_input = pn.widgets.TextInput(name="NCBI API Key (optional)", placeholder="Paste your API key...")
output_file_input = pn.widgets.TextInput(name="Output File Name", placeholder="filename.csv / .txt / .xlsx")
extraction_mode = pn.widgets.RadioBoxGroup(name="What to Extract", options=["Only PubMed IDs", "Abstract + Metadata"])
# Heading and column picker that are only relevant in "Abstract + Metadata"
# mode; they start hidden and are toggled by toggle_metadata_widgets.
abstract_metadata = pn.pane.Markdown("## Abstract with other metadata", visible=False)
select_column = pn.widgets.MultiChoice(name="",
                                       placeholder="Select additional columns to show",
                                       options=[col for col in all_column if col not in default_column_to_show],
                                       value=[], # start with only default column
                                       width=600,
                                       visible=False)
metadata_container = pn.Column(abstract_metadata, select_column, visible=False)
@pn.depends(extraction_mode.param.value, watch=True)
def toggle_metadata_widgets(mode):
    """Show the metadata heading and column picker only in "Abstract + Metadata" mode.

    Args:
        mode: Current value of ``extraction_mode``.
    """
    is_metadata_mode = (mode == "Abstract + Metadata")
    for component in (metadata_container, abstract_metadata, select_column):
        component.visible = is_metadata_mode


# Apply the rule once so the initial layout matches the current selection.
toggle_metadata_widgets(extraction_mode.value)
# Button that triggers run_fetch, plus the read-only log area handed to
# PubmedModule as ``log_widget`` (hidden until a run makes it visible).
run_button = pn.widgets.Button(name="Run Query", button_type="primary")
log_output = pn.widgets.TextAreaInput(name="Query Log", height=300, width=800, value="", disabled=True, visible=False)
# Read-only, paginated table for fetched abstracts/metadata. It starts hidden
# and empty; run_fetch and abstract_column fill it.
extracted_data_table = pn.widgets.Tabulator(
    value=None,
    pagination='remote',
    page_size=20,
    layout='fit_columns',
    visible=False,
    disabled=True,
    sizing_mode='stretch_both',
    show_index=False,
    configuration={
        # Defaults applied to every column: wrapped multi-line text, no editing.
        "columnDefaults": {
            "resizable": True,
            "headerSort": True,
            "formatter": "textarea",
            "cssClass": "wrap-cell",
            "vertAlign": "top",
            "editor": False,
        },
        "editable": False,
        "autoColumns": False,
        "scrollable": True
    }
)
# Initial column definitions for the default display columns.
extracted_data_table.columns = [{"title": col, "field": col} for col in default_column_to_show]
def abstract_column(event=None):
    """Rebuild the query-results table for the currently selected columns.

    Shows the default columns plus whatever is chosen in ``select_column``.
    Columns that the fetched data does not contain are skipped instead of
    raising ``KeyError``. When no data is available the table is hidden.

    Args:
        event: Param event from the ``select_column`` watcher; unused, so the
            function can also be called directly.
    """
    if full_df is None or full_df.empty:
        extracted_data_table.visible = False
        return

    requested = default_column_to_show + list(select_column.value)
    columns = [col for col in requested if col in full_df.columns]

    # Long-text columns get a wrapped, wider layout; all others stay narrow.
    wide_column_widths = {"Abstract": "40%", "Title": "30%"}
    tabulator_columns = []
    for col in columns:
        if col in wide_column_widths:
            tabulator_columns.append({
                "title": col,
                "field": col,
                "formatter": "textarea",  # multi-line text
                "cssClass": "wrap-cell",
                "width": wide_column_widths[col],
                "minWidth": 300,
                "editor": False,
                "resizable": True,
                "vertAlign": "top",
            })
        else:
            tabulator_columns.append({
                "title": col,
                "field": col,
                "width": "10%",
                "minWidth": 100,
                "editor": False,
                "resizable": True,
            })

    extracted_data_table.configuration = {
        "columns": tabulator_columns,
        "editable": False,
    }
    # Assigned once: the frame is already restricted to the displayed columns.
    extracted_data_table.value = full_df[columns]


select_column.param.watch(abstract_column, "value")
# Show only the query input that belongs to the selected query mode
@pn.depends(mode_selector.param.value, watch=True)
def update_visibility(mode):
    """Display the predefined-query selector or the custom-query box.

    Args:
        mode: Current value of ``mode_selector``.
    """
    wants_predefined = (mode == "Predefined")
    wants_custom = (mode == "Custom")
    query_selector.visible = wants_predefined
    custom_query_input.visible = wants_custom


# Bring the widgets in line with the initial mode.
update_visibility(mode_selector.value)
def run_fetch(event):
    """Run the configured PubMed query and prepare the result for download.

    Reads the query, credentials, output file name and extraction mode from
    the widgets and fetches the matching PubMed IDs through ``PubmedModule``.
    In "Abstract + Metadata" mode it also fetches the abstracts, stores them
    in the module-level ``full_df`` and shows them in
    ``extracted_data_table``. The result is attached to
    ``fetch_download_button``; progress and errors go to ``status_panel``.

    Args:
        event: Button click event; unused.
    """
    # The fetched frame has to be stored in the module-level ``full_df``:
    # ``abstract_column`` reads that global when rendering the table.
    global full_df

    mode = mode_selector.value
    if mode == "Predefined":
        query = search_query[query_selector.value]
    else:
        query = custom_query_input.value.strip()
    email = email_input.value.strip()
    api_key = api_key_input.value.strip() or None
    user_filename = output_file_input.value.strip()
    selected_mode = extraction_mode.value
    log_output.value = ""

    if not email:
        status_panel.object = "Email is required by NCBI."
        return
    if not query:
        status_panel.object = "Query is empty. Please select or write a query."
        return
    if not selected_mode:
        status_panel.object = "Please select what you want to extract (IDs or Abstracts)."
        return

    ids_only = selected_mode == "Only PubMed IDs"

    # Derive a default file name from the query when the user gave none.
    safe_query = re.sub(r'[^a-zA-Z0-9]+', '_', query)[:50]
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    if not user_filename:
        if ids_only:
            user_filename = f"pubmed_ids_{safe_query}_{timestamp}.txt"
        else:
            user_filename = f"abstract_data_{safe_query}_{timestamp}.xlsx"

    # Force an extension that matches the selected extraction mode.
    base, ext = os.path.splitext(user_filename)
    ext = ext.lower().strip()
    if ids_only and ext not in [".txt", ".csv"]:
        ext = ".txt"
    elif selected_mode == "Abstract + Metadata" and ext not in [".xlsx", ".csv"]:
        ext = ".xlsx"
    user_filename = base + ext

    try:
        # Both modes start from the same ID lookup.
        pubmed_ids = PubmedModule.fetch_pubmed_id(
            query=query,
            output_file=None,
            email=email,
            api_key=api_key,
            log_widget=log_output,
            return_ids=True
        )

        if ids_only:
            if ext == ".csv":
                string_buf = StringIO()
                pd.DataFrame({"PubMed_ID": pubmed_ids}).to_csv(string_buf, index=False)
                id_payload = string_buf.getvalue()
            else:
                id_payload = "\n".join(pubmed_ids)
            # A fresh buffer per click so repeated downloads start at offset 0.
            fetch_download_button.callback = lambda: StringIO(id_payload)
            fetch_download_button.filename = user_filename
            fetch_download_button.embed = True
            fetch_download_button.disabled = False
            log_output.visible = True
            extracted_data_table.visible = False
            status_panel.object = "PubMed IDs fetched and ready to download."
            return

        df_result = PubmedModule.extract_pubmed_abstract(
            pubmed_id_list=pubmed_ids,
            output_file=None,
            log_widget=log_output
        )
        if not isinstance(df_result, pd.DataFrame) or df_result.empty:
            fetch_download_button.disabled = True
            status_panel.object = "No data available for download."
            return

        # Newest publications first; "Index" numbers the rows in that order.
        df_result = df_result.sort_values(by='Publication_Date', ascending=False).reset_index(drop=True)
        df_result["Index"] = df_result.index + 1
        full_df = df_result
        select_column.value = []
        # Renders the default columns of the new ``full_df`` into the table.
        abstract_column()
        extracted_data_table.visible = True

        if ext == ".xlsx":
            buffer = BytesIO()
            df_result.to_excel(buffer, index=False)
            xlsx_payload = buffer.getvalue()
            fetch_download_button.callback = lambda: BytesIO(xlsx_payload)
        elif ext == ".csv":
            string_buf = StringIO()
            df_result.to_csv(string_buf, index=False)
            csv_payload = string_buf.getvalue()
            fetch_download_button.callback = lambda: StringIO(csv_payload)
        fetch_download_button.filename = user_filename
        fetch_download_button.embed = True
        fetch_download_button.disabled = False
        status_panel.object = f"Abstract metadata is ready. Download as `{user_filename}`"
    except Exception as e:
        # Callback boundary: surface any failure in the status pane.
        extracted_data_table.visible = False
        status_panel.object = f"Error: {str(e)}"


run_button.on_click(run_fetch)
# Organize layout
status_panel = pn.pane.Markdown("## Output will appear here")
input_widgets = pn.Column(
    pn.Row(mode_selector, query_selector, custom_query_input),
    pn.Row(email_input, api_key_input),
    pn.Row(output_file_input, extraction_mode),
    run_button,
    # metadata_container already holds select_column; listing the widget here
    # as well would render the column picker twice.
    metadata_container,
    sizing_mode='stretch_width'
)
main_layout = pn.Column(
    pn.pane.Markdown("# PubMed Query Tool"),
    input_widgets,
    status_panel,
    log_output,
    extracted_data_table,
    fetch_download_button,
    sizing_mode='stretch_both'
)
# VIEW SAVED DATA WIDGETS
# Static usage instructions shown at the top of the data viewer.
instructions = pn.pane.Markdown("""
# View PubMed Data
1. Click the **Choose File** button to select a file (CSV, Excel, or XML).
2. Wait for the file to load (a spinner will appear).
3. Choose columns to display using the dropdown.
4. View your data in the table below.
""", styles={'font-size': '16px', 'margin-bottom': '20px'})
uploaded_file_selector = pn.widgets.Select(name="Uploaded Files", options=[])


def refresh_uploaded_files(event=None):
    """Refresh the file selector with the data files found in ``saved_data``.

    A missing ``saved_data`` directory yields an empty list instead of an
    exception. Extensions are matched case-insensitively, in line with
    ``load_file`` which lower-cases the extension before dispatching.

    Args:
        event: Button click event; unused.
    """
    try:
        files = os.listdir("saved_data")
    except FileNotFoundError:
        files = []
    # Sorted so the option order does not depend on directory listing order.
    uploaded_file_selector.options = sorted(
        f for f in files if f.lower().endswith(('.csv', '.xlsx', '.xml'))
    )


refresh_uploaded_files_button = pn.widgets.Button(name="Refresh File List", button_type="primary")
refresh_uploaded_files_button.on_click(refresh_uploaded_files)
# Plain HTML upload form. Its inline script POSTs the chosen file as
# multipart form data to http://localhost:8001/upload and writes the JSON
# reply's ``status`` / ``filename`` fields into the status paragraph.
# NOTE(review): the upload endpoint is not defined in the visible part of this
# file and its URL is hard-coded to localhost - confirm for deployments.
# NOTE(review): verify that <script> content inside a Panel HTML pane is
# actually executed by the browser; if not, this handler never attaches.
upload_form = pn.pane.HTML("""
<div>
<form id='upload-form' enctype='multipart/form-data'>
<label><b>Upload File (.csv, .xlsx, .xml)</b></label><br>
<input type='file' id='file' name='file'><br><br>
<button type='submit'>Upload</button>
</form>
<p id='upload-status'></p>
</div>
<script type="text/javascript">
document.addEventListener("DOMContentLoaded", () => {
const form = document.getElementById('upload-form');
form.onsubmit = async function(event) {
event.preventDefault();
const fileInput = document.getElementById('file');
if (!fileInput.files.length) {
document.getElementById('upload-status').innerText = 'Please select a file.';
return;
}
const formData = new FormData();
formData.append('file', fileInput.files[0]);
try {
const response = await fetch('http://localhost:8001/upload', {
method: 'POST',
body: formData
});
const result = await response.json();
document.getElementById('upload-status').innerText =
'Upload: ' + result.status + ' (' + result.filename + ')';
} catch (err) {
document.getElementById('upload-status').innerText =
'Upload failed: ' + err.message;
}
};
});
</script>
""", width=400, height=200, sizing_mode='fixed')
# Viewer controls: reload button, status line and a spinner that load_file
# shows while a file is being read.
reload_file_button = pn.widgets.Button(name="Reload File", button_type="warning", width=120)
status = pn.pane.Markdown("**Status**: Ready to upload a file.", styles={'font-size': '14px'})
loading = pn.indicators.LoadingSpinner(value=False, width=30, height=30, visible=False)
# Column picker for the viewer table; hidden until a file has been loaded.
column_selector = pn.widgets.MultiChoice(
    name="Select Columns to Display",
    options=all_column,
    value=default_column_to_show,
    width=400,
    visible=False,
    styles={'font-size': '14px'}
)
# Keyword search over the "Abstract" column (handled by filter_uploaded_data).
search_label = pn.pane.Markdown("**Search Abstract by Keyword**", styles={'font-size': '16px', 'margin-bottom': '4px'})
search_keyword_input = pn.widgets.TextInput(name="", placeholder="e.g., Lung, EGFR, inflammation", width=300)
search_button = pn.widgets.Button(name="Search", button_type="primary", width=100, height=30)
search_row = pn.Row(search_keyword_input,pn.Column(search_button, margin = (0, 0, 0, 0)), align="start", width=500)
search_container = pn.Column(search_label,search_row, sizing_mode='stretch_width')
# Download button for the filtered rows; enabled by download_filtered_data_button.
download_filtered_button = pn.widgets.FileDownload(name="Download Filtered", button_type="success", disabled=True)
# Result of the last keyword search; an empty, column-less frame means "no search".
filtered_searched_df = pd.DataFrame()
# Read-only, paginated table showing the loaded file (filled by update_table).
data_table = pn.widgets.Tabulator(
    value=None,
    pagination='remote',
    page_size=20,
    layout='fit_columns',
    visible=False,
    sizing_mode='stretch_both',
    show_index=False,
    disabled=True,
    configuration={
        "columnDefaults": {
            "resizable": True,
            "headerSort": True,
            "formatter": "textarea",
            "vertAlign": "top",
            "editor": False,
        },
        "editable": False,
        "autoColumns": False,
        "scrollable": True,
        # NOTE(review): update_table replaces the whole configuration dict, so
        # this initial sort only applies until the first update.
        "initialSort": [{"column": "Index", "dir": "asc"}]
    }
)
# Placeholder banner for unfinished sections.
under_construction_msg = pn.pane.Markdown(
    "<b style='color:red; font-size:24px;'>🚧 Under Construction!!!</b>",
    width=600
)
# NOTE(review): view_status and view_data_table are not referenced anywhere
# else in the visible part of this file.
view_status = pn.pane.Markdown("## Upload a file to view abstracts and metadata")
view_data_table = pn.widgets.Tabulator(
    value=None,
    pagination='remote',
    page_size=20,
    layout='fit_columns',
    visible=False,
    disabled=True,
    sizing_mode='stretch_both',
    show_index=False,
    configuration={
        "columnDefaults": {
            "resizable": True,
            "headerSort": True,
            "formatter": "textarea",
            "cssClass": "wrap-cell",
            "vertAlign": "top",
            "editor": False,
        },
        "editable": False,
        "autoColumns": False,
        "scrollable": True
    }
)
# Child elements read from every <Record> element of an XML file, in the
# order the resulting DataFrame columns appear.
_XML_RECORD_FIELDS = (
    'PubMed_ID', 'Title', 'Abstract', 'Keywords', 'Journal',
    'Article_Types', 'DOI', 'Publication_Date', 'Authors',
)


def _read_xml_records(file_path):
    """Stream the ``<Record>`` elements of *file_path* into a DataFrame."""
    records = []
    for _, elem in etree.iterparse(file_path, events=("end",), tag="Record"):
        try:
            records.append({field: elem.findtext(field) for field in _XML_RECORD_FIELDS})
        except Exception as e:
            print(f"Error reading record: {e}")
        finally:
            # Drop the element's content once it has been read.
            elem.clear()
    return pd.DataFrame(records)


def load_file(event):
    """Load the file chosen in ``uploaded_file_selector`` into the viewer.

    Reads ``saved_data/<selected>`` as CSV, Excel or XML, normalises it and
    stores it in the module-level ``full_df``. Any previous keyword filter is
    discarded, the column picker is refreshed and the table is redrawn.
    Progress and errors are reported through ``status``.

    Rows are sorted newest-first by ``Publication_Date`` (when present) and
    ``Index`` is assigned afterwards, so the displayed row numbers follow the
    displayed order - the same convention ``run_fetch`` uses.

    Args:
        event: Param/button event; unused.
    """
    global full_df, filtered_searched_df

    selected = uploaded_file_selector.value
    if not selected:
        status.object = "**Status**: No file selected."
        return

    file_path = os.path.join("saved_data", selected)
    status.object = "**Status**: Uploading and processing file..."
    loading.visible = True
    loading.value = True
    try:
        ext = os.path.splitext(file_path)[1].lower()
        if ext == '.csv':
            df = pd.concat(pd.read_csv(file_path, chunksize=5000), ignore_index=True)
        elif ext == '.xlsx':
            df = pd.read_excel(file_path)
        elif ext == '.xml':
            df = _read_xml_records(file_path)
        else:
            status.object = "**Status**: Unsupported file format. Use CSV, Excel, or XML."
            return

        if df.empty:
            status.object = "**Status**: File is empty."
            return

        # Clean and normalize. str() guards against non-string column labels
        # (e.g. numeric headers read from a spreadsheet).
        df.columns = [str(col).strip() for col in df.columns]
        if 'PubMed_ID' in df.columns:
            df['PubMed_ID'] = df['PubMed_ID'].astype(str).str.strip()
        if 'Publication_Date' in df.columns:
            df = df.sort_values(by='Publication_Date', ascending=False)
        df = df.reset_index(drop=True)
        df["Index"] = df.index + 1

        full_df = df
        filtered_searched_df = pd.DataFrame()

        available_columns = [col for col in df.columns if col != 'Index']
        column_selector.options = available_columns
        column_selector.value = [col for col in default_column_to_show if col in available_columns]
        column_selector.visible = True

        # Reset search and table state left over from the previous file.
        search_keyword_input.value = ""
        data_table.value = None
        data_table.page = 1
        data_table.sorters = []
        data_table.filters = []
        update_table(None)
        data_table.visible = True
        status.object = f"**Status**: Successfully loaded {selected} with {len(df)} records."
    except Exception as e:
        # Callback boundary: report the failure instead of crashing the session.
        status.object = f"**Status**: Error: {e}"
        data_table.visible = False
        column_selector.visible = False
    finally:
        # Runs on every exit path, including the early returns above.
        loading.visible = False
        loading.value = False
def update_table(event):
    """Redraw ``data_table`` for the selected columns and the active search.

    The keyword-search result is shown whenever a keyword is entered and a
    search has run - including a search with zero matches, which displays an
    empty table rather than falling back to the full dataset. Otherwise the
    complete ``full_df`` is shown.

    Args:
        event: Param event from the ``column_selector`` watcher; unused.
    """
    if full_df is None:
        return

    columns = ['Index'] + list(column_selector.value)

    # A finished search always carries full_df's columns (even with no rows);
    # the reset placeholder ``pd.DataFrame()`` has no columns at all.
    keyword = search_keyword_input.value.strip()
    search_active = bool(keyword) and len(filtered_searched_df.columns) > 0
    source_df = filtered_searched_df if search_active else full_df

    # Long-text columns get a wrapped, wider layout; all others stay narrow.
    wide_column_widths = {"Abstract": "40%", "Title": "30%"}
    tabulator_columns = []
    for col in columns:
        col_config = {
            "title": col,
            "field": col,
            "resizable": True,
            "editor": False,
            "headerSort": True,
        }
        if col in wide_column_widths:
            col_config.update({
                "formatter": "textarea",
                "width": wide_column_widths[col],
                "minWidth": 300,
                "vertAlign": "top",
            })
        else:
            col_config.update({
                "width": "10%",
                "minWidth": 100,
            })
        tabulator_columns.append(col_config)

    data_table.configuration = {
        "columns": tabulator_columns,
        "editable": False,
        "scrollable": True,
    }
    # Only request columns the frame really has.
    existing_cols = [col for col in columns if col in source_df.columns]
    data_table.value = source_df[existing_cols]


column_selector.param.watch(update_table, 'value')
def filter_uploaded_data(event):
    """Filter the loaded data by keyword(s) found in the ``Abstract`` column.

    Matching is case-insensitive and on whole words/phrases:

    * ``a and b`` keeps abstracts containing every term,
    * ``a or b`` keeps abstracts containing at least one term
      (only checked when the input contains no `` and ``),
    * anything else is treated as a single word or phrase.

    The result is stored in the module-level ``filtered_searched_df`` and the
    table, status line and download button are updated. An empty keyword
    clears the filter.

    Args:
        event: Button/param event; unused.
    """
    global filtered_searched_df

    if full_df is None or 'Abstract' not in full_df.columns:
        status.object = "**Status**: No valid data or 'Abstract' column not found."
        return

    search_keyword = search_keyword_input.value.strip().lower()

    if not search_keyword:
        filtered_searched_df = pd.DataFrame()
        status.object = "**Status**: No keyword entered. Showing all data."
        data_table.page = 1
        data_table.sorters = []
        data_table.filters = []
        update_table(None)
        data_table.value = data_table.value.copy()
        # The filter is gone, so the filtered download must be disabled again.
        download_filtered_data_button()
        return

    # Fill nulls and cast first: ``.str`` raises on a column that is not
    # string-typed (e.g. an all-empty Abstract column read as floats).
    abstract_col = full_df['Abstract'].fillna("").astype(str).str.lower()

    if " and " in search_keyword:
        terms, combine = search_keyword.split(" and "), all
    elif " or " in search_keyword:
        terms, combine = search_keyword.split(" or "), any
    else:
        terms, combine = [search_keyword], all

    # Blank terms (e.g. from "a and  and b") would match at any word
    # boundary, so they are dropped.
    patterns = [
        re.compile(r'\b' + re.escape(term.strip()) + r'\b')
        for term in terms
        if term.strip()
    ]
    mask = [
        combine(pattern.search(text) is not None for pattern in patterns)
        for text in abstract_col
    ]

    filtered_searched_df = full_df[mask].copy()
    status.object = f"**Status**: Found {len(filtered_searched_df)} abstracts matching '{search_keyword}'."
    update_table(None)
    download_filtered_data_button()


search_button.on_click(filter_uploaded_data)
search_keyword_input.param.watch(filter_uploaded_data, 'value')
def download_filtered_data_button():
    """Enable the filtered-results download, or disable it when nothing matched.

    When enabled, the button serves the module-level ``filtered_searched_df``
    (as it is at click time) as an Excel workbook.
    """
    if not filtered_searched_df.empty:
        def _download_callback():
            # Serialise on demand into an in-memory workbook.
            workbook = BytesIO()
            filtered_searched_df.to_excel(workbook, index=False)
            workbook.seek(0)
            return workbook

        download_filtered_button.callback = _download_callback
        download_filtered_button.filename = "filtered_pubmed_results.xlsx"
        download_filtered_button.disabled = False
    else:
        download_filtered_button.disabled = True


# Load a file whenever the selection changes or a reload is requested
uploaded_file_selector.param.watch(load_file, 'value')
reload_file_button.on_click(load_file)
# FILE COMPARISON WIDGETS
# Multi-file upload of PubMed ID lists and the button that starts run_comparison.
file_uploader = pn.widgets.FileInput(accept=".csv,.txt,.xlsx", multiple=True, name="Upload PubMed ID Files")
compare_button = pn.widgets.Button(name="Compare Files", button_type="primary")
# compare_status = pn.pane.Markdown("## Comparison results will appear here", width=800, height=500)
# HTML pane that receives the formatted comparison log or an error message.
compare_status = pn.pane.HTML("<h3>Comparison results will appear here</h3>", sizing_mode='stretch_width')
# Result displays filled by run_comparison.
summary_table = pn.widgets.DataFrame(name="Summary Table", width=600, show_index=False)
total_ids_display = pn.pane.Markdown("**Total IDs**: 0")
unique_ids_display = pn.pane.Markdown("**Unique IDs**: 0")
top10_ids_display = pn.widgets.DataFrame(name="Top 10 Frequent IDs", width=600, show_index=False)
def save_uploaded_files(uploaded_files):
    """Write uploaded file contents into the ``uploaded_files`` directory.

    Only the final path component of each client-supplied name is used, so a
    name such as ``../x.txt`` or ``C:\\dir\\x.txt`` cannot place a file
    outside the upload directory.

    Args:
        uploaded_files: Iterable of ``(file_data, filename)`` pairs where
            ``file_data`` is the raw bytes of the upload.

    Returns:
        List of the paths written, in input order.
    """
    upload_dir = "uploaded_files"
    os.makedirs(upload_dir, exist_ok=True)

    file_paths = []
    for file_data, filename in uploaded_files:
        # Normalise Windows separators first so basename also strips them.
        safe_name = os.path.basename(filename.replace("\\", "/"))
        file_path = os.path.join(upload_dir, safe_name)
        with open(file_path, "wb") as f:
            f.write(file_data)
        file_paths.append(file_path)
    return file_paths
def run_comparison(event):
    """Compare the uploaded PubMed ID files and display the results.

    Saves the uploads, runs ``PubmedModule.compare_pubmed_id_files`` while
    capturing what it prints, fills the summary widgets and renders selected
    log lines as HTML in ``compare_status``. Log lines and error text are
    HTML-escaped before being embedded, since they can contain uploaded file
    names. Any failure, including one while saving the uploads, is reported
    in ``compare_status``.

    Args:
        event: Button click event; unused.
    """
    # Function-scope imports: these names are only needed here.
    from contextlib import redirect_stdout
    from html import escape

    if not file_uploader.value or not file_uploader.filename:
        compare_status.object = "<b style='color:red;'>Please upload at least one file to compare.</b>"
        return

    summary_table.value = None
    top10_ids_display.value = None
    total_ids_display.object = "**Total IDs**: 0"
    unique_ids_display.object = "**Unique IDs**: 0"

    captured = StringIO()
    try:
        file_paths = save_uploaded_files(list(zip(file_uploader.value, file_uploader.filename)))
        # The context manager restores sys.stdout on every exit path.
        with redirect_stdout(captured):
            result = PubmedModule.compare_pubmed_id_files(file_paths)

        if result is None:
            compare_status.object = "<b style='color:red;'>Comparison failed. No valid data returned.</b>"
            return

        summary_df, total_ids, unique_ids, top10_df = result
        if summary_df.empty:
            summary_df = pd.DataFrame({"Query_Name": ["None"], "No of ID": [0]})
        if top10_df.empty:
            top10_df = pd.DataFrame({"PubMed Id": ["None"], "Frequency_Count": [0]})
        summary_table.value = summary_df
        top10_ids_display.value = top10_df
        total_ids_display.object = f"<b>Total IDs:</b> {total_ids}"
        unique_ids_display.object = f"<b>Unique IDs:</b> {unique_ids}"

        # Pick the interesting lines out of the captured log.
        log_lines = captured.getvalue().splitlines()

        def first_line(marker):
            """Return the first (escaped) log line containing *marker*, or ''."""
            return escape(next((line for line in log_lines if marker in line), ""))

        loaded_items = "".join(
            f"<li>{escape(line)}</li>" for line in log_lines if "Loaded" in line
        )
        combined_line = first_line("Combined DataFrame")
        unique_line = first_line("Total unique PubMed IDs")
        least_line = first_line("Least frequent IDs")
        non_least_line = first_line("Non-least frequent IDs")
        freq_summary_line = first_line("→ Files:")

        least_item = f"<li>{least_line}</li>" if least_line else ""
        non_least_item = f"<li>{non_least_line}</li>" if non_least_line else ""
        freq_summary_item = f"<li>{freq_summary_line}</li>" if freq_summary_line else ""

        # HTML template
        html_log = f"""
<div style=" line-height:1.6; font-size:15px;">
<h3 style="color:#2E86C1;">Comparison Completed</h3>
<ul style="margin-top:0; padding-left:20px;">
{loaded_items}
</ul>
<p><b>Combined:</b> {combined_line}</p>
<p><b>Unique IDs:</b> {unique_line}</p>
<h4 style="margin-bottom:5px;">Saved Files:</h4>
<ul style="padding-left:20px;">
<li><code>unique_pubmed_ids_with_queries.xlsx</code></li>
<li><code>pubmed_id_frequencies.xlsx</code></li>
<li><code>not_least_frequent_pubmed_ids.xlsx</code></li>
<li><code>least_frequent_pubmed_ids.xlsx</code></li>
</ul>
<h4 style="margin-bottom:5px;">ID Frequency Summary:</h4>
<ul style="padding-left:20px;">
{least_item}
{non_least_item}
{freq_summary_item}
</ul>
</div>
"""
        compare_status.object = html_log.strip()
    except Exception as e:
        # Callback boundary: show the failure to the user.
        compare_status.object = f"<b style='color:red;'>Error during comparison:</b><br><code>{escape(str(e))}</code>"


compare_button.on_click(run_comparison)
# Co-occurrence widgets
# This uploader is shared by the heatmap, top-N heatmap, bar plot and pie
# chart callbacks below.
cooccurrence_file_uploader = pn.widgets.FileInput(accept=".csv,.txt,.xlsx", multiple=True, name="Upload PubMed ID Files for Co-Occurrence")
# "N" used by the top-N callbacks.
top_n_selector = pn.widgets.IntSlider(name="Top N Queries", start=2, end=50, step=1, value=10, width=300)
cooccurrence_button = pn.widgets.Button(name="Generate Co-occurrence Heatmaps", button_type="primary")
# Selects which of the generated heatmap images is displayed; the labels are
# matched literally in dropdown_heatmap.
cooccurrence_dropdown = pn.widgets.Select(
    name="Select Heatmap View",
    options=[
        "Full Heatmap",
        "Non-Zero Heatmap",
        ">50% Overlap (With Zeros)",
        ">50% Overlap (No Zeros)",
        "<50% Overlap (With Zeros)",
        "<50% Overlap (No Zeros)"
    ],
    value="Full Heatmap",
    width=300
)
cooccurrence_image = pn.pane.PNG(None, width=1600, height=1200, visible=False)
cooccurrence_status = pn.pane.Markdown("## Upload files to view co-occurrence heatmaps", width=800)
# Paths to all heatmaps
# Module-level state written by run_cooccurrence and read by dropdown_heatmap;
# cooccurrence_ready is True only after a successful generation run.
cooccurrence_ready = False
cooccurrence_full_path = None
cooccurrence_nonzero_path = None
cooccurrence_gt50_path = None
cooccurrence_gt50_masked_path = None
cooccurrence_lt50_path = None
cooccurrence_lt50_masked_path = None
# Callback to generate all heatmap variants
def run_cooccurrence(event):
    """Generate the co-occurrence heatmaps for the uploaded ID files.

    Saves the uploads through ``save_uploaded_files``, calls
    ``PubmedModule.generate_cooccurrence_heatmap`` and stores the returned
    image paths in the module-level ``cooccurrence_*_path`` variables that
    ``dropdown_heatmap`` reads. The full heatmap is displayed when a path for
    it was returned; the image pane stays hidden otherwise. Failures,
    including ones while saving the uploads, are reported in
    ``cooccurrence_status``.

    Args:
        event: Button click event; unused.
    """
    global cooccurrence_ready, cooccurrence_full_path, cooccurrence_nonzero_path
    global cooccurrence_gt50_path, cooccurrence_gt50_masked_path
    global cooccurrence_lt50_path, cooccurrence_lt50_masked_path

    cooccurrence_ready = False
    if not cooccurrence_file_uploader.value or not cooccurrence_file_uploader.filename:
        cooccurrence_status.object = "<b style='color:red;'>Please upload at least one file to generate heatmap.</b>"
        cooccurrence_image.visible = False
        return

    try:
        file_paths = save_uploaded_files(
            list(zip(cooccurrence_file_uploader.value, cooccurrence_file_uploader.filename))
        )
        # Two positions of the returned tuple are not used here.
        (query_names, matrix, full_path, nonzero_path, _, gt50_path,
         gt50_masked_path, _, lt50_path, lt50_masked_path) = PubmedModule.generate_cooccurrence_heatmap(file_paths)

        # Save all paths globally
        cooccurrence_full_path = full_path
        cooccurrence_nonzero_path = nonzero_path
        cooccurrence_gt50_path = gt50_path
        cooccurrence_gt50_masked_path = gt50_masked_path
        cooccurrence_lt50_path = lt50_path
        cooccurrence_lt50_masked_path = lt50_masked_path

        # Show the image only when there is actually a file to display.
        if cooccurrence_full_path:
            cooccurrence_image.object = cooccurrence_full_path
        cooccurrence_image.visible = bool(cooccurrence_full_path)

        cooccurrence_status.object = f"All heatmaps rendered and saved for <b>{len(query_names)} queries</b>."
        cooccurrence_ready = True
    except Exception as e:
        # Callback boundary: show the failure to the user.
        cooccurrence_status.object = f"<b style='color:red;'>Error during heatmap generation:</b><br><code>{str(e)}</code>"
        cooccurrence_image.visible = False
# Callback that swaps the displayed image to the heatmap chosen in the dropdown
def dropdown_heatmap(event):
    """Display the heatmap variant selected in ``cooccurrence_dropdown``.

    Does nothing until ``run_cooccurrence`` has completed successfully, or
    when no image path is stored for the selected variant.

    Args:
        event: Param event from the dropdown watcher; unused.
    """
    if not cooccurrence_ready:
        return

    # Built on every call so the current values of the globals are used.
    path_by_label = {
        "Full Heatmap": cooccurrence_full_path,
        "Non-Zero Heatmap": cooccurrence_nonzero_path,
        ">50% Overlap (With Zeros)": cooccurrence_gt50_path,
        ">50% Overlap (No Zeros)": cooccurrence_gt50_masked_path,
        "<50% Overlap (With Zeros)": cooccurrence_lt50_path,
        "<50% Overlap (No Zeros)": cooccurrence_lt50_masked_path,
    }
    chosen_path = path_by_label.get(cooccurrence_dropdown.value)
    if chosen_path:
        cooccurrence_image.object = chosen_path


cooccurrence_button.on_click(run_cooccurrence)
cooccurrence_dropdown.param.watch(dropdown_heatmap, 'value')
# Top N Co-occurrence widgets
topn_image = pn.pane.PNG(None, width=1400, height=1000, visible=False)
topn_status = pn.pane.Markdown("## Upload files and select N to generate heatmap", width=800)
topn_button = pn.widgets.Button(name="Generate Top N Co-occurrence Heatmap", button_type="primary")


def run_topn_heatmap(event):
    """Render the top-N co-occurrence heatmap for the uploaded ID files.

    Uses the files from ``cooccurrence_file_uploader`` and N from
    ``top_n_selector``; the generated image file is read and shown in
    ``topn_image``. Errors are reported in ``topn_status``.

    Args:
        event: Button click event; unused.
    """
    contents = cooccurrence_file_uploader.value
    names = cooccurrence_file_uploader.filename
    if not contents or not names:
        topn_status.object = "<b style='color:red;'>Please upload at least one file.</b>"
        topn_image.visible = False
        return

    file_paths = save_uploaded_files(list(zip(contents, names)))
    try:
        top_n = top_n_selector.value
        top_queries, matrix, fig, image_path = PubmedModule.generate_top_n_cooccurrence_heatmap(
            file_paths, top_n=top_n
        )
        # Hand the PNG bytes to the pane.
        with open(image_path, "rb") as image_file:
            topn_image.object = image_file.read()
        topn_image.visible = True
        topn_status.object = f"Top {top_n} Co-occurrence heatmap generated across <b>{len(top_queries)}</b> queries."
    except Exception as e:
        topn_status.object = f"<b style='color:red;'>Error:</b><br><code>{str(e)}</code>"
        topn_image.visible = False


topn_button.on_click(run_topn_heatmap)
topn_bar_button = pn.widgets.Button(name="Generate Top N Distribution Barplot", button_type="primary")
topn_bar_status = pn.pane.Markdown("## Upload files to generate interactive bar plot", width=800)
topn_bar_plot = pn.pane.Plotly(height=800, width=1100, visible=False)
def run_topn_barplot(event):
    """Generate and display the Top N distribution bar plot.

    Button callback. Saves the uploaded files through ``save_uploaded_files``,
    calls ``PubmedModule.generate_top_n_pubmed_distribution_barplot`` with the
    N currently selected in ``top_n_selector``, places the returned figure in
    ``topn_bar_plot`` and reports the outcome in ``topn_bar_status``.

    Args:
        event: Click event supplied by the button; not used.
    """
    # Function-scope import so the handler does not depend on file-level imports.
    from html import escape

    uploaded_data = cooccurrence_file_uploader.value
    uploaded_names = cooccurrence_file_uploader.filename
    if not uploaded_data or not uploaded_names:
        topn_bar_status.object = "<b style='color:red;'>Please upload at least one file.</b>"
        topn_bar_plot.visible = False
        return

    # save_uploaded_files receives a list of (data, filename) pairs.
    file_paths = save_uploaded_files(list(zip(uploaded_data, uploaded_names)))
    try:
        top_n = top_n_selector.value
        fig, _ = PubmedModule.generate_top_n_pubmed_distribution_barplot(
            file_paths, top_n=top_n
        )
        # The figure object is given to the Plotly pane directly.
        topn_bar_plot.object = fig
        topn_bar_plot.visible = True
        topn_bar_status.object = f"Interactive bar chart generated for top {top_n} queries."
    except Exception as e:
        # The message is embedded in HTML: escape it so characters such as
        # '<', '>' and '&' are displayed literally instead of being parsed
        # as markup.
        topn_bar_status.object = (
            f"<b style='color:red;'>Error:</b><br><code>{escape(str(e))}</code>"
        )
        topn_bar_plot.visible = False
# Clicking the bar plot button runs its generator callback.
topn_bar_button.on_click(run_topn_barplot)

# Top N pie chart widgets; the Plotly pane starts hidden.
topn_pie_button = pn.widgets.Button(
    name="Generate Top N Pie Chart",
    button_type="primary",
)
topn_pie_status = pn.pane.Markdown(
    "## Upload files to generate proportion chart",
    width=800,
)
topn_pie_plot = pn.pane.Plotly(
    height=600,
    width=800,
    visible=False,
)
def run_topn_pie_chart(event):
    """Generate and display the Top N pie chart.

    Button callback. Saves the uploaded files through ``save_uploaded_files``,
    calls ``PubmedModule.generate_top_n_pie_chart`` with the N currently
    selected in ``top_n_selector``, places the returned figure in
    ``topn_pie_plot`` and reports the outcome in ``topn_pie_status``.

    Args:
        event: Click event supplied by the button; not used.
    """
    # Function-scope import so the handler does not depend on file-level imports.
    from html import escape

    uploaded_data = cooccurrence_file_uploader.value
    uploaded_names = cooccurrence_file_uploader.filename
    if not uploaded_data or not uploaded_names:
        topn_pie_status.object = "<b style='color:red;'>Please upload at least one file.</b>"
        topn_pie_plot.visible = False
        return

    # save_uploaded_files receives a list of (data, filename) pairs.
    file_paths = save_uploaded_files(list(zip(uploaded_data, uploaded_names)))
    try:
        top_n = top_n_selector.value
        # Only the figure is displayed; the second return value is unused.
        fig, _ = PubmedModule.generate_top_n_pie_chart(file_paths, top_n=top_n)
        topn_pie_plot.object = fig
        topn_pie_plot.visible = True
        topn_pie_status.object = (
            f"Pie chart generated showing distribution for top {top_n} queries."
        )
    except Exception as e:
        # The message is embedded in HTML: escape it so characters such as
        # '<', '>' and '&' are displayed literally instead of being parsed
        # as markup.
        topn_pie_status.object = (
            f"<b style='color:red;'>Error:</b><br><code>{escape(str(e))}</code>"
        )
        topn_pie_plot.visible = False
# Clicking the pie chart button runs its generator callback.
topn_pie_button.on_click(run_topn_pie_chart)

# Percentage-overlap widgets.
percent_overlap_button = pn.widgets.Button(
    name="Generate % Overlap Heatmaps",
    button_type="primary",
)
percent_overlap_status = pn.pane.Markdown(
    "## Upload files to compute % overlap matrix",
    width=800,
)
percent_overlap_image = pn.pane.PNG(
    width=1600,
    height=1200,
    visible=False,
)

# Display label shown in the dropdown -> key used to look up the heatmap path.
mode_mapping = {
    "Full Heatmap": "full",
    ">50% Overlap": ">50",
    "<50% Overlap": "<50",
}
# The dropdown offers exactly the labels of mode_mapping, in the same order.
percent_overlap_mode = pn.widgets.Select(
    name="Display Mode",
    options=list(mode_mapping),
    value="Full Heatmap",
)

# Paths of the most recently generated heatmaps; filled by run_percent_overlap.
percent_overlap_paths = {}
def run_percent_overlap(event):
    """Generate the percentage-overlap heatmaps and display the selected one.

    Button callback. Saves the uploaded files through ``save_uploaded_files``,
    stores the result of ``PubmedModule.generate_percent_overlap_heatmaps`` in
    the module-level ``percent_overlap_paths`` and shows the entry matching
    the mode currently selected in ``percent_overlap_mode``. The outcome is
    reported in ``percent_overlap_status``.

    Args:
        event: Click event supplied by the button; not used.
    """
    global percent_overlap_paths
    # Function-scope import so the handler does not depend on file-level imports.
    from html import escape

    uploaded_data = cooccurrence_file_uploader.value
    uploaded_names = cooccurrence_file_uploader.filename
    if not uploaded_data or not uploaded_names:
        percent_overlap_status.object = (
            "<b style='color:red;'>Please upload files to generate heatmaps.</b>"
        )
        percent_overlap_image.visible = False
        return

    # save_uploaded_files receives a list of (data, filename) pairs.
    file_paths = save_uploaded_files(list(zip(uploaded_data, uploaded_names)))
    try:
        percent_overlap_paths = PubmedModule.generate_percent_overlap_heatmaps(file_paths)
        selected_mode = percent_overlap_mode.value
        heatmap_path = percent_overlap_paths.get(mode_mapping[selected_mode])
        if heatmap_path:
            percent_overlap_image.object = heatmap_path
            percent_overlap_image.visible = True
            percent_overlap_status.object = (
                f"Heatmaps generated. Currently showing: <b>{selected_mode}</b>"
            )
        else:
            # No entry for the selected mode: do not show an empty pane while
            # claiming a heatmap is displayed (update_percent_heatmap applies
            # the same missing-path guard).
            percent_overlap_image.visible = False
            percent_overlap_status.object = (
                f"Heatmaps generated, but none is available for <b>{selected_mode}</b>."
            )
    except Exception as e:
        # The message is embedded in HTML: escape it so characters such as
        # '<', '>' and '&' are displayed literally instead of being parsed
        # as markup.
        percent_overlap_status.object = (
            f"<b style='color:red;'>Error:</b><br><code>{escape(str(e))}</code>"
        )
        percent_overlap_image.visible = False
def update_percent_heatmap(event):
    """Switch the displayed overlap heatmap when the display mode changes.

    Watcher callback for ``percent_overlap_mode``. Looks up the path stored
    for the newly selected mode in ``percent_overlap_paths``; when there is
    none, the image and status are left untouched.

    Args:
        event: Parameter change event; not used.
    """
    selected_label = percent_overlap_mode.value
    heatmap_path = percent_overlap_paths.get(mode_mapping[selected_label])
    if not heatmap_path:
        # Nothing stored for this mode: keep the current display.
        return

    percent_overlap_image.object = heatmap_path
    percent_overlap_status.object = f"Currently showing: <b>{selected_label}</b>"
    percent_overlap_image.visible = True
# Hook the percentage-overlap controls up to their callbacks.
percent_overlap_button.on_click(run_percent_overlap)
percent_overlap_mode.param.watch(update_percent_heatmap, "value")

# PubMed -> query mapping widgets.
mapping_button = pn.widgets.Button(
    name="Generate PubMed→Query Mapping",
    button_type="primary",
)
mapping_status = pn.pane.Markdown(
    "## Upload files to see PubMed to Query mapping",
    width=800,
)
# Read-only, paginated table that starts hidden.
mapping_table = pn.widgets.Tabulator(
    pagination="remote",
    page_size=25,
    layout="fit_columns",
    sizing_mode="stretch_width",
    visible=False,
    disabled=True,
    show_index=False,
    configuration={
        "columns": [
            # Index, PubMed_ID and Frequency share one plain-text column setup.
            *(
                {
                    "title": plain_field,
                    "field": plain_field,
                    "hozAlign": "left",
                    "headerAlign": "center",
                    "formatter": "plaintext",
                    "resizable": True,
                    "editor": False,
                }
                for plain_field in ("Index", "PubMed_ID", "Frequency")
            ),
            # Queries uses the html formatter with a horizontally scrollable cell style.
            {
                "title": "Queries",
                "field": "Queries",
                "formatter": "html",
                "hozAlign": "left",
                "cssClass": "scrollable-cell",
                "resizable": True,
                "headerSort": True,
                "headerTooltip": False,
                "headerVertical": False,
                "minWidth": 400,
                "editor": False,
                "formatterParams": {
                    "htmlElement": "div",
                    "style": "overflow-x: auto; white-space: nowrap; display: block;",
                },
            },
        ],
        "editable": False,
        "autoResize": True,
        "resizableColumns": True,
        "headerSortTristate": True,
    },
)
def run_pubmed_to_query_mapping(event):
    """Build the PubMed ID -> queries mapping and show it in ``mapping_table``.

    Button callback. Saves the uploaded files through ``save_uploaded_files``,
    calls ``PubmedModule.generate_pubmed_to_queries_mapping``, wraps each
    ``Queries`` value in a ``scrollable-cell`` div, adds a 1-based ``Index``
    column and loads the result into the table. The outcome is reported in
    ``mapping_status``.

    Args:
        event: Click event supplied by the button; not used.
    """
    # Function-scope import so the handler does not depend on file-level imports.
    from html import escape

    uploaded_data = cooccurrence_file_uploader.value
    uploaded_names = cooccurrence_file_uploader.filename
    if not uploaded_data or not uploaded_names:
        mapping_status.object = "<b style='color:red;'>Please upload files first.</b>"
        mapping_table.visible = False
        return

    # save_uploaded_files receives a list of (data, filename) pairs.
    file_paths = save_uploaded_files(list(zip(uploaded_data, uploaded_names)))
    try:
        # Only the DataFrame is used here; the second return value is ignored.
        df, _ = PubmedModule.generate_pubmed_to_queries_mapping(file_paths)
        if "index" in df.columns:
            df.drop(columns=["index"], inplace=True)
        # NOTE(review): query text is inserted verbatim and rendered by the
        # table's html formatter -- confirm whether it can contain markup.
        df["Queries"] = df["Queries"].apply(
            lambda q: f"<div class='scrollable-cell'>{q}</div>"
        )
        # Replace the existing index with a fresh 0..n-1 range, then expose a
        # 1-based "Index" column for display.
        df.reset_index(drop=True, inplace=True)
        df["Index"] = df.index + 1
        mapping_table.value = df[["Index", "PubMed_ID", "Queries", "Frequency"]]
        mapping_table.visible = True
        mapping_status.object = f"Mapping generated with <b>{len(df)} PubMed IDs</b>."
    except Exception as e:
        # The message is embedded in HTML: escape it so characters such as
        # '<', '>' and '&' are displayed literally instead of being parsed
        # as markup.
        mapping_status.object = (
            f"<b style='color:red;'>Error:</b><br><code>{escape(str(e))}</code>"
        )
        mapping_table.visible = False
# Clicking the mapping button runs its generator callback.
mapping_button.on_click(run_pubmed_to_query_mapping)

# FINAL LAYOUT
# One tab per tool; each entry is a (tab title, content) pair.
app = pn.Tabs(
    # Query PubMed and extract IDs / abstracts.
    (
        "PubMed ID & Abstract Extractor",
        pn.Column(
            # pn.pane.HTML("<h2 style='color:#2E86C1;'>PubMed ID & Abstract Extractor</h2>"),
            mode_selector,
            query_selector,
            custom_query_input,
            email_input,
            api_key_input,
            output_file_input,
            extraction_mode,
            run_button,
            status_panel,
            log_output,
            metadata_container,
            extracted_data_table,
            fetch_download_button,
        ),
    ),
    # Browse, filter and download previously saved results.
    (
        "View Saved Data",
        pn.Column(
            instructions,
            pn.Column(
                upload_form,
                pn.Row(refresh_uploaded_files_button, uploaded_file_selector),
                reload_file_button,
                loading,
            ),
            status,
            column_selector,
            search_container,
            data_table,
            download_filtered_button,
            sizing_mode="stretch_both",
            styles={"padding": "20px", "background": "#f9f9f9"},
        ),
    ),
    # Compare PubMed IDs across uploaded files.
    (
        "PubMed ID File Comparison",
        pn.Column(
            file_uploader,
            compare_button,
            compare_status,
            pn.layout.Divider(),
            pn.pane.Markdown("### Summary Table"),
            summary_table,
            pn.layout.Divider(),
            total_ids_display,
            unique_ids_display,
            pn.layout.Divider(),
            pn.pane.Markdown("### Top 10 Most Frequent PubMed IDs"),
            top10_ids_display,
            pn.layout.Divider(),
        ),
    ),
    # The remaining tabs all reuse the same cooccurrence_file_uploader widget.
    (
        "PubMed ID Co-Occurrence Analysis",
        pn.Column(
            cooccurrence_file_uploader,
            cooccurrence_button,
            cooccurrence_dropdown,
            cooccurrence_status,
            cooccurrence_image,
        ),
    ),
    (
        "Top N Co-Occurrence Analysis",
        pn.Column(
            cooccurrence_file_uploader,
            top_n_selector,
            topn_button,
            topn_status,
            topn_image,
        ),
    ),
    (
        "Top N Distribution Chart",
        pn.Column(
            cooccurrence_file_uploader,
            top_n_selector,
            topn_bar_button,
            topn_bar_status,
            topn_bar_plot,
        ),
    ),
    (
        "Top N Pie Chart",
        pn.Column(
            cooccurrence_file_uploader,
            top_n_selector,
            topn_pie_button,
            topn_pie_status,
            topn_pie_plot,
        ),
    ),
    (
        "Percentage Overlap Heatmaps",
        pn.Column(
            cooccurrence_file_uploader,
            percent_overlap_button,
            percent_overlap_mode,
            percent_overlap_status,
            percent_overlap_image,
        ),
    ),
    (
        "PubMed Query Mapping",
        pn.Column(
            cooccurrence_file_uploader,
            mapping_button,
            mapping_status,
            mapping_table,
            # mapping_download
        ),
    ),
)

# Mark the tab set as the served application.
app.servable()