Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 61 additions & 9 deletions irods/data_object.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
"""Interface for iRODS data objects.

Provides high level abstraction and POSIX-like facilities (create, open, read/write) allowing clients to manipulate data objects very much as if they were

Check failure on line 3 in irods/data_object.py

View workflow job for this annotation

GitHub Actions / ruff-lint / ruff-check

Ruff E501

E501: Line too long (154 > 120) [pycodestyle:line-too-long]
local files.
"""

Check failure on line 5 in irods/data_object.py

View workflow job for this annotation

GitHub Actions / ruff-lint / ruff-check

Ruff D213

D213: Multi-line docstring summary should start at the second line [pydocstyle:multi-line-summary-second-line]

import ast
import enum
import io
import sys
import logging
import os
import ast
import sys
from datetime import datetime, timezone

from irods.models import DataObject
from irods.meta import iRODSMetaCollection
import irods.keywords as kw
from irods.api_number import api_number
from irods.message import JSON_Message, iRODSMessage
from irods.meta import iRODSMetaCollection
from irods.models import DataObject

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -41,11 +49,54 @@
return "<{}.{} {}>".format(self.__class__.__module__, self.__class__.__name__, self.resource_name)


class _repl_status(enum.Enum):  # noqa: N801
    """Replica status codes mirroring the server-side replica states."""

    STALE_REPLICA = 0
    GOOD_REPLICA = 1
    INTERMEDIATE_REPLICA = 2
    READ_LOCKED = 3
    WRITE_LOCKED = 4


# Replica status values listed from most to least fit for use/interface.
_REPL_STATUSES = (
    _repl_status.GOOD_REPLICA.value,
    _repl_status.STALE_REPLICA.value,
    _repl_status.INTERMEDIATE_REPLICA.value,
    _repl_status.READ_LOCKED.value,
    _repl_status.WRITE_LOCKED.value,
)

# Fixed epoch reference used to gauge replica age as part of the default
# sort key in PRC4 and onward.
_REFERENCE_DATETIME = datetime.fromtimestamp(0, timezone.utc)


# Key functions to dictate how replica row results will be sorted within an iRODSDataObject.


def REPLICA_NUMBER_SORT_KEY_FN(row):  # noqa: N802
    """Sort key ordering replica rows ascending by replica number (the PRC3 default)."""
    return row[DataObject.replica_number]


def REPLICA_FITNESS_SORT_KEY_FN(row):  # noqa: N802
    """Sort key ordering replica rows by descending fitness for use.

    Fitness rank follows ``_REPL_STATUSES`` (good, stale, intermediate,
    read-locked, write-locked); unrecognized status codes sort after all
    known ones.  Ties are broken by modify time, newest first.
    """
    repl_status = int(row[DataObject.replica_status])

    # EAFP: one scan of the tuple instead of count() followed by index().
    try:
        repl_status_rank = _REPL_STATUSES.index(repl_status)
    except ValueError:
        repl_status_rank = sys.maxsize

    # Subtracting modify_time from the fixed epoch reference yields a value
    # that decreases as modify_time increases, so newer replicas sort first.
    return (repl_status_rank, _REFERENCE_DATETIME - row[DataObject.modify_time])


# Backward-compatible default: replicas sorted ascending by replica number.
_DEFAULT_SORT_KEY_FN = REPLICA_NUMBER_SORT_KEY_FN


class iRODSDataObject:
def __init__(self, manager, parent=None, results=None):

# iRODSDataObject's constructor is not usually directly accessed by iRODS client applications. See the main README.
# ruff: noqa: D107 off

Check failure on line 93 in irods/data_object.py

View workflow job for this annotation

GitHub Actions / ruff-lint / ruff-format

Ruff format

Improper formatting

def __init__(self, manager, parent=None, results=None, replica_sort_function=None):
self.manager = manager
if parent and results:
self.collection = parent
results = sorted(results, key=(replica_sort_function or _DEFAULT_SORT_KEY_FN))
for attr, value in DataObject.__dict__.items():
if not attr.startswith("_"):
try:
Expand All @@ -54,9 +105,8 @@
# backward compatibility with older schema versions
pass
self.path = self.collection.path + "/" + self.name
replicas = sorted(results, key=lambda r: r[DataObject.replica_number])

