VolD 0.1

ReplicatedVolatileDirectory.java

Go to the documentation of this file.
00001 
00002 package de.zib.vold.volatilelogic;
00003 
00004 import de.zib.vold.common.VoldException;
00005 import de.zib.vold.replication.Replicator;
00006 
00007 import java.util.List;
00008 import java.util.Set;
00009 import java.util.Map;
00010 
00011 import org.apache.commons.logging.Log;
00012 import org.apache.commons.logging.LogFactory;
00013 
00014 import org.joda.time.DateTime;
00015 
00021 public class ReplicatedVolatileDirectory implements VolatileDirectory
00022 {
00023         protected final Log log = LogFactory.getLog( this.getClass() );
00024 
00025         private VolatileDirectory backend;
00026         private Replicator replicator;
00027 
00034         public ReplicatedVolatileDirectory( VolatileDirectory backend, Replicator replicator )
00035         {
00036                 this.backend = backend;
00037                 this.replicator = replicator;
00038         }
00039 
00043         public ReplicatedVolatileDirectory( )
00044         {
00045                 this.backend = null;
00046                 this.replicator = null;
00047         }
00048 
00052         public void setDirectory( VolatileDirectory directory )
00053         {
00054                 this.backend = directory;
00055         }
00056 
00060         public void setReplicator( Replicator replicator )
00061         {
00062                 this.replicator = replicator;
00063         }
00064 
00068         public void checkState( )
00069         {
00070                 if( null == backend || null == replicator )
00071                 {
00072                         throw new IllegalStateException( "Tried to operate on frontend while it had not been initialized yet. You first need to set a volatile Directory!" );
00073                 }
00074         }
00075 
00081         @Override
00082         public long getActualSlice( )
00083         {
00084                 // guard
00085                 {
00086                         checkState();
00087                 }
00088 
00089                 return backend.getActualSlice();
00090         }
00091 
00097         @Override
00098         public long getNumberOfSlices( )
00099         {
00100                 // guard
00101                 {
00102                         checkState();
00103                 }
00104 
00105                 return backend.getNumberOfSlices();
00106         }
00107 
00113         @Override
00114         public long getTimeSliceSize( )
00115         {
00116                 // guard
00117                 {
00118                         checkState();
00119                 }
00120 
00121                 return backend.getTimeSliceSize();
00122         }
00123 
00133         @Override
00134         public void insert( List< String > key, Set< String > value )
00135         {
00136                 // guard
00137                 {
00138                         checkState();
00139                 }
00140 
00141                 log.debug( "Replicating insert: " + key.toString() + " |--> " + value.toString() );
00142 
00143                 InsertThread insertion = new InsertThread( backend, key, value );
00144                 
00145                 try
00146                 {
00147                         insertion.start();
00148 
00149                         replicator.insert( key, value );
00150                         
00151                         insertion.join();
00152                 }
00153                 catch( InterruptedException e )
00154                 {
00155                         throw new VoldException( e );
00156                 }
00157 
00158                 if( null != insertion.exception )
00159                         throw insertion.exception;
00160         }
00161 
00170         @Override
00171         public void refresh( List< String > key )
00172         {
00173                 // guard
00174                 {
00175                         checkState();
00176                 }
00177 
00178                 log.debug( "Replicating refresh: " + key.toString() );
00179 
00180                 RefreshThread freshen = new RefreshThread( backend, key );
00181                 
00182                 try
00183                 {
00184                         freshen.start();
00185 
00186                         replicator.refresh( key );
00187                         
00188                         freshen.join();
00189                 }
00190                 catch( InterruptedException e )
00191                 {
00192                         throw new VoldException( e );
00193                 }
00194 
00195                 if( null != freshen.exception )
00196                         throw freshen.exception;
00197         }
00198 
00207         @Override
00208         public void delete( List< String > key )
00209         {
00210                 // guard
00211                 {
00212                         checkState();
00213                 }
00214 
00215                 log.debug( "Replicating delete: " + key.toString() );
00216 
00217                 DeleteThread deletion = new DeleteThread( backend, key );
00218                 
00219                 try
00220                 {
00221                         deletion.start();
00222 
00223                         replicator.delete( key );
00224 
00225                         deletion.join();
00226                 }
00227                 catch( InterruptedException e )
00228                 {
00229                         throw new VoldException( e );
00230                 }
00231 
00232                 if( null != deletion.exception )
00233                         throw deletion.exception;
00234         }
00235 
00239         private class InsertThread extends Thread
00240         {
00241                 private final VolatileDirectory directory;
00242                 private final List< String > key;
00243                 private final Set< String > values;
00244                 public VoldException exception = null;
00245 
00246                 public InsertThread( VolatileDirectory directory, List< String > key, Set< String > values )
00247                 {
00248                         this.directory = directory;
00249                         this.key = key;
00250                         this.values = values;
00251                 }
00252 
00253                 @Override
00254                 public void run()
00255                 {
00256                         try
00257                         {
00258                                 directory.insert( key, values );
00259                         }
00260                         catch( VoldException e )
00261                         {
00262                                 this.exception = e;
00263                         }
00264                 }
00265         }
00266 
00270         private class RefreshThread extends Thread
00271         {
00272                 private final VolatileDirectory directory;
00273                 private final List< String > key;
00274                 public VoldException exception = null;
00275 
00276                 public RefreshThread( VolatileDirectory directory, List< String > key )
00277                 {
00278                         this.directory = directory;
00279                         this.key = key;
00280                 }
00281 
00282                 @Override
00283                 public void run()
00284                 {
00285                         try
00286                         {
00287                                 directory.refresh( key );
00288                         }
00289                         catch( VoldException e )
00290                         {
00291                                 this.exception = e;
00292                         }
00293                 }
00294         }
00295 
00299         private class DeleteThread extends Thread
00300         {
00301                 private final VolatileDirectory directory;
00302                 private final List< String > key;
00303                 public VoldException exception = null;
00304 
00305                 public DeleteThread( VolatileDirectory directory, List< String > key )
00306                 {
00307                         this.directory = directory;
00308                         this.key = key;
00309                 }
00310 
00311                 @Override
00312                 public void run()
00313                 {
00314                         try
00315                         {
00316                                 directory.delete( key );
00317                         }
00318                         catch( VoldException e )
00319                         {
00320                                 this.exception = e;
00321                         }
00322                 }
00323         }
00324 
00331         @Override
00332         public Set< String > lookup( List< String > key )
00333         {
00334                 // guard
00335                 {
00336                         checkState();
00337                 }
00338 
00339                 return backend.lookup( key );
00340         }
00341 
00348         @Override
00349         public Map< List< String >, Set< String > > prefixLookup( List< String > key )
00350         {
00351                 // guard
00352                 {
00353                         checkState();
00354                 }
00355 
00356                 return backend.prefixLookup( key );
00357         }
00358 
00365         @Override
00366         public Map< List< String >, DateTime > sliceLookup( long slice )
00367         {
00368                 // guard
00369                 {
00370                         checkState();
00371                 }
00372 
00373                 return backend.sliceLookup( slice );
00374         }
00375 }