add an expression evaluator for literal insertion, fix input manifest support
cwensel committed Jul 19, 2023
1 parent 8d65ebb commit b35820f
Showing 31 changed files with 435 additions and 142 deletions.
2 changes: 2 additions & 0 deletions README.md
@@ -69,6 +69,8 @@ Usage:

- insert - insert a literal value into a field
- `value=>intoField|type`
- eval - evaluate an expression locally and insert into a field (relies on [MVEL](http://mvel.documentnode.com))
- `expression!>intoField|type`
- coerce - transform a field to a new type
- `field|newType`
- copy - copy a field value to a new field
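Since `eval` relies on MVEL, a minimal sketch of MVEL expression evaluation in plain Java may help; this is illustrative only, not tess's internal code, and the expression and variables are hypothetical:

```java
import org.mvel2.MVEL;

import java.util.Map;

public class MvelSketch {
    public static void main(String[] args) {
        // variables the expression can reference; purely illustrative
        Map<String, Object> vars = Map.of("name", "tess");

        // evaluate the expression; the result is what an
        // `expression!>intoField|type` transform would insert into `intoField`
        Object value = MVEL.eval("name.toUpperCase()", vars);

        System.out.println(value); // TESS
    }
}
```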
2 changes: 2 additions & 0 deletions tessellate-main/build.gradle.kts
@@ -85,6 +85,8 @@ dependencies {
implementation("org.apache.hadoop:hadoop-common:$hadoop3Version")
implementation("org.apache.hadoop:hadoop-aws:$hadoop3Version")

implementation("org.mvel:mvel2:2.5.0.Final")

// required by hadoop in java 9+
implementation("javax.xml.bind:jaxb-api:2.3.0")

Main.java
@@ -29,7 +29,8 @@
@CommandLine.Command(
name = "tess",
mixinStandardHelpOptions = true,
version = "1.0-wip"
version = "1.0-wip",
sortOptions = false
)
public class Main implements Callable<Integer> {
enum Show {
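Note: picocli sorts options alphabetically in usage help by default; `sortOptions = false` preserves their declaration order.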
ManifestReader.java
@@ -9,56 +9,56 @@
package io.clusterless.tessellate.factory;

import cascading.flow.local.LocalFlowProcess;
import cascading.nested.json.hadoop3.JSONTextLine;
import cascading.tap.SinkMode;
import cascading.tap.hadoop.Hfs;
import cascading.tap.local.hadoop.LocalHfsAdaptor;
import cascading.tap.Tap;
import cascading.tuple.TupleEntry;
import cascading.tuple.TupleEntryIterator;
import com.fasterxml.jackson.databind.JsonNode;
import io.clusterless.tessellate.model.Dataset;
import io.clusterless.tessellate.model.Field;
import io.clusterless.tessellate.model.Schema;
import io.clusterless.tessellate.model.Sink;
import io.clusterless.tessellate.model.Source;
import io.clusterless.tessellate.options.PipelineOptions;
import io.clusterless.tessellate.util.Format;
import io.clusterless.tessellate.util.JSONUtil;
import io.clusterless.tessellate.util.URIs;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URI;
import java.util.*;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;

public class ManifestReader {
private static final Logger LOG = LoggerFactory.getLogger(ManifestReader.class);
public static final int SHOW_DUPLICATES = 20;

public static ManifestReader from(Dataset dataset) {
if (!(dataset instanceof Source)) {
return new ManifestReader(dataset.uris());
}
public static ManifestReader from(Sink sink) {
return new ManifestReader(sink.uris());
}

return new ManifestReader(((Source) dataset));
public static ManifestReader from(Source source) {
return new ManifestReader(source);
}

private final URI manifestURI;
private final List<URI> uris;
private final int numPartitions;
private List<URI> manifestUris;

public ManifestReader(Source source) {
this.manifestURI = source.manifest();
this.uris = clean(source.uris());
this.numPartitions = source.partitions().size();
}

public ManifestReader(List<URI> uris) {
this.uris = clean(uris);
this.manifestURI = null;
this.numPartitions = 0;
}

public List<URI> uris(Properties conf) throws IOException {
public List<URI> uris(PipelineOptions pipelineOptions) throws IOException {
if (manifestURI == null) {
return uris;
}
@@ -69,7 +69,7 @@ public List<URI> uris(Properties conf) throws IOException {

JsonNode node = null;

try (TupleEntryIterator entryIterator = openForRead(conf, manifestURI)) {
try (TupleEntryIterator entryIterator = openForRead(pipelineOptions, manifestURI)) {
while (entryIterator.hasNext()) {
TupleEntry next = entryIterator.next();
node = (JsonNode) next.getObject(0);
@@ -93,32 +93,6 @@ public List<URI> uris(Properties conf) throws IOException {
return manifestUris;
}

public boolean urisFromManifest() {
return manifestUris != null;
}

public URI findCommonRoot(Properties conf) throws IOException {
List<URI> uris = uris(conf);

// uri is likely a directory or single file, let the Hfs tap handle it
if (!urisFromManifest() && uris.size() == 1) {
return uris.get(0);
}

Set<String> roots = uris.stream()
.map(u -> URIs.trim(u, numPartitions + 1))
.map(Objects::toString)
.collect(Collectors.toSet());

String commonPrefix = StringUtils.getCommonPrefix(roots.toArray(new String[0]));

if (commonPrefix.isEmpty()) {
throw new IllegalArgumentException("to many unique roots, got: " + roots);
}

return URI.create(commonPrefix);
}

protected List<URI> clean(List<URI> uris) {
List<URI> distinct = uris.stream()
.map(URIs::copyWithoutQuery)
@@ -147,7 +121,19 @@ protected List<URI> clean(List<URI> uris) {
return uris;
}

private static TupleEntryIterator openForRead(Properties conf, URI uri) throws IOException {
return new LocalHfsAdaptor(new Hfs(new JSONTextLine(), uri.toString(), SinkMode.KEEP)).openForRead(new LocalFlowProcess(conf));
private TupleEntryIterator openForRead(PipelineOptions pipelineOptions, URI uri) throws IOException {
Source source = Source.builder()
.withSchema(Schema.builder()
.withFormat(Format.json)
.withDeclared(List.of(new Field("json|json")))
.build())
.withInputs(List.of(uri))
.build();

SourceFactory sourceFactory = TapFactories.findSourceFactory(pipelineOptions, source);

Tap<Properties, ?, ?> sourceTap = sourceFactory.getSource(pipelineOptions, source);

return sourceTap.openForRead(new LocalFlowProcess(new Properties()));
}
}
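A hedged usage sketch of the reader above, limited to calls visible in this diff; `withManifest(...)` is an assumption made by analogy with the other builder methods, the manifest URI is hypothetical, and `pipelineOptions` is taken as already in scope:

```java
// a minimal sketch, not the project's own wiring; withManifest(...) is assumed
Source source = Source.builder()
        .withSchema(Schema.builder()
                .withFormat(Format.json)
                .build())
        .withManifest(URI.create("s3://bucket/datasets/manifest.json")) // hypothetical
        .build();

ManifestReader reader = ManifestReader.from(source);

// reads the JSON manifest via a source tap and returns the data URIs it lists;
// when no manifest is set, the reader simply returns the source's own URIs
List<URI> uris = reader.uris(pipelineOptions);
```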
TapFactories.java
@@ -15,12 +15,14 @@
import io.clusterless.tessellate.factory.local.LocalDirectoryFactory;
import io.clusterless.tessellate.model.Sink;
import io.clusterless.tessellate.model.Source;
import io.clusterless.tessellate.options.PipelineOptions;
import io.clusterless.tessellate.util.Compression;
import io.clusterless.tessellate.util.Format;
import io.clusterless.tessellate.util.Protocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URI;
import java.util.*;
import java.util.stream.Collectors;
@@ -60,7 +62,17 @@ public class TapFactories {
}
}

public static SourceFactory findSourceFactory(Source sourceModel) {
public static SourceFactory findSourceFactory(PipelineOptions pipelineOptions, Source sourceModel) throws IOException {
if (sourceModel.manifest() != null) {
LOG.info("reading manifest: {}", sourceModel.manifest());

ManifestReader manifestReader = ManifestReader.from(sourceModel);

List<URI> uris = manifestReader.uris(pipelineOptions);

sourceModel.uris().addAll(uris);
}

List<URI> inputUris = sourceModel.uris();
Format format = sourceModel.schema().format();
Compression compression = sourceModel.schema().compression();
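Net effect: when a source declares a manifest, `findSourceFactory` resolves the manifest's URIs and appends them to the source model before the format, compression, and protocol checks pick a concrete factory, so detection runs against the real input URIs.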
TapFactory.java
@@ -12,9 +12,13 @@
import io.clusterless.tessellate.util.Format;
import io.clusterless.tessellate.util.Protocol;

import java.util.Properties;
import java.util.Set;

public interface TapFactory {
default void applyGlobalProperties(Properties properties) {
}

Set<Protocol> getProtocols();

Set<Format> getFormats();
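The new `applyGlobalProperties` hook defaults to a no-op, letting individual factories seed global properties; the abstract factory later in this commit overrides it so that filesystem interception is already in place when a manifest is read.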
@@ -19,7 +19,6 @@
import cascading.tap.partition.Partition;
import cascading.tuple.Fields;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import io.clusterless.tessellate.factory.ManifestReader;
import io.clusterless.tessellate.factory.ManifestWriter;
import io.clusterless.tessellate.factory.Observed;
import io.clusterless.tessellate.factory.hdfs.fs.ObserveLocalFileSystem;
@@ -30,6 +29,7 @@
import io.clusterless.tessellate.options.AWSOptions;
import io.clusterless.tessellate.options.PipelineOptions;
import io.clusterless.tessellate.util.Property;
import io.clusterless.tessellate.util.URIs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.Constants;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
@@ -67,22 +67,25 @@ public int openWritesThreshold() {

Fields declaredFields = declaredFields(dataset, currentFields);

Properties local = initLocalProperties(pipelineOptions, dataset, declaredFields);
Properties local = getProperties(pipelineOptions, dataset, declaredFields);

ManifestReader manifestReader = ManifestReader.from(dataset);
List<URI> uris = dataset.uris();

List<URI> uris = manifestReader.uris(local);
URI commonRoot = uris.get(0);

URI commonRoot = manifestReader.findCommonRoot(local);
// uri is likely a directory or single file, let the Hfs tap handle it
if (!isSink && dataset.manifest() != null) {
commonRoot = URIs.findCommonPrefix(uris, dataset.partitions().size());
}

LOG.info("{}: handing uris: {}, with common: {}", logPrefix(isSink), uris.size(), commonRoot);
LOG.info("{}: handling uris: {}, with common: {}", logPrefix(isSink), uris.size(), commonRoot);

Scheme scheme = createScheme(pipelineOptions, dataset, declaredFields);
Scheme scheme = createScheme(dataset, declaredFields);

Tap tap;

if (isSink) {
tap = createSinkTap(local, scheme, commonRoot, uris, ((Sink) dataset).manifest());
tap = createSinkTap(local, scheme, commonRoot, uris, dataset.manifest());
} else {
tap = createSourceTap(local, scheme, commonRoot, uris);
}
@@ -113,66 +116,80 @@ private static String logPrefix(boolean isSink) {
return isSink ? "writing" : "reading";
}

@NotNull
protected Properties initLocalProperties(PipelineOptions pipelineOptions, Dataset dataset, Fields declaredFields) {
@Override
public void applyGlobalProperties(Properties properties) {
properties.setProperty("mapred.output.direct." + S3AFileSystem.class.getSimpleName(), "true");
properties.setProperty("mapreduce.input.fileinputformat.input.dir.recursive", "true");

// intercept all writes against the s3a: and file: filesystem
properties.setProperty("mapred.output.direct." + ObserveS3AFileSystem.class.getSimpleName(), "true");
properties.setProperty("fs.s3a.impl", ObserveS3AFileSystem.class.getName());
properties.setProperty("fs.s3.impl", ObserveS3AFileSystem.class.getName());
properties.setProperty("mapred.output.direct." + ObserveLocalFileSystem.class.getSimpleName(), "true");
properties.setProperty("fs.file.impl", ObserveLocalFileSystem.class.getName());

properties.setProperty(Constants.AWS_CREDENTIALS_PROVIDER, getAWSCredentialProviders());
}

protected Properties getProperties(PipelineOptions pipelineOptions, Dataset dataset, Fields declaredFields) {
Properties local = new Properties();

// covers case when reading manifest
applyGlobalProperties(local);

local = applyAWSProperties(pipelineOptions, local, isSink(dataset));

return applySinkProperties(dataset, declaredFields, local);
}

protected Properties applySinkProperties(Dataset dataset, Fields declaredFields, Properties local) {
String prefix = PART_NAME_DEFAULT;

// hdfs always treat paths as directories, so we need to provide a prefix for the part files
if (isSink(dataset)) {
prefix = getPartFileName((Sink) dataset, declaredFields);
}

Properties local = new Properties();

local.setProperty("mapred.output.direct." + S3AFileSystem.class.getSimpleName(), "true");
local.setProperty("cascading.tapcollector.partname", String.format("%%s%%s%s-%%05d-%%05d", prefix));
local.setProperty("mapreduce.input.fileinputformat.input.dir.recursive", "true");

// intercept all writes against the s3a: and file: filesystem
local.setProperty("mapred.output.direct." + ObserveS3AFileSystem.class.getSimpleName(), "true");
local.setProperty("fs.s3a.impl", ObserveS3AFileSystem.class.getName());
local.setProperty("fs.s3.impl", ObserveS3AFileSystem.class.getName());
local.setProperty("mapred.output.direct." + ObserveLocalFileSystem.class.getSimpleName(), "true");
local.setProperty("fs.file.impl", ObserveLocalFileSystem.class.getName());

return applyAWSProperties(pipelineOptions, dataset, local);
return local;
}

protected abstract Scheme createScheme(PipelineOptions pipelineOptions, Dataset dataset, Fields declaredFields);
protected abstract Scheme createScheme(Dataset dataset, Fields declaredFields);

protected Properties applyAWSProperties(PipelineOptions pipelineOptions, Dataset dataset, Properties local) {
AWSOptions overrideAWSOptions = isSink(dataset) ? pipelineOptions.outputOptions() : pipelineOptions.inputOptions();
protected Properties applyAWSProperties(PipelineOptions pipelineOptions, Properties properties, boolean isSink) {
AWSOptions overrideAWSOptions = isSink ? pipelineOptions.outputOptions() : pipelineOptions.inputOptions();
List<AWSOptions> awsOptions = List.of(overrideAWSOptions, pipelineOptions);

Optional<AWSOptions> hasAssumedRoleARN = awsOptions.stream()
.filter(AWSOptions::hasAWSAssumedRoleARN)
.findFirst();

hasAssumedRoleARN.ifPresent(o -> local.setProperty(Constants.ASSUMED_ROLE_ARN, o.awsAssumedRoleARN()));
hasAssumedRoleARN.ifPresent(o -> properties.setProperty(Constants.ASSUMED_ROLE_ARN, o.awsAssumedRoleARN()));

Property.setIfNotNullFromSystem(local, Constants.ASSUMED_ROLE_ARN);
Property.setIfNotNullFromSystem(properties, Constants.ASSUMED_ROLE_ARN);

if (local.containsKey(Constants.ASSUMED_ROLE_ARN)) {
local.setProperty(Constants.ASSUMED_ROLE_SESSION_NAME, "role-session-" + System.currentTimeMillis());
local.setProperty(Constants.AWS_CREDENTIALS_PROVIDER, AssumedRoleCredentialProvider.class.getName());
local.setProperty(Constants.ASSUMED_ROLE_CREDENTIALS_PROVIDER, getAWSCredentialProviders());
if (properties.containsKey(Constants.ASSUMED_ROLE_ARN)) {
properties.setProperty(Constants.ASSUMED_ROLE_SESSION_NAME, "role-session-" + System.currentTimeMillis());
properties.setProperty(Constants.AWS_CREDENTIALS_PROVIDER, AssumedRoleCredentialProvider.class.getName());
properties.setProperty(Constants.ASSUMED_ROLE_CREDENTIALS_PROVIDER, getAWSCredentialProviders());
} else {
local.setProperty(Constants.AWS_CREDENTIALS_PROVIDER, getAWSCredentialProviders());
properties.setProperty(Constants.AWS_CREDENTIALS_PROVIDER, getAWSCredentialProviders());
}

Optional<String> hasAWSEndpoint = awsOptions.stream()
.filter(AWSOptions::hasAwsEndpoint)
.map(AWSOptions::awsEndpoint)
.findFirst();

Property.setIfNotNullFromEnvThenSystem(local, "AWS_S3_ENDPOINT", Constants.ENDPOINT, hasAWSEndpoint.orElse(null));
Property.setIfNotNullFromEnvThenSystem(local, "AWS_ACCESS_KEY_ID", Constants.ACCESS_KEY);
Property.setIfNotNullFromEnvThenSystem(local, "AWS_SECRET_ACCESS_KEY", Constants.SECRET_KEY);
Property.setIfNotNullFromSystem(local, Constants.SESSION_TOKEN);
Property.setIfNotNullFromSystem(local, Constants.PROXY_HOST);
Property.setIfNotNullFromSystem(local, Constants.PROXY_PORT);
Property.setIfNotNullFromEnvThenSystem(properties, "AWS_S3_ENDPOINT", Constants.ENDPOINT, hasAWSEndpoint.orElse(null));
Property.setIfNotNullFromEnvThenSystem(properties, "AWS_ACCESS_KEY_ID", Constants.ACCESS_KEY);
Property.setIfNotNullFromEnvThenSystem(properties, "AWS_SECRET_ACCESS_KEY", Constants.SECRET_KEY);
Property.setIfNotNullFromSystem(properties, Constants.SESSION_TOKEN);
Property.setIfNotNullFromSystem(properties, Constants.PROXY_HOST);
Property.setIfNotNullFromSystem(properties, Constants.PROXY_PORT);

return local;
return properties;
}
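To make the assumed-role branch above concrete: when an ARN is present, the properties handed to the tap end up shaped like this hedged sketch (the ARN is hypothetical):

```java
// equivalent of the assumed-role branch in applyAWSProperties
Properties props = new Properties();
props.setProperty(Constants.ASSUMED_ROLE_ARN, "arn:aws:iam::123456789012:role/example"); // hypothetical ARN
props.setProperty(Constants.ASSUMED_ROLE_SESSION_NAME, "role-session-" + System.currentTimeMillis());
// s3a authenticates by assuming the role...
props.setProperty(Constants.AWS_CREDENTIALS_PROVIDER, AssumedRoleCredentialProvider.class.getName());
// ...obtaining base credentials from the providers returned by getAWSCredentialProviders()
props.setProperty(Constants.ASSUMED_ROLE_CREDENTIALS_PROVIDER, DefaultAWSCredentialsProviderChain.class.getName());
```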

@NotNull
@@ -234,6 +251,8 @@ private String getAWSCredentialProviders() {

list.addFirst(DefaultAWSCredentialsProviderChain.class);

return list.stream().map(Class::getName).collect(Collectors.joining(","));
return list.stream()
.map(Class::getName)
.collect(Collectors.joining(","));
}
}