薬剤師のプログラミング学習日記

プログラミングやコンピュータに関する記事を書いていきます

Pythonによるシミュレーション高速化のためのコードの改善


はじめに

前回まで書いていたモンテカルロシミュレーションのプログラムを改善して、どれだけ処理時間が短縮できるかを試みよう、という記事です。
内容的には前回記事の続きとなります。
www.yakupro.info


定常状態の血中濃度を得る部分を最小限に

改善点その1です。今まではPK/PDパラメータを得るためにコンストラクタ内で繰り返し投与回数分ループを回して、空のリストを定常状態に達するまで延長していました。

    # これまで
    cycle_to_css = 3 if self.css_t // intervals[0] < 3 else int(self.css_t / intervals[0])
    c = []
    for _ in range(cycle_to_css):
        c.extend(self.calc_section_conc())
    self.last_interval = c[-int(self.intervals[0] / self.STEP):]

しかし、欲しいのは定常状態の血中濃度1投与間隔分だけなので、次のように何回目の投与かをnに指定して、calc_section_conc()メソッドを1回呼ぶだけで済みます。

    # 変更後
    # c=[]以下の部分を次で置き換え
    self.n = cycle_to_css
    self.last_interval = self.calc_section_conc()

この部分の改善だけでsample_size=10000で78~79秒かかっていたものが、53秒台まで短縮しました。

演算に関わる部分はNumpy配列のみを使う

自分がわかりやすいと思うように書いたり、いろいろな機能を使おう、という目的もあったのですが、今までは標準のリストとNumpy配列が同じクラス内に混在していました。これをリストを使わないように変更します。結論を先に言うと、結局これに尽きる、というくらい処理速度が向上しました。
まずはコンストラクタ内のリストの初期化です。今までは内包表記を使って書いていましたが、np.zeros()を使います。

    # これまで
    self.ex_c = [0 for _ in range(int(intervals[0] / self.STEP))]  # 延長分濃度. 0で初期化
    # 変更後
    self.ex_c = np.zeros(int(intervals[0] / self.STEP))

ちなみに、リストを使うのであれば、内包表記よりも次の書き方が速いです。

    # より高速な書き方 
    self.ex_c = [0] * int(intervals[0] / self.STEP)

内包表記自体は高速となる書き方ですが、同じ値で初期化する場合は*を使うのが良いようです。
次は、get_time_above_mic()メソッドです。

    # %T>MICを計算する. micはnp.ndarrayで受け取る
    def get_time_above_mic(self, miclist):
        mic_len = miclist.size
        conc = np.zeros((mic_len, self.last_interval.shape[0]))
        conc[:, ] = self.last_interval
        above_mic = np.sum(conc > miclist.reshape(mic_len, -1), axis=1)
        tau = len(self.last_interval)
        return above_mic / tau * 100

今まではループを回して、MICの値を一つずつ取り出し%T>MICを計算していました。ベースとなる血中濃度は同じものを使うのだから、ループを使わずにMICリストすべての値について一度に%T>MICを計算してしまおう、というやり方です。
concという「MICリストの長さ×定常状態後の血中濃度の長さ」のnumpy配列を作っておき、すべての行にself.last_intervalを代入します。そして引数で受け取ったMICリストをreshapeで形状を変換して比較、合計しています。
reshapeの引数は変換後の形状ですが、一方を設定してからもう一方を-1にすると最適な形になるよう自動で帳尻を合わせてくれます。
言葉ではわかりにくいので具体的な例で見てみます。

>>> conc = np.zeros((3, 4))       # 3行4列で初期化
>>> c = np.array([4, 4, 3, 2])    # 血中濃度を想定
>>> conc[:, ] = c                 # すべての行にcを代入
>>> miclist = np.array([2, 3, 4]) # MICリストを想定
>>> miclist.reshape(3, -1)        # reshapeするとこんなかんじ
array([[2],
       [3],
       [4]])
>>> conc > miclist.reshape(3, -1)    # 比較
array([[ True,  True,  True, False],
       [ True,  True, False, False],
       [False, False, False, False]])
