Update to support java 9/10
This commit is contained in:
@@ -26,10 +26,12 @@ int sum = Fn.on(list1).forEach(new Fn.NoRet<Integer>() {
|
||||
```
|
||||
### Requirements
|
||||
|
||||
Nyx can be used on Java 6 (or later) platform.
|
||||
Nyx collections can be used with Java 9/10 platform.
|
||||
|
||||
### Releases
|
||||
|
||||
* v0.3
|
||||
- Upgraded to support Java 9/10
|
||||
* v0.2
|
||||
- New extended API for easy Nyx collections traversal and modifications (foreach, mapTo, filter).
|
||||
- Asynchronous data exchange with off-heap storage for better throughput.
|
||||
|
||||
172
pom.xml
172
pom.xml
@@ -1,87 +1,87 @@
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<groupId>nyx</groupId>
|
||||
<artifactId>nyx-collections</artifactId>
|
||||
<version>0.2.0-SNAPSHOT</version>
|
||||
<distributionManagement>
|
||||
<repository>
|
||||
<id>internal.repo</id>
|
||||
<name>Temporary Staging Repository</name>
|
||||
<url>file://${project.build.directory}/mvn-repo</url>
|
||||
</repository>
|
||||
</distributionManagement>
|
||||
<properties>
|
||||
<github.global.server>github</github.global.server>
|
||||
</properties>
|
||||
<build>
|
||||
<pluginManagement>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-compiler-plugin</artifactId>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</pluginManagement>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<artifactId>maven-compiler-plugin</artifactId>
|
||||
<configuration>
|
||||
<source>1.7</source>
|
||||
<target>1.7</target>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<artifactId>maven-deploy-plugin</artifactId>
|
||||
<version>2.8.1</version>
|
||||
<configuration>
|
||||
<altDeploymentRepository>internal.repo::default::file://${project.build.directory}/mvn-repo</altDeploymentRepository>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>com.github.github</groupId>
|
||||
<artifactId>site-maven-plugin</artifactId>
|
||||
<version>0.10-SNAPSHOT</version>
|
||||
<configuration>
|
||||
<message>Maven artifacts for ${project.version}</message> <!-- git commit message -->
|
||||
<noJekyll>true</noJekyll> <!-- disable webpage processing -->
|
||||
<outputDirectory>${project.build.directory}/mvn-repo</outputDirectory> <!-- matches distribution management repository url above -->
|
||||
<branch>refs/heads/mvn-repo</branch> <!-- remote branch name -->
|
||||
<includes>
|
||||
<include>**/*</include>
|
||||
</includes>
|
||||
<repositoryName>nyx</repositoryName> <!-- github repo name -->
|
||||
<repositoryOwner>Arvik</repositoryOwner> <!-- github username -->
|
||||
</configuration>
|
||||
<executions>
|
||||
<!-- run site-maven-plugin's 'site' target as part of the build's normal
|
||||
'deploy' phase -->
|
||||
<execution>
|
||||
<goals>
|
||||
<goal>site</goal>
|
||||
</goals>
|
||||
<phase>deploy</phase>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<version>4.11</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>cglib</groupId>
|
||||
<artifactId>cglib</artifactId>
|
||||
<version>3.1</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
<contributors>
|
||||
<contributor>
|
||||
<timezone>+1</timezone>
|
||||
<email>varlou@gmail.com</email>
|
||||
</contributor>
|
||||
</contributors>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<groupId>nyx</groupId>
|
||||
<artifactId>nyx-collections</artifactId>
|
||||
<version>0.3.0-SNAPSHOT</version>
|
||||
<distributionManagement>
|
||||
<repository>
|
||||
<id>internal.repo</id>
|
||||
<name>Temporary Staging Repository</name>
|
||||
<url>file://${project.build.directory}/mvn-repo</url>
|
||||
</repository>
|
||||
</distributionManagement>
|
||||
<properties>
|
||||
<github.global.server>github</github.global.server>
|
||||
</properties>
|
||||
<build>
|
||||
<pluginManagement>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-compiler-plugin</artifactId>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</pluginManagement>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<artifactId>maven-compiler-plugin</artifactId>
|
||||
<configuration>
|
||||
<source>9</source>
|
||||
<target>9</target>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<artifactId>maven-deploy-plugin</artifactId>
|
||||
<version>2.8.1</version>
|
||||
<configuration>
|
||||
<altDeploymentRepository>internal.repo::default::file://${project.build.directory}/mvn-repo</altDeploymentRepository>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>com.github.github</groupId>
|
||||
<artifactId>site-maven-plugin</artifactId>
|
||||
<version>0.12</version>
|
||||
<configuration>
|
||||
<message>Maven artifacts for ${project.version}</message> <!-- git commit message -->
|
||||
<noJekyll>true</noJekyll> <!-- disable webpage processing -->
|
||||
<outputDirectory>${project.build.directory}/mvn-repo</outputDirectory> <!-- matches distribution management repository url above -->
|
||||
<branch>refs/heads/mvn-repo</branch> <!-- remote branch name -->
|
||||
<includes>
|
||||
<include>**/*</include>
|
||||
</includes>
|
||||
<repositoryName>nyx</repositoryName> <!-- github repo name -->
|
||||
<repositoryOwner>Arvik</repositoryOwner> <!-- github username -->
|
||||
</configuration>
|
||||
<executions>
|
||||
<!-- run site-maven-plugin's 'site' target as part of the build's normal
|
||||
'deploy' phase -->
|
||||
<execution>
|
||||
<goals>
|
||||
<goal>site</goal>
|
||||
</goals>
|
||||
<phase>deploy</phase>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<version>4.11</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>cglib</groupId>
|
||||
<artifactId>cglib</artifactId>
|
||||
<version>3.1</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
<contributors>
|
||||
<contributor>
|
||||
<timezone>+1</timezone>
|
||||
<email>varlou@gmail.com</email>
|
||||
</contributor>
|
||||
</contributors>
|
||||
</project>
|
||||
@@ -1,30 +1,145 @@
|
||||
package nyx.collections;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.Collection;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* Basic implementation of Set collection interface.
|
||||
*
|
||||
* @author varlou@gmail.com
|
||||
*/
|
||||
public class NyxSet<E> extends NyxList<E> implements Set<E>, Serializable {
|
||||
|
||||
private static final long serialVersionUID = -7388374589811129603L;
|
||||
|
||||
@Override
|
||||
public boolean add(E e) {
|
||||
if (!super.contains(e)) {
|
||||
super.add(e);
|
||||
return true;
|
||||
} else return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean addAll(Collection<? extends E> c) {
|
||||
boolean result = false;
|
||||
for (E e : c) result |= add(e);
|
||||
return result;
|
||||
}
|
||||
}
|
||||
package nyx.collections;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.*;
|
||||
import java.util.function.UnaryOperator;
|
||||
|
||||
/**
|
||||
* Basic implementation of Set collection interface.
|
||||
*
|
||||
* @author varlou@gmail.com
|
||||
*/
|
||||
public class NyxSet<E> implements Set<E>, Serializable {
|
||||
|
||||
private static final long serialVersionUID = -7388374589811129603L;
|
||||
|
||||
private final NyxList<E> internList = new NyxList<>();
|
||||
|
||||
@Override
|
||||
public boolean add(E e) {
|
||||
if (!internList.contains(e)) {
|
||||
internList.add(e);
|
||||
return true;
|
||||
} else return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean remove(Object obj) {
|
||||
return internList.remove(obj);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean containsAll(Collection<?> c) {
|
||||
return internList.containsAll(c);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean removeAll(Collection<?> c) {
|
||||
return internList.removeAll(c);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean retainAll(Collection<?> c) {
|
||||
return internList.retainAll(c);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean addAll(Collection<? extends E> c) {
|
||||
boolean result = false;
|
||||
for (E e : c) result |= add(e);
|
||||
return result;
|
||||
}
|
||||
|
||||
public void add(int index, E element) {
|
||||
internList.add(index, element);
|
||||
}
|
||||
|
||||
public boolean addAll(int index, Collection<? extends E> c) {
|
||||
return internList.addAll(index, c);
|
||||
}
|
||||
|
||||
public E get(int index) {
|
||||
return internList.get(index);
|
||||
}
|
||||
|
||||
public E set(int index, E element) {
|
||||
return internList.set(index, element);
|
||||
}
|
||||
|
||||
public E remove(int index) {
|
||||
return internList.remove(index);
|
||||
}
|
||||
|
||||
public int indexOf(Object o) {
|
||||
return internList.indexOf(o);
|
||||
}
|
||||
|
||||
public int lastIndexOf(Object o) {
|
||||
return internList.lastIndexOf(o);
|
||||
}
|
||||
|
||||
public ListIterator<E> listIterator() {
|
||||
return internList.listIterator();
|
||||
}
|
||||
|
||||
public ListIterator<E> listIterator(int index) {
|
||||
return internList.listIterator(index);
|
||||
}
|
||||
|
||||
public List<E> subList(int fromIndex, int toIndex) {
|
||||
return internList.subList(fromIndex, toIndex);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return internList.hashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
return internList.equals(o);
|
||||
}
|
||||
|
||||
public void replaceAll(UnaryOperator<E> operator) {
|
||||
internList.replaceAll(operator);
|
||||
}
|
||||
|
||||
public void sort(Comparator<? super E> c) {
|
||||
internList.sort(c);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clear() {
|
||||
internList.clear();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int size() {
|
||||
return internList.size();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isEmpty() {
|
||||
return internList.isEmpty();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean contains(Object obj) {
|
||||
return internList.contains(obj);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterator<E> iterator() {
|
||||
return internList.iterator();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object[] toArray() {
|
||||
return internList.toArray();
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> T[] toArray(T[] a) {
|
||||
return internList.toArray(a);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,258 +1,255 @@
|
||||
package nyx.collections.storage;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.Serializable;
|
||||
import java.lang.reflect.Field;
|
||||
import java.lang.reflect.Method;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Set;
|
||||
|
||||
import nyx.collections.Acme;
|
||||
import nyx.collections.Const;
|
||||
|
||||
/**
|
||||
* Elastic thread-safe storage for byte arrays. This is a base class which is
|
||||
* used by Nyx collection classes. Byte arrays are stored in direct
|
||||
* {@link java.nio.ByteBuffers} outside garbage-collected memory.
|
||||
*
|
||||
* @author varlou@gmail.com
|
||||
*/
|
||||
public class ElasticByteStorage<E> implements Storage<E, byte[]>, Serializable {
|
||||
|
||||
private static final long serialVersionUID = 1408552328267845863L;
|
||||
|
||||
private transient List<ByteBuffer> dbbs = new ArrayList<>();
|
||||
private long cursor = 0;
|
||||
private int capacity = Const._1Kb * 4; // default chunk size is 4Kb
|
||||
/* long array contains 3 elements - from, to and hashCode*/
|
||||
private Map<E, long[]> elementsLocation = Acme.chashmap();
|
||||
|
||||
/** Creates instance with a default initial capacity (4Kb). */
|
||||
public ElasticByteStorage() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates instance with a given initial capacity in bytes.
|
||||
*
|
||||
* @param capacity
|
||||
* the capacity of this storage in bytes
|
||||
* @throws IllegalArgumentException
|
||||
* if {@code capacity < 4096}
|
||||
*/
|
||||
public ElasticByteStorage(int capacity) {
|
||||
if (capacity < Const._1Kb * 4) throw new IllegalArgumentException();
|
||||
this.capacity = capacity;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] put(E id, byte[] addme) {
|
||||
if (elementsLocation.containsKey(id))
|
||||
throw new IllegalArgumentException();
|
||||
try {
|
||||
long[] location = incremenetCursor(addme);
|
||||
long lCursor = location[0];
|
||||
int commited = 0;
|
||||
while (commited < addme.length) {
|
||||
ByteBuffer cbuf = bufferForPosition(lCursor);
|
||||
synchronized (cbuf) {
|
||||
cbuf.position(offsetForPosition(lCursor));
|
||||
int spaceLeft = capacity - cbuf.position();
|
||||
int size = Math.min(spaceLeft, addme.length - commited);
|
||||
cbuf.put(addme, commited, size);
|
||||
lCursor += size;
|
||||
commited += size;
|
||||
}
|
||||
}
|
||||
elementsLocation.put(id, location);
|
||||
return addme;
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized long[] incremenetCursor(byte[] addme) {
|
||||
return new long[] { this.cursor, this.cursor += addme.length, Arrays.hashCode(addme) };
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] get(E id) {
|
||||
if (!elementsLocation.containsKey(id)) return null;
|
||||
long[] location = elementsLocation.get(id);
|
||||
assert location.length == 3;
|
||||
byte[] result = new byte[(int) (location[1] - location[0])];
|
||||
int readed = 0;
|
||||
while (readed < result.length) {
|
||||
long pos = location[0] + readed;
|
||||
ByteBuffer bb = bufferForPosition(pos);
|
||||
int offset = offsetForPosition(pos);
|
||||
synchronized (bb) {
|
||||
bb.position(offset);
|
||||
int size = Math.min(bb.remaining(), result.length - readed);
|
||||
bb.get(result, readed, size);
|
||||
readed += size;
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void clear() {
|
||||
this.elementsLocation.clear();
|
||||
for (ByteBuffer byteBuffer : dbbs) deallocDirectByteBuffer(byteBuffer);
|
||||
}
|
||||
|
||||
private ByteBuffer getBuffer() {
|
||||
return bufferForPosition(cursor);
|
||||
}
|
||||
|
||||
private ByteBuffer bufferForPosition(long pos) {
|
||||
int idx = (int) (pos / capacity);
|
||||
ByteBuffer byteBuffer = dbbs.size() > idx ? dbbs.get(idx) : null;
|
||||
if (byteBuffer == null)
|
||||
dbbs.add(byteBuffer = makeNewBuffer());
|
||||
return byteBuffer;
|
||||
}
|
||||
|
||||
private ByteBuffer makeNewBuffer() {
|
||||
return Acme.dbbuffer(capacity);
|
||||
}
|
||||
|
||||
private int currentOffset() {
|
||||
return offsetForPosition(this.cursor);
|
||||
}
|
||||
|
||||
private int offsetForPosition(long pos) {
|
||||
return (int) (pos % capacity);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param bb
|
||||
* ByteBuffer to discard
|
||||
* @throws ReflectiveOperationException
|
||||
* if no cleaner field found in current implementation of
|
||||
* ByteBuffer (may happen in case of non-Sun/Oracle JDK)
|
||||
*/
|
||||
private void deallocDirectByteBuffer(ByteBuffer bb) {
|
||||
if (!bb.isDirect())
|
||||
return;
|
||||
Field cleanerField;
|
||||
try {
|
||||
cleanerField = bb.getClass().getDeclaredField("cleaner");
|
||||
cleanerField.setAccessible(true);
|
||||
Object cleaner = cleanerField.get(bb);
|
||||
Method cleanMethod = cleaner.getClass().getMethod("clean",
|
||||
new Class[] {});
|
||||
cleanMethod.invoke(cleaner, new Object[] {});
|
||||
} catch (ReflectiveOperationException | SecurityException
|
||||
| IllegalArgumentException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Deletes given object from the storage.
|
||||
* <p>
|
||||
* The memory occupied by the object is not actually freed, so this method
|
||||
* only <b>marks the object for deletion</b>.
|
||||
*
|
||||
* @param id
|
||||
* the id of the object to be removed
|
||||
*/
|
||||
@Override
|
||||
public byte[] remove(E id) {
|
||||
byte[] res = get(id);
|
||||
this.elementsLocation.remove(id);
|
||||
return res;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int size() {
|
||||
return this.elementsLocation.size();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<E> keySet() {
|
||||
return Acme.umset(this.elementsLocation.keySet());
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] update(E key, byte[] value) {
|
||||
byte[] old = remove(key);
|
||||
put(key, value);
|
||||
return old;
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes all elements previously marked for deletion (by
|
||||
* {@link #remove(Object)} method) from this storage.
|
||||
*
|
||||
* This is very basic implementation which needs to be improved in the new
|
||||
* versions of Nyx Collections. It simply copies all remaining elements into
|
||||
* new {@link ByteBuffer} and destroys old ones.
|
||||
*/
|
||||
@Override
|
||||
public synchronized void purge() {
|
||||
// Basic implementation - copies all remaining elements into new
|
||||
// byte buffers and removes old ones. Doubles
|
||||
ElasticByteStorage<E> copy = new ElasticByteStorage<E>(this.capacity);
|
||||
for (Entry<E, long[]> entry : elementsLocation.entrySet()) {
|
||||
copy.put(entry.getKey(), get(entry.getKey()));
|
||||
}
|
||||
// deallocate off-heap memory
|
||||
for (ByteBuffer oldBuffer : this.dbbs)
|
||||
deallocDirectByteBuffer(oldBuffer);
|
||||
// switch to the newly created Storage instance.
|
||||
this.dbbs = copy.dbbs;
|
||||
this.elementsLocation = copy.elementsLocation;
|
||||
this.cursor = copy.cursor;
|
||||
}
|
||||
|
||||
private void writeObject(java.io.ObjectOutputStream out) throws IOException {
|
||||
out.writeObject(new Object[]{ this.capacity,this.elementsLocation, this.cursor, this.dbbs.size() });
|
||||
for (ByteBuffer byteBuffer : dbbs) {
|
||||
byte[] toWrite = new byte[byteBuffer.limit()];
|
||||
byteBuffer.position(0);
|
||||
byteBuffer.get(toWrite);
|
||||
out.writeObject(toWrite);
|
||||
}
|
||||
out.flush();
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private void readObject(java.io.ObjectInputStream in)
|
||||
throws ClassNotFoundException, IOException {
|
||||
Object[] fields = (Object[]) in.readObject();
|
||||
this.capacity = (int) fields[0];
|
||||
this.elementsLocation= (Map<E, long[]>) fields[1];
|
||||
this.cursor = (long) fields[2];
|
||||
int size = (int)fields[3];
|
||||
this.dbbs = Acme.alist(size);
|
||||
for (int i = 0; i < size; i++) {
|
||||
byte[] toRead = (byte[])in.readObject();
|
||||
ByteBuffer byteBuffer = Acme.dbbuffer(this.capacity);
|
||||
byteBuffer.put(toRead);
|
||||
this.dbbs.add(byteBuffer);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Binary comparison
|
||||
*/
|
||||
@Override
|
||||
public boolean contains(byte[] value) {
|
||||
// check hashCodes first - than fetch
|
||||
int hashCode = Arrays.hashCode(value);
|
||||
for (Entry<E, long[]> entry : elementsLocation.entrySet())
|
||||
if (entry.getValue()[2] == hashCode
|
||||
&& Arrays.equals(value, get(entry.getKey())))
|
||||
return true;
|
||||
return false;
|
||||
}
|
||||
|
||||
}
|
||||
package nyx.collections.storage;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.Serializable;
|
||||
import java.lang.reflect.Field;
|
||||
import java.lang.reflect.Method;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Set;
|
||||
|
||||
import nyx.collections.Acme;
|
||||
import nyx.collections.Const;
|
||||
|
||||
/**
|
||||
* Elastic thread-safe storage for byte arrays. This is a base class which is
|
||||
* used by Nyx collection classes. Byte arrays are stored in direct
|
||||
* {@link java.nio.ByteBuffers} outside garbage-collected memory.
|
||||
*
|
||||
* @author varlou@gmail.com
|
||||
*/
|
||||
public class ElasticByteStorage<E> implements Storage<E, byte[]>, Serializable {
|
||||
|
||||
private static final long serialVersionUID = 1408552328267845863L;
|
||||
|
||||
private transient List<ByteBuffer> dbbs = new ArrayList<>();
|
||||
private long cursor = 0;
|
||||
private int capacity = Const._1Kb * 4; // default chunk size is 4Kb
|
||||
/* long array contains 3 elements - from, to and hashCode*/
|
||||
private Map<E, long[]> elementsLocation = Acme.chashmap();
|
||||
|
||||
/**
|
||||
* Creates instance with a default initial capacity (4Kb).
|
||||
*/
|
||||
public ElasticByteStorage() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates instance with a given initial capacity in bytes.
|
||||
*
|
||||
* @param capacity the capacity of this storage in bytes
|
||||
* @throws IllegalArgumentException if {@code capacity < 4096}
|
||||
*/
|
||||
public ElasticByteStorage(int capacity) {
|
||||
if (capacity < Const._1Kb * 4) throw new IllegalArgumentException();
|
||||
this.capacity = capacity;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] put(E id, byte[] addme) {
|
||||
if (elementsLocation.containsKey(id))
|
||||
throw new IllegalArgumentException();
|
||||
try {
|
||||
long[] location = incremenetCursor(addme);
|
||||
long lCursor = location[0];
|
||||
int commited = 0;
|
||||
while (commited < addme.length) {
|
||||
ByteBuffer cbuf = bufferForPosition(lCursor);
|
||||
synchronized (cbuf) {
|
||||
cbuf.position(offsetForPosition(lCursor));
|
||||
int spaceLeft = capacity - cbuf.position();
|
||||
int size = Math.min(spaceLeft, addme.length - commited);
|
||||
cbuf.put(addme, commited, size);
|
||||
lCursor += size;
|
||||
commited += size;
|
||||
}
|
||||
}
|
||||
elementsLocation.put(id, location);
|
||||
return addme;
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized long[] incremenetCursor(byte[] addme) {
|
||||
return new long[]{this.cursor, this.cursor += addme.length, Arrays.hashCode(addme)};
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] get(E id) {
|
||||
if (!elementsLocation.containsKey(id)) return null;
|
||||
long[] location = elementsLocation.get(id);
|
||||
assert location.length == 3;
|
||||
byte[] result = new byte[(int) (location[1] - location[0])];
|
||||
int readed = 0;
|
||||
while (readed < result.length) {
|
||||
long pos = location[0] + readed;
|
||||
ByteBuffer bb = bufferForPosition(pos);
|
||||
int offset = offsetForPosition(pos);
|
||||
synchronized (bb) {
|
||||
bb.position(offset);
|
||||
int size = Math.min(bb.remaining(), result.length - readed);
|
||||
bb.get(result, readed, size);
|
||||
readed += size;
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void clear() {
|
||||
this.elementsLocation.clear();
|
||||
for (ByteBuffer byteBuffer : dbbs) deallocDirectByteBuffer(byteBuffer);
|
||||
}
|
||||
|
||||
private ByteBuffer getBuffer() {
|
||||
return bufferForPosition(cursor);
|
||||
}
|
||||
|
||||
private ByteBuffer bufferForPosition(long pos) {
|
||||
int idx = (int) (pos / capacity);
|
||||
ByteBuffer byteBuffer = dbbs.size() > idx ? dbbs.get(idx) : null;
|
||||
if (byteBuffer == null)
|
||||
dbbs.add(byteBuffer = makeNewBuffer());
|
||||
return byteBuffer;
|
||||
}
|
||||
|
||||
private ByteBuffer makeNewBuffer() {
|
||||
return Acme.dbbuffer(capacity);
|
||||
}
|
||||
|
||||
private int currentOffset() {
|
||||
return offsetForPosition(this.cursor);
|
||||
}
|
||||
|
||||
private int offsetForPosition(long pos) {
|
||||
return (int) (pos % capacity);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param bb ByteBuffer to discard
|
||||
* @throws ReflectiveOperationException if no cleaner field found in current implementation of
|
||||
* ByteBuffer (may happen in case of non-Sun/Oracle JDK)
|
||||
*/
|
||||
private void deallocDirectByteBuffer(ByteBuffer bb) {
|
||||
if (!bb.isDirect())
|
||||
return;
|
||||
Field cleanerField;
|
||||
try {
|
||||
final Class<?> unsafeClass = Class.forName("sun.misc.Unsafe");
|
||||
// first check if Unsafe has the right method, otherwise we can give up
|
||||
// without doing any security critical stuff:
|
||||
Method invokeCleaner = unsafeClass.getMethod("invokeCleaner", ByteBuffer.class);
|
||||
Field unsafeClassField = unsafeClass.getDeclaredField("theUnsafe");
|
||||
unsafeClassField.setAccessible(true);
|
||||
invokeCleaner.invoke(unsafeClassField.get(null),bb);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Deletes given object from the storage.
|
||||
* <p>
|
||||
* The memory occupied by the object is not actually freed, so this method
|
||||
* only <b>marks the object for deletion</b>.
|
||||
*
|
||||
* @param id the id of the object to be removed
|
||||
*/
|
||||
@Override
|
||||
public byte[] remove(E id) {
|
||||
byte[] res = get(id);
|
||||
this.elementsLocation.remove(id);
|
||||
return res;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int size() {
|
||||
return this.elementsLocation.size();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<E> keySet() {
|
||||
return Acme.umset(this.elementsLocation.keySet());
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] update(E key, byte[] value) {
|
||||
byte[] old = remove(key);
|
||||
put(key, value);
|
||||
return old;
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes all elements previously marked for deletion (by
|
||||
* {@link #remove(Object)} method) from this storage.
|
||||
* <p>
|
||||
* This is very basic implementation which needs to be improved in the new
|
||||
* versions of Nyx Collections. It simply copies all remaining elements into
|
||||
* new {@link ByteBuffer} and destroys old ones.
|
||||
*/
|
||||
@Override
|
||||
public synchronized void purge() {
|
||||
// Basic implementation - copies all remaining elements into new
|
||||
// byte buffers and removes old ones. Doubles
|
||||
ElasticByteStorage<E> copy = new ElasticByteStorage<E>(this.capacity);
|
||||
for (Entry<E, long[]> entry : elementsLocation.entrySet()) {
|
||||
copy.put(entry.getKey(), get(entry.getKey()));
|
||||
}
|
||||
// deallocate off-heap memory
|
||||
for (ByteBuffer oldBuffer : this.dbbs)
|
||||
deallocDirectByteBuffer(oldBuffer);
|
||||
// switch to the newly created Storage instance.
|
||||
this.dbbs = copy.dbbs;
|
||||
this.elementsLocation = copy.elementsLocation;
|
||||
this.cursor = copy.cursor;
|
||||
}
|
||||
|
||||
private void writeObject(java.io.ObjectOutputStream out) throws IOException {
|
||||
out.writeObject(new Object[]{this.capacity, this.elementsLocation, this.cursor, this.dbbs.size()});
|
||||
for (ByteBuffer byteBuffer : dbbs) {
|
||||
byte[] toWrite = new byte[byteBuffer.limit()];
|
||||
byteBuffer.position(0);
|
||||
byteBuffer.get(toWrite);
|
||||
out.writeObject(toWrite);
|
||||
}
|
||||
out.flush();
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private void readObject(java.io.ObjectInputStream in)
|
||||
throws ClassNotFoundException, IOException {
|
||||
Object[] fields = (Object[]) in.readObject();
|
||||
this.capacity = (int) fields[0];
|
||||
this.elementsLocation = (Map<E, long[]>) fields[1];
|
||||
this.cursor = (long) fields[2];
|
||||
int size = (int) fields[3];
|
||||
this.dbbs = Acme.alist(size);
|
||||
for (int i = 0; i < size; i++) {
|
||||
byte[] toRead = (byte[]) in.readObject();
|
||||
ByteBuffer byteBuffer = Acme.dbbuffer(this.capacity);
|
||||
byteBuffer.put(toRead);
|
||||
this.dbbs.add(byteBuffer);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Binary comparison
|
||||
*/
|
||||
@Override
|
||||
public boolean contains(byte[] value) {
|
||||
// check hashCodes first - than fetch
|
||||
int hashCode = Arrays.hashCode(value);
|
||||
for (Entry<E, long[]> entry : elementsLocation.entrySet())
|
||||
if (entry.getValue()[2] == hashCode
|
||||
&& Arrays.equals(value, get(entry.getKey())))
|
||||
return true;
|
||||
return false;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user