import asyncio
import functools
import json
import os
import sys
import traceback
from abc import abstractmethod
from asyncio import Future
from collections import defaultdict
from configparser import ConfigParser
from dataclasses import dataclass, asdict
from pathlib import Path
from tempfile import NamedTemporaryFile
from types import TracebackType
from typing import TypeVar, Dict, Callable, Any, Type, List, Optional, Mapping, overload, Union, Iterator, Tuple, \
Generic, Iterable, Sized, Coroutine, DefaultDict, Awaitable
from aiostream import stream
from deprecation import deprecated
import requests
import typer
from Bio.SeqFeature import FeatureLocation
from pydna.dseq import Dseq
from pydna.dseqrecord import Dseqrecord
from pydna.readers import read as read_seq
from pydna.seqfeature import SeqFeature
from requests_futures.sessions import FuturesSession
from tqdm import tqdm
from tqdm.asyncio import tqdm as tqdm_aio
from appdirs import AppDirs
# import and fix a weird issue with aiohttp
import aiohttp
from functools import wraps
from asyncio.proactor_events import _ProactorBasePipeTransport
from LabGuruAPI._search_api import SearchInterface, LGSearchOperator, LGSearchAPI, make_lg_searchable
def silence_event_loop_closed(func):
    """Decorator that suppresses the spurious ``RuntimeError('Event loop is closed')``.

    On Windows, proactor transports can raise this error from ``__del__`` during
    interpreter shutdown; any other RuntimeError is re-raised untouched.

    :param func: the callable (typically a transport ``__del__``) to wrap
    :return: the wrapped callable
    """
    @wraps(func)
    def wrapper(self, *args, **kwargs):
        try:
            return func(self, *args, **kwargs)
        except RuntimeError as exc:
            if str(exc) == 'Event loop is closed':
                return None  # swallow the harmless shutdown-time error
            raise
    return wrapper
# Patch the proactor transport destructor so the noisy 'Event loop is closed'
# RuntimeError raised during interpreter shutdown (a known aiohttp/Windows quirk)
# is suppressed instead of printed.
_ProactorBasePipeTransport.__del__ = silence_event_loop_closed(_ProactorBasePipeTransport.__del__)
###
T = TypeVar('T')  # generic placeholder for the Python type backing a LabGuru field
class LGPrimitives(Generic[T], SearchInterface):
    """Descriptor base class linking Python attributes to primitive LabGuru fields."""
    base_type: Callable[[Any], T] = int
    null_val = None

    def __init__(self, default_value: T = None, lg_name: str = None):
        """
        Base class used link python properties with LabGuru properties
        :param default_value: The value to be supplied to LG if it has not been set elsewhere
        :param lg_name: The name of the LabGuru field used in searches. Only necessary if the search name is different
        from the property name
        """
        self.default_val = default_value if default_value is not None else self.null_val
        self.labguru_name = lg_name

    def __new__(cls: Type[T], *args, **kwargs) -> T:
        return super().__new__(cls)

    def __set_name__(self, owner, name):
        # Stored value lives on the owning instance under a leading-underscore slot.
        self.private_name = '_' + name

    def __set__(self, instance, value):
        if value in (None, ''):
            # Empty input falls back to the class null value (or the type's zero value).
            stored = self.null_val or self.base_type()
        else:
            stored = self.base_type(value)
        setattr(instance, self.private_name, stored)

    def __get__(self, instance, owner) -> Optional[T]:
        if instance is None:
            return self  # class-level access returns the descriptor itself
        try:
            return getattr(instance, self.private_name)
        except AttributeError:
            return self.default_val
class LGInt(LGPrimitives[int], int):
    """Descriptor linking Python attributes to LabGuru integer fields."""

    def __init__(self, default_value: T = None, lg_name: str = None):
        """
        Base class used link python properties with LabGuru integer properties
        :param default_value: The value to be supplied to LG if it has not been set elsewhere
        :param lg_name: The name of the LabGuru field used in searches. Only necessary if the search name is different
        from the property name
        """
        super().__init__(default_value, lg_name)

    def __set__(self, instance, value):
        # int('1.5') raises ValueError; retry through float so numeric strings still land.
        try:
            super().__set__(instance, value)
        except ValueError:
            super().__set__(instance, float(value))
class LGFloat(LGPrimitives[float], float):
    """Descriptor linking Python attributes to LabGuru non-integer numeric fields."""
    base_type = float

    def __init__(self, default_value: T = None, lg_name: str = None):
        """
        Base class used link python properties with LabGuru non-integer numeric properties
        :param default_value: The value to be supplied to LG if it has not been set elsewhere
        :param lg_name: The name of the LabGuru field used in searches. Only necessary if the search name is different
        from the property name
        """
        super().__init__(default_value, lg_name)

    def __set__(self, instance, value):
        # Round to 9 decimal places to suppress float representation noise.
        try:
            value = round(float(value), 9)
        except (TypeError, ValueError):
            # Was a bare 'except:' that also swallowed KeyboardInterrupt/SystemExit.
            # Non-numeric input is passed through for the base descriptor to coerce.
            pass
        super().__set__(instance, value)
class LGStr(LGPrimitives[str], str):
    """Descriptor linking Python attributes to LabGuru string fields."""
    base_type = str

    def __init__(self, default_value: T = None, lg_name: str = None):
        """
        Base class used link python properties with LabGuru string properties
        :param default_value: The value to be supplied to LG if it has not been set elsewhere
        :param lg_name: The name of the LabGuru field used in searches. Only necessary if the search name is different
        from the property name
        """
        super().__init__(default_value, lg_name)
class LGJSONStr(LGPrimitives[str], str):
    """Descriptor linking Python attributes to LabGuru JSON fields stored as strings."""
    base_type = str

    def __init__(self, default_value: T = None, lg_name: str = None):
        """
        Base class used link python properties with LabGuru JSON properties with a Python string implementation
        :param default_value: The value to be supplied to LG if it has not been set elsewhere
        :param lg_name: The name of the LabGuru field used in searches. Only necessary if the search name is different
        from the property name
        """
        super().__init__(default_value, lg_name)

    def __set__(self, instance, value):
        if value in (None, ''):
            stored = self.null_val or self.base_type()
        elif isinstance(value, str):
            # Already a string — assume it is valid JSON and store as-is.
            stored = self.base_type(value)
        else:
            # Non-string payloads (dicts, lists, ...) are serialized to JSON text.
            stored = json.dumps(value)
        setattr(instance, self.private_name, stored)
class LGList(Generic[T]):
    """Descriptor linking Python list attributes to LabGuru list-valued fields."""
    base_type: Type[T] = int
    null_val = None

    @classmethod
    def new_value_function(cls, value: Any) -> T:
        """
        Convert *value* to ``cls.base_type``, resolving LabGuruItem references through the session.
        :param value: a raw element (LabGuruItem, API dict, or primitive)
        :return: the converted element
        """
        try:
            if issubclass(cls.base_type, LabGuruItem):
                if isinstance(value, LabGuruItem) and value.id > '':
                    return SESSION.get_object(cls.base_type, item_id=value.id)
                elif isinstance(value, dict):
                    if 'id' in value and value['id'] is not None:
                        return SESSION.get_object(cls.base_type, item_id=value['id'])
                    return cls.base_type.parse_api_data(value)
        except (KeyError, ValueError, AttributeError):
            pass  # fall through to a plain constructor call
        return cls.base_type(value)

    def __set_name__(self, owner, name):
        self.private_name = '_' + name

    def __set__(self, instance, value):
        if value in (None, '', []):
            new_val = []
        else:
            try:
                new_val = [v if isinstance(v, LabGuruItem) else self.new_value_function(v)
                           for v in value]
            except TypeError as exc:
                # FIX: the original caught TypeError only to re-raise it verbatim, with the
                # intended informative message left commented out. Raise it with context.
                raise TypeError(f'{type(instance).__name__}.{self.private_name} was expecting a list of '
                                f'{self.base_type.__name__}. Got {type(value).__name__} instead.') from exc
        setattr(instance, self.private_name, new_val)

    def __get__(self, instance, owner) -> List[T]:
        try:
            return getattr(instance, self.private_name)
        except AttributeError:
            # First access: lazily initialise the backing slot to an empty list.
            setattr(instance, self.private_name, [])
            return getattr(instance, self.private_name)
class LGStrList(LGList):
    """LGList specialisation whose elements are coerced to ``str``."""
    base_type = str
class LGBool(LGPrimitives[bool], int):
    """Descriptor linking Python attributes to LabGuru boolean fields."""
    base_type = bool

    def __init__(self, default_value: T = None, lg_name: str = None):
        """
        Base class used link python properties with LabGuru boolean properties
        :param default_value: The value to be supplied to LG if it has not been set elsewhere
        :param lg_name: The name of the LabGuru field used in searches. Only necessary if the search name is different
        from the property name
        """
        super().__init__(default_value, lg_name)
class LGDict(LGPrimitives[dict], dict):
    """Descriptor linking Python dict attributes to LabGuru JSON fields."""
    base_type = dict

    def __init__(self, default_value: T = None, lg_name: str = None):
        """
        Base class used link python properties with LabGuru JSON properties with a Python dict implementation
        :param default_value: The value to be supplied to LG if it has not been set elsewhere
        :param lg_name: The name of the LabGuru field used in searches. Only necessary if the search name is different
        from the property name
        """
        super().__init__(default_value, lg_name)

    def __set__(self, instance, value):
        if isinstance(value, str):
            # BUG FIX: was json.loads(str) — passing the builtin *type* instead of the
            # value, which raised TypeError on every string assignment.
            value = json.loads(value)
        super().__set__(instance, value)
