from alive_progress import alive_bar, alive_it
from typing import Any
from loguru import logger
from .observe import Observe
import os
import duckdb
import time
from importlib.resources import files
import threading # allows the monitor to run concurrently without blocking multiprocessing
from multiprocessing import Pool, Manager
[docs]
class Parse:
"""
General parser for eyeon. Given a folder path, will return a list of observations.
Parameters
----------
dirpath : str
A string specifying the folder to parse.
"""
def __init__(self, dirpath: str) -> None:
self.path = dirpath
def _observe(self, file_and_path: tuple) -> None:
file, result_path = file_and_path
try:
o = Observe(file)
o.write_json(result_path)
except PermissionError:
logger.warning(f"File {file} cannot be read.")
except FileNotFoundError:
logger.warning(f"No such file {file}.")
def _observe_worker(self, args) -> None:
"""
wrapper to handle and monitor observe workers.
Assists in identifying problematic files
:param args: (file: str, result_path: str, progress_map: dict)
"""
file, result_path, progress_map = args
pid= os.getpid()
start_time=time.time()
progress_map[pid] = {
"file": file,
"start": start_time,
}
try:
self._observe((file, result_path))
finally:
# Clear the entry when done or on error
progress_map.pop(pid, None)
def __call__(self, result_path: str = "./results", threads: int = 1) -> Any:
with alive_bar(
bar=None,
elapsed_end=False,
monitor_end=False,
stats_end=False,
receipt_text=True,
spinner="waves",
stats=False,
monitor=False,
) as bar:
bar.title("Collecting Files... ")
files = [
(os.path.join(dir, file), result_path)
for dir, _, files in os.walk(self.path)
for file in files
]
bar.title("")
bar.text(f"{len(files)} files collected")
if threads > 1:
manager=Manager()
progress_map= manager.dict()
def monitor():
CHECK_INTERVAL=30 #seconds between checks
HANG_THRESHOLD=120
while True:
now = time.time()
workers=list(progress_map.items())
if not workers:
continue
for pid, info in workers:
file=info.get("file")
start=info.get("start", now)
duration=now-start
if duration > HANG_THRESHOLD:
logger.warning(
f"[monitor] - possible hung process: pid={pid} processing {file} for {duration:.1f}s"
)
time.sleep(CHECK_INTERVAL) #sleep so it's not infinitely spinning
monitor_thread = threading.Thread(target=monitor, daemon=True) #run monitor thread in the background, removes when finished
monitor_thread.start()
with Pool(threads) as p:
with alive_bar(
len(files),
spinner="waves",
title=f"Parsing with {threads} threads..."
) as bar:
# each worker gets the file, result_path, and the shared progress_map
iterable = [
(file, result_path, progress_map) for (file, result_path) in files
]
for _ in p.imap_unordered(self._observe_worker, iterable):
bar() # update the bar when a thread finishes
else:
#Single process path (no inter‑process monitoring needed)
for filet in alive_it(files, spinner="waves", title="Parsing files..."):
self._observe(filet)
[docs]
def write_database(self, database: str, outdir: str = "./results") -> None:
"""
Parse all output json files and add to database
Parameters
----------
database : str
The filepath to the duckdb database
outdir : str
A string specifying where results were saved
"""
if os.path.exists(outdir) and database:
try:
with alive_bar(
bar=None,
elapsed_end=False,
monitor_end=False,
stats_end=False,
receipt_text=True,
spinner="waves",
stats=False,
monitor=False,
) as bar:
bar.title(f"Writing to database {database}")
db_exists = os.path.exists(database)
db_path = os.path.dirname(database)
if db_path:
os.makedirs(db_path, exist_ok=True)
con = duckdb.connect(database) # creates or connects
if not db_exists: # database exists, load the json file in
# create table and views from sql
con.sql(files("database").joinpath("eyeon-ddl.sql").read_text())
# add the file to the observations table, making it match template
# observations with missing keys keys with null
con.sql(
f"""
insert into observations by name
select * from
read_json_auto(['{outdir}/*.json',
'{files('database').joinpath('observations.json')}'],
union_by_name=true, auto_detect=true)
where filename is not null;
"""
)
bar.title("")
bar.text("Database updated")
con.close()
except duckdb.IOException as ioe:
con = None
s = f":exclamation: Failed to attach to db {database}: {ioe}"
print(s)
else:
raise FileNotFoundError