線量管理システム自動化の構築方法~診療放射線技師がpythonで作ってみた~

  • 2025-08-13
  • 2025-08-13
  • CT
  • 8view
CT 100万円のソフト不要!診療放射線技師がPythonで作る線量管理自動化システム

〜本業・診療放射線技師が実践した方法〜

こんにちは。普段はPythonで株の自動売買やデータ分析を発信していますが、
今回は本業の診療放射線技師として「線量管理システムの自動化」に挑戦した話をお届けします。

先にお伝えしますが、ここで紹介するコードの利用は自己責任でお願いします。

また、利益相反は一切なく、所属団体とも関係のない、一個人としての発信です。

内容の正確性・信頼性は保証できません。


なぜPythonで線量管理を自動化しようと思ったのか
前職では、事務スタッフがCTの線量をExcelに手入力していました。

それを見て、「この単純作業はPythonで自動化できるのに…」と常々感じていました。

しかし当時は撮影件数も画像処理も膨大で、余計な仕事は増やしたくなかったため、自ら線量管理を担当することはありませんでした。

転職後、技師長との面談で「線量管理に興味がある」と話したところ、専用ソフトの導入には100万円以上かかるとのこと。

そこで「やれるなら挑戦してみよう!」という流れで、自作システムに取り組み始めました。

実現までの課題と方針
線量管理と一口に言っても、対象はレントゲン・CT・透視・血管撮影など多岐にわたります。

撮影件数が最も多いのはレントゲンですが、私の勤務先ではX線発生装置(島津製)とフラットパネル(富士フイルム製)が非連動のため、照射条件が画像情報に反映されず、精密な線量管理は困難でした。

そのため、まずはCT線量管理の自動化に着手しました。

詳しい内容は動画で話していますので、そちらをご参照ください。

最後にpythonのコードを貼り付けておきます。

興味がある方は、是非参考にしてみてください。

関連キーワード
#診療放射線技師 #線量管理 #Python自動化 #DICOM解析 #医療ICT #CT線量 #plotly #pydicom #医療DX

import pydicom
import os
import time
import re
import pydicom
import numpy
import traceback
import pandas
path = "C:\\Users\\owner\\pythonwork\\dicom\\0001_1.dcm"
path = os.getcwd()
print(path)

#以下で格納関数定義
results = []
def resul(results,patient_name,patient_age,patient_sex,patient_birth_date,patient_id,study_date,body_part_examined,\
          modality,requesting_service,ctdi_vol,dlp,s_value):
    
    results.append({
        "patient_name": patient_name,
        "patient_age": patient_age,
        "patient_sex": patient_sex,
        "patient_birth_date": patient_birth_date,
        "patient_id": patient_id,
        "study_date": study_date,
        "body_part_examined": body_part_examined,
        "modality": modality,
        "requesting_service": requesting_service,
        "ctdi_vol": ctdi_vol,
        "dlp": dlp,
        "s_value": s_value,
    })

#追加でモジュールのインポート
import os
import pydicom
import numpy
import re
import pickle

# 設定
folder_path = r'C:\Users\owner\Desktop\data'
output_folder = r'C:\Users\owner\Desktop\gazo'


pickle_file = r'C:\Users\owner\Desktop\results.pkl'


window_width = 350
window_level = 50

# 既存結果を読み込み(なければ空リスト)
if os.path.exists(pickle_file):
    with open(pickle_file, 'rb') as f:
        results = pickle.load(f)
else:
    results = []

