diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/GarbageCollectionIterator.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/GarbageCollectionIterator.java index ad4a8aaa8..b91e0c463 100644 --- a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/GarbageCollectionIterator.java +++ b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/GarbageCollectionIterator.java @@ -22,7 +22,6 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.accumulo.core.client.IteratorSetting; -import org.apache.accumulo.core.data.ArrayByteSequence; import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.PartialKey; @@ -33,6 +32,7 @@ import org.apache.accumulo.core.iterators.SortedKeyValueIterator; import org.apache.fluo.accumulo.util.ColumnConstants; import org.apache.fluo.accumulo.util.ColumnType; +import org.apache.fluo.accumulo.util.NotificationUtil; import org.apache.fluo.accumulo.util.ReadLockUtil; import org.apache.fluo.accumulo.util.ZookeeperUtil; import org.apache.fluo.accumulo.values.DelLockValue; @@ -49,8 +49,6 @@ public class GarbageCollectionIterator implements SortedKeyValueIterator source; @@ -170,7 +168,7 @@ private void readColMetadata() throws IOException { curCol.set(source.getTopKey()); - if (source.getTopKey().getColumnFamilyData().equals(NOTIFY_CF_BS)) { + if (NotificationUtil.isNtfy(source.getTopKey())) { return; } diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/summarizer/FluoCollector.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/summarizer/FluoCollector.java new file mode 100644 index 000000000..bd5ecf7a9 --- /dev/null +++ b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/summarizer/FluoCollector.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.fluo.accumulo.summarizer; + +import org.apache.accumulo.core.client.summary.Summarizer.Collector; +import org.apache.accumulo.core.client.summary.Summarizer.StatisticConsumer; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.fluo.accumulo.util.ColumnType; +import org.apache.fluo.accumulo.util.NotificationUtil; +import org.apache.fluo.accumulo.util.ReadLockUtil; + +public class FluoCollector implements Collector { + + private long ntfy = 0; + private long ntfyDel = 0; + private long txDone = 0; + private long delLock = 0; + private long lock = 0; + private long data = 0; + private long write = 0; + private long ack = 0; + private long delrlock = 0; + private long rlock = 0; + + @Override + public void accept(Key k, Value v) { + + if (NotificationUtil.isNtfy(k)) { + if (NotificationUtil.isDelete(k)) { + ntfyDel++; + } else { + ntfy++; + } + + } else { + ColumnType colType = ColumnType.from(k); + switch (colType) { + case TX_DONE: + txDone++; + break; + case DEL_LOCK: + delLock++; + break; + case LOCK: + lock++; + break; + case DATA: + data++; + break; + case WRITE: + write++; + break; + case ACK: + ack++; + break; + case RLOCK: + if (ReadLockUtil.isDelete(k.getTimestamp())) { + delrlock++; + } else { + rlock++; + } + break; + default: + throw new IllegalArgumentException("Unknown column type : " + colType); + } + } + } + + @Override + public void summarize(StatisticConsumer sc) { + sc.accept("ntfy", ntfy); + sc.accept("ntfyDel", ntfyDel); + sc.accept("txDone", txDone); + sc.accept("delLock", delLock); + sc.accept("lock", lock); + sc.accept("data", data); + sc.accept("write", write); + sc.accept("ack", ack); + sc.accept("delrlock", delrlock); + sc.accept("rlock", rlock); + } +} diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/summarizer/FluoSummarizer.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/summarizer/FluoSummarizer.java new file mode 100644 index 000000000..6455a45ba --- /dev/null +++ b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/summarizer/FluoSummarizer.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.fluo.accumulo.summarizer; + +import java.util.Map; + +import com.google.common.base.Preconditions; +import org.apache.accumulo.core.client.summary.Summarizer; +import org.apache.accumulo.core.client.summary.SummarizerConfiguration; +import org.apache.accumulo.core.client.summary.Summary; + +public class FluoSummarizer implements Summarizer { + + public static final SummarizerConfiguration CONFIG = + SummarizerConfiguration.builder(FluoSummarizer.class).setPropertyId("fluo").build(); + + @Override + public Collector collector(SummarizerConfiguration sc) { + return new FluoCollector(); + } + + @Override + public Combiner combiner(SummarizerConfiguration sc) { + return (m1, m2) -> m2.forEach((k, v) -> m1.merge(k, v, Long::sum)); + } + + public static class Counts { + + public final long ntfy; + public final long ntfyDel; + public final long txDone; + public final long delLock; + public final long lock; + public final long data; + public final long write; + public final long ack; + public final long delrlock; + public final long rlock; + + public Counts(long ntfy, long ntfyDel, long txDone, long delLock, long lock, long data, + long write, long ack, long delrlock, long rlock) { + this.ntfy = ntfy; + this.ntfyDel = ntfyDel; + this.txDone = txDone; + this.delLock = delLock; + this.lock = lock; + this.data = data; + this.write = write; + this.ack = ack; + this.delrlock = delrlock; + this.rlock = rlock; + } + } + + public static Counts getCounts(Summary summary) { + Preconditions.checkArgument( + summary.getSummarizerConfiguration().getClassName().equals(FluoSummarizer.class.getName())); + Map m = summary.getStatistics(); + return new Counts(m.get("ntfy"), m.get("ntfyDel"), m.get("txDone"), m.get("delLock"), + m.get("lock"), m.get("data"), m.get("write"), m.get("ack"), m.get("delrlock"), + m.get("rlock")); + } +} diff --git a/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java b/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java index 225cd907b..df45c2c82 100644 --- a/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java +++ b/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java @@ -42,6 +42,7 @@ import org.apache.curator.framework.CuratorFramework; import org.apache.fluo.accumulo.iterators.GarbageCollectionIterator; import org.apache.fluo.accumulo.iterators.NotificationIterator; +import org.apache.fluo.accumulo.summarizer.FluoSummarizer; import org.apache.fluo.accumulo.util.AccumuloProps; import org.apache.fluo.accumulo.util.ColumnConstants; import org.apache.fluo.accumulo.util.ZookeeperPath; @@ -203,6 +204,8 @@ private void initialize(InitializationOptions opts, AccumuloClient client) ntc.setLocalityGroups(Collections.singletonMap(ColumnConstants.NOTIFY_LOCALITY_GROUP_NAME, Collections.singleton(new Text(ColumnConstants.NOTIFY_CF.toArray())))); + ntc.enableSummarization(FluoSummarizer.CONFIG); + configureIterators(ntc); ntc.setProperties(ntcProps); diff --git a/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/SummaryIT.java b/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/SummaryIT.java new file mode 100644 index 000000000..8a78b375f --- /dev/null +++ b/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/SummaryIT.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.fluo.integration.impl; + +import java.util.List; + +import org.apache.accumulo.core.client.summary.Summary; +import org.apache.fluo.accumulo.summarizer.FluoSummarizer; +import org.apache.fluo.accumulo.summarizer.FluoSummarizer.Counts; +import org.apache.fluo.api.client.Transaction; +import org.apache.fluo.api.data.Column; +import org.apache.fluo.integration.ITBaseImpl; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class SummaryIT extends ITBaseImpl { + + @Test + public void testSummaries() throws Exception { + try (Transaction tx = client.newTransaction()) { + String seen = tx.withReadLock().gets("u:http://wikipedia.com/abc", new Column("doc", "seen")); + if (seen == null) { + tx.set("d:7705", new Column("doc", "source"), "http://wikipedia.com/abc"); + } + tx.commit(); + } + + List summaries = aClient.tableOperations().summaries(table).flush(true).retrieve(); + + Counts counts = FluoSummarizer.getCounts(summaries.get(0)); + + assertEquals(0, counts.ack); + assertEquals(1, counts.data); + assertEquals(0, counts.delLock); + assertEquals(1, counts.delrlock); + assertEquals(0, counts.lock); + assertEquals(0, counts.ntfy); + assertEquals(0, counts.ntfyDel); + assertEquals(0, counts.rlock); + assertEquals(1, counts.txDone); + assertEquals(1, counts.write); + + try (Transaction tx = client.newTransaction()) { + tx.set("d:7705", new Column("doc", "source"), "http://wikipedia.com/abcd"); + tx.commit(); + } + + summaries = aClient.tableOperations().summaries(table).flush(true).retrieve(); + + counts = FluoSummarizer.getCounts(summaries.get(0)); + + assertEquals(0, counts.ack); + assertEquals(2, counts.data); + assertEquals(0, counts.delLock); + assertEquals(1, counts.delrlock); + assertEquals(0, counts.lock); + assertEquals(0, counts.ntfy); + assertEquals(0, counts.ntfyDel); + assertEquals(0, counts.rlock); + assertEquals(2, counts.txDone); + assertEquals(2, counts.write); + } +} diff --git a/modules/integration-tests/src/main/resources/log4j.properties b/modules/integration-tests/src/main/resources/log4j.properties index 509f3be5e..02a06bb2c 100644 --- a/modules/integration-tests/src/main/resources/log4j.properties +++ b/modules/integration-tests/src/main/resources/log4j.properties @@ -17,6 +17,7 @@ log4j.appender.CA.layout=org.apache.log4j.PatternLayout log4j.appender.CA.layout.ConversionPattern=%d{ISO8601} [%c{2}] %-5p: %m%n log4j.logger.Audit=ERROR +log4j.logger.org.apache.accumulo.audit=ERROR log4j.logger.org.apache.curator=ERROR log4j.logger.org.apache.accumulo.core.client.impl.ServerClient=ERROR log4j.logger.org.apache.accumulo.core.util.shell.Shell.audit=off @@ -35,4 +36,3 @@ log4j.logger.org.apache.zookeeper.ClientCnxn=FATAL log4j.logger.org.apache.zookeeper.ZooKeeper=WARN log4j.logger.org.apache.curator.framework.recipes.cache.PathChildrenCache=FATAL log4j.logger.org.apache.fluo=ERROR - diff --git a/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoKeyValueGenerator.java b/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoKeyValueGenerator.java index 12def1cea..158bed90a 100644 --- a/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoKeyValueGenerator.java +++ b/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoKeyValueGenerator.java @@ -16,9 +16,13 @@ package org.apache.fluo.mapreduce; import java.nio.charset.StandardCharsets; +import java.util.Collection; +import java.util.Collections; import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat; +import org.apache.accumulo.core.client.summary.SummarizerConfiguration; import org.apache.accumulo.core.data.Key; +import org.apache.fluo.accumulo.summarizer.FluoSummarizer; import org.apache.fluo.accumulo.util.ColumnType; import org.apache.fluo.accumulo.values.WriteValue; import org.apache.fluo.api.data.Bytes; @@ -61,6 +65,7 @@ * * * + * @see FluoKeyValueGenerator#getSummarizers() */ public class FluoKeyValueGenerator { @@ -196,4 +201,15 @@ public FluoKeyValue[] getKeyValues() { return keyVals; } + + /** + * Use this when configuring Accumulo's File output format to generate initial data to import into + * a new Fluo table. + * + * @return Configuration that will generate Fluo summary data. + * @since 1.3.0 + */ + public static Collection getSummarizers() { + return Collections.singleton(FluoSummarizer.CONFIG); + } }