Source code for LabGuruAPI._base

import asyncio
import functools
import json
import os
import sys
import traceback
from abc import abstractmethod
from asyncio import Future
from collections import defaultdict
from configparser import ConfigParser
from dataclasses import dataclass, asdict
from pathlib import Path
from tempfile import NamedTemporaryFile
from types import TracebackType
from typing import TypeVar, Dict, Callable, Any, Type, List, Optional, Mapping, overload, Union, Iterator, Tuple, \
    Generic, Iterable, Sized, Coroutine, DefaultDict, Awaitable

from aiostream import stream
from deprecation import deprecated

import requests
import typer
from Bio.SeqFeature import FeatureLocation
from pydna.dseq import Dseq
from pydna.dseqrecord import Dseqrecord
from pydna.readers import read as read_seq
from pydna.seqfeature import SeqFeature
from requests_futures.sessions import FuturesSession
from tqdm import tqdm
from tqdm.asyncio import tqdm as tqdm_aio
from appdirs import AppDirs
# import and fix a weird issue with aiohttp
import aiohttp
from functools import wraps
from asyncio.proactor_events import _ProactorBasePipeTransport

from LabGuruAPI._search_api import SearchInterface, LGSearchOperator, LGSearchAPI, make_lg_searchable


def silence_event_loop_closed(func):
    @wraps(func)
    def wrapper(self, *args, **kwargs):
        try:
            return func(self, *args, **kwargs)
        except RuntimeError as e:
            if str(e) != 'Event loop is closed':
                raise

    return wrapper


_ProactorBasePipeTransport.__del__ = silence_event_loop_closed(_ProactorBasePipeTransport.__del__)
###

T = TypeVar('T')


class LGPrimitives(Generic[T], SearchInterface):
    base_type: Callable[[Any], T] = int
    null_val = None

    def __init__(self, default_value: T = None, lg_name: str = None):
        """
        Base class used link python properties with LabGuru properties

        :param default_value: The value to be supplied to LG if it has not been set elsewhere
        :param lg_name: The name of the LabGuru field used in searches. Only necessary if the search name is different
                        from the property name
        """
        if default_value is not None:
            self.default_val = default_value
        else:
            self.default_val = self.null_val
        self.labguru_name = lg_name

    def __new__(cls: Type[T], *args, **kwargs) -> T:
        return super().__new__(cls)

    def __set_name__(self, owner, name):
        self.private_name = '_' + name

    def __set__(self, instance, value):
        if value in [None, '']:
            new_val = self.null_val or self.base_type()
        else:
            new_val = self.base_type(value)
        setattr(instance, self.private_name, new_val)

    def __get__(self, instance, owner) -> Optional[T]:
        try:
            return getattr(instance, self.private_name) if instance is not None else self
        except AttributeError:
            return self.default_val


