/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.storage.accumulo.impl.io.inputformats;

import com.google.common.collect.Lists;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.ClientConfiguration;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.mapred.AccumuloRowInputFormat;
import org.apache.accumulo.core.client.mapred.RangeInputSplit;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.flink.api.common.io.GenericInputFormat;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.gradoop.common.model.impl.pojo.EPGMElement;
import org.gradoop.storage.accumulo.config.GradoopAccumuloConfig;
import org.gradoop.storage.accumulo.impl.constants.AccumuloDefault;
import org.gradoop.storage.accumulo.impl.constants.AccumuloTables;
import org.gradoop.storage.accumulo.impl.predicate.query.AccumuloQueryHolder;
import javax.annotation.Nonnull;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * Common abstract {@link InputFormat} for the Gradoop Accumulo store.
 *
 * @param <T> the EPGM element type defined in Gradoop
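 *
 * <p>A minimal usage sketch, assuming a hypothetical {@code VertexInputFormat}
 * subclass (the subclass name, vertex type and properties are illustrative only);
 * a {@code null} predicate disables filtering:
 * <pre>{@code
 * ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 * BaseInputFormat<EPGMVertex> format = new VertexInputFormat(storeProperties, null);
 * DataSet<EPGMVertex> vertices = env.createInput(format);
 * }</pre>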
*/
public abstract class BaseInputFormat<T extends EPGMElement> extends GenericInputFormat<T> {
  /**
   * serial version uid
   */
  private static final long serialVersionUID = 0x1L;
/**
* accumulo properties
*/
private final Properties properties;
/**
* predicate filter
*/
private final AccumuloQueryHolder<T> predicate;
/**
* accumulo batch scanner
*/
private transient BatchScanner scanner;
/**
* accumulo row iterator
*/
private transient Iterator<Map.Entry<Key, Value>> iterator;
/**
   * Create a new input format for a Gradoop element.
*
* @param properties accumulo properties
* @param predicate scan predicate filter
*/
BaseInputFormat(
Properties properties,
AccumuloQueryHolder<T> predicate
) {
this.properties = properties;
this.predicate = predicate;
}
/**
   * Hook invoked after the connector has been initiated.
*/
protected abstract void initiate();
/**
   * Get the table name by its table prefix definition.
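   *
   * <p>A minimal sketch of a concrete override (the table suffix is illustrative only):
   * <pre>{@code
   * protected String getTableName(String prefix) {
   *   return prefix + "vertex";
   * }
   * }</pre>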
*
* @param prefix prefix defined in store configuration
* @return table name
*/
protected abstract String getTableName(String prefix);
/**
   * Attach iterator settings to the scanner.
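   *
   * <p>A minimal sketch of a concrete override, assuming a hypothetical server-side
   * iterator class {@code MyFilterIterator} (not part of this module):
   * <pre>{@code
   * protected void attachIterator(
   *   BatchScanner scanner,
   *   int iteratorSettingPriority,
   *   Map<String, String> options
   * ) {
   *   IteratorSetting setting = new IteratorSetting(
   *     iteratorSettingPriority, "gradoop-filter", MyFilterIterator.class, options);
   *   scanner.addScanIterator(setting);
   * }
   * }</pre>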
*
* @param scanner accumulo batch scanner
* @param iteratorSettingPriority iterator priority from properties
* @param options query options
*/
protected abstract void attachIterator(
BatchScanner scanner,
int iteratorSettingPriority,
Map<String, String> options
);
/**
   * Decode a record instance from a result entry.
   * Decoding is purely client-side behavior.
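   *
   * <p>A minimal decode sketch, assuming a hypothetical {@code rowCodec} helper
   * and vertex type (both illustrative only):
   * <pre>{@code
   * protected EPGMVertex mapRow(Map.Entry<Key, Value> row) throws IOException {
   *   return rowCodec.decodeVertex(row.getKey(), row.getValue());
   * }
   * }</pre>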
*
* @param row accumulo result
* @return gradoop element
* @throws IOException on failure
*/
protected abstract T mapRow(Map.Entry<Key, Value> row) throws IOException;
@Override
public void open(GenericInputSplit split) throws IOException {
super.open(split);
try {
String user = (String) properties
.getOrDefault(GradoopAccumuloConfig.ACCUMULO_USER, AccumuloDefault.USER);
String password = (String) properties
.getOrDefault(GradoopAccumuloConfig.ACCUMULO_PASSWD, AccumuloDefault.PASSWORD);
String instance = (String) properties
.getOrDefault(GradoopAccumuloConfig.ACCUMULO_INSTANCE, AccumuloDefault.INSTANCE);
      String zkHosts = (String) properties
        .getOrDefault(GradoopAccumuloConfig.ZOOKEEPER_HOSTS, AccumuloDefault.ZOOKEEPERS);
String tableName = getTableName((String) properties
.getOrDefault(GradoopAccumuloConfig.ACCUMULO_TABLE_PREFIX, AccumuloDefault.TABLE_PREFIX));
Authorizations auth = (Authorizations) properties
.getOrDefault(GradoopAccumuloConfig.ACCUMULO_AUTHORIZATIONS,
AccumuloDefault.AUTHORIZATION);
      int numQueryThreads = (int) properties
        .getOrDefault(GradoopAccumuloConfig.GRADOOP_BATCH_SCANNER_THREADS,
          AccumuloDefault.BATCH_SCANNER_THREADS);
int iteratorPriority = (int) properties
.getOrDefault(GradoopAccumuloConfig.GRADOOP_ITERATOR_PRIORITY,
AccumuloDefault.ITERATOR_PRIORITY);
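      // connect to the accumulo instance through zookeeper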
Connector conn = new ZooKeeperInstance(instance, zkHosts)
.getConnector(user, new PasswordToken(password));
List<Range> ranges = doSplits(
split.getTotalNumberOfSplits(),
tableName,
user,
password,
instance,
zkHosts,
auth);
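      // pass the encoded reduce filter (if any) as an option for the scan iterator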
Map<String, String> options = new HashMap<>();
if (predicate != null && predicate.getReduceFilter() != null) {
options.put(AccumuloTables.KEY_PREDICATE, predicate.getReduceFilter().encode());
}
      if (split.getSplitNumber() >= ranges.size()) {
        // more flink splits than accumulo ranges: nothing to scan for this split
        scanner = null;
        iterator = Collections.emptyIterator();
} else {
        scanner = conn.createBatchScanner(tableName, auth, numQueryThreads);
attachIterator(scanner, iteratorPriority, options);
scanner.setRanges(Lists.newArrayList(ranges.get(split.getSplitNumber())));
iterator = scanner.iterator();
}
initiate();
} catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) {
throw new IOException(e);
}
}
@Override
public boolean reachedEnd() {
return !iterator.hasNext();
}
@Override
public T nextRecord(T reuse) throws IOException {
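    // the reuse instance is ignored, each accumulo row is decoded into a fresh element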
return mapRow(iterator.next());
}
@Override
public void close() throws IOException {
super.close();
if (scanner != null) {
scanner.close();
}
}
/**
   * Split the table into ranges as suggested by {@link AccumuloRowInputFormat#getSplits}.
   *
   * @param maxSplit maximum number of splits
* @param tableName split table name
* @param user accumulo user
* @param password accumulo password
* @param instance accumulo instance name
* @param zkHosts zookeeper hosts
* @param auth accumulo access authorization
* @return split range collections
* @throws IOException on failure
* @throws AccumuloSecurityException on security error
*/
@Nonnull
private List<Range> doSplits(
int maxSplit,
@Nonnull String tableName,
@Nonnull String user,
@Nonnull String password,
@Nonnull String instance,
@Nonnull String zkHosts,
@Nonnull Authorizations auth
) throws IOException, AccumuloSecurityException {
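    // a throwaway mapred input format and job conf, used only to reuse accumulo's split planning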
AccumuloRowInputFormat format = new AccumuloRowInputFormat();
JobConf conf = new JobConf();
AccumuloRowInputFormat.setInputTableName(conf, tableName);
AccumuloRowInputFormat.setConnectorInfo(conf, user, new PasswordToken(password));
AccumuloRowInputFormat.setZooKeeperInstance(conf, ClientConfiguration.create()
.withInstance(instance)
.withZkHosts(zkHosts));
AccumuloRowInputFormat.setScanAuthorizations(conf, auth);
if (predicate != null && predicate.getQueryRanges() != null) {
AccumuloRowInputFormat.setRanges(conf, predicate.getQueryRanges());
}
InputSplit[] splits = format.getSplits(conf, maxSplit);
return Stream.of(splits)
.map(it -> (RangeInputSplit) it)
.map(RangeInputSplit::getRange)
.collect(Collectors.toList());
}
}