Source code for hats.catalog.partition_info
"""Container class to hold per-partition metadata"""
from __future__ import annotations
import warnings
from pathlib import Path
import numpy as np
import pandas as pd
from upath import UPath
import hats.pixel_math.healpix_shim as hp
from hats.io import file_io, paths
from hats.pixel_math.healpix_pixel import INVALID_PIXEL, HealpixPixel
[docs]
class PartitionInfo:
"""Container class for per-partition info."""
def __init__(self, pixel_list: list[HealpixPixel], catalog_base_dir: str = None) -> None:
[docs]
self.pixel_list = pixel_list
[docs]
self.catalog_base_dir = catalog_base_dir
[docs]
def get_healpix_pixels(self) -> list[HealpixPixel]:
"""Get healpix pixel objects for all pixels represented as partitions.
Returns
-------
list[HealpixPixel]
List of HealpixPixel
"""
return self.pixel_list
[docs]
def get_highest_order(self) -> int:
"""Get the highest healpix order for the dataset.
Returns
-------
int
int representing highest order.
"""
max_pixel = np.max(self.pixel_list)
return max_pixel.order
[docs]
def write_to_file(
self,
partition_info_file: str | Path | UPath | None = None,
catalog_path: str | Path | UPath | None = None,
):
"""Write all partition data to CSV file.
If no paths are provided, the catalog base directory from the `read_from_dir` call is used.
Parameters
----------
partition_info_file : str | Path | UPath | None
path to where the `partition_info.csv` file will be written.
catalog_path : str | Path | UPath | None
base directory for a catalog where the `partition_info.csv` file will be written.
Raises
------
ValueError
if no path is provided, and could not be inferred.
"""
if partition_info_file is None:
if catalog_path is not None:
partition_info_file = paths.get_partition_info_pointer(catalog_path)
elif self.catalog_base_dir is not None:
partition_info_file = paths.get_partition_info_pointer(self.catalog_base_dir)
else:
raise ValueError("partition_info_file is required if info was not loaded from a directory")
file_io.write_dataframe_to_csv(self.as_dataframe(), partition_info_file, index=False)
@classmethod
[docs]
def read_from_dir(
cls, catalog_base_dir: str | Path | UPath | None, compute_from_catalog: bool = False
) -> PartitionInfo:
"""Read partition info from a file within a hats directory.
This will look for a `partition_info.csv` file, and if not found, will look for
a `_metadata` file. The second approach is typically slower for large catalogs
therefore a warning is issued to the user. In internal testing with large catalogs,
the first approach takes less than a second, while the second can take 10-20 seconds.
If neither file is found, and `compute_from_catalog` is set to True, the partition info
will be computed from the individual catalog files. This is the slowest approach, and
a warning is issued to the user. In internal testing with large catalogs, this
approach can take (??) time.
Parameters
----------
catalog_base_dir : str | Path | UPath | None
Path to the root directory of the catalog
compute_from_catalog : bool
Whether to compute partition info from catalog files if no metadata or partition info
file is found.
Returns
-------
PartitionInfo
A `PartitionInfo` object with the data from the file
Raises
------
FileNotFoundError
if neither desired file is found in the catalog_base_dir
"""
metadata_file = paths.get_parquet_metadata_pointer(catalog_base_dir)
partition_info_file = paths.get_partition_info_pointer(catalog_base_dir)
if file_io.does_file_or_directory_exist(partition_info_file):
pixel_list = PartitionInfo._read_from_csv(partition_info_file)
elif file_io.does_file_or_directory_exist(metadata_file):
warnings.warn("Reading partitions from parquet metadata. This is typically slow.")
pixel_list = PartitionInfo._read_from_metadata_file(metadata_file)
elif compute_from_catalog:
warnings.warn("Computing partitions from catalog parquet files. This may be slow.")
dataset_dir = paths.dataset_directory(catalog_base_dir)
pixel_list = []
# Recursively walk the dataset directory to find all parquet files.
for file_path in dataset_dir.rglob("*.parquet"):
if file_path.suffix == ".parquet":
pixel = paths.get_healpix_from_path(str(file_path))
if pixel != INVALID_PIXEL:
pixel_list.append(pixel)
# Remove duplicates and sort by pixel.
pixel_list = sorted(set(pixel_list))
else:
raise FileNotFoundError(
f"_metadata or partition info file is required in catalog directory {catalog_base_dir}"
)
return cls(pixel_list, catalog_base_dir)
@classmethod
[docs]
def read_from_file(cls, metadata_file: str | Path | UPath) -> PartitionInfo:
"""Read partition info from a `_metadata` file to create an object
Parameters
----------
metadata_file : str | Path | UPath
path to the `_metadata` file
Returns
-------
PartitionInfo
A `PartitionInfo` object with the data from the file
"""
return cls(cls._read_from_metadata_file(metadata_file))
@classmethod
def _read_from_metadata_file(cls, metadata_file: str | Path | UPath) -> list[HealpixPixel]:
"""Read partition info list from a `_metadata` file.
Parameters
----------
metadata_file : str | Path | UPath
path to the `_metadata` file
Returns
-------
list[HealpixPixel]
The list of `HealpixPixel` extracted from the data in the metadata file
"""
total_metadata = file_io.read_parquet_metadata(metadata_file)
pixel_list = [
paths.get_healpix_from_path(total_metadata.row_group(index).column(0).file_path)
for index in range(0, total_metadata.num_row_groups)
]
pixel_list = [p for p in pixel_list if p != INVALID_PIXEL]
## Remove duplicates, preserving order.
return list(dict.fromkeys(pixel_list))
@classmethod
[docs]
def read_from_csv(cls, partition_info_file: str | Path | UPath) -> PartitionInfo:
"""Read partition info from a `partition_info.csv` file to create an object
Parameters
----------
partition_info_file : str | Path | UPath
path to the `partition_info.csv` file
Returns
-------
PartitionInfo
A `PartitionInfo` object with the data from the file
"""
return cls(cls._read_from_csv(partition_info_file))
@classmethod
def _read_from_csv(cls, partition_info_file: str | Path | UPath) -> PartitionInfo:
"""Read partition info from a `partition_info.csv` file to create an object
Parameters
----------
partition_info_file : str | Path | UPath
path to the `partition_info.csv` file
Returns
-------
PartitionInfo
A `PartitionInfo` object with the data from the file
"""
if not file_io.does_file_or_directory_exist(partition_info_file):
raise FileNotFoundError(f"No partition info found where expected: {str(partition_info_file)}")
data_frame = file_io.load_csv_to_pandas(partition_info_file)
return [
HealpixPixel(order, pixel)
for order, pixel in zip(
data_frame[cls.METADATA_ORDER_COLUMN_NAME],
data_frame[cls.METADATA_PIXEL_COLUMN_NAME],
)
]
[docs]
def as_dataframe(self):
"""Construct a pandas dataframe for the partition info pixels.
Returns
-------
pd.DataFrame
Pandas Dataframe with order, directory, and pixel info.
"""
partition_info_dict = {
PartitionInfo.METADATA_ORDER_COLUMN_NAME: [],
PartitionInfo.METADATA_PIXEL_COLUMN_NAME: [],
}
for pixel in self.pixel_list:
partition_info_dict[PartitionInfo.METADATA_ORDER_COLUMN_NAME].append(pixel.order)
partition_info_dict[PartitionInfo.METADATA_PIXEL_COLUMN_NAME].append(pixel.pixel)
return pd.DataFrame.from_dict(partition_info_dict)
@classmethod
[docs]
def from_healpix(cls, healpix_pixels: list[HealpixPixel]) -> PartitionInfo:
"""Create a partition info object from a list of constituent healpix pixels.
Parameters
----------
healpix_pixels: list[HealpixPixel]
a list of constituent healpix pixels
Returns
-------
PartitionInfo
A `PartitionInfo` object with the same healpix pixels
"""
return cls(healpix_pixels)
[docs]
def calculate_fractional_coverage(self):
"""Calculate what fraction of the sky is covered by partition tiles."""
pixel_orders = [p.order for p in self.pixel_list]
cov_order, cov_count = np.unique(pixel_orders, return_counts=True)
area_by_order = [hp.order2pixarea(order, degrees=True) for order in cov_order]
# 41253 is the number of square degrees in a sphere
# https://en.wikipedia.org/wiki/Square_degree
return (area_by_order * cov_count).sum() / (360**2 / np.pi)