In [ ]:
# 必要なライブラリをインストール
!pip install japanize_matplotlib
In [ ]:
# ライブラリを起動
from dateutil.relativedelta import relativedelta
import japanize_matplotlib
import math as math
import numpy as np
import pandas as pd
import scipy as sp
from numpy.linalg import inv, cholesky, qr, eigvals
from scipy import stats
from scipy.stats import norm, invwishart, multivariate_normal
import random
import statsmodels.api as sm
from statsmodels.regression.linear_model import OLS
from statsmodels.tsa.api import VAR
from sklearn.linear_model import LinearRegression
from datetime import datetime as dt
import datetime
import os
from matplotlib import pyplot as plt
import matplotlib.dates as mdates
import matplotlib.ticker as ticker
from matplotlib.pylab import rcParams
rcParams["figure.figsize"] = 15, 6
# Colabでファイルを読み込むために、Google Driveをマウント
from google.colab import drive
drive.mount('/content/drive')
Mounted at /content/drive

(11) ナラティブ符号制約と識別集合¶

ナラティブ符号制約構造VARモデルによる金融政策の評価¶

  • 変数:実質GDP(対数値)、インフレ率(除く食料・エネルギー、消費税率の影響を調整済み)、10年物国債利回り、名目実効為替レート(対数値)、日経平均株価(対数値)、銀行貸出残高(前年比)
  • 期間は2007年1月から2024年12月
  • ラグ次数はAICにより2か月
  • 事前分布にはミネソタ事前分布を用いる
  • ショックの識別は、ショック符号制約(10年物国債利回りの構造ショックである金融政策ショックが2013年4月、2014年11月、2016年2月の3時点で負〈拡張的〉である制約)とB型の歴史分解制約(2016年2月の長期金利の変化に占める金融政策ショックが残りの構造ショックの総和より大きく寄与したという圧倒的寄与型の制約)の両方
  • 金融引締ショックに対するインパルス応答関数の事後メジアンと 68%(等裾)信用区間が示されている(対数変換されていた実質GDP、名目実効為替レート、日経平均株価は%表示に変換済み)
In [ ]:
np.random.seed(0)

# データ
dinput = pd.read_excel('/content/drive/My Drive/Data/data_VAR11.xlsx',
                       sheet_name='data_m', header=0, index_col=0)
excel_order = ['GDP', 'CPI', 'LTR', 'EXR', 'NIKKEI', 'LENDING']
Yraw = dinput[excel_order].values
dates = pd.to_datetime(dinput.index.astype(str), format='%Y.%m')
Traw, k = Yraw.shape
p = 2
hmax = 120

# ハイパーパラメータの設定
lambda_ = 0.3
alpha = 2.0
theta = 1.0
ncoef = 1 + k * p

# サンプリングの設定
numDesiredDraws = 5000
M_star = 1000
Qs_per_BetaSigma = 1000

# 補助関数
def compute_psi_ar1(Y):
    T, k = Y.shape
    psi = np.zeros(k)
    for i in range(k):
        y_dep = Y[1:, i]
        x_reg = np.column_stack([np.ones(T - 1), Y[:-1, i]])
        beta = inv(x_reg.T @ x_reg) @ (x_reg.T @ y_dep)
        resid = y_dep - x_reg @ beta
        psi[i] = (resid @ resid) / len(resid)
    return psi

def add_single_unit_root_dummy(Yraw, X, Y_trim, p, theta):
    Y0 = np.mean(Yraw[:p, :], axis=0)
    Y_dummy = (1.0 / theta) * Y0.reshape(1, -1)
    X_dummy = np.hstack([
        np.array([[1.0 / theta]]),
        (1.0 / theta) * np.tile(Y0, p).reshape(1, -1)
    ])
    Y_aug = np.vstack([Y_dummy, Y_trim])
    X_aug = np.vstack([X_dummy, X])
    return Y_aug, X_aug

def build_minnesota_prior_system(k, p):
    B0 = np.zeros((ncoef, k))
    for i in range(k):
        idx_first_lag = 1 + i
        B0[idx_first_lag, i] = 1.0
    return B0

def build_N0_minnesota(k, p, lambda_, alpha, psi):
    N0_diag = np.zeros(ncoef)
    N0_diag[0] = 1.0 / 1e6
    d = k + 2
    for l in range(1, p + 1):
        start = 1 + (l - 1) * k
        end = 1 + l * k
        omega_block = (d - k - 1) * (lambda_ ** 2) * (1.0 / (l ** alpha)) / psi
        N0_diag[start:end] = 1.0 / omega_block
    return np.diag(N0_diag)

