首先需要有一个telegram账号,
可以使用虚拟手机号注册:textnow或者gv
准备环境:
目前是python37,
# 安装 Telethon 包
pip install Telethon
编写登录代码:
from telethon import TelegramClient, utils, errors
from telethon.sessions import StringSession
import asyncio
“”“
api_id,和hash,需要去
https://my.telegram.org/auth?to=apps
上面注册一个app,注册一次即可,不需要每一个账号都注册,
id和hash是表示是什么软件接入telegram服务器的,可以在官方提供的app中提取,毕竟源码是开源的,
”“”
api_id =""
api_hash=""
phone =""
password = "" # 二次验证的密码,如果没有,为空即可
"""
PROXY_TYPE_SOCKS4 = SOCKS4 = 1
PROXY_TYPE_SOCKS5 = SOCKS5 = 2
PROXY_TYPE_HTTP = HTTP = 3
"""
# 目前我使用的是socket5 的代理
p = (2, "127.0.0.1", 1080)
# 如果是多线程采集的情况下,需要指定事件循环,在单线程的情况,可以不要loop参数
loop = asyncio.new_event_loop()
“”“
登录 session,这里有两个选择,
一个是.session 文件,这种保存session的方式,不是很推荐,在后面机器部署的时候,需要同步每一个账号的session,文件,这种只需要指定一个文件的路径即可,例如:/home/xxx.session,
另外一个是将session,保存成一个字符串,这样可以在数据库中保存,例如,
需要导入session包
from telethon.sessions import StringSession
StringSession("xxx")
”“
session = StringSession("xxx")
client = TelegramClient(session, api_id, api_hash,
proxy=p, loop=loop,
auto_reconnect=True,
)
client.start(phone=phone, password=password)
print(client.get_me())
## 如何保存session 字符串数据了?
# 需要登录后的账号,然后执行下面的命令,便可以把session的token保存为字符串
print(StringSession.save(client.session))
# 使用监听的方式 采集消息
from telethon import TelegramClient, events
# 登录代码同上
client.add_event_handler(callback=msg_event_handler,
event=events.NewMessage(incoming=True))
async def msg_event_handler( event: events.NewMessage.Event):
message = event.message
# 判断是否是群组或者频道发送的消息
if event.is_channel or event.is_group:
## 获取消息的id,
if isinstance(message.to_id, types.PeerChannel):
to_id = message.to_id.channel_id
else:
to_id = message.to_id.chat_id
## 自定义处理
# 使用api的方式查询聊天记录
username = "群组|频道 的username或者id"
min_id = 0 # 开启的消息id
limit =1000 # 检索的消息数量
“”“
telegram使用的是群组|频道+ 消息id来标示唯一消息的,而且消息id是从1-无穷的
”“”
messages = client.iter_messages(username, min_id=min_id, reverse=True, limit=limit)
# 进行消息处理
async for message in messages:
print(message)
# 例子:
async def dow():
dialog = await client.get_entity("fordarknetspiderbot")
old = datetime.datetime.now()
zero_today = old - datetime.timedelta(days=0, hours=old.hour, minutes=old.minute, seconds=old.second,
microseconds=old.microsecond)
async for msg in client.iter_messages(dialog, min_id=2110, offset_date=zero_today, reverse=True):
# user = await client.get_entity(msg.from_id)
print(" 时间:{} id:{} 消息:{}, ".format(
(msg.date + datetime.timedelta(hours=8)).strftime("%Y-%m-%d %H:%M:%S"), msg.id, msg.message))
now = datetime.datetime.now()
t = now - old
print(t)
client.loop.run_until_complete(dow())
总结:一般来说监听的方式比查询api的方式要快,而且要好,查询api会被限流,目前我是两个方式一起用的,因为有一些账号被封了的时候,新账号又不想去理会,可以新开一个线程去轮训服务器中的群组列表,我们只需要当前登录的账号中都没有添加这些群组的群组,这样子就不和监听的方式冲突了,
# 设置监听后,可以新开一个任务去轮训群组,
client.loop.create_task(task_Handler())
client.run_until_disconnected()
async def task_Handler():
pass