# zip_choropleth_app.py
# pip install shapely fiona pyproj matplotlib pandas openpyxl
# tkinter is part of the Python standard library (some Linux distributions
# package it separately at the OS level); it is not installed with pip.
import os
import tempfile
import tkinter as tk
import warnings
import zipfile
from tkinter import filedialog, messagebox, ttk

import fiona
import matplotlib.patheffects as pe
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from matplotlib.collections import PatchCollection
from matplotlib.colors import BoundaryNorm
from matplotlib.patches import Polygon as MplPolygon
from pyproj import Transformer
from shapely.geometry import box, shape
from shapely.ops import transform, unary_union

try:
    # Removed in matplotlib 3.9; the name is retained for backward
    # compatibility while render_map() itself uses plt.get_cmap().
    from matplotlib.cm import get_cmap
except ImportError:
    get_cmap = plt.get_cmap


# ------------------------- IO helpers -------------------------
def unzip_return_path(path, outdir=None):
    """Return a path that fiona can open for *path*.

    Non-ZIP inputs (.shp, .geojson, .gpkg, ...) are returned unchanged.
    A ``.zip`` is extracted into *outdir* (a fresh temporary directory when
    *outdir* is None) and the path of a contained ``.shp`` is returned.

    Raises:
        FileNotFoundError: the archive contains no ``.shp`` file.
    """
    if not path.lower().endswith(".zip"):
        return path  # .shp, .geojson, .gpkg are fine

    # Only create the extraction directory when there is something to extract.
    if outdir is None:
        outdir = tempfile.mkdtemp(prefix="zcta_cnty_")
    os.makedirs(outdir, exist_ok=True)

    with zipfile.ZipFile(path, "r") as zf:
        zf.extractall(outdir)

    # Search sub-folders too: many archives wrap the layer in a directory.
    shp_candidates = [
        os.path.join(root, name)
        for root, _dirs, files in os.walk(outdir)
        for name in files
        if name.lower().endswith(".shp")
    ]
    if not shp_candidates:
        raise FileNotFoundError("No .shp found inside ZIP: " + path)
    # Deterministic choice: shallowest file first, then alphabetical.
    shp_candidates.sort(key=lambda p: (p.count(os.sep), p))
    return shp_candidates[0]


def detect_excel_sheets(path):
    """Return the sheet names of the workbook at *path*, or [] if unreadable."""
    try:
        # Context manager releases the file handle once the names are read.
        with pd.ExcelFile(path) as xls:
            return list(xls.sheet_names)
    except Exception:
        # Deliberate best effort: the UI treats "no sheets" as "not a workbook".
        return []


def read_counts_raw(path, sheet_name=None, header_row=0):
    """Load the counts file at *path* into a DataFrame without interpretation.

    Args:
        path: ``.csv``/``.txt`` (read with ``pd.read_csv``) or
            ``.xlsx``/``.xls`` (read with ``pd.read_excel``).
        sheet_name: Excel sheet to read; the first sheet when None.
        header_row: 0-based row index used as the header.

    Raises:
        ValueError: the file extension is not one of the supported ones.
    """
    ext = os.path.splitext(path)[1].lower()
    if ext in (".csv", ".txt"):
        return pd.read_csv(path, header=header_row)
    if ext in (".xlsx", ".xls"):
        sheet = 0 if sheet_name is None else sheet_name
        # openpyxl only reads .xlsx; let pandas pick the engine for legacy .xls.
        engine = "openpyxl" if ext == ".xlsx" else None
        return pd.read_excel(path, sheet_name=sheet, header=header_row, engine=engine)
    raise ValueError("Unsupported counts file. Use CSV/XLSX/XLS.")


# ------------------------- geometry utils -------------------------
_to3857 = Transformer.from_crs("EPSG:4326", "EPSG:3857", always_xy=True).transform
_to4326 = Transformer.from_crs("EPSG:3857", "EPSG:4326", always_xy=True).transform


def proj(g, forward=True):
    """Reproject *g* from EPSG:4326 to EPSG:3857 (or back when forward=False)."""
    return transform(_to3857 if forward else _to4326, g)


def simplify_m(g, meters):
    """Simplify *g* with a tolerance given in EPSG:3857 units (meters).

    The geometry is projected to EPSG:3857, simplified with
    ``preserve_topology=True`` and projected back. If anything fails the
    original geometry is returned unchanged (best effort).
    """
    try:
        return proj(proj(g, True).simplify(meters, preserve_topology=True), False)
    except Exception:
        return g


