"""
Methods for clustering localization data in LocData objects.
"""
from __future__ import annotations
import logging
import sys
from collections.abc import Sequence
from copy import copy
from typing import TYPE_CHECKING, Any
import numpy as np
import numpy.typing as npt
from sklearn.cluster import DBSCAN, HDBSCAN
from locan.configuration import N_JOBS
from locan.data.locdata import LocData
from locan.data.locdata_utils import _check_loc_properties
from locan.process.aggregate import Bins, _accumulate_2d
from locan.process.properties.locdata_statistics import ranges
if TYPE_CHECKING:
import boost_histogram as bh
# Names exported as the public API of this module.
__all__: list[str] = ["cluster_hdbscan", "cluster_dbscan", "cluster_by_bin"]
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
[docs]
def cluster_hdbscan(
    locdata: LocData,
    min_cluster_size: int = 5,
    loc_properties: list[str] | None = None,
    allow_single_cluster: bool = False,
    **kwargs: Any,
) -> tuple[LocData, LocData]:
    """
    Cluster localizations in locdata using the hdbscan clustering algorithm.

    Parameters
    ----------
    locdata
        Localization data on which to perform the manipulation.
    min_cluster_size
        Minimum cluster size in HDBSCAN algorithm (default: 5).
        If locdata holds fewer localizations, all of them are returned as
        noise and no clustering is attempted.
    loc_properties
        The LocData properties to be used for clustering.
        If None, `locdata.coordinates` will be used.
    allow_single_cluster
        If True, return single cluster (default: False)
    kwargs
        Other parameters passed to `sklearn.cluster.HDBSCAN`.

    Returns
    -------
    tuple[LocData, LocData]
        A tuple with noise and cluster.
        The first LocData object is a selection of all localizations that are
        defined as noise, in other words all localizations that are not part
        of any cluster.
        The second LocData object is a LocData instance assembling all
        generated selections (i.e. localization cluster).

    Note
    ----
    In version 0.20.0 the original hdbscan implementation was replaced by the
    scikit-learn implementation.
    The new implementation should yield identical results if the min_samples
    parameter is increased by one. See notes at
    https://scikit-learn.org/stable/modules/generated/sklearn.cluster.HDBSCAN.html
    """
    # Snapshot of the call arguments; recorded in the metadata history below.
    parameter = locals()

    if len(locdata) == 0:
        locdata_noise = LocData()
        collection = LocData()
    elif len(locdata) < min_cluster_size:
        # Too few localizations to form even one cluster: everything is noise.
        locdata_noise = copy(locdata)
        collection = LocData()
    else:
        if loc_properties is None:
            fit_data = locdata.coordinates
        else:
            fit_data = locdata.data[loc_properties].to_numpy()

        labels = HDBSCAN(
            min_cluster_size=min_cluster_size,
            allow_single_cluster=allow_single_cluster,
            **kwargs,
        ).fit_predict(fit_data)

        # One selection per label. The code relies on the groups being ordered
        # by label so that the noise group (label -1), if present, is first.
        grouped = locdata.data.groupby(labels)
        locdata_index_labels = [
            locdata.data.index[idxs] for idxs in grouped.indices.values()
        ]
        selections = [
            LocData.from_selection(locdata=locdata, indices=idxs)
            for idxs in locdata_index_labels
        ]

        # Only the lookup of the noise label is guarded, so that a KeyError
        # raised while assembling the collection is not misinterpreted.
        try:
            grouped.get_group(-1)
        except KeyError:
            locdata_noise = LocData()
            collection = LocData.from_collection(selections)
        else:
            locdata_noise = selections[0]
            collection = LocData.from_collection(selections[1:])

    # set regions
    if locdata_noise:
        locdata_noise.region = locdata.region
    if collection:
        collection.region = locdata.region

    # metadata: replace any inherited history by a single entry for this call
    function_name = sys._getframe().f_code.co_name
    if locdata_noise:
        del locdata_noise.meta.history[:]
        locdata_noise.meta.history.add(name=function_name, parameter=str(parameter))
    del collection.meta.history[:]
    collection.meta.history.add(name=function_name, parameter=str(parameter))

    return locdata_noise, collection