def compute_conjugate_sigma_posterior(Y_used, X_used, psi, B0, N0):
    T_eff, k = Y_used.shape
    XX = X_used.T @ X_used
    B_OLS = inv(XX) @ (X_used.T @ Y_used)
    U_OLS = Y_used - X_used @ B_OLS
    SIGMA_OLS = (U_OLS.T @ U_OLS) / T_eff
    S0 = np.diag(psi)
    v0 = k + 2
    NT = N0 + XX
    vT = T_eff + v0
    middle = (B_OLS - B0).T @ N0 @ inv(NT) @ XX @ (B_OLS - B0)
    ST = (v0 / vT) * S0 + (T_eff / vT) * SIGMA_OLS + (1.0 / vT) * middle
    S_post = ST * vT
    v_post = vT
    return S_post, v_post, XX

def draw_sigma(S_post, nu_post):
    return invwishart.rvs(df=nu_post, scale=S_post)

def draw_beta_systemwide_matlab_prior(Sigma, XX, XY, B0, N0):
    NT = N0 + XX
    B_OLS = inv(XX) @ XY
    Bbar = inv(NT) @ (N0 @ B0 + XX @ B_OLS)
    b_post = Bbar.reshape(-1, order='F')
    V_post = np.kron(Sigma, inv(NT))
    V_post = 0.5 * (V_post + V_post.T)
    C = cholesky(V_post)
    b_draw = b_post + C @ np.random.randn(len(b_post))
    beta_draw = b_draw.reshape(B0.shape, order='F')
    return beta_draw

def max_root_var(beta_draw, k, p):
    A_list = []
    for l in range(p):
        A_l = beta_draw[1 + l * k: 1 + (l + 1) * k, :].T
        A_list.append(A_l)
    if p == 1:
        companion = A_list[0]
    else:
        top = np.hstack(A_list)
        bottom = np.hstack([np.eye(k * (p - 1)), np.zeros((k * (p - 1), k))])
        companion = np.vstack([top, bottom])
    roots = eigvals(companion)
    return np.max(np.abs(roots))

def random_orthogonal(dim):
    Z = np.random.randn(dim, dim)
    Q, R = qr(Z)
    Q = Q @ np.diag(np.sign(np.diag(R)))
    return Q

def recover_shocks(U, B):
    return (inv(B) @ U.T).T

def compute_ma_matrices(A, H, p):
    k = A.shape[1]
    Phi = np.zeros((H + 1, k, k))
    Phi[0] = np.eye(k)
    for h in range(1, H + 1):
        for j in range(1, min(p, h) + 1):
            Phi[h] += A[j - 1] @ Phi[h - j]
    return Phi

def historical_decomp(A, B, U_draw, t, p):
    shocks = recover_shocks(U_draw, B)
    Phi = compute_ma_matrices(A, t, p)
    k = B.shape[0]
    contrib = np.zeros((k, k))
    for s in range(k):
        for j in range(t + 1):
            contrib[:, s] += (Phi[j] @ B[:, s]) * shocks[t - j, s]
    return contrib

X = np.hstack([Yraw[p - 1 - i:Traw - 1 - i] for i in range(p)])
X = np.hstack([np.ones((Traw - p, 1)), X])
Y_trim = Yraw[p:]

# ナラティブの設定
target_periods = pd.PeriodIndex(['2013-04', '2014-11', '2016-02'], freq='M')
date_periods = dates[p:].to_period('M')
event_idx = [np.where(date_periods == per)[0][0] for per in target_periods]
policy_var = excel_order.index('LTR')
shock_id = excel_order.index('LTR')

psi = compute_psi_ar1(Yraw)
B0 = build_minnesota_prior_system(k, p)
N0 = build_N0_minnesota(k, p, lambda_, alpha, psi)
Y_aug, X_aug = add_single_unit_root_dummy(Yraw, X, Y_trim, p, theta=theta)
S_post, nu_post, XX = compute_conjugate_sigma_posterior(
    Y_used=Y_aug,
    X_used=X_aug,
    psi=psi,
    B0=B0,
    N0=N0
)
XY = X_aug.T @ Y_aug