def make_bins(int_values):
    """Return BoundaryNorm edges that give every distinct integer its own bin.

    Edges sit at ``value +/- 0.5``; an input without usable values yields the
    single bin ``[0.5, 1.5]``.
    """
    vals = sorted({int(v) for v in int_values if pd.notna(v)})
    if not vals:
        return [0.5, 1.5]
    return [vals[0] - 0.5] + [v + 0.5 for v in vals]


def luma(rgb):
    """Return the weighted brightness 0.2126*r + 0.7152*g + 0.0722*b of *rgb*."""
    r, g, b = rgb[:3]
    return 0.2126 * r + 0.7152 * g + 0.0722 * b


def detect_zip_field(props):
    """Return the name of the ZIP/ZCTA attribute among the field names *props*.

    Known field names are matched case-insensitively and the name is returned
    exactly as it appears in *props*.

    Raises:
        ValueError: none of the known field names is present.
    """
    by_upper = {str(name).upper(): name for name in props}
    for c in ["ZCTA5CE20", "ZCTA5CE10", "ZCTA5CE", "GEOID10", "GEOID", "ZIP", "ZIPCODE"]:
        if c in by_upper:
            return by_upper[c]
    raise ValueError("ZIP field not found in ZCTA layer")


def _iter_polygons(geom):
    """Yield every Polygon contained in *geom*, skipping other geometry types."""
    if geom is None or geom.is_empty:
        return
    if geom.geom_type == "Polygon":
        yield geom
    elif hasattr(geom, "geoms"):
        # Multi-part geometries and collections: recurse into the parts.
        for part in geom.geoms:
            yield from _iter_polygons(part)


def _polygon_patches(geom):
    """Return matplotlib patches for the exterior ring of each polygon in *geom*."""
    return [
        MplPolygon(list(poly.exterior.coords), closed=True)
        for poly in _iter_polygons(geom)
    ]


