Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions lib/src/main/java/io/cloudquery/helper/ArrowHelper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package io.cloudquery.helper;

import static java.util.Arrays.asList;

import com.google.protobuf.ByteString;
import io.cloudquery.schema.Column;
import io.cloudquery.schema.Table;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.channels.Channels;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;

public class ArrowHelper {
public static ByteString encode(Table table) throws IOException {
try (BufferAllocator bufferAllocator = new RootAllocator()) {
Schema schema = toArrowSchema(table);
VectorSchemaRoot schemaRoot = VectorSchemaRoot.create(schema, bufferAllocator);
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
try (ArrowStreamWriter writer =
new ArrowStreamWriter(schemaRoot, null, Channels.newChannel(out))) {
writer.start();
writer.end();
return ByteString.copyFrom(out.toByteArray());
}
}
}
}

public static Schema toArrowSchema(Table table) {
List<Column> columns = table.getColumns();
Field[] fields = new Field[columns.size()];
for (int i = 0; i < columns.size(); i++) {
Column column = columns.get(i);
Field field = Field.nullable(column.getName(), column.getType());
fields[i] = field;
}
Map<String, String> metadata = new HashMap<>();
metadata.put("cq:table_name", table.getName());
if (table.getTitle() != null) {
metadata.put("cq:table_title", table.getTitle());
}
if (table.getDescription() != null) {
metadata.put("cq:table_description", table.getDescription());
}
if (table.getParent() != null) {
metadata.put("cq:table_depends_on", table.getParent().getName());
}
return new Schema(asList(fields), metadata);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.cloudquery.internal.servers.plugin.v3;

import com.google.protobuf.ByteString;
import io.cloudquery.helper.ArrowHelper;
import io.cloudquery.plugin.BackendOptions;
import io.cloudquery.plugin.NewClientOptions;
import io.cloudquery.plugin.Plugin;
Expand Down Expand Up @@ -65,7 +66,7 @@ public void getTables(
request.getSkipDependentTables());
List<ByteString> byteStrings = new ArrayList<>();
for (Table table : Table.flattenTables(tables)) {
byteStrings.add(table.encode());
byteStrings.add(ArrowHelper.encode(table));
}
responseObserver.onNext(
io.cloudquery.plugin.v3.GetTables.Response.newBuilder()
Expand Down
3 changes: 2 additions & 1 deletion lib/src/main/java/io/cloudquery/scheduler/Scheduler.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.cloudquery.scheduler;

import io.cloudquery.helper.ArrowHelper;
import io.cloudquery.plugin.v3.Sync;
import io.cloudquery.schema.ClientMeta;
import io.cloudquery.schema.Table;
Expand All @@ -25,7 +26,7 @@ public void sync() {
try {
logger.info("sending migrate message for table: {}", table.getName());
Sync.MessageMigrateTable migrateTable =
Sync.MessageMigrateTable.newBuilder().setTable(table.encode()).build();
Sync.MessageMigrateTable.newBuilder().setTable(ArrowHelper.encode(table)).build();
Sync.Response response = Sync.Response.newBuilder().setMigrateTable(migrateTable).build();
syncStream.onNext(response);
} catch (IOException e) {
Expand Down
3 changes: 2 additions & 1 deletion lib/src/main/java/io/cloudquery/schema/Resource.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.cloudquery.schema;

import com.google.protobuf.ByteString;
import io.cloudquery.helper.ArrowHelper;
import io.cloudquery.scalar.Scalar;
import io.cloudquery.scalar.ValidationException;
import java.io.IOException;
Expand Down Expand Up @@ -42,6 +43,6 @@ public Scalar<?> get(String columnName) {

public ByteString encode() throws IOException {
// TODO: Encode data and not only schema
return table.encode();
return ArrowHelper.encode(table);
}
}
49 changes: 0 additions & 49 deletions lib/src/main/java/io/cloudquery/schema/Table.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,8 @@
package io.cloudquery.schema;

import static java.util.Arrays.asList;

import com.google.protobuf.ByteString;
import io.cloudquery.glob.Glob;
import io.cloudquery.schema.Column.ColumnBuilder;
import io.cloudquery.transformers.TransformerException;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.channels.Channels;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -20,12 +14,6 @@
import lombok.Getter;
import lombok.NonNull;
import lombok.Setter;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;

@Builder(toBuilder = true)
@Getter
Expand Down Expand Up @@ -215,41 +203,4 @@ public Optional<Column> getColumn(String name) {
}
return Optional.empty();
}

public Schema toArrowSchema() {
Field[] fields = new Field[columns.size()];
for (int i = 0; i < columns.size(); i++) {
Column column = columns.get(i);
Field field = Field.nullable(column.getName(), column.getType());
fields[i] = field;
}
Map<String, String> metadata = new HashMap<>();
metadata.put("cq:table_name", name);
if (title != null) {
metadata.put("cq:table_title", title);
}
if (description != null) {
metadata.put("cq:table_description", description);
}
if (parent != null) {
metadata.put("cq:table_depends_on", parent.getName());
}
Schema schema = new Schema(asList(fields), metadata);
return schema;
}

public ByteString encode() throws IOException {
try (BufferAllocator bufferAllocator = new RootAllocator()) {
Schema schema = toArrowSchema();
VectorSchemaRoot schemaRoot = VectorSchemaRoot.create(schema, bufferAllocator);
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
try (ArrowStreamWriter writer =
new ArrowStreamWriter(schemaRoot, null, Channels.newChannel(out))) {
writer.start();
writer.end();
return ByteString.copyFrom(out.toByteArray());
}
}
}
}
}