store_irf_raw = []
store_impact_raw = []
store_w = []
accept_count = 0
attempt_count = 0
stable_count = 0
q_attempt_count = 0
q_accept_count = 0
while accept_count < numDesiredDraws:
    attempt_count += 1
    # ステップ1:分散共分散行列と係数行列の抽出
    Sigma_draw = draw_sigma(S_post, nu_post)
    beta_draw = draw_beta_systemwide_matlab_prior(Sigma_draw, XX, XY, B0, N0)
    max_root = max_root_var(beta_draw, k, p)
    if max_root > 1:
        continue
    stable_count += 1
    A_draw = np.zeros((p, k, k))
    for l in range(p):
        A_draw[l] = beta_draw[1 + l * k: 1 + (l + 1) * k, :].T
    U_draw = Y_trim - X @ beta_draw
    # ステップ2:直交行列の抽出
    P_chol = cholesky(Sigma_draw)
    for _ in range(Qs_per_BetaSigma):
        q_attempt_count += 1
        Q_draw = random_orthogonal(k)
        B_draw = P_chol @ Q_draw
        # ステップ3:ナラティブ符号制約のチェック
        shocks = recover_shocks(U_draw, B_draw)
        # (1) ショック符号制約
        ok = True
        for t in event_idx:
            if shocks[t, shock_id] >= 0:
                ok = False
                break
        # (2) 歴史分解の「圧倒的寄与」制約(2016年2月のみ評価)
        if ok:
            t_hd = event_idx[-1]
            contrib_draw = historical_decomp(A_draw, B_draw, U_draw, t_hd, p)
            own_contrib = abs(contrib_draw[policy_var, shock_id])
            others_contrib = np.sum(np.abs(contrib_draw[policy_var, :]
                                           )) - own_contrib
            if own_contrib <= others_contrib:
                ok = False
        if not ok:
            continue
        q_accept_count += 1
        # ステップ4:重点サンプリングの重みの計算
        count_satisfy = 0
        Phi_hd = compute_ma_matrices(A_draw, t_hd, p)
        for _ in range(M_star):
            fake_shocks = np.random.randn(U_draw.shape[0], k)
            good = True
            # (1) ショック符号制約
            for t in event_idx:
                if fake_shocks[t, shock_id] >= 0:
                    good = False
                    break
            if not good:
                continue
            # (2) 歴史分解の「圧倒的寄与」制約(2016年2月のみ評価)
            contrib_sim = np.zeros((k, k))
            for s in range(k):
                for j in range(t_hd + 1):
                    contrib_sim[:, s] += (Phi_hd[j] @ B_draw[:, s]
                                          ) * fake_shocks[t_hd - j, s]
            own_abs = abs(contrib_sim[policy_var, shock_id])
            others_abs = np.sum(np.abs(contrib_sim[policy_var, :])) - own_abs
            if own_abs > others_abs:
                count_satisfy += 1
        omega = count_satisfy / M_star
        if omega == 0:
            continue
        weight_draw = 1.0 / omega
        # インパルス応答関数の計算
        Phi = compute_ma_matrices(A_draw, hmax, p)
        irf_raw = np.array([Phi[h] @ B_draw[:, shock_id] for h in range(
            hmax + 1)])
        impact_raw = B_draw[policy_var, shock_id]
        if np.isclose(impact_raw, 0.0):
            continue
        store_irf_raw.append(irf_raw)
        store_impact_raw.append(impact_raw)
        store_w.append(weight_draw)
        accept_count += 1
        print(
            f"BetaSigma attempted: {attempt_count} | "
            f"Stable: {stable_count} | "
            f"Q attempted: {q_attempt_count} | "
            f"Q narrative-pass: {q_accept_count} | "
            f"Accepted: {accept_count}"
        )
        if accept_count >= numDesiredDraws:
            break
# ステップ5: 集計(重み付き累積分布によるメジアン・信用区間の計算)
store_irf_raw = np.array(store_irf_raw)
store_impact_raw = np.array(store_impact_raw)
store_w = np.array(store_w)
def weighted_percentile(data, weights, percentile):
    sorted_indices = np.argsort(data)
    sorted_data = data[sorted_indices]
    sorted_weights = weights[sorted_indices]
    cumulative_weights = np.cumsum(sorted_weights)
    cumulative_weights /= cumulative_weights[-1]
    idx = np.searchsorted(cumulative_weights, percentile, side='left')
    return sorted_data[idx]
