Browse Source

Introduced bulk send, internally treat objects to send as byte arrays. Overload << operator to allow sending of arbitrary objects.

git-svn-id: https://svn.inf.ethz.ch/svn/lecturers/a2/trunk@6888 8c9fc860-2736-0410-a75d-ab315db34111
skoster 8 years ago
parent
commit
01d699abf2
2 changed files with 177 additions and 40 deletions
  1. 118 32
      source/ActiveCellsRunner.mod
  2. 59 8
      source/ActiveCellsRuntime.mod

+ 118 - 32
source/ActiveCellsRunner.mod

@@ -1,25 +1,28 @@
-(* 
+(*
 	ActiveCells Runtime context to run Active Cells as Active Objects
 	Felix Friedrich, ETH Zürich, 2015
 *)
 module ActiveCellsRunner;
 
-import ActiveCellsRuntime, Commands, Modules;
+import system,ActiveCellsRuntime, Commands, Modules,D:=Diagnostics;
+
+
 const
 	EnableTrace = false;
-	
-type 
-	Cell = object 
+
+type
+	Cell = object
 	var
-		isCellnet-:boolean; 
+		isCellnet-:boolean;
 	end Cell;
-	
+
 	Fifo=object
 	var
-		data: array 64 of longint;
+		data: array 256 of system.byte;
 		inPos, outPos: longint; length: longint;
-		inPort: Port; outPort: Port;
 		
+		inPort: Port; outPort: Port;
+
 		procedure &Init(outP: Port; inP: Port; length: longint);
 		begin
 			inPos := 0; outPos := 0; self.length := length;
@@ -27,21 +30,52 @@ type
 			inPort := inP; outPort := outP;
 			inPort.SetFifo(self); outPort.SetFifo(self);
 		end Init;
-		
+
 		procedure Put(value: longint);
 		begin{EXCLUSIVE}
-			await((inPos+1) mod len(data) # outPos);
-			data[inPos] := value;
-			inc(inPos); inPos := inPos mod len(data);
+			halt(100);(*broken+deprecated*)
+			(*wait for 4 bytes of free space*)
+			await( ( len(data)- ( (inPos-outPos  +len(data) ) mod len(data) ) )>= sizeof(longint));	
+			system.move(addressof(value) , addressof(data[inPos]) , sizeof(longint) );	
+			inPos := inPos+sizeof(longint);
+			inPos := inPos mod len(data);
 		end Put;
-		
+
 		procedure Get(var value: longint);
 		begin{EXCLUSIVE}
-			await(inPos # outPos);
-			value := data[outPos];
-			inc(outPos); outPos := outPos mod len(data);
+			halt(100); (*broken+deprecated*)
+			(*wait for 4 bytes of available data*)
+			await( (inPos - outPos  +len(data) ) mod len(data) >= sizeof(longint));
+			system.move(addressof(data[outPos]) , addressof(value) ,  sizeof(longint) );
+			outPos:=outPos+sizeof(longint);
+			outPos := outPos mod len(data);
 		end Get;
-
+		
+		
+		(*todo: instead of looping byte by byte, figure out how much we can safely copy over and just do that.*)
+		procedure BulkPut(const value: array of system.byte);
+		var i: longint;
+		begin{EXCLUSIVE}
+			(*because of the exclusive semantics, this bulk put will fill the entire buffer before any get operation has a chance to run*)
+			for i:=0 to len(value)-1 do  
+				await((inPos+1) mod len(data) # outPos);
+				data[inPos]:=value[i];
+				inc(inPos); inPos := inPos mod len(data);
+			end;			
+		end BulkPut;
+		
+		procedure BulkGet(var value: array of system.byte);
+		var
+			i: longint;
+		begin{EXCLUSIVE}
+			(*because of the exclusive semantics, this bulk get will empty the entire buffer before any put operation has a chance to run*)
+			for i:=0 to len(value)-1 do
+				await(inPos#outPos);
+				value[i]:=data[outPos];
+				inc(outPos);outPos:=outPos mod len(data);
+			end;
+		end BulkGet;
+		
 	end Fifo;
 
 	Port= object
@@ -49,20 +83,20 @@ type
 		fifo-: Fifo;
 		delegatedTo-: Port;
 		inout-: set;
-	
+
 		procedure & InitPort(inout: set; width: longint);
 		begin
-			fifo := nil; 
-			delegatedTo := nil; 
+			fifo := nil;
+			delegatedTo := nil;
 			self.inout := inout;
 			delegatedTo := nil;
 		end InitPort;
-		
+
 		procedure Delegate(toPort: Port);
 		begin{EXCLUSIVE}
 			delegatedTo := toPort;
 		end Delegate;
-		
+
 		procedure SetFifo(f: Fifo);
 		begin{EXCLUSIVE}
 			if delegatedTo # nil then
@@ -71,7 +105,7 @@ type
 				fifo := f;
 			end;
 		end SetFifo;
-		
+
 		procedure Send(value: longint);
 		begin
 			begin{EXCLUSIVE}
@@ -80,34 +114,61 @@ type
 			if delegatedTo # nil then
 				delegatedTo.Send(value)
 			else
-				fifo.Put(value);
+				(*fifo.Put(value);*)
+				fifo.BulkPut(value);
 			end;
 		end Send;
 		
+		procedure BulkSend(const value: array of system.byte);
+		begin
+			begin{EXCLUSIVE}
+				await((fifo # nil) or (delegatedTo # nil));
+			end;
+			if delegatedTo # nil then
+				delegatedTo.BulkSend(value)
+			else
+				fifo.BulkPut(value);
+			end;
+			
+		end BulkSend;
+
 		procedure Receive(var value: longint);
 		begin
 			begin{EXCLUSIVE}
 				await((fifo # nil) or (delegatedTo # nil));
 			end;
-			if delegatedTo # nil then	
+			if delegatedTo # nil then
 				delegatedTo.Receive(value)
 			else
-				fifo.Get(value);
+				(*fifo.Get(value);*)
+				fifo.BulkGet(value);
 			end;
 		end Receive;
 		
+		procedure BulkReceive(var value: array of system.byte);
+		begin
+			begin{EXCLUSIVE}
+				await((fifo # nil) or (delegatedTo # nil));
+			end;
+			if delegatedTo # nil then
+				delegatedTo.BulkReceive(value)
+			else
+				fifo.BulkGet(value);
+			end;
+		end BulkReceive;
+
 	end Port;
 
 	(* generic context object that can be used by implementers of the active cells runtime *)
 	Context*= object (ActiveCellsRuntime.Context)
-	
+
 		procedure Allocate(scope: any; var c: any; t: Modules.TypeDesc; const name: array of char; isCellnet, isEngine: boolean);
 		var cel: Cell;
 		begin
 			new(cel); c := cel;
 			cel.isCellnet := isCellnet;
 		end Allocate;
-		
+
 		procedure AddPort*(c: any; var p: any; const name: array of char; inout: set; width: longint);
 		var por: Port;
 		begin
@@ -121,11 +182,10 @@ type
 			Ports2d = array of Ports1d;
 			Ports3d = array of Ports2d;
 		var
-			p: any;
 			p1d: pointer to Ports1d;
 			p2d: pointer to Ports2d;
 			p3d: pointer to Ports3d;
-			i, i0, i1, i2: longint;
+			i0, i1, i2: longint;
 		begin
 			if EnableTrace then trace(name, inout, width, len(lens)); end;
 			(*
@@ -204,23 +264,49 @@ type
 			p(Port).Send(value);
 		end Send;
 
+		procedure BulkSend*(p:any; const value: array of system.byte);
+		begin
+			if EnableTrace then trace(p, 'bulk send'); end;
+			p(Port).BulkSend(value);
+		end BulkSend;
+
 		procedure Receive*(p: any; var value: longint);
 		begin
 			if EnableTrace then trace(p, value); end;
 			p(Port).Receive(value);
 		end Receive;
 		
+		procedure BulkReceive*(p:any; var value: array of system.byte);
+		begin
+			if EnableTrace then trace(p, 'bulk receive'); end;
+			p(Port).BulkReceive(value);
+		end BulkReceive;
+
 	end Context;
 	
+	
 	procedure Execute*(context: Commands.Context);
 	var myContext: Context; cmd: array 256 of char;
+		diag: D.StreamDiagnostics;
 	begin
 		new(myContext);
+		new(diag,context.out);
 		if context.arg.GetString(cmd) then
-			ActiveCellsRuntime.Execute(cmd, myContext, nil)
+			ActiveCellsRuntime.Execute(cmd, myContext, diag)
 		end;
 	end Execute;
 
+	procedure Stop*(context: Commands.Context);
+	var myContext: Context; cmd: array 256 of char;
+		diag: D.StreamDiagnostics;
+	begin
+		new(myContext);
+		new(diag,context.out);
+		if context.arg.GetString(cmd) then
+			(*todo*)
+		end;
+	end Stop;
+
 end ActiveCellsRunner.
 
 

+ 59 - 8
source/ActiveCellsRuntime.mod

@@ -25,7 +25,7 @@ type
 		
 		procedure AddPort*(c: any; var p: any; const name: array of char; inout: set; width: longint);
 		end AddPort;
-
+		
 		procedure AddPortArray*(c: any; var ports: any; const name: array of char; inout: set; width: longint; const lens: array of longint);
 		end AddPortArray;
 
@@ -68,12 +68,18 @@ type
 		procedure Send*(p: any; value: longint);
 		end Send;
 		
+		procedure BulkSend*(p: any; const value: array of system.byte);
+		end BulkSend;
+		
 		procedure SendNonBlocking*(p: any; value: longint): boolean;
 		end SendNonBlocking;
 
 		procedure Receive*(p: any; var value: longint);
 		end Receive;
 		
+		procedure BulkReceive*(p: any; var value: array of system.byte);
+		end BulkReceive;
+		
 		procedure ReceiveNonBlocking*(p: any; var value: longint): boolean;
 		end ReceiveNonBlocking;
 			
@@ -232,6 +238,11 @@ type
 		GetContext().Send(p, value);
 	end Send;
 	
+	procedure BulkSend*(p: any; const value: array of system.byte);
+	begin
+		GetContext().BulkSend(p,value);
+	end BulkSend;
+	
 	procedure SendNonBlocking*(p: any; value: longint): boolean;
 	begin
 		return GetContext().SendNonBlocking(p, value);
@@ -242,6 +253,11 @@ type
 		GetContext().Receive(p, value);
 	end Receive;
 	
+	procedure BulkReceive*(p: any; var value: array of system.byte);
+	begin
+		GetContext().BulkReceive(p,value);
+	end BulkReceive;
+	
 	procedure ReceiveNonBlocking*(p: any; var value: longint): boolean;
 	begin
 		return GetContext().ReceiveNonBlocking(p, value);
@@ -398,6 +414,8 @@ type
 
 		copy(typeName, name);
 		Strings.Append(name, ".@Body");
+		trace(name);
+		trace(m.refs);
 		offset := Reflection.FindByName(m.refs, 0, name, true);
 		if offset # 0 then
 			if Reflection.GetChar(m.refs,offset) = Reflection.sfProcedure then
@@ -424,14 +442,47 @@ type
 		end;
 	end Execute;
 	
-(*	type LA = array of longint;
-	operator "<<"* (p: port out; const a: LA);
-	var i: longint;
+	type la= array of system.byte; 
+	operator "<<"* (p: port out; const a: la);
 	begin
-		for i := 0 to len(a)-1 do
-			p << a[i];
-		end;
+		if EnableTrace then trace('bulk send'); end;
+		BulkSend(system.val(any,p),a);
 	end "<<";
-*)
+	
+	
+	operator "<<"* (var  a: la; p: port in);
+	begin
+		if EnableTrace then trace('bulk rec');end;
+		BulkReceive(system.val(any,p),a);
+	end "<<";
+	
+	type li= longint;
+	operator "<<"* (p: port out; a: li);
+	begin
+		if EnableTrace then trace('normal send');end;
+		BulkSend(system.val(any,p),a);
+	end "<<";
+	
+	operator "<<"* (var  a: li; p: port in);
+	begin
+		if EnableTrace then trace('normal rec');end;
+		BulkReceive(system.val(any,p),a);
+	end "<<";
+
+	type r= real;
+	operator "<<"* (p: port out; a: r);
+	begin
+		if EnableTrace then trace('normal send');end;
+		BulkSend(system.val(any,p),a);
+	end "<<";
+	
+	operator "<<"* (var  a:r; p: port in);
+	begin
+		if EnableTrace then trace('normal rec');end;
+		BulkReceive(system.val(any,p),a);
+	end "<<";
+	
+	
+
 end ActiveCellsRuntime.