{
    fpAsync: Asynchronous event management for Free Pascal
    Copyright (C) 2001-2005 by
      Areca Systems GmbH / Sebastian Guenther, sg@freepascal.org

    See the file COPYING.FPC, included in this distribution,
    for details about the copyright.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
}

unit fpAsync;

{$MODE objfpc}
{$H+}

interface

uses SysUtils, Classes, libasync;

type

  // Method-pointer event type used for all "notify" style registrations.
  TNotifyEvent = procedure(Sender: TObject) of object;

  // Exception raised when a libasync call returns a code other than asyncOK.
  // The failing code is kept in ErrorCode.
  EAsyncError = class(Exception)
  private
    FErrorCode: TAsyncResult;
  public
    constructor Create(AErrorCode: TAsyncResult);
    property ErrorCode: TAsyncResult read FErrorCode;
  end;

  // Object wrapper around a libasync event loop.
  //
  // Two registration flavours are offered for every event source:
  //   * "Callback" methods pass a plain TAsyncCallback plus user data
  //     straight through to libasync.
  //   * "Notify" methods take a TNotifyEvent; a TNotifyData record is
  //     allocated per registration and the returned Pointer is the handle
  //     that has to be given to the matching Remove/Clear method.
  TEventLoop = class
  private
    // NOTE(review): Handle casts Self to TAsyncHandle, so the position of
    // FData as the first field presumably matters to libasync - confirm
    // before reordering or inserting fields.
    FData: TAsyncData;
    // Head of the singly linked list of TNotifyData records owned by this loop
    FFirstNotifyData: Pointer;
    function GetIsRunning: Boolean;
    procedure SetIsRunning(AIsRunning: Boolean);
  protected
    // Raises EAsyncError when AResultCode is not asyncOK
    procedure CheckResult(AResultCode: TAsyncResult);
  public
    constructor Create;
    destructor Destroy; override;
    function Handle: TAsyncHandle;

    // Main loop control
    procedure Run;
    procedure Break;

    // Timer support
    function AddTimerCallback(AMSec: LongInt; APeriodic: Boolean;
      ACallback: TAsyncCallback; AUserData: Pointer): TAsyncTimer;
    procedure RemoveTimerCallback(ATimer: TAsyncTimer);
    function AddTimerNotify(AMSec: LongInt; APeriodic: Boolean;
      ANotify: TNotifyEvent; ASender: TObject): Pointer;
    procedure RemoveTimerNotify(AHandle: Pointer);

    // I/O notification support (for files, sockets etc.)
    procedure SetIOCallback(AHandle: Integer; ACallback: TAsyncCallback;
      AUserData: Pointer);
    procedure ClearIOCallback(AHandle: Integer);
    function SetIONotify(AHandle: Integer; ANotify: TNotifyEvent;
      ASender: TObject): Pointer;
    procedure ClearIONotify(AHandle: Pointer);

    procedure SetDataAvailableCallback(AHandle: Integer;
      ACallback: TAsyncCallback; AUserData: Pointer);
    procedure ClearDataAvailableCallback(AHandle: Integer);
    function SetDataAvailableNotify(AHandle: Integer; ANotify: TNotifyEvent;
      ASender: TObject): Pointer;
    procedure ClearDataAvailableNotify(AHandle: Pointer);

    procedure SetCanWriteCallback(AHandle: Integer; ACallback: TAsyncCallback;
      AUserData: Pointer);
    procedure ClearCanWriteCallback(AHandle: Integer);
    function SetCanWriteNotify(AHandle: Integer; ANotify: TNotifyEvent;
      ASender: TObject): Pointer;
    procedure ClearCanWriteNotify(AHandle: Pointer);

    class function TimerTicks: Int64;

    // Properties
    property IsRunning: Boolean read GetIsRunning write SetIsRunning;
  end;


