Skip to content

API Reference

Aggregation

groupoid.aggregation.TransportGroupoidAggregator dataclass

Federated aggregator using groupoid transport and Riemannian geometry.

Given a network of clients with local model parameters on a Riemannian manifold, this aggregator: - Uses parallel transport (groupoid morphisms) to align parameters - Checks cohomological consistency before aggregation - Computes intrinsic Karcher mean on the parameter manifold - Distributes the result back via inverse transport

Parameters

manifold A geomstats manifold instance for the parameter space. graph Directed graph of the client network. base_node The node to which all parameters are transported for aggregation. consistency_threshold Maximum H^1 norm before raising a consistency warning.

Source code in groupoid/aggregation.py
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
@dataclass
class TransportGroupoidAggregator:
    """Federated aggregator using groupoid transport and Riemannian geometry.

    Given a network of clients with local model parameters on a Riemannian
    manifold, this aggregator:
    - Uses parallel transport (groupoid morphisms) to align parameters
    - Checks cohomological consistency before aggregation
    - Computes intrinsic Karcher mean on the parameter manifold
    - Distributes the result back via inverse transport

    Parameters
    ----------
    manifold
        A geomstats manifold instance for the parameter space.
    graph
        Directed graph of the client network.
    base_node
        The node to which all parameters are transported for aggregation.
    consistency_threshold
        Maximum H^1 norm before raising a consistency warning.
    """

    manifold: object
    graph: nx.DiGraph
    base_node: str
    consistency_threshold: float = 1e-6
    morphisms: dict[tuple[str, str], Morphism] = field(default_factory=dict)
    _round_idx: int = field(default=0, init=False)

    def register_transport(self, source: str, target: str, matrix: np.ndarray) -> None:
        """Register a transport map between two clients."""
        self.morphisms[(source, target)] = Morphism(
            source=source,
            target=target,
            transport_map=matrix,
        )
        logger.debug("Registered transport {} -> {}", source, target)

    def _get_transport_to_base(self, node: str) -> np.ndarray | None:
        """Compute the composite transport map from node to base_node.

        Uses shortest path in the graph and composes morphisms along it.
        Returns None if node == base_node (identity transport).
        """
        if node == self.base_node:
            return None

        try:
            path = nx.shortest_path(self.graph.to_undirected(), node, self.base_node)
        except nx.NetworkXNoPath as e:
            raise DisconnectedClientGraphError(
                f"client graph is disconnected: no transport path from "
                f"{node} to base {self.base_node}"
            ) from e
        composite: Morphism | None = None

        for i in range(len(path) - 1):
            src, tgt = path[i], path[i + 1]
            if (src, tgt) in self.morphisms:
                m = self.morphisms[(src, tgt)]
            elif (tgt, src) in self.morphisms:
                m = inverse(self.morphisms[(tgt, src)])
            else:
                raise ValueError(f"No transport map for edge ({src}, {tgt})")

            composite = m if composite is None else compose(composite, m)

        return composite.transport_map if composite is not None else None

    def check_consistency(self, client_params: dict[str, np.ndarray]) -> float:
        """Check cohomological consistency of current transport maps.

        Returns the H^1 norm. A value near zero indicates that the
        local models can be consistently aggregated.
        """
        transport_maps = {(m.source, m.target): m.transport_map for m in self.morphisms.values()}
        h1 = compute_h1(self.graph, transport_maps)
        logger.info("Cohomological consistency check: H^1 = {:.2e}", h1)
        return h1

    def aggregate(
        self,
        client_params: dict[str, np.ndarray],
        weights: dict[str, float] | None = None,
    ) -> FederatedRound:
        """Run one round of groupoid-aware federated aggregation.

        Parameters
        ----------
        client_params
            Map from client node ID to local model parameters.
        weights
            Optional per-client weights for the Karcher mean.

        Returns
        -------
        FederatedRound
            The aggregation result including global params, consistency
            metrics, and per-client local updates.
        """
        self._round_idx += 1
        logger.info("Starting aggregation round {}", self._round_idx)

        h1 = self.check_consistency(client_params)
        is_consistent = h1 < self.consistency_threshold

        if not is_consistent:
            logger.warning(
                "H^1 = {:.2e} exceeds threshold {:.2e}, "
                "aggregation may produce inconsistent results",
                h1,
                self.consistency_threshold,
            )

        # Transport all client params to base node frame
        transported = {}
        transport_residuals = {}
        for node, params in client_params.items():
            if node == self.base_node:
                transported[node] = params
                transport_residuals[node] = 0.0
            else:
                # _get_transport_to_base returns None only for node == base_node
                # (excluded by this else); a disconnected graph raises
                # NetworkXNoPath. This guard is therefore defensively
                # unreachable.
                T = self._get_transport_to_base(node)
                if T is None:  # pragma: no cover - unreachable defensive guard (see above)
                    raise ValueError(f"No transport path from {node} to {self.base_node}")
                transported_params = T @ params
                transported[node] = transported_params
                transport_residuals[node] = float(
                    np.linalg.norm(T @ T.T - np.eye(T.shape[0]), "fro")
                )

        # Stack transported params and compute Karcher mean
        nodes = sorted(transported.keys())
        param_stack = np.stack([transported[n] for n in nodes])

        if weights is not None:
            w = np.array([weights.get(n, 1.0) for n in nodes])
            w = w / w.sum()
        else:
            w = None

        global_params = karcher_mean(self.manifold, param_stack, weights=w)

        # Transport global params back to each client's local frame
        local_updates = {}
        for node in client_params:
            if node == self.base_node:
                local_updates[node] = global_params
            else:
                # Same defensively-unreachable guard as in the forward
                # transport loop above.
                T = self._get_transport_to_base(node)
                if T is None:  # pragma: no cover - unreachable defensive guard (see above)
                    raise ValueError(f"No transport path from {node} to {self.base_node}")
                T_inv = np.linalg.inv(T)
                local_updates[node] = T_inv @ global_params

        result = FederatedRound(
            global_params=global_params,
            local_updates=local_updates,
            h1_norm=h1,
            is_consistent=is_consistent,
            transport_residuals=transport_residuals,
            round_idx=self._round_idx,
        )

        logger.info(
            "Round {} complete: H^1={:.2e}, consistent={}",
            self._round_idx,
            h1,
            is_consistent,
        )
        return result

aggregate(client_params, weights=None)

Run one round of groupoid-aware federated aggregation.

Parameters

client_params Map from client node ID to local model parameters. weights Optional per-client weights for the Karcher mean.

Returns

FederatedRound The aggregation result including global params, consistency metrics, and per-client local updates.

