Source code for LabGuruAPI._collections

from __future__ import annotations

import abc
import asyncio
import datetime
import html
import json
import re
import warnings
from collections import defaultdict
from itertools import groupby, zip_longest
from operator import itemgetter
from typing import Type, List, Optional, Dict, Any, TypeVar, Generic, Sized, Set, Literal, Union, DefaultDict, Tuple
from uuid import uuid4

import pandas as pd
import questionary
import requests
from Bio import SeqIO
from Bio.Seq import Seq
from Bio.SeqFeature import SeqFeature, FeatureLocation
from Bio.SeqRecord import SeqRecord
import pydna.all as pyd
from IDT import SciToolsPlus
from pydna.amplicon import Amplicon as PYDAmplicon
from pydna.dseq import Dseq
from pydna.dseqrecord import Dseqrecord
from pydna.readers import read_primer
from tqdm.asyncio import tqdm_asyncio

import LabGuruAPI._inventory as inv
from LabGuruAPI._base import LabGuruItem, LGI, Session, Attachment, LGInt, LGStr, LGSeqRecord, LGFloat, SESSION, \
    LGList, LGJSONStr, LGDict, SessionCacheKey
from LabGuruAPI._enzymes import restriction_enzyme
from LabGuruAPI._search_api import make_lg_searchable, SearchInterface


class Collections(LabGuruItem):
    """Base class for LabGuru collection items.

    Dispatches raw API payloads and API URLs to the concrete ``Collections``
    subclasses and provides helpers for the stocks attached to an item.
    """

    type_name = LGStr('biocollections_generic')
    """The LG name for the current type"""
    class_display_name = LGStr('LG Collection Item')
    created_at = LGStr()
    """Datetime of the object's creation. Searchable."""
    updated_at = LGStr()
    """Datetime of the object's last update. Searchable"""
    owner = LGDict()
    """Dict representation of the object's owner"""

    @classmethod
    def parse_api_data(cls: Type[LGI], json_data: Dict[str, Any], session: "Session" = SESSION,
                       include_custom=False) -> LGI:
        """Build an item from an API payload.

        When called on ``Collections`` itself, the payload is handed to the
        subclass registered in ``COLLECTIONS_BY_NAME`` under the payload's
        ``class_display_name``. If that raises ``KeyError`` the generic parser
        of the parent class is used instead.
        """
        if cls == Collections:
            try:
                return COLLECTIONS_BY_NAME[json_data['class_display_name']] \
                    .parse_api_data(json_data, session, include_custom)
            except KeyError:
                # No usable subclass for this payload: fall through to the generic parser.
                pass
        # Forward ``include_custom`` so the fallback honours the caller's request,
        # exactly like the subclass dispatch above does.
        return super().parse_api_data(json_data, session, include_custom)

    @classmethod
    def collection_type_from_url(cls, api_url: str) -> Type[LGI]:
        """Parses an API URL and returns the corresponding ``Collections`` subclass.

        Falls back to ``cls`` when no known ``_api_name`` occurs in the URL.
        """
        # Glycerol-stock URLs are treated as strain URLs.
        api_url = api_url.replace('glycerol%20stock', 'strain')
        cur_cls: Type[Collections]
        for cur_cls in [Strain, BiomassPellet, InclusionBody, Oligo, Compound, Consumable, Plasmid, Amplicon,
                        SyntheticGene, GeneticPart, Library, Selection, AnchorStrain]:
            if cur_cls._api_name and cur_cls._api_name.lower().replace(' ', '%20') in api_url:
                return cur_cls
        return cls

    @classmethod
    def from_api(cls: Type[LGI], session: "Session" = SESSION, item_id: int = None, name: str = None,
                 uuid: str = None, api_url: str = None, auto_name: str = None,
                 include_custom=False) -> Optional[LGI]:
        """Fetch an item from the API.

        For the generic ``Collections`` and ``MultiSequencedCollection`` classes
        each candidate subclass is queried in turn and the first truthy result
        is returned. Otherwise, or when no candidate matched, the lookup is
        delegated to the parent class.
        """
        search_types = []
        if cls == Collections:
            search_types = [Plasmid, SyntheticGene, Amplicon, Strain, BiomassPellet, InclusionBody, Oligo,
                            Compound, Consumable, GeneticPart, Sequence]
        elif cls == MultiSequencedCollection:
            search_types = [Plasmid, SyntheticGene, Amplicon, GeneticPart]
        for cur_subclass in search_types:
            cur_item = cur_subclass.from_api(session, item_id, name, uuid, api_url, auto_name, include_custom)
            if cur_item:
                return cur_item
        return super().from_api(session, item_id, name, uuid, api_url, auto_name, include_custom)

    def get_stocks(self, session: "Session" = SESSION) -> List[inv.Stock]:
        """Returns a list of ``Stock`` objects associated with the item"""
        query_data = {
            "stockable_id": self.id,
            "stockable[stockable_type]": self.class_name,
            "stockable[stockable_id]": self.id,
        }
        stocks = inv.Stock.search_api(session, query_data)
        for s in stocks:
            # Point every stock back at this instance.
            s.stockable = self
        return stocks

    async def aio_get_stocks(self) -> List[inv.Stock]:
        """Returns a list of ``Stock`` objects associated with the item.

        Runs the blocking :meth:`get_stocks` in the loop's default executor.
        """
        # Inside a coroutine the running loop is the one to use.
        return await asyncio.get_running_loop().run_in_executor(None, self.get_stocks)

    async def aio_rename(self: CI, new_name: str) -> CI:
        """
        Updates the names and associated stocks of an LG collection item.

        Args:
            new_name (str): The new name to assign to the current instance of Collections.

        Returns:
            CI: The updated instance of Collections with the new name assigned.
        """
        self.name = new_name
        # Update Stocks
        stocks = await self.aio_get_stocks()
        for cur_stock in stocks:
            cur_stock.name = new_name
            await cur_stock.async_lg_sync()
        return await self.async_lg_sync()

    def rename(self: CI, new_name: str, session=SESSION) -> CI:
        """
        Updates the names and associated stocks of an LG collection item.

        Args:
            new_name: The new name that the object will be renamed to.
            session: The LabGuru session object used for the API request.

        Returns:
            CI: The updated instance of Collections with the new name assigned.
        """
        return session.execute_async([self.aio_rename(new_name)], verbose=False)[0]

    def add_stock(self, name: str, storage: inv.Storage, well: str = None, update: bool = True,
                  **kwargs) -> inv.Stock:
        """Creates a new ``Stock`` for the item and in the provided storage.

        :param name: Name of the new stock
        :param storage: Storage that will hold the stock
        :param well: Well name; only used when ``storage`` is an ``inv.Box``
        :param update: If true, the stock is added through ``SESSION`` and the result returned
        :param kwargs: Extra entries merged into the stock's ``other_properties``
        """
        new_stock = inv.Stock(name, storage, self)
        if isinstance(storage, inv.Box) and well:
            new_stock.location_in_box = storage.well_name_to_position(well)
        new_stock.other_properties.update(kwargs)
        if update:
            return SESSION.add(new_stock)
        return new_stock

    def add_stocks(self, name: str, storage: inv.Storage, count: int, **kwargs) -> List[inv.Stock]:
        """Creates multiple new ``Stock`` objects for the item and in the provided storage.

        Stocks are named ``{name}_1`` ... ``{name}_{count}``.
        """
        return [self.add_stock(f'{name}_{i:d}', storage, **kwargs) for i in range(1, count + 1)]

    # @property
    # def sample_element_headers(self) -> Dict[List[Dict[str, str]]]:
    #     return {self.class_display_name: [dict(header='SysID', attribute='auto_name')]}


CI = TypeVar('CI', bound=Collections)  #: TypeVar shortcut for all Collection subclasses
class Parent(Generic[CI], SearchInterface):
    """Descriptor that links a collection item to a parent collection item.

    The parent may be assigned as an API dict (or list of dicts) containing a
    URL, as an item name, or as a collection object. On first read the parent
    is fetched from its stored API URL when no object has been cached yet.

    :param parent_type: Expected class of the parent
    :param parent_type_name: Key into ``COLLECTIONS_BY_NAME`` used when the class
        cannot be referenced directly at definition time
    :param search_field: LabGuru field name, stored as ``labguru_name``
    """

    def __init__(self, parent_type: Type[CI] = None, parent_type_name: str = None, search_field: str = None):
        self.parent_type: Type[CI] = parent_type
        self.parent_type_name = parent_type_name
        self.default_value = None
        self.labguru_name = search_field

    def __set_name__(self, owner, name):
        self.private_name = '_' + name
        self.url_name = self.private_name + '_api_url'
        self.updated_name = self.private_name + "_updated"

    def _resolve_parent_type(self, instance) -> None:
        """Fill in ``parent_type`` lazily: by registered name if given, else the owner's own type."""
        if self.parent_type:
            return
        if self.parent_type_name:
            self.parent_type = COLLECTIONS_BY_NAME[self.parent_type_name]
        else:
            self.parent_type = type(instance)

    def _set_from_api_dict(self, instance, api_dict) -> None:
        """Store the URL and the resolved parent object for every ``url`` entry of ``api_dict``."""
        for k, v in api_dict.items():
            if 'url' in k:
                url = self.parent_type.process_api_url(v)
                # The URL may point at a more specific type than the declared one.
                true_parent_type = Collections.collection_type_from_url(url)
                setattr(instance, self.url_name, url)
                setattr(instance, self.private_name, SESSION.get_object(true_parent_type, api_url=url))

    def __set__(self, instance, value):
        """Assign the parent from a dict, list of dicts, name, or collection object.

        Falsy values that are not a dict or list are ignored.

        :raises ValueError: if ``value`` is of an unsupported type
        """
        self._resolve_parent_type(instance)
        # noinspection PyTypeHints
        if isinstance(value, dict):
            self._set_from_api_dict(instance, value)
        elif isinstance(value, list):
            for cur_value in value:
                self._set_from_api_dict(instance, cur_value)
        elif not value:
            pass
        elif isinstance(value, self.parent_type):
            setattr(instance, self.private_name, value)
        elif isinstance(value, str):
            setattr(instance, self.private_name, SESSION.get_object(self.parent_type, name=value))
        elif isinstance(value, Collections):
            setattr(instance, self.private_name, value)
        else:
            # Report the expected parent class itself, not its metaclass.
            raise ValueError(f'Parent class expects a {type({})}, {type("")} or {self.parent_type}, '
                             f'got {type(value)}')

    def __get__(self, instance, owner) -> CI:
        """Return the cached parent, fetching it from its API URL if necessary.

        When neither a cached object nor a URL is stored, a new empty
        ``parent_type`` instance is returned.
        """
        # Resolve exactly as ``__set__`` does so ``parent_type_name`` is honoured
        # even when the attribute is read before it has ever been assigned.
        self._resolve_parent_type(instance)
        try:
            return getattr(instance, self.private_name)
        except AttributeError:
            pass
        try:
            api_url = getattr(instance, self.url_name)
            r = requests.get(api_url, params={'token': SESSION.token})
            json_data = r.json()
            parent = self.parent_type.parse_api_data(json_data, SESSION)
            setattr(instance, self.private_name, parent)
            return parent
        except AttributeError:
            return self.parent_type()
