はじめに
前回まで書いていたモンテカルロシミュレーションのプログラムを改善して、どれだけ処理時間が短縮できるかを試みよう、という記事です。
内容的には前回記事の続きとなります。
www.yakupro.info
定常状態の血中濃度を得る部分を最小限に
改善点その1です。今まではPK/PDパラメータを得るためにコンストラクタ内で繰り返し投与回数分ループを回して、空のリストを定常状態に達するまで延長していました。
# これまで cycle_to_css = 3 if self.css_t // intervals[0] < 3 else int(self.css_t / intervals[0]) c = [] for _ in range(cycle_to_css): c.extend(self.calc_section_conc()) self.last_interval = c[-int(self.intervals[0] / self.STEP):]
しかし、欲しいのは定常状態の血中濃度1投与間隔分だけなので、次のように何回目の投与かをnに指定して、calc_section_conc()メソッドを1回呼ぶだけで済みます。
# 変更後 # c=[]以下の部分を次で置き換え self.n = cycle_to_css self.last_interval = self.calc_section_conc()
この部分の改善だけでsample_size=10000で78~79秒かかっていたものが、53秒台まで短縮しました。
演算に関わる部分はNumpy配列のみを使う
自分がわかりやすいと思うように書いたり、いろいろな機能を使おう、という目的もあったのですが、今までは標準のリストとNumpy配列が同じクラス内に混在していました。これをリストを使わないように変更します。結論を先に言うと、結局これに尽きる、というくらい処理速度が向上しました。
まずはコンストラクタ内のリストの初期化です。今までは内包表記を使って書いていましたが、np.zeros()を使います。
# これまで self.ex_c = [0 for _ in range(int(intervals[0] / self.STEP))] # 延長分濃度. 0で初期化 # 変更後 self.ex_c = np.zeros(int(intervals[0] / self.STEP))
ちなみに、リストを使うのであれば、内包表記よりも次の書き方が速いです。
# より高速な書き方 self.ex_c = [0] * int(intervals[0] / self.STEP)
内包表記自体は高速となる書き方ですが、同じ値で初期化する場合は*を使うのが良いようです。
次は、get_time_above_mic()メソッドです。
# %T>MICを計算する. micはnp.ndarrayで受け取る def get_time_above_mic(self, miclist): mic_len = miclist.size conc = np.zeros((mic_len, self.last_interval.shape[0])) conc[:, ] = self.last_interval above_mic = np.sum(conc > miclist.reshape(mic_len, -1), axis=1) tau = len(self.last_interval) return above_mic / tau * 100
今まではループを回して、MICの値を一つずつ取り出し%T>MICを計算していました。ベースとなる血中濃度は同じものを使うのだから、ループを使わずにMICリストすべての値について一度に%T>MICを計算してしまおう、というやり方です。
concという「MICリストの長さ×定常状態後の血中濃度の長さ」のnumpy配列を作っておき、すべての行にself.last_intervalを代入します。そして引数で受け取ったMICリストをreshapeで形状を変換して比較、合計しています。
reshapeの引数は変換後の形状ですが、一方を設定してからもう一方を-1にすると最適な形になるよう自動で帳尻を合わせてくれます。
言葉ではわかりにくいので具体的な例で見てみます。
>>> conc = np.zeros((3, 4)) # 3行4列で初期化 >>> c = np.array([4, 4, 3, 2]) # 血中濃度を想定 >>> conc[:, ] = c # すべての行にcを代入 >>> miclist = np.array([2, 3, 4]) # MICリストを想定 >>> miclist.reshape(3, -1) # reshapeするとこんなかんじ array([[2], [3], [4]]) >>> conc > miclist.reshape(3, -1) # 比較 array([[ True, True, True, False], [ True, True, False, False], [False, False, False, False]]) >>> np.sum(conc > miclist.reshape(3, -1), axis=1) # axis=1(行方向)で合計 array([3, 2, 0])
この変更により、呼び出し側でforループをひとつ減らすことができました。
# 呼び出し側 for si in range(sample_size): bc = BloodConcTwoComp2(A[si], alpha[si], B[si], beta[si], T, intervals) data_matrix[:, si] = bc.get_time_above_mic(mic_list)
また、単純にMIC一個ずつ取り出すやり方よりも(コードはこちらの方が短く書けますが)私のPCでは処理が約3秒ほど早くなりました。
# MICを一つずつ取るやり方 def get_time_above_mic(self, mic): above_mic = np.sum(self.last_interval > mic) tau = len(self.last_interval) return above_mic / tau * 100
・・・とnumpyの特性を活かしてforループをなくし、処理速度も上がったぞ、と思っていたのですが、np.sum()ではなくnp.count_nonzero()を使うとMIC一つずつ処理するやり方の方が速かったです(平均0.5秒程度ですが)。これくらいのループだと、初期化や代入にかかるオーバーヘッドの方が大きい、ということなんでしょうね。
コード改善後の処理時間
Patient A、sample_size=10000で実験したところ、これらの改善で15秒を切るくらいまでになりました。元々78~79秒かかっていましたから、約1/5に短縮できたことになります。
まだ改善の余地がありそうなのは、A、alpha、B、betaをインデックスを使い一つずつクラスにわたしている部分です。まとめて10000パラメータ分渡して、10000例の血中濃度をまとめて計算する方法はどうだろうか、と考えました。しかしある程度まで改善が進むと、コードの改変に時間がかかったり、トリッキーなコードになると可読性も悪くなってきます(私はそこまでできませんが)。結局それに対してどれだけ効果(速度の向上)が得られるかのトレードオフになりそうです。
とりあえずこれで完成としたいと思います。
最終的に完成した今回のコードです。
import numpy as np import matplotlib.pyplot as plt from collections import OrderedDict import pandas as pd import time # モンテカルロシミュレーションの高速化 class BloodConcTwoComp2: STEP = 0.01 # 経過時間の刻み幅 MAGNI = 7 # 半減期の何倍で定常状態とするか def __init__(self, A, alpha, B, beta, T, intervals): self.A = A self.alpha = alpha self.B = B self.beta = beta self.T = T self.intervals = intervals self.idx = 0 # 現在の投与間隔のインデックス self.n = 1 # 何回目の投与かを表す # 改善点2. 標準のリストを使うのをやめる self.ex_c = np.zeros(int(intervals[0] / self.STEP)) t_half = np.log(2) / beta # 投与間隔の倍数のうち、半減期のMAGNI倍以上の最小値を定常状態到達時間とする self.css_t = (t_half * self.MAGNI // intervals[0] + 1) * intervals[0] # 定常状態までの繰り返し投与回数. 最低3回 cycle_to_css = 3 if self.css_t // intervals[0] < 3 else int(self.css_t / intervals[0]) # 改善点1. self.nで何回目かを指定してからself.calc_section_conc()を呼ぶ self.n = cycle_to_css self.last_interval = self.calc_section_conc() # 点滴開始~終了直前 def calc_during_div(self): td = np.linspace(0, self.T, num=int(self.T / self.STEP), endpoint=False) return self.A * (1 - np.exp(-self.alpha * td)) + self.B * (1 - np.exp(-self.beta * td)) # 点滴終了時~次回投与直前 def calc_after_div(self, begin, end): section = np.linspace(begin, end, num=int((end - begin) / self.STEP), endpoint=False) return self.A * (np.exp(-self.alpha * (section - self.T)) - np.exp(-self.alpha * section)) \ + self.B * (np.exp(-self.beta * (section - self.T)) - np.exp(-self.beta * section)) # 点滴開始から次回投与までの血中濃度を返す. # 今回投与分と前回投与時までの延長分(今回投与間隔分)の濃度の合計を足し合わせて返す. def calc_section_conc(self): c1 = self.calc_during_div() c2 = self.calc_after_div(self.T, self.intervals[self.idx]) c = np.concatenate([c1, c2], axis=0) # c1とc2を最初の軸(0次元目)で連結 sum_c = c + self.calc_extended_conc(self.n, self.ex_c) # 単回投与+延長分 # 次回投与時のために以下の変数を更新 self.n += 1 self.idx = (self.idx + 1) % len(self.intervals) # 改善点2. tolist()やめる.numpyのまま return sum_c # 1回目~前回までの全ての投与回の延長(残存)分の血中濃度の合計を返す. # n: 投与回数, ex_c: 延長分の濃度の合計, begin: 求めるべき延長分濃度の計算開始位置 def calc_extended_conc(self, n, ex_c, begin=0): if n == 1: return ex_c else: prev_idx = (n - 2) % len(self.intervals) # 1つ前の投与間隔のインデックス prev_interval = self.intervals[prev_idx] begin += prev_interval end = begin + self.intervals[self.idx] ex_c += self.calc_after_div(begin, end) return self.calc_extended_conc(n - 1, ex_c, begin) # %T>MICを計算する def get_time_above_mic(self, mic): if type(mic) is np.ndarray: # リストで受け取った場合はこちら. MICリスト分まとめて処理 mic_len = mic.size conc = np.zeros((mic_len, self.last_interval.shape[0])) conc[:, ] = self.last_interval above_mic = np.sum(conc > mic.reshape(mic_len, -1), axis=1) else: # 1個ずつ比較する場合はこちら. 実はこちらの方が高速 above_mic = np.count_nonzero(self.last_interval > mic) tau = len(self.last_interval) return above_mic / tau * 100 if __name__ == '__main__': sample_size = 10000 CCr = 100 # クレアチニンクリアランス(mL/min) Wt = 40 # 体重(kg) # グラフに貼るテキスト(患者タイプ, CCr, BWを表示)の作成 pt_type = 'Patient A:\n'+' CCr=' + str(CCr) + 'mL/min,\n BW=' + str(Wt) + 'kg' CL = 0.0905 * CCr + 2.03 Vc = 0.199 * Wt T = 0.5 # 点滴時間 # seedを設定し発生する乱数を固定する. np.random.seed(1234) # 引数で指定した形状(4×sample_size)の標準正規乱数を生成 X = np.random.randn(4, sample_size) # 各PKパラメータの分布は対数正規分布を仮定する X_cl = CL * np.exp(X[0, :] * 0.411) # CL(L/h) X_vc = Vc * np.exp(X[1, :] * 0.398) # Vc(L) X_q = 4.02 * np.exp(X[2, :] * 0.328) # Q(L/h) X_vp = 4.55 * np.exp(X[3, :] * 0.299) # Vp(L) # 血中濃度推移に対して試すMIC(μg/mL)のリスト mic_list = np.array([0.063, 0.125, 0.25, 0.5, 1, 2, 4, 8, 16, 32, 64, 128]) # 順序が保持される辞書を使用 recipe = OrderedDict() # key: 投与方法 # value: (投与量(mg), 1日の投与回数, グラフ表示マーカー(o:丸, ^:三角, s:四角)) recipe = {'0.25g twice': (250, 2, 'o'), '0.25g 3 times': (250, 3, 'o'), '0.5g twice': (500, 2, '^'), '0.5g 3 times': (500, 3, '^'), '1g twice': (1000, 2, 's'), '1g 3 times(daily)': (1000, 3, 's')} # 発生させたパラメータから血中濃度計算に必要なalpha, betaを求める ke = X_cl / X_vc # CL / Vc k12 = X_q / X_vc # Q / Vc k21 = X_q / X_vp # Q / Vp s = k12 + k21 + ke # alpha + beta p = k21 * ke # alpha * beta alpha = (s + np.sqrt(s ** 2 - 4 * p)) / 2 beta = s - alpha # key: 投与方法(dosage) # value: (%T>MICが40以上となった数 / sample_size * 100, グラフ表示マーカー) result = {} # ループの処理時間を計測 start_time = time.time() for dosage, value in recipe.items(): print(dosage) x0 = value[0] # 投与量 k0 = x0 / T # 点滴速度(mg/hr) = 投与量 / 点滴時間 intervals = (int(24 / value[1]), ) # 投与間隔(hr) = 24 / 投与回数 # MICの要素数×サンプルサイズの行列. ここに%T>MICを格納していく data_matrix = np.zeros((len(mic_list), sample_size)) # 発生させたパラメータから血中濃度計算に必要なA, Bを求める A = (k0 * (alpha - k21)) / (X_vc * alpha * (alpha - beta)) B = (k0 * (k21 - beta)) / (X_vc * beta * (alpha - beta)) for si in range(sample_size): bc = BloodConcTwoComp2(A[si], alpha[si], B[si], beta[si], T, intervals) # 改善点3. forループでmic_listを回すのをやめる # for mi, mic in enumerate(mic_list): # data_matrix[mi, si] = bc.get_time_above_mic(mic) data_matrix[:, si] = bc.get_time_above_mic(mic_list) result[dosage] = (np.sum(data_matrix >= 40, axis=1) / sample_size * 100, value[-1]) end_time = time.time() processing_time = end_time - start_time print('processing_time(sec): ', processing_time) # resultをプロットする fig, ax = plt.subplots() # 余白の調整(左と下を少し広げる). デフォルト値はleft=0.125, bottom=0.1 fig.subplots_adjust(left=0.15, bottom=0.15) # key: dosage, value: 80%を達成できる最大のMIC値 table = {} for dosage, (prob, marker) in result.items(): plt.plot(mic_list, prob, label=dosage, marker=marker, markersize=6) table[dosage] = str(mic_list[np.argmax(np.where(prob >= 80))]) # 辞書からDataFrameを作成. keyが行であるため、orientには'index'を渡す df = pd.DataFrame.from_dict(table, orient='index') df.columns = [pt_type[:9]] print('PK/PD breakpoint(μg/mL)') print(df) # 達成確率80%に水平ラインを引く plt.hlines(y=80, xmin=0, xmax=mic_list[-1], alpha=0.5, color='red', linestyles='dashed') plt.xlim(mic_list[0], ) plt.xscale('log', basex=2) plt.xticks(mic_list, rotation=30, labels=mic_list) plt.ylim(0, ) plt.yticks(np.arange(0, 110, 10)) plt.ylabel('Probability of 40%T>MIC\n attainment(%)') plt.xlabel('MIC(μg/mL)') plt.grid(linestyle='dashed', linewidth=0.5) # テキスト(患者タイプ, CCr, Wt)を右上に表示 plt.text(0.68, 0.93, pt_type, size=12, horizontalalignment='left',verticalalignment='top', transform=ax.transAxes) # グラフラベル(dosage)を左下に表示 plt.legend(loc='lower left', frameon=False) plt.show()