Skip to content

Types

hyperbench.types

EdgeIndex

A wrapper for edge index representation of a graph. Edge index is a tensor of shape (2, num_edges) where the first row contains source node indices and the second row contains destination node indices for each edge.

Examples:

>>> edge_index = [[0, 1, 2],
...               [1, 0, 3]]

This represents a graph with edges (0, 1), (1, 0), and (2, 3). The number of nodes in this graph is 4 (nodes 0, 1, 2, and 3) and the number of edges is 3.

Parameters:

Name Type Description Default
edge_index Tensor

A tensor of shape (2, num_edges) representing the edges in the graph.

required
edge_weights Tensor | None

Optional tensor of shape (num_edges,) containing a weight for each edge.

None
Source code in hyperbench/types/graph.py
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
class EdgeIndex:
    """
    A wrapper for edge index representation of a graph.
    Edge index is a tensor of shape ``(2, num_edges)`` where the first row contains source node indices
    and the second row contains destination node indices for each edge.

    Examples:
        >>> edge_index = [[0, 1, 2],
        ...               [1, 0, 3]]

        This represents a graph with edges (0, 1), (1, 0), and (2, 3).
        The number of nodes in this graph is 4 (nodes 0, 1, 2, and 3) and the number of edges is 3.

    Args:
        edge_index: A tensor of shape ``(2, num_edges)`` representing the edges in the graph.
        edge_weights: Optional tensor of shape ``(num_edges,)`` containing a weight for each edge.
    """

    def __init__(
        self,
        edge_index: Tensor,
        edge_weights: Tensor | None = None,
    ):
        self.__edge_index = edge_index
        self.__validate_edge_weights(edge_weights)
        self.__edge_weights = edge_weights

    @property
    def item(self) -> Tensor:
        """Return the edge index tensor."""
        return self.__edge_index

    @property
    def edge_weights(self) -> Tensor | None:
        """Return the edge weight tensor, if present."""
        return self.__edge_weights

    @property
    def max_node_id(self) -> int:
        """Return the maximum node ID in the edge index."""
        if self.__edge_index.size(1) < 1:
            return -1
        return int(self.__edge_index.max())

    @property
    def num_edges(self) -> int:
        """Return the number of edges in the graph."""
        if self.__edge_index.size(1) < 1:
            return 0
        # Number of edges is the number of columns in edge_index, which is dim=1,
        # as each column represents an edge (source, destination)
        return self.__edge_index.size(1)

    @property
    def num_nodes(self) -> int:
        """Return the number of nodes in the graph."""
        if self.__edge_index.size(1) < 1:
            return 0
        unique_nodes = torch.unique(self.__edge_index)
        return len(unique_nodes)

    def add_selfloops(
        self,
        num_nodes: int | None = None,
        with_duplicate_removal: bool = True,
    ) -> "EdgeIndex":
        """
        Add self-loops to each node in the edge index.

        Examples:
            >>> edge_index = [[0, 1, 2],
            ...               [1, 0, 3]]
            >>> edge_index_with_selfloops = [[0, 1, 2, 0, 1, 2, 3],
            ...                              [1, 0, 3, 0, 1, 2, 3]]

            When ``num_nodes`` is higher than the number of nodes in ``edge_index``,
            self-loops are added for all nodes from ``0`` to ``num_nodes - 1``,
            including nodes not present in the original edges:

            >>> edge_index = [[0, 1, 2],
            ...               [1, 0, 3]]
            >>> num_nodes = 6
            >>> edge_index_with_selfloops = [[0, 1, 2, 0, 1, 2, 3, 4, 5],
            ...                              [1, 0, 3, 0, 1, 2, 3, 4, 5]]

        Args:
            num_nodes: Total number of nodes. When provided, self-loops are added for nodes ``0`` to ``num_nodes - 1``. When ``None``, defaults to ``self.num_nodes``.
                This parameter is important when ``edge_index`` does not contain all nodes (e.g., some nodes are isolated and have no edges or have been removed),
                as it ensures that the resulting Laplacian matrix has the correct size and includes all nodes. For instance, for self-loops.
            with_duplicate_removal: Whether to remove duplicate edges after adding self-loops. Defaults to ``True``.

        Returns:
            This :class:`EdgeIndex` instance with self-loops added.

        Raises:
            ValueError: If the input edge index has no edges (i.e., ``shape (2, 0)``).
        """
        if self.__edge_index.size(1) < 1:
            raise ValueError("Edge index must have at least one edge to add self-loops.")

        device = self.__edge_index.device
        src, dest = self.__edge_index[0], self.__edge_index[1]

        # Add self-loops: A_hat = A + I (works as we assume node indices are in [0, num_nodes-1])
        # Example: edge_index = [[0, 1, 2],
        #                        [1, 0, 3]], num_nodes = None
        #          -> num_selfloop_nodes = 4 (self.num_nodes, as num_nodes is None)
        #          -> selfloop_indices = [0, 1, 2, 3]
        #          -> src = [0, 1, 2, 0, 1, 2, 3]
        #          -> dest = [1, 0, 3, 0, 1, 2, 3]
        #          -> edge_index_with_selfloops = [[0, 1, 2, 0, 1, 2, 3],
        #                                          [1, 0, 3, 0, 1, 2, 3]]
        #
        # Example with num_nodes=6 (higher than 4 nodes in edge_index):
        #          -> num_selfloop_nodes = 6
        #          -> selfloop_indices = [0, 1, 2, 3, 4, 5]
        #          -> src = [0, 1, 2, 0, 1, 2, 3, 4, 5]
        #          -> dest = [1, 0, 3, 0, 1, 2, 3, 4, 5]
        #          -> edge_index_with_selfloops = [[0, 1, 2, 0, 1, 2, 3, 4, 5],
        #                                          [1, 0, 3, 0, 1, 2, 3, 4, 5]]
        num_selfloop_nodes = self.num_nodes if num_nodes is None else num_nodes
        selfloop_indices = torch.arange(num_selfloop_nodes, device=device)
        src = torch.cat([src, selfloop_indices])
        dest = torch.cat([dest, selfloop_indices])
        edge_index_with_selfloops = torch.stack([src, dest], dim=0)

        self.__edge_index = edge_index_with_selfloops

        if self.__edge_weights is not None:
            # Set weights of self-loops to 1
            selfloop_weights = torch.ones(
                num_selfloop_nodes,
                device=device,
                dtype=self.__edge_weights.dtype,
            )
            self.__edge_weights = torch.cat([self.__edge_weights, selfloop_weights])

        if with_duplicate_removal:
            self.remove_duplicate_edges()

        return self

    def get_sparse_adjacency_matrix(
        self,
        num_nodes: int | None = None,
        use_edge_weights: bool = False,
    ) -> Tensor:
        """
        Compute the sparse adjacency matrix from a graph edge index.
        To get the normalized adjacency matrix, add self-loops to the edge_index.

        Examples:
            >>> edge_index = [[0, 1, 2],
            ...               [1, 0, 3]]
            >>> num_nodes = 4
            >>> adj_values = [1, 1, 1]
            >>> adj_indices = [[0, 1, 2],
            ...                [1, 0, 3]]
            >>>                0  1  2  3
            ... adj_matrix = [[0, 1, 0, 0], 0
            ...               [1, 0, 0, 0], 1
            ...               [0, 0, 0, 1], 2
            ...               [0, 0, 1, 0]] 3

        Args:
            num_nodes: The number of nodes in the graph.
                If ``None``, it will be inferred from ``self.num_nodes``.
                Note that the node indices in ``edge_index`` are assumed to be in the range [0, num_nodes-1].
            use_edge_weights: Whether to use edge weights if they are present.
                If ``False``, all edges will have weight 1. Defaults to ``False``.

        Returns:
            The sparse adjacency matrix of shape ``(num_nodes, num_nodes)``.
        """
        device = self.__edge_index.device
        src, dest = self.__edge_index
        num_nodes = self.num_nodes if num_nodes is None else num_nodes

        # Example: edge_index = [[0, 1, 2, 3],
        #                       [1, 0, 3, 2]]
        #          use_edge_weights = False
        #          -> adj_values = [1, 1, 1, 1]
        #          -> adj_indices = [[0, 1, 2, 3],
        #                            [1, 0, 3, 2]]
        #                   0  1  2  3
        #          -> A = [[0, 1, 0, 0], 1
        #                  [1, 0, 0, 0], 0
        #                  [0, 0, 0, 1], 3
        #                  [0, 0, 1, 0]] 2
        if use_edge_weights:
            adj_values = (
                self.edge_weights
                if self.edge_weights is not None
                else torch.ones(self.num_edges, device=device)
            )
        else:
            adj_values = torch.ones(self.num_edges, device=device)

        adj_indices = torch.stack([src, dest], dim=0)
        adj_matrix = torch.sparse_coo_tensor(adj_indices, adj_values, size=(num_nodes, num_nodes))
        return adj_matrix.coalesce()

    def get_sparse_identity_matrix(self, num_nodes: int | None = None) -> Tensor:
        """
        Compute the sparse identity matrix I of shape (num_nodes, num_nodes).

        Examples:
            >>> num_nodes = 3
            >>> identity_indices = [[0, 1, 2],
            ...                     [0, 1, 2]]
            >>> values = [1, 1, 1]
            >>> I = [[1, 0, 0],
            ...      [0, 1, 0],
            ...      [0, 0, 1]]

        Args:
            num_nodes: The number of nodes in the graph.
                If ``None``, it will be inferred from ``self.num_nodes``.

        Returns:
            The sparse identity matrix I of shape ``(num_nodes, num_nodes)``.
        """
        device = self.__edge_index.device
        num_nodes = self.num_nodes if num_nodes is None else num_nodes

        # Example: num_nodes = 3
        #          -> identity_indices = [[0, 1, 2],
        #                                 [0, 1, 2]]
        #             we use repeat(2, 1) as I is a matrix NxN, so we need indices for both rows and columns
        #          -> values = [1, 1, 1]
        #                   0  1  2
        #          -> I = [[1, 0, 0], 0
        #                  [0, 1, 0], 1
        #                  [0, 0, 1]] 2
        identity_indices = torch.arange(num_nodes, device=device).unsqueeze(0).repeat(2, 1)
        identity_matrix = torch.sparse_coo_tensor(
            indices=identity_indices,
            values=torch.ones(num_nodes, device=device),
            size=(num_nodes, num_nodes),
        )
        return identity_matrix.coalesce()

    def get_sparse_normalized_degree_matrix(
        self,
        num_nodes: int | None = None,
        use_edge_weights: bool = False,
    ) -> Tensor:
        """
        Compute the sparse normalized degree matrix D^-1/2 from a graph edge index.

        Args:
            num_nodes: The number of nodes in the graph.
                If ``None``, it will be inferred from ``self.num_nodes``.
                Note that the node indices in ``edge_index`` are assumed to be in the range [0, num_nodes-1].
            use_edge_weights: If ``True``, use the edge weights from ``self.edge_weights``. If ``False``, all edges use weight 1.

        Returns:
            The sparse normalized degree matrix D^-1/2 of shape ``(num_nodes, num_nodes)``.
        """
        device = self.__edge_index.device

        num_nodes = self.num_nodes if num_nodes is None else num_nodes

        adj_matrix = self.get_sparse_adjacency_matrix(
            num_nodes=num_nodes, use_edge_weights=use_edge_weights
        )
        adj_indices = adj_matrix.indices()
        adj_values = adj_matrix.values()

        # Compute degree for each node as the weighted row-sum of the adjacency matrix.
        degrees: Tensor = torch.zeros(num_nodes, device=device, dtype=adj_values.dtype)
        degrees.scatter_add_(dim=0, index=adj_indices[0], src=adj_values)

        # Compute D^-1/2 == D^-0.5
        degree_inv_sqrt: Tensor = degrees.pow(-0.5)
        # Handle isolated nodes where degree is 0, which lead to inf values in degree_inv_sqrt
        degree_inv_sqrt[degree_inv_sqrt == float("inf")] = 0

        # Convert degree vector to a diagonal sparse normalized matrix D
        # Example: degree_inv_sqrt = [1, 0.707, 1, 0]
        #          -> diagonal_indices = [[0, 1, 2, 3],
        #                                 [0, 1, 2, 3]]
        #                   0  1      2  3
        #          -> D = [[1, 0,     0, 0], 0
        #                  [0, 0.707, 0, 0], 1
        #                  [0, 0,     1, 0], 2
        #                  [0, 0,     0, 0]] 3
        diagonal_indices = torch.arange(num_nodes, device=device).unsqueeze(0).repeat(2, 1)
        degree_matrix = torch.sparse_coo_tensor(
            indices=diagonal_indices,
            values=degree_inv_sqrt,
            size=(num_nodes, num_nodes),
        )
        return degree_matrix.coalesce()

    def get_sparse_normalized_laplacian(
        self,
        num_nodes: int | None = None,
    ) -> Tensor:
        """
        Compute the sparse symmetric normalized Laplacian matrix: L = I - D^{-1/2} A D^{-1/2}.

        Unlike ``get_sparse_normalized_gcn_laplacian``, this method does not add self-loops
        and computes the standard Laplacian (not the GCN propagation matrix).

        Args:
            num_nodes: The number of nodes in the graph. If ``None``,
                it will be inferred from ``self.num_nodes``.

        Returns:
            The sparse symmetric normalized Laplacian matrix of shape ``(num_nodes, num_nodes)``.
        """
        self.to_undirected(with_selfloops=False)

        num_nodes = self.num_nodes if num_nodes is None else num_nodes

        degree_matrix = self.get_sparse_normalized_degree_matrix(num_nodes)
        adj_matrix = self.get_sparse_adjacency_matrix(num_nodes)

        # D^{-1/2} A D^{-1/2}
        normalized_adj_matrix = torch.sparse.mm(
            degree_matrix,
            torch.sparse.mm(adj_matrix, degree_matrix),
        )

        # L = I - D^{-1/2} A D^{-1/2}
        identity_matrix = self.get_sparse_identity_matrix(num_nodes)
        return (identity_matrix - normalized_adj_matrix).coalesce()

    def get_sparse_normalized_gcn_laplacian(
        self,
        num_nodes: int | None = None,
        use_edge_weights: bool = False,
    ) -> Tensor:
        """
        Compute the sparse Laplacian matrix from a graph edge index.

        The GCN Laplacian is defined as: L_GCN = D_hat^-1/2 * A_hat * D_hat^-1/2,
        where A_hat = A + I (adjacency with self-loops) and D_hat is the degree matrix of A_hat.

        Args:
            num_nodes: The number of nodes in the graph. If ``None``,
                it will be inferred from ``self.num_nodes``.
                Note that the node indices in ``edge_index`` are assumed to be in the range [0, num_nodes-1].
                This parameter is important when ``edge_index`` does not contain all nodes (e.g., some nodes are isolated and have no edges or have been removed),
                as it ensures that the resulting Laplacian matrix has the correct size and includes all nodes. For instance, for self-loops.
            use_edge_weights: If ``True``, use the edge weights from ``self.edge_weights``. If ``False``, all edges use weight 1.

        Returns:
            The sparse symmetrically normalized Laplacian matrix of shape ``(num_nodes, num_nodes)``.
        """
        self.to_undirected(with_selfloops=True, num_nodes=num_nodes)

        num_nodes = self.num_nodes if num_nodes is None else num_nodes

        degree_matrix = self.get_sparse_normalized_degree_matrix(
            num_nodes=num_nodes, use_edge_weights=use_edge_weights
        )
        adj_matrix = self.get_sparse_adjacency_matrix(
            num_nodes=num_nodes, use_edge_weights=use_edge_weights
        )

        # Compute normalized Laplacian matrix: L = D^-1/2 * A * D^-1/2
        normalized_laplacian_matrix = torch.sparse.mm(
            degree_matrix,
            torch.sparse.mm(adj_matrix, degree_matrix),
        )
        return normalized_laplacian_matrix.coalesce()

    def remove_selfloops(self) -> "EdgeIndex":
        """Remove self-loops from the edge index."""
        # Example: edge_index = [[0, 1, 2, 3],
        #                        [1, 1, 3, 2]], shape (2, |E| = 4)
        #          -> keep_mask = [True, False, True, True]
        #          -> edge_index = [[0, 2, 3],
        #                           [1, 3, 2]], shape (2, |E'| = 3)
        keep_mask = self.__edge_index[0] != self.__edge_index[1]
        self.__edge_index = self.__edge_index[:, keep_mask]
        if self.__edge_weights is not None:
            self.__edge_weights = self.__edge_weights[keep_mask]
        return self

    def remove_duplicate_edges(self, num_nodes: int | None = None) -> "EdgeIndex":
        """
        Remove duplicate edges from the edge index. Keeps the tensor contiguous in memory.

        Args:
            num_nodes: The number of nodes in the graph. If ``None``, it will be inferred from ``self.num_nodes``.
                This parameter is important when ``edge_index`` does not contain all nodes (e.g., some nodes are isolated and have no edges or have been removed),
                as it ensures that the resulting Laplacian matrix has the correct size and includes all nodes. For instance, for self-loops.

        Returns:
            This :class:`EdgeIndex` instance with duplicate edges removed.
        """
        # Example: edge_index = [[0, 1, 2, 2, 0, 3, 2],
        #                        [1, 0, 3, 2, 1, 2, 2]], shape (2, |E| = 7)
        #          -> after torch.unique(..., dim=1):
        #             edge_index = [[0, 1, 2, 2, 3],
        #                           [1, 0, 3, 2, 2]], shape (2, |E'| = 5)
        # Note: we call contiguous() to ensure that the resulting tensor is contiguous in memory,
        # which can improve performance for subsequent operations that require contiguous tensors.
        if self.__edge_weights is None:
            self.__edge_index = torch.unique(self.__edge_index, dim=1).contiguous()
            return self

        # No edges to process, just ensure tensors are contiguous
        if self.num_edges < 1:
            self.__edge_index = self.__edge_index.contiguous()
            self.__edge_weights = self.__edge_weights.contiguous()
            return self

        # When edge weights are present, we need to use torch.sparse_coo_tensor
        # to remove duplicate edges while preserving the weights.
        # Example: edge_index = [[0, 0, 1],
        #                        [1, 1, 2]]
        #          edge_weights = [1.0, 2.0, 3.0]
        #          -> before coalesce, we have duplicate edges (0, 1) with weights 1.0 and 2.0
        #          -> after `.coalesce()`:
        #          -> edge_index = [[0, 1],
        #                           [1, 2]]
        #          -> edge_weights = [3.0, 3.0] (weights of duplicate edges are summed)
        num_nodes = self.num_nodes if num_nodes is None else num_nodes
        coalesced = torch.sparse_coo_tensor(
            self.__edge_index,
            self.__edge_weights,
            size=(num_nodes, num_nodes),
        ).coalesce()
        self.__edge_index = coalesced.indices().contiguous()
        self.__edge_weights = coalesced.values().contiguous()
        return self

    def to_undirected(
        self,
        with_selfloops: bool = False,
        num_nodes: int | None = None,
    ) -> "EdgeIndex":
        """
        Convert the edge index to an undirected edge index by adding reverse edges.

        Args:
            with_selfloops: Whether to add self-loops to each node. Defaults to ``False``.
            num_nodes: Total number of nodes. Propagated to ``add_selfloops`` when ``with_selfloops`` is ``True``.
                This parameter is useful when ``edge_index`` does not contain all nodes (e.g., some nodes are isolated and have no edges or have been removed),
                as it ensures that the resulting Laplacian matrix has the correct size and includes all nodes. For instance, for self-loops.

        Returns:
            This :class:`EdgeIndex` instance converted to undirected.
        """
        device = self.__edge_index.device
        num_nodes = self.num_nodes if num_nodes is None else num_nodes

        orig_src, orig_dest = self.__edge_index[0], self.__edge_index[1]

        # Encode each directed edge (u, v) as a unique scalar key u * num_nodes + v.
        # Example: num_nodes = 4, orig_src  = [0, 1, 2], orig_dest = [1, 0, 3]
        #          -> edges are [(0,1), (1,0), (2,3)]
        #          -> encoded_edge_ids = [0*4+1, 1*4+0, 2*4+3] = [1, 4, 11]
        encoded_edge_ids = orig_src * num_nodes + orig_dest

        # Build the key for the reverse of each existing edge.
        # Example: reverse edges are [(1,0), (0,1), (3,2)]
        #          -> reversed_encoded_edge_ids = [1*4+0, 0*4+1, 3*4+2] = [4, 1, 14]
        reversed_encoded_edge_ids = orig_dest * num_nodes + orig_src

        # Example: encoded_edge_ids          = [1, 4, 11],
        #          reversed_encoded_edge_ids = [4, 1, 14]
        #          -> missing_reverse_mask = [False, False, True]
        #             because 4 and 1 are in both, it means edges (0,1) and (1,0) are already present,
        #             but 14 is only in reversed_encoded_edge_ids, which means edge (3,2) is missing
        #             and this is because the mask points to the missing reversee edges that are missing
        missing_mask = torch.logical_not(torch.isin(reversed_encoded_edge_ids, encoded_edge_ids))

        # Keep all original sources and append the destination of each edge whose reverse is missing.
        # Example: orig_src = [0, 1, 2], orig_dest[missing_mask] = [3]
        #          -> src = [0, 1, 2, 3]
        src = torch.cat([orig_src, orig_dest[missing_mask]])

        # Keep all original destinations and append the source of each edge whose reverse is missing.
        # Example: orig_dest = [1, 0, 3], orig_src[missing_mask] = [2]
        #          -> dest = [1, 0, 3, 2]
        #          -> final undirected edges: [(0,1), (1,0), (2,3), (3,2)]
        dest = torch.cat([orig_dest, orig_src[missing_mask]])

        # Example: edge_index = [[0, 1, 2],
        #                        [1, 0, 3]]
        #          -> after torch.stack([...], dim=0):
        #             undirected_edge_index = [[0, 1, 2, 1, 0, 3],
        #                                      [1, 0, 3, 0, 1, 2]]
        #          -> after torch.unique(..., dim=1):
        #             undirected_edge_index = [[0, 1, 2, 3],
        #                                      [1, 0, 3, 2]]
        self.__edge_index = torch.stack([src, dest], dim=0).to(device)

        # The new edges have the same weights as the original edges.
        # Example: edge_index = [[0, 1, 2],
        #                        [1, 0, 3]]
        #          edge_weights = [0.5, 1.0, 2.0]
        #          -> after adding reverse edges:
        #             edge_index = [[0, 1, 2, 1, 0, 3],
        #                           [1, 0, 3, 0, 1, 2]]
        #             edge_weights = [0.5, 1.0, 2.0, 0.5, 1.0, 2.0]
        self.__edge_weights = (
            torch.cat([self.__edge_weights, self.__edge_weights[missing_mask]])
            if self.__edge_weights is not None
            else None
        )

        if with_selfloops:
            # Don't remove duplicate edges when adding self-loops, as we need to remove them
            # even if with_selfloops is False, to ensure that the edge index is clean and doesn't contain duplicate edges.
            # In this way, we don't do the duplicate edge removal twice, which would be redundant and inefficient
            self.add_selfloops(num_nodes=num_nodes, with_duplicate_removal=False)

        self.remove_duplicate_edges()

        return self

    def __validate_edge_weights(self, edge_weights: Tensor | None) -> None:
        if edge_weights is None:
            return

        if edge_weights.dim() != 1:
            raise ValueError(
                f"edge_weights must be a 1D tensor. Got {edge_weights.dim()}D tensor with shape {edge_weights.shape}."
            )

        if edge_weights.size(0) != self.__edge_index.size(1):
            raise ValueError(
                "edge_weights must have the same number of entries as edge_index columns. "
                f"Got {edge_weights.size(0)} edge weights but {self.__edge_index.size(1)} edge columns."
            )

item property

Return the edge index tensor.

edge_weights property

Return the edge weight tensor, if present.

max_node_id property

Return the maximum node ID in the edge index.

num_edges property

Return the number of edges in the graph.

num_nodes property

Return the number of nodes in the graph.

add_selfloops(num_nodes=None, with_duplicate_removal=True)

Add self-loops to each node in the edge index.

Examples:

>>> edge_index = [[0, 1, 2],
...               [1, 0, 3]]
>>> edge_index_with_selfloops = [[0, 1, 2, 0, 1, 2, 3],
...                              [1, 0, 3, 0, 1, 2, 3]]

When num_nodes is higher than the number of nodes in edge_index, self-loops are added for all nodes from 0 to num_nodes - 1, including nodes not present in the original edges:

>>> edge_index = [[0, 1, 2],
...               [1, 0, 3]]
>>> num_nodes = 6
>>> edge_index_with_selfloops = [[0, 1, 2, 0, 1, 2, 3, 4, 5],
...                              [1, 0, 3, 0, 1, 2, 3, 4, 5]]

Parameters:

Name Type Description Default
num_nodes int | None

Total number of nodes. When provided, self-loops are added for nodes 0 to num_nodes - 1. When None, defaults to self.num_nodes. This parameter is important when edge_index does not contain all nodes (e.g., some nodes are isolated and have no edges or have been removed), as it ensures that the resulting Laplacian matrix has the correct size and includes all nodes. For instance, for self-loops.

None
with_duplicate_removal bool

Whether to remove duplicate edges after adding self-loops. Defaults to True.

True

Returns:

Name Type Description
This EdgeIndex

class:EdgeIndex instance with self-loops added.

Raises:

Type Description
ValueError

If the input edge index has no edges (i.e., shape (2, 0)).

Source code in hyperbench/types/graph.py
def add_selfloops(
    self,
    num_nodes: int | None = None,
    with_duplicate_removal: bool = True,
) -> "EdgeIndex":
    """
    Add self-loops to each node in the edge index.

    Examples:
        >>> edge_index = [[0, 1, 2],
        ...               [1, 0, 3]]
        >>> edge_index_with_selfloops = [[0, 1, 2, 0, 1, 2, 3],
        ...                              [1, 0, 3, 0, 1, 2, 3]]

        When ``num_nodes`` is higher than the number of nodes in ``edge_index``,
        self-loops are added for all nodes from ``0`` to ``num_nodes - 1``,
        including nodes not present in the original edges:

        >>> edge_index = [[0, 1, 2],
        ...               [1, 0, 3]]
        >>> num_nodes = 6
        >>> edge_index_with_selfloops = [[0, 1, 2, 0, 1, 2, 3, 4, 5],
        ...                              [1, 0, 3, 0, 1, 2, 3, 4, 5]]

    Args:
        num_nodes: Total number of nodes. When provided, self-loops are added for nodes ``0`` to ``num_nodes - 1``. When ``None``, defaults to ``self.num_nodes``.
            This parameter is important when ``edge_index`` does not contain all nodes (e.g., some nodes are isolated and have no edges or have been removed),
            as it ensures that the resulting Laplacian matrix has the correct size and includes all nodes. For instance, for self-loops.
        with_duplicate_removal: Whether to remove duplicate edges after adding self-loops. Defaults to ``True``.

    Returns:
        This :class:`EdgeIndex` instance with self-loops added.

    Raises:
        ValueError: If the input edge index has no edges (i.e., ``shape (2, 0)``).
    """
    if self.__edge_index.size(1) < 1:
        raise ValueError("Edge index must have at least one edge to add self-loops.")

    device = self.__edge_index.device
    src, dest = self.__edge_index[0], self.__edge_index[1]

    # Add self-loops: A_hat = A + I (works as we assume node indices are in [0, num_nodes-1])
    # Example: edge_index = [[0, 1, 2],
    #                        [1, 0, 3]], num_nodes = None
    #          -> num_selfloop_nodes = 4 (self.num_nodes, as num_nodes is None)
    #          -> selfloop_indices = [0, 1, 2, 3]
    #          -> src = [0, 1, 2, 0, 1, 2, 3]
    #          -> dest = [1, 0, 3, 0, 1, 2, 3]
    #          -> edge_index_with_selfloops = [[0, 1, 2, 0, 1, 2, 3],
    #                                          [1, 0, 3, 0, 1, 2, 3]]
    #
    # Example with num_nodes=6 (higher than 4 nodes in edge_index):
    #          -> num_selfloop_nodes = 6
    #          -> selfloop_indices = [0, 1, 2, 3, 4, 5]
    #          -> src = [0, 1, 2, 0, 1, 2, 3, 4, 5]
    #          -> dest = [1, 0, 3, 0, 1, 2, 3, 4, 5]
    #          -> edge_index_with_selfloops = [[0, 1, 2, 0, 1, 2, 3, 4, 5],
    #                                          [1, 0, 3, 0, 1, 2, 3, 4, 5]]
    num_selfloop_nodes = self.num_nodes if num_nodes is None else num_nodes
    selfloop_indices = torch.arange(num_selfloop_nodes, device=device)
    src = torch.cat([src, selfloop_indices])
    dest = torch.cat([dest, selfloop_indices])
    edge_index_with_selfloops = torch.stack([src, dest], dim=0)

    self.__edge_index = edge_index_with_selfloops

    if self.__edge_weights is not None:
        # Set weights of self-loops to 1
        selfloop_weights = torch.ones(
            num_selfloop_nodes,
            device=device,
            dtype=self.__edge_weights.dtype,
        )
        self.__edge_weights = torch.cat([self.__edge_weights, selfloop_weights])

    if with_duplicate_removal:
        self.remove_duplicate_edges()

    return self

