import asyncio
from time import sleep
from typing import Any, Dict, List, Optional, Type

import pandas as pd
from tqdm import tqdm
from tqdm.asyncio import tqdm_asyncio

from LabGuruAPI._base import LGI, SESSION, Attachment, LabGuruItem, LGStrList, Session
class Dataset(LabGuruItem):
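    """A LabGuru dataset: a table of 'vectors' (rows) exposed locally as a pandas DataFrame.

    Typical round trip: get_data() -> mutate the frame -> set_data() -> SESSION.update(),
    which diffs against the originally fetched data and only PUTs, POSTs, or DELETEs
    the vectors that actually changed.
    """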
_api_name = 'datasets'
class_name = "LgData::Dataset"
class_display_name = "Datasets"
xlsx_collection = "Datasets"
_attribute_dict = {
'vectors': '_vectors',
'header_row': 'headers',
}
_data: Optional[pd.DataFrame] = None
_original_data: Optional[pd.DataFrame] = None
_updated_data: Optional[pd.DataFrame] = None
_new_data: Optional[pd.DataFrame] = None
    _deleted_ids: Optional[pd.Index] = None  # ids present originally but absent from the new data
headers: List[str] = LGStrList()
"""A list of column headers used by the dataset"""
headers.base_type = str
def update_api(self, session: Session, **kwargs) -> LGI:
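        """Push local changes to LG: diff-update an existing dataset's vectors, or create the dataset from an xlsx attachment"""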
if self.id: # Update data
            if self._updated_data is not None and len(self._updated_data) > 0:
                # PUT each changed row back to its vector endpoint, concurrently
                futures = []
                for cur_row in self._updated_data.fillna('').to_dict('records'):
                    cur_id = str(cur_row.pop('id'))
                    futures.append(session.aio_put(f"vectors/{cur_id}", vector_data=cur_row, dataset_id=self.id))
                asyncio.run(tqdm_asyncio.gather(*futures, desc='Updating existing rows'))
if self._new_data is not None and len(self._new_data) > 0:
                for cur_row in tqdm(self._new_data.fillna('').to_dict('records'), desc='Adding new rows'):
                    del cur_row['id']  # new rows have no vector id yet; LG assigns one on POST
                    session.post("vectors", vector_data=cur_row, dataset_id=self.id)
            if self._deleted_ids is not None and len(self._deleted_ids) > 0:
                # DELETE each removed row's vector, concurrently
                futures = []
                for vec_id in self._deleted_ids.values:
                    futures.append(session.aio_delete(f"vectors/{vec_id}"))
                asyncio.run(tqdm_asyncio.gather(*futures, desc='Deleting removed rows'))
return session.refresh(self)
else: # Add to LG
pbar = tqdm(desc='Uploading Data to LG', total=len(self._data))
            # Upload the data as an xlsx attachment; LG parses it into vectors server-side
            att = Attachment()
            out_path = att.make_file(f"{self.name}.xlsx")
            self._data.drop(columns=['id']).to_excel(out_path, index=False)
            att = session.add(att)
pbar.set_description('Creating new Dataset')
create_data = {
'name': self.name,
'description': self.description,
'data_attachment_id': att.id
}
new_obj_json = session.post('datasets', item=create_data)
self.id = new_obj_json['dataset_id']
temp_ds = session.refresh(self)
            # Record the upload size so get_data() can show parsing progress later
            SESSION.set_config_value('DATASET_UPLOAD_LEN', self.id, str(len(self._data)))
pbar.close()
return temp_ds
@property
def _vectors(self):
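        """Write-only: raw vector payloads arrive via the setter; read the parsed data with get_data()"""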
return None
@_vectors.setter
def _vectors(self, value: List[Dict[str, Dict[str, Any]]]):
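        """Flatten the raw LG payload, a list of {'vector': {'id': ..., 'data': {...}}} dicts, into a DataFrame"""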
vector_data = []
for cur_vec in value:
new_data = {'id': cur_vec['vector']['id']}
new_data.update(cur_vec['vector']['data'])
vector_data.append(new_data)
self._data = pd.DataFrame(vector_data)
self._original_data = self._data.copy()
    def get_data(self) -> Optional[pd.DataFrame]:
        """Retrieve a copy of the current data, waiting for LG to finish parsing a freshly uploaded dataset if needed"""
        if self.id and len(self.headers) == 0:
            # The dataset exists in LG but has no columns yet: the uploaded workbook is
            # still being parsed. Poll until vectors appear, sized by the recorded upload length.
            try:
                total = int(SESSION.get_config_value('DATASET_UPLOAD_LEN', self.id))
            except KeyError:
                total = None
            pbar = tqdm(desc='LG data upload is not yet complete', total=total)
while True:
temp_ds: Dataset = SESSION.refresh(self)
if temp_ds._data is not None and len(temp_ds.headers) > 0:
pbar.close()
self._original_data = temp_ds.get_data()
self._data = temp_ds.get_data()
self.headers = temp_ds.headers
return self._data
else:
pbar.set_description('Waiting for LG to finish creating dataset')
                    if temp_ds is not None and temp_ds._data is not None:
                        pbar.update((len(temp_ds._data) - pbar.n) if total else 1)
sleep(1)
return self._data.copy() if self._data is not None else None
    def set_data(self, new_data: pd.DataFrame, merge_on: Optional[List[str]] = None):
"""
Replace the current data with new data
:param new_data: The updated dataframe
        :param merge_on: When updating an existing dataset, the columns used to match incoming rows to their existing vector ids
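
        Example (illustrative; assumes columns named 'Position' and 'rack_well')::

            df = ds.get_data()
            df.loc[0, 'Position'] = 'B1'
            ds.set_data(df)  # 'id' column intact, so no merge columns needed
            ds.set_data(df.drop(columns=['id']), merge_on=['Position', 'rack_well'])  # ids re-attached by matching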
"""
merge_on = merge_on or []
if self._original_data is None:
self._data = new_data.copy()
self._data['id'] = None
            self.headers = list(new_data.columns)
return
        try:
            self._data = new_data[['id'] + self.headers].copy()
        except KeyError:
            # No 'id' column in the incoming frame: recover the ids by merging on the caller-supplied key columns
            new_data = new_data.merge(self._data[['id'] + merge_on], on=merge_on)
            self._data = new_data[['id'] + self.headers].copy()
        # Split rows into brand-new (no id) and those mapped to existing vectors
        self._new_data = self._data[self._data['id'].isnull()]
        existing_vectors = self._data[self._data['id'] > 0]
        if len(existing_vectors) == len(self._original_data):
            # No rows were removed: diff directly against the original data
            compare = self._original_data.set_index('id').compare(existing_vectors.set_index('id'))
            self._updated_data = self._data.set_index('id').loc[compare.index]
            self._deleted_ids = None
        else:
            # Some rows were removed: diff only the surviving ids and mark the rest for deletion
            matching_original_data = self._data[['id']].merge(self._original_data, on='id')
            compare = matching_original_data.set_index('id').compare(existing_vectors.set_index('id'))
            self._updated_data = self._data.set_index('id').loc[compare.index]
            self._deleted_ids = self._original_data.set_index('id').index.difference(self._data.set_index('id').index)
        self._updated_data.reset_index(inplace=True)
def add_data(self, new_data: pd.DataFrame):
"""Concatenates new data to the end of the current data"""
self.set_data(pd.concat([self._data, new_data]))
@classmethod
def add_new(cls: Type[LGI], **properties) -> LGI:
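        """Create a new, unsaved Dataset (thin wrapper around make_new); push it to LG with SESSION.add()"""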
return cls.make_new(**properties)
if __name__ == '__main__':
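    # Ad-hoc smoke test: round-trips a local xlsx through a LG dataset (paths and names are environment-specific)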
test_data: pd.DataFrame = pd.read_excel(r"C:\Users\MarkCerutti\OneDrive - GRO Biosciences\Foundry\Workflow Development\LG Updates\20221205-plas-ds-test.xlsx")
ds = SESSION.get_object(Dataset, name='20220203-test') or Dataset.make_new(name='20220203-test', description='desc')
if not ds.id:
ds.set_data(test_data)
else:
ds.set_data(test_data, merge_on=['Position', 'rack_well'])
test_data = ds.get_data()
new_data = ds.get_data()
new_data.drop(columns=['id'], inplace=True)
# new_data.at[:, 'rack_well'] = 'A3'
ds = SESSION.update(ds) if ds.id else SESSION.add(ds)
ds.add_data(new_data)
ds = SESSION.update(ds)