median_irf_raw = np.zeros_like(store_irf_raw[0])
lower_irf_raw = np.zeros_like(store_irf_raw[0])
upper_irf_raw = np.zeros_like(store_irf_raw[0])
for h in range(store_irf_raw.shape[1]):
    for j in range(store_irf_raw.shape[2]):
        values_hj = store_irf_raw[:, h, j]
        median_irf_raw[h, j] = weighted_percentile(values_hj, store_w, 0.5)
        lower_irf_raw[h, j]  = weighted_percentile(values_hj, store_w, 0.16)
        upper_irf_raw[h, j]  = weighted_percentile(values_hj, store_w, 0.84)
impact_median = weighted_percentile(store_impact_raw, store_w, 0.5)
common_scale = 0.10 / impact_median
# ショックのサイズをスケーリング
median_irf = common_scale * median_irf_raw
lower_irf  = common_scale * lower_irf_raw
upper_irf  = common_scale * upper_irf_raw
# 結果の出力
print("Sampling finished. Median and band calculations completed.")
print(f"Total beta-sigma attempted: {attempt_count}")
print(f"Total stable beta-sigma draws: {stable_count}")
print(f"Total Q attempted: {q_attempt_count}")
print(f"Total Q narrative-pass: {q_accept_count}")
print(f"Total accepted draws: {accept_count}")
print(f"Common scale used: {common_scale}")
print("Final IRF order:", excel_order)
In [ ]:
# グラフの描画
VAR_figname = ['実質GDP', 'インフレ率', '10年物国債利回り', '名目実効為替レート',
               '日経平均株価', '銀行貸出残高']
nvars = len(VAR_figname)
fig_hrzn = np.arange(0, hmax+1)
fig = plt.figure(figsize=(nvars*2.5, 12))
ylim=([-2.5, 1], [-0.6, 0.1], [-0.15, 0.15], [-3, 5], [-15, 10], [-1, 0.8])
taisu = ([100, 1, 1, 100, 100, 1])
for i in range(nvars):
    ax = fig.add_subplot(3, 3, i+1)
    ax.set_title(VAR_figname[i], fontsize=14)
    ax.axhline(y=0, xmin=0, xmax=1, color='black', linewidth=0.75)
    ax.plot(fig_hrzn, median_irf[:, i] * taisu[i], color='black',
            linewidth=2)
    ax.plot(fig_hrzn, np.ravel(lower_irf[:, i] * taisu[i]),
            color='black', linewidth=1, linestyle='dashed')
    ax.plot(fig_hrzn, np.ravel(upper_irf[:, i] * taisu[i]),
            color='black', linewidth=1, linestyle='dashed')
    ax.set_xlim([0, hmax])
    ax.set_xlabel('月', fontsize=14)
    ax.set_xticks(range(0, hmax+1, 12))
    ax.spines['right'].set_visible(False)
    ax.spines['top'].set_visible(False)
    ax.set_ylim(ylim[i])
fig.tight_layout()
No description has been provided for this image

(練習問題2) A型の歴史分解制約を用いたナラティブ符号制約¶

  • 用いる変数や推計期間などは、実証例と同じ
  • 歴史分解制約をB型(圧倒的寄与型)からA型(最重要寄与型)に変更
In [ ]:
np.random.seed(0)

# データ
dinput = pd.read_excel('/content/drive/My Drive/Data/data_VAR11.xlsx',
                       sheet_name='data_m', header=0, index_col=0)
excel_order = ['GDP', 'CPI', 'LTR', 'EXR', 'NIKKEI', 'LENDING']
Yraw = dinput[excel_order].values
dates = pd.to_datetime(dinput.index.astype(str), format='%Y.%m')
Traw, k = Yraw.shape
p = 2
hmax = 120

# ハイパーパラメータの設定
lambda_ = 0.3
alpha = 2.0
theta = 1.0
ncoef = 1 + k * p

# サンプリングの設定
numDesiredDraws = 5000
M_star = 1000
Qs_per_BetaSigma = 1000

# 補助関数
def compute_psi_ar1(Y):
    T, k = Y.shape
    psi = np.zeros(k)
    for i in range(k):
        y_dep = Y[1:, i]
        x_reg = np.column_stack([np.ones(T - 1), Y[:-1, i]])
        beta = inv(x_reg.T @ x_reg) @ (x_reg.T @ y_dep)
        resid = y_dep - x_reg @ beta
        psi[i] = (resid @ resid) / len(resid)
    return psi

