"""
Data structures for sparse float data. Life is made simpler by dealing only
with float64 data.
"""
from __future__ import division
# pylint: disable=E1101,E1103,W0231,E0202

import warnings
from pandas.compat import lmap
from pandas import compat
import numpy as np

from pandas.core.dtypes.missing import isna, notna
from pandas.core.dtypes.cast import maybe_upcast, find_common_type
from pandas.core.dtypes.common import _ensure_platform_int, is_scipy_sparse

from pandas.core.common import _try_sort
from pandas.compat.numpy import function as nv
from pandas.core.index import Index, MultiIndex, _ensure_index
from pandas.core.series import Series
from pandas.core.frame import (DataFrame, extract_index, _prep_ndarray,
                               _default_index)
import pandas.core.algorithms as algos
from pandas.core.internals import (BlockManager,
                                   create_block_manager_from_arrays)
import pandas.core.generic as generic
from pandas.core.sparse.series import SparseSeries, SparseArray
from pandas._libs.sparse import BlockIndex, get_blocks
from pandas.util._decorators import Appender
import pandas.core.ops as ops


_shared_doc_kwargs = dict(klass='SparseDataFrame')


class SparseDataFrame(DataFrame):
    """
    DataFrame containing sparse floating point data in the form of SparseSeries
    objects.

    Parameters
    ----------
    data : same types as can be passed to DataFrame or scipy.sparse.spmatrix
    index : array-like, optional
    columns : array-like, optional
    default_kind : {'block', 'integer'}, default 'block'
        Default sparse kind for converting Series to SparseSeries. Will not
        override SparseSeries passed into the constructor.
    default_fill_value : float
        Default fill_value for converting Series to SparseSeries
        (default: nan). Will not override SparseSeries passed in.
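
    Examples
    --------
    A minimal sketch of construction (assumes ``import numpy as np``;
    illustrative, not exhaustive):

    >>> sdf = SparseDataFrame({'A': [np.nan, 1.0], 'B': [2.0, np.nan]})
    >>> sdf.default_fill_value
    nan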
    """
    _subtyp = 'sparse_frame'

    def __init__(self, data=None, index=None, columns=None, default_kind=None,
                 default_fill_value=None, dtype=None, copy=False):

        # pick up the defaults from the Sparse structures
        if isinstance(data, SparseDataFrame):
            if index is None:
                index = data.index
            if columns is None:
                columns = data.columns
            if default_fill_value is None:
                default_fill_value = data.default_fill_value
            if default_kind is None:
                default_kind = data.default_kind
        elif isinstance(data, (SparseSeries, SparseArray)):
            if index is None:
                index = data.index
            if default_fill_value is None:
                default_fill_value = data.fill_value
            if columns is None and hasattr(data, 'name'):
                columns = [data.name]
            if columns is None:
                raise Exception("cannot pass a Series without a name or "
                                "columns")
            data = {columns[0]: data}

        if default_fill_value is None:
            default_fill_value = np.nan
        if default_kind is None:
            default_kind = 'block'

        self._default_kind = default_kind
        self._default_fill_value = default_fill_value

        if is_scipy_sparse(data):
            mgr = self._init_spmatrix(data, index, columns, dtype=dtype,
                                      fill_value=default_fill_value)
        elif isinstance(data, dict):
            mgr = self._init_dict(data, index, columns, dtype=dtype)
        elif isinstance(data, (np.ndarray, list)):
            mgr = self._init_matrix(data, index, columns, dtype=dtype)
        elif isinstance(data, SparseDataFrame):
            mgr = self._init_mgr(data._data,
                                 dict(index=index, columns=columns),
                                 dtype=dtype, copy=copy)
        elif isinstance(data, DataFrame):
            mgr = self._init_dict(data, data.index, data.columns, dtype=dtype)
        elif isinstance(data, BlockManager):
            mgr = self._init_mgr(data, axes=dict(index=index, columns=columns),
                                 dtype=dtype, copy=copy)
        elif data is None:
            data = DataFrame()

            if index is None:
                index = Index([])
            else:
                index = _ensure_index(index)

            if columns is None:
                columns = Index([])
            else:
                for c in columns:
                    data[c] = SparseArray(np.nan, index=index,
                                          kind=self._default_kind,
                                          fill_value=self._default_fill_value)
            mgr = to_manager(data, columns, index)
            if dtype is not None:
                mgr = mgr.astype(dtype)

        generic.NDFrame.__init__(self, mgr)

    @property
    def _constructor(self):
        return SparseDataFrame

    _constructor_sliced = SparseSeries

    def _init_dict(self, data, index, columns, dtype=None):
        # pre-filter out columns if they were passed
        if columns is not None:
            columns = _ensure_index(columns)
            data = dict((k, v) for k, v in compat.iteritems(data)
                        if k in columns)
        else:
            columns = Index(_try_sort(list(data.keys())))

        if index is None:
            index = extract_index(list(data.values()))

        sp_maker = lambda x: SparseArray(x, kind=self._default_kind,
                                         fill_value=self._default_fill_value,
                                         copy=True, dtype=dtype)
        sdict = {}
        for k, v in compat.iteritems(data):
            if isinstance(v, Series):
                # Force alignment, no copy necessary
                if not v.index.equals(index):
                    v = v.reindex(index)

                if not isinstance(v, SparseSeries):
                    v = sp_maker(v.values)
            elif isinstance(v, SparseArray):
                v = v.copy()
            else:
                if isinstance(v, dict):
                    v = [v.get(i, np.nan) for i in index]

                v = sp_maker(v)
            sdict[k] = v

        # TODO: figure out how to handle this case, all nan's?
        # add in any other columns we want to have (completeness)
        nan_arr = np.empty(len(index), dtype='float64')
        nan_arr.fill(np.nan)
        nan_arr = sp_maker(nan_arr)
        sdict.update((c, nan_arr) for c in columns if c not in sdict)

        return to_manager(sdict, columns, index)

    def _init_matrix(self, data, index, columns, dtype=None):
        """ Init self from ndarray or list of lists """
        data = _prep_ndarray(data, copy=False)
        index, columns = self._prep_index(data, index, columns)
        data = dict([(idx, data[:, i]) for i, idx in enumerate(columns)])
        return self._init_dict(data, index, columns, dtype)

    def _init_spmatrix(self, data, index, columns, dtype=None,
                       fill_value=None):
        """ Init self from scipy.sparse matrix """
        index, columns = self._prep_index(data, index, columns)
        data = data.tocoo()
        N = len(index)

        # Construct a dict of SparseSeries
        sdict = {}
        values = Series(data.data, index=data.row, copy=False)
        for col, rowvals in values.groupby(data.col):
            # get_blocks expects int32 row indices in sorted order
            rowvals = rowvals.sort_index()
            rows = rowvals.index.values.astype(np.int32)
            blocs, blens = get_blocks(rows)

            sdict[columns[col]] = SparseSeries(
                rowvals.values, index=index,
                fill_value=fill_value,
                sparse_index=BlockIndex(N, blocs, blens))

        # Add any columns that were empty and thus not grouped on above
        sdict.update({column: SparseSeries(index=index,
                                           fill_value=fill_value,
                                           sparse_index=BlockIndex(N, [], []))
                      for column in columns
                      if column not in sdict})

        return self._init_dict(sdict, index, columns, dtype)

    def _prep_index(self, data, index, columns):
        N, K = data.shape
        if index is None:
            index = _default_index(N)
        if columns is None:
            columns = _default_index(K)

        if len(columns) != K:
            raise ValueError('Column length mismatch: {columns} vs. {K}'
                             .format(columns=len(columns), K=K))
        if len(index) != N:
            raise ValueError('Index length mismatch: {index} vs. {N}'
                             .format(index=len(index), N=N))
        return index, columns

    def to_coo(self):
        """
        Return the contents of the frame as a sparse SciPy COO matrix.

        .. versionadded:: 0.20.0

        Returns
        -------
        coo_matrix : scipy.sparse.spmatrix
            If the caller is heterogeneous and contains booleans or objects,
            the result will be of dtype=object. See Notes.

        Notes
        -----
        The dtype will be the lowest-common-denominator type (implicit
        upcasting); that is to say if the dtypes (even of numeric types)
        are mixed, the one that accommodates all will be chosen.

        e.g. If the dtypes are float16 and float32, dtype will be upcast to
        float32. By numpy.find_common_type convention, mixing int64 and
        uint64 will result in a float64 dtype.
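
        Examples
        --------
        A minimal sketch (requires scipy; assumes ``import numpy as np``):

        >>> sdf = SparseDataFrame({'A': [np.nan, 1.0], 'B': [2.0, np.nan]})
        >>> coo = sdf.to_coo()
        >>> coo.nnz  # one stored point per column here
        2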
        """
        try:
            from scipy.sparse import coo_matrix
        except ImportError:
            raise ImportError('Scipy is not installed')

        dtype = find_common_type(self.dtypes)
        cols, rows, datas = [], [], []
        for col, name in enumerate(self):
            s = self[name]
            row = s.sp_index.to_int_index().indices
            cols.append(np.repeat(col, len(row)))
            rows.append(row)
            datas.append(s.sp_values.astype(dtype, copy=False))

        cols = np.concatenate(cols)
        rows = np.concatenate(rows)
        datas = np.concatenate(datas)
        return coo_matrix((datas, (rows, cols)), shape=self.shape)

    def __array_wrap__(self, result):
        return self._constructor(
            result, index=self.index, columns=self.columns,
            default_kind=self._default_kind,
            default_fill_value=self._default_fill_value).__finalize__(self)

    def __getstate__(self):
        # pickling
        return dict(_typ=self._typ, _subtyp=self._subtyp, _data=self._data,
                    _default_fill_value=self._default_fill_value,
                    _default_kind=self._default_kind)

    def _unpickle_sparse_frame_compat(self, state):
        """ original pickle format """
        series, cols, idx, fv, kind = state

        if not isinstance(cols, Index):  # pragma: no cover
            from pandas.io.pickle import _unpickle_array
            columns = _unpickle_array(cols)
        else:
            columns = cols

        if not isinstance(idx, Index):  # pragma: no cover
            from pandas.io.pickle import _unpickle_array
            index = _unpickle_array(idx)
        else:
            index = idx

        series_dict = DataFrame()
        for col, (sp_index, sp_values) in compat.iteritems(series):
            series_dict[col] = SparseSeries(sp_values, sparse_index=sp_index,
                                            fill_value=fv)

        self._data = to_manager(series_dict, columns, index)
        self._default_fill_value = fv
        self._default_kind = kind

    def to_dense(self):
        """
        Convert to dense DataFrame

        Returns
        -------
        df : DataFrame
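
        Examples
        --------
        A minimal sketch (assumes ``import numpy as np``):

        >>> sdf = SparseDataFrame({'A': [1.0, np.nan]})
        >>> df = sdf.to_dense()  # regular DataFrame holding the same values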
        """
        data = dict((k, v.to_dense()) for k, v in compat.iteritems(self))
        return DataFrame(data, index=self.index, columns=self.columns)

    def _apply_columns(self, func):
        """ get new SparseDataFrame applying func to each columns """

        new_data = {}
        for col, series in compat.iteritems(self):
            new_data[col] = func(series)

        return self._constructor(
            data=new_data, index=self.index, columns=self.columns,
            default_fill_value=self.default_fill_value).__finalize__(self)

    def astype(self, dtype):
        return self._apply_columns(lambda x: x.astype(dtype))

    def copy(self, deep=True):
        """
        Make a copy of this SparseDataFrame
        """
        result = super(SparseDataFrame, self).copy(deep=deep)
        result._default_fill_value = self._default_fill_value
        result._default_kind = self._default_kind
        return result

    @property
    def default_fill_value(self):
        return self._default_fill_value

    @property
    def default_kind(self):
        return self._default_kind

    @property
    def density(self):
        """
        Ratio of non-sparse points to total (dense) data points
        represented in the frame
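
        Examples
        --------
        A minimal sketch (assumes ``import numpy as np``); two of the four
        points below are non-NaN, so the density is 0.5:

        >>> sdf = SparseDataFrame({'A': [1.0, np.nan], 'B': [np.nan, 2.0]})
        >>> sdf.density
        0.5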
        """
        tot_nonsparse = sum([ser.sp_index.npoints
                             for _, ser in compat.iteritems(self)])
        tot = len(self.index) * len(self.columns)
        return tot_nonsparse / float(tot)

    def fillna(self, value=None, method=None, axis=0, inplace=False,
               limit=None, downcast=None):
        new_self = super(SparseDataFrame,
                         self).fillna(value=value, method=method, axis=axis,
                                      inplace=inplace, limit=limit,
                                      downcast=downcast)
        if not inplace:
            self = new_self

        # set the fill value if we are filling as a scalar with nothing
        # special going on (value == value excludes NaN scalars)
        if (value is not None and value == value and method is None and
                limit is None):
            self._default_fill_value = value

        if not inplace:
            return self

    # ----------------------------------------------------------------------
    # Support different internal representation of SparseDataFrame

    def _sanitize_column(self, key, value, **kwargs):
        """
        Creates a new SparseArray from the input value.

        Parameters
        ----------
        key : object
        value : scalar, Series, or array-like
        kwargs : dict

        Returns
        -------
        sanitized_column : SparseArray

        """
        sp_maker = lambda x, index=None: SparseArray(
            x, index=index, fill_value=self._default_fill_value,
            kind=self._default_kind)
        if isinstance(value, SparseSeries):
            clean = value.reindex(self.index).as_sparse_array(
                fill_value=self._default_fill_value, kind=self._default_kind)

        elif isinstance(value, SparseArray):
            if len(value) != len(self.index):
                raise AssertionError('Length of values does not match '
                                     'length of index')
            clean = value

        elif hasattr(value, '__iter__'):
            if isinstance(value, Series):
                clean = value.reindex(self.index)
                if not isinstance(value, SparseSeries):
                    clean = sp_maker(clean)
            else:
                if len(value) != len(self.index):
                    raise AssertionError('Length of values does not match '
                                         'length of index')
                clean = sp_maker(value)

        # Scalar
        else:
            clean = sp_maker(value, self.index)

        # always return a SparseArray!
        return clean

    def __getitem__(self, key):
        """
        Retrieve column or slice from DataFrame
        """
        if isinstance(key, slice):
            date_rng = self.index[key]
            return self.reindex(date_rng)
        elif isinstance(key, (np.ndarray, list, Series)):
            return self._getitem_array(key)
        else:
            return self._get_item_cache(key)

    def get_value(self, index, col, takeable=False):
        """
        Quickly retrieve single value at passed column and index

        .. deprecated:: 0.21.0

        Please use .at[] or .iat[] accessors.

        Parameters
        ----------
        index : row label
        col : column label
        takeable : interpret the index/col as indexers, default False

        Returns
        -------
        value : scalar value
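
        Examples
        --------
        Prefer the ``.at`` accessor (illustrative):

        >>> sdf = SparseDataFrame({'A': [1.0, 2.0]})
        >>> sdf.at[0, 'A']  # replacement for sdf.get_value(0, 'A')
        1.0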
        """
        warnings.warn("get_value is deprecated and will be removed "
                      "in a future release. Please use "
                      ".at[] or .iat[] accessors instead", FutureWarning,
                      stacklevel=2)
        return self._get_value(index, col, takeable=takeable)

    def _get_value(self, index, col, takeable=False):
        if takeable is True:
            series = self._iget_item_cache(col)
        else:
            series = self._get_item_cache(col)

        return series._get_value(index, takeable=takeable)
    _get_value.__doc__ = get_value.__doc__

    def set_value(self, index, col, value, takeable=False):
        """
        Put single value at passed column and index

        .. deprecated:: 0.21.0

        Please use .at[] or .iat[] accessors.

        Parameters
        ----------
        index : row label
        col : column label
        value : scalar value
        takeable : interpret the index/col as indexers, default False

        Notes
        -----
        This method *always* returns a new object. It is currently not
        particularly efficient (and potentially very expensive) but is provided
        for API compatibility with DataFrame.

        Returns
        -------
        frame : DataFrame
        """
        warnings.warn("set_value is deprecated and will be removed "
                      "in a future release. Please use "
                      ".at[] or .iat[] accessors instead", FutureWarning,
                      stacklevel=2)
        return self._set_value(index, col, value, takeable=takeable)

    def _set_value(self, index, col, value, takeable=False):
        dense = self.to_dense()._set_value(
            index, col, value, takeable=takeable)
        return dense.to_sparse(kind=self._default_kind,
                               fill_value=self._default_fill_value)
    _set_value.__doc__ = set_value.__doc__

    def _slice(self, slobj, axis=0, kind=None):
        if axis == 0:
            new_index = self.index[slobj]
            new_columns = self.columns
        else:
            new_index = self.index
            new_columns = self.columns[slobj]

        return self.reindex(index=new_index, columns=new_columns)

    def xs(self, key, axis=0, copy=False):
        """
        Returns a row (cross-section) from the SparseDataFrame as a Series
        object.

        Parameters
        ----------
        key : object
            Some label contained in the index

        Returns
        -------
        xs : Series
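
        Examples
        --------
        A minimal sketch:

        >>> sdf = SparseDataFrame({'A': [1.0, 2.0]}, index=['a', 'b'])
        >>> row = sdf.xs('a')  # Series indexed by the frame's columns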
        """
        if axis == 1:
            data = self[key]
            return data

        i = self.index.get_loc(key)
        data = self.take([i]).get_values()[0]
        return Series(data, index=self.columns)

    # ----------------------------------------------------------------------
    # Arithmetic-related methods

    def _combine_frame(self, other, func, fill_value=None, level=None,
                       try_cast=True):
        this, other = self.align(other, join='outer', level=level, copy=False)
        new_index, new_columns = this.index, this.columns

        if level is not None:
            raise NotImplementedError("'level' argument is not supported")

        if self.empty and other.empty:
            return self._constructor(index=new_index).__finalize__(self)

        new_data = {}
        new_fill_value = None
        if fill_value is not None:
            # TODO: be a bit more intelligent here
            for col in new_columns:
                if col in this and col in other:
                    dleft = this[col].to_dense()
                    dright = other[col].to_dense()
                    result = dleft._binop(dright, func, fill_value=fill_value)
                    result = result.to_sparse(fill_value=this[col].fill_value)
                    new_data[col] = result
        else:

            for col in new_columns:
                if col in this and col in other:
                    new_data[col] = func(this[col], other[col])

        # if the fill values are the same use them? or use a valid one
        other_fill_value = getattr(other, 'default_fill_value', np.nan)
        if self.default_fill_value == other_fill_value:
            new_fill_value = self.default_fill_value
        elif np.isnan(self.default_fill_value) and not np.isnan(
                other_fill_value):
            new_fill_value = other_fill_value
        elif not np.isnan(self.default_fill_value) and np.isnan(
                other_fill_value):
            new_fill_value = self.default_fill_value

        return self._constructor(data=new_data, index=new_index,
                                 columns=new_columns,
                                 default_fill_value=new_fill_value
                                 ).__finalize__(self)

    def _combine_match_index(self, other, func, level=None, fill_value=None,
                             try_cast=True):
        new_data = {}

        if fill_value is not None:
            raise NotImplementedError("'fill_value' argument is not supported")
        if level is not None:
            raise NotImplementedError("'level' argument is not supported")

        new_index = self.index.union(other.index)
        this = self
        if self.index is not new_index:
            this = self.reindex(new_index)

        if other.index is not new_index:
            other = other.reindex(new_index)

        for col, series in compat.iteritems(this):
            new_data[col] = func(series.values, other.values)

        # fill_value is a function of our operator
        if isna(other.fill_value) or isna(self.default_fill_value):
            fill_value = np.nan
        else:
            fill_value = func(np.float64(self.default_fill_value),
                              np.float64(other.fill_value))

        return self._constructor(
            new_data, index=new_index, columns=self.columns,
            default_fill_value=fill_value).__finalize__(self)

    def _combine_match_columns(self, other, func, level=None, fill_value=None,
                               try_cast=True):
        # patched version of DataFrame._combine_match_columns to account for
        # NumPy circumventing __rsub__ with float64 types, e.g.: 3.0 - series,
        # where 3.0 is numpy.float64 and series is a SparseSeries. Still
        # possible for this to happen, which is bothersome

        if fill_value is not None:
            raise NotImplementedError("'fill_value' argument is not supported")
        if level is not None:
            raise NotImplementedError("'level' argument is not supported")

        new_data = {}

        union = intersection = self.columns

        if not union.equals(other.index):
            union = other.index.union(self.columns)
            intersection = other.index.intersection(self.columns)

        for col in intersection:
            new_data[col] = func(self[col], float(other[col]))

        return self._constructor(
            new_data, index=self.index, columns=union,
            default_fill_value=self.default_fill_value).__finalize__(self)

    def _combine_const(self, other, func, errors='raise', try_cast=True):
        return self._apply_columns(lambda x: func(x, other))

    def _reindex_index(self, index, method, copy, level, fill_value=np.nan,
                       limit=None, takeable=False):
        if level is not None:
            raise TypeError('Reindex by level not supported for sparse')

        if self.index.equals(index):
            if copy:
                return self.copy()
            else:
                return self

        if len(self.index) == 0:
            return self._constructor(
                index=index, columns=self.columns).__finalize__(self)

        indexer = self.index.get_indexer(index, method, limit=limit)
        indexer = _ensure_platform_int(indexer)
        mask = indexer == -1
        need_mask = mask.any()

        new_series = {}
        for col, series in self.iteritems():
            if mask.all():
                continue

            values = series.values
            # .take returns SparseArray
            new = values.take(indexer)
            if need_mask:
                new = new.values
                # convert integer to float if necessary. need to do a lot
                # more than that, handle boolean etc also
                new, fill_value = maybe_upcast(new, fill_value=fill_value)
                np.putmask(new, mask, fill_value)

            new_series[col] = new

        return self._constructor(
            new_series, index=index, columns=self.columns,
            default_fill_value=self._default_fill_value).__finalize__(self)

    def _reindex_columns(self, columns, method, copy, level, fill_value=None,
                         limit=None, takeable=False):
        if level is not None:
            raise TypeError('Reindex by level not supported for sparse')

        if notna(fill_value):
            raise NotImplementedError("'fill_value' argument is not supported")

        if limit:
            raise NotImplementedError("'limit' argument is not supported")

        if method is not None:
            raise NotImplementedError("'method' argument is not supported")

        # TODO: fill value handling
        sdict = dict((k, v) for k, v in compat.iteritems(self) if k in columns)
        return self._constructor(
            sdict, index=self.index, columns=columns,
            default_fill_value=self._default_fill_value).__finalize__(self)

    def _reindex_with_indexers(self, reindexers, method=None, fill_value=None,
                               limit=None, copy=False, allow_dups=False):

        if method is not None or limit is not None:
            raise NotImplementedError("cannot reindex with a method or limit "
                                      "with sparse")

        if fill_value is None:
            fill_value = np.nan

        index, row_indexer = reindexers.get(0, (None, None))
        columns, col_indexer = reindexers.get(1, (None, None))

        if columns is None:
            columns = self.columns

        new_arrays = {}
        for col in columns:
            if col not in self:
                continue
            if row_indexer is not None:
                new_arrays[col] = algos.take_1d(self[col].get_values(),
                                                row_indexer,
                                                fill_value=fill_value)
            else:
                new_arrays[col] = self[col]

        return self._constructor(new_arrays, index=index,
                                 columns=columns).__finalize__(self)

    def _join_compat(self, other, on=None, how='left', lsuffix='', rsuffix='',
                     sort=False):
        if on is not None:
            raise NotImplementedError("'on' keyword parameter is not yet "
                                      "implemented")
        return self._join_index(other, how, lsuffix, rsuffix)

    def _join_index(self, other, how, lsuffix, rsuffix):
        if isinstance(other, Series):
            if other.name is None:
                raise ValueError('Other Series must have a name')

            other = SparseDataFrame(
                {other.name: other},
                default_fill_value=self._default_fill_value)

        join_index = self.index.join(other.index, how=how)

        this = self.reindex(join_index)
        other = other.reindex(join_index)

        this, other = this._maybe_rename_join(other, lsuffix, rsuffix)

        from pandas import concat
        return concat([this, other], axis=1, verify_integrity=True)

    def _maybe_rename_join(self, other, lsuffix, rsuffix):
        to_rename = self.columns.intersection(other.columns)
        if len(to_rename) > 0:
            if not lsuffix and not rsuffix:
                raise ValueError('columns overlap but no suffix specified: '
                                 '{to_rename}'.format(to_rename=to_rename))

            def lrenamer(x):
                if x in to_rename:
                    return '{x}{lsuffix}'.format(x=x, lsuffix=lsuffix)
                return x

            def rrenamer(x):
                if x in to_rename:
                    return '{x}{rsuffix}'.format(x=x, rsuffix=rsuffix)
                return x

            this = self.rename(columns=lrenamer)
            other = other.rename(columns=rrenamer)
        else:
            this = self

        return this, other

    def transpose(self, *args, **kwargs):
        """
        Returns a SparseDataFrame with the rows/columns switched.
        """
        nv.validate_transpose(args, kwargs)
        return self._constructor(
            self.values.T, index=self.columns, columns=self.index,
            default_fill_value=self._default_fill_value,
            default_kind=self._default_kind).__finalize__(self)

    T = property(transpose)

    @Appender(DataFrame.count.__doc__)
    def count(self, axis=0, **kwds):
        if axis is None:
            axis = self._stat_axis_number

        return self.apply(lambda x: x.count(), axis=axis)

    def cumsum(self, axis=0, *args, **kwargs):
        """
        Return SparseDataFrame of cumulative sums over requested axis.

        Parameters
        ----------
        axis : {0, 1}
            0 for row-wise, 1 for column-wise

        Returns
        -------
        y : SparseDataFrame
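
        Examples
        --------
        A minimal sketch (assumes ``import numpy as np``):

        >>> sdf = SparseDataFrame({'A': [1.0, 2.0, np.nan]})
        >>> csum = sdf.cumsum()  # per-column cumulative sums, NaNs preserved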
        """
        nv.validate_cumsum(args, kwargs)

        if axis is None:
            axis = self._stat_axis_number

        return self.apply(lambda x: x.cumsum(), axis=axis)

    @Appender(generic._shared_docs['isna'])
    def isna(self):
        return self._apply_columns(lambda x: x.isna())
    isnull = isna

    @Appender(generic._shared_docs['notna'])
    def notna(self):
        return self._apply_columns(lambda x: x.notna())
    notnull = notna

    def apply(self, func, axis=0, broadcast=False, reduce=False):
        """
        Analogous to DataFrame.apply, for SparseDataFrame

        Parameters
        ----------
        func : function
            Function to apply to each column
        axis : {0, 1, 'index', 'columns'}
        broadcast : bool, default False
            For aggregation functions, return object of same size with values
            propagated
        reduce : bool, default False
            Try to apply reduction procedures

        Returns
        -------
        applied : Series or SparseDataFrame
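
        Examples
        --------
        A minimal sketch (assumes ``import numpy as np``):

        >>> sdf = SparseDataFrame({'A': [1.0, np.nan, 3.0]})
        >>> counts = sdf.apply(lambda x: x.count())  # reduces to a Series
        >>> absolute = sdf.apply(np.abs)  # ufunc: fill_value is updated too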
        """
        if not len(self.columns):
            return self
        axis = self._get_axis_number(axis)

        if isinstance(func, np.ufunc):
            new_series = {}
            for k, v in compat.iteritems(self):
                applied = func(v)
                applied.fill_value = func(v.fill_value)
                new_series[k] = applied
            return self._constructor(
                new_series, index=self.index, columns=self.columns,
                default_fill_value=self._default_fill_value,
                default_kind=self._default_kind).__finalize__(self)
        else:
            if not broadcast:
                return self._apply_standard(func, axis, reduce=reduce)
            else:
                return self._apply_broadcast(func, axis)

    def applymap(self, func):
        """
        Apply a function to a DataFrame that is intended to operate
        elementwise, i.e. like doing map(func, series) for each series in the
        DataFrame

        Parameters
        ----------
        func : function
            Python function, returns a single value from a single value

        Returns
        -------
        applied : DataFrame
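
        Examples
        --------
        A minimal sketch:

        >>> sdf = SparseDataFrame({'A': [1.0, 2.0]})
        >>> doubled = sdf.applymap(lambda x: x * 2.0)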
        """
        return self.apply(lambda x: lmap(func, x))


def to_manager(sdf, columns, index):
    """ create and return the block manager from a dataframe of series,
    columns, index
    """

    # from BlockManager perspective
    axes = [_ensure_index(columns), _ensure_index(index)]

    return create_block_manager_from_arrays(
        [sdf[c] for c in columns], columns, axes)


def stack_sparse_frame(frame):
    """
    Only makes sense when fill_value is NaN
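
    A minimal sketch of the intended use (assumes ``import numpy as np``):

    >>> sdf = SparseDataFrame({'A': [1.0, np.nan], 'B': [np.nan, 2.0]})
    >>> stacked = stack_sparse_frame(sdf)  # MultiIndexed, single 'foo' column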
    """
    lengths = [s.sp_index.npoints for _, s in compat.iteritems(frame)]
    nobs = sum(lengths)

    # this is pretty fast
    minor_labels = np.repeat(np.arange(len(frame.columns)), lengths)

    inds_to_concat = []
    vals_to_concat = []
    # TODO: Figure out whether this can be reached.
    # I think this currently can't be reached because you can't build a
    # SparseDataFrame with a non-np.NaN fill value (fails earlier).
    for _, series in compat.iteritems(frame):
        if not np.isnan(series.fill_value):
            raise TypeError('This routine assumes NaN fill value')

        int_index = series.sp_index.to_int_index()
        inds_to_concat.append(int_index.indices)
        vals_to_concat.append(series.sp_values)

    major_labels = np.concatenate(inds_to_concat)
    stacked_values = np.concatenate(vals_to_concat)
    index = MultiIndex(levels=[frame.index, frame.columns],
                       labels=[major_labels, minor_labels],
                       verify_integrity=False)

    lp = DataFrame(stacked_values.reshape((nobs, 1)), index=index,
                   columns=['foo'])
    return lp.sort_index(level=0)


def homogenize(series_dict):
    """
    Conform a set of SparseSeries (with NaN fill_value) to a common SparseIndex
    corresponding to the locations where they all have data

    Parameters
    ----------
    series_dict : dict or DataFrame

    Notes
    -----
    Using the dumbest algorithm I could think of. Should put some more thought
    into this

    Returns
    -------
    homogenized : dict of SparseSeries
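
    Examples
    --------
    A minimal sketch (``s1`` and ``s2`` are hypothetical SparseSeries with
    NaN fill values):

    >>> common = homogenize({'s1': s1, 's2': s2})  # doctest: +SKIP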
    """
    index = None

    need_reindex = False

    for _, series in compat.iteritems(series_dict):
        if not np.isnan(series.fill_value):
            raise TypeError('this method is only valid with NaN fill values')

        if index is None:
            index = series.sp_index
        elif not series.sp_index.equals(index):
            need_reindex = True
            index = index.intersect(series.sp_index)

    if need_reindex:
        output = {}
        for name, series in compat.iteritems(series_dict):
            if not series.sp_index.equals(index):
                series = series.sparse_reindex(index)

            output[name] = series
    else:
        output = series_dict

    return output


# use unaccelerated ops for sparse objects
ops.add_flex_arithmetic_methods(SparseDataFrame, use_numexpr=False,
                                **ops.frame_flex_funcs)
ops.add_special_arithmetic_methods(SparseDataFrame, use_numexpr=False,
                                   **ops.frame_special_funcs)