WIP - sync pipeline

This commit is contained in:
Dr Masroor Ehsan 2024-12-31 11:29:24 +06:00
parent 91ac8e3230
commit 42843d2174
11 changed files with 444 additions and 6 deletions

View File

@ -15,6 +15,14 @@ final class Activity
public const int Study_History_Update = 105; public const int Study_History_Update = 105;
public const int Study_Create = 106;
public const int Study_Update = 107;
public const int Study_Archived = 108;
public const int Study_Delete = 109;
// report // report
public const int Report_Save = 201; public const int Report_Save = 201;

View File

@ -24,19 +24,22 @@ class ActivityLogger
private ?string $ipAddr = null; private ?string $ipAddr = null;
/**
* @var true
*/
private bool $anonymous = false; private bool $anonymous = false;
private ?string $orthancId = null;
public function __construct() public function __construct()
{ {
$this->category = Category::GENERAL; $this->category = Category::GENERAL;
} }
public function on(Study $study): static public function on(Study|int $study): static
{ {
if ($study instanceof Study) {
$this->studyId = $study->id; $this->studyId = $study->id;
} else {
$this->studyId = (int) $study;
}
return $this; return $this;
} }
@ -77,6 +80,13 @@ public function url(?string $url = null): static
return $this; return $this;
} }
public function orthanc(string $uuid): static
{
$this->orthancId = $uuid;
return $this;
}
public function notes(string $notes): static public function notes(string $notes): static
{ {
$this->notes = $notes; $this->notes = $notes;
@ -122,12 +132,12 @@ public function log(bool $initDefaults = true): bool
'user_id' => $this->userId, 'user_id' => $this->userId,
'category' => $this->category, 'category' => $this->category,
'activity' => $this->activity, 'activity' => $this->activity,
'orthanc_uuid' => $this->orthancId,
'ip_addr' => $this->ipAddr, 'ip_addr' => $this->ipAddr,
'user_agent' => $this->userAgent, 'user_agent' => $this->userAgent,
'url' => $this->url, 'url' => $this->url,
'notes' => $this->notes, 'notes' => $this->notes,
'created_at' => now(), 'created_at' => now(),
'updated_at' => now(),
]); ]);
} }
} }

View File

@ -0,0 +1,38 @@
<?php
namespace App\Services\Pacs\Sync\Pipes;
use App\Services\AuditTrail\Activity;
use App\Services\AuditTrail\Category;
use App\Services\Pacs\Sync\StudiesSync;
use Closure;
use Illuminate\Support\Facades\DB;
final readonly class ArchiveStudies
{
public function __invoke(StudiesSync $sync, Closure $next): StudiesSync
{
foreach ($sync->getArchiveQueue() as $orthanc_uuid) {
$row_id = DB::table('studies')->where(compact('orthanc_uuid'))->value('id');
if ($row_id === null) {
continue;
}
$payload = [
'is_archived' => true,
'updated_at' => now(),
];
DB::table('studies')->find($row_id)->update($payload);
audit()
->anon()
->category(Category::SYSTEM)
->on($row_id)
->orthanc($orthanc_uuid)
->did(Activity::Study_Archived)
->log(false);
}
return $next($sync);
}
}

View File

@ -0,0 +1,5 @@
<?php
namespace App\Services\Pacs\Sync\Pipes;
final readonly class FetchStudyDetails {}

View File

@ -0,0 +1,48 @@
<?php
namespace App\Services\Pacs\Sync\Pipes;
use App\Models\Enums\StudyLevelStatus;
use App\Services\Pacs\Sync\StudiesSync;
use Closure;
use Illuminate\Support\Collection;
use Illuminate\Support\Facades\DB;
final readonly class FilterStudies
{
public function __invoke(StudiesSync $sync, Closure $next): StudiesSync
{
$sync->resetQueues();
$studies = DB::table('studies')
->where('is_archived', false)
->get(['orthanc_uuid', 'study_status'])
->pluck('study_status', 'orthanc_uuid');
$study_ids = $sync->getStudyIds();
foreach ($study_ids as $study_id) {
$this->checkUpdate($study_id, $sync, $studies);
}
$missing_orthanc_uuids = $studies->keys()->diff($study_ids);
foreach ($missing_orthanc_uuids as $orthanc_uuid) {
$sync->getArchiveQueue()->add($orthanc_uuid);
}
return $next($sync);
}
private function checkUpdate(string $orthanc_uuid, StudiesSync $sync, Collection $studies): void
{
$study_status = $studies->get($orthanc_uuid);
if ($study_status === null) {
$sync->getInsertQueue()->add($orthanc_uuid);
return;
}
if ($study_status < StudyLevelStatus::StudyArrived->value) {
$sync->getUpdateQueue()->add($orthanc_uuid);
}
}
}

