bamboo.db
Class StorageManager

java.lang.Object
  extended by bamboo.util.StandardStage
      extended by bamboo.db.StorageManager
All Implemented Interfaces:
EventHandlerIF, SingleThreadedEventHandlerIF

public class StorageManager
extends StandardStage
implements SingleThreadedEventHandlerIF

An asynchronous interface to BerkeleyDB.

Data is stored on disk in a table with the following fields:

BytesData
0-7 put time since the epoch (microseconds)
8-11ttl interval after put time (seconds)
12-31guid
32-51data hash
52 whether this is a put (1) or remove (0)
53- data
The data hash is needed to guarentee a consistent scan order and for removes. The primary key is bytes 0-52, and the secondary key is the guid concatenated with the data hash.

Version:
$Id: StorageManager.java,v 1.69 2004/11/14 20:13:45 srhea Exp $
Author:
Sean C. Rhea

Nested Class Summary
static class StorageManager.AddMonitor
           
protected static class StorageManager.Alarm
           
static class StorageManager.DiscardReq
          Drop a datum from the primary database; optionally move to the recycling bin.
protected static class StorageManager.EnqueueEvent
           
protected static class StorageManager.GBGCont
           
protected static class StorageManager.GBTCont
           
static class StorageManager.GetByGuidCont
          Continue an existing GetByGuidReq.
static class StorageManager.GetByGuidReq
          Get all the data in the database whose keys contain the given guid.
static class StorageManager.GetByGuidResp
          The (possibly) partial result of a get.
static class StorageManager.GetByKeyReq
           
static class StorageManager.GetByKeyResp
           
static class StorageManager.GetByTimeCont
          Continue an existing GetByTimeReq.
static class StorageManager.GetByTimeReq
          Get all the data in the database whose keys have timestamps in the range [low, high].
static class StorageManager.GetByTimeResp
          The (possibly) partial result of a get.
protected static class StorageManager.IBGCont
           
static class StorageManager.IterateByGuidCont
          Continue an existing IterateByGuidReq.
static class StorageManager.IterateByGuidReq
          Get all the data in the database whose keys have guids in the range [low, high].
static class StorageManager.IterateByGuidResp
          If the continuation is non-null, it may be sent out in a IterateByGuidCont request to get the next datum, if any.
static class StorageManager.Key
          The primary key under which data are stored--must be unique.
static class StorageManager.PutReq
          Put a new datum into the database.
static class StorageManager.PutResp
          The result of a PutReq, if removed_key is non-null, then removed_key and removed_data where made irrelevant by the put and have been removed from the database.
static class StorageManager.RemoveMonitor
           
static interface StorageManager.StorageMonitor
           
protected static class StorageManager.SyncAlarm
           
 
Field Summary
protected  com.sleepycat.db.Db by_guid
           
protected  com.sleepycat.db.Db by_guid_and_data_hash
           
protected  com.sleepycat.db.Db by_time
           
protected  com.sleepycat.db.Db client_counts
           
protected  com.sleepycat.db.DbEnv env
           
protected  com.sleepycat.db.DbSecondaryKeyCreate guid_key_creator
           
 long last_dequeue_ms
           
static InetAddress MAX_CLIENT
           
protected  Map open_cursors
           
protected  Random rand
           
protected  com.sleepycat.db.Db recycling
           
protected  Set storage_monitors
           
protected  LinkedList to_bdb_thread
           
static InetAddress ZERO_CLIENT
           
static BigInteger ZERO_GUID
           
static byte[] ZERO_HASH
           
static StorageManager.Key ZERO_KEY
           
 
Fields inherited from class bamboo.util.StandardStage
acore, classifier, DEBUG, event_types, inb_msg_types, logger, my_node_id, my_sink, outb_msg_types, sim_running
 
Constructor Summary
StorageManager()
           
 
Method Summary
protected  void application_enqueue(SinkIF sink, QueueElementIF item)
           
protected  int arraycmp(byte[] a1, int o1, byte[] a2, int o2, int sz)
           
protected  void check_open_cursors()
           
protected  void close_cursor(com.sleepycat.db.Dbc cursor)
           
protected  byte[] data_hash(ByteBuffer data)
           
protected  void drop_expired_data(com.sleepycat.db.DbTxn xact, com.sleepycat.db.Dbc cursor, StorageManager.Key k, int size)
           
