Personal tools

Nodal-wikid.pl

From OrganicDesign Wiki

Jump to: navigation, search
# Licensed under LGPL: www.gnu.org/copyleft/lesser.html
# [[:Template:PERL]]
# NodalWikid.pl - NodalHash using wiki as persistence (before moving to peerd)
#
# NOTE: peerd is extremely inefficient!
#       Nodes should be based on binary (see ListSpace), rather than PERL's tied hashes
#       And NodalWikid.pl is even more inefficient :-)
#
use IO::Socket;
use IO::Select;
 
our $maxNodes = 1000; # Max nodes that can fit in local runtime NodalSpace
our %space;
our $space;
 
# ----------------------------------------------------------------------------------------------------------- #
# NODAL CORE CLASS
# - Converts a standard PERL hash-table into a Distributed Computational Hash Table
# - This class is "hard-wired" to act on %::space
package NodalSpace;
use Scalar::Util qw( refaddr );
use Carp;
 
# Nodes needed for nodal operation
# - THIS and SELF are concepts that contain a node-reference
use constant OBJECT		=> 0;	# aka ROOT
use constant LIST		=> 1;	# aka SUBJECT
use constant HASH		=> 2;	# aka CONTENT
use constant HUSK		=> 3;	# General Husk (class/struct of husk)
use constant THIS		=> 4;	# This executing Husk (runtime of husk)
use constant PEER		=> 5;	# General Peer class (class of peer)
use constant SELF		=> 6;	# This peer (runtime of this peer)
use constant INIT		=> 7;	# aka onCreate
use constant MAIN		=> 8;
use constant EXIT		=> 9;	# aka onRemove
use constant GUID		=> 10;
use constant NAME		=> 11;
use constant CODE		=> 16;
use constant LANG		=> 17;
use constant PERL		=> 18;
# Bender			=> 19;
# Gir				=> 20;
# Pipi				=> 21;
# IronGiant			=> 22;
 
# Create the actual data structure with its three aspects
# - The internal structure is a three element array containing scalar, listref and hashref
#   the hash aspect uses a two element array for keys which are refs because the actual
#   hash-key can only be the address of a ref, so its stored as {ref-addr} = [ref-key,value]
sub TIEHASH {
	my $this = []; # Note that the internal data structure is actually an array not a hash
	bless $this, shift;
	$this->CLEAR;
	return $this;
	}
 
# Used when a NodalHash is assigned with {} or initialised with TIEHASH
# - clears all three aspects of the NodalHash object
sub CLEAR {
	my $this = shift;
	$$this[OBJECT] = '';
	$$this[LIST] = [];
	$$this[HASH] = {};
	}
 
# Return value in passed key
# - if value is a ref (which it should be) then it's blessed as a NodalHash so methods are accessible
# - remember, keys which are refs must be treated specially to preserve the key as a reference
# - the special key of -i returns the nodes internal data-structure (a 3-element array)
#   the internal structure is only meant for use by other non-TIEHASH methods of the Nodal class
# - onAccess happens here, but it may not be a useful concept to adopt
sub FETCH {
	my ( $this, $key, $val ) = @_;
	return $this if $key eq -i;
	my $k = ref $key ? refaddr $key : $key;
	$val = exists $$this[HASH]{$k} ? $$this[HASH]{$k}[1] : undef;
	$val = $$val[1] if ref $key;
	return ref $val ? bless $val, ref $this : $val;
	}
 
# Store val in key of NodalHash
# - If key is a ref, then store key and val together using address as key
# - If val is a hash, tie it to this Nodal class
sub STORE {
	my ( $this, $key, $val ) = @_;
	my $k = ref $key ? refaddr $key : $key;
	tie %$val, ref $this if ref $val eq 'HASH' and not tied %$val;
	# Get current value
	my $cur = exists $$this[HASH]{$k} ? $$this[HASH]{$k} : undef;
	$cur = $$cur[1] if ref $key;
	# If changing, update, queue for propagation and queue any onChange
	if ( $val ne $cur ) {
		$$this[HASH]{$k} = ref $key ? [$key, $val] : $val;
		# QUEUE THE CHANGE FOR SYNC HERE (general onChange may not be useful)
		}
	$val;
	}
 
# Nodal Reduction
# - An item shifts off to executes, it returns what to push back on (if anything)
# - If there is no code here, then execution goes within, and the same node is pushed back on
#   Reducable items with no cod ere Processes, their presence is controlled by their INIT and EXIT processes
# - If there is code, but not locally executable, then build from this language's text description
sub reduce {
	my @list = shift->{-i}[LIST];
	return unless my $node = shift @list;
	if ( exists $$node{CODE} ) {
		my $code = $$node{CODE}{ $$space{ $$space{THIS} }{LANG} }->object;
		$$node{CODE}->object = eval "sub{$code}" unless $$node{CODE}->object;
		if ( ref $$node{CODE}->object eq 'CODE' ) { $node = &{ $$node{CODE}->object } }
		else {
			# error: could not declare this code in local language
			# - we need to create an instance of this kind of error
			}
		} else { $node->reduce }
	push @list, $node if ref $node;
	}
 