# ------------------------- renderer -------------------------
def render_map(
    counts_df,
    zcta_path,
    county_path,
    title="Students by ZIP",
    focus_counties=("Duval", "Nassau"),
    context_counties=("Clay", "St. Johns", "Alachua"),
    buffer_km=30,
    simplify_zip_m=400,
    simplify_cnty_m=600,
    zoom_factor=2.0,
    figsize=(9, 9),
    dpi=160,
    out_png="zip_map.png",
    state_fips="12",
):
    """Draw a ZIP-level choropleth of student counts and save it as a PNG.

    Args:
        counts_df: DataFrame with a ``zip`` and a ``students`` column.
        zcta_path: ZCTA layer (``.zip`` with a shapefile, or any fiona path).
        county_path: County layer; its ``NAME`` and ``STATEFP`` attributes
            are read.
        title: Figure title.
        focus_counties: County names the view is centred on (thick outline).
        context_counties: Extra county names drawn with a thin outline.
        buffer_km: Buffer around the focus counties used to pre-filter ZCTAs.
        simplify_zip_m / simplify_cnty_m: Simplification tolerances in meters.
        zoom_factor: The visible window is the data extent divided by this.
        figsize, dpi: Passed to matplotlib.
        out_png: Output file path.
        state_fips: ``STATEFP`` value counties must have; None disables the
            filter. Defaults to "12", the value that used to be hard-coded.

    Raises:
        ValueError: invalid zoom factor, no focus county found, or no ZIP
            with students inside the filtered area.
    """
    if zoom_factor <= 0:
        raise ValueError("zoom_factor must be positive")

    counts_df = counts_df.copy()
    counts_df["zip"] = counts_df["zip"].astype(str).str.zfill(5)
    # fillna before the integer cast so blanks and fractional values cannot raise.
    counts_df["students"] = (
        pd.to_numeric(counts_df["students"], errors="coerce").fillna(0).astype(int)
    )
    zip2val = dict(zip(counts_df["zip"], counts_df["students"]))

    focus_names = set(focus_counties)
    wanted_names = focus_names | set(context_counties)

    county_src = unzip_return_path(county_path)
    zcta_src = unzip_return_path(zcta_path)

    focus, outlines = [], []
    with fiona.open(county_src) as csrc:
        for rec in csrc:
            props = rec["properties"]
            if state_fips is not None and props.get("STATEFP") != state_fips:
                continue
            nm = props.get("NAME")
            # Check the name before building a geometry nobody will use.
            if nm not in wanted_names or rec["geometry"] is None:
                continue
            g = shape(rec["geometry"])
            outlines.append((nm, g))
            if nm in focus_names:
                focus.append(g)
    if not focus:
        raise ValueError("Focus counties not found in county layer")

    focus_union = unary_union(focus)
    focus_buffer = proj(proj(focus_union, True).buffer(buffer_km * 1000), False)

    student_geoms, context_geoms = [], []
    with fiona.open(zcta_src) as zsrc:
        zip_field = detect_zip_field(zsrc.schema["properties"])
        fb = focus_buffer.bounds
        # Extra margin around the buffered focus area, in the units of the
        # layer coordinates (presumably degrees: proj() assumes EPSG:4326).
        pad = 0.7
        bbox = box(fb[0] - pad, fb[1] - pad, fb[2] + pad, fb[3] + pad)
        for rec in zsrc:
            if rec["geometry"] is None:
                continue
            z = str(rec["properties"][zip_field]).zfill(5)
            g = shape(rec["geometry"])
            if g.is_empty or not bbox.intersects(box(*g.bounds)):
                continue
            g = simplify_m(g, simplify_zip_m)
            if g.is_empty:
                continue
            if z in zip2val:
                student_geoms.append((z, g))
            else:
                context_geoms.append((z, g))
    if not student_geoms:
        raise ValueError("No student ZIPs found in filtered area")

    # View window: data extent shrunk by zoom_factor, centred on the focus counties.
    xs, ys = [], []
    for _, g in student_geoms + context_geoms:
        b = g.bounds
        xs += [b[0], b[2]]
        ys += [b[1], b[3]]
    w, h = max(xs) - min(xs), max(ys) - min(ys)
    centroid = focus_union.centroid
    cx, cy = centroid.x, centroid.y
    half_w, half_h = (w / 2) / zoom_factor, (h / 2) / zoom_factor
    x0, x1, y0, y1 = cx - half_w, cx + half_w, cy - half_h, cy + half_h
    view_box = box(x0, y0, x1, y1)

    vals = [int(v) for v in zip2val.values()]
    bins = make_bins(vals)
    cmap = plt.get_cmap("YlGnBu")
    norm = BoundaryNorm(bins, cmap.N, clip=True)

    fig, ax = plt.subplots(figsize=figsize, dpi=dpi)
    try:
        ctx_patches = []
        for _, g in context_geoms:
            ctx_patches.extend(_polygon_patches(g))
        ax.add_collection(PatchCollection(
            ctx_patches, facecolor="white", edgecolor="#cfcfcf",
            linewidths=0.6, zorder=1))

        by_val = {}
        for z, g in student_geoms:
            by_val.setdefault(int(zip2val[z]), []).append(g)
        n_levels = len(by_val)
        for rank, v in enumerate(sorted(by_val)):
            patches = []
            for g in by_val[v]:
                patches.extend(_polygon_patches(g))
            color = cmap(norm([v]))[0]
            # Higher counts are drawn later, but zorder stays inside [2, 3) so
            # fills can never cover county outlines (10) or labels (20).
            ax.add_collection(PatchCollection(
                patches, facecolor=color, edgecolor="white",
                linewidths=1.0, zorder=2 + rank / (n_levels + 1)))

        for nm, g in outlines:
            if not view_box.intersects(g):
                continue
            gs = simplify_m(g, simplify_cnty_m)
            lw = 2.2 if nm in focus_names else 1.2
            for poly in _iter_polygons(gs):
                x, y = poly.exterior.xy
                ax.plot(x, y, color="#333333", linewidth=lw, zorder=10)

        for z, g in student_geoms:
            v = int(zip2val[z])
            c = cmap(norm([v]))[0]
            # Light text with a dark halo on dark fills, and the reverse on light ones.
            txt = "white" if luma(c) < 0.45 else "#1a1a1a"
            halo = pe.withStroke(
                linewidth=2.2, foreground="#1a1a1a" if txt == "white" else "white")
            rp = g.representative_point()
            ax.text(rp.x, rp.y, f"{z}\n{v}", ha="center", va="center",
                    fontsize=8, color=txt, zorder=20, path_effects=[halo])

        ax.set_xlim(x0, x1)
        ax.set_ylim(y0, y1)

        sm = plt.cm.ScalarMappable(cmap=cmap, norm=norm)
        sm.set_array(np.array(vals))
        ticks = sorted(set(vals))
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", category=UserWarning)
            cbar = fig.colorbar(sm, ax=ax, ticks=ticks, shrink=0.85)
        cbar.set_label("Number of Students")

        ax.set_aspect("equal")
        ax.set_axis_off()
        ax.set_title(title, pad=10)
        fig.tight_layout()
        fig.savefig(out_png, dpi=dpi)
    finally:
        # pyplot keeps every figure alive until closed; without this each
        # render from the GUI would leak one.
        plt.close(fig)