def add_single_unit_root_dummy(Yraw, X, Y_trim, p, theta):
    Y0 = np.mean(Yraw[:p, :], axis=0)
    Y_dummy = (1.0 / theta) * Y0.reshape(1, -1)
    X_dummy = np.hstack([
        np.array([[1.0 / theta]]),
        (1.0 / theta) * np.tile(Y0, p).reshape(1, -1)
    ])
    Y_aug = np.vstack([Y_dummy, Y_trim])
    X_aug = np.vstack([X_dummy, X])
    return Y_aug, X_aug

def build_minnesota_prior_system(k, p):
    B0 = np.zeros((ncoef, k))
    for i in range(k):
        idx_first_lag = 1 + i
        B0[idx_first_lag, i] = 1.0
    return B0

def build_N0_minnesota(k, p, lambda_, alpha, psi):
    N0_diag = np.zeros(ncoef)
    N0_diag[0] = 1.0 / 1e6
    d = k + 2
    for l in range(1, p + 1):
        start = 1 + (l - 1) * k
        end = 1 + l * k
        omega_block = (d - k - 1) * (lambda_ ** 2) * (1.0 / (l ** alpha)) / psi
        N0_diag[start:end] = 1.0 / omega_block
    return np.diag(N0_diag)

def compute_conjugate_sigma_posterior(Y_used, X_used, psi, B0, N0):
    T_eff, k = Y_used.shape
    XX = X_used.T @ X_used
    B_OLS = inv(XX) @ (X_used.T @ Y_used)
    U_OLS = Y_used - X_used @ B_OLS
    SIGMA_OLS = (U_OLS.T @ U_OLS) / T_eff
    S0 = np.diag(psi)
    v0 = k + 2
    NT = N0 + XX
    vT = T_eff + v0
    middle = (B_OLS - B0).T @ N0 @ inv(NT) @ XX @ (B_OLS - B0)
    ST = (v0 / vT) * S0 + (T_eff / vT) * SIGMA_OLS + (1.0 / vT) * middle
    S_post = ST * vT
    v_post = vT
    return S_post, v_post, XX

def draw_sigma(S_post, nu_post):
    return invwishart.rvs(df=nu_post, scale=S_post)

def draw_beta_systemwide_matlab_prior(Sigma, XX, XY, B0, N0):
    NT = N0 + XX
    B_OLS = inv(XX) @ XY
    Bbar = inv(NT) @ (N0 @ B0 + XX @ B_OLS)
    b_post = Bbar.reshape(-1, order='F')
    V_post = np.kron(Sigma, inv(NT))
    V_post = 0.5 * (V_post + V_post.T)
    C = cholesky(V_post)
    b_draw = b_post + C @ np.random.randn(len(b_post))
    beta_draw = b_draw.reshape(B0.shape, order='F')
    return beta_draw

def max_root_var(beta_draw, k, p):
    A_list = []
    for l in range(p):
        A_l = beta_draw[1 + l * k: 1 + (l + 1) * k, :].T
        A_list.append(A_l)
    if p == 1:
        companion = A_list[0]
    else:
        top = np.hstack(A_list)
        bottom = np.hstack([np.eye(k * (p - 1)), np.zeros((k * (p - 1), k))])
        companion = np.vstack([top, bottom])
    roots = eigvals(companion)
    return np.max(np.abs(roots))

def random_orthogonal(dim):
    Z = np.random.randn(dim, dim)
    Q, R = qr(Z)
    Q = Q @ np.diag(np.sign(np.diag(R)))
    return Q

def recover_shocks(U, B):
    return (inv(B) @ U.T).T

def compute_ma_matrices(A, H, p):
    k = A.shape[1]
    Phi = np.zeros((H + 1, k, k))
    Phi[0] = np.eye(k)
    for h in range(1, H + 1):
        for j in range(1, min(p, h) + 1):
            Phi[h] += A[j - 1] @ Phi[h - j]
    return Phi

def historical_decomp(A, B, U_draw, t, p):
    shocks = recover_shocks(U_draw, B)
    Phi = compute_ma_matrices(A, t, p)
    k = B.shape[0]
    contrib = np.zeros((k, k))
    for s in range(k):
        for j in range(t + 1):
            contrib[:, s] += (Phi[j] @ B[:, s]) * shocks[t - j, s]
    return contrib

