// Forum post by the original poster, posted 2010-07-03 18:57:47 (full-thread view)
{ Inspects a socket/Win32 result code and reports real failures.

  AErrorCode: 0 means success. -1 means "look it up": WSAGetLastError is
              consulted first and, if that still yields -1, GetLastError.
  AInfo:      free-form context text passed to the error event and embedded
              in the exception message.

  WSAEWOULDBLOCK, ERROR_IO_PENDING and 0 are not treated as failures.
  For any other code the FonError event (if assigned) is invoked; unless the
  handler sets its "handled" flag, ESocketError is raised.
  The function returns True whenever it returns at all. }
function TServerSocket.CheckError(AErrorCode: Integer; AInfo: string): Boolean;
var
  Handled: Boolean;
  Benign: Boolean;
begin
  Result := True;
  if AErrorCode = 0 then
    Exit;

  // Resolve the "unknown" marker into a concrete error code.
  if AErrorCode = -1 then
    AErrorCode := WSAGetLastError;
  if AErrorCode = -1 then
    AErrorCode := GetLastError;

  Benign := (AErrorCode = 0) or (AErrorCode = WSAEWOULDBLOCK) or
    (AErrorCode = ERROR_IO_PENDING);
  if Benign then
    Exit;

  // Give the application a chance to swallow the error.
  if Assigned(FonError) then
  begin
    Handled := False;
    FonError(AErrorCode, SysErrorMessage(AErrorCode), AInfo, Handled);
    if Handled then
      Exit;
  end;

  raise ESocketError.CreateFmt(SWindowsSocketError,
    [SysErrorMessage(AErrorCode), AErrorCode, AInfo]);
end;
{ Forwards a socket event and its context text to the FOnLog handler,
  doing nothing when no handler is assigned. }
procedure TServerSocket.DoLog(ASocketEvent: TSocketEvent; AInfo: string);
begin
  if not Assigned(FOnLog) then
    Exit;
  FOnLog(ASocketEvent, AInfo);
end;
{ Hands ACount bytes starting at AData, received on ASymmetricalSocket,
  to the FOnRead handler. No-op when no handler is assigned. }
procedure TServerSocket.DoRead(ASymmetricalSocket: TSymmetricalSocket;
  AData: Pointer; ACount: Integer);
begin
  if not Assigned(FOnRead) then
    Exit;
  FOnRead(ASymmetricalSocket, AData, ACount);
end;
{ Property setter for the listening port.
  Raises ESocketError if the server is currently active (FActive). }
procedure TServerSocket.SetPort(AValue: Integer);
begin
  if not FActive then
  begin
    FPort := AValue;
    Exit;
  end;
  raise ESocketError.Create('IOCP is acitve, cann''t change port');
end;
{ Adds ASocket to the client list (once), fires the after-connect event and
  asks Winsock to post WM_CLIENTSOCKET to FHandle when the peer closes.
  The whole sequence runs while FLock is held. }
procedure TServerSocket.RegisterClient(ASocket: TSymmetricalSocket);
var
  AlreadyKnown: Boolean;
begin
  FLock.Enter;
  try
    AlreadyKnown := FClients.IndexOf(ASocket) <> -1;
    if not AlreadyKnown then
    begin
      FClients.Add(ASocket);
      DoAfterConnect(ASocket);
      // Register for the close notification message.
      WSAAsyncSelect(ASocket.Socket, FHandle, WM_CLIENTSOCKET, FD_CLOSE);
    end;
  finally
    FLock.Leave;
  end;
end;
{ Removes ASocket from the client list and fires the disconnect event.
  Does nothing if the socket is not in the list. Runs under FLock. }
procedure TServerSocket.UnRegisterClient(ASocket: TSymmetricalSocket);
var
  Position: Integer;
begin
  FLock.Enter;
  try
    Position := FClients.IndexOf(ASocket);
    if Position = -1 then
      Exit; // the finally section still releases the lock
    FClients.Delete(Position);
    DoDisConnect(ASocket);
  finally
    FLock.Leave;
  end;