attachment_session = FuturesSession()
class LGAttachmentPath:
def __init__(self, file_extension: str = ''):
"""
Base class used link python file paths with LabGuru string properties
:param file_extension: The file extension to use in the temporary file
"""
self.file_extension = f'.{file_extension}' if file_extension else None
def __set_name__(self, owner, name):
self.private_name = f'_{name}_path'
self.private_request = f'_{self.private_name}_request'
self.private_update_status = f'_{self.private_request}_updated'
def __get__(self, instance, owner) -> Path:
try:
result: Optional[Future] = getattr(instance, self.private_request)
temp_file: str = getattr(instance, self.private_name)
except AttributeError:
result = None
try:
temp_file: str = getattr(instance, self.private_name)
except AttributeError:
temp_file = NamedTemporaryFile(delete=False, suffix=self.file_extension).name
setattr(instance, self.private_name, temp_file)
if os.path.exists(temp_file) and os.path.getsize(temp_file) > 0:
# print('File Already Written')
pass
elif result is not None:
# print('Waiting for download')
r = result.result()
# print(r)
# print('Writing to file')
Path(temp_file).write_bytes(r.content)
# print('Written')
return Path(temp_file)
def __set__(self, instance, value: Union[str, Path]):
if 'http' in str(value): # remote instance
temp_file = NamedTemporaryFile(delete=False, suffix=self.file_extension)
# print('Making Tempfile')
setattr(instance, self.private_name, temp_file.name)
# print(temp_file.name)
cur_get = attachment_session.get(value)
setattr(instance, self.private_request, cur_get)
else:
setattr(instance, self.private_name, value if isinstance(value, Path) else Path(value))
setattr(instance, self.private_request, None)
setattr(instance, self.private_update_status, hasattr(instance, self.private_update_status))
class LGSeqRecord(LGAttachmentPath):
    """Descriptor linking pydna ``Dseqrecord`` sequence objects to LabGuru attachment fields.

    The record is stored on disk via the parent ``LGAttachmentPath`` machinery and
    parsed/cached as a ``Dseqrecord`` on first read. Sticky ends are round-tripped
    through 'overhang' features so they survive the file format.
    """

    def __init__(self, file_extension: str = ''):
        """
        Base class used link python sequence objects with LabGuru string properties
        :param file_extension: The file extension to use in the temporary file
        """
        super().__init__(file_extension)

    def __set_name__(self, owner, name):
        super().__set_name__(owner, name)
        # Extra slot caching the parsed Dseqrecord alongside the file-path slots.
        self.private_sequence = f'_{name}_seqrecord'

    def __get__(self, instance, owner) -> Dseqrecord:
        try:
            # Fast path: record was already parsed for this instance.
            seq_record = getattr(instance, self.private_sequence)
            return seq_record
        except AttributeError:
            # Materialise the file (may trigger a download), then parse it.
            record_path = super().__get__(instance, owner)
            seq_record: Dseqrecord = read_seq(str(record_path))
            # Drop features whose location failed to parse.
            seq_record.features = list(filter(lambda f: f.location is not None, seq_record.features))
            # 'overhang' features encode sticky ends; rebuild the double-stranded seq from them.
            overhangs = [f for f in seq_record.features if f.type.lower() == 'overhang']
            if overhangs:
                ovhg_5_len = ovhg_3_len = 0
                seq_len = len(seq_record.seq)
                for cur_ovhg in overhangs:
                    if cur_ovhg.location.start == 0:
                        # Overhang at the 5' end; sign encodes which strand protrudes.
                        ovhg_5_len = cur_ovhg.location.end - cur_ovhg.location.start
                        ovhg_5_len *= -1 * cur_ovhg.strand
                    elif cur_ovhg.location.end == seq_len:
                        # Overhang at the 3' end.
                        ovhg_3_len = cur_ovhg.location.end - cur_ovhg.location.start
                        ovhg_3_len *= cur_ovhg.strand
                # make watson strand (trim the ends the watson strand does not cover)
                watson_start = max(0, ovhg_5_len)
                watson_end = min(seq_len, seq_len + ovhg_3_len)
                watson = str(seq_record.seq)[watson_start:watson_end]
                # make crick strand (reverse-complement, trimmed symmetrically)
                crick_start = max(0, ovhg_3_len)
                crick_end = min(seq_len, seq_len + ovhg_5_len)
                crick = str(seq_record.seq.rc())[crick_start:crick_end]
                seq_record.seq = Dseq(watson, crick, ovhg=ovhg_5_len)
            setattr(instance, self.private_sequence, seq_record)
            return seq_record

    def __set__(self, instance, value: Union[str, Path, Dseqrecord]):
        # Invalidate any cached parse; the new value supersedes it.
        try:
            delattr(instance, self.private_sequence)
        except AttributeError:
            pass
        if isinstance(value, Dseqrecord):
            # Serialise the record to a temp file and store the path instead.
            value, seq_record = NamedTemporaryFile(delete=True), value
            value.close()
            o5_type, o5_seq = seq_record.seq.five_prime_end()
            o3_type, o3_seq = seq_record.seq.three_prime_end()
            if o5_seq:
                # Annotate the 5' sticky end unless an overhang feature already marks it.
                f5 = [f for f in seq_record.features if f.location.start == 0 and f.type.lower() == 'overhang']
                if not f5:
                    o5_dir = 1 if '5' in o5_type else -1
                    o5_feature = SeqFeature(FeatureLocation(0, len(o5_seq)), strand=o5_dir, type='overhang')
                    seq_record.features.append(o5_feature)
            if o3_seq:
                seq_len = len(seq_record)
                f3 = [f for f in seq_record.features if f.location.end == seq_len and f.type.lower() == 'overhang']
                if not f3:
                    # NOTE(review): this tests o5_type, not o3_type — looks like a
                    # copy-paste slip from the 5' branch above; confirm intended.
                    o3_dir = -1 if '5' in o5_type else 1
                    o3_feature = SeqFeature(FeatureLocation(seq_len - len(o3_seq), seq_len), strand=o3_dir,
                                            type='overhang')
                    seq_record.features.append(o3_feature)
            seq_record.write(str(value.name))
            value = value.name
        super().__set__(instance, value)
LGI = TypeVar('LGI', bound='LabGuruItem')  #: TypeVar shortcut for all LabGuruItem subclasses


@dataclass(eq=True, frozen=True)
class SessionCacheKey:
    """Immutable composite key used to index LabGuruItems in the session cache."""
    lgi_class: Type[LGI]
    item_id: int = None
    name: str = None
    auto_name: str = None
    uuid: str = None
    api_url: str = None
    include_custom: bool = False

    def to_kwargs(self):
        """Return the key's fields as keyword arguments, without the class itself."""
        kwargs = asdict(self)
        kwargs.pop('lgi_class')
        return kwargs

    def desc(self) -> str:
        """Human-readable description, e.g. ``Plasmid/item_id=3`` (falsy fields omitted)."""
        parts = [self.lgi_class.__name__]
        parts.extend(f'{field}={value}' for field, value in self.to_kwargs().items() if value)
        return '/'.join(parts)

    def __repr__(self):
        return f'<SessionCacheKey: {self.desc()}>'

    def __hash__(self):
        # Hash on the class *name* so the key survives module reloads.
        return hash((self.lgi_class.__name__, tuple(self.to_kwargs().values())))
@dataclass
class SessionTransaction:
    """Record of a single add/update/delete performed through the session."""
    transaction_type: str
    item: LGI

    @staticmethod
    def ADD(item: LGI):
        """Build an 'add' transaction for *item*."""
        return SessionTransaction('add', item)

    @staticmethod
    def UPDATE(item: LGI):
        """Build an 'update' transaction for *item*."""
        return SessionTransaction('update', item)

    @staticmethod
    def DELETE(item: LGI):
        """Build a 'delete' transaction for *item*."""
        return SessionTransaction('delete', item)
def throttled_run(*futures: Coroutine[Any, Any, LGI], delay: float = 0.05) -> List[LGI]:
    """Run the given coroutines with a progress bar, pacing them through a semaphore.

    :param futures: coroutines to execute
    :param delay: seconds slept before each coroutine is awaited
    :return: the coroutines' results, in input order
    """
    async def _run(*fs: Coroutine[Any, Any, LGI]):
        # Semaphore() defaults to a value of 1, so at most one wrapped coroutine
        # holds the slot at a time.
        _sem = asyncio.Semaphore()

        async def _wrap(f: Coroutine[Any, Any, LGI]):
            # NOTE(review): awaiting f *inside* the semaphore serializes the coroutines
            # entirely (delay acts as a per-call throttle) — confirm that full
            # serialization, not just staggered starts, is the intent.
            async with _sem:
                await asyncio.sleep(delay)
                await_f = await f
                return await_f

        return await tqdm_aio.gather(*map(_wrap, fs))
    return asyncio.run(_run(*futures))
class LGDuplicateNameError(Exception):
    """Raised when a LabGuru item name collides with an existing item.

    FIX: previously derived from BaseException, which generic ``except Exception``
    handlers could not catch; user-defined exceptions should derive Exception (PEP 8).
    """
    pass
class Session(object):
    """
    An object that is used to interface with the LabGuru REST API.

    Manages authentication (token caching, AWS Secrets Manager, interactive login),
    a local config file, and an in-memory cache of previously fetched LabGuruItems.
    """
[docs]
def __init__(self, token: str = None):
"""
An object that is used to interface with the LabGuru REST API
:param token: The API token used to manage the session.
"""
if token:
self._token = token
self.cache: Dict[SessionCacheKey, LGI] = {}
"""A cache of LabGuruItems to reduce the number of queries to the LabGuru API"""
self.transactions: List[SessionTransaction] = []
"""Currently unused"""
if 'lg_config' in os.environ:
self.app_path = Path(os.environ['lg_config'])
else:
self.app_path = Path(AppDirs("FoundryBackend", "GROBio").user_data_dir)
"""The base path for foundry backend config files"""
self.app_path.mkdir(parents=True, exist_ok=True)
env_config_path = os.getenv('LGAPI_CONFIG')
self._config_path = Path(env_config_path) if env_config_path else self.app_path / 'config.txt'
self.config = ConfigParser()
"""Interface for the backend config file"""
if self._config_path.exists():
self.config.read(self._config_path)
@property
def token(self):
"""The token used to authenticate API calls"""
return self.login()
    def login(self) -> str:
        """
        Checks the validity of a cached API token and refreshes it if necessary.

        Resolution order: in-memory token → token stored in the config file (validated
        with a test request) → AWS Secrets Manager → interactive username/password prompt.
        :return: The current API token
        """
        # 1) An in-memory token is trusted as-is; persist it and return.
        try:
            stored_token = self._token
            self.set_config_value('AUTH', 'TOKEN', stored_token)
            return self._token
        except AttributeError:
            pass
        # 2) Try the token saved in the config file, validating it with a cheap GET.
        try:
            stored_token = self.get_config_value('AUTH', 'TOKEN')
            tqdm.write(f'Trying token {stored_token}')
            r = requests.get('https://my.labguru.com/api/v1/projects/1.json', params=dict(token=stored_token))
            tqdm.write(str(r.status_code))
            if r.ok:
                self._token = stored_token
                return self._token
        except KeyError:
            pass  # no token stored yet
        # 3) Fall back to AWS Secrets Manager, then to an interactive prompt.
        from botocore.exceptions import ClientError, NoCredentialsError
        try:  # Try to get credentials though AWS
            r = self._aws_login()
        except (ClientError, NoCredentialsError, KeyError) as e:  # Otherwise just prompt for a login
            tqdm.write('Please Login to LabGuru')
            un = input('Username: ')
            pw = input('Password: ')
            if un == 'AWS':
                # Sentinel username: the "password" is actually an AWS secret name.
                self.set_config_value('AUTH', 'SECRET_NAME', pw)
                r = self._aws_login()
            else:
                r = requests.post('https://my.labguru.com/api/v1/sessions.json', data={
                    'login': un, 'password': pw
                })
        data = r.json()
        self._token = data['token']
        # LabGuru signals a failed login by returning the literal token '-1'.
        if self._token == '-1':
            tqdm.write('Invalid Login. Please, retry. If you were unable to fill in your username and password, you need to run this script in a python console.')
            return self._token
        tqdm.write(self._token)
        self.set_config_value('AUTH', 'TOKEN', self._token)
        return self._token
def _aws_login(self):
secret_id = self.get_config_value('AUTH', 'SECRET_NAME', ask=False)
tqdm.write('Refreshing Token via AWS')
import boto3
session = boto3.session.Session()
client = session.client(
service_name='secretsmanager',
region_name="us-east-1"
)
get_secret_value_response = client.get_secret_value(
SecretId=secret_id
)
r = requests.post('https://my.labguru.com/api/v1/sessions.json',
data=json.loads(get_secret_value_response['SecretString']))
return r
[docs]
def get_config_value(self, section: str, key: str, ask=False) -> str:
"""
Retrieves a configuration value from the config file.
:param section: Config file section
:param key: Config key to retrieve
:param ask: If true, prompts the user for a value if the key is not found. Will store and return the value.
:return: The configuration value string
:raises KeyError: The configuration Section or Key does not exist if the ask parameter is False
"""
if ask:
try:
return self.config[section][key]
except KeyError:
value = input(f'Please enter a value for configuration value {section.upper()}:{key} or just type No: ')
self.set_config_value(section, key, value)
return self.config[section][key]
try:
self.config[section][key]
except KeyError:
if self._config_path.exists():
self.config.read(self._config_path)
return self.config[section][key]
[docs]
def set_config_value(self, section: str, key: str, value: Any):
"""
Writes a key-value pair to the configuration file.
:param section: Config file section
:param key: Config key to set
:param value: Value of the config key. Will be converted to a string before writing
"""
try:
self.config[section][key] = value
except KeyError:
self.config.add_section(section)
self.config[section][key] = str(value)
with self._config_path.open('w') as cfg:
self.config.write(cfg)
[docs]
def make_api_path(self, endpoint: str, with_token: bool = False) -> str:
"""
Formats an API path with a given endpoint.
:param endpoint: The API endpoint to be called. Will prepend 'https://my.labguru.com/api/v1/' if not included.
:param with_token: if True, adds the current token as a URL parameter
:return: The formatted API URL
"""
endpoint = endpoint.replace('https://my.labguru.com/api/v1/', '')
url = f'https://my.labguru.com/api/v1/{endpoint}'
if with_token:
url += f'?token={self.token}'
return url
[docs]
def add(self, item: LGI) -> LGI:
"""
Adds an item to the LG database. Updates the cache.
:param item: The LabGuruItem to be added
:return: A freshly-queried version of the added item
"""
item = item.update_api(self)
self.transactions.append(SessionTransaction.ADD(item))
self._add_to_cache(item)
return item
[docs]
async def aio_add(self, item: LGI) -> LGI:
"""
Asynchronously adds an item using a separate thread executor and returns the
result. If an exception occurs during execution, the error information is
captured and returned as an LGIError.
Args:
item: The LGI instance to be added.
Returns:
LGI: The result of the addition if successful, otherwise an LGIError
containing the exception information and a dictionary representation of the
item.
"""
try:
return await asyncio.get_event_loop().run_in_executor(None, self.add, item)
except:
return LGIError(sys.exc_info(), item.to_dict())
[docs]
def add_many(self, items: List[LGI]) -> List[LGI]:
"""
Adds a list of items to the database in parallel. Updates the cache.
:param items: A list of LabGuruItem objects to be added to the database
:return: A freshly-queried list of the added items. The input object order is preserved.
"""
empty_items = [type(lgi).bulk_update(name=f'{lgi.xlsx_collection} Holder {i + 1}') for i, lgi in
enumerate(items)]
futures = [self.aio_add(lgi) for lgi in empty_items]
tqdm_aio.write('Reserving IDs for new items')
results = asyncio.run(tqdm_aio.gather(*futures))
for real_item, empty_item in zip(items, results):
real_item.id = empty_item.id
return self.update_many(items)
    def update(self, item: LGI) -> LGI:
        """
        Updates an item in the LG database. Updates the cache.

        Also prunes parent links that were removed locally: any '*api_uuid*' attribute
        present before the update but absent afterwards has its link deleted server-side.
        :param item: The LabGuruItem to be updated
        :return: A freshly-queried version of the updated item
        """
        # Snapshot parent-link uuids before the update so removed ones can be pruned after.
        old_parent_uuids = {v for k, v in item.__dict__.items() if 'api_uuid' in k}
        try:
            item = item.update_api(self)
        finally:
            # Clean up item parents — runs even when update_api raises.
            new_parent_uuids = {v for k, v in item.__dict__.items() if 'api_uuid' in k}
            removed_parent_uuids = old_parent_uuids - new_parent_uuids
            if removed_parent_uuids:
                parent_links = self.get(item.item_api_url(item.id, True) + '/get_parents')
                for cur_link in parent_links:
                    if cur_link['target_uuid'] in removed_parent_uuids:
                        self.del_request(f"/links/{cur_link['id']}")
        self.transactions.append(SessionTransaction.UPDATE(item))
        self._add_to_cache(item)
        return item