Source code in groupoid/aggregation.py
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
def aggregate(
    self,
    client_params: dict[str, np.ndarray],
    weights: dict[str, float] | None = None,
) -> FederatedRound:
    """Run one round of groupoid-aware federated aggregation.

    Parameters
    ----------
    client_params
        Map from client node ID to local model parameters.
    weights
        Optional per-client weights for the Karcher mean.

    Returns
    -------
    FederatedRound
        The aggregation result including global params, consistency
        metrics, and per-client local updates.
    """
    self._round_idx += 1
    logger.info("Starting aggregation round {}", self._round_idx)

    h1 = self.check_consistency(client_params)
    is_consistent = h1 < self.consistency_threshold

    if not is_consistent:
        logger.warning(
            "H^1 = {:.2e} exceeds threshold {:.2e}, "
            "aggregation may produce inconsistent results",
            h1,
            self.consistency_threshold,
        )

    # Transport all client params to base node frame
    transported = {}
    transport_residuals = {}
    for node, params in client_params.items():
        if node == self.base_node:
            transported[node] = params
            transport_residuals[node] = 0.0
        else:
            # _get_transport_to_base returns None only for node == base_node
            # (excluded by this else); a disconnected graph raises
            # NetworkXNoPath. This guard is therefore defensively
            # unreachable.
            T = self._get_transport_to_base(node)
            if T is None:  # pragma: no cover - unreachable defensive guard (see above)
                raise ValueError(f"No transport path from {node} to {self.base_node}")
            transported_params = T @ params
            transported[node] = transported_params
            transport_residuals[node] = float(
                np.linalg.norm(T @ T.T - np.eye(T.shape[0]), "fro")
            )

    # Stack transported params and compute Karcher mean
    nodes = sorted(transported.keys())
    param_stack = np.stack([transported[n] for n in nodes])

    if weights is not None:
        w = np.array([weights.get(n, 1.0) for n in nodes])
        w = w / w.sum()
    else:
        w = None

    global_params = karcher_mean(self.manifold, param_stack, weights=w)

    # Transport global params back to each client's local frame
    local_updates = {}
    for node in client_params:
        if node == self.base_node:
            local_updates[node] = global_params
        else:
            # Same defensively-unreachable guard as in the forward
            # transport loop above.
            T = self._get_transport_to_base(node)
            if T is None:  # pragma: no cover - unreachable defensive guard (see above)
                raise ValueError(f"No transport path from {node} to {self.base_node}")
            T_inv = np.linalg.inv(T)
            local_updates[node] = T_inv @ global_params

    result = FederatedRound(
        global_params=global_params,
        local_updates=local_updates,
        h1_norm=h1,
        is_consistent=is_consistent,
        transport_residuals=transport_residuals,
        round_idx=self._round_idx,
    )

    logger.info(
        "Round {} complete: H^1={:.2e}, consistent={}",
        self._round_idx,
        h1,
        is_consistent,
    )
    return result

check_consistency(client_params)

Check cohomological consistency of current transport maps.

Returns the H^1 norm. A value near zero indicates that the local models can be consistently aggregated.

Source code in groupoid/aggregation.py
124
125
126
127
128
129
130
131
132
133
def check_consistency(self, client_params: dict[str, np.ndarray]) -> float:
    """Check cohomological consistency of current transport maps.

    Returns the H^1 norm. A value near zero indicates that the
    local models can be consistently aggregated.
    """
    transport_maps = {(m.source, m.target): m.transport_map for m in self.morphisms.values()}
    h1 = compute_h1(self.graph, transport_maps)
    logger.info("Cohomological consistency check: H^1 = {:.2e}", h1)
    return h1

register_transport(source, target, matrix)

Register a transport map between two clients.

Source code in groupoid/aggregation.py
84
85
86
87
88
89
90
91
def register_transport(self, source: str, target: str, matrix: np.ndarray) -> None:
    """Register a transport map between two clients."""
    self.morphisms[(source, target)] = Morphism(
        source=source,
        target=target,
        transport_map=matrix,
    )
    logger.debug("Registered transport {} -> {}", source, target)

groupoid.aggregation.FederatedRound dataclass

Result of a single federated aggregation round.

Source code in groupoid/aggregation.py
42
43
44
45
46
47
48
49
50
51
@dataclass
class FederatedRound:
    """Result of a single federated aggregation round."""

    global_params: np.ndarray
    local_updates: dict[str, np.ndarray]
    h1_norm: float
    is_consistent: bool
    transport_residuals: dict[str, float]
    round_idx: int = 0

Manifold Operations

groupoid.manifold.karcher_mean(manifold, points, weights=None, max_iter=100, tol=1e-06)

Compute the Karcher (Frechet) mean on a Riemannian manifold.

Parameters

manifold : geomstats manifold A geomstats manifold instance with a metric. points : np.ndarray Array of shape (n_points, *point_shape) on the manifold. weights : np.ndarray or None Optional weights for the mean computation. max_iter : int Maximum iterations for gradient descent. tol : float Convergence tolerance.

Returns

np.ndarray The Karcher mean point on the manifold.

Source code in groupoid/manifold.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
def karcher_mean(
    manifold: LevelSet,
    points: np.ndarray,
    weights: np.ndarray | None = None,
    max_iter: int = 100,
    tol: float = 1e-6,
) -> np.ndarray:
    """Compute the Karcher (Frechet) mean on a Riemannian manifold.

    Parameters
    ----------
    manifold : geomstats manifold
        A geomstats manifold instance with a metric.
    points : np.ndarray
        Array of shape (n_points, *point_shape) on the manifold.
    weights : np.ndarray or None
        Optional weights for the mean computation.
    max_iter : int
        Maximum iterations for gradient descent.
    tol : float
        Convergence tolerance.

    Returns
    -------
    np.ndarray
        The Karcher mean point on the manifold.
    """
    from geomstats.learning.frechet_mean import FrechetMean

    n_points = points.shape[0]
    logger.debug(
        "Computing Karcher mean of {} points on {}",
        n_points,
        type(manifold).__name__,
    )

    estimator = FrechetMean(manifold)

    # Forward convergence controls to the underlying gradient-descent
    # optimizer when the installed geomstats version exposes it. Older
    # versions without an `optimizer` attribute fall back to their defaults.
    optimizer = getattr(estimator, "optimizer", None)
    if optimizer is not None:
        if hasattr(optimizer, "max_iter"):
            optimizer.max_iter = max_iter
        if hasattr(optimizer, "epsilon"):
            optimizer.epsilon = tol

    if weights is not None:
        estimator.fit(points, weights=weights)
    else:
        estimator.fit(points)

    mean: np.ndarray = estimator.estimate_
    logger.debug("Karcher mean converged")
    return mean