class Library(Collections):
    """Collection item for the ``biocollections/libraries`` endpoint.

    The base plasmid and base strain are stored by name and resolved to
    objects through ``SESSION`` on first access.
    """

    _api_name = 'biocollections/libraries'
    xlsx_collection = 'Libraries'
    _attribute_dict = {
        'Theoretical Diversity': 'theoretical_diversity',
        'Diversification Method': 'diversification_method',
        'Base Plasmid': '_base_plasmid',
        'Base Strain': '_base_strain',
        'Library Key': 'library_key'
    }

    theoretical_diversity = LGInt(lg_name='custom1')
    """The maximum theoretical diversity of the library. Searchable."""
    diversification_method = LGStr(lg_name='custom2')
    """The method of library generation. Searchable"""
    library_key = LGStr('', lg_name='custom10')
    """The library key. Searchable"""

    _base_plasmid = LGStr()
    _base_strain = LGStr()
    _base_plasmid_obj: Optional[Plasmid] = None
    _base_strain_obj: Optional[Strain] = None

    @property
    def base_plasmid(self) -> Plasmid:
        # Look the plasmid up by name only when no object is cached yet.
        if not self._base_plasmid_obj and self._base_plasmid:
            self._base_plasmid_obj = SESSION.get_object(Plasmid, name=self._base_plasmid)
        return self._base_plasmid_obj

    @base_plasmid.setter
    def base_plasmid(self, value: Union[Plasmid, str]):
        if isinstance(value, str):
            # A bare name invalidates any cached object.
            self._base_plasmid = value
            self._base_plasmid_obj = None
            return
        if isinstance(value, Plasmid):
            self._base_plasmid_obj = value
            self._base_plasmid = value.name
            return
        raise ValueError(f"Library.base_plasmid cannot be set to {repr(value)}")

    base_plasmid: Plasmid = make_lg_searchable(base_plasmid, 'custom3')
    """The plasmid that the library was based on"""

    @property
    def base_strain(self) -> Strain:
        # Look the strain up by name only when no object is cached yet.
        if not self._base_strain_obj and self._base_strain:
            self._base_strain_obj = SESSION.get_object(Strain, name=self._base_strain)
        return self._base_strain_obj

    @base_strain.setter
    def base_strain(self, value: Union[Strain, str]):
        if isinstance(value, str):
            # A bare name invalidates any cached object.
            self._base_strain = value
            self._base_strain_obj = None
            return
        if isinstance(value, Strain):
            self._base_strain_obj = value
            self._base_strain = value.name
            return
        raise ValueError(f"Library.base_strain cannot be set to {repr(value)}")

    base_strain: Strain = make_lg_searchable(base_strain, 'custom4')
    """The strain that the library was based on"""
class SelectableMixin(LabGuruItem):
    """Mixin adding ``parent_selection`` and ``parent_library`` links to an item."""

    _attribute_dict = {
        'Parent Selections': 'parent_selection',
        'Parent Libraries': 'parent_library'
    }

    # ``Selection`` is referenced by name; it is looked up in COLLECTIONS_BY_NAME on first use.
    parent_selection: Optional["Selection"] = Parent(
        parent_type_name="Selection",
        search_field='parent_generic_11',
    )
    """The selection that generated the item"""

    parent_library: Optional[Library] = Parent(
        Library,
        search_field='parent_generic_10',
    )
    """The library that the item came from"""
class Selection(Collections, SelectableMixin):
    """Collection item for the ``biocollections/selections`` endpoint."""

    _api_name = 'biocollections/selections'
    xlsx_collection = 'Selections'
    _attribute_dict = {
        'Step': 'step',
        'Selection Mechanism': 'mechanism',
        'Input Diversity': 'input_diversity',
        'Output Diversity': 'output_diversity',
        '# Cells In': 'n_cells_in',
        '# Cells Out': 'n_cells_out',
        'Reporter': 'reporter'
    }

    step = LGInt(0, 'custom1')
    """A count of how many selections have been run between this item and library generation"""
    mechanism = LGStr('', 'custom2')
    """The type of selective pressure being applied in this round"""
    input_diversity = LGStr('', 'custom8')
    """The "Input Diversity" field of the selection, stored as a string"""
    output_diversity = LGStr('', 'custom9')
    """The "Output Diversity" field of the selection, stored as a string"""
    n_cells_in = LGStr('', 'custom10')
    """The "# Cells In" field of the selection, stored as a string"""
    n_cells_out = LGStr('', 'custom11')
    """The "# Cells Out" field of the selection, stored as a string"""
    reporter = LGStr('', 'custom12')
    """The "Reporter" field of the selection"""
class AnchorStrain(Collections):
    """Collection item for the ``biocollections/anchor strains`` endpoint."""

    _api_name = 'biocollections/anchor strains'
    class_display_name = 'Anchor Strain'
    _attribute_dict = {
        'Full Genotype': 'full_genotype',
        'Chromosomal Genotype': 'chromosome_genotype',
        'Plasmids Genotype': 'plasmid_genotype'
    }
    xlsx_collection = 'Anchor Strains'

    full_genotype = LGStr('Unknown', 'custom1')
    """The full genotype of the anchor strain. Searchable"""
    chromosome_genotype = LGStr(None, 'custom4')
    """The chromosomal genotype of the anchor strain. Searchable"""
    plasmid_genotype = LGStr(None, 'custom5')
    """The genotype of the anchor strain's mobile elements. Searchable"""