// -------------------------------------------------------------------
//   Asynchronous line reader
// -------------------------------------------------------------------

  // Handler type receiving one complete line (without its line terminator).
  TLineNotify = procedure(const ALine: String) of object;

  // Abstract line splitter: descendants supply the raw bytes through Read
  // and are told through NoData when the first read of a Run call yields
  // nothing. Completed lines are delivered through OnLine.
  TGenericLineReader = class
  protected
    // RealBuffer owns the allocation; FBuffer is the view published through
    // the Buffer property (it points behind the current line while OnLine
    // is executing).
    RealBuffer, FBuffer: PChar;
    FBytesInBuffer: Integer;
    FOnLine: TLineNotify;
    // InCallback is set while a user handler runs; DoStopAndFree makes Run
    // leave as soon as the handler returns.
    InCallback, DoStopAndFree: Boolean;

    function Read(var ABuffer; count: Integer): Integer; virtual; abstract;
    procedure NoData; virtual; abstract;

  public
    destructor Destroy; override;
    procedure Run;              // Process as many lines as possible

    property Buffer: PChar read FBuffer;
    property BytesInBuffer: Integer read FBytesInBuffer;
    property OnLine: TLineNotify read FOnLine write FOnLine;
  end;

  // Line reader that is driven by "data available" notifications of a
  // TEventLoop. Bytes are read from DataStream; the handle of BlockingStream
  // is the one that is watched by the event loop.
  TAsyncStreamLineReader = class(TGenericLineReader)
  protected
    FEventLoop: TEventLoop;
    FDataStream: TStream;
    FBlockingStream: THandleStream;
    FOnEOF: TNotifyEvent;
    // Handle returned by TEventLoop.SetDataAvailableNotify, nil when no
    // notification is registered
    NotifyHandle: Pointer;

    function Read(var ABuffer; count: Integer): Integer; override;
    procedure NoData; override;
    procedure StreamDataAvailable(UserData: TObject);
  public
    constructor Create(AEventLoop: TEventLoop; AStream: THandleStream);
    constructor Create(AEventLoop: TEventLoop; ADataStream: TStream;
      ABlockingStream: THandleStream);
    destructor Destroy; override;
    procedure StopAndFree;      // Destroy instance after run

    property EventLoop: TEventLoop read FEventLoop;
    property DataStream: TStream read FDataStream;
    property BlockingStream: THandleStream read FBlockingStream;
    property OnEOF: TNotifyEvent read FOnEOF write FOnEOF;
  end;


// -------------------------------------------------------------------
//   Asynchronous write buffers
// -------------------------------------------------------------------

  // Write-only stream that collects data in memory; Run hands the collected
  // data to DoRealWrite. Descendants implement the actual output
  // (DoRealWrite) and the scheduling hooks (WantWrite, WritingFailed).
  TWriteBuffer = class(TStream)
  protected
    FBuffer: PChar;
    FBytesInBuffer: Integer;
    FBufferSent: Boolean;
    FOnBufferEmpty: TNotifyEvent;
    FOnBufferSent: TNotifyEvent;
    // Set while a user handler is executing
    InCallback: Boolean;

    function Seek(Offset: LongInt; Origin: Word): LongInt; override;
    function Write(const ABuffer; Count: LongInt): LongInt; override;
    function DoRealWrite(const ABuffer; Count: Integer): Integer; virtual; abstract;
    procedure WritingFailed; virtual; abstract;
    procedure WantWrite; virtual; abstract;
    procedure BufferEmpty; virtual;
  public
    // Appended to every string passed to WriteLine; set to #10 by Create
    EndOfLineMarker: String;

    constructor Create;
    destructor Destroy; override;
    procedure WriteLine(const line: String);
    procedure Run;              // Write as many data as possible

    property BytesInBuffer: Integer read FBytesInBuffer;
    property BufferSent: Boolean read FBufferSent;
    property OnBufferEmpty: TNotifyEvent read FOnBufferEmpty write FOnBufferEmpty;
    property OnBufferSent: TNotifyEvent read FOnBufferSent write FOnBufferSent;
  end;

  // Write buffer that is flushed from "can write" notifications of a
  // TEventLoop. Data goes to DataStream; the handle of BlockingStream is the
  // one that is watched by the event loop.
  TAsyncWriteStream = class(TWriteBuffer)
  protected
    FEventLoop: TEventLoop;
    FDataStream: TStream;
    FBlockingStream: THandleStream;
    // Handle returned by TEventLoop.SetCanWriteNotify, nil when no
    // notification is registered
    NotifyHandle: Pointer;
    DoStopAndFree: Boolean;

    function DoRealWrite(const ABuffer; Count: Integer): Integer; override;
    procedure WritingFailed; override;
    procedure WantWrite; override;
    procedure CanWrite(UserData: TObject);
  public
    constructor Create(AEventLoop: TEventLoop; AStream: THandleStream);
    constructor Create(AEventLoop: TEventLoop;
      ADataStream: TStream; ABlockingStream: THandleStream);
    destructor Destroy; override;
    procedure StopAndFree;      // Destroy instance after run

    property EventLoop: TEventLoop read FEventLoop;
    property DataStream: TStream read FDataStream;
    property BlockingStream: THandleStream read FBlockingStream;
  end;


var
  { All data written to a TWriteBuffer or descendant class will be written to
    this stream as well: }
  fpAsyncWriteBufferDebugStream: TStream;


implementation