sub DELETE {
	my ( $this, $key ) = @_;
	delete $$this[HASH]{ ref $key ? refaddr $key : $key };
	}
 
sub EXISTS {
	my ( $this, $key ) = @_;
	exists $$this[HASH]{ ref $key ? refaddr $key : $key };
	}
 
# Returns ref to lambda content
# - Content is returned directly if already a reference
sub STATE {
	my $val = shift->{-i}[STATE];
	return ref $val ? $val : \$val;
	}
 
# STORAGE OVERVIEW
# - each storage resource is an n-node list which is filled from the start and chopped off the end
# - this allows chopped data to be aggregated and compressed into deeper archive if necessary
# - for now this can simply be done with an index file listing the node guids in order
# - the node-associations and data-content are stored in guid-named file
# - this method can easily use FS or DB, and they can share common NodalSpace::toString and NodalSpace::fromString methods
 
# Writes and flushes change-buffer to local file-cache
sub sync {
	my $this = shift;
	my @synclist = @{ $$this{SYNC}->list };
	for ( @synclist ) {
		# something
		}
	&::logAdd( "$#synclist nodes exported." );
	}
 
# Loads $::maxNodes from the nodal index file
# - saved from @root list
sub load {
	my @rootlist = @{ $space->list };
 
	# first loop thru maxNodes of root-list and create empty hashrefs
	# - the new hashref is a NodalHash (because assigned to a NodalHash key)
	# - don't know what to do with root a:b yet
	open ROOTH, '<', "$peer.3";
	while ( <ROOTH> =~ /^(.+?)(:(.+?))?$/ && $#rootlist < $::maxNodes ) {
		my $node = $$space{$1} = {};
		${ $$node{GUID}->object } = $1 unless $2;
		push @rootlist, $node;
		}
	close ROOTH;
 
	# now loop thru all empty nodes and populate from their associated files
	for my $node ( @rootlist ) {
		my $guid = $$node{GUID}->object;
 
		# Read in nodal portion (LIST and HASH aspects)
		if ( open NODEH, '<', "$guid.3" ) {
			while ( <NODEH> =~ /^(.+?)(:(.+?))?$/ )
				{ $2 ? $$space{$node}{$1} = $$space{$3} : push @{ $$space{$node}->list }, $$space{$1} }
			close NODEH
			}
		else { &::logAdd( "Couldn't read LIST & HASH content for: $node!" ) }
 
		# Read in SCALAR aspect (from guid.1 if small and safe)
		if ( /^([^*])+[\r\n]+\*/ ) { $$node->object = $1 }
		elsif ( open NODEH, '<', "$guid.1" ) {
			binmode NODEH;
			sysread NODEH, $$node->object, 1000000;
			close NODEH;
			}
		else { &::logAdd( "Couldn't read SCALAR content for: $node!" ) }
 
		}
 
	&::logAdd( "$#rootlist nodes nodes imported." );
	}
 
# ----------------------------------------------------------------------------------------------------------- #
# INITIALISE NODAL SPACE
 
package main;
 
# Create global %space, tie to Nodal class
# - $space is a blessed ref to %space for executing the non-TIEHASH methods of %space
tie %space, 'NodalSpace';
$space = \%space;
bless $space, 'NodalSpace';
 
# Load persistent content
# - this is not done in TIEHASH since only root should initialise the loading
$space->load;
 
# Convert $peer from name to node-ref of this peer in %space and map to SELF
my $userpage = readFile "User:$::peer";
$$space{NodalSpace::SELF} = $$space{$1} if $userpage =~ /^\*\s*Node\s*:\s*([0-9]+)/im;
$$space{NodalSpace::THIS} = $space; # this husks runtime (env)
#$peer = $space;
#&::logAdd( 'GUID for this peer is '.$$peer{NodalSpace::GUID} );
#$$space{NodalSpace::SELF} = $$space{$peer};
 
# Set SELF's LANG to PERL
$$space{NodalSpace::SELF}{NodalSpace::LANG} = $$space{NodalSpace::PERL};
 
# Push peer's INIT onto root queue
# - when INIT is finished, root contains MAIN for this peer
push @{ $space->list }, $$space{NodalSpace::INIT};
 
