11package io .cloudquery .scheduler ;
22
3+ import com .google .protobuf .ByteString ;
34import io .cloudquery .helper .ArrowHelper ;
45import io .cloudquery .plugin .v3 .Sync ;
56import io .cloudquery .schema .ClientMeta ;
7+ import io .cloudquery .schema .Resource ;
68import io .cloudquery .schema .Table ;
79import io .grpc .stub .StreamObserver ;
810import java .util .List ;
11+ import java .util .concurrent .ExecutorService ;
12+ import java .util .concurrent .Executors ;
13+ import java .util .concurrent .TimeUnit ;
914import lombok .Builder ;
1015import lombok .NonNull ;
1116import org .apache .logging .log4j .Logger ;
@@ -20,8 +25,58 @@ public class Scheduler {
2025 private int concurrency ;
2126 private boolean deterministicCqId ;
2227
23- public void sync () {
28+ private void resolveTables (List <Table > tables , Resource parent , int concurrency )
29+ throws InterruptedException {
30+ if (tables == null || tables .isEmpty ()) {
31+ return ;
32+ }
33+ ExecutorService executor = Executors .newFixedThreadPool (Math .min (tables .size (), concurrency ));
2434 for (Table table : tables ) {
35+ final int nextLevelConcurrency = Math .max (1 , concurrency / 2 );
36+ executor .submit (
37+ new Runnable () {
38+ @ Override
39+ public void run () {
40+ try {
41+ String tableMessage =
42+ parent != null
43+ ? "table " + table .getName () + " of parent" + parent .getTable ().getName ()
44+ : "table " + table .getName ();
45+
46+ logger .info ("resolving {}" , tableMessage );
47+ if (!table .getResolver ().isPresent ()) {
48+ logger .error ("no resolver for {}" , tableMessage );
49+ return ;
50+ }
51+
52+ SchedulerTableOutputStream schedulerTableOutputStream =
53+ new SchedulerTableOutputStream (table , parent , client , logger );
54+ table .getResolver ().get ().resolve (client , parent , schedulerTableOutputStream );
55+
56+ for (Resource resource : schedulerTableOutputStream .getResources ()) {
57+ ByteString record = resource .encode ();
58+ Sync .MessageInsert insert =
59+ Sync .MessageInsert .newBuilder ().setRecord (record ).build ();
60+ Sync .Response response = Sync .Response .newBuilder ().setInsert (insert ).build ();
61+ syncStream .onNext (response );
62+ resolveTables (table .getRelations (), resource , nextLevelConcurrency );
63+ }
64+
65+ logger .info ("resolved {}" , tableMessage );
66+ } catch (Exception e ) {
67+ logger .error ("Failed to resolve table: {}" , table .getName (), e );
68+ syncStream .onError (e );
69+ return ;
70+ }
71+ }
72+ });
73+ }
74+ executor .shutdown ();
75+ executor .awaitTermination (Long .MAX_VALUE , TimeUnit .DAYS );
76+ }
77+
78+ public void sync () {
79+ for (Table table : Table .flattenTables (tables )) {
2580 try {
2681 logger .info ("sending migrate message for table: {}" , table .getName ());
2782 Sync .MessageMigrateTable migrateTable =
@@ -34,26 +89,12 @@ public void sync() {
3489 }
3590 }
3691
37- for (Table table : tables ) {
38- try {
39- logger .info ("resolving table: {}" , table .getName ());
40- if (!table .getResolver ().isPresent ()) {
41- logger .error ("no resolver for table: {}" , table .getName ());
42- continue ;
43- }
44- SchedulerTableOutputStream schedulerTableOutputStream =
45- SchedulerTableOutputStream .builder ()
46- .table (table )
47- .client (client )
48- .logger (logger )
49- .syncStream (syncStream )
50- .build ();
51- table .getResolver ().get ().resolve (client , null , schedulerTableOutputStream );
52- logger .info ("resolved table: {}" , table .getName ());
53- } catch (Exception e ) {
54- syncStream .onError (e );
55- return ;
56- }
92+ try {
93+ resolveTables (this .tables , null , this .concurrency );
94+ } catch (InterruptedException e ) {
95+ logger .error ("Failed to resolve tables" , e );
96+ syncStream .onError (e );
97+ return ;
5798 }
5899
59100 syncStream .onCompleted ();
0 commit comments