[docs] class LGInt(LGPrimitives[int], int):
[docs] def __init__(self, default_value: T = None, lg_name: str = None): """ Base class used link python properties with LabGuru integer properties :param default_value: The value to be supplied to LG if it has not been set elsewhere :param lg_name: The name of the LabGuru field used in searches. Only necessary if the search name is different from the property name """ super().__init__(default_value, lg_name)
def __set__(self, instance, value): try: super().__set__(instance, value) except ValueError: super().__set__(instance, float(value))
[docs] class LGFloat(LGPrimitives[float], float): base_type = float
[docs] def __init__(self, default_value: T = None, lg_name: str = None): """ Base class used link python properties with LabGuru non-integer numeric properties :param default_value: The value to be supplied to LG if it has not been set elsewhere :param lg_name: The name of the LabGuru field used in searches. Only necessary if the search name is different from the property name """ super().__init__(default_value, lg_name)
def __set__(self, instance, value): try: value = round(float(value), 9) except: pass super().__set__(instance, value)
[docs] class LGStr(LGPrimitives[str], str): base_type = str
[docs] def __init__(self, default_value: T = None, lg_name: str = None): """ Base class used link python properties with LabGuru string properties :param default_value: The value to be supplied to LG if it has not been set elsewhere :param lg_name: The name of the LabGuru field used in searches. Only necessary if the search name is different from the property name """ super().__init__(default_value, lg_name)
[docs] class LGJSONStr(LGPrimitives[str], str): base_type = str
[docs] def __init__(self, default_value: T = None, lg_name: str = None): """ Base class used link python properties with LabGuru JSON properties with a Python string implementation :param default_value: The value to be supplied to LG if it has not been set elsewhere :param lg_name: The name of the LabGuru field used in searches. Only necessary if the search name is different from the property name """ super().__init__(default_value, lg_name)
def __set__(self, instance, value): if value in [None, '']: new_val = self.null_val or self.base_type() elif isinstance(value, str): new_val = self.base_type(value) else: new_val = json.dumps(value) setattr(instance, self.private_name, new_val)
[docs] class LGList(Generic[T]): base_type: Type[T] = int null_val = None @classmethod def new_value_function(cls, value: Any) -> T: try: if issubclass(cls.base_type, LabGuruItem): if isinstance(value, LabGuruItem) and value.id > '': return SESSION.get_object(cls.base_type, item_id=value.id) elif isinstance(value, dict): if 'id' in value and value['id'] is not None: return SESSION.get_object(cls.base_type, item_id=value['id']) return cls.base_type.parse_api_data(value) except (KeyError, ValueError, AttributeError): pass return cls.base_type(value) def __set_name__(self, owner, name): self.private_name = '_' + name def __set__(self, instance, value): if value in [None, '', []]: new_val = [] else: try: new_val = [] for v in value: if isinstance(v, LabGuruItem): new_val.append(v) else: new_val.append(self.new_value_function(v)) except TypeError: raise # raise TypeError(f'{type(instance).__name__}.{self.private_name} was expecting a list of ' # f'{self.base_type.__name__}. Got {type(value).__name__} instead.') setattr(instance, self.private_name, new_val) def __get__(self, instance, owner) -> List[T]: try: return getattr(instance, self.private_name) except AttributeError: setattr(instance, self.private_name, []) return getattr(instance, self.private_name)
[docs] class LGStrList(LGList): base_type = str
[docs] class LGBool(LGPrimitives[bool], int): base_type = bool
[docs] def __init__(self, default_value: T = None, lg_name: str = None): """ Base class used link python properties with LabGuru boolean properties :param default_value: The value to be supplied to LG if it has not been set elsewhere :param lg_name: The name of the LabGuru field used in searches. Only necessary if the search name is different from the property name """ super().__init__(default_value, lg_name)
[docs] class LGDict(LGPrimitives[dict], dict): base_type = dict
[docs] def __init__(self, default_value: T = None, lg_name: str = None): """ Base class used link python properties with LabGuru JSON properties with a Python dict implementation :param default_value: The value to be supplied to LG if it has not been set elsewhere :param lg_name: The name of the LabGuru field used in searches. Only necessary if the search name is different from the property name """ super().__init__(default_value, lg_name)
def __set__(self, instance, value): if isinstance(value, str): value = json.loads(str) super().__set__(instance, value)
attachment_session = FuturesSession() class LGAttachmentPath: def __init__(self, file_extension: str = ''): """ Base class used link python file paths with LabGuru string properties :param file_extension: The file extension to use in the temporary file """ self.file_extension = f'.{file_extension}' if file_extension else None def __set_name__(self, owner, name): self.private_name = f'_{name}_path' self.private_request = f'_{self.private_name}_request' self.private_update_status = f'_{self.private_request}_updated' def __get__(self, instance, owner) -> Path: try: result: Optional[Future] = getattr(instance, self.private_request) temp_file: str = getattr(instance, self.private_name) except AttributeError: result = None try: temp_file: str = getattr(instance, self.private_name) except AttributeError: temp_file = NamedTemporaryFile(delete=False, suffix=self.file_extension).name setattr(instance, self.private_name, temp_file) if os.path.exists(temp_file) and os.path.getsize(temp_file) > 0: # print('File Already Written') pass elif result is not None: # print('Waiting for download') r = result.result() # print(r) # print('Writing to file') Path(temp_file).write_bytes(r.content) # print('Written') return Path(temp_file) def __set__(self, instance, value: Union[str, Path]): if 'http' in str(value): # remote instance temp_file = NamedTemporaryFile(delete=False, suffix=self.file_extension) # print('Making Tempfile') setattr(instance, self.private_name, temp_file.name) # print(temp_file.name) cur_get = attachment_session.get(value) setattr(instance, self.private_request, cur_get) else: setattr(instance, self.private_name, value if isinstance(value, Path) else Path(value)) setattr(instance, self.private_request, None) setattr(instance, self.private_update_status, hasattr(instance, self.private_update_status)) class LGSeqRecord(LGAttachmentPath): def __init__(self, file_extension: str = ''): """ Base class used link python sequence objects with LabGuru string properties :param file_extension: The file extension to use in the temporary file """ super().__init__(file_extension) def __set_name__(self, owner, name): super().__set_name__(owner, name) self.private_sequence = f'_{name}_seqrecord' def __get__(self, instance, owner) -> Dseqrecord: try: seq_record = getattr(instance, self.private_sequence) # print('Already converted') return seq_record except AttributeError: record_path = super().__get__(instance, owner) # print('Parsing to Dseqrecord') seq_record: Dseqrecord = read_seq(str(record_path)) seq_record.features = list(filter(lambda f: f.location is not None, seq_record.features)) overhangs = [f for f in seq_record.features if f.type.lower() == 'overhang'] if overhangs: ovhg_5_len = ovhg_3_len = 0 seq_len = len(seq_record.seq) for cur_ovhg in overhangs: # print(cur_ovhg.location) if cur_ovhg.location.start == 0: ovhg_5_len = cur_ovhg.location.end - cur_ovhg.location.start ovhg_5_len *= -1 * cur_ovhg.strand elif cur_ovhg.location.end == seq_len: ovhg_3_len = cur_ovhg.location.end - cur_ovhg.location.start ovhg_3_len *= cur_ovhg.strand # make watson strand watson_start = max(0, ovhg_5_len) watson_end = min(seq_len, seq_len + ovhg_3_len) watson = str(seq_record.seq)[watson_start:watson_end] # make crick strand crick_start = max(0, ovhg_3_len) crick_end = min(seq_len, seq_len + ovhg_5_len) crick = str(seq_record.seq.rc())[crick_start:crick_end] seq_record.seq = Dseq(watson, crick, ovhg=ovhg_5_len) setattr(instance, self.private_sequence, seq_record) return seq_record def __set__(self, instance, value: Union[str, Path, Dseqrecord]): try: delattr(instance, self.private_sequence) except AttributeError: pass if isinstance(value, Dseqrecord): value, seq_record = NamedTemporaryFile(delete=True), value value.close() o5_type, o5_seq = seq_record.seq.five_prime_end() o3_type, o3_seq = seq_record.seq.three_prime_end() if o5_seq: f5 = [f for f in seq_record.features if f.location.start == 0 and f.type.lower() == 'overhang'] if not f5: o5_dir = 1 if '5' in o5_type else -1 o5_feature = SeqFeature(FeatureLocation(0, len(o5_seq)), strand=o5_dir, type='overhang') seq_record.features.append(o5_feature) if o3_seq: seq_len = len(seq_record) f3 = [f for f in seq_record.features if f.location.end == seq_len and f.type.lower() == 'overhang'] if not f3: o3_dir = -1 if '5' in o5_type else 1 o3_feature = SeqFeature(FeatureLocation(seq_len - len(o3_seq), seq_len), strand=o3_dir, type='overhang') seq_record.features.append(o3_feature) seq_record.write(str(value.name)) value = value.name super().__set__(instance, value) LGI = TypeVar('LGI', bound='LabGuruItem') #: TypeVar shortcut for all LabGuruItem subclasses @dataclass(eq=True, frozen=True) class SessionCacheKey: lgi_class: Type[LGI] item_id: int = None name: str = None auto_name: str = None uuid: str = None api_url: str = None include_custom: bool = False def to_kwargs(self): out_args = asdict(self) del out_args['lgi_class'] return out_args def desc(self) -> str: d_parts = [self.lgi_class.__name__] + [f'{k}={str(v)}' for k, v in self.to_kwargs().items() if v] return '/'.join(d_parts) def __repr__(self): return f'<SessionCacheKey: {self.desc()}>' def __hash__(self): return hash((self.lgi_class.__name__, tuple(self.to_kwargs().values()))) @dataclass class SessionTransaction: transaction_type: str item: LGI @staticmethod def ADD(item: LGI): return SessionTransaction('add', item) @staticmethod def UPDATE(item: LGI): return SessionTransaction('update', item) @staticmethod def DELETE(item: LGI): return SessionTransaction('delete', item) def throttled_run(*futures: Coroutine[Any, Any, LGI], delay: float = 0.05) -> List[LGI]: async def _run(*fs: Coroutine[Any, Any, LGI]): _sem = asyncio.Semaphore() async def _wrap(f: Coroutine[Any, Any, LGI]): async with _sem: await asyncio.sleep(delay) await_f = await f return await_f return await tqdm_aio.gather(*map(_wrap, fs)) return asyncio.run(_run(*futures))
[docs] class LGDuplicateNameError(BaseException): pass
[docs] class Session(object): """ An object that is used to interface with the LabGuru REST API """
[docs] def __init__(self, token: str = None): """ An object that is used to interface with the LabGuru REST API :param token: The API token used to manage the session. """ if token: self._token = token self.cache: Dict[SessionCacheKey, LGI] = {} """A cache of LabGuruItems to reduce the number of queries to the LabGuru API""" self.transactions: List[SessionTransaction] = [] """Currently unused""" if 'lg_config' in os.environ: self.app_path = Path(os.environ['lg_config']) else: self.app_path = Path(AppDirs("FoundryBackend", "GROBio").user_data_dir) """The base path for foundry backend config files""" self.app_path.mkdir(parents=True, exist_ok=True) env_config_path = os.getenv('LGAPI_CONFIG') self._config_path = Path(env_config_path) if env_config_path else self.app_path / 'config.txt' self.config = ConfigParser() """Interface for the backend config file""" if self._config_path.exists(): self.config.read(self._config_path)
@property def token(self): """The token used to authenticate API calls""" return self.login()
[docs] def login(self) -> str: """ Checks the validity of a cached API token and refreshes it if necessary :return: The current API token """ try: stored_token = self._token self.set_config_value('AUTH', 'TOKEN', stored_token) return self._token except AttributeError: pass try: stored_token = self.get_config_value('AUTH', 'TOKEN') tqdm.write(f'Trying token {stored_token}') r = requests.get('https://my.labguru.com/api/v1/projects/1.json', params=dict(token=stored_token)) tqdm.write(str(r.status_code)) if r.ok: self._token = stored_token return self._token except KeyError: pass from botocore.exceptions import ClientError, NoCredentialsError try: # Try to get credentials though AWS r = self._aws_login() except (ClientError, NoCredentialsError, KeyError) as e: # Otherwise just prompt for a login tqdm.write('Please Login to LabGuru') un = input('Username: ') pw = input('Password: ') if un == 'AWS': self.set_config_value('AUTH', 'SECRET_NAME', pw) r = self._aws_login() else: r = requests.post('https://my.labguru.com/api/v1/sessions.json', data={ 'login': un, 'password': pw }) data = r.json() self._token = data['token'] if self._token == '-1': tqdm.write('Invalid Login. Please, retry. If you were unable to fill in your username and password, you need to run this script in a python console.') return self._token tqdm.write(self._token) self.set_config_value('AUTH', 'TOKEN', self._token) return self._token
def _aws_login(self): secret_id = self.get_config_value('AUTH', 'SECRET_NAME', ask=False) tqdm.write('Refreshing Token via AWS') import boto3 session = boto3.session.Session() client = session.client( service_name='secretsmanager', region_name="us-east-1" ) get_secret_value_response = client.get_secret_value( SecretId=secret_id ) r = requests.post('https://my.labguru.com/api/v1/sessions.json', data=json.loads(get_secret_value_response['SecretString'])) return r
[docs] def get_config_value(self, section: str, key: str, ask=False) -> str: """ Retrieves a configuration value from the config file. :param section: Config file section :param key: Config key to retrieve :param ask: If true, prompts the user for a value if the key is not found. Will store and return the value. :return: The configuration value string :raises KeyError: The configuration Section or Key does not exist if the ask parameter is False """ if ask: try: return self.config[section][key] except KeyError: value = input(f'Please enter a value for configuration value {section.upper()}:{key} or just type No: ') self.set_config_value(section, key, value) return self.config[section][key] try: self.config[section][key] except KeyError: if self._config_path.exists(): self.config.read(self._config_path) return self.config[section][key]
[docs] def set_config_value(self, section: str, key: str, value: Any): """ Writes a key-value pair to the configuration file. :param section: Config file section :param key: Config key to set :param value: Value of the config key. Will be converted to a string before writing """ try: self.config[section][key] = value except KeyError: self.config.add_section(section) self.config[section][key] = str(value) with self._config_path.open('w') as cfg: self.config.write(cfg)
[docs] def make_api_path(self, endpoint: str, with_token: bool = False) -> str: """ Formats an API path with a given endpoint. :param endpoint: The API endpoint to be called. Will prepend 'https://my.labguru.com/api/v1/' if not included. :param with_token: if True, adds the current token as a URL parameter :return: The formatted API URL """ endpoint = endpoint.replace('https://my.labguru.com/api/v1/', '') url = f'https://my.labguru.com/api/v1/{endpoint}' if with_token: url += f'?token={self.token}' return url
[docs] def add(self, item: LGI) -> LGI: """ Adds an item to the LG database. Updates the cache. :param item: The LabGuruItem to be added :return: A freshly-queried version of the added item """ item = item.update_api(self) self.transactions.append(SessionTransaction.ADD(item)) self._add_to_cache(item) return item
[docs] async def aio_add(self, item: LGI) -> LGI: """ Asynchronously adds an item using a separate thread executor and returns the result. If an exception occurs during execution, the error information is captured and returned as an LGIError. Args: item: The LGI instance to be added. Returns: LGI: The result of the addition if successful, otherwise an LGIError containing the exception information and a dictionary representation of the item. """ try: return await asyncio.get_event_loop().run_in_executor(None, self.add, item) except: return LGIError(sys.exc_info(), item.to_dict())
[docs] def add_many(self, items: List[LGI]) -> List[LGI]: """ Adds a list of items to the database in parallel. Updates the cache. :param items: A list of LabGuruItem objects to be added to the database :return: A freshly-queried list of the added items. The input object order is preserved. """ empty_items = [type(lgi).bulk_update(name=f'{lgi.xlsx_collection} Holder {i + 1}') for i, lgi in enumerate(items)] futures = [self.aio_add(lgi) for lgi in empty_items] tqdm_aio.write('Reserving IDs for new items') results = asyncio.run(tqdm_aio.gather(*futures)) for real_item, empty_item in zip(items, results): real_item.id = empty_item.id return self.update_many(items)
[docs] def update(self, item: LGI) -> LGI: """ Updates an item in the LG database. Updates the cache. :param item: The LabGuruItem to be updated :return: A freshly-queried version of the updated item """ old_parent_uuids = {v for k, v in item.__dict__.items() if 'api_uuid' in k} try: item = item.update_api(self) finally: # Clean up item parents new_parent_uuids = {v for k, v in item.__dict__.items() if 'api_uuid' in k} removed_parent_uuids = old_parent_uuids - new_parent_uuids if removed_parent_uuids: parent_links = self.get(item.item_api_url(item.id, True) + '/get_parents') for cur_link in parent_links: if cur_link['target_uuid'] in removed_parent_uuids: self.del_request(f"/links/{cur_link['id']}") self.transactions.append(SessionTransaction.UPDATE(item)) self._add_to_cache(item) return item
[docs] async def aio_update(self, item: LGI) -> LGI: """Updates an LGI item asynchronously using an executor. This method utilizes Python's asyncio library to execute the update method asynchronously in a separate thread. It is useful when performing non-blocking I/O operations that involve updating an LGI item. Args: item: The LGI object that needs to be updated. It must adhere to the LGI interface, providing a to_dict method. Returns: LGI: Returns an updated LGI object if the update is successful. In the event of an exception during execution, an LGIError containing the exception information and the original item's dictionary form is returned. """ try: return await asyncio.get_event_loop().run_in_executor(None, self.update, item) except: return LGIError(sys.exc_info(), item.to_dict())
[docs] def update_many(self, items: List[LGI], delay=0.1) -> List[LGI]: """ Updates a list of items in the database in parallel. Updates the cache. :param items: A list of LabGuruItem objects to be updated :return: A freshly-queried list of the updated items. The input object order is preserved. """ futures = [self.aio_update(i) for i in items] tqdm_aio.write('Updating items') results = throttled_run(*futures, delay=delay) for i, r in enumerate(results): items[i].__dict__ = results[i].__dict__ return items
[docs] def refresh(self, item: LGI) -> LGI: """ Forces a re-query of a LabGuruItem. Replaces the cached version with the new item. :param item: the LabGuruItem to be refreshed :return: the refreshed LabGuruItem """ self._remove_from_cache(item) return self.get_object(type(item), item_id=item.id, proxy=False)
[docs] def delete(self, item: LGI) -> bool: """ Removes an item from the LG database and the session cache. This is irreversible. :param item: The LabGuruItem to be deleted. :return: True if the request succeeded. """ r = requests.delete(item.item_api_url(item.id) + f"?token={self.token}") if r.ok: self._remove_from_cache(item) self.transactions.append(SessionTransaction.DELETE(item)) return True return False
[docs] def archive(self, item: LGI) -> bool: """ Archives an item in the LG database. Not available for all items. :param item: The LabGuruItem to be archived. :return: True if the request succeeded. """ data = dict(token=self.token, selected_items=item.id) endpoint = item.api_url.replace(item.id, "archive_selected") r = requests.put(item.process_api_url(endpoint, True), data=data) if r.ok: # self._remove_from_cache(item) return True return False
[docs] async def aio_archive(self, item: LGI) -> bool: """ Archives the specified item asynchronously by making an asynchronous HTTP PUT request to the item's API endpoint. The method constructs the request using the provided item's ID and a token for authentication. Args: item (LGI): The item to be archived, represented as an instance of LGI class. Returns: bool: Returns True if the item was archived successfully, otherwise False. """ data = dict(token=self.token, selected_items=item.id) endpoint = item.item_api_url(item.id).replace(item.id, "archive_selected") r = await self.aio_put(endpoint, **data) if r is not None: if 'error_message' in r: return False # self._remove_from_cache(item) return True return False
[docs] def restore(self, item: LGI) -> bool: """ Restores an archived item in the LG database. Not available for all items. :param item: The LabGuruItem to be restored. :return: True if the request succeeded. """ data = dict(token=self.token, selected_items=item.id) endpoint = item.api_url.replace(item.id, "restore_selected") r = requests.put(item.process_api_url(endpoint, True), data=data) if r.ok: self._remove_from_cache(item) return True return False
def _add_to_cache(self, item: LGI): if item is not None: lgi_class = type(item) include_custom = any('custom' in k for k in item.other_properties.keys()) if item.id: self.cache[SessionCacheKey(lgi_class, item_id=int(item.id), include_custom=include_custom)] = item self.cache[SessionCacheKey(lgi_class, item_id=str(item.id), include_custom=include_custom)] = item api_url = lgi_class.item_api_url(item.id) self.cache[SessionCacheKey(lgi_class, api_url=api_url, include_custom=include_custom)] = item if item.name: self.cache[SessionCacheKey(lgi_class, name=item.name, include_custom=include_custom)] = item if item.uuid: self.cache[SessionCacheKey(lgi_class, uuid=item.uuid, include_custom=include_custom)] = item if item.api_url: api_url: str = lgi_class.process_api_url(item.api_url) self.cache[SessionCacheKey(lgi_class, api_url=api_url, include_custom=include_custom)] = item if item.auto_name: self.cache[SessionCacheKey(lgi_class, auto_name=item.auto_name, include_custom=include_custom)] = item def _get_from_cache(self, key: SessionCacheKey) -> Optional[LGI]: try: return self.cache[key] except KeyError: return None def _remove_from_cache(self, item: LGI): if item is not None: lgi_class = type(item) include_custom = any('custom' in k for k in item.other_properties.keys()) if item.id: try: del self.cache[SessionCacheKey(lgi_class, item_id=item.id, include_custom=include_custom)] except KeyError: pass if item.name: try: del self.cache[SessionCacheKey(lgi_class, name=item.name, include_custom=include_custom)] except KeyError: pass if item.uuid: try: del self.cache[SessionCacheKey(lgi_class, uuid=item.uuid, include_custom=include_custom)] except KeyError: pass if item.api_url: try: del self.cache[SessionCacheKey(lgi_class, api_url=item.api_url, include_custom=include_custom)] except KeyError: pass if item.auto_name: try: del self.cache[SessionCacheKey(lgi_class, auto_name=item.auto_name, include_custom=include_custom)] except KeyError: pass
[docs] def dump_cache(self, verbose=True) -> DefaultDict[str, List[Dict]]: """ Dumps the contents of the cache into a dictionary of lists categorized by class display names. This function iterates over unique cached items and converts them into dictionary format, organizing them according to their display name. Args: verbose (bool): If True, displays a progress bar using tqdm to show the progress of dumping the cache. If False, the progress bar is disabled. Returns: DefaultDict[str, List[Dict]]: A dictionary where each key is the class display name and the value is a list of dictionaries representing cached items of that class. """ out_dict = defaultdict(list) cur_item: LGI for cur_item in tqdm(set(self.cache.values()), disable=not verbose, leave=False): out_dict[cur_item.class_display_name].append(cur_item.to_dict()) return out_dict
[docs] def load_cache(self, dump: DefaultDict[str, List[Dict]], verbose=True): """Loads cache from a given data structure and populates internal storage. Iterates over a provided dictionary, mapping class names to lists of item data. For each class name, it attempts to retrieve the corresponding class from a predefined collection. If a class is found, it parses each item data using the class-specific parsing method and adds the resulting item to the cache. Progress through the process can optionally be displayed. Args: dump: A defaultdict where keys are class names (str) and values are lists of dictionaries representing item data. verbose: A boolean indicating whether to display progress information during the caching process. """ from LabGuruAPI._collections import COLLECTIONS_BY_NAME for cur_class_name, cur_item_list in tqdm(dump.items(), disable=not verbose, leave=False): cur_class = COLLECTIONS_BY_NAME.get(cur_class_name, None) if cur_class is None: continue for cur_item_dict in tqdm(cur_item_list, disable=not verbose, leave=False, desc=cur_class_name): cur_item = cur_class.parse_api_data(cur_item_dict) self._add_to_cache(cur_item)
[docs] def execute_async(self, awaitables: List[Awaitable[Optional[LGI]]], task_limit=5, verbose=True) -> List[Optional[LGI]]: """ Executes a set of asynchronous tasks with a limit on the number of concurrent tasks, and optionally displays a progress bar. Args: awaitables: A list of awaitable objects that represent the asynchronous tasks to be executed. task_limit: An integer that sets the maximum number of tasks that can be run concurrently. verbose: A boolean flag to control the display of a progress bar; if True, the progress bar is shown. """ async def _await(a: Awaitable[Optional[LGI]], top_pbar: tqdm_aio) -> Optional[LGI]: r = await a top_pbar.update() return r async def _run(): with tqdm_aio(total=len(awaitables), disable=not verbose) as t_pbar: t_stream = stream.map(stream.iterate(awaitables), _await, stream.repeat(t_pbar), task_limit=task_limit) out_vals = await stream.list(t_stream) return out_vals return asyncio.run(_run())
[docs] async def aio_get_object(self, lgi_class: Type[LGI], item_id: int = None, name: str = None, uuid: str = None, api_url: str = None, auto_name: str = None, include_custom=False, from_cache=True) -> LGI: """ Asynchronously retrieves an object of the specified LGI class using provided identifying criteria or API URL. The object is fetched from cache if available; otherwise, it is retrieved from the API. Args: lgi_class: The LGI class type that determines the type of object to retrieve. item_id: The identifier of the item to be fetched. Defaults to None. name: The name of the item to be retrieved. Defaults to None. uuid: The universally unique identifier of the item. Defaults to None. api_url: The API URL to directly fetch the item from. May include percent encoding, which will be replaced. Defaults to None. auto_name: The automatic name used in fetching the object. Defaults to None. include_custom: A boolean indicating whether to include custom fields when retrieving the object. Defaults to False. from_cache: A boolean indicating whether to attempt fetching the object from cache before contacting the API. Defaults to True. Returns: An instance of LGI corresponding to the specified criteria if found, or an LGIError instance if an error occurs during fetching or processing. """ api_url = api_url.replace('%20', ' ') if api_url else None key = SessionCacheKey(lgi_class, item_id, name, auto_name, uuid, api_url, include_custom) if from_cache: try: out_item = self.cache[key] # tqdm.write(f'Got {repr(out_item)} from Cache') return out_item except KeyError: pass # lgi_item = lgi_class.from_api(self, item_id, name, uuid, api_url, auto_name, include_custom) future_lgi = asyncio.get_event_loop().run_in_executor(None, lgi_class.from_api, self, item_id, name, uuid, api_url, auto_name, include_custom) try: lgi_item = await future_lgi except: return LGIError(sys.exc_info(), key.to_kwargs()) self._add_to_cache(lgi_item) try: _ = lgi_item.sequence except AttributeError: pass except: return LGIError(sys.exc_info(), key.to_kwargs()) return lgi_item
[docs] async def aio_get_from_name(self, potential_classes: List[Type[LGI]], item_name: str) -> Optional[LGI]: """ Asynchronously attempts to retrieve an item by its name from a list of potential classes. This function concurrently queries each class for the desired item and returns the first matching item found. Args: potential_classes (List[Type[LGI]]): A list of classes that are potential candidates from which the item might be retrieved. Each class should offer a method for obtaining an item by name. item_name (str): The name of the item to be retrieved. This is the identifying string used to search for the item within each provided class. Returns: Optional[LGI]: The first item found that matches the given name, if any; otherwise, returns None if no matching item is found across all provided classes. """ potential_items = await asyncio.gather(*(self.aio_get_object(t, name=item_name) for t in potential_classes)) for i in potential_items: if i: return i return None
[docs] def get_object(self, lgi_class: Type[LGI], item_id: int = None, name: str = None, uuid: str = None, api_url: str = None, auto_name: str = None, include_custom=False, from_cache=True, proxy=True) -> LGI: """ Retrieves a ``LabGuruItem`` from the API. Only 1 of the following parameters will be queried, in the following order: item_id, name, uuid, auto_name. :param lgi_class: The python class of the desired item :param item_id: The database id of the desired item :param name: The name of the desired item :param uuid: The uuid of the desired item :param api_url: The API URL of the desired item :param auto_name: The SysID of the desired item :param include_custom: If true, include the ~200 "custom###" fields returned by the api. Default: False :param from_cache: If false, forces an API call even if the object is cached. Default: True :param proxy: If true, returns a proxy object with lazy API calls. Default: True :return: The requested ``LabGuruItem`` """ api_url = api_url.replace('%20', ' ') if api_url else None key = SessionCacheKey(lgi_class, item_id, name, auto_name, uuid, api_url, include_custom) if from_cache: try: out_item = self.cache[key] # tqdm.write(f'Got {repr(out_item)} from Cache') return out_item except KeyError: pass if proxy: return self._make_proxy_object(lgi_class, key) else: lgi_item = lgi_class.from_api(self, item_id, name, uuid, api_url, auto_name, include_custom) # if lgi_item is not None: # tqdm.write(f'Got {repr(lgi_item)} from LG') self._add_to_cache(lgi_item) return lgi_item
[docs] def get_many(self, lgi_class: Type[LGI], item_ids: List[int] = None, names: List[str] = None, uuids: List[str] = None, api_urls: List[str] = None, auto_names: List[str] = None, include_custom=False, from_cache=True) -> List[LGI]: """ Retrieves a list of ``LabGuruItem`` objects from the API. Only 1 of the following parameters will be queried, in the following order: item_ids, names, uuids, auto_names. :param lgi_class: The python class of the desired items :param item_ids: The database ids of the desired items :param names: The names of the desired items :param uuids: The uuids of the desired items :param api_urls: The API URLs of the desired items :param auto_names: The SysIDs of the desired items :param include_custom: If true, include the ~200 "custom###" fields returned by the api. Default: False :param from_cache: If false, forces an API call even if the object is cached. Default: True :return: The requested LabGuruItem objectss """ item_kwargs = [] if item_ids: item_kwargs += [dict(item_id=v) for v in item_ids] if names: item_kwargs += [dict(name=v) for v in names] if uuids: item_kwargs += [dict(uuid=v) for v in uuids] if api_urls: item_kwargs += [dict(api_url=v) for v in api_urls] if auto_names: item_kwargs += [dict(auto_name=v) for v in auto_names] futures = [self.aio_get_object(lgi_class, include_custom=include_custom, from_cache=from_cache, **kwargs) for kwargs in item_kwargs] tqdm_aio.write(f'Retrieving {lgi_class.xlsx_collection}') return throttled_run(*futures)
def get_object_from_cache_key(self, key: SessionCacheKey): return self.get_object(**key.to_kwargs()) async def aio_get_object_from_cache_key(self, key: SessionCacheKey): return await self.aio_get_object(lgi_class=key.lgi_class, **key.to_kwargs())
[docs] async def get_all_dicts(self, lgi_class: Type[LGI], page_size: int = 10): """ Retrieves an entire collection. :param lgi_class: The python class to be queried :param page_size: number of objects to return per api call :return: the JSON representation of all queried objects """ meta_result = self.get(lgi_class.get_api_url(), meta=True, page_size=page_size) meta_data = meta_result['meta'] object_dicts = [] cs = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout()) get_pages = [self.aio_get(lgi_class.get_api_url(), cs, meta=False, page_size=page_size, page=(i + 1)) for i in range(meta_data['page_count'])] results = await tqdm_aio.gather(*get_pages, leave=False) for cur_json in results: object_dicts.extend(cur_json) await cs.close() return object_dicts
def _make_proxy_object(self, lgi_type: Type[LGI], session_key: SessionCacheKey) -> LGI: cache_fxn = self._add_to_cache from_cache_fxn = self._get_from_cache _LGIT = TypeVar('_LGIT', bound=lgi_type) class _LGIProxy(lgi_type): def __init__(self): object.__setattr__(self, '_proxy_target', session_key) def __getattr__(self, item): attr = getattr(self._obj, item) return attr def __getattribute__(self, item): try: obj = object.__getattribute__(self, '_proxy_obj') return obj.__getattribute__(item) except AttributeError: return super().__getattribute__(item) def __setattr__(self, key, value): setattr(self._obj, key, value) def __delattr__(self, item): delattr(self._obj, item) def __repr__(self): try: return repr(object.__getattribute__(self, '_proxy_obj')) except AttributeError: return f'<LG Proxy: {session_key.desc()}>' @property def _obj(self) -> _LGIT: try: return object.__getattribute__(self, '_proxy_obj') except AttributeError: key: SessionCacheKey = object.__getattribute__(self, '_proxy_target') obj = from_cache_fxn(key) if obj: object.__setattr__(self, '_proxy_obj', obj) return obj else: obj = lgi_type.from_api(**key.to_kwargs()) object.__setattr__(self, '_proxy_obj', obj) cache_fxn(obj) return obj @property def lgi_type(self) -> Type[LGI]: return lgi_type def make_new_copy(self) -> _LGIT: return self._obj.make_new_copy() @classmethod def make_new(cls, **properties) -> _LGIT: return lgi_type.make_new(**properties) return _LGIProxy() @staticmethod def make_filter(field_name: str, operator: str, value: Any, page_size: int = 1, logic: str = 'and', filter_id: int = 0) -> Dict[str, Any]: return { "kendo" : True, "filter" : { "logic" : logic, "filters": { str(filter_id): { "value" : value, "operator": operator, "field" : field_name } } }, "page_size": page_size }
[docs] @deprecated(details='Please use LabGuruItem.find_all or LabGuruItem.find_one') def search_api(self, item_type: Type[LGI], field_name: str, operator: str, value: Any, page_size: int = 100) -> List[LGI]: results = [] for item in item_type.search_api(self, self.make_filter(field_name, operator, value, page_size)): results.append(item) self._add_to_cache(item) return results
[docs] @deprecated(details='Please use LabGuruItem.async_find_all or LabGuruItem.async_find_one') async def aio_search_api(self, item_type: Type[LGI], field_name: str, operator: str, value: Any, page_size: int = 100) -> List[LGI]: filter_dict = self.make_filter(field_name, operator, value, page_size) results = await item_type.aio_search_api(self, filter_dict) for item in results: self._add_to_cache(item) return results
[docs] def get(self, endpoint: str, **data) -> Optional[Dict[str, Any]]: """Sends a GET request to a specified API endpoint using given data and returns the response as a JSON object if the request is successful. The method appends the authentication token to the request data and sends a GET request to the constructed API path using the input endpoint and additional data. If the response status is successful, it returns the response content as a JSON object. Otherwise, it returns None. Args: endpoint: The API endpoint to send the GET request to. **data: Additional data to be included in the request. This data will be sent as a JSON object in the GET request. Returns: A dictionary containing the JSON response data if the request is successful. Returns None if the request fails. """ data['token'] = self.token r = requests.get(self.make_api_path(endpoint), json=data) if r.ok: return r.json() else: return None
[docs] async def aio_get(self, endpoint: str, c_session: aiohttp.ClientSession = None, retries=3, **data) -> Optional[Dict[str, Any]]: """ Performs an asynchronous GET request to a specified API endpoint with optional retry logic. Args: endpoint: The API endpoint to which the GET request is sent. c_session: An optional aiohttp.ClientSession to be used for the request. A new session is created if not provided. retries: The number of times the request should be retried upon encountering server errors (HTTP status 500 and above). **data: Additional data to be included in the request, specifically a 'token' is added from the instance attribute for authorization. Returns: An optional dictionary containing the JSON response from the API if the request is successful; otherwise, None. """ data['token'] = self.token session = c_session or aiohttp.ClientSession() async with session.get(self.make_api_path(endpoint), timeout=aiohttp.ClientTimeout(), json=data) as r: if r.ok: result = await r.json() elif r.status >= 500 and retries > 0: retries -= 1 result = await self.aio_get(endpoint, session, retries, **data) else: result = None if c_session is None: await session.close() return result
[docs] def post(self, endpoint: str, **data) -> Optional[Dict[str, Any]]: """ Sends a POST request to the provided API endpoint with the given data and a token. The method constructs the full API path from the endpoint, adds a token to the request data, and sends the POST request. If the request is successful, the response is parsed as JSON and returned. Otherwise, returns None. Args: endpoint (str): The API endpoint to which the POST request is sent. **data: Arbitrary keyword arguments representing the request body to be sent in JSON format. Returns: Optional[Dict[str, Any]]: Parsed JSON response from the POST request if successful; otherwise, None. """ data['token'] = self.token r = requests.post(self.make_api_path(endpoint), json=data) if r.ok: return r.json() else: return None
[docs] def put(self, endpoint: str, **data) -> Optional[Dict[str, Any]]: """ Updates a resource using a HTTP PUT request to the specified endpoint of the API with the provided data, appending the authentication token to the payload. Returns the parsed JSON response if the request is successful, otherwise returns None. Args: endpoint (str): The API endpoint to send the PUT request to. **data: Arbitrary keyword arguments representing the data to be included in the PUT request. Returns: Optional[Dict[str, Any]]: The parsed JSON response from the API if the request is successful, otherwise None. """ data['token'] = self.token r = requests.put(self.make_api_path(endpoint), json=data) if r.ok: return r.json() else: return None
[docs] def del_request(self, endpoint: str, **data) -> Optional[Dict[str, Any]]: """ Sends a DELETE request to a specified API endpoint with additional data. This function constructs the endpoint path using the provided `endpoint` and sends a DELETE request. The token is automatically included in the data payload. If the request is successful, it returns the JSON response as a dictionary. Otherwise, it returns None. Args: endpoint: The API endpoint to which the DELETE request will be sent. **data: Arbitrary keyword arguments to be included in the request payload. These will be merged with the authentication token. Returns: Optional[Dict[str, Any]]: The JSON response as a dictionary if the request is successful, or None if the request fails. """ data['token'] = self.token r = requests.delete(self.make_api_path(endpoint), json=data) if r.ok: return r.json() else: return None
[docs] async def aio_put(self, endpoint: str, **data) -> Optional[Dict[str, Any]]: """ Sends an asynchronous HTTP PUT request to the specified API endpoint with the provided data. This method constructs an API path using the specified endpoint and sends a PUT request including any provided keyword arguments in the JSON payload. Additionally, it adds an authentication token to the data payload. If the server responds with an HTTP status code indicating success, it parses and returns the JSON response. Otherwise, it returns None. Args: endpoint (str): The API endpoint to which the PUT request is sent. **data: Arbitrary keyword arguments representing additional data to be included in the request payload. Returns: Optional[Dict[str, Any]]: The JSON response as a dictionary if the request is successful, or None if the response indicates failure. """ data['token'] = self.token async with aiohttp.ClientSession() as session: async with session.put(self.make_api_path(endpoint), json=data) as r: if r.ok: json_ = await r.json() return json_ else: return None
[docs] async def aio_post(self, endpoint: str, **data) -> Optional[Dict[str, Any]]: """ Sends an asynchronous POST request to a specified endpoint with JSON data. This method is designed for making POST requests to a specified API endpoint, including a token within the request payload. It uses an asynchronous HTTP session to perform the operation and returns the parsed JSON response if the request is successful. If the request fails, it returns None. Args: endpoint (str): The API endpoint to which the POST request is sent. **data: Additional keyword arguments containing the JSON data to include in the request body. Returns: Optional[Dict[str, Any]]: The parsed JSON response from the server if the request is successful, or None if the request fails. """ data['token'] = self.token async with aiohttp.ClientSession() as session: async with session.post(self.make_api_path(endpoint), json=data) as r: if r.ok: return await r.json() else: return None
[docs] async def aio_delete(self, endpoint: str) -> bool: """ Sends an asynchronous HTTP DELETE request to the specified API endpoint. This method uses an asynchronous HTTP client session to send a DELETE request to the given endpoint. It returns a boolean value indicating whether the request was successful or not. Args: endpoint (str): The API endpoint to which the DELETE request will be sent. Returns: bool: True if the response status indicates success (HTTP 2xx), False otherwise. """ async with aiohttp.ClientSession() as session: async with session.delete(self.make_api_path(endpoint, with_token=True)) as r: if r.ok: return True else: return False
[docs] def get_members(self) -> Dict[str, int]: """ Retrieves a mapping of all member names to their IDs :return: A dictionary whose keys are names and values are member IDs """ # noinspection PyTypeChecker all_members: List[Dict] = self.get('admin/members') members = {m['name']: m['id'] for m in all_members if m['enabled'] and 'API' not in m['name']} return members
#: The global :py:class:`~LabGuruAPI._base.Session` object that should always be used to interact with LabGuru. SESSION = Session()
[docs] class LabGuruItem(object): """ A base class for all objects retrieved from the LabGuru Database """ _api_name = '' """The API endpoint for the class""" _attribute_dict: Dict[str, str] = { 'name' : 'name', 'description': 'description', 'owner_id' : 'owner_id', 'id' : 'id', 'uuid' : 'uuid', 'tags' : 'tags', 'links' : 'links' } """A mapping of API JSON keys to class property names""" _pending_args = None # type_name = LGStr('') class_name = LGStr('') """The programmatic class name for the LG API""" class_display_name = LGStr('') """The human-readable class name for the LG API""" uuid = LGStr('') """A unique identifier string of the object. Searchable.""" id = LGStr('') """The database ID of the object. Searchable.""" name = LGStr('') """The name/title of the object. Searchable.""" owner_id = LGInt() """The database ID of the object's owner. Searchable.""" description = LGStr('') """A description of the object. Searchable.""" api_url = LGStr('') """The object's REST API URL. Searchable.""" auto_name = LGStr('') """The object's SysID. Searchable.""" xlsx_collection = 'Generic' """The Dataset header/Plate upload name for the object class""" def __init__(self, **kwargs): self.other_properties = {} self.owner = {} self.tags = [] self.links = [] def __repr__(self): return f'<{self.class_display_name} {self.id}: {self.name}>' @property def _obj(self): return self @property def web_url(self): """The web-interface URL for the object""" try: url = self.other_properties.get('url', self.other_properties.get('content_url', None)) or self.api_url full_url = f"http:////my.labguru.com/{url}".replace('api/v1/', '') if url else '' return full_url.replace('//', '/') except AttributeError: return None def __bool__(self): return self._obj is not None def __eq__(self, other): if not type(self).isinstance(other): return False elif self.id == other.id: return True else: return self.name == other.name def __hash__(self): return hash((type(self).__name__, self.id, self.name))
[docs] @classmethod def isinstance(cls, other: LGI) -> bool: """Check if an object is an instance of this class""" if not isinstance(other, LabGuruItem): return False return cls.xlsx_collection == other.xlsx_collection
[docs] @classmethod def make_new(cls: Type[LGI], overwrite=False, **properties) -> LGI: """ Creates a new item without adding it to the database :param overwrite: if true, will get an object by the same name if it exists and overwrite that object :param properties: The initial properties of the object :return: A new instance of the class """ cur_obj = None if overwrite and 'name' in properties: cur_obj = cls.from_name(properties['name']) new_item: LGI = cur_obj or cls() new_item.bulk_property_update(**properties) return new_item
[docs] @classmethod def add_new(cls: Type[LGI], overwrite=False, **properties) -> LGI: """ Creates a new item AND adds it to the database :param properties: The initial properties of the object :return: A new instance of the class """ return SESSION.add(cls.make_new(overwrite=overwrite, **properties))
def bulk_property_update(self: LGI, **properties): for k, v in properties.items(): if v: setattr(self, self._attribute_dict.get(k, k), v)
[docs] def make_new_copy(self: LGI) -> LGI: """Creates a copy of the object excluding the database ids and name""" new_dict = self.to_dict(False) for k in ['id', 'uuid', 'sys_id', 'auto_name', 'name']: if k in new_dict: del new_dict[k] return type(self).parse_api_data(new_dict, SESSION)
[docs] @classmethod def get_attribute_dict(cls, capitalize=False): """Generates a full LG -> python mapping of all class-specific and inherited properties""" attr_dict = {} parent_class: LGI for parent_class in cls.__bases__: try: attr_dict.update(parent_class.get_attribute_dict(capitalize)) except AttributeError: pass attr_dict.update(cls._attribute_dict) if capitalize: attr_dict.update({k.title(): v for k, v in cls._attribute_dict.items()}) return attr_dict
[docs] @classmethod def get_api_url(cls) -> str: """Returns the API url for the class""" return f'https://my.labguru.com/api/v1/{cls._api_name}.json'
[docs] @classmethod def item_api_url(cls, item_id: int, omit_json=False): """ Generates an API URL for a specific item of this class. :param item_id: the item's database ID :param omit_json: if true, drops the '.json' from the end of the URL (Default: False) :return: the URL string """ return f'https://my.labguru.com/api/v1/{cls._api_name}/{int(item_id):d}{"" if omit_json else ".json"}'
@classmethod def _name_search_api_url(cls): return f'https://my.labguru.com/api/v1/{cls._api_name}/find_by_name.json'
[docs] @staticmethod def process_api_url(url: str, omit_json=False): """Coerces an API url into the correct format""" url = url.replace('https://my.labguru.com/api/v1', '') url = url.replace('/api/v1', '') url = url.replace('.json', '') if any(x in url for x in ['plasmid', 'primer', 'compound']): url = url.replace('/biocollections', '') if 'materials' in url: url = url.replace('/catalog', '') return f'https://my.labguru.com/api/v1{url}{"" if omit_json else ".json"}'
[docs] @classmethod def parse_api_data(cls: Type[LGI], json_data: Dict[str, Any], session: "Session" = SESSION, include_custom=False) -> LGI: """ Generates a python object from the LabGuru API's JSON response. This is a low-level function that should not be used routinely. :param json_data: the JSON dictionary read from the API :param session: the active LG session. Please leave as the default value. :param include_custom: add the LG "custom###" fields to LGI.other_properties. Default: false :return: the python version of the LG object """ new_item: LGI = cls.make_new() att_dict = cls.get_attribute_dict() if 'parents' in json_data: json_data.update(json_data['parents'] or {}) del json_data['parents'] for k, v in json_data.items(): att_name = att_dict.get(k, k) if hasattr(new_item, att_name): if att_name == 'concentration' and isinstance(v, str): v = ''.join(c for c in v if c in '0123456789,.') try: if att_name == 'barcode' and 'box' in json_data['api_url']: continue except: pass setattr(new_item, att_name, v) if 'parent' in k.lower() and isinstance(v, list) and len(v) > 0 and 'parent_uuid' in v[0]: setattr(new_item, f"_{att_name}_api_uuid", v[0]['parent_uuid']) setattr(new_item, att_name, v[0]) elif ('custom' not in k) or include_custom: new_item.other_properties[k] = v return new_item
[docs] @classmethod def from_api(cls: Type[LGI], session: "Session" = SESSION, item_id: int = None, name: str = None, uuid: str = None, api_url: str = None, auto_name: str = None, include_custom=False) -> Optional[LGI]: """Low-level function. Please use ``LabGuruItem.from_LG`` instead.""" data = {} params = {'token': session.token, 'exclude_fields': 'links,linked_resources'} if item_id: r = requests.get(cls.item_api_url(item_id), params=params) if r.status_code == 404: return None data.update(r.json()) elif name: params.update({'name': name}) r = requests.get(cls.get_api_url(), params=params) if not r.ok: return None try: for cur_result in r.json(): if cur_result['name'] == str(name): data.update(cur_result) break else: return None except IndexError: return None elif auto_name: params.update(Session.make_filter('auto_name', 'eq', auto_name)) r = requests.get(cls.get_api_url(), json=params) try: data.update(r.json()[0]) except IndexError: return None elif uuid: params.update({'uuid': uuid}) r = requests.get(cls.get_api_url(), params=params) try: data.update(r.json()[0]) except IndexError: return None elif api_url: r = requests.get(cls.process_api_url(api_url), params=params) data.update(r.json()) return cls.parse_api_data(data, session) else: return None return cls.parse_api_data(data, session, include_custom)
[docs] @classmethod def from_LG(cls: Type[LGI], item_id: int = None, name: str = None, uuid: str = None, api_url: str = None, auto_name: str = None, include_custom=False, from_cache=True, proxy=True) -> LGI: """ Retrieves a ``LabGuruItem`` from the API. Only 1 of the following parameters will be queried, in the following order: item_id, name, uuid, auto_name. Note: This should only be used if ``LabGuruItem.from_name`` or ``LabGuruItem.from_id`` is insufficient. :param item_id: The database id of the desired item :param name: The name of the desired item :param uuid: The uuid of the desired item :param api_url: The API URL of the desired item :param auto_name: The SysID of the desired item :param include_custom: If true, include the ~200 "custom###" fields returned by the api. Default: False :param from_cache: If false, forces an API call even if the object is cached. Default: True :param proxy: If true, returns a proxy object with lazy API calls. Default: True :return: The requested ``LabGuruItem`` """ return SESSION.get_object(cls, item_id, name, uuid, api_url, auto_name, include_custom, from_cache, proxy)
[docs] @classmethod async def async_from_LG(cls: Type[LGI], item_id: int = None, name: str = None, uuid: str = None, api_url: str = None, auto_name: str = None, include_custom=False, from_cache=True) -> LGI: """ Retrieves a ``LabGuruItem`` from the API. Only 1 of the following parameters will be queried, in the following order: item_id, name, uuid, auto_name. Note: This should only be used if ``LabGuruItem.async_from_name`` or ``LabGuruItem.async_from_id`` is insufficient. :param item_id: The database id of the desired item :param name: The name of the desired item :param uuid: The uuid of the desired item :param api_url: The API URL of the desired item :param auto_name: The SysID of the desired item :param include_custom: If true, include the ~200 "custom###" fields returned by the api. Default: False :param from_cache: If false, forces an API call even if the object is cached. Default: True :return: The requested ``LabGuruItem`` """ return await SESSION.aio_get_object(cls, item_id, name, uuid, api_url, auto_name, include_custom, from_cache)
[docs] @classmethod def from_name(cls: Type[LGI], name: str) -> LGI: """ Retrieves a ``LabGuruItem`` from the API via its name. Examples: Get plasmid ``pGRO-S0081``:: p = Plasmid.from_name('pGRO-S0081') :param name: The exact name of the desired item :return: The requested ``LabGuruItem`` """ return cls.from_LG(name=name)
[docs] @classmethod async def async_from_name(cls: Type[LGI], name: str) -> LGI: """ Retrieves a ``LabGuruItem`` from the API via its name. Examples: Get plasmid ``pGRO-S0081``:: p_future = Plasmid.async_from_name('pGRO-S0081') :param name: The exact name of the desired item :return: The requested ``LabGuruItem`` """ return await cls.async_from_LG(name=name)
[docs] @classmethod def from_names(cls: Type[LGI], names: List[str], verbose: bool = True) -> List[LGI]: """ Retrieves a list of ``LabGuruItem`` objects from the API. This is done in parallel, so it will be faster than individual ``LGI.from_name()`` calls. Examples: Retrieve all the plasmids listed in a DataFrame column:: p_names = dframe['Plasmids'].unique() plasmids = Plasmid.from_names(p_names) :param names: a list of object names :param verbose: if true, a progress bar will be written to stdio while gathering the items :return: the requested list of items """ get_futures = [cls.async_from_name(n) for n in names] return SESSION.execute_async(get_futures, verbose=verbose)
[docs] @classmethod def from_id(cls: Type[LGI], item_id: Union[int, str]) -> LGI: """ Retrieves a ``LabGuruItem`` from the API via its ID number. Examples: Get plasmid ``pGRO-S0081``:: p = Plasmid.from_id(294) :param item_id: The database id of the desired item :return: The requested ``LabGuruItem`` """ return cls.from_LG(item_id=int(item_id))
[docs] @classmethod async def async_from_id(cls: Type[LGI], item_id: Union[int, str]) -> LGI: """ Retrieves a ``LabGuruItem`` from the API via its ID Number. Examples: Get plasmid ``pGRO-S0081``:: p_future = Plasmid.async_from_id(294) :param item_id: The database id of the desired item :return: The requested ``LabGuruItem`` """ return await cls.async_from_LG(item_id=int(item_id))
[docs] @classmethod def from_ids(cls: Type[LGI], ids: List[Union[int, str]], verbose: bool = True) -> List[LGI]: """ Retrieves a list of ``LabGuruItem`` objects from the API. This is done in parallel, so it will be faster than individual ``LGI.from_name()`` calls. Examples: Retrieve all the plasmids listed in a DataFrame column: p_ids = dframe['Plasmid_ids'].unique() plasmids = Plasmid.from_ids(p_ids) :param ids: a list of database ids :param verbose: if true, a progress bar will be written to stdio while gathering the items :return: the requested list of items """ get_futures = [cls.async_from_id(i) for i in ids] return SESSION.execute_async(get_futures, verbose=verbose)
[docs] @classmethod @deprecated(details='Please use LabGuruItem.find_all or LabGuruItem.find_one') def search_api(cls: Type[LGI], session: "Session", query_data: Dict[str, Any]) -> List[LGI]: params = {'token': session.token} params.update(query_data) if 'filter' in params and not isinstance(params['filter'], str): r = requests.get(cls.get_api_url(), json=params) else: r = requests.get(cls.get_api_url(), params=params) if r.ok: data = r.json() out_list = [] for d in data: i = cls.parse_api_data(d, session) out_list.append(i) return out_list # return [cls.parse_api_data(d, session) for d in data] else: raise ValueError(f'Problem searching api with parameters {params.items()}')
[docs] @classmethod @deprecated(details='Please use LabGuruItem.async_find_all or LabGuruItem.async_find_one') async def aio_search_api(cls: Type[LGI], session: Session, query_data: Dict[str, Any], cur_page=1) -> List[LGI]: result = await session.aio_get(cls.get_api_url(), meta=True, page=cur_page, **query_data) data = result['data'] out_list = [cls.parse_api_data(d, session) for d in data] if cur_page == 1 and result['meta']['page_count'] > 1: i = 2 search_futures = [] while i <= result['meta']['page_count']: search_futures.append(cls.aio_search_api(session, query_data, cur_page=i)) i += 1 remaining_data = await asyncio.gather(*search_futures) for new_data in remaining_data: out_list.extend(new_data) return out_list
[docs] def to_dict(self, include_other_properties=True, include_links=True, include_id=True) -> Dict[str, Any]: """ Generates a mapping of LG property fields to values for the object. Generally used for add/update calls. :param include_other_properties: adds fields contained in ``other_properties`` to the dictionary. :param include_links: will include the links and tags fields in the output dictionary :param include_id: will include the item's ID it the output dictionary :return: a dictionary mapping of LG property fields to values """ item_data = {} attribute_items = self.get_attribute_dict() for lg_param, py_attr in attribute_items.items(): attr_val = getattr(self, py_attr) if not include_links and attr_val in ['links', 'tags']: continue if 'parent' in lg_param.lower() and isinstance(attr_val, LabGuruItem): # if getattr(self, f"_{py_attr}_api_uuid", '') == attr_val.uuid: # continue if not attr_val.uuid: continue if 'parents_uuid' not in item_data: item_data['parents_uuid'] = [] item_data['parents_uuid'].append(attr_val.uuid) else: item_data[lg_param] = attr_val if include_other_properties: item_data.update(self.other_properties) if not include_id: del item_data['id'] return item_data
[docs] def update_api(self: LGI, session: "Session", retries=0) -> LGI: """Low-level function, please do not use. ``LabGuruItem.lg_sync`` is likely the method you want.""" # Build item data data = {'token': session.token, 'item': self.to_dict(include_other_properties=False, include_links=False, include_id=False)} # upload to LG if self.id: r = requests.put(self.item_api_url(self.id), json=data) else: r = requests.post(self.get_api_url(), json=data) if not r.ok: if retries < 10: return self.update_api(session, retries + 1) else: if 'please give a different name' in str(r.content): raise LGDuplicateNameError(f"Name {self.name} already exists.") raise ValueError(f'Could not update api with {repr(self)}\n{repr(r)}\n{r.content}') json_data = r.json() return self.parse_api_data(json_data, session)
[docs] def lg_sync(self: LGI) -> LGI: """ Adds or updates the item in the LG database. """ synced_self = SESSION.update(self) if self.id else SESSION.add(self) self.__dict__.update(synced_self.__dict__) self.class_display_name = synced_self.class_display_name self.class_name = synced_self.class_name return self
[docs] async def async_lg_sync(self: LGI) -> LGI: """ Adds or updates the item in the LG database. Be careful not to queue multiple syncs per item as the final object may not reflect all updates or may be added multiple times. """ synced_self = await (SESSION.aio_update(self) if self.id else SESSION.aio_add(self)) self.bulk_property_update(**synced_self.to_dict()) return self
async def deduplicate_links(self): target_uuids = [] json_data = await SESSION.aio_get('links', item_uuid=self.uuid) json_data += await SESSION.aio_get(self.item_api_url(self.id, True) + '/get_parents') for cur_json in json_data: if cur_json['target_uuid'] in target_uuids: await SESSION.aio_delete(f"links/{cur_json['id']}") else: target_uuids.append(cur_json['target_uuid'])
[docs] def find_attachments(self) -> List["Attachment"]: """Returns a list of ``Attachment`` objects associated with the item.""" qd = {"filter": f'{{"attachable_id": "{self.id}", "attachable_type": "{self.class_name}"}}'} attachments_json = SESSION.get(Attachment.get_api_url(), **qd) return [Attachment.parse_api_data(d) for d in attachments_json]
[docs] @classmethod def find_next_id(cls, prefix: str) -> int: """ Searches the database for names beginning with a specified prefix, determines the associated index and gives the next number :param prefix: The prefix of the indexed samples (ex: "GBFP-0927-") :return: the integer index following the last entered sample """ qd = SESSION.make_filter('name', 'startswith', prefix, 1) qd['meta'] = True resp_json = SESSION.get(cls._api_name, **qd) qd['page'] = resp_json['meta']['page_count'] resp_json = SESSION.get(cls._api_name, **qd) final_data = resp_json['data'] if len(final_data) == 0: return 1 else: final_name: str = final_data[0]['name'] final_id = int(final_name.replace(prefix, '')) return final_id + 1
[docs] @classmethod def make_id_iterator(cls, prefix: str) -> Iterator[int]: """ Searches the database for names beginning with a specified prefix, determines the associated index, and yields subsequent indexes on each iteration :param prefix: The prefix of the indexed samples (ex: "GBFP-0927-") :returns: sequential integers beginning with the first unused index number """ base_id = cls.find_next_id(prefix) while True: yield base_id base_id += 1
[docs] @classmethod def iter_next_names(cls, prefix: str, zeros: int = 4) -> Iterator[str]: """ Searches the database for names beginning with a specified prefix, determines the associated index, and yields subsequent names on each iteration Example ------- You need to add temporary strains for experiment 6294. Currently, LG has 27 strains for this experiment, so the largest temporary strain name is "GBFS-6294-0027". :: name_iter = Strain.iter_next_names("GBFS-6294-", 4) for _ in range(3): print(next(name_iter)) # Results # GBFS-6294-0028 # GBFS-6294-0029 # GBFS-6294-0030 :param prefix: The prefix of the indexed samples (ex: "GBFP-0927-") :param zeros: The number of digits used by the index. Default: 4 :returns: sequential formatted names beginning with the first unused index number """ fs = f'{prefix}{{0:0{zeros}d}}' for next_id in cls.make_id_iterator(prefix): yield fs.format(next_id)
[docs] def get_linked_items(self, of_type: Type[LGI]) -> List[LGI]: """ Queries LG for items of a particular class that are linked to the current item. :param of_type: the class of LabGuruItem you would like returned :return: a list of linked LabGuruItem objects """ json_data = SESSION.get('links', item_uuid=self.uuid) correct_ids = [] for cur_item in json_data: if cur_item['target']['type_display'] == of_type.xlsx_collection: correct_ids.append(cur_item['target_id']) return [of_type.from_id(i) for i in correct_ids]
[docs] async def async_get_linked_items(self, of_type: Type[LGI]) -> List[LGI]: """ Queries LG for items of a particular class that are linked to the current item. :param of_type: the class of LabGuruItem you would like returned :return: a list of linked LabGuruItem objects """ json_data = await SESSION.aio_get('links', item_uuid=self.uuid) correct_ids = [] for cur_item in json_data: if cur_item['target']['type_display'] == of_type.xlsx_collection: correct_ids.append(cur_item['target_id']) return [await of_type.async_from_id(i) for i in correct_ids]
[docs] def get_derived_items(self, of_type: Type[LGI]) -> List[LGI]: """ Queries LG for items of a particular class that are derived from the current item :param of_type: the class of LabGuruItem you would like returned :return: a list of derived LabGuruItem objects """ json_data = SESSION.get(f"{self._api_name}/{self.id}/get_derived_items", derived_collection_name=of_type._api_name.split('/')[-1]) return list(map(lambda x: SESSION.get_object(of_type, item_id=x['id']), json_data['derived_collection_items']))
[docs] @classmethod async def async_count_all(cls: Type[LGI], *search_operators: LGSearchOperator) -> int: """ AsyncIO-compatible version of LabGuruItem.count_all() :param search_operators: Comparison operators that will be used as filters in the LabGuru query :return: A list of LabGuruItem objects that match all the provided filters """ all_filters: LGSearchAPI = sum(search_operators) results = await SESSION.aio_get(cls.get_api_url(), meta=True, **all_filters.make_filter(1)) return results['meta']['item_count']
[docs] @classmethod async def async_find_all(cls: Type[LGI], *search_operators: LGSearchOperator) -> List[LGI]: """ AsyncIO-compatible version of LabGuruItem.find_all() :param search_operators: Comparison operators that will be used as filters in the LabGuru query :return: A list of LabGuruItem objects that match all the provided filters """ all_filters: LGSearchAPI = sum(search_operators) results = await SESSION.aio_get(cls.get_api_url(), meta=True, **all_filters.make_filter()) res_objects = [cls.parse_api_data(d) for d in results['data']] try: page_count = results['meta']['page_count'] futures = [] if page_count > 1: for i in range(2, page_count + 1): futures.append(SESSION.aio_get(cls.get_api_url(), meta=False, page=i, **all_filters.make_filter())) result_pages = await tqdm_aio.gather(*futures) for rp in result_pages: res_objects.extend(cls.parse_api_data(d) for d in rp) except KeyError: pass final_results = all_filters.continue_filtering(res_objects) _ = [SESSION._add_to_cache(i) for i in final_results] return final_results
[docs] @classmethod def find_all(cls: Type[LGI], *search_operators: LGSearchOperator) -> List[LGI]: """ Searches the LabGuru API for all objects that match the search operators. All properties that can be used in searches will be called out directly in their own documentation. Args: *search_operators: Comparison operators that will be used as filters in the LabGuru query Returns: A list of LabGuruItem objects that match all the provided filters Examples: Finding all pGRO plasmids that were generated as part of experiment 818:: results = Plasmid.find_all(Plasmid.name.contains('pGRO'), Plasmid.clone_no.contains('GBFP-0818-')) Finding all strains that express Uox25 variants:: results = Strain.find_all(Strain.description.contains('Uox25')) Finding all overnight culture plates from experiment 927:: results = Plate.find_all(Plate.name.starts_with('0927-ONC')) """ return asyncio.run(cls.async_find_all(*search_operators))
[docs] @classmethod async def async_find_one(cls: Type[LGI], *search_operators: LGSearchOperator) -> Optional[LGI]: """ AsyncIO-compatible version of LabGuruItem.find_one() :param search_operators: Comparison operators that will be used as filters in the LabGuru query :return: The first LabGuruItem object that matches all the provided filters """ all_results = await cls.async_find_all(*search_operators) return all_results[0] if len(all_results) > 0 else None
[docs] @classmethod def find_one(cls: Type[LGI], *search_operators: LGSearchOperator) -> Optional[LGI]: """ Searches the LabGuru API for all objects that match the search operators and returns the first match. All properties that can be used in searches will be called out directly in their own documentation. Args: *search_operators: Comparison operators that will be used as filters in the LabGuru query Returns: The first LabGuruItem object that match all the provided filters Examples: Find the largest biomass pellet from GRO20-1895 that was grown in TB:: s = Strain.from_name('GRO20-1895') result = BiomassPellet.find_one(BiomassPellet.parent_strain == s, BiomassPellet.pellet_weight.desc()) """ all_results = cls.find_all(*search_operators) return all_results[0] if len(all_results) > 0 else None
[docs] class LGIError(LabGuruItem): def __init__(self, exception_info: Tuple[Type[BaseException], BaseException, TracebackType], errored_item_dict: Dict, **kwargs): super().__init__(**kwargs) self.exc_type, self.exc_value, self.exc_traceback = exception_info self.traceback_lines = traceback.format_exception(*exception_info) self.errored_item_dict = errored_item_dict @property def traceback(self): return ''.join(self.traceback_lines) @property def info_dict(self): out_dict = self.errored_item_dict.copy() # out_dict['ErrorType'] = self.exc_type.__name__ out_dict['Error'] = str(self.exc_value) out_dict['traceback'] = self.traceback return out_dict def __bool__(self): return False def __str__(self): return f"<LGIError: {self.traceback}>"
[docs] def to_dict(self, **kwargs) -> Dict[str, Any]: return dict()
[docs] def update_api(self: LGI, session: "Session", retries=0) -> LGI: return self
[docs] def get_linked_items(self, of_type: Type[LGI]) -> List[LGI]: return list()
[docs] class Attachment(LabGuruItem, SearchInterface): _api_name = 'attachments' _attribute_dict = { 'attachable': 'attachable' } file = LGAttachmentPath() """An interactable attachment file""" attachable = LGDict() """A dict describing the collection/inventory/ELN item that this file is attached to""" def __init__(self, attach_to: LGI = None): super().__init__() self.attachable = {} if attach_to is not None: self.attach_to(attach_to)
[docs] def get_download_url(self, session: "Session" = SESSION): """Gets the LG URL used to download the attachment file""" return self.process_api_url(f'/attachments/{self.id}/download?token={session.token}', True)
[docs] def download(self, session: "Session" = SESSION): """Downloads the file to a temporary folder""" setattr(self, 'file', self.get_download_url(session))
[docs] def make_file(self, filename: str) -> Path: """ Create a new temporary file with a specific name. :param filename: the desired file name :return: a Path to the new file """ self.file = Path(NamedTemporaryFile().name).parent / filename return self.file
[docs] def attach_to(self, item: LGI): """Sets the attachable dictionary to point to a given LabGuruItem""" self.attachable = { "id" : item.id, "class": item.class_name, "uuid" : item.uuid }
@property def attachable_id(self): return self.attachable.get('id', '') @attachable_id.setter def attachable_id(self, value): self.attachable['id'] = value attachable_id = make_lg_searchable(attachable_id, 'attachable_id') """The database ID of the attached item. Searchable.""" @property def attachable_class(self): return self.attachable.get('class', '') @attachable_class.setter def attachable_class(self, value): self.attachable['class'] = value attachable_class = make_lg_searchable(attachable_class, 'attachable_class') """The programatic class name of the attached item. Searchable.""" @property def attachable_uuid(self): return self.attachable.get('uuid', '') @attachable_uuid.setter def attachable_uuid(self, value): self.attachable['uuid'] = value attachable_uuid = make_lg_searchable(attachable_uuid, 'attachable_uuid') """The database UUID of the attached item. Searchable."""
[docs] def update_api(self: "Attachment", session: "Session", **kwargs) -> LGI: form_data = {"token": session.token} if "uuid" in self.attachable: form_data["item[attach_to_uuid]"] = self.attachable['uuid'] r = requests.post(self.get_api_url(), files={"item[attachment]": self.file.open('rb')}, data=form_data) if r.ok: r_data = r.json() return self.parse_api_data(r_data, session) else: return self
[docs] class Units(Mapping[str, int]): """ A specialized mapping class for managing and converting scientific units. This class provides an interface for accessing scientific units, their corresponding identifiers, and the ability to convert measurements among equivalent units. The class dynamically fetches unit data from an external API when accessed. It supports iteration, length retrieval, and accessing specific unit information via dictionary-like syntax. Attributes: unit_ids (Dict[str, int]): A mapping of unit names to their respective IDs. ids_to_units (Dict[int, str]): A reverse mapping of unit IDs to their names. unit_types (Dict[int, str]): A mapping of unit IDs to their types/categories. """ def __init__(self): self.unit_ids: Dict[str, int] = {} self.ids_to_units: Dict[int, str] = {} self.unit_types: Dict[int, str] = {} def _get_units(self): if len(self.unit_ids) == 0: r = requests.get('https://my.labguru.com/api/v1/units.json?token=' + SESSION.token) self.unit_ids: Dict[str, int] = {r['name']: r['id'] for r in r.json()} self.ids_to_units: Dict[int, str] = {r['id']: r['name'] for r in r.json()} self.unit_types: Dict[int, str] = {r['id']: r['type_for'] for r in r.json()} def __getitem__(self, k: str) -> int: self._get_units() if k: return self.unit_ids[k.replace('μ', chr(181))] else: return 0 def __len__(self) -> int: # self._get_units() return len(self.unit_ids) def __iter__(self) -> Iterator[Tuple[str, int]]: self._get_units() return iter(self.unit_ids.items()) @property def ng_ul(self) -> int: """Shortcut for the ng/μL unit id""" return self['ng/µL'] @property def molar(self) -> int: """Shortcut for the molar unit id""" return self['M'] @property def millimolar(self): """Shortcut for the millimolar unit id""" return self['mM'] @property def micromolar(self): """Shortcut for the micromolar unit id""" return self['µM'] @property def nanomolar(self): """Shortcut for the nanomolar unit id""" return self['nM']
[docs] def convert(self, value: float, from_id: int, to_id: int): """ Converts a value between units of the same type. Example ------- Converting between millimolar & nanomolar:: um = UNITS.convert(1, UNITS.millimolar, UNITS.micromolar) assert um == 1_000 # True :param value: the value of the initial unit :param from_id: the unit ID of the initial value :param to_id: the unit ID for the desired value :return: the converted unit magnitude """ self._get_units() from_unit = self.ids_to_units[from_id].replace(chr(181), 'u') to_unit = self.ids_to_units[to_id].replace(chr(181), 'u') if self.unit_types[from_id] != self.unit_types[to_id]: raise ValueError('Cannot convert between different unit types') elif '/' in from_unit + to_unit: raise ValueError('Cannot convert with molalities (yet)') elif from_unit[-1:] != to_unit[-1:]: raise ValueError('Cannot convert between different SI units') else: si_prefixes = dict(M=6, K=3, m=-3, u=-6, n=-9, p=-12, f=-15) from_exponent = 0 if len(from_unit) == 1 else si_prefixes[from_unit[0]] to_exponent = 0 if len(to_unit) == 1 else si_prefixes[to_unit[0]] return value * 10 ** (from_exponent - to_exponent)
UNITS = Units() #: Importable object used to interact with units/unit IDs
[docs] class HasWells(Sized): """Represents a structure composed of wells arranged in a grid-like positions. This class provides utilities to work with well grids, allowing the mapping between different well naming conventions and positional systems. It includes methods for converting between well names (e.g., `A1`, `A01`), linear grid positions, Tecan positions, as well as to iterate across all valid wells or positions. It also supports operations to handle stocks associated with the wells, which can be abstractly implemented in subclasses. Attributes: rows (int): Number of rows in the grid. cols (int): Number of columns in the grid. """ rows = LGInt(8) cols = LGInt(12) def __init__(self, rows=8, cols=12): super().__init__() self.rows = rows self.cols = cols def __len__(self) -> int: return self.rows * self.cols
[docs] def well_name_to_position(self, name: str) -> int: """Convert a well name (A1) to its correspoinding LG position value""" row_num = ord(name[0].upper()) - 65 col_num = int(name[1:]) return (row_num * self.cols) + col_num
[docs] def position_to_well_name(self, position: int, short=False) -> str: """Convert an LG position value to a well name: (A1 if short=True, A01 if short=False)""" row_num = (position // self.cols) + 1 col_num = position % self.cols if col_num == 0: col_num = self.cols row_num -= 1 if not short: if self.cols >= 10: return f'{chr(row_num + 64)}{col_num:02d}' return f'{chr(row_num + 64)}{col_num:d}'
[docs] def position_to_tecan_well(self, position: int) -> int: """Convert an LG position value (rowwise indexes) to a Tecan position value (columnwise indexes)""" for p, i in zip(self.iter_positions(), range(1, len(self) + 1)): if p == position: return i
[docs] def well_name_to_tecan_well(self, well_name: str) -> int: """Convert a well name (A1) to its corresponding Tecan position value""" return self.position_to_tecan_well(self.well_name_to_position(well_name))
[docs] def tecan_well_to_position(self, tecan_well: int) -> int: """Convert a Tecan position value (columnwise indexes) to an LG position value (rowwise indexes)""" for p, i in zip(self.iter_positions(), range(1, len(self) + 1)): if i == tecan_well: return p
[docs] def tecan_well_to_name(self, tecan_well: int, short: bool = False) -> str: """Convert a Tecan position value to a well name: (A1 if short=True, A01 if short=False)""" return self.position_to_well_name(self.tecan_well_to_position(tecan_well), short)
def _force_short_name(self, name: str) -> str: return self.position_to_well_name(self.well_name_to_position(name), short=True) def _force_long_name(self, name: str) -> str: return self.position_to_well_name(self.well_name_to_position(name))
[docs] def iter_positions(self, rowwise=False) -> Iterable[int]: """Iterate through all available LG position values""" if rowwise: for i in range(self.rows * self.cols): yield i + 1 else: for i in range(self.cols): for j in range(self.rows): yield (i + 1) + (j * self.cols)
[docs] def iter_well_names(self, rowwise=False, short=False) -> Iterable[str]: """Iterate through all available well names""" for pos in self.iter_positions(rowwise): yield self.position_to_well_name(pos, short)
[docs] def iter_tecan_positions(self, rowwise=False) -> Iterable[int]: """Iterate through all available Tecan position values""" for pos in self.iter_positions(rowwise): yield self.position_to_tecan_well(pos)
[docs] @abstractmethod def stocks_from_position(self, position: int, **kwargs) -> List["Stock"]: """Get a list of all stocks at a specified position""" pass
[docs] def stocks_from_well(self, well_name: str, **kwargs) -> List["Stock"]: """Get a list of all stocks in a specified well""" return self.stocks_from_position(self.well_name_to_position(well_name), **kwargs)
[docs] def stocks_from_tecan_position(self, tecan_position: int, **kwargs) -> List["Stock"]: """Get a list of all stocks in a specified tecan position""" return self.stocks_from_position(self.tecan_well_to_position(tecan_position), **kwargs)
[docs] @abstractmethod def copy_stock_to_position(self, stock: "Stock", position: int) -> "Stock": """Make a copy of a stock and add it to a specified position""" pass
[docs] def copy_stock_to_well(self, stock: "Stock", well_name: str) -> "Stock": """Make a copy of a stock and add it to a specified well""" return self.copy_stock_to_position(stock, self.well_name_to_position(well_name))
[docs] def copy_stock_to_tecan_position(self, stock: "Stock", tecan_position: int) -> "Stock": """Make a copy of a stock and add it to a specified well""" return self.copy_stock_to_position(stock, self.tecan_well_to_position(tecan_position))
[docs] @abstractmethod def update_stock_at_position(self, stock: "Stock", position: int) -> "Stock": """Replace a stock at a specified position with a new one""" pass
[docs] def update_stock_at_well(self, stock: "Stock", well_name: str) -> "Stock": """Replace a stock in a specified well with a new one""" return self.update_stock_at_position(stock, self.well_name_to_position(well_name))
if __name__ == '__main__': from LabGuruAPI import SESSION SESSION.login()