end;
{ Accepts one pending connection on the listening socket, wraps it in a
  TSymmetricalSocket, associates it with the completion port and posts the
  first receive.

  The connection is dropped when the server is not active or when DoConnect
  vetoes it. Errors are reported through CheckError, which may raise
  ESocketError.

  Resource handling:
  - If wrapping the socket fails after the wrapper object exists, the
    wrapper is freed (its destructor unregisters it and closes the socket)
    instead of being leaked.
  - If CreateIoCompletionPort fails, the error code is captured and the
    wrapper is freed BEFORE CheckError runs, because CheckError may raise
    and would otherwise skip the cleanup. }
procedure TServerSocket.AcceptClient;
var
  ClientWinSocket: TSocket;
  SymmSocket: TSymmetricalSocket;
  ErrCode: Integer;
begin
  ClientWinSocket := WinSock2.WSAAccept(Socket, nil, nil, nil, 0);
  if ClientWinSocket = INVALID_SOCKET then
    Exit;

  if (not Active) or (not DoConnect(ClientWinSocket)) then
  begin
    closesocket(ClientWinSocket);
    Exit;
  end;

  SymmSocket := nil;
  try
    DoLog(seAccept);
    SymmSocket := TSymmetricalSocket.Create(Self, ClientWinSocket);
    DoLog(seConnect);
  except
    // SymmSocket is only assigned once the constructor has completed.
    if Assigned(SymmSocket) then
      SymmSocket.Free
    else
      closesocket(ClientWinSocket);
    CheckError;
    Exit;
  end;

  if CreateIoCompletionPort(ClientWinSocket, FIOCPHandle, DWORD(SymmSocket), 0) = 0 then
  begin
    // Capture the code first: freeing the wrapper calls Winsock functions
    // that can overwrite the thread's last-error value.
    ErrCode := Integer(GetLastError);
    SymmSocket.Free;
    CheckError(ErrCode, 'CreateIoCompletionPort');
  end
  else
    SymmSocket.PrepareRecv;
end;
{ Fires the FOnAfterConnect event for ASymSocket, if a handler is assigned. }
procedure TServerSocket.DoAfterConnect(ASymSocket: TSymmetricalSocket);
begin
  if not Assigned(FOnAfterConnect) then
    Exit;
  FOnAfterConnect(ASymSocket);
end;
{ Asks the FOnBeforeConnect handler whether a freshly accepted socket may
  stay connected. The handler receives the peer's dotted IP address and can
  change the result through its var parameter.
  Returns True when no handler is assigned.
  getpeername failures are routed through CheckError. }
function TServerSocket.DoConnect(ASocket: TSocket): Boolean;
var
  PeerAddr: TSockAddrIn;
  PeerAddrLen: Integer;
begin
  Result := True;
  if not Assigned(FOnBeforeConnect) then
    Exit;

  PeerAddrLen := SizeOf(TSockAddrIn);
  CheckError(getpeername(ASocket, PeerAddr, PeerAddrLen), 'getpeername');
  FOnBeforeConnect(inet_ntoa(PeerAddr.sin_addr), Result);
end;
{ Fires the FOnAfterDisconnect event for ASymSocket, if a handler is assigned. }
procedure TServerSocket.DoDisConnect(ASymSocket: TSymmetricalSocket);
begin
  if not Assigned(FOnAfterDisconnect) then
    Exit;
  FOnAfterDisconnect(ASymSocket);
end;
{ Looks up the registered client whose Socket handle equals ASocket.
  Returns the first match, or nil when none is registered.
  The list is scanned while FLock is held. }
function TServerSocket.FindSymmClient(
  ASocket: TSocket): TSymmetricalSocket;
var
  Idx: Integer;
  Candidate: TSymmetricalSocket;
begin
  Result := nil;
  FLock.Enter;
  try
    for Idx := 0 to FClients.Count - 1 do
    begin
      Candidate := FClients[Idx];
      if Candidate.Socket = ASocket then
      begin
        Result := Candidate;
        Break;
      end;
    end;
  finally
    FLock.Leave;
  end;
