In [ ]:
# 必要なライブラリをインストール
!pip install japanize_matplotlib
!pip install pmdarima
In [ ]:
# ライブラリを起動
from dateutil.relativedelta import relativedelta
import japanize_matplotlib
import math as math
import numpy as np
import pandas as pd
import scipy as sp
import random
import pmdarima as pm
import statsmodels.api as sm
from statsmodels.tsa.api import VAR
from sklearn.linear_model import LinearRegression
from matplotlib import pyplot as plt
from matplotlib.pylab import rcParams
rcParams["figure.figsize"] = 15, 6
# Colabでファイルを読み込むために、Google Driveをマウント
from google.colab import drive
drive.mount('/content/drive')
Mounted at /content/drive
(4) 局所予測法による動学的因果推論¶
(4-1) 局所予測法(LP)による物価と金利のインパルス応答関数¶
- 変数:物価水準、金利
- 期間:1972年第1四半期から2023年第4四半期まで
- LPのラグ次数は16四半期とする
- 黒実線がLPのインパルス応答関数、破線がその信頼区間、グレー線がVARのインパルス応答関数
- 信頼区間の幅は90%ile
In [ ]:
# データの読み込み
dinput = pd.read_excel('/content/drive/My Drive/Data/data_VAR4.xlsx',
sheet_name='data_q', header=[0])
data = np.zeros([len(dinput), 2])
for i in range(1, len(dinput)):
data[:, 0] = np.log(dinput.cpi)
temp = pm.arima.decompose(data[:, 0], 'additive', m=4)
data[:, 0] = (np.log(dinput.cpi) - temp.seasonal) *100
data[:, 1] = dinput.RSR
dlen = len(data)
# 基準化定数の設定
x11 = 0.207
x12 = 0.476
x21 = 0.395
x22 = 0.395
adjcon = [1/x11, 1/x12, 1/x21, 1/x22]
# LPの推定
L = 16 + 1
MAX_H = 51
CONFIDENCE_LEVEL = 0.90
coef_result, std_result = np.zeros([MAX_H, len(data.T)*len(data.T)]), np.zeros(
[MAX_H, len(data.T)*len(data.T)])
lbound, ubound = sp.stats.norm.interval(confidence=CONFIDENCE_LEVEL, loc=0,
scale=1)
ssize = len(data)
counter = 0
h = 0
for dep in range(0, len(data.T), 1):
for indep in range(0, len(data.T), 1):
if dep == 0 and indep == 0:
coef_result[h, 0] = np.array(1)
elif dep == 0 and indep == 1:
coef_result[h, 1] = np.array(0)
elif dep == 1 and indep == 0:
sample_y = pd.DataFrame(data[L + h - 1 : ssize, dep])
sample_x = pd.DataFrame(data[L - 1 : L - 1 + len(sample_y), indep])
for l in range(1, L, 1):
sample_x_lagged = pd.DataFrame(data[L - 1 - l :
L - 1 - l + len(sample_y)])
sample_x = pd.concat([sample_x, sample_x_lagged], axis=1)
sample_x = sm.add_constant(sample_x)
model = sm.OLS(sample_y, sample_x)
result = model.fit(cov_type='HAC', cov_kwds={'maxlags':1,
'kernel':'bartlett'})
coef_result[h, 2] = np.array(result.params.iloc[indep + 1])
std_result[h, 2] = np.array(result.bse.iloc[indep + 1])
elif dep == 1 and indep == 1:
coef_result[h, 3] = np.array(1)
for dep in range(0, len(data.T), 1):
for indep in range(0, len(data.T), 1):
for h in range(1, MAX_H, 1):
if dep == 0 and indep == 0:
sample_y = pd.DataFrame(data[L + h - 1 : ssize, dep])
sample_x = pd.DataFrame(data[L - 1 : L - 1 + len(sample_y),
indep])
for l in range(1, L, 1):
sample_x_lagged = pd.DataFrame(data[L - 1 - l :
L - 1 - l +
len(sample_y)])
sample_x = pd.concat([sample_x, sample_x_lagged], axis=1)
sample_x = sm.add_constant(sample_x)
model = sm.OLS(sample_y, sample_x)
result = model.fit(cov_type='HAC',
cov_kwds={'maxlags':h + 1,
'kernel':'bartlett'})
coef_result[h, counter] = np.array(result.params.iloc[indep+1])
std_result[h, counter] = np.array(result.bse.iloc[indep+1])
if dep == 0 and indep == 1:
sample_y = pd.DataFrame(data[L + h - 1 : ssize, dep])
sample_x = pd.DataFrame(data[L - 1 : L - 1 + len(sample_y)])
for l in range(1, L, 1):
sample_x_lagged = pd.DataFrame(data[L - 1 - l :
L - 1 - l +
len(sample_y)])
sample_x = pd.concat([sample_x, sample_x_lagged], axis=1)
sample_x = sm.add_constant(sample_x)
model = sm.OLS(sample_y, sample_x)
result = model.fit(cov_type='HAC',
cov_kwds={'maxlags':h + 1,
'kernel':'bartlett'})
coef_result[h, counter] = np.array(result.params.iloc[indep+1])
std_result[h, counter] = np.array(result.bse.iloc[indep+1])
if dep == 1 and indep == 0:
sample_y = pd.DataFrame(data[L + h - 1 : ssize, dep])
sample_x = pd.DataFrame(data[L - 1 : L - 1 + len(sample_y),
indep])
for l in range(1, L, 1):
sample_x_lagged = pd.DataFrame(data[L - 1 - l :
L - 1 - l +
len(sample_y)])
sample_x = pd.concat([sample_x, sample_x_lagged], axis=1)
sample_x = sm.add_constant(sample_x)
model = sm.OLS(sample_y, sample_x)
result = model.fit(cov_type='HAC',
cov_kwds={'maxlags':h + 1,
'kernel':'bartlett'})
coef_result[h, counter] = np.array(result.params.iloc[indep+1])
std_result[h, counter] = np.array(result.bse.iloc[indep+1])
if dep == 1 and indep == 1:
sample_y = pd.DataFrame(data[L + h - 1 : ssize, dep])
sample_x = pd.DataFrame(data[L - 1 : L - 1 + len(sample_y)])
for l in range(1, L, 1):
sample_x_lagged = pd.DataFrame(data[L - 1 - l :
L - 1 - l +
len(sample_y)])
sample_x = pd.concat([sample_x, sample_x_lagged], axis=1)
sample_x = sm.add_constant(sample_x)
model = sm.OLS(sample_y, sample_x)
result = model.fit(cov_type='HAC',
cov_kwds={'maxlags':h + 1,
'kernel':'bartlett'})
coef_result[h, counter] = np.array(result.params.iloc[indep+1])
std_result[h, counter] = np.array(result.bse.iloc[indep+1])
counter = counter + 1
# VAR(5)モデルのインパルス応答関数
models = VAR(data)
results = models.fit(5)
irf = results.irf(MAX_H)
impulse = irf.orth_irfs
# グラフの描画
vnames = ['物価水準', '金利']
snames = ['物価ショック', '金融政策ショック']
fig, ax=plt.subplots(2, 2, figsize=[10,10])
gcounter = 0
for i in range(2):
for j in range(2):
ax[i,j].plot(coef_result[:, gcounter], linewidth=2, color='black',
label='LPのインパルス応答関数')
ax[i,j].plot(coef_result[:, gcounter] + std_result[:, gcounter] * lbound,
linewidth=1, linestyle='dashed', color='black',
label='信頼区間の上限下限')
ax[i,j].plot(coef_result[:, gcounter] + std_result[:, gcounter] * ubound,
linewidth=1, linestyle='dashed', color='black')
ax[i,j].plot(impulse[:,i,j] * adjcon[gcounter], linewidth=2,
color='grey', label='VARモデルのインパルス応答関数')
ax[i,j].hlines([0], 0, MAX_H, color='black', linestyle='solid')
ax[i,j].set_xlim(0, MAX_H - 1)
ax[i,j].set_xlabel('四半期', fontsize=14)
ax[i,j].set_ylabel('インパルス応答', fontsize=14)
ax[i,j].set_title('%s→%s' %(snames[j],vnames[i]), fontsize=18)
ax[0,0].legend(loc='lower left', frameon=False, fontsize=11)
ax[0,0].set_ylim(-3,5)
ax[0,1].set_ylim(-2,2)
ax[1,0].set_ylim(-2,2)
ax[1,1].set_ylim(-1,2)
gcounter = gcounter + 1
plt.subplots_adjust(wspace=0.4, hspace=0.3)
plt.show()
(4-2) 構造ショックの観測値を用いた局所予想法(LP):Romer and Romer (2023)の実質GDPのケース¶
- 変数:ナラティブ・アプローチによる米国の金融政策ショック、米国の実質GDP
- 期間:1947年第1四半期から2016年第4四半期まで
- ラグ次数は4四半期とする
In [ ]:
# データの読み込み
dinput = pd.read_excel('/content/drive/My Drive/Data/data_VAR4.xlsx',
sheet_name='RomerRomer', header=[0])
data = np.zeros([len(dinput), 2])
for i in range(1, len(dinput)):
data[:, 0] = dinput.RRDUMMY
data[:, 1] = np.log(dinput.GDP) * 100
data = data[4:284, ]
dlen = len(data)
# 基準化定数の設定
x1 = 1
# LPの推定
L = 4 + 1
MAX_H = 21
CONFIDENCE_LEVEL = 0.95
coef_result, std_result = np.zeros([MAX_H, 1]), np.zeros([MAX_H, 1])
lbound, ubound = sp.stats.norm.interval(confidence=CONFIDENCE_LEVEL, loc=0,
scale=1)
ssize = len(data)
counter = 0
h = 0
sample_y = pd.DataFrame(data[L + h - 1 : ssize, 1])
sample_x = pd.DataFrame(data[L - 1 : L - 1 + len(sample_y), 0])
for l in range(1, L, 1):
sample_x_lagged = pd.DataFrame(data[L - 1 - l :
L - 1 - l + len(sample_y)])
sample_x = pd.concat([sample_x, sample_x_lagged], axis=1)
sample_x = sm.add_constant(sample_x)
model = sm.OLS(sample_y, sample_x)
result = model.fit(cov_type='HAC', cov_kwds={'maxlags':1, 'kernel':'bartlett'})
coef_result[h, 0] = np.array(result.params.iloc[1] * x1)
std_result[h, 0] = np.array(result.bse.iloc[1] * x1)
for h in range(1, MAX_H, 1):
sample_y = pd.DataFrame(data[L + h - 1 : ssize, 1])
sample_x = pd.DataFrame(data[L - 1 : L - 1 + len(sample_y), 0])
for l in range(1, L, 1):
sample_x_lagged = pd.DataFrame(data[L - 1 - l : L - 1 - l +
len(sample_y)])
sample_x = pd.concat([sample_x, sample_x_lagged], axis=1)
sample_x = sm.add_constant(sample_x)
model = sm.OLS(sample_y, sample_x)
result = model.fit(cov_type='HAC', cov_kwds={'maxlags':h + 1,
'kernel':'bartlett'})
coef_result[h, 0] = np.array(result.params.iloc[1] * x1)
std_result[h, 0] = np.array(result.bse.iloc[1] * x1)
# グラフの描画
plt.figure(figsize=(10,5))
plt.plot(coef_result, linewidth=2, color='black')
plt.plot(coef_result + std_result * lbound, linewidth=1, linestyle='dashed',
color='black')
plt.plot(coef_result + std_result * ubound, linewidth=1, linestyle='dashed',
color='black')
plt.hlines([0], 0, MAX_H, color='black', linestyle='solid')
plt.xticks(np.arange(0, MAX_H, 2))
plt.xlim(0, MAX_H - 1)
plt.xlabel('四半期', fontsize=14)
plt.ylabel('インパルス応答', fontsize=14)
plt.ylim(-8, 2)
Out[ ]:
(-8.0, 2.0)