[WIP] Distributed processing #60

Open
wants to merge 11 commits into base: master
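
For orientation: the changes below thread a new distributer argument through KK and add an IPythonDistributer backend built on ipyparallel. A minimal sketch of how the PR's own scripts wire this up, assuming an ipyparallel controller and engines are already running and that data, params, fname and shank are prepared as in dev/testing_distributed.py below:

from ipyparallel import Client
from klustakwik2 import *

client = Client()                           # connect to the running ipyparallel cluster
distributer = IPythonDistributer(client)    # distribution backend added in this PR
# distributer = None                        # fall back to single-machine clustering

kk = KK(data, distributer=distributer, **params)   # KK.__init__ calls distributer.start(kk)
kk.cluster_mask_starts()
save_clu(kk, fname, shank)
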
17 changes: 12 additions & 5 deletions dev/kk2_with_monitoring.ipynb
@@ -3,7 +3,9 @@
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"%matplotlib inline\n",
@@ -12,6 +14,7 @@
"from IPython.display import clear_output, display\n",
"import cPickle as pickle\n",
"import os\n",
"from ipyparallel import Client\n",
"\n",
"fname, shank = '../temp/testsmallish', 4\n",
"params = dict(\n",
@@ -69,7 +72,11 @@
" data = raw_data.to_sparse_data()\n",
" pickle.dump(data, open(fname+'.pickle', 'wb'), -1)\n",
" \n",
"kk = KK(data, **params)\n",
"client = Client()\n",
"distributer = IPythonDistributer(client)\n",
"#distributer = None\n",
" \n",
"kk = KK(data, distributer=distributer, **params)\n",
"kk.register_callback(SaveCluEvery(fname, shank, every=5))\n",
"kk.register_callback(kk_monitor_callback)\n",
"\n",
@@ -93,16 +100,16 @@
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2.0
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.8"
"version": "2.7.10"
}
},
"nbformat": 4,
"nbformat_minor": 0
}
}
5 changes: 4 additions & 1 deletion dev/testing.py
@@ -28,6 +28,9 @@
print('Number of spikes:', data.num_spikes)
print('Number of unique masks:', data.num_masks)

distributer = None
#distributer = MockDistributer(10)

kk = KK(data, max_iterations=1000,
use_mua_cluster=False,
# split_every=1, split_first=1, # for debugging splits
@@ -41,7 +44,7 @@
# fast_split=True,
# max_split_iterations=10,
consider_cluster_deletion=True,
#num_cpus=1,
distributer=distributer,
)
# kk.register_callback(SaveCluEvery(fname, shank, every=1))
kk.register_callback(MonitoringServer())
54 changes: 54 additions & 0 deletions dev/testing_distributed.py
@@ -0,0 +1,54 @@
from pylab import *
from klustakwik2 import *
import cPickle as pickle
import os
from ipyparallel import Client

fname, shank = '../temp/testsmallish', 4
params = dict(
max_iterations=1000,
use_mua_cluster=False,
# split_every=1, split_first=1, # for debugging splits
# split_every=1000000, split_first=1000000, # disable splitting
# points_for_cluster_mask=1e-100, # don't use reduced cluster masks
# full_step_every=1,
# always_split_bimodal=True,
# dist_thresh=15,
# subset_break_fraction=0.01,
# break_fraction=0.01,
# fast_split=True,
# max_split_iterations=10,
consider_cluster_deletion=True,
)
num_starting_masks = 100

################################################
iterations = []
scores = []
num_clusters = []

log_to_file(fname+'.klg.'+str(shank), 'debug')
log_suppress_hierarchy('klustakwik', inclusive=False)

if os.path.exists(fname+'.pickle'):
data = pickle.load(open(fname+'.pickle', 'rb'))
else:
raw_data = load_fet_fmask_to_raw(fname, shank, drop_last_n_features=1)
data = raw_data.to_sparse_data()
pickle.dump(data, open(fname+'.pickle', 'wb'), -1)

client = Client()
distributer = IPythonDistributer(client)
#distributer = None

kk = KK(data, distributer=distributer, **params)
kk.register_callback(SaveCluEvery(fname, shank, every=5))

if False:
clusters = loadtxt(fname+'.clu.'+str(shank), skiprows=1, dtype=int)
kk.cluster_from(clusters)
else:
#kk.cluster_with_subset_schedule(100, [0.99, 1.0])
kk.cluster_mask_starts()

save_clu(kk, fname, shank)
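
A usage note, not part of the diff: dev/testing_distributed.py assumes an ipyparallel controller and engines are already running (with a default profile, typically started via something like ipcluster start -n 4); otherwise Client() will fail to connect or will find no engines. A minimal guard, as a sketch:

from ipyparallel import Client

client = Client()      # connects to the default ipyparallel profile
if not client.ids:     # no engines registered with the controller yet
    raise RuntimeError('no ipyparallel engines found; start some with "ipcluster start -n <N>"')
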
3 changes: 2 additions & 1 deletion klustakwik2/__init__.py
@@ -7,8 +7,9 @@
from .monitoring import *
from .debugtools import *
from .default_parameters import *
from .distributed import *

__version__ = '0.2.4'
__version__ = '0.2.5dev'
__version__ = get_kk_version(__version__)

log_message('info', 'KlustaKwik2 version '+__version__)
186 changes: 113 additions & 73 deletions klustakwik2/clustering.py
@@ -11,10 +11,12 @@
from .mask_starts import mask_starts
from .linear_algebra import BlockPlusDiagonalMatrix
from .default_parameters import default_parameters
from .distributed import MockDistributer

from .numerics import (accumulate_cluster_mask_sum, compute_cluster_mean,
compute_covariance_matrix,
compute_log_p_and_assign, compute_penalties)
compute_log_p_and_assign, compute_penalties,
merge_log_p_arrays)

import time

@@ -90,6 +92,7 @@ class KK(object):
def __init__(self, data, callbacks=None, name='',
is_subset=False, is_copy=False,
map_log_to_debug=False,
distributer=None,
**params):
self.name = name
if callbacks is None:
@@ -131,6 +134,10 @@ def __init__(self, data, callbacks=None, name='',
else:
self.mua_cluster = -2

self.distributer = distributer
if distributer is not None:
distributer.start(self)

def register_callback(self, callback, slot='end_iteration'):
if slot not in self.callbacks:
self.callbacks[slot] = []
@@ -175,6 +182,11 @@ def copy(self, name='kk_copy',
is_copy=True,
**params)

def copy_without_callbacks(self):
kk = self.copy(name='without_callbacks')
kk.callbacks = {}
return kk

def subset(self, spikes, name='kk_subset', **additional_params):
newdata = self.data.subset(spikes)
if self.name:
@@ -400,13 +412,36 @@ def MEC_steps(self, only_evaluate_current_clusters=False):
# in each cluster, compute the cluster masks and allocate space for
# covariance matrices
self.reindex_clusters()
# Computes the masked and unmasked indices for each cluster based on the
# masks for each point in that cluster. Allocates space for covariance
# matrices.

num_clusters = self.num_clusters_alive
num_features = self.num_features
num_cluster_members = self.num_cluster_members
cluster_start = self.num_special_clusters

if self.distributer is None or only_evaluate_current_clusters: # no value in distributing this
self.prepare_for_MEC_steps(only_evaluate_current_clusters=only_evaluate_current_clusters)
for cluster in range(num_clusters):
self.MEC_steps_cluster(cluster, only_evaluate_current_clusters=only_evaluate_current_clusters)
else:
clusters = self.clusters.copy()
self.prepare_for_MEC_steps(only_evaluate_current_clusters=only_evaluate_current_clusters)
cluster_order = argsort(num_cluster_members)[::-1] # largest first
self.distributer.iteration(clusters, cluster_order, self.full_step, only_evaluate_current_clusters)
for results in self.distributer.iteration_results():
merge_log_p_arrays(self.num_spikes,
self.log_p_best, self.log_p_second_best,
self.clusters, self.clusters_second_best,
results['log_p_best'], results['log_p_second_best'],
results['clusters'], results['clusters_second_best'],
)
# we've reassigned clusters so we need to recompute the partitions, but we don't want to
# reindex yet because we may reassign points to different clusters and we need the original
# cluster numbers for that
self.partition_clusters()

@add_slots
def prepare_for_MEC_steps(self, only_evaluate_current_clusters=False):
num_clusters = self.num_clusters_alive
num_cluster_members = self.num_cluster_members
num_spikes = self.num_spikes

# Weight computations
@@ -418,7 +453,9 @@ def MEC_steps(self, only_evaluate_current_clusters=False):
denom = float(denom)
if self.use_noise_cluster:
noise_weight = (num_cluster_members[self.noise_cluster]+self.noise_point)/denom

self._noise_weight = noise_weight
self._denom = denom

# Arrays that will be used in E-step part
if only_evaluate_current_clusters:
self.clusters_second_best = zeros(0, dtype=int)
@@ -435,8 +472,6 @@ def MEC_steps(self, only_evaluate_current_clusters=False):
else:
self.old_clusters = self.clusters.copy()

num_skipped = 0

if self.full_step and self.use_noise_cluster and not only_evaluate_current_clusters:
# start with cluster 0 - uniform distribution over space
# because we have normalized all dims to 0...1, density will be 1.
@@ -454,75 +489,74 @@ def MEC_steps(self, only_evaluate_current_clusters=False):
else:
self.collect_candidates = False

clusters_to_kill = []

for cluster in range(num_clusters):
# Compute the sum of fmasks
if cluster<self.num_special_clusters:
cluster_mask_sum = full(num_features, -1.0) # ensure that special clusters are masked
else:
cluster_mask_sum = zeros(num_features)
accumulate_cluster_mask_sum(self, cluster_mask_sum, self.get_spikes_in_cluster(cluster))
self.clusters_to_kill = []

# Compute the masked and unmasked sets
unmasked, = (cluster_mask_sum>=self.points_for_cluster_mask).nonzero()
masked, = (cluster_mask_sum<self.points_for_cluster_mask).nonzero()
unmasked = array(unmasked, dtype=int)
masked = array(masked, dtype=int)
cov = BlockPlusDiagonalMatrix(masked, unmasked)
@add_slots
def MEC_steps_cluster(self, cluster, only_evaluate_current_clusters=False):
denom = self._denom
num_cluster_members = self.num_cluster_members
num_features = self.num_features
# Compute the sum of fmasks
if cluster<self.num_special_clusters:
cluster_mask_sum = full(num_features, -1.0) # ensure that special clusters are masked
else:
cluster_mask_sum = zeros(num_features)
accumulate_cluster_mask_sum(self, cluster_mask_sum, self.get_spikes_in_cluster(cluster))

# Compute the masked and unmasked sets
unmasked, = (cluster_mask_sum>=self.points_for_cluster_mask).nonzero()
masked, = (cluster_mask_sum<self.points_for_cluster_mask).nonzero()
unmasked = array(unmasked, dtype=int)
masked = array(masked, dtype=int)
cov = BlockPlusDiagonalMatrix(masked, unmasked)

########### M step ########################################################

# Normalize by total number of points to give class weight
if cluster==self.noise_cluster:
weight = self._noise_weight
elif cluster==self.mua_cluster:
weight = (num_cluster_members[self.mua_cluster]+self.mua_point)/denom
else:
weight = (num_cluster_members[cluster]+self.prior_point)/denom

########### M step ########################################################

# Normalize by total number of points to give class weight
if cluster==self.noise_cluster:
weight = noise_weight
elif cluster==self.mua_cluster:
weight = (num_cluster_members[self.mua_cluster]+self.mua_point)/denom
else:
weight = (num_cluster_members[cluster]+self.prior_point)/denom

# Compute means for each cluster
# Note that we do this densely at the moment, might want to switch
# that to a sparse structure later
cluster_mean = compute_cluster_mean(self, cluster)
# Compute covariance matrices
compute_covariance_matrix(self, cluster, cluster_mean, cov)

########### EC steps ######################################################

if cluster<self.num_special_clusters:
continue
try:
chol = cov.cholesky()
except LinAlgError:
self.log('warning', 'Linear algebra error on cluster '+str(cluster))
clusters_to_kill.append(cluster) # todo: we don't actually do anything with this...
continue
# Compute means for each cluster
# Note that we do this densely at the moment, might want to switch
# that to a sparse structure later
cluster_mean = compute_cluster_mean(self, cluster)
# Compute covariance matrices
compute_covariance_matrix(self, cluster, cluster_mean, cov)

# LogRootDet is given by log of product of diagonal elements
log_root_det = sum(log(chol.diagonal))+sum(log(chol.block.diagonal()))

# compute diagonal of inverse of cov matrix
inv_cov_diag = zeros(num_features)
basis_vector = zeros(num_features)
for i in range(num_features):
basis_vector[i] = 1.0
root = chol.trisolve(basis_vector)
inv_cov_diag[i] = sum(root**2)
basis_vector[i] = 0.0

self.run_callbacks('e_step_before_main_loop', cholesky=chol, cluster=cluster,
inv_cov_diag=inv_cov_diag)

compute_log_p_and_assign(self, cluster, weight, inv_cov_diag, log_root_det, chol,
cluster_mean, only_evaluate_current_clusters, self.num_cpus)

self.run_callbacks('e_step_after_main_loop')
########### EC steps ######################################################

# we've reassigned clusters so we need to recompute the partitions, but we don't want to
# reindex yet because we may reassign points to different clusters and we need the original
# cluster numbers for that
self.partition_clusters()
if cluster<self.num_special_clusters:
return
try:
chol = cov.cholesky()
except LinAlgError:
self.log('warning', 'Linear algebra error on cluster '+str(cluster))
self.clusters_to_kill.append(cluster) # todo: we don't actually do anything with this...
return

# LogRootDet is given by log of product of diagonal elements
log_root_det = sum(log(chol.diagonal))+sum(log(chol.block.diagonal()))

# compute diagonal of inverse of cov matrix
inv_cov_diag = zeros(num_features)
basis_vector = zeros(num_features)
for i in range(num_features):
basis_vector[i] = 1.0
root = chol.trisolve(basis_vector)
inv_cov_diag[i] = sum(root**2)
basis_vector[i] = 0.0

self.run_callbacks('e_step_before_main_loop', cholesky=chol, cluster=cluster,
inv_cov_diag=inv_cov_diag)

compute_log_p_and_assign(self, cluster, weight, inv_cov_diag, log_root_det, chol,
cluster_mean, only_evaluate_current_clusters, self.num_cpus)

self.run_callbacks('e_step_after_main_loop')

@add_slots
def compute_cluster_penalties(self, clusters=None):
@@ -669,6 +703,12 @@ def get_spikes_in_cluster(self, cluster):

@add_slots
def try_splits(self):
# TODO: distribute splitting over multiple machines. It may be sufficient to distribute
# split candidate over the machines, and do the evaluation on the same machine. This
# greatly reduces the difficulty of programming it, and the amount of intermachine
# communication (you only need to send the spikes in the cluster and get back a bool).
# In tests on a small data set, split_candidate took about 5x longer than split_evaluation,
# so it might be the case that you don't lose too much time doing this.
did_split = False
num_clusters = self.num_clusters_alive

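
On the TODO at the top of try_splits: a rough illustration of the idea it describes, distributing only the split-candidate test across engines and keeping the evaluation local. The helper is_split_candidate and its wiring are hypothetical and not part of this PR:

from ipyparallel import Client

def find_split_candidates(kk, clusters_to_check, is_split_candidate):
    # Fan the expensive candidate test out to the engines: each engine receives
    # only the spike indices of one cluster and returns a bool, which keeps
    # inter-machine communication small, as the TODO suggests.
    view = Client().load_balanced_view()
    spike_sets = [kk.get_spikes_in_cluster(c) for c in clusters_to_check]
    flags = view.map_sync(is_split_candidate, spike_sets)
    return [c for c, ok in zip(clusters_to_check, flags) if ok]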