[docs]
async def aio_update(self, item: LGI) -> LGI:
"""Updates an LGI item asynchronously using an executor.
This method utilizes Python's asyncio library to execute the update
method asynchronously in a separate thread. It is useful when
performing non-blocking I/O operations that involve updating an
LGI item.
Args:
item: The LGI object that needs to be updated. It must adhere
to the LGI interface, providing a to_dict method.
Returns:
LGI: Returns an updated LGI object if the update is successful.
In the event of an exception during execution, an LGIError
containing the exception information and the original item's
dictionary form is returned.
"""
try:
return await asyncio.get_event_loop().run_in_executor(None, self.update, item)
except:
return LGIError(sys.exc_info(), item.to_dict())
[docs]
def update_many(self, items: List[LGI], delay=0.1) -> List[LGI]:
"""
Updates a list of items in the database in parallel. Updates the cache.
:param items: A list of LabGuruItem objects to be updated
:return: A freshly-queried list of the updated items. The input object order is preserved.
"""
futures = [self.aio_update(i) for i in items]
tqdm_aio.write('Updating items')
results = throttled_run(*futures, delay=delay)
for i, r in enumerate(results):
items[i].__dict__ = results[i].__dict__
return items
[docs]
def refresh(self, item: LGI) -> LGI:
"""
Forces a re-query of a LabGuruItem. Replaces the cached version with the new item.
:param item: the LabGuruItem to be refreshed
:return: the refreshed LabGuruItem
"""
self._remove_from_cache(item)
return self.get_object(type(item), item_id=item.id, proxy=False)
[docs]
def delete(self, item: LGI) -> bool:
"""
Removes an item from the LG database and the session cache. This is irreversible.
:param item: The LabGuruItem to be deleted.
:return: True if the request succeeded.
"""
r = requests.delete(item.item_api_url(item.id) + f"?token={self.token}")
if r.ok:
self._remove_from_cache(item)
self.transactions.append(SessionTransaction.DELETE(item))
return True
return False
[docs]
def archive(self, item: LGI) -> bool:
"""
Archives an item in the LG database. Not available for all items.
:param item: The LabGuruItem to be archived.
:return: True if the request succeeded.
"""
data = dict(token=self.token, selected_items=item.id)
endpoint = item.api_url.replace(item.id, "archive_selected")
r = requests.put(item.process_api_url(endpoint, True), data=data)
if r.ok:
# self._remove_from_cache(item)
return True
return False
[docs]
async def aio_archive(self, item: LGI) -> bool:
"""
Archives the specified item asynchronously by making an asynchronous HTTP PUT
request to the item's API endpoint. The method constructs the request using
the provided item's ID and a token for authentication.
Args:
item (LGI): The item to be archived, represented as an instance of LGI
class.
Returns:
bool: Returns True if the item was archived successfully, otherwise False.
"""
data = dict(token=self.token, selected_items=item.id)
endpoint = item.item_api_url(item.id).replace(item.id, "archive_selected")
r = await self.aio_put(endpoint, **data)
if r is not None:
if 'error_message' in r:
return False
# self._remove_from_cache(item)
return True
return False
[docs]
def restore(self, item: LGI) -> bool:
"""
Restores an archived item in the LG database. Not available for all items.
:param item: The LabGuruItem to be restored.
:return: True if the request succeeded.
"""
data = dict(token=self.token, selected_items=item.id)
endpoint = item.api_url.replace(item.id, "restore_selected")
r = requests.put(item.process_api_url(endpoint, True), data=data)
if r.ok:
self._remove_from_cache(item)
return True
return False
def _add_to_cache(self, item: LGI):
if item is not None:
lgi_class = type(item)
include_custom = any('custom' in k for k in item.other_properties.keys())
if item.id:
self.cache[SessionCacheKey(lgi_class, item_id=int(item.id), include_custom=include_custom)] = item
self.cache[SessionCacheKey(lgi_class, item_id=str(item.id), include_custom=include_custom)] = item
api_url = lgi_class.item_api_url(item.id)
self.cache[SessionCacheKey(lgi_class, api_url=api_url, include_custom=include_custom)] = item
if item.name:
self.cache[SessionCacheKey(lgi_class, name=item.name, include_custom=include_custom)] = item
if item.uuid:
self.cache[SessionCacheKey(lgi_class, uuid=item.uuid, include_custom=include_custom)] = item
if item.api_url:
api_url: str = lgi_class.process_api_url(item.api_url)
self.cache[SessionCacheKey(lgi_class, api_url=api_url, include_custom=include_custom)] = item
if item.auto_name:
self.cache[SessionCacheKey(lgi_class, auto_name=item.auto_name, include_custom=include_custom)] = item
def _get_from_cache(self, key: SessionCacheKey) -> Optional[LGI]:
try:
return self.cache[key]
except KeyError:
return None
def _remove_from_cache(self, item: LGI):
if item is not None:
lgi_class = type(item)
include_custom = any('custom' in k for k in item.other_properties.keys())
if item.id:
try:
del self.cache[SessionCacheKey(lgi_class, item_id=item.id, include_custom=include_custom)]
except KeyError:
pass
if item.name:
try:
del self.cache[SessionCacheKey(lgi_class, name=item.name, include_custom=include_custom)]
except KeyError:
pass
if item.uuid:
try:
del self.cache[SessionCacheKey(lgi_class, uuid=item.uuid, include_custom=include_custom)]
except KeyError:
pass
if item.api_url:
try:
del self.cache[SessionCacheKey(lgi_class, api_url=item.api_url, include_custom=include_custom)]
except KeyError:
pass
if item.auto_name:
try:
del self.cache[SessionCacheKey(lgi_class, auto_name=item.auto_name, include_custom=include_custom)]
except KeyError:
pass
[docs]
def dump_cache(self, verbose=True) -> DefaultDict[str, List[Dict]]:
"""
Dumps the contents of the cache into a dictionary of lists categorized
by class display names. This function iterates over unique cached
items and converts them into dictionary format, organizing them
according to their display name.
Args:
verbose (bool): If True, displays a progress bar using tqdm to show
the progress of dumping the cache. If False, the progress bar
is disabled.
Returns:
DefaultDict[str, List[Dict]]: A dictionary where each key is the
class display name and the value is a list of dictionaries
representing cached items of that class.
"""
out_dict = defaultdict(list)
cur_item: LGI
for cur_item in tqdm(set(self.cache.values()), disable=not verbose, leave=False):
out_dict[cur_item.class_display_name].append(cur_item.to_dict())
return out_dict
[docs]
def load_cache(self, dump: DefaultDict[str, List[Dict]], verbose=True):
"""Loads cache from a given data structure and populates internal storage.
Iterates over a provided dictionary, mapping class names to lists of item
data. For each class name, it attempts to retrieve the corresponding class
from a predefined collection. If a class is found, it parses each item data
using the class-specific parsing method and adds the resulting item to the
cache. Progress through the process can optionally be displayed.
Args:
dump: A defaultdict where keys are class names (str) and values are
lists of dictionaries representing item data.
verbose: A boolean indicating whether to display progress information
during the caching process.
"""
from LabGuruAPI._collections import COLLECTIONS_BY_NAME
for cur_class_name, cur_item_list in tqdm(dump.items(), disable=not verbose, leave=False):
cur_class = COLLECTIONS_BY_NAME.get(cur_class_name, None)
if cur_class is None:
continue
for cur_item_dict in tqdm(cur_item_list, disable=not verbose, leave=False, desc=cur_class_name):
cur_item = cur_class.parse_api_data(cur_item_dict)
self._add_to_cache(cur_item)
[docs]
def execute_async(self, awaitables: List[Awaitable[Optional[LGI]]], task_limit=5,
verbose=True) -> List[Optional[LGI]]:
"""
Executes a set of asynchronous tasks with a limit on the number of concurrent tasks,
and optionally displays a progress bar.
Args:
awaitables: A list of awaitable objects that represent the asynchronous tasks to be executed.
task_limit: An integer that sets the maximum number of tasks that can be run concurrently.
verbose: A boolean flag to control the display of a progress bar; if True, the progress bar is shown.
"""
async def _await(a: Awaitable[Optional[LGI]], top_pbar: tqdm_aio) -> Optional[LGI]:
r = await a
top_pbar.update()
return r
async def _run():
with tqdm_aio(total=len(awaitables), disable=not verbose) as t_pbar:
t_stream = stream.map(stream.iterate(awaitables), _await, stream.repeat(t_pbar), task_limit=task_limit)
out_vals = await stream.list(t_stream)
return out_vals
return asyncio.run(_run())
[docs]
async def aio_get_object(self, lgi_class: Type[LGI], item_id: int = None, name: str = None, uuid: str = None,
api_url: str = None, auto_name: str = None, include_custom=False, from_cache=True) -> LGI:
"""
Asynchronously retrieves an object of the specified LGI class using provided
identifying criteria or API URL. The object is fetched from cache if available;
otherwise, it is retrieved from the API.
Args:
lgi_class: The LGI class type that determines the type of object to
retrieve.
item_id: The identifier of the item to be fetched. Defaults to None.
name: The name of the item to be retrieved. Defaults to None.
uuid: The universally unique identifier of the item. Defaults to None.
api_url: The API URL to directly fetch the item from. May include percent
encoding, which will be replaced. Defaults to None.
auto_name: The automatic name used in fetching the object. Defaults to None.
include_custom: A boolean indicating whether to include custom fields when
retrieving the object. Defaults to False.
from_cache: A boolean indicating whether to attempt fetching the object
from cache before contacting the API. Defaults to True.
Returns:
An instance of LGI corresponding to the specified criteria if found, or an
LGIError instance if an error occurs during fetching or processing.
"""
api_url = api_url.replace('%20', ' ') if api_url else None
key = SessionCacheKey(lgi_class, item_id, name, auto_name, uuid, api_url, include_custom)
if from_cache:
try:
out_item = self.cache[key]
# tqdm.write(f'Got {repr(out_item)} from Cache')
return out_item
except KeyError:
pass
# lgi_item = lgi_class.from_api(self, item_id, name, uuid, api_url, auto_name, include_custom)
future_lgi = asyncio.get_event_loop().run_in_executor(None, lgi_class.from_api, self, item_id, name, uuid,
api_url, auto_name, include_custom)
try:
lgi_item = await future_lgi
except:
return LGIError(sys.exc_info(), key.to_kwargs())
self._add_to_cache(lgi_item)
try:
_ = lgi_item.sequence
except AttributeError:
pass
except:
return LGIError(sys.exc_info(), key.to_kwargs())
return lgi_item
[docs]
async def aio_get_from_name(self, potential_classes: List[Type[LGI]], item_name: str) -> Optional[LGI]:
"""
Asynchronously attempts to retrieve an item by its name from a list of potential
classes. This function concurrently queries each class for the desired item and
returns the first matching item found.
Args:
potential_classes (List[Type[LGI]]): A list of classes that are potential
candidates from which the item might be retrieved. Each class should
offer a method for obtaining an item by name.
item_name (str): The name of the item to be retrieved. This is the
identifying string used to search for the item within each provided class.
Returns:
Optional[LGI]: The first item found that matches the given name, if any;
otherwise, returns None if no matching item is found across all provided classes.
"""
potential_items = await asyncio.gather(*(self.aio_get_object(t, name=item_name) for t in potential_classes))
for i in potential_items:
if i:
return i
return None
[docs]
def get_object(self, lgi_class: Type[LGI], item_id: int = None, name: str = None, uuid: str = None,
               api_url: str = None, auto_name: str = None, include_custom=False, from_cache=True,
               proxy=True) -> LGI:
    """
    Retrieves a ``LabGuruItem`` from the API. Only 1 of the following parameters will be queried, in the following
    order: item_id, name, uuid, auto_name.

    :param lgi_class: The python class of the desired item
    :param item_id: The database id of the desired item
    :param name: The name of the desired item
    :param uuid: The uuid of the desired item
    :param api_url: The API URL of the desired item
    :param auto_name: The SysID of the desired item
    :param include_custom: If true, include the ~200 "custom###" fields returned by the api. Default: False
    :param from_cache: If false, forces an API call even if the object is cached. Default: True
    :param proxy: If true, returns a proxy object with lazy API calls. Default: True
    :return: The requested ``LabGuruItem``
    """
    if api_url:
        api_url = api_url.replace('%20', ' ')
    else:
        api_url = None
    key = SessionCacheKey(lgi_class, item_id, name, auto_name, uuid, api_url, include_custom)
    if from_cache:
        try:
            return self.cache[key]
        except KeyError:
            pass
    if proxy:
        # Defer the API hit until an attribute is actually accessed.
        return self._make_proxy_object(lgi_class, key)
    fetched = lgi_class.from_api(self, item_id, name, uuid, api_url, auto_name, include_custom)
    self._add_to_cache(fetched)
    return fetched
