"""
Functions covering basic econometric regressions, statistical tests, panel models, IVs and time-series applications.
Author: Vinícius de Almeida Nery Ferreira.
E-mail: vnery5@gmail.com
GitHub: https://github.com/vnery5/Econometria
"""
####################################### Imports #################################################################
import pandas as pd
import numpy as np
# Linear Regression and Statistical Tests
import statsmodels.api as sm
import statsmodels.stats.api as sms
from scipy import stats
from statsmodels.stats.diagnostic import linear_reset
from statsmodels.stats.stattools import durbin_watson
from statsmodels.stats.outliers_influence import variance_inflation_factor
# Patsy formulas
from statsmodels.formula.api import logit, probit, poisson, ols
from patsy import dmatrices
# Panel and IV regressions
from linearmodels import PanelOLS, FirstDifferenceOLS, PooledOLS, RandomEffects
from linearmodels.panel import compare as panel_compare
from linearmodels.iv import IV2SLS
from linearmodels.iv import compare as iv_compare
# Time Series
from statsmodels.tsa.stattools import adfuller
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
from statsmodels.tsa.stattools import coint
from statsmodels.tsa.arima.model import ARIMA
from pmdarima.arima import auto_arima
# Graphs
import matplotlib.pyplot as plt
import seaborn as sns
# General
from IPython.display import clear_output
import warnings
warnings.filterwarnings("ignore", category=RuntimeWarning)
####################################### Functions ###############################################################
####################################### Continuous Dependent Variables ##########################################
def ols_reg(formula, data, subset=None, cov='robust'):
"""
Fits a standard OLS model with the corresponding covariance matrix using an R-style formula (y ~ x1 + x2...).
To compute without an intercept, use -1 or 0 in the formula.
Remember to use mod = ols_reg(...).
For generalized and weighted estimation, see statsmodels documentation or the first version of this file.
:param formula: patsy formula (R style)
:param data: dataframe containing the data
:param subset: only use a subset of the data? Defaults to None (all data)
Must be in the form of `subset=(df['subset_column'] == 1)`.
:param cov : str
unadjusted: common standard errors
robust: HC1 standard errors
cluster or clustered: clustered standard errors (must specify group)
:return : statsmodels model instance
"""
# Creating and fitting the model
if cov == "robust":
mod = ols(formula, data, subset).fit(use_t=True, cov_type='HC1')
elif cov == "cluster" or cov == "clustered":
group = str(input("Which column is the group?"))
try:
mod = ols(formula, data, subset).fit(use_t=True, cov_type='cluster', cov_kwds={'groups': data[group]})
except KeyError:
erro = "It was not possible to find the selected group. Check the spelling and try again!"
return erro
else:
mod = ols(formula, data, subset).fit(use_t=True)
## Printing the summary and returning the object
print(mod.summary())
return mod
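## Usage sketch (hypothetical data; the DataFrame and column names below are illustrative):
# df = pd.DataFrame(np.random.randn(100, 3), columns=["y", "x1", "x2"])
# mod = ols_reg("y ~ x1 + x2", data=df, cov="robust")  # HC1 standard errors
# mod.params  # estimated coefficients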
def f_test(H0, model, level=0.05):
"""
Calculates an F test based on H0 restrictions. Uses the same type of covariance as the model.
It is not necessary to assign the function to an object!
:param H0 : must be in standard patsy/R syntax ('(var1 = var2 =...), ...').
For significance tests, the syntax is 'var1 = var2 = ... = 0'
:param model: fit instance (usually 'mod')
:param level: significance level. Defaults to 5%
"""
## Usually, we use the wald_test method from the fit instance
# For panel models (from linearmodels), we must specify the parameter 'formula'
try:
test = 'Wald'
est = model.wald_test(formula=H0).stat
p = model.wald_test(formula=H0).pval
except Exception:
test = 'F'
res = model.wald_test(H0, use_f=True)
est = float(res.fvalue)
p = float(res.pvalue)
if level > p:
print(f"The value of {test} is {round(est, 6)} and its p-value is {round(p, 7)}.")
print(f"Therefore, Ho is rejected at {level * 100}% (statistically significant).")
else:
print(f"The value of {test} is {round(est, 6)} and its p-value is {round(p, 7)}.")
print(f"Therefore, Ho is NOT rejected at {level * 100}% (statistically NOT significant).")
def heteroscedascity_test(model, formula, data, level=0.05):
"""
Executes the BP AND White tests for heteroskedasticity.
It is not necessary to assign the function to an object!
:param model : which model to use
ols
PooledOLS
PanelOLS (FixedEffects)
RandomEffects
FirstDifferenceOLS
:param formula : patsy/R formula of the model to be tested
:param data : dataframe
:param level : significance level (defaults to 5%)
"""
## executing model choice
try: # for sm objects
mod = model(formula, data).fit()
except Exception: # for linearmodels objects
if model == "PanelOLS":
formula += " + EntityEffects"
mod = model.from_formula(formula, data, drop_absorbed=True).fit()
else:
mod = model.from_formula(formula, data).fit()
## getting the squares of residuals
try: # for sm objects
res_sq = mod.resid ** 2
except Exception: # for linearmodels objects
res_sq = mod.resids ** 2
## getting the squares of the predicted values (for White)
try: # for sm objects
predicted = mod.predict()
except Exception: # for linearmodels objects
predicted = np.asarray(mod.fitted_values).ravel()
predicted_sq = predicted ** 2
## appending to dataframe
data['res_sq'] = res_sq
data['predicted'] = predicted
data['predicted_sq'] = predicted_sq
## creating formulas
bp_formula = 'res_sq ~ ' + formula.split(' ~ ')[1]
white_formula = 'res_sq ~ predicted + predicted_sq'
## executing the tests
print("H0: Error is homoscedastic.\n")
print("############### BREUSCH-PAGAN ##############")
mod_bp = ols(formula=bp_formula, data=data).fit()
h0_bp = bp_formula.split(' ~ ')[1].replace('+', '=') + ' = 0'
f_test(H0=h0_bp, model=mod_bp, level=level)
print("\n############## WHITE ##############")
mod_white = ols(formula=white_formula, data=data).fit()
h0_white = white_formula.split(' ~ ')[1].replace('+', '=') + ' = 0'
f_test(H0=h0_white, model=mod_white, level=level)
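## Usage sketch (hypothetical `df` from above). Pass the model class/function
# (ols, PooledOLS, PanelOLS, ...), not a fitted instance:
# heteroscedascity_test(model=ols, formula="y ~ x1 + x2", data=df)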
def ols_diagnostics(formula, model, data, y_string):
"""
Given the OLS model supplied, calculates statistics and draws graphs that check several of the multiple linear
regression (MLR) hypotheses. Tests done: Harvey-Collier, RESET, condition number/Variance Inflation Factor (VIF), Breusch-Pagan, Durbin-Watson, Jarque-Bera.
References:
https://www.statsmodels.org/dev/examples/notebooks/generated/regression_diagnostics.html
https://medium.com/@vince.shields913/regression-diagnostics-fa476b2f64db
:param formula : patsy formula of the model;
:param model : fitted model object;
:param data : DataFrame containing the data;
:param y_string : string (name) of the dependent variable
"""
## Harvey-Collier: linearity (MLR 1)
try:
print(f"Harvey-Collier P-value for linearity (MLR 1): {round(sms.linear_harvey_collier(model)[1], 4)}")
print("H0: Model is linear.")
print("For more information, see the 'Residuals vs Fitted Values' plot.\n")
except ValueError:
print("For information on linearity (MLR 1), see the 'Residuals vs Fitted Values' plot.\n")
## Reset: specification of the functional form of the model
reset = linear_reset(model, use_f=True, cov_type='HC1')
print(f"Linear Reset (MLR 1) P-value: {reset.pvalue}")
print("H0: model is well specified and linear.")
print("For more information, see the Residuals vs Fitted Values plot.\n")
### Condition number: multicollinearity (MLR 3)
print(f"Condition Number for Multicollinearity (MLR 3): {round(np.linalg.cond(model.model.exog), 2)}")
print("The larger the number, the bigger the multicollinearity. For more information, see the 'VIF' plot.\n")
## Calculating Variance Inflation Factors (VIF)
# Matrices
y, X = dmatrices(formula, data, return_type='dataframe')
## Calculating VIFs and storing in a DataFrame
dfVIF = pd.DataFrame()
dfVIF["Variables"] = X.columns
dfVIF["VIF_Factor"] = [variance_inflation_factor(X.values, i) for i in range(X.shape[1])]
## Breusch-Pagan (MLR 5):
breusch_pagan_pvalue = np.around(sms.het_breuschpagan(model.resid, model.model.exog)[3], 4)
print(f"Breusch-Pagan P-value for heteroskedasticity (MLR 5): {breusch_pagan_pvalue}")
print("H0: Variance is homoskedasticity.")
print("For White's test and use in panel models, call the 'heteroscedascity_test' function.")
print("For more information, see the 'Scale-Location' plot.\n")
## Durbin-Watson: correlation between the residuals
print(f"Durbin-Watson statistic for residual correlation is: {np.around(durbin_watson(model.resid), 2)}")
print("If the value is close to 0, there is positive serial correlation.")
print("If the value is close to 4, there is negative serial correlation.")
print("Rule of thumb: 1.5 < DW < 2.5 indicates no serial correlation.\n")
## Jarque-Bera: normality of the residuals (MLR 6, used for statistical inference)
print(f"Jarque-Bera P-value (MLR 6): {np.around(sms.jarque_bera(model.resid)[1], 4)}")
print("H0: Data has a normal distribution.")
print("For more information, see the 'Normal Q-Q' plot.\n")
print("To test for exogeneity (MLR 4), an IV2SLS must be constructed.")
print("Test for random sampling (Heckit) are not yet available in this module.")
## Creating graphic object
fig, ax = plt.subplots(nrows=2, ncols=2, figsize=(12, 12))
try:
plt.style.use('seaborn-white')
except OSError: # style renamed in matplotlib >= 3.6
plt.style.use('seaborn-v0_8-white')
### Plots
## Linearity: residuals x predicted values. The less inclined the lowess, the more linear the model.
ax00 = sns.residplot(x=model.fittedvalues, y=y_string, data=data, lowess=True,
scatter_kws={'facecolors': 'none', 'edgecolors': 'black'},
line_kws={'color': 'blue', 'lw': 1, 'alpha': 0.8}, ax=ax[0, 0])
# Titles
ax00.set_title('Linearity: Residuals vs Fitted', fontsize=12)
ax00.set_xlabel('Fitted Values', fontsize=10)
ax00.set_ylabel('Residuals (horizontal lowess: linearity)', fontsize=10)
## Multicollinearity: VIF
ax01 = dfVIF["VIF_Factor"].plot(kind='bar', stacked=False, ax=ax[0, 1])
# X tick labels
ax01.set_xticklabels(labels=dfVIF["Variables"], rotation=0, color='k')
# Annotations
for p in ax01.patches:
ax01.annotate(round(p.get_height(), 2), (p.get_x() + p.get_width() / 2., p.get_height()),
ha='center', va='center', xytext=(0, 10), textcoords='offset points')
## Titles
ax01.set_title("Multicollinearity Test - VIF", color='k', fontsize=12)
ax01.set_ylabel("Variance Influence Factor (> 5: multicollinearity)", color='k', fontsize=10)
ax01.set_xlabel("Variable", color='k', fontsize=10)
## Heteroskedasticity: the more disperse and horizontal the points,
# the more likely it is that homoskedasticity is present
ax10 = sns.regplot(x=model.fittedvalues, y=np.sqrt(np.abs(model.get_influence().resid_studentized_internal)),
ci=False, lowess=True, line_kws={'color': 'blue', 'lw': 1, 'alpha': 0.8},
scatter_kws={'facecolors': 'none', 'edgecolors': 'black'}, ax=ax[1, 0])
# Titles
ax10.set_title('Heteroskedasticity: Scale-Location', fontsize=12)
ax10.set_xlabel('Fitted Values', fontsize=10)
ax10.set_ylabel(r'$\sqrt{|Standardized Residuals|}$ (disperse and horizontal: homoskedasticity)', fontsize=10)
## Normality of the residuals: Q-Q Plot
probplot = sm.ProbPlot(model.get_influence().resid_studentized_internal, fit=True)
ax11 = probplot.qqplot(line='45', marker='o', color='black', ax=ax[1, 1])
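## Usage sketch: full diagnostic battery on the hypothetical OLS fit from above.
# ols_diagnostics(formula="y ~ x1 + x2", model=mod, data=df, y_string="y")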
def j_davidson_mackinnon_ols(formula1, formula2, data, cov='normal', level=0.05):
"""
Executes a J test to verify which model is more adequate. H0 says that model 1 is preferable.
It is not necessary to assign the function to an object!
:param formula1 : formula for the first model (use -1 for an origin regression)
:param formula2: formula for the second model (use -1 for an origin regression)
:param data : dataframe
:param cov : str
normal: common standard errors
robust: HC1 standard errors
:param level : significance level. defaults to 5%
"""
## getting covariance type
if cov == 'normal':
cov_type = 'nonrobust'
else:
cov_type = 'HC1'
## executing a regression of the second model
mod = ols(formula=formula2, data=data).fit(use_t=True, cov_type=cov_type)
## getting predicted values and adding them to the dataframe
predicted = mod.predict()
data['predicted'] = predicted
## adding the predicted values to the first formula
formula1 += ' + predicted'
## executing regression
mod1 = ols(formula=formula1, data=data).fit(use_t=True, cov_type=cov_type)
## getting p-value of the predicted coefficient
p = mod1.pvalues['predicted']
if p < level:
print(f"The test's p-value is equal to {np.around(p, 6)} < {level * 100}%")
print("Therefore, Ho is rejected (model 2 is better specified).")
else:
print(f"The test's p-value is equal to {np.around(p, 6)} > {level * 100}%")
print("Therefore, Ho is rejected (model 1 is better specified).")
def cooks_distance_outlier_influence(model):
"""
Calculates and plots the Cook's Distance metric, which shows the influence of individual points
(dependent and independent variables) on the regression results.
If a point is above D = 0.5, then it affects the regression results.
High leverage: extreme X value; outlier: extreme y value.
References:
https://medium.com/@vince.shields913/regression-diagnostics-fa476b2f64db
https://www.sthda.com/english/articles/39-regression-model-diagnostics/161-linear-regression-assumptions-and-diagnostics-in-r-essentials/
:param model: fitted OLS model object.
"""
## Creating functions that define D = 0.5 and D = 1.0
def one_line(x):
return np.sqrt((1 * len(model.params) * (1 - x)) / x)
def point_five_line(x):
return np.sqrt((0.5 * len(model.params) * (1 - x)) / x)
def show_cooks_distance_lines(tx, inc, color, label):
plt.plot(inc, tx(inc), label=label, color=color, ls='--')
## Plotting
sns.regplot(x=model.get_influence().hat_matrix_diag, y=model.get_influence().resid_studentized_internal,
ci=False, lowess=True, line_kws={'color': 'blue', 'lw': 1, 'alpha': 0.8},
scatter_kws={'facecolors': 'none', 'edgecolors': 'black'})
show_cooks_distance_lines(one_line, np.linspace(.01, .14, 100), 'red', 'Cooks Distance (D=1)')
show_cooks_distance_lines(point_five_line, np.linspace(.01, .14, 100), 'black', 'Cooks Distance (D=0.5)')
plt.title('Residuals vs Leverage', fontsize=12)
plt.xlabel('Leverage', fontsize=10)
plt.ylabel('Standardized Residuals', fontsize=10)
plt.legend()
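## Usage sketch: residuals-vs-leverage plot for the hypothetical fitted OLS `mod` from above.
# cooks_distance_outlier_influence(mod)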
####################################### Panel Models (linearmodels) #############################################
"""
Panel models are generally used with time-period dummies in order to correct endogeneity problems that arise
from unobserved omitted variables that are fixed in time and are correlated with our regressors.
"""
def xtdescribe_panel(data, entity_column):
"""
Calculates the total appearances for each individual and checks how balanced the panel dataset is.
:param data : dataframe
:param entity_column : str, column that represents the individuals (what would be the 1st level index)
Important: this function must be called BEFORE panel_structure
:return : modified dataset with number of appearances column and prints how balanced the panel is
"""
## Number of appearances of each individual and adding as a column to the dataset
data["number_appearances"] = data.groupby(entity_column)[entity_column].transform('count')
## Printing xtdescribe
print(data.drop_duplicates(subset=[entity_column], keep='first')["number_appearances"].value_counts(normalize=True))
return data
def panel_structure(data, entity_column, time_column):
"""
Takes a dataframe and creates a panel MultiIndex structure, which is necessary for linearmodels estimations.
:param data : dataframe
:param entity_column : str, column that represents the individuals (1st level index)
:param time_column : str, column that represents the time periods (2nd level index)
:return : modified DataFrame with the panel structure
"""
## Creating MultiIndex and maintaining columns in the DataFrame
try:
time = pd.Categorical(data[time_column])
data = data.set_index([entity_column, time_column])
data[time_column] = time # creating a column with the time values (makes it easier to access it later)
return data
except KeyError:
print("One of the columns is not in the dataframe. Please try again!")
return None
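## Usage sketch (hypothetical long-format panel with 'firm' and 'year' columns):
# df = xtdescribe_panel(df, entity_column="firm")  # balance diagnostics first
# df_panel = panel_structure(df, entity_column="firm", time_column="year")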
def pooled_ols(panel_data, formula, weights=None, cov="kernel"):
"""
Fits a standard Pooled OLS model with the corresponding covariance matrix.
Like random effects, it assumes that the unobserved effects aren't correlated with the regressors.
Remember to include an intercept in the formula ('y ~ 1 + x1 + ...') and to assign it to an object!
:param panel_data : dataframe (which must be in a panel structure)
:param formula : patsy formula
:param weights : N x 1 Series or vector containing weights to be used in estimation; defaults to None
Use is recommended when analyzing survey data, passing on the weight available in the survey
:param cov : str
unadjusted: common standard errors
robust: robust standard errors
kernel: robust to heteroskedasticity AND serial autocorrelation (HAC)
clustered: clustered standard errors by the entity column
:return : linearmodels model instance
"""
## Creating model instance
if weights is None:
mod = PooledOLS.from_formula(formula=formula, data=panel_data)
else:
mod = PooledOLS.from_formula(formula=formula, data=panel_data, weights=weights)
## Fitting with desired covariance matrix
mod = mod.fit(cov_type='clustered', cluster_entity=True) if cov == 'clustered' else mod.fit(cov_type=cov)
# Prints summary and returning
print(mod.summary)
return mod
def first_difference(panel_data, formula, weights=None, cov="kernel"):
"""
Fits a standard FD model with the corresponding covariance matrix and WITHOUT an intercept.
Remember to assign it to an object!
Usually, it is preferred to FE when the error follows a random walk, i.e., when past shocks carry over
into future periods (high serial autocorrelation).
:param panel_data : dataframe (which must be in a panel structure)
:param formula : patsy formula
:param weights : N x 1 Series or vector containing weights to be used in estimation; defaults to None
Use is recommended when analyzing survey data, passing on the weight available in the survey
:param cov : str
unadjusted: common standard errors
robust: robust standard errors
kernel: robust to heteroskedasticity AND serial autocorrelation (HAC)
clustered: clustered standard errors by the entity column
:return : linearmodels model instance
"""
## Creating model instance
if weights is None:
mod = FirstDifferenceOLS.from_formula(formula=formula, data=panel_data)
else:
mod = FirstDifferenceOLS.from_formula(formula=formula, data=panel_data, weights=weights)
## Fitting with desired covariance matrix
mod = mod.fit(cov_type='clustered', cluster_entity=True) if cov == 'clustered' else mod.fit(cov_type=cov)
print(mod.summary)
return mod
def fixed_effects(panel_data, formula, weights=None, time_effects=False, cov="kernel"):
"""
Fits a standard Fixed Effects model with the corresponding covariance matrix.
It can be estimated WITH (use 'y ~ 1 + ... ') and WITHOUT a constant.
It is preferred when the unobserved effects are correlated with the regressors and, therefore,
CAN'T estimate constant (time-invariant) terms.
Usually, it is preferred to FD when the short-term variations are small and the error doesn't follow
a random walk, i.e., when there is no or little serial autocorrelation.
:param panel_data : dataframe (which must be in a panel structure)
:param formula : patsy/R formula (without EntityEffects, will be added inside the function)
:param weights : N x 1 Series or vector containing weights to be used in estimation; defaults to None
Use is recommended when analyzing survey data, passing on the weight available in the survey
:param time_effects : bool, defaults to False
Whether to include time effects alongside entity effects (and estimate a two-way fixed effects)
:param cov : str
unadjusted: common standard errors
robust: robust standard errors
kernel: robust to heteroskedasticity AND serial autocorrelation (HAC)
clustered: clustered standard errors by the entity column
:return : linearmodels model instance
"""
## Creating model instance
# Defining which effects to control for
formula += ' + EntityEffects + TimeEffects' if time_effects else ' + EntityEffects'
## Creating model instance
if weights is None:
mod = PanelOLS.from_formula(formula=formula, data=panel_data, drop_absorbed=True)
else:
mod = PanelOLS.from_formula(formula=formula, data=panel_data, drop_absorbed=True, weights=weights)
## Fitting with desired covariance matrix
mod = mod.fit(cov_type='clustered', cluster_entity=True) if cov == 'clustered' else mod.fit(cov_type=cov)
print(mod.summary)
return mod
def random_effects(panel_data, formula, weights=None, cov="kernel"):
"""
Fits a standard Random Effects model with the corresponding covariance matrix.
It can be estimated WITH (use 'y ~ 1 + ... ') and WITHOUT a constant.
It is preferred when the unobserved effects aren't correlated with the regressors
and, therefore, CAN estimate constant terms and is more efficient than FE in that case.
:param panel_data : dataframe (which must be in a panel structure)
:param formula : patsy formula
:param weights : N x 1 Series or vector containing weights to be used in estimation; defaults to None
Use is recommended when analyzing survey data, passing on the weight available in the survey
:param cov : str
unadjusted: common standard errors
robust: robust standard errors
kernel: robust to heteroskedasticity AND serial autocorrelation (HAC)
clustered: clustered standard errors by the entity column
:return : linearmodels model instance
"""
## Creating model instance
if weights is None:
mod = RandomEffects.from_formula(formula=formula, data=panel_data)
else:
mod = RandomEffects.from_formula(formula=formula, data=panel_data, weights=weights)
## Fitting with desired covariance matrix
mod = mod.fit(cov_type='clustered', cluster_entity=True) if cov == 'clustered' else mod.fit(cov_type=cov)
print(mod.summary)
return mod
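## Usage sketch: fitting FE and RE on the hypothetical `df_panel` above and comparing
# them side by side with linearmodels' compare (imported as panel_compare):
# mod_fe = fixed_effects(df_panel, "y ~ 1 + x1 + x2", cov="clustered")
# mod_re = random_effects(df_panel, "y ~ 1 + x1 + x2", cov="clustered")
# print(panel_compare({"FE": mod_fe, "RE": mod_re}))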
def hausman_fe_re(panel_data, inef_formula, weights=None, cov="kernel", level=0.05):
"""
Executes a Hausman test, whose H0 states there is no correlation between the unobserved effects and the independent variables.
It is not necessary to assign the function to an object! But remember to include an intercept in the formulas.
:param panel_data : dataframe (which must be in a panel structure)
:param inef_formula : patsy formula for the inefficient model under H0 (fixed effects)
:param weights : N x 1 Series or vector containing weights to be used in estimation; defaults to None
Use is recommended when analyzing survey data, passing on the weight available in the survey
:param cov : str
unadjusted: common standard errors
robust: robust standard errors
kernel: robust to heteroskedasticity AND serial autocorrelation (HAC)
:param level : significance level for the test. Defaults to 5%.
"""
## Random Effects
if weights is None:
random = RandomEffects.from_formula(formula=inef_formula, data=panel_data).fit(cov_type=cov)
else:
random = RandomEffects.from_formula(formula=inef_formula, data=panel_data, weights=weights).fit(cov_type=cov)
## Fixed Effects
formula_fe = inef_formula + ' + EntityEffects'
if weights is None:
fixed = PanelOLS.from_formula(formula=formula_fe, data=panel_data, drop_absorbed=True).fit(cov_type=cov)
else:
fixed = PanelOLS.from_formula(formula=formula_fe, data=panel_data,
drop_absorbed=True, weights=weights).fit(cov_type=cov)
## Computing the Hausman statistic
# Difference between asymptotic variances
var_assin = fixed.cov - random.cov
# Difference between parameters
d = fixed.params - random.params
# Calculating H (statistic)
H = d.dot(np.linalg.inv(var_assin)).dot(d)
# Degrees of freedom
freedom = random.params.size - 1
# Calculating p-value using chi2 survival function (sf, 1 - cumulative distribution function)
p = round(stats.chi2(freedom).sf(H), 6)
if p < level:
print(f"The value of H is {round(H, 6)} with {freedom} degrees of freedom in the chi-squared distribution.")
print(f"The p-value of the test is {p} and, therefore, H0 is REJECTED and FE is preferred (RE is biased).")
else:
print(f"The value of H is {round(H, 6)} with {freedom} degrees of freedom in the chi-squared distribution.")
print(f"The test p-value is {p}; H0 is NOT REJECTED and RE is more efficient than FE (both aren't biased).")
def iv_2sls(data, formula, weights=None, cov="robust", clusters=None):
"""
Fits a 2SLS model with the corresponding covariance matrix.
The endogenous terms can be formulated using the following syntax: lwage ~ 1 + [educ ~ psem + educ_married] + age...
Remember to include an intercept in the formula ('y ~ 1 + x1 + ...') and to assign it to an object!
:param data : dataframe
:param formula : patsy formula ('lwage ~ 1 + [educ ~ psem + educ_married] + age + agesq...')
:param weights : N x 1 Series or vector containing weights to be used in estimation; defaults to None
Use is recommended when analyzing survey data, passing on the weight available in the survey
:param cov : str
unadjusted: common standard errors
robust: robust standard errors
kernel: robust to heteroskedasticity AND serial autocorrelation (HAC)
clustered: clustered standard errors by the entity column
:param clusters : str or list containing names of the DataFrame variables to cluster by
Should only be used when cov="clustered"
:return : linearmodels model instance
"""
## Creating model instance
if weights is None:
mod = IV2SLS.from_formula(formula=formula, data=data)
else:
mod = IV2SLS.from_formula(formula=formula, data=data, weights=weights)
## Fitting with desired covariance matrix
mod = mod.fit(cov_type='clustered', clusters=data[clusters]) if cov == 'clustered' else mod.fit(cov_type=cov)
## Summary
print(mod.summary)
# Helpful information
print("To see 1st stage results (and if the instruments are relevant with Partial P-Value), call 'mod.first_stage")
print("To check if the instrumentated variable is exogenous, call 'mod.wooldridge_regression'.")
print("To test for the instruments exogeneity (when they are more numerous than the number of endogenous variables")
print("- therefore, are overidentified restrictions), call 'mod.wooldridge_overid' (Ho: instruments are exogenous)")
## Returning the object
return mod
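## Usage sketch (hypothetical wage data), following the bracket syntax from the docstring:
# mod_iv = iv_2sls(df, "lwage ~ 1 + [educ ~ psem + educ_married] + age")
# print(mod_iv.first_stage)  # instrument relevance (1st stage) diagnostics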
####################################### Discrete Dependent Variables and Selection Bias #########################
## Tobit implementation: http://www.upfie.net/downloads17.html
def probit_logit(formula, data, model=probit, subset=None, cov='robust', marg_effects='overall'):
"""
Creates a probit/logit model and returns its summary and average partial effects (get_margeff).
Documentation: https://www.statsmodels.org/stable/examples/notebooks/generated/discrete_choice_example.html
Remember to use mod = probit_logit(...)!
:param formula: patsy formula
:param data: dataframe
:param model: probit or logit. Defaults to probit.
:param subset: only use a subset of the data? Defaults to None (all data)
Must be in the form of `subset=(df['subset_column'] == 1)`.
:param cov : str
normal: common standard errors
robust: HC1 standard errors
cluster or clustered: clustered standard errors (must specify group)
:param marg_effects : str, either 'overall' (Average Partial Effects - APE), 'mean' (Partial Effects at the Average
- PEA) or 'zero'. Defaults to 'overall' (APE).
:return : statsmodels model instance
"""
# Creating and fitting the model
if cov == "robust":
mod = model(formula, data, subset).fit(use_t=True, cov_type='HC1')
elif cov == "cluster" or cov == "clustered":
group = str(input("What is the group column?"))
try:
mod = model(formula, data, subset).fit(use_t=True, cov_type='cluster', cov_kwds={'groups': data[group]})
except KeyError:
erro = "It was not possible to find the desired group. Check the spelling and the data and try again!"
return erro
else:
mod = model(formula, data, subset).fit(use_t=True)
## Capturing the marginal effects
mfx = mod.get_margeff(at=marg_effects)
clear_output()
print(mod.summary())
print("\n##############################################################################\n")
print(mfx.summary())
print(
"\nMarginal effects on certain values can be found using 'mod.get_margeff(atexog=values).summary()', " +
"where values can be built as:\nvalues = dict(zip(range(1, n), values.tolist())); values.update({0: 1})")
print(
"\nFor discrete variables, create a list of the values which you want to test and compute " +
"'mod.model.cdf(sum(map(lambda x, y: x * y, list(mod.params), values)))'.")
print("To predict values using the CDF, use mod.predict(X). X can be blank (uses values from the dataset)")
print("or a K x N dimensional array, where K = number of variables and N = number of observations.")
return mod
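## Usage sketch (hypothetical labor-force participation dummy 'inlf'):
# mod_probit = probit_logit("inlf ~ educ + exper + age", data=df, model=probit)
# mod_logit = probit_logit("inlf ~ educ + exper + age", data=df, model=logit, marg_effects='mean')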
def poisson_reg(formula, data, subset=None, cov='robust'):
"""
Creates a Poisson model (count dependent variable) and returns its summary and average partial effects (get_margeff).
Documentation: https://www.statsmodels.org/stable/examples/notebooks/generated/discrete_choice_example.html
Remember to use mod = poisson_reg(...)!
:param formula: patsy formula
:param data: dataframe
:param subset: only use a subset of the data? Defaults to None (all data)
Must be in the form of `subset=(df['subset_column'] == 1)`.
:param cov: str
normal: common standard errors
robust: HC1 standard errors
cluster or clustered: clustered standard errors (must specify group)
:return : statsmodels model instance
"""
# Creating and fitting the model
if cov == "robust":
mod = poisson(formula, data, subset).fit(use_t=True, cov_type='HC1')
elif cov == "cluster" or cov == "clustered":
group = str(input("What is the group column?"))
try:
mod = poisson(formula, data, subset).fit(use_t=True, cov_type='cluster', cov_kwds={'groups': data[group]})
except KeyError:
erro = "It was not possible to find the desired group. Check the spelling and the data and try again!"
return erro
else:
mod = poisson(formula, data, subset).fit(use_t=True)
## Calculating under/overdispersion
sigma = np.around((sum(mod.resid ** 2 / mod.predict()) / mod.df_resid) ** (1 / 2), 2)
## Capturing marginal effects
mfx = mod.get_margeff(at='overall')
clear_output()
print(mod.summary())
print(
f"The coefficient to determine over/underdispersion is σ = {sigma}, " +
f"which must be close to 1 for standard errors to be valid. " +
f"If not, they must be multiplied by {sigma}.")
print("##############################################################################")
print(mfx.summary())
print(
"\nMarginal effects on certain values can be found using 'mod.get_margeff(atexog=values).summary()', " +
"where values can be built as:\nvalues = dict(zip(range(1, n), values.tolist())); values.update({0: 1})")
print(
"\nUsually, the quantity of interest for Poisson coefficients is the semi-elasticity, which is 100*[exp(ß) - 1].")
print("To predict values using the CDF, use mod.predict(X). X can be blank (uses values from the dataset)")
print("or a K x N dimensional array, where K = number of variables and N = number of observations.")
return mod
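## Usage sketch (hypothetical count outcome 'narr86'):
# mod_pois = poisson_reg("narr86 ~ pcnv + avgsen + tottime", data=df)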
def heckit(formula_probit, formula_model, data, subset_model, cov='robust'):
"""
Performs the Heckit procedure for sample selection correction.
The procedure is done through (1) a probit estimation for the selection variable using all available
data and (2) an OLS regression using the 'selected' data and a formula containing the Inverse Mills Ratio (λ).
Remember to use modHeckit = heckit(...)!
:param formula_probit: patsy/R formula for the probit model on the selection variable;
:param formula_model: patsy/R formula for the model on the 'selected' data;
:param data: dataframe containing the data
:param subset_model: subset of the data for the second-stage OLS (typically the selected sample).
Must be in the form of `(df['selection_column'] == 1)`.
:param cov : str
unadjusted: common standard errors
robust: HC1 standard errors
cluster or clustered: clustered standard errors (must specify group)
:return : statsmodels model instance
"""
## Fitting the probit model
mod_probit = probit(formula_probit, data).fit()
## Calculating predicted values
predicted_values = mod_probit.fittedvalues
## Calculating λ and adding as a column to the DataFrame
data['inv_mills'] = stats.norm.pdf(predicted_values) / stats.norm.cdf(predicted_values)
## Appending inv_mills to formula_model
formula_model += ' + inv_mills'
## Fitting the ols_model
mod_heckit = ols_reg(formula_model, data, subset_model, cov)
return mod_heckit
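## Usage sketch (hypothetical data: 'works' selection dummy, wage observed only when works == 1):
# mod_heckit = heckit("works ~ educ + age + kidslt6", "np.log(wage) ~ educ + exper",
#                     data=df, subset_model=(df["works"] == 1))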
####################################### Time Series #########################
### Box-Jenkins: Identification #########################
def stationarity_test(vColumn, nLevel=0.05, sConstandTrend="c", bGraph=True, nBinsGraph=20):
"""
Performs a stationarity test using the Augmented Dickey-Fuller framework.
By definition, a stationary series should have constant mean and variance/standard deviation.
:param vColumn: DataFrame column/numpy vector in which to perform the test;
:param nLevel: significance level; defaults to 0.05.
:param sConstandTrend: which type of regression to perform in the test;
'c' - constant
'ct' - constant and trend
'ctt' - constant, linear and quadratic trend
:param bGraph: draw a graph displaying general and binned mean/standard error? Defaults to True.
:param nBinsGraph: if a graph is drawn, into how many bins should the data be split?
"""
## Executing test
adf_results = adfuller(vColumn, regression=sConstandTrend)
## Getting p-value
p_value = float(np.around(adf_results[1], 5))
## Printing result
if p_value < nLevel:
print(f"ADF p-value: {p_value}. The H0 of non-stationarity and unit root is rejected (series is stationary).")
print("Consider some descriptive analysis using graphs and checking for NaNs! Also, create a DateTime index!")
else:
print(f"ADF p-value: {p_value}. The H0 of non-stationarity and unit root cannot be rejected.")
print("Try differentiating the series using .diff() in order to make it stationary and ready for model use.")
print("When testing the differenciated series, remember to remove the initial NaN.")
print("Consider some descriptive analysis using graphs and checking for NaNs! Also, create a DateTime index!")
## Drawing graph (if requested)
if bGraph:
## Binning data into nBinsGraph groups of (almost) equal size
chunks = np.array_split(vColumn.values, nBinsGraph)
## List to store mean and SEs of bins
vMeans, vSEs = [], []
## Statistic for each group
for chunk in chunks:
vMeans.append(np.mean(chunk))
vSEs.append(np.std(chunk))
## Plotting
plt.title('Means and Standard Deviations', size=20)
plt.plot(np.arange(len(vMeans)), [np.mean(vColumn.values)] * len(vMeans), label='Global Mean', lw=1.5)
plt.scatter(x=np.arange(len(vMeans)), y=vMeans, label='Chunk Means', s=100)
plt.plot(np.arange(len(vSEs)), [np.std(vColumn.values)] * len(vSEs), label='Global SE', lw=1.5, color='orange')
plt.scatter(x=np.arange(len(vSEs)), y=vSEs, label='Chunk SEs', color='orange', s=100)
plt.legend()
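## Usage sketch (hypothetical series with a DateTime index):
# stationarity_test(df["gdp"], sConstandTrend="ct")
# stationarity_test(df["gdp"].diff().dropna())  # retest the first difference if levels fail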
def plot_autocorrelation(vColumn, nLags=12):
"""
Plots the autocorrelation and partial autocorrelation function for vColumn.
:param vColumn: DataFrame column/numpy vector;
:param nLags: number of lags.
"""
## Plotting autocorrelation
print("Plotting autocorrelation (determines 'Q' in ARIMA)...")
print("Shaded area: zone of significance.")
plot_acf(vColumn.tolist(), lags=nLags)
## Plotting partial autocorrelation
print("Plotting partial autocorrelation (determines 'P' in ARIMA)...")
plot_pacf(vColumn.tolist(), lags=nLags)
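## Usage sketch: ACF/PACF of the differenced hypothetical series.
# plot_autocorrelation(df["gdp"].diff().dropna(), nLags=24)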
def cointegration(vColumn1, vColumn2, nLevel=0.05, sTrend='c'):
"""
Performs the Engle-Granger test for cointegration. Both series must be integrated of order one, I(1).
:param vColumn1: DataFrame column/numpy vector;
:param vColumn2: DataFrame column/numpy vector;
:param nLevel: level of significance
:param sTrend: string containing which trend to consider:
'c' - constant
'ct' - constant and trend
'ctt' - constant, linear and quadratic trend
"""
## Plotting
plt.plot(vColumn1, color="red", label="Series 1")
plt.plot(vColumn2, color="blue", label="Series 2")
plt.legend()
## Performing test
print("Reminder: both series must be I(1) (differentiated/integrated of order 1).")
coint_test = coint(vColumn1, vColumn2, trend=sTrend)
## Getting p-value
p_value = float(np.around(coint_test[1], 5))
## Printing result
if p_value < nLevel:
print(f"Coint p-value: {p_value}. The H0 of no cointegration is rejected (series are cointegrated).")
else:
print(f"Coint p-value: {p_value}. The H0 of no cointegration cannot be rejected (series are not cointegrated).")
### Box-Jenkins: Estimation #########################
def arima_model(vEndog, mExog=None, tPDQ=None):
"""
Fits an ARIMA model. Order can be specified or determined by auto_arima.
Unlike the other models in this module, it does not use patsy/R formula syntax.
:param vEndog: DataFrame column/numpy vector containing endogenous data (which will be regressed upon itself)
:param mExog: vector/matrix containing exogenous data. Defaults to None
:param tPDQ: tuple (p, d, q) containing the order of the model;
p: number of autoregressive terms (AR)
d: degree of differencing (I)
q: number of past forecast errors/moving-average terms (MA)
If None (default), performs an auto_arima()
:return mod: fitted model instance
"""
## Creating model
# If order is specified
if tPDQ is not None:
# Conditional on whether there are exogenous variables
if mExog is None:
mod_arima = ARIMA(endog=vEndog, order=tPDQ).fit(cov_type='robust')
else:
mod_arima = ARIMA(endog=vEndog, exog=mExog, order=tPDQ).fit(cov_type='robust')
# If order isn't specified, use auto_arima()
else:
mod_arima = auto_arima(y=vEndog, X=mExog)
mod_arima = mod_arima.fit(y=vEndog, X=mExog, cov_type='robust')
## Printing summary and diagnostics
print(mod_arima.summary())
print("For heteroskdasticity, check Prob(H), where H0: homoskedasticity, and the standardized residual graph.")
print("If there is heteroskdasticity, the model error can't be a white noise (which is the desired thing).")
print("Estimated Density and Jarque-Bera have information on normality.")
print("In the correlogram, all lollipops must be inside of the shaded area.")
# Plots
mod_arima.plot_diagnostics(figsize=(10, 10))
plt.show()
# Residual mean (pmdarima exposes resid() as a method; statsmodels ARIMA as a property)
try:
vResid = mod_arima.resid()
except TypeError:
vResid = mod_arima.resid
tMean0 = stats.ttest_1samp(vResid, 0, nan_policy='omit')
print(f"P-value for the test that residual mean is equal to 0: {np.around(tMean0[1], 5)}.")
print("If p < 0.05, H0 is rejected and the residual mean is different from 0 (not ideal).")
## Returning
return mod_arima
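## Usage sketch (hypothetical monthly series):
# mod_auto = arima_model(df["sales"])  # order chosen by auto_arima
# mod_111 = arima_model(df["sales"], tPDQ=(1, 1, 1))  # fixed ARIMA(1, 1, 1)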
### Box-Jenkins: Diagnostics and Prediction #########################
def arima_fit_prediction(modARIMA, dfData, sColumn, nPeriods, sFreq,
sFitColumn="Fit", sFitPercentageErrorColumn="ErroFit"):
"""
Investigates the fit and predicts nPeriods ahead. In order to work, dfData must have a date-like index.
:param modARIMA: ARIMA model instance (from auto_arima())
:param dfData: DataFrame containing the data; must contain a DateTime index
:param sColumn: string of the column that contains the endogenous variable in modARIMA
:param nPeriods: number of periods ahead to forecast
:param sFreq: "months", "days", "years...". See pd.offsets.DateOffset for options.
:param sFitColumn: string of the column that will be created containing fitted values
:param sFitPercentageErrorColumn: string of the column that will be created containing fitted values % errors
:return
dfData: modified DataFrame containing fitted values and errors
seriesPrediction: series containing the prediction of nPeriods ahead
"""
## Getting fitted values
vFit = modARIMA.predict_in_sample(start=0, end=dfData[sColumn].shape[0] - 1)
## Adding to DataFrame
dfData[sFitColumn] = vFit
## Calculating percentage error
dfData[sFitPercentageErrorColumn] = 100 * np.abs((dfData[sColumn] - dfData[sFitColumn]) / dfData[sColumn])
## Describing errors
print(f"Pseudo-R2: {np.around(stats.pearsonr(dfData[sColumn], dfData[sFitColumn])[0] ** 2, 4)}.")
print("Describing percentage errors...")
print(dfData[sFitPercentageErrorColumn].describe())
## Predicting nPeriods ahead
vPrediction, mConfInt = modARIMA.predict(n_periods=nPeriods, return_conf_int=True)
## Creating index of prediction
# Dictionaries to pass sFreq as argument using **
dateStart = dfData.index[-1]
dictPandasOffsetStart = {sFreq: 1}
dictPandasOffsetEnd = {sFreq: 1 + nPeriods}
# Index
indexPrediction = pd.date_range(start=dateStart + pd.offsets.DateOffset(**dictPandasOffsetStart),
end=dateStart + pd.offsets.DateOffset(**dictPandasOffsetEnd),
periods=nPeriods, normalize=True).normalize()
# Creating series to plot
seriesPrediction = pd.Series(vPrediction, index=indexPrediction)
lower_series = pd.Series(mConfInt[:, 0], index=indexPrediction)
upper_series = pd.Series(mConfInt[:, 1], index=indexPrediction)