|
VolD 0.1
|
package de.zib.vold.volatilelogic;

import de.zib.vold.common.VoldException;
import de.zib.vold.replication.Replicator;

import java.util.List;
import java.util.Set;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.joda.time.DateTime;

/**
 * A {@link VolatileDirectory} decorator that sends every write request both to a
 * backend directory and to a {@link Replicator}.
 *
 * Write requests (insert, refresh, delete) run on the backend in a separate thread
 * while the replicator is called from the calling thread; the call returns after the
 * backend thread has been joined. Read requests are answered by the backend only.
 *
 * The backend and the replicator can be given to the constructor or injected later
 * through the setters; every operation fails with an IllegalStateException until
 * both are set.
 */
public class ReplicatedVolatileDirectory implements VolatileDirectory
{
        protected final Log log = LogFactory.getLog( this.getClass() );

        private VolatileDirectory backend;
        private Replicator replicator;

        /**
         * Construct a fully initialized ReplicatedVolatileDirectory.
         *
         * @param backend    The directory which answers reads and receives all writes.
         * @param replicator The replicator which receives all writes, too.
         */
        public ReplicatedVolatileDirectory( VolatileDirectory backend, Replicator replicator )
        {
                this.backend = backend;
                this.replicator = replicator;
        }

        /**
         * Construct an uninitialized ReplicatedVolatileDirectory.
         *
         * Both setDirectory and setReplicator have to be called before it can be used.
         */
        public ReplicatedVolatileDirectory( )
        {
                this.backend = null;
                this.replicator = null;
        }

        /**
         * Set the backend directory.
         *
         * @param directory The directory which answers reads and receives all writes.
         */
        public void setDirectory( VolatileDirectory directory )
        {
                this.backend = directory;
        }

        /**
         * Set the replicator.
         *
         * @param replicator The replicator which receives all writes.
         */
        public void setReplicator( Replicator replicator )
        {
                this.replicator = replicator;
        }

        /**
         * Check that backend and replicator have been set.
         *
         * @throws IllegalStateException If the backend or the replicator is still null.
         */
        public void checkState( )
        {
                if( null == backend || null == replicator )
                {
                        throw new IllegalStateException( "Tried to operate on frontend while it had not been initialized yet. You first need to set a volatile Directory!" );
                }
        }

        /**
         * Query the actual slice from the backend.
         *
         * @return The value reported by the backend.
         * @throws IllegalStateException If backend or replicator have not been set.
         */
        @Override
        public long getActualSlice( )
        {
                checkState();

                return backend.getActualSlice();
        }

        /**
         * Query the number of slices from the backend.
         *
         * @return The value reported by the backend.
         * @throws IllegalStateException If backend or replicator have not been set.
         */
        @Override
        public long getNumberOfSlices( )
        {
                checkState();

                return backend.getNumberOfSlices();
        }

        /**
         * Query the time slice size from the backend.
         *
         * @return The value reported by the backend.
         * @throws IllegalStateException If backend or replicator have not been set.
         */
        @Override
        public long getTimeSliceSize( )
        {
                checkState();

                return backend.getTimeSliceSize();
        }

        /**
         * Insert a key with its set of values into the backend and hand it to the replicator.
         *
         * The backend insert runs in its own thread while the replicator is called from
         * the calling thread. The method returns once the backend thread has finished.
         *
         * @param key   The key to insert.
         * @param value The values to store for that key.
         * @throws IllegalStateException If backend or replicator have not been set.
         * @throws VoldException If the backend insert failed or the calling thread has been
         *                       interrupted while waiting for it. In the latter case the
         *                       interrupt flag of the calling thread is set again.
         */
        @Override
        public void insert( List< String > key, Set< String > value )
        {
                checkState();

                log.debug( "Replicating insert: " + key.toString() + " |--> " + value.toString() );

                InsertThread insertion = new InsertThread( backend, key, value );

                // NOTE(review): if the replicator throws, the backend thread is neither joined
                // nor is its result inspected - confirm that this is intended.
                try
                {
                        insertion.start();

                        replicator.insert( key, value );

                        insertion.join();
                }
                catch( InterruptedException e )
                {
                        throw interrupted( e );
                }

                if( null != insertion.exception )
                        throw insertion.exception;
        }

        /**
         * Refresh a key on the backend and hand the refresh to the replicator.
         *
         * The backend refresh runs in its own thread while the replicator is called from
         * the calling thread. The method returns once the backend thread has finished.
         *
         * @param key The key to refresh.
         * @throws IllegalStateException If backend or replicator have not been set.
         * @throws VoldException If the backend refresh failed or the calling thread has been
         *                       interrupted while waiting for it. In the latter case the
         *                       interrupt flag of the calling thread is set again.
         */
        @Override
        public void refresh( List< String > key )
        {
                checkState();

                log.debug( "Replicating refresh: " + key.toString() );

                RefreshThread freshen = new RefreshThread( backend, key );

                try
                {
                        freshen.start();

                        replicator.refresh( key );

                        freshen.join();
                }
                catch( InterruptedException e )
                {
                        throw interrupted( e );
                }

                if( null != freshen.exception )
                        throw freshen.exception;
        }