class Strain(Collections, SelectableMixin):
    """Collection item for the ``biocollections/strains`` endpoint.

    Besides the stored fields, the class offers helpers to derive new strains
    and colonies and to compute status fields from the genotype string.
    """

    # def update_api(self: LGI, session: Session, **kwargs) -> LGI:
    #     return super().update_api(session)
    _api_name = 'biocollections/strains'
    class_display_name = 'Strain'
    _attribute_dict = {
        'Differentiating Modification': 'differentiating_modification',
        'Plasmids': 'plasmids',
        'Strain Background': 'strain_background',
        'Strain Barcode': 'barcode',
        'Strain Description': 'strain_description',
        'Parent Strains': 'parent_strain',
        'Parent Anchor Strains': 'anchor_strain',
        'Meaning of U': 'meaning_of_u',
        'RF Status': 'rf_status',
        'Lambda Status': 'lambda_status',
        'tolC Variant': 'tolc_variant',
        'Genotype': 'strain_genotype',
        'Diversity': 'diversity'
    }
    xlsx_collection = 'Strains'

    parent_strain: Strain = Parent(search_field='parent_generic_4')
    """The strain from which this strain was derived. Searchable."""
    anchor_strain: AnchorStrain = Parent(AnchorStrain, search_field='parent_generic_12')
    strain_background = LGStr('Unknown', 'custom4')
    """The generalized "type" of strain this is. Examples include: 2x310k-T7, U63, DH5α. Searchable."""
    strain_genotype = LGStr('Unknown', 'custom16')
    """The genotype of the strain. Searchable."""
    differentiating_modification = LGStr('Unknown', 'custom6')
    """A short description of how this strain differs from its parent. Searchable."""
    strain_description = LGStr('', 'custom8')
    plasmids = LGStr('', 'custom7')
    """A comma-separated list of plasmids carried in the strain. Searchable."""
    barcode = LGStr('', 'custom9')
    """A barcode sequence contained in the strain's genome. Searchable."""
    meaning_of_u = LGStr('Unknown', 'custom15')
    """How the UAG codon is translated by the cell. Searchable"""
    rf_status = LGStr('Unknown', 'custom12')
    """The presence/absence of Release Factor 1. Searchable"""
    lambda_status = LGStr('Unknown', 'custom13')
    """The presence/absence of the lambda prophage. Searchable"""
    tolc_variant = LGStr('Unknown', 'custom14')
    """The tolC variant in the strain, if any. Searchable"""
    diversity = LGStr('', 'custom5')
    """String diversity field. Intended for strains associated with libraries and selections."""
    _full_genotype: str = None

    # @property
    # def anchor_strain(self) -> bool:
    #     return self._anchor_strain == 'Yes'
    #
    # @anchor_strain.setter
    # def anchor_strain(self, value: bool):
    #     self._anchor_strain = 'Yes' if value else 'No'
    #
    # anchor_strain: bool = make_lg_searchable(anchor_strain, 'custom11')
    # """True if this is considered an anchor strain. Searchable."""

    @property
    def plasmid_names(self) -> Set[str]:
        """The names of the plasmids carried by the strain.

        Placeholder entries (``None``, ``NA``, ``N/A``, empty) are dropped.
        """
        # Split on the bare comma and strip, so both "a, b" and "a,b" are handled.
        names = {n.strip() for n in self.plasmids.split(',')}
        return names - {'None', 'none', 'NA', 'na', 'N/A', ''}

    @staticmethod
    def _split_plasmid_names(names: str) -> Set[str]:
        """Split a comma and/or space separated plasmid list into a set of non-empty names."""
        return {n for n in (names or '').replace(' ', ',').split(',') if n}

    def get_plasmids(self) -> List[Plasmid]:
        """Retrieves a list of `Plasmid` objects carried by the strain"""
        return [SESSION.get_object(Plasmid, name=p) for p in self.plasmid_names]

    async def aio_get_plasmids(self) -> List[Plasmid]:
        """Retrieves a list of `Plasmid` objects carried by the strain"""
        return [await Plasmid.async_from_name(p) for p in self.plasmid_names]

    def add_colony(self, colony_name: str, store=False, plasmids: str = None) -> "Strain":
        """
        Generates a copy of the current strain representing a single colony picked from a plate

        :param colony_name: A name for the new colony (See `Strain.iter_names()`)
        :param store: If true, automatically adds the colony to LG and links the colony to this strain.
            Default: false
        :param plasmids: A comma-separated list of plasmids carried by the colony. Default: plasmids in this strain
        :return: a new strain
        """
        colony_gs = self.make_new_copy()
        colony_gs.name = colony_name
        # Tolerate a missing description (derived strains may be created without one).
        colony_gs.description = (self.description or '') + f', single colony from {self.name}'
        colony_gs.plasmids = plasmids or self.plasmids
        colony_gs.anchor_strain = self.anchor_strain
        colony_gs.parent_strain = self.parent_strain
        if store:
            colony_gs = SESSION.add(colony_gs)
            SESSION.link_objects(colony_gs, self)
        return colony_gs

    def make_new_derived_strain(self, child_strain_name: str, differentiating_modification: str,
                                additional_plasmids: str = '', cured_plasmids: str = '',
                                additional_genotype: str = '', remove_genotype: str = '',
                                strain_barcode: str = None, strain_description: str = None,
                                meaning_of_u: str = None, commit: bool = True, **kwargs) -> "Strain":
        """
        Generate a new strain derived from the current one

        :param child_strain_name: Name of the new strain
        :param differentiating_modification: A short description of how the new strain differs from its parent
        :param additional_plasmids: A comma-separated list of plasmids added to the strain
        :param cured_plasmids: A comma-separated list of plasmids removed from the strain
        :param additional_genotype: New genome modification
        :param remove_genotype: New genome reversions
        :param strain_barcode: A barcode added to the child-strain genome
        :param strain_description: A description of the child strain
        :param meaning_of_u: The amino acid assigned to the UAG codon
        :param commit: If true, adds the child strain to the LG database and links the child to its plasmids.
            Default: True
        :param kwargs: Any additional properties to set on the new strain (See Strain.bulk_property_update())
        :return: the child Strain
        """
        # Sanitize input values; empty fragments (e.g. from "a, b") must not become plasmid names.
        a_plas_names = self._split_plasmid_names(additional_plasmids)
        c_plas_names = self._split_plasmid_names(cured_plasmids)

        new_strain = Strain.make_new(name=child_strain_name, parent_strain=self,
                                     strain_background=self.strain_background,
                                     description=strain_description,
                                     differentiating_modification=differentiating_modification)
        all_plasmids = (self.plasmid_names | a_plas_names) - c_plas_names
        new_strain.plasmids = ", ".join(sorted(all_plasmids, key=lambda n: n[-4:]))

        # Calculate the genotype
        if self_anchor := AnchorStrain.from_name(self.name):
            # This strain is itself an anchor: its name stands in for the chromosome.
            chrom_genotype = self.name
            new_strain.anchor_strain = self_anchor
        else:
            chrom_genotype: str = self.strain_genotype
            # Strip the plasmid portions so only the chromosomal genotype remains.
            for p in self.get_plasmids():
                chrom_genotype = chrom_genotype.replace(p.calculated_genotype(), '')
            new_strain.anchor_strain = self.anchor_strain
        if remove_genotype:
            chrom_genotype = chrom_genotype.replace('.' + remove_genotype, '')
        chrom_genotype = chrom_genotype.strip(' .')
        if additional_genotype:
            chrom_genotype += '.' + additional_genotype.replace(' ', '.')
        new_genotype_elements = [chrom_genotype]
        new_genotype_elements += [p.calculated_genotype() for p in new_strain.get_plasmids()]
        new_strain.strain_genotype = ' '.join(new_genotype_elements)

        new_strain.barcode = strain_barcode or self.barcode
        new_strain.meaning_of_u = meaning_of_u or self.meaning_of_u
        new_strain.rf_status = new_strain.calc_rf_status()
        new_strain.lambda_status = new_strain.calc_lambda_status()
        new_strain.tolc_variant = new_strain.calc_tolc_variant()
        new_strain.bulk_property_update(**kwargs)
        if commit:
            new_strain = SESSION.add(new_strain)
            for p in new_strain.get_plasmids():
                SESSION.link_objects(new_strain, p)
        return new_strain

    def _genotype_elements(self, value: bool) -> Dict[str, bool]:
        """Map every element of ``strain_genotype`` to ``value``."""
        reformatted_genotype = self.strain_genotype.replace('..', '||').replace('.', ' ') \
            .replace('||', '..')  # Splits up the old genotype notations
        return {x: value for x in reformatted_genotype.split(' ')}

    def calculate_genotype(self, chromosome_additions: List[str] = None, chromosome_reversions: List[str] = None,
                           new_plasmids: List[Plasmid] = None, cured_plasmids: List[Plasmid] = None,
                           depth=0) -> str:
        """
        Calculates an anchor-strain based genotype for the current strain. Deprecated. Please do not use.

        :param chromosome_additions: List of additional genotype modification. Default: None
        :param chromosome_reversions: List of additional genotype reversions. Default: None
        :param new_plasmids: List of additional plasmids. Default: None
        :param cured_plasmids: List of additional removed plasmids. Default: None
        :param depth: current recursion depth. Used internally, please do not set this.
        :return: a genotype for the strain
        """
        warnings.warn('calculate_genotype has been deprecated in LIMS 2.0, please do not use', DeprecationWarning)
        if self.anchor_strain:
            return self.name
        parent_strain: Strain = self.parent_strain
        if not parent_strain.name:
            return ''
        calculated_parent_genotype = parent_strain.calculate_genotype(depth=depth + 1)
        if not calculated_parent_genotype and depth > 0:
            return ''

        # Determine new chromosomal changes compared to the parent strain
        genotype_additions_dict = self._genotype_elements(True)
        if calculated_parent_genotype:
            genotype_additions_dict.update(parent_strain._genotype_elements(False))
            genotype_additions_dict.update({parent_strain.name: False})
            genotype_additions_dict.update({x: False for x in calculated_parent_genotype.split(' ')})
        if chromosome_additions:
            if isinstance(chromosome_additions, str):
                chromosome_additions = chromosome_additions.replace('..', '||').replace('.', ' ') \
                    .replace('||', '..').split(' ')
            genotype_additions_dict.update({x: True for x in chromosome_additions})
        if chromosome_reversions:
            genotype_additions_dict.update({x: False for x in chromosome_reversions})

        # handle plasmids
        if new_plasmids:
            genotype_additions_dict.update({p.calculated_genotype(): True for p in new_plasmids})
        else:
            genotype_additions_dict.update({p.calculated_genotype(): True for p in self.get_plasmids()})
        if calculated_parent_genotype:
            genotype_additions_dict.update({p.calculated_genotype(): True for p in parent_strain.get_plasmids()})
        if cured_plasmids:
            genotype_additions_dict.update({p.calculated_genotype(): False for p in cured_plasmids})
            genotype_additions_dict.update({f'-{p.name}': True for p in cured_plasmids})

        # Return the genotype
        genotype_additions = [k for k, v in genotype_additions_dict.items()
                              if v and k not in calculated_parent_genotype]
        # calculated_parent_genotype = calculated_parent_genotype or []
        parent_genotype_ = [calculated_parent_genotype] if calculated_parent_genotype else []
        return ' '.join(parent_genotype_ + genotype_additions)

    @property
    def full_genotype(self):
        """``strain_genotype`` with the anchor strain's name expanded to its chromosomal genotype.

        When the anchor strain has no name or no chromosomal genotype the
        stored genotype is returned unchanged.
        """
        anchor = self.anchor_strain
        anchor_name = getattr(anchor, 'name', None)
        anchor_genotype = getattr(anchor, 'chromosome_genotype', None)
        # ``str.replace('', x)`` would interleave ``x`` between all characters and
        # ``str.replace(name, None)`` raises, so bail out when either part is missing.
        if not anchor_name or anchor_genotype is None:
            return self.strain_genotype
        return self.strain_genotype.replace(anchor_name, anchor_genotype)

    def update_plasmid_genotype(self, old_plas_name: str, new_plasmid: Plasmid):
        """Replace every genotype element mentioning ``old_plas_name`` with the new plasmid's genotype."""
        new_plasmid_genotype_for = new_plasmid.calculated_genotype
        self.strain_genotype = ' '.join(
            new_plasmid_genotype_for() if old_plas_name in cur_element else cur_element
            for cur_element in self.strain_genotype.split(' ')
        )

    def rename_plasmid(self, old_plasmid_name: str, new_plasmid: Plasmid):
        """Swap a plasmid name in ``plasmids`` and update the genotype accordingly."""
        self.plasmids = self.plasmids.replace(old_plasmid_name, new_plasmid.name)
        self.update_plasmid_genotype(old_plasmid_name, new_plasmid)

    def calc_lambda_status(self) -> str:
        """Return ``'Negative'`` when the full genotype contains ``Δλ``, else ``'Positive'``."""
        return 'Negative' if 'Δλ' in self.full_genotype else 'Positive'

    def calc_rf_status(self) -> str:
        """Derive the RF status from the full genotype.

        A ``prfA`` entry decides the result on its own: ``'Negative'`` when its
        last occurrence is directly preceded by ``Δ``, otherwise ``'Positive'``.
        Without ``prfA``, the markers ``2x310k`` and ``U63`` yield ``'Negative'``.
        """
        self_full_genotype = self.full_genotype
        if 'prfA' in self_full_genotype:
            before_prfa = self_full_genotype.rsplit('prfA', 1)[0]
            # ``endswith`` also copes with a genotype that starts with "prfA".
            return "Negative" if before_prfa.endswith('Δ') else "Positive"
        if '2x310k' in self_full_genotype:
            return "Negative"
        if 'U63' in self_full_genotype:
            return "Negative"
        return "Positive"

    def calc_tolc_variant(self) -> str:
        """Derive the tolC variant from the full genotype.

        Returns ``'WT'`` without a ``tolC`` entry, ``'Δ'`` when the last entry
        is directly preceded by ``Δ``, the four characters following the
        separator when they end in ``UAG``, and ``'WT'`` otherwise.
        """
        tc_split = self.full_genotype.rsplit('tolC', 1)
        if len(tc_split) == 1:
            return "WT"
        pre, post = tc_split
        # ``endswith`` also copes with a genotype that starts with "tolC".
        if pre.endswith('Δ'):
            return 'Δ'
        if post[2:5] == 'UAG':
            return post[1:5]
        return "WT"

    @classmethod
    async def aio_search_api(cls: Type[LGI], session: Session, query_data: Dict[str, Any],
                             cur_page=1) -> List[LGI]:
        """Search the API, then resolve the distinct parent-strain proxies of the results concurrently."""
        out_list = await super().aio_search_api(session, query_data, cur_page)
        unique_keys = {getattr(s.parent_strain, '_proxy_target') for s in out_list
                       if hasattr(s.parent_strain, '_proxy_target')}
        await tqdm_asyncio.gather(*[session.aio_get_object_from_cache_key(k) for k in unique_keys], leave=False)
        return out_list
class BiomassPellet(Collections):
    """Collection item for the ``biocollections/biomass pellets`` endpoint."""

    _attribute_dict = {
        'Notes': 'notes',
        'Affinity Purification Tag': 'affinity_tags',
        'Antibiotics Used': 'antibiotics',
        'Induction Type(s)': 'induction',
        'Expression/Induction Time (hrs)': 'expression_time',
        'Media Additives': 'additives',
        'Media Type': 'media_type',
        'Parent Strains': 'parent_strain',
        'Pellet Weight (g)': 'pellet_weight',
    }
    xlsx_collection = 'Biomass Pellets'
    _api_name = 'biocollections/biomass pellets'

    parent_strain: Strain = Parent(Strain, search_field='parent_generic_4')
    """The strain from which this item was derived. Searchable."""
    media_type = LGStr('', 'custom3')
    """The media used to generate the biomass. Searchable."""
    antibiotics = LGStr('', 'custom1')
    """The antibiotics used in the media. Searchable."""
    additives = LGStr('', 'custom4')
    """Any additional compounds added to the media. Searchable."""
    expression_time = LGInt(0, 'custom5')
    """The number of hours between induction and harvest. Searchable."""
    induction = LGStr('', 'custom6')
    """The biomass induction system. Searchable."""
    affinity_tags = LGStr('', 'custom8')
    """Any affinity tags included on the target product. Searchable."""
    pellet_weight = LGFloat(0, 'custom7')
    """Grams of wet cell weight. Searchable."""
    notes = LGStr('')
    """Anything else. Searchable."""

    def derived_inclusion_body(self, session: Session) -> Optional[InclusionBody]:
        """Find an inclusion body derived from this item.

        :param session: Session used for the lookups; the module-level ``SESSION`` is used when falsy
        :return: the first linked object that resolves to an ``InclusionBody``, or ``None``
        """
        # Honour the caller's session instead of silently using the global one.
        active_session = session or SESSION
        for link_uuid in self.links:
            ib = active_session.get_object(InclusionBody, uuid=link_uuid)
            if ib:
                return ib
        return None
class InclusionBody(Collections):
    """Collection item for the ``biocollections/inclusion bodies`` endpoint."""

    _api_name = 'biocollections/inclusion bodies'
    _attribute_dict = {
        'Inclusion Body weight (g)': 'weight',
        'Resuspension Volume (mL)': 'resuspension_volume',
        'Parent Biomass Pellets': 'parent_biomass',
        'Parent Strains': 'parent_strain',
        'source': 'source'
    }
    xlsx_collection = 'Inclusion Bodies'

    weight = LGFloat(0, 'custom2')
    """Weight of the inclusion body in grams. Searchable."""
    notes = LGStr('')
    """Anything else. Searchable."""
    resuspension_volume = LGFloat(0, 'custom1')
    """Volume of inclusion body resuspension in mL. Searchable."""
    parent_biomass: BiomassPellet = Parent(BiomassPellet, search_field='parent_generic_5')
    """BiomassPellet from which this item was derived. Searchable."""
    parent_strain = Parent(Strain, search_field='parent_generic_4')
    """Strain from which this item was derived. Searchable."""
    source = LGStr('')
    """URL of the experiment where this was created. Not searchable"""

    @property
    def source_experiment_id(self) -> int:
        """The integer that forms the last path segment of ``source``."""
        last_segment = self.source.rsplit("/", 1)[-1]
        return int(last_segment)

    @source_experiment_id.setter
    def source_experiment_id(self, value: Union[int, str]):
        experiment = str(value)
        self.source = f"http://my.labguru.com/knowledge/experiments/{experiment}"

    # source_experiment_id = make_lg_searchable(source_experiment_id, 'source')