View File

@ -0,0 +1,39 @@
<?php
namespace App\Services\Pacs\Sync\Pipes;
use App\Models\Study;
use App\Models\StudyDetails;
use App\Services\AuditTrail\Activity;
use App\Services\AuditTrail\Category;
use App\Services\Pacs\Sync\StudiesSync;
use Closure;
final readonly class InsertStudies
{
public function __invoke(StudiesSync $sync, Closure $next): StudiesSync
{
foreach ($sync->getInsertQueue() as $orthanc_uuid) {
$study = $sync->fetchStudyDetails($orthanc_uuid);
if ($study == null) {
continue;
}
$payload = $sync->transformData($study);
$row = Study::create($payload['study']);
$payload['details']['study_id'] = $row->id;
$payload['details']['orthanc_uuid'] = $orthanc_uuid;
StudyDetails::create($payload['details']);
audit()
->anon()
->category(Category::SYSTEM)
->on($row->id)
->orthanc($orthanc_uuid)
->did(Activity::Study_Create)
->log(false);
}
return $next($sync);
}
}

View File

@ -0,0 +1,17 @@
<?php
namespace App\Services\Pacs\Sync\Pipes;
use App\Services\Pacs\Sync\StudiesSync;
use Closure;
final readonly class ScanStudies
{
public function __invoke(StudiesSync $sync, Closure $next): StudiesSync
{
$study_ids = $sync->getClient()->getStudiesIds();
$sync->setStudyIds($study_ids);
return $next($sync);
}
}

View File

@ -0,0 +1,46 @@
<?php
namespace App\Services\Pacs\Sync\Pipes;
use App\Services\AuditTrail\Activity;
use App\Services\AuditTrail\Category;
use App\Services\Pacs\Sync\StudiesSync;
use Closure;
use Illuminate\Support\Facades\DB;
final readonly class UpdateStudies
{
public function __invoke(StudiesSync $sync, Closure $next): StudiesSync
{
foreach ($sync->getUpdateQueue() as $orthanc_uuid) {
$study = $sync->fetchStudyDetails($orthanc_uuid);
if ($study == null) {
continue;
}
$study_id = DB::table('studies')->where(compact('orthanc_uuid'))->value('id');
if ($study_id === null) {
continue;
}
$payload = $sync->transformData($study);
unset($payload['study']['orthanc_uuid']);
$payload['study']['updated_at'] = now();
DB::table('studies')->find($study_id)->update($payload['study']);
if (! empty($payload['details'])) {
$payload['details']['updated_at'] = now();
DB::table('study_details')->where(compact('study_id'))->update($payload['details']);
}
audit()
->anon()
->category(Category::SYSTEM)
->on($study_id)
->orthanc($orthanc_uuid)
->did(Activity::Study_Update)
->log(false);
}
return $next($sync);
}
}

View File