get_sparse_adjacency_matrix(num_nodes=None, use_edge_weights=False)

Compute the sparse adjacency matrix from a graph edge index. To get the normalized adjacency matrix, add self-loops to the edge_index.

Examples:

>>> edge_index = [[0, 1, 2],
...               [1, 0, 3]]
>>> num_nodes = 4
>>> adj_values = [1, 1, 1]
>>> adj_indices = [[0, 1, 2],
...                [1, 0, 3]]
>>>                0  1  2  3
... adj_matrix = [[0, 1, 0, 0], 0
...               [1, 0, 0, 0], 1
...               [0, 0, 0, 1], 2
...               [0, 0, 1, 0]] 3

Parameters:

Name Type Description Default
num_nodes int | None

The number of nodes in the graph. If None, it will be inferred from self.num_nodes. Note that the node indices in edge_index are assumed to be in the range [0, num_nodes-1].

None
use_edge_weights bool

Whether to use edge weights if they are present. If False, all edges will have weight 1. Defaults to False.

False

Returns:

Type Description
Tensor

The sparse adjacency matrix of shape (num_nodes, num_nodes).

Source code in hyperbench/types/graph.py
def get_sparse_adjacency_matrix(
    self,
    num_nodes: int | None = None,
    use_edge_weights: bool = False,
) -> Tensor:
    """
    Compute the sparse adjacency matrix from a graph edge index.
    To get the normalized adjacency matrix, add self-loops to the edge_index.

    Examples:
        >>> edge_index = [[0, 1, 2],
        ...               [1, 0, 3]]
        >>> num_nodes = 4
        >>> adj_values = [1, 1, 1]
        >>> adj_indices = [[0, 1, 2],
        ...                [1, 0, 3]]
        >>>                0  1  2  3
        ... adj_matrix = [[0, 1, 0, 0], 0
        ...               [1, 0, 0, 0], 1
        ...               [0, 0, 0, 1], 2
        ...               [0, 0, 1, 0]] 3

    Args:
        num_nodes: The number of nodes in the graph.
            If ``None``, it will be inferred from ``self.num_nodes``.
            Note that the node indices in ``edge_index`` are assumed to be in the range [0, num_nodes-1].
        use_edge_weights: Whether to use edge weights if they are present.
            If ``False``, all edges will have weight 1. Defaults to ``False``.

    Returns:
        The sparse adjacency matrix of shape ``(num_nodes, num_nodes)``.
    """
    device = self.__edge_index.device
    src, dest = self.__edge_index
    num_nodes = self.num_nodes if num_nodes is None else num_nodes

    # Example: edge_index = [[0, 1, 2, 3],
    #                       [1, 0, 3, 2]]
    #          use_edge_weights = False
    #          -> adj_values = [1, 1, 1, 1]
    #          -> adj_indices = [[0, 1, 2, 3],
    #                            [1, 0, 3, 2]]
    #                   0  1  2  3
    #          -> A = [[0, 1, 0, 0], 1
    #                  [1, 0, 0, 0], 0
    #                  [0, 0, 0, 1], 3
    #                  [0, 0, 1, 0]] 2
    if use_edge_weights:
        adj_values = (
            self.edge_weights
            if self.edge_weights is not None
            else torch.ones(self.num_edges, device=device)
        )
    else:
        adj_values = torch.ones(self.num_edges, device=device)

    adj_indices = torch.stack([src, dest], dim=0)
    adj_matrix = torch.sparse_coo_tensor(adj_indices, adj_values, size=(num_nodes, num_nodes))
    return adj_matrix.coalesce()

get_sparse_identity_matrix(num_nodes=None)

Compute the sparse identity matrix I of shape (num_nodes, num_nodes).

Examples:

>>> num_nodes = 3
>>> identity_indices = [[0, 1, 2],
...                     [0, 1, 2]]
>>> values = [1, 1, 1]
>>> I = [[1, 0, 0],
...      [0, 1, 0],
...      [0, 0, 1]]

Parameters:

Name Type Description Default
num_nodes int | None

The number of nodes in the graph. If None, it will be inferred from self.num_nodes.

None

Returns:

Type Description
Tensor

The sparse identity matrix I of shape (num_nodes, num_nodes).

Source code in hyperbench/types/graph.py
def get_sparse_identity_matrix(self, num_nodes: int | None = None) -> Tensor:
    """
    Compute the sparse identity matrix I of shape (num_nodes, num_nodes).

    Examples:
        >>> num_nodes = 3
        >>> identity_indices = [[0, 1, 2],
        ...                     [0, 1, 2]]
        >>> values = [1, 1, 1]
        >>> I = [[1, 0, 0],
        ...      [0, 1, 0],
        ...      [0, 0, 1]]

    Args:
        num_nodes: The number of nodes in the graph.
            If ``None``, it will be inferred from ``self.num_nodes``.

    Returns:
        The sparse identity matrix I of shape ``(num_nodes, num_nodes)``.
    """
    device = self.__edge_index.device
    num_nodes = self.num_nodes if num_nodes is None else num_nodes

    # Example: num_nodes = 3
    #          -> identity_indices = [[0, 1, 2],
    #                                 [0, 1, 2]]
    #             we use repeat(2, 1) as I is a matrix NxN, so we need indices for both rows and columns
    #          -> values = [1, 1, 1]
    #                   0  1  2
    #          -> I = [[1, 0, 0], 0
    #                  [0, 1, 0], 1
    #                  [0, 0, 1]] 2
    identity_indices = torch.arange(num_nodes, device=device).unsqueeze(0).repeat(2, 1)
    identity_matrix = torch.sparse_coo_tensor(
        indices=identity_indices,
        values=torch.ones(num_nodes, device=device),
        size=(num_nodes, num_nodes),
    )
    return identity_matrix.coalesce()

get_sparse_normalized_degree_matrix(num_nodes=None, use_edge_weights=False)

Compute the sparse normalized degree matrix D^-½ from a graph edge index.

Parameters:

Name Type Description Default
num_nodes int | None

The number of nodes in the graph. If None, it will be inferred from self.num_nodes. Note that the node indices in edge_index are assumed to be in the range [0, num_nodes-1].

None
use_edge_weights bool

If True, use the edge weights from self.edge_weights. If False, all edges use weight 1.

False

Returns:

Type Description
Tensor

The sparse normalized degree matrix D^-½ of shape (num_nodes, num_nodes).

Source code in hyperbench/types/graph.py
def get_sparse_normalized_degree_matrix(
    self,
    num_nodes: int | None = None,
    use_edge_weights: bool = False,
) -> Tensor:
    """
    Compute the sparse normalized degree matrix D^-1/2 from a graph edge index.

    Args:
        num_nodes: The number of nodes in the graph.
            If ``None``, it will be inferred from ``self.num_nodes``.
            Note that the node indices in ``edge_index`` are assumed to be in the range [0, num_nodes-1].
        use_edge_weights: If ``True``, use the edge weights from ``self.edge_weights``. If ``False``, all edges use weight 1.

    Returns:
        The sparse normalized degree matrix D^-1/2 of shape ``(num_nodes, num_nodes)``.
    """
    device = self.__edge_index.device

    num_nodes = self.num_nodes if num_nodes is None else num_nodes

    adj_matrix = self.get_sparse_adjacency_matrix(
        num_nodes=num_nodes, use_edge_weights=use_edge_weights
    )
    adj_indices = adj_matrix.indices()
    adj_values = adj_matrix.values()

    # Compute degree for each node as the weighted row-sum of the adjacency matrix.
    degrees: Tensor = torch.zeros(num_nodes, device=device, dtype=adj_values.dtype)
    degrees.scatter_add_(dim=0, index=adj_indices[0], src=adj_values)

    # Compute D^-1/2 == D^-0.5
    degree_inv_sqrt: Tensor = degrees.pow(-0.5)
    # Handle isolated nodes where degree is 0, which lead to inf values in degree_inv_sqrt
    degree_inv_sqrt[degree_inv_sqrt == float("inf")] = 0

    # Convert degree vector to a diagonal sparse normalized matrix D
    # Example: degree_inv_sqrt = [1, 0.707, 1, 0]
    #          -> diagonal_indices = [[0, 1, 2, 3],
    #                                 [0, 1, 2, 3]]
    #                   0  1      2  3
    #          -> D = [[1, 0,     0, 0], 0
    #                  [0, 0.707, 0, 0], 1
    #                  [0, 0,     1, 0], 2
    #                  [0, 0,     0, 0]] 3
    diagonal_indices = torch.arange(num_nodes, device=device).unsqueeze(0).repeat(2, 1)
    degree_matrix = torch.sparse_coo_tensor(
        indices=diagonal_indices,
        values=degree_inv_sqrt,
        size=(num_nodes, num_nodes),
    )
    return degree_matrix.coalesce()

get_sparse_normalized_laplacian(num_nodes=None)

Compute the sparse symmetric normalized Laplacian matrix: L = I - D^{-½} A D^{-½}.

Unlike get_sparse_normalized_gcn_laplacian, this method does not add self-loops and computes the standard Laplacian (not the GCN propagation matrix).

Parameters:

Name Type Description Default
num_nodes int | None

The number of nodes in the graph. If None, it will be inferred from self.num_nodes.

None

Returns:

Type Description
Tensor

The sparse symmetric normalized Laplacian matrix of shape (num_nodes, num_nodes).

Source code in hyperbench/types/graph.py
def get_sparse_normalized_laplacian(
    self,
    num_nodes: int | None = None,
) -> Tensor:
    """
    Compute the sparse symmetric normalized Laplacian matrix: L = I - D^{-1/2} A D^{-1/2}.

    Unlike ``get_sparse_normalized_gcn_laplacian``, this method does not add self-loops
    and computes the standard Laplacian (not the GCN propagation matrix).

    Args:
        num_nodes: The number of nodes in the graph. If ``None``,
            it will be inferred from ``self.num_nodes``.

    Returns:
        The sparse symmetric normalized Laplacian matrix of shape ``(num_nodes, num_nodes)``.
    """
    self.to_undirected(with_selfloops=False)

    num_nodes = self.num_nodes if num_nodes is None else num_nodes

    degree_matrix = self.get_sparse_normalized_degree_matrix(num_nodes)
    adj_matrix = self.get_sparse_adjacency_matrix(num_nodes)

    # D^{-1/2} A D^{-1/2}
    normalized_adj_matrix = torch.sparse.mm(
        degree_matrix,
        torch.sparse.mm(adj_matrix, degree_matrix),
    )

    # L = I - D^{-1/2} A D^{-1/2}
    identity_matrix = self.get_sparse_identity_matrix(num_nodes)
    return (identity_matrix - normalized_adj_matrix).coalesce()

get_sparse_normalized_gcn_laplacian(num_nodes=None, use_edge_weights=False)

Compute the sparse Laplacian matrix from a graph edge index.

The GCN Laplacian is defined as: L_GCN = D_hat^-½ * A_hat * D_hat^-½, where A_hat = A + I (adjacency with self-loops) and D_hat is the degree matrix of A_hat.

Parameters:

Name Type Description Default
num_nodes int | None

The number of nodes in the graph. If None, it will be inferred from self.num_nodes. Note that the node indices in edge_index are assumed to be in the range [0, num_nodes-1]. This parameter is important when edge_index does not contain all nodes (e.g., some nodes are isolated and have no edges or have been removed), as it ensures that the resulting Laplacian matrix has the correct size and includes all nodes. For instance, for self-loops.

None
use_edge_weights bool

If True, use the edge weights from self.edge_weights. If False, all edges use weight 1.

False

Returns:

Type Description
Tensor

The sparse symmetrically normalized Laplacian matrix of shape (num_nodes, num_nodes).

Source code in hyperbench/types/graph.py
def get_sparse_normalized_gcn_laplacian(
    self,
    num_nodes: int | None = None,
    use_edge_weights: bool = False,
) -> Tensor:
    """
    Compute the sparse Laplacian matrix from a graph edge index.

    The GCN Laplacian is defined as: L_GCN = D_hat^-1/2 * A_hat * D_hat^-1/2,
    where A_hat = A + I (adjacency with self-loops) and D_hat is the degree matrix of A_hat.

    Args:
        num_nodes: The number of nodes in the graph. If ``None``,
            it will be inferred from ``self.num_nodes``.
            Note that the node indices in ``edge_index`` are assumed to be in the range [0, num_nodes-1].
            This parameter is important when ``edge_index`` does not contain all nodes (e.g., some nodes are isolated and have no edges or have been removed),
            as it ensures that the resulting Laplacian matrix has the correct size and includes all nodes. For instance, for self-loops.
        use_edge_weights: If ``True``, use the edge weights from ``self.edge_weights``. If ``False``, all edges use weight 1.

    Returns:
        The sparse symmetrically normalized Laplacian matrix of shape ``(num_nodes, num_nodes)``.
    """
    self.to_undirected(with_selfloops=True, num_nodes=num_nodes)

    num_nodes = self.num_nodes if num_nodes is None else num_nodes

    degree_matrix = self.get_sparse_normalized_degree_matrix(
        num_nodes=num_nodes, use_edge_weights=use_edge_weights
    )
    adj_matrix = self.get_sparse_adjacency_matrix(
        num_nodes=num_nodes, use_edge_weights=use_edge_weights
    )

    # Compute normalized Laplacian matrix: L = D^-1/2 * A * D^-1/2
    normalized_laplacian_matrix = torch.sparse.mm(
        degree_matrix,
        torch.sparse.mm(adj_matrix, degree_matrix),
    )
    return normalized_laplacian_matrix.coalesce()

remove_selfloops()

Remove self-loops from the edge index.

Source code in hyperbench/types/graph.py
def remove_selfloops(self) -> "EdgeIndex":
    """Remove self-loops from the edge index."""
    # Example: edge_index = [[0, 1, 2, 3],
    #                        [1, 1, 3, 2]], shape (2, |E| = 4)
    #          -> keep_mask = [True, False, True, True]
    #          -> edge_index = [[0, 2, 3],
    #                           [1, 3, 2]], shape (2, |E'| = 3)
    keep_mask = self.__edge_index[0] != self.__edge_index[1]
    self.__edge_index = self.__edge_index[:, keep_mask]
    if self.__edge_weights is not None:
        self.__edge_weights = self.__edge_weights[keep_mask]
    return self

remove_duplicate_edges(num_nodes=None)

Remove duplicate edges from the edge index. Keeps the tensor contiguous in memory.

Parameters:

Name Type Description Default
num_nodes int | None

The number of nodes in the graph. If None, it will be inferred from self.num_nodes. This parameter is important when edge_index does not contain all nodes (e.g., some nodes are isolated and have no edges or have been removed), as it ensures that the resulting Laplacian matrix has the correct size and includes all nodes. For instance, for self-loops.

None

Returns:

Name Type Description
This EdgeIndex

class:EdgeIndex instance with duplicate edges removed.

Source code in hyperbench/types/graph.py
def remove_duplicate_edges(self, num_nodes: int | None = None) -> "EdgeIndex":
    """
    Remove duplicate edges from the edge index. Keeps the tensor contiguous in memory.

    Args:
        num_nodes: The number of nodes in the graph. If ``None``, it will be inferred from ``self.num_nodes``.
            This parameter is important when ``edge_index`` does not contain all nodes (e.g., some nodes are isolated and have no edges or have been removed),
            as it ensures that the resulting Laplacian matrix has the correct size and includes all nodes. For instance, for self-loops.

    Returns:
        This :class:`EdgeIndex` instance with duplicate edges removed.
    """
    # Example: edge_index = [[0, 1, 2, 2, 0, 3, 2],
    #                        [1, 0, 3, 2, 1, 2, 2]], shape (2, |E| = 7)
    #          -> after torch.unique(..., dim=1):
    #             edge_index = [[0, 1, 2, 2, 3],
    #                           [1, 0, 3, 2, 2]], shape (2, |E'| = 5)
    # Note: we call contiguous() to ensure that the resulting tensor is contiguous in memory,
    # which can improve performance for subsequent operations that require contiguous tensors.
    if self.__edge_weights is None:
        self.__edge_index = torch.unique(self.__edge_index, dim=1).contiguous()
        return self

    # No edges to process, just ensure tensors are contiguous
    if self.num_edges < 1:
        self.__edge_index = self.__edge_index.contiguous()
        self.__edge_weights = self.__edge_weights.contiguous()
        return self

    # When edge weights are present, we need to use torch.sparse_coo_tensor
    # to remove duplicate edges while preserving the weights.
    # Example: edge_index = [[0, 0, 1],
    #                        [1, 1, 2]]
    #          edge_weights = [1.0, 2.0, 3.0]
    #          -> before coalesce, we have duplicate edges (0, 1) with weights 1.0 and 2.0
    #          -> after `.coalesce()`:
    #          -> edge_index = [[0, 1],
    #                           [1, 2]]
    #          -> edge_weights = [3.0, 3.0] (weights of duplicate edges are summed)
    num_nodes = self.num_nodes if num_nodes is None else num_nodes
    coalesced = torch.sparse_coo_tensor(
        self.__edge_index,
        self.__edge_weights,
        size=(num_nodes, num_nodes),
    ).coalesce()
    self.__edge_index = coalesced.indices().contiguous()
    self.__edge_weights = coalesced.values().contiguous()
    return self

to_undirected(with_selfloops=False, num_nodes=None)

Convert the edge index to an undirected edge index by adding reverse edges.

Parameters:

Name Type Description Default
with_selfloops bool

Whether to add self-loops to each node. Defaults to False.

False
num_nodes int | None

Total number of nodes. Propagated to add_selfloops when with_selfloops is True. This parameter is useful when edge_index does not contain all nodes (e.g., some nodes are isolated and have no edges or have been removed), as it ensures that the resulting Laplacian matrix has the correct size and includes all nodes. For instance, for self-loops.

None

Returns:

Name Type Description
This EdgeIndex

class:EdgeIndex instance converted to undirected.

Source code in hyperbench/types/graph.py
def to_undirected(
    self,
    with_selfloops: bool = False,
    num_nodes: int | None = None,
) -> "EdgeIndex":
    """
    Convert the edge index to an undirected edge index by adding reverse edges.

    Args:
        with_selfloops: Whether to add self-loops to each node. Defaults to ``False``.
        num_nodes: Total number of nodes. Propagated to ``add_selfloops`` when ``with_selfloops`` is ``True``.
            This parameter is useful when ``edge_index`` does not contain all nodes (e.g., some nodes are isolated and have no edges or have been removed),
            as it ensures that the resulting Laplacian matrix has the correct size and includes all nodes. For instance, for self-loops.

    Returns:
        This :class:`EdgeIndex` instance converted to undirected.
    """
    device = self.__edge_index.device
    num_nodes = self.num_nodes if num_nodes is None else num_nodes

    orig_src, orig_dest = self.__edge_index[0], self.__edge_index[1]

    # Encode each directed edge (u, v) as a unique scalar key u * num_nodes + v.
    # Example: num_nodes = 4, orig_src  = [0, 1, 2], orig_dest = [1, 0, 3]
    #          -> edges are [(0,1), (1,0), (2,3)]
    #          -> encoded_edge_ids = [0*4+1, 1*4+0, 2*4+3] = [1, 4, 11]
    encoded_edge_ids = orig_src * num_nodes + orig_dest

    # Build the key for the reverse of each existing edge.
    # Example: reverse edges are [(1,0), (0,1), (3,2)]
    #          -> reversed_encoded_edge_ids = [1*4+0, 0*4+1, 3*4+2] = [4, 1, 14]
    reversed_encoded_edge_ids = orig_dest * num_nodes + orig_src

    # Example: encoded_edge_ids          = [1, 4, 11],
    #          reversed_encoded_edge_ids = [4, 1, 14]
    #          -> missing_reverse_mask = [False, False, True]
    #             because 4 and 1 are in both, it means edges (0,1) and (1,0) are already present,
    #             but 14 is only in reversed_encoded_edge_ids, which means edge (3,2) is missing
    #             and this is because the mask points to the missing reversee edges that are missing
    missing_mask = torch.logical_not(torch.isin(reversed_encoded_edge_ids, encoded_edge_ids))

    # Keep all original sources and append the destination of each edge whose reverse is missing.
    # Example: orig_src = [0, 1, 2], orig_dest[missing_mask] = [3]
    #          -> src = [0, 1, 2, 3]
    src = torch.cat([orig_src, orig_dest[missing_mask]])

    # Keep all original destinations and append the source of each edge whose reverse is missing.
    # Example: orig_dest = [1, 0, 3], orig_src[missing_mask] = [2]
    #          -> dest = [1, 0, 3, 2]
    #          -> final undirected edges: [(0,1), (1,0), (2,3), (3,2)]
    dest = torch.cat([orig_dest, orig_src[missing_mask]])

    # Example: edge_index = [[0, 1, 2],
    #                        [1, 0, 3]]
    #          -> after torch.stack([...], dim=0):
    #             undirected_edge_index = [[0, 1, 2, 1, 0, 3],
    #                                      [1, 0, 3, 0, 1, 2]]
    #          -> after torch.unique(..., dim=1):
    #             undirected_edge_index = [[0, 1, 2, 3],
    #                                      [1, 0, 3, 2]]
    self.__edge_index = torch.stack([src, dest], dim=0).to(device)

    # The new edges have the same weights as the original edges.
    # Example: edge_index = [[0, 1, 2],
    #                        [1, 0, 3]]
    #          edge_weights = [0.5, 1.0, 2.0]
    #          -> after adding reverse edges:
    #             edge_index = [[0, 1, 2, 1, 0, 3],
    #                           [1, 0, 3, 0, 1, 2]]
    #             edge_weights = [0.5, 1.0, 2.0, 0.5, 1.0, 2.0]
    self.__edge_weights = (
        torch.cat([self.__edge_weights, self.__edge_weights[missing_mask]])
        if self.__edge_weights is not None
        else None
    )

    if with_selfloops:
        # Don't remove duplicate edges when adding self-loops, as we need to remove them
        # even if with_selfloops is False, to ensure that the edge index is clean and doesn't contain duplicate edges.
        # In this way, we don't do the duplicate edge removal twice, which would be redundant and inefficient
        self.add_selfloops(num_nodes=num_nodes, with_duplicate_removal=False)

    self.remove_duplicate_edges()

    return self

Graph

A simple graph data structure using edge list representation.

Parameters:

Name Type Description Default
edges list[list[int]]

A list of edges, where each edge is represented as a list of two integers (source_node, destination_node).

required
edge_weights list[float] | None

Optional list of edge weights corresponding to each edge in edges. If provided, must have the same length as edges.

None
Source code in hyperbench/types/graph.py
class Graph:
    """
    A simple graph data structure using edge list representation.

    Args:
        edges: A list of edges, where each edge is represented as a list of two integers (source_node, destination_node).
        edge_weights: Optional list of edge weights corresponding to each edge in ``edges``. If provided, must have the same length as ``edges``.
    """

    def __init__(self, edges: list[list[int]], edge_weights: list[float] | None = None):
        self.edges = edges
        self.__validate_edge_weights(edge_weights)
        self.__edge_weights = edge_weights

    @property
    def edge_weights(self) -> list[float] | None:
        """Return the edge weights, if present."""
        return self.__edge_weights

    @property
    def edge_weights_tensor(self) -> Tensor:
        """Return the edge weights as a tensor, if present."""
        if self.__edge_weights is not None:
            return torch.tensor(self.__edge_weights, dtype=torch.float)
        return torch.empty(0, dtype=torch.float)

    @property
    def num_nodes(self) -> int:
        """Return the number of nodes in the graph."""
        nodes = set()
        for edge in self.edges:
            nodes.update(edge)
        return len(nodes)

    @property
    def num_edges(self) -> int:
        """Return the number of edges in the graph."""
        return len(self.edges)

    def remove_selfloops(self) -> "Graph":
        """
        Remove self-loops from the graph.

        Returns:
            List of edges without self-loops.
        """
        if self.num_edges == 0:
            return self

        edges_tensor = torch.tensor(self.edges, dtype=torch.long)
        edge_weights_tensor = (
            torch.tensor(self.edge_weights, dtype=torch.float)
            if self.edge_weights is not None
            else None
        )

        # Example: edges = [[0, 1],
        #                   [1, 1],
        #                   [2, 3]] shape (|E|, 2)
        #          -> no_selfloop_mask = [True, False, True]
        #          -> edges without self-loops = [[0, 1],
        #                                         [2, 3]]
        no_selfloop_mask = edges_tensor[:, 0] != edges_tensor[:, 1]
        self.edges = edges_tensor[no_selfloop_mask].tolist()

        # Example: edge_weights = [0.5, 1.0, 0.8], no_selfloop_mask = [True, False, True]
        #         -> edge_weights without self-loops = [0.5, 0.8]
        if edge_weights_tensor is not None:
            self.__edge_weights = edge_weights_tensor[no_selfloop_mask].tolist()

        return self

    def to_edge_index(self) -> Tensor:
        """
        Convert the graph to edge index representation.

        Returns:
            edge_index: Tensor of shape (2, |E|) representing edges.
        """
        if self.num_edges == 0:
            return torch.empty((2, 0), dtype=torch.long)

        # Example: edges = [[0, 1],
        #                   [1, 2],
        #                   [2, 3]] shape (|E|, 2)
        #          ->  edge_index = [[0, 1, 2],
        #                            [1, 2, 3]] shape (2, |E|)
        edge_index = torch.tensor(self.edges, dtype=torch.long).t()
        return edge_index

    def __validate_edge_weights(self, edge_weights: list[float] | None) -> None:
        if edge_weights is None:
            return

        if len(edge_weights) != self.num_edges:
            raise ValueError(
                "edge_weights must have the same number of entries as edges. "
                f"Got {len(edge_weights)} edge weights but {self.num_edges} edges."
            )

    @staticmethod
    def smoothing_with_laplacian_matrix(
        x: Tensor,
        laplacian_matrix: Tensor,
        drop_rate: float = 0.0,
    ) -> Tensor:
        """
        Return the feature matrix smoothed with a Laplacian matrix.

        Args:
            x: Node feature matrix. Size ``(num_nodes, C)``.
            laplacian_matrix: The Laplacian matrix. Size ``(num_nodes, num_nodes)``.
            drop_rate: Randomly dropout the connections in the Laplacian with probability ``drop_rate``. Defaults to ``0.0``.

        Returns:
            The smoothed feature matrix. Size ``(num_nodes, C)``.
        """
        if drop_rate > 0.0:
            laplacian_matrix = utils.sparse_dropout(laplacian_matrix, drop_rate)
        return laplacian_matrix.matmul(x)

edge_weights property

Return the edge weights, if present.

edge_weights_tensor property

Return the edge weights as a tensor, if present.

num_nodes property

Return the number of nodes in the graph.

num_edges property

Return the number of edges in the graph.

remove_selfloops()

Remove self-loops from the graph.

Returns:

Type Description
Graph

List of edges without self-loops.

Source code in hyperbench/types/graph.py
def remove_selfloops(self) -> "Graph":
    """
    Remove self-loops from the graph.

    Returns:
        List of edges without self-loops.
    """
    if self.num_edges == 0:
        return self

    edges_tensor = torch.tensor(self.edges, dtype=torch.long)
    edge_weights_tensor = (
        torch.tensor(self.edge_weights, dtype=torch.float)
        if self.edge_weights is not None
        else None
    )

    # Example: edges = [[0, 1],
    #                   [1, 1],
    #                   [2, 3]] shape (|E|, 2)
    #          -> no_selfloop_mask = [True, False, True]
    #          -> edges without self-loops = [[0, 1],
    #                                         [2, 3]]
    no_selfloop_mask = edges_tensor[:, 0] != edges_tensor[:, 1]
    self.edges = edges_tensor[no_selfloop_mask].tolist()

    # Example: edge_weights = [0.5, 1.0, 0.8], no_selfloop_mask = [True, False, True]
    #         -> edge_weights without self-loops = [0.5, 0.8]
    if edge_weights_tensor is not None:
        self.__edge_weights = edge_weights_tensor[no_selfloop_mask].tolist()

    return self

to_edge_index()

Convert the graph to edge index representation.

Returns:

Name Type Description
edge_index Tensor

Tensor of shape (2, |E|) representing edges.