class Weighted:
    """
    Interface for items that expose a molecular mass through ``mol_weight``.

    The method is marked abstract; subclasses are expected to override it.
    """

    @abc.abstractmethod
    def mol_weight(self) -> float:
        """The molecular mass of the item"""
        return None
class Compound(Collections, Weighted):
    """Collection item for the ``compounds`` endpoint."""

    _api_name = 'compounds'
    _attribute_dict = {
        'molar_mass': 'molar_mass',
        'cas': 'cas',
        'formula': 'formula',
        'density': 'density',
        'melting_point': 'melting_point',
        'boiling_point': 'boiling_point'
    }
    xlsx_collection = 'Compound'

    molar_mass = LGFloat()
    """Molar mass of the compound in g/mol. Searchable."""
    cas = LGStr()
    """The CAS ID of the compound. Searchable."""
    formula = LGStr()
    """The atomic formula of the compound. Searchable."""
    density = LGFloat()
    """The density of the compound in g/mL. Searchable."""
    melting_point = LGFloat()
    """The melting point of the compound in °C. Searchable."""
    boiling_point = LGFloat()
    """The boiling point of the compound in °C. Searchable."""

    def mol_weight(self) -> float:
        """Return the stored ``molar_mass``."""
        molar_mass = self.molar_mass
        return molar_mass
class Consumable(Collections):
    """Collection item for the ``materials`` endpoint."""

    _api_name = 'materials'
    xlsx_collection = 'Consumable'

    @classmethod
    def from_api(cls: Type[LGI], session: "Session" = SESSION, item_id: int = None, name: str = None,
                 uuid: str = None, api_url: str = None, auto_name: str = None,
                 include_custom=False) -> Optional[LGI]:
        """Fetch a consumable; a given ``auto_name`` is passed on as ``item_id`` instead."""
        if auto_name:
            # The auto name takes the place of the item id and is then cleared.
            item_id, auto_name = auto_name, None
        return super().from_api(session, item_id, name, uuid, api_url, auto_name, include_custom)
SCI = TypeVar('SCI', bound='OldSequencedCollection')  #: TypeVar bound to ``OldSequencedCollection``
class SequenceKindInt(LGInt):
    """Integer field holding a sequence kind, with conversion to and from display names.

    The integer value is the index of the display name in ``_enum_names``.
    """

    _enum_names = ['', 'DNA', 'cDNA', 'RNA', 'PROBE (or synthetic)', 'Protein (AA)']
    DNA = 1
    cDNA = 2
    RNA = 3
    Probe = 4
    Synthetic = 4
    Protein = 5

    def __set__(self, instance, value):
        # A dict value carries the kind under its 'id' key.
        kind_id = value['id'] if isinstance(value, dict) else value
        super().__set__(instance, kind_id)

    @classmethod
    def as_string(cls, value: int) -> str:
        """Return the display name stored at index ``value``."""
        names = cls._enum_names
        return names[value]

    @classmethod
    def from_string(cls, value: str) -> int:
        """Return the index of ``value`` among the display names, or ``DNA`` if it is not listed."""
        names = cls._enum_names
        if value in names:
            return names.index(value)
        return cls.DNA
class OldSequencedCollection(Collections, Sized, Weighted):
    """Deprecated collection item whose sequence is stored as a ``.gb`` attachment.

    Instantiation emits a ``DeprecationWarning`` pointing to
    ``MultiSequencedCollection``.
    """

    _sequence = LGSeqRecord()

    def __init__(self, **kwargs):
        warnings.warn('Please use MultiSequencedCollection instead.', DeprecationWarning, stacklevel=2)
        super().__init__(**kwargs)
        # Snapshot used by ``update_api`` to detect a changed sequence.
        self._starting_sequence = None

    @staticmethod
    def _is_active_genbank(attachment) -> bool:
        """True for attachments whose name contains ``.gb`` and that have no ``archived_at`` value."""
        return '.gb' in attachment.name and attachment.other_properties.get('archived_at', None) is None

    @classmethod
    def parse_api_data(cls: Type[LGI], json_data: Dict[str, Any], session: "Session",
                       include_custom=False) -> LGI:
        """Parse the payload and point ``_sequence`` at the first active ``.gb`` attachment, if any."""
        parsed_item = super().parse_api_data(json_data, session, include_custom)
        assert isinstance(parsed_item, OldSequencedCollection)
        for attachment in parsed_item.find_attachments():
            if cls._is_active_genbank(attachment):
                parsed_item._sequence = attachment.get_download_url(session)
                break
        return parsed_item

    @property
    def sequence(self) -> Dseqrecord:
        # Remember the first sequence that is read so later changes can be detected.
        if self._starting_sequence is None and self._sequence is not None:
            self._starting_sequence = self._sequence
        return self._sequence

    @sequence.setter
    def sequence(self, value: Dseqrecord):
        if self._starting_sequence is None:
            # No earlier sequence is known: install a placeholder starting value.
            self._starting_sequence = Dseqrecord('ACTG')
        self._sequence = value

    def __len__(self) -> int:
        return len(self.sequence.seq)

    def mol_weight(self) -> float:
        """Return ``len(sequence) * 617.96 + 36.04``."""
        return (len(self.sequence.seq) * 617.96) + 36.04

    def update_api(self: SCI, session: "Session", **kwargs) -> SCI:
        """Update the item and, when the sequence changed, replace its ``.gb`` attachment.

        The active ``.gb`` attachments present before the upload are deleted
        through the given ``session`` after the new file has been attached.
        """
        # standard update
        new_item: OldSequencedCollection = super().update_api(session)

        # upload attachment
        if self._starting_sequence is not None and self._starting_sequence != self._sequence:
            # Flag old sequences for removal
            for_deletion = [a for a in self.find_attachments() if self._is_active_genbank(a)]

            # Attach the new one
            gb_attachment = Attachment(attach_to=new_item)
            self.sequence.pn = self.name.replace(" ", "_")[:16]
            # gb_file = gb_attachment.file
            gb_file = gb_attachment.make_file(f'{self.sequence.pn}.gb')
            gb_file.unlink(True)
            self.sequence.write(str(gb_file))
            gb_attachment.update_api(session)
            new_item.sequence = gb_file

            # Remove old sequences, using the same session as the rest of the update.
            for a in for_deletion:
                session.delete(a)
            # gb_file.unlink(True)
        return new_item
def SequencedCollection(*args, **kwargs) -> OldSequencedCollection:
    """Deprecated factory kept for backward compatibility.

    Emits a ``DeprecationWarning`` and returns ``OldSequencedCollection(*args, **kwargs)``.
    """
    # stacklevel=2 attributes the warning to the calling code rather than to this
    # module, matching the stacklevel used by OldSequencedCollection.__init__.
    warnings.warn('Please use MultiSequencedCollection instead.', DeprecationWarning, stacklevel=2)
    return OldSequencedCollection(*args, **kwargs)


