This tutorial gives an introduction to Apache Spark in Scala, using protein sequences and amino acids, commonly used in bioinformatics, as a use case. The same exercises can also be done with genomic data using nucleotides (A, C, G, T), and the code can be adapted to Python, Java or R.
Download the dataset
The dataset used for this tutorial corresponds to all Swiss-Prot sequences manually reviewed by curators until August 2017: swissprot-aug-2017.tsv.gz (66.0 MB compressed, 200 MB uncompressed, 555’100 sequences or lines).
The data size is well suited for teaching the basics of Apache Spark and runs comfortably on a laptop in standalone mode.
There is also a more challenging dataset that can be downloaded through this link: uniprot-trembl-aug-2017.tsv.gz (16.7 GB compressed, 29.7 GB uncompressed, 88’032’926 sequences or lines). This data corresponds to the TrEMBL sequences, annotated automatically and not reviewed by curators, as of August 2017.
Once uncompressed, you can see that each line corresponds to a protein sequence and the columns are ordered as follows:
Note: To make the examples more readable, we use Swiss-Prot organism mnemonics to represent species, and we named the column specie (instead of species). For a real analysis of species, you should use the taxon identifiers instead.
Setup Spark environment
This tutorial can be followed either with the spark-shell (the recommended way) or with Spark notebooks:
spark-shell: Download Apache Spark (Java 8 must be installed). Start a spark shell with 2 GB of memory: $SPARK_HOME/bin/spark-shell --driver-memory 2G.
By default Spark uses all the CPUs available on the machine, but this can be changed with the --master local[n] option. For example, $SPARK_HOME/bin/spark-shell --master local[2] will only use 2 cores. When using a cluster, the spark-shell can be initialized like this: $SPARK_HOME/bin/spark-shell --executor-memory 4G --master spark://$master_hostname:7077
A Resilient Distributed Dataset (RDD) is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel.
RDDs automatically recover from node failures
RDDs can be cached in memory (of the workers)
RDDs can be created either by parallelizing an existing collection or by reading from a data source (filesystem, Cassandra, HBase, HDFS, Amazon S3, …), as shown below.
Data can be read from directories using wildcards, and the files can even be compressed. For example this is a valid path: sc.textFile("apache-logs/**/*.gz"). It supports HDFS and other file systems.
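For this tutorial, a minimal sketch of both creation methods (the file name and variable names below are assumptions; adjust the path to wherever you uncompressed the download):

```scala
// Create an RDD from a local collection (here, a few amino-acid one-letter codes).
val aminoAcids = sc.parallelize(Seq("A", "R", "N", "D", "C"))

// Create an RDD from the uncompressed Swiss-Prot TSV file: one element per line.
val sequences = sc.textFile("swissprot-aug-2017.tsv")
```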
Note that this operation returns almost immediately and will even succeed if the file is not present, because nothing is read until an action is triggered. If you run on a cluster, the file must be accessible to all workers under the same path.
To make sure the file is in the correct place, we can ask for the first line:
If you get an error message that says “Input path does not exist:”, you might need to specify the full path of the file (this happens if you didn’t start the spark-shell in the directory where the file is).
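A possible check, reusing the sequences RDD defined above:

```scala
// first() is an action: it actually opens the file and reads the first line.
sequences.first()
```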
Classes and functions
Even though it is possible to do complex tasks in a purely functional manner using anonymous functions and lambdas, it is good practice to define named functions and classes so the code is easier to understand.
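As a sketch, a named case class and parsing function could look like this. The column indices and field names are assumptions; adjust them to the actual column order of the TSV file:

```scala
// Hypothetical record for one line of the TSV file; adapt the fields and
// indices to the real column layout described above.
case class Protein(accession: String, specie: String, sequence: String)

// A named function is easier to read, test and reuse than an inline lambda.
def parseProtein(line: String): Protein = {
  val cols = line.split("\t")
  Protein(cols(0), cols(1), cols(2))
}

// Build an RDD of Protein objects from the raw text lines.
val proteins = sequences.map(parseProtein)
```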
Transformations and actions
In Spark there are two types of operations: transformations and actions.
A transformation creates a new RDD from an existing one. Transformations are lazily evaluated: they are only executed on demand, when an action needs their result.
An action returns a non-RDD type. Actions trigger the actual execution of the accumulated transformations and therefore consume CPU time.
Transformations: map, flatMap, filter, sample, groupByKey, distinct, reduceByKey, sortByKey, …
Actions: first, take, collect (careful with this one!), count, reduce, saveAsTextFile, lookup, …
Transformations are added to the DAG (Directed Acyclic Graph) of operations. If you are coming from the RDBMS world, you can think of it as the execution plan returned by the EXPLAIN keyword.
Let’s see some examples:
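A few illustrative operations, reusing the hypothetical proteins RDD defined earlier (HUMAN is the Swiss-Prot mnemonic for Homo sapiens):

```scala
// Transformations: lazily build new RDDs, nothing is computed yet.
val humanProteins = proteins.filter(_.specie == "HUMAN")
val humanLengths  = humanProteins.map(_.sequence.length)

// Actions: trigger the execution of the whole lineage.
humanProteins.count()                        // number of human sequences
humanLengths.take(5)                         // first five sequence lengths
proteins.map(_.specie).distinct().count()    // number of distinct organism mnemonics
```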
More complex operations / custom functions
Scala objects can also be created with more complex business logic. Those objects are sent to the workers as long as they are serializable.
Example of Trypsin peptide cuts
Trypsin is an enzyme used in mass spectrometry to identify peptides. It cuts proteins after an arginine (R) or a lysine (K), but not if the next residue is a proline (P).
This code shows how to find the top 10 longest peptides produced by this enzyme across all the given sequences.
It also shows how to compute the frequency of peptides by length (a length spectrum).
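A possible sketch of such code, reusing the hypothetical proteins RDD from above (the regular expression encodes the cleavage rule just described):

```scala
// Serializable object holding the digestion logic; it is shipped to the workers.
object Trypsin extends Serializable {
  // Cut after K (lysine) or R (arginine), unless the next residue is P (proline).
  def digest(sequence: String): Seq[String] =
    sequence.split("(?<=[KR])(?!P)").toSeq
}

// One element per peptide, over all sequences.
val peptides = proteins.flatMap(p => Trypsin.digest(p.sequence))

// Top 10 longest peptides produced by the digestion.
val longestPeptides = peptides.top(10)(Ordering.by((p: String) => p.length))

// Frequency of peptides by length (the length "spectrum").
val spectrum = peptides
  .map(p => (p.length, 1L))
  .reduceByKey(_ + _)
  .sortByKey()

spectrum.take(20)
```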
External dependencies
Any artifact / jar can be added to the Spark context. The spark-shell needs to be started with --packages, and the jars will be sent over the network to the workers.
DataFrame and SQL
A dataframe is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs.
Working with DataFrames / SQL can be handy for developers with RDBMS experience.
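A minimal sketch, assuming a Spark 2.x shell where the SparkSession is available as spark, and reusing the hypothetical proteins RDD and Protein case class from above:

```scala
import spark.implicits._

// Turn the RDD of case classes into a DataFrame; the schema is inferred from the fields.
val proteinsDF = proteins.toDF()
proteinsDF.printSchema()

// Register the DataFrame as a temporary view so it can be queried with plain SQL.
proteinsDF.createOrReplaceTempView("proteins")

// Example query: average sequence length per organism mnemonic.
spark.sql("""
  SELECT specie, AVG(LENGTH(sequence)) AS avg_length
  FROM proteins
  GROUP BY specie
  ORDER BY avg_length DESC
  LIMIT 10
""").show()
```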
Avro and Parquet are some of the most used file formats in the Big Data ecosystem.
Parquet is a columnar storage format and is particularly interesting because:
It eliminates I/O for columns that are not part of the query.
It provides great compression as similar data is grouped together in columnar format.
The data can be saved in partitions.
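A sketch of saving the sequences in Parquet, reusing the proteinsDF DataFrame from above (the output path is an arbitrary choice):

```scala
// Write the DataFrame in Parquet format (columnar and compressed).
// A .partitionBy("...") call could be added to write one directory per partition value.
proteinsDF.write.parquet("swissprot.parquet")
```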
Open a new shell, then count the number of sequences per species and get the top 10:
As you can see, this query executes almost instantly, because only the specie column needs to be read.
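For example, something along these lines (assuming the Parquet output written above):

```scala
import org.apache.spark.sql.functions.desc

// Read the Parquet files back; only the columns used by the query are scanned.
val df = spark.read.parquet("swissprot.parquet")

df.groupBy("specie")
  .count()
  .orderBy(desc("count"))
  .show(10)
```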
Spark notebook
The Spark Notebook allows performing reproducible analyses with Scala, Apache Spark and the Big Data ecosystem. It also offers the possibility to visualize results directly in charts.
Appendix
ADAM project
ADAM is a genomics analysis platform with specialized file formats, built using Apache Avro, Apache Spark and Parquet. It is Apache 2 licensed.
The Python script below was used to convert UniProt FASTA files to TSV format.
This post was written by Daniel Teixeira.
If you have any suggestions for improvements, don’t hesitate to contact him. If you find a typo or a bug, don’t hesitate to submit a pull request.