protected  boolean finish_get_by_guid_recycling(StorageManager.GBGCont cont, com.sleepycat.db.DbTxn xact, com.sleepycat.db.Dbc cursor, com.sleepycat.db.Dbt data, int retval, SinkIF comp_q, Object user_data)
           
protected  boolean finish_get_by_guid(StorageManager.GBGCont cont, com.sleepycat.db.DbTxn xact, com.sleepycat.db.Dbc cursor, com.sleepycat.db.Dbt pkey, com.sleepycat.db.Dbt data, int retval, SinkIF comp_q, Object user_data)
           
protected  void finish_get_by_time(StorageManager.GBTCont cont, com.sleepycat.db.DbTxn xact, com.sleepycat.db.Dbc cursor, com.sleepycat.db.Dbt data, int retval, SinkIF comp_q, Object user_data)
           
protected  boolean finish_iterate_by_guid(StorageManager.IBGCont cont, com.sleepycat.db.Dbt pkey, com.sleepycat.db.Dbt data, int retval, SinkIF comp_q, Object user_data)
           
protected static void guid_to_bytes(BigInteger guid, byte[] buf, int offset)
           
 void handle_add_monitor(StorageManager.AddMonitor event)
           
protected  void handle_discard_req(StorageManager.DiscardReq req)
           
protected  void handle_get_by_guid_cont(StorageManager.GetByGuidCont req)
           
protected  void handle_get_by_guid_req(StorageManager.GetByGuidReq req)
           
protected  void handle_get_by_key_req(StorageManager.GetByKeyReq req)
           
protected  void handle_get_by_time_cont(StorageManager.GetByTimeCont req)
           
protected  void handle_get_by_time_req(StorageManager.GetByTimeReq req)
           
protected  void handle_iterate_by_guid_cont(StorageManager.IterateByGuidCont req)
           
protected  void handle_iterate_by_guid_req(StorageManager.IterateByGuidReq req)
           
protected  void handle_put_req(StorageManager.PutReq req)
           
 void handle_remove_monitor(StorageManager.RemoveMonitor event)
           
 void handleEvent(QueueElementIF item)
           
 void init(ConfigDataIF config)
           
protected  boolean key_expired(StorageManager.Key k)
           
 void notify_storage_changed(StorageManager.StorageMonitor sm, boolean added, InetAddress client_id, long size)
           
protected  com.sleepycat.db.Dbc open_cursor(com.sleepycat.db.Db db, com.sleepycat.db.DbTxn xact, String source)
           
protected  void primary_key_to_recycling_key(byte[] buf)
           
 void register_monitor(StorageManager.StorageMonitor s)
           
protected  void storage_changed(boolean added, StorageManager.Key fk, int size, com.sleepycat.db.DbTxn xact)
           
protected  void the_real_handle_event(QueueElementIF item)
          This function is only ever called from within the Berkeley DB thread launched from the init () function; as such, it can safely block.
 void unregister_monitor(StorageManager.StorageMonitor s)
           
 
Methods inherited from class bamboo.util.StandardStage
BUG, BUG, BUG, config_get_boolean, config_get_double, config_get_int, config_get_string, configGetInt, destroy, dispatch, enqueue, handleEvents, lookup_stage, now_ms, timer_ms
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

ZERO_GUID

public static final BigInteger ZERO_GUID

ZERO_HASH

public static final byte[] ZERO_HASH

ZERO_CLIENT

public static final InetAddress ZERO_CLIENT

MAX_CLIENT

public static final InetAddress MAX_CLIENT

ZERO_KEY

public static final StorageManager.Key ZERO_KEY

storage_monitors

protected Set storage_monitors

client_counts

protected com.sleepycat.db.Db client_counts

by_time

protected com.sleepycat.db.Db by_time

by_guid

protected com.sleepycat.db.Db by_guid

by_guid_and_data_hash

protected com.sleepycat.db.Db by_guid_and_data_hash

recycling

protected com.sleepycat.db.Db recycling

env

protected com.sleepycat.db.DbEnv env

to_bdb_thread

protected LinkedList to_bdb_thread

open_cursors

protected Map open_cursors

guid_key_creator

protected com.sleepycat.db.DbSecondaryKeyCreate guid_key_creator

rand

protected Random rand

last_dequeue_ms

public long last_dequeue_ms
Constructor Detail

StorageManager

public StorageManager()
Method Detail

register_monitor

public void register_monitor(StorageManager.StorageMonitor s)

unregister_monitor

public void unregister_monitor(StorageManager.StorageMonitor s)

handle_add_monitor

public void handle_add_monitor(StorageManager.AddMonitor event)