# noinspection PyPep8Naming
MSCI = TypeVar('MSCI', bound="MultiSequencedCollection")
class Sequence(Collections):
    """A sequence item (API name ``sequences``) that wraps a ``SeqRecord``/``Dseqrecord``.

    The record is stored on the item as a JSON string (``_dna_data``) plus the raw
    sequence string (``_sequence``); ``seq_to_json`` and ``json_to_seq`` convert
    between the record and that JSON form.
    """
    _api_name = 'sequences'
    _attribute_dict = {
        'title': 'name',
        'kind': '_kind',
        'accession': 'accession',
        'organism': 'organism',
        'source': '_source_dict',
        'source_id': '_source_id',
        'source_type': '_source_type',
        'sequence': '_sequence',
        'seq': '_sequence',
        'dna_data': '_dna_data'
    }

    _kind = SequenceKindInt(1)
    accession = LGStr('', 'accession')
    """The NCBI accession number of the sequence. Searchable."""
    organism = LGStr('', 'organism')
    """The organism from which the sequence was derived. Searchable."""
    _source_dict = LGDict()
    _source_id = LGStr()
    _source_type = LGStr()
    _sequence = LGStr('')
    _dna_data = LGJSONStr()
    _source = None
    updated = False

    def to_dict(self, **kwargs) -> Dict[str, Any]:
        """Return the parent's dict representation without the ``'source'`` entry."""
        item_dict = super().to_dict(**kwargs)
        # pop() instead of del: a missing 'source' key must not raise KeyError
        item_dict.pop('source', None)
        return item_dict

    _seq = None

    @property
    def kind(self) -> str:
        return SequenceKindInt.as_string(self._kind)

    @kind.setter
    def kind(self, value: Union[str, int, dict]):
        if isinstance(value, str):
            self._kind = SequenceKindInt.from_string(value)
        else:
            self._kind = value
        self.updated = True

    kind: str = make_lg_searchable(kind, 'kind')
    """The type of the sequence. Searchable."""

    @property
    def source(self) -> MSCI:
        if not self._source:
            if 'url' in self._source_dict:
                self._source = SESSION.get_object(MultiSequencedCollection, api_url=self._source_dict['url'])
            # NOTE: resolving the source from _source_type/_source_id is not implemented
        return self._source

    @source.setter
    def source(self, value: MSCI):
        self._source = value
        self._source_id = value.id
        self._source_type = value.class_name

    @property
    def sequence(self) -> SeqRecord | Dseqrecord:
        if not self._seq and self._dna_data:
            if not self._dna_data.endswith('}'):
                # Drop anything after the last closing brace before parsing
                self._dna_data = self._dna_data.rsplit('}', 1)[0] + '}'
            self._seq = self.json_to_seq(json.loads(self._dna_data))
        return self._seq

    @sequence.setter
    def sequence(self, value: SeqRecord):
        self._seq = value
        self._dna_data = self.seq_to_json()
        self._sequence = str(self._seq.seq)
        self.updated = True

    sequence: SeqRecord | Dseqrecord = make_lg_searchable(sequence, 'seq')
    """The nucleotide/amino acid sequence of the item. Searchable."""

    @classmethod
    def new_from_seq_record(cls, seq_record: SeqRecord | Dseqrecord, seq_type: int, source: MSCI) -> "Sequence":
        """
        Create a new sequence from a SeqRecord object

        :param seq_record: the sequence to base this object off of
        :param seq_type: the LG sequence type. See SequenceKindInt.
        :param source: the MultiSequencedCollection item to associate this sequence with
        :return: the new sequence
        """
        new_seq = cls.make_new(title=source.name,
                               description=seq_record.description,
                               kind=seq_type,
                               accession=seq_record.id,
                               organism=seq_record.annotations.get('organism', None))
        record = seq_record.copy() if isinstance(seq_record, Dseqrecord) else Dseqrecord(seq_record)
        # Rename BEFORE assigning: the setter serialises the record to JSON immediately,
        # so a rename done afterwards would never reach the stored ``_dna_data``.
        record.name = source.name
        new_seq.sequence = record
        new_seq.source = source
        return new_seq

    def has_same_sequence_as(self, other: Union[Sequence, SeqRecord, Seq, str], force_circular=False) -> bool:
        """Checks to see if two sequences are the same. Accounts for circular permutations and capitalization

        :param other: the sequence to compare against
        :param force_circular: treat the comparison as circular regardless of either topology
        :return: True if the sequences have equal length and match (as a rotation when circular)
        """
        own_record = self.sequence
        # Plain SeqRecord objects have no ``circular`` attribute; treat them as linear
        circular = force_circular or getattr(own_record, 'circular', False)
        if isinstance(other, Sequence):
            circular = circular or getattr(other.sequence, 'circular', False)
            other_seq: str = str(other.sequence.seq)
        elif isinstance(other, SeqRecord):
            circular = circular or (other.annotations.get('topology', '') == 'circular')
            other_seq = str(other.seq)
        else:
            other_seq = str(other)

        self_seq: str = str(own_record.seq).lower()
        if len(self_seq) != len(other_seq):
            return False
        other_seq = other_seq.lower()

        if circular:
            # Every rotation of a sequence is a substring of the sequence doubled
            return other_seq in self_seq + self_seq
        return other_seq == self_seq

    def seq_to_json(self) -> str:
        """Low-level method. Do not use.

        Serialise ``self.sequence`` (annotations and features) to an HTML-escaped JSON string.
        """
        record = self.sequence
        annotations = record.annotations
        # Set base stuff
        prop_dict = {
            'features': {},
            'name': record.name,
            'sequence': str(record.seq),
            'sequenceTypeFromLocus': annotations.get('molecule_type', '.'),
            'date': annotations.get('date', '.'),
            'circular': annotations.get('topology', 'linear'),
            'definition': record.description,
            'description': record.description,
            'accession': record.id,
            'version': record.id,
            'extraLines': [
                f"KEYWORDS {annotations.get('keywords', '.')}",
                f"SOURCE {annotations.get('source', '.')}",
                f" ORGANISM {annotations.get('organism', '.')}"
            ],
            'comments': annotations.get('comment', []),
            'type': self.kind,
            'size': len(record),
            'primers': {},
            'stateTrackingId': '',
            'proteinSequence': '',
            'proteinSize': 0,
            'warnings': {},
            'assemblyPieces': {},
            'lineageAnnotations': {},
            'parts': {},
            'cutsites': {},
            'orfs': {},
            'translations': {},
            'guides': {},
            'materiallyAvailable': True,
            'fromFileUpload': False
        }
        if isinstance(prop_dict['date'], (datetime.datetime, datetime.date)):
            prop_dict['date'] = prop_dict['date'].strftime('%d-%b-%Y').upper()

        # format features
        feature: SeqFeature
        for feature in record.features:
            location = feature.location
            if location is None:
                continue
            # Keep a usable id, otherwise generate a fresh dash-less UUID
            fid = feature.id if feature.id and feature.id[0] != '<' else uuid4().hex
            strand = location.strand
            f_dict = {
                'type': feature.type,
                'strand': strand,
                'start': location.start,
                'end': location.end,
                'annotationTypePlural': 'features',
                'id': fid,
                # A feature without strand information has strand None, which cannot be
                # compared with 0; such features are treated as forward.
                'forward': strand is None or strand >= 0
            }
            # find the name
            f_quals = feature.qualifiers.copy()
            for q_key in ['label', 'name', 'standard_name', 'product']:
                if q_key in f_quals:
                    q_val = f_quals.pop(q_key)
                    f_dict['name'] = q_val[0]
                    break
            f_dict['notes'] = f_quals
            prop_dict['features'][fid] = f_dict

        return html.escape(json.dumps(prop_dict))

    @staticmethod
    def json_to_seq(json_dict: Dict[str, Any]) -> SeqRecord:
        """Low-level method. Do not use.

        Build a ``SeqRecord`` from the dict form produced by ``seq_to_json``. When the
        record carries ``overhang`` features touching either end, its ``seq`` is replaced
        by a ``Dseq`` with matching single-stranded ends.
        """
        seq_record = SeqRecord(Seq(json_dict['sequence']))
        seq_record.name = json_dict['name']
        seq_record.id = json_dict.get('accession', '')
        seq_record.description = json_dict.get('description', '')
        seq_record.annotations['date'] = json_dict.get('date', None)
        seq_record.annotations['data_file_division'] = 'SYN'
        is_circular = json_dict.get('circular', False) == 'circular'
        seq_record.annotations['topology'] = 'circular' if is_circular else 'linear'
        seq_record.annotations['molecule_type'] = json_dict.get('sequenceTypeFromLocus', 'DNA')
        if json_dict.get('comments', False):
            seq_record.annotations['comment'] = json_dict.get('comments', '')
        for el in json_dict.get('extraLines', []):
            # Explicit check rather than ``assert`` so it still runs under ``python -O``
            if not isinstance(el, str):
                continue
            split_el = el.split(maxsplit=1)
            if len(split_el) < 2:
                continue  # a keyword without a value carries no annotation
            seq_record.annotations[split_el[0].lower()] = split_el[1]

        # generate features
        f_dict: Dict[str, Any]
        for f_dict in json_dict['features'].values():
            if 'notes' not in f_dict:
                f_dict['notes'] = {}
            f_dict['notes']['label'] = [f_dict.get('name', f_dict['type'])]
            f = SeqFeature(
                location=FeatureLocation(f_dict['start'], f_dict['end'], f_dict['strand']),
                type=f_dict['type'],
                qualifiers=f_dict['notes']
            )
            seq_record.features.append(f)

        overhangs = [f for f in seq_record.features
                     if f.type.lower() == 'overhang'
                     and (f.location.start == 0 or f.location.end == len(seq_record))]
        if overhangs:
            ovhg_5_len = ovhg_3_len = 0
            seq_len = len(seq_record.seq)
            for cur_ovhg in overhangs:
                # Read the strand from the location (``SeqFeature.strand`` is deprecated in
                # Biopython); a missing strand counts as 0, i.e. no overhang.
                strand = cur_ovhg.location.strand or 0
                if cur_ovhg.location.start == 0:
                    ovhg_5_len = cur_ovhg.location.end - cur_ovhg.location.start
                    ovhg_5_len *= -1 * strand
                elif cur_ovhg.location.end == seq_len:
                    ovhg_3_len = cur_ovhg.location.end - cur_ovhg.location.start
                    ovhg_3_len *= strand

            # make watson strand
            watson_start = max(0, ovhg_5_len)
            watson_end = min(seq_len, seq_len + ovhg_3_len)
            watson = str(seq_record.seq)[watson_start:watson_end]

            # make crick strand
            crick_start = max(0, ovhg_3_len)
            crick_end = min(seq_len, seq_len + ovhg_5_len)
            crick = str(seq_record.seq.reverse_complement())[crick_start:crick_end]

            dseq = Dseq(watson, crick, ovhg=ovhg_5_len)
            if is_circular:
                try:
                    seq_record.seq = dseq.looped()
                except TypeError:
                    # Best effort: keep the plain sequence if the ends cannot be joined
                    pass
            else:
                seq_record.seq = dseq

        return seq_record
class DNASequence(Sequence):
    """A ``Sequence`` whose parsed record is a pydna ``Dseqrecord``."""

    @staticmethod
    def json_to_seq(json_dict: Dict[str, Any]) -> Dseqrecord:
        """Parse ``json_dict`` with ``Sequence.json_to_seq`` and wrap the result in a ``Dseqrecord``.

        :param json_dict: the dict form of the sequence JSON
        :return: a ``Dseqrecord`` that is circular when ``json_dict['circular'] == 'circular'``
        """
        # Use .get() like the parent parser does, so a payload without a 'circular'
        # key is treated as linear instead of raising KeyError.
        is_circular = json_dict.get('circular', False) == 'circular'
        return Dseqrecord(Sequence.json_to_seq(json_dict), circular=is_circular)
class LGSeqList(LGList[List[DNASequence]]):
    """``LGList`` descriptor whose ``base_type`` is a sequence class (``DNASequence`` by default)."""
    base_type = DNASequence

    def __init__(self, seq_class: Type[Sequence] = None) -> None:
        super().__init__()
        # Fall back to DNASequence when no (truthy) class is supplied
        if seq_class:
            self.base_type = seq_class
        else:
            self.base_type = DNASequence

    def __get__(self, instance, owner) -> List[DNASequence]:
        # Pure delegation; overridden only to narrow the return annotation
        stored = super().__get__(instance, owner)
        return stored
class MultiSequencedCollection(Collections, Sized, Weighted):
    """Collection item that owns a list of ``Sequence`` objects.

    ``sequence`` exposes the first entry of ``sequences``; if there is none it falls back
    to a non-archived ``.gb`` attachment and migrates it into ``sequences``.
    """
    sequences = LGSeqList()
    """A list of Sequence objects associated with the item"""
    _seq_kind = 1

    def __init__(self, **kwargs):
        # 'sequences' is mapped only while the parent initialiser runs. The mapping lives
        # in a dict that instances share, so it must be removed even if initialisation
        # fails; pop() also tolerates the key having been removed by a nested construction.
        self._attribute_dict['sequences'] = '_sequences'
        try:
            super().__init__(**kwargs)
        finally:
            self._attribute_dict.pop('sequences', None)

    def get_attached_sequence(self, delete=False) -> Optional[SeqRecord]:
        """Low-level function. Do not use.

        Return the sequence from the first non-archived ``.gb`` attachment, or ``None``.

        :param delete: if True, delete that attachment through ``SESSION``
        """
        for attachment in self.find_attachments():
            if '.gb' in attachment.name and attachment.other_properties.get('archived_at', None) is None:
                # Throwaway holder so the LGSeqRecord descriptor can turn the URL into a record
                class _Dummy:
                    seq = LGSeqRecord()

                dummy_seq = _Dummy()
                dummy_seq.seq = attachment.get_download_url(SESSION)
                if delete:
                    SESSION.delete(attachment)
                return dummy_seq.seq
        return None

    @property
    def sequence(self) -> Optional[Dseqrecord]:
        try:
            out_seq = self.sequences[0].sequence
        except IndexError:
            out_seq = None

        if out_seq is None:
            # No stored sequence: migrate a legacy .gb attachment, if there is one
            legacy_seq = self.get_attached_sequence()
            if not legacy_seq:
                return None
            self.sequence = legacy_seq
            SESSION.update(self)
            out_seq = self.sequences[0].sequence

        # Plasmid sequences are always returned circular
        if isinstance(self, Plasmid) and out_seq.linear:
            out_seq = out_seq.looped()
        return out_seq

    @sequence.setter
    def sequence(self, value: SeqRecord):
        mol_type = value.annotations.get('molecule_type', '')
        seq_type = Sequence if 'AA' in mol_type else DNASequence
        seq_kind = SequenceKindInt.from_string(mol_type)
        new_seq = seq_type.new_from_seq_record(value, seq_kind, self)
        new_seq.name = f'Seq. for: {self.name}'
        if self.sequences:
            # The previous first sequence is archived and dropped from the list
            SESSION.archive(self.sequences[0])
            self.sequences = self.sequences[1:]
        self.sequences.append(new_seq)

    sequence: Optional[Dseqrecord] = make_lg_searchable(sequence, 'sequence.seq')
    """The first nucleotide/amino acid sequence associated with the object. Searchable."""

    def __len__(self) -> int:
        return len(self.sequence.seq)

    def mol_weight(self) -> float:
        """Return ``len(sequence) * 617.96 + 36.04``."""
        return (len(self.sequence.seq) * 617.96) + 36.04

    def update_api(self: MSCI, session: "Session", retries=0) -> MSCI:
        """Run the standard update, then add or update every sequence flagged as ``updated``.

        :param session: the session passed to the parent ``update_api``
        :param retries: passed through to the parent ``update_api``
        :return: the updated item, re-fetched via ``from_api`` if any sequence was written
        """
        # standard update
        # noinspection PyTypeChecker
        new_item: MSCI = super().update_api(session, retries)

        # update modified sequences
        has_updated_sequences = False
        for cur_seq in self.sequences:
            cur_seq.source = new_item
            if cur_seq.updated:
                has_updated_sequences = True
                if cur_seq.id:
                    SESSION.update(cur_seq)
                else:
                    SESSION.add(cur_seq)

        # get new item with sequence updates
        if has_updated_sequences:
            # noinspection PyTypeChecker
            new_item = new_item.from_api(item_id=new_item.id)
        return new_item

    async def aio_rename(self: MSCI, new_name: str) -> MSCI:
        """Rename the item, then rename and re-sync its sequences.

        :param new_name: the new name of the item
        :return: the object returned by the parent ``aio_rename``
        """
        old_name = self.name
        new_obj = await super().aio_rename(new_name)
        assert isinstance(new_obj, MultiSequencedCollection)
        for cur_seq in new_obj.sequences:
            if old_name in cur_seq.name:
                cur_seq.name = cur_seq.name.replace(old_name, new_name)
            # Assign a renamed copy so the setter re-serialises the record
            new_seq_obj = cur_seq.sequence.copy()
            new_seq_obj.name = new_name
            cur_seq.sequence = new_seq_obj
            await cur_seq.async_lg_sync()
        return new_obj