>>> np.sum(conc > miclist.reshape(3, -1), axis=1)    # axis=1(行方向)で合計
array([3, 2, 0])

この変更により、呼び出し側でforループをひとつ減らすことができました。

        # 呼び出し側
        for si in range(sample_size):
            bc = BloodConcTwoComp2(A[si], alpha[si], B[si], beta[si], T, intervals)
            data_matrix[:, si] = bc.get_time_above_mic(mic_list)

また、単純にMIC一個ずつ取り出すやり方よりも(コードはこちらの方が短く書けますが)私のPCでは処理が約3秒ほど早くなりました。

    # MICを一つずつ取るやり方
    def get_time_above_mic(self, mic):
        above_mic = np.sum(self.last_interval > mic)
        tau = len(self.last_interval)
        return above_mic / tau * 100

・・・とnumpyの特性を活かしてforループをなくし、処理速度も上がったぞ、と思っていたのですが、np.sum()ではなくnp.count_nonzero()を使うとMIC一つずつ処理するやり方の方が速かったです(平均0.5秒程度ですが)。これくらいのループだと、初期化や代入にかかるオーバーヘッドの方が大きい、ということなんでしょうね。

コード改善後の処理時間

Patient A、sample_size=10000で実験したところ、これらの改善で15秒を切るくらいまでになりました。元々78~79秒かかっていましたから、約1/5に短縮できたことになります。
まだ改善の余地がありそうなのは、A、alpha、B、betaをインデックスを使い一つずつクラスにわたしている部分です。まとめて10000パラメータ分渡して、10000例の血中濃度をまとめて計算する方法はどうだろうか、と考えました。しかしある程度まで改善が進むと、コードの改変に時間がかかったり、トリッキーなコードになると可読性も悪くなってきます(私はそこまでできませんが)。結局それに対してどれだけ効果(速度の向上)が得られるかのトレードオフになりそうです。
とりあえずこれで完成としたいと思います。

最終的に完成した今回のコードです。

import numpy as np
import matplotlib.pyplot as plt
from collections import OrderedDict
import pandas as pd
import time
 
# モンテカルロシミュレーションの高速化
 
