Source code for hats.loaders.read_hats

from __future__ import annotations

import warnings
from pathlib import Path

import numpy as np
import pyarrow as pa
from mocpy import MOC
from upath import UPath

import hats.pixel_math.healpix_shim as hp
from hats.catalog import AssociationCatalog, Catalog, CatalogType, Dataset, MapCatalog, MarginCatalog
from hats.catalog.catalog_collection import CatalogCollection
from hats.catalog.dataset.collection_properties import CollectionProperties
from hats.catalog.dataset.table_properties import TableProperties
from hats.catalog.index.index_catalog import IndexCatalog
from hats.catalog.partition_info import PartitionInfo
from hats.io import file_io, paths
from hats.io.file_io import read_parquet_metadata

# Maps each supported catalog type to the dataset class used to load it.
# OBJECT and SOURCE catalogs share the same ``Catalog`` class.
DATASET_TYPE_TO_CLASS = {
    CatalogType.OBJECT: Catalog,
    CatalogType.SOURCE: Catalog,
    CatalogType.ASSOCIATION: AssociationCatalog,
    CatalogType.INDEX: IndexCatalog,
    CatalogType.MARGIN: MarginCatalog,
    CatalogType.MAP: MapCatalog,
}
def read_hats(catalog_path: str | Path | UPath) -> CatalogCollection | Dataset:
    """Read a HATS catalog or catalog collection from a directory.

    The directory is loaded as a single catalog when it contains a
    ``hats.properties`` or ``properties`` file. Otherwise it is loaded as a
    collection when it contains a ``collection.properties`` file.

    Parameters
    ----------
    catalog_path : str | Path | UPath
        path to the root directory of the catalog

    Returns
    -------
    CatalogCollection | Dataset
        HATS catalog found at directory

    Raises
    ------
    FileNotFoundError
        if none of the expected properties files exist in the directory

    Examples
    --------
    To read a catalog from a public S3 bucket, call it as follows::

        from upath import UPath
        catalog = hats.read_hats(UPath(..., anon=True))
    """
    root = file_io.get_upath(catalog_path)

    # A single catalog takes precedence over a collection; `any` stops at the
    # first properties file that exists.
    is_single_catalog = any((root / name).exists() for name in ("hats.properties", "properties"))
    if is_single_catalog:
        return _load_catalog(root)

    if (root / "collection.properties").exists():
        return _load_collection(root)

    raise FileNotFoundError(f"Failed to read HATS at location {catalog_path}")
def _load_collection(collection_path: UPath) -> CatalogCollection:
    """Load a catalog collection together with its primary catalog.

    The collection properties are read from ``collection_path``; their
    ``hats_primary_table_url`` is joined onto the collection directory and the
    resulting location is loaded as a catalog.

    Parameters
    ----------
    collection_path : UPath
        path to the root directory of the collection

    Returns
    -------
    CatalogCollection
        the collection, wrapping its properties and its primary catalog
    """
    collection_properties = CollectionProperties.read_from_dir(collection_path)
    main_catalog = _load_catalog(collection_path / collection_properties.hats_primary_table_url)
    return CatalogCollection(collection_path, collection_properties, main_catalog)


def _load_catalog(catalog_path: UPath) -> Dataset:
    """Load a single catalog, picking the dataset class from its properties.

    Parameters
    ----------
    catalog_path : UPath
        path to the root directory of the catalog

    Returns
    -------
    Dataset
        instance of the class registered in ``DATASET_TYPE_TO_CLASS`` for the
        catalog type declared in the table properties

    Raises
    ------
    NotImplementedError
        if the declared catalog type has no entry in ``DATASET_TYPE_TO_CLASS``
    """
    properties = TableProperties.read_from_dir(catalog_path)
    dataset_type = properties.catalog_type
    if dataset_type not in DATASET_TYPE_TO_CLASS:
        raise NotImplementedError(f"Cannot load catalog of type {dataset_type}")

    loader = DATASET_TYPE_TO_CLASS[dataset_type]
    schema = _read_schema_from_metadata(catalog_path)
    kwargs = {
        "catalog_path": catalog_path,
        "catalog_info": properties,
        # The same schema object is passed for both arguments.
        "schema": schema,
        "original_schema": schema,
    }
    # Partition info and the MOC are only passed for HEALPix-partitioned types.
    if _is_healpix_dataset(dataset_type):
        kwargs["pixels"] = PartitionInfo.read_from_dir(catalog_path)
        kwargs["moc"] = _read_moc_from_point_map(catalog_path)
    return loader(**kwargs)


def _is_healpix_dataset(dataset_type):
    """Return True if ``dataset_type`` is one of the types loaded with pixels and a MOC.

    Every type in ``DATASET_TYPE_TO_CLASS`` except ``CatalogType.INDEX`` qualifies.
    """
    return dataset_type in (
        CatalogType.OBJECT,
        CatalogType.SOURCE,
        CatalogType.ASSOCIATION,
        CatalogType.MARGIN,
        CatalogType.MAP,
    )


def _read_moc_from_point_map(catalog_base_dir: str | Path | UPath) -> MOC | None:
    """Reads a MOC object from the `point_map.fits` file if it exists in the catalog directory

    Parameters
    ----------
    catalog_base_dir : str | Path | UPath
        path to the root directory of the catalog

    Returns
    -------
    MOC | None
        MOC built from the non-zero entries of the point map, or None when the
        point map file does not exist
    """
    point_map_path = paths.get_point_map_file_pointer(catalog_base_dir)
    if not file_io.does_file_or_directory_exist(point_map_path):
        return None
    fits_image = file_io.read_fits_image(point_map_path)
    # The order is derived from the number of entries in the map.
    order = hp.npix2order(len(fits_image))
    # Keep the indices of entries that are truthy (non-zero) in the map.
    boolean_skymap = fits_image.astype(bool)
    ipix = np.where(boolean_skymap)[0]
    # Every selected cell sits at the map's own order.
    orders = np.full(ipix.shape, order)
    return MOC.from_healpix_cells(ipix, orders, order)


def _read_schema_from_metadata(catalog_base_dir: str | Path | UPath) -> pa.Schema | None:
    """Reads the schema information stored in the _common_metadata or _metadata files.

    The ``_common_metadata`` file is preferred; ``_metadata`` is used only when
    ``_common_metadata`` does not exist.

    Parameters
    ----------
    catalog_base_dir : str | Path | UPath
        path to the root directory of the catalog

    Returns
    -------
    pa.Schema | None
        arrow schema read from the metadata file, or None (after emitting a
        warning) when neither metadata file exists
    """
    common_metadata_file = paths.get_common_metadata_pointer(catalog_base_dir)
    common_metadata_exists = file_io.does_file_or_directory_exist(common_metadata_file)
    metadata_file = paths.get_parquet_metadata_pointer(catalog_base_dir)
    metadata_exists = file_io.does_file_or_directory_exist(metadata_file)
    if not (common_metadata_exists or metadata_exists):
        # The adjacent literals are concatenated verbatim, so the separating
        # space has to be part of one of them.
        warnings.warn(
            "_common_metadata or _metadata files not found for this catalog. "
            "The arrow schema will not be set."
        )
        return None
    schema_file = common_metadata_file if common_metadata_exists else metadata_file
    metadata = read_parquet_metadata(schema_file)
    return metadata.schema.to_arrow_schema()