AccumuloEPGMStore.java
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.storage.accumulo.impl;
import com.google.common.collect.Lists;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.NamespaceExistsException;
import org.apache.accumulo.core.client.TableExistsException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.gradoop.common.model.api.entities.Element;
import org.gradoop.common.model.api.entities.Edge;
import org.gradoop.common.model.api.entities.GraphHead;
import org.gradoop.common.model.api.entities.Vertex;
import org.gradoop.common.model.impl.id.GradoopId;
import org.gradoop.common.model.impl.pojo.EPGMEdge;
import org.gradoop.common.model.impl.pojo.EPGMGraphHead;
import org.gradoop.common.model.impl.pojo.EPGMVertex;
import org.gradoop.storage.common.api.EPGMConfigProvider;
import org.gradoop.storage.common.api.EPGMGraphInput;
import org.gradoop.storage.common.api.EPGMGraphPredictableOutput;
import org.gradoop.storage.common.iterator.ClosableIterator;
import org.gradoop.storage.common.iterator.EmptyClosableIterator;
import org.gradoop.storage.common.predicate.query.ElementQuery;
import org.gradoop.storage.common.predicate.query.Query;
import org.gradoop.storage.accumulo.config.GradoopAccumuloConfig;
import org.gradoop.storage.accumulo.impl.constants.AccumuloDefault;
import org.gradoop.storage.accumulo.impl.constants.AccumuloTables;
import org.gradoop.storage.accumulo.impl.handler.AccumuloRowHandler;
import org.gradoop.storage.accumulo.impl.iterator.client.ClientClosableIterator;
import org.gradoop.storage.accumulo.impl.iterator.tserver.GradoopEdgeIterator;
import org.gradoop.storage.accumulo.impl.iterator.tserver.GradoopGraphHeadIterator;
import org.gradoop.storage.accumulo.impl.iterator.tserver.GradoopVertexIterator;
import org.gradoop.storage.accumulo.impl.predicate.filter.api.AccumuloElementFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.stream.Collectors;
/**
 * Default Accumulo EPGM graph store that handles reading and writing graph heads, vertices and
 * edges from and to Accumulo. It is designed to be thread-safe.
 * Different store instances are separated by their table prefix,
 * see {@link GradoopAccumuloConfig#ACCUMULO_TABLE_PREFIX}.
 *
 * @see EPGMGraphPredictableOutput
 */
