1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127
| from scapy.all import * import time
conf.ipv6_enabled = False conf.L3socket = L3RawSocket
def complete_tcp_handshake(target_ip, target_port): """ 使用Scapy模拟完整的TCP三次握手过程 """ source_port = RandShort()._fix()
seq_num = random.randint(1000, 9000) print(f"[+] 开始TCP连接到 {target_ip}:{target_port} seq {seq_num}") print(f"[+] 使用源端口: {source_port}") print(f"[+] 步骤1: 发送SYN包...") syn_packet = IP(dst=target_ip)/TCP(sport=source_port, dport=target_port, flags="S", seq=seq_num) print(f"[+] 发送包详情: {syn_packet.summary()}") syn_ack_response = sr1(syn_packet, timeout=5, verbose=1) if syn_ack_response is None: print("[-] 未收到响应,目标可能关闭或被过滤") try: import socket s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.settimeout(2) result = s.connect_ex((target_ip, target_port)) if result == 0: print(f"[+] 使用普通套接字连接到 {target_ip}:{target_port} 成功,但Scapy无法获取响应") else: print(f"[-] 使用普通套接字连接到 {target_ip}:{target_port} 失败,错误码: {result}") s.close() except Exception as e: print(f"[-] 套接字测试异常: {e}") return False print(f"[+] 收到响应: {syn_ack_response.summary()}") if not syn_ack_response.haslayer(TCP) or syn_ack_response[TCP].flags != 0x12: print("[-] 没有收到预期的SYN-ACK响应") print(f"[-] 收到的标志: {syn_ack_response[TCP].flags}") return False print(f"[+] 步骤2: 收到SYN-ACK响应,服务器序列号={syn_ack_response[TCP].seq}, 确认号={syn_ack_response[TCP].ack}") server_seq = syn_ack_response[TCP].seq ack_packet = IP(dst=target_ip)/TCP( sport=source_port, dport=target_port, flags="A", seq=syn_ack_response[TCP].ack, ack=server_seq + 1 ) print(f"[+] 步骤3: 发送ACK包...") send(ack_packet, verbose=1) print(f"[+] TCP三次握手完成!")
print(f"[+] TCP连接已建立!")
print(f"[+] 发送一些数据...") data_packet = IP(dst=target_ip)/TCP( sport=source_port, dport=target_port, flags="PA", seq=syn_ack_response[TCP].ack, ack=server_seq + 1 )/Raw(load="Hello from Scapy!") data_response = sr1(data_packet, timeout=2, verbose=0) if data_response and data_response.haslayer(TCP): print(f"[+] 收到数据响应!") print(f"[+] 发送FIN包开始关闭连接...") fin_packet = IP(dst=target_ip)/TCP( sport=source_port, dport=target_port, flags="FA", seq=syn_ack_response[TCP].ack + len("Hello from Scapy!"), ack=server_seq + 1 ) fin_response = sr1(fin_packet, timeout=2, verbose=0) if fin_response and fin_response.haslayer(TCP) and fin_response[TCP].flags & 0x11: last_ack = IP(dst=target_ip)/TCP( sport=source_port, dport=target_port, flags="A", seq=fin_response[TCP].ack, ack=fin_response[TCP].seq + 1 ) send(last_ack, verbose=0) print(f"[+] TCP连接已正常关闭") return True
if __name__ == "__main__": target_ip = "192.168.1.75" target_port = 8000 complete_tcp_handshake(target_ip, target_port)
|