-
Notifications
You must be signed in to change notification settings - Fork 39
Expand file tree
/
Copy pathsurvey.py
More file actions
2657 lines (2362 loc) · 102 KB
/
Copy pathsurvey.py
File metadata and controls
2657 lines (2362 loc) · 102 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
Survey data support for diff-diff.
Provides SurveyDesign for specifying complex survey structures (stratification,
clustering, weights, FPC) and Taylor Series Linearization (TSL) variance
estimation for design-based inference.
References
----------
- Lumley (2004) "Analysis of Complex Survey Samples", JSS 9(8).
- Binder (1983) "On the Variances of Asymptotically Normal Estimators
from Complex Surveys", International Statistical Review 51(3).
- Solon, Haider, & Wooldridge (2015) "What Are We Weighting For?",
Journal of Human Resources 50(2).
"""
import warnings
from dataclasses import dataclass, field, replace
from typing import Callable, List, Optional, Tuple
import numpy as np
import pandas as pd
from diff_diff.linalg import _factorize_cluster_ids
@dataclass
class SurveyDesign:
"""
User-facing class specifying complex survey design structure.
Column names are resolved against the DataFrame at fit-time via
:meth:`resolve`.
Parameters
----------
weights : str, optional
Column name for observation weights (sampling weights).
strata : str, optional
Column name for stratification variable.
psu : str, optional
Column name for primary sampling unit (cluster).
fpc : str, optional
Column name for finite population size (N_h per stratum).
weight_type : str, default "pweight"
Weight type: "pweight" (inverse selection probability),
"fweight" (frequency/expansion), or "aweight" (inverse variance).
nest : bool, default False
Whether PSU IDs are nested within strata (i.e., PSU IDs may repeat
across strata). If True, PSU IDs are made unique by combining with
strata.
lonely_psu : str, default "remove"
How to handle singleton strata (strata with only one PSU):
"remove" (skip, emit warning), "certainty" (set f_h=1, zero
variance contribution), or "adjust" (center around grand mean).
"""
weights: Optional[str] = None
strata: Optional[str] = None
psu: Optional[str] = None
fpc: Optional[str] = None
weight_type: str = "pweight"
nest: bool = False
lonely_psu: str = "remove"
replicate_weights: Optional[List[str]] = None
replicate_method: Optional[str] = None
fay_rho: float = 0.0
replicate_strata: Optional[List[int]] = None
combined_weights: bool = True
replicate_scale: Optional[float] = None
replicate_rscales: Optional[List[float]] = None
mse: bool = False
def __post_init__(self):
valid_weight_types = {"pweight", "fweight", "aweight"}
if self.weight_type not in valid_weight_types:
raise ValueError(
f"weight_type must be one of {valid_weight_types}, " f"got '{self.weight_type}'"
)
valid_lonely = {"remove", "certainty", "adjust"}
if self.lonely_psu not in valid_lonely:
raise ValueError(
f"lonely_psu must be one of {valid_lonely}, " f"got '{self.lonely_psu}'"
)
# Replicate weight validation
valid_rep_methods = {"BRR", "Fay", "JK1", "JKn", "SDR"}
if self.replicate_method is not None:
if self.replicate_method not in valid_rep_methods:
raise ValueError(
f"replicate_method must be one of {valid_rep_methods}, "
f"got '{self.replicate_method}'"
)
if self.replicate_weights is None:
raise ValueError("replicate_weights must be provided when replicate_method is set")
if self.replicate_weights is not None and self.replicate_method is None:
raise ValueError("replicate_method must be provided when replicate_weights is set")
if self.replicate_method == "Fay":
if not (0 < self.fay_rho < 1):
raise ValueError(f"fay_rho must be in (0, 1) for Fay's method, got {self.fay_rho}")
elif self.replicate_method is not None and self.fay_rho != 0.0:
raise ValueError(
f"fay_rho must be 0 for method '{self.replicate_method}', " f"got {self.fay_rho}"
)
# Replicate weights are mutually exclusive with strata/psu/fpc
if self.replicate_weights is not None:
if self.strata is not None or self.psu is not None or self.fpc is not None:
raise ValueError(
"replicate_weights cannot be combined with strata/psu/fpc. "
"Replicate weights encode the design structure implicitly."
)
if self.weights is None:
raise ValueError("Full-sample weights must be provided alongside replicate_weights")
# JKn requires replicate_strata
if self.replicate_method == "JKn":
if self.replicate_strata is None:
raise ValueError(
"replicate_strata is required for JKn method. Provide a list "
"of stratum assignments (one per replicate weight column)."
)
if len(self.replicate_strata) != len(self.replicate_weights):
raise ValueError(
f"replicate_strata length ({len(self.replicate_strata)}) must "
f"match replicate_weights length ({len(self.replicate_weights)})"
)
# Validate scale/rscales values and length
if self.replicate_scale is not None:
if not (np.isfinite(self.replicate_scale) and self.replicate_scale > 0):
raise ValueError(
f"replicate_scale must be a positive finite number, "
f"got {self.replicate_scale}"
)
if self.replicate_rscales is not None and self.replicate_weights is not None:
if len(self.replicate_rscales) != len(self.replicate_weights):
raise ValueError(
f"replicate_rscales length ({len(self.replicate_rscales)}) must "
f"match replicate_weights length ({len(self.replicate_weights)})"
)
rscales_arr = np.asarray(self.replicate_rscales, dtype=float)
if not np.all(np.isfinite(rscales_arr)):
raise ValueError("replicate_rscales must be finite")
if np.any(rscales_arr < 0):
raise ValueError("replicate_rscales must be non-negative")
def resolve(self, data: pd.DataFrame) -> "ResolvedSurveyDesign":
"""
Validate column names and extract numpy arrays from DataFrame.
Parameters
----------
data : pd.DataFrame
DataFrame containing survey design columns.
Returns
-------
ResolvedSurveyDesign
Internal representation with extracted numpy arrays.
"""
n = len(data)
# --- Weights ---
if self.weights is not None:
if self.weights not in data.columns:
raise ValueError(f"Weight column '{self.weights}' not found in data")
raw_weights = data[self.weights].values.astype(np.float64)
# Validate weights
if np.any(np.isnan(raw_weights)):
raise ValueError("Weights contain NaN values")
if np.any(~np.isfinite(raw_weights)):
raise ValueError("Weights contain Inf values")
if np.any(raw_weights < 0):
raise ValueError("Weights must be non-negative")
if np.any(raw_weights == 0) and np.all(raw_weights == 0):
raise ValueError(
"All weights are zero. At least one observation must " "have a positive weight."
)
# fweight validation: must be non-negative integers
if self.weight_type == "fweight":
pos_mask = raw_weights > 0
if np.any(pos_mask):
fractional = raw_weights[pos_mask] - np.round(raw_weights[pos_mask])
if np.any(np.abs(fractional) > 1e-10):
raise ValueError(
"Frequency weights (fweight) must be non-negative integers. "
"Fractional values detected. Use pweight for non-integer weights."
)
# Normalize: pweights/aweights to sum=n (mean=1); fweights unchanged
# Skip normalization for replicate designs — the IF path uses
# w_r / w_full ratios that must be on the same raw scale
if self.replicate_weights is not None:
weights = raw_weights.copy()
elif self.weight_type in ("pweight", "aweight"):
raw_sum = float(np.sum(raw_weights))
weights = raw_weights * (n / raw_sum)
if not np.isclose(raw_sum, n):
warnings.warn(
f"{self.weight_type} weights normalized to mean=1 "
f"(sum={n}). Original sum was {raw_sum:.4g}.",
UserWarning,
stacklevel=2,
)
else:
weights = raw_weights.copy()
else:
weights = np.ones(n, dtype=np.float64)
# --- Replicate weights (short-circuit strata/psu/fpc) ---
if self.replicate_weights is not None:
rep_cols = self.replicate_weights
for col in rep_cols:
if col not in data.columns:
raise ValueError(f"Replicate weight column '{col}' not found in data")
rep_arr = np.column_stack([data[col].values.astype(np.float64) for col in rep_cols])
if np.any(np.isnan(rep_arr)):
raise ValueError("Replicate weights contain NaN values")
if np.any(~np.isfinite(rep_arr)):
raise ValueError("Replicate weights contain Inf values")
if np.any(rep_arr < 0):
raise ValueError("Replicate weights must be non-negative")
# Validate combined_weights contract: when True, replicate columns
# include the full-sample weight, so w_r > 0 with w_full == 0 is
# malformed (observation excluded from full sample but included in
# a replicate).
combined = self.combined_weights if self.combined_weights is not None else True
if combined:
zero_full = weights == 0
if np.any(zero_full):
rep_positive_on_zero = np.any(rep_arr[zero_full] > 0, axis=1)
if np.any(rep_positive_on_zero):
raise ValueError(
"Malformed combined_weights=True design: some "
"replicate columns have positive weight where "
"full-sample weight is zero. Either fix the "
"replicate columns or use combined_weights=False."
)
# Do NOT normalize replicate columns — the IF path uses w_r/w_full
# ratios that must reflect the true replicate design, not rescaled sums
n_rep = rep_arr.shape[1]
if n_rep < 2:
raise ValueError("At least 2 replicate weight columns are required")
return ResolvedSurveyDesign(
weights=weights,
weight_type=self.weight_type,
strata=None,
psu=None,
fpc=None,
n_strata=0,
n_psu=0,
lonely_psu=self.lonely_psu,
replicate_weights=rep_arr,
replicate_method=self.replicate_method,
fay_rho=self.fay_rho,
n_replicates=n_rep,
replicate_strata=(
np.asarray(self.replicate_strata, dtype=int)
if self.replicate_strata is not None
else None
),
combined_weights=self.combined_weights,
replicate_scale=self.replicate_scale,
replicate_rscales=(
np.asarray(self.replicate_rscales, dtype=float)
if self.replicate_rscales is not None
else None
),
mse=self.mse,
nest=self.nest,
)
# --- Strata ---
strata_arr = None
n_strata = 0
if self.strata is not None:
if self.strata not in data.columns:
raise ValueError(f"Strata column '{self.strata}' not found in data")
strata_vals = data[self.strata].values
if pd.isna(strata_vals).any():
raise ValueError(
f"Strata column '{self.strata}' contains missing values. "
"All observations must have valid strata identifiers."
)
strata_arr = _factorize_cluster_ids(strata_vals)
n_strata = len(np.unique(strata_arr))
# --- PSU ---
psu_arr = None
n_psu = 0
if self.psu is not None:
if self.psu not in data.columns:
raise ValueError(f"PSU column '{self.psu}' not found in data")
psu_raw = data[self.psu].values
if pd.isna(psu_raw).any():
raise ValueError(
f"PSU column '{self.psu}' contains missing values. "
"All observations must have valid PSU identifiers."
)
if self.nest and strata_arr is not None:
# Make PSU IDs unique within strata by combining
combined = np.array([f"{s}_{p}" for s, p in zip(strata_arr, psu_raw)])
psu_arr = _factorize_cluster_ids(combined)
else:
psu_arr = _factorize_cluster_ids(psu_raw)
# Validate PSU labels are globally unique when nest=False
# and strata are present. Repeated labels cause wrong n_psu,
# df_survey, and lonely_psu="adjust" global mean.
if strata_arr is not None:
seen_psus: set = set()
for h in np.unique(strata_arr):
psu_in_h = set(psu_raw[strata_arr == h])
overlap = seen_psus & psu_in_h
if overlap:
raise ValueError(
f"PSU labels {overlap} appear in multiple strata. "
"Set nest=True in SurveyDesign to make PSU IDs "
"unique within strata, or use globally unique "
"PSU labels."
)
seen_psus |= psu_in_h
n_psu = len(np.unique(psu_arr))
# --- FPC ---
fpc_arr = None
if self.fpc is not None:
if self.fpc not in data.columns:
raise ValueError(f"FPC column '{self.fpc}' not found in data")
fpc_arr = data[self.fpc].values.astype(np.float64)
if np.any(np.isnan(fpc_arr)) or np.any(~np.isfinite(fpc_arr)):
raise ValueError("FPC values must be finite and non-NaN")
# Validate FPC structure (constant within strata, positive).
# FPC >= n_PSU validation is deferred to compute_survey_vcov()
# where the final effective PSU structure is known (after
# cluster-as-PSU injection and implicit per-obs PSU fallback).
if strata_arr is not None:
for h in np.unique(strata_arr):
mask_h = strata_arr == h
fpc_vals = fpc_arr[mask_h]
# Enforce FPC is constant within stratum
if len(np.unique(fpc_vals)) > 1:
raise ValueError(
f"FPC values must be constant within each stratum. "
f"Stratum {h} has values: {np.unique(fpc_vals)}"
)
fpc_h = fpc_vals[0]
# Validate FPC >= n_PSU when explicit PSU is declared
if psu_arr is not None:
n_psu_h = len(np.unique(psu_arr[mask_h]))
if fpc_h < n_psu_h:
raise ValueError(
f"FPC ({fpc_h}) is less than the number of PSUs "
f"({n_psu_h}) in stratum {h}. FPC must be >= n_PSU."
)
else:
# No strata: require FPC is a single constant value
if len(np.unique(fpc_arr)) > 1:
raise ValueError(
"FPC values must be constant when no strata are specified. "
f"Found {len(np.unique(fpc_arr))} distinct values."
)
# Validate FPC >= n_PSU when explicit PSU is declared
if psu_arr is not None and fpc_arr[0] < n_psu:
raise ValueError(
f"FPC ({fpc_arr[0]}) is less than the number of PSUs "
f"({n_psu}). FPC must be >= number of PSUs."
)
# --- Validate PSU counts per stratum ---
if psu_arr is not None and strata_arr is not None:
for h in np.unique(strata_arr):
mask_h = strata_arr == h
n_psu_h = len(np.unique(psu_arr[mask_h]))
if n_psu_h < 2:
if self.lonely_psu == "remove":
warnings.warn(
f"Stratum {h} has only {n_psu_h} PSU(s). "
"It will be excluded from variance estimation "
"(lonely_psu='remove').",
UserWarning,
stacklevel=3,
)
elif self.lonely_psu == "certainty":
pass # Handled in compute_survey_vcov
elif self.lonely_psu == "adjust":
pass # Handled in compute_survey_vcov
# Validate PSU count for unstratified designs
if psu_arr is not None and strata_arr is None:
if n_psu < 2:
if self.lonely_psu == "remove":
msg = (
f"Only {n_psu} PSU(s) found (unstratified design). "
"Variance cannot be estimated (lonely_psu='remove')."
)
elif self.lonely_psu == "certainty":
msg = (
f"Only {n_psu} PSU(s) found (unstratified design). "
"Treated as certainty PSU; zero variance contribution."
)
else:
msg = (
f"Only {n_psu} PSU(s) found (unstratified design). "
"Cannot adjust with a single cluster and no strata; "
"variance will be NaN."
)
warnings.warn(msg, UserWarning, stacklevel=3)
return ResolvedSurveyDesign(
weights=weights,
weight_type=self.weight_type,
strata=strata_arr,
psu=psu_arr,
fpc=fpc_arr,
n_strata=n_strata,
n_psu=n_psu,
lonely_psu=self.lonely_psu,
nest=self.nest,
)
def subpopulation(
self,
data: pd.DataFrame,
mask,
) -> Tuple["SurveyDesign", pd.DataFrame]:
"""Create a subpopulation design by zeroing out excluded observations.
Preserves the full survey design structure (strata, PSU) while setting
weights to zero for observations outside the subpopulation. This is
the correct approach for subpopulation analysis — unlike naive
subsetting, it retains design information for variance estimation.
Parameters
----------
mask : array-like of bool, str, or callable
Defines the subpopulation:
- bool array/Series of length ``len(data)`` — True = included
- str — column name in ``data`` containing boolean values
- callable — applied to ``data``, must return bool array
Returns
-------
(SurveyDesign, pd.DataFrame)
A new SurveyDesign pointing to a ``_subpop_weight`` column in the
returned DataFrame copy. The pair should be used together: pass
the returned DataFrame to ``fit()`` with the returned SurveyDesign.
"""
# Resolve mask to boolean array
if callable(mask):
raw_mask = np.asarray(mask(data))
elif isinstance(mask, str):
if mask not in data.columns:
raise ValueError(f"Mask column '{mask}' not found in data")
raw_mask = np.asarray(data[mask].values)
else:
raw_mask = np.asarray(mask)
# Validate: reject pd.NA/pd.NaT/None before bool coercion
try:
if pd.isna(raw_mask).any():
raise ValueError(
"Subpopulation mask contains NA/missing values. "
"Provide a boolean mask with no missing values."
)
except (TypeError, ValueError) as e:
if "NA/missing" in str(e):
raise
# pd.isna can't handle some dtypes — fall through to specific checks
if raw_mask.dtype.kind == "f" and np.any(np.isnan(raw_mask)):
raise ValueError(
"Subpopulation mask contains NaN values. "
"Provide a boolean mask with no missing values."
)
if hasattr(raw_mask, "dtype") and raw_mask.dtype == object:
# Check for None values (pd.NA, None, etc.)
if any(v is None for v in raw_mask):
raise ValueError(
"Subpopulation mask contains None/NA values. "
"Provide a boolean mask with no missing values."
)
# Reject string/object masks — non-empty strings coerce to True
# which silently defines the wrong domain
if any(isinstance(v, str) for v in raw_mask):
raise ValueError(
"Subpopulation mask has object dtype with string values. "
"Provide a boolean or numeric (0/1) mask, not strings."
)
if hasattr(raw_mask, "dtype") and raw_mask.dtype.kind in ("U", "S"):
raise ValueError(
"Subpopulation mask contains string values. "
"Provide a boolean or numeric (0/1) mask."
)
# Validate numeric masks: only {0, 1} allowed (not {1, 2}, etc.)
if hasattr(raw_mask, "dtype") and raw_mask.dtype.kind in ("i", "u", "f"):
unique_vals = set(np.unique(raw_mask[np.isfinite(raw_mask)]).tolist())
if not unique_vals.issubset({0, 1, 0.0, 1.0, True, False}):
raise ValueError(
f"Subpopulation mask contains non-binary numeric values "
f"{unique_vals - {0, 1, 0.0, 1.0}}. "
f"Provide a boolean or numeric (0/1) mask."
)
mask_arr = raw_mask.astype(bool)
if len(mask_arr) != len(data):
raise ValueError(
f"Mask length ({len(mask_arr)}) does not match data " f"length ({len(data)})"
)
if not np.any(mask_arr):
raise ValueError(
"Subpopulation mask excludes all observations. "
"At least one observation must be included."
)
# Build subpopulation weights
if self.weights is not None:
if self.weights not in data.columns:
raise ValueError(f"Weight column '{self.weights}' not found in data")
base_weights = data[self.weights].values.astype(np.float64)
else:
base_weights = np.ones(len(data), dtype=np.float64)
subpop_weights = np.where(mask_arr, base_weights, 0.0)
# Create data copy with synthetic weight column
data_out = data.copy()
data_out["_subpop_weight"] = subpop_weights
# Zero out replicate weight columns for excluded observations
if self.replicate_weights is not None:
for col in self.replicate_weights:
if col in data.columns:
data_out[col] = np.where(mask_arr, data[col].values, 0.0)
# Return new SurveyDesign using the synthetic column
new_design = SurveyDesign(
weights="_subpop_weight",
strata=self.strata,
psu=self.psu,
fpc=self.fpc,
weight_type=self.weight_type,
nest=self.nest,
lonely_psu=self.lonely_psu,
replicate_weights=self.replicate_weights,
replicate_method=self.replicate_method,
fay_rho=self.fay_rho,
replicate_strata=self.replicate_strata,
combined_weights=self.combined_weights,
replicate_scale=self.replicate_scale,
replicate_rscales=self.replicate_rscales,
mse=self.mse,
)
return new_design, data_out
@dataclass
class ResolvedSurveyDesign:
"""
Internal class with extracted numpy arrays from SurveyDesign.resolve().
Not intended for direct construction by users.
"""
weights: np.ndarray
weight_type: str
strata: Optional[np.ndarray]
psu: Optional[np.ndarray]
fpc: Optional[np.ndarray]
n_strata: int
n_psu: int
lonely_psu: str
replicate_weights: Optional[np.ndarray] = None # (n, R) array
replicate_method: Optional[str] = None
fay_rho: float = 0.0
n_replicates: int = 0
replicate_strata: Optional[np.ndarray] = None # (R,) for JKn
combined_weights: bool = True
replicate_scale: Optional[float] = None
replicate_rscales: Optional[np.ndarray] = None # (R,) per-replicate scales
mse: bool = False
# Preserved from SurveyDesign for use by downstream helpers (e.g.
# `_inject_cluster_as_psu` honors `nest` when substituting cluster=<col>
# for an absent PSU column).
nest: bool = False
@property
def uses_replicate_variance(self) -> bool:
"""Whether replicate-based variance should be used instead of TSL."""
return self.replicate_method is not None
@property
def df_survey(self) -> Optional[int]:
"""Survey degrees of freedom.
For replicate designs: QR-rank of the analysis-weight matrix minus 1,
matching R's ``survey::degf()`` which uses ``qr(..., tol=1e-5)$rank``.
Returns ``None`` when rank <= 1 (insufficient for t-based inference).
For TSL: n_PSU - n_strata.
"""
if self.uses_replicate_variance:
if self.replicate_weights is None or self.n_replicates < 2:
return None
# QR-rank of analysis-weight matrix, matching R's survey::degf()
# which uses qr(weights(design, "analysis"), tol=1e-5)$rank.
# For combined_weights=True, replicate cols ARE analysis weights.
# For combined_weights=False, analysis weights = rep * full-sample.
if self.combined_weights:
analysis_weights = self.replicate_weights
else:
analysis_weights = self.replicate_weights * self.weights[:, np.newaxis]
# Pivoted QR with R-compatible tolerance, matching R's
# qr(..., tol=1e-5) which uses column pivoting (LAPACK dgeqp3)
from scipy.linalg import qr as scipy_qr
_, R_mat, _ = scipy_qr(analysis_weights, pivoting=True, mode="economic")
diag_abs = np.abs(np.diag(R_mat))
tol = 1e-5
rank = int(np.sum(diag_abs > tol * diag_abs.max())) if diag_abs.max() > 0 else 0
df = rank - 1
return df if df > 0 else None
if self.psu is not None and self.n_psu > 0:
if self.strata is not None and self.n_strata > 0:
return self.n_psu - self.n_strata
return self.n_psu - 1
# Implicit PSU: each observation is its own PSU
n_obs = len(self.weights)
if self.strata is not None and self.n_strata > 0:
return n_obs - self.n_strata
return n_obs - 1
def subset_to_units(
self,
row_idx: np.ndarray,
weights: np.ndarray,
strata: Optional[np.ndarray],
psu: Optional[np.ndarray],
fpc: Optional[np.ndarray],
n_strata: int,
n_psu: int,
) -> "ResolvedSurveyDesign":
"""Create a unit-level copy preserving replicate metadata.
Used by panel estimators (ContinuousDiD, EfficientDiD) that collapse
panel-level survey info to one row per unit.
Parameters
----------
row_idx : np.ndarray
Indices into the panel-level arrays to select one row per unit.
weights, strata, psu, fpc, n_strata, n_psu
Already-subsetted TSL fields (computed by the caller).
"""
rep_weights_sub = None
if self.replicate_weights is not None:
rep_weights_sub = self.replicate_weights[row_idx, :]
return ResolvedSurveyDesign(
weights=weights,
weight_type=self.weight_type,
strata=strata,
psu=psu,
fpc=fpc,
n_strata=n_strata,
n_psu=n_psu,
lonely_psu=self.lonely_psu,
replicate_weights=rep_weights_sub,
replicate_method=self.replicate_method,
fay_rho=self.fay_rho,
n_replicates=self.n_replicates,
replicate_strata=self.replicate_strata,
combined_weights=self.combined_weights,
replicate_scale=self.replicate_scale,
replicate_rscales=self.replicate_rscales,
mse=self.mse,
nest=self.nest,
)
@property
def needs_survey_vcov(self) -> bool:
"""Whether survey vcov (not generic sandwich) should be used."""
return True # Any resolved survey design uses the survey vcov path
def make_pweight_design(weights: np.ndarray) -> "ResolvedSurveyDesign":
"""Construct a pweight-only ResolvedSurveyDesign from a raw weight array.
Use this on the array-in HAD pretest helpers (``stute_test``,
``yatchew_hr_test``, ``stute_joint_pretest``) when the caller has only
a per-observation weight array and no PSU/strata/FPC structure::
from diff_diff import stute_test, make_pweight_design
result = stute_test(d, dy, survey_design=make_pweight_design(w))
For the data-in HAD surfaces (``HeterogeneousAdoptionDiD.fit``,
``did_had_pretest_workflow``, ``joint_pretrends_test``,
``joint_homogeneity_test``), prefer adding the weights as a column on
your dataframe and passing ``SurveyDesign(weights="col_name")`` instead;
those surfaces resolve column references against ``data`` at fit time
(the standard library convention used by ContinuousDiD, EfficientDiD,
and ChaisemartinDHaultfoeuille).
Internal note: this constructs a synthetic ``ResolvedSurveyDesign`` with
each observation as its own PSU and no strata/FPC, so PSU-level
multiplier-bootstrap kernels reduce bit-exactly to per-observation
Mammen draws while sharing the survey-aware code path with full PSU /
strata / FPC designs (mirrors the PR #363 synthetic-trivial-resolved
pattern).
Parameters
----------
weights : np.ndarray, shape (n_obs,)
Per-observation positive weights. Must be 1-D (shape ``(n_obs,)``);
scalars, 0-D arrays, and column-vector inputs (shape ``(n, 1)``)
raise ``ValueError`` at the front door. Caller is responsible for
any non-negativity / per-unit-constancy validation. Typical usage
is positional (``make_pweight_design(arr)``); the parameter name
``weights`` collides linguistically with the deprecated
``weights=`` kwarg on HAD surfaces, so prefer positional form.
Returns
-------
ResolvedSurveyDesign
With ``weight_type="pweight"``, ``strata=psu=fpc=None``,
``n_strata=0``, ``n_psu=n_obs`` (each observation is its own PSU
under the trivial design), ``lonely_psu="remove"``,
``replicate_weights=None``.
Raises
------
ValueError
If ``weights`` is not 1-D (PR #376 R3 P1: catches scalar / 0-D /
column-vector inputs with a clear front-door message instead of
bubbling a low-level numpy or dataclass exception).
"""
w = np.asarray(weights, dtype=np.float64)
if w.ndim != 1:
raise ValueError(
f"make_pweight_design: weights must be 1-dimensional (1-D, shape "
f"(n_obs,)), got shape {w.shape}. Common mistakes: scalar / 0-D "
f"input (`make_pweight_design(1.0)`); column-vector "
f"(`make_pweight_design(df[['w']].to_numpy())` produces (n, 1) "
f"-- use `df['w'].to_numpy()` for (n,)); 2-D matrix input."
)
n_obs = int(w.shape[0])
return ResolvedSurveyDesign(
weights=w,
weight_type="pweight",
strata=None,
psu=None,
fpc=None,
n_strata=0,
n_psu=n_obs,
lonely_psu="remove",
)
_make_trivial_resolved = make_pweight_design
# Three-way mutex error messages for `survey_design=` / `survey=` / `weights=`
# kwargs across the 8 HAD surfaces (HeterogeneousAdoptionDiD.fit +
# did_had_pretest_workflow + 5 pretest helpers + qug_test). The migration
# target text differs between data-in surfaces (which can resolve
# ``SurveyDesign(weights="col_name")`` against ``data``) and array-in
# surfaces (which take pre-resolved ``ResolvedSurveyDesign`` and use
# ``make_pweight_design(arr)`` for the pweight-only convenience). Defined
# here to avoid circular imports between had.py and had_pretests.py.
HAD_DUAL_KNOB_MUTEX_MSG_DATA_IN = (
"Pass at most one of `survey_design=`, `survey=`, or `weights=`. "
"`survey=` and `weights=` are deprecated aliases of `survey_design=` "
"and will be removed in the next minor release. Prefer "
"`survey_design=SurveyDesign(weights='col_name', ...)`."
)
HAD_DUAL_KNOB_MUTEX_MSG_ARRAY_IN = (
"Pass at most one of `survey_design=`, `survey=`, or `weights=`. "
"`survey=` and `weights=` are deprecated aliases of `survey_design=` "
"and will be removed in the next minor release. Prefer "
"`survey_design=make_pweight_design(arr)` for pweight-only or "
"`survey_design=<pre-resolved ResolvedSurveyDesign>` for full "
"PSU/strata/FPC."
)
HAD_DEPRECATION_MSG_SURVEY_KWARG = (
"`survey=` is deprecated; use `survey_design=` instead "
"(same accepted types). Will be removed in the next minor release."
)
HAD_DEPRECATION_MSG_WEIGHTS_KWARG_DATA_IN = (
"`weights=np.ndarray` is deprecated; add the weights as a column on "
"`data` and pass `survey_design=SurveyDesign(weights='col_name')` "
"instead. Will be removed in the next minor release."
)
# PR #376 R11 P3: HAD.fit-specific weights= deprecation message — the
# generic data-in suggestion above (use `survey_design=SurveyDesign(...)`)
# is the long-term API target, but on `HeterogeneousAdoptionDiD.fit` the
# two paths currently produce different SE families: the deprecated
# `weights=np.ndarray` shortcut yields `variance_formula="pweight"` /
# `"pweight_2sls"` (CCT-2014 weighted-robust / 2SLS pweight-sandwich)
# while `survey_design=SurveyDesign(...)` yields `"survey_binder_tsl"` /
# `"survey_binder_tsl_2sls"`. The next-minor cleanup (TODO row 102) will
# unify the two; until then, document the SE-family caveat explicitly so
# users know what changes when they migrate.
HAD_DEPRECATION_MSG_WEIGHTS_KWARG_HAD_FIT = (
"`weights=np.ndarray` is deprecated on HeterogeneousAdoptionDiD.fit; "
"the long-term API is to add the weights as a column on `data` and "
"pass `survey_design=SurveyDesign(weights='col_name')`. Will be "
"removed in the next minor release. NOTE: in the current release the "
"two paths produce different SE families on this surface — the "
"`weights=` shortcut keeps the analytical CCT-2014 / 2SLS pweight-"
"sandwich (`variance_formula='pweight'` or `'pweight_2sls'`), while "
"`survey_design=SurveyDesign(...)` composes Binder-TSL "
"(`'survey_binder_tsl'` or `'survey_binder_tsl_2sls'`). The "
"long-term unification is tracked for the next minor release."
)
HAD_DEPRECATION_MSG_WEIGHTS_KWARG_ARRAY_IN = (
"`weights=np.ndarray` is deprecated on array-in pretest helpers; use "
"`survey_design=make_pweight_design(weights)` instead "
"(import `make_pweight_design` from `diff_diff`). Will be removed in "
"the next minor release."
)
@dataclass
class SurveyMetadata:
"""
Survey design metadata stored in results objects.
Attributes
----------
weight_type : str
Type of weights used.
effective_n : float
Kish effective sample size: (sum(w))^2 / sum(w^2).
design_effect : float
DEFF: n * sum(w^2) / (sum(w))^2.
sum_weights : float
Sum of original (pre-normalization) weights.
n_strata : int or None
Number of strata (None if unstratified).
n_psu : int or None
Number of PSUs (None if no PSU specified).
weight_range : tuple of (float, float)
(min, max) of original weights.
df_survey : int or None
Survey degrees of freedom (n_psu - n_strata).
"""
weight_type: str
effective_n: float
design_effect: float
sum_weights: float
n_strata: Optional[int] = None
n_psu: Optional[int] = None
weight_range: Tuple[float, float] = field(default=(0.0, 0.0))
df_survey: Optional[int] = None
replicate_method: Optional[str] = None
n_replicates: Optional[int] = None
deff_diagnostics: Optional["DEFFDiagnostics"] = None
def compute_survey_metadata(
resolved: "ResolvedSurveyDesign",
raw_weights: np.ndarray,
) -> SurveyMetadata:
"""
Compute survey metadata from resolved design.
Parameters
----------
resolved : ResolvedSurveyDesign
Resolved survey design.
raw_weights : np.ndarray
Original (pre-normalization) weights.
Returns
-------
SurveyMetadata
"""
sum_w = float(np.sum(raw_weights))
sum_w2 = float(np.sum(raw_weights**2))
n = len(raw_weights)
effective_n = sum_w**2 / sum_w2 if sum_w2 > 0 else float(n)
design_effect = n * sum_w2 / (sum_w**2) if sum_w > 0 else 1.0
n_strata = resolved.n_strata if resolved.strata is not None else None
if resolved.uses_replicate_variance:
# Replicate designs don't have meaningful PSU/strata counts
n_psu = None
n_strata = None
elif resolved.psu is not None:
n_psu = resolved.n_psu
else:
# Implicit PSU: each observation is its own PSU
n_psu = len(resolved.weights)
df_survey = resolved.df_survey
# Replicate info
rep_method = resolved.replicate_method if resolved.uses_replicate_variance else None
n_rep = resolved.n_replicates if resolved.uses_replicate_variance else None
return SurveyMetadata(
weight_type=resolved.weight_type,
effective_n=effective_n,
design_effect=design_effect,
sum_weights=sum_w,
n_strata=n_strata,
n_psu=n_psu,
weight_range=(float(np.min(raw_weights)), float(np.max(raw_weights))),
df_survey=df_survey,
replicate_method=rep_method,
n_replicates=n_rep,
)
@dataclass
class DEFFDiagnostics:
"""Per-coefficient design effect diagnostics.
Compares survey-design variance to simple random sampling (SRS)
variance for each coefficient, giving the variance inflation factor
due to the survey design (clustering, stratification, weighting).
Attributes
----------
deff : np.ndarray
Per-coefficient DEFF: survey_var / srs_var. Shape (k,).
effective_n : np.ndarray
Effective sample size per coefficient: n / DEFF. Shape (k,).
srs_se : np.ndarray
SRS (HC1) standard errors. Shape (k,).
survey_se : np.ndarray
Survey standard errors. Shape (k,).
coefficient_names : list of str or None
Names for display.
"""
deff: np.ndarray
effective_n: np.ndarray
srs_se: np.ndarray
survey_se: np.ndarray
coefficient_names: Optional[List[str]] = None
def compute_deff_diagnostics(
X: np.ndarray,
residuals: np.ndarray,
survey_vcov: np.ndarray,
weights: np.ndarray,
weight_type: str = "pweight",
coefficient_names: Optional[List[str]] = None,
) -> DEFFDiagnostics:
"""Compute per-coefficient design effects.
Compares the survey variance-covariance matrix to a simple random
sampling (SRS) baseline (HC1 sandwich, ignoring strata/PSU/FPC).
Parameters
----------
X : np.ndarray
Design matrix of shape (n, k).
residuals : np.ndarray
Residuals from the WLS fit, shape (n,).
survey_vcov : np.ndarray
Survey variance-covariance matrix, shape (k, k).
weights : np.ndarray
Observation weights (normalized), shape (n,).
weight_type : str, default "pweight"
Weight type for SRS computation.
coefficient_names : list of str, optional
Names for display.
Returns
-------
DEFFDiagnostics
"""
from diff_diff.linalg import compute_robust_vcov
n = X.shape[0]
# Use positive-weight count for effective n (zero-weight rows from
# subpopulation don't contribute to the effective sample)
n_eff = int(np.count_nonzero(weights > 0)) if np.any(weights == 0) else n
# SRS baseline: HC1 weighted sandwich ignoring design structure
srs_vcov = compute_robust_vcov(
X,
residuals,
cluster_ids=None,
weights=weights,
weight_type=weight_type,
)
survey_var = np.diag(survey_vcov)
srs_var = np.diag(srs_vcov)
# DEFF = survey_var / srs_var
with np.errstate(divide="ignore", invalid="ignore"):
deff = np.where(srs_var > 0, survey_var / srs_var, np.nan)