X = np.hstack([Yraw[p - 1 - i:Traw - 1 - i] for i in range(p)])
X = np.hstack([np.ones((Traw - p, 1)), X])
Y_trim = Yraw[p:]

# ナラティブの設定
target_periods = pd.PeriodIndex(['2013-04', '2014-11', '2016-02'], freq='M')
date_periods = dates[p:].to_period('M')
event_idx = [np.where(date_periods == per)[0][0] for per in target_periods]
policy_var = excel_order.index('LTR')
shock_id = excel_order.index('LTR')

psi = compute_psi_ar1(Yraw)
B0 = build_minnesota_prior_system(k, p)
N0 = build_N0_minnesota(k, p, lambda_, alpha, psi)
Y_aug, X_aug = add_single_unit_root_dummy(Yraw, X, Y_trim, p, theta=theta)
S_post, nu_post, XX = compute_conjugate_sigma_posterior(
    Y_used=Y_aug,
    X_used=X_aug,
    psi=psi,
    B0=B0,
    N0=N0
)
XY = X_aug.T @ Y_aug

store_irf_raw = []
store_impact_raw = []
store_w = []
accept_count = 0
attempt_count = 0
stable_count = 0
q_attempt_count = 0
q_accept_count = 0
while accept_count < numDesiredDraws:
    attempt_count += 1
    # ステップ1:分散共分散行列と係数行列の抽出
    Sigma_draw = draw_sigma(S_post, nu_post)
    beta_draw = draw_beta_systemwide_matlab_prior(Sigma_draw, XX, XY, B0, N0)
    max_root = max_root_var(beta_draw, k, p)
    if max_root > 1:
        continue
    stable_count += 1
    A_draw = np.zeros((p, k, k))
    for l in range(p):
        A_draw[l] = beta_draw[1 + l * k: 1 + (l + 1) * k, :].T
    U_draw = Y_trim - X @ beta_draw
    # ステップ2:直交行列の抽出
    P_chol = cholesky(Sigma_draw)
    for _ in range(Qs_per_BetaSigma):
        q_attempt_count += 1
        Q_draw = random_orthogonal(k)
        B_draw = P_chol @ Q_draw
        # ステップ3:ナラティブ符号制約のチェック
        shocks = recover_shocks(U_draw, B_draw)
        # (1) ショック符号制約
        ok = True
        for t in event_idx:
            if shocks[t, shock_id] >= 0:
                ok = False
                break
        # (2) 歴史分解の「最重要寄与」制約(2016年2月のみ評価)
        if ok:
            t_hd = event_idx[-1]
            contrib_draw = historical_decomp(A_draw, B_draw, U_draw, t_hd, p)
            own_contrib = abs(contrib_draw[policy_var, shock_id])
            other_contribs = np.abs(contrib_draw[policy_var, :])
            max_other_contrib = np.max(np.delete(other_contribs, shock_id))
            if own_contrib <= max_other_contrib:
                ok = False
        if not ok:
            continue
        q_accept_count += 1
        # ステップ4:重点サンプリングの重みの計算
        count_satisfy = 0
        Phi_hd = compute_ma_matrices(A_draw, t_hd, p)
        for _ in range(M_star):
            fake_shocks = np.random.randn(U_draw.shape[0], k)
            good = True
            # (1) ショック符号制約
            for t in event_idx:
                if fake_shocks[t, shock_id] >= 0:
                    good = False
                    break
            if not good:
                continue
            # (2) 歴史分解の「最重要寄与」制約(2016年2月のみ評価)
            contrib_sim = np.zeros((k, k))
            for s in range(k):
                for j in range(t_hd + 1):
                    contrib_sim[:, s] += (Phi_hd[j] @ B_draw[:, s]
                                          ) * fake_shocks[t_hd - j, s]
            own_abs = abs(contrib_sim[policy_var, shock_id])
            max_other_abs = np.max(np.delete(np.abs(contrib_sim[policy_var, :]
                                                    ), shock_id))
            if own_abs > max_other_abs:
                count_satisfy += 1
        omega = count_satisfy / M_star
        if omega == 0:
            continue
        weight_draw = 1.0 / omega
        # インパルス応答関数の計算
        Phi = compute_ma_matrices(A_draw, hmax, p)
        irf_raw = np.array([Phi[h] @ B_draw[:, shock_id] for h in range(
            hmax + 1)])
        impact_raw = B_draw[policy_var, shock_id]
        if np.isclose(impact_raw, 0.0):
            continue
        store_irf_raw.append(irf_raw)
        store_impact_raw.append(impact_raw)
        store_w.append(weight_draw)
        accept_count += 1
        print(
            f"BetaSigma attempted: {attempt_count} | "
            f"Stable: {stable_count} | "
            f"Q attempted: {q_attempt_count} | "
            f"Q narrative-pass: {q_accept_count} | "
            f"Accepted: {accept_count}"
        )
        if accept_count >= numDesiredDraws:
            break