end;
{ Indexed accessor: returns the client stored at position AIndex in FClients.
  No locking and no range check is done here. }
function TServerSocket.GetClient(const AIndex: Integer): TSymmetricalSocket;
begin
  Result := TSymmetricalSocket(FClients[AIndex]);
end;
{ Returns the number of entries currently held in FClients (read without
  taking FLock). }
function TServerSocket.GetClientCount: Integer;
begin
  Result := FClients.Count;
end;
{ Message handler for client socket notifications. On FD_CLOSE the matching
  registered client object is located by socket handle and freed; any other
  event is ignored. }
procedure TServerSocket.WMClientClose(var AMsg: TCMSocketMessage);
var
  ClosedClient: TSymmetricalSocket;
begin
  if AMsg.SelectEvent <> FD_CLOSE then
    Exit;

  ClosedClient := FindSymmClient(AMsg.Socket);
  if ClosedClient <> nil then
    ClosedClient.Free;
end;
{ TSocketThread }
{ Creates a thread bound to AServer. The thread is created non-suspended
  (CreateSuspended = False) and is marked to free itself on termination. }
constructor TSocketThread.Create(AServer: TServerSocket);
begin
  // The owner is stored before the inherited constructor is invoked.
  FServer := AServer;

  inherited Create(False);

  FreeOnTerminate := True;
end;
{ TAcceptThread }
{ Accept loop: keeps calling FServer.AcceptClient until the thread is
  terminated or the server is no longer active. }
procedure TAcceptThread.Execute;
begin
  inherited;
  while (not Terminated) and FServer.Active do
    FServer.AcceptClient;
end;
{ TWorkThread }
{ Worker loop: dequeues completion packets from the server's completion port
  (FServer.FIOCPHandle) and dispatches each one to the owning
  TSymmetricalSocket via WorkBlock. The loop ends when the thread is
  terminated, the server becomes inactive, or a packet carrying
  SHUTDOWN_FLAG in place of the overlapped pointer is received. }
procedure TWorkThread.Execute;
var
ASymSocket: TSymmetricalSocket;
AIOCPStruct: PIOCPStruct;
iWorkCount: Cardinal;
begin
inherited;
while (not Terminated) and (FServer.Active) do
begin
// Reset the out-parameters so stale values from the previous iteration
// cannot be mistaken for results of this dequeue.
AIOCPStruct := nil;
iWorkCount := 0;
ASymSocket := nil;
// Blocks without timeout (INFINITE). The completion key is received into
// ASymSocket; AcceptClient registers DWORD(SymmSocket) as the key.
if not GetQueuedCompletionStatus(FServer.FIOCPHandle, iWorkCount,
DWORD(ASymSocket), POVerlapped(AIOCPStruct), INFINITE) then
begin
// Dequeue reported failure: free the socket object if a key was returned.
if Assigned(ASymSocket) then
FreeAndNil(ASymSocket);
Continue;
end;
if Cardinal(AIOCPStruct) = SHUTDOWN_FLAG then Break; // shutdown flag
if not FServer.Active then Break; // server stopped: exit
{* The client may have timed out or disconnected (I/O failed); this check
   should come after the shutdown-notification check above. *}
if iWorkCount = 0 then
begin
//FreeAndNil(ASymSocket); // not freed here; freed when the close notification message is received
Continue;
end;
FServer.DoLog(AIOCPStruct.Event);
// NOTE(review): the WorkBlock implementation in this file returns 0 or a
// byte count, so this -1 branch looks unreachable - confirm.
if ASymSocket.WorkBlock(AIOCPStruct, iWorkCount) = -1 then
begin
FreeAndNil(ASymSocket);
end;
end;
end;
{ TSymmetricalSocket }
{ Wraps an accepted socket handle.
  Stores the owning server and the handle, creates the list that tracks the
  I/O blocks handed out to this connection, and registers the new object
  with the server. The first receive is NOT posted here. }
constructor TSymmetricalSocket.Create(ASvrSocket: TServerSocket;
  ASocket: TSocket);
