Skip to content

Commit

Permalink
Updates for Accumulo 2.0 alpha2 release (apache#1068)
Browse files Browse the repository at this point in the history
  • Loading branch information
keith-turner authored Feb 11, 2019
1 parent f216519 commit 282b94e
Show file tree
Hide file tree
Showing 19 changed files with 163 additions and 175 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

public class AccumuloProps {

public static final String CLIENT_INSTANCE_NAME = "instance.name";
public static final String CLIENT_ZOOKEEPERS = "instance.zookeepers";

public static final String TABLE_BLOCKCACHE_ENABLED = "table.cache.block.enable";
public static final String TABLE_CLASSPATH = "table.classpath.context";
public static final String TABLE_DELETE_BEHAVIOR = "table.delete.behavior";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,12 @@ private static List<TableRange> getRanges(Environment env)

private static boolean hasNotifications(Environment env, TableRange range)
throws TableNotFoundException {
Scanner scanner = null;
try {
scanner = env.getAccumuloClient().createScanner(env.getTable(), env.getAuthorizations());
try (Scanner scanner =
env.getAccumuloClient().createScanner(env.getTable(), env.getAuthorizations())) {
scanner.setRange(range.getRange());
Notification.configureScanner(scanner);

return scanner.iterator().hasNext();
} finally {
if (scanner != null) {
scanner.close();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,14 @@ public void initialize(InitializationOptions opts)
"Fluo application already initialized at " + config.getAppZookeepers());
}

AccumuloClient conn = AccumuloUtil.getClient(config);
try (AccumuloClient client = AccumuloUtil.getClient(config)) {
initialize(opts, client);
}
}

boolean tableExists = conn.tableOperations().exists(config.getAccumuloTable());
private void initialize(InitializationOptions opts, AccumuloClient client)
throws TableExistsException, AlreadyInitializedException {
boolean tableExists = client.tableOperations().exists(config.getAccumuloTable());
if (tableExists && !opts.getClearTable()) {
throw new TableExistsException("Accumulo table already exists " + config.getAccumuloTable());
}
Expand All @@ -130,7 +135,7 @@ public void initialize(InitializationOptions opts)
logger.info("The Accumulo table '{}' will be dropped and created as requested by user",
config.getAccumuloTable());
try {
conn.tableOperations().delete(config.getAccumuloTable());
client.tableOperations().delete(config.getAccumuloTable());
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand All @@ -151,7 +156,7 @@ public void initialize(InitializationOptions opts)
}

try {
initializeApplicationInZooKeeper(conn);
initializeApplicationInZooKeeper(client);

String accumuloJars;
if (!config.getAccumuloJars().trim().isEmpty()) {
Expand Down Expand Up @@ -180,7 +185,7 @@ public void initialize(InitializationOptions opts)

if (!accumuloClasspath.isEmpty()) {
String contextName = "fluo-" + config.getApplicationName();
conn.instanceOperations().setProperty(
client.instanceOperations().setProperty(
AccumuloProps.VFS_CONTEXT_CLASSPATH_PROPERTY + contextName, accumuloClasspath);
ntcProps.put(AccumuloProps.TABLE_CLASSPATH, contextName);
}
Expand All @@ -201,7 +206,7 @@ public void initialize(InitializationOptions opts)
configureIterators(ntc);

ntc.setProperties(ntcProps);
conn.tableOperations().create(config.getAccumuloTable(), ntc);
client.tableOperations().create(config.getAccumuloTable(), ntc);

updateSharedConfig();
} catch (NodeExistsException nee) {
Expand Down Expand Up @@ -246,16 +251,16 @@ public void remove() {
throw new FluoException("Must stop the oracle server to remove an application");
}

AccumuloClient conn = AccumuloUtil.getClient(config);

boolean tableExists = conn.tableOperations().exists(config.getAccumuloTable());
// With preconditions met, it's now OK to delete table & zookeeper root (if they exist)
if (tableExists) {
logger.info("The Accumulo table '{}' will be dropped", config.getAccumuloTable());
try {
conn.tableOperations().delete(config.getAccumuloTable());
} catch (Exception e) {
throw new RuntimeException(e);
try (AccumuloClient client = AccumuloUtil.getClient(config)) {
boolean tableExists = client.tableOperations().exists(config.getAccumuloTable());
// With preconditions met, it's now OK to delete table & zookeeper root (if they exist)
if (tableExists) {
logger.info("The Accumulo table '{}' will be dropped", config.getAccumuloTable());
try {
client.tableOperations().delete(config.getAccumuloTable());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}

Expand All @@ -276,7 +281,8 @@ public void remove() {

private void initializeApplicationInZooKeeper(AccumuloClient client) throws Exception {

final String accumuloInstanceName = client.info().getInstanceName();
final String accumuloInstanceName =
client.properties().getProperty(AccumuloProps.CLIENT_INSTANCE_NAME);
final String accumuloInstanceID = client.getInstanceID();
final String fluoApplicationID = UUID.randomUUID().toString();

Expand Down Expand Up @@ -545,7 +551,8 @@ public boolean accumuloTableExists() {
if (!config.hasRequiredAdminProps()) {
throw new IllegalArgumentException("Admin configuration is missing required properties");
}
AccumuloClient client = AccumuloUtil.getClient(config);
return client.tableOperations().exists(config.getAccumuloTable());
try (AccumuloClient client = AccumuloUtil.getClient(config)) {
return client.tableOperations().exists(config.getAccumuloTable());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,10 @@ public Environment(FluoConfiguration configuration) {

ensureDeletesAreDisabled();

if (!client.info().getInstanceName().equals(accumuloInstance)) {
throw new IllegalArgumentException("unexpected accumulo instance name "
+ client.info().getInstanceName() + " != " + accumuloInstance);
String instanceName = client.properties().getProperty(AccumuloProps.CLIENT_INSTANCE_NAME);
if (!instanceName.equals(accumuloInstance)) {
throw new IllegalArgumentException(
"unexpected accumulo instance name " + instanceName + " != " + accumuloInstance);
}

if (!client.getInstanceID().equals(accumuloInstanceID)) {
Expand Down Expand Up @@ -251,5 +252,6 @@ public SimpleConfiguration getAppConfiguration() {
@Override
public void close() {
resources.close();
client.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -317,10 +317,9 @@ static List<Entry<Key, Value>> getOpenReadLocks(Environment env,
}
}

BatchScanner bscanner = null;
try {
bscanner =
env.getAccumuloClient().createBatchScanner(env.getTable(), env.getAuthorizations(), 1);

try (BatchScanner bscanner =
env.getAccumuloClient().createBatchScanner(env.getTable(), env.getAuthorizations(), 1)) {

bscanner.setRanges(ranges);
IteratorSetting iterCfg = new IteratorSetting(10, OpenReadLockIterator.class);
Expand All @@ -336,10 +335,6 @@ static List<Entry<Key, Value>> getOpenReadLocks(Environment env,

return ret;

} finally {
if (bscanner != null) {
bscanner.close();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -674,25 +674,22 @@ private boolean checkForAckCollision(ConditionalMutation cm) {

Range range = new Range(startKey, endKey);

Scanner scanner;
try {
// TODO reuse or share scanner
scanner =
env.getAccumuloClient().createScanner(env.getTable(), env.getAuthorizations());
try (Scanner scanner =
env.getAccumuloClient().createScanner(env.getTable(), env.getAuthorizations())) {
scanner.setRange(range);

// TODO could use iterator that stops after 1st ACK. thought of using versioning iter
// but
// it scans to ACK
if (scanner.iterator().hasNext()) {
env.getSharedResources().getBatchWriter()
.writeMutationAsync(notification.newDelete(env));
return true;
}
} catch (TableNotFoundException e) {
// TODO proper exception handling
throw new RuntimeException(e);
}

scanner.setRange(range);

// TODO could use iterator that stops after 1st ACK. thought of using versioning iter but
// it scans to ACK
if (scanner.iterator().hasNext()) {
env.getSharedResources().getBatchWriter()
.writeMutationAsync(notification.newDelete(env));
return true;
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@

import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.ClientInfo;
import org.apache.fluo.api.config.FluoConfiguration;

/**
Expand All @@ -32,18 +29,7 @@ public class AccumuloUtil {
* Creates Accumulo connector given FluoConfiguration
*/
public static AccumuloClient getClient(FluoConfiguration config) {
try {
return Accumulo.newClient()
.forInstance(config.getAccumuloInstance(), config.getAccumuloZookeepers())
.usingPassword(config.getAccumuloUser(), config.getAccumuloPassword()).build();
} catch (AccumuloException | AccumuloSecurityException e) {
throw new IllegalStateException(e);
}
}

public static ClientInfo getClientInfo(FluoConfiguration config) {
return Accumulo.newClient()
.forInstance(config.getAccumuloInstance(), config.getAccumuloZookeepers())
.usingPassword(config.getAccumuloUser(), config.getAccumuloPassword()).info();
return Accumulo.newClient().to(config.getAccumuloInstance(), config.getAccumuloZookeepers())
.as(config.getAccumuloUser(), config.getAccumuloPassword()).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,34 +75,32 @@ public static Entry<Key, Value> checkColumn(Environment env, IteratorSetting ite
Column col) {
Span span = Span.exact(row, col);

Scanner scanner;
try {
// TODO reuse or share scanner
scanner = env.getAccumuloClient().createScanner(env.getTable(), env.getAuthorizations());
try (Scanner scanner =
env.getAccumuloClient().createScanner(env.getTable(), env.getAuthorizations())) {
scanner.setRange(SpanUtil.toRange(span));
scanner.addScanIterator(iterConf);

Iterator<Entry<Key, Value>> iter = scanner.iterator();
if (iter.hasNext()) {
Entry<Key, Value> entry = iter.next();

Key k = entry.getKey();
Bytes r = Bytes.of(k.getRowData().toArray());
Bytes cf = Bytes.of(k.getColumnFamilyData().toArray());
Bytes cq = Bytes.of(k.getColumnQualifierData().toArray());
Bytes cv = Bytes.of(k.getColumnVisibilityData().toArray());

if (r.equals(row) && cf.equals(col.getFamily()) && cq.equals(col.getQualifier())
&& cv.equals(col.getVisibility())) {
return entry;
} else {
throw new RuntimeException("unexpected key " + k + " " + row + " " + col);
}
}
} catch (TableNotFoundException e) {
// TODO proper exception handling
throw new RuntimeException(e);
}
scanner.setRange(SpanUtil.toRange(span));
scanner.addScanIterator(iterConf);

Iterator<Entry<Key, Value>> iter = scanner.iterator();
if (iter.hasNext()) {
Entry<Key, Value> entry = iter.next();

Key k = entry.getKey();
Bytes r = Bytes.of(k.getRowData().toArray());
Bytes cf = Bytes.of(k.getColumnFamilyData().toArray());
Bytes cq = Bytes.of(k.getColumnQualifierData().toArray());
Bytes cv = Bytes.of(k.getColumnVisibilityData().toArray());

if (r.equals(row) && cf.equals(col.getFamily()) && cq.equals(col.getQualifier())
&& cv.equals(col.getVisibility())) {
return entry;
} else {
throw new RuntimeException("unexpected key " + k + " " + row + " " + col);
}
}

return null;
}
Expand Down
16 changes: 4 additions & 12 deletions modules/core/src/main/java/org/apache/fluo/core/util/ScanUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -162,21 +162,14 @@ public static void scanNotifications(ScanOpts options, FluoConfiguration sConfig
Span span = getSpan(options);
Collection<Column> columns = getColumns(options);

Scanner scanner = null;
try {
scanner = client.createScanner(sConfig.getAccumuloTable(), Authorizations.EMPTY);

try (Scanner scanner = client.createScanner(sConfig.getAccumuloTable(), Authorizations.EMPTY)) {
scanner.setRange(SpanUtil.toRange(span));

NotificationScanner ntfyScanner = new NotificationScanner(scanner, columns);

scan(options, out, ntfyScanner);
} catch (TableNotFoundException e) {
throw new RuntimeException(e);
} finally {
if (scanner != null) {
scanner.close();
}
}
}

Expand Down Expand Up @@ -210,13 +203,12 @@ private static void generateJson(CellScanner cellScanner, Function<Bytes, String

public static void scanAccumulo(ScanOpts options, FluoConfiguration sConfig, PrintStream out) {

AccumuloClient client = AccumuloUtil.getClient(sConfig);

Span span = getSpan(options);
Collection<Column> columns = getColumns(options);

try {
Scanner scanner = client.createScanner(sConfig.getAccumuloTable(), Authorizations.EMPTY);
try (AccumuloClient client = AccumuloUtil.getClient(sConfig);
Scanner scanner = client.createScanner(sConfig.getAccumuloTable(), Authorizations.EMPTY)) {

scanner.setRange(SpanUtil.toRange(span));
for (Column col : columns) {
if (col.isQualifierSet()) {
Expand Down
Loading

0 comments on commit 282b94e

Please sign in to comment.