[docs]
def cluster_dbscan(
    locdata: LocData,
    eps: float = 20,
    min_samples: int = 5,
    loc_properties: list[str] | None = None,
    **kwargs: Any,
) -> tuple[LocData, LocData]:
    """
    Cluster localizations in locdata using the dbscan clustering algorithm as
    implemented in sklearn.

    Parameters
    ----------
    locdata
        specifying the localization data on which to perform the manipulation.
    eps
        The maximum distance between two samples for them to be considered as
        in the same neighborhood.
    min_samples
        The number of samples in a neighborhood for a point to be considered
        as a core point.
        This includes the point itself.
    loc_properties
        The LocData properties to be used for clustering. If None,
        `locdata.coordinates` will be used.
    kwargs
        Other parameters passed to `sklearn.cluster.DBSCAN`.
        If `n_jobs` is not given, the configured `N_JOBS` is used.

    Returns
    -------
    tuple[LocData, LocData]
        A tuple with noise and cluster.
        The first LocData object is a selection of all localizations that are
        defined as noise, in other words all localizations that are not part
        of any cluster.
        The second LocData object is a LocData instance assembling all
        generated selections (i.e. localization cluster).
    """
    # Snapshot of the call arguments; recorded in the metadata history below.
    parameter = locals()

    if len(locdata) == 0:
        locdata_noise = LocData()
        collection = LocData()
    else:
        if loc_properties is None:
            fit_data = locdata.coordinates
        else:
            fit_data = locdata.data[loc_properties].to_numpy()

        # N_JOBS is only a default: merging into a new dict lets callers
        # override n_jobs through kwargs without a duplicate-keyword TypeError
        # and leaves the kwargs recorded in `parameter` untouched.
        dbscan_kwargs = {"n_jobs": N_JOBS, **kwargs}
        labels = DBSCAN(eps=eps, min_samples=min_samples, **dbscan_kwargs).fit_predict(
            fit_data
        )

        # One selection per label. The code relies on the groups being ordered
        # by label so that the noise group (label -1), if present, is first.
        grouped = locdata.data.groupby(labels)
        locdata_index_labels = [
            locdata.data.index[idxs] for idxs in grouped.indices.values()
        ]
        selections = [
            LocData.from_selection(locdata=locdata, indices=idxs)
            for idxs in locdata_index_labels
        ]

        # Only the lookup of the noise label is guarded, so that a KeyError
        # raised while assembling the collection is not misinterpreted.
        try:
            grouped.get_group(-1)
        except KeyError:
            locdata_noise = LocData()
            collection = LocData.from_collection(selections)
        else:
            locdata_noise = selections[0]
            collection = LocData.from_collection(selections[1:])

    # set regions
    if locdata_noise:
        locdata_noise.region = locdata.region
    if collection:
        collection.region = locdata.region

    # metadata: replace any inherited history by a single entry for this call
    function_name = sys._getframe().f_code.co_name
    if locdata_noise:
        del locdata_noise.meta.history[:]
        locdata_noise.meta.history.add(name=function_name, parameter=str(parameter))
    del collection.meta.history[:]
    collection.meta.history.add(name=function_name, parameter=str(parameter))

    return locdata_noise, collection