begin
  FServer := ASvrSocket;
  FSocket := ASocket;

  FAssignMemory := TList.Create;

  FServer.RegisterClient(Self);
  //PrepareRecv;
end;
{ Tears the connection down: unregisters from the server, applies a zeroed
  SO_LINGER option, closes the socket handle, returns every I/O block in
  FAssignMemory to the server's memory manager (last to first) and frees
  the tracking list. }
destructor TSymmetricalSocket.Destroy;
var
  Idx: Integer;
  LingerOpt: TLinger;
begin
  FServer.UnRegisterClient(Self);

  // Graceful close: all linger fields are zero.
  FillChar(LingerOpt, SizeOf(LingerOpt), 0);
  setsockopt(FSocket, SOL_SOCKET, SO_LINGER, @LingerOpt, SizeOf(LingerOpt));
  closesocket(FSocket);

  Idx := FAssignMemory.Count;
  while Idx > 0 do
  begin
    Dec(Idx);
    FServer.MemoryManager.Release(FAssignMemory[Idx]);
  end;
  FAssignMemory.Free;

  inherited;
end;
{ Returns an I/O block marked Active for this connection.
  An inactive block already tracked in FAssignMemory is reused when one
  exists; otherwise a new block is obtained from the server's memory
  manager and added to FAssignMemory. }
function TSymmetricalSocket.Allocate: PIOCPStruct;
var
  Idx: Integer;
  Slot: PIOCPStruct;
begin
  // First choice: recycle a block this connection already owns.
  for Idx := 0 to FAssignMemory.Count - 1 do
  begin
    Slot := FAssignMemory[Idx];
    if Slot.Active then
      Continue;
    Slot.Active := True;
    Result := Slot;
    Exit;
  end;

  // Nothing free: take a fresh block and start tracking it.
  Slot := FServer.MemoryManager.Allocate;
  FAssignMemory.Add(Slot);
  Slot.Active := True;
  Result := Slot;
end;
{ Posts an overlapped receive on the connection.
  AIOCPStruct: block to receive into; when nil a block is taken from Allocate.
  The block is tagged seRead, its data buffer and OVERLAPPED part are zeroed
  and its WSABUF is pointed at the data buffer with length MAX_BUFSIZE.
  Returns True when WSARecv succeeds immediately or reports
  ERROR_IO_PENDING; any other error code goes to FServer.CheckError and the
  function returns False if that call returns. }
function TSymmetricalSocket.PrepareRecv(AIOCPStruct: PIOCPStruct = nil): Boolean;
var
  RecvFlags, BytesReceived: Cardinal;
  LastErr: Integer;
begin
  if AIOCPStruct = nil then
    AIOCPStruct := Allocate;

  AIOCPStruct.Event := seRead;
  FillChar(AIOCPStruct.Buffer, SizeOf(AIOCPStruct.Buffer), 0);
  FillChar(AIOCPStruct.Overlapped, SizeOf(AIOCPStruct.Overlapped), 0);
  AIOCPStruct.wsaBuffer.buf := @AIOCPStruct.Buffer;
  AIOCPStruct.wsaBuffer.len := MAX_BUFSIZE;

  RecvFlags := 0;
  if WSARecv(FSocket, @AIOCPStruct.wsaBuffer, 1, @BytesReceived, @RecvFlags,
    @AIOCPStruct.Overlapped, nil) <> SOCKET_ERROR then
  begin
    Result := True;
    Exit;
  end;

  LastErr := WSAGetLastError;
  Result := LastErr = ERROR_IO_PENDING;
  if not Result then
    FServer.CheckError(LastErr, 'WSARecv');