# DICOMファイルを処理
for filename in os.listdir(folder_path):
    if filename.endswith('.dcm'):
        filepath = os.path.join(folder_path, filename)
        ds = pydicom.dcmread(filepath)
        modality = str(ds.get((0x0008, 0x0060), "Not Found"))
        modality = modality.split("'")[1] if "'" in modality else modality

        if modality == "Not Found":
            continue

        # --- CR 処理 ---
        if modality == "CR":
            try:
                patient_name = str(ds.get((0x0010, 0x0010), "Not Found")).split("'")[2]
                patient_age = str(ds.get((0x0010, 0x1010), "Not Found")).split("'")[2]
                patient_birth_date = str(ds.get((0x0010, 0x0030), "Not Found")).split("'")[2]
                patient_sex = str(ds.get((0x0010, 0x0040), "Not Found")).split("'")[2]
                patient_id = str(ds.get((0x0010, 0x0020), "Not Found")).split("'")[1]
                study_date = str(ds.get((0x0008, 0x0020), "Not Found")).split("'")[1]
                body_part_examined = str(ds.get((0x0018, 0x0015), "Not Found")).split("'")[1]
                modality = str(ds.get((0x0008, 0x0060), "Not Found")).split("'")[1]
                requesting_service = str(ds.get((0x0032, 0x1033), "Not Found")).split("'")[1]
                s_value = str(ds.get((0x0018, 0x1412), "Not Found")).split("'")[1]
                ctdi_vol = float('nan')
                dlp = float('nan')

                resul(results, patient_name, patient_age, patient_sex, patient_birth_date, patient_id,
                      study_date, body_part_examined, modality, requesting_service, ctdi_vol, dlp, s_value)
            except:
                continue

        # --- CT 処理 ---
        if modality == "CT":
            try:
                patient_name = str(ds.get((0x0010, 0x0010), "Not Found")).split("'")[2]
                patient_age = str(ds.get((0x0010, 0x1010), "Not Found")).split("'")[2]
                patient_birth_date = str(ds.get((0x0010, 0x0030), "Not Found")).split("'")[2]
                patient_sex = str(ds.get((0x0010, 0x0040), "Not Found")).split("'")[2]
                patient_id = str(ds.get((0x0010, 0x0020), "Not Found")).split("'")[1]
                study_date = str(ds.get((0x0008, 0x0020), "Not Found")).split("'")[1]
                body_part_examined = str(ds.get((0x0018, 0x0015), "Not Found")).split("'")[1]
                modality = str(ds.get((0x0008, 0x0060), "Not Found")).split("'")[1]
                requesting_service = str(ds.get((0x0032, 0x1033), "Not Found")).split("'")[1]

                ctdi_vol_raw = str(ds.get((0x0018, 0x9345), "Not Found"))
                dlp_raw = str(ds.get((0x7005, 0x1040), "Not Found"))
                ctdi_vol = re.search(r"FD:\s*([\d\.]+)", ctdi_vol_raw).group(1) if "FD:" in ctdi_vol_raw else float('nan')
                dlp = re.search(r"FD:\s*([\d\.]+)", dlp_raw).group(1) if "FD:" in dlp_raw else float('nan')
                s_value = float('nan')

                resul(results, patient_name, patient_age, patient_sex, patient_birth_date, patient_id,
                      study_date, body_part_examined, modality, requesting_service, ctdi_vol, dlp, s_value)
            except:
                # print(traceback.format_exc())
                continue

# Pickleに保存
with open(pickle_file, 'wb') as f:
    pickle.dump(results, f)

print("データ取得と保存が完了しました。")

#データフレームの読み込み
import joblib
df = pandas.read_pickle("data.pkl")
#print(df)

import pandas as pd
import plotly.express as px
import dash
from dash import dcc, html, Input, Output

