GradoopGraphHeadIterator.java
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.storage.accumulo.impl.iterator.tserver;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.util.Pair;
import org.gradoop.common.model.api.entities.GraphHead;
import org.gradoop.common.model.impl.id.GradoopId;
import org.gradoop.common.model.impl.pojo.EPGMGraphHead;
import org.gradoop.common.model.impl.pojo.EPGMGraphHeadFactory;
import org.gradoop.common.model.impl.properties.PropertyValue;
import org.gradoop.storage.accumulo.utils.KryoUtils;
import org.gradoop.storage.accumulo.impl.constants.AccumuloTables;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
/**
 * Accumulo iterator for gradoop graph heads. Converts between stored rows and
 * {@link GraphHead} instances.
 */
public class GradoopGraphHeadIterator extends BaseElementIterator<GraphHead> {

  /**
   * Factory used to create the graph head instance that is serialized in
   * {@link #toRow(GraphHead)}.
   */
  private final EPGMGraphHeadFactory factory = new EPGMGraphHeadFactory();

  /**
   * Decodes a graph head from a key/value entry.
   * The entry value is deserialized with {@link KryoUtils} and the id of the
   * resulting graph head is taken from the row of the entry key.
   *
   * @param pair key/value entry to decode
   * @return decoded graph head
   * @throws IOException on failure
   */
  @Nonnull
  @Override
  public GraphHead fromRow(@Nonnull Map.Entry<Key, Value> pair) throws IOException {
    // the value holds the serialized graph head content
    EPGMGraphHead graphHead = KryoUtils.loads(pair.getValue().get(), EPGMGraphHead.class);
    // the row of the key holds the graph head id
    String rowId = pair.getKey().getRow().toString();
    graphHead.setId(GradoopId.fromString(rowId));
    return graphHead;
  }

  /**
   * Encodes a graph head as a key/value pair.
   * The key is built from the string form of the record id, the value holds the
   * {@link KryoUtils} serialization of a graph head created from the id, label
   * and properties of the record.
   *
   * @param record graph head to encode
   * @return key/value pair representing the record
   * @throws IOException on failure
   */
  @Nonnull
  @Override
  public Pair<Key, Value> toRow(@Nonnull GraphHead record) throws IOException {
    Key rowKey = new Key(record.getId().toString());
    Value rowValue = new Value(KryoUtils.dumps(
      factory.initGraphHead(record.getId(), record.getLabel(), record.getProperties())));
    return new Pair<>(rowKey, rowValue);
  }

  /**
   * Reads the next element definition from the accumulo store.
   * Starting at the current top entry of the source, all consecutive entries that
   * share the row of that first entry are consumed and folded into one graph head.
   * Returns null if the source has no top entry.
   *
   * @param source origin store source
   * @return decoded epgm element, or null if the source has no top entry
   * @throws IOException on failure
   */
  @Nullable
  public EPGMGraphHead readLine(
    @Nonnull SortedKeyValueIterator<Key, Value> source
  ) throws IOException {
    if (!source.hasTop()) {
      return null;
    }

    EPGMGraphHead graphHead = new EPGMGraphHead();
    graphHead.setId(GradoopId.fromString(source.getTopKey().getRow().toString()));
    // the id is not modified below, so its string form is computed only once
    String graphHeadId = graphHead.getId().toString();

    // the source is only advanced after an entry was consumed; leaving the loop
    // because of a row change keeps the source positioned on that entry
    for (; source.hasTop(); source.next()) {
      Key currentKey = source.getTopKey();
      Value currentValue = source.getTopValue();
      if (!Objects.equals(graphHeadId, currentKey.getRow().toString())) {
        break;
      }

      String family = currentKey.getColumnFamily().toString();
      if (family.equals(AccumuloTables.KEY.LABEL)) {
        graphHead.setLabel(currentValue.toString());
      } else if (family.equals(AccumuloTables.KEY.PROPERTY)) {
        graphHead.setProperty(currentKey.getColumnQualifier().toString(),
          PropertyValue.fromRawBytes(currentValue.get()));
      }
      // entries of any other column family are skipped
    }
    return graphHead;
  }
}