Documentation for the utils modules

scprint2.utils.sinkhorn

Classes:

Name Description
SinkhornDistance

SinkhornDistance

Bases: Module

SinkhornDistance Initialize the SinkhornDistance class

Parameters:
  • eps (float, default: 0.01 ) –

    Regularization parameter. Defaults to 1e-2.

  • max_iter (int, default: 100 ) –

    Maximum number of Sinkhorn iterations. Defaults to 100.

  • reduction (str, default: 'none' ) –

    Specifies the reduction to apply to the output. Defaults to "none".

Methods:

Name Description
M

Modified cost for logarithmic updates

ave

Barycenter subroutine, used by kinetic acceleration through extrapolation.

forward

forward Compute the Sinkhorn distance between two measures with cost matrix c

Source code in scprint2/utils/sinkhorn.py
def __init__(self, eps: float = 1e-2, max_iter: int = 100, reduction: str = "none"):
    """
    SinkhornDistance Initialize the SinkhornDistance class

    Args:
        eps (float, optional): Regularization parameter. Defaults to 1e-2.
        max_iter (int, optional): Maximum number of Sinkhorn iterations. Defaults to 100.
        reduction (str, optional): Specifies the reduction to apply to the output. Defaults to "none".
    """
    super(SinkhornDistance, self).__init__()
    self.eps = eps
    self.max_iter = max_iter
    self.reduction = reduction

M

Modified cost for logarithmic updates

Source code in scprint2/utils/sinkhorn.py
def M(self, C, u, v):
    "Modified cost for logarithmic updates"
    """$M_{ij} = (-c_{ij} + u_i + v_j) / epsilon$"""
    return (-C + u.unsqueeze(-1) + v.unsqueeze(1)) / self.eps
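As a rough illustration of the formula $M_{ij} = (-c_{ij} + u_i + v_j) / \epsilon$, here is a minimal, unbatched sketch in plain Python. The name `modified_cost` and the nested-list layout are assumptions made for this example only; the method above works on batched torch tensors through broadcasting.

```python
def modified_cost(C, u, v, eps):
    """Unbatched sketch of M_ij = (-C_ij + u_i + v_j) / eps on nested lists."""
    return [
        [(-C[i][j] + u[i] + v[j]) / eps for j in range(len(v))]
        for i in range(len(u))
    ]


# Hypothetical 2x2 cost matrix and dual potentials, chosen only for illustration.
M = modified_cost([[0.0, 1.0], [1.0, 0.0]], u=[0.5, 0.0], v=[0.0, 0.5], eps=0.5)
print(M)
```

Each entry adds the row potential `u[i]` and the column potential `v[j]` to the negated cost before dividing by the regularization strength.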

ave staticmethod

Barycenter subroutine, used by kinetic acceleration through extrapolation.

Source code in scprint2/utils/sinkhorn.py
@staticmethod
def ave(u, u1, tau):
    "Barycenter subroutine, used by kinetic acceleration through extrapolation."
    return tau * u + (1 - tau) * u1

forward

forward Compute the Sinkhorn distance between two measures with cost matrix c

Parameters:
  • c (Tensor) –

    The cost matrix between the two measures.

Returns:
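  • tuple

    (pi, C, U, V): the transport plan pi, the negated cost matrix C = -c, and the dual potentials U and V. The source returns this tuple rather than a single distance tensor.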

Source code in scprint2/utils/sinkhorn.py
def forward(self, c: torch.Tensor) -> torch.Tensor:
    """
    forward Compute the Sinkhorn distance between two measures with cost matrix c

    Args:
        c (torch.Tensor): The cost matrix between the two measures.

    Returns:
        torch.Tensor: The computed Sinkhorn distance.
    """
    C = -c
    x_points = C.shape[-2]
    batch_size = C.shape[0]

    # both marginals are fixed with equal weights
    mu = (
        torch.empty(
            batch_size,
            x_points,
            dtype=C.dtype,
            requires_grad=False,
            device=C.device,
        )
        .fill_(1.0 / x_points)
        .squeeze()
    )
    nu = (
        torch.empty(
            batch_size,
            x_points,
            dtype=C.dtype,
            requires_grad=False,
            device=C.device,
        )
        .fill_(1.0 / x_points)
        .squeeze()
    )
    u = torch.zeros_like(mu)
    v = torch.zeros_like(nu)

    # Stopping criterion
    thresh = 1e-12

    # Sinkhorn iterations
    for i in range(self.max_iter):
        if i % 2 == 0:
            u1 = u  # useful to check the update
            u = (
                self.eps
                * (torch.log(mu) - torch.logsumexp(self.M(C, u, v), dim=-1))
                + u
            )
            err = (u - u1).abs().sum(-1).mean()
        else:
            v = (
                self.eps
                * (
                    torch.log(nu)
                    - torch.logsumexp(self.M(C, u, v).transpose(-2, -1), dim=-1)
                )
                + v
            )
            v = v.detach().requires_grad_(False)
            v[v > 9 * 1e8] = 0.0
            v = v.detach().requires_grad_(True)

        if err.item() < thresh:
            break

    U, V = u, v
    # Transport plan pi = diag(a)*K*diag(b)
    pi = torch.exp(self.M(C, U, V))

    # Sinkhorn distance

    return pi, C, U, V
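The alternating updates in `forward` can be sketched without torch. The version below is a simplified, unbatched illustration under stated assumptions: it uses the standard sign convention (it does not reproduce the `C = -c` negation), it has no early-stopping threshold, and the names `sinkhorn_log` and `logsumexp` are hypothetical helpers, not part of scprint2.

```python
import math


def logsumexp(values):
    """Numerically stable log(sum(exp(values)))."""
    top = max(values)
    return top + math.log(sum(math.exp(x - top) for x in values))


def sinkhorn_log(cost, eps=1.0, max_iter=200):
    """Log-domain Sinkhorn with uniform marginals on a single cost matrix."""
    n, m = len(cost), len(cost[0])
    log_mu = math.log(1.0 / n)  # uniform row marginal
    log_nu = math.log(1.0 / m)  # uniform column marginal
    u = [0.0] * n
    v = [0.0] * m

    def M(i, j):
        # Modified cost for logarithmic updates.
        return (-cost[i][j] + u[i] + v[j]) / eps

    for it in range(max_iter):
        if it % 2 == 0:
            # Even steps rescale the rows.
            u = [eps * (log_mu - logsumexp([M(i, j) for j in range(m)])) + u[i]
                 for i in range(n)]
        else:
            # Odd steps rescale the columns.
            v = [eps * (log_nu - logsumexp([M(i, j) for i in range(n)])) + v[j]
                 for j in range(m)]

    plan = [[math.exp(M(i, j)) for j in range(m)] for i in range(n)]
    return plan, u, v


cost = [[0.0, 1.0, 2.0], [1.0, 0.0, 1.0], [2.0, 1.0, 0.0]]
plan, u, v = sinkhorn_log(cost)
row_sums = [sum(row) for row in plan]
col_sums = [sum(plan[i][j] for i in range(3)) for j in range(3)]
```

After enough iterations, both the row sums and the column sums of the plan approach the uniform marginal `1/3`, which is the property the potentials `u` and `v` are adjusted to enforce.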

scprint2.utils.utils

Functions:

Name Description
add_points

parts of volcano plot

category_str2int

category_str2int converts a list of category strings to a list of category integers.

correlationMatrix

Make an interactive correlation matrix from an array using bokeh

createFoldersFor

Recursively creates any missing folders needed to save a file at this filepath.

fileToList

loads an input file with a\n b\n.. into a list [a,b,..]

get_free_gpu

get_free_gpu finds the GPU with the most free memory using nvidia-smi.

get_git_commit

get_git_commit gets the current git commit hash.

heatmap

Make an interactive heatmap from a dataframe using bokeh

inf_loop

wrapper function for endless data loader.

isnotebook

check whether executing in jupyter notebook.

listToFile

listToFile writes a list [a,b,..] to a file as a\n b\n..

prepare_device

Set up the GPU device if available and return it together with the GPU device indices used for DataParallel.

run_command

run_command runs a command in the shell and prints the output.

selector

Part of Volcano plot: A function to separate tfs from everything else

set_seed

set random seed.

subset_h5ad_by_format

Create new anndata object according to slot info specifications.

volcano

Make an interactive volcano plot from Differential Expression analysis tools outputs

add_points

parts of volcano plot

Source code in scprint2/utils/utils.py
def add_points(p, df1, x, y, color="blue", alpha=0.2, outline=False, maxvalue=100):
    """parts of volcano plot"""
    # Define colors in a dictionary to access them with
    # the key from the pandas groupby function.
    df = df1.copy()
    transformed_q = -df[y].apply(np.log10).values
    transformed_q[transformed_q == np.inf] = maxvalue
    transformed_q[transformed_q > maxvalue] = maxvalue
    df["transformed_q"] = transformed_q
    df["color"] = color
    df["alpha"] = alpha
    df["size"] = 7
    source1 = ColumnDataSource(df)

    # Specify data source
    p.scatter(
        x=x,
        y="transformed_q",
        size="size",
        alpha="alpha",
        source=source1,
        color="color",
        name="circles",
    )
    if outline:
        p.scatter(
            x=x,
            y="transformed_q",
            size=7,
            alpha=1,
            source=source1,
            color="black",
            fill_color=None,
            name="outlines",
        )

    # prettify
    p.background_fill_color = "#DFDFE5"
    p.background_fill_alpha = 0.5
    return p, source1
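The y-axis transform used by `add_points` (negative log10 of the p-value, capped at `maxvalue`) can be illustrated on a plain list. This is a sketch only: `transform_q` is a hypothetical name, and it handles a p-value of exactly 0 explicitly, whereas the function above relies on NumPy producing `inf` and then replacing it.

```python
import math


def transform_q(pvals, maxvalue=100):
    """Return -log10(p) for each p-value, capped at maxvalue (p == 0 maps to the cap)."""
    out = []
    for p in pvals:
        if p == 0:
            out.append(maxvalue)  # -log10(0) would be infinite
        else:
            out.append(min(-math.log10(p), maxvalue))
    return out


scores = transform_q([1.0, 0.01, 0.0, 1e-200])
```

Capping keeps extremely small p-values from stretching the plot's vertical axis.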

category_str2int

category_str2int converts a list of category strings to a list of category integers.

Parameters:
  • category_strs (List[str]) –

    A list of category strings to be converted.

Returns:
  • List[int]

    List[int]: A list of integers corresponding to the input category strings.

Source code in scprint2/utils/utils.py
def category_str2int(category_strs: List[str]) -> List[int]:
    """
    category_str2int converts a list of category strings to a list of category integers.

    Args:
        category_strs (List[str]): A list of category strings to be converted.

    Returns:
        List[int]: A list of integers corresponding to the input category strings.
    """
    set_category_strs = set(category_strs)
    name2id = {name: i for i, name in enumerate(set_category_strs)}
    return [name2id[name] for name in category_strs]
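Because the mapping above is built by enumerating a `set`, the integer assigned to each string can differ between interpreter runs (string hashing is randomized by default in Python). If stable ids matter, one possible variant sorts the unique names first. This is a sketch of an alternative, with the hypothetical name `category_str2int_sorted`; it is not how the library function behaves.

```python
from typing import List


def category_str2int_sorted(category_strs: List[str]) -> List[int]:
    """Map category strings to integers, assigning ids in sorted order."""
    name2id = {name: i for i, name in enumerate(sorted(set(category_strs)))}
    return [name2id[name] for name in category_strs]


ids = category_str2int_sorted(["b", "a", "b", "c"])
print(ids)
```

With sorting, "a" always maps to 0, "b" to 1, and "c" to 2, regardless of hash ordering.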

correlationMatrix

Make an interactive correlation matrix from an array using bokeh


Parameters:
  • data (arrayLike of int / float / bool) –

    Array of size (names*val) or (names*names).

  • names (list[str]) –

    Names for each row.

  • colors (list[int]) –

    List of size (names): a color for each name (good to display clusters).

  • pvals (arrayLike of int / float / bool) –

    Array of size (names*val) or (names*names) with the corresponding p-values.

  • maxokpval (float) –

    Threshold below which a p-value is considered good. Otherwise the size of the square is lowered until 10**-3, when it disappears.

  • other (arrayLike of int / float / bool) –

    Array of size (names*val) or (names*names): an additional information matrix that you want to display with opacity, whereas correlations will be displayed with color.

  • title (str) –

    The plot title.

  • dataIsCorr (bool) –

    If not true, the corrcoef of the data array is computed.

  • invert (bool) –

    Whether or not to invert the matrix before running corrcoef.

  • size (int) –

    The plot size.

  • folder (str) –

    Folder location where to save the plot; won't save if empty.

  • interactive (bool) –

    Whether or not to make the plot interactive (else will use matplotlib).

  • maxval (float) –

    Clamps coloring up to maxval.

  • minval (float) –

    Clamps coloring down to minval.

Returns:
  • The bokeh object if interactive, else None.

Source code in scprint2/utils/utils.py
def correlationMatrix(
    data,
    names,
    colors=None,
    pvals=None,
    maxokpval=10**-9,
    other=None,
    title="correlation Matrix",
    dataIsCorr=False,
    invert=False,
    size=40,
    folder="",
    interactive=False,
    maxval=None,
    minval=None,
):
    """
    Make an interactive correlation matrix from an array using bokeh

    Args:
    -----
      data: arrayLike of int / float/ bool of size(names*val) or (names*names)
      names: list[str] of names for each rows
      colors: list[int] of size(names) a color for each names (good to display clusters)
      pvals: arraylike of int / float/ bool of size(names*val) or (names*names) with the corresponding pvalues
      maxokpval: float threshold when pvalue is considered good. otherwise lowers the size of the square
        until 10**-3 when it disappears
      other: arrayLike of int / float/ bool of size(names*val) or (names*names), an additional information
        matrix that you want to display with opacity whereas correlations will be displayed with color
      title: str the plot title
      dataIsCorr: bool if not true, we will compute the corrcoef of the data array
      invert: bool whether or not to invert the matrix before running corrcoef
      size: int the plot size
      folder: str of folder location where to save the plot, won't save if empty
      interactive: bool whether or not to make the plot interactive (else will use matplotlib)
      maxval: float clamping coloring up to maxval
      minval: float clamping coloring down to minval

    Returns:
    -------
      the bokeh object if interactive else None

    """
    if not dataIsCorr:
        print("computing correlations")
        data = np.corrcoef(np.array(data) if not invert else np.array(data).T)
    else:
        data = np.array(data)
    regdata = data.copy()
    if minval is not None:
        data[data < minval] = minval
    if maxval is not None:
        data[data > maxval] = maxval
    data = data / data.max()
    TOOLS = (
        "hover,crosshair,pan,wheel_zoom,zoom_in,zoom_out,box_zoom,undo,redo,reset,save"
    )
    xname = []
    yname = []
    color = []
    alpha = []
    height = []
    width = []
    if type(colors) is list:
        print("we are assuming you want to display clusters with colors")
    elif other is not None:
        print(
            "we are assuming you want to display the other of your correlation with opacity"
        )
    if pvals is not None:
        print(
            "we are assuming you want to display the pvals of your correlation with size"
        )
        regpvals = pvals.copy()
        u = pvals < maxokpval
        pvals[~u] = np.log10(1 / pvals[~u])
        pvals = pvals / pvals.max()
        pvals[u] = 1
    if interactive:
        xname = []
        yname = []
        color = []
        for i, name1 in enumerate(names):
            for j, name2 in enumerate(names):
                xname.append(name1)
                yname.append(name2)
                if pvals is not None:
                    height.append(max(0.1, min(0.9, pvals[i, j])))
                    color.append(cc.coolwarm[int((data[i, j] * 127) + 127)])
                    alpha.append(min(abs(data[i, j]), 0.9))
                elif other is not None:
                    color.append(cc.coolwarm[int((data[i, j] * 127) + 127)])
                    alpha.append(
                        max(min(other[i, j], 0.9), 0.1) if other[i, j] != 0 else 0
                    )
                else:
                    alpha.append(min(abs(data[i, j]), 0.9))
                if colors is not None:
                    if type(colors) is list:
                        if colors[i] == colors[j]:
                            color.append(Category10[10][colors[i]])
                        else:
                            color.append("lightgrey")

                elif pvals is None and other is None:
                    color.append("grey" if data[i, j] > 0 else Category20[3][2])
        print(regdata.max())
        if pvals is not None:
            width = height.copy()
            data = dict(
                xname=xname,
                yname=yname,
                colors=color,
                alphas=alpha,
                data=regdata.ravel(),
                pvals=regpvals.ravel(),
                width=width,
                height=height,
            )
        else:
            data = dict(
                xname=xname, yname=yname, colors=color, alphas=alpha, data=data.ravel()
            )
        tt = [("names", "@yname, @xname"), ("value", "@data")]
        if pvals is not None:
            tt.append(("pvals", "@pvals"))
        p = figure(
            title=title if title is not None else "Correlation Matrix",
            x_axis_location="above",
            tools=TOOLS,
            x_range=list(reversed(names)),
            y_range=names,
            tooltips=tt,
        )

        p.width = 800
        p.height = 800
        p.grid.grid_line_color = None
        p.axis.axis_line_color = None
        p.axis.major_tick_line_color = None
        p.axis.major_label_text_font_size = "5pt"
        p.axis.major_label_standoff = 0
        p.xaxis.major_label_orientation = np.pi / 3
        p.output_backend = "svg"
        p.rect(
            "xname",
            "yname",
            width=0.9 if not width else "width",
            height=0.9 if not height else "height",
            source=data,
            color="colors",
            alpha="alphas",
            line_color=None,
            hover_line_color="black",
            hover_color="colors",
        )
        save(p, folder + title.replace(" ", "_") + "_correlation.html")
        try:
            p.output_backend = "svg"
            export_svg(
                p, filename=folder + title.replace(" ", "_") + "_correlation.svg"
            )
        except (RuntimeError, Exception) as e:
            print(f"Could not save SVG: {e}")
        try:
            show(p)
        except Exception as e:
            print(f"Could not show plot: {e}")
        return p  # show the plot
    else:
        plt.figure(figsize=(size, 200))
        plt.title("the correlation matrix")
        plt.imshow(data)
        plt.savefig(title + "_correlation.pdf")
        plt.show()
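The way p-values are turned into square sizes in the interactive branch can be sketched on a flat list. This is an illustration under assumptions: `pvals_to_sizes` is a hypothetical helper, it works on a plain list instead of a NumPy array, and it assumes at least one non-significant p-value smaller than 1 so the normalization does not divide by zero.

```python
import math


def pvals_to_sizes(pvals, maxokpval=1e-9):
    """Map p-values to square sizes in [0.1, 0.9], following the steps in the source."""
    significant = [p < maxokpval for p in pvals]
    # Non-significant p-values are replaced by log10(1 / p); significant ones are left as-is.
    scores = [p if sig else math.log10(1 / p) for p, sig in zip(pvals, significant)]
    top = max(scores)
    sizes = []
    for score, sig in zip(scores, significant):
        scaled = 1.0 if sig else score / top  # significant entries get full size
        sizes.append(max(0.1, min(0.9, scaled)))
    return sizes


sizes = pvals_to_sizes([1e-12, 1e-3, 1e-6, 1.0])
```

Significant p-values and the smallest non-significant ones end up at the 0.9 cap, while a p-value of 1 shrinks to the 0.1 floor.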

createFoldersFor

Recursively creates any missing folders needed to save a file at this filepath.

Source code in scprint2/utils/utils.py
def createFoldersFor(filepath):
    """
    will recursively create folders if needed until having all the folders required to save the file in this filepath
    """
    prevval = ""
    for val in os.path.expanduser(filepath).split("/")[:-1]:
        prevval += val + "/"
        if not os.path.exists(prevval):
            os.mkdir(prevval)
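For comparison, the same effect can usually be obtained with the standard library's `os.makedirs`. The sketch below is an assumed alternative, not the library's implementation; `create_folders_for` is a hypothetical name, and it relies on `os.path.dirname` instead of splitting on "/".

```python
import os
import tempfile


def create_folders_for(filepath):
    """Create every missing parent folder of filepath (the file itself is not created)."""
    folder = os.path.dirname(os.path.expanduser(filepath))
    if folder:
        os.makedirs(folder, exist_ok=True)


# Usage in a throwaway temporary directory.
with tempfile.TemporaryDirectory() as tmp:
    target = os.path.join(tmp, "a", "b", "c.txt")
    create_folders_for(target)
    created = os.path.isdir(os.path.join(tmp, "a", "b"))
    file_exists = os.path.exists(target)
```

Only the directories are created; writing `c.txt` is left to the caller.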

fileToList

loads an input file with a\n b\n.. into a list [a,b,..]

Parameters:
  • filename (str) –

    The path to the file to be loaded.

  • strconv (callable, default: lambda x: x ) –

    Function to convert each line. Defaults to identity function.

Returns:
  • list –

    The list of converted values from the file.

Source code in scprint2/utils/utils.py
def fileToList(filename: str, strconv: callable = lambda x: x) -> list:
    """
    loads an input file with a\\n b\\n.. into a list [a,b,..]

    Args:
        filename (str): The path to the file to be loaded.
        strconv (callable): Function to convert each line. Defaults to identity function.

    Returns:
        list: The list of converted values from the file.
    """
    with open(filename) as f:
        return [strconv(val[:-1]) for val in f.readlines()]

get_free_gpu

get_free_gpu finds the GPU with the most free memory using nvidia-smi.

Returns:
  • int –

    The index of the GPU with the most free memory.

Source code in scprint2/utils/utils.py
def get_free_gpu() -> int:
    """
    get_free_gpu finds the GPU with the most free memory using nvidia-smi.

    Returns:
        int: The index of the GPU with the most free memory.
    """
    import subprocess
    import sys
    from io import StringIO

    gpu_stats = subprocess.check_output(
        [
            "nvidia-smi",
            "--format=csv",
            "--query-gpu=memory.used,memory.free",
        ]
    ).decode("utf-8")
    gpu_df = pd.read_csv(
        StringIO(gpu_stats), names=["memory.used", "memory.free"], skiprows=1
    )
    print("GPU usage:\n{}".format(gpu_df))
    gpu_df["memory.free"] = gpu_df["memory.free"].map(lambda x: int(x.rstrip(" [MiB]")))
    idx = gpu_df["memory.free"].idxmax()
    print(
        "Find free GPU{} with {} free MiB".format(idx, gpu_df.iloc[idx]["memory.free"])
    )

    return idx
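The selection step can be tried without a GPU by parsing a canned CSV string. In the sketch below, the `SAMPLE` text is a hypothetical stand-in for `nvidia-smi` output, and `freest_gpu` is an illustrative helper that uses the stdlib `csv` module instead of pandas.

```python
import csv
from io import StringIO

# Hypothetical output of:
#   nvidia-smi --format=csv --query-gpu=memory.used,memory.free
SAMPLE = """memory.used [MiB], memory.free [MiB]
10240 MiB, 5120 MiB
512 MiB, 15360 MiB
"""


def freest_gpu(csv_text):
    """Return the index of the row with the largest free-memory value."""
    rows = list(csv.reader(StringIO(csv_text), skipinitialspace=True))[1:]  # skip header
    free = [int(row[1].split()[0]) for row in rows if row]  # "15360 MiB" -> 15360
    return max(range(len(free)), key=free.__getitem__)


best = freest_gpu(SAMPLE)
print(best)
```

The row order matches the GPU index order reported by `nvidia-smi`, so the winning row index is the GPU index.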

get_git_commit

get_git_commit gets the current git commit hash.

Returns:
  • str –

    The current git commit hash.

Source code in scprint2/utils/utils.py
def get_git_commit() -> str:
    """
    get_git_commit gets the current git commit hash.

    Returns:
        str: The current git commit
    """
    return subprocess.check_output(["git", "rev-parse", "HEAD"]).decode("utf-8").strip()

heatmap

Make an interactive heatmap from a dataframe using bokeh


Parameters:
  • data (dataframe of int / float / bool) –

    Dataframe of size (names1*names2).

  • colors (list[int]) –

    List of size (names): a color for each name (good to display clusters).

  • pvals (arrayLike of int / float / bool) –

    Array of size (names*val) or (names*names) with the corresponding p-values.

  • maxokpval (float) –

    Threshold below which a p-value is considered good. Otherwise the size of the square is lowered until 10**-3, when it disappears.

  • title (str) –

    The plot title.

  • size (int) –

    The plot size.

  • folder (str) –

    Folder location where to save the plot; won't save if empty.

  • interactive (bool) –

    Whether or not to make the plot interactive (else will use matplotlib).

  • maxval (float) –

    Clamps coloring up to maxval.

  • minval (float) –

    Clamps coloring down to minval.

Returns:
  • The bokeh object if interactive, else None.

Source code in scprint2/utils/utils.py
def heatmap(
    data,
    colors=None,
    title="correlation Matrix",
    size=40,
    other=None,
    folder="",
    interactive=False,
    pvals=None,
    maxokpval=10**-9,
    maxval=None,
    minval=None,
):
    """
    Make an interactive heatmap from a dataframe using bokeh

    Args:
    -----
      data: dataframe of int / float/ bool of size(names1*names2)
      colors: list[int] of size(names) a color for each names (good to display clusters)
      pvals: arraylike of int / float/ bool of size(names*val) or (names*names) with the corresponding pvalues
      maxokpval: float threshold when pvalue is considered good. otherwise lowers the size of the square
        until 10**-3 when it disappears
      title: str the plot title
      size: int the plot size
      folder: str of folder location where to save the plot, won't save if empty
      interactive: bool whether or not to make the plot interactive (else will use matplotlib)
      maxval: float clamping coloring up to maxval
      minval: float clamping coloring down to minval

    Returns:
    -------
      the bokeh object if interactive else None

    """
    regdata = data.copy()
    if minval is not None:
        data[data < minval] = minval
    if maxval is not None:
        data[data > maxval] = maxval
    data = data / data.max()
    data = data.values
    TOOLS = (
        "hover,crosshair,pan,wheel_zoom,zoom_in,zoom_out,box_zoom,undo,redo,reset,save"
    )
    xname = []
    yname = []
    color = []
    alpha = []
    height = []
    width = []
    if pvals is not None:
        print(
            "we are assuming you want to display the pvals of your correlation with size"
        )
        regpvals = pvals.copy()
        u = pvals < maxokpval
        pvals[~u] = np.log10(1 / pvals[~u])
        pvals = pvals / pvals.max()
        pvals[u] = 1
    if interactive:
        xname = []
        yname = []
        color = []
        for i, name1 in enumerate(regdata.index):
            for j, name2 in enumerate(regdata.columns):
                xname.append(name2)
                yname.append(name1)
                if pvals is not None:
                    # import pdb;pdb.set_trace()
                    height.append(max(0.1, min(0.9, pvals.loc[name1][name2])))
                    color.append(cc.coolwarm[int((data[i, j] * 128) + 127)])
                    alpha.append(min(abs(data[i, j]), 0.9))
                elif other is not None:
                    color.append(cc.coolwarm[int((data[i, j] * 128) + 127)])
                    alpha.append(
                        max(min(other[i, j], 0.9), 0.1) if other[i, j] != 0 else 0
                    )
                else:
                    alpha.append(min(abs(data[i, j]), 0.9))
                if colors is not None:
                    if type(colors) is list:
                        if colors[i] == colors[j]:
                            color.append(Category10[10][colors[i]])
                        else:
                            color.append("lightgrey")

                elif pvals is None and other is None:
                    color.append("grey" if data[i, j] > 0 else Category20[3][2])
        if pvals is not None:
            width = height.copy()
            data = dict(
                xname=xname,
                yname=yname,
                colors=color,
                alphas=alpha,
                data=regdata.values.ravel(),
                pvals=regpvals.values.ravel(),
                width=width,
                height=height,
            )
        else:
            data = dict(
                xname=xname, yname=yname, colors=color, alphas=alpha, data=data.ravel()
            )
        tt = [("names", "@yname, @xname"), ("value", "@data")]
        if pvals is not None:
            tt.append(("pvals", "@pvals"))
        p = figure(
            title=title if title is not None else "Heatmap",
            x_axis_location="above",
            tools=TOOLS,
            x_range=list(reversed(regdata.columns.astype(str).tolist())),
            y_range=regdata.index.tolist(),
            tooltips=tt,
        )

        p.width = 800
        p.height = 800
        p.grid.grid_line_color = None
        p.axis.axis_line_color = None
        p.axis.major_tick_line_color = None
        p.axis.major_label_text_font_size = "5pt"
        p.axis.major_label_standoff = 0
        p.xaxis.major_label_orientation = np.pi / 3
        p.output_backend = "svg"
        p.rect(
            "xname",
            "yname",
            width=0.9 if not width else "width",
            height=0.9 if not height else "height",
            source=data,
            color="colors",
            alpha="alphas",
            line_color=None,
            hover_line_color="black",
            hover_color="colors",
        )
        save(p, folder + title.replace(" ", "_") + "_heatmap.html")
        try:
            p.output_backend = "svg"
            export_svg(
                p, filename=folder + title.replace(" ", "_") + "_correlation.svg"
            )
        except (RuntimeError, Exception) as e:
            print(f"Could not save SVG: {e}")
        try:
            show(p)
        except Exception as e:
            print(f"Could not show plot: {e}")
        return p  # show the plot
    else:
        plt.figure(figsize=size)
        plt.title("the correlation matrix")
        plt.imshow(data)
        plt.savefig(title + "_correlation.pdf")
        plt.show()

inf_loop

wrapper function for endless data loader.

Source code in scprint2/utils/utils.py
def inf_loop(data_loader):
    """wrapper function for endless data loader."""
    for loader in repeat(data_loader):
        yield from loader
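A small usage sketch, with a plain list standing in for a data loader (an assumption made for illustration; any re-iterable object behaves the same way):

```python
from itertools import islice, repeat


def inf_loop(data_loader):
    """Wrapper function for endless data loader."""
    for loader in repeat(data_loader):
        yield from loader


# Take 7 items from a 3-item "loader": it restarts after each full pass.
batches = list(islice(inf_loop([1, 2, 3]), 7))
print(batches)
```

Because the generator never ends, it has to be bounded by the consumer, for example with `islice` or a step counter in the training loop.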

isnotebook

check whether executing in jupyter notebook.

Source code in scprint2/utils/utils.py
def isnotebook() -> bool:
    """check whether executing in jupyter notebook."""
    try:
        shell = get_ipython().__class__.__name__
        if shell == "ZMQInteractiveShell":
            return True  # Jupyter notebook or qtconsole
        elif shell == "TerminalInteractiveShell":
            return True  # Terminal running IPython
        else:
            return False  # Other type (?)
    except NameError:
        return False  # Probably standard Python interpreter

listToFile

listToFile writes a list [a,b,..] to a file as a\n b\n..

Parameters:
  • li (list) –

    The list of elements to be written to the file.

  • filename (str) –

    The name of the file where the list will be written.

  • strconv (callable, default: lambda x: str(x) ) –

    A function to convert each element of the list to a string. Defaults to str.

Returns:
  • None

    None

Source code in scprint2/utils/utils.py
def listToFile(
    li: List[str], filename: str, strconv: callable = lambda x: str(x)
) -> None:
    """
    listToFile writes a list [a,b,..] to a file as a\\n b\\n..

    Args:
        li (list): The list of elements to be written to the file.
        filename (str): The name of the file where the list will be written.
        strconv (callable, optional): A function to convert each element of the list to a string. Defaults to str.

    Returns:
        None
    """
    with open(filename, "w") as f:
        for item in li:
            f.write("%s\n" % strconv(item))
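A round trip through both helpers can be sketched as follows. To stay self-contained, the two functions are re-declared here under the hypothetical names `list_to_file` and `file_to_list`, with bodies copied from the listings on this page.

```python
import os
import tempfile


def list_to_file(li, filename, strconv=str):
    """Write one converted item per line."""
    with open(filename, "w") as f:
        for item in li:
            f.write("%s\n" % strconv(item))


def file_to_list(filename, strconv=lambda x: x):
    """Read one item per line, dropping the last character of each line."""
    with open(filename) as f:
        return [strconv(val[:-1]) for val in f.readlines()]


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "values.txt")
    list_to_file([1, 2, 3], path)
    restored = file_to_list(path, strconv=int)
```

Note that the reader drops the last character of every line, so a file whose final line has no trailing newline would lose a real character; files produced by the writer always end with a newline and are not affected.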

prepare_device

Set up the GPU device if available and return it together with the GPU device indices used for DataParallel.

Source code in scprint2/utils/utils.py
def prepare_device(n_gpu_use):
    """
    setup GPU device if available. get gpu device indices which are used for DataParallel
    """
    n_gpu = torch.cuda.device_count()
    if n_gpu_use > 0 and n_gpu == 0:
        print(
            "Warning: There's no GPU available on this machine,"
            "training will be performed on CPU."
        )
        n_gpu_use = 0
    if n_gpu_use > n_gpu:
        print(
            f"Warning: The number of GPU's configured to use is {n_gpu_use}, but only {n_gpu} are "
            "available on this machine."
        )
        n_gpu_use = n_gpu
    device = torch.device("cuda:0" if n_gpu_use > 0 else "cpu")
    list_ids = list(range(n_gpu_use))
    return device, list_ids

run_command

run_command runs a command in the shell and prints the output.

Parameters:
  • command (str) –

    The command to be executed in the shell.

Returns:
  • int –

    The return code of the command executed.

Source code in scprint2/utils/utils.py
def run_command(command: str, **kwargs) -> int:
    """
    run_command runs a command in the shell and prints the output.

    Args:
        command (str): The command to be executed in the shell.

    Returns:
        int: The return code of the command executed.
    """
    process = subprocess.Popen(command, stdout=subprocess.PIPE, **kwargs)
    while True:
        if process.poll() is not None:
            break
        output = process.stdout.readline()
        if output:
            print(output.strip())
    rc = process.poll()
    return rc
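The streaming pattern can be tried with a harmless child process. The sketch below is an assumed variant, not the library function: `run_command_sketch` is a hypothetical name, it takes an argument list (on POSIX, a plain command string with arguments only works with `Popen` when `shell=True` is passed), and it decodes output as text.

```python
import subprocess
import sys


def run_command_sketch(args, **kwargs):
    """Run a command, echo its stdout line by line, and return (return code, lines)."""
    process = subprocess.Popen(args, stdout=subprocess.PIPE, text=True, **kwargs)
    lines = []
    for line in process.stdout:  # iterates until the child closes stdout
        lines.append(line.rstrip("\n"))
        print(lines[-1])
    return process.wait(), lines


# Use the current Python interpreter as a portable example command.
rc, lines = run_command_sketch([sys.executable, "-c", "print('hello')"])
```

Iterating over `process.stdout` until it closes also avoids missing output that is still buffered when the process exits.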

selector

Part of Volcano plot: A function to separate tfs from everything else

Source code in scprint2/utils/utils.py
def selector(
    df,
    valtoextract=[],
    logfoldtohighlight=0.15,
    pvaltohighlight=0.1,
    minlogfold=0.15,
    minpval=0.1,
):
    """Part of Volcano plot: A function to separate tfs from everything else"""
    toshow = (df.pvalue < minpval) & (abs(df.log2FoldChange) > minlogfold)
    df = df[toshow]
    sig = (df.pvalue < pvaltohighlight) & (abs(df.log2FoldChange) > logfoldtohighlight)
    if valtoextract:
        not_tf = ~df.gene_id.isin(valtoextract)
        is_tf = df.gene_id.isin(valtoextract)
        to_plot_not = df[~sig | not_tf]
        to_plot_yes = df[sig & is_tf]
    else:
        to_plot_not = df[~sig]
        to_plot_yes = df[sig]
    return to_plot_not, to_plot_yes

set_seed

set random seed.

Source code in scprint2/utils/utils.py
def set_seed(seed: int = 42):
    """set random seed."""
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
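The effect of seeding can be checked with the standard library alone. This sketch covers only the `random` module; `set_seed_stdlib` is a hypothetical, reduced stand-in for the function above, which also seeds NumPy and torch.

```python
import random


def set_seed_stdlib(seed: int = 42):
    """Seed only Python's built-in random module."""
    random.seed(seed)


set_seed_stdlib(0)
first = [random.random() for _ in range(3)]
set_seed_stdlib(0)
second = [random.random() for _ in range(3)]
```

Re-seeding with the same value replays the same sequence, which is what makes runs repeatable.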

subset_h5ad_by_format

Create new anndata object according to slot info specifications.

Parameters:
  • adata –

    An AnnData object to subset (required).

  • config –

    A Viash config object as read by openproblems.project.read_viash_config (required).

  • arg_name –

    The name of the argument in the config file that specifies the output format (required).

  • field_rename_dict –

    A mapping between the slots of the source h5ad and the slots of the destination h5ad. Example of slot_mapping:

        slot_mapping = {
          "layers": {
            "counts": par["layer_counts"],
          },
          "obs": {
            "cell_type": par["obs_cell_type"],
            "batch": par["obs_batch"],
          }
        }

Source code in scprint2/utils/utils.py
def subset_h5ad_by_format(adata, config, arg_name, field_rename_dict={}):
    """Create new anndata object according to slot info specifications.

    Arguments:
    adata -- An AnnData object to subset (required)
    config -- A Viash config object as read by openproblems.project.read_viash_config (required)
    arg_name -- The name of the argument in the config file that specifies the output format (required)
    field_rename_dict -- A mapping between the slots of the source h5ad and the slots of the destination h5ad.
      Example of slot_mapping:
        ```
        slot_mapping = {
          "layers": {
            "counts": par["layer_counts"],
          },
          "obs": {
            "cell_type": par["obs_cell_type"],
            "batch": par["obs_batch"],
          }
        }
        ```
    """
    import anndata as ad
    import pandas as pd

    assert isinstance(adata, ad.AnnData), "adata must be an AnnData object"
    assert isinstance(config, dict), "config must be a dictionary"

    # find argument
    arg = next(
        (x for x in config["all_arguments"] if x["clean_name"] == arg_name), None
    )
    assert arg, f"Argument '{arg_name}' not found in config"

    # find file format
    file_format = (arg.get("info") or {}).get("format")
    assert file_format, f"Argument '{arg_name}' has no .info.format"

    # find file format type
    file_format_type = file_format.get("type")
    assert file_format_type == "h5ad", "format must be a h5ad type"

    structs = ["layers", "obs", "var", "uns", "obsp", "obsm", "varp", "varm"]
    kwargs = {}

    for struct in structs:
        struct_format = file_format.get(struct, {})
        struct_rename = field_rename_dict.get(struct, {})

        # fetch data from adata
        data = {}
        for field_format in struct_format:
            dest_name = field_format["name"]
            # where to find the data. if the dest_name is in the rename dict, use the renamed name
            # as the source name, otherwise use the dest_name as the source name
            src_name = struct_rename.get(dest_name, dest_name)
            data[dest_name] = getattr(adata, struct)[src_name]

        if len(data) > 0:
            if struct in ["obs", "var"]:
                data = pd.concat(data, axis=1)
            kwargs[struct] = data
        elif struct in ["obs", "var"]:
            # if no columns need to be copied, we still need an 'obs' and a 'var'
            # to help determine the shape of the adata
            kwargs[struct] = getattr(adata, struct).iloc[:, []]

    return ad.AnnData(**kwargs)
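
The lookup and renaming steps can be followed without AnnData. The sketch below uses a hypothetical, minimal config dict whose keys follow the ones the source reads (all_arguments, clean_name, info.format); the field names and the rename target are made up for illustration.

```python
# Hypothetical, minimal Viash-style config; real configs contain many more keys.
config = {
    "all_arguments": [
        {
            "clean_name": "output",
            "info": {
                "format": {
                    "type": "h5ad",
                    "layers": [{"name": "counts"}],
                    "obs": [{"name": "cell_type"}, {"name": "batch"}],
                }
            },
        }
    ]
}
# Destination field name -> source field name, per slot (made-up example).
field_rename_dict = {"obs": {"cell_type": "celltype_annotation"}}

# Same lookup as in the source: first argument whose clean_name matches.
arg = next((x for x in config["all_arguments"] if x["clean_name"] == "output"), None)
file_format = (arg.get("info") or {}).get("format")

# For each slot, resolve where each destination field is read from.
sources = {}
for struct in ["layers", "obs", "var"]:
    rename = field_rename_dict.get(struct, {})
    sources[struct] = {
        field["name"]: rename.get(field["name"], field["name"])
        for field in file_format.get(struct, [])
    }
```

Here cell_type is read from the source column celltype_annotation, while counts and batch keep their names. The var slot lists no fields, which is the case where the source still passes an empty var frame so the output keeps its shape.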

volcano

Make an interactive volcano plot from Differential Expression analysis tools outputs


Parameters:
  • data –

    A DataFrame with one row per gene and the columns log2FoldChange, pvalue, and gene_id.

  • folder (str, default: '' ) –

    Location where the plot is saved. Nothing is saved if empty.

  • tohighlight (list[str], default: None ) –

    Genes to highlight in the plot.

  • tooltips (list[tuple(str, str)], default: [('gene', '@gene_id')] ) –

    Bokeh tooltips, to override the default one.

  • title (str, default: 'volcano plot' ) –

    Plot title.

  • xlabel (str, default: 'log-fold change' ) –

    Title of the x axis.

  • ylabel (str, default: '-log(Q)' ) –

    Title of the y axis.

  • maxvalue (float, default: 100 ) –

    The maximum -log2(pvalue) allowed (useful when handling infinite values).

  • searchbox (bool, default: False ) –

    Whether to add a search box to interactively highlight genes.

  • logfoldtohighlight (float, default: 0.15 ) –

    Minimum absolute log fold change for a point to be highlighted.

  • pvaltohighlight (float, default: 0.1 ) –

    P-value below which a point is highlighted.

  • showlabels (bool, default: False ) –

    Whether to show a text label with the gene_id next to each highlighted data point.

Returns:
  • The bokeh object.

Source code in scprint2/utils/utils.py
def volcano(
    data,
    folder="",
    tohighlight=None,
    tooltips=[("gene", "@gene_id")],
    title="volcano plot",
    xlabel="log-fold change",
    ylabel="-log(Q)",
    maxvalue=100,
    searchbox=False,
    logfoldtohighlight=0.15,
    pvaltohighlight=0.1,
    showlabels=False,
):
    """
    Make an interactive volcano plot from Differential Expression analysis tools outputs

    Args:
    -----
        data: a df with genes as rows and the columns [log2FoldChange, pvalue, gene_id]
        folder: str location where to save the plot; nothing is saved if empty
        tohighlight: list[str] of genes to highlight in the plot
        tooltips: list[tuple(str, str)] to specify another bokeh tooltip
        title: str plot title
        xlabel: str title of the x axis
        ylabel: str title of the y axis
        maxvalue: float the maximum -log2(pvalue) allowed (useful when handling inf values)
        searchbox: bool whether or not to add a search box to interactively highlight genes
        logfoldtohighlight: float minimum absolute log fold change for a point to be highlighted
        pvaltohighlight: float p-value below which a point is highlighted
        showlabels: bool whether or not to show a text label with the gene_id next to each highlighted data point

    Returns:
    --------
        The bokeh object
    """
    to_plot_not, to_plot_yes = selector(
        data,
        tohighlight if tohighlight is not None else [],
        logfoldtohighlight,
        pvaltohighlight,
    )
    hover = HoverTool(tooltips=tooltips, name="circles")

    # Create figure
    p = figure(title=title, width=650, height=450)

    p.xgrid.grid_line_color = "white"
    p.ygrid.grid_line_color = "white"
    p.xaxis.axis_label = xlabel
    p.yaxis.axis_label = ylabel

    # Add the hover tool
    p.add_tools(hover)
    p, source1 = add_points(
        p, to_plot_not, "log2FoldChange", "pvalue", color="#1a9641", maxvalue=maxvalue
    )
    p, source2 = add_points(
        p,
        to_plot_yes,
        "log2FoldChange",
        "pvalue",
        color="#fc8d59",
        alpha=0.6,
        outline=True,
        maxvalue=maxvalue,
    )
    if showlabels:
        labels = LabelSet(
            x="log2FoldChange",
            y="transformed_q",
            text_font_size="7pt",
            text="gene_id",
            level="glyph",
            x_offset=5,
            y_offset=5,
            source=source2,
            # renderers="canvas",
        )
        p.add_layout(labels)
    if searchbox:
        text = TextInput(title="text", value="gene")
        text.js_on_change(
            "value",
            CustomJS(
                args=dict(source=source1),
                code="""
                var data = source.data
                var value = cb_obj.value
                var gene_id = data.gene_id
                var a = -1
                for (let i=0; i < gene_id.length; i++) {
                    if ( gene_id[i]===value ) { a=i; console.log(i); data.size[i]=7; data.alpha[i]=1; data.color[i]='#fc8d59' }
                }
                source.data = data
                console.log(source)
                console.log(cb_obj)
                source.change.emit()
                console.log(source)
                """,
            ),
        )
        p = column(text, p)
    p.output_backend = "svg"
    if folder:
        save(p, folder + title.replace(" ", "_") + "_volcano.html")
        try:
            p.output_backend = "svg"
            export_svg(p, filename=folder + title.replace(" ", "_") + "_volcano.svg")
        except (RuntimeError, Exception) as e:
            print(f"Could not save SVG: {e}")
    try:
        show(p)
    except Exception as e:
        print(f"Could not show plot: {e}")
    return p
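
One detail of the saving step is easy to miss: the output paths are built by plain string concatenation, so folder needs its own trailing separator. A minimal sketch of that path construction follows; the helper name volcano_paths is hypothetical and only reproduces the expressions passed to save and export_svg in the source.

```python
def volcano_paths(folder, title):
    """Rebuild the HTML and SVG paths the way the source concatenates them."""
    stem = folder + title.replace(" ", "_") + "_volcano"
    return stem + ".html", stem + ".svg"


with_sep = volcano_paths("results/", "my plot")
without_sep = volcano_paths("results", "my plot")
```

With "results/" the files land inside the directory as results/my_plot_volcano.html and .svg; with "results" the folder name is fused into the file name (resultsmy_plot_volcano.html).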

scprint2.utils.get_seq

Functions:

Name Description
load_fasta_species

Downloads and caches FASTA files for a given species from the Ensembl FTP server.

seq

Fetch nucleotide or amino acid sequence (FASTA) of a gene (and all its isoforms) or transcript by Ensembl, WormBase, or FlyBase ID.

subset_fasta

Creates a new FASTA file containing only the sequences whose names contain one of the given gene names.

load_fasta_species

Downloads and caches FASTA files for a given species from the Ensembl FTP server.

Parameters:
  • species (str, default: 'homo_sapiens' ) –

    The species name for which to download FASTA files. Defaults to "homo_sapiens".

  • output_path (str, default: '/tmp/data/fasta/' ) –

    The local directory path where the FASTA files will be saved. Defaults to "/tmp/data/fasta/".

  • load (List[str], default: ['pep', 'ncrna', 'cds'] ) –

    The FASTA types to download. The source handles "pep", "ncrna", and "cdna"; the default entry "cds" matches none of these branches.

  • cache (bool, default: True ) –

    If True, use cached files if they exist. If False, re-download the files. Defaults to True.

Returns:
  • List[str]

    The local paths of the downloaded (or cached) FASTA files.

Source code in scprint2/utils/get_seq.py
def load_fasta_species(
    species: str = "homo_sapiens",
    output_path: str = "/tmp/data/fasta/",
    load: List[str] = ["pep", "ncrna", "cds"],
    cache: bool = True,
) -> List[str]:
    """
    Downloads and caches FASTA files for a given species from the Ensembl FTP server.

    Args:
        species (str, optional): The species name for which to download FASTA files. Defaults to "homo_sapiens".
        output_path (str, optional): The local directory path where the FASTA files will be saved. Defaults to "/tmp/data/fasta/".
        load (List[str], optional): The FASTA types to download ("pep", "ncrna", "cdna"). Defaults to ["pep", "ncrna", "cds"].
        cache (bool, optional): If True, use cached files if they exist. If False, re-download the files. Defaults to True.

    Returns:
        List[str]: The local paths of the downloaded (or cached) FASTA files.
    """
    ftp = ftplib.FTP("ftp.ensembl.org")
    ftp.login()
    local_file_path = []
    try:
        ftp.cwd("/pub/release-110/fasta/" + species + "/pep/")
        types = "animals"
    except ftplib.error_perm:
        try:
            ftp = ftplib.FTP("ftp.ensemblgenomes.ebi.ac.uk")
            ftp.login()
            ftp.cwd("/pub/plants/release-60/fasta/" + species + "/pep/")
            types = "plants"
        except ftplib.error_perm:
            try:
                ftp.cwd("/pub/metazoa/release-60/fasta/" + species + "/pep/")
                types = "metazoa"
            except ftplib.error_perm:
                raise ValueError(
                    f"Species {species} not found in Ensembl or Ensembl Genomes."
                )

    os.makedirs(output_path, exist_ok=True)
    if "pep" in load:
        file = list_files(ftp, ".all.fa.gz")[0]
        local_file_path.append(output_path + file)
        if not os.path.exists(local_file_path[-1]) or not cache:
            with open(local_file_path[-1], "wb") as local_file:
                ftp.retrbinary("RETR " + file, local_file.write)

    # ncRNA
    if "ncrna" in load:
        if types == "animals":
            ftp.cwd("/pub/release-110/fasta/" + species + "/ncrna/")
        elif types == "plants":
            ftp.cwd("/pub/plants/release-60/fasta/" + species + "/ncrna/")
        file = list_files(ftp, ".ncrna.fa.gz")[0]
        local_file_path.append(output_path + file)
        if not os.path.exists(local_file_path[-1]) or not cache:
            with open(local_file_path[-1], "wb") as local_file:
                ftp.retrbinary("RETR " + file, local_file.write)

    # CDNA:
    if "cdna" in load:
        if types == "animals":
            ftp.cwd("/pub/release-110/fasta/" + species + "/cdna/")
        elif types == "plants":
            ftp.cwd("/pub/plants/release-60/fasta/" + species + "/cdna/")
        file = list_files(ftp, ".cdna.all.fa.gz")[0]
        local_file_path.append(output_path + file)
        if not os.path.exists(local_file_path[-1]) or not cache:
            with open(local_file_path[-1], "wb") as local_file:
                ftp.retrbinary("RETR " + file, local_file.write)

    ftp.quit()
    return local_file_path
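
The caching rule used for every file type is the same predicate: download when the file is absent or when cache is False. A minimal sketch of that rule in isolation, using a temporary directory instead of the FTP server; the helper name needs_download and the file name are hypothetical.

```python
import os
import tempfile


def needs_download(path, cache=True):
    """Same predicate as the source: fetch if the file is absent or caching is off."""
    return not os.path.exists(path) or not cache


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "example.pep.all.fa.gz")  # hypothetical file name
    missing_before = needs_download(path)        # nothing on disk yet
    with open(path, "wb") as fh:
        fh.write(b"")                            # stand-in for a downloaded file
    cached_after = needs_download(path)          # file exists, cache=True
    forced = needs_download(path, cache=False)   # cache=False forces a re-download
```

Since the check only tests for existence, an empty or partially downloaded file is treated as cached; passing cache=False is the way to refresh it.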

seq

Fetch nucleotide or amino acid sequence (FASTA) of a gene (and all its isoforms) or transcript by Ensembl, WormBase, or FlyBase ID.

Parameters:
  • ens_ids (Union[str, List[str]]) –

    One or more Ensembl IDs (passed as string or list of strings). Also supports WormBase and FlyBase IDs.

  • translate (bool, default: False ) –

    Defines whether nucleotide or amino acid sequences are returned. Defaults to False (returns nucleotide sequences). Nucleotide sequences are fetched from the Ensembl REST API server. Amino acid sequences are fetched from the UniProt REST API server.

  • isoforms (bool, default: False ) –

    If True, returns the sequences of all known transcripts. Defaults to False. (Only for gene IDs.)

  • parallel (bool, default: True ) –

    If True, fetches sequences in parallel. Defaults to True.

  • save (bool, default: False ) –

    If True, saves output FASTA to current directory. Defaults to False.

  • transcribe (bool, default: None ) –

    Deprecated. Use 'translate' instead.

  • seqtype (str, default: None ) –

    Deprecated. Use 'translate' instead.

  • verbose (bool, default: True ) –

    If True, prints progress information. Defaults to True.

Returns:
  • List[str]

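    A list containing the requested sequences as alternating FASTA header and sequence lines. If 'save' is True, the FASTA is written to gget_seq_results.fa in the current directory, and the source then returns the set of queried IDs for which no sequence was retrieved instead of the list.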

Raises:
  • ValueError

    If an invalid Ensembl ID is provided.

Source code in scprint2/utils/get_seq.py
def seq(
    ens_ids: Union[str, List[str]],
    translate: bool = False,
    isoforms: bool = False,
    parallel: bool = True,
    save: bool = False,
    transcribe: Optional[bool] = None,
    seqtype: Optional[str] = None,
    verbose: bool = True,
) -> List[str]:
    """
    Fetch nucleotide or amino acid sequence (FASTA) of a gene (and all its isoforms) or transcript by Ensembl, WormBase, or FlyBase ID.

    Args:
        ens_ids (Union[str, List[str]]): One or more Ensembl IDs (passed as string or list of strings).
                                         Also supports WormBase and FlyBase IDs.
        translate (bool, optional): Defines whether nucleotide or amino acid sequences are returned.
                                    Defaults to False (returns nucleotide sequences).
                                    Nucleotide sequences are fetched from the Ensembl REST API server.
                                    Amino acid sequences are fetched from the UniProt REST API server.
        isoforms (bool, optional): If True, returns the sequences of all known transcripts. Defaults to False.
                                   (Only for gene IDs.)
        parallel (bool, optional): If True, fetches sequences in parallel. Defaults to True.
        save (bool, optional): If True, saves output FASTA to current directory. Defaults to False.
        transcribe (bool, optional): Deprecated. Use 'translate' instead.
        seqtype (str, optional): Deprecated. Use 'translate' instead.
        verbose (bool, optional): If True, prints progress information. Defaults to True.

    Returns:
        List[str]: A list containing the requested sequences, or a FASTA file if 'save' is True.

    Raises:
        ValueError: If an invalid Ensembl ID is provided.
    """
    # Handle deprecated arguments
    if seqtype:
        logging.error(
            "'seqtype' argument deprecated! Please use True/False argument 'translate' instead."
        )
        return
    if transcribe:
        translate = transcribe

    ## Clean up arguments
    # Clean up Ensembl IDs
    # If single Ensembl ID passed as string, convert to list
    if type(ens_ids) is str:
        ens_ids = [ens_ids]
    # Remove Ensembl ID version if passed
    ens_ids_clean = []
    temp = 0
    for ensembl_ID in ens_ids:
        # But only for Ensembl ID (and not for flybase/wormbase IDs)
        if ensembl_ID.startswith("ENS"):
            ens_ids_clean.append(ensembl_ID.split(".")[0])

            if "." in ensembl_ID and temp == 0:
                if verbose:
                    logging.info(
                        "We noticed that you may have passed a version number with your Ensembl ID.\n"
                        "Please note that gget seq will return information linked to the latest Ensembl ID version."
                    )
                temp = +1

        else:
            ens_ids_clean.append(ensembl_ID)

    # Initiate empty 'fasta'
    fasta = []

    ## Fetch nucleotide sequence
    if translate is False:
        # Define Ensembl REST API server
        server = ENSEMBL_REST_API
        # Define type of returned content from REST
        content_type = "application/json"

        # Initiate dictionary to save results for all IDs in
        master_dict = {}

        # Query REST APIs from https://rest.ensembl.org/
        for ensembl_ID in ens_ids_clean:
            # Create dict to save query results
            results_dict = {ensembl_ID: {}}

            # If isoforms False, just fetch sequences of passed Ensembl ID
            if isoforms is False:
                # sequence/id/ query: Request sequence by stable identifier
                query = "sequence/id/" + ensembl_ID + "?"

                # Try if query valid
                try:
                    # Submit query; this will throw RuntimeError if ID not found
                    df_temp = rest_query(server, query, content_type)

                    # Delete superfluous entries
                    keys_to_delete = ["query", "id", "version", "molecule"]
                    for key in keys_to_delete:
                        # Pop keys, None -> do not raise an error if key to delete not found
                        df_temp.pop(key, None)

                    # Add results to main dict
                    results_dict[ensembl_ID].update({"seq": df_temp})

                    if verbose:
                        logging.info(
                            f"Requesting nucleotide sequence of {ensembl_ID} from Ensembl."
                        )

                except RuntimeError:
                    logging.error(
                        f"ID {ensembl_ID} not found. Please double-check spelling/arguments and try again."
                    )

            # If isoforms true, fetch sequences of isoforms instead
            if isoforms is True:
                # Get ID type (gene, transcript, ...) using gget info
                info_df = info(
                    ensembl_ID, verbose=False, pdb=False, ncbi=False, uniprot=False
                )

                # Check if Ensembl ID was found
                if isinstance(info_df, type(None)):
                    logging.warning(
                        f"ID '{ensembl_ID}' not found. Please double-check spelling/arguments and try again."
                    )
                    continue

                ens_ID_type = info_df.loc[ensembl_ID]["object_type"]

                # If the ID is a gene, get the IDs of all its transcripts
                if ens_ID_type == "Gene":
                    if verbose:
                        logging.info(
                            f"Requesting nucleotide sequences of all transcripts of {ensembl_ID} from Ensembl."
                        )

                    for transcipt_id in info_df.loc[ensembl_ID]["all_transcripts"]:
                        # Remove version number for Ensembl IDs (not for flybase/wormbase IDs)
                        if transcipt_id.startswith("ENS"):
                            transcipt_id = transcipt_id.split(".")[0]

                        # Try if query is valid
                        try:
                            # Define the REST query
                            query = "sequence/id/" + transcipt_id + "?"
                            # Submit query
                            df_temp = rest_query(server, query, content_type)

                            # Delete superfluous entries
                            keys_to_delete = ["query", "version", "molecule"]
                            for key in keys_to_delete:
                                # Pop keys, None -> do not raise an error if key to delete not found
                                df_temp.pop(key, None)

                            # Add results to main dict
                            results_dict[ensembl_ID].update(
                                {f"{transcipt_id}": df_temp}
                            )

                        except RuntimeError:
                            logging.error(
                                f"ID {transcipt_id} not found. "
                                "Please double-check spelling/arguments and try again."
                            )

                # If isoform true, but ID is not a gene; ignore the isoform parameter
                else:
                    # Try if query is valid
                    try:
                        # Define the REST query
                        query = "sequence/id/" + ensembl_ID + "?"

                        # Submit query
                        df_temp = rest_query(server, query, content_type)

                        # Delete superfluous entries
                        keys_to_delete = ["query", "id", "version", "molecule"]
                        for key in keys_to_delete:
                            # Pop keys, None -> do not raise an error if key to delete not found
                            df_temp.pop(key, None)

                        # Add results to main dict
                        results_dict[ensembl_ID].update({"seq": df_temp})

                        logging.info(
                            f"Requesting nucleotide sequence of {ensembl_ID} from Ensembl."
                        )
                        logging.warning("The isoform option only applies to gene IDs.")

                    except RuntimeError:
                        logging.error(
                            f"ID {ensembl_ID} not found. "
                            "Please double-check spelling/arguments and try again."
                        )

            # Add results to master dict
            master_dict.update(results_dict)

        # Build FASTA file
        for ens_ID in master_dict:
            for key in master_dict[ens_ID].keys():
                if key == "seq":
                    fasta.append(">" + ens_ID + " " + master_dict[ens_ID][key]["desc"])
                    fasta.append(master_dict[ens_ID][key]["seq"])
                else:
                    fasta.append(
                        ">"
                        + master_dict[ens_ID][key]["id"]
                        + " "
                        + master_dict[ens_ID][key]["desc"]
                    )
                    fasta.append(master_dict[ens_ID][key]["seq"])

    ## Fetch amino acid sequences from UniProt
    if translate is True:
        if isoforms is False:
            # List to collect transcript IDs
            trans_ids = []

            # Get ID type (gene, transcript, ...) using gget info
            info_df = info(
                ens_ids_clean, verbose=False, pdb=False, ncbi=False, uniprot=False
            )

            # Check that Ensembl ID was found
            missing = set(ens_ids_clean) - set(info_df.index.values)
            if len(missing) > 0:
                logging.warning(
                    f"{str(missing)} IDs not found. Please double-check spelling/arguments."
                )

            ens_ID_type = info_df.loc[ens_ids_clean[0]]["object_type"]

            # If the ID is a gene, use the ID of its canonical transcript
            if ens_ID_type == "Gene":
                # Get ID of canonical transcript
                for ensembl_ID in info_df.index.values:
                    can_trans = info_df.loc[ensembl_ID]["canonical_transcript"]

                    if ensembl_ID.startswith("ENS"):
                        # Remove Ensembl ID version from transcript IDs and append to transcript IDs list
                        temp_trans_id = can_trans.split(".")[0]
                        trans_ids.append(temp_trans_id)

                    elif ensembl_ID.startswith("WB"):
                        # Remove added "." at the end of transcript IDs
                        temp_trans_id = ".".join(can_trans.split(".")[:-1])
                        # # For WormBase transcript IDs, also remove the version number for submission to UniProt API
                        # temp_trans_id = ".".join(temp_trans_id1.split(".")[:-1])
                        trans_ids.append(temp_trans_id)

                    else:
                        # Remove added "." at the end of other transcript IDs
                        temp_trans_id = ".".join(can_trans.split(".")[:-1])
                        trans_ids.append(temp_trans_id)

                    if verbose:
                        logging.info(
                            f"Requesting amino acid sequence of the canonical transcript {temp_trans_id} of gene {ensembl_ID} from UniProt."
                        )

            # If the ID is a transcript, append the ID directly
            elif ens_ID_type == "Transcript":
                # # For WormBase transcript IDs, remove the version number for submission to UniProt API
                # if ensembl_ID.startswith("T"):
                #     trans_ids.append(".".join(ensembl_ID.split(".")[:-1]))
                # else:
                trans_ids = ens_ids_clean

                if verbose:
                    logging.info(
                        f"Requesting amino acid sequence of {trans_ids} from UniProt."
                    )

            else:
                logging.warning(
                    "ensembl_IDs not recognized as either a gene or transcript ID. It will not be included in the UniProt query."
                )

            # Fetch the amino acid sequences of the transcript Ensembl IDs
            df_uniprot = get_uniprot_seqs(UNIPROT_REST_API, trans_ids)
            # Add info_df.loc[ensembl_ID] to df_uniprot by joining on "canonical_transcript" / "gene_name" respectively
            info_df.set_index("canonical_transcript", inplace=True)

            df_uniprot.loc[:, "gene_id"] = info_df.loc[
                df_uniprot["query"], "gene_name"
            ].values

        if isoforms is True:
            # List to collect transcript IDs
            trans_ids = []

            for ensembl_ID in ens_ids_clean:
                # Get ID type (gene, transcript, ...) using gget info
                info_df = info(
                    ensembl_ID, verbose=False, pdb=False, ncbi=False, uniprot=False
                )

                # Check that Ensembl ID was found
                if isinstance(info_df, type(None)):
                    logging.warning(
                        f"ID '{ensembl_ID}' not found. Please double-check spelling/arguments."
                    )
                    continue

                ens_ID_type = info_df.loc[ensembl_ID]["object_type"]

                # If the ID is a gene, get the IDs of all isoforms
                if ens_ID_type == "Gene":
                    # Get the IDs of all transcripts from the gget info results
                    for transcipt_id in info_df.loc[ensembl_ID]["all_transcripts"]:
                        if ensembl_ID.startswith("ENS"):
                            # Append transcript ID (without Ensembl version number) to list of transcripts to fetch
                            trans_ids.append(transcipt_id.split(".")[0])

                        # elif ensembl_ID.startswith("WB"):
                        #     # For WormBase transcript IDs, remove the version number for submission to UniProt API
                        #     temp_trans_id = ".".join(transcipt_id.split(".")[:-1])
                        #     trans_ids.append(temp_trans_id)

                        else:
                            # Note: No need to remove the added "." at the end of unversioned transcripts here, because "all_transcripts" are returned without it
                            trans_ids.append(transcipt_id)

                    if verbose:
                        logging.info(
                            f"Requesting amino acid sequences of all transcripts of gene {ensembl_ID} from UniProt."
                        )

                elif ens_ID_type == "Transcript":
                    # # For WormBase transcript IDs, remove the version number for submission to UniProt API
                    # if ensembl_ID.startswith("T"):
                    #     trans_ids.append(".".join(ensembl_ID.split(".")[:-1]))

                    # else:
                    trans_ids.append(ensembl_ID)

                    if verbose:
                        logging.info(
                            f"Requesting amino acid sequence of {ensembl_ID} from UniProt."
                        )
                    logging.warning("The isoform option only applies to gene IDs.")

                else:
                    logging.warning(
                        f"{ensembl_ID} not recognized as either a gene or transcript ID. It will not be included in the UniProt query."
                    )

            # Fetch amino acid sequences of all isoforms from the UniProt REST API
            df_uniprot = get_uniprot_seqs(UNIPROT_REST_API, trans_ids)

        # Check if any results were found
        if len(df_uniprot) < 1:
            logging.error("No UniProt amino acid sequences were found for these ID(s).")

        else:
            # Build FASTA file from UniProt results
            # Seven columns are zipped below, so seven names are unpacked here
            for (
                uniprot_id,
                query_ensembl_id,
                gene_name,
                gene_id,
                organism,
                sequence_length,
                uniprot_seq,
            ) in zip(
                df_uniprot["uniprot_id"].values,
                df_uniprot["query"].values,
                df_uniprot["gene_name"].values,
                df_uniprot["gene_id"].values,
                df_uniprot["organism"].values,
                df_uniprot["sequence_length"].values,
                df_uniprot["sequence"].values,
            ):
                fasta.append(
                    ">"
                    + str(query_ensembl_id)
                    + " uniprot_id: "
                    + str(uniprot_id)
                    + " ensembl_id: "
                    + str(query_ensembl_id)
                    + " gene_name: "
                    + str(gene_name)
                    + " organism: "
                    + str(organism)
                    + " sequence_length: "
                    + str(sequence_length)
                )
                fasta.append(str(uniprot_seq))

    # Save
    if save:
        # Use a context manager so the file is closed even if a write fails
        with open("gget_seq_results.fa", "w") as file:
            for element in fasta:
                file.write(element + "\n")
        # missed samples
        return (set(trans_ids) - set(df_uniprot["query"].values)) | set(missing)

    return fasta

subset_fasta

subset_fasta: creates a new FASTA file containing only the sequences whose gene name is in gene_tosubset

Parameters:
  • gene_tosubset (set, default: None ) –

    A set of gene names (Ensembl gene IDs without the version suffix) to keep from the original FASTA file. If None or empty, all genes are kept.

  • fasta_path (str, default: None ) –

    The path to the original FASTA file.

  • subfasta_path (str, default: './data/fasta/subset.fa' ) –

    The path to save the subsetted FASTA file. Defaults to "./data/fasta/subset.fa".

  • drop_unknown_seq (bool, default: True ) –

    If True, drop sequences containing unknown amino acids (denoted by '*'). Defaults to True.

  • subset_protein_coding (bool, default: True ) –

    If True, subset only protein coding genes. Defaults to True.

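Returns: tuple: (genes_found, names), where genes_found is the set of gene names written to the subsetted FASTA file and names is a pandas DataFrame with one row per parsed record (columns: name, biotype, ensembl_id, gene_symbol, description).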

Raises:
  • IndexError

    If a record description lacks a "gene:" or "gene_biotype:" field.

Source code in scprint2/utils/get_seq.py
def subset_fasta(
    gene_tosubset: set = None,
    fasta_path: str = None,
    subfasta_path: str = "./data/fasta/subset.fa",
    drop_unknown_seq: bool = True,
    subset_protein_coding: bool = True,
) -> tuple:
    """
    subset_fasta: creates a new FASTA file containing only the sequences whose gene name is in gene_tosubset

    Args:
        gene_tosubset (set): A set of gene names (Ensembl gene IDs without the version suffix) to keep. If None or empty, all genes are kept.
        fasta_path (str): The path to the original FASTA file.
        subfasta_path (str, optional): The path to save the subsetted FASTA file. Defaults to "./data/fasta/subset.fa".
        drop_unknown_seq (bool, optional): If True, drop sequences containing unknown amino acids (denoted by '*'). Defaults to True.
        subset_protein_coding (bool, optional): If True, subset only protein coding genes. Defaults to True.
    Returns:
        tuple: (genes_found, names), where genes_found is the set of gene names written to the
            subsetted FASTA file and names is a pandas DataFrame with one row per parsed record.

    Raises:
        IndexError: If a record description lacks a "gene:" or "gene_biotype:" field.
    """
    dup = set()
    weird = 0
    nc = 0
    genes_found = set()
    gene_tosubset = set(gene_tosubset) if gene_tosubset else []
    names = []
    with (
        open(fasta_path, "r") as original_fasta,
        open(subfasta_path, "w") as subset_fasta,
    ):
        for record in SeqIO.parse(original_fasta, "fasta"):
            gene_name = (
                record.description.split(" gene:")[1].split(" ")[0].split(".")[0]
            )
            gene_biotype = record.description.split("gene_biotype:")[1].split(" ")[0]
            if "gene_symbol:" not in record.description:
                gene_symbol = gene_name
            else:
                gene_symbol = record.description.split("gene_symbol:")[1].split(" ")[0]
            if "description:" not in record.description:
                description = ""
            else:
                description = record.description.split("description:")[1]
            names.append([gene_name, gene_biotype, record.id, gene_symbol, description])
            if subset_protein_coding and gene_biotype != "protein_coding":
                nc += 1
                continue
            if len(gene_tosubset) == 0 or gene_name in gene_tosubset:
                if drop_unknown_seq:
                    if "*" in record.seq:
                        weird += 1

                        continue
                if gene_name in genes_found:
                    dup.add(gene_name)
                    continue
                record.description = ""
                record.id = gene_name
                SeqIO.write(record, subset_fasta, "fasta")
                genes_found.add(gene_name)
    print(len(dup), " genes had duplicates")
    print("dropped", weird, "weird sequences")
    print("dropped", nc, "non-coding sequences")
    return genes_found, pd.DataFrame(
        names, columns=["name", "biotype", "ensembl_id", "gene_symbol", "description"]
    )
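The header parsing in subset_fasta assumes Ensembl-style FASTA descriptions that carry a "gene:" and a "gene_biotype:" field, and optionally "gene_symbol:" and "description:". A minimal, dependency-free sketch of that parsing is shown below. The helper name parse_ensembl_description and the sample header are hypothetical and only illustrate the string splitting; they are not part of scprint2.

```python
def parse_ensembl_description(description: str) -> dict:
    """Extract gene fields from an Ensembl-style FASTA description line.

    Hypothetical helper that mirrors the string splits used in subset_fasta.
    """
    # Gene ID with its version suffix removed
    gene_name = description.split(" gene:")[1].split(" ")[0].split(".")[0]
    gene_biotype = description.split("gene_biotype:")[1].split(" ")[0]
    # Fall back to the gene ID when no symbol is present
    if "gene_symbol:" in description:
        gene_symbol = description.split("gene_symbol:")[1].split(" ")[0]
    else:
        gene_symbol = gene_name
    # The free-text description runs to the end of the line
    if "description:" in description:
        text = description.split("description:")[1]
    else:
        text = ""
    return {
        "gene_name": gene_name,
        "gene_biotype": gene_biotype,
        "gene_symbol": gene_symbol,
        "description": text,
    }


# Illustrative header only; real files may order or omit fields differently
header = (
    "ENSP00000369497.3 pep chromosome:GRCh38:13:32315086:32400268:1 "
    "gene:ENSG00000139618.17 transcript:ENST00000380152.8 "
    "gene_biotype:protein_coding transcript_biotype:protein_coding "
    "gene_symbol:BRCA2 description:BRCA2 DNA repair associated"
)
fields = parse_ensembl_description(header)
print(fields)
```

A header without " gene:" makes the first split return a single element, so indexing with [1] raises IndexError; callers that mix header styles may want to guard for that.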

scprint2.utils.graph_refinement

Graph-regularized logit refinement implementation.

This module implements the GRIT (Graph-Regularized logIT) refinement method for improving cell type predictions using graph structure.

Functions:

Name Description
build_knn_graph

Build a k-nearest neighbor graph and store it in adata.obsp.

graph_regularized_logit_refinement

Refine logits using graph-regularized optimization.

test_graph_refinement

Test function for graph refinement.

zero_shot_annotation_with_refinement

Perform zero-shot cell type annotation with graph refinement.

build_knn_graph

Build a k-nearest neighbor graph and store it in adata.obsp.

Parameters:
  • adata (AnnData) –

    AnnData object

  • representation_key (str, default: 'X_pca' ) –

    Key in adata.obsm for the representation to use. Defaults to "X_pca".

  • n_neighbors (int, default: 15 ) –

    Number of nearest neighbors. Defaults to 15.

  • metric (str, default: 'euclidean' ) –

    Distance metric for nearest neighbor search. Defaults to "euclidean".

Returns:
  • AnnData

    anndata.AnnData: The input AnnData object, updated in place by sc.pp.neighbors, which stores the connectivity matrix in adata.obsp["connectivities"]

Source code in scprint2/utils/graph_refinement.py
def build_knn_graph(
    adata: anndata.AnnData,
    representation_key: str = "X_pca",
    n_neighbors: int = 15,
    metric: str = "euclidean",
) -> anndata.AnnData:
    """
    Build a k-nearest neighbor graph and store it in adata.obsp.

    Args:
        adata (anndata.AnnData): AnnData object
        representation_key (str): Key in adata.obsm for the representation to use. Defaults to "X_pca".
        n_neighbors (int): Number of nearest neighbors. Defaults to 15.
        metric (str): Distance metric for nearest neighbor search. Defaults to "euclidean".

    Returns:
        anndata.AnnData: Updated AnnData object with connectivity matrix
    """
    try:
        import scanpy as sc
    except ImportError:
        raise ImportError("scanpy is required for building k-NN graphs")

    # Compute neighbors
    sc.pp.neighbors(
        adata,
        use_rep=representation_key,
        n_neighbors=n_neighbors,
        metric=metric,
    )

    return adata
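To make the shape of the resulting graph concrete, here is a simplified stand-in written with NumPy only. It builds a binary, symmetrized k-NN adjacency by brute force. This is an assumption-laden illustration: scanpy's sc.pp.neighbors computes weighted connectivities with its own method, so the values will differ, and the function name knn_adjacency is hypothetical.

```python
import numpy as np


def knn_adjacency(X: np.ndarray, n_neighbors: int = 15) -> np.ndarray:
    """Binary, symmetrized k-NN adjacency from Euclidean distances (illustrative)."""
    n = X.shape[0]
    # Pairwise squared Euclidean distances
    sq = (X ** 2).sum(axis=1)
    d2 = sq[:, None] + sq[None, :] - 2.0 * X @ X.T
    np.fill_diagonal(d2, np.inf)  # exclude self-matches
    # Indices of the n_neighbors closest points for each row
    idx = np.argsort(d2, axis=1)[:, :n_neighbors]
    A = np.zeros((n, n))
    A[np.repeat(np.arange(n), n_neighbors), idx.ravel()] = 1.0
    # Symmetrize: keep an edge if either endpoint selected the other
    return np.maximum(A, A.T)


rng = np.random.default_rng(0)
X = rng.normal(size=(30, 5))  # 30 synthetic cells in a 5-dimensional embedding
A = knn_adjacency(X, n_neighbors=4)
print(A.shape, int(A.sum()))
```

The result is an (n_cells, n_cells) matrix, the same shape as the connectivity matrix expected in adata.obsp.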

graph_regularized_logit_refinement

Refine logits using graph-regularized optimization. Optimized version that solves for all classes simultaneously.

This function implements the optimization problem: P̃ = arg min_P ||P - P₀||²_F + λ Tr(P^T L P)

where P₀ are the initial logits, L is the graph Laplacian, and λ controls the strength of regularization.

The solution has a closed form: P̃ = (I + λL)⁻¹P₀
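The closed form follows from setting the gradient of the objective to zero. The short derivation below assumes L is symmetric, which holds here because the adjacency matrix is symmetrized before the Laplacian is formed:

```latex
\begin{aligned}
J(P) &= \lVert P - P_0 \rVert_F^2 + \lambda \,\operatorname{Tr}\!\left(P^\top L P\right), \\
\nabla_P J &= 2\,(P - P_0) + 2\lambda L P = 0, \\
(I + \lambda L)\,P &= P_0
\quad\Longrightarrow\quad
\tilde{P} = (I + \lambda L)^{-1} P_0 .
\end{aligned}
```

When L = D - A is built from non-negative edge weights it is positive semidefinite, so I + λL is positive definite for λ > 0 and the linear system has a unique solution.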

Parameters:
  • pred (ndarray) –

    Initial logits of shape (n_cells, n_classes)

  • adata (AnnData) –

    AnnData object containing graph connectivity

  • connectivity_key (str, default: 'connectivities' ) –

    Key in adata.obsp for connectivity matrix

  • lambda_reg (float, default: 0.1 ) –

    Regularization strength λ > 0

  • use_laplacian (bool, default: True ) –

    If True, use graph Laplacian; if False, use adjacency matrix

Returns:
  • ndarray

    np.ndarray: Refined logits of same shape as input pred

Raises:
  • ValueError

    If the connectivity matrix shape does not match the number of cells in pred

  • KeyError

    If connectivity_key is not in adata.obsp

Source code in scprint2/utils/graph_refinement.py
def graph_regularized_logit_refinement(
    pred: np.ndarray,
    adata: anndata.AnnData,
    connectivity_key: str = "connectivities",
    lambda_reg: float = 0.1,
    use_laplacian: bool = True,
) -> np.ndarray:
    """
    Refine logits using graph-regularized optimization.
    Optimized version that solves for all classes simultaneously.

    This function implements the optimization problem:
    P̃ = arg min_P ||P - P₀||²_F + λ Tr(P^T L P)

    where P₀ are the initial logits, L is the graph Laplacian, and λ controls
    the strength of regularization.

    The solution has a closed form: P̃ = (I + λL)⁻¹P₀

    Args:
        pred (np.ndarray): Initial logits of shape (n_cells, n_classes)
        adata (anndata.AnnData): AnnData object containing graph connectivity
        connectivity_key (str): Key in adata.obsp for connectivity matrix
        lambda_reg (float): Regularization strength λ > 0
        use_laplacian (bool): If True, use graph Laplacian; if False, use adjacency matrix

    Returns:
        np.ndarray: Refined logits of same shape as input pred

    Raises:
        ValueError: If the connectivity matrix shape does not match the number of cells in pred
        KeyError: If connectivity_key is not in adata.obsp
    """

    # Validate inputs
    if connectivity_key not in adata.obsp:
        raise KeyError(f"Connectivity key '{connectivity_key}' not found in adata.obsp")

    A = adata.obsp[connectivity_key]
    n_cells, n_classes = pred.shape

    # Check dimensions
    if A.shape[0] != n_cells or A.shape[1] != n_cells:
        raise ValueError(
            f"Connectivity matrix shape {A.shape} doesn't match number of cells {n_cells}"
        )

    # Ensure adjacency matrix is sparse
    if not sp.issparse(A):
        A = sp.csr_matrix(A)

    # Make symmetric if not already
    A = (A + A.T) / 2

    if use_laplacian:
        # Compute graph Laplacian: L = D - A
        # where D is the diagonal degree matrix
        degrees = np.array(A.sum(axis=1)).flatten()
        D = sp.diags(degrees, format="csr")
        L = D - A
    else:
        # Use adjacency matrix directly
        L = A

    identity_matrix = sp.identity(n_cells, format="csr")
    system_matrix = identity_matrix + lambda_reg * L

    # Solve for all classes at once instead of looping
    # spsolve can handle multiple right-hand sides
    refined_pred = spsolve(system_matrix, pred)

    # Handle the case where spsolve returns 1D array for single class
    if refined_pred.ndim == 1 and n_classes == 1:
        refined_pred = refined_pred.reshape(-1, 1)
    elif refined_pred.ndim == 1:
        refined_pred = refined_pred.reshape(n_cells, n_classes)

    return refined_pred
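A small worked example may help to see what the refinement does. The sketch below uses dense NumPy arrays on a hand-made four-cell path graph instead of the sparse solver, so it illustrates the same closed form rather than the function itself. The graph, the logits, and lambda_reg = 1.0 are made-up values chosen for illustration.

```python
import numpy as np

# Path graph on 4 cells: 0 - 1 - 2 - 3 (illustrative adjacency)
A = np.array(
    [
        [0, 1, 0, 0],
        [1, 0, 1, 0],
        [0, 1, 0, 1],
        [0, 0, 1, 0],
    ],
    dtype=float,
)

# Initial logits for 2 classes; cell 1 disagrees with its neighbors
P0 = np.array(
    [
        [2.0, 0.0],
        [0.0, 1.0],
        [2.0, 0.0],
        [2.0, 0.0],
    ]
)

lambda_reg = 1.0
L = np.diag(A.sum(axis=1)) - A           # graph Laplacian L = D - A
system = np.eye(4) + lambda_reg * L      # I + lambda * L
P_refined = np.linalg.solve(system, P0)  # solves for all classes at once

before = P0.argmax(axis=1)
after = P_refined.argmax(axis=1)
print(before, after)
```

In this toy case the outlier cell 1 is pulled toward the class of its neighbors. Because every row of L sums to zero, the per-class column sums of the logits are unchanged by the refinement; only their distribution across cells is smoothed.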

test_graph_refinement

Test function for graph refinement.

Source code in scprint2/utils/graph_refinement.py
def test_graph_refinement():
    """Test function for graph refinement."""

    # Create synthetic data
    n_cells, n_classes = 100, 5

    # Random logits
    np.random.seed(42)
    pred = np.random.randn(n_cells, n_classes)

    # Create synthetic AnnData with connectivity
    adata = anndata.AnnData(X=np.random.randn(n_cells, 50))

    # Create a random sparse connectivity matrix
    from scipy.sparse import random

    connectivity = random(n_cells, n_cells, density=0.1, format="csr")
    connectivity = (connectivity + connectivity.T) / 2  # Make symmetric
    adata.obsp["connectivities"] = connectivity

    # Test refinement
    refined_pred = graph_regularized_logit_refinement(pred, adata, lambda_reg=0.1)

    print(f"Original logits shape: {pred.shape}")
    print(f"Refined logits shape: {refined_pred.shape}")
    print(f"Logits changed: {not np.allclose(pred, refined_pred)}")

    # Test zero-shot annotation
    predictions, probabilities = zero_shot_annotation_with_refinement(
        pred, adata, return_probabilities=True
    )

    print(f"Predictions shape: {predictions.shape}")
    print(f"Probabilities shape: {probabilities.shape}")
    print(f"Predicted classes: {np.unique(predictions)}")

zero_shot_annotation_with_refinement

Perform zero-shot cell type annotation with graph refinement.

This function first refines the logits using graph regularization, then performs argmax to get final predictions.

Parameters:
  • pred (ndarray) –

    Initial logits of shape (n_cells, n_classes)

  • adata (AnnData) –

    AnnData object containing graph connectivity

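  • connectivity_key (str, default: 'connectivities' ) –

    Key in adata.obsp for connectivity matrix. If the key is absent, a k-NN graph is built with build_knn_graph and read from "connectivities".

  • representation_key (str, default: 'X_pca' ) –

    Key in adata.obsm for the representation used when the k-NN graph has to be built. Defaults to "X_pca".

  • n_neighbors (int, default: 15 ) –

    Number of nearest neighbors used when the k-NN graph has to be built. Defaults to 15.

  • metric (str, default: 'euclidean' ) –

    Distance metric used when the k-NN graph has to be built. Defaults to "euclidean".

  • lambda_reg (float, default: 0.1 ) –

    Regularization strength

  • return_probabilities (bool, default: False ) –

    If True, also return refined probabilities

  • return_raw (bool, default: False ) –

    If True, return the refined logits directly and skip the argmax step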

Returns:
  • Union[ndarray, tuple]

    np.ndarray or tuple: If return_raw is True, returns the refined logits. Otherwise, if return_probabilities is False, returns array of predicted class indices; if True, returns tuple of (predictions, refined_probabilities)

Source code in scprint2/utils/graph_refinement.py
def zero_shot_annotation_with_refinement(
    pred: np.ndarray,
    adata: anndata.AnnData,
    connectivity_key: str = "connectivities",
    representation_key: str = "X_pca",
    n_neighbors: int = 15,
    metric: str = "euclidean",
    lambda_reg: float = 0.1,
    return_probabilities: bool = False,
    return_raw: bool = False,
) -> Union[np.ndarray, tuple]:
    """
    Perform zero-shot cell type annotation with graph refinement.

    This function first refines the logits using graph regularization,
    then performs argmax to get final predictions.

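    Args:
        pred (np.ndarray): Initial logits of shape (n_cells, n_classes)
        adata (anndata.AnnData): AnnData object containing graph connectivity
        connectivity_key (str): Key in adata.obsp for connectivity matrix. If absent,
            a k-NN graph is built with build_knn_graph.
        representation_key (str): Key in adata.obsm used when the k-NN graph has to be built
        n_neighbors (int): Number of nearest neighbors used when the k-NN graph has to be built
        metric (str): Distance metric used when the k-NN graph has to be built
        lambda_reg (float): Regularization strength
        return_probabilities (bool): If True, also return refined probabilities
        return_raw (bool): If True, return the refined logits directly and skip the argmax step

    Returns:
        np.ndarray or tuple: If return_raw is True, returns the refined logits.
                           Otherwise, if return_probabilities is False, returns array of
                           predicted class indices. If True, returns tuple of
                           (predictions, refined_probabilities)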
    """
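    # Accept a DataFrame of logits by converting it to a NumPy array
    if isinstance(pred, pd.DataFrame):
        pred = pred.values
    if adata.obsp.get(connectivity_key) is None:
        # No graph stored under connectivity_key: build a k-NN graph first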
        adata = build_knn_graph(
            adata=adata,
            representation_key=representation_key,
            n_neighbors=n_neighbors,
            metric=metric,
        )
        connectivity_key = "connectivities"
    print(adata.obsp)
    refined_logits = graph_regularized_logit_refinement(
        pred, adata, connectivity_key, lambda_reg
    )
    if return_raw:
        return refined_logits
    # Get predictions: g(xi) = arg max_j {P̃(i)}
    predictions = np.argmax(refined_logits, axis=1)

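    if return_probabilities:
        # Convert to probabilities using softmax; subtracting the row-wise
        # maximum first avoids overflow in np.exp without changing the result
        shifted = refined_logits - refined_logits.max(axis=1, keepdims=True)
        refined_probs = np.exp(shifted)
        refined_probs = refined_probs / refined_probs.sum(axis=1, keepdims=True)
        return predictions, refined_probs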

    return predictions
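The probability conversion at the end of the function is a row-wise softmax over the refined logits. A minimal sketch of a numerically stable variant is given below. It subtracts each row's maximum before exponentiating, which leaves the softmax unchanged mathematically but keeps np.exp from overflowing on large logits. The function name stable_softmax and the sample logits are hypothetical.

```python
import numpy as np


def stable_softmax(logits: np.ndarray) -> np.ndarray:
    """Row-wise softmax that subtracts each row's maximum before exponentiating."""
    shifted = logits - logits.max(axis=1, keepdims=True)
    exp = np.exp(shifted)
    return exp / exp.sum(axis=1, keepdims=True)


# The first row would overflow with a direct np.exp(logits)
logits = np.array([[1000.0, 999.0], [0.0, 0.0]])
probs = stable_softmax(logits)
predictions = probs.argmax(axis=1)
print(probs, predictions)
```

Since softmax is monotonic within each row, taking the argmax of the probabilities gives the same class indices as taking the argmax of the logits.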