11package io .cloudquery .memdb ;
22
3+ import io .cloudquery .messages .WriteDeleteStale ;
34import io .cloudquery .messages .WriteInsert ;
45import io .cloudquery .messages .WriteMessage ;
56import io .cloudquery .messages .WriteMigrateTable ;
7+ import io .cloudquery .scalar .Timestamp ;
68import io .cloudquery .schema .ClientMeta ;
9+ import io .cloudquery .schema .Column ;
710import io .cloudquery .schema .Resource ;
811import io .cloudquery .schema .Table ;
912import io .cloudquery .schema .TableColumnChange ;
1013import java .util .ArrayList ;
14+ import java .util .Date ;
1115import java .util .HashMap ;
1216import java .util .List ;
1317import java .util .Map ;
18+ import java .util .Objects ;
19+ import java .util .Optional ;
1420import java .util .concurrent .locks .ReentrantReadWriteLock ;
1521
1622public class MemDBClient implements ClientMeta {
1723 private static final String id = "memdb" ;
1824
19- private ReentrantReadWriteLock lock = new ReentrantReadWriteLock ();
20- private Map <String , Table > tables = new HashMap <>();
21- private Map <String , List <Resource >> memDB = new HashMap <>();
25+ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock ();
26+ private final Map <String , Table > tables = new HashMap <>();
27+ private final Map <String , List <Resource >> memDB = new HashMap <>();
2228
2329 public MemDBClient () {}
2430
@@ -37,6 +43,9 @@ public void write(WriteMessage message) {
3743 if (message instanceof WriteInsert insert ) {
3844 insert (insert );
3945 }
46+ if (message instanceof WriteDeleteStale deleteStale ) {
47+ deleteStale (deleteStale );
48+ }
4049 } finally {
4150 lock .writeLock ().unlock ();
4251 }
@@ -74,6 +83,40 @@ private void overwrite(Table table, Resource resource) {
7483 memDB .get (tableName ).add (resource );
7584 }
7685
86+ private void deleteStale (WriteDeleteStale deleteStale ) {
87+ String tableName = deleteStale .getTableName ();
88+
89+ List <Resource > filteredList = new ArrayList <>();
90+
91+ for (int i = 0 ; i < memDB .get (tableName ).size (); i ++) {
92+ Resource row = memDB .get (tableName ).get (i );
93+ Optional <Column > sourceColumn = row .getTable ().getColumn (Column .CQ_SOURCE_NAME );
94+ if (sourceColumn .isEmpty ()) {
95+ continue ;
96+ }
97+ Optional <Column > syncColumn = row .getTable ().getColumn (Column .CQ_SYNC_TIME );
98+ if (syncColumn .isEmpty ()) {
99+ continue ;
100+ }
101+
102+ String sourceName = "" ;
103+ if (row .get (Column .CQ_SOURCE_NAME ) != null ) {
104+ sourceName = row .get (Column .CQ_SOURCE_NAME ).toString ();
105+ }
106+
107+ if (Objects .equals (sourceName , deleteStale .getSourceName ())) {
108+ Date rowSyncTime = new Date (0 );
109+ if (row .get (Column .CQ_SYNC_TIME ) != null ) {
110+ rowSyncTime = new Date (((Timestamp ) row .get (Column .CQ_SYNC_TIME )).get ());
111+ }
112+ if (!rowSyncTime .before (deleteStale .getTimestamp ())) {
113+ filteredList .add (row );
114+ }
115+ }
116+ }
117+ memDB .put (tableName , filteredList );
118+ }
119+
77120 public void close () {
78121 // do nothing
79122 }
0 commit comments