type
  PNotifyData = ^TNotifyData;
  // Bookkeeping record for one "notify" registration. The records of one
  // event loop form a singly linked list starting at FFirstNotifyData.
  TNotifyData = record
    Next: PNotifyData;
    Notify: TNotifyEvent;
    Sender: TObject;
    // Timer registrations store the timer, I/O registrations the file handle
    case Boolean of
      False: (TimerHandle: TAsyncTimer);
      True: (FileHandle: LongInt);
  end;


// Callback given to libasync for every "notify" registration: Data is the
// PNotifyData of the registration, whose method pointer is invoked with the
// stored sender.
procedure EventHandler(Data: Pointer); cdecl;
begin
  with PNotifyData(Data)^ do
    Notify(Sender);
end;


// Allocates a new TNotifyData record and puts it at the head of Obj's list.
// Only Next is initialised here; the caller fills in the remaining fields.
function AddNotifyData(Obj: TEventLoop): PNotifyData;
begin
  New(Result);
  Result^.Next := PNotifyData(Obj.FFirstNotifyData);
  Obj.FFirstNotifyData := Result;
end;

// Unlinks Data from Obj's list (if it is a member) and disposes it.
// A nil Data is ignored.
procedure FreeNotifyData(Obj: TEventLoop; Data: PNotifyData);
var
  CurData, PrevData: PNotifyData;
begin
  if not Assigned(Data) then
    exit;

  PrevData := nil;
  CurData := PNotifyData(Obj.FFirstNotifyData);
  while Assigned(CurData) do
  begin
    if CurData = Data then
    begin
      if Assigned(PrevData) then
        PrevData^.Next := CurData^.Next
      else
        Obj.FFirstNotifyData := CurData^.Next;
      // A record is linked at most once, so there is nothing left to search
      break;
    end;
    PrevData := CurData;
    CurData := CurData^.Next;
  end;

  Dispose(Data);
end;


// Builds the message 'Async I/O error <n>' from the ordinal value of
// AErrorCode and remembers the code itself.
constructor EAsyncError.Create(AErrorCode: TAsyncResult);
begin
  inherited Create(Format('Async I/O error %d', [Ord(AErrorCode)]));
  FErrorCode := AErrorCode;
end;


// Initialises the libasync state belonging to this loop.
constructor TEventLoop.Create;
begin
  inherited Create;
  asyncInit(Handle);
end;

// Releases the libasync state first, then disposes every TNotifyData record
// that is still registered.
destructor TEventLoop.Destroy;
var
  NotifyData, NextNotifyData: PNotifyData;
begin
  asyncFree(Handle);
  NotifyData := FFirstNotifyData;
  // Clear the head first so that no dangling list pointer remains
  FFirstNotifyData := nil;
  while Assigned(NotifyData) do
  begin
    NextNotifyData := NotifyData^.Next;
    Dispose(NotifyData);
    NotifyData := NextNotifyData;
  end;
  inherited Destroy;
end;

// Returns the handle that is passed to all libasync calls: the instance
// pointer itself, reinterpreted as TAsyncHandle.
function TEventLoop.Handle: TAsyncHandle;
begin
  Result := TAsyncHandle(Self);
end;

// Enters the libasync main loop of this event loop.
procedure TEventLoop.Run;
begin
  asyncRun(Handle);
end;

// Asks libasync to leave the main loop of this event loop.
procedure TEventLoop.Break;
begin
  asyncBreak(Handle);
end;

// Registers a timer with a plain libasync callback and returns its handle.
function TEventLoop.AddTimerCallback(AMSec: LongInt; APeriodic: Boolean;
  ACallback: TAsyncCallback; AUserData: Pointer): TAsyncTimer;
begin
  Result := asyncAddTimer(Handle, AMSec, APeriodic, ACallback, AUserData);
end;

// Removes a timer that was created by AddTimerCallback.
procedure TEventLoop.RemoveTimerCallback(ATimer: TAsyncTimer);
begin
  asyncRemoveTimer(Handle, ATimer);
end;

// Registers a timer that calls ANotify(ASender). The returned pointer is the
// handle expected by RemoveTimerNotify.
function TEventLoop.AddTimerNotify(AMSec: LongInt; APeriodic: Boolean;
  ANotify: TNotifyEvent; ASender: TObject): Pointer;
var
  UserData: PNotifyData;
begin
  UserData := AddNotifyData(Self);
  UserData^.Notify := ANotify;
  UserData^.Sender := ASender;
  UserData^.TimerHandle :=
    asyncAddTimer(Handle, AMSec, APeriodic, @EventHandler, UserData);
  Result := UserData;
end;