# ----------------------------------------------------------------------------------------------------------- #
# PEERD
# - this is the single peerd-server loop running in its own thread
# - nodal reduction can be in here instead of each session handler
# - only stream message construct/extract needs to be in the individual session-handler threads
sub peerd {
 
	my $port = shift;
 
	# On startup, notify server.pl who'll preserve our handle in global scope
	my $server;
	do {
		$server = new IO::Socket::INET PeerAddr => 'localhost', PeerPort => $port, Proto => 'tcp';
		unless ( $server ) {
			logAdd "Couldn't establish connection to parent, retrying in 10 seconds.";
			sleep(10);
			}
		} until ( $server );
	print $server "GET /connect/peer/peerd HTTP/1.1\r\n\r\n";
 
	# Initialise streams
	my $select = IO::Select->new( $server );
	my %streams = ();
	$streams{fileno $server}{handle} = $server;
	$streams{fileno $server}{buffer} = '';
 
	# Main server loop (listening to server.pl and session-handlers)
	while(1) {
 
		# Loop through readable streams
		for my $handle ( $select->can_read(1) ) {
			my $stream = fileno $handle;
 
			# There is data to read on this stream, accumulate in this streams input buffer
			if ( sysread $handle, my $input, 10000 ) {
				$streams{$stream}{buffer} .= $input;
 
				# At least one complete message has accumulated
				if ( $streams{$stream}{buffer} =~ s/^(.*\x00)//s ) {
 
					# Remove each complete message from buffer and process
					for my $msg ( split /\x00/, $1 ) {
 
						# Message is from server.pl:
						if ( $handle == $server ) {
							logAdd "Message from server.pl: \"$msg\"";
 
							# Msg is a connect-request, so establish new stream back to server.pl
							# - the new stream-handle will be given to a newly spawned session-handler
							if ( $msg =~ /connect-request\/(stream[0-9]+)$/ ) {
								if ( my $ph = new IO::Socket::INET PeerAddr => 'localhost', PeerPort => $port, Proto => 'tcp' ) {
									print $ph "GET /session/peer/peerd/$1 HTTP/1.1\r\n\r\n";
									my $pstream = fileno $ph;
									$streams{$pstream}{handle} = $ph;
									$streams{$pstream}{buffer} = '';
									$streams{$pstream}{interface} = 1;
									logAdd "Connecting back to server on stream$pstream";
									}
								else { logAdd "Failed to establish new connection with server.pl!" }
								}
 
							# Not connect-request, so propagate msg to session-handlers
							else {
								logAdd "Forwarding msg to session-handlers...";
								for ( keys %streams ) {
									my $sh = $streams{$_}{handle};
									print $sh "$msg\x00" if exists $streams{$_}{interface};
									logAdd "Forwarded to stream$_" if exists $streams{$_}{interface};
									}
								}
							}
 
						# Message is not from server.pl, must be from a session-handler
						else {
							logAdd "Message from a session-handler: \"$msg\"";
							}
						}
					}
				}
 
			# There is no more data to read on this stream, so close it
			else {
				$::select->remove( $handle );
				delete $streams{$stream};
				$handle->close();
				logAdd "Stream$stream disconnected.";
				}
 
			}
		}
	}
 
# ----------------------------------------------------------------------------------------------------------- #
# SESSION-HANDLER
# - A separate interfaceHandler function is spawned for each peer.swf connect request
# - a session-handler is a dedicated thread sitting between nodal-wikid.pl and a peer.swf
# - todo: this is spawned by server.pl, but that should be moved into here
sub sessionHandler {
 
	my( $peerd, $interface ) = @_;
	$::subname .= "/$$";
	my $pstream = fileno $peerd;
	my $istream = fileno $interface;
	logAdd "Session-handler( peerd/stream$pstream, interface/stream$istream )";
 
	# Set up a new listener for interface and parent
	my $select = IO::Select->new( $peerd, $interface );
 
	# Introduce ourselves to the interface we've been assigned to handle
	print $interface "Hello, I'm your session-handler, my PID is $$.\x00";
 
	# Server loop (listening to parent and interface streams)
	my %buffers = {};
	while(1) {
		for my $handle ( $select->can_read(1) ) {
 
			# One of our streams has data to read
			if ( sysread $handle, my $input, 10000 ) {
				my $stream = fileno $handle;
				$buffers{$stream} .= $input;
				if ( $buffers{$stream} =~ s/^(.*\x00)//s ) {
					for ( split /\x00/, $1 ) {
						logAdd "Msg received on stream$stream: \"$_\"";
						# Forward all data from peerd to interface
						print $interface "$_\x00" if $handle == $peerd;
						}
					}
				}
 
			# If either connection close, close stream-handle and die
			else {
				$peerd->close();
				$interface->close();
				logAdd 'Handles closed, Exit.';
				exit;
				}
 
			}
		}
	}

The GNU Project Debian Linux Ubuntu Linux Wikipedia online encycopedia MediaWiki