HBaseEPGMStore.java
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.storage.hbase.impl;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.FilterList;
import org.gradoop.common.model.api.entities.Element;
import org.gradoop.common.model.api.entities.GraphHead;
import org.gradoop.common.model.api.entities.Edge;
import org.gradoop.common.model.api.entities.Vertex;
import org.gradoop.common.model.impl.id.GradoopId;
import org.gradoop.common.model.impl.pojo.EPGMEdge;
import org.gradoop.common.model.impl.pojo.EPGMGraphHead;
import org.gradoop.common.model.impl.pojo.EPGMVertex;
import org.gradoop.storage.common.api.EPGMConfigProvider;
import org.gradoop.storage.common.api.EPGMGraphInput;
import org.gradoop.storage.common.api.EPGMGraphPredictableOutput;
import org.gradoop.storage.common.iterator.ClosableIterator;
import org.gradoop.storage.common.predicate.query.ElementQuery;
import org.gradoop.storage.hbase.config.GradoopHBaseConfig;
import org.gradoop.storage.hbase.impl.api.EdgeHandler;
import org.gradoop.storage.hbase.impl.api.GraphHeadHandler;
import org.gradoop.storage.hbase.impl.api.VertexHandler;
import org.gradoop.storage.hbase.impl.iterator.HBaseEdgeIterator;
import org.gradoop.storage.hbase.impl.iterator.HBaseGraphIterator;
import org.gradoop.storage.hbase.impl.iterator.HBaseVertexIterator;
import org.gradoop.storage.hbase.impl.predicate.filter.HBaseFilterUtils;
import org.gradoop.storage.hbase.impl.predicate.filter.api.HBaseElementFilter;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
 * Default HBase graph store that handles reading and writing graph heads, vertices and
 * edges from and to HBase.
 *
 * @see EPGMGraphPredictableOutput
 */
