m3u8 文件是 HTTP Live Streaming(缩写为 HLS) 协议的部分内容,而 HLS 是一个由苹果公司提出的基于 HTTP 的流媒体网络传输协议。
关于m3u8 格式详解,可以参考此文:m3u8 文件格式详解
JS的实现版本可以参考这位博主的gitHub:m3u8 视频在线提取工具
在这里,我先基于python代码来说解怎么去提取m3u8文件并合并成真正的视频文件。
一个正常的m3u8文件格式如下:
#EXTM3U
#EXT-X-VERSION:6
#EXT-X-TARGETDURATION:3
#EXT-X-MEDIA-SEQUENCE:0
#EXT-X-PLAYLIST-TYPE:VOD
#EXT-X-MAP:URI="https://xxxx/init.mp4"
#EXTINF:3.00000000000000000000,
https://xxxx/0.m4s
#EXTINF:3.00000000000000000000,
https://xxxxx/1.m4s
#EXTINF:3.00000000000000000000,
https://xxxx/2.m4s
#EXTINF:1.00000000000000000000,
https://xxxx/3.m4s
#EXT-X-ENDLIST
最需要注意的关键节点是:EXT-X-KEY(表示视频会经过指定算法加密)、EXT-X-MAP(最初的视频分片,不一定存在)、EXTINF(按照顺序的视频分片以及这个分片的播放秒数)
定义解析函数:m3u8_convert
"""
_url:m3u8 的下载地址
save_path:待保存的视频本地路径
"""
def m3u8_convert(_url, save_path):
writer = open(save_path, 'wb')
if not writer:
print("%s 没法成功打开" % save_path)
return
#先下载文件
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.100 Safari/537.36'}
rs = callAPI(_url)
list_content = rs.decode(encoding = 'utf-8').split('\n')
player_list = []
# key 以后处理加解密的操作
key = ''
#进行文件格式解析
for index, line in enumerate(list_content):
# 判断视频是否经过AES-128加密
tmp_key = checkExtKey(line)
if len(tmp_key) > 0:
key = tmp_key
else:
next_line = ""
if index < len(list_content)-1:
next_line = list_content[index + 1]
href = checkExtInf(line, next_line)
if len(href) > 0:
player_list.append(href)
#下载每一个视频分片,并保存到本地文件里
for i, _url in enumerate(player_list):
print('正在下载文件:%s' % _url)
_bytes = callAPI(_url)
print('已下载文件大小:%d' % len(_bytes))
writer.write(_bytes)
writer.close()
print('视频生成完成')
函数中,需要对该三个节点EXT-X-KEY、EXT-X-MAP、EXTINF进行处理,由于找不到比较好的已加密的视频demo ,所以我的代码里暂时忽略对EXT-X-KEY处理。
# 检测EXT-X-KEY,提取key
def checkExtKey(line):
if "#EXT-X-KEY" in line:
method_pos = line.find("METHOD")
comma_pos = line.find(",")
method = line[method_pos:comma_pos].split('=')[1] # 获取加密方式
print("Decode Method:", method)
uri_pos = line.find("URI")
quotation_mark_pos = line.rfind('"')
key_url = line[uri_pos:quotation_mark_pos].split('"')[1]
key = callAPI(key_url).decode(encoding='utf-8') # 获取加密密钥
print("key:", key)
return key
return ""
# 检测EXTINF 和 EXT-X-MAP
def checkExtInf(line, next_line):
href = ""
if '#EXTINF' in line:
# 提取下一行的http 链接地址
if 'http' in next_line:
href = next_line
elif '#EXT-X-MAP' in line:
# 提取最初的视频地址
uri_pos = line.find("URI=\"")
if uri_pos > -1:
href = line[uri_pos + 5:-1]
return href
完整的代码如下:
# -*- coding: UTF-8 -*-
import requests
import os
# from Crypto.Cipher import AES
def callAPI(_url):
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
'Chrome/75.0.3770.100 Safari/537.36'}
return requests.get(_url, headers=headers).content
# 检测EXT-X-KEY,提取key
def checkExtKey(line):
if "#EXT-X-KEY" in line:
method_pos = line.find("METHOD")
comma_pos = line.find(",")
method = line[method_pos:comma_pos].split('=')[1] # 获取加密方式
print("Decode Method:", method)
uri_pos = line.find("URI")
quotation_mark_pos = line.rfind('"')
key_url = line[uri_pos:quotation_mark_pos].split('"')[1]
key = callAPI(key_url).decode(encoding='utf-8') # 获取加密密钥
print("key:", key)
return key
return ""
# 检测EXTINF 和 EXT-X-MAP
def checkExtInf(line, next_line):
href = ""
if '#EXTINF' in line:
# 提取下一行的http 链接地址
if 'http' in next_line:
href = next_line
elif '#EXT-X-MAP' in line:
# 提取最初的视频地址
uri_pos = line.find("URI=\"")
if uri_pos > -1:
href = line[uri_pos + 5:-1]
return href
"""
_url:m3u8 的下载地址
save_path:待保存的视频本地路径
"""
def m3u8_convert(_url, save_path):
writer = open(save_path, 'wb')
if not writer:
print("%s 没法成功打开" % save_path)
return
# 先下载文件
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.100 Safari/537.36'}
rs = callAPI(_url).decode(encoding='utf-8')
list_content = rs.split('\n')
player_list = []
# key 以后处理加解密的操作
key = ''
# 进行文件格式解析
for index, line in enumerate(list_content):
# 判断视频是否经过AES-128加密
tmp_key = checkExtKey(line)
if len(tmp_key) > 0:
key = tmp_key
else:
next_line = ""
if index < len(list_content) - 1:
next_line = list_content[index + 1]
href = checkExtInf(line, next_line)
if len(href) > 0:
player_list.append(href)
# 下载每一个视频分片,并保存到本地文件里
for i, _url in enumerate(player_list):
print('正在下载文件:%s' % _url)
_bytes = callAPI(_url)
print('已下载文件大小:%d' % len(_bytes))
writer.write(_bytes)
writer.close()
print('视频生成完成')
大功告成后,马上找一个可以测试的m3u8吧。
save_data_file = '~/Desktop/player.mp4'
url = '【m3u8 URL地址】'
# 下载视频
m3u8_convert(url, save_data_file)
下面是golang的代码:
package main
import (
"fmt"
"io/ioutil"
"net/http"
"net/url"
"os"
"strings"
)
func callapi(durl string) []byte {
_, err := url.ParseRequestURI(durl)
if err != nil {
panic(durl + " 下载地址出错")
}
client := http.DefaultClient
//client.Timeout = 5000
resp, err := client.Get(durl)
if err != nil {
panic(err)
}
raw := resp.Body
fmt.Println("拿到Body :")
// fmt.Println(resp.Body)
defer raw.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
panic(err)
}
return body
}
// 检测EXT-X-KEY,提取key
func checkExtKey(line string) string {
if strings.Contains(line, "#EXT-X-KEY") {
method_pos := strings.Index(line, "METHOD")
comma_pos := strings.Index(line, ",")
method := strings.Split(line[method_pos:comma_pos], "=")[1]
fmt.Println("Decode Method:%s", method)
uri_pos := strings.Index(line, "URI")
quotation_mark_pos := strings.LastIndex(line, "\"")
key_url := strings.Split(line[uri_pos:quotation_mark_pos], "\"")[1]
key := string(callapi(key_url))
fmt.Println("Decode Key:%s", key)
return key
}
return ""
}
// 检测EXTINF 和 EXT-X-MAP
func checkExtInf(line string, next_line string) string {
href := ""
if strings.Contains(line, "#EXTINF") {
if strings.Contains(next_line, "http") {
href = next_line
}
} else if strings.Contains(line, "#EXT-X-MAP") {
uri_pos := strings.Index(line, "URI=\"")
if uri_pos > -1 {
href = line[uri_pos+5 : len(line)-1]
}
}
return href
}
func m3u8_convert(_url string, save_file string) {
body := string(callapi(_url))
list_contents := strings.Split(body, "\n")
var player_list []string
//key := ""
for index, line := range list_contents {
tmp_key := checkExtKey(line)
if len(tmp_key) > 0 {
//key = tmp_key
} else {
next_line := ""
if index < len(list_contents)-1 {
next_line = list_contents[index+1]
}
href := checkExtInf(line, next_line)
if len(href) > 0 {
player_list = append(player_list, href)
}
}
}
saveDataWithPlayList(player_list, save_file)
fmt.Println("视频生成完成")
}
func saveDataWithPlayList(play_list []string, save_file string) {
writer, err := os.Create(save_file)
if err != nil {
panic("生成视频文件失败")
}
defer writer.Close()
for _, item := range play_list {
fmt.Println("The item is :", item)
bytes := callapi(item)
writer.Write(bytes)
}
}
func main() {
play_list_url := "【m3u8 URL地址】"
save_file := "~/Desktop/player.mp4"
m3u8_convert(play_list_url, save_file)
}