Source code for bdgenomics.mango.alignments

#
# Licensed to Big Data Genomics (BDG) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The BDG licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

r"""
==========
Alignments
==========
.. currentmodule:: bdgenomics.mango.alignments
.. autosummary::
   :toctree: _generate/

   AlignmentSummary
   FragmentDistribution
   MapQDistribution
   IndelDistribution

"""

import bdgenomics.mango.pileup as pileup
from bdgenomics.mango.pileup.track import *
from bdgenomics.adam.adamContext import ADAMContext

from .coverage import *
from .utils import *

from collections import Counter, OrderedDict
from .distribution import CountDistribution
from cigar import Cigar
import matplotlib.pyplot as plt
plt.rcdefaults()

[docs]class AlignmentSummary(object): """AlignmentSummary class. AlignmentSummary provides scrollable visualization of alignments based on genomic regions. """
[docs] def __init__(self, spark, ac, alignmentDataset, sample = 1.0): """ Initializes an AlignmentSummary class. Args: :param spark: SparkSession :param ac: ADAMContext :param alignmentDataset: bdgenomics.adam.Dataset.AlignmentRecoDatasetataset :param sample: fraction of reads to sample from """ # sample must be between 0 and 1 if sample <= 0 or sample > 1: raise Exception('sample {} should be > 0 and <= 1'.format(self.sample)) self.ss = spark self.ac = ac self.sample = sample # sample dataset self.dataset = alignmentDataset # where to store collected data self.coverageDistribution = None self.fragmentDistribution = None self.mapQDistribution = None self.indelDistribution = None
def getCoverageDistribution(self, bin_size = 10): """ Computes coverage distribution for this AlignmentDataset. Args: :param bin_size: size to bin coverage by Returns: CoverageDistribution object """ if self.coverageDistribution == None: print("Computing coverage distributions...") # pre-sample before computing coverage sampledDataset = self.dataset.transform(lambda dataset: dataset.sample(False, self.sample)) self.coverageDistribution = CoverageDistribution(self.ss, \ sampledDataset.toCoverage(), \ sample = self.sample, \ bin_size = bin_size, pre_sampled = True) return self.coverageDistribution def getFragmentDistribution(self): """ Computes fragment distribution for this AlignmentDataset. Returns: FragmentDistribution object """ if self.fragmentDistribution == None: print("Computing fragment distributions...") self.fragmentDistribution = FragmentDistribution(self.ss, self.dataset, sample=self.sample) return self.fragmentDistribution def getMapQDistribution(self): """ Computes mapping quality distribution for this AlignmentDataset. Returns: MapQDistribution object """ if self.mapQDistribution == None: print("Computing MapQ distributions...") self.mapQDistribution = MapQDistribution(self.ss, self.dataset, sample=self.sample) return self.mapQDistribution def getIndelDistribution(self, bin_size=10000000): """ Computes insertion and deletion distribution for this AlignmentDataset Returns: IndelDistribution object """ if self.indelDistribution == None: print("Computing Indel distributions...") self.indelDistribution = IndelDistribution(self.ss, self.dataset, bin_size=bin_size, sample=self.sample) return self.indelDistribution def viewPileup(self, contig, start, end, reference = 'hg19', label = "Reads", multipleGroupNames = False, showPlot = True): """ Visualizes a portion of this AlignmentDataset in a scrollable pileup widget Args: :param contig: contig of locus to view :param start: start position of locus to view :param end: end position of locus to view :param reference: genome build. Default is hg19 :param label: name of alignment track :param multipleGroupName: determines whether to different show tracks for each group name. If false, coalesces into one track. :param showPlot: Disables widget, used for testing. Default is true. Returns: pileup view for alignments """ contig_trimmed, contig_full = formatContig(contig) # Filter Dataset filtered = self.dataset.transform(lambda r: r.filter(((r.referenceName == contig_full) | (r.referenceName == contig_trimmed)) & (r.start < end) & (r.end > start) & (r.readMapped))) # convert to GA4GH JSON to be consumed by mango-pileup module json_map = self.ac._jvm.org.bdgenomics.mango.converters.GA4GHutil.alignmentDatasetToJSON(filtered._jvmDataset, multipleGroupNames) # visualize if (showPlot): # make pileup tracks tracks = [] # iterate through all group names and make a track for each. for groupName in json_map: # set label if only one track. Otherwise, set to group names. thisLabel = groupName if multipleGroupNames else label # create new track for this recordGroupName. files with unspecified recordGroup name # have a default label of "1" track=Track(viz="pileup", label=thisLabel, source=pileup.sources.GA4GHAlignmentJson(json_map[groupName])) tracks.append(track) locus="%s:%i-%i" % (contig, start, end) return pileup.PileupViewer(locus=locus, reference=reference, tracks=tracks)
[docs]class FragmentDistribution(CountDistribution): """ FragmentDistribution class. Plotting functionality for visualizing fragment distributions of multi-sample cohorts. """
[docs] def __init__(self, ss, alignmentDataset, sample = 1.0): """ Initializes a FragmentDistribution class. Computes the fragment distribution of a AlignmentDataset. This Dataset can have data for multiple samples. Args: :param ss: global SparkSession. :param alignmentDataset: bdgenomics.adam.Dataset.AlignmentDataset :param sample: Fraction to sample AlignmentDataset. Should be between 0 and 1 """ self.sc = ss.sparkContext self.sample = sample self.rdd = alignmentDataset.toDF().rdd \ .map(lambda r: ((r["readGroupSampleId"], len(r["sequence"])), 1)) CountDistribution.__init__(self)
[docs]class MapQDistribution(CountDistribution): """ MapQDistribution class. Plotting functionality for visualizing mapping quality distributions of multi-sample cohorts. """
[docs] def __init__(self, ss, alignmentDataset, sample = 1.0): """ Initializes a MapQDistribution class. Computes the mapping quality distribution of an AlignmentDataset. This Dataset can have data for multiple samples. Args: :param ss: global SparkSession. :param alignmentDataset: A bdgenomics.adam.dataset.AlignmentDataset object. :param sample: Fraction to sample AlignmentDataset. Should be between 0 and 1 """ self.sc = ss.sparkContext self.sample = sample # filter out reads that are not mappped self.rdd = alignmentDataset.transform(lambda r: r.filter(r.readMapped)).toDF().rdd \ .map(lambda r: ((r["readGroupSampleId"], int(r["mappingQuality"] or 0)), 1)) CountDistribution.__init__(self)
[docs]class IndelDistribution(object): """ IndelDistribution class. IndelDistribution calculates indel distributions on an AlignmentDataset. """
[docs] def __init__(self, ss, alignmentDataset, sample=1.0, bin_size=10000000): """ Initializes a IndelDistribution class. Computes the insertiona and deletion distribution of alignmentDataset. Args: :param SparkSession: the global SparkSession :param alignmentDataset: A bdgenomics.adam.dataset.AlignmentDataset object :param bin_size: Division size per bin """ bin_size = int(bin_size) self.bin_size = bin_size self.sc = ss.sparkContext self.sample = sample # filter alignments without a position filteredAlignments = alignmentDataset.transform(lambda x: x.sample(False, self.sample)) \ .transform(lambda x: x.filter(x["start"] >= 0)) # Assign alignments with counter for contigs. Reduce and collect. mappedDistributions = filteredAlignments.toDF().rdd \ .map(lambda r: ((r["referenceName"], r["start"] - r["start"]%bin_size), \ Counter(dict([(y,x) for x,y in Cigar(r["cigar"]).items()])))) \ .reduceByKey(lambda x,y: x+y) self.alignments = mappedDistributions.collect()
def plot(self, testMode = False, plotType="I", **kwargs): """ Plots final distribution values and returns the plotted distribution as a counter object. Args: :param xScaleLog: rescales xaxis to log :param yScaleLog: rescales yaxis to log :param testMode: if true, does not generate plot. Used for testing. :param plotType: Cigar type to plot, from ['I', 'H', 'D', 'M', 'S'] Returns: matplotlib axis to plot and computed data """ chromosomes = Counter() # count contig type at each location for index, counts in self.alignments: chromosomes[index] += counts[plotType] if (not testMode): # For testing: do not run plots if testMode # get distinct chromosome names for plot keys = sorted(list(set(map(lambda x: x[0][0], self.alignments)))) # offset for max chr offset = 0 # holds xtick information midPoints = [] # make figure plot figsize = kwargs.get('figsize',(10, 5)) f, ax = plt.subplots(figsize=figsize) # for all chromosomes for key in keys: # filter by chromosome key values = [x for x in chromosomes.items() if x[0][0] == key] # get positions positions = list(map(lambda x: x[0][1] + offset, values)) # get counts counts = list(map(lambda x: x[1], values)) ax.plot(positions, counts, ls='', marker='.') # set label for chromosome midPoint = min(positions) + (max(positions) - min(positions))/2 midPoints.append(midPoint) offset = max(positions) ax.set_xticks(midPoints) ax.set_xticklabels(keys, rotation=-90, size=8.5) return ax, chromosomes else: return None, chromosomes