        /**
         * Delete a key from the backend and hand the deletion to the replicator.
         *
         * The backend delete runs in its own thread while the replicator is called from
         * the calling thread. The method returns once the backend thread has finished.
         *
         * @param key The key to delete.
         * @throws IllegalStateException If backend or replicator have not been set.
         * @throws VoldException If the backend delete failed or the calling thread has been
         *                       interrupted while waiting for it. In the latter case the
         *                       interrupt flag of the calling thread is set again.
         */
        @Override
        public void delete( List< String > key )
        {
                checkState();

                log.debug( "Replicating delete: " + key.toString() );

                DeleteThread deletion = new DeleteThread( backend, key );

                try
                {
                        deletion.start();

                        replicator.delete( key );

                        deletion.join();
                }
                catch( InterruptedException e )
                {
                        throw interrupted( e );
                }

                if( null != deletion.exception )
                        throw deletion.exception;
        }

        /**
         * Translate an interruption of the calling thread into a VoldException.
         *
         * The interrupt flag is set again before the exception is built, since catching
         * InterruptedException clears it and callers could not notice the interruption
         * otherwise.
         *
         * @param e The interruption which has been caught.
         * @return The exception to throw to the caller.
         */
        private static VoldException interrupted( InterruptedException e )
        {
                Thread.currentThread().interrupt();

                return new VoldException( e );
        }

        /**
         * Base class for the threads executing a single write request on the backend.
         *
         * A failure of the request is stored in the exception field instead of being
         * lost with the dying thread. The field is meant to be read after join() only.
         */
        private static abstract class BackendThread extends Thread
        {
                protected final VolatileDirectory directory;
                public VoldException exception = null;

                protected BackendThread( VolatileDirectory directory )
                {
                        this.directory = directory;
                }

                /**
                 * Execute the write request on the directory.
                 */
                protected abstract void perform();

                @Override
                public void run()
                {
                        try
                        {
                                perform();
                        }
                        catch( VoldException e )
                        {
                                this.exception = e;
                        }
                        catch( RuntimeException e )
                        {
                                // Any other failure would only terminate this thread and the
                                // caller would assume success, so report it the same way.
                                this.exception = new VoldException( e );
                        }
                }
        }

        /**
         * Thread inserting a key with its values into a directory.
         */
        private static class InsertThread extends BackendThread
        {
                private final List< String > key;
                private final Set< String > values;

                public InsertThread( VolatileDirectory directory, List< String > key, Set< String > values )
                {
                        super( directory );
                        this.key = key;
                        this.values = values;
                }

                @Override
                protected void perform()
                {
                        directory.insert( key, values );
                }
        }

        /**
         * Thread refreshing a key in a directory.
         */
        private static class RefreshThread extends BackendThread
        {
                private final List< String > key;

                public RefreshThread( VolatileDirectory directory, List< String > key )
                {
                        super( directory );
                        this.key = key;
                }

                @Override
                protected void perform()
                {
                        directory.refresh( key );
                }
        }

        /**
         * Thread deleting a key from a directory.
         */
        private static class DeleteThread extends BackendThread
        {
                private final List< String > key;

                public DeleteThread( VolatileDirectory directory, List< String > key )
                {
                        super( directory );
                        this.key = key;
                }

                @Override
                protected void perform()
                {
                        directory.delete( key );
                }
        }

        /**
         * Look up a key in the backend. The replicator is not involved.
         *
         * @param key The key to look up.
         * @return The result reported by the backend.
         * @throws IllegalStateException If backend or replicator have not been set.
         */
        @Override
        public Set< String > lookup( List< String > key )
        {
                checkState();

                return backend.lookup( key );
        }

        /**
         * Perform a prefix lookup in the backend. The replicator is not involved.
         *
         * @param key The key to hand to the prefix lookup of the backend.
         * @return The result reported by the backend.
         * @throws IllegalStateException If backend or replicator have not been set.
         */
        @Override
        public Map< List< String >, Set< String > > prefixLookup( List< String > key )
        {
                checkState();

                return backend.prefixLookup( key );
        }

        /**
         * Perform a slice lookup in the backend. The replicator is not involved.
         *
         * @param slice The slice to hand to the slice lookup of the backend.
         * @return The result reported by the backend.
         * @throws IllegalStateException If backend or replicator have not been set.
         */
        @Override
        public Map< List< String >, DateTime > sliceLookup( long slice )
        {
                checkState();

                return backend.sliceLookup( slice );
        }
}