[docs]
def cluster_by_bin(
    locdata: LocData,
    loc_properties: list[str] | None = None,
    min_samples: int = 1,
    bins: Bins | bh.axis.Axis | bh.axis.AxesTuple | None = None,
    n_bins: int | Sequence[int] | None = None,
    bin_size: float | Sequence[float] | Sequence[Sequence[float]] | None = None,
    bin_edges: Sequence[float] | Sequence[Sequence[float]] | None = None,
    bin_range: (
        tuple[float, float] | Sequence[float] | Sequence[Sequence[float]] | str | None
    ) = None,
    return_counts: bool = False,
) -> tuple[Bins | None, npt.NDArray[np.int64], LocData, npt.NDArray[np.int64] | None]:
    """
    Cluster localizations in locdata by binning all localizations with regard
    to `loc_properties` and collecting all localizations per bin as cluster.

    Parameters
    ----------
    locdata
        Localization data.
    loc_properties
        Localization properties to be grouped into bins.
        If None The coordinate_values of locdata are used.
    min_samples
        The minimum number of samples per bin to be considered as cluster.
    bins
        The bin specification as defined in :class:`Bins`.
        If given, all other bin specifications are ignored (a warning is
        logged when any of them is set).
    n_bins
        The number of bins for all or each dimension.
        5 yields 5 bins in all dimensions.
        (2, 5) yields 2 bins for one dimension and 5 for the other dimension.
    bin_size
        The size of bins in units of locdata coordinate units for all or each
        dimension.
        5 would describe bin_size of 5 for all bins in all dimensions.
        (2, 5) yields bins of size 2 for one dimension and 5 for the other
        dimension.
        To specify arbitrary sequence of `bin_sizes` use `bin_edges` instead.
    bin_edges
        Array of bin edges with shape (n_bin_edges,)
        or (dimension, n_bin_edges) for all or each dimension.
    bin_range : tuple[float, ...] | tuple[tuple[float, float], ...] | Literal['zero'] | None
        The data bin_range to be taken into consideration for all or each
        dimension.
        ((min_x, max_x), (min_y, max_y), ...) bin_range for each coordinate;
        for None (min, max) bin_range are determined from data;
        for 'zero' (0, max) bin_range with max determined from data.
    return_counts
        If true, n_elements per bin are returned.

    Returns
    -------
    tuple[Bins | None, npt.NDArray[np.int64], LocData, npt.NDArray[np.int64] | None]
        Tuple with bins, bin_indices,
        collection of all generated selections (i.e. localization clusters),
        and counts per bin.
        For empty locdata, bins is None and the arrays are empty.
        Counts are None unless `return_counts` is true.

    Raises
    ------
    ValueError
        If the :class:`Bins` instance cannot be constructed from the given
        specifications.
    NotImplementedError
        If the bin dimension is not 2.
    """
    # Snapshot of the call arguments; recorded in the metadata history below.
    parameter = locals()

    if len(locdata) == 0:
        # Empty arrays carry the integer dtype promised by the return type.
        empty_counts = np.array([], dtype=np.int64) if return_counts else None
        return None, np.array([], dtype=np.int64), LocData(), empty_counts

    loc_properties = _check_loc_properties(locdata, loc_properties)
    data = locdata.data[loc_properties].values

    if bins is not None:
        bins = Bins(bins=bins)
        if any(item is not None for item in (n_bins, bin_size, bin_edges, bin_range)):
            logger.warning("bins are used - all other bin specifications are ignored.")
    else:
        # Derive the range from the data unless explicit ranges or edges are given.
        if (bin_range is None or isinstance(bin_range, str)) and bin_edges is None:
            bin_range_ = ranges(locdata, loc_properties=loc_properties, special=bin_range)  # type: ignore
        else:
            bin_range_ = bin_range  # type: ignore

        try:
            bins = Bins(
                bins=bins,
                n_bins=n_bins,
                bin_size=bin_size,
                bin_edges=bin_edges,
                bin_range=bin_range_,  # type: ignore
                labels=loc_properties,
            )
        except ValueError as exc:
            raise ValueError(
                "Bin dimension and len of `loc_properties` is incompatible."
            ) from exc

    if bins.dimension == 2:
        # Counts are always requested here because the min_samples filter needs them.
        bin_indices, data_indices, _, counts = _accumulate_2d(
            data, bin_edges=bins.bin_edges, return_counts=True
        )
        assert counts is not None  # type narrowing  # noqa: S101
    else:
        raise NotImplementedError("Only implemented for dimension 2.")

    if min_samples > 1:
        # Keep only bins that hold at least min_samples localizations.
        mask = counts >= min_samples
        counts = counts[mask]
        bin_indices = bin_indices[mask]
        data_indices = [
            data_indices_e
            for data_indices_e, mask_e in zip(data_indices, mask)
            if mask_e
        ]

    selections = [
        LocData.from_selection(locdata=locdata, indices=idxs) for idxs in data_indices
    ]
    collection = LocData.from_collection(selections)

    # set regions
    if collection:
        collection.region = locdata.region

    # metadata: replace any inherited history by a single entry for this call
    del collection.meta.history[:]
    collection.meta.history.add(
        name=sys._getframe().f_code.co_name, parameter=str(parameter)
    )

    if not return_counts:
        counts = None

    return bins, bin_indices, collection, counts