Source code for hats.io.file_io.file_pointer
from __future__ import annotations
from importlib.metadata import version
from pathlib import Path
from urllib.parse import urlparse
from upath import UPath
[docs]
def get_upath(path: str | Path | UPath | None) -> UPath | None:
    """Returns a UPath file pointer from a path string or other path-like type.

    Parameters
    ----------
    path: str | Path | UPath | None
        base file path to be normalized to UPath

    Returns
    -------
    UPath | None
        ``path`` itself when it is already a UPath, a UPath built by
        ``get_upath_for_protocol`` for any other non-empty input, or ``None``
        when ``path`` is falsy (e.g. ``None`` or an empty string).
    """
    # Falsy input (None, "") carries no location, so the absence is passed
    # through to the caller instead of being turned into a pointer.
    if not path:
        return None
    # Already normalized: hand back the same object, keeping its options.
    if isinstance(path, UPath):
        return path
    return get_upath_for_protocol(path)
[docs]
def get_upath_for_protocol(path: str | Path) -> UPath:
    """Create UPath with protocol-specific configurations.

    S3 pointers are always opened with ``anon=True``, i.e., we assume that
    the bucket is public. HTTP(S) pointers are opened with a
    ``hats/<version>`` User-Agent header, and for the
    ``vizcat.cds.unistra.fr`` host the ``parquet_precache_all_bytes`` cache
    option is enabled as well. Both cases use ``BLOCK_SIZE`` as the block
    size. Pointers on any other protocol are returned as a plain UPath.

    Parameters
    ----------
    path: str | Path
        base file path to be normalized to UPath

    Returns
    -------
    UPath
        Instance of UPath.
    """
    upath = UPath(path)
    if upath.protocol == "s3":
        upath = UPath(path, anon=True, default_block_size=BLOCK_SIZE)
    elif upath.protocol in ("http", "https"):
        kwargs = {
            "block_size": BLOCK_SIZE,
            "client_kwargs": {"headers": {"User-Agent": f"hats/{version('hats')}"}},
        }
        # urlparse only handles str/bytes, so coerce path-like input first.
        parts = urlparse(str(path))
        if parts.netloc == "vizcat.cds.unistra.fr":
            kwargs["cache_options"] = {"parquet_precache_all_bytes": True}
        upath = UPath(path, **kwargs)
    return upath
[docs]
def append_paths_to_pointer(pointer: str | Path | UPath, *paths: str) -> UPath:
    """Join directory names and/or a file name onto a file pointer.

    Parameters
    ----------
    pointer : str | Path | UPath
        base file pointer that the extra path parts are appended to
    *paths: str
        any number of directory names, optionally followed by a file name

    Returns
    -------
    UPath
        New file pointer formed by joining ``pointer`` with ``paths``
    """
    # Normalize first so the join always happens on a UPath.
    return get_upath(pointer).joinpath(*paths)
[docs]
def does_file_or_directory_exist(pointer: str | Path | UPath) -> bool:
    """Report whether anything (file or directory) exists at a file pointer.

    Parameters
    ----------
    pointer : str | Path | UPath
        File pointer whose existence is checked

    Returns
    -------
    bool
        True if a file or directory exists at ``pointer``, False otherwise
    """
    return get_upath(pointer).exists()
[docs]
def is_regular_file(pointer: str | Path | UPath) -> bool:
    """Report whether a file pointer refers to a regular file (NOT a directory).

    Parameters
    ----------
    pointer : str | Path | UPath
        File pointer to test

    Returns
    -------
    bool
        True if a regular file exists at ``pointer``; False if nothing is
        there or if it is a directory
    """
    return get_upath(pointer).is_file()
[docs]
def find_files_matching_path(pointer: str | Path | UPath, *paths: str) -> list[UPath]:
    """Find files or directories matching the provided path parts.

    The path parts are joined with the filesystem separator of ``pointer``
    and used as a recursive glob pattern below it.

    Parameters
    ----------
    pointer : str | Path | UPath
        base file pointer in which to search
    *paths: str
        any number of directory names optionally followed by a file name.
        directory or file names may be replaced with `*` as a matcher.

    Returns
    -------
    list[UPath]
        Sorted file pointers for every match; an empty list when nothing
        matches; a single-element list holding the base pointer when no
        path parts are given.
    """
    root = get_upath(pointer)
    # No parts to match against: the base pointer is the only result.
    if not paths:
        return [root]
    pattern = root.fs.sep.join(paths)
    return sorted(root.rglob(pattern))
[docs]
def directory_has_contents(pointer: str | Path | UPath) -> bool:
    """Check whether a directory holds anything (files or subdirectories).

    Parameters
    ----------
    pointer : str | Path | UPath
        File pointer of the directory to inspect

    Returns
    -------
    bool
        True if there are any files or subdirectories below this directory.
    """
    root = get_upath(pointer)
    # Stop at the first entry found; the full listing is never needed.
    for _ in root.rglob("*"):
        return True
    return False