[docs]
def get_many(self, lgi_class: Type[LGI], item_ids: List[int] = None, names: List[str] = None,
             uuids: List[str] = None, api_urls: List[str] = None, auto_names: List[str] = None,
             include_custom=False, from_cache=True) -> List[LGI]:
    """
    Retrieves a list of ``LabGuruItem`` objects from the API. Only 1 of the following parameters will be queried,
    in the following order: item_ids, names, uuids, auto_names.

    :param lgi_class: The python class of the desired items
    :param item_ids: The database ids of the desired items
    :param names: The names of the desired items
    :param uuids: The uuids of the desired items
    :param api_urls: The API URLs of the desired items
    :param auto_names: The SysIDs of the desired items
    :param include_custom: If true, include the ~200 "custom###" fields returned by the api. Default: False
    :param from_cache: If false, forces an API call even if the object is cached. Default: True
    :return: The requested ``LabGuruItem`` objects
    """
    # Collect one kwargs dict per requested item, keeping selector priority order.
    selector_sets = (('item_id', item_ids), ('name', names), ('uuid', uuids),
                     ('api_url', api_urls), ('auto_name', auto_names))
    item_kwargs = []
    for field, values in selector_sets:
        if values:
            item_kwargs.extend({field: value} for value in values)
    futures = [self.aio_get_object(lgi_class, include_custom=include_custom, from_cache=from_cache, **kwargs)
               for kwargs in item_kwargs]
    tqdm_aio.write(f'Retrieving {lgi_class.xlsx_collection}')
    return throttled_run(*futures)
def get_object_from_cache_key(self, key: SessionCacheKey):
    """
    Resolve a cache-key descriptor into a live object via :meth:`get_object`.

    :param key: the cache key describing the class and lookup parameters
    :return: the requested ``LabGuruItem``
    """
    # Fix: pass the class explicitly, mirroring aio_get_object_from_cache_key.
    # key.to_kwargs() carries only the lookup parameters, not the class itself
    # (see the async twin and the from_api(**key.to_kwargs()) call in the proxy),
    # so get_object was previously called without its required lgi_class argument.
    return self.get_object(key.lgi_class, **key.to_kwargs())
async def aio_get_object_from_cache_key(self, key: SessionCacheKey):
    """Asynchronously resolve a cache-key descriptor via :meth:`aio_get_object`."""
    resolved = await self.aio_get_object(lgi_class=key.lgi_class, **key.to_kwargs())
    return resolved
[docs]
async def get_all_dicts(self, lgi_class: Type[LGI], page_size: int = 10):
    """
    Retrieves an entire collection.

    :param lgi_class: The python class to be queried
    :param page_size: number of objects to return per api call
    :return: the JSON representation of all queried objects
    """
    meta_result = self.get(lgi_class.get_api_url(), meta=True, page_size=page_size)
    page_count = meta_result['meta']['page_count']
    object_dicts = []
    # Fix: manage the client session with async-with so it is closed even when
    # a page request raises (previously cs.close() was skipped on error).
    async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout()) as cs:
        get_pages = [self.aio_get(lgi_class.get_api_url(), cs, meta=False, page_size=page_size, page=(i + 1))
                     for i in range(page_count)]
        results = await tqdm_aio.gather(*get_pages, leave=False)
        for cur_json in results:
            object_dicts.extend(cur_json)
    return object_dicts
def _make_proxy_object(self, lgi_type: Type[LGI], session_key: SessionCacheKey) -> LGI:
    # Build a lazy proxy: attribute access is forwarded to the real object,
    # which is fetched (from cache or API) only on first use.
    cache_fxn = self._add_to_cache
    from_cache_fxn = self._get_from_cache
    _LGIT = TypeVar('_LGIT', bound=lgi_type)

    class _LGIProxy(lgi_type):
        def __init__(self):
            # Deliberately skip lgi_type.__init__; the proxy stores only the lookup key.
            object.__setattr__(self, '_proxy_target', session_key)

        def __getattr__(self, item):
            # Fallback path: resolve the target (triggering the fetch) and delegate.
            attr = getattr(self._obj, item)
            return attr

        def __getattribute__(self, item):
            # Once resolved, prefer the real object's attributes; before
            # resolution, fall back to the proxy's own attribute machinery.
            try:
                obj = object.__getattribute__(self, '_proxy_obj')
                return obj.__getattribute__(item)
            except AttributeError:
                return super().__getattribute__(item)

        def __setattr__(self, key, value):
            # Writes always go to the resolved object (fetching it if needed).
            setattr(self._obj, key, value)

        def __delattr__(self, item):
            delattr(self._obj, item)

        def __repr__(self):
            # Avoid triggering a fetch just to print the proxy.
            try:
                return repr(object.__getattribute__(self, '_proxy_obj'))
            except AttributeError:
                return f'<LG Proxy: {session_key.desc()}>'

        @property
        def _obj(self) -> _LGIT:
            # Resolve the proxy target: cached object if available, otherwise
            # fetch from the API and memoize on the instance.
            try:
                return object.__getattribute__(self, '_proxy_obj')
            except AttributeError:
                key: SessionCacheKey = object.__getattribute__(self, '_proxy_target')
                obj = from_cache_fxn(key)
                if obj:
                    object.__setattr__(self, '_proxy_obj', obj)
                    return obj
                else:
                    # NOTE(review): from_api is called without an explicit session,
                    # so it presumably falls back to the module default — confirm
                    # this is correct when self is not the global SESSION.
                    obj = lgi_type.from_api(**key.to_kwargs())
                    object.__setattr__(self, '_proxy_obj', obj)
                    cache_fxn(obj)
                    return obj

        @property
        def lgi_type(self) -> Type[LGI]:
            # The concrete class being proxied.
            return lgi_type

        def make_new_copy(self) -> _LGIT:
            return self._obj.make_new_copy()

        @classmethod
        def make_new(cls, **properties) -> _LGIT:
            # Creating a new item bypasses the proxy entirely.
            return lgi_type.make_new(**properties)

    return _LGIProxy()
@staticmethod
def make_filter(field_name: str, operator: str, value: Any, page_size: int = 1, logic: str = 'and',
filter_id: int = 0) -> Dict[str, Any]:
return {
"kendo" : True,
"filter" : {
"logic" : logic,
"filters": {
str(filter_id): {
"value" : value,
"operator": operator,
"field" : field_name
}
}
},
"page_size": page_size
}
[docs]
@deprecated(details='Please use LabGuruItem.find_all or LabGuruItem.find_one')
def search_api(self, item_type: Type[LGI], field_name: str, operator: str, value: Any,
               page_size: int = 100) -> List[LGI]:
    """Deprecated search helper: run a single-field filter query and cache every hit."""
    query = self.make_filter(field_name, operator, value, page_size)
    found = []
    for hit in item_type.search_api(self, query):
        found.append(hit)
        self._add_to_cache(hit)
    return found
[docs]
@deprecated(details='Please use LabGuruItem.async_find_all or LabGuruItem.async_find_one')
async def aio_search_api(self, item_type: Type[LGI], field_name: str, operator: str, value: Any,
                         page_size: int = 100) -> List[LGI]:
    """Deprecated async search helper: run a single-field filter query and cache every hit."""
    hits = await item_type.aio_search_api(self, self.make_filter(field_name, operator, value, page_size))
    for hit in hits:
        self._add_to_cache(hit)
    return hits
[docs]
def link_objects(self, from_item: LGI, to_item: LGI) -> bool:
    """
    Creates a link between LG items

    :param from_item: a LabGuruItem
    :param to_item: a LabGuruItem
    :return: True if the link is successful
    """
    payload = {'source_uuid': from_item.uuid, 'target_uuid': to_item.uuid}
    response = self.post('links.json', item=payload)
    return bool(response)
[docs]
async def aio_link_objects(self, from_item: LGI, to_item: LGI) -> bool:
    """
    Creates a link between LG items

    :param from_item: a LabGuruItem
    :param to_item: a LabGuruItem
    :return: True if the link is successful
    """
    payload = {'source_uuid': from_item.uuid, 'target_uuid': to_item.uuid}
    response = await self.aio_post('links.json', item=payload)
    return bool(response)
[docs]
def get(self, endpoint: str, **data) -> Optional[Dict[str, Any]]:
    """
    Send a GET request (with the session token injected into the JSON body)
    to ``endpoint`` and return the decoded JSON payload.

    :param endpoint: the API endpoint to query
    :param data: extra fields sent in the JSON request body
    :return: the decoded JSON response, or None on a non-2xx status
    """
    data['token'] = self.token
    # NOTE: the LG API expects query parameters in the GET body as JSON.
    response = requests.get(self.make_api_path(endpoint), json=data)
    return response.json() if response.ok else None
[docs]
async def aio_get(self, endpoint: str, c_session: aiohttp.ClientSession = None, retries=3,
                  **data) -> Optional[Dict[str, Any]]:
    """
    Perform an asynchronous GET request with simple retry logic for server errors.

    :param endpoint: the API endpoint to query
    :param c_session: optional shared aiohttp session; when omitted a temporary
        one is created here and closed before returning
    :param retries: number of retries attempted on HTTP 5xx responses
    :param data: extra fields sent in the JSON request body (the token is added)
    :return: the decoded JSON response, or None on a non-retryable failure
    """
    data['token'] = self.token
    session = c_session or aiohttp.ClientSession()
    try:
        async with session.get(self.make_api_path(endpoint), timeout=aiohttp.ClientTimeout(), json=data) as r:
            if r.ok:
                result = await r.json()
            elif r.status >= 500 and retries > 0:
                # Server-side error: retry on the same session with one fewer attempt.
                result = await self.aio_get(endpoint, session, retries - 1, **data)
            else:
                result = None
    finally:
        # Fix: close the locally-created session even when the request raises
        # (previously an exception leaked the connector). Caller-owned sessions
        # are left open.
        if c_session is None:
            await session.close()
    return result
[docs]
def post(self, endpoint: str, **data) -> Optional[Dict[str, Any]]:
    """
    Send a POST request to ``endpoint`` with the session token added to the
    JSON body.

    :param endpoint: the API endpoint to post to
    :param data: fields sent in the JSON request body
    :return: the decoded JSON response, or None on a non-2xx status
    """
    data['token'] = self.token
    response = requests.post(self.make_api_path(endpoint), json=data)
    return response.json() if response.ok else None