# ステップ5: 集計(重み付き累積分布によるメジアン・信用区間の計算)
store_irf_raw = np.array(store_irf_raw)
store_impact_raw = np.array(store_impact_raw)
store_w = np.array(store_w)
def weighted_percentile(data, weights, percentile):
    sorted_indices = np.argsort(data)
    sorted_data = data[sorted_indices]
    sorted_weights = weights[sorted_indices]
    cumulative_weights = np.cumsum(sorted_weights)
    cumulative_weights /= cumulative_weights[-1]
    idx = np.searchsorted(cumulative_weights, percentile, side='left')
    return sorted_data[idx]
median_irf_raw = np.zeros_like(store_irf_raw[0])
lower_irf_raw = np.zeros_like(store_irf_raw[0])
upper_irf_raw = np.zeros_like(store_irf_raw[0])
for h in range(store_irf_raw.shape[1]):
    for j in range(store_irf_raw.shape[2]):
        values_hj = store_irf_raw[:, h, j]
        median_irf_raw[h, j] = weighted_percentile(values_hj, store_w, 0.5)
        lower_irf_raw[h, j]  = weighted_percentile(values_hj, store_w, 0.16)
        upper_irf_raw[h, j]  = weighted_percentile(values_hj, store_w, 0.84)
impact_median = weighted_percentile(store_impact_raw, store_w, 0.5)
common_scale = 0.10 / impact_median
# ショックのサイズをスケーリング
median_irf = common_scale * median_irf_raw
lower_irf  = common_scale * lower_irf_raw
upper_irf  = common_scale * upper_irf_raw
# 結果の出力
print("Sampling finished. Median and band calculations completed.")
print(f"Total beta-sigma attempted: {attempt_count}")
print(f"Total stable beta-sigma draws: {stable_count}")
print(f"Total Q attempted: {q_attempt_count}")
print(f"Total Q narrative-pass: {q_accept_count}")
print(f"Total accepted draws: {accept_count}")
print(f"Common scale used: {common_scale}")
print("Final IRF order:", excel_order)
In [ ]:
# グラフの描画
VAR_figname = ['実質GDP', 'インフレ率', '10年物国債利回り', '名目実効為替レート',
               '日経平均株価', '銀行貸出残高']
nvars = len(VAR_figname)
fig_hrzn = np.arange(0, hmax+1)
fig = plt.figure(figsize=(nvars*2.5, 12))
ylim=([-2.5, 1], [-0.5, 0.2], [-0.10, 0.20], [-1, 7], [-15, 10], [-1, 0.8])
taisu = ([100, 1, 1, 100, 100, 1])
for i in range(nvars):
    ax = fig.add_subplot(3, 3, i+1)
    ax.set_title(VAR_figname[i], fontsize=14)
    ax.axhline(y=0, xmin=0, xmax=1, color='black', linewidth=0.75)
    ax.plot(fig_hrzn, median_irf[:, i] * taisu[i], color='black',
            linewidth=2)
    ax.plot(fig_hrzn, np.ravel(lower_irf[:, i] * taisu[i]),
            color='black', linewidth=1, linestyle='dashed')
    ax.plot(fig_hrzn, np.ravel(upper_irf[:, i] * taisu[i]),
            color='black', linewidth=1, linestyle='dashed')
    ax.set_xlim([0, hmax])
    ax.set_xlabel('月', fontsize=14)
    ax.set_xticks(range(0, hmax+1, 12))
    ax.spines['right'].set_visible(False)
    ax.spines['top'].set_visible(False)
    ax.set_ylim(ylim[i])
fig.tight_layout()
No description has been provided for this image