feature/#397 scenario duplication #2373

Open
wants to merge 15 commits into develop
8 changes: 6 additions & 2 deletions taipy/core/_repository/_filesystem_repository.py
@@ -191,10 +191,14 @@ def __filter_files_by_config_and_owner_id(
        return None

    def __match_file_and_get_entity(self, filepath, config_and_owner_ids, filters):
-        if match := [(c, p) for c, p in config_and_owner_ids if c.id in filepath.name]:
+        if match := [(c, p) for c, p in config_and_owner_ids if (c if isinstance(c, str) else c.id) in filepath.name]:
            for config, owner_id in match:
                for fil in filters:
-                    fil.update({"config_id": config.id, "owner_id": owner_id})
+                    if isinstance(config, str):
+                        config_id = config
+                    else:
+                        config_id = config.id
+                    fil.update({"config_id": config_id, "owner_id": owner_id})

                if data := self.__filter_by(filepath, filters):
                    return config, owner_id, self.__file_content_to_entity(data)
57 changes: 49 additions & 8 deletions taipy/core/data/_data_manager.py
@@ -22,7 +22,7 @@
from ..cycle.cycle_id import CycleId
from ..exceptions.exceptions import InvalidDataNodeType
from ..notification import Event, EventEntityType, EventOperation, Notifier, _make_event
-from ..reason import NotGlobalScope, ReasonCollection, WrongConfigType
+from ..reason import EntityDoesNotExist, NotGlobalScope, ReasonCollection, WrongConfigType
from ..scenario.scenario_id import ScenarioId
from ..sequence.sequence_id import SequenceId
from ._data_fs_repository import _DataFSRepository

@@ -37,6 +37,17 @@ class _DataManager(_Manager[DataNode], _VersionMixin):
    _EVENT_ENTITY_TYPE = EventEntityType.DATA_NODE
    _repository: _DataFSRepository

+    @classmethod
+    def _get_owner_id(
+        cls, scope, cycle_id, scenario_id
+    ) -> Union[Optional[SequenceId], Optional[ScenarioId], Optional[CycleId]]:
+        if scope == Scope.SCENARIO:
+            return scenario_id
+        elif scope == Scope.CYCLE:
+            return cycle_id
+        else:
+            return None
+
    @classmethod
    def _bulk_get_or_create(
        cls,

@@ -48,13 +59,7 @@ def _bulk_get_or_create(
        dn_configs_and_owner_id = []
        for dn_config in data_node_configs:
            scope = dn_config.scope
-            owner_id: Union[Optional[SequenceId], Optional[ScenarioId], Optional[CycleId]]
-            if scope == Scope.SCENARIO:
-                owner_id = scenario_id
-            elif scope == Scope.CYCLE:
-                owner_id = cycle_id
-            else:
-                owner_id = None
+            owner_id = cls._get_owner_id(scope, cycle_id, scenario_id)
            dn_configs_and_owner_id.append((dn_config, owner_id))

        data_nodes = cls._repository._get_by_configs_and_owner_ids(

@@ -174,3 +179,39 @@ def _get_by_config_id(cls, config_id: str, version_number: Optional[str] = None)
        for fil in filters:
            fil.update({"config_id": config_id})
        return cls._repository._load_all(filters)
+
+    @classmethod
+    def _duplicate(
+        cls, dn: DataNode, cycle_id: Optional[CycleId] = None, scenario_id: Optional[ScenarioId] = None
+    ) -> DataNode:
+        data_nodes = cls._repository._get_by_configs_and_owner_ids(
+            [(dn.config_id, cls._get_owner_id(dn.scope, cycle_id, scenario_id))], cls._build_filters_with_version(None)
+        )
+
+        if existing_dn := data_nodes.get((dn.config_id, dn.owner_id)):
+            return existing_dn
+        else:
+            cloned_dn = cls._get(dn)
+
+            cloned_dn.id = cloned_dn._new_id(cloned_dn._config_id)
+            cloned_dn._owner_id = cls._get_owner_id(cloned_dn._scope, cycle_id, scenario_id)
+            cloned_dn._parent_ids = set()
+
+            cloned_dn._duplicate_data()
+
+            cls._set(cloned_dn)
+            return cloned_dn
+
+    @classmethod
+    def _can_duplicate(cls, dn: DataNode) -> ReasonCollection:
+        reason_collector = ReasonCollection()
+
+        if isinstance(dn, DataNode):
+            dn_id = dn.id
+        else:
+            dn_id = dn
+
+        if not cls._repository._exists(dn_id):
+            reason_collector._add_reason(dn_id, EntityDoesNotExist(dn_id))
+
+        return reason_collector
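For reference, a minimal usage sketch of the two manager methods added above (not part of this diff). The data node and scenario ids are hypothetical, and it assumes an empty ReasonCollection evaluates to True, as for the other _can_* checks:

    from taipy.core.data._data_manager_factory import _DataManagerFactory
    from taipy.core.scenario.scenario_id import ScenarioId

    manager = _DataManagerFactory._build_manager()
    dn = manager._get("DATANODE_my_config_1a2b3c")  # hypothetical data node id
    if manager._can_duplicate(dn):  # assumption: empty ReasonCollection means duplication is allowed
        # The clone keeps dn's config, gets a new id and owner, resets its parent ids, and copies the data file.
        clone = manager._duplicate(dn, scenario_id=ScenarioId("SCENARIO_other_4d5e6f"))  # hypothetical scenario id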
57 changes: 40 additions & 17 deletions taipy/core/data/_file_datanode_mixin.py
@@ -42,6 +42,7 @@ class _FileDataNodeMixin:
    _PATH_KEY = "path"
    _DEFAULT_PATH_KEY = "default_path"
    _IS_GENERATED_KEY = "is_generated"
+    __TAIPY_CLONED_PREFIX = "TAIPY_CLONED"

    __logger = _TaipyLogger._get_logger()

@@ -109,12 +110,14 @@ def _get_downloadable_path(self) -> str:

return ""

def _upload(self,
path: str,
upload_checker: Optional[Callable[[str, Any], bool]] = None,
editor_id: Optional[str] = None,
comment: Optional[str] = None,
**kwargs: Any) -> ReasonCollection:
def _upload(
self,
path: str,
upload_checker: Optional[Callable[[str, Any], bool]] = None,
editor_id: Optional[str] = None,
comment: Optional[str] = None,
**kwargs: Any,
) -> ReasonCollection:
"""Upload a file data to the data node.

Arguments:
Expand All @@ -136,20 +139,23 @@ def _upload(self,
        from ._data_manager_factory import _DataManagerFactory

        reasons = ReasonCollection()
-        if (editor_id
-                and self.edit_in_progress  # type: ignore[attr-defined]
-                and self.editor_id != editor_id  # type: ignore[attr-defined]
-                and (not self.editor_expiration_date  # type: ignore[attr-defined]
-                     or self.editor_expiration_date > datetime.now())):  # type: ignore[attr-defined]
+        if (
+            editor_id
+            and self.edit_in_progress  # type: ignore[attr-defined]
+            and self.editor_id != editor_id  # type: ignore[attr-defined]
+            and (
+                not self.editor_expiration_date  # type: ignore[attr-defined]
+                or self.editor_expiration_date > datetime.now()
+            )
+        ):  # type: ignore[attr-defined]
            reasons._add_reason(self.id, DataNodeEditInProgress(self.id))  # type: ignore[attr-defined]
            return reasons

        up_path = pathlib.Path(path)
        try:
            upload_data = self._read_from_path(str(up_path))
        except Exception as err:
-            self.__logger.error(f"Error uploading `{up_path.name}` to data "
-                                f"node `{self.id}`:")  # type: ignore[attr-defined]
+            self.__logger.error(f"Error uploading `{up_path.name}` to data " f"node `{self.id}`:")  # type: ignore[attr-defined]
Comment on lines -151 to +158

Member: Please check your automatic formatter as this line is over 120 chars. Please revert the change.

Member: Suggested change:
-            self.__logger.error(f"Error uploading `{up_path.name}` to data " f"node `{self.id}`:")  # type: ignore[attr-defined]
+            self.__logger.error(f"Error uploading `{up_path.name}` to data "
+                                f"node `{self.id}`:")  # type: ignore[attr-defined]
self.__logger.error(f"Error: {err}")
reasons._add_reason(self.id, UploadFileCanNotBeRead(up_path.name, self.id)) # type: ignore[attr-defined]
return reasons
Expand All @@ -161,7 +167,8 @@ def _upload(self,
self.__logger.error(
f"Error with the upload checker `{upload_checker.__name__}` "
f"while checking `{up_path.name}` file for upload to the data "
f"node `{self.id}`:") # type: ignore[attr-defined]
f"node `{self.id}`:"
) # type: ignore[attr-defined]
toan-quach marked this conversation as resolved.
Show resolved Hide resolved
self.__logger.error(f"Error: {err}")
can_upload = False

Expand All @@ -171,9 +178,12 @@ def _upload(self,

        shutil.copy(up_path, self.path)

-        self.track_edit(timestamp=datetime.now(),  # type: ignore[attr-defined]
-                        editor_id=editor_id,
-                        comment=comment, **kwargs)
+        self.track_edit(
+            timestamp=datetime.now(),  # type: ignore[attr-defined]
+            editor_id=editor_id,
+            comment=comment,
+            **kwargs,
+        )
        self.unlock_edit()  # type: ignore[attr-defined]

        _DataManagerFactory._build_manager()._set(self)  # type: ignore[arg-type]

@@ -212,3 +222,16 @@ def _migrate_path(self, storage_type, old_path) -> str:
        if os.path.exists(old_path):
            shutil.move(old_path, new_path)
        return new_path
+
+    def _duplicate_data_file(self, id: str) -> Optional[str]:
+        if os.path.exists(self.path):
+            folder_path, base_name = os.path.split(self.path)
+            if base_name.startswith(self.__TAIPY_CLONED_PREFIX):
+                base_name = "".join(base_name.split("_")[5:])
+            new_base_path = os.path.join(folder_path, f"{self.__TAIPY_CLONED_PREFIX}_{id}_{base_name}")

Member: I have 2 questions.
What would be the new file name if a data node is duplicated twice?
Similarly, if a duplicated data node is duplicated (dn -> duplicated_dn -> duplicated duplicated_dn)?

Member Author: Yep, so take example.csv as the file name. After you first duplicate the dn, it will create a new file TAIPY_CLONED_new_dn_id_example.csv. But if you then duplicate this newly duplicated dn, another file will be created with the name pattern TAIPY_CLONED_another_new_dn_id_example.csv. The TAIPY_CLONED prefix won't be repeated.

Member: I see. So what about duplicating a data node twice?

Member Author: Then we will have TAIPY_CLONED_dn_duplicated_id_1_example.csv for the 1st duplicate, and TAIPY_CLONED_dn_duplicated_id_2_example.csv for the 2nd.

Member: I believe having both the prefix TAIPY_CLONED and the unique ID in the name is not necessary. If we can make the file name unique with the new ID, what is the purpose of keeping the prefix? We could propose something slightly better that differentiates the case where the file name is generated by Taipy from the case where it is provided by the user.

If the file of the initial dn is generated (dn.is_generated), we can simply generate a new name for the duplicate with the same function (dn._build_path). Otherwise, we can use your proposal without the prefix: {new_id}_{base_name}.

What do you think?

Member Author: Well, I think the prefix makes the naming clear as to what it is. Just the id is fine, but then we would have to state clearly in the doc that this is what we generate, whereas the prefix makes it understandable without the doc.
+            if os.path.isdir(self.path):
+                shutil.copytree(self.path, new_base_path)
+            else:
+                shutil.copy(self.path, new_base_path)
+            return new_base_path
+        return ""
7 changes: 7 additions & 0 deletions taipy/core/data/csv.py
@@ -192,3 +192,10 @@ def _write(self, data: Any, columns: Optional[List[str]] = None):
            encoding=properties[self.__ENCODING_KEY],
            header=properties[self._HAS_HEADER_PROPERTY],
        )
+
+    def _duplicate_data(self):
+        new_data_path = self._duplicate_data_file(self.id)
+        if hasattr(self._properties, "_entity_owner"):
+            del self._properties._entity_owner
+        self._properties[self._PATH_KEY] = new_data_path
+        return new_data_path
52 changes: 32 additions & 20 deletions taipy/core/data/data_node.py
@@ -433,22 +433,27 @@ def append(self, data, editor_id: Optional[str] = None, comment: Optional[str] =
                corresponding to this write.
        """
        from ._data_manager_factory import _DataManagerFactory
-        if (editor_id
+
+        if (
+            editor_id
            and self.edit_in_progress
            and self.editor_id != editor_id
-            and (not self.editor_expiration_date or self.editor_expiration_date > datetime.now())):
+            and (not self.editor_expiration_date or self.editor_expiration_date > datetime.now())
+        ):
            raise DataNodeIsBeingEdited(self.id, self.editor_id)
        self._append(data)
        self.track_edit(editor_id=editor_id, comment=comment, **kwargs)
        self.unlock_edit()
        _DataManagerFactory._build_manager()._set(self)

-    def write(self,
-              data,
-              job_id: Optional[JobId] = None,
-              editor_id: Optional[str] = None,
-              comment: Optional[str] = None,
-              **kwargs: Any):
+    def write(
+        self,
+        data,
+        job_id: Optional[JobId] = None,
+        editor_id: Optional[str] = None,
+        comment: Optional[str] = None,
+        **kwargs: Any,
+    ):
        """Write some data to this data node.

        once the data is written, the data node is unlocked and the edit is tracked.

@@ -461,10 +466,12 @@ def write(self,
            **kwargs (Any): Extra information to attach to the edit document
                corresponding to this write.
        """
-        if (editor_id
+        if (
+            editor_id
            and self.edit_in_progress
            and self.editor_id != editor_id
-            and (not self.editor_expiration_date or self.editor_expiration_date > datetime.now())):
+            and (not self.editor_expiration_date or self.editor_expiration_date > datetime.now())
+        ):
            raise DataNodeIsBeingEdited(self.id, self.editor_id)
        self._write(data)
        self.track_edit(job_id=job_id, editor_id=editor_id, comment=comment, **kwargs)

@@ -473,12 +480,14 @@
        _DataManagerFactory._build_manager()._set(self)

-    def track_edit(self,
-                   job_id: Optional[str] = None,
-                   editor_id: Optional[str] = None,
-                   timestamp: Optional[datetime] = None,
-                   comment: Optional[str] = None,
-                   **options: Any):
+    def track_edit(
+        self,
+        job_id: Optional[str] = None,
+        editor_id: Optional[str] = None,
+        timestamp: Optional[datetime] = None,
+        comment: Optional[str] = None,
+        **options: Any,
+    ):
        """Creates and adds a new entry in the edits attribute without writing the data.

        Arguments:

@@ -627,15 +636,15 @@ def _get_rank(self, scenario_config_id: str) -> int:
        If the data node config is not part of the scenario config, 0xfffc is returned as an infinite rank.
        """
        if not scenario_config_id:
-            return 0xfffb
+            return 0xFFFB
        dn_config = Config.data_nodes.get(self._config_id, None)
        if not dn_config:
            self._logger.error(f"Data node config `{self.config_id}` for data node `{self.id}` is not found.")
-            return 0xfffd
+            return 0xFFFD
        if not dn_config._ranks:
            self._logger.error(f"Data node config `{self.config_id}` for data node `{self.id}` has no rank.")
-            return 0xfffe
-        return dn_config._ranks.get(scenario_config_id, 0xfffc)
+            return 0xFFFE
+        return dn_config._ranks.get(scenario_config_id, 0xFFFC)

    @abstractmethod
    def _read(self):

@@ -676,6 +685,9 @@ def _get_last_modified_datetime(cls, path: Optional[str] = None) -> Optional[dat

        return last_modified_datetime

+    def _duplicate_data(self):
+        raise NotImplementedError
+
    @staticmethod
    def _class_map():
        def all_subclasses(cls):
7 changes: 7 additions & 0 deletions taipy/core/data/excel.py
@@ -339,3 +339,10 @@ def _write(self, data: Any):
            self._write_excel_with_single_sheet(
                data.to_excel, self._path, index=False, header=properties[self._HAS_HEADER_PROPERTY] or None
            )
+
+    def _duplicate_data(self):
+        new_data_path = self._duplicate_data_file(self.id)
+        if hasattr(self._properties, "_entity_owner"):
+            del self._properties._entity_owner
+        self._properties[self._PATH_KEY] = new_data_path
+        return new_data_path

Member: Is this repetitive? Can we put this in the parent _FileDataNodeMixin class and override it when necessary?

Member Author: Hmm, I think it's a yes-and-no answer. The self._properties attribute doesn't exist in _FileDataNodeMixin, but since ExcelDN, etc. implement _FileDataNodeMixin, it can access it. Still, the syntax feels off to me, and since _FileDataNodeMixin is a mixin, I'm not comfortable putting self._properties in it. (I understand that we do something similar for the def path(self, value) setter in _FileDataNodeMixin, but I'm still not sure it's the best way to do it.)

Member: I understand your point. However, other methods in the mixin class don't seem to fit well either.

I think we did discuss this once. @jrobinAV Let us know what you think about this.

Member: Yes, this is debatable. I would personally do the same as the path setter, for consistency.
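As a reference for this discussion, a sketch of what the shared default could look like if it were hoisted into _FileDataNodeMixin, mirroring the existing path setter pattern. The body is the code repeated in the concrete classes below; the type: ignore comments reflect that self._properties only exists on the concrete data node classes:

    # In _file_datanode_mixin.py: shared default, overridable by subclasses that need different behavior.
    def _duplicate_data(self):
        new_data_path = self._duplicate_data_file(self.id)  # type: ignore[attr-defined]
        if hasattr(self._properties, "_entity_owner"):  # type: ignore[attr-defined]
            del self._properties._entity_owner  # type: ignore[attr-defined]
        self._properties[self._PATH_KEY] = new_data_path  # type: ignore[attr-defined]
        return new_data_path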

7 changes: 7 additions & 0 deletions taipy/core/data/json.py
@@ -158,6 +158,13 @@ def _write(self, data: Any):
        with open(self._path, "w", encoding=self.properties[self.__ENCODING_KEY]) as f:  # type: ignore
            json.dump(data, f, indent=4, cls=self._encoder)
+
+    def _duplicate_data(self):
+        new_data_path = self._duplicate_data_file(self.id)
+        if hasattr(self._properties, "_entity_owner"):
+            del self._properties._entity_owner
+        self._properties[self._PATH_KEY] = new_data_path
+        return new_data_path


class _DefaultJSONEncoder(json.JSONEncoder):
    def default(self, o):
7 changes: 7 additions & 0 deletions taipy/core/data/parquet.py
@@ -249,3 +249,10 @@ def _append(self, data: Any):

    def _write(self, data: Any):
        self._write_with_kwargs(data)
+
+    def _duplicate_data(self):
+        new_data_path = self._duplicate_data_file(self.id)
+        if hasattr(self._properties, "_entity_owner"):
+            del self._properties._entity_owner
+        self._properties[self._PATH_KEY] = new_data_path
+        return new_data_path
7 changes: 7 additions & 0 deletions taipy/core/data/pickle.py
@@ -108,3 +108,10 @@ def _read_from_path(self, path: Optional[str] = None, **read_kwargs) -> Any:
    def _write(self, data):
        with open(self._path, "wb") as pf:
            pickle.dump(data, pf)
+
+    def _duplicate_data(self):
+        new_data_path = self._duplicate_data_file(self.id)
+        if hasattr(self._properties, "_entity_owner"):
+            del self._properties._entity_owner
+        self._properties[self._PATH_KEY] = new_data_path
+        return new_data_path