# ------------------------- counts normalizer -------------------------
_ZIP_ALIASES = ("zip", "zipcode", "zip_code", "zcta", "postal", "postal_code",
                "zip5", "zip_5")
_COUNT_ALIASES = ("students", "student", "count", "counts", "n", "value", "num",
                  "num_students", "headcount", "enrollment")


def _clean_zip(series):
    """Return *series* as 5-digit ZIP strings; unparseable cells become NaN.

    Cells that are purely numeric with fewer than five digits (optionally
    followed by ".0", as produced when a ZIP column is parsed as a number)
    get their leading zeros restored before the first run of five digits is
    extracted.
    """
    text = series.astype(str).str.strip()
    short_numeric = text.str.fullmatch(r"\d{1,4}(?:\.0+)?").fillna(False).astype(bool)
    padded = text.str.replace(r"\.0+$", "", regex=True).str.zfill(5)
    text = text.where(~short_numeric, padded)
    return text.str.extract(r"(\d{5})", expand=False)


def normalize_counts(df, zip_col=None, value_col=None, is_raw=False, dedup=True):
    """Return a DataFrame with the columns ``zip`` (str) and ``students`` (int).

    is_raw=False: aggregated file with columns [zip, students] -> ensure types.
        Columns that are not given are auto-detected from common header names
        (case-insensitive).
    is_raw=True: raw file with [student_id, zip] -> aggregate to counts by ZIP.
        With dedup=True distinct student IDs are counted per ZIP, otherwise rows.

    Rows whose ZIP cannot be parsed are dropped.

    Raises:
        ValueError: a required column is missing or was not specified.
    """
    cols = list(df.columns)

    if is_raw:
        if not zip_col:
            raise ValueError("ZIP column is required.")
        if dedup and not value_col:
            raise ValueError("Student ID column is required when deduplicate is enabled.")
        if zip_col not in cols:
            raise ValueError("ZIP column not found.")
        if dedup and value_col not in cols:
            raise ValueError("Student ID column not found.")

        # Private column names keep this correct even if both selections
        # point at the same source column.
        work = pd.DataFrame({"zip": _clean_zip(df[zip_col])})
        if dedup:
            work["_sid"] = df[value_col].astype(str)
        work = work.dropna(subset=["zip"])

        grouped = work.groupby("zip")
        if dedup:
            out = grouped["_sid"].nunique().reset_index(name="students")
        else:
            out = grouped.size().reset_index(name="students")
        out["students"] = out["students"].astype(int)
        return out[["zip", "students"]]

    # Aggregated mode: honour whatever was selected, auto-detect the rest.
    by_lower = {str(c).lower().strip(): c for c in cols}
    zc = zip_col if zip_col else next(
        (by_lower[a] for a in _ZIP_ALIASES if a in by_lower), None)
    vc = value_col if value_col else next(
        (by_lower[a] for a in _COUNT_ALIASES if a in by_lower), None)
    if zc is None or vc is None:
        raise ValueError("Input must have columns for ZIP and Students.")
    if zc not in cols or vc not in cols:
        raise ValueError("Selected ZIP/Students columns not in file.")

    out = pd.DataFrame({
        "zip": _clean_zip(df[zc]),
        "students": pd.to_numeric(df[vc], errors="coerce").fillna(0).astype(int),
    })
    return out.dropna(subset=["zip"])


