前言

想用自己的设备看电视?有多个设备要看电视?要用一根网线即上网又看电视?
本文提供 udpxy / igmpproxy / vlan单线复用 三种方案满足你的全部需求。

不使用运营商盒子

方案一 - udpxy 组播转单拨 (推荐)

前置条件: Openwrt 路由器一台
优点: 不限设备,随时随地观看直播
缺点: 部分IPTV服务缺失(点播,回放等)

Step1: 将 IPTV 接入主路由

  • 如果你有光猫超级管理员权限,可以登陆光猫设置端口vlan绑定
    1. 登陆光猫,查看互联网IPTV业务的 VLAN ID
      iptv_1
      从上图可以看出,互联网业务是772,IPTV 是20。

    2. 设置端口 VLAN 绑定
      iptv_2
      这里将772绑定为10,20绑定为11,具体绑定的ID任意

  • 如果没有,则需要另取一根网线将光猫的iptv端口连接到路由器上

Step2: 配置主路由

警告:
请务必备份你的路由器设置!
此步骤操作不当可能导致与路由器失联,需重置路由器。

将所有WAN接口绑定的设备修改为br-lan.【Internet 业务绑定ID】,这里均设置为br-lan.10注意此时不要应用设置,仅保存即可

image

设置LAN接口的设备为br-lan.1(当然也可以设置其他的数字,不要一样就行),同样此时不要应用设置,仅保存即可

image-1707288698515

打开接口 - 设备 - br-lan - 配置,按照下图进行设置

image-1707289478688