class Plasmid(MultiSequencedCollection, SelectableMixin):
    """A plasmid collection item (API name ``plasmids``)."""
    _api_name = 'plasmids'
    _attribute_dict = {
        'Clone #/Cloning ID': 'clone_no',
        'Vector notes': 'notes',
        'Origin of Replication': '_origin',
        'Resistance Gene': '_resistance',
        'insert': 'insert',
        'Promoter': 'promoter',
        'Affinity Tag': 'affinity_tag',
        'Made by?': 'made_by',
        'Temperature Sensitive?': 'temp_sensitive',
        'Genotype': 'genotype',
        'Barcode': 'barcode',
        'U Count': 'u_count',
        'Diversity': 'diversity',
    }
    class_name = 'Biocollections::Plasmid'
    class_display_name = 'Plasmid'
    xlsx_collection = 'Plasmid'

    clone_no = LGStr('', 'custom3')
    """An ID associated with the plasmid during cloning. Searchable."""
    notes = LGStr('', 'custom5')
    """Anything else. Searchable."""
    _origin = LGStr('Unknown')
    _resistance = LGStr('Unsure')
    insert = LGStr('', 'insert')
    """The product of interest on the plasmid. Searchable."""
    promoter = LGStr('Unknown', 'custom15')
    """The promoter used to express the insert. Searchable."""
    affinity_tag = LGStr('None', 'custom16')
    """Affinity tags included on the insert. Comma-separated. Searchable."""
    made_by = LGStr('Foundry', 'custom13')
    """Initials of the individual who cloned the plasmid. Searchable."""
    temp_sensitive = LGStr('No', 'custom9')
    """Yes/No for if the plasmid only propagates at certain temperatures. Searchable."""
    genotype = LGStr('', 'custom1')
    """Genotype of the plasmid. Searchable."""
    barcode = LGStr('', 'custom17')
    """Barcode of the plasmid. Searchable."""
    u_count = LGInt(None, 'custom19')
    """Number of UAG codons in the target protein. Searchable."""
    parent_genetic_part = Parent(parent_type_name="GeneticPart", search_field='parent_generic_9')
    """If this is a L0 plasmid, the genetic part it contains. Searchable."""
    diversity = LGStr('', 'custom7')
    """String diversity field. Intended for plasmids associated with libraries and selections."""

    @property
    def descriptive_id(self) -> str:
        # stacklevel=2 attributes the warning to the code that touched the attribute
        warnings.warn('The Plasmid attribute descriptive_id is deprecated, please use description',
                      DeprecationWarning, stacklevel=2)
        return self.description

    @descriptive_id.setter
    def descriptive_id(self, value):
        warnings.warn('The Plasmid attribute descriptive_id is deprecated, please use description',
                      DeprecationWarning, stacklevel=2)
        self.description = value

    @property
    def origin(self) -> str:
        return self._origin

    @origin.setter
    def origin(self, value: str):
        if not value:
            return
        ori_options = ['pMB1 (pBR322)', 'pMB1 (pUC19)', 'CDF', 'RSF1030', 'ColE1', 'rep101ts', 'rep101', 'p15A']
        if value in ori_options:
            self._origin = value
        else:
            try:
                # Reuse an answer that was stored for this value earlier
                self._origin = SESSION.get_config_value('ORIGINS', value)
            except KeyError:
                prompt_text = f"{value} is not a valid plasmid origin for {self.name}, please choose from:"
                chosen_ori = questionary.rawselect(prompt_text, ori_options).ask()
                self._origin = chosen_ori
                SESSION.set_config_value('ORIGINS', value, chosen_ori)

    origin: str = make_lg_searchable(origin, 'custom2')
    """The plasmid's origin of replication. Searchable."""

    @property
    def resistance(self) -> str:
        return self._resistance

    @resistance.setter
    def resistance(self, value: str):
        if not value:
            return
        abx_options = ['kanR', 'bla', 'cat', 'zeoR', 'gmR', 'aadA', 'tetA']
        if value in abx_options:
            self._resistance = value
        else:
            try:
                # Reuse an answer that was stored for this value earlier
                self._resistance = SESSION.get_config_value('RESISTANCES', value)
            except KeyError:
                prompt_text = f"{value} is not a valid resistance gene for {self.name}, please choose from:"
                chosen_abx = questionary.rawselect(prompt_text, abx_options).ask()
                self._resistance = chosen_abx
                SESSION.set_config_value('RESISTANCES', value, chosen_abx)

    resistance: str = make_lg_searchable(resistance, 'custom14')
    """The plasmid's selectable marker. Searchable."""

    def calculated_genotype(self) -> str:
        """The genotype of the plasmid formatted for inclusion in strain genotypes"""
        if self.genotype:
            return f'{self.name}/{self.genotype}'
        return self.name

    def imply_mut_count(self, new_aa: str) -> int:
        """Counts the mutations to a given amino acid that are named in ``insert``.

        A mutation is any occurrence in ``insert`` of an underscore, one capital letter,
        one or more digits and then ``new_aa`` (for example ``_K12A`` for ``new_aa='A'``).

        Args:
            new_aa (str): The new amino acid to look for. It is matched literally, so
                symbols such as ``*`` are safe to pass.

        Returns:
            int: The number of matches of the mutation pattern in ``insert``.
        """
        # re.escape: without it a value like '*' would be read as a regex quantifier
        # and raise re.error instead of being matched as a character.
        pattern = rf"(_[A-Z]\d+{re.escape(new_aa)})"
        return len(re.findall(pattern, self.insert))

    def guess_genotype(self, inplace=False) -> str:
        """
        Tries to determine the plasmid's genotype given the origin, promoter, and insert fields.

        If there are multiple inserts or promoters (defined by separating the fields by ", "), it will match them
        1:1 in separate cistrons. If there are more promoters than inserts, the algorithm will assume that the last
        insert has multiple promoters. If there are more inserts than promoters, the algorithm will assume that the
        last promoter drives multiple ORFs (aka. an operon).

        In the actual output every promoter is prefixed with ``P.`` (unless it already starts with it) and spaces,
        colons and commas inside a value are replaced by dots; the table omits this for brevity.

        .. list-table:: Genotype Examples
            :header-rows: 1

            * - origin
              - promoter
              - insert
              - ->
              - guessed genotype
            * - o1
              - p1
              - i1
              -
              - o1-(p1:i1)
            * - o1
              - p1, p2
              - i1, i2
              -
              - o1-(p1:i1)-(p2:i2)
            * - o1
              - p1, p2
              - i1, i2, i3
              -
              - o1-(p1:i1)-(p2:i2:i3)
            * - o1
              - p1, p2, p3
              - i1, i2
              -
              - o1-(p1:i1)-(p2-p3:i2)

        Args:
            inplace: If True, the plasmid's genotype field will be set to the guessed value. Default: False

        Returns:
            The best guess at the genotype given the information on-hand
        """

        def add_prefix(value: str, prefix: str) -> str:
            return value if value.startswith(prefix) else prefix + value

        def clean_up_values(val: str) -> str:
            # Drop a parenthesised suffix, then replace separators with dots
            if '(' in val and ')' in val:
                val = val.split('(')[0].strip()
            for c in ' :,':
                val = val.replace(c, '.')
            return val

        # determine origin
        if self.origin == 'pMB1 (pUC19)':
            ori = 'pUC'
        elif self.origin == 'pMB1 (pBR322)':
            ori = 'pMB1' if 'MB1' in self.description else 'pET'
        else:
            ori = add_prefix(self.origin, 'p')

        # determine ORFs
        promoter_split = self.promoter.split(', ')
        insert_split = self.insert.split(', ')
        last_p = ''
        cds_by_promoter = defaultdict(list)
        c_p: Optional[str]
        c_i: Optional[str]
        for c_p, c_i in zip_longest(promoter_split, insert_split):
            c_p = add_prefix(clean_up_values(c_p), 'P.') if c_p else None
            c_i = clean_up_values(c_i) if c_i else None
            if c_p and c_i:
                # 1:1 promoter:CDS match
                cds_by_promoter[c_p].append(c_i)
                last_p = c_p
            elif c_i:
                # extra insert under the same promoter
                cds_by_promoter[last_p].append(c_i)
            elif c_p:
                # multiple promoters for the same insert: re-key the last cistron
                c_p = f"{last_p}-{c_p}"
                cds_by_promoter[c_p] = cds_by_promoter[last_p]
                del cds_by_promoter[last_p]
                last_p = c_p

        orf_list = [f"({p}:{':'.join(i_list)})" for p, i_list in cds_by_promoter.items()]
        guess = '-'.join([ori] + orf_list)
        if inplace:
            self.genotype = guess
        return guess

    async def aio_rename(self: Plasmid, new_name: str) -> Plasmid:
        """Rename the plasmid and update every strain whose ``plasmids`` field contains the old name.

        :param new_name: the new name of the plasmid
        :return: the object returned by the parent ``aio_rename``
        """
        old_name = self.name
        new_obj = await super().aio_rename(new_name)
        assert isinstance(new_obj, Plasmid)
        derived_strains = await Strain.async_find_all(Strain.plasmids.contains(old_name))
        for s in derived_strains:
            s.rename_plasmid(old_name, self)
            await s.async_lg_sync()
        return new_obj