public class AccumuloEPGMStore implements
  EPGMConfigProvider<GradoopAccumuloConfig>,
  EPGMGraphInput,
  EPGMGraphPredictableOutput<
    AccumuloElementFilter<EPGMGraphHead>,
    AccumuloElementFilter<EPGMVertex>,
    AccumuloElementFilter<EPGMEdge>> {

  /**
   * accumulo epgm store logger
   */
  private static final Logger LOG = LoggerFactory.getLogger(AccumuloEPGMStore.class);

  /**
   * gradoop accumulo configuration
   */
  private final GradoopAccumuloConfig config;

  /**
   * accumulo client connector
   */
  private final Connector conn;

  /**
   * batch writer for epgm graph head table
   */
  private final BatchWriter graphWriter;

  /**
   * batch writer for epgm vertex table
   */
  private final BatchWriter vertexWriter;

  /**
   * batch writer for epgm edge table
   */
  private final BatchWriter edgeWriter;

  /**
   * auto flush flag, default false
   */
  private volatile boolean autoFlush;

  /**
   * Creates an AccumuloEPGMStore based on the given parameters.
   * Tables with the configured prefix are created automatically if they do not exist yet.
   *
   * @param config accumulo store configuration
   * @throws AccumuloSecurityException for security violations,
   *                                   authentication failures,
   *                                   authorization failures,
   *                                   etc.
   * @throws AccumuloException generic Accumulo Exception for general accumulo failures.
   */
  public AccumuloEPGMStore(@Nonnull GradoopAccumuloConfig config)
    throws AccumuloSecurityException, AccumuloException {
    this.config = config;
    // NOTE(review): createConnector() is public and overridable but called from the constructor
    this.conn = createConnector();
    createTablesIfNotExists();
    try {
      graphWriter = conn.createBatchWriter(getGraphHeadName(), new BatchWriterConfig());
      vertexWriter = conn.createBatchWriter(getVertexTableName(), new BatchWriterConfig());
      edgeWriter = conn.createBatchWriter(getEdgeTableName(), new BatchWriterConfig());
    } catch (TableNotFoundException e) {
      // the tables were created just above, so this should not happen
      throw new IllegalStateException(e);
    }
  }

  /**
   * Create an accumulo client connector with given configuration
   *
   * @return accumulo client connector instance
   * @throws AccumuloSecurityException for security violations,
   *                                   authentication failures,
   *                                   authorization failures,
   *                                   etc.
   * @throws AccumuloException generic Accumulo Exception for general accumulo failures.
   */
  public Connector createConnector() throws AccumuloSecurityException, AccumuloException {
    return new ZooKeeperInstance(
      /*instance*/config.get(GradoopAccumuloConfig.ACCUMULO_INSTANCE, AccumuloDefault.INSTANCE),
      /*zookeepers*/config.get(GradoopAccumuloConfig.ZOOKEEPER_HOSTS, AccumuloDefault.ZOOKEEPERS)
    ).getConnector(
      /*user*/config.get(GradoopAccumuloConfig.ACCUMULO_USER, AccumuloDefault.USER),
      /*password*/
      new PasswordToken(config.get(GradoopAccumuloConfig.ACCUMULO_PASSWD, AccumuloDefault
        .PASSWORD)));
  }

  @Override
  public GradoopAccumuloConfig getConfig() {
    return config;
  }

  @Override
  public String getVertexTableName() {
    return config.getVertexTable();
  }

  @Override
  public String getEdgeTableName() {
    return config.getEdgeTable();
  }

  @Override
  public String getGraphHeadName() {
    return config.getGraphHeadTable();
  }

  @Override
  public void writeGraphHead(@Nonnull GraphHead record) {
    writeRecord(record, graphWriter, config.getGraphHandler());
  }

  @Override
  public void writeVertex(@Nonnull Vertex record) {
    writeRecord(record, vertexWriter, config.getVertexHandler());
  }

  @Override
  public void writeEdge(@Nonnull Edge record) {
    writeRecord(record, edgeWriter, config.getEdgeHandler());
  }

  @Override
  public void setAutoFlush(boolean autoFlush) {
    this.autoFlush = autoFlush;
  }

  /**
   * Flush the graph head, vertex and edge writers.
   * Every writer is flushed even if an earlier one fails.
   *
   * @throws RuntimeException wrapping the first {@link MutationsRejectedException};
   *                          further failures are attached as suppressed exceptions
   */
  @Override
  public void flush() {
    applyToAllWriters(BatchWriter::flush);
  }

  /**
   * Close the graph head, vertex and edge writers.
   * Every writer is closed even if an earlier one fails, so no writer is left open.
   *
   * @throws RuntimeException wrapping the first {@link MutationsRejectedException};
   *                          further failures are attached as suppressed exceptions
   */
  @Override
  public void close() {
    applyToAllWriters(BatchWriter::close);
  }

  /**
   * Drop all tables used by this store instance.
   *
   * @throws IOException when deleting tables fails.
   */
  public void dropTables() throws IOException {
    for (String tableName : new String[] {
      getVertexTableName(), getEdgeTableName(), getGraphHeadName()}) {
      try {
        dropTableIfExists(tableName);
      } catch (AccumuloSecurityException | AccumuloException e) {
        throw new IOException("Failed to delete table " + tableName, e);
      }
    }
  }

  /**
   * Truncate all tables handled by this store instance, i.e. delete all rows.
   * The current implementation simply drops all tables and creates new ones.
   *
   * @throws IOException when deleting or creating tables fails.
   */
  public void truncateTables() throws IOException {
    // NOTE(review): the batch writers of this instance were created against the dropped tables;
    // confirm that writes after truncation still reach the recreated tables.
    dropTables();
    try {
      createTablesIfNotExists();
    } catch (AccumuloSecurityException | AccumuloException e) {
      throw new IOException("Failed to create tables.", e);
    }
  }

  @Nullable
  @Override
  public EPGMGraphHead readGraph(@Nonnull GradoopId graphId) throws IOException {
    ElementQuery<AccumuloElementFilter<EPGMGraphHead>> query = Query
      .elements()
      .fromSets(graphId)
      .noFilter();
    try (ClosableIterator<EPGMGraphHead> it = getGraphSpace(query, 1)) {
      return it.hasNext() ? it.next() : null;
    }
  }

  @Nullable
  @Override
  public EPGMVertex readVertex(@Nonnull GradoopId vertexId) throws IOException {
    ElementQuery<AccumuloElementFilter<EPGMVertex>> query = Query
      .elements()
      .fromSets(vertexId)
      .noFilter();
    try (ClosableIterator<EPGMVertex> it = getVertexSpace(query, 1)) {
      return it.hasNext() ? it.next() : null;
    }
  }

  @Nullable
  @Override
  public EPGMEdge readEdge(@Nonnull GradoopId edgeId) throws IOException {
    ElementQuery<AccumuloElementFilter<EPGMEdge>> query = Query
      .elements()
      .fromSets(edgeId)
      .noFilter();
    try (ClosableIterator<EPGMEdge> it = getEdgeSpace(query, 1)) {
      return it.hasNext() ? it.next() : null;
    }
  }

  @Nonnull
  @Override
  public ClosableIterator<EPGMGraphHead> getGraphSpace(
    @Nullable ElementQuery<AccumuloElementFilter<EPGMGraphHead>> query,
    int cacheSize
  ) throws IOException {
    BatchScanner scanner = openNonEmptyScanner(
      getGraphHeadName(),
      GradoopGraphHeadIterator.class,
      query);
    if (scanner == null) {
      return new EmptyClosableIterator<>();
    }
    return new ClientClosableIterator<>(scanner,
      new GradoopGraphHeadIterator(),
      config.getGraphHandler(),
      cacheSize);
  }

  @Nonnull
  @Override
  public ClosableIterator<EPGMVertex> getVertexSpace(
    @Nullable ElementQuery<AccumuloElementFilter<EPGMVertex>> query,
    int cacheSize
  ) throws IOException {
    BatchScanner scanner = openNonEmptyScanner(
      getVertexTableName(),
      GradoopVertexIterator.class,
      query);
    if (scanner == null) {
      return new EmptyClosableIterator<>();
    }
    return new ClientClosableIterator<>(scanner,
      new GradoopVertexIterator(),
      config.getVertexHandler(),
      cacheSize);
  }

  @Nonnull
  @Override
  public ClosableIterator<EPGMEdge> getEdgeSpace(
    @Nullable ElementQuery<AccumuloElementFilter<EPGMEdge>> query,
    int cacheSize
  ) throws IOException {
    BatchScanner scanner = openNonEmptyScanner(
      getEdgeTableName(),
      GradoopEdgeIterator.class,
      query);
    if (scanner == null) {
      return new EmptyClosableIterator<>();
    }
    return new ClientClosableIterator<>(scanner,
      new GradoopEdgeIterator(),
      config.getEdgeHandler(),
      cacheSize);
  }

  /**
   * Open a batch scanner for the given query and check whether it yields at least one entry.
   * Whenever {@code null} is returned, no scanner is left open.
   *
   * @param table table name
   * @param iteratorClass server side iterator class
   * @param query element query, {@code null} scans the whole table
   * @param <T> epgm element type
   * @return an open scanner with at least one entry, or {@code null} if the query contains an
   *         empty id range set or the scan yields no entries
   * @throws IOException if the scanner cannot be created
   */
  @Nullable
  private <T extends Element> BatchScanner openNonEmptyScanner(
    String table,
    Class<? extends SortedKeyValueIterator<Key, Value>> iteratorClass,
    @Nullable ElementQuery<AccumuloElementFilter<T>> query
  ) throws IOException {
    // an explicitly empty id set can never match anything, so skip the scan entirely
    if (query != null &&
      query.getQueryRanges() != null &&
      query.getQueryRanges().isEmpty()) {
      return null;
    }
    if (query != null) {
      LOG.info("{}", query);
    }
    BatchScanner scanner = createBatchScanner(table, iteratorClass, query);
    boolean hasResults;
    try {
      // NOTE(review): this probe iterator is discarded; the returned scanner is handed to
      // ClientClosableIterator, which presumably starts its own iteration — confirm.
      Iterator<Map.Entry<Key, Value>> probe = scanner.iterator();
      hasResults = probe.hasNext();
    } catch (RuntimeException e) {
      scanner.close();
      throw e;
    }
    if (!hasResults) {
      // the caller only receives an EmptyClosableIterator, so release the scanner here
      scanner.close();
      return null;
    }
    return scanner;
  }

  /**
   * Write an EPGM Element instance into table
   *
   * @param record gradoop EPGM element
   * @param writer accumulo batch writer
   * @param handler accumulo row handler
   * @param <T> element type
   */
  private <T extends Element> void writeRecord(
    @Nonnull T record,
    @Nonnull BatchWriter writer,
    @Nonnull AccumuloRowHandler handler
  ) {
    // the element id is used as row id
    Mutation mutation = new Mutation(record.getId().toString());
    //noinspection unchecked
    mutation = handler.writeRow(mutation, record);
    try {
      writer.addMutation(mutation);
      if (autoFlush) {
        writer.flush();
      }
    } catch (MutationsRejectedException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Apply an operation to the graph head, vertex and edge writers (in this order).
   * A failing writer does not prevent the operation from being applied to the others.
   *
   * @param operation operation to apply to every writer
   * @throws RuntimeException wrapping the first {@link MutationsRejectedException};
   *                          further failures are attached as suppressed exceptions
   */
  private void applyToAllWriters(WriterOperation operation) {
    MutationsRejectedException failure = null;
    for (BatchWriter writer : new BatchWriter[] {graphWriter, vertexWriter, edgeWriter}) {
      try {
        operation.apply(writer);
      } catch (MutationsRejectedException e) {
        if (failure == null) {
          failure = e;
        } else {
          failure.addSuppressed(e);
        }
      }
    }
    if (failure != null) {
      throw new RuntimeException(failure);
    }
  }

  /**
   * Create accumulo batch scanner with element predicate
   *
   * @param table table name
   * @param iterator iterator class
   * @param predicate accumulo predicate, {@code null} scans the whole table unfiltered
   * @param <T> epgm element type
   * @return batch scanner instance
   * @throws IOException if the table does not exist
   */
  private <T extends Element> BatchScanner createBatchScanner(
    String table,
    Class<? extends SortedKeyValueIterator<Key, Value>> iterator,
    @Nullable ElementQuery<AccumuloElementFilter<T>> predicate
  ) throws IOException {
    // the encoded filter predicate is passed to the server side iterator as option
    Map<String, String> options = new HashMap<>();
    if (predicate != null && predicate.getFilterPredicate() != null) {
      options.put(AccumuloTables.KEY_PREDICATE, predicate.getFilterPredicate().encode());
    }

    BatchScanner scanner;
    try {
      scanner = conn.createBatchScanner(table,
        config.get(GradoopAccumuloConfig.ACCUMULO_AUTHORIZATIONS,
          AccumuloDefault.AUTHORIZATION),
        config.get(GradoopAccumuloConfig.GRADOOP_BATCH_SCANNER_THREADS,
          AccumuloDefault.BATCH_SCANNER_THREADS));
    } catch (TableNotFoundException e) {
      throw new IOException(e);
    }

    try {
      int priority =
        config.get(GradoopAccumuloConfig.GRADOOP_ITERATOR_PRIORITY, AccumuloDefault
          .ITERATOR_PRIORITY);
      scanner.addScanIterator(new IteratorSetting(
        /*iterator priority*/priority,
        /*iterator class*/iterator,
        /*args*/options));

      if (predicate == null ||
        predicate.getQueryRanges() == null) {
        // no id restriction: scan the whole table
        scanner.setRanges(Lists.newArrayList(new Range()));
      } else {
        // one exact row range per requested element id
        scanner.setRanges(Range.mergeOverlapping(predicate.getQueryRanges()
          .stream()
          .map(GradoopId::toString)
          .map(Range::exact)
          .collect(Collectors.toList())));
      }
      return scanner;
    } catch (RuntimeException e) {
      // do not leak the scanner's resources if its configuration fails
      scanner.close();
      throw e;
    }
  }

  /**
   * Create tables (and their namespace, if defined by the table prefix) if they do not exist.
   *
   * @throws AccumuloSecurityException for security violations,
   *                                   authentication failures,
   *                                   authorization failures,
   *                                   etc.
   * @throws AccumuloException generic Accumulo Exception for general accumulo failures.
   */
  private void createTablesIfNotExists() throws AccumuloSecurityException, AccumuloException {
    String prefix =
      config.get(GradoopAccumuloConfig.ACCUMULO_TABLE_PREFIX, AccumuloDefault.TABLE_PREFIX);
    // a prefix like "ns.xxx" denotes namespace "ns" (everything before the first dot)
    if (prefix.contains(".")) {
      String namespace = prefix.substring(0, prefix.indexOf("."));
      try {
        if (!conn.namespaceOperations().exists(namespace)) {
          conn.namespaceOperations().create(namespace);
        }
      } catch (NamespaceExistsException ignore) {
        //ignore if it exists, maybe created by another process or thread
      }
    }

    for (String table : new String[] {
      getVertexTableName(), getEdgeTableName(), getGraphHeadName()
    }) {
      try {
        if (!conn.tableOperations().exists(table)) {
          conn.tableOperations().create(table);
        }
      } catch (TableExistsException ignore) {
        //ignore if it exists, maybe created by another process or thread
      }
    }
  }

  /**
   * Delete a table if it exists.
   *
   * @param tableName The table name.
   * @throws AccumuloSecurityException if the user does not have permissions to delete the table.
   * @throws AccumuloException if a general Accumulo error occurs.
   */
  private void dropTableIfExists(@Nonnull String tableName) throws AccumuloSecurityException,
    AccumuloException {
    if (conn.tableOperations().exists(tableName)) {
      try {
        conn.tableOperations().delete(tableName);
      } catch (TableNotFoundException e) {
        // We checked before if it exists, so this should not happen.
        throw new IllegalStateException(e);
      }
    }
  }

  /**
   * An operation on a batch writer that may reject mutations.
   */
  @FunctionalInterface
  private interface WriterOperation {
    /**
     * Apply the operation to a writer.
     *
     * @param writer batch writer
     * @throws MutationsRejectedException if the writer rejects mutations
     */
    void apply(BatchWriter writer) throws MutationsRejectedException;
  }
}