@ -0,0 +1,225 @@
<?php
namespace App\Services\Pacs\Sync;
use App\Models\Enums\StudyLevelStatus;
use App\Services\Pacs\DicomUtils;
use App\Services\Pacs\InstituteMapper;
use App\Services\Pacs\OrthancRestClient;
use Carbon\Carbon;
use Illuminate\Pipeline\Pipeline;
use Illuminate\Support\Collection;
use Illuminate\Support\Facades\Log;
class StudiesSync
{
private Collection $study_ids;
private Collection $insert_queue;
private Collection $update_queue;
private Collection $archive_queue;
private OrthancRestClient $client;
public function execute(): void
{
app(Pipeline::class)
->send($this)
->through([
Pipes\ScanStudies::class,
Pipes\FilterStudies::class,
Pipes\InsertStudies::class,
Pipes\UpdateStudies::class,
Pipes\ArchiveStudies::class,
])
->thenReturn();
}
public function __construct(?OrthancRestClient $client = null)
{
$this->study_ids = collect();
$this->client = $client ?? new OrthancRestClient;
$this->resetQueues();
}
public function getClient(): OrthancRestClient
{
return $this->client;
}
public function getStudyIds(): Collection
{
return $this->study_ids;
}
public function setStudyIds(array $study_ids): void
{
$this->study_ids = collect($study_ids);
}
public function resetQueues()
{
$this->insert_queue = collect();
$this->update_queue = collect();
$this->archive_queue = collect();
}
public function getInsertQueue(): Collection
{
return $this->insert_queue;
}
public function getUpdateQueue(): Collection
{
return $this->update_queue;
}
public function getArchiveQueue(): Collection
{
return $this->archive_queue;
}
public function fetchStudyDetails(string $orthanc_uuid): ?array
{
$study = $this->client->getStudyDetails($orthanc_uuid);
if ($study == null) {
return null;
}
$stats = $this->client->getStudyStatistics($orthanc_uuid);
$study['Statistics'] = $stats;
$series = $this->client->getStudySeries($orthanc_uuid);
$study['Series'] = $series;
return $study;
}
public function transformData(mixed $orthanc_src): array
{
$inst_name = data_get($orthanc_src, 'MainDicomTags.InstitutionName');
$inst_id = InstituteMapper::map($inst_name);
$study = [
'orthanc_uuid' => strtolower($orthanc_src['ID']),
'is_locked' => false,
'is_active' => true,
'institution_name' => $inst_name,
'institute_id' => $inst_id,
'patient_uuid' => strtolower($orthanc_src['ParentPatient']),
'patient_id' => data_get($orthanc_src, 'PatientMainDicomTags.PatientID'),
'patient_name' => data_get($orthanc_src, 'PatientMainDicomTags.PatientName'),
'patient_sex' => data_get($orthanc_src, 'PatientMainDicomTags.PatientSex'),
'accession_number' => data_get($orthanc_src, 'MainDicomTags.AccessionNumber'),
'referring_physician_name' => data_get($orthanc_src, 'MainDicomTags.ReferringPhysicianName'),
'study_id' => data_get($orthanc_src, 'MainDicomTags.StudyID'),
'study_instance_uid' => data_get($orthanc_src, 'MainDicomTags.StudyInstanceUID'),
'study_modality' => data_get($orthanc_src, 'RequestedTags.Modality'),
'body_part_examined' => data_get($orthanc_src, 'RequestedTags.BodyPartExamined'),
'study_date' => DicomUtils::dateTimeToCarbon($orthanc_src['MainDicomTags']['StudyDate'], $orthanc_src['MainDicomTags']['StudyTime']),
'received_at' => Carbon::parse($orthanc_src['LastUpdate'], 'UTC'),
'image_count' => data_get($orthanc_src, 'Statistics.CountInstances'),
'series_count' => data_get($orthanc_src, 'Statistics.CountSeries'),
'disk_size' => data_get($orthanc_src, 'Statistics.DiskSize'),
];
if ((bool) data_get($orthanc_src, 'IsStable', false)) {
$study['study_status'] = StudyLevelStatus::StudyArrived->value;
} else {
$study['study_status'] = StudyLevelStatus::Pending->value;
}
$dob = data_get($orthanc_src, 'PatientMainDicomTags.PatientBirthDate');
if (filled($dob)) {
try {
$study['patient_birthdate'] = Carbon::parse($dob);
} catch (\Exception) {
Log::error('Failed to parse PatientMainDicomTags.PatientBirthDate: {dob}', ['dob' => $dob]);
}
}
$descr = data_get($orthanc_src, 'MainDicomTags.StudyDescription');
if (blank($descr)) {
$descr = data_get($orthanc_src, 'RequestedTags.AcquisitionDeviceProcessingDescription');
}
if (blank($descr)) {
$descr = data_get($orthanc_src, 'MainDicomTags.AcquisitionDeviceProcessingDescription');
}
$this->setValue($study, 'study_description', trim($descr));
$properties = [
'other_patient_names' => data_get($orthanc_src, 'RequestedTags.OtherPatientNames'),
'other_patient_ids' => data_get($orthanc_src, 'RequestedTags.OtherPatientIDs'),
'software_versions' => data_get($orthanc_src, 'RequestedTags.SoftwareVersions'),
'station_name' => data_get($orthanc_src, 'RequestedTags.StationName'),
'operators_name' => data_get($orthanc_src, 'RequestedTags.OperatorsName'),
'manufacturer' => data_get($orthanc_src, 'RequestedTags.Manufacturer'),
'manufacturer_model_name' => data_get($orthanc_src, 'RequestedTags.ManufacturerModelName'),
'acquisition_date' => DicomUtils::dateTimeToCarbon(data_get($orthanc_src, 'RequestedTags.AcquisitionDate'), data_get($orthanc_src, 'RequestedTags.AcquisitionTime')),
];
$properties = array_purge($properties);
if (empty($properties)) {
$properties = [
'other_patient_names' => data_get($orthanc_src, 'MainDicomTags.OtherPatientNames'),
'other_patient_ids' => data_get($orthanc_src, 'MainDicomTags.OtherPatientIDs'),
'software_versions' => data_get($orthanc_src, 'MainDicomTags.SoftwareVersions'),
'station_name' => data_get($orthanc_src, 'MainDicomTags.StationName'),
'operators_name' => data_get($orthanc_src, 'MainDicomTags.OperatorsName'),
'manufacturer' => data_get($orthanc_src, 'MainDicomTags.Manufacturer'),
'manufacturer_model_name' => data_get($orthanc_src, 'MainDicomTags.ManufacturerModelName'),
'acquisition_date' => DicomUtils::dateTimeToCarbon(data_get($orthanc_src, 'MainDicomTags.AcquisitionDate'), data_get($orthanc_src, 'MainDicomTags.AcquisitionTime')),
];
$properties = array_purge($properties);
}
$series = [];
foreach (data_get($orthanc_src, 'Series', []) as $ser) {
$params = [
'orthanc_uuid' => strtolower($ser['ID']),
'series_instance_uid' => data_get($ser, 'MainDicomTags.SeriesInstanceUID'),
'series_date' => DicomUtils::dateTimeToCarbon(data_get($ser, 'MainDicomTags.SeriesDate'), data_get($ser, 'MainDicomTags.SeriesTime')),
'series_number' => data_get($ser, 'MainDicomTags.SeriesNumber'),
'series_description' => data_get($ser, 'MainDicomTags.SeriesDescription'),
'protocol_name' => data_get($ser, 'MainDicomTags.ProtocolName'),
'modality' => data_get($ser, 'MainDicomTags.Modality'),
'body_part_examined' => data_get($ser, 'MainDicomTags.BodyPartExamined'),
'performed_procedure_step_description' => data_get($ser, 'MainDicomTags.PerformedProcedureStepDescription'),
'sequence_name' => data_get($ser, 'MainDicomTags.SequenceName'),
];
$params['num_instances'] = count(data_get($ser, 'Instances', []));
$params = array_purge($params);
if (! empty($params)) {
$series[] = $params;
}
}
if (empty($series)) {
$series = null;
} else {
// $series = array_multisort(array_column($series, 'series_number'), SORT_ASC, $series);
usort($series, fn ($a, $b): int => (int) $a['series_number'] <=> (int) $b['series_number']);
}
if (empty($properties)) {
$properties = null;
}
$details = compact('properties', 'series');
$details = array_purge($details);
$study = array_purge($study);
return compact('study', 'details');
}
private function setValue(array &$array, string $key, mixed $value): void
{
if (filled($value)) {
$array[$key] = $value;
}
}
}

View File

@ -11,6 +11,7 @@ public function up(): void
Schema::create('study_details', function (Blueprint $table) { Schema::create('study_details', function (Blueprint $table) {
$table->id(); $table->id();
$table->foreignId('study_id')->unique()->constrained('studies')->cascadeOnDelete(); $table->foreignId('study_id')->unique()->constrained('studies')->cascadeOnDelete();
$table->string('orthanc_uuid')->unique();
// $table->foreignId('user_id')->constrained('users'); // $table->foreignId('user_id')->constrained('users');
$table->text('clinical_history')->nullable(); $table->text('clinical_history')->nullable();
$table->text('surgical_history')->nullable(); $table->text('surgical_history')->nullable();

View File

@ -16,6 +16,7 @@ public function up(): void
$table->unsignedSmallInteger('activity'); $table->unsignedSmallInteger('activity');
$table->ipAddress('ip_addr')->nullable(); $table->ipAddress('ip_addr')->nullable();
$table->string('user_agent')->nullable(); $table->string('user_agent')->nullable();
$table->string('orthanc_uuid')->nullable();
$table->text('url')->nullable(); $table->text('url')->nullable();
$table->text('notes')->nullable(); $table->text('notes')->nullable();