Hadoop: Genomic Segments And Join
3
5
Entering edit mode
13.5 years ago

It's a question I've asked two years ago on SO: http://stackoverflow.com/questions/1832103 but I wonder if there could have now a new solution from some skilled users.

Say I have two BED files in a Hadoop Distributed File System. How should the MapReduce algorithm be designed to find the segments of the first collection overlapping those of the 2nd set ?

genomics merge • 4.8k views
ADD COMMENT
8
Entering edit mode
13.5 years ago

Here is a solution using cascalog, which provides a high level query interface on top of Hadoop fully integrated with the Clojure programming language. By abstracting away the map/reduce details, it eases implementation and understandability of the code. You can grab the full code example from GitHub.

First define the query. This is two functions: testing when one start end coordinate overlaps with a second and writing the cascalog query that uses this to return only those chromosome start end pairs in the first that overlap with the second:

(deffilterop overlaps [s1 e1 s2 e2]
  "Pass filter if s1,e1 start end pair overlaps s2,e2 pair."
  (or (and (>= s1 s2) (< s1 e2))
      (and (>= e1 s2) (< e1 e2))))

(defn run-bed-overlap [bed-one bed-two]
  "Cascalog query to pull data from two bed files and intersect."
  (?<- (stdout) [?chr ?s1 ?e1]
       (bed-one ?chr ?s1 ?e1)
       (bed-two ?chr ?s2 ?e2) ; Matches chromosome with bed-one
       (overlaps ?s1 ?e1 ?s2 ?e2)))

The other part is writing the plumbing to parse BED files and feed them into the run-bed-overlap function:

(defmapop parse-bed-line [line]
  (take 3 (split line #"\t")))

(defn to-int [x] (Integer/parseInt x))

(defn bed-from-hfs [dir]
  "With a BED file from HDFS, produce chromsome, start, end."
  (let [source (hfs-textline dir)]
    (<- [?chr ?s ?e]
        (source ?line)
        (parse-bed-line ?line :> ?chr ?s-str ?e-str)
        (to-int ?s-str :> ?s)
        (to-int ?e-str :> ?e))))

(defn bed-hfs-overlap [dir1 dir2]
  (run-bed-overlap (bed-from-hfs dir1) (bed-from-hfs dir2)))

Push your bed files to HDFS and run on Hadoop with:

% cd /path/to/bed-hadoop
% lein deps
% lein uberjar
% hadoop fs -mkdir /tmp/bed-hadoop/bed-1
% hadoop fs -mkdir /tmp/bed-hadoop/bed-2
% hadoop fs -put test/one.bed /tmp/bed-hadoop/bed-1
% hadoop fs -put test/two.bed /tmp/bed-hadoop/bed-2
% hadoop jar bed-hadoop-0.0.1-SNAPSHOT-standalone.jar
             bed_hadoop.core /tmp/bed-hadoop/bed-1 /tmp/bed-hadoop/bed-2
RESULTS
----------
chr1 20 30
chr2 40 50
----------
ADD COMMENT
0
Entering edit mode

i don't know how anyone writes this stuff (Clojure)

ADD REPLY
6
Entering edit mode
13.5 years ago
Pablo ★ 1.9k

Here are a few quick thoughts (I haven't implemented this, so apologies in advanced if the explanation is messy):

Map : Produce a tuple <"chr:start" , "end, collectionNumber" >

Partitioner: Divide by chromosome? (probably not the most efficient)

Sort: Tuples are sorted by key (i.e. "chr:start")

Reduce Step : You'll have to cheat the "MR" model and keep in memory all entries where "end" is less than current "start". The rest of the algorithm should be standard.

I hope this helps.

ADD COMMENT
1
Entering edit mode

Yes, that's the standard way Hadoop works (sorts before reducing). You can change the sorting method, but it's just easier to use a key like: String.format("%s:%08d", chr, start)

ADD REPLY
0
Entering edit mode

thanks Pablo. Does it mean that, for the reduce step, the keys will be sorted ?

ADD REPLY
0
Entering edit mode

Many thanks Pablo.

ADD REPLY
2
Entering edit mode
13.5 years ago
Helwr ▴ 20

Use can use Hadoop streaming http://wiki.apache.org/hadoop/HadoopStreaming and Unix join command: http://www.albany.edu/~ig4895/join.htm

basically you can use hadoop as a distributed shell without writing any Java code

first you load the files into dfs, then run the streaming job with unix cat command as a mapper and pipe the output into xargs sort/uniq/join and may be some awk for formatting, this way the join will be done on the hdfs client box where you run the command (for large files that single box would the IO bottleneck)

alternatively you can do join in parallel on multiple boxes, by putting it inside of mapper.sh script and assigning mapper.sh as an argument to the streaming jar, so that each mapper will do partial join on multiple nodes, the identity reducer will aggregate the output into a single file.

also see http://hadoop.apache.org/mapreduce/docs/r0.21.0/streaming.html#More+Usage+Examples and http://hadoop.apache.org/common/docs/r0.15.2/streaming.html

ADD COMMENT

Login before adding your answer.

Traffic: 1678 users visited in the last hour
Help About
FAQ
Access RSS
API
Stats

Use of this site constitutes acceptance of our User Agreement and Privacy Policy.

Powered by the version 2.3.6