Kaynağa Gözat

forgotten commit: implemented synchronous start of all cells via a delayed execution of Launcher activity after the architecture has been fully assembled

git-svn-id: https://svn.inf.ethz.ch/svn/lecturers/a2/trunk@6906 8c9fc860-2736-0410-a75d-ab315db34111
eth.morozova 8 yıl önce
ebeveyn
işleme
3e979c6be1
2 değiştirilmiş dosya ile 59 ekleme ve 32 silme
  1. 33 26
      source/ActiveCellsRunner.mod
  2. 26 6
      source/ActiveCellsRuntime.mod

+ 33 - 26
source/ActiveCellsRunner.mod

@@ -13,14 +13,14 @@ const
 type
 	Cell = object
 	var
-		isCellnet-:boolean;
+		isCellnet:boolean;
 	end Cell;
 
 	Fifo=object
 	var
 		data: array 256 of system.byte;
 		inPos, outPos: longint; length: longint;
-		
+
 		inPort: Port; outPort: Port;
 
 		procedure &Init(outP: Port; inP: Port; length: longint);
@@ -35,8 +35,8 @@ type
 		begin{EXCLUSIVE}
 			halt(100);(*broken+deprecated*)
 			(*wait for 4 bytes of free space*)
-			await( ( len(data)- ( (inPos-outPos  +len(data) ) mod len(data) ) )>= sizeof(longint));	
-			system.move(addressof(value) , addressof(data[inPos]) , sizeof(longint) );	
+			await( ( len(data)- ( (inPos-outPos  +len(data) ) mod len(data) ) )>= sizeof(longint));
+			system.move(addressof(value) , addressof(data[inPos]) , sizeof(longint) );
 			inPos := inPos+sizeof(longint);
 			inPos := inPos mod len(data);
 		end Put;
@@ -50,20 +50,20 @@ type
 			outPos:=outPos+sizeof(longint);
 			outPos := outPos mod len(data);
 		end Get;
-		
-		
+
+
 		(*todo: instead of looping byte by byte, figure out how much we can safely copy over and just do that.*)
 		procedure BulkPut(const value: array of system.byte);
 		var i: longint;
 		begin{EXCLUSIVE}
 			(*because of the exclusive semantics, this bulk put will fill the entire buffer before any get operation has a chance to run*)
