Future Concepts

Mario Juric edited this page Jan 1, 2017 · 1 revision

Various ideas for future improvements to LSD.

Overall Roadmap

  • Switch to variable-sized cells and HTM
  • Switch to MonetDB (hard?) or sqlite (easy) as backend, to gain better support for SQL
  • Add a query rewriter

Supporting ''and, or, not'' in queries

Right now we don't support expressions like {{{0 < x and x < 1}}} because Python does not allow the logical operators ({{{and}}}, {{{or}}}, {{{not}}}) to be overloaded to operate on and return numpy arrays. The user has to emulate them with the clumsy and error-prone {{{(0 < x) & (x < 1)}}} construct.

A possibly simple way of supporting true logical operators would be to use the {{{ast}}} module to parse the {{{WHERE}}} expression, locate all logical operators, and replace them with their bitwise equivalents (and, where needed, comparisons to 1 and 0). We should try this out.
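A minimal sketch of this idea, assuming the {{{WHERE}}} expression is available as a string (the {{{rewrite_where}}} helper and class names here are ours, not part of LSD):

```python
import ast
import numpy as np

class LogicalToBitwise(ast.NodeTransformer):
    """Rewrite `and`/`or`/`not` into `&`/`|`/`~` so they work on numpy arrays."""

    def visit_BoolOp(self, node):
        self.generic_visit(node)
        op = ast.BitAnd() if isinstance(node.op, ast.And) else ast.BitOr()
        # `a and b and c` becomes `(a & b) & c`
        result = node.values[0]
        for value in node.values[1:]:
            result = ast.BinOp(left=result, op=op, right=value)
        return result

    def visit_UnaryOp(self, node):
        self.generic_visit(node)
        if isinstance(node.op, ast.Not):
            return ast.UnaryOp(op=ast.Invert(), operand=node.operand)
        return node

def rewrite_where(expr):
    """Parse a WHERE expression, swap logical for bitwise ops, compile it."""
    tree = LogicalToBitwise().visit(ast.parse(expr, mode='eval'))
    ast.fix_missing_locations(tree)
    return compile(tree, '<where>', 'eval')

x = np.array([-0.5, 0.5, 1.5])
mask = eval(rewrite_where('0 < x and x < 1'), {'x': x})
print(mask)  # [False  True False]
```

Note that although {{{&}}} binds tighter than comparisons in Python source, the rewrite happens at the AST level, so the grouping of {{{0 < x and x < 1}}} is preserved automatically and no extra parenthesization is needed.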

Supporting aggregates

Aggregates (like COUNT(), MEAN(), etc.) fit naturally into the M/R framework.

Concept I:

  • Allow any Python callable to be an aggregate if it has an {{{AGGREGATE = True}}} attribute defined.
  • Inspect the {{{WHERE}}} clause for aggregates (using {{{ast}}} to find all callables, and inspecting their attributes). If an aggregate is found, any non-aggregate columns implicitly become "GROUP BY" columns.
  • The mapper will split input rows into blocks keyed by the group-by columns.
  • The reducer will call the aggregates on each aggregate column.
    • The aggregates take an iterable that yields blocks of the aggregate column data (the ones emitted by the mapper).
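A minimal sketch of Concept I, where an aggregate is just a callable tagged with the {{{AGGREGATE}}} attribute and fed an iterable of blocks by the reducer (the {{{MEAN}}} function here is illustrative, not LSD code):

```python
import numpy as np

def MEAN(blocks):
    """Aggregate: consume an iterable of column blocks, return the overall mean."""
    total, count = 0.0, 0
    for block in blocks:
        total += block.sum()
        count += len(block)
    return total / count
MEAN.AGGREGATE = True  # marks this callable as an aggregate for the query planner

# The query planner would detect aggregates like this:
is_aggregate = getattr(MEAN, 'AGGREGATE', False)

# Reducer side: blocks of one aggregate column, as emitted by the
# mappers for a single group-by key
blocks = [np.array([1.0, 2.0]), np.array([3.0, 4.0, 5.0])]
print(MEAN(blocks))  # 3.0
```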

Concept II (optimized):

  • Allow any Python callable to be an aggregate if it has an {{{AGGREGATE = True}}} attribute defined.
  • If the callable has no {{{MAPREDUCE = True}}} attribute, work as in Concept I.
  • Inspect the {{{WHERE}}} clause for aggregates (using {{{ast}}} to find all callables, and inspecting their attributes). If an aggregate is found, any non-aggregate columns implicitly become "GROUP BY" columns.
  • The mapper will split input rows into blocks keyed by the group-by columns, set the attribute {{{STAGE='map'}}} on the aggregates, and call them on the blocks.
  • The reducer sets the attribute {{{STAGE='reduce'}}} and calls the aggregates on the partial results from each aggregate column.

Concept II allows the aggregates to compute partial results in the mappers, in cases where that's possible (e.g., when computing sums or averages). The behavior of Concept I is required for aggregates like medians, where all the rows must be available at once.
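A minimal sketch of a Concept II two-stage aggregate (the class and calling convention are illustrative, not the actual LSD interface): mappers run it with {{{STAGE='map'}}} to produce partial sums, and the reducer combines the partials with {{{STAGE='reduce'}}}.

```python
import numpy as np

class SUM:
    """Two-stage aggregate: mappers emit partial sums, the reducer combines them."""
    AGGREGATE = True   # this callable is an aggregate
    MAPREDUCE = True   # it can compute partial results in the mappers
    STAGE = 'map'

    def __call__(self, values):
        if self.STAGE == 'map':
            # values: blocks of the aggregate column seen by this mapper
            return sum(block.sum() for block in values)
        else:
            # values: partial results collected from the mappers
            return sum(values)

agg = SUM()
partial1 = agg([np.array([1, 2]), np.array([3])])  # mapper 1 -> 6
partial2 = agg([np.array([4, 5])])                 # mapper 2 -> 9
agg.STAGE = 'reduce'
print(agg([partial1, partial2]))  # 15
```

A median cannot be staged this way, which is why the Concept I path (shipping all rows to the reducer) must remain available as a fallback.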

Python language extensions

Extend Python syntax to make LSD M/R queries easier to write. For example (proposed syntax, not valid Python today):

############################################################
#!/usr/bin/env lsd-python

bands = 'ugriz'

def myfunction():
	select:
		mean, nmag_ok
	from:
		ps1_obj, cal_mags
	map:
		bits = np.zeros(len(mean), dtype=int)
		for i in xrange(5):
			bits |= (nmag_ok(4-i) > 0) << i
		for ubit in np.unique(bits):
			yield ubit, sum(bits == ubit)
	reduce (ubit, values):
		yield ubit, sum(values)
	iterate as (ubit, count):
		s = ''
		for bit in xrange(5):
			s += bands[bit] if ubit & 2**bit else '-'
		print s, count

############################################################

Check [http://code.activestate.com/recipes/546539/ this recipe] and [http://fiber-space.de/wordpress/ EasyEncoding] for ideas on how to do this.

Another LSD/MapReduce Example

import lsd
import pyfits
import numpy as np

def coverage_mapper(qresult, dx):
    """Map stage: bin the rows of each query result block into a pixel-grid patch."""
    for rows in qresult:
        lon, lat = rows.as_columns()

        # Pixel indices
        i = (lon / dx).astype(int)
        j = ((90 - lat) / dx).astype(int)

        # Cut out the patch with data
        (imin, imax, jmin, jmax) = (i.min(), i.max(), j.min(), j.max())
        w = imax - imin + 1
        h = jmax - jmin + 1
        i -= imin; j -= jmin

        # Binning: count sources per pixel of the patch
        sky = np.zeros(w*h)
        idx = np.bincount(j + i*h)
        sky[0:len(idx)] = idx
        sky = sky.reshape((w, h))

        yield (sky, imin, jmin)

dx = 1./3600.
width  = int(np.ceil(360/dx))
height = int(np.ceil(180/dx) + 1)

sky = np.zeros((width, height))

db = lsd.DB("db")
q = db.query("select ra, dec from ps1_det")
for (patch, imin, jmin) in q.execute([(coverage_mapper, dx)]):
    # Accumulate each mapper's patch into the full-sky map
    sky[imin:imin + patch.shape[0], jmin:jmin + patch.shape[1]] += patch

output = "coverage.fits"  # output FITS filename
pyfits.writeto(output, sky[::-1, ::-1].transpose(), clobber=True)