-
Notifications
You must be signed in to change notification settings - Fork 13
Home
NOTE: This wiki has just been moved from an older installation of Trac, so it's likely there are errors in the markup, broken links, etc.
- Large Survey Database Python API and Reference
- schemas of various tables
- Frequently Asked Questions
- Future Concepts
- LSD internals for developers
- AAS 217 LSD poster
wget -N -nv http://lsddb.org/go.sh && bash go.sh
See here for more detailed instructions.
- Always available on github, but see below for setup instructions and prerequisites
- Release Notes, v0.5.0 (Transactions)
- Release Notes, v0.5.3 (lsd-import and lsd-admin)
- Release Notes, v0.5.4 (Remote Database Access)
- Release Notes, v0.5.5 (User-Defined Functions)
The Large Survey Database (LSD) is a framework for storing, cross-matching, querying, and rapidly iterating through large survey datasets (catalogs of >10^9^ rows, >1 TB) on multi-core machines. It is implemented in Python, written and maintained by Mario Juric ([email protected]).
Using LSD can be as simple as writing queries such as:
lsd-query --format=fits --bounds='rectangle(180, 10, 190, 20)'\
'SELECT ra, dec, cal_psf_mag, filterid, sdss.g as g, sdss.r as r, sdss.i as i
FROM ps1_obj, ps1_det, sdss WHERE (0.3 < g-r) & (g-r < 0.4)'
and getting the output as a FITS file, or as powerful as writing MapReduce Python kernels. In both cases, the LSD framework will parallelize the workload.
The target audience for LSD is anyone who needs to frequently stream through a local copy (of a large chunk thereof) of PS1 catalogs but considers flat text/FITS files too cumbersome, DVO too inflexible (and single-threaded), and a full-fledged RDBMS too slow and/or expensive. Special emphasis is paid to distribution of computation and performance of whole-dataset operations: an LSD (v0.1) setup on a dual Intel Xeon X5560 @ 2.80GHz system (8 physical, 16 logical cores with HT) with 48G of RAM can iterate through 1.1 Grows of PanSTARRS 3Pi data (162GB), cross-matched with 220 Mrows of SDSS data (42G) in ~15 minutes.
LSD databases are queried using the {{{lsd-query}}} command line utility (described further below), interactively from Python (e.g., from {{{ipython}}} shell), or from Python scripts. In all cases, the query syntax is the same.
For example, using {{{lsd-query}}} on the command line, we could write:
lsd-query --format=fits --bounds='rectangle(180, 30, 190, 40, coordsys="gal")' 'select ra, dec, r, sdss.r FROM ps1_obj, sdss'
This would query the default database (specified via environment variable $LSD_DB) for (ra, dec) and PS1 and SDSS r-band magnitudes of all objects cross-matched between PS1 and SDSS in a Galactic coordinate rectangle bound by l=180, b=30 and l=190, b=40. The output will be stored as a FITS file (named output.fits, by default).
If using the Python API, begin with importing the {{{lsd}}} module and instantiating a {{{DB}}} object:
from lsd import DB
db = DB('db')
where 'db' is the path to the directory where your LSD tables reside (see below for how to import SDSS and .smf files into LSD).
To fetch all rows satisfying a given ''query'', run (for example):
rows = db.query('ra, dec, g, r, sdss.g as sg, sdss.r as sr, sg-sr as sgr, cos(radians(dec)) as cdec FROM ps1, sdss WHERE (sr < 22.5) & (0.3 < sgr) & (sgr < 0.4)').fetch()
The above asks LSD for the positions, PS1 and SDSS g and r magnitudes, and cosines of declination, for all PS1 objects cross-matched with SDSS and satisfying the '(sdss.r < 22.5) and (0.3 < sdss.g-sdss.r < 0.4)' criterion.
You don't need to fetch all the rows at once, but can iterate row by row:
for row in db.query('ra, dec, g, r, sdss.g as sg, sdss.r as sr, sg-sr as sgr, cos(radians(dec)) as cdec FROM ps1, sdss WHERE (sr < 22.5) & (0.3 < sgr) & (sgr < 0.4)').iterate():
... do something with this row ...
This is useful if the returned dataset is expected to be very large (like the one above), but inefficient. If whatever postprocessing you plan to do operates on ''vectors'' of data, where the vectors are columns from the table, it's better to ask the system to return the results in blocks of some reasonable (system defined) size. To do that, just add 'return_blocks=True' to the iterate call above.
If you're only interested in a particular area of the sky, you can specify the bounds (in space as well as time) to which to limit the search:
import lsd.bounds as b
bounds_xy = b.rectangle(180, 10, 181, 11) # (ra,dec)_bottomleft, (ra,dec)_topright
bounds_t = b.intervalset((53342, 53345), (53346, 53348)) # A set of time intervals (MJD)
rows = db.query('ra, dec, ap_mag, filterid from ps1_det').fetch(bounds=[(bounds_xy, bounds_t)])
Note that the spatial part of the bounds is actually an arbitrary polygon (an instance of Polygon class) -- you can construct more complicated bounds yourself (assuming you know how; more aux. functions to enable this are on the way). Also, you can specify a list of as many space/time bounds as you wish, to run the same query over different areas of the sky.
LSD query syntax is intentionally similar to SQL, but with important differences. Here's a (contrived) LSD query designed to show off the features we support:
SeLEct
ra, dec, g, r, sdss.g as sg, sdss.r as sr,
sg-sr as sgr, ffitskw(chip_hdr(chip_id), "ZPT_OBS") as zpt
FROM
ps1_obj, ps1_det, ps1_exp, sdss(outer)
WHERE
sr < 22.5
INTO
mytable
Important things to note (PLEASE READ THIS):
- SQL-ish keywords such as SELECT, FROM, WHERE, INTO and AS are not case sensitive. Anything else, however (e.g., the column names), is.
- In the list of columns in SELECT and WHERE clauses, each column can be a valid Python expression and include calls to Python functions. This is illustrated by the last (zpt) column in the above query. Note that the '{{{... AS zpt}}}' bit is not mandatory (but good practice). To select all columns, use '' (asterisk). To select all columns from a specific joined catalog, use 'catalogname.' (e.g., 'sdss.*')
- The FROM clause lists the tables one wants to join and query. In contrast to SQL, the joins in LSD are (for now) predefined. For example, a table containing static sky objects knows how to join to the table of detections from which it was built up (i.e., knows which key to use for the join). Similarly, LSD knows how to join a table of a cross-matched catalog (e.g., sdss in the above example) to the other tables. Internaly, LSD is capable of doing a wide variety of join's but that functionallity hasn't yet been exposed to the user (and probably won't, unless there's a demonstrated use case for it).
- By default, all tables listed in the FROM clause are joined using an INNER JOIN (in SQL parlance). That will cause the database to omit objects with no counterparts in the joined catalog. If you want to keep them, add the word 'outer' in parenthesis after the name of the catalog (e.g., '... sdss(outer) ...' in the example above). Any missing rows will be filled by zeros.
- LSD does not support SQLs three-valued logic (i.e., NULLs), and never will. Among casual users, SQL NULLs are root of more evil than good. Instead, we support implicit (boolean) columns named .ISNULL (e.g., sdss.ISNULL, ps1_det.ISNULL, etc...) that are True if the fields sourced from the given table are to be considered empty, and False otherwise. The fields of non-existing rows from OUTER join-ed tables are instead filled with zeros (or None in the case of BLOBs).
- We support columns that are arrays, and these can be indexed in the query. In the above example, the chip_hdr column is a 64-element array in the ps1_exp table, while the chip_id column is a per-detection column in the detections table. The 'ps1_exp(chip_id)' construct selects the chip on which the given detection was made.
- We support efficient storage of BLOBs (Binary Large Objects). In the above example, chip_hdr is a BLOB containing the entire FITS header of the .smf file from which the catalog was loaded.
- The WHERE clause is internally evaluated as a single Python expression, with columns being numpy arrays. Therefore, one does not have the usual logical operators (and, or, not) at their disposal but has to mimic these with Python's bitwise logical ops (&, |, ~). IMPORTANT CAVEAT: note that the bitwise operators have a higher precedence than most other operators; '{{{x == 0 & y == 2}}}' will therefore evaluate to {{{False}}} even if {{{x=0}}} and {{{y=2}}}. This is because {{{0 & y}}} is evaluated first (yielding 0), followed by {{{x == 0}}} (evaluating to True), followed by {{{True == 2}}} (yielding False). Instead, write the condition as '{{{(x == 0) & (y == 2)}}}' to obtain the expected behavior.
- One can redirect output to an existing or new table using the INTO statement. This is extremely useful to store temporary results when performing data analysis.
- Aggregates (e.g., COUNT(), SUM(), etc.), as well as other related SQL keywords and functionallity (HAVING, GROUP BY, etc..) are not supported. But we know how to, and have plans to do it in the future.
In a query, most or all clauses can be omitted. If the entire query is omitted, a '*' query is implied (meaning, fetch all columns of the current catalog). Note that this may be inefficient -- e.g., if you're only interested in {{{(l, b)}}} positions, there's no need to waste time fetching other 20+ columns as well.
In the previous example, the query had the following {{{FROM}}} clause:
.... FROM ps1_obj, ps1_det, ps1_exp, sdss(outer) ...
To form the result, the tables from the FROM clause are implicitly joined one to another, based on the information stored in join definition files (files ending in .join in the database directory).
Join definitions tell LSD how to [http://en.wikipedia.org/wiki/Relational_algebra#Joins_and_join-like_operators join] various catalogs together when given in a query. They are a substitute for the traditional SQL JOIN clause. Since there's usually only one way to correctly join two astronomical catalogs, it's better and more user friendly for these to be defined and fixed at the database level, instead of requiring the user to spell out the join criteria in every single query. LSD's engine ''is'' versatile enough to perform nearly arbitrary joins; it's just a matter of a) demonstrating an actual need for it and b) extending the query parser.
As an example, a reasonable way to join ps1_det to ps1_exp is to match the rows in ps1_exp that have the same exp_id as rows in ps1_det. That is what is spelled out in the corresponding {{{.ps1_det:ps1_exp.join}}} file. The effect is equivalent an SQL fragment:
... FROM ps1_det, ps1_exp WHERE ps1_exp.exp_id = ps1_det.det_id
In a more complex case of the object catalog (the ps1_obj table), there can be many detections that match to the same object, requiring an indirection table to properly join ps1_obj to ps1_det. This indirection table (typically named _ps1_obj_to_ps1_det), as well as the appropriate .join file, are constructed in the process of creating of the object catalog. The effect is similar to an SQL fragment:
... FROM ps1_obj, ps1_det, _ps1_obj_to_ps1_det as ind WHERE ps1_obj.obj_id = ind.obj_id and ind.det_id = ps1_det.det_id
In the case of LSD, this join is implicit whenever ps1_obj and ps1_det appear together in the FROM clause.
In case one wished to perform an OUTER JOIN (i.e., keep the rows from ps1_obj that don't have a counterpart in sdss), simply say 'sdss(outer)' instead just 'sdss' in the FROM clause. The effect will be the same as if we had an SQL query of the form:
... FROM ps1_obj
LEFT OUTER JOIN _ps1_obj_to_sdss ON _ps1_obj_to_ps1_det.id1 = ps1_obj.obj_id
LEFT OUTER JOIN sdss ON _ps1_obj_to_ps1_det.id2 = sdss.sdss_id
but with significantly less verbiage required.
LSD can cross-match "on the fly" two tables having positional indices (that is, (ra, dec)). These are most often main tables of different catalogs. As the number of catalogs in LSD grows, it becomes prohibitive to precompute cross-match joins for everything-to-everything; on-the-fly cross-matching resolves this problem.
Here is an example query returning only those that are blue (g-r<0.4), out of three closest SDSS neighbors within 10 arcseconds of every GALEX GR5 source:
SELECT
ra, dec, sdss.ra, sdss.dec, sdss._NR, sdss._DIST*3600
FROM
galex_gr5, sdss(matchedto=galexgr5,nmax=3,dmax=10)
WHERE
g-r < 0.4
Note the keywords in parenthesis after 'sdss' in the FROM clause. The {{{matchedto}} keyword designates the catalog against whose sources this one will be matched. {{{nmax}}} is the maximum number of neighbors to match, and {{{dmax}}} is the maximum distance (in arcseconds). {{{dmax}}} must not be greater than 30 arcsec (the neighbor cache margin). Also note the two pseudocolumns in the SELECT clause: _NR and _DIST. _NR gives the neighbor rank, and is zero if the object is the nearest neighbor, one if it's the first, two if the second, etc. _DIST column gives the distance of the two sources, in degrees (we've multiplied it by 3600 in the query above to convert it to arcseconds).
It is important to understand what gets matched to what. In a query such as:
.... FROM A, B(matchedto=A,nmax=N,dmax=D) ...
for each source from table A, up to N sources from table B not farther away than distance D, will be found and joined with its row. In the example with SDSS and GALEX, for each GALEX source we ask for up to 3 nearest SDSS sources within 10arcsec radius.
Secondly, {{understand that the cross-matching happens first, and WHERE clause gets executed second}}. In the SDSS-GALEX example above that means that out of the three nearest neighbors within 10 arcsec, only those with g-r<0.4 will be returned. It does NOT mean that out of all neighbors with g-r<0.4, up to three nearest one out to 10arcsecs get returned.
On Odyssey, LSD has been set up in {{{/n/panlfs/mjuric/lsd}}}. DO NOT EVEN TRY TO USE IT DIRECTLY FROM ONE OF THE ACCESS NODES; the glaciers will melt long before it actually does anything. Instead, request an interactive session on one of the compute nodes. If you have access to the {{{itc}}} queue, do:
bsub -Is -n 8 -R "span[ptile=8]" -q itc bash
This will give you an entire node (8 cores) to play with.
Alternatively, use the {{{interact}}} queue:
bsub -Is -n 4 -R "span[ptile=4]" -q interact bash
The maximum the interactive queue will give you is 4 cores. You have to let LSD know not to use more cores than that (the default is to use all cores available):
export NWORKERS=4
To set up LSD environment (assuming you're using {{{bash}}}), run:
source /n/panlfs/mjuric/lsd/lsd-setup-odyssey.sh
This will set up the paths and environment variables necessary to run LSD (including the location of the default database). The current database contains the following tables:
- ps1_det: Catalog of PS1 (3pi and MD) detections as of May 24th
- ps1_exp: Catalog of PS1 (3pi and MD) exposures (useful when joining with detections)
- ps1_obj: Static sky (a.k.a. object) catalog built from ps1_det
- ps1_recalib_mags_sdss: Table with recalibrated magnitudes using SDSS as "standards" (see [http://ps1sc.ifa.hawaii.edu/PS1wiki/index.php?title=Recalibrated_photometry_%28vs_SDSS%29] for details)
- sdss: SDSS DR8 catalog
- galex_gr5: GALEX GR5 all sky point source catalog
The Python sources of the LSD code you're using are in {{{$LSD_BIN/../lib/python/lsd}}}. These are useful as currently the only way to inspect the schema of these tables (i.e., what fields are available) is to take a peek at the (rather obvious) parts of where they're defined in {{{smf.py}}} (for PS1 related tables) and {{{sdss.py}}} (for SDSS).
Many of the examples mentioned here can be found in {{{$LSD_BIN/../share/lsd/examples}}} directory. You are encouraged to run and play with them.
-
LSD makes liberal use of memory-mapped temporary files for inter-process communication and data storage. These files must be created on a local disk, or otherwise serious performance degradation occurs. By default, LSD places these temporary files into the default TMP directory (as located by python's tempfile module). This can be overridden by setting an environment variable LSD_TEMPDIR.
-
Because {{{/n/panlfs}}} is a network disk, LSD will be slow when operated on from a single node on Odyssey (the I/O bandwidth per node is 100Mbytes/sec). Ultimately, when we get it to work in parallel over multiple nodes, it should be significantly faster (the total bandwidth to Odyssey's Lustre file system boxes is 2Gbytes/sec).
Follow the instructions for Odyssey, except for logging on to interactive nodes.
LSD is no longer supported on nebel. Use pan.
When/if (re)installing, make sure to undefine F77 (that is mysteriously set to f77.sh somewhere in IRAF directories).
First, get the latest sources from github (https://github.com/mjuric/lsd) and install the prerequisites.
Prerequisites (some of which can be downloaded from [http://nebel.rc.fas.harvard.edu/mjuric/lsd/sources/dependencies/ here]) are:
- Python 2.7.2 (older versions have a nasty [http://bugs.python.org/issue11875 bug] that affects LSD)
- g++ (any recent version should do)
- SWIG >= 1.3.36
- numpy >= 1.6 (older versions may work, but have serious bugs; eg. [http://projects.scipy.org/numpy/ticket/1163 loading large numbers] works incorrectly, there's a memory leak when np.all() is used with object ndarrays)
- [http://www.pytables.org/moin/Downloads pytables] >= 2.2.1. To build pytables, you will likely need:
- [http://www.hdfgroup.org/HDF5/ HDF5] >= 1.8.2
- [http://cython.org/#download cython] 0.14 (for PyTables)
- [http://code.google.com/p/numexpr/downloads/list numexpr] >=1.4.1 (for PyTables)
- [http://urlgrabber.baseurl.org/ urlgrabber] is needed for remote database functionallity
- [http://pycurl.sourceforge.net/ pycurl] (needed to build urlgrabber)
- [http://curl.haxx.se/libcurl/ libcurl] (needed for pycurl)
- [http://pycurl.sourceforge.net/ pycurl] (needed to build urlgrabber)
- [http://pyyaml.org/wiki/PyYAML PyYAML] >= 3.0
- [http://www.stsci.edu/resources/software_hardware/pyfits/Download pyfits] >= 2.3.1
- Polygon 2.0.2-mjuric
- You must use our custom version (available from [http://nebel.rc.fas.harvard.edu/mjuric/lsd/sources/dependencies/ here]), as it has been modified to work efficiently on numpy arrays. Before installing, please edit {{{setup.py}}} to modify the {{{numPyIncludePath}}} as necessary.
- pyslalib
- download the [http://github.com/scottransom/pyslalib/tarball/master github master version from Scot Ransom]
- Approximate Nearest Neighbor (ANN) 1.1.2 library
- File 'Make-config' needs file to be modified to include -fPIC flag in CFLAGS; or get the fixed version from [http://nebel.rc.fas.harvard.edu/mjuric/lsd/sources/dependencies/ here]
- scikits-ann
- the stock version is out of date and won't run correctly on 64bit machines -- get the fixed version from [http://nebel.rc.fas.harvard.edu/mjuric/lsd/sources/dependencies/ here]
- this package needs SWIG 1.3.36 or higher to compile (otherwise you'll get 'const char* vs char*' compile-time errors)
- this package needs setuptools (your distribution may already have them installed, otherwise look [http://nebel.rc.fas.harvard.edu/mjuric/lsd/sources/dependencies/ here])
- If your ANN libary is not in /usr/local/bin, modify {{{site.cfg}}} accordingly.
- scipy
- scipy is required to run the {{{ps1-*}}} scripts.
- Note: To have scipy 0.8 compile with Python 2.7, you'll need to apply the following simple patch http://projects.scipy.org/scipy/changeset/6646
- GSL
- Gnu Scientific Library (GSL) is needed by {{{ps1-*}}} scripts; any recent version should do
Let me know if anything is missing from the list.
Once you download and install the prerequisites, unpack the LSD tarball and build the required module (make sure Python 2.7 is in your path!!):
python setup.py install --home=<dir_where_you_want_to_set_it_up>
Add the <dir_where_you_want_to_set_it_up>/lib/python to your PYTHONPATH:
export PYTHONPATH="<dir_where_you_want_to_set_it_up>/lib/python:$PYTHONPATH"
For convenience, you may also want to add the <dir_where_you_want_to_set_it_up>/bin directory to your PATH as well.
Note: If you're using the setup on pan, all of this has already been done for you.
First, create a directory that will contain all database files, and let LSD know this will be your default database. E.g.:
mkdir db
export LSD_DB=db
Then import a list of .smf files using an analog of:
lsd-import-smf -c 3pi ps1_det ps1_exp ~/ps1-sas-smfs/*
where the first argument is the ID of the sub-survey, the last argument is the list of .smf files to import, and ps1_det and ps1_exp are the tables that will be created with the information about detections (ps1_det) and individual exposures (ps1_exp).
Proceed to create an object catalog: ``bash lsd-make-object-catalog -c ps1_obj ps1_det ps1_exp
which will identify all detection falling within 1arcsec of each other as the same object, and store its information in ps1_obj table.
If you have access to SDSS "sweep" files, import the SDSS catalog and cross-match it to PS1:
```bash
lsd-import-sdss -c sdss /raid14/sweeps/sdss3/2009-11-16.v2/301
lsd-xmatch ps1_obj sdss
where the final argument in to the first command is the directory where (gzipped) sweep files can be found.
After import, try running a few simple query using {{{lsd-query}}}:
lsd-query --bounds='rectangle(180, 10, 190, 20)' 'SELECT ra, dec FROM ps1_obj' > result.txt
where the bounds are lower left and top right corner in ra/dec.
Or, to get the output as a FITS binary table, using bounds in Galactic coordinates:
lsd-query --format=fits --bounds='rectangle(180, 30, 190, 40, coordsys="gal")' 'SELECT ra, dec FROM ps1_obj'
You may also compute the covered footprint map, using:
lsd-footprint ps1_det foot.fits
or, to manually set the binning resolution (in arcminutes):
lsd-footprint --resolution=1 ps1_obj foot.fits
Tip: type any of these commands with no arguments (or --help) to see all the available options.
LSD is optimized for fast queries and efficient parallel iteration through a positionally (lon, lat) and temporally (time) indexed^(*)^ set of rows. The design and some of the terminology have been inspired by Google's BigTable distributed database and the MapReduce programming model. The goal was to build a system that will scale to LSST-sized datasets (>10^11^ rows), and be possible to distribute over a cluster of machines.
Logically, an LSD database consists of one or more tables with a large number (billions) of rows, as well as numerous (~tens to hundreds) columns. A single row in and LSD table is typically an object on the sky (a static sky table), or a detection in space and time (a temporal table). The only difference between the two is whether a time coordinate is associated with an object, or not; static sky objects implicitly exist from -inf < t < inf.
The tables are internally vertically split into column groups (cgroups) -- groups of columns with related data (e.g., astrometry, photometry, survey metadata, etc.). They're further horizontally partitioned into equal-area space and time cells (HEALPix pixels on the sky (typically, ~3deg^2^ each), equal intervals in time (typically, 1 day per cell)). The horizontal partitioning to cells maps to a directory tree on the disk -- every space/time cell has a unique directory to which it maps (similar to DVO). The data in each cell are stored in separate (compressed, checksummed) HDF5 tables in the cell's directory, one per column group. We call these tablets. Internally, we use PyTables to access and manage it. LSD's design, in principle, allows this directory structure to be distributed onto a cluster of computers with no common storage, where each node stores and operates on a subset of cells. While not currently implemented, this capability was a design requirement and is planned for the future.
Each tablet also contains a copy of all rows that are within a margin (typically, 30 arcsec) outside the cell's boundaries (the neighbor cache). This allows for efficient neighbor lookup (or, for example, the application of spatial matched filters) without the need to access tablets in neighboring cells. To facilitate parallelization and distribution, all LSD operations are internally always performed on cell-by-cell basis, with no inter-cell communication.
The catalogs are partitioned into cells in space and time. The spatial partitioning is performed by projecting and recursively bisecting a ''butterfly HEALPix'' (''BHpix'') map of (ra, dec) to a desired subdivision level, ''k''. Typically, k=6 or 7, corresponding to values HEALPix NSIDE parameter of 16 or 32, with pixel scales of 13.42deg^2^ and 3.35deg^2^, respectively. The subdivision level is recordes in each table's schema.conf file (see below).
A visualization of a BHpix analog from [http://www.progonos.com/furuti/MapProj/Dither/ProjPoly/projPoly2.html prognos.com], with the north pole slightly offset from the center of projection:
In our case, the center is exactly at the north celestial pole, while the south pole projects to the four corner points. The projection is obtained from a "classical" HEALPix map by cutting it along the ra=(0, 90, 180, 270) meridians, and rearranging the four pieces so they meet at the North Pole. For database applications, it's advantage over HEALPix is a) less discontinuities around the North pole, and b) that any well sampled non-self intersecting polygon on the sphere, when projected to BHpix, remains a non-self intersecting polygon that encloses the same area. The latter property allows for easy translation of arbitrary queries on the sphere into BHpix projection space.
Note that BHpix pixels are still HEALPix pixels, even though we prefer to index them two-dimensionally (x,y), instead of a 1D index. The only exceptions are pixels in the equatorial region on ra=(0, 90, 180, 270) meridians: these get split into halves in BHpix (and thus each half gets its own BHpix (x,y) index).
NOTE: We plan to transition to [http://skyserver.org/htm/ Hierarhical Triangular Mesh] (HTM) pixelization scheme in not to distant future.
LSD employs a simple file layout. At the top level, the "database" is a directory, containing the tables (directories themselves), and "join definitions" (files ending in .join). Any directory assumed to be a table, and any file ending in .join is assumed to contain a join definition. Everything else is ignored. Example:
[mjuric@nebel src]$ ls -la db/
drwxrwxr-x 3 mjuric mjuric 37 Dec 3 02:03 ps1_det
drwxrwxr-x 4 mjuric mjuric 49 Dec 3 02:03 ps1_exp
drwxrwxr-x 3 mjuric mjuric 37 Dec 3 02:09 ps1_obj
drwxrwxr-x 3 mjuric mjuric 37 Dec 3 02:09 _ps1_obj_to_ps1_det
drwxrwxr-x 3 mjuric mjuric 37 Dec 3 03:27 _ps1_obj_to_sdss
drwxrwxr-x 3 mjuric mjuric 37 Dec 3 02:13 sdss
-rw-rw-r-- 1 mjuric mjuric 140 Dec 3 02:03 .ps1_det:ps1_exp.join
-rw-rw-r-- 1 mjuric mjuric 150 Dec 3 03:27 .ps1_obj:sdss.join
-rw-rw-r-- 1 mjuric mjuric 153 Dec 3 03:27 .ps1_obj:_ps1_obj_to_sdss.join
-rw-rw-r-- 1 mjuric mjuric 153 Dec 3 03:27 .sdss:_ps1_obj_to_sdss.join
-rw-rw-r-- 1 mjuric mjuric 164 Dec 3 02:09 .ps1_det:_ps1_obj_to_ps1_det.join
-rw-rw-r-- 1 mjuric mjuric 164 Dec 3 02:09 .ps1_obj:ps1_det.join
-rw-rw-r-- 1 mjuric mjuric 164 Dec 3 02:09 .ps1_obj:_ps1_obj_to_ps1_det.join
Each table subdirectory contains at least a ''schema.cfg'' file, describing the table schema and layout, and a 'tablets' subdirectory with the directory tree structure holding HDF5 tablets. A table can also have a 'files' subdirectory, keeping the externally stored BLOBs (typically, large files that are unfeasible to keep within the HDF5 files itself). Examples:
[mjuric@nebel src]$ ls -la db/sdss
total 28
drwxrwxr-x 3 mjuric mjuric 37 Dec 3 02:13 .
drwxrwxr-x 8 mjuric mjuric 4096 Dec 3 03:27 ..
-rw-rw-r-- 1 mjuric mjuric 4379 Dec 3 03:26 schema.cfg
drwxrwxr-x 6 mjuric mjuric 66 Dec 3 02:14 tablets
[mjuric@nebel src]$ ls -la db/ps1_exp
total 32
drwxrwxr-x 4 mjuric mjuric 49 Dec 3 02:03 .
drwxrwxr-x 8 mjuric mjuric 4096 Dec 3 03:27 ..
drwxrwxr-x 3 mjuric mjuric 16 Dec 3 02:03 files
-rw-rw-r-- 1 mjuric mjuric 6129 Dec 3 02:09 schema.cfg
drwxrwxr-x 3 mjuric mjuric 21 Dec 3 02:03 tablets
The directory structure under {{{tablets}}} reflects the hierarchical nested Butterfly HEALPix (''BHpix''; see above) spatial partitioning of the catalog. Each branch of the directory tree ends with a 'static' subdirectory (for static catalogs), or one named 'TXXXXX', where XXXXX is the MJD of a temporal cell. An example:
[mjuric@nebel src]$ du -a db/sdss/tablets | head
1668 db/sdss/tablets/+0.5-0.5/+0.75-0.25/+0.625-0.375/+0.6875-0.3125/+0.65625-0.34375/+0.640625-0.359375/+0.632812-0.367188/static/sdss.main.h5
2844 db/sdss/tablets/+0.5-0.5/+0.75-0.25/+0.625-0.375/+0.6875-0.3125/+0.65625-0.34375/+0.640625-0.359375/+0.632812-0.367188/static/sdss.photometry.h5
88 db/sdss/tablets/+0.5-0.5/+0.75-0.25/+0.625-0.375/+0.6875-0.3125/+0.65625-0.34375/+0.640625-0.359375/+0.632812-0.367188/static/sdss.survey.h5
4604 db/sdss/tablets/+0.5-0.5/+0.75-0.25/+0.625-0.375/+0.6875-0.3125/+0.65625-0.34375/+0.640625-0.359375/+0.632812-0.367188/static
4608 db/sdss/tablets/+0.5-0.5/+0.75-0.25/+0.625-0.375/+0.6875-0.3125/+0.65625-0.34375/+0.640625-0.359375/+0.632812-0.367188
1708 db/sdss/tablets/+0.5-0.5/+0.75-0.25/+0.625-0.375/+0.6875-0.3125/+0.65625-0.34375/+0.640625-0.359375/+0.632812-0.351562/static/sdss.main.h5
2908 db/sdss/tablets/+0.5-0.5/+0.75-0.25/+0.625-0.375/+0.6875-0.3125/+0.65625-0.34375/+0.640625-0.359375/+0.632812-0.351562/static/sdss.photometry.h5
92 db/sdss/tablets/+0.5-0.5/+0.75-0.25/+0.625-0.375/+0.6875-0.3125/+0.65625-0.34375/+0.640625-0.359375/+0.632812-0.351562/static/sdss.survey.h5
4712 db/sdss/tablets/+0.5-0.5/+0.75-0.25/+0.625-0.375/+0.6875-0.3125/+0.65625-0.34375/+0.640625-0.359375/+0.632812-0.351562/static
4716 db/sdss/tablets/+0.5-0.5/+0.75-0.25/+0.625-0.375/+0.6875-0.3125/+0.65625-0.34375/+0.640625-0.359375/+0.632812-0.351562
[mjuric@nebel src]$ du -a db/ps1_det/tablets | head
6696 db/ps1_det/tablets/-0.5+0.5/-0.25+0.25/-0.375+0.375/-0.3125+0.3125/-0.34375+0.28125/-0.328125+0.265625/-0.335938+0.257812/T55248/ps1_det.astrometry.h5
4816 db/ps1_det/tablets/-0.5+0.5/-0.25+0.25/-0.375+0.375/-0.3125+0.3125/-0.34375+0.28125/-0.328125+0.265625/-0.335938+0.257812/T55248/ps1_det.detxy.h5
2744 db/ps1_det/tablets/-0.5+0.5/-0.25+0.25/-0.375+0.375/-0.3125+0.3125/-0.34375+0.28125/-0.328125+0.265625/-0.335938+0.257812/T55248/ps1_det.moments.h5
36 db/ps1_det/tablets/-0.5+0.5/-0.25+0.25/-0.375+0.375/-0.3125+0.3125/-0.34375+0.28125/-0.328125+0.265625/-0.335938+0.257812/T55248/ps1_det.kron.h5
4152 db/ps1_det/tablets/-0.5+0.5/-0.25+0.25/-0.375+0.375/-0.3125+0.3125/-0.34375+0.28125/-0.328125+0.265625/-0.335938+0.257812/T55248/ps1_det.photometry.h5
6704 db/ps1_det/tablets/-0.5+0.5/-0.25+0.25/-0.375+0.375/-0.3125+0.3125/-0.34375+0.28125/-0.328125+0.265625/-0.335938+0.257812/T55248/ps1_det.quality.h5
25156 db/ps1_det/tablets/-0.5+0.5/-0.25+0.25/-0.375+0.375/-0.3125+0.3125/-0.34375+0.28125/-0.328125+0.265625/-0.335938+0.257812/T55248
764 db/ps1_det/tablets/-0.5+0.5/-0.25+0.25/-0.375+0.375/-0.3125+0.3125/-0.34375+0.28125/-0.328125+0.265625/-0.335938+0.257812/T55249/ps1_det.astrometry.h5
544 db/ps1_det/tablets/-0.5+0.5/-0.25+0.25/-0.375+0.375/-0.3125+0.3125/-0.34375+0.28125/-0.328125+0.265625/-0.335938+0.257812/T55249/ps1_det.detxy.h5
312 db/ps1_det/tablets/-0.5+0.5/-0.25+0.25/-0.375+0.375/-0.3125+0.3125/-0.34375+0.28125/-0.328125+0.265625/-0.335938+0.257812/T55249/ps1_det.moments.h5
The numbers forming the path are the centers, (x, y), of recursively subdivided BHpix pixels. The BHpix domain (and its single k=0 level pixel) stretches from [-1, 1] x [-1, 1]. Every subsequent level in the directory hierarchy corresponds to a center of a k=1,2,3,...level pixel, formed by subdivision into four of the pixel one level up.
If there's no data falling within a particular pixel, at any level, its directory (nor any of directories that would have been below it) will exist. For example, if a catalog contained no objects in 0 < ra < 90deg range, the directory db/ps1_det/tablets/+0.5+0.5 (and the entire subtree below it) would not exist. This scheme allows one to easily locate the subset of cells containing data for any polygonal footprint in BHpix projection.
The formats of schema.cfg and .join files are intentionally left undocumented, as they're subject to change.
A typical LSD query, such as one posed through {{{lsd-query}}} command line utility, or via {{{DB.query}}} in the Python API, passes through the following stages:
- The query is parsed to determine which tables it refers to. A "join tree" is constructed, resolving how to join the rows of each table to one another.
- We next find the list of all cells that overlap with space/time bounds given by the user.
- We launch N ''worker processes'', where N is typically equal to the number of cores on the machine where you run LSD. The workers fetch and process cells from the list in the previous step, until there are none left.
- Each worker fetches a cell, loads its data, and executes the query on it constructing a set of result rows.
- Depending on what the user requested, these are then returned to the master, or passed on to the user's functions (map/reduce ''kernels''; see the next section)
- The master collects the output from the workers, and returns it to the user.
As task distribution is accomplished on a per-cell basis, given sufficient computing capability LSD can launch over >10^4^ tasks to simultaneously process large (all-sky) queries.
The native high-performance programming model of LSD is LSD/MapReduce. As discussed above, the execution of queries is distributed over many worker tasks that funnel back to the user the rows generated by the query. The user then does further postprocessing or data analysis with the retrieved result set.
For many types of applications, however, computationally-intensive postprocessing can also be partitioned and distributed to the workers. In such a scenario, instead of returning rows to the master, each worker passes them on to a chain of (user defined) Python functions (kernels) for further processing. Only the output of the last kernel in the chain is then returned to the master. In this way, the user gains a >10x performance gain over what would be achievable from a single-threaded program.
A trivial example of this is a program designed to count the rows returned by a query. The user constructs a kernel that only returns the count of rows in each cell returned by the query. These are then returned to the master process. The master process collects them, and computes the final tally. For a less trivial example, a kernel can be constructed to perform Bayesian classification of light curves of objects within each static sky cell, and instead of returning the full light curves of all (including non-varying) objects, just return the most interesting objects.
Importantly, the distribution and transmission/reception of results is completely transparent and entirely handled by LSD. The user needs to provide LSD with only two things:
- The query to execute (on each cell within the bounds)
- One or more kernels that will operate on the result of the query.
An example program that counts rows returned by a query is given below. We will use it to demonstrate the most important points in writing kernels. Note that this (and other) examples assume you have a database named 'db' in the current directory; if you're using the LSD setup on nebel, you can satisfy this by making a symlink named 'db' to wherever $LSD_DB is pointing.
from lsd import DB
def row_counter_kernel(qresult):
for rows in qresult:
yield len(rows)
db = DB('db')
query = db.query("SELECT obj_id FROM ps1_obj, sdss")
total = 0
for subtotal in query.execute([row_counter_kernel]):
total += subtotal
print "The total number of rows returned by the query '%s' is %d" % (query, total)
We begin by importing {{{DB}}} from the LSD Python package. Next follows the definition of our kernel: the function named {{{row_counter_kernel}}}. After that, there's the opening of the database, and construction of the "query object" (note that at this point the query hasn't been executed yet!).
Then comes the loop where the query gets executed: the {{{query.execute()}}} call does the trick. Note that, as a paremeter to that call, we pass a list of kernels to execute. In this case, it's just one kernel -- {{{row_counter_kernel}}}. Transparently to the user, {{{query.execute}}} computes the cells over which to run, launches the workers, and begins feeding them the IDs of cells that they should process. The workers (working in parallel), execute the query on the data in each cell and pass the results to our first kernel ({{{row_counter_kernel}}}) as its first parameter ({{{qresult}}}).
The {{{qresult}}} object is a Python [http://docs.python.org/glossary.html#term-iterable ''iterable'']. You can think of it as something that you can stick into a for loop, and make it yield other objects. In the case of {{{qresult}}}, those objects are ''blocks of rows'' that are the result of query execution in that cell; each block is a an instance of {{{ColGroup}}}, a class with API and functionallity equivalent to {{{numpy.ndarray}}} record array. Our kernel above iterates over those blocks, and computes the "length" of each (i.e., the number of rows).
Our kernel informs LSD of what it computed not by {{{return}}}-ing it, but by {{{yield}}}-ing it. A good description of what Python yield is is given [http://stackoverflow.com/questions/231767/the-python-yield-keyword-explained#answer-231855 here] -- I strongly advise you to read it if this is the first time you encounter this construct. For the lazy, yield "returns" the value to the caller, but with the understanding that a) the caller has called it from a {{{for}}} loop, and b) that the execution will return to the next line after {{{yield}}} on the next iteration of the for loop. Again, I urge you to read the previous link. When you do, you'll see that our kernels are in fact ''Python generators'' (of values).
The value that our kernel yields gets returned to the LSD framework that called it. LSD transparently funnels all those values (from different workers) back to the main program (to the {{{for}}} loop from which we called {{{query.execute}}}). The effect is that {{{len(rows)}}} that was {{{yield}}}-ed by the kernel, gets assigned to {{{subtotal}}}. From then on, {{{subtotal}}} is added to {{{total}}} and once the loop ends (once all workers have processed all cells), the loop is exited and the result is printed out.
When we execute the program, here's what we get:
[mjuric@nebel src]$ ./examples/count_rows.py
[m/r (67 elem): ::::::::::::::::::::] 0.84 sec
The total number of rows returned by the query 'SELECT obj_id FROM ps1_obj, sdss' is 738875
The nice progress bar and execution timing are transparently provided by LSD. The progress bar also informs us that the query has been run over 67 cells (processing elements), meaning it could in principle be parallelized over 67 cores. Note: If you're wondering, the total of 738875 objects from ps1_obj that have been matched to sdss (that is what the query above is counting) is small because the database we queried contains only the Small Area Survey.
For comparison, what would happen without parallelization? If {{{NWORKERS}}} environment variable is set, LSD framework will use it to determine how many workers to launch. With NWORKERS=1:
[mjuric@nebel src]$ NWORKERS=1 ./examples/count_rows.py
[m/r (67 elem): ::::::::::::::::::::] 4.88 sec
The total number of rows returned by the query 'SELECT obj_id FROM ps1_obj, sdss' is 738875
[mjuric@nebel src]$
So the parallelization (that is entirely transparent to the user) to 16 threads (over 8 physical cores) brought us ~5.8x speedup. Note also that the same code will run unmodified when distributed over thousands of cores on a cluster (once that feature is added to LSD).
As mentioned before, more than one kernel can be passed to {{{query.execute}}} call. In that case, the values that are yielded from the n-th kernel are passed on to the (n+1)st kernel, with some restrictions on how the data the n-th kernel yields must look like, and some additional processing of the data by the framework, before it passes them to the (n+1)st kernal. In fact, it is the "additional processing" that gives the special usefullness to chaining of kernels.
The first kernel in the chain is typically referred to as the mapper. The others are referred to as reducers. Let us first consider the case of two chained kernels - a mapper, and a reducer.
Whenever kernels are chained, we restrict the type of data ''all but the last kernel'' must be yield: it must always be a (key, value) pair (technically, a Python tuple). What the key is, and what the value is, are almost irrelevant (see below) -- the important point is that there must be a pair of them. For example, the {{{row_counter_kernel}}} in the previous example is not a valid candidate for chaining, as it returns just a single value (a number). This restriction exists because (in contrast to the previous example, where it just returned the result to the user), the LSD framework sitting between the chained kernels will inspect the (key, value) pairs that the mappers are producing, and regroup them by key. For example, if we had the mappers that yielded the following list of key/value pairs:
("star", 20), ("star", 45), ("qso", 442), ("galaxy", 5443), ("star", 322), ("qso", 11)
the LSD framework would regroup these into:
("star", [20, 45, 322])
("qso", [442, 11])
("galaxy", [5443])
It is those regrouped values that then get be passed on to the next kernel (the reducer). So the reducer, as its first argument, gets the key-values pair formed by the regrouping. This is commonly known as "MapReduce", a distributed computing model employed and popularized by Google (see [http://en.wikipedia.org/wiki/MapReduce the Wikipedia article] and the [http://labs.google.com/papers/mapreduce.html Dean & Ghemawat (2004) slides and paper] for details.
Virtually any large-scale data processing task can be formulated in terms of MapReduce kernels. If there are more than two kernels, each subsequent kernel serves as a reducer to the data output by the previous one, and each kernel feeding the next reducer must yield (key, value) pairs. Only the last kernel in the chain (the one whose data will be returned to the user unmodified) is free to yield whatever it pleases.
A simple example will serve to illustrate just how useful is this model.
Only [http://docs.python.org/glossary.html#term-hashable] hashable objects are permitted to serve as the keys. In short, these are the objects that:
- have a hash() method that returns a value that does not change within that object's lifetime
- can be tested for equality using == (i.e., have a eq() method), and
- if x == y, it is implied that x.hash() == y.hash(). The converse doesn't have to be true.
Built-in immutable types (str, int, long, bool, float, tuple) are hashable. We recommend you always use these types as the key, because it's easy to make a mistake otherwise and accidentally use objects that satisfy 1) and 2), but not 3). An example are records derived from {{{numpy.recarray}}}:
In [266]: x1=np.array([1,1]); x2=np.array(['a','a']); r = np.core.records.fromarrays([x1,x2],names='a,b')
In [270]: r[0].__hash__()
Out[270]: 4444286
In [271]: r[1].__hash__()
Out[271]: -8070450532243484546
In [272]: r[0] == r[1]
Out[272]: True
In [274]: set([r[0], r[1]])
Out[274]: set([(1, 'a'), (1, 'a')])
In particular, note the set at the last line appears to have two elements that are the same (!).
As a simple example, here is a complete LSD/MapReduce program that computes the distribution of SDSS sources with respect to declination:
from lsd import DB
import numpy as np
def mapper(qresult, bins):
for rows in qresult:
counts, _ = np.histogram(rows['dec'], bins)
for (bin, count) in zip(bins, counts):
if count != 0:
yield (bin, count)
def reducer(kv):
bin, counts = kv
yield (bin, sum(counts))
db = DB('db')
query = db.query("SELECT dec FROM sdss")
ddec = 10.
bins = np.arange(-90, 90.0001, ddec)
hist = {}
for (bin, count) in query.execute([(mapper, bins), reducer]):
hist[bin + ddec/2] = count
for binctr in sorted(hist.keys()):
print "%+05.1f %10d" % (binctr, hist[binctr])
print "Total number of objects:", sum(hist.values())
In the above:
- The Map step: LSD executes the mapper on each cell, passing it all rows (in blocks, as described earlier) within that cell that satisfied the query. Each mapper creates a histogram vs. declinations of the passed rows, and yields the non-zero bins as key-value pairs (Python tuples). The key is a bin, and value is the number of stars in that bin.
- Behind the scenes, LSD framework collects the outputs of all mappers and regroups them by key (in the example above, by bin).
- The Reduce step: A key, and the list of values that were associated with that key are passed to a reducer as its first parameter ({{{kv}}}). In our case, the two will be the bin (the key), and the list counts belonging to that bin that were computed in each cell (the values). Our reducer simply sums up the counts and yields back the {{{(bin, sum(counts))}}} pair back to the framework.
- The framework funnels these back to the user with no further processing. Those appear as {{{(bin, count)}}} values in the for loop that called {{{query.execute}}}, where we store them into the {{{hist}}} hash-map (a Python dict).
- Finally, we print out the (sorted) results, as well as compute the total number of objects.
Here is the actual output:
[mjuric@nebel src]$ ./examples/latitude_histogram.py
[7033 el.]::::::::::::::::::::[12 el.]++++++++++++++++++++> 25.02 sec
-25.0 1616939
-15.0 10253928
-05.0 32180680
+05.0 54744216
+15.0 44663275
+25.0 38728332
+35.0 30418618
+45.0 27443296
+55.0 21233697
+65.0 13315608
+75.0 3289278
+85.0 786345
Total number of objects: 278674212
Note the first two numbers in the progress bar on top. They are the number of mappers (the former) and the number of reducers (the latter) that were run. There were 7033 mappers (equal to the number of cells of the catalog). But there were only 12 reducers, because once the mapper regrouped the values returned by mappers by keys, there were only 12 unique keys (12 non-empty bins of declination). Therefore, the framework could have (in principle) parallelized the map step over 7033 cores, and the reduce step over 12. This illustrates the scalability of the MapReduce approach.
Extensions of this example into something more usable are fairly straightforward -- i.e., we leave it to the reader to write a map/reduce pair to compute color-magnitude diagrams over an arbitrary pixelization of the sky.
Finally as a side note, note that the high-level query interface functions (e.g., {{{db.query(...).fetch()}}} or {{{db.query(...).iterate()}}})) are just an "identity" map with no reduce step -- all rows are simply returned to the user.
Set up on Harvard Odyssey clusterAdd BLOB and array columns (for headers/per-chip info)Add the time componentWrite .smf file importerMake queries work for static-dynamic catalog joinsBuild LSD object catalog from the detection catalogSupport writing into tables (INTO)Generalize tables structure and JOINsImplement xmatching of detections to objects- Build the LSD catalogs from .smf files on Odyssey
Include Doug's flat field corrections- Include Eddie's MDF-derived per-night zero-point correction when he makes them available.
- Compute average, zero-point corrected magnitudes for the object catalog
- Build MPI-based IPC, allowing us to run distributed on Odyssey