〜本業・診療放射線技師が実践した方法〜
こんにちは。普段はPythonで株の自動売買やデータ分析を発信していますが、
今回は本業の診療放射線技師として「線量管理システムの自動化」に挑戦した話をお届けします。
先にお伝えしますが、ここで紹介するコードの利用は自己責任でお願いします。
また、利益相反は一切なく、所属団体とも関係のない、一個人としての発信です。
内容の正確性・信頼性は保証できません。
なぜPythonで線量管理を自動化しようと思ったのか
前職では、事務スタッフがCTの線量をExcelに手入力していました。
それを見て、「この単純作業はPythonで自動化できるのに…」と常々感じていました。
しかし当時は撮影件数も画像処理も膨大で、余計な仕事は増やしたくなかったため、自ら線量管理を担当することはありませんでした。
転職後、技師長との面談で「線量管理に興味がある」と話したところ、専用ソフトの導入には100万円以上かかるとのこと。
そこで「やれるなら挑戦してみよう!」という流れで、自作システムに取り組み始めました。
実現までの課題と方針
線量管理と一口に言っても、対象はレントゲン・CT・透視・血管撮影など多岐にわたります。
撮影件数が最も多いのはレントゲンですが、私の勤務先ではX線発生装置(島津製)とフラットパネル(富士フイルム製)が非連動のため、照射条件が画像情報に反映されず、精密な線量管理は困難でした。
そのため、まずはCT線量管理の自動化に着手しました。
詳しい内容は動画で話していますので、そちらをご参照ください。
最後にpythonのコードを貼り付けておきます。
興味がある方は、是非参考にしてみてください。
関連キーワード
#診療放射線技師 #線量管理 #Python自動化 #DICOM解析 #医療ICT #CT線量 #plotly #pydicom #医療DX
import pydicom
import os
import time
import re
import pydicom
import numpy
import traceback
import pandas
path = "C:\\Users\\owner\\pythonwork\\dicom\\0001_1.dcm"
path = os.getcwd()
print(path)
#以下で格納関数定義
results = []
def resul(results,patient_name,patient_age,patient_sex,patient_birth_date,patient_id,study_date,body_part_examined,\
modality,requesting_service,ctdi_vol,dlp,s_value):
results.append({
"patient_name": patient_name,
"patient_age": patient_age,
"patient_sex": patient_sex,
"patient_birth_date": patient_birth_date,
"patient_id": patient_id,
"study_date": study_date,
"body_part_examined": body_part_examined,
"modality": modality,
"requesting_service": requesting_service,
"ctdi_vol": ctdi_vol,
"dlp": dlp,
"s_value": s_value,
})
#追加でモジュールのインポート
import os
import pydicom
import numpy
import re
import pickle
# 設定
folder_path = r'C:\Users\owner\Desktop\data'
output_folder = r'C:\Users\owner\Desktop\gazo'
pickle_file = r'C:\Users\owner\Desktop\results.pkl'
window_width = 350
window_level = 50
# 既存結果を読み込み(なければ空リスト)
if os.path.exists(pickle_file):
with open(pickle_file, 'rb') as f:
results = pickle.load(f)
else:
results = []
# DICOMファイルを処理
for filename in os.listdir(folder_path):
if filename.endswith('.dcm'):
filepath = os.path.join(folder_path, filename)
ds = pydicom.dcmread(filepath)
modality = str(ds.get((0x0008, 0x0060), "Not Found"))
modality = modality.split("'")[1] if "'" in modality else modality
if modality == "Not Found":
continue
# --- CR 処理 ---
if modality == "CR":
try:
patient_name = str(ds.get((0x0010, 0x0010), "Not Found")).split("'")[2]
patient_age = str(ds.get((0x0010, 0x1010), "Not Found")).split("'")[2]
patient_birth_date = str(ds.get((0x0010, 0x0030), "Not Found")).split("'")[2]
patient_sex = str(ds.get((0x0010, 0x0040), "Not Found")).split("'")[2]
patient_id = str(ds.get((0x0010, 0x0020), "Not Found")).split("'")[1]
study_date = str(ds.get((0x0008, 0x0020), "Not Found")).split("'")[1]
body_part_examined = str(ds.get((0x0018, 0x0015), "Not Found")).split("'")[1]
modality = str(ds.get((0x0008, 0x0060), "Not Found")).split("'")[1]
requesting_service = str(ds.get((0x0032, 0x1033), "Not Found")).split("'")[1]
s_value = str(ds.get((0x0018, 0x1412), "Not Found")).split("'")[1]
ctdi_vol = float('nan')
dlp = float('nan')
resul(results, patient_name, patient_age, patient_sex, patient_birth_date, patient_id,
study_date, body_part_examined, modality, requesting_service, ctdi_vol, dlp, s_value)
except:
continue
# --- CT 処理 ---
if modality == "CT":
try:
patient_name = str(ds.get((0x0010, 0x0010), "Not Found")).split("'")[2]
patient_age = str(ds.get((0x0010, 0x1010), "Not Found")).split("'")[2]
patient_birth_date = str(ds.get((0x0010, 0x0030), "Not Found")).split("'")[2]
patient_sex = str(ds.get((0x0010, 0x0040), "Not Found")).split("'")[2]
patient_id = str(ds.get((0x0010, 0x0020), "Not Found")).split("'")[1]
study_date = str(ds.get((0x0008, 0x0020), "Not Found")).split("'")[1]
body_part_examined = str(ds.get((0x0018, 0x0015), "Not Found")).split("'")[1]
modality = str(ds.get((0x0008, 0x0060), "Not Found")).split("'")[1]
requesting_service = str(ds.get((0x0032, 0x1033), "Not Found")).split("'")[1]
ctdi_vol_raw = str(ds.get((0x0018, 0x9345), "Not Found"))
dlp_raw = str(ds.get((0x7005, 0x1040), "Not Found"))
ctdi_vol = re.search(r"FD:\s*([\d\.]+)", ctdi_vol_raw).group(1) if "FD:" in ctdi_vol_raw else float('nan')
dlp = re.search(r"FD:\s*([\d\.]+)", dlp_raw).group(1) if "FD:" in dlp_raw else float('nan')
s_value = float('nan')
resul(results, patient_name, patient_age, patient_sex, patient_birth_date, patient_id,
study_date, body_part_examined, modality, requesting_service, ctdi_vol, dlp, s_value)
except:
# print(traceback.format_exc())
continue
# Pickleに保存
with open(pickle_file, 'wb') as f:
pickle.dump(results, f)
print("データ取得と保存が完了しました。")
#データフレームの読み込み
import joblib
df = pandas.read_pickle("data.pkl")
#print(df)
import pandas as pd
import plotly.express as px
import dash
from dash import dcc, html, Input, Output
# --- DRL基準 ---
try:
DRL_TABLE = {
"HEAD": {"CTDIvol": 85, "DLP": 1350},
"CHEST": {"CTDIvol": 15, "DLP": 550},
"CHEST-PELVIS": {"CTDIvol": 18, "DLP": 1300},
"ABDOMEN-PELVIS": {"CTDIvol": 20, "DLP": 1000},
"LIVER-DYNAMIC": {"CTDIvol": 15, "DLP": 1800},
"CORONARY": {"CTDIvol": 90, "DLP": 1400},
}
# --- データ読み込み(適宜変更)---
# df = pd.read_csv("your_data.csv")
df["study_date"] = pd.to_datetime(df["study_date"], format="%Y%m%d", errors="coerce")
df["patient_birth_date"] = pd.to_datetime(df["patient_birth_date"], errors="coerce")
df["ctdi_vol"] = pd.to_numeric(df["ctdi_vol"], errors="coerce")
df["dlp"] = pd.to_numeric(df["dlp"], errors="coerce")
# 年齢を計算(年単位)
df["age"] = ((df["study_date"] - df["patient_birth_date"]).dt.days / 365.25).astype(float)
# --- Dash アプリ ---
app = dash.Dash(__name__)
app.layout = html.Div([
html.H2("CT DRL比較グラフ(部位別 + 年齢対応)"),
html.Div([
html.Label("対象部位を選択:"),
dcc.Dropdown(
id="target-part-dropdown",
options=[{"label": part, "value": part} for part in DRL_TABLE.keys()],
value="HEAD",
style={"width": "300px"}
)
], style={"marginBottom": "20px"}),
html.Div([
html.Label("患者IDを入力:"),
dcc.Input(id='patient-id-input', type='text', placeholder='患者ID'),
], style={"marginBottom": "20px"}),
html.Div([
html.Label("検査日を指定:"),
dcc.DatePickerRange(
id='date-range',
start_date=df["study_date"].min(),
end_date=df["study_date"].max(),
display_format='YYYY-MM-DD'
),
], style={"marginBottom": "20px"}),
html.Div([
html.Label("年齢範囲を指定(歳):"),
dcc.Input(id='min-age-input', type='number', placeholder='最小年齢', value=0, style={'width': '100px', 'marginRight': '10px'}),
dcc.Input(id='max-age-input', type='number', placeholder='最大年齢', value=120, style={'width': '100px'})
], style={"marginBottom": "20px"}),
dcc.Graph(id='scatter-plot')
])
@app.callback(
Output('scatter-plot', 'figure'),
Input('target-part-dropdown', 'value'),
Input('patient-id-input', 'value'),
Input('date-range', 'start_date'),
Input('date-range', 'end_date'),
Input('min-age-input', 'value'),
Input('max-age-input', 'value'),
)
def update_graph(target_part, patient_id, start_date, end_date, min_age, max_age):
ctdi_drl = DRL_TABLE[target_part]["CTDIvol"]
dlp_drl = DRL_TABLE[target_part]["DLP"]
filtered_df = df.copy()
filtered_df = filtered_df[
(filtered_df["body_part_examined"] == target_part) &
(filtered_df["modality"] == "CT") &
(filtered_df["study_date"] >= pd.to_datetime(start_date)) &
(filtered_df["study_date"] <= pd.to_datetime(end_date)) &
(filtered_df["age"] >= min_age) &
(filtered_df["age"] <= max_age)
]
if patient_id:
filtered_df = filtered_df[filtered_df["patient_id"].astype(str).str.contains(patient_id)]
filtered_df["ctdi_over"] = filtered_df["ctdi_vol"] > ctdi_drl
filtered_df["dlp_over"] = filtered_df["dlp"] > dlp_drl
filtered_df["any_over"] = filtered_df["ctdi_over"] | filtered_df["dlp_over"]
fig = px.scatter(
filtered_df,
x="ctdi_vol",
y="dlp",
color="any_over",
color_discrete_map={True: "red", False: "blue"},
hover_data=["patient_id", "patient_name", "study_date", "age", "ctdi_vol", "dlp"],
labels={"ctdi_vol": "CTDIvol (mGy)", "dlp": "DLP (mGy·cm)", "any_over": "DRL Over"},
title=f"{target_part} CT: CTDIvol vs DLP(年齢 {min_age}~{max_age} 歳)"
)
fig.add_vline(x=ctdi_drl, line_dash="dash", line_color="red", annotation_text="CTDI DRL")
fig.add_hline(y=dlp_drl, line_dash="dash", line_color="orange", annotation_text="DLP DRL")
return fig
if __name__ == '__main__':
app.run_server(debug=True)
except:
print(traceback.format_exc())