// Removes a timer created by AddTimerNotify and frees its bookkeeping
// record. A nil AHandle is ignored.
procedure TEventLoop.RemoveTimerNotify(AHandle: Pointer);
var
  Data: PNotifyData;
begin
  if not Assigned(AHandle) then
    exit;
  Data := PNotifyData(AHandle);
  asyncRemoveTimer(Handle, Data^.TimerHandle);
  FreeNotifyData(Self, Data);
end;

// Installs a plain I/O callback for the file handle AHandle.
// Raises EAsyncError when libasync reports a failure.
procedure TEventLoop.SetIOCallback(AHandle: Integer; ACallback: TAsyncCallback;
  AUserData: Pointer);
begin
  CheckResult(asyncSetIOCallback(Handle, AHandle, ACallback, AUserData));
end;

// Removes the I/O callback of the file handle AHandle.
procedure TEventLoop.ClearIOCallback(AHandle: Integer);
begin
  asyncClearIOCallback(Handle, AHandle);
end;

// Installs an I/O notification that calls ANotify(ASender) for the file
// handle AHandle. Returns the handle expected by ClearIONotify; raises
// EAsyncError (after releasing the bookkeeping record) on failure.
function TEventLoop.SetIONotify(AHandle: Integer; ANotify: TNotifyEvent;
  ASender: TObject): Pointer;
var
  UserData: PNotifyData;
  ResultCode: TAsyncResult;
begin
  UserData := AddNotifyData(Self);
  UserData^.Notify := ANotify;
  UserData^.Sender := ASender;
  UserData^.FileHandle := AHandle;
  ResultCode := asyncSetIOCallback(Handle, AHandle, @EventHandler, UserData);
  if ResultCode <> asyncOK then
  begin
    FreeNotifyData(Self, UserData);
    raise EAsyncError.Create(ResultCode);
  end else
    Result := UserData;
  {$IFDEF fpAsyncDebug}WriteLn('TEventLoop.SetIONotify: Filehandle=', AHandle, ', Result=', Integer(Result));{$ENDIF}
end;

// Removes a notification installed by SetIONotify and frees its bookkeeping
// record. A nil AHandle is ignored.
procedure TEventLoop.ClearIONotify(AHandle: Pointer);
var
  Data: PNotifyData;
begin
  if not Assigned(AHandle) then
    exit;
  Data := PNotifyData(AHandle);
  {$IFDEF fpAsyncDebug}WriteLn('TEventLoop.ClearIONotify: Filehandle=', Data^.FileHandle, ', Data=', Integer(AHandle));{$ENDIF}
  asyncClearIOCallback(Handle, Data^.FileHandle);
  FreeNotifyData(Self, Data);
end;

// Installs a plain "data available" callback for the file handle AHandle.
// Raises EAsyncError when libasync reports a failure.
procedure TEventLoop.SetDataAvailableCallback(AHandle: Integer;
  ACallback: TAsyncCallback; AUserData: Pointer);
begin
  CheckResult(asyncSetDataAvailableCallback(Handle, AHandle,
    ACallback, AUserData));
end;

// Removes the "data available" callback of the file handle AHandle.
procedure TEventLoop.ClearDataAvailableCallback(AHandle: Integer);
begin
  asyncClearDataAvailableCallback(Handle, AHandle);
end;

// Installs a "data available" notification that calls ANotify(ASender).
// Returns the handle expected by ClearDataAvailableNotify; raises
// EAsyncError (after releasing the bookkeeping record) on failure.
function TEventLoop.SetDataAvailableNotify(AHandle: Integer;
  ANotify: TNotifyEvent; ASender: TObject): Pointer;
var
  UserData: PNotifyData;
  ResultCode: TAsyncResult;
begin
  UserData := AddNotifyData(Self);
  UserData^.Notify := ANotify;
  UserData^.Sender := ASender;
  UserData^.FileHandle := AHandle;
  ResultCode := asyncSetDataAvailableCallback(Handle, AHandle,
    @EventHandler, UserData);
  if ResultCode <> asyncOK then
  begin
    FreeNotifyData(Self, UserData);
    raise EAsyncError.Create(ResultCode);
  end else
    Result := UserData;
  {$IFDEF fpAsyncDebug}WriteLn('TEventLoop.SetDataAvailableNotify: Filehandle=', AHandle, ', Result=', Integer(Result));{$ENDIF}
end;

// Removes a notification installed by SetDataAvailableNotify and frees its
// bookkeeping record. A nil AHandle is ignored.
procedure TEventLoop.ClearDataAvailableNotify(AHandle: Pointer);
var
  Data: PNotifyData;