[docs]
def put(self, endpoint: str, **data) -> Optional[Dict[str, Any]]:
    """
    Send a PUT request to ``endpoint`` with the session token added to the
    JSON body.

    :param endpoint: the API endpoint to update
    :param data: fields sent in the JSON request body
    :return: the decoded JSON response, or None on a non-2xx status
    """
    data['token'] = self.token
    response = requests.put(self.make_api_path(endpoint), json=data)
    return response.json() if response.ok else None
[docs]
def del_request(self, endpoint: str, **data) -> Optional[Dict[str, Any]]:
    """
    Send a DELETE request to ``endpoint`` with the session token added to the
    JSON body.

    :param endpoint: the API endpoint to delete from
    :param data: fields sent in the JSON request body
    :return: the decoded JSON response, or None on a non-2xx status
    """
    data['token'] = self.token
    response = requests.delete(self.make_api_path(endpoint), json=data)
    return response.json() if response.ok else None
[docs]
async def aio_put(self, endpoint: str, **data) -> Optional[Dict[str, Any]]:
    """
    Asynchronously send a PUT request to ``endpoint`` with the session token
    added to the JSON body.

    :param endpoint: the API endpoint to update
    :param data: fields sent in the JSON request body
    :return: the decoded JSON response, or None on a non-2xx status
    """
    data['token'] = self.token
    async with aiohttp.ClientSession() as session:
        async with session.put(self.make_api_path(endpoint), json=data) as response:
            if not response.ok:
                return None
            return await response.json()
[docs]
async def aio_post(self, endpoint: str, **data) -> Optional[Dict[str, Any]]:
    """
    Asynchronously send a POST request to ``endpoint`` with the session token
    added to the JSON body.

    :param endpoint: the API endpoint to post to
    :param data: fields sent in the JSON request body
    :return: the decoded JSON response, or None on a non-2xx status
    """
    data['token'] = self.token
    async with aiohttp.ClientSession() as session:
        async with session.post(self.make_api_path(endpoint), json=data) as response:
            if not response.ok:
                return None
            return await response.json()
[docs]
async def aio_delete(self, endpoint: str) -> bool:
    """
    Asynchronously send a DELETE request to ``endpoint``.

    :param endpoint: the API endpoint (the token is appended to the path)
    :return: True when the response status is 2xx, otherwise False
    """
    async with aiohttp.ClientSession() as session:
        async with session.delete(self.make_api_path(endpoint, with_token=True)) as response:
            return bool(response.ok)
[docs]
def get_members(self) -> Dict[str, int]:
    """
    Retrieves a mapping of all member names to their IDs

    :return: A dictionary whose keys are names and values are member IDs
    """
    # noinspection PyTypeChecker
    all_members: List[Dict] = self.get('admin/members')
    # Fix: self.get returns None on a failed request; treat that as an empty
    # member list instead of raising TypeError while iterating.
    return {m['name']: m['id'] for m in (all_members or [])
            if m['enabled'] and 'API' not in m['name']}
#: The global :py:class:`~LabGuruAPI._base.Session` object that should always be used to interact with LabGuru.
# Instantiated at import time; the LabGuruItem convenience classmethods below all route through it.
SESSION = Session()
[docs]
class LabGuruItem(object):
    """
    A base class for all objects retrieved from the LabGuru Database
    """
    # Subclasses override with their REST collection name (e.g. 'plasmids').
    _api_name = ''
    """The API endpoint for the class"""
    _attribute_dict: Dict[str, str] = {
        'name'       : 'name',
        'description': 'description',
        'owner_id'   : 'owner_id',
        'id'         : 'id',
        'uuid'       : 'uuid',
        'tags'       : 'tags',
        'links'      : 'links'
    }
    """A mapping of API JSON keys to class property names"""
    _pending_args = None
    # type_name = LGStr('')
    class_name = LGStr('')
    """The programmatic class name for the LG API"""
    class_display_name = LGStr('')
    """The human-readable class name for the LG API"""
    uuid = LGStr('')
    """A unique identifier string of the object. Searchable."""
    id = LGStr('')
    """The database ID of the object. Searchable."""
    name = LGStr('')
    """The name/title of the object. Searchable."""
    owner_id = LGInt()
    """The database ID of the object's owner. Searchable."""
    description = LGStr('')
    """A description of the object. Searchable."""
    api_url = LGStr('')
    """The object's REST API URL. Searchable."""
    auto_name = LGStr('')
    """The object's SysID. Searchable."""
    xlsx_collection = 'Generic'
    """The Dataset header/Plate upload name for the object class"""
def __init__(self, **kwargs):
    # Initialize the mutable containers on the instance.
    # NOTE(review): **kwargs is accepted but never read here; fields appear to
    # be populated via bulk_property_update / parse_api_data instead — confirm.
    self.other_properties = {}
    self.owner = {}
    self.tags = []
    self.links = []
def __repr__(self):
    """Concise display form: ``<ClassDisplayName id: name>``."""
    return '<{} {}: {}>'.format(self.class_display_name, self.id, self.name)
@property
def _obj(self):
    # Identity hook: concrete items return themselves; the lazy proxy built by
    # Session._make_proxy_object overrides this to return the fetched target.
    return self
@property
def web_url(self):
    """The web-interface URL for the object"""
    try:
        # Prefer explicit URLs from the API payload, falling back to api_url.
        url = self.other_properties.get('url', self.other_properties.get('content_url', None)) or self.api_url
        # The four slashes collapse to two in the blanket replace below,
        # leaving 'http://my.labguru.com/...'.
        full_url = f"http:////my.labguru.com/{url}".replace('api/v1/', '') if url else ''
        # NOTE(review): if `url` is itself an absolute URL the host is
        # duplicated and its scheme's '//' is collapsed — presumably `url`
        # is always host-relative here; confirm.
        return full_url.replace('//', '/')
    except AttributeError:
        # other_properties may be missing on partially-constructed objects.
        return None
def __bool__(self):
    # Concrete items are always truthy (_obj is self); proxy subclasses may
    # resolve _obj to None when the backing object cannot be fetched.
    return self._obj is not None
def __eq__(self, other):
    # Items match when they belong to the same LG collection and share either
    # a database id or a name.
    if not type(self).isinstance(other):
        return False
    elif self.id == other.id:
        return True
    else:
        return self.name == other.name
def __hash__(self):
    # NOTE(review): __eq__ can report equality on matching id alone while the
    # names differ, in which case two "equal" objects hash differently —
    # confirm whether that hash-contract violation is acceptable here.
    return hash((type(self).__name__, self.id, self.name))
[docs]
@classmethod
def isinstance(cls, other: LGI) -> bool:
    """Check if an object is an instance of this class (by LG collection type)."""
    return isinstance(other, LabGuruItem) and cls.xlsx_collection == other.xlsx_collection
[docs]
@classmethod
def make_new(cls: Type[LGI], overwrite=False, **properties) -> LGI:
    """
    Creates a new item without adding it to the database

    :param overwrite: if true, will get an object by the same name if it exists and overwrite that object
    :param properties: The initial properties of the object
    :return: A new instance of the class
    """
    existing = None
    if overwrite and 'name' in properties:
        existing = cls.from_name(properties['name'])
    item: LGI = existing if existing else cls()
    item.bulk_property_update(**properties)
    return item
[docs]
@classmethod
def add_new(cls: Type[LGI], overwrite=False, **properties) -> LGI:
    """
    Creates a new item AND adds it to the database

    :param overwrite: if true, reuses and overwrites an existing same-named object
    :param properties: The initial properties of the object
    :return: A new instance of the class
    """
    fresh = cls.make_new(overwrite=overwrite, **properties)
    return SESSION.add(fresh)
def bulk_property_update(self: LGI, **properties):
    # Map LG field names through _attribute_dict onto python attribute names.
    # NOTE(review): falsy values (0, '', False, None) are skipped — presumably
    # to avoid clobbering existing data with blanks; confirm intended.
    for k, v in properties.items():
        if v:
            setattr(self, self._attribute_dict.get(k, k), v)
[docs]
def make_new_copy(self: LGI) -> LGI:
    """Creates a copy of the object excluding the database ids and name"""
    copy_data = self.to_dict(False)
    for identity_field in ('id', 'uuid', 'sys_id', 'auto_name', 'name'):
        copy_data.pop(identity_field, None)
    return type(self).parse_api_data(copy_data, SESSION)
[docs]
@classmethod
def get_attribute_dict(cls, capitalize=False):
    """Generates a full LG -> python mapping of all class-specific and inherited properties"""
    mapping = {}
    # Collect inherited mappings first so this class's entries take precedence.
    for base in cls.__bases__:
        try:
            mapping.update(base.get_attribute_dict(capitalize))
        except AttributeError:
            pass
    mapping.update(cls._attribute_dict)
    if capitalize:
        mapping.update({key.title(): val for key, val in cls._attribute_dict.items()})
    return mapping
[docs]
@classmethod
def get_api_url(cls) -> str:
    """Returns the API url for the class"""
    return 'https://my.labguru.com/api/v1/{}.json'.format(cls._api_name)
[docs]
@classmethod
def item_api_url(cls, item_id: int, omit_json=False):
    """
    Generates an API URL for a specific item of this class.

    :param item_id: the item's database ID
    :param omit_json: if true, drops the '.json' from the end of the URL (Default: False)
    :return: the URL string
    """
    suffix = '' if omit_json else '.json'
    return f'https://my.labguru.com/api/v1/{cls._api_name}/{int(item_id):d}{suffix}'
@classmethod
def _name_search_api_url(cls):
    # Endpoint for exact-name lookups within this class's collection.
    return f'https://my.labguru.com/api/v1/{cls._api_name}/find_by_name.json'
[docs]
@staticmethod
def process_api_url(url: str, omit_json=False):
"""Coerces an API url into the correct format"""
url = url.replace('https://my.labguru.com/api/v1', '')
url = url.replace('/api/v1', '')
url = url.replace('.json', '')
if any(x in url for x in ['plasmid', 'primer', 'compound']):
url = url.replace('/biocollections', '')
if 'materials' in url:
url = url.replace('/catalog', '')
return f'https://my.labguru.com/api/v1{url}{"" if omit_json else ".json"}'
[docs]
@classmethod
def parse_api_data(cls: Type[LGI], json_data: Dict[str, Any], session: "Session" = SESSION,
                   include_custom=False) -> LGI:
    """
    Generates a python object from the LabGuru API's JSON response. This is a low-level function that should
    not be used routinely.

    :param json_data: the JSON dictionary read from the API
    :param session: the active LG session. Please leave as the default value.
    :param include_custom: add the LG "custom###" fields to LGI.other_properties. Default: false
    :return: the python version of the LG object
    """
    new_item: LGI = cls.make_new()
    att_dict = cls.get_attribute_dict()
    # Flatten the 'parents' sub-dict into the top-level payload.
    if 'parents' in json_data:
        json_data.update(json_data['parents'] or {})
        del json_data['parents']
    for k, v in json_data.items():
        att_name = att_dict.get(k, k)
        if hasattr(new_item, att_name):
            if att_name == 'concentration' and isinstance(v, str):
                # Keep only numeric characters/separators, e.g. "12.5 ng/ul" -> "12.5".
                v = ''.join(c for c in v if c in '0123456789,.')
            try:
                # Box barcodes are managed by the box itself; skip them here.
                if att_name == 'barcode' and 'box' in json_data['api_url']:
                    continue
            # Fix: the bare `except:` here swallowed everything (including
            # KeyboardInterrupt); only missing/non-string 'api_url' is expected.
            except (KeyError, TypeError):
                pass
            setattr(new_item, att_name, v)
            if 'parent' in k.lower() and isinstance(v, list) and len(v) > 0 and 'parent_uuid' in v[0]:
                # Remember the parent's uuid and store the first parent record.
                setattr(new_item, f"_{att_name}_api_uuid", v[0]['parent_uuid'])
                setattr(new_item, att_name, v[0])
        elif ('custom' not in k) or include_custom:
            new_item.other_properties[k] = v
    return new_item
