pydex

NOTE

PLEASE READ IT HERE

  • All the code present here has been moved and integrated into PomeGranate

Pydex - A simple case study of inverted index creation

What is an Inverted Index?

In computer science, an inverted index (also referred to as postings file or inverted file) is an index data structure storing a mapping from content, such as words or numbers, to its locations in a database file, or in a document or a set of documents. The purpose of an inverted index is to allow fast full text searches, at a cost of increased processing when a document is added to the database -- Wikipedia
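
As a toy illustration of the definition, the sketch below (plain Python, not part of pydex) builds an in-memory inverted index that maps each word to the documents and positions where it appears; the documents are the same ones used in the worked example later on:

from collections import defaultdict

documents = {
    1: "foo python hello world foo",
    2: "python world",
    3: "hello world bar bar",
}

# Inverted index: word -> [(docId, position), ...]
index = defaultdict(list)
for doc_id, text in documents.items():
    for position, word in enumerate(text.split()):
        index[word].append((doc_id, position))

print(index["world"])   # [(1, 3), (2, 1), (3, 1)]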

Requirements

In order to have a proper working copy you need at least Python 2.7 and the mpi4py package compiled from source.

Requirements on Ubuntu

You need to install the following packages on Ubuntu in order to run our solution:

$ sudo apt-get install mpich2 libcr-dev g++ python-dev

Installing requirements

For the tests we have used MPICH2, but note that any implementation of MPI should be fine to get pydex working, so you can choose your favourite implementation from your distribution's package management system.

Regarding the Python requirements, we strongly recommend using a virtual environment, such as virtualenv, to install all the dependencies. You can then use the requirements.txt file with pip to install all the missing packages.

How to run?

We have used pydex to run an inverted index creation on top of a Wikipedia dump. In order to reproduce our experiment you have to download the latest dump of the Wikipedia database from this page. The file you have to download is called enwiki-latest-pages-articles.xml.bz2 and its size is about 7.0 GB.

After you download it, you have to pass it through a preliminary phase using the wiki-extractor script, which resides in the src/utils directory. You have to specify an output directory and the size in bytes of the partitions you want to create:

$ bzcat enwiki-latest-pages-articles.xml.bz2 | \
  python src/utils/wiki-extractor.py /mnt/root/collection 66060288

This command decompresses the Wikipedia dump on the fly and pipes its contents to our script. The script is in charge of filtering and converting the article contents and of recompressing them into tgz archives of about 64 MB each.

This preliminary conversion phase is needed to feed our indexer. The extraction and recompression take about three and a half hours. After completing this step you are ready to launch the real indexing phase.

Take a look at the configuration file pydex.json in order to specify the input/output directories and other settings. Then you can launch the computation by simply running:

$ mpiexec -np 6 python src/main.py

The default configuration has 2 mappers and 2 reducers. Keep in mind that 1 master and 1 combiner node are also needed, which is why you have to specify 6 as the number of processing elements.

Also keep in mind that both directories you specify in the configuration file must be accessible by the remote Python MPI interpreters, so a mechanism like NFS or another distributed/network filesystem should be used. This is not required if you are testing pydex on a single machine.

How does it work?

The construction of the inverted index based on the tf-idf model is really straightforward. It consists of three map-reduce-combine phases optimized for massively parallel architectures like a cluster farm. The code should therefore be run on top of a distributed filesystem like kosmosfs, kobold or tahoe-fs in order to further optimize the distribution of the workers.

Now we briefly describe the work assigned to each subcomponent:

  • MAP: The map phase simply consists of reading a portion of a file and producing (key, value) tuples that are passed to the reducer.
  • REDUCER: The reducer is in charge of collecting multiple (key, value) tuples from a large number of mappers. The aim is to produce a single (key, sum-of-values) tuple whenever multiple tuples contain the same key. These partial results are then forwarded to the combiner.
  • COMBINER: The combiner's goal is simply to sort the results coming from the reducers and to collate them into the various output files (see the sketch after this list).
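
The following is a minimal serial sketch of the three roles. It is illustrative only: in pydex each role runs as a separate MPI process and the tuples travel as messages, and the function names below are not the pydex API.

from collections import defaultdict
from heapq import merge

def map_portion(records):
    # MAP: read a portion of the input and emit (key, value) tuples.
    for key, value in records:
        yield key, value

def reduce_tuples(tuples):
    # REDUCE: sum the values of tuples sharing the same key and
    # return the partial results as a sorted chunk.
    partial = defaultdict(int)
    for key, value in tuples:
        partial[key] += value
    return sorted(partial.items())

def combine_chunks(*sorted_chunks):
    # COMBINE: merge the already-sorted chunks coming from the
    # reducers into one sorted stream, ready to be written out.
    return list(merge(*sorted_chunks))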

Now we further explain the goal of each phase in order to derive the correct tf-idf reverse index.

Inputs

Assumptions: 3 mappers, 1 reducer, 1 combiner

Document #1: foo python hello world foo
Document #2: python world
Document #3: hello world bar bar

First phase

Mapper #1: (<foo, #1>, 1), (<python, #1>, 1), (<hello, #1>, 1), (<world, #1>, 1), (<foo, #1>, 1)
Mapper #2: (<python, #2>, 1), (<world, #2>, 1)
Mapper #3: (<hello, #3>, 1), (<world, #3>, 1), (<bar, #3>, 1), (<bar, #3>, 1)

Assuming 2 by 2 round robin:

Reducer: (<foo, #1>, 1), (<python, #1>, 1),
         (<python, #2>, 1), (<world, #2>, 1),
         (<hello, #3>, 1), (<world, #3>, 1),
         (<hello, #1>, 1), (<world, #1>, 1),
         (<bar, #3>, 2), (<foo, #1>, 1)

After sorting (which is done in place) and assuming the heap holds a maximum of 5 tuples, we have:

Chunk #1: (<foo, #1>, 1), (<hello, #3>, 1), (<python, #1>, 1), (<python, #2>, 1), (<world, #2>, 1)
Chunk #2: (<bar, #3>, 2), (<foo, #1>, 1), (<hello, #1>, 1), (<world, #1>, 1), (<world, #3>, 1)

Combiner: (<bar, #3>, 2),
          (<foo, #1>, 2),
          (<hello, #1>, 1),

          (<hello, #3>, 1),
          (<python, #1>, 1),
          (<python, #2>, 1),

          (<world, #1>, 1),
          (<world, #2>, 1),
          (<world, #3>, 1)
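
In plain Python the first phase amounts to a word count keyed by (word, docId). A serial sketch of what the example above computes (the chunking and the round-robin routing are left out):

from collections import Counter

documents = {1: "foo python hello world foo",
             2: "python world",
             3: "hello world bar bar"}

# MAP: one ((word, docId), 1) tuple per word occurrence.
mapped = [((word, doc_id), 1)
          for doc_id, text in documents.items()
          for word in text.split()]

# REDUCE + COMBINE (collapsed here): sum per key and sort.
counts = Counter()
for key, value in mapped:
    counts[key] += value

for (word, doc_id), count in sorted(counts.items()):
    print((word, doc_id), count)   # ('bar', 3) 2, ('foo', 1) 2, ('hello', 1) 1, ...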

Second phase

In the second phase the goal is to obtain the word count per document. In this case each mapper executes:

{[word, docId] => wordCount} -> {docId => [word, wordCount]}

The reducer on the other hand:

{docId => [[word-1, wordCount], [word-2, wordCount], ...]} => {[word, docId] => [wordCount, wordsPerDoc]}

In the specific example we have:

Mapper #1: (<#3>, bar, 2),   (<#1>, foo, 2),    (<#1>, hello, 1)
Mapper #2: (<#3>, hello, 1), (<#1>, python, 1), (<#2>, python, 1)
Mapper #3: (<#1>, world, 1), (<#2>, world, 1),  (<#3>, world, 1)

Reducer: (<#3>, bar, 2, 0), (<#1>, foo, 2, 0), (<#3>, hello, 1, 0), (<#1>, python, 1, 0), (<#1>, world, 1, 0),
         (<#2>, world, 1, 0), (<#1>, hello, 1, 0), (<#2>, python, 1, 0), (<#3>, world, 1, 0)

After sorting (which is done in place) and assuming the heap holds a maximum of 5 tuples, we have:

Chunk #1: (<#1>, foo, 2, 4), (<#1>, python, 1, 4), (<#1>, world, 1, 4), (<#3>, hello, 1, 3), (<#3>, bar, 2, 3)
Chunk #2: (<#1>, hello, 1, 1), (<#2>, python, 1, 2), (<#2>, world, 1, 2), (<#3>, world, 1, 1)

The combiner at this point keeps a running counter and holds back writing to disk until the document id of the iterator differs from the previous one, so that the final per-document word count can be filled in for every tuple of that document. The output at this point (the tuples might not be in this exact order) will be:

(<#1>, hello, 1, 5),
(<#1>, foo, 2, 5),
(<#1>, python, 1, 5),

(<#1>, world, 1, 5),
(<#2>, python, 1, 2),
(<#2>, world, 1, 2),

(<#3>, hello, 1, 4),
(<#3>, bar, 2, 4),
(<#3>, world, 1, 4)
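
A serial sketch of the second-phase transformation, starting from the first-phase output shown earlier (again, illustrative only, not the pydex code):

from collections import defaultdict

# First-phase output: {(word, docId): wordCount}
phase1 = {('bar', 3): 2, ('foo', 1): 2, ('hello', 1): 1, ('hello', 3): 1,
          ('python', 1): 1, ('python', 2): 1,
          ('world', 1): 1, ('world', 2): 1, ('world', 3): 1}

# MAP: {(word, docId): wordCount} -> {docId: [(word, wordCount), ...]}
by_doc = defaultdict(list)
for (word, doc_id), word_count in phase1.items():
    by_doc[doc_id].append((word, word_count))

# REDUCE: {docId: [...]} -> {(word, docId): (wordCount, wordsPerDoc)}
phase2 = {}
for doc_id, pairs in by_doc.items():
    words_per_doc = sum(count for _, count in pairs)
    for word, word_count in pairs:
        phase2[(word, doc_id)] = (word_count, words_per_doc)

print(phase2[('foo', 1)])     # (2, 5)
print(phase2[('world', 3)])   # (1, 4)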

Third phase

The mapper in this phase shall produce something like:

{[word, docId] => [wordCount, wordsPerDoc]} => {word => [docId, wordCount, wordsPerDoc]}

While the reducer:

{word => [[docId-1, wordCount-1, wordsPerDoc-1], [docId-2, wordCount-2, wordsPerDoc-2], ...]} =>

{[word, docId] => [wordCount, wordsPerDoc, docsPerWord]} =>

{[word, docId] => tfidf}

So in our case:

Mapper #1: (<hello>, #1, 1, 5), (<foo>,    #1, 2, 5), (<python>, #1, 1, 5)
Mapper #2: (<world>, #1, 1, 5), (<python>, #2, 1, 2), (<world>,  #2, 1, 2)
Mapper #3: (<hello>, #3, 1, 4), (<bar>,    #3, 2, 4), (<world>,  #3, 1, 4)

Reducer: (<hello>, #1, 1, 5, 0), (<foo>, #1, 2, 5, 0), (<world>, #1, 1, 5, 0), (<python>, #2, 1, 2, 0), (<hello>, #3, 1, 4, 0)
         (<bar>, #3, 2, 4, 0), (<python>, #1, 1, 5, 0), (<world>, #2, 1, 2, 0), (<world>, #3, 1, 4, 0)

Reducer after sorting:

Reducer: (<foo, #1>, 2, 5, 1), (<hello, #1>, 1, 5, 2), (<hello, #3>, 1, 4, 2), (<python, #2>, 1, 2, 1), (<world, #1>, 1, 5, 1)
         (<bar, #3>, 2, 4, 1), (<python, #1>, 1, 5, 1), (<world, #2>, 1, 2, 2), (<world, #3>, 1, 4, 2)

Combiner:

(<bar, #3>, 2, 4, 1),
(<foo, #1>, 2, 5, 1),
(<hello, #1>, 1, 5, 2),

(<hello, #3>, 1, 4, 2),
(<python, #1>, 1, 5, 2), // docsPerWord updated after merging the chunks
(<python, #2>, 1, 2, 2),

(<world, #1>, 1, 5, 3), // docsPerWord updated after merging the chunks
(<world, #2>, 1, 2, 3),
(<world, #3>, 1, 4, 3)

Please note that the only feasible approach here is a multiway merge sort that outputs the (<word, docId>, wordCount, wordsPerDoc) tuples in ascending order, together with a counter that keeps track of docsPerWord and is flushed to a secondary file buffer.
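
A minimal serial sketch of that merge step, using the two sorted chunks from the example above (the chunk-local docsPerWord values are dropped and the secondary file buffer is replaced by an in-memory dict):

from heapq import merge
from itertools import groupby

# Sorted chunks of ((word, docId), wordCount, wordsPerDoc) tuples.
chunk1 = [(('foo', 1), 2, 5), (('hello', 1), 1, 5), (('hello', 3), 1, 4),
          (('python', 2), 1, 2), (('world', 1), 1, 5)]
chunk2 = [(('bar', 3), 2, 4), (('python', 1), 1, 5),
          (('world', 2), 1, 2), (('world', 3), 1, 4)]

docs_per_word = {}                    # stands in for the secondary buffer
output = []
stream = merge(chunk1, chunk2)        # multiway merge, ascending order
for word, group in groupby(stream, key=lambda t: t[0][0]):
    group = list(group)
    docs_per_word[word] = len(group)  # number of documents containing the word
    output.extend(group)

print(docs_per_word)   # {'bar': 1, 'foo': 1, 'hello': 2, 'python': 2, 'world': 3}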

Final phase

Mapper #1: <bar, #3>    => (2 / 4) * log(3 / 1) = 0.79
           <foo, #1>    => (2 / 5) * log(3 / 1) = 0.63
           <hello, #1>  => (1 / 5) * log(3 / 2) = 0.12

Mapper #2: <hello, #3>  => (1 / 4) * log(3 / 2) = 0.15
           <python, #1> => (1 / 5) * log(3 / 2) = 0.12
           <python, #2> => (1 / 2) * log(3 / 2) = 0.29

Mapper #3: <world, #1>  => (1 / 5) * log(3 / 3) = 0
           <world, #2>  => (1 / 2) * log(3 / 3) = 0
           <world, #3>  => (1 / 4) * log(3 / 3) = 0
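
The formula applied above is tfidf = (wordCount / wordsPerDoc) * log(totalDocs / docsPerWord), and the listed figures come out as shown when the logarithm is taken in base 2. A quick sanity check in Python:

from math import log2

def tfidf(word_count, words_per_doc, docs_per_word, total_docs=3):
    # tf = wordCount / wordsPerDoc, idf = log2(totalDocs / docsPerWord)
    return (word_count / words_per_doc) * log2(total_docs / docs_per_word)

print(round(tfidf(2, 4, 1), 2))   # <bar, #3>    -> 0.79
print(round(tfidf(2, 5, 1), 2))   # <foo, #1>    -> 0.63
print(round(tfidf(1, 5, 2), 2))   # <hello, #1>  -> 0.12
print(round(tfidf(1, 2, 2), 2))   # <python, #2> -> 0.29
print(round(tfidf(1, 5, 3), 2))   # <world, #1>  -> 0.0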

Final matrix

------------+------+------+------+------+
            | foo  | bar  | hello|python|
------------+------+------+------+------+
Document #1 | 0.63 | -    | 0.12 | 0.12 |
Document #2 | -    | -    | -    | 0.29 |
Document #3 | -    | 0.79 | 0.15 | -    |
------------+------+------+------+------+

Implementation

Instead of counting the termination messages coming from the mappers, the reducer can simply wait for the termination message from the master. This would simplify the code a lot and make it possible to introduce a dynamic process feature (varying at runtime the number of mappers involved).
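
A minimal sketch of that idea with mpi4py; the tag value and message layout below are made up for illustration and are not taken from the pydex sources:

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

TERMINATE_TAG = 99   # hypothetical tag, not the one used by pydex

if rank == 0:
    # Master: once all the work has been handed out, notify every
    # other rank instead of letting reducers count mapper EOFs.
    for dest in range(1, comm.Get_size()):
        comm.send(None, dest=dest, tag=TERMINATE_TAG)
else:
    status = MPI.Status()
    while True:
        msg = comm.recv(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status)
        if status.Get_tag() == TERMINATE_TAG:
            break
        # ... handle a regular work message here ...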