public class HBaseEPGMStore implements
  EPGMConfigProvider<GradoopHBaseConfig>,
  EPGMGraphInput,
  EPGMGraphPredictableOutput<
    HBaseElementFilter<EPGMGraphHead>,
    HBaseElementFilter<EPGMVertex>,
    HBaseElementFilter<EPGMEdge>> {

  /**
   * Gradoop configuration.
   */
  private final GradoopHBaseConfig config;

  /**
   * HBase table for storing graphs.
   */
  private final Table graphHeadTable;

  /**
   * HBase table for storing vertex data.
   */
  private final Table vertexTable;

  /**
   * HBase table for storing edge data.
   */
  private final Table edgeTable;

  /**
   * HBase admin instance
   */
  private final Admin admin;

  /**
   * Auto flush flag, default false. When set, every single write is followed by an
   * {@link Admin#flush} of the affected table.
   */
  private volatile boolean autoFlush;

  /**
   * Creates a HBaseEPGMStore based on the given parameters. All parameters
   * are mandatory and must not be {@code null}.
   *
   * @param graphHeadTable HBase table to store graph data
   * @param vertexTable HBase table to store vertex data
   * @param edgeTable HBase table to store edge data
   * @param config Gradoop Configuration
   * @param admin HBase admin instance
   * @throws NullPointerException if any argument is {@code null}
   */
  public HBaseEPGMStore(
    final Table graphHeadTable,
    final Table vertexTable,
    final Table edgeTable,
    final GradoopHBaseConfig config,
    final Admin admin
  ) {
    this.graphHeadTable = Preconditions.checkNotNull(graphHeadTable);
    this.vertexTable = Preconditions.checkNotNull(vertexTable);
    this.edgeTable = Preconditions.checkNotNull(edgeTable);
    this.config = Preconditions.checkNotNull(config);
    this.admin = Preconditions.checkNotNull(admin);
  }

  @Override
  public GradoopHBaseConfig getConfig() {
    return config;
  }

  @Override
  public String getVertexTableName() {
    return vertexTable.getName().getNameAsString();
  }

  @Override
  public String getEdgeTableName() {
    return edgeTable.getName().getNameAsString();
  }

  @Override
  public String getGraphHeadName() {
    return graphHeadTable.getName().getNameAsString();
  }

  /**
   * Writes a graph head to the graph head table, flushing the table if auto flush is enabled.
   *
   * @param graphHead graph head to write
   * @throws IOException if the HBase put or flush fails
   */
  @Override
  public void writeGraphHead(@Nonnull final GraphHead graphHead) throws IOException {
    GraphHeadHandler graphHeadHandler = config.getGraphHeadHandler();
    // row key derived from the graph id
    Put put = new Put(graphHeadHandler.getRowKey(graphHead.getId()));
    // write graph to Put
    put = graphHeadHandler.writeGraphHead(put, graphHead);
    // write to table
    graphHeadTable.put(put);
    if (autoFlush) {
      admin.flush(graphHeadTable.getName());
    }
  }

  /**
   * Writes a vertex to the vertex table, flushing the table if auto flush is enabled.
   *
   * @param vertexData vertex to write
   * @throws IOException if the HBase put or flush fails
   */
  @Override
  public void writeVertex(@Nonnull final Vertex vertexData) throws IOException {
    VertexHandler vertexHandler = config.getVertexHandler();
    // row key derived from the vertex id
    Put put = new Put(vertexHandler.getRowKey(vertexData.getId()));
    // write vertex data to Put
    put = vertexHandler.writeVertex(put, vertexData);
    // write to table
    vertexTable.put(put);
    if (autoFlush) {
      admin.flush(vertexTable.getName());
    }
  }

  /**
   * Writes an edge to the edge table, flushing the table if auto flush is enabled.
   *
   * @param edgeData edge to write
   * @throws IOException if the HBase put or flush fails
   */
  @Override
  public void writeEdge(@Nonnull final Edge edgeData) throws IOException {
    EdgeHandler edgeHandler = config.getEdgeHandler();
    // row key derived from the edge id
    Put put = new Put(edgeHandler.getRowKey(edgeData.getId()));
    // write edge data to Put
    put = edgeHandler.writeEdge(put, edgeData);
    // write to table
    edgeTable.put(put);
    if (autoFlush) {
      admin.flush(edgeTable.getName());
    }
  }

  /**
   * Reads a single graph head by its id.
   *
   * @param graphId id of the graph head
   * @return the graph head, or {@code null} if no matching row exists
   * @throws IOException if the HBase get fails
   */
  @Override
  public EPGMGraphHead readGraph(@Nonnull final GradoopId graphId) throws IOException {
    GraphHeadHandler graphHeadHandler = config.getGraphHeadHandler();
    List<Get> getList = new ArrayList<>();
    if (graphHeadHandler.isSpreadingByteUsed()) {
      // with a spreading byte the handler yields several candidate row keys; query all of them
      for (byte[] rowKey : graphHeadHandler.getPossibleRowKeys(graphId)) {
        getList.add(new Get(rowKey));
      }
    } else {
      // derive the key through the handler so it matches the key used by writeGraphHead()
      getList.add(new Get(graphHeadHandler.getRowKey(graphId)));
    }
    for (Result res : graphHeadTable.get(getList)) {
      if (!res.isEmpty()) {
        return graphHeadHandler.readGraphHead(res);
      }
    }
    return null;
  }

  /**
   * Reads a single vertex by its id.
   *
   * @param vertexId id of the vertex
   * @return the vertex, or {@code null} if no matching row exists
   * @throws IOException if the HBase get fails
   */
  @Override
  public EPGMVertex readVertex(@Nonnull final GradoopId vertexId) throws IOException {
    VertexHandler vertexHandler = config.getVertexHandler();
    List<Get> getList = new ArrayList<>();
    if (vertexHandler.isSpreadingByteUsed()) {
      // with a spreading byte the handler yields several candidate row keys; query all of them
      for (byte[] rowKey : vertexHandler.getPossibleRowKeys(vertexId)) {
        getList.add(new Get(rowKey));
      }
    } else {
      getList.add(new Get(vertexHandler.getRowKey(vertexId)));
    }
    for (Result res : vertexTable.get(getList)) {
      if (!res.isEmpty()) {
        return vertexHandler.readVertex(res);
      }
    }
    return null;
  }

  /**
   * Reads a single edge by its id.
   *
   * @param edgeId id of the edge
   * @return the edge, or {@code null} if no matching row exists
   * @throws IOException if the HBase get fails
   */
  @Override
  public EPGMEdge readEdge(@Nonnull final GradoopId edgeId) throws IOException {
    EdgeHandler edgeHandler = config.getEdgeHandler();
    List<Get> getList = new ArrayList<>();
    if (edgeHandler.isSpreadingByteUsed()) {
      // with a spreading byte the handler yields several candidate row keys; query all of them
      for (byte[] rowKey : edgeHandler.getPossibleRowKeys(edgeId)) {
        getList.add(new Get(rowKey));
      }
    } else {
      getList.add(new Get(edgeHandler.getRowKey(edgeId)));
    }
    for (Result res : edgeTable.get(getList)) {
      if (!res.isEmpty()) {
        return edgeHandler.readEdge(res);
      }
    }
    return null;
  }

  /**
   * Scans the graph head table, optionally restricted by a query.
   *
   * @param query optional query; {@code null} scans the whole table
   * @param cacheSize number of rows fetched per scanner round trip
   * @return iterator over the matching graph heads; the caller must close it
   * @throws IOException if opening the scanner fails
   */
  @Nonnull
  @Override
  public ClosableIterator<EPGMGraphHead> getGraphSpace(
    @Nullable ElementQuery<HBaseElementFilter<EPGMGraphHead>> query,
    int cacheSize
  ) throws IOException {
    Scan scan = new Scan();
    scan.setCaching(cacheSize);
    scan.setMaxVersions(1);
    if (query != null) {
      attachFilter(query, scan, config.getGraphHeadHandler().isSpreadingByteUsed());
    }
    return new HBaseGraphIterator(graphHeadTable.getScanner(scan), config.getGraphHeadHandler());
  }

  /**
   * Scans the vertex table, optionally restricted by a query.
   *
   * @param query optional query; {@code null} scans the whole table
   * @param cacheSize number of rows fetched per scanner round trip
   * @return iterator over the matching vertices; the caller must close it
   * @throws IOException if opening the scanner fails
   */
  @Nonnull
  @Override
  public ClosableIterator<EPGMVertex> getVertexSpace(
    @Nullable ElementQuery<HBaseElementFilter<EPGMVertex>> query,
    int cacheSize
  ) throws IOException {
    Scan scan = new Scan();
    scan.setCaching(cacheSize);
    scan.setMaxVersions(1);
    if (query != null) {
      attachFilter(query, scan, config.getVertexHandler().isSpreadingByteUsed());
    }
    return new HBaseVertexIterator(vertexTable.getScanner(scan), config.getVertexHandler());
  }

  /**
   * Scans the edge table, optionally restricted by a query.
   *
   * @param query optional query; {@code null} scans the whole table
   * @param cacheSize number of rows fetched per scanner round trip
   * @return iterator over the matching edges; the caller must close it
   * @throws IOException if opening the scanner fails
   */
  @Nonnull
  @Override
  public ClosableIterator<EPGMEdge> getEdgeSpace(
    @Nullable ElementQuery<HBaseElementFilter<EPGMEdge>> query,
    int cacheSize
  ) throws IOException {
    Scan scan = new Scan();
    scan.setCaching(cacheSize);
    scan.setMaxVersions(1);
    if (query != null) {
      attachFilter(query, scan, config.getEdgeHandler().isSpreadingByteUsed());
    }
    return new HBaseEdgeIterator(edgeTable.getScanner(scan), config.getEdgeHandler());
  }

  @Override
  public void setAutoFlush(boolean autoFlush) {
    this.autoFlush = autoFlush;
  }

  /**
   * Flushes the vertex, edge and graph head tables.
   *
   * @throws IOException if any flush fails
   */
  @Override
  public void flush() throws IOException {
    admin.flush(vertexTable.getName());
    admin.flush(edgeTable.getName());
    admin.flush(graphHeadTable.getName());
  }

  /**
   * Closes all three tables. Every table is closed even if closing an earlier one fails;
   * the first failure is rethrown with any later failures attached as suppressed exceptions.
   * The {@link Admin} instance passed to the constructor is not closed here.
   *
   * @throws IOException if closing any table fails
   */
  @Override
  public void close() throws IOException {
    IOException failure = null;
    for (Table table : new Table[] {vertexTable, edgeTable, graphHeadTable}) {
      try {
        table.close();
      } catch (IOException e) {
        if (failure == null) {
          failure = e;
        } else {
          failure.addSuppressed(e);
        }
      }
    }
    if (failure != null) {
      throw failure;
    }
  }

  /**
   * First disable, then drop all three tables.
   *
   * @throws IOException on error
   */
  public void dropTables() throws IOException {
    admin.disableTable(vertexTable.getName());
    admin.disableTable(edgeTable.getName());
    admin.disableTable(graphHeadTable.getName());
    admin.deleteTable(vertexTable.getName());
    admin.deleteTable(edgeTable.getName());
    admin.deleteTable(graphHeadTable.getName());
  }

  /**
   * First disable, then truncate all tables handled by this store instance, i.e. delete all rows.
   * Existing region splits are preserved.
   *
   * @throws IOException when truncating any table fails.
   */
  public void truncateTables() throws IOException {
    // Use the names of the tables this store actually holds for both steps. The configured
    // names may differ, e.g. when the tables were created with a name prefix.
    final TableNameHolder names = new TableNameHolder();
    admin.disableTable(names.graphHead);
    admin.disableTable(names.vertex);
    admin.disableTable(names.edge);
    admin.truncateTable(names.graphHead, true);
    admin.truncateTable(names.vertex, true);
    admin.truncateTable(names.edge, true);
  }

  /**
   * Snapshot of the {@code TableName}s of the three tables held by this store.
   */
  private final class TableNameHolder {
    /**
     * Name of the graph head table.
     */
    private final org.apache.hadoop.hbase.TableName graphHead = graphHeadTable.getName();
    /**
     * Name of the vertex table.
     */
    private final org.apache.hadoop.hbase.TableName vertex = vertexTable.getName();
    /**
     * Name of the edge table.
     */
    private final org.apache.hadoop.hbase.TableName edge = edgeTable.getName();
  }

  /**
   * Attach a HBase filter represented by the given query to the given scan instance.
   *
   * @param query the query that represents a filter
   * @param scan the HBase scan instance on which the filter will be applied
   * @param isSpreadingByteUsed indicates whether a spreading byte is used as row key prefix or not
   * @param <T> the type of the EPGM element
   */
  private <T extends Element> void attachFilter(
    @Nonnull ElementQuery<HBaseElementFilter<T>> query,
    @Nonnull Scan scan,
    boolean isSpreadingByteUsed) {
    // all filters derived from the query must match (logical AND)
    FilterList conjunctFilters = new FilterList(FilterList.Operator.MUST_PASS_ALL);
    if (query.getQueryRanges() != null && !query.getQueryRanges().isEmpty()) {
      conjunctFilters.addFilter(HBaseFilterUtils.getIdFilter(query.getQueryRanges(),
        isSpreadingByteUsed));
    }
    if (query.getFilterPredicate() != null) {
      conjunctFilters.addFilter(query.getFilterPredicate().toHBaseFilter(false));
    }
    // only set the filter list if it is non-empty, so an empty query scans everything
    if (!conjunctFilters.getFilters().isEmpty()) {
      scan.setFilter(conjunctFilters);
    }
  }
}