Source code for ogilo.io

"""Functions for file input and output."""

from typing import Sequence, TextIO, Tuple, Union
import csv
import sys

import nemony as nm
import streq as sq
from tqdm import tqdm

from .checks import re_sites
from .types import Oligo
from .utils import grouping_key

[docs]def extract_col(f: TextIO, col: Union[str, int], sep: str = '\t', is_seq: bool = False) -> Tuple[str]: """Get a column from a CSV or TSV by number or name. Reads a CSV or TSV file and returns the named or numbered column. If a number is used, then it is assumed that there is no header, and the first line is included. If a column name is used, then it is assumed that there is a header, and the first line is skipped. Parameters ---------- f : file or file-like File to read col : str or int The column to return sep : str Delimiter character for the columns is_seq : bool, optional If True, checks that only nucleic acid symbols are present Returns ------- tuple of str The entries of the requested column """ if col.isdigit(): c = csv.reader(f, delimiter=sep) col = int(col) - 1 else: c = csv.DictReader(f, delimiter=sep) coldata = tuple(map(lambda x: x[col], c)) alphabet = set(letter for seq in coldata for letter in seq) xna = sorted(set(sq.sequences.DNA) | set(sq.sequences.RNA) | set(list('NRYWSVB'))) alpha_not_in_xna = sorted(letter for letter in alphabet if not letter.upper() in xna) if is_seq: assert len(alpha_not_in_xna) == 0, \ (f"There are {len(alpha_not_in_xna)} characters not in {''.join(xna)}: {''.join(alpha_not_in_xna)}. " "Did you use an integer instead of a named header for your sequence?") return coldata
[docs]def write_constructs(x: Sequence[Oligo], file: TextIO = sys.stdout) -> None: """Write oligos to a table. Takes a list of Oligo objects and writes them to a file, one per line. Properties are calculated and checks are carried out. Parameters ---------- x : list Oligo objects to write. file : file or file-like File to write to. Default STDOUT Returns ------- None """ c = csv.DictWriter(file, fieldnames=('group', 'pcr_handles', 'length', 'mnemonic', 'restriction_sites', 'oligo_name', 'oligo_sequence'), delimiter='\t') c.writeheader() def upper_lower(x, i): return x.casefold() if i % 2 > 0 else x.upper() try: for row in tqdm(x, disable=len(x) < 100): seq = ''.join(upper_lower(seq.seq if not seq.reverse else sq.reverse_complement(seq.seq), i) for i, seq in enumerate(row)) group = grouping_key(row) try: handles = list(set(seq.name.split('_')[0] for seq in row if seq.type == 'handle'))[0] except IndexError: handles = None name = ('-'.join(seq.name for seq in row if seq.name is not None and seq.type != 'handle')) c.writerow(dict(group=group, pcr_handles=handles, length=str(len(seq)), mnemonic=nm.encode(seq), restriction_sites=';'.join(re_sites(row, seq)), oligo_name=name, oligo_sequence=seq)) except BrokenPipeError: pass return None