# --- DRL基準 ---
try:
    DRL_TABLE = {
        "HEAD": {"CTDIvol": 85, "DLP": 1350},
        "CHEST": {"CTDIvol": 15, "DLP": 550},
        "CHEST-PELVIS": {"CTDIvol": 18, "DLP": 1300},
        "ABDOMEN-PELVIS": {"CTDIvol": 20, "DLP": 1000},
        "LIVER-DYNAMIC": {"CTDIvol": 15, "DLP": 1800},
        "CORONARY": {"CTDIvol": 90, "DLP": 1400},
    }
    
    # --- データ読み込み(適宜変更)---
    # df = pd.read_csv("your_data.csv")
    df["study_date"] = pd.to_datetime(df["study_date"], format="%Y%m%d", errors="coerce")
    df["patient_birth_date"] = pd.to_datetime(df["patient_birth_date"], errors="coerce")
    df["ctdi_vol"] = pd.to_numeric(df["ctdi_vol"], errors="coerce")
    df["dlp"] = pd.to_numeric(df["dlp"], errors="coerce")
    
    # 年齢を計算(年単位)
    df["age"] = ((df["study_date"] - df["patient_birth_date"]).dt.days / 365.25).astype(float)
    
    # --- Dash アプリ ---
    app = dash.Dash(__name__)
    
    app.layout = html.Div([
        html.H2("CT DRL比較グラフ(部位別 + 年齢対応)"),
        
        html.Div([
            html.Label("対象部位を選択:"),
            dcc.Dropdown(
                id="target-part-dropdown",
                options=[{"label": part, "value": part} for part in DRL_TABLE.keys()],
                value="HEAD",
                style={"width": "300px"}
            )
        ], style={"marginBottom": "20px"}),
    
        html.Div([
            html.Label("患者IDを入力:"),
            dcc.Input(id='patient-id-input', type='text', placeholder='患者ID'),
        ], style={"marginBottom": "20px"}),
        html.Div([
            html.Label("検査日を指定:"),
            dcc.DatePickerRange(
                id='date-range',
                start_date=df["study_date"].min(),
                end_date=df["study_date"].max(),
                display_format='YYYY-MM-DD'
            ),
        ], style={"marginBottom": "20px"}),
    
        html.Div([
            html.Label("年齢範囲を指定(歳):"),
            dcc.Input(id='min-age-input', type='number', placeholder='最小年齢', value=0, style={'width': '100px', 'marginRight': '10px'}),
            dcc.Input(id='max-age-input', type='number', placeholder='最大年齢', value=120, style={'width': '100px'})
        ], style={"marginBottom": "20px"}),
    
        dcc.Graph(id='scatter-plot')
    ])
    
    @app.callback(
        Output('scatter-plot', 'figure'),
        Input('target-part-dropdown', 'value'),
        Input('patient-id-input', 'value'),
        Input('date-range', 'start_date'),
        Input('date-range', 'end_date'),
        Input('min-age-input', 'value'),
        Input('max-age-input', 'value'),
    )
    def update_graph(target_part, patient_id, start_date, end_date, min_age, max_age):
        ctdi_drl = DRL_TABLE[target_part]["CTDIvol"]
        dlp_drl = DRL_TABLE[target_part]["DLP"]
    
        filtered_df = df.copy()
    
        filtered_df = filtered_df[
            (filtered_df["body_part_examined"] == target_part) &
            (filtered_df["modality"] == "CT") &
            (filtered_df["study_date"] >= pd.to_datetime(start_date)) &
            (filtered_df["study_date"] <= pd.to_datetime(end_date)) &
            (filtered_df["age"] >= min_age) &
            (filtered_df["age"] <= max_age)
        ]
    
        if patient_id:
            filtered_df = filtered_df[filtered_df["patient_id"].astype(str).str.contains(patient_id)]
    
        filtered_df["ctdi_over"] = filtered_df["ctdi_vol"] > ctdi_drl
        filtered_df["dlp_over"] = filtered_df["dlp"] > dlp_drl
        filtered_df["any_over"] = filtered_df["ctdi_over"] | filtered_df["dlp_over"]
    
        fig = px.scatter(
            filtered_df,
            x="ctdi_vol",
            y="dlp",
            color="any_over",
            color_discrete_map={True: "red", False: "blue"},
            hover_data=["patient_id", "patient_name", "study_date", "age", "ctdi_vol", "dlp"],
            labels={"ctdi_vol": "CTDIvol (mGy)", "dlp": "DLP (mGy·cm)", "any_over": "DRL Over"},
            title=f"{target_part} CT: CTDIvol vs DLP(年齢 {min_age}~{max_age} 歳)"
        )
    
        fig.add_vline(x=ctdi_drl, line_dash="dash", line_color="red", annotation_text="CTDI DRL")
        fig.add_hline(y=dlp_drl, line_dash="dash", line_color="orange", annotation_text="DLP DRL")
    
        return fig
    if __name__ == '__main__':
        app.run_server(debug=True)
except:
    print(traceback.format_exc())

100万円のソフト不要!診療放射線技師がPythonで作る線量管理自動化システム
ツイートもチェックしよう!