123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491 |
- (* Aos Runtime: TCP, Copyright 2005, Emil J. Zeller *)
- (* Aos, Copyright 2001, Pieter Muller, ETH Zurich *)
- MODULE TCP; (** AUTHOR "pjm, mvt"; PURPOSE "TCP protocol"; *)
- IMPORT
- WSock32, Modules, Kernel, Streams, IP, Objects, KernelLog,SYSTEM;
CONST
	Trace = FALSE;	(* compile-time switch guarding the KernelLog diagnostics in this module *)

	NilPort* = 0;

	(** Error codes *)
	Ok* = 0;
	ConnectionRefused* = 3701;
	TimedOut* = 3704;
	NotConnected* = 3705;

	(** TCP connection states *)
	NumStates* = 12;
	Closed* = 0;
	Listen* = 1;
	SynSent* = 2;
	SynReceived* = 3;
	Established* = 4;
	CloseWait* = 5;
	FinWait1* = 6;
	Closing* = 7;
	LastAck* = 8;
	FinWait2* = 9;
	TimeWait* = 10;
	Unused* = 11;	(* no real state, only used in this implementation *)

	(** Sets of connection states *)
	OpenStates* = {Listen, SynReceived, Established, CloseWait, FinWait1, FinWait2};
	ClosedStates* = {Unused, Closed, Closing, LastAck, TimeWait};
	HalfClosedStates* = ClosedStates + {FinWait1, FinWait2};
	FinStates* = {Unused, Closed, CloseWait, Closing, LastAck, TimeWait};

	(* Bit numbers used in Connection.flags *)
	(* AckNow = 0; *)	(* send Ack immediately *)
	(* DelAck = 1; *)	(* send Ack, but try to delay it *)
	NoDelay = 2;	(* don't delay packets to coalesce (disable Nagle algorithm) *)
	DoKeepAlive = 3;	(* enable keep-alive timer *)
	Timeout = 14;	(* included by HandleTimeout, awaited and cleared by AwaitState *)
- TYPE
- (** Connection object. NOTE: Only one process should access a Connection! *)
- Connection* = OBJECT (Streams.Connection)
VAR
	int-: IP.Interface;	(* assigned interface *)
	lport-: LONGINT;	(* local protocol address (port) *)
	fip-: IP.Adr;	(* foreign protocol address *)
	fport-: LONGINT;	(* foreign port *)

	state*: SHORTINT;	(* TCP state, one of the state constants Closed .. Unused *)

	(* Sequence numbers: declared for interface compatibility; no code visible in this module assigns them. *)
	sndnxt-: LONGINT;	(* send next *)
	iss-: LONGINT;	(* initial send sequence number *)
	rcvnxt-: LONGINT;	(* receive next *)
	irs-: LONGINT;	(* initial receive sequence number *)

	socket: WSock32.Socket;	(* underlying Winsock handle, WSock32.InvalidSocket when none is owned *)

	(* Window and timing figures: likewise not assigned by any code visible in this module. *)
	sndwnd-: LONGINT;	(* send window *)
	sndcwnd-: LONGINT;	(* congestion-controlled window *)
	sndcc-: LONGINT;	(* number of bytes in send buffer *)
	rcvwnd-: LONGINT;	(* receive window *)
	srtt-: LONGINT;	(* smoothed round trip time *)

	(* receiver: Receiver; *)
	(* sender: Sender; *)
	(* lip: IP.Adr; *)

	timeout: Objects.Timer;	(* created on demand by AwaitState *)
	flags: SET;	(* holds the bits NoDelay, DoKeepAlive and Timeout *)
(* Constructor, for internal use only: a new Connection owns no socket and starts in the Unused state. *)
PROCEDURE & Init*;
BEGIN
	socket := WSock32.InvalidSocket;
	state := Unused
	(* NEW(inbuf,receive,send); *)
	(* NEW( receiver, SELF.receive ); *)
	(* NEW( sender, SELF.send ); *)
END Init;
(** Open a TCP connection (only use once per Connection instance).
	Use TCP.NilPort for lport to automatically assign an unused local port.

	A non-nil fip together with fport # NilPort performs an active open (optional bind, then connect);
	otherwise fip must be nil and fport must be NilPort, and the connection is bound to lport and put into Listen.

	res returns Ok on success and NotConnected if fip is not an IPv4 address or if bind/connect/listen fails.
	After a failed bind/connect/listen the state is Closed and the socket has been closed, invalidated and
	removed from the pool, so that neither Close nor Finalize touches the handle again.
*)
PROCEDURE Open*( lport: LONGINT; fip: IP.Adr; fport: LONGINT; VAR res: LONGINT );
VAR adr: WSock32.sockaddrIn; err: LONGINT; str: ARRAY 64 OF CHAR;
BEGIN {EXCLUSIVE}
	IF Trace THEN
		KernelLog.Enter; KernelLog.String( "Open connection: lport=" ); KernelLog.Int( lport, 1 );
		KernelLog.String( " ,fip=" ); IP.AdrToStr( fip, str ); KernelLog.String( str );
		KernelLog.String( " ,fport=" ); KernelLog.Int( fport, 1 ); Report( SELF ); KernelLog.Exit;
	END;
	ASSERT ( (state = Unused) & (lport >= 0) & (lport < 10000H) & (fport >= 0) & (fport < 10000H) );
	IF (fip.usedProtocol # IP.IPv4) THEN
		KernelLog.String("TCP.Connection.Open: Warning: Connection to non-IPv4 host not supported!"); KernelLog.Ln;
		res := NotConnected;
		RETURN;
	END;
	IF socket = WSock32.InvalidSocket THEN
		socket := WSock32.socket( WSock32.AFINet, WSock32.SockStream, WSock32.IPProtoTCP );
		ASSERT ( socket # WSock32.InvalidSocket );
		pool.Add( SELF, SELF.Finalize )
	END;
	IF ~IP.IsNilAdr(fip) & (fport # NilPort) THEN	(* active open (connect) *)
		IF Trace THEN KernelLog.Enter; KernelLog.String( "Active open" ); Report( SELF ); KernelLog.Exit; END;
		int := IP.InterfaceByDstIP( fip );
		SELF.lport := lport; SELF.fip := fip; SELF.fport := fport;
		err := 0;
		IF lport # NilPort THEN	(* caller asked for a specific local port *)
			adr.sinFamily := WSock32.PFINet; adr.sinAddr := 0;
			adr.sinPort := WSock32.htons( SHORT( lport ) );
			err := WSock32.bind( socket, adr, SIZEOF( WSock32.sockaddrIn ) )
		END;
		IF err = 0 THEN
			adr.sinFamily := WSock32.PFINet;
			adr.sinAddr := (fip.ipv4Adr); adr.sinPort := WSock32.htons( SHORT( fport ) );
			err := WSock32.connect( socket, adr, SIZEOF( WSock32.sockaddrIn ) )
		END;
		IF err = 0 THEN
			res := Ok; state := Established; SetPortAndIp
		ELSE
			(* report before the socket is closed, while the failing call is still the most recent one *)
			res := NotConnected; state := Closed; WSock32.DispError
		END
	ELSE	(* passive open (listen) *)
		IF Trace THEN KernelLog.Enter; KernelLog.String( "Passive open" ); Report( SELF ); KernelLog.Exit; END;
		ASSERT ( (fport = NilPort) & IP.IsNilAdr(fip));
		SELF.int := NIL; SELF.lport := lport; SELF.fip := IP.NilAdr;
		SELF.fport := NilPort; adr.sinFamily := WSock32.PFINet;
		adr.sinAddr := 0; adr.sinPort := WSock32.htons( SHORT( lport ) );
		err := WSock32.bind( socket, adr, SIZEOF( WSock32.sockaddrIn ) );
		IF err = 0 THEN err := WSock32.listen( socket, WSock32.SOMaxConn ) END;
		IF err = 0 THEN
			SetPortAndIp; res := Ok; state := Listen
		ELSE
			res := NotConnected; state := Closed; WSock32.DispError
		END
	END;
	IF res # Ok THEN
		(* Close returns immediately in state Closed, so the handle has to be released here.
			Invalidating it and leaving the pool keeps Finalize from closing it a second time. *)
		err := WSock32.closesocket( socket );
		socket := WSock32.InvalidSocket; pool.Remove( SELF )
	END;
	IF Trace THEN
		KernelLog.Enter; KernelLog.String( "Open connection, result = " ); ReportResult( res ); Report( SELF ); KernelLog.Exit;
	END
END Open;
(** Send data on a TCP connection.
	Sends len bytes of data starting at index ofs. res returns Streams.Ok once all bytes have been handed
	to the socket, or NotConnected if the connection is Closed or a send call fails.
	The propagate parameter is not used by this implementation. *)
PROCEDURE Send*( CONST data: ARRAY OF CHAR; ofs, len: LONGINT; propagate: BOOLEAN; VAR res: LONGINT );
VAR sent: LONGINT;
BEGIN
	ASSERT((ofs >= 0) & (ofs + len <= LEN(data)));
	IF state = Closed THEN res := NotConnected; RETURN END;
	IF state # Closing THEN
		ASSERT ( (*state = Established) & *) (socket # WSock32.InvalidSocket) )
	END;
	res := Streams.Ok;
	(* send may accept fewer bytes than requested, so continue until everything is sent or a call fails.
		With len = 0 nothing is sent and data[ofs] is never indexed (ofs may equal LEN(data) in that case). *)
	WHILE (len > 0) & (res = Streams.Ok) DO
		sent := WSock32.send( socket, data[ofs], len, {} );
		IF sent > 0 THEN
			INC( ofs, sent ); DEC( len, sent )
		ELSE
			KernelLog.String( "TCP.Send :" ); WSock32.DispError; res := NotConnected
		END
	END
END Send;
(** Receive data on a TCP connection. The data parameter specifies the buffer. The ofs parameter specifies the
	position in the buffer where data should be received (usually 0), and the size parameter specifies how many
	bytes of data can be received in the buffer. The min parameter specifies the minimum number of bytes to
	receive before Receive returns and must be <= size. The len parameter returns the number of bytes received,
	and the res parameter returns 0 if ok, or a non-zero error code otherwise (e.g. if the connection is closed
	by the communication partner, or by a call of the Close method).
	When recv reports an error, the socket is closed and invalidated, the connection leaves the pool, the state
	becomes Closed and res is Streams.EOF. *)
PROCEDURE Receive*( VAR data: ARRAY OF CHAR; ofs, size, min: LONGINT; VAR len, res: LONGINT );
VAR ret, ignore: LONGINT;
BEGIN
	ASSERT ( (ofs >= 0) & (ofs + size <= LEN( data )) & (min <= size) );
	len := 0; res := Streams.Ok;
	BEGIN {EXCLUSIVE}
		IF state = Closed THEN res := NotConnected; RETURN
		ELSIF state = CloseWait THEN res := Streams.EOF; RETURN
		END
	END;
	IF socket = WSock32.InvalidSocket THEN res := NotConnected; RETURN END;
	(* nothing requested, or a non-blocking poll (min = 0) with nothing pending *)
	IF (size = 0) OR ((min = 0) & (Available() = 0)) THEN res := Streams.Ok; RETURN END;
	REPEAT
		ret := WSock32.recv( socket, data[ofs], size, {} );
		IF ret > 0 THEN INC( len, ret ); INC( ofs, ret ); DEC( size, ret ) END
	UNTIL (size <= 0) OR (len >= min) OR (ret <= 0);
	IF ret < 0 THEN
		IF Trace THEN KernelLog.String( "TCP.Receiver.Receive" ); WSock32.DispError; END;
		BEGIN {EXCLUSIVE}
			IF socket # WSock32.InvalidSocket THEN
				ignore := WSock32.shutdown( socket, WSock32.SDboth );
				ignore := WSock32.closesocket( socket );
				(* forget the closed handle so that Finalize does not shut it down and close it again *)
				socket := WSock32.InvalidSocket; pool.Remove( SELF )
			END;
			state := Closed; res := Streams.EOF
		END
	ELSIF ret = 0 THEN	(* connection has been gracefully shut down by remote side, otherwise recv would block *)
		IF Trace THEN
			KernelLog.Enter; KernelLog.String( "TCP.Connection.Receive, graceful shutdown by remote side " ); Report( SELF ); KernelLog.Exit;
		END;
		BEGIN {EXCLUSIVE}
			IF state = Established THEN
				ignore := WSock32.shutdown( socket, WSock32.SDReceive );	(* may not receive any more *)
				state := CloseWait
			ELSIF state IN {FinWait1, FinWait2, Closing} THEN
				state := Closed;
				ignore := WSock32.shutdown( socket, WSock32.SDboth );
				ignore := WSock32.closesocket( socket );
				socket := WSock32.InvalidSocket; pool.Remove( SELF )
			END;
			res := Streams.EOF
		END;
		IF Trace THEN
			KernelLog.Enter; KernelLog.String( "Receive Result " ); ReportResult( res );
			Report( SELF ); KernelLog.Exit;
		END
	END
END Receive;
(** Return the current connection state, read under the object lock. *)
PROCEDURE State*( ): LONGINT;
VAR current: LONGINT;
BEGIN {EXCLUSIVE}
	current := state;
	RETURN current
END State;
(* Timer handler installed by AwaitState: marks the wait as timed out by setting the Timeout bit in flags. *)
PROCEDURE HandleTimeout;
BEGIN {EXCLUSIVE}
	flags := flags + {Timeout}
END HandleTimeout;
(* Wait until state is a member of good or bad, or until ms milliseconds have elapsed (ms = -1: no timer).
	res returns Ok if the state is in good, NotConnected if it is in bad, and TimedOut otherwise. *)
PROCEDURE AwaitState*( good, bad: SET; ms: LONGINT; VAR res: LONGINT );
VAR wanted: SET; timed: BOOLEAN;
BEGIN {EXCLUSIVE}
	wanted := good + bad;
	timed := ms # -1;
	IF ~(state IN wanted) THEN
		IF timed THEN
			IF timeout = NIL THEN NEW( timeout ) END;
			Objects.SetTimeout( timeout, SELF.HandleTimeout, ms )
		END;
		flags := flags - {Timeout};	(* forget a timeout left over from an earlier wait *)
		AWAIT( (state IN wanted) OR (Timeout IN flags) );
		IF timed THEN Objects.CancelTimeout( timeout ) END
	END;
	IF state IN good THEN
		res := Ok
	ELSIF state IN bad THEN
		res := NotConnected
	ELSE
		res := TimedOut
	END
END AwaitState;
(** Close a TCP connection (half-close).
	In state Established only the sending direction is shut down: the state becomes FinWait1 and a timer
	calls Close again after 5000 ms. In the states Listen, CloseWait, FinWait1, FinWait2, Closing and TimeWait
	the socket is shut down and closed, the connection leaves the pool and the state becomes Closed.
	Nothing happens if the state is already Closed or no socket is owned. *)
PROCEDURE Close*;
VAR ignore: LONGINT; closetimer: Objects.Timer;	(* fof 070102 *)
BEGIN {EXCLUSIVE}
	IF state # Closed THEN
		(*IF receiver # NIL THEN receiver.Terminate END; *)
		IF Trace THEN
			KernelLog.Enter; KernelLog.String( "TCP.Connection.Close, " ); Report( SELF ); KernelLog.Exit;
		END;
		IF socket # WSock32.InvalidSocket THEN
			IF state = Established THEN
				ignore := WSock32.shutdown( socket, WSock32.SDSend );	(* may not send any more *)
				state := FinWait1;
				NEW( closetimer );
				(* 5 seconds time for receiving rest of data, i.e. by calling Available etc. *) (* fof 070102 *)
				Objects.SetTimeout( closetimer, SELF.Close, 5000 )
			ELSIF state IN {Listen, CloseWait, FinWait1, FinWait2, Closing, TimeWait} THEN
				(* full teardown; the whole sequence runs under the object lock *)
				ignore := WSock32.shutdown( socket, WSock32.SDboth );
				ignore := WSock32.closesocket( socket );
				socket := WSock32.InvalidSocket;
				pool.Remove( SELF );
				state := Closed
			END;
			IF Trace THEN KernelLog.Enter; KernelLog.String( "Close done." ); Report( SELF ); KernelLog.Exit; END
			(* half-close: use shutdown? *)
		END
	END
	(*state := Closed*) (* fof 070102 *)
END Close;
(* Refresh lport from getsockname and fip/fport from getpeername.
	A field is left unchanged when the corresponding query fails (non-zero result). *)
PROCEDURE SetPortAndIp;
VAR sockname: WSock32.sockaddrIn; lensockname: LONGINT; res: LONGINT;
BEGIN
	lensockname := SIZEOF( WSock32.sockaddrIn );
	res := WSock32.getsockname( socket, sockname, lensockname );
	IF res = 0 THEN
		(* lip := sockname.sinAddr; *)
		(* NOTE(review): Open passes SHORT(port) to htons, which suggests 16-bit signed port values.
			MOD 10000H keeps ports >= 8000H from showing up as negative numbers and is a no-op
			if ntohs already yields 0..65535 - confirm against the WSock32 declaration. *)
		lport := WSock32.ntohs( sockname.sinPort ) MOD 10000H
		(* lip := WSock32.ntohl( lip ); *)
	END;
	lensockname := SIZEOF( WSock32.sockaddrIn );
	res := WSock32.getpeername( socket, sockname, lensockname );
	IF res = 0 THEN
		fip.usedProtocol := IP.IPv4;
		fip.ipv4Adr := sockname.sinAddr;
		fport := WSock32.ntohs( sockname.sinPort ) MOD 10000H
	END;
	IF Trace THEN KernelLog.Enter; KernelLog.String( "SetPortAndIp " ); Report( SELF ); KernelLog.Exit; END
END SetPortAndIp;
(** Accept a client waiting on a listening connection. Blocks until a client is available or the connection
	is closed. On success client is a new Connection in state Established and res is Ok; if accept returns
	no socket, client is NIL and res is ConnectionRefused. *)
PROCEDURE Accept*( VAR client: Connection; VAR res: LONGINT );
VAR accepted: WSock32.Socket; peer: WSock32.sockaddrIn; peerLen: LONGINT;
	text: ARRAY 64 OF CHAR;
BEGIN
	IF Trace THEN KernelLog.Enter; KernelLog.String( "Accepting connections" ); Report( SELF ); KernelLog.Exit; END;
	ASSERT ( (state = Listen) & (socket # WSock32.InvalidSocket) );
	peerLen := SIZEOF( WSock32.sockaddrIn );
	peer.sinFamily := WSock32.PFINet;
	accepted := WSock32.accept( socket, peer, peerLen );	(* blocks ! *)
	BEGIN {EXCLUSIVE}
		IF accepted = WSock32.InvalidSocket THEN
			client := NIL; res := ConnectionRefused
		ELSE
			NEW( client );
			client.lport := NilPort;
			IF (peerLen # SIZEOF( WSock32.sockaddrIn )) OR (peer.sinFamily # WSock32.PFINet) THEN
				(* peer address not usable as an IPv4 socket address *)
				client.fip := IP.NilAdr;
				client.fport := NilPort
			ELSE
				client.fip.usedProtocol := IP.IPv4;
				client.fip.ipv4Adr := peer.sinAddr;	(* WSock32.ntohl( adr.sinAddr ); *)
				client.fport := WSock32.ntohs( peer.sinPort )
			END;
			client.int := IP.InterfaceByDstIP( client.fip );
			pool.Add( client, client.Finalize );
			client.socket := accepted;
			client.state := Established;
			res := Ok;
			client.SetPortAndIp;
			(*client.receiver.Start; *)
			IF Trace THEN
				KernelLog.Enter; KernelLog.String( "Accepted connection: client lport=" ); KernelLog.Int( client.lport, 1 );
				KernelLog.String( " ,fip=" ); IP.AdrToStr( client.fip, text );
				KernelLog.String( text ); KernelLog.String( " ,fport=" );
				KernelLog.Int( client.fport, 1 ); Report( SELF ); KernelLog.Exit;
			END
		END
	END
END Accept;
(* Record whether sends may be delayed: enable clears the NoDelay bit in flags, otherwise the bit is set.
	Only the flag is updated here; no code visible in this module reads it. *)
PROCEDURE DelaySend*( enable: BOOLEAN );
BEGIN {EXCLUSIVE}
	IF enable THEN
		flags := flags - {NoDelay}
	ELSE
		flags := flags + {NoDelay}
	END
END DelaySend;
(* Record the keep-alive preference: enable sets the DoKeepAlive bit in flags, otherwise the bit is cleared.
	Only the flag is updated here; no code visible in this module reads it. *)
PROCEDURE KeepAlive*( enable: BOOLEAN );
BEGIN {EXCLUSIVE}
	IF enable THEN
		flags := flags + {DoKeepAlive}
	ELSE
		flags := flags - {DoKeepAlive}
	END
END KeepAlive;
(* Discard the connection; in this implementation this simply delegates to Close. *)
PROCEDURE Discard*;
BEGIN
	(* SetState( Closed ); *)
	SELF.Close
	(* ConnectionFinalizer( SELF ); *)
END Discard;
(* Always answers FALSE in this implementation (the result is produced under the object lock). *)
PROCEDURE Requested*( ): BOOLEAN;
VAR pending: BOOLEAN;
BEGIN {EXCLUSIVE}
	pending := FALSE;
	RETURN pending
END Requested;
(* Return the number of bytes that can be read without blocking, 0 if none.
	When nothing is pending, the socket is polled with select; if it reports readable, a peeking recv
	(flags {1}, i.e. bit 1 = MSG_PEEK in Winsock) tells pending data from a shutdown by the remote side.
	On a remote shutdown the state is advanced (Established -> CloseWait; FinWait1, FinWait2, Closing -> Closed,
	releasing the socket), and on a recv error the state is set to CloseWait; both cases return 0.
	Returns 0 without touching Winsock when no socket is owned. *)
PROCEDURE Available*( ): LONGINT;
VAR ret, count, ignore: LONGINT; fdset: WSock32.FDSet;
	data: ARRAY 256 OF CHAR;
BEGIN (* {EXCLUSIVE} *)
	IF socket = WSock32.InvalidSocket THEN RETURN 0 END;
	count := 0;	(* stays defined even if ioctlsocket fails *)
	ret := WSock32.ioctlsocket( socket, WSock32.FIONRead, count );
	IF ret # 0 THEN KernelLog.String( "TCP.Available " ); WSock32.DispError; count := 0 END;
	IF count = 0 THEN	(* check socket for shutdown *)
		fdset.fdcount := 1; fdset.socket[0] := socket;
		ret := WSock32.select( 0, fdset, NIL , NIL , selectTimeout );
		IF ret = 1 THEN	(* nothing available but we can receive, try it: *)
			count := WSock32.recv( socket, data, 256, {1} );
			IF count = 0 THEN
				(* results of shutdown/closesocket go to a scratch variable so they never leak into the returned count *)
				BEGIN {EXCLUSIVE}
					IF state = Established THEN
						state := CloseWait;
						ignore := WSock32.shutdown( socket, WSock32.SDReceive )	(* may not receive any more *)
					ELSIF state IN {FinWait1, FinWait2, Closing} THEN
						state := Closed;
						ignore := WSock32.shutdown( socket, WSock32.SDboth );
						ignore := WSock32.closesocket( socket );
						socket := WSock32.InvalidSocket; pool.Remove( SELF )
					END
				END;
				IF Trace THEN
					KernelLog.Enter; KernelLog.String( "TCP.Connection.Available: graceful shutdown by remote side." ); Report( SELF ); KernelLog.Exit;
				END
			ELSIF count < 0 THEN
				IF Trace THEN KernelLog.String( "TCP.Receiver.Receive: " ); WSock32.DispError; END;
				count := 0;
				BEGIN {EXCLUSIVE}
					state := CloseWait
				END
			END
		END
	END;
	RETURN count
END Available;
(* Finalize the Connection object: shut down and close the socket if one is still owned, then mark the
	object Unused. ptr must be the object itself. Removal from the pool is left to the caller. *)
PROCEDURE Finalize( ptr: ANY );
VAR ignore: LONGINT;
BEGIN {EXCLUSIVE}
	IF Trace THEN KernelLog.Enter; KernelLog.String( "TCP.Finalize " ); Report( SELF ); KernelLog.Exit; END;
	ASSERT ( ptr = SELF );
	IF socket # WSock32.InvalidSocket THEN
		ignore := WSock32.shutdown( socket, WSock32.SDboth );
		ignore := WSock32.closesocket( socket );
		(* ASSERT ( res = 0 ); *)
		socket := WSock32.InvalidSocket
		(* pool.Remove(SELF) *) (* done outside !*)
	END;
	state := Unused
END Finalize;
- END Connection;
VAR
	pool*: Kernel.FinalizedCollection;	(* pool of all Connections *)
	selectTimeout: WSock32.TimeVal;	(* timeout argument passed to WSock32.select in Connection.Available *)

(* Allocate the module-level objects. *)
PROCEDURE Init;
BEGIN
	NEW( selectTimeout );	(* zero *)
	NEW( pool )
END Init;
(* Enumerator callback used by Cleanup: finalizes one pooled Connection and asks the enumeration to continue. *)
PROCEDURE Finalize( obj: ANY; VAR cont: BOOLEAN );
VAR c: Connection;
BEGIN
	c := obj( Connection );
	c.Finalize( c );
	cont := TRUE
END Finalize;
(* Termination handler (installed in the module body): runs Finalize over every Connection in the pool. *)
PROCEDURE Cleanup;
BEGIN
	pool.Enumerate( Finalize )
END Cleanup;
(* Write "State=" followed by the symbolic name of the given TCP state to the kernel log.
	Every declared state constant (Closed .. Unused) has a label; any other value is written as "????". *)
PROCEDURE ReportState( state: LONGINT );
BEGIN
	KernelLog.String( "State=" );
	CASE state OF
		Closed:
			KernelLog.String( "Closed" )
		| Listen:
			KernelLog.String( "Listen" )
		| SynSent:
			KernelLog.String( "SynSent" )
		| SynReceived:
			KernelLog.String( "SynReceived" )
		| Established:
			KernelLog.String( "Established" )
		| CloseWait:
			KernelLog.String( "CloseWait" )
		| FinWait1:
			KernelLog.String( "FinWait1" )
		| Closing:	(* previously missing, fell through to "????" *)
			KernelLog.String( "Closing" )
		| LastAck:	(* previously missing, fell through to "????" *)
			KernelLog.String( "LastAck" )
		| FinWait2:
			KernelLog.String( "FinWait2" )
		| TimeWait:
			KernelLog.String( "TimeWait" )
		| Unused:
			KernelLog.String( "Unused" )
	ELSE
		KernelLog.String( "????" )
	END
END ReportState;
(* Write a one-line summary of connection c to the kernel log: local port, foreign address, foreign port and state. *)
PROCEDURE Report( c: Connection );
VAR adrText: ARRAY 64 OF CHAR;
BEGIN
	KernelLog.String( " [lport=" );
	KernelLog.Int( c.lport, 1 );

	IP.AdrToStr( c.fip, adrText );
	KernelLog.String( " ,fip=" );
	KernelLog.String( adrText );

	KernelLog.String( " ,fport=" );
	KernelLog.Int( c.fport, 1 );

	KernelLog.String( "," );
	ReportState( c.state );
	KernelLog.String( "]" )
END Report;
(* Write the symbolic name of result code res to the kernel log; unknown codes are written numerically. *)
PROCEDURE ReportResult( res: LONGINT );
BEGIN
	(* the checks are made in the same order as before, the first match wins *)
	IF res = Ok THEN KernelLog.String( "Ok" ); RETURN END;
	IF res = ConnectionRefused THEN KernelLog.String( "ConnectionRefused" ); RETURN END;
	IF res = NotConnected THEN KernelLog.String( "NotConnected" ); RETURN END;
	IF res = TimedOut THEN KernelLog.String( "TimedOut" ); RETURN END;
	IF res = Streams.EOF THEN KernelLog.String( "Streams.EOF" ); RETURN END;
	KernelLog.String( "Unknown result code=" );
	KernelLog.Int( res, 1 )
END ReportResult;
- BEGIN (* module body: Init allocates pool and selectTimeout; Cleanup, which finalizes every pooled Connection, is then registered via Modules.InstallTermHandler (presumably invoked when the module is terminated - confirm in Modules) *)
- Init; Modules.InstallTermHandler( Cleanup )
- END TCP.
- The state diagram in this version of TCP is very much simplified (the rest is done by Windows):
- either
- closed -> Listen -> Established -> CloseWait | FinWait1 -> Closed
- or
- closed -> Established -> CloseWait | FinWait1 -> Closed
|