Source code in hyperbench/types/graph.py
def to_edge_index(self) -> Tensor:
    """
    Convert the graph to edge index representation.

    Returns:
        edge_index: Tensor of shape (2, |E|) representing edges.
    """
    if self.num_edges == 0:
        return torch.empty((2, 0), dtype=torch.long)

    # Example: edges = [[0, 1],
    #                   [1, 2],
    #                   [2, 3]] shape (|E|, 2)
    #          ->  edge_index = [[0, 1, 2],
    #                            [1, 2, 3]] shape (2, |E|)
    edge_index = torch.tensor(self.edges, dtype=torch.long).t()
    return edge_index

smoothing_with_laplacian_matrix(x, laplacian_matrix, drop_rate=0.0) staticmethod

Return the feature matrix smoothed with a Laplacian matrix.

Parameters:

Name Type Description Default
x Tensor

Node feature matrix. Size (num_nodes, C).

required
laplacian_matrix Tensor

The Laplacian matrix. Size (num_nodes, num_nodes).

required
drop_rate float

Randomly dropout the connections in the Laplacian with probability drop_rate. Defaults to 0.0.

0.0

Returns:

Type Description
Tensor

The smoothed feature matrix. Size (num_nodes, C).

Source code in hyperbench/types/graph.py
@staticmethod
def smoothing_with_laplacian_matrix(
    x: Tensor,
    laplacian_matrix: Tensor,
    drop_rate: float = 0.0,
) -> Tensor:
    """
    Return the feature matrix smoothed with a Laplacian matrix.

    Args:
        x: Node feature matrix. Size ``(num_nodes, C)``.
        laplacian_matrix: The Laplacian matrix. Size ``(num_nodes, num_nodes)``.
        drop_rate: Randomly dropout the connections in the Laplacian with probability ``drop_rate``. Defaults to ``0.0``.

    Returns:
        The smoothed feature matrix. Size ``(num_nodes, C)``.
    """
    if drop_rate > 0.0:
        laplacian_matrix = utils.sparse_dropout(laplacian_matrix, drop_rate)
    return laplacian_matrix.matmul(x)

HIFHypergraph

A hypergraph data structure that supports directed/undirected hyperedges with incidence-based representation.

Parameters:

Name Type Description Default
network_type Literal['asc', 'directed', 'undirected'] | None

The type of hypergraph, which can be "asc" (or "directed") for directed hyperedges, or "undirected" for undirected hyperedges.

None
metadata dict[str, Any] | None

Optional dictionary of metadata about the hypergraph.

None
incidences list[dict[str, Any]] | None

A list of incidences, where each incidence is a dictionary with keys "node" and "edge" representing the relationship between a node and a hyperedge.

None
nodes list[dict[str, Any]] | None

A list of node dictionaries, where each dictionary contains information about a node (e.g., id, features).

None
hyperedges list[dict[str, Any]] | None

A list of edge dictionaries, where each dictionary contains information about a hyperedge (e.g., id, features).