# ------------------------- Tkinter UI -------------------------
class ColumnMapper(tk.Toplevel):
    """Modal dialog for choosing the ZIP column and the value/ID column.

    On OK, ``on_ok(zip_col, value_col, dedup)`` is called; ``dedup`` is None
    unless the dialog was opened in raw mode.
    """

    def __init__(self, master, columns, is_raw_mode, on_ok):
        super().__init__(master)
        self.title("Map Columns")
        self.resizable(False, False)
        self.on_ok = on_ok
        self.is_raw_mode = is_raw_mode

        tk.Label(self, text="Select columns:").grid(
            row=0, column=0, columnspan=2, padx=10, pady=8)

        tk.Label(self, text="ZIP column").grid(row=1, column=0, sticky="e", padx=8, pady=4)
        self.zip_var = tk.StringVar()
        zip_dd = ttk.Combobox(self, textvariable=self.zip_var, values=list(columns),
                              state="readonly", width=28)
        zip_dd.grid(row=1, column=1, padx=8, pady=4)

        lab2 = "Student ID column" if self.is_raw_mode else "Students (count) column"
        tk.Label(self, text=lab2).grid(row=2, column=0, sticky="e", padx=8, pady=4)
        self.val_var = tk.StringVar()
        val_dd = ttk.Combobox(self, textvariable=self.val_var, values=list(columns),
                              state="readonly", width=28)
        val_dd.grid(row=2, column=1, padx=8, pady=4)

        self.dedup_var = tk.BooleanVar(value=True)
        if self.is_raw_mode:
            tk.Checkbutton(self, text="Deduplicate by Student ID",
                           variable=self.dedup_var).grid(
                row=3, column=0, columnspan=2, pady=6)

        tk.Button(self, text="OK", command=self._ok).grid(
            row=4, column=0, columnspan=2, pady=10)
        self.grab_set()

    def _ok(self):
        """Validate the selection, report it through on_ok and close."""
        if not self.zip_var.get():
            messagebox.showerror("Error", "Pick the ZIP column.")
            return
        if not self.val_var.get():
            if not self.is_raw_mode:
                messagebox.showerror("Error", "Pick the Students (count) column.")
                return
            if self.dedup_var.get():
                # Deduplication needs an ID column; catch it here instead of
                # failing later when the counts are built.
                messagebox.showerror(
                    "Error", "Pick the Student ID column or untick deduplicate.")
                return
        self.on_ok(self.zip_var.get(), self.val_var.get(),
                   self.dedup_var.get() if self.is_raw_mode else None)
        self.destroy()