将IPTV接口连接到盒子上用于一会抓包(这里是eth3

保存并应用全部配置,稍等片刻。此时 IPTV 盒子应该可以正常使用,且 LAN 正常访问互联网。

Step3: 抓包

安装好wireshark,电脑运行如下命令,开始抓包(地址和网口根据你的情况修改)

  • Linux
    ssh root@192.168.100.1 tcpdump -i eth3 -U -s0 -w - | wireshark -k -i -
    
  • Windows
    ssh root@192.168.100.1 'tcpdump -i eth3 -U -s0 -w -' | /path/to/Wireshark.exe -k -i -
    

此时,打开盒子电源,等待启动完成后打开直播随便切几个台。

Step4: 模拟盒子登陆认证

理论上可以直接设置成一个静态地址跳过认证,不过为了避免潜在的IP冲突,不稳定性,这里还是使用模拟盒子 IPoE 认证的方法。

停止抓包,过虑 dhcp 数据包,找到并单击 dhcp discover 请求。

image-1707291272332

记录盒子 MAC 地址Host NameVendor class identifier
其中 Vendor class identifier 应该是二进制数据,需要右键 -> 复制 -> As a hex stream

新建一个名为 IPTV 的接口,按照下图配置:

image-1707291486313

image-1707291544531

并在 接口 - 设备 - br-lan.11 配置 MAC 地址。

最后,为了正常发送 hex 格式的 Vendor class identifier,需要将 /lib/netifd/proto/dhcp.sh72 行

${vendorid:+-V "$vendorid"} \

修改为

${vendorid:+-V "" "-x 0x3c:$vendorid"} \

保存并应用设置,此时应当认证成功并为之分配了一个IP。

Step5: 安装并配置 udpxy

打开 系统 - 软件包 - 下载并安装软件包,安装 luci-app-udpxy

刷新,打开 服务 - udpxy 按照下图配置:
image-1707292277118

Bind Interface 填 br-lan.【LAN 的 VLAN ID】
Source Interface 填 br-lan.【IPTV 的 VLAN ID】

Step6: 分析组播地址并测试

在 Wireshark 中,输入 http 过滤 http 数据包,Ctrl+F 搜索 channelorderbyset_data.jsp

右键该条目,追踪流 - HTTP Stream

image-1707292816544

找到诸如 igmp://225.1.2.47:10276 的数据,将其替换为 http://<你的路由器IP>:4022/rtp/225.1.2.47:10276 即对应频道的内网单播直播地址,填入播放器即可播放。

如果你无法通过该方法获得频道列表,可以观察接收到的大量 udp 组播数据包获得频道的组播地址(此处为 225.1.2.47)。

image-1707293060894

使用运营商盒子

一般来说,IPTV要求预埋一条从光猫到电视的专用网线。但是由于装修时未考虑该需求,只留了一根网线用于放置在客厅的路由器,导致IPTV无法安装或不得不放弃客厅的无线覆盖。这种情况下可以考虑igmproxy单线复用方案。

方案二 - igmpproxy

前置条件: Openwrt 路由器一台
优点: 无需购买其他专用设备
缺点: 观看直播时出现广播风暴,影响其他网络设备的性能

如果想让运营商赠送的盒子正常使用,需要满足以下条件:

  1. 正常访问位于内网的服务器
  2. 路由器能够处理组播igmp协议

Step1: 配置路由器并模拟认证

完成方案一 Step1Step4

Step2: 连通IPTV内网

添加一个静态路由,设置 10.0.0.0/8 的内网段走 IPTV 接口。
image-1707295180843

其中网关地址和子网掩码可以在盒子设置或者抓的 DHCP Offer 包的 Relay agent IP addressSubnet Mask 字段得到。

打开接口 - IPTV - 编辑 - 防火墙设置 - 创建新的名为 IPTV 的防火墙区域。

打开网络 - 防火墙,如图打开 IPTV 域的 IP 动态伪装(即 NAT 功能)。
image-1707294357102

并编辑 lan 区域,在 允许转发到目标区域 中添加 iptv 区域

保存并应用,配置盒子使用不带认证的 DHCP 获取 IP地址,此时插入局域网任意 LAN 口,盒子应该能正常联网,但无法观看直播(因为现在还无法加入组播)。

Step3: 启用 igmpproxy

打开 系统 - 软件包 - 下载并安装软件包,安装 igmpproxy

编辑 /etc/config/igmpproxy

config igmpproxy
        option quickleave 1

config phyint
        option network br-lan.11
        option zone iptv # the upstream firewall zone for forward rules
        option direction upstream
        list altnet 0.0.0.0/0 # a description of allowed source addresses for multicast packets

config phyint
        option network br-lan.1
        option zone lan #the downstream firewall zone for forward rules
        option direction downstream

注意根据自己情况修改 br-lan.11br-lan.1option zone iptv
重启路由器,此时盒子应当能在任意 LAN 口正常使用。

方案三 - 单线复用

前置条件: Openwrt 路由器一台、网管交换机(支持vlan)一台
优点: 对局域网其他设备影响较小,稳定性高
缺点: 另需购买一台专用设备

Step1: 配置路由器

完成方案一 Step1Step2,并按照下图设置 VLAN 交换:
image-1707295458891

其中,eth3 将连接具有vlan功能的交换机或路由器。

Step2: 配置网管交换机

将网管交换机插入IPTV的端口设置为 vlan id 为 11 的 untagged 模式,
插入其他联网设备的端口设置为 vlan id 为 1 的 untagged 模式即可。

由于具体设备不同,详细方法自行查阅设备说明书。

进阶操作

由于不同地区系统不同,以下内容仅供参考
操作前请先根据方案一方案二Step2 完成主路由配置

通过脚本模拟运营商盒子登陆,数据请求,格式转换等,可以让第三方软件如 tivimate 实时获取到最新的节目单与频道列表数据,甚至可以支持回放功能。

此处给的脚本基于河南联通,不同地区与运营商需要根据自己抓的包进行一定的修改。

具体步骤

Step1: 获取必要数据

image-1707316381823
打开之前 Wireshark 抓取的数据包,如图搜索找到登陆请求 xxxxxx/auth.jsp,右键 -> 追踪流 -> HTTP Stream

image-1707316638420
记录 Authenticator 参数,与 Host 后的地址,记为 API_EPG_BASE

搜索 /getencrypttoken.jsp,同样的操作,记录 Host 后的地址,记为 API_EAS_BASE

Authenticator 参数实际上是下方内容用 $ 拼接起来经 3DES 加密后的 HEX 数据:

0 1 2 3 4 5 6 7
8位随机数 请求 getencrypttoken.jsp 的得到的 Encrypt Token 账号 序列号 IP MAC CTC

加密的密码一般是你的机顶盒账户密码(六位)并用字符 0 填充到二十四位,如果你不知道密码,可以打开盒子系统设置-账户中查看,或者尝试000000(笔者所在地区的账号密码是盒子第一次启动时系统自动下发的)。

Step2: 修改脚本代码

#!/usr/bin/env python3
import random
import re
import os
import json
import time
from datetime import datetime
import requests
from urllib.parse import urlsplit, parse_qs
from Crypto.Cipher import DES3
from Crypto.Util.Padding import unpad, pad
from xml.etree.ElementTree import Element, SubElement, tostring

KEY = '000000'.ljust(24, '0') # 修改六位数字密码
AUTHENTICATOR = '' # 填写你抓取到的 Authenticator

# 下面地址根据抓的包自行修改
API_EAS_IP = '10.222.33.44'
API_EAS_BASE = 'http://10.222.33.44:8080/iptvepg/'
API_EPG_BASE = 'http://10.233.44.55:8080/iptvepg/'

UDPXY_BASE = 'http://192.168.1.1:5678/rtp/'
SERVICE_BASE = 'http://192.168.1.1:1234'

COMMON_HEADERS = {
    'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; ChromiumBrowser) AppleWebKit/534.24 (KHTML, like Gecko) Safari/534.24 SkWebKit-HA-CU',
}

os.chdir(os.path.dirname(__file__))


def auth_in():
    def adjust_key_parity(key_in):
        def parity_byte(key_byte):
            parity = 1
            for i in range(1, 8):
                parity ^= (key_byte >> i) & 1
            return (key_byte & 0xFE) | parity

        from Crypto.Util.py3compat import bchr
        from Crypto.Util.py3compat import bord
        key_out = b"".join([bchr(parity_byte(bord(x))) for x in key_in])
        return key_out

    # ignore error: Triple DES key degenerates to single DES
    DES3.adjust_key_parity = adjust_key_parity

    cryptor = DES3.new(KEY, DES3.MODE_ECB)
    data = cryptor.decrypt(bytes.fromhex(AUTHENTICATOR))
    data = unpad(data, DES3.block_size).decode()
    data = data.split('$')

    # get encrypt token
    headers = COMMON_HEADERS.copy().update({
        'Host': 'iptvz.shangdu.com:8080',
    })
    res = requests.get(API_EAS_BASE + 'platform/getencrypttoken.jsp', headers=headers, params={
        'UserID': data[2],
        'Action': 'Login',
        'TerminalFlag': 1,
        'TerminalOsType': 0,
        'STBID': '',
        'stbtype': ''
    }).text
    encrypt_token = re.search(r"GetAuthInfo\('(.*)'\)", res).group(1)

    # replace 8-digit random number
    data[0] = str(random.randint(0, 99999999)).zfill(8)

    # replace encrypt token
    data[1] = encrypt_token

    # auth
    session = requests.Session()
    session.headers.update(COMMON_HEADERS)
    res = session.post(API_EPG_BASE + 'platform/auth.jsp', params={
        'easip': API_EAS_IP,
        'ipVersion': 4,
        'networkid': 1
    }, data={
        'UserID': data[2],
        'Authenticator': cryptor.encrypt(pad('$'.join(data).encode(), DES3.block_size)).hex().upper(),
        'StbIP': data[4]
    })

    # convert server time to local time
    serverTime = datetime.strptime(res.headers['Date'], '%a, %d %b %Y %H:%M:%S %Z')
    serverExpiredTime = re.search(r"\('TokenExpiredTime', *'([^']*)'", res.text).group(1)
    serverExpiredTime = datetime.strptime(serverExpiredTime, '%Y.%m.%d %H:%M:%S')
    expiredTime = datetime.now() + (serverExpiredTime - serverTime)

    redirect_url = re.search(r"window\.location(?:\.href)? *= *'(.*)'", res.text).group(1)
    session.get(redirect_url)

    redirect_url = urlsplit(redirect_url)
    params = {k: v[0] for k, v in parse_qs(redirect_url.query).items()}

    res = session.post(API_EPG_BASE + 'function/funcportalauth.jsp', data={
        'UserToken': params['UserToken'],
        'UserID': params['UserID'],
        'STBID': params['STBID'],
        'stbinfo': '',
        'prmid': '',
        'easip': params['easip'],
        'networkid': params['networkid'],
        'stbtype': 'E900V21E',
        'drmsupplier': ''
    })

    assert res.headers['X-Frame-UserToken'] == params['UserToken']

    # 加载首页
    res = session.get(API_EPG_BASE + 'frame1442/portal.jsp?tempno=-1')
    assert res.headers['X-Frame-UserToken'] == params['UserToken']

    return session, expiredTime


_session = None
_session_expire = datetime(1970, 1, 1)

try:
    with open('iptv.json', 'r') as f:
        cache = json.load(f)
        _session = requests.Session()
        _session.headers.update(COMMON_HEADERS)
        _session.cookies.update(cache['cookies'])
        _session_expire = datetime.fromisoformat(cache['expireTime'])
except FileNotFoundError:
    pass


def cached_auth_in():
    global _session, _session_expire
    if _session is None or datetime.now() >= _session_expire:
        print('[*]', 'Cache expired, re-authenticating...')
        _session, _session_expire = auth_in()
        with open('iptv.json', 'w') as f:
            json.dump({
                'cookies': _session.cookies.get_dict(),
                'expireTime': _session_expire.isoformat()
            }, f)

    return _session


def request(method, url, retry=True, **kwargs):
    global _session_expire
    session = cached_auth_in()
    res = session.request(method, url, **kwargs)
    err = re.search(r"qrcodeerror\.jsp\?errorcode=(\d+)", res.text)
    sessionExp = re.search(r"rebuildsessionresponse\.jsp", res.text)
    if err or sessionExp:
        if err:
            print('[!]', 'An error occurred during request:', err.group(1))
        if retry:
            print('[!]', 'Refreshing session...')
            _session_expire = datetime(1970, 1, 1)
            cached_auth_in()
            return request(method, url, retry=False, **kwargs)
        else:
            raise Exception('Request failed, error code: %s' % err.group(1))
    return res


def channel_list():
    res = request('post', API_EPG_BASE + 'function/frameset_builder.jsp', data={
        "MAIN_WIN_SRC": "/iptvepg/frame1442/portal.jsp?tempno=-1",
        "NEED_UPDATE_STB": "1",
        "BUILD_ACTION": "FRAMESET_BUILDER",
        "hdmistatus": "undefined"
    })

    # parse channel info
    channels_info = re.findall(r"jsSetChannelInfo\(([^)]+)\);", res.text)
    channels_info = [json.loads('[%s]' % channel_info.replace("'", '"')) for channel_info in channels_info]

    attributes = ['userChannelID', 'timeShift', 'TSTVtime', 'isIgmp', 'channelId', 'channelName', 'columnId',
                  'channelType', 'pipEnable', 'lpvrEnable', 'channelLevel', 'isCanLock', 'isIPPV', 'mixno',
                  'cdnchannelCode', 'advertisecontent', 'definition', 'tvPauseEnable', 'ottcdnchannelcode',
                  'funcswitch', 'allownettype']
    channels_info = {channel[4]: {k: channel[i] for i, k in enumerate(attributes)} for channel in channels_info}

    # parse channel config
    channels = re.findall(r"jsSetConfig\('Channel', *'([^']+)'\)", res.text)
    channels = [json.loads('{%s}' % re.sub(r"(,|^) *([a-zA-Z0-9]+) *=", r'\1"\2":', channel)) for channel in channels]

    # merge channel info to config
    channels = [{**channel, **channels_info[channel['ChannelID']]} for channel in channels]

    # 获取分类名称
    res = request('get', API_EPG_BASE + 'frame1451/sdk_getcolumnlist.jsp?columncode=01').json()
    assert res['returncode'] == '0'

    columns = {column['columncode']: column['columnname'] for column in res['data']}
    channels = [{**channel, 'columnname': columns[channel['columnId']]} for channel in channels]

    return channels


def generate_epg(channels):
    today = datetime.now()
    today_plus_3 = today.replace(day=today.day + 3)

    tv = Element('tv')
    for channel in channels:
        channel_el = SubElement(tv, 'channel', id=channel['UserChannelID'])
        SubElement(channel_el, 'display-name', lang="zh").text = channel['ChannelName']

    channel_id_map = {channel['ChannelID']: channel['UserChannelID'] for channel in channels}

    def parse_time(time):
        return datetime.strptime(time, '%Y.%m.%d %H:%M:%S').strftime('%Y%m%d%H%M%S +0800')

    for i, channel in enumerate(channels):
        if i % 10 == 0:
            print('[*]', 'Fetching EPG for channel', i + 1, '/', len(channels))

        # 获取节目预告
        res = request('get', API_EPG_BASE + 'frame1451/sdk_getprevuellist.jsp?', params={
            "channelcode": channel['ChannelID'],
            "begintime": "{} 00:00:00".format(today.strftime('%Y.%m.%d')),
            "endtime": "{} 23:59:59".format(today_plus_3.strftime('%Y.%m.%d')),
            "utcbegintime": "",
            "utcendtime": ""
        }).json()

        if res['returncode'] != '0':
            print('[!]', 'Failed to fetch EPG for', channel['ChannelName'])
            print(res)
            continue

        for program in res['data']:
            programme = SubElement(tv, 'programme',
                                   start=parse_time(program['begintime']),
                                   stop=parse_time(program['endtime']),
                                   channel=channel_id_map[program['channelcode']])

            SubElement(programme, 'title', lang="zh").text = program['prevuename']
            SubElement(programme, 'desc', lang="zh").text = program['description']
        time.sleep(random.random())

    return (b'<?xml version="1.0" encoding="UTF-8"?>\n<!DOCTYPE tv SYSTEM "xmltv.dtd">\n'
            + tostring(tv, encoding='utf-8'))


def generate_m3u(channels):
    m3u = ['#EXTM3U url-tvg="{base}/epg.xml" x-tvg-url="{base}/epg.xml"'.format(base=SERVICE_BASE)]
    for channel in channels:
        m3u_item = ['#EXTINF:-1', 'tvg-id="{}"'.format(channel['UserChannelID']), 'tvg-name="{}"'.format(channel['ChannelName']), 'tvg-group="{}"'.format(channel['columnname'])]

        if os.path.exists('web/icons/{}.png'.format(channel['ChannelID'])):
            m3u_item.append('tvg-logo="{}/icons/{}.png"'.format(SERVICE_BASE, channel['ChannelID']))

        if channel['TimeShift'] == '1':
            m3u_item.append('catchup="append"')
            m3u_item.append('catchup-source="{}"'.format(channel['TimeShiftURL']))

        m3u.append(' '.join(m3u_item) + ',' + channel['ChannelName'])
        m3u.append(re.sub(r"^igmp://", UDPXY_BASE, channel['ChannelURL']))
    return '\n'.join(m3u)


if __name__ == '__main__':
    channels = channel_list()

    with open('web/epg.xml', 'wb') as f:
        f.write(generate_epg(channels))

    with open('web/iptv.m3u', 'w') as f:
        f.write(generate_m3u(channels))

    print('[*]', 'Done')

Step3: 安装 Python 环境

将脚本放置在路由器任意目录,这里以/root/iptv/iptv.py为例,并在同目录下创建web文件夹。

执行下面命令安装 python 及依赖:

opkg install python3 python3-pip python3-venv
cd /root/iptv
virtualenv venv
/root/iptv/venv/bin/pip install pycryptodome requests

手动执行 /root/iptv/venv/bin/python /root/iptv/iptv.py 测试生成节目单与播放列表。

/root/iptv/web/icons 放入台标,以 [ChannelID].png 格式命名。

在系统-计划任务追加下面内容:
0 1 */1 * * /root/iptv/venv/bin/python /root/iptv/iptv.py >> /var/log/iptv.log

用 Docker 创建一个 WEB 服务器:

docker run --restart unless-stopped --name nginx -v /root/iptv/web:/usr/share/nginx/html -v /var/log/nginx:/var/log/nginx -p 1234:80 -d nginx

Step4: 设置播放器

  • xmltv 地址:http://192.168.1.1:1234/epg.xml
  • m3u 地址:http://192.168.1.1:1234/iptv.m3u

最终效果如下图(如使用 jellyfin):
image-1708076430231