Groupoid

groupoid.groupoid.Morphism

Bases: BaseModel

A morphism in the transport groupoid.

Represents a transport map between two nodes (clients) in the federated learning network.

Source code in groupoid/groupoid.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class Morphism(BaseModel):
    """A morphism in the transport groupoid.

    Represents a transport map between two nodes (clients) in the
    federated learning network.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    source: str
    target: str
    transport_map: np.ndarray

    def __repr__(self) -> str:
        return f"Morphism({self.source} -> {self.target})"

groupoid.groupoid.compose(f, g)

Compose two morphisms f and g (f followed by g).

Requires f.target == g.source (category composition law).

Parameters

f : Morphism First morphism (applied first). g : Morphism Second morphism (applied second).

Returns

Morphism The composed morphism from f.source to g.target.

Raises

CompositionError If f.target != g.source.

Source code in groupoid/groupoid.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
def compose(f: Morphism, g: Morphism) -> Morphism:
    """Compose two morphisms f and g (f followed by g).

    Requires f.target == g.source (category composition law).

    Parameters
    ----------
    f : Morphism
        First morphism (applied first).
    g : Morphism
        Second morphism (applied second).

    Returns
    -------
    Morphism
        The composed morphism from f.source to g.target.

    Raises
    ------
    CompositionError
        If f.target != g.source.
    """
    if f.target != g.source:
        raise CompositionError(f"Cannot compose: {f.target} != {g.source}")

    logger.debug("Composing {} with {}", f, g)
    composed_map = g.transport_map @ f.transport_map
    return Morphism(
        source=f.source,
        target=g.target,
        transport_map=composed_map,
    )

groupoid.groupoid.inverse(f)

Compute the inverse of a morphism.

Parameters

f : Morphism The morphism to invert.

Returns

Morphism The inverse morphism from f.target to f.source.

Source code in groupoid/groupoid.py
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
def inverse(f: Morphism) -> Morphism:
    """Compute the inverse of a morphism.

    Parameters
    ----------
    f : Morphism
        The morphism to invert.

    Returns
    -------
    Morphism
        The inverse morphism from f.target to f.source.
    """
    logger.debug("Inverting {}", f)
    inv_map = np.linalg.inv(f.transport_map)
    return Morphism(
        source=f.target,
        target=f.source,
        transport_map=inv_map,
    )

Cohomology

groupoid.cohomology.compute_h1(graph, transport_maps)

Compute the H^1 cohomology norm of a transport cocycle.

Returns 0.0 (up to numerical tolerance) when the cocycle is a coboundary, meaning the local data is globally consistent.

The H^1 obstruction on a cycle is the deviation from the identity of the holonomy, i.e. the ordered product of the restriction/transport maps around the entire cycle. The cocycle must therefore be fully specified: every edge of every basis cycle needs a transport map in one direction or the other (the reverse direction is inverted). A missing edge map leaves the holonomy undefined, so this function raises :class:IncompleteCocycleError rather than silently forming a partial product.

Parameters

graph : nx.DiGraph The groupoid nerve (directed graph of nodes/clients). transport_maps : dict Maps (source, target) edge tuples to transport matrices.

Returns

float The cohomology norm. Zero indicates global consistency.

Raises

IncompleteCocycleError If any edge on a basis cycle has no transport map in either direction, so the cycle's holonomy is undefined.

Source code in groupoid/cohomology.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
def compute_h1(
    graph: nx.DiGraph,
    transport_maps: dict[tuple[str, str], np.ndarray],
) -> float:
    """Compute the H^1 cohomology norm of a transport cocycle.

    Returns 0.0 (up to numerical tolerance) when the cocycle is a
    coboundary, meaning the local data is globally consistent.

    The H^1 obstruction on a cycle is the deviation from the identity of
    the holonomy, i.e. the ordered product of the restriction/transport
    maps around the *entire* cycle. The cocycle must therefore be fully
    specified: every edge of every basis cycle needs a transport map in
    one direction or the other (the reverse direction is inverted). A
    missing edge map leaves the holonomy undefined, so this function
    raises :class:`IncompleteCocycleError` rather than silently forming a
    partial product.

    Parameters
    ----------
    graph : nx.DiGraph
        The groupoid nerve (directed graph of nodes/clients).
    transport_maps : dict
        Maps (source, target) edge tuples to transport matrices.

    Returns
    -------
    float
        The cohomology norm. Zero indicates global consistency.

    Raises
    ------
    IncompleteCocycleError
        If any edge on a basis cycle has no transport map in either
        direction, so the cycle's holonomy is undefined.
    """
    undirected = graph.to_undirected()
    cycles = nx.cycle_basis(undirected)

    if not cycles:
        logger.debug("No cycles in graph, H^1 = 0 trivially")
        return 0.0

    max_holonomy_norm = 0.0

    for cycle in cycles:
        n = len(cycle)
        # Resolve every edge's transport map first, raising on the first
        # missing one so a partial product is never formed. The reverse
        # orientation is inverted. Every cycle-basis cycle has length >= 3, so
        # the resulting product is always fully formed.
        edge_maps = []
        for i in range(n):
            u = cycle[i]
            v = cycle[(i + 1) % n]

            if (u, v) in transport_maps:
                edge_maps.append(transport_maps[(u, v)])
            elif (v, u) in transport_maps:
                edge_maps.append(np.linalg.inv(transport_maps[(v, u)]))
            else:
                raise IncompleteCocycleError(
                    f"Incomplete cocycle: no transport map for edge ({u}, {v}) "
                    f"on cycle {cycle}; holonomy is undefined. Supply the edge "
                    f"map in either direction before computing H^1."
                )

        holonomy = edge_maps[0]
        for T in edge_maps[1:]:
            holonomy = T @ holonomy

        dim = holonomy.shape[0]
        deviation = float(np.linalg.norm(holonomy - np.eye(dim), ord="fro"))
        max_holonomy_norm = max(max_holonomy_norm, deviation)

    logger.debug("H^1 norm = {:.6e}", max_holonomy_norm)
    return max_holonomy_norm

Sheaf

groupoid.sheaf.Sheaf

A cellular sheaf on a graph.

Assigns vector spaces to nodes and linear restriction maps to edges.

Source code in groupoid/sheaf.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
class Sheaf:
    """A cellular sheaf on a graph.

    Assigns vector spaces to nodes and linear restriction maps to edges.
    """

    def __init__(self, graph: nx.DiGraph) -> None:
        self.graph = graph
        self._restriction_maps: dict[tuple[str, str], np.ndarray] = {}
        self._sections: dict[str, np.ndarray] = {}

    def set_restriction_map(self, source: str, target: str, matrix: np.ndarray) -> None:
        """Set the restriction map for an edge."""
        self._restriction_maps[(source, target)] = matrix
        logger.debug("Set restriction map {} -> {}", source, target)

    def get_restriction_map(self, source: str, target: str) -> np.ndarray:
        """Get the restriction map for an edge."""
        return self._restriction_maps[(source, target)]

    def set_section(self, node: str, value: np.ndarray) -> None:
        """Set a section value at a node."""
        self._sections[node] = value

    def get_section(self, node: str) -> np.ndarray:
        """Get the section value at a node."""
        return self._sections[node]

    def restrict(self, section: np.ndarray, source: str, target: str) -> np.ndarray:
        """Apply the restriction map to a section.

        Parameters
        ----------
        section : np.ndarray
            The section value at the source node.
        source : str
            Source node identifier.
        target : str
            Target node identifier.

        Returns
        -------
        np.ndarray
            The restricted section at the target node.
        """
        R = self._restriction_maps[(source, target)]
        result: np.ndarray = R @ section
        return result

    def restrict_along_path(self, section: np.ndarray, path: list[str]) -> np.ndarray:
        """Restrict a section along a path of nodes.

        Parameters
        ----------
        section : np.ndarray
            The section value at path[0].
        path : list[str]
            Ordered list of nodes forming a path in the graph.

        Returns
        -------
        np.ndarray
            The section restricted to path[-1].
        """
        result = section
        for i in range(len(path) - 1):
            result = self.restrict(result, path[i], path[i + 1])
        return result

get_restriction_map(source, target)

Get the restriction map for an edge.

Source code in groupoid/sheaf.py
26
27
28
def get_restriction_map(self, source: str, target: str) -> np.ndarray:
    """Get the restriction map for an edge."""
    return self._restriction_maps[(source, target)]

get_section(node)

Get the section value at a node.

Source code in groupoid/sheaf.py
34
35
36
def get_section(self, node: str) -> np.ndarray:
    """Get the section value at a node."""
    return self._sections[node]

restrict(section, source, target)

Apply the restriction map to a section.

Parameters

section : np.ndarray The section value at the source node. source : str Source node identifier. target : str Target node identifier.

Returns

np.ndarray The restricted section at the target node.

Source code in groupoid/sheaf.py
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
def restrict(self, section: np.ndarray, source: str, target: str) -> np.ndarray:
    """Apply the restriction map to a section.

    Parameters
    ----------
    section : np.ndarray
        The section value at the source node.
    source : str
        Source node identifier.
    target : str
        Target node identifier.

    Returns
    -------
    np.ndarray
        The restricted section at the target node.
    """
    R = self._restriction_maps[(source, target)]
    result: np.ndarray = R @ section
    return result

restrict_along_path(section, path)

Restrict a section along a path of nodes.

Parameters

section : np.ndarray The section value at path[0]. path : list[str] Ordered list of nodes forming a path in the graph.

Returns

np.ndarray The section restricted to path[-1].

Source code in groupoid/sheaf.py
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
def restrict_along_path(self, section: np.ndarray, path: list[str]) -> np.ndarray:
    """Restrict a section along a path of nodes.

    Parameters
    ----------
    section : np.ndarray
        The section value at path[0].
    path : list[str]
        Ordered list of nodes forming a path in the graph.

    Returns
    -------
    np.ndarray
        The section restricted to path[-1].
    """
    result = section
    for i in range(len(path) - 1):
        result = self.restrict(result, path[i], path[i + 1])
    return result

set_restriction_map(source, target, matrix)

Set the restriction map for an edge.

Source code in groupoid/sheaf.py
21
22
23
24
def set_restriction_map(self, source: str, target: str, matrix: np.ndarray) -> None:
    """Set the restriction map for an edge."""
    self._restriction_maps[(source, target)] = matrix
    logger.debug("Set restriction map {} -> {}", source, target)

set_section(node, value)

Set a section value at a node.

Source code in groupoid/sheaf.py
30
31
32
def set_section(self, node: str, value: np.ndarray) -> None:
    """Set a section value at a node."""
    self._sections[node] = value

Sheaf Laplacian

groupoid.laplacian.build_sheaf_laplacian(sheaf, stalk_dim)

Build the sheaf Laplacian matrix.

For a sheaf F on graph G with n nodes and stalk dimension d, the sheaf Laplacian is a (nd) x (nd) block matrix defined as:

L_F = delta^T @ delta

where delta is the connection coboundary. For each edge (u, v) with restriction (transport) map R = R_{uv}: stalk(u) -> stalk(v), the coboundary acts as (delta x){(u,v)} = x_v - R{uv} x_u, so L = delta^T @ delta has blocks (summed over incident edges):

L[u,u] += R_{uv}^T @ R_{uv}    (source diagonal)
L[v,v] += I                    (target diagonal)
L[u,v] += -R_{uv}^T            (off-diagonal)
L[v,u] += -R_{uv}              (off-diagonal)

L is symmetric positive semi-definite for ANY restriction maps (it is delta^T delta); its kernel is the space of transport-consistent global sections (x_v = R_{uv} x_u on every edge).

Parameters

sheaf A Sheaf instance with restriction maps set. stalk_dim Dimension of each stalk (vector space at each node).

Returns

np.ndarray The sheaf Laplacian matrix of shape (nd, nd).

Source code in groupoid/laplacian.py
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
def build_sheaf_laplacian(sheaf: Sheaf, stalk_dim: int) -> np.ndarray:
    """Build the sheaf Laplacian matrix.

    For a sheaf F on graph G with n nodes and stalk dimension d,
    the sheaf Laplacian is a (n*d) x (n*d) block matrix defined as:

        L_F = delta^T @ delta

    where delta is the connection coboundary. For each edge (u, v) with
    restriction (transport) map R = R_{uv}: stalk(u) -> stalk(v), the
    coboundary acts as (delta x)_{(u,v)} = x_v - R_{uv} x_u, so
    L = delta^T @ delta has blocks (summed over incident edges):

        L[u,u] += R_{uv}^T @ R_{uv}    (source diagonal)
        L[v,v] += I                    (target diagonal)
        L[u,v] += -R_{uv}^T            (off-diagonal)
        L[v,u] += -R_{uv}              (off-diagonal)

    L is symmetric positive semi-definite for ANY restriction maps (it is
    delta^T delta); its kernel is the space of transport-consistent global
    sections (x_v = R_{uv} x_u on every edge).

    Parameters
    ----------
    sheaf
        A Sheaf instance with restriction maps set.
    stalk_dim
        Dimension of each stalk (vector space at each node).

    Returns
    -------
    np.ndarray
        The sheaf Laplacian matrix of shape (n*d, n*d).
    """
    nodes = sorted(sheaf.graph.nodes())
    n = len(nodes)
    node_idx = {node: i for i, node in enumerate(nodes)}
    N = n * stalk_dim

    L = np.zeros((N, N))

    for u, v in sheaf.graph.edges():
        i, j = node_idx[u], node_idx[v]
        R = sheaf.get_restriction_map(u, v)
        i_slice = slice(i * stalk_dim, (i + 1) * stalk_dim)
        j_slice = slice(j * stalk_dim, (j + 1) * stalk_dim)

        # L = delta^T delta for coboundary (delta x)_(u,v) = x_v - R_uv x_u:
        L[i_slice, i_slice] += R.T @ R  # source diagonal: R^T R
        L[j_slice, j_slice] += np.eye(stalk_dim)  # target diagonal: I
        L[i_slice, j_slice] += -R.T  # off-diagonal: -R^T
        L[j_slice, i_slice] += -R  # off-diagonal: -R

    logger.debug("Built sheaf Laplacian: {}x{} ({} nodes, stalk_dim={})", N, N, n, stalk_dim)
    return L

groupoid.laplacian.spectral_analysis(sheaf, stalk_dim, tol=1e-10)

Compute spectral decomposition of the sheaf Laplacian.

Parameters

sheaf A Sheaf instance with restriction maps. stalk_dim Dimension of each stalk. tol Tolerance for identifying zero eigenvalues.

Returns

SpectralSummary Full spectral summary including connectivity and consensus rate.

Source code in groupoid/laplacian.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
def spectral_analysis(
    sheaf: Sheaf,
    stalk_dim: int,
    tol: float = 1e-10,
) -> SpectralSummary:
    """Compute spectral decomposition of the sheaf Laplacian.

    Parameters
    ----------
    sheaf
        A Sheaf instance with restriction maps.
    stalk_dim
        Dimension of each stalk.
    tol
        Tolerance for identifying zero eigenvalues.

    Returns
    -------
    SpectralSummary
        Full spectral summary including connectivity and consensus rate.
    """
    L = build_sheaf_laplacian(sheaf, stalk_dim)
    eigenvalues, eigenvectors = np.linalg.eigh(L)

    # Sort by magnitude
    idx = np.argsort(eigenvalues)
    eigenvalues = eigenvalues[idx]
    eigenvectors = eigenvectors[:, idx]

    # Kernel dimension (number of zero eigenvalues)
    kernel_dim = int(np.sum(np.abs(eigenvalues) < tol))

    # Spectral gap and algebraic connectivity
    nonzero_eigs = eigenvalues[np.abs(eigenvalues) >= tol]
    if len(nonzero_eigs) > 0:
        algebraic_connectivity = float(nonzero_eigs[0])
        spectral_gap = float(nonzero_eigs[0])
    else:
        algebraic_connectivity = 0.0
        spectral_gap = 0.0

    # Consensus rate: exponential convergence rate of sheaf diffusion
    # x(t+1) = (I - epsilon * L) @ x(t), converges as exp(-lambda_1 * t)
    consensus_rate = algebraic_connectivity

    logger.info(
        "Spectral analysis: kernel_dim={}, spectral_gap={:.4f}, connectivity={:.4f}",
        kernel_dim,
        spectral_gap,
        algebraic_connectivity,
    )

    return SpectralSummary(
        eigenvalues=eigenvalues,
        eigenvectors=eigenvectors,
        spectral_gap=spectral_gap,
        algebraic_connectivity=algebraic_connectivity,
        kernel_dimension=kernel_dim,
        consensus_rate=consensus_rate,
    )

groupoid.laplacian.sheaf_diffusion_step(sheaf, sections, stalk_dim, step_size=0.1)

One step of sheaf diffusion (Laplacian smoothing).

Drives local sections toward global consistency by flowing along the negative gradient of the sheaf Laplacian energy:

E(x) = x^T L_F x = sum_{(i,j)} ||R_{ij} x_i - x_j||^2

Parameters

sheaf Sheaf with restriction maps. sections Current section values at each node. stalk_dim Dimension of each stalk. step_size Diffusion step size (must be < 1/lambda_max for stability).

Returns

dict[str, np.ndarray] Updated section values after one diffusion step.

Source code in groupoid/laplacian.py
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
def sheaf_diffusion_step(
    sheaf: Sheaf,
    sections: dict[str, np.ndarray],
    stalk_dim: int,
    step_size: float = 0.1,
) -> dict[str, np.ndarray]:
    """One step of sheaf diffusion (Laplacian smoothing).

    Drives local sections toward global consistency by flowing along
    the negative gradient of the sheaf Laplacian energy:

        E(x) = x^T L_F x = sum_{(i,j)} ||R_{ij} x_i - x_j||^2

    Parameters
    ----------
    sheaf
        Sheaf with restriction maps.
    sections
        Current section values at each node.
    stalk_dim
        Dimension of each stalk.
    step_size
        Diffusion step size (must be < 1/lambda_max for stability).

    Returns
    -------
    dict[str, np.ndarray]
        Updated section values after one diffusion step.
    """
    L = build_sheaf_laplacian(sheaf, stalk_dim)
    nodes = sorted(sheaf.graph.nodes())

    # Stack sections into vector
    x = np.concatenate([sections[n] for n in nodes])

    # Diffusion step: x' = x - step_size * L @ x
    x_new = x - step_size * L @ x

    # Unstack
    result = {}
    for idx, node in enumerate(nodes):
        result[node] = x_new[idx * stalk_dim : (idx + 1) * stalk_dim]

    return result

Parallel Transport

groupoid.transport.schild_ladder(manifold, tangent_vec, base_point, end_point, n_rungs=1)

Parallel transport via Schild's ladder.

An iterative approximation of parallel transport that only requires the exponential and logarithmic maps. This is the coarser of the two ladders: on S^2 its direction agrees with the analytic parallel transport only to cosine ~0.98 and does not converge to the exact value as n_rungs increases (see pole_ladder for the accurate variant and LIMITATIONS.md for the measured behavior).

Parameters

manifold A geomstats manifold with exp and log maps. tangent_vec Tangent vector at base_point to transport. base_point Starting point on the manifold. end_point Destination point on the manifold. n_rungs Number of ladder rungs (higher = more accurate).

Returns

np.ndarray The transported tangent vector at end_point.

Source code in groupoid/transport.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
def schild_ladder(
    manifold,
    tangent_vec: np.ndarray,
    base_point: np.ndarray,
    end_point: np.ndarray,
    n_rungs: int = 1,
) -> np.ndarray:
    """Parallel transport via Schild's ladder.

    An iterative approximation of parallel transport that only requires
    the exponential and logarithmic maps. This is the coarser of the two
    ladders: on S^2 its direction agrees with the analytic parallel
    transport only to cosine ~0.98 and does not converge to the exact
    value as ``n_rungs`` increases (see ``pole_ladder`` for the accurate
    variant and LIMITATIONS.md for the measured behavior).

    Parameters
    ----------
    manifold
        A geomstats manifold with exp and log maps.
    tangent_vec
        Tangent vector at base_point to transport.
    base_point
        Starting point on the manifold.
    end_point
        Destination point on the manifold.
    n_rungs
        Number of ladder rungs (higher = more accurate).

    Returns
    -------
    np.ndarray
        The transported tangent vector at end_point.
    """
    metric = manifold.metric
    direction = metric.log(end_point, base_point)

    current_base = base_point
    current_vec = tangent_vec

    for _k in range(n_rungs):
        step = direction * (1.0 / n_rungs)
        next_base = metric.exp(step, current_base)

        # Schild's ladder: reflect the antipodal endpoint through
        # the midpoint of the base-to-next geodesic
        u = metric.exp(-current_vec, current_base)
        midpoint = metric.exp(step / 2.0, current_base)
        log_mid_u = metric.log(u, midpoint)
        u_prime = metric.exp(-log_mid_u, midpoint)
        current_vec = metric.log(u_prime, next_base)

        current_base = next_base
        direction = metric.log(end_point, current_base)

    result: np.ndarray = current_vec
    return result

groupoid.transport.pole_ladder(manifold, tangent_vec, base_point, end_point, n_rungs=1)

Parallel transport via pole ladder.

A variant of Schild's ladder that uses geodesic symmetry instead of midpoint construction. More accurate than Schild's ladder for the same number of rungs, and exactly preserves the norm of the transported vector on symmetric spaces.

Parameters

manifold A geomstats manifold with exp and log maps. tangent_vec Tangent vector at base_point to transport. base_point Starting point on the manifold. end_point Destination point on the manifold. n_rungs Number of ladder rungs.

Returns

np.ndarray The transported tangent vector at end_point.

Source code in groupoid/transport.py
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
def pole_ladder(
    manifold,
    tangent_vec: np.ndarray,
    base_point: np.ndarray,
    end_point: np.ndarray,
    n_rungs: int = 1,
) -> np.ndarray:
    """Parallel transport via pole ladder.

    A variant of Schild's ladder that uses geodesic symmetry instead
    of midpoint construction. More accurate than Schild's ladder for
    the same number of rungs, and exactly preserves the norm of the
    transported vector on symmetric spaces.

    Parameters
    ----------
    manifold
        A geomstats manifold with exp and log maps.
    tangent_vec
        Tangent vector at base_point to transport.
    base_point
        Starting point on the manifold.
    end_point
        Destination point on the manifold.
    n_rungs
        Number of ladder rungs.

    Returns
    -------
    np.ndarray
        The transported tangent vector at end_point.
    """
    metric = manifold.metric
    direction = metric.log(end_point, base_point)

    current_base = base_point
    current_vec = tangent_vec

    for _k in range(n_rungs):
        step = direction * (1.0 / n_rungs)
        next_base = metric.exp(step, current_base)

        # Pole construction: reflect through the midpoint of the geodesic
        pole = metric.exp(-current_vec, current_base)
        mid_of_geodesic = metric.exp(step / 2.0, current_base)
        log_pole_from_mid = metric.log(pole, mid_of_geodesic)
        reflected = metric.exp(-log_pole_from_mid, mid_of_geodesic)
        current_vec = metric.log(reflected, next_base)

        current_base = next_base

    return current_vec

groupoid.transport.compute_transport_matrix(manifold, base_point, end_point, method='pole', n_rungs=2)

Compute the parallel transport matrix between two points.

Constructs the full linear map T such that for any tangent vector v at base_point, T @ v gives the parallel transport of v to end_point.

Parameters

manifold A geomstats manifold. base_point Starting point on the manifold. end_point Destination point on the manifold. method Transport method: "pole" or "schild". n_rungs Number of ladder rungs for the approximation.

Returns

np.ndarray Transport matrix of shape (dim, dim).

Source code in groupoid/transport.py
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
def compute_transport_matrix(
    manifold,
    base_point: np.ndarray,
    end_point: np.ndarray,
    method: str = "pole",
    n_rungs: int = 2,
) -> np.ndarray:
    """Compute the parallel transport matrix between two points.

    Constructs the full linear map T such that for any tangent vector v
    at base_point, T @ v gives the parallel transport of v to end_point.

    Parameters
    ----------
    manifold
        A geomstats manifold.
    base_point
        Starting point on the manifold.
    end_point
        Destination point on the manifold.
    method
        Transport method: "pole" or "schild".
    n_rungs
        Number of ladder rungs for the approximation.

    Returns
    -------
    np.ndarray
        Transport matrix of shape (dim, dim).
    """
    transport_fn = pole_ladder if method == "pole" else schild_ladder
    dim = base_point.shape[-1]
    T = np.zeros((dim, dim))

    # Transport each basis vector
    for i in range(dim):
        e_i = np.zeros(dim)
        e_i[i] = 1.0

        # Project onto tangent space at base_point
        tangent = manifold.to_tangent(e_i, base_point)

        transported = transport_fn(manifold, tangent, base_point, end_point, n_rungs=n_rungs)
        T[:, i] = transported

    logger.debug(
        "Transport matrix computed ({} method, {} rungs): det={:.4f}",
        method,
        n_rungs,
        np.linalg.det(T),
    )
    return T

Riemannian Optimizers

groupoid.optimizer.RiemannianSGD dataclass

Riemannian stochastic gradient descent.

Updates parameters by computing the Riemannian gradient (projection of Euclidean gradient onto tangent space) and retracting back to the manifold via the exponential map.

Parameters

manifold A geomstats manifold instance. lr Learning rate. momentum Momentum coefficient (0 = no momentum).

Source code in groupoid/optimizer.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
@dataclass
class RiemannianSGD:
    """Riemannian stochastic gradient descent.

    Updates parameters by computing the Riemannian gradient (projection
    of Euclidean gradient onto tangent space) and retracting back to
    the manifold via the exponential map.

    Parameters
    ----------
    manifold
        A geomstats manifold instance.
    lr
        Learning rate.
    momentum
        Momentum coefficient (0 = no momentum).
    """

    manifold: Any
    lr: float = 0.01
    momentum: float = 0.0
    _velocity: np.ndarray | None = field(default=None, init=False, repr=False)

    def step(self, point: np.ndarray, euclidean_grad: np.ndarray) -> np.ndarray:
        """Perform one optimization step.

        Parameters
        ----------
        point
            Current point on the manifold.
        euclidean_grad
            Euclidean gradient (will be projected to tangent space).

        Returns
        -------
        np.ndarray
            Updated point on the manifold.
        """
        # Project gradient onto tangent space
        riemannian_grad = self.manifold.to_tangent(euclidean_grad, point)

        # Apply momentum
        vel: np.ndarray
        if self.momentum > 0:
            if self._velocity is None:
                vel = riemannian_grad
            else:
                vel = self.manifold.to_tangent(
                    self.momentum * self._velocity + riemannian_grad, point
                )
            self._velocity = vel
            update = -self.lr * vel
        else:
            update = -self.lr * riemannian_grad

        # Retract to manifold via exponential map
        new_point: np.ndarray = self.manifold.metric.exp(update, point)

        return new_point

step(point, euclidean_grad)

Perform one optimization step.

Parameters

point Current point on the manifold. euclidean_grad Euclidean gradient (will be projected to tangent space).

Returns

np.ndarray Updated point on the manifold.

Source code in groupoid/optimizer.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
def step(self, point: np.ndarray, euclidean_grad: np.ndarray) -> np.ndarray:
    """Perform one optimization step.

    Parameters
    ----------
    point
        Current point on the manifold.
    euclidean_grad
        Euclidean gradient (will be projected to tangent space).

    Returns
    -------
    np.ndarray
        Updated point on the manifold.
    """
    # Project gradient onto tangent space
    riemannian_grad = self.manifold.to_tangent(euclidean_grad, point)

    # Apply momentum
    vel: np.ndarray
    if self.momentum > 0:
        if self._velocity is None:
            vel = riemannian_grad
        else:
            vel = self.manifold.to_tangent(
                self.momentum * self._velocity + riemannian_grad, point
            )
        self._velocity = vel
        update = -self.lr * vel
    else:
        update = -self.lr * riemannian_grad

    # Retract to manifold via exponential map
    new_point: np.ndarray = self.manifold.metric.exp(update, point)

    return new_point

groupoid.optimizer.RiemannianAdam dataclass

Riemannian Adam optimizer.

Adapts the Adam optimizer to Riemannian manifolds by maintaining exponential moving averages of the Riemannian gradient and its squared norm, with updates via the exponential map.

Parameters

manifold A geomstats manifold instance. lr Learning rate. beta1 Exponential decay rate for first moment. beta2 Exponential decay rate for second moment. eps Small constant for numerical stability.

Source code in groupoid/optimizer.py
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
@dataclass
class RiemannianAdam:
    """Riemannian Adam optimizer.

    Adapts the Adam optimizer to Riemannian manifolds by maintaining
    exponential moving averages of the Riemannian gradient and its
    squared norm, with updates via the exponential map.

    Parameters
    ----------
    manifold
        A geomstats manifold instance.
    lr
        Learning rate.
    beta1
        Exponential decay rate for first moment.
    beta2
        Exponential decay rate for second moment.
    eps
        Small constant for numerical stability.
    """

    manifold: Any
    lr: float = 0.001
    beta1: float = 0.9
    beta2: float = 0.999
    eps: float = 1e-8
    _m: np.ndarray | None = field(default=None, init=False, repr=False)
    _v: float = field(default=0.0, init=False, repr=False)
    _t: int = field(default=0, init=False, repr=False)

    def step(self, point: np.ndarray, euclidean_grad: np.ndarray) -> np.ndarray:
        """Perform one optimization step.

        Parameters
        ----------
        point
            Current point on the manifold.
        euclidean_grad
            Euclidean gradient.

        Returns
        -------
        np.ndarray
            Updated point on the manifold.
        """
        self._t += 1

        # Riemannian gradient
        grad = self.manifold.to_tangent(euclidean_grad, point)
        grad_norm_sq = float(np.sum(grad**2))

        # Update biased first moment (tangent vector)
        first_moment: np.ndarray
        if self._m is None:
            first_moment = grad.copy()
        else:
            first_moment = self.manifold.to_tangent(
                self.beta1 * self._m + (1 - self.beta1) * grad, point
            )
        self._m = first_moment

        # Update biased second moment (scalar, norm-based)
        self._v = self.beta2 * self._v + (1 - self.beta2) * grad_norm_sq

        # Bias correction
        m_hat = first_moment / (1 - self.beta1**self._t)
        v_hat = self._v / (1 - self.beta2**self._t)

        # Adaptive update
        update = -self.lr * m_hat / (np.sqrt(v_hat) + self.eps)
        update = self.manifold.to_tangent(update, point)

        new_point: np.ndarray = self.manifold.metric.exp(update, point)
        return new_point

step(point, euclidean_grad)

Perform one optimization step.

Parameters

point Current point on the manifold. euclidean_grad Euclidean gradient.

Returns

np.ndarray Updated point on the manifold.

Source code in groupoid/optimizer.py
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
def step(self, point: np.ndarray, euclidean_grad: np.ndarray) -> np.ndarray:
    """Perform one optimization step.

    Parameters
    ----------
    point
        Current point on the manifold.
    euclidean_grad
        Euclidean gradient.

    Returns
    -------
    np.ndarray
        Updated point on the manifold.
    """
    self._t += 1

    # Riemannian gradient
    grad = self.manifold.to_tangent(euclidean_grad, point)
    grad_norm_sq = float(np.sum(grad**2))

    # Update biased first moment (tangent vector)
    first_moment: np.ndarray
    if self._m is None:
        first_moment = grad.copy()
    else:
        first_moment = self.manifold.to_tangent(
            self.beta1 * self._m + (1 - self.beta1) * grad, point
        )
    self._m = first_moment

    # Update biased second moment (scalar, norm-based)
    self._v = self.beta2 * self._v + (1 - self.beta2) * grad_norm_sq

    # Bias correction
    m_hat = first_moment / (1 - self.beta1**self._t)
    v_hat = self._v / (1 - self.beta2**self._t)

    # Adaptive update
    update = -self.lr * m_hat / (np.sqrt(v_hat) + self.eps)
    update = self.manifold.to_tangent(update, point)

    new_point: np.ndarray = self.manifold.metric.exp(update, point)
    return new_point

groupoid.optimizer.curvature_adaptive_lr(manifold, point, base_lr, tangent_vec)

Adapt learning rate based on local sectional curvature.

In regions of high positive curvature, geodesics converge and we should take smaller steps. In regions of negative curvature, geodesics diverge and we can take larger steps.

Parameters

manifold A geomstats manifold with a curvature method. point Current point on the manifold. base_lr Base learning rate to adapt. tangent_vec Direction of the update.

Returns

float Adapted learning rate.

Source code in groupoid/optimizer.py
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
def curvature_adaptive_lr(
    manifold,
    point: np.ndarray,
    base_lr: float,
    tangent_vec: np.ndarray,
) -> float:
    """Adapt learning rate based on local sectional curvature.

    In regions of high positive curvature, geodesics converge and
    we should take smaller steps. In regions of negative curvature,
    geodesics diverge and we can take larger steps.

    Parameters
    ----------
    manifold
        A geomstats manifold with a curvature method.
    point
        Current point on the manifold.
    base_lr
        Base learning rate to adapt.
    tangent_vec
        Direction of the update.

    Returns
    -------
    float
        Adapted learning rate.
    """
    try:
        if hasattr(manifold.metric, "sectional_curvature"):
            # Use a random orthogonal vector for the plane
            random_vec = manifold.to_tangent(np.random.randn(*tangent_vec.shape), point)
            kappa = manifold.metric.sectional_curvature(tangent_vec, random_vec, point)
            kappa = float(np.mean(kappa)) if hasattr(kappa, "__len__") else float(kappa)

            # Scale: lr / (1 + max(kappa, 0)) damps in positive curvature
            adapted: float = base_lr / (1.0 + max(kappa, 0.0))
            logger.debug("Curvature-adapted LR: {:.6f} (kappa={:.4f})", adapted, kappa)
            return adapted
    except (AttributeError, NotImplementedError):
        pass

    return base_lr

Persistent Homology

groupoid.persistence.compute_persistence(points, max_dim=1, max_edge_length=np.inf)

Compute persistent homology of a point cloud.

Parameters

points Array of shape (n_points, n_features) representing model parameters or their embeddings. max_dim Maximum homological dimension to compute. max_edge_length Maximum edge length for the Rips filtration.

Returns

PersistenceSummary Topological summary including Betti numbers and persistence.

Source code in groupoid/persistence.py
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
def compute_persistence(
    points: np.ndarray,
    max_dim: int = 1,
    max_edge_length: float = np.inf,
) -> PersistenceSummary:
    """Compute persistent homology of a point cloud.

    Parameters
    ----------
    points
        Array of shape (n_points, n_features) representing model
        parameters or their embeddings.
    max_dim
        Maximum homological dimension to compute.
    max_edge_length
        Maximum edge length for the Rips filtration.

    Returns
    -------
    PersistenceSummary
        Topological summary including Betti numbers and persistence.
    """
    from ripser import ripser

    logger.debug("Computing persistence: {} points, max_dim={}", points.shape[0], max_dim)

    result = ripser(points, maxdim=max_dim, thresh=max_edge_length)
    diagrams = result["dgms"]

    # Compute Betti numbers (count features that persist at infinity)
    betti_0 = int(np.sum(diagrams[0][:, 1] == np.inf)) if len(diagrams) > 0 else 0
    betti_1 = int(np.sum(diagrams[1][:, 1] == np.inf)) if len(diagrams) > 1 else 0

    # Compute total and max persistence (excluding infinite features)
    all_finite = []
    for dgm in diagrams:
        finite_mask = dgm[:, 1] < np.inf
        if np.any(finite_mask):
            lifetimes = dgm[finite_mask, 1] - dgm[finite_mask, 0]
            all_finite.extend(lifetimes.tolist())

    total_persistence = float(sum(all_finite)) if all_finite else 0.0
    max_persistence = float(max(all_finite)) if all_finite else 0.0

    # Concatenate all diagrams for storage, RETAINING the homology
    # dimension as a third column. ripser returns a per-dimension list
    # (diagrams[d] holds dimension d); a bare np.vstack would discard
    # that label and make H0 and H1 bars indistinguishable. We append a
    # dim column so downstream consumers (e.g. track_divergence) can
    # compare like dimensions instead of pooling them.
    labelled = []
    for dim, dgm in enumerate(diagrams):
        if dgm.shape[0] == 0:
            continue
        dim_col = np.full((dgm.shape[0], 1), float(dim))
        labelled.append(np.hstack([dgm, dim_col]))
    full_diagram = np.vstack(labelled) if labelled else np.empty((0, 3))

    logger.debug(
        "Persistence: beta_0={}, beta_1={}, total={:.4f}",
        betti_0,
        betti_1,
        total_persistence,
    )

    return PersistenceSummary(
        betti_0=betti_0,
        betti_1=betti_1,
        total_persistence=total_persistence,
        max_persistence=max_persistence,
        diagram=full_diagram,
    )

groupoid.persistence.track_divergence(current_params, previous_summary=None, max_dim=1)

Track federation divergence across rounds.

Computes persistence of current parameter distribution and, if a previous summary exists, computes the bottleneck distance to measure how much the topological structure has changed.

Parameters

current_params Array of shape (n_clients, n_features). previous_summary PersistenceSummary from the previous round, if available. max_dim Maximum homological dimension.

Returns

PersistenceSummary Updated summary with bottleneck distance to previous round.

Source code in groupoid/persistence.py
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
def track_divergence(
    current_params: np.ndarray,
    previous_summary: PersistenceSummary | None = None,
    max_dim: int = 1,
) -> PersistenceSummary:
    """Track federation divergence across rounds.

    Computes persistence of current parameter distribution and, if a
    previous summary exists, computes the bottleneck distance to
    measure how much the topological structure has changed.

    Parameters
    ----------
    current_params
        Array of shape (n_clients, n_features).
    previous_summary
        PersistenceSummary from the previous round, if available.
    max_dim
        Maximum homological dimension.

    Returns
    -------
    PersistenceSummary
        Updated summary with bottleneck distance to previous round.
    """
    summary = compute_persistence(current_params, max_dim=max_dim)

    if previous_summary is not None:
        from persim import bottleneck

        # Compare H0 diagrams (connected components) ONLY. We select bars
        # whose homology dimension is 0 and which are finite (the single
        # infinite H0 bar -- the whole space's surviving component -- is
        # excluded so persim never has to match infinities). Selecting on
        # the dimension label is essential: without it, H1 (loop) bars
        # leak into this pool and silently contaminate the "H0 divergence"
        # with loop-structure changes. See PersistenceSummary.diagram_for_dim.
        current_h0 = summary.diagram_for_dim(0, finite_only=True)
        prev_h0 = previous_summary.diagram_for_dim(0, finite_only=True)

        if len(current_h0) > 0 and len(prev_h0) > 0:
            dist = bottleneck(current_h0, prev_h0)
            summary.bottleneck_to_previous = float(dist)
            logger.info("Bottleneck distance to previous round: {:.4f}", dist)
        else:
            summary.bottleneck_to_previous = 0.0

    return summary