class BloodConcTwoComp2:
    STEP = 0.01    # 経過時間の刻み幅
    MAGNI = 7      # 半減期の何倍で定常状態とするか
 
    def __init__(self, A, alpha, B, beta, T, intervals):
        self.A = A
        self.alpha = alpha
        self.B = B
        self.beta = beta
        self.T = T
        self.intervals = intervals
        self.idx = 0    # 現在の投与間隔のインデックス
        self.n = 1      # 何回目の投与かを表す
        # 改善点2. 標準のリストを使うのをやめる
        self.ex_c = np.zeros(int(intervals[0] / self.STEP))
        t_half = np.log(2) / beta
        # 投与間隔の倍数のうち、半減期のMAGNI倍以上の最小値を定常状態到達時間とする
        self.css_t = (t_half * self.MAGNI // intervals[0] + 1) * intervals[0]
        # 定常状態までの繰り返し投与回数. 最低3回
        cycle_to_css = 3 if self.css_t // intervals[0] < 3 else int(self.css_t / intervals[0])
        # 改善点1. self.nで何回目かを指定してからself.calc_section_conc()を呼ぶ
        self.n = cycle_to_css
        self.last_interval = self.calc_section_conc()
 
    # 点滴開始~終了直前
    def calc_during_div(self):
        td = np.linspace(0, self.T, num=int(self.T / self.STEP), endpoint=False)
        return self.A * (1 - np.exp(-self.alpha * td)) + self.B * (1 - np.exp(-self.beta * td))
 
    # 点滴終了時~次回投与直前
    def calc_after_div(self, begin, end):
        section = np.linspace(begin, end, num=int((end - begin) / self.STEP), endpoint=False)
        return self.A * (np.exp(-self.alpha * (section - self.T)) - np.exp(-self.alpha * section)) \
               + self.B * (np.exp(-self.beta * (section - self.T)) - np.exp(-self.beta * section))
 
    # 点滴開始から次回投与までの血中濃度を返す.
    # 今回投与分と前回投与時までの延長分(今回投与間隔分)の濃度の合計を足し合わせて返す.
    def calc_section_conc(self):
        c1 = self.calc_during_div()
        c2 = self.calc_after_div(self.T, self.intervals[self.idx])
        c = np.concatenate([c1, c2], axis=0)  # c1とc2を最初の軸(0次元目)で連結
        sum_c = c + self.calc_extended_conc(self.n, self.ex_c)  # 単回投与+延長分
        # 次回投与時のために以下の変数を更新
        self.n += 1
        self.idx = (self.idx + 1) % len(self.intervals)
        # 改善点2. tolist()やめる.numpyのまま
        return sum_c
 
    # 1回目~前回までの全ての投与回の延長(残存)分の血中濃度の合計を返す.
    # n: 投与回数, ex_c: 延長分の濃度の合計, begin: 求めるべき延長分濃度の計算開始位置
    def calc_extended_conc(self, n, ex_c, begin=0):
        if n == 1:
            return ex_c
        else:
            prev_idx = (n - 2) % len(self.intervals)  # 1つ前の投与間隔のインデックス
            prev_interval = self.intervals[prev_idx]
            begin += prev_interval
            end = begin + self.intervals[self.idx]
            ex_c += self.calc_after_div(begin, end)
            return self.calc_extended_conc(n - 1, ex_c, begin)
 
    # %T>MICを計算する
    def get_time_above_mic(self, mic):
        if type(mic) is np.ndarray:
            # リストで受け取った場合はこちら. MICリスト分まとめて処理
            mic_len = mic.size
            conc = np.zeros((mic_len, self.last_interval.shape[0]))
            conc[:, ] = self.last_interval
            above_mic = np.sum(conc > mic.reshape(mic_len, -1), axis=1)
        else:
            # 1個ずつ比較する場合はこちら. 実はこちらの方が高速
            above_mic = np.count_nonzero(self.last_interval > mic)
        tau = len(self.last_interval)
        return above_mic / tau * 100
 
 
if __name__ == '__main__':
    sample_size = 10000
 
    CCr = 100       # クレアチニンクリアランス(mL/min)
    Wt = 40         # 体重(kg)
    # グラフに貼るテキスト(患者タイプ, CCr, BWを表示)の作成
    pt_type = 'Patient A:\n'+'  CCr=' + str(CCr) + 'mL/min,\n  BW=' + str(Wt) + 'kg'
 
    CL = 0.0905 * CCr + 2.03
    Vc = 0.199 * Wt
    T = 0.5         # 点滴時間
 
    # seedを設定し発生する乱数を固定する.
    np.random.seed(1234)
 
    # 引数で指定した形状(4×sample_size)の標準正規乱数を生成
    X = np.random.randn(4, sample_size)
 
    # 各PKパラメータの分布は対数正規分布を仮定する
    X_cl = CL * np.exp(X[0, :] * 0.411)       # CL(L/h)
    X_vc = Vc * np.exp(X[1, :] * 0.398)       # Vc(L)
    X_q = 4.02 * np.exp(X[2, :] * 0.328)      # Q(L/h)
    X_vp = 4.55 * np.exp(X[3, :] * 0.299)     # Vp(L)
 
    # 血中濃度推移に対して試すMIC(μg/mL)のリスト
    mic_list = np.array([0.063, 0.125, 0.25, 0.5, 1, 2, 4, 8, 16, 32, 64, 128])
 
    # 順序が保持される辞書を使用
    recipe = OrderedDict()
    # key: 投与方法
    # value: (投与量(mg), 1日の投与回数, グラフ表示マーカー(o:丸, ^:三角, s:四角))
    recipe = {'0.25g twice': (250, 2, 'o'),
              '0.25g 3 times': (250, 3, 'o'),
              '0.5g twice': (500, 2, '^'),
              '0.5g 3 times': (500, 3, '^'),
              '1g twice': (1000, 2, 's'),
              '1g 3 times(daily)': (1000, 3, 's')}
 
    # 発生させたパラメータから血中濃度計算に必要なalpha, betaを求める
    ke = X_cl / X_vc    # CL / Vc
    k12 = X_q / X_vc    # Q / Vc
    k21 = X_q / X_vp    # Q / Vp
    s = k12 + k21 + ke  # alpha + beta
    p = k21 * ke        # alpha * beta
    alpha = (s + np.sqrt(s ** 2 - 4 * p)) / 2
    beta = s - alpha
 
    # key: 投与方法(dosage)
    # value: (%T>MICが40以上となった数 / sample_size * 100, グラフ表示マーカー)
    result = {}
 
    # ループの処理時間を計測
    start_time = time.time()
 
    for dosage, value in recipe.items():
        print(dosage)
        x0 = value[0]         # 投与量
        k0 = x0 / T           # 点滴速度(mg/hr) = 投与量 / 点滴時間
        intervals = (int(24 / value[1]), )  # 投与間隔(hr) = 24 / 投与回数
 
        # MICの要素数×サンプルサイズの行列. ここに%T>MICを格納していく
        data_matrix = np.zeros((len(mic_list), sample_size))
 
        # 発生させたパラメータから血中濃度計算に必要なA, Bを求める
        A = (k0 * (alpha - k21)) / (X_vc * alpha * (alpha - beta))
        B = (k0 * (k21 - beta)) / (X_vc * beta * (alpha - beta))
 
        for si in range(sample_size):
            bc = BloodConcTwoComp2(A[si], alpha[si], B[si], beta[si], T, intervals)
            # 改善点3. forループでmic_listを回すのをやめる
            # for mi, mic in enumerate(mic_list):
            #     data_matrix[mi, si] = bc.get_time_above_mic(mic)
            data_matrix[:, si] = bc.get_time_above_mic(mic_list)
        result[dosage] = (np.sum(data_matrix >= 40, axis=1) / sample_size * 100, value[-1])
 
    end_time = time.time()
    processing_time = end_time - start_time
    print('processing_time(sec): ', processing_time)
 
    # resultをプロットする
    fig, ax = plt.subplots()
    # 余白の調整(左と下を少し広げる). デフォルト値はleft=0.125, bottom=0.1
    fig.subplots_adjust(left=0.15, bottom=0.15)
 
    # key: dosage, value: 80%を達成できる最大のMIC値
    table = {}
 
    for dosage, (prob, marker) in result.items():
        plt.plot(mic_list, prob, label=dosage, marker=marker, markersize=6)
        table[dosage] = str(mic_list[np.argmax(np.where(prob >= 80))])
 
    # 辞書からDataFrameを作成. keyが行であるため、orientには'index'を渡す
    df = pd.DataFrame.from_dict(table, orient='index')
    df.columns = [pt_type[:9]]
    print('PK/PD breakpoint(μg/mL)')
    print(df)
 
    # 達成確率80%に水平ラインを引く
    plt.hlines(y=80, xmin=0, xmax=mic_list[-1], alpha=0.5, color='red', linestyles='dashed')
    plt.xlim(mic_list[0], )
    plt.xscale('log', basex=2)
    plt.xticks(mic_list, rotation=30, labels=mic_list)
    plt.ylim(0, )
    plt.yticks(np.arange(0, 110, 10))
    plt.ylabel('Probability of 40%T>MIC\n  attainment(%)')
    plt.xlabel('MIC(μg/mL)')
    plt.grid(linestyle='dashed', linewidth=0.5)
    # テキスト(患者タイプ, CCr, Wt)を右上に表示
    plt.text(0.68, 0.93, pt_type, size=12,  horizontalalignment='left',verticalalignment='top', transform=ax.transAxes)
    # グラフラベル(dosage)を左下に表示
    plt.legend(loc='lower left', frameon=False)
    plt.show()