[docs]
@classmethod
def from_api(cls: Type[LGI], session: "Session" = SESSION, item_id: int = None, name: str = None, uuid: str = None,
             api_url: str = None, auto_name: str = None, include_custom=False) -> Optional[LGI]:
    """Low-level function. Please use ``LabGuruItem.from_LG`` instead."""
    data = {}
    params = {'token': session.token, 'exclude_fields': 'links,linked_resources'}
    if item_id:
        r = requests.get(cls.item_api_url(item_id), params=params)
        if r.status_code == 404:
            return None
        data.update(r.json())
    elif name:
        params.update({'name': name})
        r = requests.get(cls.get_api_url(), params=params)
        if not r.ok:
            return None
        try:
            # The name endpoint does substring matching; require an exact match.
            for cur_result in r.json():
                if cur_result['name'] == str(name):
                    data.update(cur_result)
                    break
            else:
                return None
        except IndexError:
            return None
    elif auto_name:
        params.update(Session.make_filter('auto_name', 'eq', auto_name))
        r = requests.get(cls.get_api_url(), json=params)
        try:
            data.update(r.json()[0])
        except IndexError:
            return None
    elif uuid:
        params.update({'uuid': uuid})
        r = requests.get(cls.get_api_url(), params=params)
        try:
            data.update(r.json()[0])
        except IndexError:
            return None
    elif api_url:
        # Fix: this branch previously returned early via
        # parse_api_data(data, session), silently dropping include_custom;
        # it now falls through to the single parse call below.
        r = requests.get(cls.process_api_url(api_url), params=params)
        data.update(r.json())
    else:
        return None
    return cls.parse_api_data(data, session, include_custom)
[docs]
@classmethod
def from_LG(cls: Type[LGI], item_id: int = None, name: str = None, uuid: str = None, api_url: str = None,
            auto_name: str = None, include_custom=False, from_cache=True, proxy=True) -> LGI:
    """
    Retrieves a ``LabGuruItem`` from the API via the global session. Only 1 of the following parameters will be
    queried, in the following order: item_id, name, uuid, auto_name.
    Note: This should only be used if ``LabGuruItem.from_name`` or ``LabGuruItem.from_id`` is insufficient.

    :param item_id: The database id of the desired item
    :param name: The name of the desired item
    :param uuid: The uuid of the desired item
    :param api_url: The API URL of the desired item
    :param auto_name: The SysID of the desired item
    :param include_custom: If true, include the ~200 "custom###" fields returned by the api. Default: False
    :param from_cache: If false, forces an API call even if the object is cached. Default: True
    :param proxy: If true, returns a proxy object with lazy API calls. Default: True
    :return: The requested ``LabGuruItem``
    """
    return SESSION.get_object(cls, item_id=item_id, name=name, uuid=uuid, api_url=api_url,
                              auto_name=auto_name, include_custom=include_custom,
                              from_cache=from_cache, proxy=proxy)
[docs]
@classmethod
async def async_from_LG(cls: Type[LGI], item_id: int = None, name: str = None, uuid: str = None,
                        api_url: str = None, auto_name: str = None, include_custom=False, from_cache=True) -> LGI:
    """
    Asynchronously retrieves a ``LabGuruItem`` from the API via the global session. Only 1 of the following
    parameters will be queried, in the following order: item_id, name, uuid, auto_name.
    Note: This should only be used if ``LabGuruItem.async_from_name`` or ``LabGuruItem.async_from_id`` is
    insufficient.

    :param item_id: The database id of the desired item
    :param name: The name of the desired item
    :param uuid: The uuid of the desired item
    :param api_url: The API URL of the desired item
    :param auto_name: The SysID of the desired item
    :param include_custom: If true, include the ~200 "custom###" fields returned by the api. Default: False
    :param from_cache: If false, forces an API call even if the object is cached. Default: True
    :return: The requested ``LabGuruItem``
    """
    return await SESSION.aio_get_object(cls, item_id=item_id, name=name, uuid=uuid, api_url=api_url,
                                        auto_name=auto_name, include_custom=include_custom,
                                        from_cache=from_cache)
[docs]
@classmethod
def from_name(cls: Type[LGI], name: str) -> LGI:
    """
    Retrieves a ``LabGuruItem`` from the API via its exact name.

    Example — get plasmid ``pGRO-S0081``::

        p = Plasmid.from_name('pGRO-S0081')

    :param name: The exact name of the desired item
    :return: The requested ``LabGuruItem``
    """
    return cls.from_LG(name=name)
[docs]
@classmethod
async def async_from_name(cls: Type[LGI], name: str) -> LGI:
    """
    Asynchronously retrieves a ``LabGuruItem`` from the API via its exact name.

    Example — get plasmid ``pGRO-S0081``::

        p_future = Plasmid.async_from_name('pGRO-S0081')

    :param name: The exact name of the desired item
    :return: The requested ``LabGuruItem``
    """
    return await cls.async_from_LG(name=name)
[docs]
@classmethod
def from_names(cls: Type[LGI], names: List[str], verbose: bool = True) -> List[LGI]:
    """
    Retrieves a list of ``LabGuruItem`` objects from the API in parallel — faster
    than individual ``LGI.from_name()`` calls.

    Example — retrieve all the plasmids listed in a DataFrame column::

        p_names = dframe['Plasmids'].unique()
        plasmids = Plasmid.from_names(p_names)

    :param names: a list of object names
    :param verbose: if true, a progress bar will be written to stdio while gathering the items
    :return: the requested list of items
    """
    pending = [cls.async_from_name(single_name) for single_name in names]
    return SESSION.execute_async(pending, verbose=verbose)
[docs]
@classmethod
def from_id(cls: Type[LGI], item_id: Union[int, str]) -> LGI:
    """
    Retrieves a ``LabGuruItem`` from the API via its ID number.

    Example — get plasmid ``pGRO-S0081``::

        p = Plasmid.from_id(294)

    :param item_id: The database id of the desired item
    :return: The requested ``LabGuruItem``
    """
    return cls.from_LG(item_id=int(item_id))
[docs]
@classmethod
async def async_from_id(cls: Type[LGI], item_id: Union[int, str]) -> LGI:
    """
    Asynchronously retrieves a ``LabGuruItem`` from the API via its ID number.

    Example — get plasmid ``pGRO-S0081``::

        p_future = Plasmid.async_from_id(294)

    :param item_id: The database id of the desired item
    :return: The requested ``LabGuruItem``
    """
    return await cls.async_from_LG(item_id=int(item_id))
[docs]
@classmethod
def from_ids(cls: Type[LGI], ids: List[Union[int, str]], verbose: bool = True) -> List[LGI]:
    """
    Retrieves a list of ``LabGuruItem`` objects from the API in parallel — faster
    than individual ``LGI.from_id()`` calls.

    Example — retrieve all the plasmids listed in a DataFrame column::

        p_ids = dframe['Plasmid_ids'].unique()
        plasmids = Plasmid.from_ids(p_ids)

    :param ids: a list of database ids
    :param verbose: if true, a progress bar will be written to stdio while gathering the items
    :return: the requested list of items
    """
    pending = [cls.async_from_id(single_id) for single_id in ids]
    return SESSION.execute_async(pending, verbose=verbose)
[docs]
@classmethod
@deprecated(details='Please use LabGuruItem.find_all or LabGuruItem.find_one')
def search_api(cls: Type[LGI], session: "Session", query_data: Dict[str, Any]) -> List[LGI]:
    """Deprecated: run a raw filter query against this class's collection."""
    params = {'token': session.token, **query_data}
    # Structured (dict) filters must travel in the JSON body; string filters
    # can go as query parameters.
    structured_filter = 'filter' in params and not isinstance(params['filter'], str)
    if structured_filter:
        r = requests.get(cls.get_api_url(), json=params)
    else:
        r = requests.get(cls.get_api_url(), params=params)
    if not r.ok:
        raise ValueError(f'Problem searching api with parameters {params.items()}')
    return [cls.parse_api_data(d, session) for d in r.json()]
[docs]
@classmethod
@deprecated(details='Please use LabGuruItem.async_find_all or LabGuruItem.async_find_one')
async def aio_search_api(cls: Type[LGI], session: Session, query_data: Dict[str, Any], cur_page=1) -> List[LGI]:
    """Deprecated: async raw filter query; page 1 fans out to all remaining pages."""
    result = await session.aio_get(cls.get_api_url(), meta=True, page=cur_page, **query_data)
    items = [cls.parse_api_data(d, session) for d in result['data']]
    page_count = result['meta']['page_count']
    if cur_page == 1 and page_count > 1:
        followups = [cls.aio_search_api(session, query_data, cur_page=p)
                     for p in range(2, page_count + 1)]
        for page_items in await asyncio.gather(*followups):
            items.extend(page_items)
    return items
[docs]
def to_dict(self, include_other_properties=True, include_links=True, include_id=True) -> Dict[str, Any]:
    """
    Generates a mapping of LG property fields to values for the object. Generally used for add/update calls.

    :param include_other_properties: adds fields contained in ``other_properties`` to the dictionary.
    :param include_links: will include the links and tags fields in the output dictionary
    :param include_id: will include the item's ID in the output dictionary
    :return: a dictionary mapping of LG property fields to values
    """
    item_data = {}
    for lg_param, py_attr in self.get_attribute_dict().items():
        attr_val = getattr(self, py_attr)
        # Fix: compare the *attribute name*, not its value. The original
        # tested `attr_val in ['links', 'tags']`, which only matched when a
        # field's value happened to be one of those strings, so
        # include_links=False never actually dropped the fields.
        if not include_links and py_attr in ('links', 'tags'):
            continue
        if 'parent' in lg_param.lower() and isinstance(attr_val, LabGuruItem):
            # Parents are uploaded as a list of their uuids.
            if not attr_val.uuid:
                continue
            item_data.setdefault('parents_uuid', []).append(attr_val.uuid)
        else:
            item_data[lg_param] = attr_val
    if include_other_properties:
        item_data.update(self.other_properties)
    if not include_id:
        # pop() tolerates an 'id' key removed above or absent from subclasses.
        item_data.pop('id', None)
    return item_data
[docs]
def update_api(self: LGI, session: "Session", retries=0) -> LGI:
    """Low-level function, please do not use. ``LabGuruItem.lg_sync`` is likely the method you want."""
    # Build item data
    payload = {'token': session.token,
               'item': self.to_dict(include_other_properties=False, include_links=False, include_id=False)}
    # PUT updates an existing record; POST creates a new one.
    if self.id:
        r = requests.put(self.item_api_url(self.id), json=payload)
    else:
        r = requests.post(self.get_api_url(), json=payload)
    if not r.ok:
        # Fix: a duplicate-name rejection can never succeed on retry — raise it
        # immediately instead of retrying the same request 10 times first.
        if 'please give a different name' in str(r.content):
            raise LGDuplicateNameError(f"Name {self.name} already exists.")
        if retries < 10:
            return self.update_api(session, retries + 1)
        raise ValueError(f'Could not update api with {repr(self)}\n{repr(r)}\n{r.content}')
    return self.parse_api_data(r.json(), session)
[docs]
def lg_sync(self: LGI) -> LGI:
    """
    Adds or updates the item in the LG database.
    """
    # update() when the item already has a database id, otherwise add().
    synced_self = SESSION.update(self) if self.id else SESSION.add(self)
    # Absorb the server's view of the object into this instance so callers can
    # keep using the same reference.
    self.__dict__.update(synced_self.__dict__)
    # NOTE(review): these two are copied explicitly — presumably because they
    # are descriptor-backed and not carried by __dict__.update; confirm.
    self.class_display_name = synced_self.class_display_name
    self.class_name = synced_self.class_name
    return self
[docs]
async def async_lg_sync(self: LGI) -> LGI:
    """
    Adds or updates the item in the LG database. Be careful not to queue multiple syncs per item as the final
    object may not reflect all updates or may be added multiple times.
    """
    synced_self = await (SESSION.aio_update(self) if self.id else SESSION.aio_add(self))
    # NOTE(review): unlike lg_sync (which copies __dict__ wholesale), this path
    # uses bulk_property_update, which skips falsy values — the two sync paths
    # may diverge for fields cleared on the server; confirm intended.
    self.bulk_property_update(**synced_self.to_dict())
    return self
async def deduplicate_links(self):
    """Remove duplicate outgoing links, keeping the first occurrence per target uuid."""
    seen_targets = []
    link_records = await SESSION.aio_get('links', item_uuid=self.uuid)
    link_records += await SESSION.aio_get(self.item_api_url(self.id, True) + '/get_parents')
    for record in link_records:
        target = record['target_uuid']
        if target in seen_targets:
            await SESSION.aio_delete(f"links/{record['id']}")
        else:
            seen_targets.append(target)
