Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ Run status is tracked in CouchDB with events including:
- Run directories are named according to sequencer-specific ID formats (defined in run classes)
- Final completion is indicated by the presence of a sequencer-specific final file (e.g., `CopyComplete.txt` for Illumina)
- Remote storage is accessible via rsync over SSH
- CouchDB is accessible and the database specified in the config exists and has a ddoc called `events` with a view called `current_status_per_runfolder` that emits a dictionary of all the statuses and their current state (true/false)
- CouchDB is accessible and the database specified in the config exists and has a ddoc called `events` with a view called `current_status_per_runfolder` that emits a dictionary of all the statuses and their current state (true/false)
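- CouchDB is accessible and the database `gs_configs` contains a document called `regex_patterns` whose `flowcell_patterns` field maps each run family (e.g. `Illumina`) and run type (e.g. `NovaSeqXPlus`) to the regex used to identify that run type. Each regex must define a named group `flowcell_id`, from which the flowcell ID is extracted.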
- The flowcell ID is set to correspond to the ID that is scanned with a barcode scanner during sequencing setup in the lab

### Status Files
Expand Down Expand Up @@ -200,3 +201,4 @@ To add support for a new sequencer, add the following to dataflow_transfer:
2. Import the new class in `dataflow_transfer/run_classes/__init__.py`
3. Add a test fixture for the new run in `dataflow_transfer/tests/test_run_classes.py` and include it in the relevant tests
4. Add a section for the sequencer in the config file
5. Add a regular expression matching the run folder name of the sequencer to the `regex_patterns` document in the CouchDB database `gs_configs`, under `flowcell_patterns` → run family → run type. The regex must contain a named group `flowcell_id`.
6 changes: 2 additions & 4 deletions dataflow_transfer/run_classes/element_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
class ElementRun(Run):
"""Defines an Element sequencing run"""

run_family = "Element"

def __init__(self, run_dir, configuration):
super().__init__(run_dir, configuration)
self.final_file = "RunUploaded.json"
Expand All @@ -18,11 +20,7 @@ class AVITIRun(ElementRun):
run_type = "AVITI"

def __init__(self, run_dir, configuration):
self.run_id_format = (
r"^\d{8}_AV\d{6}_(A|B)\d{10}$" # 20251007_AV242106_A2507535225
)
super().__init__(run_dir, configuration)
self.flowcell_id = self.run_id.split("_")[-1][1:] # 2507535225


# TODO: Add Teton run class
33 changes: 30 additions & 3 deletions dataflow_transfer/run_classes/generic_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
class Run:
"""Defines a generic sequencing run"""

run_type = None
run_family = None

def __init__(self, run_dir, configuration):
self.run_dir = run_dir
self.run_id = os.path.basename(run_dir)
Expand All @@ -33,6 +36,27 @@ def __init__(self, run_dir, configuration):
)
self.remote_destination = self.sequencer_config.get("remote_destination")
self.db = StatusdbSession(self.configuration.get("statusdb"))
self.run_id_format = self._resolve_run_id_format()
# A run ID that does not match the configured format must not crash
# construction: re.match() returns None in that case, so only read the
# group from a real match. The mismatch itself is presumably meant to be
# reported by confirm_run_type() -- TODO confirm. groupdict().get() also
# yields None when the configured pattern has no "flowcell_id" group.
run_id_match = (
    re.match(self.run_id_format, self.run_id) if self.run_id_format else None
)
self.flowcell_id = (
    run_id_match.groupdict().get("flowcell_id") if run_id_match else None
)

def _resolve_run_id_format(self):
    """Resolve the run ID regex for this run type from the central config.

    The pattern is looked up through ``self.db.get_regex_pattern`` using the
    ``run_family`` and ``run_type`` attributes.

    Returns:
        The regex pattern, or None when the run family or run type is not
        set, when no pattern is configured, or when the lookup or the
        validation of the pattern fails (a warning is logged on failure).
    """
    if not (self.run_family and self.run_type):
        return None

    try:
        run_id_format = self.db.get_regex_pattern(self.run_family, self.run_type)
        if run_id_format is not None:
            # The pattern comes from an external document. Compile it here so
            # a malformed entry (invalid regex, non-string value) is reported
            # as a warning instead of raising when it is first used to match.
            re.compile(run_id_format)
    except Exception as exc:
        # Deliberately best-effort: a failed lookup leaves the run without a
        # run ID format rather than aborting.
        logger.warning(
            f"Unable to load run_id_format for {self.run_type} from regex config: {exc}"
        )
        return None

    return run_id_format

def confirm_run_type(self):
"""Compare run ID with expected format for the run type."""
Expand Down Expand Up @@ -159,10 +183,13 @@ def has_status(self, status_name):

def update_statusdb(self, status, additional_info=None):
"""Update the statusdb document for this run with the given status."""
db_doc = (
self.db.get_db_doc(ddoc="lookup", view="runfolder_id", run_id=self.run_id)
or {}
doc_id = self.db.get_doc_id(
ddoc="lookup", view="runfolder_id", run_id=self.run_id
)
if doc_id:
db_doc = self.db.get_document(db=self.db.db_name, doc_id=doc_id)
else:
db_doc = {}

statuses_to_only_update_once = [
"sequencing_started",
Expand Down
15 changes: 2 additions & 13 deletions dataflow_transfer/run_classes/illumina_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@
class IlluminaRun(Run):
"""Defines an Illumina sequencing run"""

run_family = "Illumina"

def __init__(self, run_dir, configuration):
super().__init__(run_dir, configuration)
self.final_file = "CopyComplete.txt"
self.flowcell_id = self.run_id.split("_")[-1]


@register_run_class
Expand All @@ -19,11 +20,7 @@ class NovaSeqXPlusRun(IlluminaRun):
run_type = "NovaSeqXPlus"

def __init__(self, run_dir, configuration):
self.run_id_format = (
r"^\d{8}_[A-Z0-9]+_\d{4}_[A-Z0-9]+$" # 20251010_LH00202_0284_B22CVHTLT1
)
super().__init__(run_dir, configuration)
self.flowcell_id = self.run_id.split("_")[-1][1:] # 22CVHTLT1


@register_run_class
Expand All @@ -33,9 +30,6 @@ class NextSeqRun(IlluminaRun):
run_type = "NextSeq"

def __init__(self, run_dir, configuration):
self.run_id_format = (
r"^\d{6}_[A-Z0-9]+_\d{3}_[A-Z0-9]+$" # 251015_VH00203_572_AAHFHCCM5
)
super().__init__(run_dir, configuration)


Expand All @@ -46,9 +40,6 @@ class MiSeqRun(IlluminaRun):
run_type = "MiSeq"

def __init__(self, run_dir, configuration):
self.run_id_format = (
r"^\d{6}_[A-Z0-9]+_\d{4}_[A-Z0-9\-]+$" # 251015_M01548_0646_000000000-M6D7K
)
super().__init__(run_dir, configuration)


Expand All @@ -59,6 +50,4 @@ class MiSeqi100Run(IlluminaRun):
run_type = "MiSeqi100"

def __init__(self, run_dir, configuration):
self.run_id_format = r"^\d{8}_[A-Z0-9]+_\d{4}_[A-Z0-9]{10}-SC3$" # 20260128_SH01140_0002_ASC2150561-SC3
super().__init__(run_dir, configuration)
self.flowcell_id = self.run_id.split("_")[-1][1:] # SC2150561-SC3
5 changes: 2 additions & 3 deletions dataflow_transfer/run_classes/ont_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@
class ONTRun(Run):
"""Defines a ONT sequencing run"""

run_family = "ONT"

def __init__(self, run_dir, configuration):
super().__init__(run_dir, configuration)
self.final_file = "final_summary.txt"
self.flowcell_id = self.run_id.split("_")[-2]


@register_run_class
Expand All @@ -19,7 +20,6 @@ class PromethIONRun(ONTRun):
run_type = "PromethION"

def __init__(self, run_dir, configuration):
self.run_id_format = r"^\d{8}_\d{4}_[A-Z0-9]{2}_P[A-Z0-9]+_[a-f0-9]{8}$" # 20251015_1051_3B_PBG60686_0af3a2e0
super().__init__(run_dir, configuration)


Expand All @@ -30,5 +30,4 @@ class MinIONRun(ONTRun):
run_type = "MinION"

def __init__(self, run_dir, configuration):
self.run_id_format = r"^\d{8}_\d{4}_MN[A-Z0-9]+_[A-Z0-9]+_[a-f0-9]{8}$" # 20240229_1404_MN19414_ASH657_7a74bf8f
super().__init__(run_dir, configuration)
26 changes: 25 additions & 1 deletion dataflow_transfer/tests/test_run_classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,26 @@ def __init__(self, config):
def get_db_doc(self, ddoc, view, run_id):
    # Test stub: always reports that no document exists for the run.
    return None

def get_regex_pattern(self, run_family, run_type):
    """Return the canned run ID regex for a run family and run type.

    Test stand-in for the central regex config lookup. Each pattern exposes
    the flowcell ID as the named group ``flowcell_id``.

    Returns:
        The pattern string, or None for an unknown family/type combination.
    """
    # One flat table keyed by (run_family, run_type) instead of a nested
    # if/elif chain; the trailing comment on each entry is a sample run ID.
    patterns = {
        # 20251010_LH00202_0284_B22CVHTLT1
        ("Illumina", "NovaSeqXPlus"): r"^(?P<date>\d{8})_(?P<instrument>[A-Z0-9]+)_\d{4}_(?P<position>(A|B))(?P<flowcell_id>[A-Z0-9]+)$",
        # 251015_VH00203_572_AAHFHCCM5
        ("Illumina", "NextSeq"): r"^(?P<date>\d{6})_(?P<instrument>[A-Z0-9]+)_\d{3}_(?P<flowcell_id>[A-Z0-9]+)$",
        # 251015_M01548_0646_000000000-M6D7K
        ("Illumina", "MiSeq"): r"^(?P<date>\d{6})_(?P<instrument>[A-Z0-9]+)_\d{4}_(?P<flowcell_id>[A-Z0-9\-]+)$",
        # 20260128_SH01140_0002_ASC2150561-SC3
        ("Illumina", "MiSeqi100"): r"^(?P<date>\d{8})_(?P<instrument>[A-Z0-9]+)_\d{4}_A(?P<flowcell_id>[A-Z0-9]{9}-SC3)$",
        # 20251015_1051_3B_PBG60686_0af3a2e0
        ("ONT", "PromethION"): r"^(?P<date>\d{8})_(?P<time>\d{4})_(?P<position>[A-Z0-9]{2})_(?P<flowcell_id>P[A-Z0-9]+)_(?P<run_hash>[a-f0-9]{8})$",
        # 20240229_1404_MN19414_ASH657_7a74bf8f
        ("ONT", "MinION"): r"^(?P<date>\d{8})_(?P<time>\d{4})_(?P<position>MN[A-Z0-9]+)_(?P<flowcell_id>[A-Z0-9]+)_(?P<run_hash>[a-f0-9]{8})$",
        # 20251007_AV242106_A2507535225
        ("Element", "AVITI"): r"^(?P<date>\d{8})_(?P<instrument>AV\d{6})_(?P<position>(A|B))(?P<flowcell_id>\d{10})$",
    }
    return patterns.get((run_family, run_type))

def update_db_doc(self, doc):
    # Test stub: accepts the document and does nothing with it.
    pass

Expand Down Expand Up @@ -458,8 +478,12 @@ def test_update_statusdb(
class MockDB:
def __init__(self):
self.updated_doc = None
self.db_name = "mock_db"

def get_db_doc(self, ddoc, view, run_id):
def get_doc_id(self, ddoc, view, run_id):
return "mock_doc_id"

def get_document(self, db, doc_id):
return {"events": existing_statuses, "files": {}}

def update_db_doc(self, doc):
Expand Down
35 changes: 25 additions & 10 deletions dataflow_transfer/utils/statusdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,31 @@ def _retry_call(self, func):
# re-raise last exception for caller to handle
raise last_exception

def get_db_doc(self, ddoc, view, run_id):
"""Retrieve a document from the database via retried call."""
doc_id = self.get_doc_id(ddoc, view, run_id)
if doc_id:
return self._retry_call(
lambda: self.connection.get_document(
db=self.db_name, doc_id=doc_id
).get_result()
)
return None
def get_document(self, db, doc_id):
    """Fetch a single document by ID from the given database.

    The fetch is routed through ``self._retry_call``.

    Args:
        db: Name of the database to read from.
        doc_id: ID of the document to fetch.

    Returns:
        Whatever ``get_result()`` yields for the fetched document.
    """

    def fetch():
        response = self.connection.get_document(db=db, doc_id=doc_id)
        return response.get_result()

    return self._retry_call(fetch)

def get_regex_pattern(
    self,
    run_family,
    run_type,
    regex_db="gs_configs",
    regex_doc_id="regex_patterns",
):
    """Look up the Python regex pattern for a run type in the central regex config.

    The config document is read as
    ``{"flowcell_patterns": {<run_family>: {<run_type>: <pattern>}}}``.

    Args:
        run_family: Key of the sequencer family inside ``flowcell_patterns``.
        run_type: Key of the run type inside the family entry.
        regex_db: Database holding the regex config document.
        regex_doc_id: ID of the regex config document.

    Returns:
        The configured pattern, or None when the document, the family entry
        or the run type entry is missing or not of the expected shape.
    """
    regex_doc = self.get_document(db=regex_db, doc_id=regex_doc_id)
    if not isinstance(regex_doc, dict):
        return None

    # dict.get(key, {}) only covers a missing key. A key that is present but
    # null (or any non-dict value) would make the next .get() raise, so each
    # level is type-checked instead.
    flowcell_patterns = regex_doc.get("flowcell_patterns")
    if not isinstance(flowcell_patterns, dict):
        return None

    family_patterns = flowcell_patterns.get(run_family)
    if not isinstance(family_patterns, dict):
        return None

    return family_patterns.get(run_type)

def get_doc_id(self, ddoc, view, run_id):
"""Retrieve a document ID from the database via retried call."""
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ ignore = [

[project]
name = "dataflow_transfer"
version = "1.1.3"
version = "1.1.4"
description = "Script for transferring sequencing data from sequencers to storage"
authors = [
{ name = "Sara Sjunnebo", email = "sara.sjunnebo@scilifelab.se" },
Expand Down
Loading