begin
  if not Assigned(AHandle) then
    exit;
  Data := PNotifyData(AHandle);
  {$IFDEF fpAsyncDebug}WriteLn('TEventLoop.ClearDataAvailableNotify: Filehandle=', Data^.FileHandle, ', Data=', Integer(AHandle));{$ENDIF}
  asyncClearDataAvailableCallback(Handle, Data^.FileHandle);
  FreeNotifyData(Self, Data);
end;

// Installs a plain "can write" callback for the file handle AHandle.
// Raises EAsyncError when libasync reports a failure.
procedure TEventLoop.SetCanWriteCallback(AHandle: Integer;
  ACallback: TAsyncCallback; AUserData: Pointer);
begin
  CheckResult(asyncSetCanWriteCallback(Handle, AHandle, ACallback, AUserData));
end;

// Removes the "can write" callback of the file handle AHandle.
procedure TEventLoop.ClearCanWriteCallback(AHandle: Integer);
begin
  asyncClearCanWriteCallback(Handle, AHandle);
end;

// Installs a "can write" notification that calls ANotify(ASender).
// Returns the handle expected by ClearCanWriteNotify; raises EAsyncError
// (after releasing the bookkeeping record) on failure.
function TEventLoop.SetCanWriteNotify(AHandle: Integer; ANotify: TNotifyEvent;
  ASender: TObject): Pointer;
var
  UserData: PNotifyData;
  ResultCode: TAsyncResult;
begin
  UserData := AddNotifyData(Self);
  UserData^.Notify := ANotify;
  UserData^.Sender := ASender;
  UserData^.FileHandle := AHandle;
  ResultCode := asyncSetCanWriteCallback(Handle, AHandle,
    @EventHandler, UserData);
  if ResultCode <> asyncOK then
  begin
    FreeNotifyData(Self, UserData);
    raise EAsyncError.Create(ResultCode);
  end else
    Result := UserData;
  {$IFDEF fpAsyncDebug}WriteLn('TEventLoop.SetCanWriteNotify: Filehandle=', AHandle, ', Result=', Integer(Result));{$ENDIF}
end;

// Removes a notification installed by SetCanWriteNotify and frees its
// bookkeeping record. A nil AHandle is ignored.
procedure TEventLoop.ClearCanWriteNotify(AHandle: Pointer);
var
  Data: PNotifyData;
begin
  if not Assigned(AHandle) then
    exit;
  Data := PNotifyData(AHandle);
  {$IFDEF fpAsyncDebug}WriteLn('TEventLoop.ClearCanWriteNotify: Filehandle=', Data^.FileHandle, ', Data=', Integer(AHandle));{$ENDIF}
  asyncClearCanWriteCallback(Handle, Data^.FileHandle);
  FreeNotifyData(Self, Data);
end;

// Returns the current tick counter of libasync (asyncGetTicks).
class function TEventLoop.TimerTicks: Int64;
begin
  Result := asyncGetTicks;
end;

// Raises EAsyncError when AResultCode is anything but asyncOK.
procedure TEventLoop.CheckResult(AResultCode: TAsyncResult);
begin
  if AResultCode <> asyncOK then
    raise EAsyncError.Create(AResultCode);
end;

// Getter of IsRunning: asks libasync whether the loop is running.
function TEventLoop.GetIsRunning: Boolean;
begin
  Result := asyncIsRunning(Handle);
end;

// Setter of IsRunning: brings the loop into the requested state. A running
// loop that shall stop is left with Break, a stopped loop that shall run is
// entered with Run; nothing happens when the state already matches.
// (Earlier revisions had the two actions swapped.)
// NOTE(review): Run presumably does not return before the loop is left
// again - confirm against libasync's asyncRun.
procedure TEventLoop.SetIsRunning(AIsRunning: Boolean);
begin
  if IsRunning then
  begin
    if not AIsRunning then
      Self.Break;
  end else
    if AIsRunning then
      Self.Run;
end;


// -------------------------------------------------------------------
//   TGenericLineReader
// -------------------------------------------------------------------

// Releases the line buffer.
destructor TGenericLineReader.Destroy;
begin
  if Assigned(RealBuffer) then
  begin
    FreeMem(RealBuffer);
    RealBuffer := nil;
  end;
  inherited Destroy;
end;

// Reads blocks of up to 1024 bytes through Read until a read returns no
// data, splits the collected bytes into lines and passes each line to
// OnLine. CR, LF, CR+LF and LF+CR all count as one line terminator. The
// very last byte of the buffer is never examined: a terminator at that
// position is kept until more data arrives, because its partner byte might
// still follow. NoData is called when already the first read of this call
// returns nothing.
procedure TGenericLineReader.Run;
var
  NewData: array[0..1023] of Byte;
  p: PChar;
  BytesRead, OldBufSize, CurBytesInBuffer, LastEndOfLine, i, LineLength: Integer;
  line: String;
  FirstRun: Boolean;