def find_attachments(self) -> List["Attachment"]:
    """Returns a list of ``Attachment`` objects associated with the item."""
    # LG expects the filter as a JSON-encoded string keyed on the owning item.
    attachment_query = {
        "filter": f'{{"attachable_id": "{self.id}", "attachable_type": "{self.class_name}"}}'
    }
    raw_records = SESSION.get(Attachment.get_api_url(), **attachment_query)
    return [Attachment.parse_api_data(record) for record in raw_records]
[docs]
@classmethod
def find_next_id(cls, prefix: str) -> int:
"""
Searches the database for names beginning with a specified prefix, determines the associated index and gives the
next number
:param prefix: The prefix of the indexed samples (ex: "GBFP-0927-")
:return: the integer index following the last entered sample
"""
# First request only discovers how many result pages exist (meta=True).
qd = SESSION.make_filter('name', 'startswith', prefix, 1)
qd['meta'] = True
resp_json = SESSION.get(cls._api_name, **qd)
# Jump straight to the last page. NOTE(review): this assumes LG returns matches in an
# order where the last page's first entry carries the highest index -- confirm with the API.
qd['page'] = resp_json['meta']['page_count']
resp_json = SESSION.get(cls._api_name, **qd)
final_data = resp_json['data']
if len(final_data) == 0:
# No existing items with this prefix: numbering starts at 1.
return 1
else:
final_name: str = final_data[0]['name']
# NOTE(review): str.replace removes the prefix wherever it occurs, and the remainder
# must be purely numeric or int() raises ValueError -- verify naming is strict enough.
final_id = int(final_name.replace(prefix, ''))
return final_id + 1
@classmethod
def make_id_iterator(cls, prefix: str) -> Iterator[int]:
    """
    Yield sequential index numbers for a prefix, starting at the first unused one.

    The database is queried once, up front, for the highest existing index; every
    subsequent iteration simply increments a counter.

    :param prefix: The prefix of the indexed samples (ex: "GBFP-0927-")
    :returns: sequential integers beginning with the first unused index number
    """
    next_index = cls.find_next_id(prefix)
    while True:
        yield next_index
        next_index += 1
@classmethod
def iter_next_names(cls, prefix: str, zeros: int = 4) -> Iterator[str]:
    """
    Yield formatted sample names for a prefix, starting at the first unused index.

    Example
    -------
    You need to add temporary strains for experiment 6294. Currently, LG has 27 strains for this experiment, so the
    largest temporary strain name is "GBFS-6294-0027". ::

        name_iter = Strain.iter_next_names("GBFS-6294-", 4)
        for _ in range(3):
            print(next(name_iter))
        # Results
        # GBFS-6294-0028
        # GBFS-6294-0029
        # GBFS-6294-0030

    :param prefix: The prefix of the indexed samples (ex: "GBFP-0927-")
    :param zeros: The number of digits used by the index. Default: 4
    :returns: sequential formatted names beginning with the first unused index number
    """
    # Build a zero-padded format string such as "GBFS-6294-{0:04d}".
    name_template = f'{prefix}{{0:0{zeros}d}}'
    id_source = cls.make_id_iterator(prefix)
    while True:
        yield name_template.format(next(id_source))
def link_to(self, link_target: LGI) -> bool:
    """
    Links the current item to the target item in LabGuru

    :param link_target: the LabGuruItem to link
    :return: True if the link is successful
    """
    # Delegate entirely to the shared session's link machinery.
    link_succeeded = SESSION.link_objects(self, link_target)
    return link_succeeded
async def aio_link_to(self, link_target: LGI) -> bool:
    """
    Links the current item to the target item in LabGuru

    :param link_target: the LabGuruItem to link
    :return: True if the link is successful
    """
    # Async counterpart of link_to(); the session does all the work.
    link_succeeded = await SESSION.aio_link_objects(self, link_target)
    return link_succeeded
def get_linked_items(self, of_type: Type[LGI]) -> List[LGI]:
    """
    Queries LG for items of a particular class that are linked to the current item.

    :param of_type: the class of LabGuruItem you would like returned
    :return: a list of linked LabGuruItem objects
    """
    json_data = SESSION.get('links', item_uuid=self.uuid)
    # Comprehension instead of an append loop: keep only links whose target
    # matches the requested collection type.
    correct_ids = [link['target_id'] for link in json_data
                   if link['target']['type_display'] == of_type.xlsx_collection]
    return [of_type.from_id(i) for i in correct_ids]
async def async_get_linked_items(self, of_type: Type[LGI]) -> List[LGI]:
    """
    Queries LG for items of a particular class that are linked to the current item.

    :param of_type: the class of LabGuruItem you would like returned
    :return: a list of linked LabGuruItem objects
    """
    json_data = await SESSION.aio_get('links', item_uuid=self.uuid)
    correct_ids = [link['target_id'] for link in json_data
                   if link['target']['type_display'] == of_type.xlsx_collection]
    # Fetch all matching items concurrently instead of awaiting them one by one;
    # asyncio.gather preserves the order of its inputs in the result list.
    return list(await asyncio.gather(*(of_type.async_from_id(i) for i in correct_ids)))
def get_derived_items(self, of_type: Type[LGI]) -> List[LGI]:
    """
    Queries LG for items of a particular class that are derived from the current item

    :param of_type: the class of LabGuruItem you would like returned
    :return: a list of derived LabGuruItem objects
    """
    # LG wants the bare collection name, e.g. "plasmids" out of "biocollections/plasmids".
    derived_name = of_type._api_name.split('/')[-1]
    json_data = SESSION.get(f"{self._api_name}/{self.id}/get_derived_items",
                            derived_collection_name=derived_name)
    # Comprehension instead of list(map(lambda ...)) -- same result, clearer intent.
    return [SESSION.get_object(of_type, item_id=item['id'])
            for item in json_data['derived_collection_items']]
@classmethod
async def async_count_all(cls: Type[LGI], *search_operators: LGSearchOperator) -> int:
    """
    AsyncIO-compatible version of LabGuruItem.count_all()

    :param search_operators: Comparison operators that will be used as filters in the LabGuru query
    :return: the number of items matching all the provided filters
    """
    # (Docstring fixed: this returns a count, not a list -- the old text was
    # copy-pasted from find_all.)
    all_filters: LGSearchAPI = sum(search_operators)
    # Only the first page is requested; the total lives in the response metadata.
    results = await SESSION.aio_get(cls.get_api_url(), meta=True, **all_filters.make_filter(1))
    return results['meta']['item_count']
@classmethod
async def async_find_all(cls: Type[LGI], *search_operators: LGSearchOperator) -> List[LGI]:
    """
    AsyncIO-compatible version of LabGuruItem.find_all()

    :param search_operators: Comparison operators that will be used as filters in the LabGuru query
    :return: A list of LabGuruItem objects that match all the provided filters
    """
    all_filters: LGSearchAPI = sum(search_operators)
    results = await SESSION.aio_get(cls.get_api_url(), meta=True, **all_filters.make_filter())
    res_objects = [cls.parse_api_data(d) for d in results['data']]
    # A missing 'meta'/'page_count' means a single page of results. The original wrapped
    # the entire paging loop in try/except KeyError, which could also mask unexpected
    # KeyErrors raised while parsing page data; look the count up explicitly instead.
    page_count = results.get('meta', {}).get('page_count', 1)
    if page_count > 1:
        # Request the remaining pages concurrently (with a progress bar).
        futures = [SESSION.aio_get(cls.get_api_url(), meta=False, page=i, **all_filters.make_filter())
                   for i in range(2, page_count + 1)]
        result_pages = await tqdm_aio.gather(*futures)
        for page_data in result_pages:
            res_objects.extend(cls.parse_api_data(d) for d in page_data)
    # Apply any client-side-only filters, then cache everything that was fetched.
    final_results = all_filters.continue_filtering(res_objects)
    for item in final_results:
        SESSION._add_to_cache(item)
    return final_results
@classmethod
def find_all(cls: Type[LGI], *search_operators: LGSearchOperator) -> List[LGI]:
    """
    Query the LabGuru API for every object matching the given search operators.
    Properties that can be used in searches are called out directly in their own
    documentation.

    Args:
        *search_operators: Comparison operators that will be used as filters in the LabGuru query

    Returns:
        A list of LabGuruItem objects that match all the provided filters

    Examples:
        Finding all pGRO plasmids that were generated as part of experiment 818::

            results = Plasmid.find_all(Plasmid.name.contains('pGRO'), Plasmid.clone_no.contains('GBFP-0818-'))

        Finding all strains that express Uox25 variants::

            results = Strain.find_all(Strain.description.contains('Uox25'))

        Finding all overnight culture plates from experiment 927::

            results = Plate.find_all(Plate.name.starts_with('0927-ONC'))
    """
    # Synchronous facade: drive the async implementation to completion.
    return asyncio.run(cls.async_find_all(*search_operators))
@classmethod
async def async_find_one(cls: Type[LGI], *search_operators: LGSearchOperator) -> Optional[LGI]:
    """
    AsyncIO-compatible version of LabGuruItem.find_one()

    :param search_operators: Comparison operators that will be used as filters in the LabGuru query
    :return: The first LabGuruItem object that matches all the provided filters, or None if nothing matches
    """
    all_results = await cls.async_find_all(*search_operators)
    # Truthiness test instead of len(...) > 0 (PEP 8 idiom).
    return all_results[0] if all_results else None
@classmethod
def find_one(cls: Type[LGI], *search_operators: LGSearchOperator) -> Optional[LGI]:
    """
    Searches the LabGuru API for all objects that match the search operators and returns the first match. All
    properties that can be used in searches will be called out directly in their own documentation.

    Args:
        *search_operators: Comparison operators that will be used as filters in the LabGuru query

    Returns:
        The first LabGuruItem object that matches all the provided filters, or None if nothing matches

    Examples:
        Find the largest biomass pellet from GRO20-1895 that was grown in TB::

            s = Strain.from_name('GRO20-1895')
            result = BiomassPellet.find_one(BiomassPellet.parent_strain == s, BiomassPellet.pellet_weight.desc())
    """
    all_results = cls.find_all(*search_operators)
    # Truthiness test instead of len(...) > 0 (PEP 8 idiom).
    return all_results[0] if all_results else None
class LGIError(LabGuruItem):
    """Sentinel ``LabGuruItem`` representing a failed operation on another item.

    Instances carry the original exception info and the errored item's data, and
    are falsy so bulk-operation results can be filtered with a truthiness test.
    """

    def __init__(self, exception_info: Tuple[Type[BaseException], BaseException, TracebackType],
                 errored_item_dict: Dict, **kwargs):
        """
        :param exception_info: the (type, value, traceback) triple from sys.exc_info()
        :param errored_item_dict: the dict form of the item that failed
        """
        super().__init__(**kwargs)
        self.exc_type, self.exc_value, self.exc_traceback = exception_info
        self.traceback_lines = traceback.format_exception(*exception_info)
        self.errored_item_dict = errored_item_dict

    @property
    def traceback(self):
        """The formatted traceback joined into a single string."""
        return ''.join(self.traceback_lines)

    @property
    def info_dict(self):
        """The errored item's data augmented with the error message and traceback."""
        out_dict = self.errored_item_dict.copy()
        out_dict['Error'] = str(self.exc_value)
        out_dict['traceback'] = self.traceback
        return out_dict

    def __bool__(self):
        # Errors are falsy so callers can write `if item:` over mixed results.
        return False

    def __str__(self):
        return f"<LGIError: {self.traceback}>"

    def to_dict(self, **kwargs) -> Dict[str, Any]:
        # An error has no item data worth serializing.
        return {}

    def update_api(self: LGI, session: "Session", retries=0) -> LGI:
        # Never push an error object back to LabGuru; syncing is a no-op.
        return self

    def get_linked_items(self, of_type: Type[LGI]) -> List[LGI]:
        # An error object has no links.
        return []
