Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions code/src/main/java/com/googlecode/cqengine/pk/PrimaryKeyIndex.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.googlecode.cqengine.pk;

import com.googlecode.cqengine.attribute.Attribute;
import com.googlecode.cqengine.index.support.Factory;
import com.googlecode.cqengine.index.unique.UniqueIndex;

import java.util.concurrent.ConcurrentMap;

/**
* An index that reuses map supplied in constructor
*/
public class PrimaryKeyIndex<A, O> extends UniqueIndex<A, O> {
public PrimaryKeyIndex(final ConcurrentMap<A, O> indexMap, Attribute attribute) {
super(new Factory<ConcurrentMap<A, O>>() {
@Override
public ConcurrentMap<A, O> create() {
return indexMap;
}
}, attribute);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.googlecode.cqengine.pk;

import com.googlecode.cqengine.ConcurrentIndexedCollection;
import com.googlecode.cqengine.attribute.SimpleAttribute;
import com.googlecode.cqengine.index.unique.UniqueIndex;

import java.util.concurrent.ConcurrentMap;

/**
* Indexed collection backed by primary key to object map
*/
public class PrimaryKeyIndexedCollection<O, A extends Comparable<A>> extends ConcurrentIndexedCollection<O> {

private ConcurrentMap<A, O> pkMap;
private SimpleAttribute<O, A> primaryKeyAttribute;

public PrimaryKeyIndexedCollection(SimpleAttribute<O, A> attribute) {
this(new PrimaryKeyOnHeapPersistence<O, A>(attribute));
}

public PrimaryKeyIndexedCollection(PrimaryKeyOnHeapPersistence<O, A> persistence) {
super(persistence);
pkMap = ((PrimaryKeyOnHeapObjectStore) objectStore).getPkMap();
primaryKeyAttribute = persistence.getPrimaryKeyAttribute();
UniqueIndex<A, O> pkIndex = new PrimaryKeyIndex<A, O>(pkMap, primaryKeyAttribute);
addIndex(pkIndex);
}

/**
* Fast access to entry by Key
*
* @param key
* @return
*/
public O get(A key) {
return pkMap.get(key);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package com.googlecode.cqengine.pk;

import com.googlecode.cqengine.attribute.SimpleAttribute;
import com.googlecode.cqengine.persistence.support.CollectionWrappingObjectStore;

import java.io.IOException;
import java.io.Serializable;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
* ObjectStore backed by index
*/
public class PrimaryKeyOnHeapObjectStore<O, A extends Comparable<A>> extends CollectionWrappingObjectStore<O> {
private final ConcurrentMap<A, O> pkMap;

public PrimaryKeyOnHeapObjectStore(SimpleAttribute<O, A> pkAttr) {
super(new SetFromMap(new ConcurrentHashMap<A, O>(), pkAttr));
pkMap = ((SetFromMap<O, A>) getBackingCollection()).getMap();
}

public PrimaryKeyOnHeapObjectStore(int initialCapacity, float loadFactor, int concurrencyLevel, SimpleAttribute<O, A> pkAttr) {
super(new SetFromMap<O,A>(new ConcurrentHashMap<A, O>(initialCapacity, loadFactor, concurrencyLevel), pkAttr));
pkMap = ((SetFromMap<O, A>) getBackingCollection()).getMap();
}

public ConcurrentMap<A, O> getPkMap() {
return pkMap;
}

private static class SetFromMap<O, A extends Comparable<A>> extends AbstractSet<O>
implements Set<O>, Serializable {
// The backing map
private final ConcurrentMap<A, O> map;
// Its values view
private transient Collection<O> values;
// Attribute to read a key
private final SimpleAttribute<O, A> attr;

SetFromMap(ConcurrentMap<A, O> map, SimpleAttribute<O, A> attr) {
if (!map.isEmpty())
throw new IllegalArgumentException("Map must be empty");
this.map = map;
// ValuesView in ConcurrentHashMap is unique and final!
// store the reference for faster access
values = map.values();
this.attr = attr;
}

public void clear() {
map.clear();
}

public int size() {
return map.size();
}

public boolean isEmpty() {
return map.isEmpty();
}

public boolean contains(Object o) {
return map.containsKey(attr.getValue((O)o, null));
}

public boolean remove(Object o) {
return map.remove(attr.getValue((O)o, null)) != null;
}

public boolean add(O o) {
return map.put(attr.getValue((O)o, null), o) == null;
}

public Iterator<O> iterator() {
return values.iterator();
}

public Object[] toArray() {
return values.toArray();
}

public <T> T[] toArray(T[] a) {
return values.toArray(a);
}

public String toString() {
return values.toString();
}

public int hashCode() {
return map.keySet().hashCode();
}

public boolean equals(Object o) {
return o == this;
}

public boolean containsAll(Collection<?> c) {
for (Object o : c) {
A value = attr.getValue((O) o, null);
if(map.get(value) == null) return false;
}
return true;
}

public boolean removeAll(Collection<?> c) {
boolean changed = false;
for (Object o : c) {
A value = attr.getValue((O) o, null);
if(map.remove(value) != null) changed = true;
}
return changed;
}

// retainAll and addAll is are inherited implementation

ConcurrentMap<A, O> getMap(){
return map;
}

private static final long serialVersionUID = 2454657854757543879L;

private void readObject(java.io.ObjectInputStream stream)
throws IOException, ClassNotFoundException {
stream.defaultReadObject();
values = map.values();
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package com.googlecode.cqengine.pk;

import com.googlecode.cqengine.attribute.SimpleAttribute;
import com.googlecode.cqengine.persistence.onheap.OnHeapPersistence;
import com.googlecode.cqengine.persistence.support.ObjectStore;

/**
* ObjectStore with direct access to pk map
*/
public class PrimaryKeyOnHeapPersistence<O, A extends Comparable<A>> extends OnHeapPersistence<O, A> {

private final int initialCapacity;
private final float loadFactor;
private final int concurrencyLevel;

public PrimaryKeyOnHeapPersistence() {
this(null, 16, 0.75F, 16);
}

public PrimaryKeyOnHeapPersistence(SimpleAttribute<O, A> primaryKeyAttribute) {
this(primaryKeyAttribute, 16, 0.75F, 16);
}

public PrimaryKeyOnHeapPersistence(SimpleAttribute<O, A> primaryKeyAttribute, int initialCapacity, float loadFactor, int concurrencyLevel) {
super(primaryKeyAttribute, initialCapacity, loadFactor, concurrencyLevel);
this.initialCapacity = initialCapacity;
this.loadFactor = loadFactor;
this.concurrencyLevel = concurrencyLevel;
}

@Override
public ObjectStore<O> createObjectStore() {
return new PrimaryKeyOnHeapObjectStore<O, A>(initialCapacity, loadFactor, concurrencyLevel, getPrimaryKeyAttribute());
}
}