class App(tk.Tk):
    """Main window: pick the input files, preview/save counts, render the map."""

    def __init__(self):
        super().__init__()
        self.title("ZIP Choropleth + Counts Table")
        self.geometry("820x640")

        self.counts_path = tk.StringVar()
        self.zcta_path = tk.StringVar()
        self.county_path = tk.StringVar()
        self.title_text = tk.StringVar(value="Students by ZIP")
        self.focus_text = tk.StringVar(value="Duval,Nassau")
        self.context_text = tk.StringVar(value="Clay,St. Johns,Alachua")
        self.zoom = tk.DoubleVar(value=2.0)
        self.buffer_km = tk.DoubleVar(value=30.0)
        self.out_png = tk.StringVar(value="zip_map.png")
        self.header_row = tk.IntVar(value=0)
        self.sheet_name = tk.StringVar(value="")
        self.is_raw = tk.BooleanVar(value=False)
        self.raw_dedup = tk.BooleanVar(value=True)

        self.preview_df = None   # raw file as last loaded for the preview
        self.counts_agg = None   # normalized (zip, students) table
        self.manual_zip_col = None
        self.manual_value_col = None
        self.manual_raw_dedup = True

        def row(lbl, var, browse=False, save=False, filetypes=None):
            # One labelled entry row with optional Browse / Save As buttons.
            f = tk.Frame(self)
            f.pack(fill="x", padx=10, pady=6)
            tk.Label(f, text=lbl, width=18, anchor="w").pack(side="left")
            tk.Entry(f, textvariable=var).pack(side="left", fill="x", expand=True)
            if browse:
                def pick():
                    p = filedialog.askopenfilename(
                        filetypes=filetypes or [("All files", "*.*")])
                    if p:
                        var.set(p)
                        if var is self.counts_path:
                            self._after_counts_pick(p)
                tk.Button(f, text="Browse", command=pick).pack(side="left", padx=6)
            if save:
                def pick_save():
                    p = filedialog.asksaveasfilename(
                        defaultextension=".png", filetypes=[("PNG", "*.png")])
                    if p:
                        var.set(p)
                tk.Button(f, text="Save As", command=pick_save).pack(side="left", padx=6)

        counts_ft = [("CSV/Excel", "*.csv *.xlsx *.xls"), ("CSV", "*.csv"),
                     ("Excel", "*.xlsx *.xls"), ("All files", "*.*")]
        geo_ft = [("ZCTA/County layers", "*.zip *.shp *.geojson *.gpkg"),
                  ("Shapefile ZIP", "*.zip"), ("Shapefile", "*.shp"),
                  ("GeoJSON", "*.geojson"), ("GeoPackage", "*.gpkg"),
                  ("All files", "*.*")]

        row("Counts file", self.counts_path, browse=True, filetypes=counts_ft)
        row("ZCTA file", self.zcta_path, browse=True, filetypes=geo_ft)
        row("County file", self.county_path, browse=True, filetypes=geo_ft)
        row("Chart title", self.title_text)
        row("Focus counties", self.focus_text)
        row("Context counties", self.context_text)

        f_opts = tk.Frame(self)
        f_opts.pack(fill="x", padx=10, pady=6)
        tk.Label(f_opts, text="Zoom").pack(side="left")
        tk.Scale(f_opts, from_=1.2, to=3.0, resolution=0.1, orient="horizontal",
                 variable=self.zoom, length=220).pack(side="left", padx=6)
        tk.Label(f_opts, text="Buffer km").pack(side="left", padx=10)
        tk.Spinbox(f_opts, from_=5, to=100, increment=1, textvariable=self.buffer_km,
                   width=6).pack(side="left", padx=4)
        tk.Label(f_opts, text="Header row (0-based)").pack(side="left", padx=10)
        tk.Spinbox(f_opts, from_=0, to=50, increment=1, textvariable=self.header_row,
                   width=6, command=self._refresh_preview).pack(side="left", padx=4)

        f_sheet = tk.Frame(self)
        f_sheet.pack(fill="x", padx=10, pady=6)
        tk.Label(f_sheet, text="Excel sheet").pack(side="left")
        self.sheet_menu = ttk.Combobox(f_sheet, textvariable=self.sheet_name,
                                       values=[], state="readonly", width=28)
        self.sheet_menu.pack(side="left", padx=6)
        # Reload the preview when another sheet is chosen so the column
        # mapper offers that sheet's columns.
        self.sheet_menu.bind("<<ComboboxSelected>>",
                             lambda _event: self._refresh_preview())
        tk.Checkbutton(f_sheet, text="Raw file (one row per student)",
                       variable=self.is_raw,
                       command=self._toggle_raw_ui).pack(side="left", padx=10)
        self.dedup_chk = tk.Checkbutton(f_sheet, text="Deduplicate by Student ID",
                                        variable=self.raw_dedup)
        self.dedup_chk.pack(side="left", padx=10)
        tk.Button(f_sheet, text="Map Columns…",
                  command=self._map_columns).pack(side="left", padx=10)

        row("Output PNG", self.out_png, save=True)

        f_actions = tk.Frame(self)
        f_actions.pack(fill="x", padx=10, pady=4)
        tk.Button(f_actions, text="Preview counts",
                  command=self._build_counts_preview).pack(side="left")
        tk.Button(f_actions, text="Save counts table…",
                  command=self._save_counts).pack(side="left", padx=8)
        tk.Button(f_actions, text="Render map",
                  command=self._render).pack(side="left", padx=8)

        tk.Label(self, text="Preview (first 12 rows):").pack(anchor="w", padx=12)
        self.preview = tk.Text(self, height=10, width=120)
        self.preview.pack(fill="both", expand=True, padx=10, pady=6)

        self._toggle_raw_ui()

    # ----- counts helpers -----
    def _toggle_raw_ui(self):
        """Enable the dedup checkbox only while raw mode is selected."""
        self.dedup_chk.configure(state="normal" if self.is_raw.get() else "disabled")

    def _after_counts_pick(self, path):
        """Reset per-file state after a new counts file was chosen."""
        ext = os.path.splitext(path)[1].lower()
        sheets = detect_excel_sheets(path) if ext in (".xlsx", ".xls") else []
        self.sheet_menu["values"] = sheets
        if sheets:
            self.sheet_name.set(sheets[0])
        elif ext not in (".xlsx", ".xls"):
            self.sheet_name.set("")
        # Column mapping and aggregated counts belong to the previous file.
        self.manual_zip_col = None
        self.manual_value_col = None
        self.counts_agg = None
        self._refresh_preview()

    def _refresh_preview(self):
        """Reload the counts file and show its columns and first 12 rows."""
        path = self.counts_path.get()
        if not path:
            return
        try:
            df = read_counts_raw(path, sheet_name=(self.sheet_name.get() or None),
                                 header_row=self.header_row.get())
        except Exception as e:
            # Do not keep a frame that no longer matches the current settings.
            self.preview_df = None
            self.preview.delete("1.0", tk.END)
            self.preview.insert(tk.END, f"Error reading file: {e}")
            return
        self.preview_df = df
        self.preview.delete("1.0", tk.END)
        self.preview.insert(tk.END, "Columns: " + ", ".join(map(str, df.columns)) + "\n")
        self.preview.insert(tk.END, df.head(12).to_string(index=False))

    def _map_columns(self):
        """Open the column mapper for the currently loaded counts file."""
        if self.preview_df is None:
            self._refresh_preview()
        if self.preview_df is None:
            messagebox.showerror("Error", "Load a counts file first.")
            return
        cols = list(self.preview_df.columns)

        def on_ok(zip_col, val_col, dedup_flag):
            self.manual_zip_col = zip_col
            self.manual_value_col = val_col if val_col else None
            if dedup_flag is not None:
                self.raw_dedup.set(bool(dedup_flag))
                self.manual_raw_dedup = bool(dedup_flag)
            # Counts built with the previous mapping are no longer valid.
            self.counts_agg = None

        ColumnMapper(self, cols, self.is_raw.get(), on_ok)

    def _build_counts_preview(self):
        """Build the normalized counts from the current settings and show them.

        ``self.counts_agg`` is None afterwards if anything failed, so callers
        never continue with counts from an earlier configuration.
        """
        self.counts_agg = None
        try:
            path = self.counts_path.get()
            if not path:
                raise ValueError("Load a counts file first.")
            df_raw = read_counts_raw(path, sheet_name=(self.sheet_name.get() or None),
                                     header_row=self.header_row.get())
            counts = normalize_counts(
                df_raw,
                zip_col=self.manual_zip_col,
                value_col=self.manual_value_col,
                is_raw=self.is_raw.get(),
                dedup=self.raw_dedup.get(),
            )
            self.counts_agg = counts.sort_values(
                ["students", "zip"], ascending=[False, True]).reset_index(drop=True)
            self.preview.delete("1.0", tk.END)
            self.preview.insert(tk.END, "Aggregated counts (zip, students):\n")
            self.preview.insert(tk.END, self.counts_agg.head(12).to_string(index=False))
        except Exception as e:
            messagebox.showerror("Error", str(e))

    def _save_counts(self):
        """Rebuild the counts table and write it to a CSV or XLSX file."""
        # Always rebuild: file, sheet, header row, mode or mapping may have
        # changed since the last preview.
        self._build_counts_preview()
        if self.counts_agg is None:
            return
        p = filedialog.asksaveasfilename(
            defaultextension=".csv", filetypes=[("CSV", "*.csv"), ("Excel", "*.xlsx")])
        if not p:
            return
        try:
            if p.lower().endswith(".xlsx"):
                self.counts_agg.to_excel(p, index=False)
            else:
                self.counts_agg.to_csv(p, index=False)
            messagebox.showinfo("Saved", p)
        except Exception as e:
            messagebox.showerror("Error", str(e))

    # ----- render -----
    def _render(self):
        """Rebuild the counts from the current settings and render the PNG."""
        try:
            if not self.zcta_path.get() or not self.county_path.get():
                messagebox.showerror("Error", "Pick the ZCTA and County files first.")
                return
            # Always rebuild so the map can never show counts from an
            # earlier file or column mapping.
            self._build_counts_preview()
            if self.counts_agg is None:
                return

            def names(text):
                # Drop empty entries left by stray or trailing commas.
                return tuple(s.strip() for s in text.split(",") if s.strip())

            render_map(
                counts_df=self.counts_agg,
                zcta_path=self.zcta_path.get(),
                county_path=self.county_path.get(),
                title=self.title_text.get(),
                focus_counties=names(self.focus_text.get()),
                context_counties=names(self.context_text.get()),
                buffer_km=float(self.buffer_km.get()),
                zoom_factor=float(self.zoom.get()),
                out_png=self.out_png.get(),
            )
            messagebox.showinfo("Done", f"Saved {self.out_png.get()}")
        except Exception as e:
            messagebox.showerror("Error", str(e))


if __name__ == "__main__":
    App().mainloop()