Source code for LabGuruAPI._datasets

import asyncio
from time import sleep
from typing import Any, Dict, List, Optional, Type

import pandas as pd
from tqdm import tqdm
from tqdm.asyncio import tqdm_asyncio

from LabGuruAPI._base import LabGuruItem, SESSION, LGI, Session, Attachment, LGStrList


class Dataset(LabGuruItem):
    """A LabGuru dataset: tabular data whose rows are stored as "vectors" in LabGuru."""
    _api_name = 'datasets'
    class_name = "LgData::Dataset"
    class_display_name = "Datasets"
    xlsx_collection = "Datasets"
    _attribute_dict = {
        'vectors': '_vectors',
        'header_row': 'headers',
    }
    _data: Optional[pd.DataFrame] = None
    _original_data: Optional[pd.DataFrame] = None
    _updated_data: Optional[pd.DataFrame] = None
    _new_data: Optional[pd.DataFrame] = None
    _deleted_ids: Optional[pd.Series] = None
    headers: List[str] = LGStrList()
    """A list of column headers used by the dataset"""
    headers.base_type = str
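
    # Note (inferred from how LabGuruItem subclasses use `_attribute_dict`
    # elsewhere in this package): the mapping routes fields of the API
    # response into local attributes, so the 'vectors' payload feeds the
    # `_vectors` setter below and 'header_row' populates `headers`.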
    def update_api(self, session: Session, **kwargs) -> LGI:
        if self.id:
            # The dataset already exists in LabGuru: push row-level changes.
            # Updates and deletions are issued concurrently; additions are
            # posted one at a time.
            if self._updated_data is not None and len(self._updated_data) > 0:
                futures = []
                for cur_row in self._updated_data.fillna('').to_dict('records'):
                    cur_id = str(cur_row.pop('id'))
                    futures.append(session.aio_put(f"vectors/{cur_id}", vector_data=cur_row, dataset_id=self.id))
                asyncio.run(tqdm_asyncio.gather(*futures, desc='Updating existing rows'))
            if self._new_data is not None and len(self._new_data) > 0:
                for cur_row in tqdm(self._new_data.fillna('').to_dict('records'), desc='Adding new rows'):
                    del cur_row['id']
                    session.post("vectors", vector_data=cur_row, dataset_id=self.id)
            if self._deleted_ids is not None:
                futures = []
                for cur_row in self._deleted_ids.values:
                    cur_id = str(cur_row)
                    futures.append(session.aio_delete(f"vectors/{cur_id}"))
                asyncio.run(tqdm_asyncio.gather(*futures, desc='Deleting removed rows'))
            return session.refresh(self)
        else:
            # New dataset: upload the data as an xlsx attachment and ask
            # LabGuru to build the dataset from it.
            pbar = tqdm(desc='Uploading Data to LG', total=len(self._data))
            att = Attachment()
            out_path = att.make_file(f"{self.name}.xlsx")
            self._data.drop(columns=['id']).to_excel(out_path, index=False)
            att = session.add(att)
            pbar.set_description('Creating new Dataset')
            create_data = {
                'name': self.name,
                'description': self.description,
                'data_attachment_id': att.id,
            }
            new_obj_json = session.post('datasets', item=create_data)
            self.id = new_obj_json['dataset_id']
            temp_ds = session.refresh(self)
            # Remember the expected row count so get_data can show progress
            # while LabGuru finishes materializing the rows.
            SESSION.set_config_value('DATASET_UPLOAD_LEN', self.id, str(len(self._data)))
            pbar.close()
            return temp_ds
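
    # Illustrative sketch (not part of the module): update_api is normally
    # reached through the session, and which branch runs depends on whether
    # the Dataset already has an id. The frames `df`/`df2` and the 'well'
    # column below are assumptions for illustration.
    #
    #     ds = Dataset.make_new(name='demo', description='example')
    #     ds.set_data(df)
    #     ds = SESSION.add(ds)        # no id -> xlsx upload / create branch
    #     ds.set_data(df2, merge_on=['well'])
    #     ds = SESSION.update(ds)     # id set -> per-vector PUT/POST/DELETE branch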
    @property
    def _vectors(self):
        # Write-only: the raw vector payload is consumed by the setter and
        # exposed through get_data instead.
        return None

    @_vectors.setter
    def _vectors(self, value: List[Dict[str, Dict[str, Any]]]):
        # Flatten the API payload into one record per vector, keeping each
        # vector's id alongside its data columns.
        vector_data = []
        for cur_vec in value:
            new_data = {'id': cur_vec['vector']['id']}
            new_data.update(cur_vec['vector']['data'])
            vector_data.append(new_data)
        self._data = pd.DataFrame(vector_data)
        self._original_data = self._data.copy()
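
    # Illustrative sketch of the payload shape the `_vectors` setter expects,
    # inferred from the setter itself (the live LabGuru response may carry
    # additional keys; the column names are assumptions):
    #
    #     [{'vector': {'id': 1, 'data': {'well': 'A1', 'conc': 5}}},
    #      {'vector': {'id': 2, 'data': {'well': 'A2', 'conc': 7}}}]
    #
    # yields a DataFrame with columns ['id', 'well', 'conc'].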
    def get_data(self) -> Optional[pd.DataFrame]:
        """Retrieve a copy of the current data"""
        if self.id and len(self.headers) == 0:
            # The dataset exists in LabGuru but its rows have not been
            # materialized yet; poll until the headers appear.
            try:
                total = int(SESSION.get_config_value('DATASET_UPLOAD_LEN', self.id))
            except KeyError:
                total = None
            pbar = tqdm(desc='LG data upload is not yet complete', total=total)
            while True:
                temp_ds: Dataset = SESSION.refresh(self)
                if temp_ds._data is not None and len(temp_ds.headers) > 0:
                    pbar.close()
                    self._original_data = temp_ds.get_data()
                    self._data = temp_ds.get_data()
                    self.headers = temp_ds.headers
                    return self._data
                else:
                    pbar.set_description('Waiting for LG to finish creating dataset')
                    if temp_ds._data is not None:  # guard _data, not temp_ds, to avoid len(None)
                        pbar.update(len(temp_ds._data) - pbar.n if total else 1)
                    sleep(1)
        return self._data.copy() if self._data is not None else None
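
    # Example usage (sketch; assumes a dataset with this name already exists
    # in the connected LabGuru instance):
    #
    #     ds = SESSION.get_object(Dataset, name='20220203-test')
    #     df = ds.get_data()   # blocks, with a progress bar, until LabGuru
    #                          # finishes building the dataset, then returns a copy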
    def set_data(self, new_data: pd.DataFrame, merge_on: Optional[List[str]] = None):
        """
        Replace the current data with new data

        :param new_data: The updated dataframe
        :param merge_on: When updating a dataset, the columns used to determine "old" vs "new" data
        """
        merge_on = merge_on or []
        if self._original_data is None:
            # Brand-new dataset: every row is new, so there are no vector ids yet.
            self._data = new_data.copy()
            self._data['id'] = None
            self.headers = [h for h in new_data.columns.values]
            return
        try:
            self._data = new_data[['id'] + self.headers].copy()
        except KeyError:
            # The incoming frame lacks an 'id' column; recover the ids by
            # merging on the caller-supplied key columns.
            new_data = new_data.merge(self._data[['id'] + merge_on], on=merge_on)
            self._data = new_data[['id'] + self.headers].copy()
        self._new_data = self._data[self._data['id'].isnull()]
        existing_vectors = self._data[self._data['id'] > 0]
        if len(existing_vectors) == len(self._original_data):
            # No rows were removed: diff old vs new to find the updated rows.
            compare = self._original_data.set_index('id').compare(existing_vectors.set_index('id'))
            self._updated_data = self._data.set_index('id').loc[compare.index]
            self._deleted_ids = None
        else:
            # Some rows were removed: diff only the rows that still exist and
            # record the ids that disappeared so update_api can delete them.
            matching_original_data = self._data[['id']].merge(self._original_data, on='id')
            compare = matching_original_data.set_index('id').compare(existing_vectors.set_index('id'))
            self._updated_data = self._data.set_index('id').loc[compare.index]
            self._deleted_ids = self._original_data.set_index('id').index.difference(
                self._data.set_index('id').index)
        self._updated_data.reset_index(inplace=True)
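
    # Example (sketch): pushing edits from a frame that no longer carries the
    # 'id' column. The 'conc' value column is an assumption for illustration;
    # 'Position' mirrors the key column used in the test block below.
    #
    #     df = ds.get_data().drop(columns=['id'])
    #     df['conc'] = df['conc'] * 2
    #     ds.set_data(df, merge_on=['Position'])  # ids recovered via the merge
    #     ds = SESSION.update(ds)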
    def add_data(self, new_data: pd.DataFrame):
        """Concatenate new data onto the end of the current data"""
        self.set_data(pd.concat([self._data, new_data]))
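
    # Example (sketch): appended rows carry no 'id', so set_data classifies
    # them as new and the next update POSTs them as fresh vectors. The column
    # names are assumptions for illustration.
    #
    #     ds.add_data(pd.DataFrame({'Position': ['B1'], 'conc': [3]}))
    #     ds = SESSION.update(ds)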
    @classmethod
    def add_new(cls: Type[LGI], **properties) -> LGI:
        return cls.make_new(**properties)
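
# Example (sketch): add_new simply defers to make_new, so a local, un-synced
# Dataset can be created either way; nothing exists in LabGuru until the
# object is added through the session. `df` is an assumed pandas DataFrame.
#
#     ds = Dataset.add_new(name='demo-dataset', description='created locally')
#     ds.set_data(df)
#     ds = SESSION.add(ds)
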
if __name__ == '__main__':
    # Manual smoke test against a live LabGuru session.
    test_data: pd.DataFrame = pd.read_excel(
        r"C:\Users\MarkCerutti\OneDrive - GRO Biosciences\Foundry\Workflow Development\LG Updates\20221205-plas-ds-test.xlsx")
    ds = SESSION.get_object(Dataset, name='20220203-test') or Dataset.make_new(name='20220203-test', description='desc')
    if not ds.id:
        ds.set_data(test_data)
    else:
        ds.set_data(test_data, merge_on=['Position', 'rack_well'])
    test_data = ds.get_data()
    new_data = ds.get_data()
    new_data.drop(columns=['id'], inplace=True)
    # new_data.at[:, 'rack_well'] = 'A3'
    ds = SESSION.update(ds) if ds.id else SESSION.add(ds)
    ds.add_data(new_data)
    ds = SESSION.update(ds)