end;
{ Handles one completed overlapped operation for this connection.

  AIOCPStruct: the I/O block whose operation completed.
  ACount:      number of bytes transferred by that operation.

  seRead:  the received bytes are passed to FServer.DoRead and the same
           block is re-posted for the next receive; returns ACount when the
           re-post succeeded.
  seWrite: the transferred bytes are subtracted from the block's pending
           length. When nothing is left the block is marked inactive so
           Allocate can reuse it. Otherwise the WSABUF pointer is advanced
           past the bytes already sent and the remainder is sent again;
           returns the byte count if that send completed immediately.

  Any exception is swallowed and reported as result 0.

  Fixes relative to the previous version:
  - the buffer pointer is advanced on a partial send (previously the start
    of the buffer was transmitted again);
  - the remaining length can no longer wrap around when ACount exceeds it;
  - the block is released when the follow-up send fails outright, because
    no completion will arrive to release it later. }
function TSymmetricalSocket.WorkBlock(AIOCPStruct: PIOCPStruct;
  ACount: DWORD): Integer;
var
  ErrCode: Integer;
  iSend, iFlag: Cardinal;
begin
  Result := 0;
  try
    case AIOCPStruct.Event of
      seRead: // data received
        begin
          FServer.DoRead(Self, @AIOCPStruct.Buffer[0], ACount);
          if PrepareRecv(AIOCPStruct) then
            Result := ACount;
        end;
      seWrite: // data sent
        begin
          if ACount >= AIOCPStruct.wsaBuffer.len then
          begin
            // Everything went out: the block is free for reuse.
            AIOCPStruct.wsaBuffer.len := 0;
            AIOCPStruct.Active := False;
          end
          else
          begin
            // Partial send: skip the bytes already transmitted.
            Dec(AIOCPStruct.wsaBuffer.len, ACount);
            AIOCPStruct.wsaBuffer.buf :=
              Pointer(Cardinal(AIOCPStruct.wsaBuffer.buf) + ACount);
            FillChar(AIOCPStruct.Overlapped, SizeOf(AIOCPStruct.Overlapped), 0);
            iFlag := 0;
            iSend := 0;
            if SOCKET_ERROR = WSASend(FSocket, @AIOCPStruct.wsaBuffer, 1, @iSend,
              iFlag, @AIOCPStruct.Overlapped, nil) then
            begin
              ErrCode := WSAGetLastError;
              if ErrCode <> ERROR_IO_PENDING then
              begin
                // Release before CheckError, which may raise.
                AIOCPStruct.Active := False;
                FServer.CheckError(ErrCode, 'WSASend');
              end;
            end
            else
              Result := iSend;
          end;
        end;
    end;
  except
    Result := 0;
  end;
end;
{ Queues an overlapped send of up to ACount bytes taken from ABuf.

  The data is copied into an I/O block obtained from Allocate, so the
  caller's buffer may be reused as soon as this function returns.

  Returns the number of bytes queued. That can be less than ACount: a single
  call never queues more than the block's buffer holds
  (SizeOf(AIOCPStruct.Buffer)), so callers sending larger payloads must loop
  on the result. Returns 0 when ACount <= 0. Returns SOCKET_ERROR when
  WSASend fails with anything other than ERROR_IO_PENDING and
  FServer.CheckError does not raise.

  Fixes relative to the previous version:
  - ACount is bounded by the block's buffer size (previously a larger count
    overran the buffer in CopyMemory);
  - negative counts are rejected;
  - the OVERLAPPED part is zeroed before the send, as a recycled block still
    carries the contents of its previous operation;
  - the block is released on outright failure, because no completion will
    arrive to release it later. }
function TSymmetricalSocket.Write(var ABuf; ACount: Integer): Integer;
var
  AIOCPStruct: PIOCPStruct;
  ErrCode: Integer;
  iFlag, iSend: Cardinal;