# The status quo before iRODS 5
# Copy pre-iRODS 5 fields

replica_args = [
(
Expand All @@ -75,18 +125,20 @@
modify_time=r[DataObject.modify_time],
),
)
for r in replicas
for r in results
]

# Adjust for adding access_time in the iRODS 5 case.

if self.manager.sess.server_version >= (5,):
for n, r in enumerate(replicas):
for n, r in enumerate(results):
replica_args[n][1]['access_time'] = r[DataObject.access_time]
self.replicas = [iRODSReplica(*a, **k) for a, k in replica_args]

self._meta = None

# ruff: noqa: D107 off

def __repr__(self):
return f"<iRODSDataObject {self.id} {self.name}>"

Expand Down
42 changes: 25 additions & 17 deletions irods/manager/data_object_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,27 +218,29 @@ def should_parallelize_transfer(
if size is not None and isinstance(open_options, dict):
open_options[kw.DATA_SIZE_KW] = size

def _download(self, obj, local_path, num_threads, updatables=(), **options):
def _download(self, obj_path, local_path, num_threads, updatables=(), **options):
"""Transfer the contents of a data object to a local file.

Called from get() when a local path is named.
"""
if os.path.isdir(local_path):
local_file = os.path.join(local_path, irods_basename(obj))
else:
local_file = local_path

local_file = (
os.path.join(local_path, irods_basename(obj_path)) # noqa: PTH118
if os.path.isdir(local_path) # noqa: PTH112
else local_path
)

# Check for force flag if local_file exists
if os.path.exists(local_file) and kw.FORCE_FLAG_KW not in options:
raise ex.OVERWRITE_WITHOUT_FORCE_FLAG

data_open_returned_values_ = {}
with self.open(obj, "r", returned_values=data_open_returned_values_, **options) as o:
with self.open(obj_path, "r", returned_values=data_open_returned_values_, **options) as o:
if self.should_parallelize_transfer(num_threads, o, open_options=options.items()):
error = RuntimeError("parallel get failed")
try:
if not self.parallel_get(
(obj, o),
(obj_path, o),
local_file,
num_threads=num_threads,
target_resource_name=options.get(kw.RESC_NAME_KW, ""),
Expand All @@ -265,6 +267,8 @@ def get(self, path, local_path=None, num_threads=DEFAULT_NUMBER_OF_THREADS, upda
"""
parent = self.sess.collections.get(irods_dirname(path))

replica_sort_function = options.pop('replica_sort_function', None)

# TODO: optimize
if local_path:
self._download(path, local_path, num_threads=num_threads, updatables=updatables, **options)
Expand All @@ -284,7 +288,7 @@ def get(self, path, local_path=None, num_threads=DEFAULT_NUMBER_OF_THREADS, upda
results = query.all() # get up to max_rows replicas
if len(results) <= 0:
raise ex.DataObjectDoesNotExist()
return iRODSDataObject(self, parent, results)
return iRODSDataObject(self, parent, results, replica_sort_function=replica_sort_function)

@staticmethod
def _resolve_force_put_option(options, default_setting=None, true_value=""):
Expand Down Expand Up @@ -317,23 +321,25 @@ def put(
self._resolve_force_put_option(options, default_setting=client_config.data_objects.force_put_by_default)

if self.sess.collections.exists(irods_path):
obj = iRODSCollection.normalize_path(irods_path, os.path.basename(local_path))
obj_path = iRODSCollection.normalize_path(irods_path, os.path.basename(local_path)) # noqa: PTH119
else:
obj = irods_path
if kw.FORCE_FLAG_KW not in options and self.exists(obj):
obj_path = irods_path
if kw.FORCE_FLAG_KW not in options and self.exists(obj_path):
raise ex.OVERWRITE_WITHOUT_FORCE_FLAG
options.pop(kw.FORCE_FLAG_KW, None)

replica_sort_function = options.pop('replica_sort_function', None)

with open(local_path, "rb") as f:
sizelist = []
if self.should_parallelize_transfer(num_threads, f, measured_obj_size=sizelist, open_options=options):
o = deferred_call(self.open, (obj, "w"), options)
o = deferred_call(self.open, (obj_path, "w"), options)
f.close()
error = RuntimeError("parallel put failed")
try:
if not self.parallel_put(
local_path,
(obj, o),
(obj_path, o),
total_bytes=sizelist[0],
num_threads=num_threads,
target_resource_name=options.get(kw.RESC_NAME_KW, "") or options.get(kw.DEST_RESC_NAME_KW, ""),
Expand All @@ -346,7 +352,7 @@ def put(
except BaseException as e:
raise error from e
else:
with self.open(obj, "w", **options) as o:
with self.open(obj_path, "w", **options) as o:
# Set operation type to trigger acPostProcForPut
if kw.OPR_TYPE_KW not in options:
options[kw.OPR_TYPE_KW] = 1 # PUT_OPR
Expand All @@ -360,10 +366,11 @@ def put(
# Requested to register checksum without verifying, but source replica has a checksum. This can result
# in multiple replicas being marked good with different checksums, which is an inconsistency.
del repl_options[kw.REG_CHKSUM_KW]
self.replicate(obj, **repl_options)
self.replicate(obj_path, **repl_options)

if return_data_object:
return self.get(obj)
return self.get(obj_path, replica_sort_function=replica_sort_function)
return None

def chksum(self, path, **options):
"""
Expand Down Expand Up @@ -480,6 +487,7 @@ def create(
raise ex.DataObjectExistsAtLogicalPath

options = {**options, kw.DATA_TYPE_KW: "generic"}
replica_sort_function = options.pop('replica_sort_function', None)

if resource:
options[kw.DEST_RESC_NAME_KW] = resource
Expand Down Expand Up @@ -508,7 +516,7 @@ def create(
desc = response.int_info
conn.close_file(desc)

return self.get(path)
return self.get(path, replica_sort_function=replica_sort_function)

def open_with_FileRaw(self, *arg, **kw_options):
holder = []
Expand Down
96 changes: 70 additions & 26 deletions irods/test/data_obj_test.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,29 @@
#! /usr/bin/env python

from datetime import datetime, timezone
import base64
import collections
import concurrent.futures
import contextlib
from datetime import datetime, timedelta, timezone
import errno
import hashlib
import io
import itertools
import json
import logging
import os
import random
import re
import socket
import stat
import string
import sys
import subprocess
import threading
import time
import unittest
import xml.etree.ElementTree
import irods.test.helpers as helpers

Check failure on line 26 in irods/test/data_obj_test.py

View workflow job for this annotation

GitHub Actions / ruff-lint / ruff-check

Ruff I001

I001: Import block is un-sorted or un-formatted [isort:unsorted-imports]

try:
import tqdm
Expand All @@ -49,26 +49,24 @@
return localhost_with_optional_domain_pattern.match(name.lower()) or is_localhost_ip(name)


from irods.access import iRODSAccess
from irods.models import Collection, DataObject
from irods.path import iRODSPath
from irods.test.helpers import iRODSUserLogins
from tempfile import NamedTemporaryFile, gettempdir, mktemp

import irods.client_configuration as config
import irods.exception as ex
from irods.column import Criterion
from irods.data_object import chunks, irods_dirname
import irods.keywords as kw
import irods.parallel
import irods.test.helpers as helpers
import irods.test.modules as test_modules
import irods.keywords as kw
import irods.client_configuration as config
from irods.access import iRODSAccess
from irods.column import Criterion
from irods.data_object import REPLICA_FITNESS_SORT_KEY_FN, chunks, irods_dirname
from irods.manager import data_object_manager
from irods.message import RErrorStack
from irods.message import ET, XML_Parser_Type, default_XML_parser, current_XML_parser
from datetime import datetime, timezone, timedelta
from tempfile import NamedTemporaryFile, gettempdir, mktemp
from irods.test.helpers import unique_name, my_function_name
from irods.ticket import Ticket
import irods.parallel
from irods.manager.data_object_manager import Server_Checksum_Warning
from irods.message import ET, RErrorStack, XML_Parser_Type, current_XML_parser, default_XML_parser
from irods.models import Collection, DataObject
from irods.path import iRODSPath
from irods.test.helpers import iRODSUserLogins, my_function_name, unique_name
from irods.ticket import Ticket

RODSUSER = "nonadmin"

Expand Down Expand Up @@ -1253,8 +1251,7 @@

# assertions on replicas
self.assertEqual(len(obj.replicas), number_of_replicas)
for i, replica in enumerate(obj.replicas):
self.assertEqual(replica.number, i)
self.assertEqual({repl.number for repl in obj.replicas}, {*range(len(obj.replicas))})

# now trim odd-numbered replicas
# note (see irods/irods#4861): COPIES_KW might disappear in the future
Expand All @@ -1267,10 +1264,7 @@
obj = session.data_objects.get(obj_path)

# check remaining replica numbers
replica_numbers = []
for replica in obj.replicas:
replica_numbers.append(replica.number)
self.assertEqual(replica_numbers, [0, 2, 4, 6])
self.assertEqual({r.number for r in obj.replicas}, {0, 2, 4, 6})

# remove object
obj.unlink(force=True)
Expand Down Expand Up @@ -1728,11 +1722,12 @@
self.assertIsNotNone(obj.replicas[1].__getattribute__(i))

# ensure replica info is sensible
replicas = sorted(obj.replicas, key=lambda repl: repl.number)
for i in range(2):
self.assertEqual(obj.replicas[i].number, i)
self.assertEqual(obj.replicas[i].status, "1")
self.assertEqual(obj.replicas[i].path.split("/")[-1], filename)
self.assertEqual(obj.replicas[i].resc_hier.split(";")[-1], ufs_resources[i].name)
self.assertEqual(replicas[i].number, i)
self.assertEqual(replicas[i].status, "1")
self.assertEqual(replicas[i].path.split("/")[-1], filename)
self.assertEqual(replicas[i].resc_hier.split(";")[-1], ufs_resources[i].name)

self.assertEqual(obj.replicas[0].resource_name, ufs_resources[0].name)
if self.sess.server_version < (4, 2, 0):
Expand Down Expand Up @@ -2992,6 +2987,55 @@

test_put__issue_722(self)

def test_modified_sorting_of_replicas__issue_746(self):
    """Exercise custom and default replica sort orders for data_objects.get()."""
    basename = unique_name(my_function_name(), datetime.now()) + '_dataobj_647'  # noqa: DTZ005
    with self.create_simple_resc() as new_resc1, self.create_simple_resc() as new_resc2:
        data = helpers.make_object(self.sess, f'{helpers.home_collection(self.sess)}/{basename}')

        # Precondition for an eventual total of 3 replicas: initial data replica is not
        # on either of the new resources.
        self.assertFalse({repl.resource_name for repl in data.replicas} & {new_resc1, new_resc2})
        try:
            data.replicate(resource=new_resc1)

            # Ensure that one of the replicas is stale, to test proper sorting.
            with data.open('a', **{kw.RESC_NAME_KW: new_resc1}) as f:
                f.write(b'.')

            # Sleep to ensure different replica modify timestamps.
            time.sleep(2)

            data.replicate(resource=new_resc2)

            # At this point there should be exactly two good replicas of the three.
            # Assert exactly one replica is stale, to corroborate.
            data = self.sess.data_objects.get(
                data.path, replica_sort_function=lambda row: int(row[DataObject.replica_status])
            )
            self.assertEqual([repl.status for repl in data.replicas], ['0', '1', '1'])

            # Get a data object with the PRC3-default sort order. Ordering is expected to
            # be ascending by replica number.
            if irods.version.version_as_tuple() < (4,):
                data = self.sess.data_objects.get(data.path)
                for i, repl in enumerate(data.replicas):
                    self.assertEqual(repl.number, i)

            options = {}
            if irods.version.version_as_tuple() < (4,):
                options['replica_sort_function'] = REPLICA_FITNESS_SORT_KEY_FN

            # Get a data object with the PRC3-alternative/PRC4-default sort order.
            data = self.sess.data_objects.get(data.path, **options)

            # Test default replica sorting: the fittest (good, most recently
            # modified) replica should come first.
            self.assertEqual(data.replicas[0].status, '1')
            self.assertEqual(data.replicas[0].modify_time, data.modify_time)
            self.assertGreater(data.replicas[0].modify_time, data.replicas[1].modify_time)
        finally:
            if data:
                data.unlink(force=True)


if __name__ == "__main__":
# let the tests find the parent irods lib
Expand Down
Loading