None
Source code in hyperbench/types/hypergraph.py
class HIFHypergraph:
    """
    A hypergraph data structure that supports directed/undirected hyperedges
    with incidence-based representation.

    Args:
        network_type: The type of hypergraph, which can be "asc" (or "directed") for directed hyperedges, or "undirected" for undirected hyperedges.
        metadata: Optional dictionary of metadata about the hypergraph.
        incidences: A list of incidences, where each incidence is a dictionary with keys "node" and "edge" representing the relationship between a node and a hyperedge.
        nodes: A list of node dictionaries, where each dictionary contains information about a node (e.g., id, features).
        hyperedges: A list of edge dictionaries, where each dictionary contains information about a hyperedge (e.g., id, features).
    """

    def __init__(
        self,
        network_type: Literal["asc", "directed", "undirected"] | None = None,
        metadata: dict[str, Any] | None = None,
        incidences: list[dict[str, Any]] | None = None,
        nodes: list[dict[str, Any]] | None = None,
        hyperedges: list[dict[str, Any]] | None = None,
    ):
        self.network_type = network_type
        self.metadata = metadata if metadata is not None else {}
        self.incidences = incidences if incidences is not None else []
        self.nodes = nodes if nodes is not None else []
        self.hyperedges = hyperedges if hyperedges is not None else []

    @classmethod
    def empty(cls) -> "HIFHypergraph":
        return cls(
            network_type="undirected",
            nodes=[],
            hyperedges=[],
            incidences=[],
            metadata=None,
        )

    @classmethod
    def from_hif(cls, data: dict[str, Any]) -> "HIFHypergraph":
        """
        Create a Hypergraph from a HIF (Hypergraph Interchange Format).

        Args:
            data: Dictionary with keys: network-type, metadata, incidences, nodes, hyperedges

        Returns:
            Hypergraph instance
        """
        network_type = data.get("network-type") or data.get("network_type")
        metadata = data.get("metadata", {})
        incidences = data.get("incidences", [])
        nodes = data.get("nodes", [])
        hyperedges = data.get("edges", [])

        return cls(
            network_type=network_type,
            metadata=metadata,
            incidences=incidences,
            nodes=nodes,
            hyperedges=hyperedges,
        )

    @property
    def num_nodes(self) -> int:
        """Return the number of nodes in the hypergraph."""
        return len(self.nodes)

    @property
    def num_hyperedges(self) -> int:
        """Return the number of hyperedges in the hypergraph."""
        return len(self.hyperedges)

    def stats(self) -> dict[str, Any]:
        """
        Compute statistics for the HIFhypergraph.
        The fields returned in the dictionary include:
        - ``num_nodes``: The number of nodes in the hypergraph.
        - ``num_hyperedges``: The number of hyperedges in the hypergraph.
        - ``avg_degree_node_raw``: The average degree of nodes, calculated as the mean number of hyperedges each node belongs to.
        - ``avg_degree_node``: The floored node average degree.
        - ``avg_degree_hyperedge_raw``: The average size of hyperedges, calculated as the mean number of nodes each hyperedge contains.
        - ``avg_degree_hyperedge``: The floored hyperedge average size.
        - ``node_degree_max``: The maximum degree of any node in the hypergraph.
        - ``hyperedge_degree_max``: The maximum size of any hyperedge in the hypergraph.
        - ``node_degree_median``: The median degree of nodes in the hypergraph.
        - ``hyperedge_degree_median``: The median size of hyperedges in the hypergraph.
        - ``distribution_node_degree``: A list where the value at index ``i`` represents the count of nodes with degree ``i``.
        - ``distribution_hyperedge_size``: A list where the value at index ``i`` represents the count of hyperedges with size ``i``.
        - ``distribution_node_degree_hist``: A dictionary where the keys are node degrees and the values are the count of nodes with that degree.
        - ``distribution_hyperedge_size_hist``: A dictionary where the keys are hyperedge sizes and the values are the count of hyperedges with that size.

        Returns:
            A dictionary containing various statistics about the hypergraph.
        """

        node_degree: dict[Any, int] = {}
        hyperedge_size: dict[Any, int] = {}

        for incidence in self.incidences:
            node_id = incidence.get("node")
            edge_id = incidence.get("edge")
            node_degree[node_id] = node_degree.get(node_id, 0) + 1
            hyperedge_size[edge_id] = hyperedge_size.get(edge_id, 0) + 1

        num_nodes = len(self.nodes)
        num_hyperedges = len(self.hyperedges)
        total_incidences = len(self.incidences)

        distribution_node_degree: list[int] = sorted(node_degree.values())
        distribution_hyperedge_size: list[int] = sorted(hyperedge_size.values())

        avg_degree_node_raw = total_incidences / num_nodes if num_nodes else 0
        avg_degree_node = int(avg_degree_node_raw)
        avg_degree_hyperedge_raw = total_incidences / num_hyperedges if num_hyperedges else 0
        avg_degree_hyperedge = int(avg_degree_hyperedge_raw)

        node_degree_max = max(distribution_node_degree) if distribution_node_degree else 0
        hyperedge_degree_max = (
            max(distribution_hyperedge_size) if distribution_hyperedge_size else 0
        )

        n_n = len(distribution_node_degree)
        node_degree_median = (
            (
                distribution_node_degree[n_n // 2]
                if n_n % 2
                else (distribution_node_degree[n_n // 2 - 1] + distribution_node_degree[n_n // 2])
                / 2
            )
            if n_n
            else 0
        )

        n_e = len(distribution_hyperedge_size)
        hyperedge_degree_median = (
            (
                distribution_hyperedge_size[n_e // 2]
                if n_e % 2
                else (
                    distribution_hyperedge_size[n_e // 2 - 1]
                    + distribution_hyperedge_size[n_e // 2]
                )
                / 2
            )
            if n_e
            else 0
        )

        distribution_node_degree_hist: dict[int, int] = {}
        for d in distribution_node_degree:
            distribution_node_degree_hist[d] = distribution_node_degree_hist.get(d, 0) + 1

        distribution_hyperedge_size_hist: dict[int, int] = {}
        for s in distribution_hyperedge_size:
            distribution_hyperedge_size_hist[s] = distribution_hyperedge_size_hist.get(s, 0) + 1

        return {
            "num_nodes": num_nodes,
            "num_hyperedges": num_hyperedges,
            "avg_degree_node_raw": avg_degree_node_raw,
            "avg_degree_node": avg_degree_node,
            "avg_degree_hyperedge_raw": avg_degree_hyperedge_raw,
            "avg_degree_hyperedge": avg_degree_hyperedge,
            "node_degree_max": node_degree_max,
            "hyperedge_degree_max": hyperedge_degree_max,
            "node_degree_median": node_degree_median,
            "hyperedge_degree_median": hyperedge_degree_median,
            "distribution_node_degree": distribution_node_degree,
            "distribution_hyperedge_size": distribution_hyperedge_size,
            "distribution_node_degree_hist": distribution_node_degree_hist,
            "distribution_hyperedge_size_hist": distribution_hyperedge_size_hist,
        }

num_nodes property

Return the number of nodes in the hypergraph.

num_hyperedges property

Return the number of hyperedges in the hypergraph.

from_hif(data) classmethod

Create a Hypergraph from a HIF (Hypergraph Interchange Format).

Parameters:

Name Type Description Default
data dict[str, Any]

Dictionary with keys: network-type, metadata, incidences, nodes, hyperedges

required

Returns:

Type Description
HIFHypergraph

Hypergraph instance

Source code in hyperbench/types/hypergraph.py
@classmethod
def from_hif(cls, data: dict[str, Any]) -> "HIFHypergraph":
    """
    Create a Hypergraph from a HIF (Hypergraph Interchange Format).

    Args:
        data: Dictionary with keys: network-type, metadata, incidences, nodes, hyperedges

    Returns:
        Hypergraph instance
    """
    network_type = data.get("network-type") or data.get("network_type")
    metadata = data.get("metadata", {})
    incidences = data.get("incidences", [])
    nodes = data.get("nodes", [])
    hyperedges = data.get("edges", [])

    return cls(
        network_type=network_type,
        metadata=metadata,
        incidences=incidences,
        nodes=nodes,
        hyperedges=hyperedges,
    )

stats()

Compute statistics for the HIFhypergraph. The fields returned in the dictionary include: - num_nodes: The number of nodes in the hypergraph. - num_hyperedges: The number of hyperedges in the hypergraph. - avg_degree_node_raw: The average degree of nodes, calculated as the mean number of hyperedges each node belongs to. - avg_degree_node: The floored node average degree. - avg_degree_hyperedge_raw: The average size of hyperedges, calculated as the mean number of nodes each hyperedge contains. - avg_degree_hyperedge: The floored hyperedge average size. - node_degree_max: The maximum degree of any node in the hypergraph. - hyperedge_degree_max: The maximum size of any hyperedge in the hypergraph. - node_degree_median: The median degree of nodes in the hypergraph. - hyperedge_degree_median: The median size of hyperedges in the hypergraph. - distribution_node_degree: A list where the value at index i represents the count of nodes with degree i. - distribution_hyperedge_size: A list where the value at index i represents the count of hyperedges with size i. - distribution_node_degree_hist: A dictionary where the keys are node degrees and the values are the count of nodes with that degree. - distribution_hyperedge_size_hist: A dictionary where the keys are hyperedge sizes and the values are the count of hyperedges with that size.

Returns:

Type Description
dict[str, Any]

A dictionary containing various statistics about the hypergraph.

Source code in hyperbench/types/hypergraph.py
def stats(self) -> dict[str, Any]:
    """
    Compute statistics for the HIFhypergraph.
    The fields returned in the dictionary include:
    - ``num_nodes``: The number of nodes in the hypergraph.
    - ``num_hyperedges``: The number of hyperedges in the hypergraph.
    - ``avg_degree_node_raw``: The average degree of nodes, calculated as the mean number of hyperedges each node belongs to.
    - ``avg_degree_node``: The floored node average degree.
    - ``avg_degree_hyperedge_raw``: The average size of hyperedges, calculated as the mean number of nodes each hyperedge contains.
    - ``avg_degree_hyperedge``: The floored hyperedge average size.
    - ``node_degree_max``: The maximum degree of any node in the hypergraph.
    - ``hyperedge_degree_max``: The maximum size of any hyperedge in the hypergraph.
    - ``node_degree_median``: The median degree of nodes in the hypergraph.
    - ``hyperedge_degree_median``: The median size of hyperedges in the hypergraph.
    - ``distribution_node_degree``: A list where the value at index ``i`` represents the count of nodes with degree ``i``.
    - ``distribution_hyperedge_size``: A list where the value at index ``i`` represents the count of hyperedges with size ``i``.
    - ``distribution_node_degree_hist``: A dictionary where the keys are node degrees and the values are the count of nodes with that degree.
    - ``distribution_hyperedge_size_hist``: A dictionary where the keys are hyperedge sizes and the values are the count of hyperedges with that size.

    Returns:
        A dictionary containing various statistics about the hypergraph.
    """

    node_degree: dict[Any, int] = {}
    hyperedge_size: dict[Any, int] = {}

    for incidence in self.incidences:
        node_id = incidence.get("node")
        edge_id = incidence.get("edge")
        node_degree[node_id] = node_degree.get(node_id, 0) + 1
        hyperedge_size[edge_id] = hyperedge_size.get(edge_id, 0) + 1

    num_nodes = len(self.nodes)
    num_hyperedges = len(self.hyperedges)
    total_incidences = len(self.incidences)

    distribution_node_degree: list[int] = sorted(node_degree.values())
    distribution_hyperedge_size: list[int] = sorted(hyperedge_size.values())

    avg_degree_node_raw = total_incidences / num_nodes if num_nodes else 0
    avg_degree_node = int(avg_degree_node_raw)
    avg_degree_hyperedge_raw = total_incidences / num_hyperedges if num_hyperedges else 0
    avg_degree_hyperedge = int(avg_degree_hyperedge_raw)

    node_degree_max = max(distribution_node_degree) if distribution_node_degree else 0
    hyperedge_degree_max = (
        max(distribution_hyperedge_size) if distribution_hyperedge_size else 0
    )

    n_n = len(distribution_node_degree)
    node_degree_median = (
        (
            distribution_node_degree[n_n // 2]
            if n_n % 2
            else (distribution_node_degree[n_n // 2 - 1] + distribution_node_degree[n_n // 2])
            / 2
        )
        if n_n
        else 0
    )

    n_e = len(distribution_hyperedge_size)
    hyperedge_degree_median = (
        (
            distribution_hyperedge_size[n_e // 2]
            if n_e % 2
            else (
                distribution_hyperedge_size[n_e // 2 - 1]
                + distribution_hyperedge_size[n_e // 2]
            )
            / 2
        )
        if n_e
        else 0
    )

    distribution_node_degree_hist: dict[int, int] = {}
    for d in distribution_node_degree:
        distribution_node_degree_hist[d] = distribution_node_degree_hist.get(d, 0) + 1

    distribution_hyperedge_size_hist: dict[int, int] = {}
    for s in distribution_hyperedge_size:
        distribution_hyperedge_size_hist[s] = distribution_hyperedge_size_hist.get(s, 0) + 1

    return {
        "num_nodes": num_nodes,
        "num_hyperedges": num_hyperedges,
        "avg_degree_node_raw": avg_degree_node_raw,
        "avg_degree_node": avg_degree_node,
        "avg_degree_hyperedge_raw": avg_degree_hyperedge_raw,
        "avg_degree_hyperedge": avg_degree_hyperedge,
        "node_degree_max": node_degree_max,
        "hyperedge_degree_max": hyperedge_degree_max,
        "node_degree_median": node_degree_median,
        "hyperedge_degree_median": hyperedge_degree_median,
        "distribution_node_degree": distribution_node_degree,
        "distribution_hyperedge_size": distribution_hyperedge_size,
        "distribution_node_degree_hist": distribution_node_degree_hist,
        "distribution_hyperedge_size_hist": distribution_hyperedge_size_hist,
    }

Hypergraph

A simple hypergraph data structure using edge list representation.

Parameters:

Name Type Description Default
hyperedges list[list[int]]

A list of hyperedges, where each hyperedge is represented as a list of node IDs.

required
Source code in hyperbench/types/hypergraph.py
class Hypergraph:
    """
    A simple hypergraph data structure using edge list representation.

    Args:
        hyperedges: A list of hyperedges, where each hyperedge is represented as a list of node IDs.
    """

    def __init__(self, hyperedges: list[list[int]]):
        self.hyperedges = hyperedges

    @property
    def num_nodes(self) -> int:
        """Return the number of nodes in the hypergraph."""
        nodes = set()
        for edge in self.hyperedges:
            nodes.update(edge)
        return len(nodes)

    @property
    def num_hyperedges(self) -> int:
        """Return the number of hyperedges in the hypergraph."""
        return len(self.hyperedges)

    def neighbors_of(self, node: int) -> Neighborhood:
        """
        Return the set of nodes that share at least one hyperedge with node.

        A node u is a neighbor of v if there exists a hyperedge e such that
        both u and v are in e. The node itself is excluded from the result.

        Args:
            node: The node ID to find neighbors for.

        Returns:
            A set of neighbor node IDs (excluding the node itself).
        """
        neighbors: Neighborhood = set()
        for hyperedge in self.hyperedges:
            if node in hyperedge:
                neighbors.update(hyperedge)

        neighbors.discard(node)
        return neighbors

    def neighbors_of_all(self) -> dict[int, Neighborhood]:
        """
        Build a mapping from every node to its neighbors.

        This precomputes ``neighbors_of`` for all nodes at once, which is
        more efficient when scoring many candidate hyperedges.

        Returns:
            A dictionary mapping each node ID to its set of neighbors.
        """
        nodes: set[int] = set()
        for hyperedge in self.hyperedges:
            nodes.update(hyperedge)

        node_to_neighbors: dict[int, Neighborhood] = {}
        for node in nodes:
            node_to_neighbors[node] = self.neighbors_of(node)

        return node_to_neighbors

    def stats(self) -> dict[str, Any]:
        """Return basic statistics about the hypergraph."""
        node_degree: dict[int, int] = {}
        distribution_hyperedge_size: list[int] = []
        total_incidences = 0

        for hyperedge in self.hyperedges:
            size = len(hyperedge)
            distribution_hyperedge_size.append(size)
            total_incidences += size
            for node in hyperedge:
                node_degree[node] = node_degree.get(node, 0) + 1

        num_nodes = len(node_degree)
        num_hyperedges = len(self.hyperedges)
        distribution_node_degree: list[int] = sorted(node_degree.values())

        avg_degree_hyperedge = total_incidences / num_hyperedges if num_hyperedges else 0
        total_incidences_nodes = sum(distribution_node_degree)
        avg_degree_node = total_incidences_nodes / num_nodes if num_nodes else 0

        hyperedge_degree_max = (
            max(distribution_hyperedge_size) if distribution_hyperedge_size else 0
        )
        node_degree_max = max(distribution_node_degree) if distribution_node_degree else 0

        sorted_hyperedge_sizes = sorted(distribution_hyperedge_size)
        n_e = len(sorted_hyperedge_sizes)
        hyperedge_degree_median = (
            (
                sorted_hyperedge_sizes[n_e // 2]
                if n_e % 2
                else (sorted_hyperedge_sizes[n_e // 2 - 1] + sorted_hyperedge_sizes[n_e // 2]) / 2
            )
            if n_e
            else 0
        )

        n_n = len(distribution_node_degree)
        node_degree_median = (
            (
                distribution_node_degree[n_n // 2]
                if n_n % 2
                else (distribution_node_degree[n_n // 2 - 1] + distribution_node_degree[n_n // 2])
                / 2
            )
            if n_n
            else 0
        )

        distribution_hyperedge_size_hist: dict[int, int] = {}
        for s in distribution_hyperedge_size:
            distribution_hyperedge_size_hist[s] = distribution_hyperedge_size_hist.get(s, 0) + 1

        distribution_node_degree_hist: dict[int, int] = {}
        for d in distribution_node_degree:
            distribution_node_degree_hist[d] = distribution_node_degree_hist.get(d, 0) + 1

        return {
            "num_nodes": num_nodes,
            "num_hyperedges": num_hyperedges,
            "avg_degree_node": avg_degree_node,
            "avg_degree_hyperedge": avg_degree_hyperedge,
            "node_degree_max": node_degree_max,
            "hyperedge_degree_max": hyperedge_degree_max,
            "node_degree_median": node_degree_median,
            "hyperedge_degree_median": hyperedge_degree_median,
            "distribution_node_degree": distribution_node_degree,
            "distribution_hyperedge_size": distribution_hyperedge_size,
            "distribution_node_degree_hist": distribution_node_degree_hist,
            "distribution_hyperedge_size_hist": distribution_hyperedge_size_hist,
        }

    @classmethod
    def from_hyperedge_index(cls, hyperedge_index: Tensor) -> "Hypergraph":
        """
        Create a Hypergraph from a hyperedge index representation.

        Args:
            hyperedge_index: Tensor of shape (2, |E|) representing hyperedges, where each column is (node, hyperedge).

        Returns:
            Hypergraph instance
        """
        if hyperedge_index.size(1) < 1:
            return cls(hyperedges=[])

        unique_hyperedge_ids = hyperedge_index[1].unique()
        hyperedges = [
            hyperedge_index[0, hyperedge_index[1] == hyperedge_id].tolist()
            for hyperedge_id in unique_hyperedge_ids
        ]

        return cls(hyperedges=hyperedges)

    @staticmethod
    def smoothing_with_matrix(
        x: Tensor,
        matrix: Tensor,
        drop_rate: float = 0.0,
    ) -> Tensor:
        """
        Return the feature matrix smoothed with a smoothing matrix.
        Computes ``M @ X`` where ``M`` is the smoothing matrix and ``X`` is the node feature matrix.

        Args:
            x: Node feature matrix. Size ``(num_nodes, C)``.
            matrix: The smoothing matrix. Size ``(num_nodes, num_nodes)``.
            drop_rate: Randomly dropout the connections in the smoothing matrix with probability ``drop_rate``. Defaults to ``0.0``.

        Returns:
            The smoothed feature matrix. Size ``(num_nodes, C)``.
        """
        if drop_rate > 0.0:
            matrix = sparse_dropout(matrix, drop_rate)
        return matrix.matmul(x)

num_nodes property

Return the number of nodes in the hypergraph.

num_hyperedges property

Return the number of hyperedges in the hypergraph.

neighbors_of(node)

Return the set of nodes that share at least one hyperedge with node.

A node u is a neighbor of v if there exists a hyperedge e such that both u and v are in e. The node itself is excluded from the result.

Parameters:

Name Type Description Default
node int

The node ID to find neighbors for.

required

Returns:

Type Description
Neighborhood

A set of neighbor node IDs (excluding the node itself).

Source code in hyperbench/types/hypergraph.py
def neighbors_of(self, node: int) -> Neighborhood:
    """
    Return the set of nodes that share at least one hyperedge with node.

    A node u is a neighbor of v if there exists a hyperedge e such that
    both u and v are in e. The node itself is excluded from the result.

    Args:
        node: The node ID to find neighbors for.

    Returns:
        A set of neighbor node IDs (excluding the node itself).
    """
    neighbors: Neighborhood = set()
    for hyperedge in self.hyperedges:
        if node in hyperedge:
            neighbors.update(hyperedge)

    neighbors.discard(node)
    return neighbors

neighbors_of_all()

Build a mapping from every node to its neighbors.

This precomputes neighbors_of for all nodes at once, which is more efficient when scoring many candidate hyperedges.

Returns:

Type Description
dict[int, Neighborhood]

A dictionary mapping each node ID to its set of neighbors.

Source code in hyperbench/types/hypergraph.py
def neighbors_of_all(self) -> dict[int, Neighborhood]:
    """
    Build a mapping from every node to its neighbors.

    This precomputes ``neighbors_of`` for all nodes at once, which is
    more efficient when scoring many candidate hyperedges.

    Returns:
        A dictionary mapping each node ID to its set of neighbors.
    """
    nodes: set[int] = set()
    for hyperedge in self.hyperedges:
        nodes.update(hyperedge)

    node_to_neighbors: dict[int, Neighborhood] = {}
    for node in nodes:
        node_to_neighbors[node] = self.neighbors_of(node)

    return node_to_neighbors

stats()

Return basic statistics about the hypergraph.

Source code in hyperbench/types/hypergraph.py
def stats(self) -> dict[str, Any]:
    """Return basic statistics about the hypergraph."""
    node_degree: dict[int, int] = {}
    distribution_hyperedge_size: list[int] = []
    total_incidences = 0

    for hyperedge in self.hyperedges:
        size = len(hyperedge)
        distribution_hyperedge_size.append(size)
        total_incidences += size
        for node in hyperedge:
            node_degree[node] = node_degree.get(node, 0) + 1

    num_nodes = len(node_degree)
    num_hyperedges = len(self.hyperedges)
    distribution_node_degree: list[int] = sorted(node_degree.values())

    avg_degree_hyperedge = total_incidences / num_hyperedges if num_hyperedges else 0
    total_incidences_nodes = sum(distribution_node_degree)
    avg_degree_node = total_incidences_nodes / num_nodes if num_nodes else 0

    hyperedge_degree_max = (
        max(distribution_hyperedge_size) if distribution_hyperedge_size else 0
    )
    node_degree_max = max(distribution_node_degree) if distribution_node_degree else 0

    sorted_hyperedge_sizes = sorted(distribution_hyperedge_size)
    n_e = len(sorted_hyperedge_sizes)
    hyperedge_degree_median = (
        (
            sorted_hyperedge_sizes[n_e // 2]
            if n_e % 2
            else (sorted_hyperedge_sizes[n_e // 2 - 1] + sorted_hyperedge_sizes[n_e // 2]) / 2
        )
        if n_e
        else 0
    )

    n_n = len(distribution_node_degree)
    node_degree_median = (
        (
            distribution_node_degree[n_n // 2]
            if n_n % 2
            else (distribution_node_degree[n_n // 2 - 1] + distribution_node_degree[n_n // 2])
            / 2
        )
        if n_n
        else 0
    )

    distribution_hyperedge_size_hist: dict[int, int] = {}
    for s in distribution_hyperedge_size:
        distribution_hyperedge_size_hist[s] = distribution_hyperedge_size_hist.get(s, 0) + 1

    distribution_node_degree_hist: dict[int, int] = {}
    for d in distribution_node_degree:
        distribution_node_degree_hist[d] = distribution_node_degree_hist.get(d, 0) + 1

    return {
        "num_nodes": num_nodes,
        "num_hyperedges": num_hyperedges,
        "avg_degree_node": avg_degree_node,
        "avg_degree_hyperedge": avg_degree_hyperedge,
        "node_degree_max": node_degree_max,
        "hyperedge_degree_max": hyperedge_degree_max,
        "node_degree_median": node_degree_median,
        "hyperedge_degree_median": hyperedge_degree_median,
        "distribution_node_degree": distribution_node_degree,
        "distribution_hyperedge_size": distribution_hyperedge_size,
        "distribution_node_degree_hist": distribution_node_degree_hist,
        "distribution_hyperedge_size_hist": distribution_hyperedge_size_hist,
    }

from_hyperedge_index(hyperedge_index) classmethod

Create a Hypergraph from a hyperedge index representation.

Parameters:

Name Type Description Default
hyperedge_index Tensor

Tensor of shape (2, |E|) representing hyperedges, where each column is (node, hyperedge).

required

Returns:

Type Description
Hypergraph

Hypergraph instance

Source code in hyperbench/types/hypergraph.py
@classmethod
def from_hyperedge_index(cls, hyperedge_index: Tensor) -> "Hypergraph":
    """
    Create a Hypergraph from a hyperedge index representation.

    Args:
        hyperedge_index: Tensor of shape (2, |E|) representing hyperedges, where each column is (node, hyperedge).

    Returns:
        Hypergraph instance
    """
    if hyperedge_index.size(1) < 1:
        return cls(hyperedges=[])

    unique_hyperedge_ids = hyperedge_index[1].unique()
    hyperedges = [
        hyperedge_index[0, hyperedge_index[1] == hyperedge_id].tolist()
        for hyperedge_id in unique_hyperedge_ids
    ]

    return cls(hyperedges=hyperedges)

smoothing_with_matrix(x, matrix, drop_rate=0.0) staticmethod

Return the feature matrix smoothed with a smoothing matrix. Computes M @ X where M is the smoothing matrix and X is the node feature matrix.

Parameters:

Name Type Description Default
x Tensor

Node feature matrix. Size (num_nodes, C).

required
matrix Tensor

The smoothing matrix. Size (num_nodes, num_nodes).

required
drop_rate float

Randomly dropout the connections in the smoothing matrix with probability drop_rate. Defaults to 0.0.

0.0

Returns:

Type Description
Tensor

The smoothed feature matrix. Size (num_nodes, C).

Source code in hyperbench/types/hypergraph.py
@staticmethod
def smoothing_with_matrix(
    x: Tensor,
    matrix: Tensor,
    drop_rate: float = 0.0,
) -> Tensor:
    """
    Return the feature matrix smoothed with a smoothing matrix.
    Computes ``M @ X`` where ``M`` is the smoothing matrix and ``X`` is the node feature matrix.

    Args:
        x: Node feature matrix. Size ``(num_nodes, C)``.
        matrix: The smoothing matrix. Size ``(num_nodes, num_nodes)``.
        drop_rate: Randomly dropout the connections in the smoothing matrix with probability ``drop_rate``. Defaults to ``0.0``.

    Returns:
        The smoothed feature matrix. Size ``(num_nodes, C)``.
    """
    if drop_rate > 0.0:
        matrix = sparse_dropout(matrix, drop_rate)
    return matrix.matmul(x)

HyperedgeIndex

A wrapper for hyperedge index representation. Hyperedge index is a tensor of shape (2, num_incidences) that encodes the relationships between nodes and hyperedges. Each column in the tensor represents an incidence between a node and a hyperedge, with the first row containing node indices and the second row containing corresponding hyperedge indices.

Examples:

>>> hyperedge_index = [[0, 1, 2, 0],
...                    [0, 0, 0, 1]]

This represents two hyperedges: - Hyperedge 0 connects nodes 0, 1, and 2. - Hyperedge 1 connects node 0.

The number of nodes in this hypergraph is 3 (nodes 0, 1, and 2). The number of hyperedges is 2 (hyperedges 0 and 1).

Parameters:

Name Type Description Default
hyperedge_index Tensor

A tensor of shape (2, num_incidences) representing hyperedges, where each column is (node, hyperedge).

required
Source code in hyperbench/types/hypergraph.py
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
class HyperedgeIndex:
    """
    A wrapper for hyperedge index representation.
    Hyperedge index is a tensor of shape ``(2, num_incidences)`` that encodes the relationships between nodes and hyperedges.
    Each column in the tensor represents an incidence between a node and a hyperedge, with the first row containing node indices
    and the second row containing corresponding hyperedge indices.

    Examples:
        >>> hyperedge_index = [[0, 1, 2, 0],
        ...                    [0, 0, 0, 1]]

        This represents two hyperedges:
            - Hyperedge 0 connects nodes 0, 1, and 2.
            - Hyperedge 1 connects node 0.

        The number of nodes in this hypergraph is 3 (nodes 0, 1, and 2).
        The number of hyperedges is 2 (hyperedges 0 and 1).

    Args:
        hyperedge_index: A tensor of shape ``(2, num_incidences)`` representing hyperedges, where each column is (node, hyperedge).
    """

    def __init__(self, hyperedge_index: Tensor):
        self.__hyperedge_index = hyperedge_index

    @property
    def all_node_ids(self) -> Tensor:
        """Return the tensor of all node IDs in the hyperedge index."""
        return self.__hyperedge_index[0]

    @property
    def all_hyperedge_ids(self) -> Tensor:
        """Return the tensor of all hyperedge IDs in the hyperedge index."""
        return self.__hyperedge_index[1]

    @property
    def item(self) -> Tensor:
        """Return the hyperedge index tensor."""
        return self.__hyperedge_index

    @property
    def node_ids(self) -> Tensor:
        """Return the sorted unique node IDs from the hyperedge index."""
        return self.__hyperedge_index[0].unique(sorted=True)

    @property
    def hyperedge_ids(self) -> Tensor:
        """Return the sorted unique hyperedge IDs from the hyperedge index."""
        return self.__hyperedge_index[1].unique(sorted=True)

    @property
    def num_hyperedges(self) -> int:
        """Return the number of hyperedges in the hypergraph."""
        if self.num_incidences < 1:
            return 0

        hyperedges = self.__hyperedge_index[1]
        return len(hyperedges.unique())

    @property
    def num_nodes(self) -> int:
        """Return the number of nodes in the hypergraph."""
        if self.num_incidences < 1:
            return 0

        nodes = self.__hyperedge_index[0]
        return len(nodes.unique())

    @property
    def num_incidences(self) -> int:
        """Return the number of incidences in the hypergraph, which is the number of columns in the hyperedge index."""
        return self.__hyperedge_index.size(1)

    def nodes_in(self, hyperedge_id: int) -> list[int]:
        """Return the list of node IDs that belong to the given hyperedge."""
        return self.__hyperedge_index[0, self.__hyperedge_index[1] == hyperedge_id].tolist()

    def num_nodes_if_isolated_exist(self, num_nodes: int) -> int:
        """
        Return the number of nodes in the hypergraph, accounting for isolated nodes that may not appear in the hyperedge index.

        Args:
            num_nodes: The total number of nodes in the hypergraph, including isolated nodes.

        Returns:
            The number of nodes in the hypergraph, which is the maximum of the number of unique nodes in the hyperedge index and the provided ``num_nodes``.
        """
        return max(self.num_nodes, num_nodes)

    def get_sparse_incidence_matrix(
        self,
        num_nodes: int | None = None,
        num_hyperedges: int | None = None,
    ) -> Tensor:
        """
        Compute the sparse incidence matrix H of shape ``(num_nodes, num_hyperedges)``.
        Each entry ``H[v, e] = 1`` if node ``v`` belongs to hyperedge ``e``, and 0 otherwise.

        Args:
            num_nodes: Total number of nodes. If ``None``, inferred from hyperedge index.
            num_hyperedges: Total number of hyperedges. If ``None``, inferred from hyperedge index.

        Returns:
            The sparse incidence matrix H of shape ``(num_nodes, num_hyperedges)``.
        """
        device = self.__hyperedge_index.device
        num_nodes = num_nodes if num_nodes is not None else self.num_nodes
        num_hyperedges = num_hyperedges if num_hyperedges is not None else self.num_hyperedges

        incidence_values = torch.ones(self.num_incidences, dtype=torch.float, device=device)
        incidence_indices = torch.stack([self.all_node_ids, self.all_hyperedge_ids], dim=0)
        incidence_matrix = torch.sparse_coo_tensor(
            indices=incidence_indices,
            values=incidence_values,
            size=(num_nodes, num_hyperedges),
        )
        return incidence_matrix.coalesce()

    def get_sparse_normalized_node_degree_matrix(
        self,
        incidence_matrix: Tensor,
        power: float,
        num_nodes: int | None = None,
    ) -> Tensor:
        """
        Compute a sparse diagonal node degree matrix from row-sums of the incidence matrix.

        Args:
            incidence_matrix: The sparse incidence matrix H of shape ``(num_nodes, num_hyperedges)``.
            power: Exponent applied to node degrees before placing them on the diagonal.
            num_nodes: Total number of nodes. If ``None``, inferred from hyperedge index.

        Returns:
            The sparse diagonal matrix of shape ``(num_nodes, num_nodes)``.
        """
        device = self.__hyperedge_index.device
        num_nodes = num_nodes if num_nodes is not None else self.num_nodes

        degrees = torch.sparse.sum(incidence_matrix, dim=1).to_dense()
        normalized_degrees = degrees.pow(power)
        normalized_degrees[normalized_degrees == float("inf")] = 0

        diagonal_indices = torch.arange(num_nodes, device=device).unsqueeze(0).repeat(2, 1)
        degree_matrix = torch.sparse_coo_tensor(
            indices=diagonal_indices,
            values=normalized_degrees,
            size=(num_nodes, num_nodes),
        )
        return degree_matrix.coalesce()

    def get_sparse_rownormalized_node_degree_matrix(
        self,
        incidence_matrix: Tensor,
        num_nodes: int | None = None,
    ) -> Tensor:
        """
        Compute the sparse normalized node degree matrix D_n^-1.
        The node degree ``d_n[i]`` is the number of hyperedges containing node ``i``
        (i.e., the row-sum of the incidence matrix H).

        Args:
            incidence_matrix: The sparse incidence matrix H of shape ``(num_nodes, num_hyperedges)``.
            num_nodes: Total number of nodes. If ``None``, inferred from hyperedge index.

        Returns:
            The sparse diagonal matrix D_n^-1 of shape ``(num_nodes, num_nodes)``.
        """
        # Example: hyperedge_index = [[0, 1, 2, 0],
        #                             [0, 0, 0, 1]]
        #                         hyperedges 0  1
        #          -> incidence_matrix H = [[1, 1], node 0
        #                                   [1, 0], node 1
        #                                   [1, 0]] node 2
        #                                          nodes 0  1  2
        #          -> row-sum gives node degrees: d_n = [2, 1, 1]
        #          -> D_n^{-1} has diagonal [1/2, 1, 1]
        return self.get_sparse_normalized_node_degree_matrix(
            incidence_matrix=incidence_matrix,
            power=-1,
            num_nodes=num_nodes,
        )

    def get_sparse_symnormalized_node_degree_matrix(
        self,
        incidence_matrix: Tensor,
        num_nodes: int | None = None,
    ) -> Tensor:
        """
        Compute the sparse normalized node degree matrix D_n^-1/2.
        The node degree ``d_n[i]`` is the number of hyperedges containing node ``i``
        (i.e., the row-sum of the incidence matrix H).

        Args:
            incidence_matrix: The sparse incidence matrix H of shape ``(num_nodes, num_hyperedges)``.
            num_nodes: Total number of nodes. If ``None``, inferred from hyperedge index.

        Returns:
            The sparse diagonal matrix D_n^-1/2 of shape ``(num_nodes, num_nodes)``.
        """
        # Example: hyperedge_index = [[0, 1, 2, 0],
        #                             [0, 0, 0, 1]]
        #                         hyperedges 0  1
        #          -> incidence_matrix H = [[1, 1], node 0
        #                                   [1, 0], node 1
        #                                   [1, 0]] node 2
        #                                          nodes 0  1  2
        #          -> row-sum gives node degrees: d_n = [2, 1, 1]
        #          -> D_n^{-1/2} has diagonal [1/sqrt(2), 1, 1]
        return self.get_sparse_normalized_node_degree_matrix(
            incidence_matrix=incidence_matrix,
            power=-0.5,
            num_nodes=num_nodes,
        )

    def get_sparse_normalized_hyperedge_degree_matrix(
        self,
        incidence_matrix: Tensor,
        num_hyperedges: int | None = None,
    ) -> Tensor:
        """
        Compute the sparse normalized hyperedge degree matrix D_e^-1.

        The hyperedge degree ``d_e[j]`` is the number of nodes in hyperedge ``j``
        (i.e., the column-sum of the incidence matrix H).

        Args:
            incidence_matrix: The sparse incidence matrix H of shape ``(num_nodes, num_hyperedges)``.
            num_hyperedges: Total number of hyperedges. If ``None``, inferred from hyperedge index.

        Returns:
            The sparse diagonal matrix D_e^-1 of shape ``(num_hyperedges, num_hyperedges)``.
        """
        device = self.__hyperedge_index.device
        num_hyperedges = num_hyperedges if num_hyperedges is not None else self.num_hyperedges

        # Example: hyperedge_index = [[0, 1, 2, 0],
        #                             [0, 0, 0, 1]]
        #                         hyperedges 0  1
        #          -> incidence_matrix H = [[1, 1], node 0
        #                                   [1, 0], node 1
        #                                   [1, 0]] node 2
        #          -> column-sum gives hyperedge degrees: d_e = [3, 1], shape (num_hyperedges,)
        degrees = torch.sparse.sum(incidence_matrix, dim=0).to_dense()

        # Example: d_e = [3, 1]
        #          -> degree_inv = [1/3, 1]
        degree_inv = degrees.pow(-1)
        degree_inv[degree_inv == float("inf")] = 0

        # Construct the sparse diagonal matrix D_e^{-1}
        # Example: degree_inv = [1/3, 1] as the diagonal values,
        #          diagonal_indices = [[0, 0],
        #                              [1, 1]]
        #               hyperedges 0  1
        #          -> D_e^{-1} = [[1/3, 0], hyperedge 0
        #                         [0, 1]]   hyperedge 1
        diagonal_indices = torch.arange(num_hyperedges, device=device).unsqueeze(0).repeat(2, 1)
        degree_matrix = torch.sparse_coo_tensor(
            indices=diagonal_indices,
            values=degree_inv,
            size=(num_hyperedges, num_hyperedges),
        )
        return degree_matrix.coalesce()

    def get_sparse_hgnn_smoothing_matrix(
        self,
        num_nodes: int | None = None,
        num_hyperedges: int | None = None,
    ) -> Tensor:
        """
        Compute the sparse HGNN Laplacian matrix for hypergraph spectral convolution.

        Implements: ``L_HGNN = D_n^{-1/2} H D_e^{-1} H^T D_n^{-1/2}``

        where:
            - H is the incidence matrix of shape ``(num_nodes, num_hyperedges)``
            - D_n^-1/2 is the normalized node degree matrix
            - D_e^-1 is the inverse hyperedge degree matrix (with W = I)

        Args:
            num_nodes: Total number of nodes. If ``None``, inferred from hyperedge index.
            num_hyperedges: Total number of hyperedges. If ``None``, inferred from hyperedge index.

        Returns:
            The sparse HGNN Laplacian matrix of shape ``(num_nodes, num_nodes)``.
        """
        num_nodes = num_nodes if num_nodes is not None else self.num_nodes
        num_hyperedges = num_hyperedges if num_hyperedges is not None else self.num_hyperedges

        incidence_matrix = self.get_sparse_incidence_matrix(num_nodes, num_hyperedges)
        node_degree_matrix = self.get_sparse_symnormalized_node_degree_matrix(
            incidence_matrix,
            num_nodes,
        )
        hyperedge_degree_matrix = self.get_sparse_normalized_hyperedge_degree_matrix(
            incidence_matrix, num_hyperedges
        )

        normalized_laplacian_matrix = torch.sparse.mm(
            node_degree_matrix,
            torch.sparse.mm(
                incidence_matrix,
                torch.sparse.mm(
                    hyperedge_degree_matrix,
                    torch.sparse.mm(incidence_matrix.t(), node_degree_matrix),
                ),
            ),
        )
        return normalized_laplacian_matrix.coalesce()

    def get_sparse_hgnnp_smoothing_matrix(
        self,
        num_nodes: int | None = None,
        num_hyperedges: int | None = None,
    ) -> Tensor:
        """
        Compute the sparse HGNN+ smoothing matrix for hypergraph mean aggregation.

        Implements: ``M_HGNN+ = D_v^{-1} H D_e^{-1} H^T``

        This matrix is row-stochastic for non-isolated nodes and corresponds to
        the two-stage mean aggregation used by HGNN+:
            1. ``D_e^{-1} H^T X``: mean over nodes in each hyperedge.
            2. ``D_v^{-1} H (...)``: mean over hyperedges incident to each node.

        Args:
            num_nodes: Total number of nodes. If ``None``, inferred from hyperedge index.
            num_hyperedges: Total number of hyperedges. If ``None``, inferred from hyperedge index.

        Returns:
            The sparse HGNN+ smoothing matrix of shape ``(num_nodes, num_nodes)``.
        """
        num_nodes = num_nodes if num_nodes is not None else self.num_nodes
        num_hyperedges = num_hyperedges if num_hyperedges is not None else self.num_hyperedges

        incidence_matrix = self.get_sparse_incidence_matrix(num_nodes, num_hyperedges)
        node_degree_matrix = self.get_sparse_rownormalized_node_degree_matrix(
            incidence_matrix,
            num_nodes,
        )
        hyperedge_degree_matrix = self.get_sparse_normalized_hyperedge_degree_matrix(
            incidence_matrix,
            num_hyperedges,
        )

        smoothing_matrix = torch.sparse.mm(
            node_degree_matrix,
            torch.sparse.mm(
                incidence_matrix,
                torch.sparse.mm(hyperedge_degree_matrix, incidence_matrix.t()),
            ),
        )
        return smoothing_matrix.coalesce()

    def reduce(self, strategy: Literal["clique_expansion"], **kwargs) -> Tensor:
        """
        Reduce the hypergraph to a graph represented by edge index using the specified strategy.

        Args:
            strategy: The reduction strategy to use. Defaults to ``clique_expansion``.
            **kwargs: Additional keyword arguments for specific strategies.

        Returns:
            The edge index of the reduced graph. Size ``(2, num_edges)``.
        """
        match strategy:
            case _:
                return self.reduce_to_edge_index_on_clique_expansion()

    def reduce_to_edge_index_on_clique_expansion(self) -> Tensor:
        """
        Construct a graph from a hypergraph via clique expansion using ``H @ H^T``, where ``H`` is the incidence matrix of the hypergraph.
        In clique expansion, each hyperedge is replaced by a clique connecting all its member nodes.

        For each hyperedge, all pairs of member nodes become edges in the resulting graph.
        This is computed efficiently using the incidence matrix: ``A = H @ H^T``, where ``H`` is
        the sparse incidence matrix of shape ``[num_nodes, num_hyperedges]`` and ``A`` is the adjacency matrix of the clique-expanded graph.

        Returns:
            The edge index of the clique-expanded graph. Size ``(2, |E'|)``.
        """
        incidence_matrix = self.get_sparse_incidence_matrix()

        # A = H @ H^T gives adjacency with self-loops on diagonal
        # Example: For hyperedge_index = [[0, 1, 2, 0],
        #                                 [0, 0, 0, 1]]
        #                         hyperedges 0  1
        #          -> incidence_matrix H = [[1, 1], node 0
        #                                   [1, 0], node 1
        #                                   [1, 0]] node 2
        #               nodes 0  1  2
        #          -> H^T = [[1, 1, 1], hyperedge 0
        #                    [1, 0, 0]] hyperedge 1
        #                       nodes 0  1  2
        #          -> A = H @ H^T = [[2, 1, 1], node 0
        #                            [1, 1, 1], node 1
        #                            [1, 1, 1]] node 2
        #                                         nodes 0  1  2
        #          -> A (after removing self-loops) = [[0, 1, 1], node 0
        #                                              [1, 0, 1], node 1
        #                                              [1, 1, 0]] node 2
        adj_matrix = torch.sparse.mm(incidence_matrix, incidence_matrix.t()).coalesce()

        # Extract edge_index, make undirected, and deduplicate
        return EdgeIndex(adj_matrix.indices()).to_undirected().item

    def reduce_to_edge_index_on_random_direction(
        self,
        x: Tensor,
        with_mediators: bool = False,
        remove_selfloops: bool = True,
        return_weights: bool = False,
    ) -> tuple[Tensor, Tensor | None]:
        """
        Construct a graph from a hypergraph with methods proposed in `HyperGCN: A New Method of Training Graph Convolutional Networks on Hypergraphs <https://arxiv.org/pdf/1809.02589.pdf>`_ paper.
        Reference implementation: `source <https://deephypergraph.readthedocs.io/en/latest/_modules/dhg/structure/graphs/graph.html#Graph.from_hypergraph_hypergcn>`_.

        Args:
            x: Node feature matrix. Size ``(num_nodes, C)``.
            with_mediators: Whether to use mediator to transform the hyperedges to edges in the graph. Defaults to ``False``.
            remove_selfloops: Whether to remove self-loops. Defaults to ``True``.
            return_weights: Whether to return the DHG-style reduced-edge weights alongside the edge index. Defaults to ``False``.

        Returns:
            A tuple ``(edge_index, edge_weights)`` where:
            - ``edge_index`` has size ``(2, |num_edges|)``.
            - ``edge_weights`` has size ``(|num_edges|,)`` when ``return_weights=True``, otherwise ``None``.

        Raises:
            ValueError: If any hyperedge contains fewer than 2 nodes.
        """
        device = x.device

        hypergraph = Hypergraph.from_hyperedge_index(self.__hyperedge_index)
        hypergraph_edges: list[list[int]] = hypergraph.hyperedges
        graph_edges: list[list[int]] = []
        graph_edge_weights: list[float] = []

        # Random direction (feature_dim, 1) for projecting nodes in each hyperedge
        # Geometrically, we are choosing a random line through the origin in ℝᵈ, where ᵈ = feature_dim
        random_direction = torch.rand((x.shape[1], 1), device=device)

        for edge in hypergraph_edges:
            num_nodes_in_edge = len(edge)
            if num_nodes_in_edge < 2:
                raise ValueError("The number of vertices in an hyperedge must be >= 2.")

            # projections (num_nodes_in_edge,) contains a scalar value for each node in the hyperedge,
            # indicating its projection on the random vector 'random_direction'.
            # Key idea: If two points are very far apart in ℝᵈ, there is a high probability
            # that a random projection will still separate them
            projections = torch.matmul(x[edge], random_direction).squeeze()

            # The indices of the nodes that the farthest apart in the direction of 'random_direction'
            node_max_proj_idx = torch.argmax(projections)
            node_min_proj_idx = torch.argmin(projections)

            if not with_mediators:  # Just connect the two farthest nodes
                graph_edges.append([edge[node_min_proj_idx], edge[node_max_proj_idx]])
                graph_edge_weights.append(1.0 / num_nodes_in_edge)
                continue

            edge_weight = 1.0 / (2 * num_nodes_in_edge - 3)
            for node_idx in range(num_nodes_in_edge):
                if node_idx not in {node_max_proj_idx.item(), node_min_proj_idx.item()}:
                    graph_edges.append([edge[node_min_proj_idx], edge[node_idx]])
                    graph_edges.append([edge[node_max_proj_idx], edge[node_idx]])
                    graph_edge_weights.extend([edge_weight, edge_weight])

        graph = Graph(
            edges=graph_edges,
            edge_weights=graph_edge_weights if return_weights else None,
        )
        if remove_selfloops:
            graph.remove_selfloops()

        return (
            graph.to_edge_index().to(device),
            graph.edge_weights_tensor.to(device) if return_weights else None,
        )

    def remove_duplicate_edges(self) -> "HyperedgeIndex":
        """Remove duplicate edges from the hyperedge index. Keeps the tensor contiguous in memory."""
        # Example: hyperedge_index = [[0, 1, 2, 2, 0, 3, 2],
        #                             [3, 4, 4, 3, 4, 3, 3]], shape (2, 7)
        #          -> after torch.unique(..., dim=1):
        #             hyperedge_index = [[0, 1, 2, 2, 0, 3],
        #                                [3, 4, 4, 3, 4, 3]], shape (2, |E'| = 6)
        # Note: we need to call contiguous() after torch.unique() to ensure
        # the resulting tensor is contiguous in memory, which is important for efficient indexing
        # and further operations (e.g., searchsorted)
        self.__hyperedge_index = torch.unique(self.__hyperedge_index, dim=1).contiguous()
        return self

    def remove_hyperedges_with_fewer_than_k_nodes(self, k: int) -> "HyperedgeIndex":
        """
        Remove hyperedges that contain fewer than k nodes.

        Example:
            >>> hyperedge_index = [[0, 1, 2, 3, 5, 4],
            ...                    [0, 0, 1, 1, 2, 1]], shape (2, |E| = 6)

            >>> k = 3
            >>> unique_hyperedge_ids: [0, 1, 2]
            ... # inverse -> idx_to_hyperedge_id, counts -> num_nodes_per_hyperedge
            ... inverse           = [0, 0, 1, 1, 2, 1]  # (index into unique_hyperedge_ids per column)
            ... counts            = [2, 3, 1]
            >>> # counts[inverse] is equivalent to:
            ... # for i, inv in enumerate(inverse): keep_mask[i] = counts[inv]
            >>> counts[inverse]   = [2, 2, 3, 3, 1, 3]
            >>> keep_mask         = [F, F, T, T, F, T]

            >>> # after filtering hyperedges with fewer than k=3 nodes:
            >>> hyperedge_index = [[2, 3, 4],
            ...                    [1, 1, 1]], shape (2, |E'| = 3)

        Args:
            k: The minimum number of nodes a hyperedge must contain to be kept.

        Returns:
            A new :class:`HyperedgeIndex` instance with hyperedges containing fewer than k nodes.
        """
        _, idx_to_hyperedge_id, num_nodes_per_hyperedge = torch.unique(
            self.all_hyperedge_ids,
            return_inverse=True,
            return_counts=True,
        )
        keep_mask = num_nodes_per_hyperedge[idx_to_hyperedge_id] >= k
        self.__hyperedge_index = self.__hyperedge_index[:, keep_mask]
        return self

    def to_0based(
        self,
        node_ids_to_rebase: Tensor | None = None,
        hyperedge_ids_to_rebase: Tensor | None = None,
    ) -> "HyperedgeIndex":
        """
        Convert hyperedge index to the 0-based format by rebasing node IDs to the range ``[0, num_nodes-1]`` and hyperedge IDs ``[0, num_hyperedges-1]``.

        Args:
            node_ids_to_rebase: Tensor of shape ``(num_nodes,)`` containing the original node IDs that need to be rebased to 0-based format.
                If ``None``, all node IDs in the hyperedge index will be rebased to 0-based format based on their unique sorted order.
            hyperedge_ids_to_rebase: Tensor of shape ``(num_hyperedges,)`` containing the original hyperedge IDs that need to be rebased to 0-based format.
                If ``None``, all hyperedge IDs in the hyperedge index will be rebased to 0-based format based on their unique sorted order.

        Returns:
            A new :class:`HyperedgeIndex` instance with the hyperedge index converted to 0-based format.
        """
        # Example: hyperedge_index after sorting: [[0, 0, 1, 2, 3, 4],
        #                                          [3, 4, 4, 3, 4, 3]]
        #          node_ids_to_rebase = [0, 1, 2, 3, 4]
        #          -> hyperedge_index after remapping: [[0, 0, 1, 2, 3, 4],
        #                                               [3, 4, 4, 3, 4, 3]]
        self.__hyperedge_index[0] = to_0based_ids(self.all_node_ids, node_ids_to_rebase)

        # Example: hyperedge_index after remapping nodes: [[0, 0, 1, 2, 3, 4],
        #                                                  [3, 4, 4, 3, 4, 3]]
        #          hyperedge_ids_to_rebase = [3, 4]
        #          -> hyperedge_index after remapping hyperedges: [[0, 0, 1, 2, 3, 4],
        #                                                          [0, 0, 1, 0, 1, 0]]
        self.__hyperedge_index[1] = to_0based_ids(self.all_hyperedge_ids, hyperedge_ids_to_rebase)

        return self

all_node_ids property

Return the tensor of all node IDs in the hyperedge index.

all_hyperedge_ids property

Return the tensor of all hyperedge IDs in the hyperedge index.

item property

Return the hyperedge index tensor.

node_ids property

Return the sorted unique node IDs from the hyperedge index.

hyperedge_ids property

Return the sorted unique hyperedge IDs from the hyperedge index.

num_hyperedges property

Return the number of hyperedges in the hypergraph.

num_nodes property

Return the number of nodes in the hypergraph.

num_incidences property

Return the number of incidences in the hypergraph, which is the number of columns in the hyperedge index.

nodes_in(hyperedge_id)

Return the list of node IDs that belong to the given hyperedge.

Source code in hyperbench/types/hypergraph.py
def nodes_in(self, hyperedge_id: int) -> list[int]:
    """Return the list of node IDs that belong to the given hyperedge."""
    return self.__hyperedge_index[0, self.__hyperedge_index[1] == hyperedge_id].tolist()

num_nodes_if_isolated_exist(num_nodes)

Return the number of nodes in the hypergraph, accounting for isolated nodes that may not appear in the hyperedge index.

Parameters:

Name Type Description Default
num_nodes int

The total number of nodes in the hypergraph, including isolated nodes.

required

Returns:

Type Description
int

The number of nodes in the hypergraph, which is the maximum of the number of unique nodes in the hyperedge index and the provided num_nodes.

Source code in hyperbench/types/hypergraph.py
def num_nodes_if_isolated_exist(self, num_nodes: int) -> int:
    """
    Return the number of nodes in the hypergraph, accounting for isolated nodes that may not appear in the hyperedge index.

    Args:
        num_nodes: The total number of nodes in the hypergraph, including isolated nodes.

    Returns:
        The number of nodes in the hypergraph, which is the maximum of the number of unique nodes in the hyperedge index and the provided ``num_nodes``.
    """
    return max(self.num_nodes, num_nodes)

get_sparse_incidence_matrix(num_nodes=None, num_hyperedges=None)

Compute the sparse incidence matrix H of shape (num_nodes, num_hyperedges). Each entry H[v, e] = 1 if node v belongs to hyperedge e, and 0 otherwise.

Parameters:

Name Type Description Default
num_nodes int | None

Total number of nodes. If None, inferred from hyperedge index.

None
num_hyperedges int | None

Total number of hyperedges. If None, inferred from hyperedge index.

None

Returns:

Type Description
Tensor

The sparse incidence matrix H of shape (num_nodes, num_hyperedges).

Source code in hyperbench/types/hypergraph.py
def get_sparse_incidence_matrix(
    self,
    num_nodes: int | None = None,
    num_hyperedges: int | None = None,
) -> Tensor:
    """
    Compute the sparse incidence matrix H of shape ``(num_nodes, num_hyperedges)``.
    Each entry ``H[v, e] = 1`` if node ``v`` belongs to hyperedge ``e``, and 0 otherwise.

    Args:
        num_nodes: Total number of nodes. If ``None``, inferred from hyperedge index.
        num_hyperedges: Total number of hyperedges. If ``None``, inferred from hyperedge index.

    Returns:
        The sparse incidence matrix H of shape ``(num_nodes, num_hyperedges)``.
    """
    device = self.__hyperedge_index.device
    num_nodes = num_nodes if num_nodes is not None else self.num_nodes
    num_hyperedges = num_hyperedges if num_hyperedges is not None else self.num_hyperedges

    incidence_values = torch.ones(self.num_incidences, dtype=torch.float, device=device)
    incidence_indices = torch.stack([self.all_node_ids, self.all_hyperedge_ids], dim=0)
    incidence_matrix = torch.sparse_coo_tensor(
        indices=incidence_indices,
        values=incidence_values,
        size=(num_nodes, num_hyperedges),
    )
    return incidence_matrix.coalesce()

get_sparse_normalized_node_degree_matrix(incidence_matrix, power, num_nodes=None)

Compute a sparse diagonal node degree matrix from row-sums of the incidence matrix.

Parameters:

Name Type Description Default
incidence_matrix Tensor

The sparse incidence matrix H of shape (num_nodes, num_hyperedges).

required
power float

Exponent applied to node degrees before placing them on the diagonal.

required
num_nodes int | None

Total number of nodes. If None, inferred from hyperedge index.

None

Returns:

Type Description
Tensor

The sparse diagonal matrix of shape (num_nodes, num_nodes).

Source code in hyperbench/types/hypergraph.py
def get_sparse_normalized_node_degree_matrix(
    self,
    incidence_matrix: Tensor,
    power: float,
    num_nodes: int | None = None,
) -> Tensor:
    """
    Compute a sparse diagonal node degree matrix from row-sums of the incidence matrix.

    Args:
        incidence_matrix: The sparse incidence matrix H of shape ``(num_nodes, num_hyperedges)``.
        power: Exponent applied to node degrees before placing them on the diagonal.
        num_nodes: Total number of nodes. If ``None``, inferred from hyperedge index.

    Returns:
        The sparse diagonal matrix of shape ``(num_nodes, num_nodes)``.
    """
    device = self.__hyperedge_index.device
    num_nodes = num_nodes if num_nodes is not None else self.num_nodes

    degrees = torch.sparse.sum(incidence_matrix, dim=1).to_dense()
    normalized_degrees = degrees.pow(power)
    normalized_degrees[normalized_degrees == float("inf")] = 0

    diagonal_indices = torch.arange(num_nodes, device=device).unsqueeze(0).repeat(2, 1)
    degree_matrix = torch.sparse_coo_tensor(
        indices=diagonal_indices,
        values=normalized_degrees,
        size=(num_nodes, num_nodes),
    )
    return degree_matrix.coalesce()

get_sparse_rownormalized_node_degree_matrix(incidence_matrix, num_nodes=None)

Compute the sparse normalized node degree matrix D_n^-1. The node degree d_n[i] is the number of hyperedges containing node i (i.e., the row-sum of the incidence matrix H).

Parameters:

Name Type Description Default
incidence_matrix Tensor

The sparse incidence matrix H of shape (num_nodes, num_hyperedges).

required
num_nodes int | None

Total number of nodes. If None, inferred from hyperedge index.

None

Returns:

Type Description
Tensor

The sparse diagonal matrix D_n^-1 of shape (num_nodes, num_nodes).

Source code in hyperbench/types/hypergraph.py
def get_sparse_rownormalized_node_degree_matrix(
    self,
    incidence_matrix: Tensor,
    num_nodes: int | None = None,
) -> Tensor:
    """
    Compute the sparse normalized node degree matrix D_n^-1.
    The node degree ``d_n[i]`` is the number of hyperedges containing node ``i``
    (i.e., the row-sum of the incidence matrix H).

    Args:
        incidence_matrix: The sparse incidence matrix H of shape ``(num_nodes, num_hyperedges)``.
        num_nodes: Total number of nodes. If ``None``, inferred from hyperedge index.

    Returns:
        The sparse diagonal matrix D_n^-1 of shape ``(num_nodes, num_nodes)``.
    """
    # Example: hyperedge_index = [[0, 1, 2, 0],
    #                             [0, 0, 0, 1]]
    #                         hyperedges 0  1
    #          -> incidence_matrix H = [[1, 1], node 0
    #                                   [1, 0], node 1
    #                                   [1, 0]] node 2
    #                                          nodes 0  1  2
    #          -> row-sum gives node degrees: d_n = [2, 1, 1]
    #          -> D_n^{-1} has diagonal [1/2, 1, 1]
    return self.get_sparse_normalized_node_degree_matrix(
        incidence_matrix=incidence_matrix,
        power=-1,
        num_nodes=num_nodes,
    )

get_sparse_symnormalized_node_degree_matrix(incidence_matrix, num_nodes=None)

Compute the sparse normalized node degree matrix D_n^-½. The node degree d_n[i] is the number of hyperedges containing node i (i.e., the row-sum of the incidence matrix H).

Parameters:

Name Type Description Default
incidence_matrix Tensor

The sparse incidence matrix H of shape (num_nodes, num_hyperedges).

required
num_nodes int | None

Total number of nodes. If None, inferred from hyperedge index.

None

Returns:

Type Description
Tensor

The sparse diagonal matrix D_n^-½ of shape (num_nodes, num_nodes).

Source code in hyperbench/types/hypergraph.py
def get_sparse_symnormalized_node_degree_matrix(
    self,
    incidence_matrix: Tensor,
    num_nodes: int | None = None,
) -> Tensor:
    """
    Compute the sparse normalized node degree matrix D_n^-1/2.
    The node degree ``d_n[i]`` is the number of hyperedges containing node ``i``
    (i.e., the row-sum of the incidence matrix H).

    Args:
        incidence_matrix: The sparse incidence matrix H of shape ``(num_nodes, num_hyperedges)``.
        num_nodes: Total number of nodes. If ``None``, inferred from hyperedge index.

    Returns:
        The sparse diagonal matrix D_n^-1/2 of shape ``(num_nodes, num_nodes)``.
    """
    # Example: hyperedge_index = [[0, 1, 2, 0],
    #                             [0, 0, 0, 1]]
    #                         hyperedges 0  1
    #          -> incidence_matrix H = [[1, 1], node 0
    #                                   [1, 0], node 1
    #                                   [1, 0]] node 2
    #                                          nodes 0  1  2
    #          -> row-sum gives node degrees: d_n = [2, 1, 1]
    #          -> D_n^{-1/2} has diagonal [1/sqrt(2), 1, 1]
    return self.get_sparse_normalized_node_degree_matrix(
        incidence_matrix=incidence_matrix,
        power=-0.5,
        num_nodes=num_nodes,
    )

get_sparse_normalized_hyperedge_degree_matrix(incidence_matrix, num_hyperedges=None)

Compute the sparse normalized hyperedge degree matrix D_e^-1.

The hyperedge degree d_e[j] is the number of nodes in hyperedge j (i.e., the column-sum of the incidence matrix H).

Parameters:

Name Type Description Default
incidence_matrix Tensor

The sparse incidence matrix H of shape (num_nodes, num_hyperedges).

required
num_hyperedges int | None

Total number of hyperedges. If None, inferred from hyperedge index.

None

Returns:

Type Description
Tensor

The sparse diagonal matrix D_e^-1 of shape (num_hyperedges, num_hyperedges).

Source code in hyperbench/types/hypergraph.py
def get_sparse_normalized_hyperedge_degree_matrix(
    self,
    incidence_matrix: Tensor,
    num_hyperedges: int | None = None,
) -> Tensor:
    """
    Compute the sparse normalized hyperedge degree matrix D_e^-1.

    The hyperedge degree ``d_e[j]`` is the number of nodes in hyperedge ``j``
    (i.e., the column-sum of the incidence matrix H).

    Args:
        incidence_matrix: The sparse incidence matrix H of shape ``(num_nodes, num_hyperedges)``.
        num_hyperedges: Total number of hyperedges. If ``None``, inferred from hyperedge index.

    Returns:
        The sparse diagonal matrix D_e^-1 of shape ``(num_hyperedges, num_hyperedges)``.
    """
    device = self.__hyperedge_index.device
    num_hyperedges = num_hyperedges if num_hyperedges is not None else self.num_hyperedges

    # Example: hyperedge_index = [[0, 1, 2, 0],
    #                             [0, 0, 0, 1]]
    #                         hyperedges 0  1
    #          -> incidence_matrix H = [[1, 1], node 0
    #                                   [1, 0], node 1
    #                                   [1, 0]] node 2
    #          -> column-sum gives hyperedge degrees: d_e = [3, 1], shape (num_hyperedges,)
    degrees = torch.sparse.sum(incidence_matrix, dim=0).to_dense()

    # Example: d_e = [3, 1]
    #          -> degree_inv = [1/3, 1]
    degree_inv = degrees.pow(-1)
    degree_inv[degree_inv == float("inf")] = 0

    # Construct the sparse diagonal matrix D_e^{-1}
    # Example: degree_inv = [1/3, 1] as the diagonal values,
    #          diagonal_indices = [[0, 0],
    #                              [1, 1]]
    #               hyperedges 0  1
    #          -> D_e^{-1} = [[1/3, 0], hyperedge 0
    #                         [0, 1]]   hyperedge 1
    diagonal_indices = torch.arange(num_hyperedges, device=device).unsqueeze(0).repeat(2, 1)
    degree_matrix = torch.sparse_coo_tensor(
        indices=diagonal_indices,
        values=degree_inv,
        size=(num_hyperedges, num_hyperedges),
    )
    return degree_matrix.coalesce()

get_sparse_hgnn_smoothing_matrix(num_nodes=None, num_hyperedges=None)

Compute the sparse HGNN Laplacian matrix for hypergraph spectral convolution.

Implements: L_HGNN = D_n^{-1/2} H D_e^{-1} H^T D_n^{-1/2}

where
  • H is the incidence matrix of shape (num_nodes, num_hyperedges)
  • D_n^-½ is the normalized node degree matrix
  • D_e^-1 is the inverse hyperedge degree matrix (with W = I)

Parameters:

Name Type Description Default
num_nodes int | None

Total number of nodes. If None, inferred from hyperedge index.

None
num_hyperedges int | None

Total number of hyperedges. If None, inferred from hyperedge index.

None

Returns:

Type Description
Tensor

The sparse HGNN Laplacian matrix of shape (num_nodes, num_nodes).

Source code in hyperbench/types/hypergraph.py
def get_sparse_hgnn_smoothing_matrix(
    self,
    num_nodes: int | None = None,
    num_hyperedges: int | None = None,
) -> Tensor:
    """
    Compute the sparse HGNN Laplacian matrix for hypergraph spectral convolution.

    Implements: ``L_HGNN = D_n^{-1/2} H D_e^{-1} H^T D_n^{-1/2}``

    where:
        - H is the incidence matrix of shape ``(num_nodes, num_hyperedges)``
        - D_n^-1/2 is the normalized node degree matrix
        - D_e^-1 is the inverse hyperedge degree matrix (with W = I)

    Args:
        num_nodes: Total number of nodes. If ``None``, inferred from hyperedge index.
        num_hyperedges: Total number of hyperedges. If ``None``, inferred from hyperedge index.

    Returns:
        The sparse HGNN Laplacian matrix of shape ``(num_nodes, num_nodes)``.
    """
    num_nodes = num_nodes if num_nodes is not None else self.num_nodes
    num_hyperedges = num_hyperedges if num_hyperedges is not None else self.num_hyperedges

    incidence_matrix = self.get_sparse_incidence_matrix(num_nodes, num_hyperedges)
    node_degree_matrix = self.get_sparse_symnormalized_node_degree_matrix(
        incidence_matrix,
        num_nodes,
    )
    hyperedge_degree_matrix = self.get_sparse_normalized_hyperedge_degree_matrix(
        incidence_matrix, num_hyperedges
    )

    normalized_laplacian_matrix = torch.sparse.mm(
        node_degree_matrix,
        torch.sparse.mm(
            incidence_matrix,
            torch.sparse.mm(
                hyperedge_degree_matrix,
                torch.sparse.mm(incidence_matrix.t(), node_degree_matrix),
            ),
        ),
    )
    return normalized_laplacian_matrix.coalesce()

get_sparse_hgnnp_smoothing_matrix(num_nodes=None, num_hyperedges=None)

Compute the sparse HGNN+ smoothing matrix for hypergraph mean aggregation.

Implements: M_HGNN+ = D_v^{-1} H D_e^{-1} H^T

This matrix is row-stochastic for non-isolated nodes and corresponds to the two-stage mean aggregation used by HGNN+: 1. D_e^{-1} H^T X: mean over nodes in each hyperedge. 2. D_v^{-1} H (...): mean over hyperedges incident to each node.

Parameters:

Name Type Description Default
num_nodes int | None

Total number of nodes. If None, inferred from hyperedge index.

None
num_hyperedges int | None

Total number of hyperedges. If None, inferred from hyperedge index.

None

Returns:

Type Description
Tensor

The sparse HGNN+ smoothing matrix of shape (num_nodes, num_nodes).

Source code in hyperbench/types/hypergraph.py
def get_sparse_hgnnp_smoothing_matrix(
    self,
    num_nodes: int | None = None,
    num_hyperedges: int | None = None,
) -> Tensor:
    """
    Compute the sparse HGNN+ smoothing matrix for hypergraph mean aggregation.

    Implements: ``M_HGNN+ = D_v^{-1} H D_e^{-1} H^T``

    This matrix is row-stochastic for non-isolated nodes and corresponds to
    the two-stage mean aggregation used by HGNN+:
        1. ``D_e^{-1} H^T X``: mean over nodes in each hyperedge.
        2. ``D_v^{-1} H (...)``: mean over hyperedges incident to each node.

    Args:
        num_nodes: Total number of nodes. If ``None``, inferred from hyperedge index.
        num_hyperedges: Total number of hyperedges. If ``None``, inferred from hyperedge index.

    Returns:
        The sparse HGNN+ smoothing matrix of shape ``(num_nodes, num_nodes)``.
    """
    num_nodes = num_nodes if num_nodes is not None else self.num_nodes
    num_hyperedges = num_hyperedges if num_hyperedges is not None else self.num_hyperedges

    incidence_matrix = self.get_sparse_incidence_matrix(num_nodes, num_hyperedges)
    node_degree_matrix = self.get_sparse_rownormalized_node_degree_matrix(
        incidence_matrix,
        num_nodes,
    )
    hyperedge_degree_matrix = self.get_sparse_normalized_hyperedge_degree_matrix(
        incidence_matrix,
        num_hyperedges,
    )

    smoothing_matrix = torch.sparse.mm(
        node_degree_matrix,
        torch.sparse.mm(
            incidence_matrix,
            torch.sparse.mm(hyperedge_degree_matrix, incidence_matrix.t()),
        ),
    )
    return smoothing_matrix.coalesce()

reduce(strategy, **kwargs)

Reduce the hypergraph to a graph represented by edge index using the specified strategy.

Parameters:

Name Type Description Default
strategy Literal['clique_expansion']

The reduction strategy to use. Defaults to clique_expansion.

required
**kwargs

Additional keyword arguments for specific strategies.

{}

Returns:

Type Description
Tensor

The edge index of the reduced graph. Size (2, num_edges).

Source code in hyperbench/types/hypergraph.py
def reduce(self, strategy: Literal["clique_expansion"], **kwargs) -> Tensor:
    """
    Reduce the hypergraph to a graph represented by edge index using the specified strategy.

    Args:
        strategy: The reduction strategy to use. Defaults to ``clique_expansion``.
        **kwargs: Additional keyword arguments for specific strategies.

    Returns:
        The edge index of the reduced graph. Size ``(2, num_edges)``.
    """
    match strategy:
        case _:
            return self.reduce_to_edge_index_on_clique_expansion()

reduce_to_edge_index_on_clique_expansion()

Construct a graph from a hypergraph via clique expansion using H @ H^T, where H is the incidence matrix of the hypergraph. In clique expansion, each hyperedge is replaced by a clique connecting all its member nodes.

For each hyperedge, all pairs of member nodes become edges in the resulting graph. This is computed efficiently using the incidence matrix: A = H @ H^T, where H is the sparse incidence matrix of shape [num_nodes, num_hyperedges] and A is the adjacency matrix of the clique-expanded graph.

Returns:

Type Description
Tensor

The edge index of the clique-expanded graph. Size (2, |E'|).

Source code in hyperbench/types/hypergraph.py
def reduce_to_edge_index_on_clique_expansion(self) -> Tensor:
    """
    Construct a graph from a hypergraph via clique expansion using ``H @ H^T``, where ``H`` is the incidence matrix of the hypergraph.
    In clique expansion, each hyperedge is replaced by a clique connecting all its member nodes.

    For each hyperedge, all pairs of member nodes become edges in the resulting graph.
    This is computed efficiently using the incidence matrix: ``A = H @ H^T``, where ``H`` is
    the sparse incidence matrix of shape ``[num_nodes, num_hyperedges]`` and ``A`` is the adjacency matrix of the clique-expanded graph.

    Returns:
        The edge index of the clique-expanded graph. Size ``(2, |E'|)``.
    """
    incidence_matrix = self.get_sparse_incidence_matrix()

    # A = H @ H^T gives adjacency with self-loops on diagonal
    # Example: For hyperedge_index = [[0, 1, 2, 0],
    #                                 [0, 0, 0, 1]]
    #                         hyperedges 0  1
    #          -> incidence_matrix H = [[1, 1], node 0
    #                                   [1, 0], node 1
    #                                   [1, 0]] node 2
    #               nodes 0  1  2
    #          -> H^T = [[1, 1, 1], hyperedge 0
    #                    [1, 0, 0]] hyperedge 1
    #                       nodes 0  1  2
    #          -> A = H @ H^T = [[2, 1, 1], node 0
    #                            [1, 1, 1], node 1
    #                            [1, 1, 1]] node 2
    #                                         nodes 0  1  2
    #          -> A (after removing self-loops) = [[0, 1, 1], node 0
    #                                              [1, 0, 1], node 1
    #                                              [1, 1, 0]] node 2
    adj_matrix = torch.sparse.mm(incidence_matrix, incidence_matrix.t()).coalesce()

    # Extract edge_index, make undirected, and deduplicate
    return EdgeIndex(adj_matrix.indices()).to_undirected().item

reduce_to_edge_index_on_random_direction(x, with_mediators=False, remove_selfloops=True, return_weights=False)

Construct a graph from a hypergraph with methods proposed in HyperGCN: A New Method of Training Graph Convolutional Networks on Hypergraphs <https://arxiv.org/pdf/1809.02589.pdf>_ paper. Reference implementation: source <https://deephypergraph.readthedocs.io/en/latest/_modules/dhg/structure/graphs/graph.html#Graph.from_hypergraph_hypergcn>_.

Parameters:

Name Type Description Default
x Tensor

Node feature matrix. Size (num_nodes, C).

required
with_mediators bool

Whether to use mediator to transform the hyperedges to edges in the graph. Defaults to False.

False
remove_selfloops bool

Whether to remove self-loops. Defaults to True.

True
return_weights bool

Whether to return the DHG-style reduced-edge weights alongside the edge index. Defaults to False.

False

Returns:

Type Description
Tensor

A tuple (edge_index, edge_weights) where:

Tensor | None
  • edge_index has size (2, |num_edges|).
tuple[Tensor, Tensor | None]
  • edge_weights has size (|num_edges|,) when return_weights=True, otherwise None.

Raises:

Type Description
ValueError

If any hyperedge contains fewer than 2 nodes.

Source code in hyperbench/types/hypergraph.py
def reduce_to_edge_index_on_random_direction(
    self,
    x: Tensor,
    with_mediators: bool = False,
    remove_selfloops: bool = True,
    return_weights: bool = False,
) -> tuple[Tensor, Tensor | None]:
    """
    Construct a graph from a hypergraph with methods proposed in `HyperGCN: A New Method of Training Graph Convolutional Networks on Hypergraphs <https://arxiv.org/pdf/1809.02589.pdf>`_ paper.
    Reference implementation: `source <https://deephypergraph.readthedocs.io/en/latest/_modules/dhg/structure/graphs/graph.html#Graph.from_hypergraph_hypergcn>`_.

    Args:
        x: Node feature matrix. Size ``(num_nodes, C)``.
        with_mediators: Whether to use mediator to transform the hyperedges to edges in the graph. Defaults to ``False``.
        remove_selfloops: Whether to remove self-loops. Defaults to ``True``.
        return_weights: Whether to return the DHG-style reduced-edge weights alongside the edge index. Defaults to ``False``.

    Returns:
        A tuple ``(edge_index, edge_weights)`` where:
        - ``edge_index`` has size ``(2, |num_edges|)``.
        - ``edge_weights`` has size ``(|num_edges|,)`` when ``return_weights=True``, otherwise ``None``.

    Raises:
        ValueError: If any hyperedge contains fewer than 2 nodes.
    """
    device = x.device

    hypergraph = Hypergraph.from_hyperedge_index(self.__hyperedge_index)
    hypergraph_edges: list[list[int]] = hypergraph.hyperedges
    graph_edges: list[list[int]] = []
    graph_edge_weights: list[float] = []

    # Random direction (feature_dim, 1) for projecting nodes in each hyperedge
    # Geometrically, we are choosing a random line through the origin in ℝᵈ, where ᵈ = feature_dim
    random_direction = torch.rand((x.shape[1], 1), device=device)

    for edge in hypergraph_edges:
        num_nodes_in_edge = len(edge)
        if num_nodes_in_edge < 2:
            raise ValueError("The number of vertices in an hyperedge must be >= 2.")

        # projections (num_nodes_in_edge,) contains a scalar value for each node in the hyperedge,
        # indicating its projection on the random vector 'random_direction'.
        # Key idea: If two points are very far apart in ℝᵈ, there is a high probability
        # that a random projection will still separate them
        projections = torch.matmul(x[edge], random_direction).squeeze()

        # The indices of the nodes that the farthest apart in the direction of 'random_direction'
        node_max_proj_idx = torch.argmax(projections)
        node_min_proj_idx = torch.argmin(projections)

        if not with_mediators:  # Just connect the two farthest nodes
            graph_edges.append([edge[node_min_proj_idx], edge[node_max_proj_idx]])
            graph_edge_weights.append(1.0 / num_nodes_in_edge)
            continue

        edge_weight = 1.0 / (2 * num_nodes_in_edge - 3)
        for node_idx in range(num_nodes_in_edge):
            if node_idx not in {node_max_proj_idx.item(), node_min_proj_idx.item()}:
                graph_edges.append([edge[node_min_proj_idx], edge[node_idx]])
                graph_edges.append([edge[node_max_proj_idx], edge[node_idx]])
                graph_edge_weights.extend([edge_weight, edge_weight])

    graph = Graph(
        edges=graph_edges,
        edge_weights=graph_edge_weights if return_weights else None,
    )
    if remove_selfloops:
        graph.remove_selfloops()

    return (
        graph.to_edge_index().to(device),
        graph.edge_weights_tensor.to(device) if return_weights else None,
    )

remove_duplicate_edges()

Remove duplicate edges from the hyperedge index. Keeps the tensor contiguous in memory.

Source code in hyperbench/types/hypergraph.py
def remove_duplicate_edges(self) -> "HyperedgeIndex":
    """Remove duplicate edges from the hyperedge index. Keeps the tensor contiguous in memory."""
    # Example: hyperedge_index = [[0, 1, 2, 2, 0, 3, 2],
    #                             [3, 4, 4, 3, 4, 3, 3]], shape (2, 7)
    #          -> after torch.unique(..., dim=1):
    #             hyperedge_index = [[0, 1, 2, 2, 0, 3],
    #                                [3, 4, 4, 3, 4, 3]], shape (2, |E'| = 6)
    # Note: we need to call contiguous() after torch.unique() to ensure
    # the resulting tensor is contiguous in memory, which is important for efficient indexing
    # and further operations (e.g., searchsorted)
    self.__hyperedge_index = torch.unique(self.__hyperedge_index, dim=1).contiguous()
    return self

remove_hyperedges_with_fewer_than_k_nodes(k)

Remove hyperedges that contain fewer than k nodes.

Example

hyperedge_index = [[0, 1, 2, 3, 5, 4], ... [0, 0, 1, 1, 2, 1]], shape (2, |E| = 6)

k = 3 unique_hyperedge_ids: [0, 1, 2] ... # inverse -> idx_to_hyperedge_id, counts -> num_nodes_per_hyperedge ... inverse = [0, 0, 1, 1, 2, 1] # (index into unique_hyperedge_ids per column) ... counts = [2, 3, 1]

counts[inverse] is equivalent to:

... # for i, inv in enumerate(inverse): keep_mask[i] = counts[inv] counts[inverse] = [2, 2, 3, 3, 1, 3] keep_mask = [F, F, T, T, F, T]

after filtering hyperedges with fewer than k=3 nodes:

hyperedge_index = [[2, 3, 4], ... [1, 1, 1]], shape (2, |E'| = 3)

Parameters:

Name Type Description Default
k int

The minimum number of nodes a hyperedge must contain to be kept.

required

Returns:

Type Description
HyperedgeIndex

A new :class:HyperedgeIndex instance with hyperedges containing fewer than k nodes.

Source code in hyperbench/types/hypergraph.py
def remove_hyperedges_with_fewer_than_k_nodes(self, k: int) -> "HyperedgeIndex":
    """
    Remove hyperedges that contain fewer than k nodes.

    Example:
        >>> hyperedge_index = [[0, 1, 2, 3, 5, 4],
        ...                    [0, 0, 1, 1, 2, 1]], shape (2, |E| = 6)

        >>> k = 3
        >>> unique_hyperedge_ids: [0, 1, 2]
        ... # inverse -> idx_to_hyperedge_id, counts -> num_nodes_per_hyperedge
        ... inverse           = [0, 0, 1, 1, 2, 1]  # (index into unique_hyperedge_ids per column)
        ... counts            = [2, 3, 1]
        >>> # counts[inverse] is equivalent to:
        ... # for i, inv in enumerate(inverse): keep_mask[i] = counts[inv]
        >>> counts[inverse]   = [2, 2, 3, 3, 1, 3]
        >>> keep_mask         = [F, F, T, T, F, T]

        >>> # after filtering hyperedges with fewer than k=3 nodes:
        >>> hyperedge_index = [[2, 3, 4],
        ...                    [1, 1, 1]], shape (2, |E'| = 3)

    Args:
        k: The minimum number of nodes a hyperedge must contain to be kept.

    Returns:
        A new :class:`HyperedgeIndex` instance with hyperedges containing fewer than k nodes.
    """
    _, idx_to_hyperedge_id, num_nodes_per_hyperedge = torch.unique(
        self.all_hyperedge_ids,
        return_inverse=True,
        return_counts=True,
    )
    keep_mask = num_nodes_per_hyperedge[idx_to_hyperedge_id] >= k
    self.__hyperedge_index = self.__hyperedge_index[:, keep_mask]
    return self

to_0based(node_ids_to_rebase=None, hyperedge_ids_to_rebase=None)

Convert hyperedge index to the 0-based format by rebasing node IDs to the range [0, num_nodes-1] and hyperedge IDs [0, num_hyperedges-1].

Parameters:

Name Type Description Default
node_ids_to_rebase Tensor | None

Tensor of shape (num_nodes,) containing the original node IDs that need to be rebased to 0-based format. If None, all node IDs in the hyperedge index will be rebased to 0-based format based on their unique sorted order.

None
hyperedge_ids_to_rebase Tensor | None

Tensor of shape (num_hyperedges,) containing the original hyperedge IDs that need to be rebased to 0-based format. If None, all hyperedge IDs in the hyperedge index will be rebased to 0-based format based on their unique sorted order.

None

Returns:

Type Description
HyperedgeIndex

A new :class:HyperedgeIndex instance with the hyperedge index converted to 0-based format.

Source code in hyperbench/types/hypergraph.py
def to_0based(
    self,
    node_ids_to_rebase: Tensor | None = None,
    hyperedge_ids_to_rebase: Tensor | None = None,
) -> "HyperedgeIndex":
    """
    Convert hyperedge index to the 0-based format by rebasing node IDs to the range ``[0, num_nodes-1]`` and hyperedge IDs ``[0, num_hyperedges-1]``.

    Args:
        node_ids_to_rebase: Tensor of shape ``(num_nodes,)`` containing the original node IDs that need to be rebased to 0-based format.
            If ``None``, all node IDs in the hyperedge index will be rebased to 0-based format based on their unique sorted order.
        hyperedge_ids_to_rebase: Tensor of shape ``(num_hyperedges,)`` containing the original hyperedge IDs that need to be rebased to 0-based format.
            If ``None``, all hyperedge IDs in the hyperedge index will be rebased to 0-based format based on their unique sorted order.

    Returns:
        A new :class:`HyperedgeIndex` instance with the hyperedge index converted to 0-based format.
    """
    # Example: hyperedge_index after sorting: [[0, 0, 1, 2, 3, 4],
    #                                          [3, 4, 4, 3, 4, 3]]
    #          node_ids_to_rebase = [0, 1, 2, 3, 4]
    #          -> hyperedge_index after remapping: [[0, 0, 1, 2, 3, 4],
    #                                               [3, 4, 4, 3, 4, 3]]
    self.__hyperedge_index[0] = to_0based_ids(self.all_node_ids, node_ids_to_rebase)

    # Example: hyperedge_index after remapping nodes: [[0, 0, 1, 2, 3, 4],
    #                                                  [3, 4, 4, 3, 4, 3]]
    #          hyperedge_ids_to_rebase = [3, 4]
    #          -> hyperedge_index after remapping hyperedges: [[0, 0, 1, 2, 3, 4],
    #                                                          [0, 0, 1, 0, 1, 0]]
    self.__hyperedge_index[1] = to_0based_ids(self.all_hyperedge_ids, hyperedge_ids_to_rebase)

    return self

HData

Container for hypergraph data.

Examples:

>>> x = torch.randn(10, 16)  # 10 nodes with 16 features each
>>> hyperedge_index = torch.tensor([[0, 0, 1, 1, 1],  # node IDs
...                                 [0, 1, 2, 3, 4]]) # hyperedge IDs
>>> data = HData(x=x, hyperedge_index=hyperedge_index)

Parameters:

Name Type Description Default
x Tensor

Node feature matrix of shape [num_nodes, num_features].

required
hyperedge_index Tensor

Hyperedge connectivity in COO format of shape [2, num_incidences], where hyperedge_index[0] contains node IDs and hyperedge_index[1] contains hyperedge IDs.

required
hyperedge_weights Tensor | None

Optional tensor of shape [num_hyperedges] containing weights for each hyperedge.

None
hyperedge_attr Tensor | None

Hyperedge feature matrix of shape [num_hyperedges, num_hyperedge_features]. Features associated with each hyperedge (e.g., weights, timestamps, types).

None
num_nodes int | None

Number of nodes in the hypergraph. If None, inferred as x.size(0).

None
num_hyperedges int | None

Number of hyperedges in the hypergraph. If None, inferred as the number of unique hyperedge IDs in hyperedge_index[1].

None
y Tensor | None

Labels for hyperedges, of shape [num_hyperedges]. Used for supervised learning tasks. For unsupervised tasks, this can be ignored. Default is a tensor of ones, indicating all hyperedges are positive examples.

None
global_node_ids Tensor | None

Optional stable node IDs of shape [num_nodes] matching the row order of x. Use this to preserve access to the canonical node space when hyperedge_index is rebased locally. If None, defaults to torch.arange(num_nodes), assuming that these are the global node IDs in the same order as the rows of x.

None
Source code in hyperbench/types/hdata.py
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
class HData:
    """
    Container for hypergraph data.

    Examples:
        >>> x = torch.randn(10, 16)  # 10 nodes with 16 features each
        >>> hyperedge_index = torch.tensor([[0, 0, 1, 1, 1],  # node IDs
        ...                                 [0, 1, 2, 3, 4]]) # hyperedge IDs
        >>> data = HData(x=x, hyperedge_index=hyperedge_index)

    Args:
        x: Node feature matrix of shape ``[num_nodes, num_features]``.
        hyperedge_index: Hyperedge connectivity in COO format of shape ``[2, num_incidences]``,
            where ``hyperedge_index[0]`` contains node IDs and ``hyperedge_index[1]`` contains hyperedge IDs.
        hyperedge_weights: Optional tensor of shape ``[num_hyperedges]`` containing weights for each hyperedge.
        hyperedge_attr: Hyperedge feature matrix of shape ``[num_hyperedges, num_hyperedge_features]``.
            Features associated with each hyperedge (e.g., weights, timestamps, types).
        num_nodes: Number of nodes in the hypergraph.
            If ``None``, inferred as ``x.size(0)``.
        num_hyperedges: Number of hyperedges in the hypergraph.
            If ``None``, inferred as the number of unique hyperedge IDs in ``hyperedge_index[1]``.
        y: Labels for hyperedges, of shape ``[num_hyperedges]``.
            Used for supervised learning tasks. For unsupervised tasks, this can be ignored.
            Default is a tensor of ones, indicating all hyperedges are positive examples.
        global_node_ids: Optional stable node IDs of shape ``[num_nodes]`` matching the row order of ``x``.
            Use this to preserve access to the canonical node space when ``hyperedge_index`` is rebased locally.
            If ``None``, defaults to ``torch.arange(num_nodes)``, assuming that these are the global node IDs in the same order as the rows of ``x``.
    """

    def __init__(
        self,
        x: Tensor,
        hyperedge_index: Tensor,
        hyperedge_weights: Tensor | None = None,
        hyperedge_attr: Tensor | None = None,
        num_nodes: int | None = None,
        num_hyperedges: int | None = None,
        global_node_ids: Tensor | None = None,
        y: Tensor | None = None,
    ):
        self.x: Tensor = x

        self.hyperedge_index: Tensor = hyperedge_index

        self.hyperedge_weights: Tensor | None = hyperedge_weights

        self.hyperedge_attr: Tensor | None = hyperedge_attr

        hyperedge_index_wrapper = HyperedgeIndex(hyperedge_index)

        self.num_nodes: int = (
            num_nodes
            if num_nodes is not None
            # There should never be isolated nodes when HData is created by Dataset
            # as each isolted node gets its own self-loop hyperedge
            else hyperedge_index_wrapper.num_nodes_if_isolated_exist(num_nodes=x.size(0))
        )

        self.num_hyperedges: int = (
            num_hyperedges if num_hyperedges is not None else hyperedge_index_wrapper.num_hyperedges
        )

        self.global_node_ids: Tensor | None = (
            # torch.arange is to handle isolated nodes, as they are already considered
            # when computing self.num_nodes via num_nodes_if_isolated_exist
            global_node_ids if global_node_ids is not None else torch.arange(self.num_nodes)
        )

        self.y = (
            y
            if y is not None
            else torch.ones((self.num_hyperedges,), dtype=torch.float, device=self.x.device)
        )

        self.device = self.get_device_if_all_consistent()

    def __repr__(self) -> str:
        return (
            f"{self.__class__.__name__}(\n"
            f"    num_nodes={self.num_nodes},\n"
            f"    num_hyperedges={self.num_hyperedges},\n"
            f"    x_shape={self.x.shape},\n"
            f"    global_node_ids_shape={self.global_node_ids.shape if self.global_node_ids is not None else None},\n"
            f"    hyperedge_index_shape={self.hyperedge_index.shape},\n"
            f"    hyperedge_weights_shape={self.hyperedge_weights.shape if self.hyperedge_weights is not None else None},\n"
            f"    hyperedge_attr_shape={self.hyperedge_attr.shape if self.hyperedge_attr is not None else None},\n"
            f"    y_shape={self.y.shape if self.y is not None else None}\n"
            f"    device={self.device}\n"
            f")"
        )

    @classmethod
    def cat_same_node_space(cls, hdatas: Sequence["HData"], x: Tensor | None = None) -> "HData":
        """
        Concatenate :class:`HData` instances that share the same node space, meaning nodes with the same ID in different instances are the same node.
        This is useful when combining positive and negative hyperedges that reference the same set of nodes.

        Notes:
            - ``x`` is derived from the instance with the largest number of nodes, if not provided explicitly. If there are conflicting features for the same node ID across instances, the features from the instance with the largest number of nodes will be used.
            - ``hyperedge_index`` is the concatenation of all input hyperedge indices.
            - ``hyperedge_weights`` is the concatenation of all input hyperedge weights, if present. If some instances have hyperedge weights and others do not, the resulting ``hyperedge_weights`` will be set to ``None``.
            - ``hyperedge_attr`` is the concatenation of all input hyperedge attributes, if present. If some instances have hyperedge attributes and others do not, the resulting ``hyperedge_attr`` will be set to ``None``.
            - ``y`` is the concatenation of all input labels.

        Examples:
            >>> x = torch.randn(5, 8)
            >>> pos = HData(x=x, hyperedge_index=torch.tensor([[0, 1, 2, 3, 4], [0, 0, 1, 2, 2]]))
            >>> neg = HData(x=x, hyperedge_index=torch.tensor([[0, 2], [3, 3]]))
            >>> new = HData.cat_same_node_space([pos, neg])
            >>> new.num_nodes  # 5 — nodes [0, 1, 2, 3, 4]
            >>> new.num_hyperedges  # 4 — hyperedges [0, 1, 2, 3]

        Args:
            hdatas: One or more :class:`HData` instances sharing the same node space.
            x: Optional node feature matrix to use for the resulting :class:`HData`.
                If ``None``, the node features from the instance with the largest number of nodes will be used.

        Returns:
            A new :class:`HData` with shared nodes and concatenated hyperedges.

        Raises:
            ValueError: If the node counts do not match across inputs.
        """
        if len(hdatas) < 1:
            raise ValueError("At least one instance is required.")

        joint_hyperedge_ids = torch.cat([hdata.hyperedge_index[1].unique() for hdata in hdatas])
        unique_joint_hyperedge_ids = joint_hyperedge_ids.unique()
        if unique_joint_hyperedge_ids.size(0) != joint_hyperedge_ids.size(0):
            raise ValueError(
                "Overlapping hyperedge IDs found across instances. Ensure each instance uses distinct hyperedge IDs."
            )

        new_x = x if x is not None else max(hdatas, key=lambda hdata: hdata.num_nodes).x
        new_y = torch.cat([hdata.y for hdata in hdatas], dim=0)
        new_hyperedge_index = torch.cat([hdata.hyperedge_index for hdata in hdatas], dim=1)

        hyperedge_attrs = []
        hyperedge_weights = []
        have_all_hyperedge_attr = all(hdata.hyperedge_attr is not None for hdata in hdatas)
        have_all_hyperedge_weights = all(hdata.hyperedge_weights is not None for hdata in hdatas)
        for hdata in hdatas:
            if have_all_hyperedge_attr and hdata.hyperedge_attr is not None:
                hyperedge_attrs.append(hdata.hyperedge_attr)
            if have_all_hyperedge_weights and hdata.hyperedge_weights is not None:
                hyperedge_weights.append(hdata.hyperedge_weights)
        new_hyperedge_attr = torch.cat(hyperedge_attrs, dim=0) if len(hyperedge_attrs) > 0 else None
        new_hyperedge_weights = (
            torch.cat(hyperedge_weights, dim=0) if len(hyperedge_weights) > 0 else None
        )
        return cls(
            x=new_x,
            hyperedge_index=new_hyperedge_index,
            hyperedge_weights=new_hyperedge_weights,
            hyperedge_attr=new_hyperedge_attr,
            num_nodes=new_x.size(0),
            num_hyperedges=new_y.size(0),
            global_node_ids=max(hdatas, key=lambda hdata: hdata.num_nodes).global_node_ids,
            y=new_y,
        )

    def add_negative_samples(
        self,
        negative_sampler: "NegativeSampler",
        seed: int | None = None,
    ) -> "HData":
        """
        Return a new :class:`HData` with sampled negative hyperedges added.

        Args:
            negative_sampler: Sampler used to generate negative hyperedges from this instance.
            seed: Optional random seed used for both negative sampling and the final shuffle.

        Returns:
            A new :class:`HData` containing the original hyperedges and sampled negatives.
        """
        neg_hdata = negative_sampler.sample(self, seed=seed)
        hdata_with_negatives = self.cat_same_node_space([self, neg_hdata])
        return hdata_with_negatives.shuffle(seed=seed)

    @classmethod
    def empty(cls) -> "HData":
        return cls(
            x=empty_nodefeatures(),
            hyperedge_index=empty_hyperedgeindex(),
            hyperedge_weights=None,
            hyperedge_attr=None,
            num_nodes=0,
            num_hyperedges=0,
            global_node_ids=torch.empty(size=(0, 0), dtype=torch.long),
            y=None,
        )

    @classmethod
    def from_hyperedge_index(cls, hyperedge_index: Tensor) -> "HData":
        """
        Build an :class:`HData` from a given hyperedge index, with empty node features and hyperedge attributes.

        - Node features are initialized as an empty tensor of shape ``[0, 0]``.
        - Hyperedge attributes are set to ``None``.
        - Hyperedge weights are set to ``None``.
        - The number of nodes and hyperedges are inferred from the hyperedge index.

        Examples:
            >>> hyperedge_index = [[0, 0, 1, 2, 3, 4],
            ...                    [0, 0, 0, 1, 2, 2]]
            >>> num_nodes = 5
            >>> num_hyperedges = 3
            >>> x = []  # Empty node features with shape [0, 0]
            >>> hyperedge_attr = None
            >>> hyperedge_weights = None

        Args:
            hyperedge_index: Tensor of shape ``[2, num_incidences]`` representing the hypergraph connectivity.

        Returns:
            An :class:`HData` instance with the given hyperedge index and default values for other attributes.
        """
        return cls(
            x=empty_nodefeatures(),
            hyperedge_index=hyperedge_index,
            hyperedge_weights=None,
            hyperedge_attr=None,
            global_node_ids=None,
            y=None,
        )

    @classmethod
    def split(
        cls,
        hdata: "HData",
        split_hyperedge_ids: Tensor,
        node_space_setting: NodeSpaceSetting = "transductive",
    ) -> "HData":
        """
        Build an :class:`HData` for a single split from the given hyperedge IDs.

        Examples:
            Transductive split (default) preserving the full node space:
            >>> split_hdata = HData.split(hdata, torch.tensor([1]), node_space_setting="transductive")
            >>> split_hdata.x.shape[0] == hdata.x.shape[0]
            >>> split_hdata.hyperedge_index
            ... # node IDs stay in the original row space, hyperedge IDs are rebased

            Inductive split:
            >>> split_hdata = HData.split(hdata, torch.tensor([1]), node_space_setting="inductive")
            >>> split_hdata.x.shape[0]  # only nodes incident to hyperedge 1
            ... 2

        Args:
            hdata: The original :class:`HData` containing the full hypergraph.
            split_hyperedge_ids: Tensor of hyperedge IDs to include in this split.
            node_space_setting: Whether to preserve the full node space in the splits.
                ``transductive`` (default) ensures all nodes are present in every split,
                while ``inductive`` allows splits to have disjoint node spaces.

        Returns:
            The splitted instance with remapped node and hyperedge IDs.
        """
        # Mask to keep only incidences belonging to selected hyperedges
        # Example: hyperedge_index = [[0, 0, 1, 2, 3, 4],
        #                             [0, 0, 0, 1, 2, 2]]
        #          split_hyperedge_ids = [0, 2]
        #          -> mask = [True, True, True, False, True, True]
        keep_mask = torch.isin(hdata.hyperedge_index[1], split_hyperedge_ids)

        # Example: hyperedge_index = [[0, 0, 1, 3, 4],
        #                             [0, 0, 0, 2, 2]]
        #          incidence [2, 1] is missing as 1 is not in split_hyperedge_ids = [0, 2]
        split_hyperedge_index = hdata.hyperedge_index[:, keep_mask].clone()

        # Example: split_hyperedge_index = [[2, 3, 4],
        #                                   [2, 2, 5]]
        #          -> split_unique_hyperedge_ids = [2, 5]
        split_unique_hyperedge_ids = split_hyperedge_index[1].unique()

        split_y = hdata.y[split_unique_hyperedge_ids]

        split_hyperedge_attr = None
        if hdata.hyperedge_attr is not None:
            split_hyperedge_attr = hdata.hyperedge_attr[split_unique_hyperedge_ids]

        split_hyperedge_weights = None
        if hdata.hyperedge_weights is not None:
            split_hyperedge_weights = hdata.hyperedge_weights[split_unique_hyperedge_ids]

        # We don't need to split nodes, so we split only hyperedges and rebase their IDs to 0-based
        if is_transductive_setting(node_space_setting):
            # Example: split_unique_hyperedge_ids = [2, 5]
            #          -> hyperedge 2 -> 0, hyperedge 5 -> 1
            split_hyperedge_index[1] = to_0based_ids(
                original_ids=split_hyperedge_index[1],
                ids_to_rebase=split_unique_hyperedge_ids,
            )
            return cls(
                x=hdata.x,
                hyperedge_index=split_hyperedge_index,
                hyperedge_weights=split_hyperedge_weights,
                hyperedge_attr=split_hyperedge_attr,
                num_nodes=hdata.num_nodes,
                num_hyperedges=len(split_unique_hyperedge_ids),
                global_node_ids=hdata.global_node_ids,
                y=split_y,
            )

        # Example: split_hyperedge_index = [[0, 0, 1, 3, 4],
        #                                   [0, 0, 0, 2, 2]]
        #          -> split_unique_node_ids = [0, 1, 3, 4]
        split_unique_node_ids = split_hyperedge_index[0].unique()

        split_hyperedge_index = (
            HyperedgeIndex(split_hyperedge_index)
            .to_0based(
                node_ids_to_rebase=split_unique_node_ids,
                hyperedge_ids_to_rebase=split_unique_hyperedge_ids,
            )
            .item
        )

        split_x = hdata.x[split_unique_node_ids]

        split_global_node_ids = None
        if hdata.global_node_ids is not None:
            split_global_node_ids = hdata.global_node_ids[split_unique_node_ids]

        return cls(
            x=split_x,
            hyperedge_index=split_hyperedge_index,
            hyperedge_weights=split_hyperedge_weights,
            hyperedge_attr=split_hyperedge_attr,
            num_nodes=len(split_unique_node_ids),
            num_hyperedges=len(split_unique_hyperedge_ids),
            global_node_ids=split_global_node_ids,
            y=split_y,
        )

    def enrich_node_features(
        self,
        enricher: NodeEnricher,
        enrichment_mode: EnrichmentMode | None = None,
    ) -> "HData":
        """
        Enrich node features using the provided node feature enricher.

        Args:
            enricher: An instance of NodeEnricher to generate structural node features from hypergraph topology.
            enrichment_mode: How to combine generated features with existing ``hdata.x``.
                ``concatenate`` appends new features as additional columns.
                ``replace`` substitutes ``hdata.x`` entirely.
        """
        enriched_features = enricher.enrich(self.hyperedge_index)

        match enrichment_mode:
            case "concatenate":
                x = torch.cat([self.x, enriched_features], dim=1)
            case _:
                x = enriched_features

        return self.__class__(
            x=x,
            hyperedge_index=self.hyperedge_index,
            hyperedge_weights=self.hyperedge_weights,
            hyperedge_attr=self.hyperedge_attr,
            num_nodes=self.num_nodes,
            num_hyperedges=self.num_hyperedges,
            global_node_ids=self.global_node_ids,
            y=self.y,
        )

    def enrich_node_features_from(
        self,
        hdata_with_features: "HData",
        node_space_setting: NodeSpaceSetting = "transductive",
        fill_value: NodeSpaceFiller | None = None,
    ) -> "HData":
        """
        Copy node features from another :class:`HData` by aligning features by ``global_node_ids``.

        Examples:
            Transductive enrichment (default) expecting the same node space in both source and target:
            >>> target = target.enrich_node_features_from(source, node_space_setting="transductive")

            Inductive with a scalar fill value:
            >>> target = target.enrich_node_features_from(
            ...     source,
            ...     node_space_setting="inductive",
            ...     fill_value=0.0,
            ... )

            Inductive with a feature vector fill value:
            >>> target = target.enrich_node_features_from(
            ...     source,
            ...     node_space_setting="inductive",
            ...     fill_value=[0.0, 1.0, 0.0],
            ... )

        Args:
            hdata_with_features: Source :class:`HData` providing node features.
            node_space_setting: The setting for the node space, determining how nodes are handled.
                If ``"transductive"``, every target node is expected to exist in the source.
                If ``"inductive"``, the target dataset may have a different node space, and missing nodes are filled using ``fill_value``.
            fill_value: Scalar or vector used to fill missing node features when ``node_space_setting`` is not transductive.

        Returns:
            A new :class:`HData` with node features copied from ``hdata_with_features``.

        Raises:
            ValueError: If either instance lacks ``global_node_ids``, if the source feature rows
                do not align with the source node IDs, if ``fill_value`` is used with
                ``node_space_setting="transductive"``, or if ``fill_value`` is missing or malformed when ``node_space_setting="inductive"``.
        """
        source_global_node_ids = hdata_with_features.global_node_ids
        source_x = hdata_with_features.x
        if self.global_node_ids is None or source_global_node_ids is None:
            raise ValueError(
                "Both HData instances must define global_node_ids to align node features."
            )
        if source_x.size(0) != source_global_node_ids.size(0):
            raise ValueError(
                "Expected hdata_with_features.x rows to align with hdata_with_features.global_node_ids."
            )
        self.__validate_node_space_setting(node_space_setting, fill_value)

        target_global_node_ids = self.global_node_ids.detach().cpu().tolist()

        # We need the index of the features for each node in the source, as we will use the index to track back
        # to the node feautures after we match the global node id in the target to the one that is in the source
        source_feature_idx_by_global_node_id = {
            int(global_node_id): feature_idx
            for feature_idx, global_node_id in enumerate(
                source_global_node_ids.detach().cpu().tolist()
            )
        }

        fill_features = self.__to_fill_features(
            fill_value=fill_value,
            num_features=int(source_x.size(1)),
            dtype=source_x.dtype,
            device=source_x.device,
        )

        enriched_rows = []
        missing_global_node_ids = []
        for global_node_id in target_global_node_ids:
            source_feature_idx = source_feature_idx_by_global_node_id.get(int(global_node_id))
            if source_feature_idx is None:
                # Example: global_node_id = 30 is not present in the source
                #          -> strict transductive mode records it as missing and then raises an error
                #          -> non-transductive mode fills the features with fill_value and continues enriching the other nodes
                if is_transductive_setting(node_space_setting):
                    missing_global_node_ids.append(
                        int(global_node_id)
                    )  # record missing node for error message
                else:
                    enriched_rows.append(
                        fill_features
                    )  # fill missing node features with fill_value and
                continue

            # Match the global node IDs in the target to the corresponding feature indices in the source
            # Example: source_global_node_ids = [10, 20, 30], source_x has shape (3, num_features)
            #          target_global_node_ids = [10, 30]
            #          -> source_feature_idx_by_global_node_id = {10: 0, 20: 1, 30: 2}
            #          -> pick source_x rows 0 and 2 for the target
            enriched_rows.append(source_x[source_feature_idx])

        if len(missing_global_node_ids) > 0:
            raise ValueError(
                f"Missing node features for target global_node_ids: {missing_global_node_ids}."
            )

        enriched_x = torch.stack(enriched_rows, dim=0).to(device=self.device)

        return self.__class__(
            x=enriched_x,
            hyperedge_index=self.hyperedge_index,
            hyperedge_weights=self.hyperedge_weights,
            hyperedge_attr=self.hyperedge_attr,
            num_nodes=self.num_nodes,
            num_hyperedges=self.num_hyperedges,
            global_node_ids=self.global_node_ids,
            y=self.y,
        )

    def enrich_hyperedge_weights(
        self,
        enricher: HyperedgeEnricher,
        enrichment_mode: EnrichmentMode | None = None,
    ) -> "HData":
        """Enrich hyperedge weights using the provided hyperedge weight enricher.
        Args:
            enricher: An instance of HyperedgeEnricher to generate hyperedge weights from hypergraph topology.
            enrichment_mode: How to combine generated weights with existing ``hdata.hyperedge_weights``.
                ``concatenate`` appends new weights to the existing 1D tensor.
                ``replace`` substitutes ``hdata.hyperedge_weights`` entirely.
        """
        enriched_weights = enricher.enrich(self.hyperedge_index)

        match enrichment_mode:
            case "concatenate":
                hyperedge_weights = (
                    torch.cat([self.hyperedge_weights, enriched_weights], dim=0)
                    if self.hyperedge_weights is not None
                    else enriched_weights
                )
            case _:
                hyperedge_weights = enriched_weights

        return self.__class__(
            x=self.x,
            hyperedge_index=self.hyperedge_index,
            hyperedge_weights=hyperedge_weights,
            hyperedge_attr=self.hyperedge_attr,
            num_nodes=self.num_nodes,
            num_hyperedges=self.num_hyperedges,
            global_node_ids=self.global_node_ids,
            y=self.y,
        )

    def enrich_hyperedge_attr(
        self,
        enricher: HyperedgeEnricher,
        enrichment_mode: EnrichmentMode | None = None,
    ) -> "HData":
        """
        Enrich hyperedge features using the provided hyperedge feature enricher.

        Args:
            enricher: An instance of HyperedgeEnricher to generate structural hyperedge features from hypergraph topology.
            enrichment_mode: How to combine generated features with existing ``hdata.hyperedge_attr``.
                ``concatenate`` appends new features as additional columns.
                ``replace`` substitutes ``hdata.hyperedge_attr`` entirely.
        """
        enriched_features = enricher.enrich(self.hyperedge_index)

        match enrichment_mode:
            case "concatenate":
                hyperedge_attr = (
                    torch.cat([self.hyperedge_attr, enriched_features], dim=1)
                    if self.hyperedge_attr is not None
                    else enriched_features
                )
            case _:
                hyperedge_attr = enriched_features

        return self.__class__(
            x=self.x,
            hyperedge_index=self.hyperedge_index,
            hyperedge_weights=self.hyperedge_weights,
            hyperedge_attr=hyperedge_attr,
            num_nodes=self.num_nodes,
            num_hyperedges=self.num_hyperedges,
            global_node_ids=self.global_node_ids,
            y=self.y,
        )

    def get_device_if_all_consistent(self) -> torch.device:
        """
        Check that all tensors are on the same device and return that device.
        If there are no tensors or if they are on different devices, return CPU.

        Returns:
            The common device if all tensors are on the same device, otherwise CPU.

        Raises:
            ValueError: If tensors are on different devices.
        """
        devices = {self.x.device, self.hyperedge_index.device, self.y.device}
        if self.global_node_ids is not None:
            devices.add(self.global_node_ids.device)
        if self.hyperedge_attr is not None:
            devices.add(self.hyperedge_attr.device)
        if self.hyperedge_weights is not None:
            devices.add(self.hyperedge_weights.device)
        if len(devices) > 1:
            raise ValueError(f"Inconsistent device placement: {devices}")

        return devices.pop() if len(devices) == 1 else torch.device("cpu")

    def remove_hyperedges_with_fewer_than_k_nodes(self, k: int) -> "HData":
        hyperedge_index_wrapper = HyperedgeIndex(
            self.hyperedge_index
        ).remove_hyperedges_with_fewer_than_k_nodes(k)

        x = self.x[hyperedge_index_wrapper.node_ids]
        global_node_ids = None
        if self.global_node_ids is not None:
            global_node_ids = self.global_node_ids[hyperedge_index_wrapper.node_ids]
        y = self.y[hyperedge_index_wrapper.hyperedge_ids]

        hyperedge_attr = None
        if self.hyperedge_attr is not None:
            hyperedge_attr = self.hyperedge_attr[hyperedge_index_wrapper.hyperedge_ids]

        hyperedge_weights = None
        if self.hyperedge_weights is not None:
            hyperedge_weights = self.hyperedge_weights[hyperedge_index_wrapper.hyperedge_ids]

        return self.__class__(
            x=x,
            hyperedge_index=hyperedge_index_wrapper.to_0based().item,
            hyperedge_weights=hyperedge_weights,
            hyperedge_attr=hyperedge_attr,
            num_nodes=hyperedge_index_wrapper.num_nodes,
            num_hyperedges=hyperedge_index_wrapper.num_hyperedges,
            global_node_ids=global_node_ids,
            y=y,
        )

    def shuffle(self, seed: int | None = None) -> "HData":
        """
        Return a new :class:`HData` instance with hyperedge IDs randomly reassigned.

        Each hyperedge keeps its original set of nodes, but is assigned a new ID via a random permutation.
        ``y`` and ``hyperedge_attr`` are reordered to match, so that ``y[new_id]`` still corresponds to the correct hyperedge.
        Same for ``hyperedge_attr[new_id]`` if hyperedge attributes are present.

        Examples:
            >>> hyperedge_index = torch.tensor([[0, 1, 2, 3], [0, 0, 1, 1]])
            >>> y  = torch.tensor([1, 0])
            >>> hdata = HData(x=x, hyperedge_index=hyperedge_index, y=y)
            >>> shuffled_hdata = hdata.shuffle(seed=42)
            >>> shuffled_hdata.hyperedge_index  # hyperedges may be reassigned
            ... # e.g.,
            ...     [[0, 1, 2, 3],
            ...      [1, 1, 0, 0]]
            >>> shuffled_hdata.y  # labels are permuted to match new hyperedge IDs, e.g., [0, 1]

        Args:
            seed: Optional random seed for reproducibility. If ``None``, the shuffle will be non-deterministic.

        Returns:
            A new :class:`HData` instance with hyperedge IDs, ``y``, and ``hyperedge_attr`` permuted.
        """
        generator = torch.Generator(device=self.device)
        if seed is not None:
            generator.manual_seed(seed)

        permutation = torch.randperm(self.num_hyperedges, generator=generator, device=self.device)

        # permutation[new_id] = old_id, so y[permutation] puts old labels into new slots
        # inverse_permutation[old_id] = new_id, used to remap hyperedge IDs in incidences
        # Example: permutation = [1, 2, 0] means new_id 0 gets old_id 1, new_id 1 gets old_id 2, new_id 2 gets old_id 0
        #          -> inverse_permutation = [2, 0, 1] means old_id 0 gets new_id 2, old_id 1 gets new_id 0, old_id 2 gets new_id 1
        inverse_permutation = torch.empty_like(permutation)
        inverse_permutation[permutation] = torch.arange(self.num_hyperedges, device=self.device)

        new_hyperedge_index = self.hyperedge_index.clone()

        # Example: hyperedge_index = [[0, 1, 2, 3, 4],
        #                             [0, 0, 1, 1, 2]],
        #          inverse_permutation = [2, 0, 1] (new_id 0 -> old_id 2, new_id 1 -> old_id 0, new_id 2 -> old_id 1)
        #          -> new_hyperedge_index = [[0, 1, 2, 3, 4],
        #                                    [2, 2, 0, 0, 1]]
        old_hyperedge_ids = self.hyperedge_index[1]
        new_hyperedge_index[1] = inverse_permutation[old_hyperedge_ids]

        # Example: hyperedge_attr = [attr_0, attr_1, attr_2], permutation = [1, 2, 0]
        #          -> new_hyperedge_attr = [attr_1  (attr of old_id 1), attr_2 (attr of old_id 2), attr_0 (attr of old_id 0)]
        new_hyperedge_attr = (
            self.hyperedge_attr[permutation] if self.hyperedge_attr is not None else None
        )

        new_hyperedge_weights = (
            self.hyperedge_weights[permutation] if self.hyperedge_weights is not None else None
        )

        # Example: y = [1, 1, 0], permutation = [1, 2, 0]
        #          -> new_y = [y[1], y[2], y[0]] = [1, 0, 1]
        new_y = self.y[permutation]

        return self.__class__(
            x=self.x,
            hyperedge_index=new_hyperedge_index,
            hyperedge_weights=new_hyperedge_weights,
            hyperedge_attr=new_hyperedge_attr,
            num_nodes=self.num_nodes,
            num_hyperedges=self.num_hyperedges,
            global_node_ids=self.global_node_ids,
            y=new_y,
        )

    def clone(self) -> "HData":
        """
        Return a deep copy of this :class:`HData`.

        Returns:
            A new :class:`HData` that is a deep copy of this instance.
        """
        cloned_hyperedge_weights = (
            self.hyperedge_weights.clone() if self.hyperedge_weights is not None else None
        )
        cloned_hyperedge_attr = (
            self.hyperedge_attr.clone() if self.hyperedge_attr is not None else None
        )
        cloned_global_node_ids = (
            self.global_node_ids.clone() if self.global_node_ids is not None else None
        )
        return self.__class__(
            x=self.x.clone(),
            hyperedge_index=self.hyperedge_index.clone(),
            hyperedge_weights=cloned_hyperedge_weights,
            hyperedge_attr=cloned_hyperedge_attr,
            num_nodes=self.num_nodes,
            num_hyperedges=self.num_hyperedges,
            global_node_ids=cloned_global_node_ids,
            y=self.y.clone(),
        )

    def to(self, device: torch.device | str, non_blocking: bool = False) -> "HData":
        """
        Move all tensors to the specified device.

        Args:
            device: The target device (e.g., 'cpu', 'cuda:0').
            non_blocking: If ``True`` and the source and destination devices are both CUDA, the copy will be non-blocking.

        Returns:
            The :class:`HData` instance with all tensors moved to the specified device.
        """
        self.x = self.x.to(device=device, non_blocking=non_blocking)
        self.hyperedge_index = self.hyperedge_index.to(device=device, non_blocking=non_blocking)
        self.y = self.y.to(device=device, non_blocking=non_blocking)

        if self.global_node_ids is not None:
            self.global_node_ids = self.global_node_ids.to(device=device, non_blocking=non_blocking)

        if self.hyperedge_attr is not None:
            self.hyperedge_attr = self.hyperedge_attr.to(device=device, non_blocking=non_blocking)

        if self.hyperedge_weights is not None:
            self.hyperedge_weights = self.hyperedge_weights.to(
                device=device, non_blocking=non_blocking
            )

        self.device = device if isinstance(device, torch.device) else torch.device(device)
        return self

    def with_y_to(self, value: float) -> "HData":
        """
        Return a copy of this instance with a y attribute set to the given value.

        Args:
            value: The value to set for all entries in the y attribute.

        Returns:
            A new :class:`HData` instance with the same attributes except for y, which is set to a tensor of the given value.
        """
        return self.__class__(
            x=self.x,
            hyperedge_index=self.hyperedge_index,
            hyperedge_weights=self.hyperedge_weights,
            hyperedge_attr=self.hyperedge_attr,
            num_nodes=self.num_nodes,
            num_hyperedges=self.num_hyperedges,
            global_node_ids=self.global_node_ids,
            y=torch.full((self.num_hyperedges,), value, dtype=torch.float, device=self.device),
        )

    def with_y_ones(self) -> "HData":
        """Return a copy of this instance with a y attribute of all ones."""
        return self.with_y_to(1.0)

    def with_y_zeros(self) -> "HData":
        """Return a copy of this instance with a y attribute of all zeros."""
        return self.with_y_to(0.0)

    def stats(self) -> dict[str, Any]:
        """
        Compute statistics for the hypergraph data.
        The fields returned in the dictionary include:
        - ``shape_x``: The shape of the node feature matrix ``x``.
        - ``shape_hyperedge_weights``: The shape of the hyperedge weights tensor, or ``None`` if hyperedge weights are not present.
        - ``shape_hyperedge_attr``: The shape of the hyperedge attribute matrix, or ``None`` if hyperedge attributes are not present.
        - ``num_nodes``: The number of nodes in the hypergraph.
        - ``num_hyperedges``: The number of hyperedges in the hypergraph.
        - ``avg_degree_node_raw``: The average degree of nodes, calculated as the mean number of hyperedges each node belongs to.
        - ``avg_degree_node``: The floored node average degree.
        - ``avg_degree_hyperedge_raw``: The average size of hyperedges, calculated as the mean number of nodes each hyperedge contains.
        - ``avg_degree_hyperedge``: The floored hyperedge average size.
        - ``node_degree_max``: The maximum degree of any node in the hypergraph.
        - ``hyperedge_degree_max``: The maximum size of any hyperedge in the hypergraph.
        - ``node_degree_median``: The median degree of nodes in the hypergraph.
        - ``hyperedge_degree_median``: The median size of hyperedges in the hypergraph.
        - ``distribution_node_degree``: A list where the value at index ``i`` represents the count of nodes with degree ``i``.
        - ``distribution_hyperedge_size``: A list where the value at index ``i`` represents the count of hyperedges with size ``i``.
        - ``distribution_node_degree_hist``: A dictionary where the keys are node degrees and the values are the count of nodes with that degree.
        - ``distribution_hyperedge_size_hist``: A dictionary where the keys are hyperedge sizes and the values are the count of hyperedges with that size.

        Returns:
            A dictionary containing various statistics about the hypergraph.
        """

        node_ids = self.hyperedge_index[0]
        hyperedge_ids = self.hyperedge_index[1]

        # Degree of each node = number of hyperedges it belongs to
        # Size of each hyperedge = number of nodes it contains
        if node_ids.numel() > 0:
            distribution_node_degree = torch.bincount(node_ids, minlength=self.num_nodes).float()
            distribution_hyperedge_size = torch.bincount(
                hyperedge_ids, minlength=self.num_hyperedges
            ).float()
        else:
            distribution_node_degree = torch.zeros(self.num_nodes, dtype=torch.float)
            distribution_hyperedge_size = torch.zeros(self.num_hyperedges, dtype=torch.float)

        num_nodes = self.num_nodes
        num_hyperedges = self.num_hyperedges

        if distribution_node_degree.numel() > 0:
            avg_degree_node_raw = distribution_node_degree.mean().item()
            avg_degree_node = int(avg_degree_node_raw)
            avg_degree_hyperedge_raw = distribution_hyperedge_size.mean().item()
            avg_degree_hyperedge = int(avg_degree_hyperedge_raw)
            node_degree_max = int(distribution_node_degree.max().item())
            hyperedge_degree_max = int(distribution_hyperedge_size.max().item())
            node_degree_median = int(distribution_node_degree.median().item())
            hyperedge_degree_median = int(distribution_hyperedge_size.median().item())
        else:
            avg_degree_node_raw = 0
            avg_degree_node = 0
            avg_degree_hyperedge_raw = 0
            avg_degree_hyperedge = 0
            node_degree_max = 0
            hyperedge_degree_max = 0
            node_degree_median = 0
            hyperedge_degree_median = 0

        # Histograms: index i holds count of nodes/hyperedges with degree/size i
        distribution_node_degree_hist = torch.bincount(distribution_node_degree.long())
        distribution_hyperedge_size_hist = torch.bincount(distribution_hyperedge_size.long())

        distribution_node_degree_hist = {
            i: int(count.item())
            for i, count in enumerate(distribution_node_degree_hist)
            if count.item() > 0
        }
        distribution_hyperedge_size_hist = {
            i: int(count.item())
            for i, count in enumerate(distribution_hyperedge_size_hist)
            if count.item() > 0
        }

        return {
            "shape_x": self.x.shape,
            "shape_hyperedge_weights": self.hyperedge_weights.shape
            if self.hyperedge_weights is not None
            else None,
            "shape_hyperedge_attr": self.hyperedge_attr.shape
            if self.hyperedge_attr is not None
            else None,
            "num_nodes": num_nodes,
            "num_hyperedges": num_hyperedges,
            "avg_degree_node_raw": avg_degree_node_raw,
            "avg_degree_node": avg_degree_node,
            "avg_degree_hyperedge_raw": avg_degree_hyperedge_raw,
            "avg_degree_hyperedge": avg_degree_hyperedge,
            "node_degree_max": node_degree_max,
            "hyperedge_degree_max": hyperedge_degree_max,
            "node_degree_median": node_degree_median,
            "hyperedge_degree_median": hyperedge_degree_median,
            "distribution_node_degree": distribution_node_degree.int().tolist(),
            "distribution_hyperedge_size": distribution_hyperedge_size.int().tolist(),
            "distribution_node_degree_hist": distribution_node_degree_hist,
            "distribution_hyperedge_size_hist": distribution_hyperedge_size_hist,
        }

    def __to_fill_features(
        self,
        fill_value: NodeSpaceFiller | None,
        num_features: int,
        dtype: torch.dtype,
        device: torch.device,
    ) -> Tensor:
        if fill_value is None:
            return torch.empty((0,), dtype=dtype, device=device)

        if isinstance(fill_value, Tensor):
            fill_features = fill_value.to(dtype=dtype, device=device)
        elif isinstance(fill_value, (int, float)):
            fill_features = torch.full(
                (num_features,), float(fill_value), dtype=dtype, device=device
            )
        else:
            fill_features = torch.tensor(fill_value, dtype=dtype, device=device)

        # This can happen when fill_value is:
        # - A scalar tensor, e.g., tensor(0.0), which should be broadcasted to all features
        # - A list with a single value, e.g., [0.0], which should also be broadcasted to all features
        if fill_features.numel() == 1:
            fill_features = fill_features.repeat(num_features)

        if fill_features.dim() != 1 or fill_features.numel() != num_features:
            raise ValueError(
                f"Expected fill_value to define exactly {num_features} features, got shape "
                f"{tuple(fill_features.shape)}."
            )
        return fill_features

    def __validate_node_space_setting(
        self,
        node_space_setting: NodeSpaceSetting,
        fill_value: NodeSpaceFiller | None,
    ) -> None:
        if is_transductive_setting(node_space_setting) and fill_value is not None:
            raise ValueError(
                "fill_value cannot be provided when node_space_setting='transductive'."
            )
        if is_inductive_setting(node_space_setting) and fill_value is None:
            raise ValueError("fill_value must be provided when node_space_setting='inductive'.")

cat_same_node_space(hdatas, x=None) classmethod

Concatenate :class:HData instances that share the same node space, meaning nodes with the same ID in different instances are the same node. This is useful when combining positive and negative hyperedges that reference the same set of nodes.

Notes
  • x is derived from the instance with the largest number of nodes, if not provided explicitly. If there are conflicting features for the same node ID across instances, the features from the instance with the largest number of nodes will be used.
  • hyperedge_index is the concatenation of all input hyperedge indices.
  • hyperedge_weights is the concatenation of all input hyperedge weights, if present. If some instances have hyperedge weights and others do not, the resulting hyperedge_weights will be set to None.
  • hyperedge_attr is the concatenation of all input hyperedge attributes, if present. If some instances have hyperedge attributes and others do not, the resulting hyperedge_attr will be set to None.
  • y is the concatenation of all input labels.

Examples:

>>> x = torch.randn(5, 8)
>>> pos = HData(x=x, hyperedge_index=torch.tensor([[0, 1, 2, 3, 4], [0, 0, 1, 2, 2]]))
>>> neg = HData(x=x, hyperedge_index=torch.tensor([[0, 2], [3, 3]]))
>>> new = HData.cat_same_node_space([pos, neg])
>>> new.num_nodes  # 5 — nodes [0, 1, 2, 3, 4]
>>> new.num_hyperedges  # 4 — hyperedges [0, 1, 2, 3]

Parameters:

Name Type Description Default
hdatas Sequence[HData]

One or more :class:HData instances sharing the same node space.

required
x Tensor | None

Optional node feature matrix to use for the resulting :class:HData. If None, the node features from the instance with the largest number of nodes will be used.

None

Returns:

Type Description
HData

A new :class:HData with shared nodes and concatenated hyperedges.

Raises:

Type Description
ValueError

If the node counts do not match across inputs.

Source code in hyperbench/types/hdata.py
@classmethod
def cat_same_node_space(cls, hdatas: Sequence["HData"], x: Tensor | None = None) -> "HData":
    """
    Concatenate :class:`HData` instances that share the same node space, meaning nodes with the same ID in different instances are the same node.
    This is useful when combining positive and negative hyperedges that reference the same set of nodes.

    Notes:
        - ``x`` is derived from the instance with the largest number of nodes, if not provided explicitly. If there are conflicting features for the same node ID across instances, the features from the instance with the largest number of nodes will be used.
        - ``hyperedge_index`` is the concatenation of all input hyperedge indices.
        - ``hyperedge_weights`` is the concatenation of all input hyperedge weights, if present. If some instances have hyperedge weights and others do not, the resulting ``hyperedge_weights`` will be set to ``None``.
        - ``hyperedge_attr`` is the concatenation of all input hyperedge attributes, if present. If some instances have hyperedge attributes and others do not, the resulting ``hyperedge_attr`` will be set to ``None``.
        - ``y`` is the concatenation of all input labels.

    Examples:
        >>> x = torch.randn(5, 8)
        >>> pos = HData(x=x, hyperedge_index=torch.tensor([[0, 1, 2, 3, 4], [0, 0, 1, 2, 2]]))
        >>> neg = HData(x=x, hyperedge_index=torch.tensor([[0, 2], [3, 3]]))
        >>> new = HData.cat_same_node_space([pos, neg])
        >>> new.num_nodes  # 5 — nodes [0, 1, 2, 3, 4]
        >>> new.num_hyperedges  # 4 — hyperedges [0, 1, 2, 3]

    Args:
        hdatas: One or more :class:`HData` instances sharing the same node space.
        x: Optional node feature matrix to use for the resulting :class:`HData`.
            If ``None``, the node features from the instance with the largest number of nodes will be used.

    Returns:
        A new :class:`HData` with shared nodes and concatenated hyperedges.

    Raises:
        ValueError: If the node counts do not match across inputs.
    """
    if len(hdatas) < 1:
        raise ValueError("At least one instance is required.")

    joint_hyperedge_ids = torch.cat([hdata.hyperedge_index[1].unique() for hdata in hdatas])
    unique_joint_hyperedge_ids = joint_hyperedge_ids.unique()
    if unique_joint_hyperedge_ids.size(0) != joint_hyperedge_ids.size(0):
        raise ValueError(
            "Overlapping hyperedge IDs found across instances. Ensure each instance uses distinct hyperedge IDs."
        )

    new_x = x if x is not None else max(hdatas, key=lambda hdata: hdata.num_nodes).x
    new_y = torch.cat([hdata.y for hdata in hdatas], dim=0)
    new_hyperedge_index = torch.cat([hdata.hyperedge_index for hdata in hdatas], dim=1)

    hyperedge_attrs = []
    hyperedge_weights = []
    have_all_hyperedge_attr = all(hdata.hyperedge_attr is not None for hdata in hdatas)
    have_all_hyperedge_weights = all(hdata.hyperedge_weights is not None for hdata in hdatas)
    for hdata in hdatas:
        if have_all_hyperedge_attr and hdata.hyperedge_attr is not None:
            hyperedge_attrs.append(hdata.hyperedge_attr)
        if have_all_hyperedge_weights and hdata.hyperedge_weights is not None:
            hyperedge_weights.append(hdata.hyperedge_weights)
    new_hyperedge_attr = torch.cat(hyperedge_attrs, dim=0) if len(hyperedge_attrs) > 0 else None
    new_hyperedge_weights = (
        torch.cat(hyperedge_weights, dim=0) if len(hyperedge_weights) > 0 else None
    )
    return cls(
        x=new_x,
        hyperedge_index=new_hyperedge_index,
        hyperedge_weights=new_hyperedge_weights,
        hyperedge_attr=new_hyperedge_attr,
        num_nodes=new_x.size(0),
        num_hyperedges=new_y.size(0),
        global_node_ids=max(hdatas, key=lambda hdata: hdata.num_nodes).global_node_ids,
        y=new_y,
    )

add_negative_samples(negative_sampler, seed=None)

Return a new :class:HData with sampled negative hyperedges added.

Parameters:

Name Type Description Default
negative_sampler NegativeSampler

Sampler used to generate negative hyperedges from this instance.

required
seed int | None

Optional random seed used for both negative sampling and the final shuffle.

None

Returns:

Type Description
HData

A new :class:HData containing the original hyperedges and sampled negatives.

Source code in hyperbench/types/hdata.py
def add_negative_samples(
    self,
    negative_sampler: "NegativeSampler",
    seed: int | None = None,
) -> "HData":
    """
    Return a new :class:`HData` with sampled negative hyperedges added.

    Args:
        negative_sampler: Sampler used to generate negative hyperedges from this instance.
        seed: Optional random seed used for both negative sampling and the final shuffle.

    Returns:
        A new :class:`HData` containing the original hyperedges and sampled negatives.
    """
    neg_hdata = negative_sampler.sample(self, seed=seed)
    hdata_with_negatives = self.cat_same_node_space([self, neg_hdata])
    return hdata_with_negatives.shuffle(seed=seed)

from_hyperedge_index(hyperedge_index) classmethod

Build an :class:HData from a given hyperedge index, with empty node features and hyperedge attributes.

  • Node features are initialized as an empty tensor of shape [0, 0].
  • Hyperedge attributes are set to None.
  • Hyperedge weights are set to None.
  • The number of nodes and hyperedges are inferred from the hyperedge index.

Examples:

>>> hyperedge_index = [[0, 0, 1, 2, 3, 4],
...                    [0, 0, 0, 1, 2, 2]]
>>> num_nodes = 5
>>> num_hyperedges = 3
>>> x = []  # Empty node features with shape [0, 0]
>>> hyperedge_attr = None
>>> hyperedge_weights = None

Parameters:

Name Type Description Default
hyperedge_index Tensor

Tensor of shape [2, num_incidences] representing the hypergraph connectivity.

required

Returns:

Name Type Description
An HData

class:HData instance with the given hyperedge index and default values for other attributes.

Source code in hyperbench/types/hdata.py
@classmethod
def from_hyperedge_index(cls, hyperedge_index: Tensor) -> "HData":
    """
    Build an :class:`HData` from a given hyperedge index, with empty node features and hyperedge attributes.

    - Node features are initialized as an empty tensor of shape ``[0, 0]``.
    - Hyperedge attributes are set to ``None``.
    - Hyperedge weights are set to ``None``.
    - The number of nodes and hyperedges are inferred from the hyperedge index.

    Examples:
        >>> hyperedge_index = [[0, 0, 1, 2, 3, 4],
        ...                    [0, 0, 0, 1, 2, 2]]
        >>> num_nodes = 5
        >>> num_hyperedges = 3
        >>> x = []  # Empty node features with shape [0, 0]
        >>> hyperedge_attr = None
        >>> hyperedge_weights = None

    Args:
        hyperedge_index: Tensor of shape ``[2, num_incidences]`` representing the hypergraph connectivity.

    Returns:
        An :class:`HData` instance with the given hyperedge index and default values for other attributes.
    """
    return cls(
        x=empty_nodefeatures(),
        hyperedge_index=hyperedge_index,
        hyperedge_weights=None,
        hyperedge_attr=None,
        global_node_ids=None,
        y=None,
    )

split(hdata, split_hyperedge_ids, node_space_setting='transductive') classmethod

Build an :class:HData for a single split from the given hyperedge IDs.

Examples:

Transductive split (default) preserving the full node space:

>>> split_hdata = HData.split(hdata, torch.tensor([1]), node_space_setting="transductive")
>>> split_hdata.x.shape[0] == hdata.x.shape[0]
>>> split_hdata.hyperedge_index
... # node IDs stay in the original row space, hyperedge IDs are rebased

Inductive split:

>>> split_hdata = HData.split(hdata, torch.tensor([1]), node_space_setting="inductive")
>>> split_hdata.x.shape[0]  # only nodes incident to hyperedge 1
... 2

Parameters:

Name Type Description Default
hdata HData

The original :class:HData containing the full hypergraph.

required
split_hyperedge_ids Tensor

Tensor of hyperedge IDs to include in this split.

required
node_space_setting NodeSpaceSetting

Whether to preserve the full node space in the splits. transductive (default) ensures all nodes are present in every split, while inductive allows splits to have disjoint node spaces.

'transductive'

Returns:

Type Description
HData

The splitted instance with remapped node and hyperedge IDs.

Source code in hyperbench/types/hdata.py
@classmethod
def split(
    cls,
    hdata: "HData",
    split_hyperedge_ids: Tensor,
    node_space_setting: NodeSpaceSetting = "transductive",
) -> "HData":
    """
    Build an :class:`HData` for a single split from the given hyperedge IDs.

    Examples:
        Transductive split (default) preserving the full node space:
        >>> split_hdata = HData.split(hdata, torch.tensor([1]), node_space_setting="transductive")
        >>> split_hdata.x.shape[0] == hdata.x.shape[0]
        >>> split_hdata.hyperedge_index
        ... # node IDs stay in the original row space, hyperedge IDs are rebased

        Inductive split:
        >>> split_hdata = HData.split(hdata, torch.tensor([1]), node_space_setting="inductive")
        >>> split_hdata.x.shape[0]  # only nodes incident to hyperedge 1
        ... 2

    Args:
        hdata: The original :class:`HData` containing the full hypergraph.
        split_hyperedge_ids: Tensor of hyperedge IDs to include in this split.
        node_space_setting: Whether to preserve the full node space in the splits.
            ``transductive`` (default) ensures all nodes are present in every split,
            while ``inductive`` allows splits to have disjoint node spaces.

    Returns:
        The splitted instance with remapped node and hyperedge IDs.
    """
    # Mask to keep only incidences belonging to selected hyperedges
    # Example: hyperedge_index = [[0, 0, 1, 2, 3, 4],
    #                             [0, 0, 0, 1, 2, 2]]
    #          split_hyperedge_ids = [0, 2]
    #          -> mask = [True, True, True, False, True, True]
    keep_mask = torch.isin(hdata.hyperedge_index[1], split_hyperedge_ids)

    # Example: hyperedge_index = [[0, 0, 1, 3, 4],
    #                             [0, 0, 0, 2, 2]]
    #          incidence [2, 1] is missing as 1 is not in split_hyperedge_ids = [0, 2]
    split_hyperedge_index = hdata.hyperedge_index[:, keep_mask].clone()

    # Example: split_hyperedge_index = [[2, 3, 4],
    #                                   [2, 2, 5]]
    #          -> split_unique_hyperedge_ids = [2, 5]
    split_unique_hyperedge_ids = split_hyperedge_index[1].unique()

    split_y = hdata.y[split_unique_hyperedge_ids]

    split_hyperedge_attr = None
    if hdata.hyperedge_attr is not None:
        split_hyperedge_attr = hdata.hyperedge_attr[split_unique_hyperedge_ids]

    split_hyperedge_weights = None
    if hdata.hyperedge_weights is not None:
        split_hyperedge_weights = hdata.hyperedge_weights[split_unique_hyperedge_ids]

    # We don't need to split nodes, so we split only hyperedges and rebase their IDs to 0-based
    if is_transductive_setting(node_space_setting):
        # Example: split_unique_hyperedge_ids = [2, 5]
        #          -> hyperedge 2 -> 0, hyperedge 5 -> 1
        split_hyperedge_index[1] = to_0based_ids(
            original_ids=split_hyperedge_index[1],
            ids_to_rebase=split_unique_hyperedge_ids,
        )
        return cls(
            x=hdata.x,
            hyperedge_index=split_hyperedge_index,
            hyperedge_weights=split_hyperedge_weights,
            hyperedge_attr=split_hyperedge_attr,
            num_nodes=hdata.num_nodes,
            num_hyperedges=len(split_unique_hyperedge_ids),
            global_node_ids=hdata.global_node_ids,
            y=split_y,
        )

    # Example: split_hyperedge_index = [[0, 0, 1, 3, 4],
    #                                   [0, 0, 0, 2, 2]]
    #          -> split_unique_node_ids = [0, 1, 3, 4]
    split_unique_node_ids = split_hyperedge_index[0].unique()

    split_hyperedge_index = (
        HyperedgeIndex(split_hyperedge_index)
        .to_0based(
            node_ids_to_rebase=split_unique_node_ids,
            hyperedge_ids_to_rebase=split_unique_hyperedge_ids,
        )
        .item
    )

    split_x = hdata.x[split_unique_node_ids]

    split_global_node_ids = None
    if hdata.global_node_ids is not None:
        split_global_node_ids = hdata.global_node_ids[split_unique_node_ids]

    return cls(
        x=split_x,
        hyperedge_index=split_hyperedge_index,
        hyperedge_weights=split_hyperedge_weights,
        hyperedge_attr=split_hyperedge_attr,
        num_nodes=len(split_unique_node_ids),
        num_hyperedges=len(split_unique_hyperedge_ids),
        global_node_ids=split_global_node_ids,
        y=split_y,
    )

enrich_node_features(enricher, enrichment_mode=None)

Enrich node features using the provided node feature enricher.

Parameters:

Name Type Description Default
enricher NodeEnricher

An instance of NodeEnricher to generate structural node features from hypergraph topology.

required
enrichment_mode EnrichmentMode | None

How to combine generated features with existing hdata.x. concatenate appends new features as additional columns. replace substitutes hdata.x entirely.

None
Source code in hyperbench/types/hdata.py
def enrich_node_features(
    self,
    enricher: NodeEnricher,
    enrichment_mode: EnrichmentMode | None = None,
) -> "HData":
    """
    Enrich node features using the provided node feature enricher.

    Args:
        enricher: An instance of NodeEnricher to generate structural node features from hypergraph topology.
        enrichment_mode: How to combine generated features with existing ``hdata.x``.
            ``concatenate`` appends new features as additional columns.
            ``replace`` substitutes ``hdata.x`` entirely.
    """
    enriched_features = enricher.enrich(self.hyperedge_index)

    match enrichment_mode:
        case "concatenate":
            x = torch.cat([self.x, enriched_features], dim=1)
        case _:
            x = enriched_features

    return self.__class__(
        x=x,
        hyperedge_index=self.hyperedge_index,
        hyperedge_weights=self.hyperedge_weights,
        hyperedge_attr=self.hyperedge_attr,
        num_nodes=self.num_nodes,
        num_hyperedges=self.num_hyperedges,
        global_node_ids=self.global_node_ids,
        y=self.y,
    )

enrich_node_features_from(hdata_with_features, node_space_setting='transductive', fill_value=None)

Copy node features from another :class:HData by aligning features by global_node_ids.

Examples:

Transductive enrichment (default) expecting the same node space in both source and target:

>>> target = target.enrich_node_features_from(source, node_space_setting="transductive")

Inductive with a scalar fill value:

>>> target = target.enrich_node_features_from(
...     source,
...     node_space_setting="inductive",
...     fill_value=0.0,
... )

Inductive with a feature vector fill value:

>>> target = target.enrich_node_features_from(
...     source,
...     node_space_setting="inductive",
...     fill_value=[0.0, 1.0, 0.0],
... )

Parameters:

Name Type Description Default
hdata_with_features HData

Source :class:HData providing node features.

required
node_space_setting NodeSpaceSetting

The setting for the node space, determining how nodes are handled. If "transductive", every target node is expected to exist in the source. If "inductive", the target dataset may have a different node space, and missing nodes are filled using fill_value.

'transductive'
fill_value NodeSpaceFiller | None

Scalar or vector used to fill missing node features when node_space_setting is not transductive.

None

Returns:

Type Description
HData

A new :class:HData with node features copied from hdata_with_features.

Raises:

Type Description
ValueError

If either instance lacks global_node_ids, if the source feature rows do not align with the source node IDs, if fill_value is used with node_space_setting="transductive", or if fill_value is missing or malformed when node_space_setting="inductive".

Source code in hyperbench/types/hdata.py
def enrich_node_features_from(
    self,
    hdata_with_features: "HData",
    node_space_setting: NodeSpaceSetting = "transductive",
    fill_value: NodeSpaceFiller | None = None,
) -> "HData":
    """
    Copy node features from another :class:`HData` by aligning features by ``global_node_ids``.

    Examples:
        Transductive enrichment (default) expecting the same node space in both source and target:
        >>> target = target.enrich_node_features_from(source, node_space_setting="transductive")

        Inductive with a scalar fill value:
        >>> target = target.enrich_node_features_from(
        ...     source,
        ...     node_space_setting="inductive",
        ...     fill_value=0.0,
        ... )

        Inductive with a feature vector fill value:
        >>> target = target.enrich_node_features_from(
        ...     source,
        ...     node_space_setting="inductive",
        ...     fill_value=[0.0, 1.0, 0.0],
        ... )

    Args:
        hdata_with_features: Source :class:`HData` providing node features.
        node_space_setting: The setting for the node space, determining how nodes are handled.
            If ``"transductive"``, every target node is expected to exist in the source.
            If ``"inductive"``, the target dataset may have a different node space, and missing nodes are filled using ``fill_value``.
        fill_value: Scalar or vector used to fill missing node features when ``node_space_setting`` is not transductive.

    Returns:
        A new :class:`HData` with node features copied from ``hdata_with_features``.

    Raises:
        ValueError: If either instance lacks ``global_node_ids``, if the source feature rows
            do not align with the source node IDs, if ``fill_value`` is used with
            ``node_space_setting="transductive"``, or if ``fill_value`` is missing or malformed when ``node_space_setting="inductive"``.
    """
    source_global_node_ids = hdata_with_features.global_node_ids
    source_x = hdata_with_features.x
    if self.global_node_ids is None or source_global_node_ids is None:
        raise ValueError(
            "Both HData instances must define global_node_ids to align node features."
        )
    if source_x.size(0) != source_global_node_ids.size(0):
        raise ValueError(
            "Expected hdata_with_features.x rows to align with hdata_with_features.global_node_ids."
        )
    self.__validate_node_space_setting(node_space_setting, fill_value)

    target_global_node_ids = self.global_node_ids.detach().cpu().tolist()

    # We need the index of the features for each node in the source, as we will use the index to track back
    # to the node feautures after we match the global node id in the target to the one that is in the source
    source_feature_idx_by_global_node_id = {
        int(global_node_id): feature_idx
        for feature_idx, global_node_id in enumerate(
            source_global_node_ids.detach().cpu().tolist()
        )
    }

    fill_features = self.__to_fill_features(
        fill_value=fill_value,
        num_features=int(source_x.size(1)),
        dtype=source_x.dtype,
        device=source_x.device,
    )

    enriched_rows = []
    missing_global_node_ids = []
    for global_node_id in target_global_node_ids:
        source_feature_idx = source_feature_idx_by_global_node_id.get(int(global_node_id))
        if source_feature_idx is None:
            # Example: global_node_id = 30 is not present in the source
            #          -> strict transductive mode records it as missing and then raises an error
            #          -> non-transductive mode fills the features with fill_value and continues enriching the other nodes
            if is_transductive_setting(node_space_setting):
                missing_global_node_ids.append(
                    int(global_node_id)
                )  # record missing node for error message
            else:
                enriched_rows.append(
                    fill_features
                )  # fill missing node features with fill_value and
            continue

        # Match the global node IDs in the target to the corresponding feature indices in the source
        # Example: source_global_node_ids = [10, 20, 30], source_x has shape (3, num_features)
        #          target_global_node_ids = [10, 30]
        #          -> source_feature_idx_by_global_node_id = {10: 0, 20: 1, 30: 2}
        #          -> pick source_x rows 0 and 2 for the target
        enriched_rows.append(source_x[source_feature_idx])

    if len(missing_global_node_ids) > 0:
        raise ValueError(
            f"Missing node features for target global_node_ids: {missing_global_node_ids}."
        )

    enriched_x = torch.stack(enriched_rows, dim=0).to(device=self.device)

    return self.__class__(
        x=enriched_x,
        hyperedge_index=self.hyperedge_index,
        hyperedge_weights=self.hyperedge_weights,
        hyperedge_attr=self.hyperedge_attr,
        num_nodes=self.num_nodes,
        num_hyperedges=self.num_hyperedges,
        global_node_ids=self.global_node_ids,
        y=self.y,
    )

enrich_hyperedge_weights(enricher, enrichment_mode=None)

Enrich hyperedge weights using the provided hyperedge weight enricher. Args: enricher: An instance of HyperedgeEnricher to generate hyperedge weights from hypergraph topology. enrichment_mode: How to combine generated weights with existing hdata.hyperedge_weights. concatenate appends new weights to the existing 1D tensor. replace substitutes hdata.hyperedge_weights entirely.

Source code in hyperbench/types/hdata.py
def enrich_hyperedge_weights(
    self,
    enricher: HyperedgeEnricher,
    enrichment_mode: EnrichmentMode | None = None,
) -> "HData":
    """Enrich hyperedge weights using the provided hyperedge weight enricher.
    Args:
        enricher: An instance of HyperedgeEnricher to generate hyperedge weights from hypergraph topology.
        enrichment_mode: How to combine generated weights with existing ``hdata.hyperedge_weights``.
            ``concatenate`` appends new weights to the existing 1D tensor.
            ``replace`` substitutes ``hdata.hyperedge_weights`` entirely.
    """
    enriched_weights = enricher.enrich(self.hyperedge_index)

    match enrichment_mode:
        case "concatenate":
            hyperedge_weights = (
                torch.cat([self.hyperedge_weights, enriched_weights], dim=0)
                if self.hyperedge_weights is not None
                else enriched_weights
            )
        case _:
            hyperedge_weights = enriched_weights

    return self.__class__(
        x=self.x,
        hyperedge_index=self.hyperedge_index,
        hyperedge_weights=hyperedge_weights,
        hyperedge_attr=self.hyperedge_attr,
        num_nodes=self.num_nodes,
        num_hyperedges=self.num_hyperedges,
        global_node_ids=self.global_node_ids,
        y=self.y,
    )

enrich_hyperedge_attr(enricher, enrichment_mode=None)

Enrich hyperedge features using the provided hyperedge feature enricher.

Parameters:

Name Type Description Default
enricher HyperedgeEnricher

An instance of HyperedgeEnricher to generate structural hyperedge features from hypergraph topology.

required
enrichment_mode EnrichmentMode | None

How to combine generated features with existing hdata.hyperedge_attr. concatenate appends new features as additional columns. replace substitutes hdata.hyperedge_attr entirely.

None
Source code in hyperbench/types/hdata.py
def enrich_hyperedge_attr(
    self,
    enricher: HyperedgeEnricher,
    enrichment_mode: EnrichmentMode | None = None,
) -> "HData":
    """
    Enrich hyperedge features using the provided hyperedge feature enricher.

    Args:
        enricher: An instance of HyperedgeEnricher to generate structural hyperedge features from hypergraph topology.
        enrichment_mode: How to combine generated features with existing ``hdata.hyperedge_attr``.
            ``concatenate`` appends new features as additional columns.
            ``replace`` substitutes ``hdata.hyperedge_attr`` entirely.
    """
    enriched_features = enricher.enrich(self.hyperedge_index)

    match enrichment_mode:
        case "concatenate":
            hyperedge_attr = (
                torch.cat([self.hyperedge_attr, enriched_features], dim=1)
                if self.hyperedge_attr is not None
                else enriched_features
            )
        case _:
            hyperedge_attr = enriched_features

    return self.__class__(
        x=self.x,
        hyperedge_index=self.hyperedge_index,
        hyperedge_weights=self.hyperedge_weights,
        hyperedge_attr=hyperedge_attr,
        num_nodes=self.num_nodes,
        num_hyperedges=self.num_hyperedges,
        global_node_ids=self.global_node_ids,
        y=self.y,
    )

get_device_if_all_consistent()

Check that all tensors are on the same device and return that device. If there are no tensors or if they are on different devices, return CPU.

Returns:

Type Description
device

The common device if all tensors are on the same device, otherwise CPU.

Raises:

Type Description
ValueError

If tensors are on different devices.

Source code in hyperbench/types/hdata.py
def get_device_if_all_consistent(self) -> torch.device:
    """
    Check that all tensors are on the same device and return that device.
    If there are no tensors or if they are on different devices, return CPU.

    Returns:
        The common device if all tensors are on the same device, otherwise CPU.

    Raises:
        ValueError: If tensors are on different devices.
    """
    devices = {self.x.device, self.hyperedge_index.device, self.y.device}
    if self.global_node_ids is not None:
        devices.add(self.global_node_ids.device)
    if self.hyperedge_attr is not None:
        devices.add(self.hyperedge_attr.device)
    if self.hyperedge_weights is not None:
        devices.add(self.hyperedge_weights.device)
    if len(devices) > 1:
        raise ValueError(f"Inconsistent device placement: {devices}")

    return devices.pop() if len(devices) == 1 else torch.device("cpu")

shuffle(seed=None)

Return a new :class:HData instance with hyperedge IDs randomly reassigned.

Each hyperedge keeps its original set of nodes, but is assigned a new ID via a random permutation. y and hyperedge_attr are reordered to match, so that y[new_id] still corresponds to the correct hyperedge. Same for hyperedge_attr[new_id] if hyperedge attributes are present.

Examples:

>>> hyperedge_index = torch.tensor([[0, 1, 2, 3], [0, 0, 1, 1]])
>>> y  = torch.tensor([1, 0])
>>> hdata = HData(x=x, hyperedge_index=hyperedge_index, y=y)
>>> shuffled_hdata = hdata.shuffle(seed=42)
>>> shuffled_hdata.hyperedge_index  # hyperedges may be reassigned
... # e.g.,
...     [[0, 1, 2, 3],
...      [1, 1, 0, 0]]
>>> shuffled_hdata.y  # labels are permuted to match new hyperedge IDs, e.g., [0, 1]

Parameters:

Name Type Description Default
seed int | None

Optional random seed for reproducibility. If None, the shuffle will be non-deterministic.

None

Returns:

Type Description
HData

A new :class:HData instance with hyperedge IDs, y, and hyperedge_attr permuted.

Source code in hyperbench/types/hdata.py
def shuffle(self, seed: int | None = None) -> "HData":
    """
    Return a new :class:`HData` instance with hyperedge IDs randomly reassigned.

    Each hyperedge keeps its original set of nodes, but is assigned a new ID via a random permutation.
    ``y`` and ``hyperedge_attr`` are reordered to match, so that ``y[new_id]`` still corresponds to the correct hyperedge.
    Same for ``hyperedge_attr[new_id]`` if hyperedge attributes are present.

    Examples:
        >>> hyperedge_index = torch.tensor([[0, 1, 2, 3], [0, 0, 1, 1]])
        >>> y  = torch.tensor([1, 0])
        >>> hdata = HData(x=x, hyperedge_index=hyperedge_index, y=y)
        >>> shuffled_hdata = hdata.shuffle(seed=42)
        >>> shuffled_hdata.hyperedge_index  # hyperedges may be reassigned
        ... # e.g.,
        ...     [[0, 1, 2, 3],
        ...      [1, 1, 0, 0]]
        >>> shuffled_hdata.y  # labels are permuted to match new hyperedge IDs, e.g., [0, 1]

    Args:
        seed: Optional random seed for reproducibility. If ``None``, the shuffle will be non-deterministic.

    Returns:
        A new :class:`HData` instance with hyperedge IDs, ``y``, and ``hyperedge_attr`` permuted.
    """
    generator = torch.Generator(device=self.device)
    if seed is not None:
        generator.manual_seed(seed)

    permutation = torch.randperm(self.num_hyperedges, generator=generator, device=self.device)

    # permutation[new_id] = old_id, so y[permutation] puts old labels into new slots
    # inverse_permutation[old_id] = new_id, used to remap hyperedge IDs in incidences
    # Example: permutation = [1, 2, 0] means new_id 0 gets old_id 1, new_id 1 gets old_id 2, new_id 2 gets old_id 0
    #          -> inverse_permutation = [2, 0, 1] means old_id 0 gets new_id 2, old_id 1 gets new_id 0, old_id 2 gets new_id 1
    inverse_permutation = torch.empty_like(permutation)
    inverse_permutation[permutation] = torch.arange(self.num_hyperedges, device=self.device)

    new_hyperedge_index = self.hyperedge_index.clone()

    # Example: hyperedge_index = [[0, 1, 2, 3, 4],
    #                             [0, 0, 1, 1, 2]],
    #          inverse_permutation = [2, 0, 1] (new_id 0 -> old_id 2, new_id 1 -> old_id 0, new_id 2 -> old_id 1)
    #          -> new_hyperedge_index = [[0, 1, 2, 3, 4],
    #                                    [2, 2, 0, 0, 1]]
    old_hyperedge_ids = self.hyperedge_index[1]
    new_hyperedge_index[1] = inverse_permutation[old_hyperedge_ids]

    # Example: hyperedge_attr = [attr_0, attr_1, attr_2], permutation = [1, 2, 0]
    #          -> new_hyperedge_attr = [attr_1  (attr of old_id 1), attr_2 (attr of old_id 2), attr_0 (attr of old_id 0)]
    new_hyperedge_attr = (
        self.hyperedge_attr[permutation] if self.hyperedge_attr is not None else None
    )

    new_hyperedge_weights = (
        self.hyperedge_weights[permutation] if self.hyperedge_weights is not None else None
    )

    # Example: y = [1, 1, 0], permutation = [1, 2, 0]
    #          -> new_y = [y[1], y[2], y[0]] = [1, 0, 1]
    new_y = self.y[permutation]

    return self.__class__(
        x=self.x,
        hyperedge_index=new_hyperedge_index,
        hyperedge_weights=new_hyperedge_weights,
        hyperedge_attr=new_hyperedge_attr,
        num_nodes=self.num_nodes,
        num_hyperedges=self.num_hyperedges,
        global_node_ids=self.global_node_ids,
        y=new_y,
    )

clone()

Return a deep copy of this :class:HData.

Returns:

Type Description
HData

A new :class:HData that is a deep copy of this instance.

Source code in hyperbench/types/hdata.py
def clone(self) -> "HData":
    """
    Return a deep copy of this :class:`HData`.

    Returns:
        A new :class:`HData` that is a deep copy of this instance.
    """
    cloned_hyperedge_weights = (
        self.hyperedge_weights.clone() if self.hyperedge_weights is not None else None
    )
    cloned_hyperedge_attr = (
        self.hyperedge_attr.clone() if self.hyperedge_attr is not None else None
    )
    cloned_global_node_ids = (
        self.global_node_ids.clone() if self.global_node_ids is not None else None
    )
    return self.__class__(
        x=self.x.clone(),
        hyperedge_index=self.hyperedge_index.clone(),
        hyperedge_weights=cloned_hyperedge_weights,
        hyperedge_attr=cloned_hyperedge_attr,
        num_nodes=self.num_nodes,
        num_hyperedges=self.num_hyperedges,
        global_node_ids=cloned_global_node_ids,
        y=self.y.clone(),
    )

to(device, non_blocking=False)

Move all tensors to the specified device.

Parameters:

Name Type Description Default
device device | str

The target device (e.g., 'cpu', 'cuda:0').

required
non_blocking bool

If True and the source and destination devices are both CUDA, the copy will be non-blocking.

False

Returns:

Name Type Description
The HData

class:HData instance with all tensors moved to the specified device.

Source code in hyperbench/types/hdata.py
def to(self, device: torch.device | str, non_blocking: bool = False) -> "HData":
    """
    Move all tensors to the specified device.

    Args:
        device: The target device (e.g., 'cpu', 'cuda:0').
        non_blocking: If ``True`` and the source and destination devices are both CUDA, the copy will be non-blocking.

    Returns:
        The :class:`HData` instance with all tensors moved to the specified device.
    """
    self.x = self.x.to(device=device, non_blocking=non_blocking)
    self.hyperedge_index = self.hyperedge_index.to(device=device, non_blocking=non_blocking)
    self.y = self.y.to(device=device, non_blocking=non_blocking)

    if self.global_node_ids is not None:
        self.global_node_ids = self.global_node_ids.to(device=device, non_blocking=non_blocking)

    if self.hyperedge_attr is not None:
        self.hyperedge_attr = self.hyperedge_attr.to(device=device, non_blocking=non_blocking)

    if self.hyperedge_weights is not None:
        self.hyperedge_weights = self.hyperedge_weights.to(
            device=device, non_blocking=non_blocking
        )

    self.device = device if isinstance(device, torch.device) else torch.device(device)
    return self

with_y_to(value)

Return a copy of this instance with a y attribute set to the given value.

Parameters:

Name Type Description Default
value float

The value to set for all entries in the y attribute.

required

Returns:

Type Description
HData

A new :class:HData instance with the same attributes except for y, which is set to a tensor of the given value.

Source code in hyperbench/types/hdata.py
def with_y_to(self, value: float) -> "HData":
    """
    Return a copy of this instance with a y attribute set to the given value.

    Args:
        value: The value to set for all entries in the y attribute.

    Returns:
        A new :class:`HData` instance with the same attributes except for y, which is set to a tensor of the given value.
    """
    return self.__class__(
        x=self.x,
        hyperedge_index=self.hyperedge_index,
        hyperedge_weights=self.hyperedge_weights,
        hyperedge_attr=self.hyperedge_attr,
        num_nodes=self.num_nodes,
        num_hyperedges=self.num_hyperedges,
        global_node_ids=self.global_node_ids,
        y=torch.full((self.num_hyperedges,), value, dtype=torch.float, device=self.device),
    )

with_y_ones()

Return a copy of this instance with a y attribute of all ones.

Source code in hyperbench/types/hdata.py
def with_y_ones(self) -> "HData":
    """Return a copy of this instance with a y attribute of all ones."""
    return self.with_y_to(1.0)

with_y_zeros()

Return a copy of this instance with a y attribute of all zeros.

Source code in hyperbench/types/hdata.py
def with_y_zeros(self) -> "HData":
    """Return a copy of this instance with a y attribute of all zeros."""
    return self.with_y_to(0.0)

stats()

Compute statistics for the hypergraph data. The fields returned in the dictionary include: - shape_x: The shape of the node feature matrix x. - shape_hyperedge_weights: The shape of the hyperedge weights tensor, or None if hyperedge weights are not present. - shape_hyperedge_attr: The shape of the hyperedge attribute matrix, or None if hyperedge attributes are not present. - num_nodes: The number of nodes in the hypergraph. - num_hyperedges: The number of hyperedges in the hypergraph. - avg_degree_node_raw: The average degree of nodes, calculated as the mean number of hyperedges each node belongs to. - avg_degree_node: The floored node average degree. - avg_degree_hyperedge_raw: The average size of hyperedges, calculated as the mean number of nodes each hyperedge contains. - avg_degree_hyperedge: The floored hyperedge average size. - node_degree_max: The maximum degree of any node in the hypergraph. - hyperedge_degree_max: The maximum size of any hyperedge in the hypergraph. - node_degree_median: The median degree of nodes in the hypergraph. - hyperedge_degree_median: The median size of hyperedges in the hypergraph. - distribution_node_degree: A list where the value at index i represents the count of nodes with degree i. - distribution_hyperedge_size: A list where the value at index i represents the count of hyperedges with size i. - distribution_node_degree_hist: A dictionary where the keys are node degrees and the values are the count of nodes with that degree. - distribution_hyperedge_size_hist: A dictionary where the keys are hyperedge sizes and the values are the count of hyperedges with that size.

Returns:

Type Description
dict[str, Any]

A dictionary containing various statistics about the hypergraph.

Source code in hyperbench/types/hdata.py
def stats(self) -> dict[str, Any]:
    """
    Compute statistics for the hypergraph data.
    The fields returned in the dictionary include:
    - ``shape_x``: The shape of the node feature matrix ``x``.
    - ``shape_hyperedge_weights``: The shape of the hyperedge weights tensor, or ``None`` if hyperedge weights are not present.
    - ``shape_hyperedge_attr``: The shape of the hyperedge attribute matrix, or ``None`` if hyperedge attributes are not present.
    - ``num_nodes``: The number of nodes in the hypergraph.
    - ``num_hyperedges``: The number of hyperedges in the hypergraph.
    - ``avg_degree_node_raw``: The average degree of nodes, calculated as the mean number of hyperedges each node belongs to.
    - ``avg_degree_node``: The floored node average degree.
    - ``avg_degree_hyperedge_raw``: The average size of hyperedges, calculated as the mean number of nodes each hyperedge contains.
    - ``avg_degree_hyperedge``: The floored hyperedge average size.
    - ``node_degree_max``: The maximum degree of any node in the hypergraph.
    - ``hyperedge_degree_max``: The maximum size of any hyperedge in the hypergraph.
    - ``node_degree_median``: The median degree of nodes in the hypergraph.
    - ``hyperedge_degree_median``: The median size of hyperedges in the hypergraph.
    - ``distribution_node_degree``: A list where the value at index ``i`` represents the count of nodes with degree ``i``.
    - ``distribution_hyperedge_size``: A list where the value at index ``i`` represents the count of hyperedges with size ``i``.
    - ``distribution_node_degree_hist``: A dictionary where the keys are node degrees and the values are the count of nodes with that degree.
    - ``distribution_hyperedge_size_hist``: A dictionary where the keys are hyperedge sizes and the values are the count of hyperedges with that size.

    Returns:
        A dictionary containing various statistics about the hypergraph.
    """

    node_ids = self.hyperedge_index[0]
    hyperedge_ids = self.hyperedge_index[1]

    # Degree of each node = number of hyperedges it belongs to
    # Size of each hyperedge = number of nodes it contains
    if node_ids.numel() > 0:
        distribution_node_degree = torch.bincount(node_ids, minlength=self.num_nodes).float()
        distribution_hyperedge_size = torch.bincount(
            hyperedge_ids, minlength=self.num_hyperedges
        ).float()
    else:
        distribution_node_degree = torch.zeros(self.num_nodes, dtype=torch.float)
        distribution_hyperedge_size = torch.zeros(self.num_hyperedges, dtype=torch.float)

    num_nodes = self.num_nodes
    num_hyperedges = self.num_hyperedges

    if distribution_node_degree.numel() > 0:
        avg_degree_node_raw = distribution_node_degree.mean().item()
        avg_degree_node = int(avg_degree_node_raw)
        avg_degree_hyperedge_raw = distribution_hyperedge_size.mean().item()
        avg_degree_hyperedge = int(avg_degree_hyperedge_raw)
        node_degree_max = int(distribution_node_degree.max().item())
        hyperedge_degree_max = int(distribution_hyperedge_size.max().item())
        node_degree_median = int(distribution_node_degree.median().item())
        hyperedge_degree_median = int(distribution_hyperedge_size.median().item())
    else:
        avg_degree_node_raw = 0
        avg_degree_node = 0
        avg_degree_hyperedge_raw = 0
        avg_degree_hyperedge = 0
        node_degree_max = 0
        hyperedge_degree_max = 0
        node_degree_median = 0
        hyperedge_degree_median = 0

    # Histograms: index i holds count of nodes/hyperedges with degree/size i
    distribution_node_degree_hist = torch.bincount(distribution_node_degree.long())
    distribution_hyperedge_size_hist = torch.bincount(distribution_hyperedge_size.long())

    distribution_node_degree_hist = {
        i: int(count.item())
        for i, count in enumerate(distribution_node_degree_hist)
        if count.item() > 0
    }
    distribution_hyperedge_size_hist = {
        i: int(count.item())
        for i, count in enumerate(distribution_hyperedge_size_hist)
        if count.item() > 0
    }

    return {
        "shape_x": self.x.shape,
        "shape_hyperedge_weights": self.hyperedge_weights.shape
        if self.hyperedge_weights is not None
        else None,
        "shape_hyperedge_attr": self.hyperedge_attr.shape
        if self.hyperedge_attr is not None
        else None,
        "num_nodes": num_nodes,
        "num_hyperedges": num_hyperedges,
        "avg_degree_node_raw": avg_degree_node_raw,
        "avg_degree_node": avg_degree_node,
        "avg_degree_hyperedge_raw": avg_degree_hyperedge_raw,
        "avg_degree_hyperedge": avg_degree_hyperedge,
        "node_degree_max": node_degree_max,
        "hyperedge_degree_max": hyperedge_degree_max,
        "node_degree_median": node_degree_median,
        "hyperedge_degree_median": hyperedge_degree_median,
        "distribution_node_degree": distribution_node_degree.int().tolist(),
        "distribution_hyperedge_size": distribution_hyperedge_size.int().tolist(),
        "distribution_node_degree_hist": distribution_node_degree_hist,
        "distribution_hyperedge_size_hist": distribution_hyperedge_size_hist,
    }

ModelConfig

A class representing the configuration of a model for training.

Parameters:

Name Type Description Default
name str

The name of the model.

required
version str

The version of the model.

'default'
model LightningModule

a LightningModule instance.

required
is_trainable bool

Whether the model is trainable.

True
trainer Trainer | None

a Trainer instance.

None
train_dataloader DataLoader | None

Optional per-model train dataloader. When set, fit_all uses this instead of the shared train_dataloader argument.

None
val_dataloader DataLoader | None

Optional per-model validation dataloader. When set, fit_all uses this instead of the shared val_dataloader argument.

None
test_dataloader DataLoader | None

Optional per-model test dataloader. When set, test_all uses this instead of the shared dataloader argument.

None
Source code in hyperbench/types/model.py
class ModelConfig:
    """
    A class representing the configuration of a model for training.

    Args:
        name: The name of the model.
        version: The version of the model.
        model: a LightningModule instance.
        is_trainable: Whether the model is trainable.
        trainer: a Trainer instance.
        train_dataloader: Optional per-model train dataloader. When set, ``fit_all``
            uses this instead of the shared ``train_dataloader`` argument.
        val_dataloader: Optional per-model validation dataloader. When set, ``fit_all``
            uses this instead of the shared ``val_dataloader`` argument.
        test_dataloader: Optional per-model test dataloader. When set, ``test_all``
            uses this instead of the shared ``dataloader`` argument.
    """

    def __init__(
        self,
        name: str,
        model: L.LightningModule,
        version: str = "default",
        is_trainable: bool = True,
        trainer: L.Trainer | None = None,
        train_dataloader: DataLoader | None = None,
        val_dataloader: DataLoader | None = None,
        test_dataloader: DataLoader | None = None,
    ) -> None:
        self.name = name
        self.version = version
        self.model = model
        self.is_trainable = is_trainable
        self.trainer = trainer
        self.train_dataloader = train_dataloader
        self.val_dataloader = val_dataloader
        self.test_dataloader = test_dataloader

    def full_model_name(self) -> str:
        return f"{self.name}:{self.version}"