begin
  Result := 0;
  if ACount <= 0 then
    Exit;

  AIOCPStruct := Allocate;

  // Never copy more than the block can hold.
  if ACount > SizeOf(AIOCPStruct.Buffer) then
    ACount := SizeOf(AIOCPStruct.Buffer);
  Result := ACount;

  iFlag := 0;
  iSend := 0;
  AIOCPStruct.Event := seWrite;
  FillChar(AIOCPStruct.Overlapped, SizeOf(AIOCPStruct.Overlapped), 0);
  FillChar(AIOCPStruct.Buffer[0], SizeOf(AIOCPStruct.Buffer), 0);
  CopyMemory(@AIOCPStruct.Buffer[0], @ABuf, ACount);
  AIOCPStruct.wsaBuffer.buf := @AIOCPStruct.Buffer[0];
  AIOCPStruct.wsaBuffer.len := Result;

  if SOCKET_ERROR = WSASend(FSocket, @AIOCPStruct.wsaBuffer, 1, @iSend, iFlag,
    @AIOCPStruct.Overlapped, nil) then
  begin
    ErrCode := WSAGetLastError;
    if ErrCode <> ERROR_IO_PENDING then
    begin
      Result := SOCKET_ERROR;
      // Release before CheckError, which may raise.
      AIOCPStruct.Active := False;
      FServer.CheckError(ErrCode, 'WSASend');
    end;
  end;
end;
{ Sends the raw in-memory bytes of AValue through Write and returns Write's
  result (a byte count, or SOCKET_ERROR).

  The byte count is Length(AValue) * SizeOf(Char). With a one-byte Char this
  equals the previous behaviour; with a two-byte Char the whole string is
  now sent, where previously only the first half of its bytes were passed.
  An empty string results in a zero-length Write, which sends nothing. }
function TSymmetricalSocket.WriteString(const AValue: string): Integer;
begin
  Result := Write(Pointer(AValue)^, Length(AValue) * SizeOf(Char));
end;
{ Returns the peer's IP address in dotted notation.
  The value is obtained with getpeername/inet_ntoa on first use and cached
  in FRemoteAddress. getpeername failures go through FServer.CheckError. }
function TSymmetricalSocket.GetRemoteIP: string;
var
  PeerAddr: TSockAddrIn;
  PeerAddrLen: Integer;
begin
  if FRemoteAddress = '' then
  begin
    PeerAddrLen := SizeOf(PeerAddr);
    FServer.CheckError(getpeername(FSocket, PeerAddr, PeerAddrLen), 'getpeername');
    FRemoteAddress := inet_ntoa(PeerAddr.sin_addr);
  end;
  Result := FRemoteAddress;
end;
{ Returns the peer's port number in host byte order.
  The value is obtained with getpeername on first use and cached in
  FRemotePort. getpeername failures go through FServer.CheckError.

  The cache check is made on FRemotePort itself (0 = not yet resolved).
  It was previously made on FRemoteAddress, so once GetRemoteIP had filled
  the address the port was never looked up and 0 was returned. }
function TSymmetricalSocket.GetRemotePort: Integer;
var
  SockAddrIn: TSockAddrIn;
  iSize: Integer;
begin
  if FRemotePort = 0 then
  begin
    iSize := SizeOf(SockAddrIn);
    FServer.CheckError(getpeername(FSocket, SockAddrIn, iSize), 'getpeername');
    FRemotePort := ntohs(SockAddrIn.sin_port);
  end;
  Result := FRemotePort;
end;
{ Returns the peer's host name as resolved by gethostbyaddr.
  The name is cached in FRemoteHost; when the reverse lookup fails the cache
  stays empty, '' is returned and the lookup is retried on the next call.
  getpeername failures go through FServer.CheckError.

  The cache check is made on FRemoteHost itself. It was previously made on
  FRemoteAddress, so once GetRemoteIP had filled the address the host name
  was never resolved. }
function TSymmetricalSocket.GetRemoteHost: string;
var
  SockAddrIn: TSockAddrIn;
  iSize: Integer;
  HostEnt: PHostEnt;
begin
  if FRemoteHost = '' then
  begin
    iSize := SizeOf(SockAddrIn);
    FServer.CheckError(getpeername(FSocket, SockAddrIn, iSize), 'getpeername');
    HostEnt := gethostbyaddr(@SockAddrIn.sin_addr.S_addr, 4, PF_INET);
    if HostEnt <> nil then
      FRemoteHost := HostEnt.h_name;
  end;
  Result := FRemoteHost;
end;
end.