Skip to content

Commit

Permalink
accommodate changes in clusterless
Browse files Browse the repository at this point in the history
sink manifest is renamed to manifestTemplate and maintained as a string
  • Loading branch information
cwensel committed Jul 21, 2023
1 parent e00a94e commit b7cbb73
Show file tree
Hide file tree
Showing 9 changed files with 45 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ void writeReadParquetPartitionedWithManifests(
.build())
.withSink(Sink.builder()
.withOutput(intermediate)
.withManifest(intermediateManifest)
.withManifestTemplate(intermediateManifest.toString())
.withManifestLot("20211112PT5M000")
.withSchema(Schema.builder()
.withFormat(Format.parquet)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,37 +39,33 @@ public void writeSuccess(Properties conf) {
// do nothing
}
};

public static ManifestWriter from(Dataset dataset, URI uriPrefix) {
if (!(dataset instanceof Sink)) {
return NULL;
}
Sink sink = (Sink) dataset;

if (sink.manifest() == null) {
if (!sink.hasManifest()) {
return NULL;
}

return new ManifestWriter(sink, uriPrefix);
}

private final URI manifestURI;
private final URITemplate template;
private final String lot;
private final URI uriPrefix;

private ManifestWriter() {
this.manifestURI = null;
this.template = null;
this.lot = null;
this.uriPrefix = null;
}

public ManifestWriter(Sink dataset, URI uriPrefix) {
this(dataset.manifest(), dataset.manifestLot(), uriPrefix);
}

public ManifestWriter(URI manifestURI, String lot, URI uriPrefix) {
this.manifestURI = manifestURI;
this.lot = lot;
String manifestTemplate = dataset.manifestTemplate();
this.template = new URITemplate(URLDecoder.decode(manifestTemplate, StandardCharsets.UTF_8));
this.lot = dataset.manifestLot();
this.uriPrefix = URIs.makeAbsolute(uriPrefix);

if (this.lot == null) {
Expand All @@ -78,8 +74,6 @@ public ManifestWriter(URI manifestURI, String lot, URI uriPrefix) {
}

public void writeSuccess(Properties conf) throws IOException {
URITemplate template = new URITemplate(URLDecoder.decode(manifestURI.toString(), StandardCharsets.UTF_8));

URI complete = template
.expand("lot", lot)
.expand("state", "complete")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public int openWritesThreshold() {
URI commonRoot = uris.get(0);

// uri is likely a directory or single file, let the Hfs tap handle it
if (!isSink && dataset.manifest() != null) {
if (!isSink && dataset.hasManifest()) {
commonRoot = URIs.findCommonPrefix(uris, dataset.partitions().size());
}

Expand All @@ -90,7 +90,7 @@ public int openWritesThreshold() {
Tap tap;

if (isSink) {
tap = createSinkTap(local, scheme, commonRoot, uris, dataset.manifest());
tap = createSinkTap(local, scheme, commonRoot, uris);
} else {
tap = createSourceTap(local, scheme, commonRoot, uris);
}
Expand All @@ -117,7 +117,7 @@ public boolean commitResource(Properties conf) throws IOException {
}

@NotNull
private static Hfs createSourceTap(Properties local, Scheme scheme, URI commonURI, List<URI> uris) {
private Hfs createSourceTap(Properties local, Scheme scheme, URI commonURI, List<URI> uris) {
Observed.INSTANCE.reads(commonURI);

String[] identifiers = uris.stream()
Expand Down Expand Up @@ -148,7 +148,7 @@ public Fields retrieveSourceFields(FlowProcess<? extends Configuration> flowProc
}

@NotNull
private static Hfs createSinkTap(Properties local, Scheme scheme, URI commonURI, List<URI> uris, URI manifest) {
private Hfs createSinkTap(Properties local, Scheme scheme, URI commonURI, List<URI> uris) {
if (uris.size() > 1) {
throw new IllegalArgumentException("cannot write to multiple uris, got: " + uris.stream().limit(10));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,17 @@

package io.clusterless.tessellate.model;

import com.fasterxml.jackson.annotation.JsonIgnore;

import java.net.URI;
import java.util.List;

public interface Dataset {
URI manifest();

Schema schema();

@JsonIgnore
boolean hasManifest();

List<URI> uris();

List<Partition> partitions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import java.util.List;

public class Sink implements Dataset, Model {
private URI manifest;
private String manifestTemplate;
private String manifestLot;
private URI output;
private Schema schema = new Schema();
Expand All @@ -30,9 +30,8 @@ public URI output() {
return output;
}

@Override
public URI manifest() {
return manifest;
public String manifestTemplate() {
return manifestTemplate;
}

public String manifestLot() {
Expand All @@ -43,6 +42,11 @@ public Schema schema() {
return schema;
}

@Override
public boolean hasManifest() {
return manifestTemplate() != null;
}

@Override
public List<URI> uris() {
return List.of(output());
Expand All @@ -61,7 +65,7 @@ public Filename filename() {
}

public static final class Builder {
private URI manifest;
private String manifestTemplate;
private String manifestLot;
private URI output;
private Schema schema = new Schema();
Expand All @@ -76,8 +80,8 @@ public static Builder builder() {
return new Builder();
}

public Builder withManifest(URI manifest) {
this.manifest = manifest;
public Builder withManifestTemplate(String manifestTemplate) {
this.manifestTemplate = manifestTemplate;
return this;
}

Expand Down Expand Up @@ -113,12 +117,12 @@ public Builder withFilename(Filename filename) {

public Sink build() {
Sink sink = new Sink();
sink.namedPartitions = this.namedPartitions;
sink.manifest = this.manifest;
sink.manifestLot = this.manifestLot;
sink.schema = this.schema;
sink.namedPartitions = this.namedPartitions;
sink.filename = this.filename;
sink.output = this.output;
sink.manifestLot = this.manifestLot;
sink.manifestTemplate = this.manifestTemplate;
sink.partitions = this.partitions;
return sink;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ public URI manifest() {
return manifest;
}

@Override
public boolean hasManifest() {
return manifest() != null;
}

public String manifestLot() {
return manifestLot;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
public class OutputOptions implements AWSOptions {
@CommandLine.Option(names = {"-o", "--output"}, description = "output uris")
private URI output;
@CommandLine.Option(names = {"-t", "--output-manifest"}, description = "output manifest uri template")
private String outputManifest;
@CommandLine.Option(names = {"-t", "--output-manifest-template"}, description = "output manifest uri template")
private String outputManifestTemplate;
@CommandLine.Option(names = {"-l", "--output-manifest-lot"}, description = "output lot")
private String outputLot;
@CommandLine.Option(names = {"--output-aws-endpoint"}, description = "aws endpoint")
Expand All @@ -35,12 +35,12 @@ public OutputOptions setOutput(URI output) {
return this;
}

public String outputManifest() {
return outputManifest;
public String outputManifestTemplate() {
return outputManifestTemplate;
}

public OutputOptions setOutputManifest(String outputManifest) {
this.outputManifest = outputManifest;
public OutputOptions setOutputManifestTemplate(String outputManifestTemplate) {
this.outputManifestTemplate = outputManifestTemplate;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,15 @@ public class PipelineOptionsMerge {
.putInto("inputManifest", "/source/manifest")
.putInto("inputManifestLot", "/source/manifestLot")
.putInto("output", "/sink/output")
.putInto("outputManifest", "/sink/manifest")
.putInto("outputManifestTemplate", "/sink/manifestTemplate")
.putInto("outputManifestLot", "/sink/manifestLot");
private static JSONBuilder builder = new JSONBuilder(buildSpec);

// all uris that should be resolved relative to the pipeline file path
private static List<NestedPointer<JsonNode, ArrayNode>> uris = List.of(
COMPILER.nested("/source/inputs/*"),
COMPILER.nested("/source/manifest"),
COMPILER.nested("/sink/output"),
COMPILER.nested("/sink/manifest")
COMPILER.nested("/sink/output")
);

private static final Map<Comparable, Function<PipelineOptions, JsonNode>> argumentLookups = new HashMap<>();
Expand All @@ -65,7 +64,7 @@ public class PipelineOptionsMerge {
argumentLookups.put("inputManifest", pipelineOptions -> nullOrNode(pipelineOptions.inputOptions().inputManifest()));
argumentLookups.put("inputManifestLot", pipelineOptions -> nullOrNode(pipelineOptions.inputOptions().inputLot()));
argumentLookups.put("output", pipelineOptions -> nullOrNode(pipelineOptions.outputOptions().output()));
argumentLookups.put("outputManifest", pipelineOptions -> nullOrNode(pipelineOptions.outputOptions().outputManifest()));
argumentLookups.put("outputManifestTemplate", pipelineOptions -> nullOrNode(pipelineOptions.outputOptions().outputManifestTemplate()));
argumentLookups.put("outputManifestLot", pipelineOptions -> nullOrNode(pipelineOptions.outputOptions().outputLot()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ void writeReadParquetPartitionedWithManifests(
.build())
.withSink(Sink.builder()
.withOutput(intermediate)
.withManifest(intermediateManifest)
.withManifestTemplate(intermediateManifest.toString())
.withManifestLot("20211112PT5M000")
.withSchema(Schema.builder()
.withFormat(Format.parquet)
Expand Down

0 comments on commit b7cbb73

Please sign in to comment.