class Attachment(LabGuruItem, SearchInterface):
    """A file attached to a LabGuru collection/inventory/ELN item."""

    _api_name = 'attachments'
    _attribute_dict = {
        'attachable': 'attachable'
    }
    file = LGAttachmentPath()
    """An interactable attachment file"""
    attachable = LGDict()
    """A dict describing the collection/inventory/ELN item that this file is attached to"""

    def __init__(self, attach_to: LGI = None):
        """
        :param attach_to: optional LabGuruItem this attachment should point at
        """
        super().__init__()
        self.attachable = {}
        if attach_to is not None:
            self.attach_to(attach_to)

    def get_download_url(self, session: "Session" = SESSION):
        """Gets the LG URL used to download the attachment file"""
        return self.process_api_url(f'/attachments/{self.id}/download?token={session.token}', True)

    def download(self, session: "Session" = SESSION):
        """Downloads the file to a temporary folder"""
        # Assigning the URL to `file` is what triggers the descriptor's download logic.
        setattr(self, 'file', self.get_download_url(session))

    def make_file(self, filename: str) -> Path:
        """
        Create a new temporary file with a specific name.

        :param filename: the desired file name
        :return: a Path to the new file
        """
        # NamedTemporaryFile is used only to locate the temp directory portably.
        self.file = Path(NamedTemporaryFile().name).parent / filename
        return self.file

    def attach_to(self, item: LGI):
        """Sets the attachable dictionary to point to a given LabGuruItem"""
        self.attachable = {
            "id"   : item.id,
            "class": item.class_name,
            "uuid" : item.uuid
        }

    @property
    def attachable_id(self):
        return self.attachable.get('id', '')

    @attachable_id.setter
    def attachable_id(self, value):
        self.attachable['id'] = value

    attachable_id = make_lg_searchable(attachable_id, 'attachable_id')
    """The database ID of the attached item. Searchable."""

    @property
    def attachable_class(self):
        return self.attachable.get('class', '')

    @attachable_class.setter
    def attachable_class(self, value):
        self.attachable['class'] = value

    attachable_class = make_lg_searchable(attachable_class, 'attachable_class')
    """The programatic class name of the attached item. Searchable."""

    @property
    def attachable_uuid(self):
        return self.attachable.get('uuid', '')

    @attachable_uuid.setter
    def attachable_uuid(self, value):
        self.attachable['uuid'] = value

    attachable_uuid = make_lg_searchable(attachable_uuid, 'attachable_uuid')
    """The database UUID of the attached item. Searchable."""

    def update_api(self: "Attachment", session: "Session", **kwargs) -> LGI:
        """Upload the attachment file to LabGuru (low-level; prefer ``lg_sync``).

        :param session: the authenticated LabGuru session
        :return: a parsed Attachment on success, or ``self`` unchanged on failure
        """
        form_data = {"token": session.token}
        if "uuid" in self.attachable:
            form_data["item[attach_to_uuid]"] = self.attachable['uuid']
        # Close the file handle deterministically -- the original leaked the open handle.
        with self.file.open('rb') as attachment_fh:
            r = requests.post(self.get_api_url(),
                              files={"item[attachment]": attachment_fh},
                              data=form_data)
        if r.ok:
            return self.parse_api_data(r.json(), session)
        return self
class Units(Mapping[str, int]):
    """
    A specialized mapping class for managing and converting scientific units.

    Provides access to LabGuru's scientific units, their identifiers, and
    conversion between equivalent units. Unit data is fetched lazily from the
    LG API on first access. Supports iteration, length retrieval, and
    dictionary-like lookup of unit IDs by name.

    Attributes:
        unit_ids (Dict[str, int]): A mapping of unit names to their respective IDs.
        ids_to_units (Dict[int, str]): A reverse mapping of unit IDs to their names.
        unit_types (Dict[int, str]): A mapping of unit IDs to their types/categories.
    """

    def __init__(self):
        self.unit_ids: Dict[str, int] = {}
        self.ids_to_units: Dict[int, str] = {}
        self.unit_types: Dict[int, str] = {}

    def _get_units(self):
        # Lazily fetch the unit table from LG the first time it is needed.
        if len(self.unit_ids) == 0:
            resp = requests.get('https://my.labguru.com/api/v1/units.json?token=' + SESSION.token)
            # Parse the response body exactly once (the original called r.json()
            # three times, re-parsing the same payload, and its comprehension
            # variable shadowed the response object itself).
            unit_json = resp.json()
            self.unit_ids = {u['name']: u['id'] for u in unit_json}
            self.ids_to_units = {u['id']: u['name'] for u in unit_json}
            self.unit_types = {u['id']: u['type_for'] for u in unit_json}

    def __getitem__(self, k: str) -> int:
        """Look up a unit ID by name; falsy names map to 0 ("no unit")."""
        self._get_units()
        if k:
            # LG stores the micro sign as U+00B5; normalize Greek mu (U+03BC) first.
            return self.unit_ids[k.replace('μ', chr(181))]
        else:
            return 0

    def __len__(self) -> int:
        # Deliberately does not trigger a fetch: 0 until units are loaded.
        return len(self.unit_ids)

    def __iter__(self) -> Iterator[Tuple[str, int]]:
        # NOTE(review): Mapping.__iter__ conventionally yields keys only; this yields
        # (name, id) pairs. Preserved for backward compatibility -- confirm callers.
        self._get_units()
        return iter(self.unit_ids.items())

    @property
    def ng_ul(self) -> int:
        """Shortcut for the ng/μL unit id"""
        return self['ng/µL']

    @property
    def molar(self) -> int:
        """Shortcut for the molar unit id"""
        return self['M']

    @property
    def millimolar(self):
        """Shortcut for the millimolar unit id"""
        return self['mM']

    @property
    def micromolar(self):
        """Shortcut for the micromolar unit id"""
        return self['µM']

    @property
    def nanomolar(self):
        """Shortcut for the nanomolar unit id"""
        return self['nM']

    def convert(self, value: float, from_id: int, to_id: int):
        """
        Converts a value between units of the same type.

        Example
        -------
        Converting between millimolar & nanomolar::

            um = UNITS.convert(1, UNITS.millimolar, UNITS.micromolar)
            assert um == 1_000  # True

        :param value: the value of the initial unit
        :param from_id: the unit ID of the initial value
        :param to_id: the unit ID for the desired value
        :return: the converted unit magnitude
        :raises ValueError: for cross-type, molality, or mismatched-base-unit conversions
        """
        self._get_units()
        # Normalize the micro sign to ASCII 'u' so it can key the prefix table.
        from_unit = self.ids_to_units[from_id].replace(chr(181), 'u')
        to_unit = self.ids_to_units[to_id].replace(chr(181), 'u')
        if self.unit_types[from_id] != self.unit_types[to_id]:
            raise ValueError('Cannot convert between different unit types')
        elif '/' in from_unit + to_unit:
            raise ValueError('Cannot convert with molalities (yet)')
        elif from_unit[-1:] != to_unit[-1:]:
            raise ValueError('Cannot convert between different SI units')
        else:
            # SI prefix -> decimal exponent; a bare base unit has exponent 0.
            si_prefixes = dict(M=6, K=3, m=-3, u=-6, n=-9, p=-12, f=-15)
            from_exponent = 0 if len(from_unit) == 1 else si_prefixes[from_unit[0]]
            to_exponent = 0 if len(to_unit) == 1 else si_prefixes[to_unit[0]]
            return value * 10 ** (from_exponent - to_exponent)


UNITS = Units()  #: Importable object used to interact with units/unit IDs
class HasWells(Sized):
    """Represents a structure composed of wells arranged in grid-like positions.

    Provides utilities to map between well naming conventions and positional
    systems: well names (``A1``, ``A01``), LG's rowwise linear positions, and
    Tecan's columnwise positions. Iterators over all wells/positions are
    provided, and stock-handling operations are declared abstractly for
    subclasses to implement.

    Attributes:
        rows (int): Number of rows in the grid.
        cols (int): Number of columns in the grid.
    """

    rows = LGInt(8)
    cols = LGInt(12)

    def __init__(self, rows=8, cols=12):
        super().__init__()
        self.rows = rows
        self.cols = cols

    def __len__(self) -> int:
        # Total number of wells in the grid.
        return self.rows * self.cols

    def well_name_to_position(self, name: str) -> int:
        """Convert a well name (A1) to its corresponding LG position value"""
        row_num = ord(name[0].upper()) - 65  # 'A' -> 0, 'B' -> 1, ...
        col_num = int(name[1:])
        return (row_num * self.cols) + col_num

    def position_to_well_name(self, position: int, short=False) -> str:
        """Convert an LG position value to a well name: (A1 if short=True, A01 if short=False)"""
        row_num = (position // self.cols) + 1
        col_num = position % self.cols
        if col_num == 0:
            # Positions are 1-based: an exact multiple of `cols` is the last
            # column of the previous row, not column 0 of the next one.
            col_num = self.cols
            row_num -= 1
        if not short and self.cols >= 10:
            # Long form zero-pads the column only when the grid has 2-digit columns.
            return f'{chr(row_num + 64)}{col_num:02d}'
        return f'{chr(row_num + 64)}{col_num:d}'

    def position_to_tecan_well(self, position: int) -> int:
        """Convert an LG position value (rowwise indexes) to a Tecan position value (columnwise indexes)"""
        # Walk the columnwise ordering and report the 1-based index at which
        # `position` appears (enumerate replaces the original zip-with-range).
        for tecan_index, lg_position in enumerate(self.iter_positions(), start=1):
            if lg_position == position:
                return tecan_index

    def well_name_to_tecan_well(self, well_name: str) -> int:
        """Convert a well name (A1) to its corresponding Tecan position value"""
        return self.position_to_tecan_well(self.well_name_to_position(well_name))

    def tecan_well_to_position(self, tecan_well: int) -> int:
        """Convert a Tecan position value (columnwise indexes) to an LG position value (rowwise indexes)"""
        for tecan_index, lg_position in enumerate(self.iter_positions(), start=1):
            if tecan_index == tecan_well:
                return lg_position

    def tecan_well_to_name(self, tecan_well: int, short: bool = False) -> str:
        """Convert a Tecan position value to a well name: (A1 if short=True, A01 if short=False)"""
        return self.position_to_well_name(self.tecan_well_to_position(tecan_well), short)

    def _force_short_name(self, name: str) -> str:
        # Round-trip through the position to normalize e.g. "A01" -> "A1".
        return self.position_to_well_name(self.well_name_to_position(name), short=True)

    def _force_long_name(self, name: str) -> str:
        # Round-trip through the position to normalize e.g. "A1" -> "A01".
        return self.position_to_well_name(self.well_name_to_position(name))

    def iter_positions(self, rowwise=False) -> Iterable[int]:
        """Iterate through all available LG position values"""
        if rowwise:
            for i in range(self.rows * self.cols):
                yield i + 1
        else:
            # Columnwise order: every row of column 1, then column 2, ...
            for i in range(self.cols):
                for j in range(self.rows):
                    yield (i + 1) + (j * self.cols)

    def iter_well_names(self, rowwise=False, short=False) -> Iterable[str]:
        """Iterate through all available well names"""
        for pos in self.iter_positions(rowwise):
            yield self.position_to_well_name(pos, short)

    def iter_tecan_positions(self, rowwise=False) -> Iterable[int]:
        """Iterate through all available Tecan position values"""
        for pos in self.iter_positions(rowwise):
            yield self.position_to_tecan_well(pos)

    @abstractmethod
    def stocks_from_position(self, position: int, **kwargs) -> List["Stock"]:
        """Get a list of all stocks at a specified position"""
        pass

    def stocks_from_well(self, well_name: str, **kwargs) -> List["Stock"]:
        """Get a list of all stocks in a specified well"""
        return self.stocks_from_position(self.well_name_to_position(well_name), **kwargs)

    def stocks_from_tecan_position(self, tecan_position: int, **kwargs) -> List["Stock"]:
        """Get a list of all stocks in a specified tecan position"""
        return self.stocks_from_position(self.tecan_well_to_position(tecan_position), **kwargs)

    @abstractmethod
    def copy_stock_to_position(self, stock: "Stock", position: int) -> "Stock":
        """Make a copy of a stock and add it to a specified position"""
        pass

    def copy_stock_to_well(self, stock: "Stock", well_name: str) -> "Stock":
        """Make a copy of a stock and add it to a specified well"""
        return self.copy_stock_to_position(stock, self.well_name_to_position(well_name))

    def copy_stock_to_tecan_position(self, stock: "Stock", tecan_position: int) -> "Stock":
        """Make a copy of a stock and add it to a specified Tecan position"""
        return self.copy_stock_to_position(stock, self.tecan_well_to_position(tecan_position))

    @abstractmethod
    def update_stock_at_position(self, stock: "Stock", position: int) -> "Stock":
        """Replace a stock at a specified position with a new one"""
        pass

    def update_stock_at_well(self, stock: "Stock", well_name: str) -> "Stock":
        """Replace a stock in a specified well with a new one"""
        return self.update_stock_at_position(stock, self.well_name_to_position(well_name))
# Manual smoke test: authenticate against LabGuru when this module is run as a script.
if __name__ == '__main__':
from LabGuruAPI import SESSION
SESSION.login()