class Oligo(MultiSequencedCollection):
    """An oligo/primer collection item (API name ``primers``).

    Assigning ``idt_sequence`` parses the IDT-style string and fills in ``five_phos``,
    ``other_mods``, ``re_sites`` and ``sequence``.
    """
    _api_name = 'primers'
    _attribute_dict = {
        'Oligo Sequence': '_idt_sequence',
        "5 Phos": 'five_phos',
        'Other Mods': 'other_mods',
        'RE Sites': 're_sites',
        'used_for': 'application'
    }
    xlsx_collection = 'Primer'

    _idt_sequence = LGStr('', 'custom9')
    application = LGStr('', 'application')
    five_phos = LGStr('', 'custom23')
    other_mods = LGStr('', 'custom24')
    re_sites = LGStr('', 'custom25')

    @property
    def idt_sequence(self) -> str:
        return self._idt_sequence

    @idt_sequence.setter
    def idt_sequence(self, value: str):
        from LabGuruAPI._idt_mods import IDT_MOD_INFO

        # Modification codes are wrapped in '/' pairs; with an odd number of slashes the
        # string is stored as-is and nothing is derived from it.
        slash_total = sum(1 for ch in value if ch == '/')
        if slash_total % 2:
            self._idt_sequence = value
            return

        self.five_phos = 'Yes' if '/5Phos/' in value else 'No'
        value = value.replace('/5Phos/', '')

        inline_mod_names = {'r': 'RNA', 'm': "2' O-methyl", '+': 'Affinity Plus'}
        mod_bases_by_name: DefaultDict[str, Set[int]] = defaultdict(set)
        mod_ranges: List[Tuple[str, int, int]] = []
        cur_base = 0
        nucleotides = ''
        chars = iter(value)
        for ch in chars:
            if ch in 'ACTGUN':
                cur_base += 1
                nucleotides += ch
            elif ch == ' ':
                pass
            elif ch == '/':
                # Consume everything up to the closing slash as the modification code
                code = ''
                ch = next(chars)
                while ch != '/':
                    code += ch
                    ch = next(chars)
                extra_bases = None
                if code in IDT_MOD_INFO:
                    code, extra_bases = IDT_MOD_INFO[code]
                if extra_bases:
                    # The modification contributes bases of its own; tag each of them
                    for base in extra_bases:
                        mod_bases_by_name[code].add(cur_base + 1)
                        nucleotides += base
                        cur_base += 1
                else:
                    mod_bases_by_name[code].add(cur_base + 1)
            elif ch in 'rm+':
                # Single-character prefixes apply to the base that follows
                mod_bases_by_name[inline_mod_names[ch]].add(cur_base + 1)
            elif ch == '*':
                mod_ranges.append(('Phosphorothioate Bond', cur_base, cur_base + 1))

        # Collapse runs of consecutive modified positions into (name, first, last) ranges
        for mod_label, positions in mod_bases_by_name.items():
            ordered = sorted(positions)
            for _, run in groupby(enumerate(ordered), key=lambda pair: pair[0] - pair[1]):
                run_positions = [position for _, position in run]
                mod_ranges.append((mod_label, run_positions[0], run_positions[-1]))

        seq_record = read_primer(f">{self.name}\n{nucleotides}")
        seq_record.name = self.name
        all_mods = set()
        for mod_label, first, last in mod_ranges:
            span = FeatureLocation(first - 1, last, strand=0)
            mod_feature = SeqFeature(span, type='modified_base', qualifiers=dict(label=[mod_label]))
            seq_record.features.append(mod_feature)
            all_mods.add(mod_label)
        self.other_mods = ', '.join(sorted(all_mods))

        # Get restriction sites
        sites = []
        for enzyme_name in ['BsaI', 'BbsI', 'SapI', 'PaqCI']:
            seq = Dseq(nucleotides.upper().replace('U', 'T'))
            cut_seqs = seq.cut(restriction_enzyme(enzyme_name))
            if len(cut_seqs) <= 1:
                continue
            for fragment in cut_seqs[:-1]:
                strand, ovhg = fragment.three_prime_end()
                ovhg = ovhg.upper()
                rc_ovhg = str(Dseq(ovhg).rc())
                sites.append(f'{enzyme_name}/{strand}/{ovhg}/{rc_ovhg}')
        self.re_sites = ', '.join(sites)

        self.sequence = seq_record
        self._idt_sequence = value

    @staticmethod
    def get_idt_order_data(order_id: int) -> pd.DataFrame:
        """
        Fetch the data for one IDT (Integrated DNA Technologies) order as a pandas DataFrame.

        Args:
            order_id (int): The ID of the order for which the data is to be retrieved.

        Returns:
            pd.DataFrame: The result of ``get_coa_dataframe`` for the order.
        """
        with SciToolsPlus() as idt_session:
            print('coa_data')
            return idt_session.get_coa_dataframe(order_id)
class SyntheticGene(MultiSequencedCollection, SelectableMixin):
    """A synthesized dsDNA collection item (API name ``biocollections/Synthesized dsDNA``)."""

    _api_name = 'biocollections/Synthesized dsDNA'
    xlsx_collection = 'Synthesized dsDNA'

    # LabGuru field label -> attribute name on this class
    _attribute_dict = {'Designer': 'designer', 'Notes on Design or Usage': 'notes'}

    # Both fields are plain strings that default to ''
    designer = LGStr('', 'custom1')
    notes = LGStr('', 'custom8')
class Amplicon(MultiSequencedCollection, SelectableMixin):
    """A PCR product collection item (API name ``biocollections/pcr products``)."""
    _api_name = 'biocollections/pcr products'
    _attribute_dict = {
        'F Primer': 'fwd_primer',
        'R Primer': 'rev_primer',
        'PCR Conditions': 'pcr_conditions',
        'Usage Notes': 'usage_notes',
        'Date Made': 'date_made',
        'Made by?': 'made_by',
        'Parent Plasmid': 'parent_plasmid',
        'Parent Strain': 'parent_strain',
        'Parent Synthesized dsDNA': 'parent_synthetic_gene',
        'Parent PCR Products': 'parent_amplicon'
    }
    xlsx_collection = 'PCR Products'

    concentration = LGStr('')
    fwd_primer = LGStr('', 'custom5')
    """The forward primer used in the PCR reaction. Searchable."""
    rev_primer = LGStr('', 'custom6')
    """The reverse primer used in the PCR reaction. Searchable."""
    pcr_conditions = LGStr('', 'custom7')
    """A brief description of the PCR cycle. Searchable."""
    usage_notes = LGStr('', 'custom15')
    """Intended use of the amplicon. Searchable."""
    # NOTE(review): this default is computed once, when the class body is executed, so a
    # long-running process keeps using that day's date - confirm this is acceptable.
    date_made = LGStr(datetime.date.today().strftime('%m/%d/%Y'), 'custom2')
    made_by = LGStr('Foundry', 'custom3')
    """Initials of the individual who ran the PCR. Searchable."""
    parent_plasmid = Parent(Plasmid, search_field='parent_plasmid')
    """The plasmid used as a PCR template. Searchable."""
    parent_strain = Parent(Strain, search_field='parent_generic_4')
    """The strain used as the PCR template. Searchable."""
    parent_synthetic_gene = Parent(SyntheticGene, search_field='parent_generic_1')
    """The synthetic DNA used as the PCR template. Searchable."""
    parent_amplicon = Parent(search_field='parent_generic_2')
    """The amplicon used as the PCR template. Searchable."""

    def set_parent(self, parent: MSCI):
        """Store ``parent`` in the parent field that matches its type.

        :raises ValueError: if ``parent`` is not a plasmid, strain, synthetic gene or amplicon
        """
        if Plasmid.isinstance(parent):
            self.parent_plasmid = parent
        elif Strain.isinstance(parent):
            self.parent_strain = parent
        elif SyntheticGene.isinstance(parent):
            self.parent_synthetic_gene = parent
        elif Amplicon.isinstance(parent):
            self.parent_amplicon = parent
        else:
            raise ValueError(f'{repr(parent)} cannot be set as a parent of {repr(self)}')

    @property
    def parent(self) -> MSCI:
        """The first truthy parent among plasmid, amplicon, strain and synthetic gene."""
        return self.parent_plasmid or self.parent_amplicon or self.parent_strain or self.parent_synthetic_gene

    @parent.setter
    def parent(self, value: MSCI):
        self.set_parent(value)

    @classmethod
    def make_new(cls: Type[LGI], overwrite=False, **properties) -> LGI:
        """Create a new amplicon; an optional ``parent`` property is routed through ``set_parent``."""
        parent: Optional[SCI] = properties.pop('parent', None)
        new_amp: Amplicon = super().make_new(overwrite=overwrite, **properties)
        if parent:
            new_amp.set_parent(parent)
        return new_amp

    @classmethod
    async def aio_search_api(cls: Type[LGI], session: Session, query_data: Dict[str, Any], cur_page=1) -> List[LGI]:
        """Run the parent search, then fetch the distinct proxied parents of the results concurrently."""
        out_list = await super().aio_search_api(session, query_data, cur_page)
        unique_keys = {getattr(s.parent, '_proxy_target') for s in out_list if hasattr(s.parent, '_proxy_target')}
        await tqdm_asyncio.gather(*[session.aio_get_object_from_cache_key(k) for k in unique_keys], leave=False)
        return out_list

    @staticmethod
    def new_from_template_and_primers(name: str, template: MSCI, oligo1: Oligo, oligo2: Oligo, **props) -> Amplicon:
        """
        Simulates a PCR reaction and creates a new ``Amplicon`` object from the sequence.

        Args:
            name: the name of the resulting amplicon
            template: the object representing the PCR template. this must have a sequence to be successful
            oligo1: one of the two PCR oligos, generally the sense (forward) oligo
            oligo2: one of the two PCR oligos, generally the antisense (reverse) oligo
            **props: other amplicon properties that can be set in the initial object creation

        Returns:
            The amplicon generated by the template and oligos

        Raises:
            ValueError: The PCR could not be simulated
        """
        if not isinstance(template, MultiSequencedCollection):
            raise ValueError(f"Objects of type {type(template).__name__} cannot be used as amplicon templates")
        # Read each sequence property once instead of on every use
        template_seq = template.sequence
        if not template_seq:
            raise ValueError(f"Template {template.name} has no associated sequence.")
        primer_seqs = [oligo1.sequence, oligo2.sequence]

        try:
            amplicon: PYDAmplicon = pyd.pcr(primer_seqs[0], primer_seqs[1], template_seq, limit=18)
        except ValueError:
            # pcr() did not return a single product: anneal manually and choose one
            products = pyd.Anneal(primer_seqs, template_seq).products
            if len(products) == 0:
                raise ValueError(
                    f'NO AMPLICONS FOUND for {name} ({template.name} with {oligo1.name} and {oligo2.name}).')
            # With several candidates keep the first one with the highest tmf + tmr
            amplicon = max(products, key=lambda x: x.tmf + x.tmr)

        amplicon.name = name
        amplicon.description = amplicon.description.replace('Product', template.name)

        # Remove primers from amplicon features. Iterate over a copy because the list is
        # modified; ``features`` is a list, so the method is remove() (there is no delete()).
        for f in list(amplicon.features):
            possible_names = ''.join(f.qualifiers.get('note', ['']))
            possible_names += ''.join(f.qualifiers.get('label', ['']))
            if oligo1.name in possible_names or oligo2.name in possible_names:
                amplicon.features.remove(f)

        # NOTE(review): 'fwd_oligo'/'rev_oligo' do not match the 'fwd_primer'/'rev_primer'
        # attributes declared on this class - confirm that make_new maps these keys.
        props.update(dict(name=name, parent=template, fwd_oligo=oligo1.name, rev_oligo=oligo2.name,
                          sequence=amplicon))
        out_amplicon = Amplicon.make_new(**props)
        return out_amplicon