-			for i:=0 to len(value)-1 do  
+			for i:=0 to len(value)-1 do
 				await((inPos+1) mod len(data) # outPos);
 				data[inPos]:=value[i];
 				inc(inPos); inPos := inPos mod len(data);
-			end;			
+			end;
 		end BulkPut;
-		
+
 		procedure BulkGet(var value: array of system.byte);
 		var
 			i: longint;
@@ -75,7 +75,7 @@ type
 				inc(outPos);outPos:=outPos mod len(data);
 			end;
 		end BulkGet;
-		
+
 	end Fifo;
 
 	Port= object
@@ -93,12 +93,12 @@ type
 		end InitPort;
 
 		procedure Delegate(toPort: Port);
-		begin{EXCLUSIVE}
+		begin(*{EXCLUSIVE}*)
 			delegatedTo := toPort;
 		end Delegate;
 
 		procedure SetFifo(f: Fifo);
-		begin{EXCLUSIVE}
+		begin(*{EXCLUSIVE}*)
 			if delegatedTo # nil then
 				delegatedTo.SetFifo(f)
 			else
@@ -108,9 +108,9 @@ type
 
 		procedure Send(value: longint);
 		begin
-			begin{EXCLUSIVE}
+			(*begin{EXCLUSIVE}
 				await((fifo # nil) or (delegatedTo # nil));
-			end;
+			end;*)
 			if delegatedTo # nil then
 				delegatedTo.Send(value)
 			else
@@ -118,25 +118,24 @@ type
 				fifo.BulkPut(value);
 			end;
 		end Send;
-		
+
 		procedure BulkSend(const value: array of system.byte);
 		begin
-			begin{EXCLUSIVE}
+			(*begin{EXCLUSIVE}
 				await((fifo # nil) or (delegatedTo # nil));
-			end;
+			end;*)
 			if delegatedTo # nil then
 				delegatedTo.BulkSend(value)
 			else
 				fifo.BulkPut(value);
 			end;
-			
 		end BulkSend;
 
 		procedure Receive(var value: longint);
 		begin
-			begin{EXCLUSIVE}
+			(*begin{EXCLUSIVE}
 				await((fifo # nil) or (delegatedTo # nil));
-			end;
+			end;*)
 			if delegatedTo # nil then
 				delegatedTo.Receive(value)
 			else
@@ -144,12 +143,12 @@ type
 				fifo.BulkGet(value);
 			end;
 		end Receive;
-		
+
 		procedure BulkReceive(var value: array of system.byte);
 		begin
-			begin{EXCLUSIVE}
+			(*begin{EXCLUSIVE}
 				await((fifo # nil) or (delegatedTo # nil));
-			end;
+			end;*)
 			if delegatedTo # nil then
 				delegatedTo.BulkReceive(value)
 			else
@@ -165,6 +164,7 @@ type
 		procedure Allocate(scope: any; var c: any; t: Modules.TypeDesc; const name: array of char; isCellnet, isEngine: boolean);
 		var cel: Cell;
 		begin
+			if res # 0 then return; end; (*! do not do anything in case of an error *)
 			new(cel); c := cel;
 			cel.isCellnet := isCellnet;
 		end Allocate;
@@ -172,6 +172,7 @@ type
 		procedure AddPort*(c: any; var p: any; const name: array of char; inout: set; width: longint);
 		var por: Port;
 		begin
+			if res # 0 then return; end; (*! do not do anything in case of an error *)
 			if EnableTrace then trace(c,p,name, inout, width); end;
 			new(por,inout,width); p := por;
 		end AddPort;
@@ -187,6 +188,7 @@ type
 			p3d: pointer to Ports3d;
 			i0, i1, i2: longint;
 		begin
+			if res # 0 then return; end; (*! do not do anything in case of an error *)
 			if EnableTrace then trace(name, inout, width, len(lens)); end;
 			(*
 				There is a slot in the respective cell that can hold a pointer to an n-dimensional array of ports.
@@ -227,6 +229,7 @@ type
 		procedure AddStaticPortArray*(c: any; var ports: array of any; const name: array of char; inout: set; width: longint);
 		var i: longint;
 		begin
+			if res # 0 then return; end; (*! do not do anything in case of an error *)
 			if EnableTrace then trace(name, inout, width, len(ports)); end;
 			for i := 0 to len(ports)-1 do
 				AddPort(c, ports[i], name, inout, width);
@@ -236,12 +239,14 @@ type
 		procedure Connect*(outPort, inPort: any; depth: longint);
 		var fifo: Fifo;
 		begin
+			if res # 0 then return; end; (*! do not do anything in case of an error *)
 			if EnableTrace then trace(outPort, inPort, outPort, inPort, depth); end;
 			new(fifo, outPort(Port), inPort(Port), depth);
 		end Connect;
 
 		procedure Delegate*(netPort: any; cellPort: any);
 		begin
+			if res # 0 then return; end; (*! do not do anything in case of an error *)
 			if EnableTrace then trace(netPort, cellPort); end;
 			netPort(Port).Delegate(cellPort(Port));
 		end Delegate;
@@ -249,12 +254,14 @@ type
 		procedure Start*(c: any; proc: procedure{DELEGATE});
 		var launcher: ActiveCellsRuntime.Launcher;
 		begin
+			if res # 0 then return; end; (*! do not do anything in case of an error *)
 			if EnableTrace then trace(c, proc); end;
 			if c(Cell).isCellnet then (* synchronous *)
 				proc
 			else
 				new(launcher, self); (* asynchronous *)
 				launcher.Start(proc, false);
+				if launcher.error & (res = 0) then res := -1; end;
 			end;
 		end Start;
 
@@ -275,7 +282,7 @@ type
 			if EnableTrace then trace(p, value); end;
 			p(Port).Receive(value);
 		end Receive;
-		
+
 		procedure BulkReceive*(p:any; var value: array of system.byte);
 		begin
 			if EnableTrace then trace(p, 'bulk receive'); end;
@@ -283,8 +290,8 @@ type
 		end BulkReceive;
 
 	end Context;
-	
-	
+
+
 	procedure Execute*(context: Commands.Context);
 	var myContext: Context; cmd: array 256 of char;
 		diag: D.StreamDiagnostics;

+ 26 - 6
source/ActiveCellsRuntime.mod

@@ -19,6 +19,8 @@ type
 	Context*= object
 	var
 		topNet-: any; (** top-level CELLNET object specific to this runtime context *)
+		finishedAssembly-: boolean; (** assigned to TRUE after the whole architecture has been assembled *)
+		res*: longint; (** error code, 0 in case of success *)
 		
 		procedure Allocate*(scope: any; var c: any; t: Modules.TypeDesc; const name: array of char; isCellNet, isEngine: boolean);
 		end Allocate;
@@ -82,6 +84,17 @@ type
 		
 		procedure ReceiveNonBlocking*(p: any; var value: longint): boolean;
 		end ReceiveNonBlocking;
+		
+		(* called in Execute after the architecture is fully assembled *)
+		procedure FinishedAssembly();
+		begin{EXCLUSIVE}
+			finishedAssembly := true;
+		end FinishedAssembly;
+
+		procedure WaitUntilFinishedAssembly();
+		begin{EXCLUSIVE}
+			await(finishedAssembly or (res # 0));
+		end WaitUntilFinishedAssembly;
 			
 	end Context;
 	
@@ -89,7 +102,7 @@ type
 	var 
 		proc: procedure {DELEGATE};
 		context: Context;
-		finished: boolean;
+		finished, delayedStart: boolean;
 		error-: boolean;
 		
 		procedure & Init*(context: Context);
@@ -100,16 +113,22 @@ type
 		end Init;
 		
 		procedure Start*(p: procedure{DELEGATE}; doWait: boolean);
-			begin{EXCLUSIVE}
-				proc := p;
-				await(~doWait or finished);
+		begin{EXCLUSIVE}
+			proc := p;
+			if ~doWait then delayedStart := true; end; (* delay actual start until the whole architecture is fully assembled *)
+			await(~doWait or finished);
 		end Start;
 		
 	begin{ACTIVE}
 		begin{EXCLUSIVE}
 			await(proc # nil);
 		end;
-		proc;
+		if delayedStart then
+			context.WaitUntilFinishedAssembly;
+		end;
+		if context.res = 0 then
+			proc;
+		end;
 		begin{EXCLUSIVE}
 			finished := true
 		end;
@@ -447,8 +466,9 @@ type
 
 				new(starter, pc, scope);
 			end;
-			new(launcher, context); 
+			new(launcher, context);
 			launcher.Start(starter.P, true);
+			context.FinishedAssembly;
 			assert(~launcher.error);
 		else 
 			Reflection.Report(Commands.GetContext().out, m.refs, offset);