diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/streaming/AscBucketComp.java b/solr/solrj/src/java/org/apache/solr/client/solrj/streaming/AscBucketComp.java new file mode 100644 index 00000000000..d53ff9ce975 --- /dev/null +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/streaming/AscBucketComp.java @@ -0,0 +1,41 @@ +package org.apache.solr.client.solrj.streaming; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import java.util.Comparator; + +public class AscBucketComp implements Comparator { + + private int ord; + + public AscBucketComp(int ord) { + this.ord = ord; + } + + public int compare(BucketMetrics b1, BucketMetrics b2) { + double d1 = b1.getMetrics()[ord].getValue(); + double d2 = b2.getMetrics()[ord].getValue(); + if(d1 > d2) { + return 1; + } else if(d1 < d2) { + return -1; + } else { + return 0; + } + } +} \ No newline at end of file diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/streaming/AscMetricComp.java b/solr/solrj/src/java/org/apache/solr/client/solrj/streaming/AscMetricComp.java new file mode 100644 index 00000000000..d3e6da87932 --- /dev/null +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/streaming/AscMetricComp.java @@ -0,0 +1,39 @@ +package org.apache.solr.client.solrj.streaming; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import java.io.Serializable; +import java.util.Comparator; +import java.util.List; + +public class AscMetricComp implements Comparator, Serializable { + + private static final long serialVersionUID = 1; + + private int ord; + + public AscMetricComp(int ord) { + this.ord = ord; + } + + public int compare(Tuple t1, Tuple t2) { + List values1 = (List)t1.get("metricValues"); + List values2 = (List)t2.get("metricValues"); + return values1.get(ord).compareTo(values2.get(ord)); + } +} \ No newline at end of file diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/streaming/Bucket.java b/solr/solrj/src/java/org/apache/solr/client/solrj/streaming/Bucket.java index b1b9b2032e2..b7925b61278 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/streaming/Bucket.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/streaming/Bucket.java @@ -23,23 +23,17 @@ public class Bucket implements Serializable { private static final long serialVersionUID = 1; - private String bucketValue; + private String bucketKey; public Bucket() { } - public Bucket(String bucketValue) { - this.bucketValue = bucketValue; + public Bucket(String bucketKey) { + this.bucketKey = bucketKey; } public String getBucketValue(Tuple tuple) { - return bucketValue; - + return tuple.get(bucketKey).toString(); } - - - - - } \ No newline at end of file diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/streaming/BucketMetrics.java b/solr/solrj/src/java/org/apache/solr/client/solrj/streaming/BucketMetrics.java new file mode 100644 index 00000000000..064cc6286b9 --- /dev/null +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/streaming/BucketMetrics.java @@ -0,0 +1,43 @@ +package org.apache.solr.client.solrj.streaming; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.Serializable; + +public class BucketMetrics implements Serializable { + + private static final long serialVersionUID = 1; + + private HashKey key; + private Metric[] metrics; + + public BucketMetrics(HashKey key, Metric[] metrics) { + this.key = key; + this.metrics = metrics; + } + + public Metric[] getMetrics() { + return metrics; + } + + public HashKey getKey() { + return key; + } + + +} \ No newline at end of file diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/streaming/CloudSolrStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/streaming/CloudSolrStream.java index 555a77833df..fe6bd46cda0 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/streaming/CloudSolrStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/streaming/CloudSolrStream.java @@ -242,6 +242,10 @@ public void close() throws IOException { } public Tuple read() throws IOException { + return _read(); + } + + protected Tuple _read() throws IOException { TupleWrapper tw = tuples.pollFirst(); if(tw != null) { Tuple t = tw.getTuple(); diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/streaming/CountMetric.java b/solr/solrj/src/java/org/apache/solr/client/solrj/streaming/CountMetric.java index f58418e8ed6..53b3055444c 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/streaming/CountMetric.java +++ 
b/solr/solrj/src/java/org/apache/solr/client/solrj/streaming/CountMetric.java @@ -36,6 +36,10 @@ public void update(Tuple tuple) { ++count; } + public double getValue() { + return count; + } + public Metric newInstance() { return new CountMetric(); } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/streaming/DescBucketComp.java b/solr/solrj/src/java/org/apache/solr/client/solrj/streaming/DescBucketComp.java new file mode 100644 index 00000000000..28f564b430b --- /dev/null +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/streaming/DescBucketComp.java @@ -0,0 +1,41 @@ +package org.apache.solr.client.solrj.streaming; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import java.util.Comparator; + +public class DescBucketComp implements Comparator { + + private int ord; + + public DescBucketComp(int ord) { + this.ord = ord; + } + + public int compare(BucketMetrics b1, BucketMetrics b2) { + double d1 = b1.getMetrics()[ord].getValue(); + double d2 = b2.getMetrics()[ord].getValue(); + if(d1 > d2) { + return -1; + } else if(d1 < d2) { + return 1; + } else { + return 0; + } + } +} \ No newline at end of file diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/streaming/DescMetricComp.java b/solr/solrj/src/java/org/apache/solr/client/solrj/streaming/DescMetricComp.java new file mode 100644 index 00000000000..32c5f4c3001 --- /dev/null +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/streaming/DescMetricComp.java @@ -0,0 +1,39 @@ +package org.apache.solr.client.solrj.streaming; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import java.io.Serializable; +import java.util.Comparator; +import java.util.List; + +public class DescMetricComp implements Comparator, Serializable { + + private static final long serialVersionUID = 1; + + private int ord; + + public DescMetricComp(int ord) { + this.ord = ord; + } + + public int compare(Tuple t1, Tuple t2) { + List values1 = (List)t1.get("metricValues"); + List values2 = (List)t2.get("metricValues"); + return values1.get(ord).compareTo(values2.get(ord))*-1; + } +} \ No newline at end of file diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/streaming/HashKey.java b/solr/solrj/src/java/org/apache/solr/client/solrj/streaming/HashKey.java index 35cddca7db1..006582908c6 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/streaming/HashKey.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/streaming/HashKey.java @@ -33,10 +33,15 @@ public HashKey(String value) { public HashKey(Tuple t, String[] keys) { this.parts = new Object[keys.length]; for(int i=0; i metricValues() { Map m = new HashMap(); double dcount = (double)count; diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/streaming/Metric.java b/solr/solrj/src/java/org/apache/solr/client/solrj/streaming/Metric.java index 14b5d193133..e773b918d62 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/streaming/Metric.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/streaming/Metric.java @@ -22,6 +22,7 @@ public interface Metric extends Serializable { public String getName(); + public double getValue(); public void update(Tuple tuple); public Metric newInstance(); public Map metricValues(); diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/streaming/MetricStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/streaming/MetricStream.java index b460c315b3d..3114ad36ab3 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/streaming/MetricStream.java +++ 
b/solr/solrj/src/java/org/apache/solr/client/solrj/streaming/MetricStream.java @@ -18,11 +18,13 @@ package org.apache.solr.client.solrj.streaming; import java.io.IOException; +import java.util.Comparator; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.ArrayList; +import java.util.PriorityQueue; /* String[] buckets = {"a","b"}; @@ -37,22 +39,29 @@ public class MetricStream extends TupleStream { private static final long serialVersionUID = 1; private TupleStream tupleStream; - private long lcount; - private double dcount; private Bucket[] buckets; private Metric[] metrics; private String outKey; private Map bucketMap; + private BucketMetrics[] bucketMetrics; private static final HashKey metricsKey = new HashKey("metrics"); + private int topN; + private Comparator comp; + private Comparator rcomp; public MetricStream(TupleStream tupleStream, Bucket[] buckets, Metric[] metrics, - String outKey) { + String outKey, + Comparator comp, + int topN) { this.tupleStream = tupleStream; this.buckets = buckets; this.metrics = metrics; this.outKey = outKey; + this.topN = topN; + this.rcomp = new ReverseOrdComp(comp); + this.comp = comp; this.bucketMap = new HashMap(); } @@ -69,40 +78,83 @@ public String getOutKey() { return this.outKey; } - public Map getBucketMap() { - return bucketMap; + public BucketMetrics[] getBucketMetrics(){ + return bucketMetrics; } - Map merge(List>> bucketMaps) { + public void setBucketMetrics(BucketMetrics[] bucketMetrics) { + this.bucketMetrics = bucketMetrics; + } - Map bucketAccumulator = new HashMap(); + public void setBucketMap(Map bucketMap) { + this.bucketMap = bucketMap; + } - for(Map> bucketMap : bucketMaps) { - Iterator it = bucketMap.entrySet().iterator(); - //Iterate the buckets - while(it.hasNext()) { - Map.Entry entry = (Map.Entry)it.next(); - String bucketKey = (String)entry.getKey(); - HashKey hashKey = new HashKey(bucketKey); - List> metricValues = 
(List>)entry.getValue(); + BucketMetrics[] merge(List all) { + Map bucketAccumulator = new HashMap(); + for(Map top : all) { + List ks = (List)top.get("buckets"); + List ms = (List)top.get("metrics"); + for(int i=0; i> metricValues = (Map>)ms.get(i); if(bucketAccumulator.containsKey(hashKey)) { - Metric[] mergeMetrics = bucketAccumulator.get(bucketKey); - for(int i=0; i> it = bucketAccumulator.entrySet().iterator(); + + PriorityQueue priorityQueue = new PriorityQueue(topN, rcomp); + + while(it.hasNext()) { + Map.Entry entry = it.next(); + BucketMetrics bms = new BucketMetrics(entry.getKey(), entry.getValue()); + if(priorityQueue.size() > topN) { + priorityQueue.add(bms); + } else { + BucketMetrics peek = priorityQueue.peek(); + if(comp.compare(bms, peek) < 0) { + priorityQueue.poll(); + priorityQueue.add(bms); + } + } + } + + int s = priorityQueue.size(); + BucketMetrics[] bucketMetrics = new BucketMetrics[s]; + + for(int i=bucketMetrics.length-1; i>=0; i--) { + BucketMetrics b = priorityQueue.poll(); + this.bucketMetrics[i]= b; + } + return bucketMetrics; + } + + private class ReverseOrdComp implements Comparator { + private Comparator comp; + + public ReverseOrdComp(Comparator comp) { + this.comp = comp; + } + + public int compare(BucketMetrics e1, BucketMetrics e2) { + return comp.compare(e1,e2)*-1; + } } public void setStreamContext(StreamContext context) { @@ -127,21 +179,71 @@ public Tuple read() throws IOException { Tuple tuple = tupleStream.read(); if(tuple.EOF) { - Map>> metricValues = new HashMap(); Iterator> it = bucketMap.entrySet().iterator(); + + if(comp == null) { + //Handle No bucket constructor + Map.Entry noBucket = it.next(); + BucketMetrics bms = new BucketMetrics(noBucket.getKey(), noBucket.getValue()); + this.bucketMetrics = new BucketMetrics[1]; + this.bucketMetrics[0] = bms; + List> outMetrics = new ArrayList(); + List outKeys = new ArrayList(); + for(Metric metric : bms.getMetrics()) { + Map outMetricValues = metric.metricValues(); + String 
outKey = metric.getName(); + outMetrics.add(outMetricValues); + outKeys.add(outKey); + } + Map outMap = new HashMap(); + outMap.put("buckets",outKeys); + outMap.put("metrics",outMetrics); + tuple.set(this.outKey, outMap); + return tuple; + } + + PriorityQueue priorityQueue = new PriorityQueue(topN, rcomp); + while(it.hasNext()) { Map.Entry entry = it.next(); - HashKey key = entry.getKey(); - Metric[] values = entry.getValue(); - List> finished = new ArrayList(); + BucketMetrics bms = new BucketMetrics(entry.getKey(), entry.getValue()); + if(priorityQueue.size() < topN) { + priorityQueue.add(bms); + } else { + BucketMetrics peek = priorityQueue.peek(); + + if(comp.compare(bms, peek) < 0) { + priorityQueue.poll(); + priorityQueue.add(bms); + } - for(Metric m : values) { - Map finalMetric = m.metricValues(); - finished.add(finalMetric); } - metricValues.put(key.toString(), finished); } - tuple.set(this.outKey, metricValues); + + int s = priorityQueue.size(); + this.bucketMetrics = new BucketMetrics[s]; + + for(int i=bucketMetrics.length-1; i>=0; i--) { + BucketMetrics b = priorityQueue.poll(); + this.bucketMetrics[i]= b; + } + + List> outMetrics = new ArrayList(); + List outKeys = new ArrayList(); + + for(BucketMetrics bms : this.bucketMetrics) { + for(Metric metric : bms.getMetrics()) { + Map outMetricValues = metric.metricValues(); + String outKey = metric.getName(); + outMetrics.add(outMetricValues); + outKeys.add(outKey); + } + } + + Map outMap = new HashMap(); + outMap.put("buckets",outKeys); + outMap.put("metrics",outMetrics); + tuple.set(this.outKey, outMap); return tuple; } @@ -151,7 +253,7 @@ public Tuple read() throws IOException { for(int i=0; i children() { return l; } - public Map> merge(MetricStream[] metricStreams) { - Map> ags = new HashMap(); - for(MetricStream merger : metricStreams) { - String outKey = merger.getOutKey(); + public void merge(List metricStreams) { + for(MetricStream metricStream : metricStreams) { + String outKey = 
metricStream.getOutKey(); Iterator it = eofTuples.values().iterator(); - List>> values = new ArrayList(); + List values = new ArrayList(); while(it.hasNext()) { - Tuple t = (Tuple)it.next(); - Map> agg = (Map>)t.get(outKey); - values.add(agg); + Tuple t = it.next(); + Map top = (Map)t.get(outKey); + values.add(top); } - Map merged = merger.merge(values); - ags.put(outKey, merged); + BucketMetrics[] bucketMetrics = metricStream.merge(values); + metricStream.setBucketMetrics(bucketMetrics); + } + } + + public Tuple read() throws IOException { + Tuple tuple = _read(); + + if(tuple.EOF) { + List metricStreams = new ArrayList(); + getMetricStreams(this, metricStreams); + this.merge(metricStreams); + Map m = new HashMap(); + m.put("EOF", true); + return new Tuple(m); + } + + return tuple; + } + + private void getMetricStreams(TupleStream tupleStream, + List metricStreams) { + if(tupleStream instanceof MetricStream) { + metricStreams.add((MetricStream)tupleStream); + } + + List children = tupleStream.children(); + for(TupleStream ts : children) { + getMetricStreams(ts, metricStreams); } - return ags; } protected void constructStreams() throws IOException { diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/streaming/RollupStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/streaming/RollupStream.java new file mode 100644 index 00000000000..fb8b3b54991 --- /dev/null +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/streaming/RollupStream.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.solr.client.solrj.streaming; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.ArrayList; + +public class RollupStream extends TupleStream { + + private static final long serialVersionUID = 1; + + private PushBackStream tupleStream; + private Bucket[] buckets; + private Metric[] metrics; + private HashKey currentKey = new HashKey("-"); + private Metric[] currentMetrics; + private boolean finished = false; + + public RollupStream(TupleStream tupleStream, + Bucket[] buckets, + Metric[] metrics) { + this.tupleStream = new PushBackStream(tupleStream); + this.buckets = buckets; + this.metrics = metrics; + } + + public void setStreamContext(StreamContext context) { + this.tupleStream.setStreamContext(context); + } + + public List children() { + List l = new ArrayList(); + l.add(tupleStream); + return l; + } + + public void open() throws IOException { + tupleStream.open(); + } + + public void close() throws IOException { + tupleStream.close(); + } + + public Tuple read() throws IOException { + + while(true) { + Tuple tuple = tupleStream.read(); + if(tuple.EOF) { + if(!finished) { + Map map = new HashMap(); + List metricValues = new ArrayList(); + List metricNames = new ArrayList(); + for(Metric metric : currentMetrics) { + metricNames.add(metric.getName()); + metricValues.add(metric.getValue()); + } + map.put("buckets", currentKey.toString()); + map.put("metricNames", metricNames); + map.put("metricValues", metricValues); + Tuple t = new Tuple(map); + 
tupleStream.pushBack(tuple); + finished = true; + return t; + } else { + return tuple; + } + } + + String[] bucketValues = new String[buckets.length]; + for(int i=0; i metricValues = new ArrayList(); + List metricNames = new ArrayList(); + for(Metric metric : currentMetrics) { + metricNames.add(metric.getName()); + metricValues.add(metric.getValue()); + } + map.put("buckets", currentKey.toString()); + map.put("metricNames", metricNames); + map.put("metricValues", metricValues); + t = new Tuple(map); + } + + currentMetrics = new Metric[metrics.length]; + currentKey = hashKey; + for(int i=0; i metricValues() { return m; } + public double getValue() { + if(isDouble) { + return doubleSum; + } else { + return (double)longSum; + } + } + public void update(Map metricValues) { if(isDouble) { double dsum = metricValues.get(SUM); diff --git a/solr/solrj/src/test-files/solrj/solr/collection1/conf/schema-streaming.xml b/solr/solrj/src/test-files/solrj/solr/collection1/conf/schema-streaming.xml index 0fa7901c703..d9cbc1fd2af 100644 --- a/solr/solrj/src/test-files/solrj/solr/collection1/conf/schema-streaming.xml +++ b/solr/solrj/src/test-files/solrj/solr/collection1/conf/schema-streaming.xml @@ -45,7 +45,7 @@ - + diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/streaming/StreamingTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/streaming/StreamingTest.java index 1d7d94486f1..1ad0eaff162 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/streaming/StreamingTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/streaming/StreamingTest.java @@ -337,6 +337,121 @@ private void testMergeJoinStream() throws Exception { } + private void testParallelMergeJoinStream() throws Exception { + + indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0"); + indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0", "join_i", "1000"); + indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3"); + indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4"); + 
indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1"); + indexr(id, "6", "a_s", "hello1", "a_i", "1", "a_f", "0", "join_i", "2000"); + indexr(id, "7", "a_s", "hello7", "a_i", "1", "a_f", "0"); + + commit(); + + String zkHost = zkServer.getZkAddress(); + + //Test one-to-one + Map paramsA = mapParams("q","id:(0 1 3 4) ","fl","id,a_s,a_f", "sort", "a_f desc", "partitionKeys","a_f"); + CloudSolrStream streamA = new CloudSolrStream(zkHost, "collection1", paramsA); + + Map fieldMappings = new HashMap(); + fieldMappings.put("id","streamB.id"); + + Map paramsB = mapParams("q","id:(2)","fl","id,a_s,a_f,join_i", "sort", "a_f desc", "partitionKeys","a_f"); + CloudSolrStream streamB = new CloudSolrStream(zkHost, "collection1", paramsB); + streamB.setFieldMappings(fieldMappings); + + String[] keys = {"a_f"}; + + MergeJoinStream mstream = new MergeJoinStream(streamA, streamB, new DescFieldComp("a_f")); + ParallelStream fstream = new ParallelStream(zkHost,"collection1", mstream, 2, new DescFieldComp("a_f")); + + List tuples = getTuples(fstream); + + assert(tuples.size() == 1); + assertOrder(tuples, 0); + assertLong(tuples.get(0), "join_i", 1000); + + + //Test one-to-many + + paramsA = mapParams("q","id:(0 1 3 4) ","fl","id,a_s,a_f", "sort", "a_f desc", "partitionKeys","a_f"); + streamA = new CloudSolrStream(zkHost, "collection1", paramsA); + + fieldMappings = new HashMap(); + fieldMappings.put("id","streamB.id"); + + paramsB = mapParams("q","id:(2 6)","fl","id,a_s,a_f,join_i", "sort", "a_f desc", "partitionKeys","a_f"); + streamB = new CloudSolrStream(zkHost, "collection1", paramsB); + streamB.setFieldMappings(fieldMappings); + + + mstream = new MergeJoinStream(streamA, streamB, new DescFieldComp("a_f")); + fstream = new ParallelStream(zkHost,"collection1", mstream, 2, new DescFieldComp("a_f")); + + tuples = getTuples(fstream); + + assert(tuples.size() == 2); + assertOrder(tuples, 0,0); + assertLong(tuples.get(0), "join_i", 1000); + assertLong(tuples.get(1), "join_i", 2000); + 
+ //Test many-to-one + + paramsA = mapParams("q","id:(0 2 1 3 4) ","fl","id,a_s,a_f", "sort", "a_f desc", "partitionKeys","a_f"); + streamA = new CloudSolrStream(zkHost, "collection1", paramsA); + + fieldMappings = new HashMap(); + fieldMappings.put("id","streamB.id"); + + paramsB = mapParams("q","id:(6)","fl","id,a_s,a_f,join_i", "sort", "a_f desc", "partitionKeys","a_f"); + streamB = new CloudSolrStream(zkHost, "collection1", paramsB); + streamB.setFieldMappings(fieldMappings); + + + mstream = new MergeJoinStream(streamA, streamB, new DescFieldComp("a_f")); + fstream = new ParallelStream(zkHost,"collection1", mstream, 2, new DescFieldComp("a_f")); + + tuples = getTuples(fstream); + + assert(tuples.size() == 2); + assertOrder(tuples, 2,0); + assertLong(tuples.get(0), "join_i", 2000); + assertLong(tuples.get(1), "join_i", 2000); + + //Test many-to-many + + paramsA = mapParams("q","id:(0 7 1 3 4) ","fl","id,a_s,a_f", "sort", "a_f desc", "partitionKeys","a_f"); + streamA = new CloudSolrStream(zkHost, "collection1", paramsA); + + fieldMappings = new HashMap(); + fieldMappings.put("id","streamB.id"); + + paramsB = mapParams("q","id:(6 2)","fl","id,a_s,a_f,join_i", "sort", "a_f desc", "partitionKeys","a_f"); + streamB = new CloudSolrStream(zkHost, "collection1", paramsB); + streamB.setFieldMappings(fieldMappings); + + + mstream = new MergeJoinStream(streamA, streamB, new DescFieldComp("a_f")); + fstream = new ParallelStream(zkHost,"collection1", mstream, 2, new DescFieldComp("a_f")); + + tuples = getTuples(fstream); + + assert(tuples.size() == 4); + assertOrder(tuples, 7,7,0,0); + assertLong(tuples.get(0), "join_i", 1000); + assertLong(tuples.get(1), "join_i", 2000); + assertLong(tuples.get(2), "join_i", 1000); + assertLong(tuples.get(3), "join_i", 2000); + + del("*:*"); + commit(); + + } + + + private void testRankStream() throws Exception { @@ -363,6 +478,135 @@ private void testRankStream() throws Exception { commit(); } + private void testRollupStream() throws 
Exception { + indexr(id, "0", "a_s", "hello0", "a_i", "100", "a_f", "0"); + indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "0"); + indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3"); + indexr(id, "4", "a_s", "hello3", "a_i", "4", "a_f", "4"); + indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1"); + indexr(id, "6", "a_s", "hello1", "a_i", "1", "a_f", "1"); + indexr(id, "7", "a_s", "hello1", "a_i", "1", "a_f", "1"); + + commit(); + + String zkHost = zkServer.getZkAddress(); + + Bucket[] buckets = {new Bucket("a_s")}; + Metric[] metrics = {new SumMetric("a_i", false), + new MeanMetric("a_i", false), + new CountMetric(), + new MinMetric("a_i", false), + new MaxMetric("a_i", false)}; + + Map params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_s asc"); + CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params); + RollupStream rstream = new RollupStream(stream, buckets, metrics); + rstream.open(); + Tuple tuple = rstream.read(); + String b = (String)tuple.get("buckets"); + List values = (List)tuple.get("metricValues"); + assert(b.equals("hello0")); + assert(values.get(0) == 102.0d); + assert(values.get(1) == 51.0d); + assert(values.get(2) == 2.0d); + assert(values.get(3) == 2.0d); + assert(values.get(4) == 100.0d); + + tuple = rstream.read(); + b = (String)tuple.get("buckets"); + values = (List)tuple.get("metricValues"); + assert(b.equals("hello1")); + assert(values.get(0) == 3.0d); + assert(values.get(1) == 1.0d); + assert(values.get(2) == 3.0d); + assert(values.get(3) == 1.0d); + assert(values.get(4) == 1.0d); + + + tuple = rstream.read(); + b = (String)tuple.get("buckets"); + values = (List)tuple.get("metricValues"); + assert(b.equals("hello3")); + assert(values.get(0) == 7.0d); + assert(values.get(1) == 3.5d); + assert(values.get(2) == 2.0d); + assert(values.get(3) == 3.0d); + assert(values.get(4) == 4.0d); + + tuple = rstream.read(); + assert(tuple.EOF); + + rstream.close(); + del("*:*"); + commit(); + } + + private void 
testParallelRollupStream() throws Exception { + indexr(id, "0", "a_s", "hello0", "a_i", "100", "a_f", "0"); + indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "0"); + indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3"); + indexr(id, "4", "a_s", "hello3", "a_i", "4", "a_f", "4"); + indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1"); + indexr(id, "6", "a_s", "hello1", "a_i", "1", "a_f", "1"); + indexr(id, "7", "a_s", "hello1", "a_i", "1", "a_f", "1"); + + commit(); + + String zkHost = zkServer.getZkAddress(); + + Bucket[] buckets = {new Bucket("a_s")}; + Metric[] metrics = {new SumMetric("a_i", false), + new MeanMetric("a_i", false), + new CountMetric(), + new MinMetric("a_i", false), + new MaxMetric("a_i", false)}; + + Map params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_s asc","partitionKeys","a_s"); + CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params); + RollupStream rostream = new RollupStream(stream, buckets, metrics); + ParallelStream rstream = new ParallelStream(zkHost,"collection1", rostream, 2, new AscFieldComp("buckets")); + + rstream.open(); + Tuple tuple = rstream.read(); + String b = (String)tuple.get("buckets"); + List values = (List)tuple.get("metricValues"); + assert(b.equals("hello0")); + assert(values.get(0) == 102.0d); + assert(values.get(1) == 51.0d); + assert(values.get(2) == 2.0d); + assert(values.get(3) == 2.0d); + assert(values.get(4) == 100.0d); + + tuple = rstream.read(); + b = (String)tuple.get("buckets"); + values = (List)tuple.get("metricValues"); + assert(b.equals("hello1")); + assert(values.get(0) == 3.0d); + assert(values.get(1) == 1.0d); + assert(values.get(2) == 3.0d); + assert(values.get(3) == 1.0d); + assert(values.get(4) == 1.0d); + + + tuple = rstream.read(); + b = (String)tuple.get("buckets"); + values = (List)tuple.get("metricValues"); + assert(b.equals("hello3")); + assert(values.get(0) == 7.0d); + assert(values.get(1) == 3.5d); + assert(values.get(2) == 2.0d); + 
+ assert(values.get(3) == 3.0d); + assert(values.get(4) == 4.0d); + + tuple = rstream.read(); + assert(tuple.EOF); + + rstream.close(); + del("*:*"); + commit(); + } + + private void testMetricStream() throws Exception { @@ -371,6 +615,8 @@ private void testMetricStream() throws Exception { indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3"); indexr(id, "4", "a_s", "hello3", "a_i", "4", "a_f", "4"); indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1"); + indexr(id, "6", "a_s", "hello1", "a_i", "1", "a_f", "1"); + indexr(id, "7", "a_s", "hello1", "a_i", "1", "a_f", "1"); commit(); @@ -385,55 +631,86 @@ private void testMetricStream() throws Exception { Map params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_i asc"); CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params); - MetricStream mstream = new MetricStream(stream, buckets, metrics, "metric1"); + MetricStream mstream = new MetricStream(stream, buckets, metrics, "metric1", new DescBucketComp(0),5); getTuples(mstream); - Map bucketMap = mstream.getBucketMap(); + BucketMetrics[] bucketMetrics = mstream.getBucketMetrics(); + assert(bucketMetrics.length == 3); - Metric[] bucketMetrics = bucketMap.get(new HashKey("hello0")); - assertMetric(bucketMetrics[0], SumMetric.SUM, 102.0d); - assertMetric(bucketMetrics[1], MeanMetric.COUNT, 2.0d); - assertMetric(bucketMetrics[1], MeanMetric.MEAN, 51.0d); - assertMetric(bucketMetrics[2], CountMetric.COUNT, 2.0d); - assertMetric(bucketMetrics[3], MinMetric.MIN, 2.0d); - assertMetric(bucketMetrics[4], MaxMetric.MAX, 100.0d); + //Buckets should be in descending order based on Metric 0, which is the SumMetric. 
+ assert(bucketMetrics[0].getKey().toString().equals("hello0")); + assert(bucketMetrics[1].getKey().toString().equals("hello3")); + assert(bucketMetrics[2].getKey().toString().equals("hello1")); - bucketMetrics = bucketMap.get(new HashKey("hello3")); - assertMetric(bucketMetrics[0], SumMetric.SUM, 7.0d); - assertMetric(bucketMetrics[1], MeanMetric.COUNT, 2.0d); - assertMetric(bucketMetrics[1], MeanMetric.MEAN, 3.5d); - assertMetric(bucketMetrics[2], CountMetric.COUNT, 2.0d); + assertMetric(bucketMetrics[0].getMetrics()[0], 102.0d); //Test the first Metric of the first BucketMetrics + assertMetric(bucketMetrics[0].getMetrics()[1], 51.0d); //Test the second Metric of the first BucketMetrics + assertMetric(bucketMetrics[0].getMetrics()[2], 2.0d); //Test the third Metric of the first BucketMetrics + assertMetric(bucketMetrics[0].getMetrics()[3], 2.0d); //Test the fourth Metric of the first BucketMetrics + assertMetric(bucketMetrics[0].getMetrics()[4], 100.0d); //Test the fifth Metric of the first BucketMetrics + assertMetric(bucketMetrics[1].getMetrics()[0], 7.0d); + assertMetric(bucketMetrics[2].getMetrics()[0], 3.0d); - del("*:*"); - commit(); - } - private void testCountStream() throws Exception { + params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_i asc"); + stream = new CloudSolrStream(zkHost, "collection1", params); + mstream = new MetricStream(stream, buckets, metrics, "metric1", new AscBucketComp(0),5); + getTuples(mstream); + + bucketMetrics = mstream.getBucketMetrics(); - indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0"); - indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0"); - indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3"); - indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4"); - indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1"); + assertMetric(bucketMetrics[0].getMetrics()[0], 3.0d); //Test the first Metric of the first BucketMetrics + assertMetric(bucketMetrics[0].getMetrics()[1], 1.0d); //Test the second Metric of 
the first BucketMetrics + assertMetric(bucketMetrics[0].getMetrics()[2], 3.0d); //Test the third Metric of the first BucketMetrics + assertMetric(bucketMetrics[0].getMetrics()[3], 1.0d); //Test the fourth Metric of the first BucketMetrics + assertMetric(bucketMetrics[0].getMetrics()[4], 1.0d); //Test the fifth Metric of the first BucketMetrics + + assertMetric(bucketMetrics[1].getMetrics()[0], 7.0d); + assertMetric(bucketMetrics[2].getMetrics()[0], 102.0d); + indexr(id, "8", "a_s", "hello4", "a_i", "1000", "a_f", "1"); //Add a record that creates a fourth bucket (hello4). commit(); - //Test CloudSolrStream and SumStream over an int field - String zkHost = zkServer.getZkAddress(); + //Test desc comp with more buckets than the priority queue can hold. + params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_i asc"); + stream = new CloudSolrStream(zkHost, "collection1", params); + mstream = new MetricStream(stream, buckets, metrics, "metric1", new DescBucketComp(0),3); + getTuples(mstream); - Map params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_i asc"); - CloudSolrStream stream = new CloudSolrStream(zkHost, "collection1", params); - List tuples = getTuples(stream); + bucketMetrics = mstream.getBucketMetrics(); + assert(bucketMetrics.length == 3); + assert(bucketMetrics[0].getKey().toString().equals("hello4")); + assert(bucketMetrics[1].getKey().toString().equals("hello0")); + assert(bucketMetrics[2].getKey().toString().equals("hello3")); - assert(tuples.size() == 5); - assertOrder(tuples, 0,1,2,3,4); + //Test asc comp with more buckets than the priority queue can hold. 
+ params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_i asc"); + stream = new CloudSolrStream(zkHost, "collection1", params); + mstream = new MetricStream(stream, buckets, metrics, "metric1", new AscBucketComp(0),3); + getTuples(mstream); + + bucketMetrics = mstream.getBucketMetrics(); + assert(bucketMetrics.length == 3); + assert(bucketMetrics[0].getKey().toString().equals("hello1")); + assert(bucketMetrics[1].getKey().toString().equals("hello3")); + assert(bucketMetrics[2].getKey().toString().equals("hello0")); + + + //Test with no buckets + params = mapParams("q","*:*","fl","id,a_s,a_i","sort", "a_i asc"); + stream = new CloudSolrStream(zkHost, "collection1", params); + mstream = new MetricStream(stream, metrics, "metric1"); + getTuples(mstream); + + bucketMetrics = mstream.getBucketMetrics(); + assert(bucketMetrics.length == 1); + assert(bucketMetrics[0].getKey().toString().equals("metrics")); + assertMetric(bucketMetrics[0].getMetrics()[0], 1112.0d); //Test the first Metric of the first BucketMetrics del("*:*"); commit(); - } private void testGroupByStream() throws Exception { @@ -529,10 +806,6 @@ private void testParallelHashJoinStream() { } - private void testParallelMergeJoinStream() { - - } - private void testParallelGroupByStream() throws Exception { indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0"); @@ -691,6 +964,7 @@ public void doTest() throws Exception { testUniqueStream(); testMetricStream(); + testRollupStream(); testRankStream(); testFilterStream(); testGroupByStream(); @@ -698,6 +972,7 @@ public void doTest() throws Exception { testMergeJoinStream(); testMergeStream(); testParallelStream(); + testParallelRollupStream(); testParallelGroupByStream(); testParallelHashJoinStream(); testParallelMergeJoinStream(); @@ -764,10 +1039,9 @@ public boolean assertLong(Tuple tuple, String fieldName, long l) throws Exceptio return true; } - public boolean assertMetric(Metric metric, String key, double value) throws Exception { + public boolean 
assertMetric(Metric metric, double value) throws Exception { System.out.println("Metric Type##################################:"+metric.getClass()); - Map metricValues = metric.metricValues(); - Double d = metricValues.get(key); + Double d = metric.getValue(); if(d.doubleValue() != value) { throw new Exception("Unexpected Metric "+d+"!="+value); }