Browse Source
DATADOC-256 - Update to use MongoDB driver version 2.6.5 DATADOC-7 - Support for map-reduce operations in MongoTemplatepull/1/head
10 changed files with 702 additions and 15 deletions
@ -0,0 +1,81 @@ |
|||||||
|
/* |
||||||
|
* Copyright 2010-2011 the original author or authors. |
||||||
|
* |
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||||
|
* you may not use this file except in compliance with the License. |
||||||
|
* You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
package org.springframework.data.mongodb.core.mapreduce; |
||||||
|
|
||||||
|
public class MapReduceCounts { |
||||||
|
|
||||||
|
private int inputCount; |
||||||
|
|
||||||
|
private int emitCount; |
||||||
|
|
||||||
|
private int outputCount; |
||||||
|
|
||||||
|
public MapReduceCounts(int inputCount, int emitCount, int outputCount) { |
||||||
|
super(); |
||||||
|
this.inputCount = inputCount; |
||||||
|
this.emitCount = emitCount; |
||||||
|
this.outputCount = outputCount; |
||||||
|
} |
||||||
|
|
||||||
|
public int getInputCount() { |
||||||
|
return inputCount; |
||||||
|
} |
||||||
|
|
||||||
|
public int getEmitCount() { |
||||||
|
return emitCount; |
||||||
|
} |
||||||
|
|
||||||
|
public int getOutputCount() { |
||||||
|
return outputCount; |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
public String toString() { |
||||||
|
return "MapReduceCounts [inputCount=" + inputCount + ", emitCount=" + emitCount + ", outputCount=" + outputCount |
||||||
|
+ "]"; |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
public int hashCode() { |
||||||
|
final int prime = 31; |
||||||
|
int result = 1; |
||||||
|
result = prime * result + emitCount; |
||||||
|
result = prime * result + inputCount; |
||||||
|
result = prime * result + outputCount; |
||||||
|
return result; |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
public boolean equals(Object obj) { |
||||||
|
if (this == obj) |
||||||
|
return true; |
||||||
|
if (obj == null) |
||||||
|
return false; |
||||||
|
if (getClass() != obj.getClass()) |
||||||
|
return false; |
||||||
|
MapReduceCounts other = (MapReduceCounts) obj; |
||||||
|
if (emitCount != other.emitCount) |
||||||
|
return false; |
||||||
|
if (inputCount != other.inputCount) |
||||||
|
return false; |
||||||
|
if (outputCount != other.outputCount) |
||||||
|
return false; |
||||||
|
return true; |
||||||
|
} |
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
} |
||||||
@ -0,0 +1,94 @@ |
|||||||
|
/* |
||||||
|
* Copyright 2011 the original author or authors. |
||||||
|
* |
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||||
|
* you may not use this file except in compliance with the License. |
||||||
|
* You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
package org.springframework.data.mongodb.core.mapreduce; |
||||||
|
|
||||||
|
import java.util.Iterator; |
||||||
|
import java.util.List; |
||||||
|
|
||||||
|
import org.springframework.util.Assert; |
||||||
|
|
||||||
|
import com.mongodb.DBObject; |
||||||
|
|
||||||
|
public class MapReduceResults<T> implements Iterable<T> { |
||||||
|
|
||||||
|
private final List<T> mappedResults; |
||||||
|
|
||||||
|
private DBObject rawResults; |
||||||
|
|
||||||
|
private MapReduceTiming mapReduceTiming; |
||||||
|
|
||||||
|
private MapReduceCounts mapReduceCounts; |
||||||
|
|
||||||
|
private String outputCollection; |
||||||
|
|
||||||
|
public MapReduceResults(List<T> mappedResults, DBObject rawResults) { |
||||||
|
Assert.notNull(mappedResults); |
||||||
|
Assert.notNull(rawResults); |
||||||
|
this.mappedResults = mappedResults; |
||||||
|
this.rawResults = rawResults; |
||||||
|
parseTiming(rawResults); |
||||||
|
parseCounts(rawResults); |
||||||
|
if (rawResults.get("result") != null) { |
||||||
|
this.outputCollection = (String) rawResults.get("result"); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
public Iterator<T> iterator() { |
||||||
|
return mappedResults.iterator(); |
||||||
|
} |
||||||
|
|
||||||
|
public MapReduceTiming getTiming() { |
||||||
|
return mapReduceTiming; |
||||||
|
} |
||||||
|
|
||||||
|
public MapReduceCounts getCounts() { |
||||||
|
return mapReduceCounts; |
||||||
|
} |
||||||
|
|
||||||
|
public String getOutputCollection() { |
||||||
|
return outputCollection; |
||||||
|
} |
||||||
|
|
||||||
|
public DBObject getRawResults() { |
||||||
|
return rawResults; |
||||||
|
} |
||||||
|
|
||||||
|
protected void parseTiming(DBObject rawResults) { |
||||||
|
DBObject timing = (DBObject) rawResults.get("timing"); |
||||||
|
if (timing != null) { |
||||||
|
if (timing.get("mapTime") != null && timing.get("emitLoop") != null && timing.get("total") != null) { |
||||||
|
mapReduceTiming = new MapReduceTiming( (Long)timing.get("mapTime"), |
||||||
|
(Integer)timing.get("emitLoop"), |
||||||
|
(Integer)timing.get("total")); |
||||||
|
} |
||||||
|
} else { |
||||||
|
mapReduceTiming = new MapReduceTiming(-1,-1,-1); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
|
||||||
|
protected void parseCounts(DBObject rawResults) { |
||||||
|
DBObject counts = (DBObject) rawResults.get("counts"); |
||||||
|
if (counts != null) { |
||||||
|
if (counts.get("input") != null && counts.get("emit") != null && counts.get("output") != null) { |
||||||
|
mapReduceCounts = new MapReduceCounts( (Integer)counts.get("input"), (Integer)counts.get("emit"), (Integer)counts.get("output")); |
||||||
|
} |
||||||
|
} else { |
||||||
|
mapReduceCounts = new MapReduceCounts(-1,-1,-1); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
} |
||||||
@ -0,0 +1,80 @@ |
|||||||
|
/* |
||||||
|
* Copyright 2010-2011 the original author or authors. |
||||||
|
* |
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||||
|
* you may not use this file except in compliance with the License. |
||||||
|
* You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
package org.springframework.data.mongodb.core.mapreduce; |
||||||
|
|
||||||
|
public class MapReduceTiming { |
||||||
|
|
||||||
|
private long mapTime; |
||||||
|
|
||||||
|
private long emitLoopTime; |
||||||
|
|
||||||
|
private long totalTime; |
||||||
|
|
||||||
|
public MapReduceTiming(long mapTime, long emitLoopTime, long totalTime) { |
||||||
|
this.mapTime = mapTime; |
||||||
|
this.emitLoopTime = emitLoopTime; |
||||||
|
this.totalTime = totalTime; |
||||||
|
} |
||||||
|
|
||||||
|
public long getMapTime() { |
||||||
|
return mapTime; |
||||||
|
} |
||||||
|
|
||||||
|
public long getEmitLoopTime() { |
||||||
|
return emitLoopTime; |
||||||
|
} |
||||||
|
|
||||||
|
public long getTotalTime() { |
||||||
|
return totalTime; |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
public String toString() { |
||||||
|
return "MapReduceTiming [mapTime=" + mapTime + ", emitLoopTime=" + emitLoopTime + ", totalTime=" + totalTime + "]"; |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
public int hashCode() { |
||||||
|
final int prime = 31; |
||||||
|
int result = 1; |
||||||
|
result = prime * result + (int) (emitLoopTime ^ (emitLoopTime >>> 32)); |
||||||
|
result = prime * result + (int) (mapTime ^ (mapTime >>> 32)); |
||||||
|
result = prime * result + (int) (totalTime ^ (totalTime >>> 32)); |
||||||
|
return result; |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
public boolean equals(Object obj) { |
||||||
|
if (this == obj) |
||||||
|
return true; |
||||||
|
if (obj == null) |
||||||
|
return false; |
||||||
|
if (getClass() != obj.getClass()) |
||||||
|
return false; |
||||||
|
MapReduceTiming other = (MapReduceTiming) obj; |
||||||
|
if (emitLoopTime != other.emitLoopTime) |
||||||
|
return false; |
||||||
|
if (mapTime != other.mapTime) |
||||||
|
return false; |
||||||
|
if (totalTime != other.totalTime) |
||||||
|
return false; |
||||||
|
return true; |
||||||
|
} |
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
} |
||||||
@ -0,0 +1,190 @@ |
|||||||
|
/* |
||||||
|
* Copyright 2011 the original author or authors. |
||||||
|
* |
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||||
|
* you may not use this file except in compliance with the License. |
||||||
|
* You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
package org.springframework.data.mongodb.core.mapreduce; |
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals; |
||||||
|
import static org.springframework.data.mongodb.core.query.Criteria.where; |
||||||
|
import static org.springframework.data.mongodb.core.mapreduce.MapReduceOptions.options; |
||||||
|
|
||||||
|
import java.util.ArrayList; |
||||||
|
import java.util.Arrays; |
||||||
|
import java.util.HashMap; |
||||||
|
import java.util.HashSet; |
||||||
|
import java.util.List; |
||||||
|
import java.util.Map; |
||||||
|
|
||||||
|
import org.junit.After; |
||||||
|
import org.junit.Before; |
||||||
|
import org.junit.Test; |
||||||
|
import org.junit.runner.RunWith; |
||||||
|
import org.springframework.beans.factory.annotation.Autowired; |
||||||
|
import org.springframework.data.mongodb.MongoDbFactory; |
||||||
|
import org.springframework.data.mongodb.core.MongoTemplate; |
||||||
|
import org.springframework.data.mongodb.core.convert.MappingMongoConverter; |
||||||
|
import org.springframework.data.mongodb.core.mapping.MongoMappingContext; |
||||||
|
import org.springframework.data.mongodb.core.query.Query; |
||||||
|
import org.springframework.test.context.ContextConfiguration; |
||||||
|
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; |
||||||
|
|
||||||
|
import com.mongodb.BasicDBObject; |
||||||
|
import com.mongodb.DBCollection; |
||||||
|
import com.mongodb.Mongo; |
||||||
|
|
||||||
|
/** |
||||||
|
* Integration test for {@link MongoTemplate}'s Map-Reduce operations |
||||||
|
* |
||||||
|
* @author Mark Pollack |
||||||
|
*/ |
||||||
|
@RunWith(SpringJUnit4ClassRunner.class) |
||||||
|
@ContextConfiguration("classpath:infrastructure.xml") |
||||||
|
public class MapReduceTests { |
||||||
|
|
||||||
|
private String mapFunction = "function(){ for ( var i=0; i<this.x.length; i++ ){ emit( this.x[i] , 1 ); } }"; |
||||||
|
private String reduceFunction = "function(key,values){ var sum=0; for( var i=0; i<values.length; i++ ) sum += values[i]; return sum;}"; |
||||||
|
|
||||||
|
@Autowired |
||||||
|
MongoTemplate template; |
||||||
|
@Autowired |
||||||
|
MongoDbFactory factory; |
||||||
|
MongoTemplate mongoTemplate; |
||||||
|
|
||||||
|
@Autowired |
||||||
|
@SuppressWarnings("unchecked") |
||||||
|
public void setMongo(Mongo mongo) throws Exception { |
||||||
|
|
||||||
|
MongoMappingContext mappingContext = new MongoMappingContext(); |
||||||
|
mappingContext.setInitialEntitySet(new HashSet<Class<?>>(Arrays.asList(ValueObject.class))); |
||||||
|
mappingContext.afterPropertiesSet(); |
||||||
|
|
||||||
|
MappingMongoConverter mappingConverter = new MappingMongoConverter(factory, mappingContext); |
||||||
|
mappingConverter.afterPropertiesSet(); |
||||||
|
this.mongoTemplate = new MongoTemplate(factory, mappingConverter); |
||||||
|
} |
||||||
|
|
||||||
|
@Before |
||||||
|
public void setUp() { |
||||||
|
cleanDb(); |
||||||
|
} |
||||||
|
|
||||||
|
@After |
||||||
|
public void cleanUp() { |
||||||
|
cleanDb(); |
||||||
|
} |
||||||
|
|
||||||
|
protected void cleanDb() { |
||||||
|
template.dropCollection(template.getCollectionName(ValueObject.class)); |
||||||
|
template.dropCollection("jmr1_out"); |
||||||
|
|
||||||
|
} |
||||||
|
|
||||||
|
@Test |
||||||
|
public void testMapReduce() { |
||||||
|
performMapReduce(false, false); |
||||||
|
} |
||||||
|
|
||||||
|
@Test |
||||||
|
public void testMapReduceInline() { |
||||||
|
performMapReduce(true, false); |
||||||
|
} |
||||||
|
|
||||||
|
@Test |
||||||
|
public void testMapReduceWithQuery() { |
||||||
|
performMapReduce(false, true); |
||||||
|
} |
||||||
|
|
||||||
|
@Test |
||||||
|
public void testMapReduceInlineWithScope() { |
||||||
|
createMapReduceData(); |
||||||
|
|
||||||
|
Map<String, Object> scopeVariables = new HashMap<String, Object>(); |
||||||
|
scopeVariables.put("exclude", "a"); |
||||||
|
|
||||||
|
String mapWithExcludeFunction = "function(){ for ( var i=0; i<this.x.length; i++ ){ if(this.x[i] != exclude) emit( this.x[i] , 1 ); } }"; |
||||||
|
|
||||||
|
MapReduceResults<ValueObject> results = mongoTemplate.mapReduce(mapWithExcludeFunction, reduceFunction, |
||||||
|
new MapReduceOptions().scopeVariables(scopeVariables).outputTypeInline(), ValueObject.class); |
||||||
|
Map<String, Float> m = copyToMap(results); |
||||||
|
assertEquals(3, m.size()); |
||||||
|
assertEquals(2, m.get("b").intValue()); |
||||||
|
assertEquals(2, m.get("c").intValue()); |
||||||
|
assertEquals(1, m.get("d").intValue()); |
||||||
|
} |
||||||
|
|
||||||
|
@Test |
||||||
|
public void testMapReduceExcludeQuery() { |
||||||
|
createMapReduceData(); |
||||||
|
|
||||||
|
Query query = new Query(where("x").ne(new String[] { "a", "b" })); |
||||||
|
MapReduceResults<ValueObject> results = mongoTemplate.mapReduce(query, mapFunction, reduceFunction, ValueObject.class); |
||||||
|
|
||||||
|
Map<String, Float> m = copyToMap(results); |
||||||
|
assertEquals(3, m.size()); |
||||||
|
assertEquals(1, m.get("b").intValue()); |
||||||
|
assertEquals(2, m.get("c").intValue()); |
||||||
|
assertEquals(1, m.get("d").intValue()); |
||||||
|
|
||||||
|
} |
||||||
|
|
||||||
|
|
||||||
|
private void performMapReduce(boolean inline, boolean withQuery) { |
||||||
|
createMapReduceData(); |
||||||
|
MapReduceResults<ValueObject> results; |
||||||
|
if (inline) { |
||||||
|
if (withQuery) { |
||||||
|
results = mongoTemplate.mapReduce(new Query(), mapFunction, reduceFunction, ValueObject.class); |
||||||
|
} else { |
||||||
|
results = mongoTemplate.mapReduce(mapFunction, reduceFunction, ValueObject.class); |
||||||
|
} |
||||||
|
} else { |
||||||
|
if (withQuery) { |
||||||
|
results = mongoTemplate.mapReduce(new Query(), mapFunction, reduceFunction, options().outputCollection("jmr1_out"), ValueObject.class); |
||||||
|
} else { |
||||||
|
results = mongoTemplate.mapReduce(mapFunction, reduceFunction, new MapReduceOptions().outputCollection("jmr1_out"), ValueObject.class); |
||||||
|
} |
||||||
|
} |
||||||
|
Map<String, Float> m = copyToMap(results); |
||||||
|
assertMapReduceResults(m); |
||||||
|
} |
||||||
|
|
||||||
|
private void createMapReduceData() { |
||||||
|
DBCollection c = mongoTemplate.getDb().getCollection(template.getCollectionName(ValueObject.class)); |
||||||
|
c.save(new BasicDBObject("x", new String[] { "a", "b" })); |
||||||
|
c.save(new BasicDBObject("x", new String[] { "b", "c" })); |
||||||
|
c.save(new BasicDBObject("x", new String[] { "c", "d" })); |
||||||
|
} |
||||||
|
|
||||||
|
private Map<String, Float> copyToMap(MapReduceResults<ValueObject> results) { |
||||||
|
List<ValueObject> valueObjects = new ArrayList<ValueObject>(); |
||||||
|
for (ValueObject valueObject : results) { |
||||||
|
valueObjects.add(valueObject); |
||||||
|
} |
||||||
|
|
||||||
|
Map<String, Float> m = new HashMap<String, Float>(); |
||||||
|
for (ValueObject vo : valueObjects) { |
||||||
|
m.put(vo.getId(), vo.getValue()); |
||||||
|
} |
||||||
|
return m; |
||||||
|
} |
||||||
|
|
||||||
|
private void assertMapReduceResults(Map<String, Float> m) { |
||||||
|
assertEquals(4, m.size()); |
||||||
|
assertEquals(1, m.get("a").intValue()); |
||||||
|
assertEquals(2, m.get("b").intValue()); |
||||||
|
assertEquals(2, m.get("c").intValue()); |
||||||
|
assertEquals(1, m.get("d").intValue()); |
||||||
|
} |
||||||
|
|
||||||
|
} |
||||||
@ -0,0 +1,29 @@ |
|||||||
|
package org.springframework.data.mongodb.core.mapreduce; |
||||||
|
|
||||||
|
public class ValueObject { |
||||||
|
|
||||||
|
private String id; |
||||||
|
|
||||||
|
public String getId() { |
||||||
|
return id; |
||||||
|
} |
||||||
|
|
||||||
|
private float value; |
||||||
|
|
||||||
|
public float getValue() { |
||||||
|
return value; |
||||||
|
} |
||||||
|
|
||||||
|
public void setValue(float value) { |
||||||
|
this.value = value; |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
public String toString() { |
||||||
|
return "ValueObject [id=" + id + ", value=" + value + "]"; |
||||||
|
} |
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
} |
||||||
Loading…
Reference in new issue