handle_remove_monitor

public void handle_remove_monitor(StorageManager.RemoveMonitor event)

storage_changed

protected void storage_changed(boolean added,
                               StorageManager.Key fk,
                               int size,
                               com.sleepycat.db.DbTxn xact)

notify_storage_changed

public void notify_storage_changed(StorageManager.StorageMonitor sm,
                                   boolean added,
                                   InetAddress client_id,
                                   long size)

init

public void init(ConfigDataIF config)
          throws Exception
Specified by:
init in interface EventHandlerIF
Overrides:
init in class StandardStage
Throws:
Exception

handleEvent

public void handleEvent(QueueElementIF item)
Specified by:
handleEvent in interface EventHandlerIF
Overrides:
handleEvent in class StandardStage

the_real_handle_event

protected void the_real_handle_event(QueueElementIF item)
This function is only ever called from within the Berkeley DB thread launched from the init () function; as such, it can safely block. To communicate with the main thread, this thread calls classifier.dispatch_later with a time of 0.


check_open_cursors

protected void check_open_cursors()

open_cursor

protected com.sleepycat.db.Dbc open_cursor(com.sleepycat.db.Db db,
                                           com.sleepycat.db.DbTxn xact,
                                           String source)

close_cursor

protected void close_cursor(com.sleepycat.db.Dbc cursor)

handle_put_req

protected void handle_put_req(StorageManager.PutReq req)

key_expired

protected boolean key_expired(StorageManager.Key k)

handle_get_by_key_req

protected void handle_get_by_key_req(StorageManager.GetByKeyReq req)

primary_key_to_recycling_key

protected void primary_key_to_recycling_key(byte[] buf)

handle_get_by_guid_req

protected void handle_get_by_guid_req(StorageManager.GetByGuidReq req)

handle_get_by_guid_cont

protected void handle_get_by_guid_cont(StorageManager.GetByGuidCont req)

drop_expired_data

protected void drop_expired_data(com.sleepycat.db.DbTxn xact,
                                 com.sleepycat.db.Dbc cursor,
                                 StorageManager.Key k,
                                 int size)

finish_get_by_guid

protected boolean finish_get_by_guid(StorageManager.GBGCont cont,
                                     com.sleepycat.db.DbTxn xact,
                                     com.sleepycat.db.Dbc cursor,
                                     com.sleepycat.db.Dbt pkey,
                                     com.sleepycat.db.Dbt data,
                                     int retval,
                                     SinkIF comp_q,
                                     Object user_data)

finish_get_by_guid_recycling

protected boolean finish_get_by_guid_recycling(StorageManager.GBGCont cont,
                                               com.sleepycat.db.DbTxn xact,
                                               com.sleepycat.db.Dbc cursor,
                                               com.sleepycat.db.Dbt data,
                                               int retval,
                                               SinkIF comp_q,
                                               Object user_data)

handle_get_by_time_req

protected void handle_get_by_time_req(StorageManager.GetByTimeReq req)

arraycmp

protected int arraycmp(byte[] a1,
                       int o1,
                       byte[] a2,
                       int o2,
                       int sz)

handle_get_by_time_cont

protected void handle_get_by_time_cont(StorageManager.GetByTimeCont req)

finish_get_by_time

protected void finish_get_by_time(StorageManager.GBTCont cont,
                                  com.sleepycat.db.DbTxn xact,
                                  com.sleepycat.db.Dbc cursor,
                                  com.sleepycat.db.Dbt data,
                                  int retval,
                                  SinkIF comp_q,
                                  Object user_data)

handle_iterate_by_guid_req

protected void handle_iterate_by_guid_req(StorageManager.IterateByGuidReq req)

handle_iterate_by_guid_cont

protected void handle_iterate_by_guid_cont(StorageManager.IterateByGuidCont req)

finish_iterate_by_guid

protected boolean finish_iterate_by_guid(StorageManager.IBGCont cont,
                                         com.sleepycat.db.Dbt pkey,
                                         com.sleepycat.db.Dbt data,
                                         int retval,
                                         SinkIF comp_q,
                                         Object user_data)

handle_discard_req

protected void handle_discard_req(StorageManager.DiscardReq req)

guid_to_bytes

protected static void guid_to_bytes(BigInteger guid,
                                    byte[] buf,
                                    int offset)

application_enqueue

protected void application_enqueue(SinkIF sink,
                                   QueueElementIF item)

data_hash

protected byte[] data_hash(ByteBuffer data)