feature/#397 scenario duplication #2373

Open · wants to merge 14 commits into base: develop
8 changes: 6 additions & 2 deletions taipy/core/_repository/_filesystem_repository.py
@@ -191,10 +191,14 @@ def __filter_files_by_config_and_owner_id(
return None

def __match_file_and_get_entity(self, filepath, config_and_owner_ids, filters):
if match := [(c, p) for c, p in config_and_owner_ids if c.id in filepath.name]:
if match := [(c, p) for c, p in config_and_owner_ids if (c if isinstance(c, str) else c.id) in filepath.name]:
for config, owner_id in match:
for fil in filters:
fil.update({"config_id": config.id, "owner_id": owner_id})
if isinstance(config, str):
config_id = config
else:
config_id = config.id
fil.update({"config_id": config_id, "owner_id": owner_id})

if data := self.__filter_by(filepath, filters):
return config, owner_id, self.__file_content_to_entity(data)
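Aside, not part of the diff: the inline conditional above accepts `config` either as a config object or as a bare config-id string. A minimal standalone sketch of that normalization, using a hypothetical helper name:

```python
# Hypothetical helper (not in the PR) mirroring the normalization applied inline above:
# `config` may be a DataNodeConfig-like object exposing `.id`, or already a plain string id.
def _config_id_of(config) -> str:
    return config if isinstance(config, str) else config.id

# _config_id_of("sales")               -> "sales"
# _config_id_of(some_data_node_config) -> some_data_node_config.id, e.g. "sales"
```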
57 changes: 49 additions & 8 deletions taipy/core/data/_data_manager.py
@@ -22,7 +22,7 @@
from ..cycle.cycle_id import CycleId
from ..exceptions.exceptions import InvalidDataNodeType
from ..notification import Event, EventEntityType, EventOperation, Notifier, _make_event
from ..reason import NotGlobalScope, ReasonCollection, WrongConfigType
from ..reason import EntityDoesNotExist, NotGlobalScope, ReasonCollection, WrongConfigType
from ..scenario.scenario_id import ScenarioId
from ..sequence.sequence_id import SequenceId
from ._data_fs_repository import _DataFSRepository
@@ -37,6 +37,17 @@ class _DataManager(_Manager[DataNode], _VersionMixin):
_EVENT_ENTITY_TYPE = EventEntityType.DATA_NODE
_repository: _DataFSRepository

@classmethod
def _get_owner_id(
cls, scope, cycle_id, scenario_id
) -> Union[Optional[SequenceId], Optional[ScenarioId], Optional[CycleId]]:
if scope == Scope.SCENARIO:
return scenario_id
elif scope == Scope.CYCLE:
return cycle_id
else:
return None

@classmethod
def _bulk_get_or_create(
cls,
@@ -48,13 +59,7 @@ def _bulk_get_or_create(
dn_configs_and_owner_id = []
for dn_config in data_node_configs:
scope = dn_config.scope
owner_id: Union[Optional[SequenceId], Optional[ScenarioId], Optional[CycleId]]
if scope == Scope.SCENARIO:
owner_id = scenario_id
elif scope == Scope.CYCLE:
owner_id = cycle_id
else:
owner_id = None
owner_id = cls._get_owner_id(scope, cycle_id, scenario_id)
dn_configs_and_owner_id.append((dn_config, owner_id))

data_nodes = cls._repository._get_by_configs_and_owner_ids(
@@ -174,3 +179,39 @@ def _get_by_config_id(cls, config_id: str, version_number: Optional[str] = None)
for fil in filters:
fil.update({"config_id": config_id})
return cls._repository._load_all(filters)

@classmethod
def _duplicate(
cls, dn: DataNode, cycle_id: Optional[CycleId] = None, scenario_id: Optional[ScenarioId] = None
) -> DataNode:
data_nodes = cls._repository._get_by_configs_and_owner_ids(
[(dn.config_id, cls._get_owner_id(dn.scope, cycle_id, scenario_id))], cls._build_filters_with_version(None)
)

if existing_dn := data_nodes.get((dn.config_id, dn.owner_id)):
return existing_dn
else:
duplicated_dn = cls._get(dn)

duplicated_dn.id = duplicated_dn._new_id(duplicated_dn._config_id)
duplicated_dn._owner_id = cls._get_owner_id(duplicated_dn._scope, cycle_id, scenario_id)
duplicated_dn._parent_ids = set()

duplicated_dn._duplicate_data()

cls._set(duplicated_dn)
return duplicated_dn

@classmethod
def _can_duplicate(cls, dn: DataNode) -> ReasonCollection:
reason_collector = ReasonCollection()

if isinstance(dn, DataNode):
dn_id = dn.id
else:
dn_id = dn

if not cls._repository._exists(dn_id):
reason_collector._add_reason(dn_id, EntityDoesNotExist(dn_id))

return reason_collector
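
Aside, not part of the diff: a hedged sketch of how a caller might chain the two new manager entry points, assuming the private manager API is obtained through the factory as elsewhere in taipy.core:

```python
# Illustrative only; `_can_duplicate` / `_duplicate` are the private methods added above,
# and calling them directly like this is an assumption, not a documented workflow.
from taipy.core.data._data_manager_factory import _DataManagerFactory

def duplicate_if_possible(dn, cycle_id=None, scenario_id=None):
    manager = _DataManagerFactory._build_manager()
    # A ReasonCollection evaluates truthy when it holds no blocking reason.
    if manager._can_duplicate(dn):
        return manager._duplicate(dn, cycle_id=cycle_id, scenario_id=scenario_id)
    return None  # the data node does not exist in the repository
```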
64 changes: 47 additions & 17 deletions taipy/core/data/_file_datanode_mixin.py
@@ -42,6 +42,7 @@ class _FileDataNodeMixin:
_PATH_KEY = "path"
_DEFAULT_PATH_KEY = "default_path"
_IS_GENERATED_KEY = "is_generated"
__TAIPY_DUPLICATED_PREFIX = "TAIPY_DUPLICATED"

__logger = _TaipyLogger._get_logger()

@@ -109,12 +110,14 @@ def _get_downloadable_path(self) -> str:

return ""

def _upload(self,
path: str,
upload_checker: Optional[Callable[[str, Any], bool]] = None,
editor_id: Optional[str] = None,
comment: Optional[str] = None,
**kwargs: Any) -> ReasonCollection:
def _upload(
self,
path: str,
upload_checker: Optional[Callable[[str, Any], bool]] = None,
editor_id: Optional[str] = None,
comment: Optional[str] = None,
**kwargs: Any,
) -> ReasonCollection:
"""Upload a file data to the data node.
Arguments:
@@ -136,20 +139,23 @@ def _upload(self,
from ._data_manager_factory import _DataManagerFactory

reasons = ReasonCollection()
if (editor_id
and self.edit_in_progress # type: ignore[attr-defined]
and self.editor_id != editor_id # type: ignore[attr-defined]
and (not self.editor_expiration_date # type: ignore[attr-defined]
or self.editor_expiration_date > datetime.now())): # type: ignore[attr-defined]
if (
editor_id
and self.edit_in_progress # type: ignore[attr-defined]
and self.editor_id != editor_id # type: ignore[attr-defined]
and (
not self.editor_expiration_date # type: ignore[attr-defined]
or self.editor_expiration_date > datetime.now()
)
): # type: ignore[attr-defined]
reasons._add_reason(self.id, DataNodeEditInProgress(self.id)) # type: ignore[attr-defined]
return reasons

up_path = pathlib.Path(path)
try:
upload_data = self._read_from_path(str(up_path))
except Exception as err:
self.__logger.error(f"Error uploading `{up_path.name}` to data "
f"node `{self.id}`:") # type: ignore[attr-defined]
self.__logger.error(f"Error uploading `{up_path.name}` to data " f"node `{self.id}`:") # type: ignore[attr-defined]
Review comment on lines -151 to +158:
Please check your automatic formatter as this line is over 120 chars. Please revert the change.

self.__logger.error(f"Error: {err}")
reasons._add_reason(self.id, UploadFileCanNotBeRead(up_path.name, self.id)) # type: ignore[attr-defined]
return reasons
@@ -161,7 +167,8 @@ def _upload(self,
self.__logger.error(
f"Error with the upload checker `{upload_checker.__name__}` "
f"while checking `{up_path.name}` file for upload to the data "
f"node `{self.id}`:") # type: ignore[attr-defined]
f"node `{self.id}`:"
) # type: ignore[attr-defined]
Review comment on lines +170 to +171:
The type ignore should apply to the self.id code.

self.__logger.error(f"Error: {err}")
can_upload = False

@@ -171,9 +178,12 @@

shutil.copy(up_path, self.path)

self.track_edit(timestamp=datetime.now(), # type: ignore[attr-defined]
editor_id=editor_id,
comment=comment, **kwargs)
self.track_edit(
timestamp=datetime.now(), # type: ignore[attr-defined]
editor_id=editor_id,
comment=comment,
**kwargs,
)
self.unlock_edit() # type: ignore[attr-defined]

_DataManagerFactory._build_manager()._set(self) # type: ignore[arg-type]
@@ -212,3 +222,23 @@ def _migrate_path(self, storage_type, old_path) -> str:
if os.path.exists(old_path):
shutil.move(old_path, new_path)
return new_path

def _duplicate_data_file(self, id: str) -> Optional[str]:
Review comment, suggested change:
-    def _duplicate_data_file(self, id: str) -> Optional[str]:
+    def _duplicate_file(self, id: str) -> Optional[str]:
if os.path.exists(self.path):
folder_path, base_name = os.path.split(self.path)

if base_name.startswith(self.__TAIPY_DUPLICATED_PREFIX):
base_name = "".join(base_name.split("_")[5:])
new_base_path = os.path.join(folder_path, f"{self.__TAIPY_DUPLICATED_PREFIX}_{id}_{base_name}")

if os.path.isdir(self.path):
shutil.copytree(self.path, new_base_path)
else:
shutil.copy(self.path, new_base_path)

if hasattr(self._properties, "_entity_owner"): # type: ignore[attr-defined]
del self._properties._entity_owner # type: ignore[attr-defined]
self._properties[self._PATH_KEY] = new_base_path # type: ignore[attr-defined]

return new_base_path
return ""
3 changes: 3 additions & 0 deletions taipy/core/data/csv.py
@@ -192,3 +192,6 @@ def _write(self, data: Any, columns: Optional[List[str]] = None):
encoding=properties[self.__ENCODING_KEY],
header=properties[self._HAS_HEADER_PROPERTY],
)

def _duplicate_data(self):
return self._duplicate_data_file(self.id)
52 changes: 32 additions & 20 deletions taipy/core/data/data_node.py
@@ -433,22 +433,27 @@ def append(self, data, editor_id: Optional[str] = None, comment: Optional[str] =
corresponding to this write.
"""
from ._data_manager_factory import _DataManagerFactory
if (editor_id

if (
editor_id
and self.edit_in_progress
and self.editor_id != editor_id
and (not self.editor_expiration_date or self.editor_expiration_date > datetime.now())):
and (not self.editor_expiration_date or self.editor_expiration_date > datetime.now())
):
raise DataNodeIsBeingEdited(self.id, self.editor_id)
self._append(data)
self.track_edit(editor_id=editor_id, comment=comment, **kwargs)
self.unlock_edit()
_DataManagerFactory._build_manager()._set(self)

def write(self,
data,
job_id: Optional[JobId] = None,
editor_id: Optional[str] = None,
comment: Optional[str] = None,
**kwargs: Any):
def write(
self,
data,
job_id: Optional[JobId] = None,
editor_id: Optional[str] = None,
comment: Optional[str] = None,
**kwargs: Any,
):
"""Write some data to this data node.
once the data is written, the data node is unlocked and the edit is tracked.
@@ -461,10 +466,12 @@ def write(self,
**kwargs (Any): Extra information to attach to the edit document
corresponding to this write.
"""
if (editor_id
if (
editor_id
and self.edit_in_progress
and self.editor_id != editor_id
and (not self.editor_expiration_date or self.editor_expiration_date > datetime.now())):
and (not self.editor_expiration_date or self.editor_expiration_date > datetime.now())
):
raise DataNodeIsBeingEdited(self.id, self.editor_id)
self._write(data)
self.track_edit(job_id=job_id, editor_id=editor_id, comment=comment, **kwargs)
@@ -473,12 +480,14 @@ def write(self,

_DataManagerFactory._build_manager()._set(self)

def track_edit(self,
job_id: Optional[str] = None,
editor_id: Optional[str] = None,
timestamp: Optional[datetime] = None,
comment: Optional[str] = None,
**options: Any):
def track_edit(
self,
job_id: Optional[str] = None,
editor_id: Optional[str] = None,
timestamp: Optional[datetime] = None,
comment: Optional[str] = None,
**options: Any,
):
"""Creates and adds a new entry in the edits attribute without writing the data.
Arguments:
@@ -627,15 +636,15 @@ def _get_rank(self, scenario_config_id: str) -> int:
If the data node config is not part of the scenario config, 0xfffc is returned as an infinite rank.
"""
if not scenario_config_id:
return 0xfffb
return 0xFFFB
dn_config = Config.data_nodes.get(self._config_id, None)
if not dn_config:
self._logger.error(f"Data node config `{self.config_id}` for data node `{self.id}` is not found.")
return 0xfffd
return 0xFFFD
if not dn_config._ranks:
self._logger.error(f"Data node config `{self.config_id}` for data node `{self.id}` has no rank.")
return 0xfffe
return dn_config._ranks.get(scenario_config_id, 0xfffc)
return 0xFFFE
return dn_config._ranks.get(scenario_config_id, 0xFFFC)

@abstractmethod
def _read(self):
@@ -676,6 +685,9 @@ def _get_last_modified_datetime(cls, path: Optional[str] = None) -> Optional[datetime]:

return last_modified_datetime

def _duplicate_data(self):
raise NotImplementedError

@staticmethod
def _class_map():
def all_subclasses(cls):
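Aside, not part of the diff: the base-class change above turns `_duplicate_data` into a per-storage hook; `DataNode._duplicate_data` raises `NotImplementedError`, and each file-based node (CSV above; Excel, JSON, Parquet, and pickle below) forwards to the mixin's `_duplicate_data_file`. A simplified sketch of that dispatch pattern, with made-up class names:

```python
import shutil

# Simplified, hypothetical classes illustrating the hook pattern; not the actual Taipy hierarchy.
class BaseNode:
    def _duplicate_data(self):
        raise NotImplementedError  # storage-specific subclasses provide the copy logic

class FileBackedNode(BaseNode):
    def __init__(self, path: str):
        self.path = path

    def _duplicate_data(self) -> str:
        # File-backed nodes copy their underlying file, much like _duplicate_data_file does.
        new_path = self.path + ".copy"
        shutil.copy(self.path, new_path)
        return new_path
```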
3 changes: 3 additions & 0 deletions taipy/core/data/excel.py
@@ -339,3 +339,6 @@ def _write(self, data: Any):
self._write_excel_with_single_sheet(
data.to_excel, self._path, index=False, header=properties[self._HAS_HEADER_PROPERTY] or None
)

def _duplicate_data(self):
return self._duplicate_data_file(self.id)
3 changes: 3 additions & 0 deletions taipy/core/data/json.py
@@ -158,6 +158,9 @@ def _write(self, data: Any):
with open(self._path, "w", encoding=self.properties[self.__ENCODING_KEY]) as f: # type: ignore
json.dump(data, f, indent=4, cls=self._encoder)

def _duplicate_data(self):
return self._duplicate_data_file(self.id)


class _DefaultJSONEncoder(json.JSONEncoder):
def default(self, o):
3 changes: 3 additions & 0 deletions taipy/core/data/parquet.py
@@ -249,3 +249,6 @@ def _append(self, data: Any):

def _write(self, data: Any):
self._write_with_kwargs(data)

def _duplicate_data(self):
return self._duplicate_data_file(self.id)
3 changes: 3 additions & 0 deletions taipy/core/data/pickle.py
@@ -108,3 +108,6 @@ def _read_from_path(self, path: Optional[str] = None, **read_kwargs) -> Any:
def _write(self, data):
with open(self._path, "wb") as pf:
pickle.dump(data, pf)

def _duplicate_data(self):
return self._duplicate_data_file(self.id)