#!/usr/bin/env python3
"""
JEDI Log Parser — Comprehensive Diagnostic Report Generator
============================================================
This module parses a JEDI variational DA log file and extracts:
- Configuration metadata (cost type, analysis time, resolutions, weights)
- Observation counts (loaded, total, assimilated, per‑variable)
- Jo/n evolution across outer loops
- Cost‑function convergence (J, Jb, JoJc)
- Departures (Min/Max/RMS)
- Observation error statistics
- A full human‑readable diagnostic report
Typical usage:
python parse_jedi_log.py jedi.log --output report.txt
"""
import os
import re
import sys
import argparse
from collections import OrderedDict
# ---------------------------------------------------------------------
# CONFIGURATION PARSER
# ---------------------------------------------------------------------
[docs]
def parse_configuration(lines):
config = {}
full_text = "\n".join(lines[:200])
m = re.search(r'cost type => (\S+)', full_text)
if m: config['cost_type'] = m.group(1)
m = re.search(r'datetime => (\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z)', full_text)
if m: config['analysis_time'] = m.group(1)
m = re.search(r'time window => \{begin => (\S+)\s*,\s*length => (\S+)', full_text)
if m:
config['window_begin'] = m.group(1)
config['window_length'] = m.group(2)
m = re.search(r'npx => (\d+)\s*,\s*npy => (\d+)\s*,\s*npz => (\d+)', full_text)
if m:
config['outer_resolution'] = f"C{int(m.group(1)) - 1}"
config['nlevels'] = int(m.group(3))
weights = re.findall(r'weight => \{value => ([\d.]+)\}', full_text)
if len(weights) >= 2:
config['static_weight'] = float(weights[0])
config['ensemble_weight'] = float(weights[1])
m = re.search(r'nmembers => (\d+)', full_text)
if m: config['ensemble_members'] = int(m.group(1))
m = re.search(r'algorithm => (\w+)', full_text)
if m: config['minimizer'] = m.group(1)
ninner_vals = re.findall(r'ninner => (\d+)', full_text)
if ninner_vals: config['ninner'] = [int(n) for n in ninner_vals]
inner_npx = re.findall(r'npx => (\d+)', full_text)
if len(inner_npx) >= 2:
config['inner_resolution'] = f"C{int(inner_npx[1]) - 1}"
return config
# ---------------------------------------------------------------------
# OBSERVATION COUNTS PARSER
# ---------------------------------------------------------------------
[docs]
def parse_obs_counts(lines):
obs_counts = OrderedDict()
nlocs_pattern = re.compile(
r'^(\S+(?:\s+\S+)?)\s+(\S+)\s+nlocs\s*=\s*(\d+),\s*nobs\s*=\s*(\d+)',
)
for line in lines:
m = nlocs_pattern.match(line.strip())
if m:
obs_type = m.group(1).strip()
nlocs = int(m.group(3))
if obs_type not in obs_counts:
obs_counts[obs_type] = {
'nlocs': 0, 'nobs_total': 0,
'nobs_assimilated': 0, 'nobs_per_var': OrderedDict(),
}
obs_counts[obs_type]['nlocs'] = nlocs
costjo_obs_pattern = re.compile(r'^(\S+(?:\s+\S+)?)\s+nobs=\s*(\d+)\s+Min=')
in_costjo_obs = False
seen_total = set()
for line in lines:
stripped = line.strip()
if 'CostJo Observations:' in stripped:
in_costjo_obs = True
continue
if in_costjo_obs:
m = costjo_obs_pattern.match(stripped)
if m:
obs_type = m.group(1).strip()
nobs = int(m.group(2))
if obs_type not in seen_total:
if obs_type in obs_counts:
obs_counts[obs_type]['nobs_total'] = nobs
else:
obs_counts[obs_type] = {
'nlocs': 0, 'nobs_total': nobs,
'nobs_assimilated': 0, 'nobs_per_var': OrderedDict(),
}
seen_total.add(obs_type)
if 'Jo Observations:' in stripped and 'CostJo' not in stripped:
break
costjo_pattern = re.compile(
r'CostJo\s*:\s*Nonlinear\s+Jo\((.+?)\)\s*=\s*([\d.e+\-]+),\s*nobs\s*=\s*(\d+)'
)
seen_first = set()
for line in lines:
m = costjo_pattern.search(line.strip())
if m:
obs_type = m.group(1).strip()
nobs = int(m.group(3))
if obs_type not in seen_first:
if obs_type in obs_counts:
obs_counts[obs_type]['nobs_assimilated'] = nobs
seen_first.add(obs_type)
jo_obs_pattern = re.compile(
r'Jo Obs\s*:(\S+(?:\s+\S+)?):(\S+)\s*:\s*Nobs=\s*(\d+)'
)
seen_jo = set()
for line in lines:
m = jo_obs_pattern.search(line.strip())
if m:
obs_type, variable = m.group(1).strip(), m.group(2).strip()
key = (obs_type, variable)
if key not in seen_jo:
if obs_type in obs_counts:
obs_counts[obs_type]['nobs_per_var'][variable] = int(m.group(3))
seen_jo.add(key)
return obs_counts
# ---------------------------------------------------------------------
# JO EVOLUTION PARSER
# ---------------------------------------------------------------------
[docs]
def parse_jo_evolution(lines):
costjo_pattern = re.compile(
r'CostJo\s*:\s*Nonlinear\s+Jo\((.+?)\)\s*=\s*([\d.e+\-]+),\s*nobs\s*=\s*(\d+),\s*Jo/n\s*=\s*([\d.e+\-]+),\s*err\s*=\s*([\d.e+\-]+)'
)
jo_data = OrderedDict()
for line in lines:
m = costjo_pattern.search(line.strip())
if m:
obs_type = m.group(1).strip()
record = {
'Jo': float(m.group(2)), 'nobs': int(m.group(3)),
'Jo_n': float(m.group(4)), 'err': float(m.group(5)),
}
jo_data.setdefault(obs_type, []).append(record)
total_pattern = re.compile(r'CostJo\s*:\s*Nonlinear\s+Jo\s*=\s*([\d.e+\-]+)')
jo_total = []
for line in lines:
m = total_pattern.search(line.strip())
if m and 'Jo(' not in line:
jo_total.append(float(m.group(1)))
return jo_data, jo_total
# ---------------------------------------------------------------------
# COST FUNCTION CONVERGENCE PARSER
# ---------------------------------------------------------------------
[docs]
def parse_cost_convergence(lines):
j_pattern = re.compile(r'Quadratic cost function:\s*J\s*\(\s*(\d+)\)\s*=\s*([\d.e+\-]+)')
jb_pattern = re.compile(r'Quadratic cost function:\s*Jb\s*\(\s*(\d+)\)\s*=\s*([\d.e+\-]+)')
jojc_pattern = re.compile(r'Quadratic cost function:\s*JoJc\(\s*(\d+)\)\s*=\s*([\d.e+\-]+)')
j_vals, jb_vals, jojc_vals = [], [], []
for line in lines:
m = j_pattern.search(line)
if m: j_vals.append((int(m.group(1)), float(m.group(2))))
m = jb_pattern.search(line)
if m: jb_vals.append((int(m.group(1)), float(m.group(2))))
m = jojc_pattern.search(line)
if m: jojc_vals.append((int(m.group(1)), float(m.group(2))))
outer_loops, current = [], []
for i, (it, j) in enumerate(j_vals):
if it == 0 and current:
outer_loops.append(current)
current = []
rec = {'iter': it, 'J': j}
if i < len(jb_vals): rec['Jb'] = jb_vals[i][1]
if i < len(jojc_vals): rec['JoJc'] = jojc_vals[i][1]
current.append(rec)
if current: outer_loops.append(current)
return outer_loops
# ---------------------------------------------------------------------
# DEPARTURES PARSER
# ---------------------------------------------------------------------
[docs]
def parse_departures(lines):
dep_pattern = re.compile(
r'Jo Dep\s*:(\S+(?:\s+\S+)?):(\S+)\s*:\s*Nobs=\s*(\d+),\s*Min=\s*([\d.e+\-]+),\s*Max=\s*([\d.e+\-]+),\s*RMS=\s*([\d.e+\-]+)'
)
departures, seen = OrderedDict(), set()
for line in lines:
m = dep_pattern.search(line.strip())
if m:
key = (m.group(1).strip(), m.group(2).strip())
if key not in seen:
departures[key] = {
'Nobs': int(m.group(3)), 'Min': float(m.group(4)),
'Max': float(m.group(5)), 'RMS': float(m.group(6)),
}
seen.add(key)
return departures
# ---------------------------------------------------------------------
# OBS ERROR PARSER
# ---------------------------------------------------------------------
[docs]
def parse_obs_errors(lines):
err_pattern = re.compile(
r'^(\S+(?:\s+\S+)?)\s+nobs=\s*(\d+)\s+Min=([\d.e+\-]+),\s*Max=([\d.e+\-]+),\s*RMS=([\d.e+\-]+)'
)
errors, in_err, seen = OrderedDict(), False, set()
for line in lines:
stripped = line.strip()
if 'Diagonal observation error covariance' in stripped:
in_err = True; continue
if in_err:
m = err_pattern.match(stripped)
if m:
obs_type = m.group(1).strip()
if obs_type not in seen:
errors[obs_type] = {
'nobs': int(m.group(2)), 'Min': float(m.group(3)),
'Max': float(m.group(4)), 'RMS': float(m.group(5)),
}
seen.add(obs_type)
if 'CostJo' in stripped and 'Nonlinear' in stripped:
break
return errors
# ---------------------------------------------------------------------
# REPORT GENERATOR
# ---------------------------------------------------------------------
[docs]
def generate_report(config, obs_counts, jo_data, jo_total, convergence, departures, obs_errors):
out = []
sep = "=" * 80
out.append(sep)
out.append(" JEDI Variational DA - Diagnostic Report")
out.append(sep)
out.append("")
# ------------------------------------------------------------
# 1. CONFIGURATION
# ------------------------------------------------------------
out.append("-" * 80)
out.append(" 1. CONFIGURATION SUMMARY")
out.append("-" * 80)
for key, val in config.items():
out.append(f" {key.replace('_',' ').title():<30s}: {val}")
out.append("")
# ------------------------------------------------------------
# 2. OBS COUNTS
# ------------------------------------------------------------
out.append("-" * 80)
out.append(" 2. OBSERVATION COUNTS")
out.append("-" * 80)
out.append("")
out.append(f" {'Obs Type':<30s} {'Locations':>12s} {'Total Obs':>12s} {'Assimilated':>12s} {'Rejected':>10s} {'Assim%':>8s}")
out.append(f" {'-'*30} {'-'*12} {'-'*12} {'-'*12} {'-'*10} {'-'*8}")
g_nlocs, g_total, g_assim = 0, 0, 0
for obs_type, c in obs_counts.items():
rej = c['nobs_total'] - c['nobs_assimilated']
pct = (c['nobs_assimilated'] / c['nobs_total'] * 100) if c['nobs_total'] > 0 else 0
out.append(f" {obs_type:<30s} {c['nlocs']:>12,d} {c['nobs_total']:>12,d} {c['nobs_assimilated']:>12,d} {rej:>10,d} {pct:>7.1f}%")
g_nlocs += c['nlocs']; g_total += c['nobs_total']; g_assim += c['nobs_assimilated']
if len(c['nobs_per_var']) > 1:
for var, n in c['nobs_per_var'].items():
out.append(f" +-- {var:<26s} {'':>12s} {'':>12s} {n:>12,d}")
out.append(f" {'-'*30} {'-'*12} {'-'*12} {'-'*12} {'-'*10} {'-'*8}")
g_pct = (g_assim / g_total * 100) if g_total > 0 else 0
out.append(f" {'TOTAL':<30s} {g_nlocs:>12,d} {g_total:>12,d} {g_assim:>12,d} {g_total-g_assim:>10,d} {g_pct:>7.1f}%")
out.append("")
out.append(" Locations = unique obs locations (nlocs).")
out.append(" Total Obs = nlocs x channels/variables (before QC).")
out.append(" Assimilated = obs passing all QC (used in Jo).")
out.append("")
# ------------------------------------------------------------
# 3. JO/N EVOLUTION
# ------------------------------------------------------------
n_outers = max(len(v) for v in jo_data.values())
out.append("-" * 80)
out.append(" 3. Jo/n EVOLUTION ACROSS OUTER ITERATIONS")
out.append("-" * 80)
out.append("")
hdr = f" {'Obs Type':<30s}" + "".join(f" {'Outer '+str(i):>12s}" for i in range(n_outers)) + f" {'Delta':>14s}"
out.append(hdr)
out.append(f" {'-'*30}" + f" {'-'*12}" * n_outers + f" {'-'*14}")
for obs_type, recs in jo_data.items():
row = f" {obs_type:<30s}" + "".join(f" {r['Jo_n']:>12.6f}" for r in recs)
if len(recs) >= 2:
row += f" {recs[-1]['Jo_n']-recs[0]['Jo_n']:>+14.6f}"
out.append(row)
out.append("")
if jo_total:
out.append(f" Total Jo: {' -> '.join(f'{j:,.1f}' for j in jo_total)}")
out.append(f" Reduction: {(1 - jo_total[-1]/jo_total[0])*100:.1f}%")
out.append("")
# ------------------------------------------------------------
# 4. COST FUNCTION CONVERGENCE
# ------------------------------------------------------------
out.append("-" * 80)
out.append(" 4. COST FUNCTION CONVERGENCE")
out.append("-" * 80)
out.append("")
for li, loop in enumerate(convergence):
out.append(f" --- Outer Loop {li} ({len(loop)-1} inner iters) ---")
out.append(f" {'Iter':>6s} {'J':>16s} {'Jb':>16s} {'JoJc':>16s}")
out.append(f" {'-'*6} {'-'*16} {'-'*16} {'-'*16}")
for r in loop:
out.append(f" {r['iter']:>6d} {r['J']:>16.2f} {r.get('Jb',0):>16.2f} {r.get('JoJc',0):>16.2f}")
out.append("")
# ------------------------------------------------------------
# 5. CHI-SQUARED CONSISTENCY — START OF BLOCK 2
# ------------------------------------------------------------
out.append("-" * 80)
out.append(" 5. CHI-SQUARED CONSISTENCY (Final Iteration)")
out.append("-" * 80)
out.append("")
out.append(" Chi^2 ~ (H(x)-y)^T R^{-1} (H(x)-y). E[Chi^2/P] = 1.0 if R,B correct.")
out.append(" JEDI prints Jo = 1/2 * Chi^2, so Chi^2/P = 2 * (Jo/P).")
out.append("")
out.append(f" {'Obs Type':<30s} {'Jo':>14s} {'P':>10s} {'Chi2/P':>10s} {'Status':>32s}")
out.append(f" {'-'*30} {'-'*14} {'-'*10} {'-'*10} {'-'*32}")
tjo, tp = 0, 0
# ------------------------------------------------------------
# 5. CHI-SQUARED CONSISTENCY (continued)
# ------------------------------------------------------------
for ot, recs in jo_data.items():
f = recs[-1] # final iteration record
jo_over_p = f['Jo_n'] # JEDI's Jo/n (with 1/2 factor)
chi2_over_p = 2.0 * jo_over_p # Talagrand-style Chi^2/P
# Unified status logic
if chi2_over_p < 0.5:
status = "R severely overestimated (too large)"
elif chi2_over_p < 0.8:
status = "R overestimated (too large)"
elif chi2_over_p <= 1.2:
status = "OK (calibrated)"
elif chi2_over_p <= 1.5:
status = "R underestimated (too small)"
else:
status = "R severely underestimated (too small)"
out.append(
f" {ot:<30s} {f['Jo']:>14.2f} {f['nobs']:>10,d} "
f"{chi2_over_p:>10.6f} {status:>32s}"
)
tjo += f['Jo']
tp += f['nobs']
out.append(f" {'-'*30} {'-'*14} {'-'*10} {'-'*10} {'-'*32}")
# TOTAL row
tr_jo_over_p = tjo / tp if tp else 0.0
tr_chi2_over_p = 2.0 * tr_jo_over_p
if tr_chi2_over_p < 0.5:
ts = "R severely overestimated (too large)"
elif tr_chi2_over_p < 0.8:
ts = "R overestimated (too large)"
elif tr_chi2_over_p <= 1.2:
ts = "OK (calibrated)"
elif tr_chi2_over_p <= 1.5:
ts = "R underestimated (too small)"
else:
ts = "R severely underestimated (too small)"
out.append(
f" {'TOTAL':<30s} {tjo:>14.2f} {tp:>10,d} "
f"{tr_chi2_over_p:>10.6f} {ts:>32s}"
)
out.append("")
out.append(sep)
return "\n".join(out)
# ---------------------------------------------------------------------
# MAIN
# ---------------------------------------------------------------------
[docs]
def main():
parser = argparse.ArgumentParser(description="JEDI Log Parser")
parser.add_argument("logfile", help="Path to JEDI log file")
parser.add_argument("--output", "-o", default=None)
args = parser.parse_args()
with open(args.logfile, 'r') as f:
raw = f.readlines()
config = parse_configuration(raw)
obs_counts = parse_obs_counts(raw)
jo_data, jo_total = parse_jo_evolution(raw)
convergence = parse_cost_convergence(raw)
departures = parse_departures(raw)
obs_errors = parse_obs_errors(raw)
report = generate_report(
config, obs_counts, jo_data, jo_total,
convergence, departures, obs_errors
)
if args.output:
outdir = os.path.dirname(args.output)
if outdir and not os.path.exists(outdir):
os.makedirs(outdir, exist_ok=True)
with open(args.output, 'w') as f:
f.write(report)
print(f"Report written to: {args.output}", file=sys.stderr)
else:
print(report)
if __name__ == "__main__":
main()