begin
  FirstRun := True;
  while True do
  begin
    BytesRead := Read(NewData, SizeOf(NewData));
    //WriteLn('Linereader: ', BytesRead, ' bytes read');
    if BytesRead <= 0 then
    begin
      if FirstRun then
        NoData;
      break;
    end;
    FirstRun := False;
    OldBufSize := FBytesInBuffer;

    // Append the new received data to the read buffer
    Inc(FBytesInBuffer, BytesRead);
    ReallocMem(RealBuffer, FBytesInBuffer);
    Move(NewData, RealBuffer[OldBufSize], BytesRead);

    {Process all potential lines in the current buffer. Attention: FBuffer and
     FBytesInBuffer MUST be updated for each line, as they can be accessed from
     within the FOnLine handler!}
    LastEndOfLine := 0;
    // Resume at the last byte of the old data: it was skipped by the
    // previous pass and may be a pending line terminator
    if OldBufSize > 0 then
      i := OldBufSize - 1
    else
      i := 0;

    CurBytesInBuffer := FBytesInBuffer;

    while i <= CurBytesInBuffer - 2 do
    begin
      if (RealBuffer[i] = #13) or (RealBuffer[i] = #10) then
      begin
        LineLength := i - LastEndOfLine;
        SetLength(line, LineLength);
        if LineLength > 0 then
          Move(RealBuffer[LastEndOfLine], line[1], LineLength);

        // Swallow the second byte of a two-byte terminator
        if ((RealBuffer[i] = #13) and (RealBuffer[i + 1] = #10)) or
           ((RealBuffer[i] = #10) and (RealBuffer[i + 1] = #13)) then
          Inc(i);
        LastEndOfLine := i + 1;

        if Assigned(FOnLine) then
        begin
          FBuffer := RealBuffer + LastEndOfLine;
          FBytesInBuffer := CurBytesInBuffer - LastEndOfLine;
          InCallback := True;
          try
            FOnLine(line);
          finally
            InCallback := False;
          end;
          // Check if has been destroyed by FOnLine:
          if DoStopAndFree then
            exit;
        end;
      end;
      Inc(i);
    end;

    FBytesInBuffer := CurBytesInBuffer;

    if LastEndOfLine > 0 then
    begin
      // Remove all processed lines from the buffer
      Dec(FBytesInBuffer, LastEndOfLine);
      GetMem(p, FBytesInBuffer);
      Move(RealBuffer[LastEndOfLine], p^, FBytesInBuffer);
      if Assigned(RealBuffer) then
        FreeMem(RealBuffer);
      RealBuffer := p;
    end;
    FBuffer := RealBuffer;
  end;
end;


// -------------------------------------------------------------------
//   TAsyncStreamLineReader
// -------------------------------------------------------------------

// Convenience constructor: AStream is used both for reading the data and
// for the event loop notification.
constructor TAsyncStreamLineReader.Create(AEventLoop: TEventLoop;
  AStream: THandleStream);
begin
  Self.Create(AEventLoop, AStream, AStream);
end;

// Reads from ADataStream whenever the event loop reports that data is
// available on the handle of ABlockingStream. The notification is
// registered immediately.
constructor TAsyncStreamLineReader.Create(AEventLoop: TEventLoop;
  ADataStream: TStream; ABlockingStream: THandleStream);
begin
  ASSERT(Assigned(ADataStream) and Assigned(ABlockingStream));

  inherited Create;
  FEventLoop := AEventLoop;
  FDataStream := ADataStream;
  FBlockingStream := ABlockingStream;
  NotifyHandle := EventLoop.SetDataAvailableNotify(
    FBlockingStream.Handle, @StreamDataAvailable, nil);
end;

// Unregisters a still active notification before the instance goes away,
// so that the event loop does not keep a callback into a freed object
// (this mirrors TAsyncWriteStream.Destroy).
destructor TAsyncStreamLineReader.Destroy;
begin
  if Assigned(NotifyHandle) then
  begin
    EventLoop.ClearDataAvailableNotify(NotifyHandle);
    NotifyHandle := nil;
  end;
  inherited Destroy;
end;

// Frees the instance. When called from within one of its own handlers the
// notification is removed at once and the destruction is deferred until
// the handler has returned (see StreamDataAvailable).
procedure TAsyncStreamLineReader.StopAndFree;
begin
  if InCallback then
  begin
    if Assigned(NotifyHandle) then
    begin
      EventLoop.ClearDataAvailableNotify(NotifyHandle);
      NotifyHandle := nil;
    end;
    DoStopAndFree := True;
  end else
    Self.Free;
end;

// Supplies raw bytes to TGenericLineReader.Run from the data stream.
function TAsyncStreamLineReader.Read(var ABuffer; count: Integer): Integer;
begin
  Result := FDataStream.Read(ABuffer, count);
end;

// Called by Run when a notification arrived but nothing could be read.
// This is treated as end of data if the data stream is the watched stream
// itself or if the data stream is positioned at its end. In that case the
// unterminated rest of the buffer is delivered as a final line (one
// trailing CR or LF is stripped), the notification is removed and OnEOF
// fires.
procedure TAsyncStreamLineReader.NoData;
var
  s: String;
begin
  if (FDataStream = FBlockingStream) or
    (FDataStream.Position = FDataStream.Size) then
  begin

    if (FBytesInBuffer > 0) and Assigned(FOnLine) then
    begin
      if FBuffer[FBytesInBuffer - 1] in [#13, #10] then
        Dec(FBytesInBuffer);
      SetLength(s, FBytesInBuffer);
      // s is empty when the buffer held nothing but the terminator
      if FBytesInBuffer > 0 then
        Move(FBuffer^, s[1], FBytesInBuffer);
      // Mark the handler as running so that StopAndFree defers the
      // destruction instead of freeing the instance under our feet
      InCallback := True;
      try
        FOnLine(s);
      finally
        InCallback := False;
      end;
      // The handler asked for destruction: StopAndFree has already removed
      // the notification, the caller frees the instance
      if DoStopAndFree then
        exit;
    end;

    if Assigned(NotifyHandle) then
    begin
      EventLoop.ClearDataAvailableNotify(NotifyHandle);
      NotifyHandle := nil;
    end;

    if Assigned(FOnEOF) then
    begin
      InCallback := True;
      try
        FOnEOF(Self);
      finally
        InCallback := False;
      end;
    end;
  end;
end;

// Event loop notification: processes the available data and performs a
// destruction that was requested through StopAndFree while a handler ran.
procedure TAsyncStreamLineReader.StreamDataAvailable(UserData: TObject);
begin
  Run;
  if DoStopAndFree then
    Free;
end;


// -------------------------------------------------------------------
//   TWriteBuffer
// -------------------------------------------------------------------

// Fires OnBufferEmpty. InCallback is reset even if the handler raises, in
// line with the other handler invocations of this unit.
procedure TWriteBuffer.BufferEmpty;
begin
  if Assigned(FOnBufferEmpty) then
  begin
    InCallback := True;
    try
      FOnBufferEmpty(Self);
    finally
      InCallback := False;
    end;
  end;
end;

// Creates an empty buffer; the end of line marker defaults to #10.
constructor TWriteBuffer.Create;
begin
  inherited Create;

  FBuffer := nil;
  FBytesInBuffer := 0;
  EndOfLineMarker := #10;
end;

// Releases the buffer memory.
destructor TWriteBuffer.Destroy;
begin
  if Assigned(FBuffer) then
    FreeMem(FBuffer);
  inherited Destroy;
end;

// Only seeks that stay at the current end of the buffer are accepted; they
// return the number of buffered bytes. Everything else raises EStreamError.
function TWriteBuffer.Seek(Offset: LongInt; Origin: Word): LongInt;
begin
  if ((Offset = 0) and ((Origin = soFromCurrent) or (Origin = soFromEnd))) or
     ((Offset = FBytesInBuffer) and (Origin = soFromBeginning)) then
    Result := FBytesInBuffer
  else
    // !!!: No i18n for this string - solve this problem in the FCL?!?
    raise EStreamError.Create('Invalid stream operation');
end;

// Appends Count bytes to the buffer, mirrors them to
// fpAsyncWriteBufferDebugStream when that stream is assigned and announces
// through WantWrite that there is data to send. Returns Count.
function TWriteBuffer.Write(const ABuffer; Count: LongInt): LongInt;
begin
  if Count > 0 then
  begin
    FBufferSent := False;
    ReallocMem(FBuffer, FBytesInBuffer + Count);
    Move(ABuffer, FBuffer[FBytesInBuffer], Count);
    Inc(FBytesInBuffer, Count);
    if Assigned(fpAsyncWriteBufferDebugStream) then
      fpAsyncWriteBufferDebugStream.Write(ABuffer, Count);
    WantWrite;
  end;
  Result := Count;
end;

// Writes line followed by EndOfLineMarker. Nothing is written when both
// are empty (indexing the first character of an empty string is invalid).
procedure TWriteBuffer.WriteLine(const line: String);
var
  s: String;
begin
  s := line + EndOfLineMarker;
  if Length(s) > 0 then
    WriteBuffer(s[1], Length(s));
end;

// Passes the buffered data to DoRealWrite until the buffer is empty or
// DoRealWrite accepts nothing more. An empty buffer triggers BufferEmpty
// and, unless FBufferSent is already set, WantWrite. WritingFailed is
// called when not a single byte could be written during this call.
procedure TWriteBuffer.Run;
var
  Written: Integer;
  NewBuf: PChar;
  Failed: Boolean;
begin
  Failed := True;
  repeat
    if FBytesInBuffer = 0 then
    begin
      BufferEmpty;
      if FBufferSent then
        exit;
      WantWrite;
      exit;
    end;

    Written := DoRealWrite(FBuffer[0], FBytesInBuffer);
    if Written > 0 then
    begin
      Failed := False;
      // Keep only the bytes that have not been written yet
      Dec(FBytesInBuffer, Written);
      GetMem(NewBuf, FBytesInBuffer);
      Move(FBuffer[Written], NewBuf[0], FBytesInBuffer);
      FreeMem(FBuffer);
      FBuffer := NewBuf;
    end;
  until Written <= 0;
  if Failed then
    WritingFailed;
end;


// -------------------------------------------------------------------
//   TAsyncWriteStream
// -------------------------------------------------------------------

// Performs the actual output by writing to the data stream.
function TAsyncWriteStream.DoRealWrite(const ABuffer; Count: Integer): Integer;
begin
  Result := FDataStream.Write(ABuffer, count);
end;

// Called by Run when nothing could be written. The "can write"
// notification is removed only when the data stream differs from the
// watched stream.
procedure TAsyncWriteStream.WritingFailed;
begin
  if (FDataStream <> FBlockingStream) and Assigned(NotifyHandle) then
  begin
    EventLoop.ClearCanWriteNotify(NotifyHandle);
    NotifyHandle := nil;
  end;
end;

// Makes sure a "can write" notification is registered with the event loop.
procedure TAsyncWriteStream.WantWrite;
begin
  if not Assigned(NotifyHandle) then
    NotifyHandle := EventLoop.SetCanWriteNotify(FBlockingStream.Handle,
      @CanWrite, nil);
end;

// Event loop notification. With an empty buffer the notification is
// removed, FBufferSent is set and OnBufferSent fires; otherwise Run sends
// more data. Afterwards a destruction requested through StopAndFree is
// carried out.
procedure TAsyncWriteStream.CanWrite(UserData: TObject);
begin
  if FBytesInBuffer = 0 then
  begin
    if Assigned(NotifyHandle) then
    begin
      EventLoop.ClearCanWriteNotify(NotifyHandle);
      NotifyHandle := nil;
    end;
    FBufferSent := True;
    if Assigned(FOnBufferSent) then
    begin
      InCallback := True;
      try
        FOnBufferSent(Self);
      finally
        InCallback := False;
      end;
    end;
  end else
    Run;
  if DoStopAndFree then
    Free;
end;

// Convenience constructor: AStream is used both for writing the data and
// for the event loop notification.
constructor TAsyncWriteStream.Create(AEventLoop: TEventLoop;
  AStream: THandleStream);
begin
  Self.Create(AEventLoop, AStream, AStream);
end;

// Writes to ADataStream whenever the event loop reports that the handle of
// ABlockingStream can be written to. No notification is registered before
// data has been written into the buffer (see WantWrite).
constructor TAsyncWriteStream.Create(AEventLoop: TEventLoop;
  ADataStream: TStream; ABlockingStream: THandleStream);
begin
  ASSERT(Assigned(ADataStream) and Assigned(ABlockingStream));

  inherited Create;
  FEventLoop := AEventLoop;
  FDataStream := ADataStream;
  FBlockingStream := ABlockingStream;
end;

// Unregisters a still active notification before the instance goes away.
destructor TAsyncWriteStream.Destroy;
begin
  if Assigned(NotifyHandle) then
  begin
    EventLoop.ClearCanWriteNotify(NotifyHandle);
    NotifyHandle := nil;
  end;
  inherited Destroy;
end;

// Frees the instance. When called from within one of its own handlers the
// notification is removed at once and the destruction is deferred until
// the handler has returned (see CanWrite).
procedure TAsyncWriteStream.StopAndFree;
begin
  if InCallback then
  begin
    if Assigned(NotifyHandle) then
    begin
      EventLoop.ClearCanWriteNotify(NotifyHandle);
      NotifyHandle := nil;
    end;
    DoStopAndFree := True;
  end else
    Self.Free;
end;


end.