class GeneticPart(MultiSequencedCollection):
    """A genetic part collection item (API name ``biocollections/genetic parts``)."""

    _api_name = 'biocollections/genetic parts'
    xlsx_collection = 'Genetic Parts'

    # LabGuru field label -> attribute name on this class
    _attribute_dict = {
        'Type': 'part_type',
        'Strength': 'strength',
        'Strength Units': 'strength_units',
        'Genotype Contribution': 'genotype',
    }

    part_type = LGStr('', 'custom1')
    """The type of the genetic part. Searchable."""

    strength = LGFloat(lg_name='custom2')
    """If known, the strength magnitude of the part. Searchable."""

    strength_units = LGStr(lg_name='custom3')
    """If known, the units of the part strength. Searchable."""

    genotype = LGStr(lg_name='custom4')
    """The part's contribution to a plasmid genotype. Searchable."""
class RodentStrain(Collections):
    """A rodent strain collection item (API name ``rodent_strains``)."""

    _api_name = 'rodent_strains'
    xlsx_collection = 'Rodent strains'

    # LabGuru field label -> attribute name on this class
    _attribute_dict = {
        'Species': 'species',
        'genotype': 'genotype',
        'phenotype': 'phenotype',
        'transgene': 'transgene',
        'source': 'source',
        'alternative_name': 'alternative_name',
    }

    species = LGStr(lg_name='custom1')
    """The species of the rodent strain. Searchable."""

    genotype = LGStr(lg_name='genotype')
    """The genotype of the rodent strain. Searchable."""

    phenotype = LGStr(lg_name='phenotype')
    """The phenotype of the rodent strain. Searchable."""

    transgene = LGStr(lg_name='transgene')
    """The transgene of the rodent strain. Searchable."""

    source = LGStr(lg_name='source')
    """The source of the rodent strain. Searchable."""

    alternative_name = LGStr(lg_name='alternative_name')
    """The alternative name of the rodent strain. Searchable."""

    @classmethod
    def from_name(cls: Type[LGI], name: str) -> LGI:
        """Look a strain up by name, falling back to a strain whose genotype equals ``name``.

        A genotype match is stored in ``SESSION.cache`` under ``name``. Returns ``None``
        when neither lookup finds anything.
        """
        by_name = super().from_name(name)
        if by_name:
            return by_name

        by_genotype = cls.find_one(cls.genotype == name)
        if not by_genotype:
            return None

        SESSION.cache[SessionCacheKey(cls, name=name)] = by_genotype
        return by_genotype
class LGNamedEnum(LGStr):
    """``LGStr`` that also accepts a dict and stores its lower-cased ``'name'`` entry."""

    def __set__(self, instance, value):
        normalized = value
        if isinstance(value, dict):
            # Fall back to the descriptor's default when the dict has no 'name'
            label = value.get('name', self.default_val)
            normalized = label.lower()
        super().__set__(instance, normalized)
class Rodent(Collections):
    """A rodent specimen collection item (API name ``rodent_specimens``)."""
    _api_name = 'rodent_specimens'
    _attribute_dict = {
        'alternative_name': 'alternative_name',
        'dob': '_dob',
        'dod': '_dod',
        'sex': 'sex',
        'ear_tag': 'tag_id',
        'status': 'status',
        'experiment': 'experiment',
        'strain_id': '_strain_id',
        'genotype': 'genotype',
        'source': 'source',
        'Group': 'group',  # custom1
        'Treatment Schedule': 'treatment_schedule',  # custom 2
    }

    alternative_name = LGStr(lg_name='alternative_name')
    _dob = LGStr(lg_name='dob')
    _dod = LGStr(lg_name='dod')
    sex = LGNamedEnum('?')
    tag_id = LGStr(lg_name='ear_tag')
    status = LGNamedEnum('alive')
    experiment = LGStr(lg_name='experiment')
    _strain_id = LGInt(lg_name='strain_id')
    source = LGStr(lg_name='source')
    genotype = LGStr(lg_name='genotype')
    group = LGInt(lg_name='custom1')
    treatment_schedule = LGStr(lg_name='custom2')

    @property
    def date_of_birth(self) -> Optional[datetime.date]:
        # An unset date is reported as None instead of failing inside fromisoformat()
        if not self._dob:
            return None
        return datetime.date.fromisoformat(self._dob)

    @date_of_birth.setter
    def date_of_birth(self, value: Union[str, datetime.date]):
        self._dob = value.isoformat() if isinstance(value, datetime.date) else value

    date_of_birth: Optional[datetime.date] = make_lg_searchable(date_of_birth, 'dob')

    @property
    def date_of_death(self) -> Optional[datetime.date]:
        # An unset date (e.g. a living animal) is reported as None instead of raising
        if not self._dod:
            return None
        return datetime.date.fromisoformat(self._dod)

    @date_of_death.setter
    def date_of_death(self, value: Union[str, datetime.date]):
        self._dod = value.isoformat() if isinstance(value, datetime.date) else value

    date_of_death: Optional[datetime.date] = make_lg_searchable(date_of_death, 'dod')

    @property
    def strain(self) -> Optional[RodentStrain]:
        """The ``RodentStrain`` looked up from ``_strain_id``, or ``None`` if no id is set."""
        if self._strain_id:
            return RodentStrain.from_id(self._strain_id)
        return None

    @strain.setter
    def strain(self, value: Union[str, dict, RodentStrain]):
        """Accept a strain name, a dict with an ``'id'`` key, or a ``RodentStrain``.

        Also copies the strain's genotype onto this specimen.

        :raises ValueError: if ``value`` has an unsupported type or no strain can be found for it
        """
        if isinstance(value, str):
            resolved = RodentStrain.from_name(value)
        elif isinstance(value, dict) and 'id' in value:
            resolved = RodentStrain.from_id(value['id'])
        elif isinstance(value, RodentStrain):
            resolved = value
        else:
            raise ValueError(f'Invalid value for strain: {value}')
        if resolved is None:
            # A failed lookup would otherwise surface as an AttributeError on None below
            raise ValueError(f'No rodent strain found for: {value}')
        self._strain_id = resolved.id
        self.genotype = resolved.genotype

    def create_tissue_sample(self, name: str, tissue_type: str, harvest_date: Union[datetime.date, str],
                             fixation_embedding_procedure: str = None, applications: str = None,
                             storage_conditions: str = None, source: str = None) -> "Tissue":
        """Create a ``Tissue`` named ``'<experiment>-<name>'`` that is linked to this specimen.

        The tissue's species and genotype are copied from this specimen's strain, or set to
        ``None`` when there is no strain. The remaining arguments are passed to
        ``Tissue.make_new`` unchanged.

        :return: the new tissue object
        """
        tissue_name = f'{self.experiment}-{name}'
        new_tissue = Tissue.make_new(name=tissue_name, tissue_type=tissue_type, harvest_date=harvest_date,
                                     fixation_embedding_procedure=fixation_embedding_procedure,
                                     applications=applications, storage_conditions=storage_conditions,
                                     source=source)
        # Resolve the strain once; every read of the property performs a lookup by id
        strain = self.strain
        new_tissue.species = strain.species if strain else None
        new_tissue.genotype = strain.genotype if strain else None
        new_tissue.specimen = self
        return new_tissue
class Tissue(Collections):
    """Collection item whose ``_api_name`` is ``'tissues'``.

    The harvest date is kept on the instance as an ISO-format string (``_harvest_date``) and
    exposed as a ``datetime.date`` through :attr:`harvest_date`. The source animal is kept as
    an id (``_specimen_id``) and resolved through ``Rodent.from_id`` when :attr:`specimen` is read.
    """
    _api_name = 'tissues'
    # Values are the attribute names declared on this class; keys are presumably the
    # LabGuru-side field names (verify against the base class that consumes this dict).
    _attribute_dict = {
        'species': 'species',
        'genotype_phenotype': 'genotype',
        'animal_details': 'animal_details',
        'tissue_type': 'tissue_type',
        'harvest_date': '_harvest_date',
        'fixation_embedding_procedure': 'fixation_embedding_procedure',
        'applications': 'applications',
        'storage_conditions': 'storage_conditions',
        'source': 'source',
        'specimen_id': '_specimen_id',
    }

    species = LGStr(lg_name='species')
    genotype = LGStr(lg_name='genotype_phenotype')
    animal_details = LGStr(lg_name='animal_details')
    tissue_type = LGStr(lg_name='tissue_type')
    _harvest_date = LGStr(lg_name='harvest_date')
    fixation_embedding_procedure = LGStr(lg_name='fixation_embedding_procedure')
    applications = LGStr(lg_name='applications')
    storage_conditions = LGStr(lg_name='storage_conditions')
    source = LGStr(lg_name='source')
    _specimen_id = LGInt(lg_name='specimen_id')

    @property
    def harvest_date(self) -> Optional[datetime.date]:
        """Harvest date parsed from the stored ISO string, or ``None`` when nothing is stored."""
        # An unset backing field (None / '') used to make fromisoformat raise; report it as
        # "no date" instead, the same way ``specimen`` reports a missing specimen id.
        if not self._harvest_date:
            return None
        return datetime.date.fromisoformat(self._harvest_date)

    @harvest_date.setter
    def harvest_date(self, value: Union[str, datetime.date]):
        # ``date`` objects are serialised to ISO format; anything else is stored unchanged.
        self._harvest_date = value.isoformat() if isinstance(value, datetime.date) else value

    harvest_date: Optional[datetime.date] = make_lg_searchable(harvest_date, 'harvest_date')

    @property
    def specimen(self) -> Optional[Rodent]:
        """The ``Rodent`` looked up from ``_specimen_id``, or ``None`` when no id is set."""
        return Rodent.from_id(self._specimen_id) if self._specimen_id else None

    @specimen.setter
    def specimen(self, value: Rodent):
        # Only the id is stored; the Rodent object is looked up again on each read.
        self._specimen_id = value.id
# Lookup table from a LabGuru ``class_display_name`` value to the ``Collections`` subclass that
# parses it; ``Collections.parse_api_data`` indexes this dict with ``json_data['class_display_name']``.
# NOTE(review): ``Rodent`` and ``Tissue`` have no entries here - confirm whether that is intentional.
COLLECTIONS_BY_NAME: Dict[str, Type[CI]] = {
    # Strains (glycerol stocks resolve to the same class)
    'Strains': Strain,
    'Strain': Strain,
    'Glycerol Stocks': Strain,
    'Glycerol Stock': Strain,
    'Anchor Strain': AnchorStrain,
    'Anchor Strains': AnchorStrain,
    # Biomass
    'Biomass Pellets': BiomassPellet,
    'Biomass Pellet': BiomassPellet,
    'Inclusion Bodies': InclusionBody,
    'Inclusion Bodie': InclusionBody,  # presumably covers a naively de-pluralised name; verify
    'Inclusion Body': InclusionBody,
    # Nucleic acids
    'Primers': Oligo,
    'Primer': Oligo,
    'Compounds': Compound,
    'Compound': Compound,
    'Plasmids': Plasmid,
    'Plasmid': Plasmid,
    'PCR Products': Amplicon,
    'PCR Product': Amplicon,
    'Synthesized dsDNAs': SyntheticGene,
    'Synthesized dsDNA': SyntheticGene,
    'Consumables': Consumable,
    'Consumable': Consumable,
    'Sequence': Sequence,
    'Sequences': Sequence,
    'Genetic Part': GeneticPart,
    'Genetic Parts': GeneticPart,
    # Libraries and selections
    'Library': Library,
    'Libraries': Library,
    'Selection': Selection,
    'Selections': Selection,
}

if __name__ == '__main__':
    # Ad-hoc manual check: fetch one Selection by id and print its parent library's
    # theoretical diversity.
    selection = SESSION.get_object(Selection, 21051, proxy=False)
    print(selection.parent_library.theoretical_diversity)