Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 5 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ Dataflow Transfer monitors sequencing run directories and orchestrates the trans
- Dependencies listed in [requirements.txt](requirements.txt):
- PyYAML
- click
- xmltodict
- ibmcloudant
- [run-one](https://launchpad.net/ubuntu/+source/run-one)

Expand Down Expand Up @@ -91,16 +90,13 @@ statusdb:
username: couchdb_user
password: couchdb_password
url: couchdb.host.com
database: sequencing_runs
database: flowcell_status

sequencers:
NovaSeqXPlus:
sequencing_path: /sequencing/NovaSeqXPlus
remote_destination: /Illumina/NovaSeqXPlus
metadata_archive: /path/to/metadata/archive/NovaSeqXPlus_data
metadata_for_statusdb:
- RunInfo.xml
- RunParameters.xml
ignore_folders:
- nosync
remote_rsync_options:
Expand All @@ -115,8 +111,8 @@ sequencers:
1. **Discovery**: Scans configured sequencing directories for run folders
2. **Validation**: Confirms run ID matches expected format for the sequencer type
3. **Transfer Phases**:
- **Sequencing Phase**: Starts continuous background rsync transfer while sequencing is ongoing (when the final sequencing file doesn't exist). Uploads status and metadata files (specified for each sequencer type in the config with `metadata_for_statusdb`) to database.
- **Final Transfer**: After sequencing completes (final sequencing file appears), syncs specified metadata file to archive location, initiates final rsync transfer and captures exit codes.
- **Sequencing Phase**: Starts continuous background rsync transfer while sequencing is ongoing (when the final sequencing file doesn't exist). Uploads status to database.
- **Final Transfer**: After sequencing completes (final sequencing file appears), syncs specified metadata files to archive location, initiates final rsync transfer and captures exit codes.
- **Completion**: Updates database when transfer was successful.

### Status Tracking
Expand Down Expand Up @@ -145,10 +141,9 @@ Run status is tracked in CouchDB with events including:
## Assumptions

- Run directories are named according to sequencer-specific ID formats (defined in run classes)
- Final completion is indicated by the presence of a sequencer-specific final file (e.g., `RTAComplete.txt` for Illumina)
- Final completion is indicated by the presence of a sequencer-specific final file (e.g., `CopyComplete.txt` for Illumina)
- Remote storage is accessible via rsync over SSH
- CouchDB is accessible and the database exists
- Metadata files (e.g., RunInfo.xml) are present in run directories for status database updates and sync to metadata archive location
- CouchDB is accessible and the database specified in the config exists and has a ddoc called `events` with a view called `current_status_per_runfolder` that emits a dictionary of all the statuses and their current state (true/false)
- The flowcell ID is set to correspond to the ID that is scanned with a barcode scanner during sequencing setup in the lab

### Status Files
Expand Down
10 changes: 1 addition & 9 deletions dataflow_transfer/run_classes/generic_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,7 @@ def has_status(self, status_name):
return True if current_statuses.get(status_name) else False

def update_statusdb(self, status, additional_info=None):
"""Update the statusdb document for this run with the given status
and associated metadata files."""
"""Update the statusdb document for this run with the given status."""
db_doc = (
self.db.get_db_doc(ddoc="lookup", view="runfolder_id", run_id=self.run_id)
or {}
Expand All @@ -179,14 +178,7 @@ def update_statusdb(self, status, additional_info=None):
"runfolder_id": self.run_id,
"flowcell_id": self.flowcell_id,
"events": [],
"files": {},
}
files_to_include = fs.locate_metadata(
self.sequencer_config.get("metadata_for_statusdb", []),
self.run_dir,
)
parsed_files = fs.parse_metadata_files(files_to_include)
db_doc["files"].update(parsed_files)
db_doc["events"].append(
{
"event_type": status,
Expand Down
59 changes: 0 additions & 59 deletions dataflow_transfer/tests/test_filesystem.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import json
import os
import tempfile
from subprocess import CalledProcessError
Expand All @@ -10,8 +9,6 @@
check_exit_status,
find_runs,
get_run_dir,
locate_metadata,
parse_metadata_files,
rsync_is_running,
submit_background_process,
)
Expand Down Expand Up @@ -84,38 +81,6 @@ def test_submit_background_process(self, mock_popen):
mock_popen.assert_called_once()


class TestParseMetadataFiles:
def test_parse_json_file(self):
with tempfile.TemporaryDirectory() as tmpdir:
json_file = os.path.join(tmpdir, "metadata.json")
with open(json_file, "w") as f:
json.dump({"key": "value"}, f)
metadata = parse_metadata_files([json_file])
assert "metadata.json" in metadata
assert metadata["metadata.json"]["key"] == "value"

def test_parse_xml_file(self):
with tempfile.TemporaryDirectory() as tmpdir:
xml_file = os.path.join(tmpdir, "metadata.xml")
with open(xml_file, "w") as f:
f.write("<root><key>value</key></root>")
metadata = parse_metadata_files([xml_file])
assert "metadata.xml" in metadata
assert metadata["metadata.xml"]["root"]["key"] == "value"

def test_unsupported_file_type(self):
with tempfile.TemporaryDirectory() as tmpdir:
txt_file = os.path.join(tmpdir, "metadata.txt")
with open(txt_file, "w") as f:
f.write("content")
metadata = parse_metadata_files([txt_file])
assert "metadata.txt" not in metadata

def test_parse_nonexistent_file(self):
metadata = parse_metadata_files(["/nonexistent/file.json"])
assert metadata == {}


class TestCheckExitStatus:
def test_exit_status_zero(self):
with tempfile.TemporaryDirectory() as tmpdir:
Expand All @@ -133,27 +98,3 @@ def test_exit_status_nonzero(self):

def test_exit_status_file_not_found(self):
assert check_exit_status("/nonexistent/file") is False


class TestLocateMetadata:
def test_locate_metadata_found(self):
with tempfile.TemporaryDirectory() as tmpdir:
metadata_file = os.path.join(tmpdir, "metadata.json")
open(metadata_file, "w").close()
located = locate_metadata(["metadata.json"], tmpdir)
assert len(located) == 1
assert metadata_file in located

def test_locate_metadata_not_found(self):
with tempfile.TemporaryDirectory() as tmpdir:
located = locate_metadata(["nonexistent.json"], tmpdir)
assert len(located) == 0

def test_locate_metadata_multiple_patterns(self):
with tempfile.TemporaryDirectory() as tmpdir:
open(os.path.join(tmpdir, "meta1.json"), "w").close()
open(os.path.join(tmpdir, "meta2.json"), "w").close()
located = locate_metadata(["meta1.json", "meta2.json"], tmpdir)
assert len(located) == 2
assert os.path.join(tmpdir, "meta1.json") in located
assert os.path.join(tmpdir, "meta2.json") in located
14 changes: 0 additions & 14 deletions dataflow_transfer/tests/test_run_classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ def novaseqxplus_testobj(tmp_path):
"sequencers": {
"NovaSeqXPlus": {
"remote_destination": "/data/NovaSeqXPlus",
"metadata_for_statusdb": ["RunInfo.xml", "RunParameters.xml"],
"metadata_archive": "/data/metadata_archive/NovaSeqXPlus",
"ignore_folders": ["nosync"],
"remote_rsync_options": ["--chmod=Dg+s,g+rw"],
Expand Down Expand Up @@ -53,7 +52,6 @@ def nextseq_testobj(tmp_path):
"sequencers": {
"NextSeq": {
"remote_destination": "/data/NextSeq",
"metadata_for_statusdb": ["RunInfo.xml", "RunParameters.xml"],
"metadata_archive": "/data/metadata_archive/NextSeq",
"ignore_folders": ["nosync"],
"remote_rsync_options": ["--chmod=Dg+s,g+rw"],
Expand Down Expand Up @@ -85,7 +83,6 @@ def miseqseq_testobj(tmp_path):
"sequencers": {
"MiSeq": {
"remote_destination": "/data/MiSeq",
"metadata_for_statusdb": ["RunInfo.xml", "RunParameters.xml"],
"metadata_archive": "/data/metadata_archive/MiSeq",
"ignore_folders": ["nosync"],
"remote_rsync_options": ["--chmod=Dg+s,g+rw"],
Expand Down Expand Up @@ -117,7 +114,6 @@ def miseqseqi100_testobj(tmp_path):
"sequencers": {
"MiSeqi100": {
"remote_destination": "/data/MiSeqi100",
"metadata_for_statusdb": ["RunInfo.xml", "RunParameters.xml"],
"metadata_archive": "/data/metadata_archive/MiSeqi100",
"ignore_folders": ["nosync"],
"remote_rsync_options": ["--chmod=Dg+s,g+rw"],
Expand Down Expand Up @@ -469,16 +465,6 @@ def get_db_doc(self, ddoc, view, run_id):
def update_db_doc(self, doc):
self.updated_doc = doc

import dataflow_transfer.utils.filesystem as fs

def mock_locate_metadata(metadata_list, run_dir):
return []

def mock_parse_metadata_files(files):
return {}

fs.locate_metadata = mock_locate_metadata
fs.parse_metadata_files = mock_parse_metadata_files
mock_db = MockDB()
run_obj.db = mock_db
run_obj.update_statusdb(status=status_to_update)
Expand Down
38 changes: 0 additions & 38 deletions dataflow_transfer/utils/filesystem.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
import json
import logging
import os
import subprocess

import xmltodict

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -44,31 +41,6 @@ def submit_background_process(command_str: str):
subprocess.Popen(command_str, stdout=subprocess.PIPE, shell=True)


def parse_metadata_files(files):
"""Given a list of files, read the content into a dict.
Handle .json and .xml files differently."""
metadata = {}
for file_path in files:
try:
if file_path.endswith(".json"):
with open(file_path) as f:
metadata[os.path.basename(file_path)] = json.load(f)
elif file_path.endswith(".xml"):
with open(file_path) as f:
xml_content = xmltodict.parse(
f.read(), attr_prefix="", cdata_key="text"
)
metadata[os.path.basename(file_path)] = xml_content
else:
logger.warning(
f"Unsupported metadata file type for {file_path}. Only .json and .xml are supported."
)
continue
except Exception as e:
logger.error(f"Error reading metadata file {file_path}: {e}")
return metadata


def check_exit_status(file_path):
"""Check the exit status from a given file.
Return True if exit code is 0, else False."""
Expand All @@ -78,13 +50,3 @@ def check_exit_status(file_path):
if exit_code == "0":
return True
return False


def locate_metadata(metadata_list, run_dir):
"""Locate metadata in the given run directory."""
located_paths = []
for pattern in metadata_list:
metadata_path = os.path.join(run_dir, pattern)
if os.path.exists(metadata_path):
located_paths.append(metadata_path)
return located_paths
5 changes: 2 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ ignore = [

[project]
name = "dataflow_transfer"
version = "1.1.2"
version = "1.1.3"
description = "Script for transferring sequencing data from sequencers to storage"
authors = [
{ name = "Sara Sjunnebo", email = "sara.sjunnebo@scilifelab.se" },
Expand All @@ -31,7 +31,6 @@ requires-python = ">=3.11"
dependencies = [
"click",
"PyYAML",
"xmltodict",
"ibmcloudant",
]

Expand All @@ -51,4 +50,4 @@ requires = ["setuptools>=68", "wheel"]
build-backend = "setuptools.build_meta"

[tool.setuptools.packages.find]
where = ["."]
where = ["."]
1 change: 0 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
PyYAML
click
xmltodict
ibmcloudant
Loading
Loading