作者: 大嘴巴小牙齿

  • ZIP文件

    一、文件结构

    1、Local File Header — 每个压缩文件开头的本地头,包含文件名、压缩方式、CRC校验等

    2、Compressed Data — 实际的压缩数据

    3、End of Central Directory Record (EOCDR) — 最末尾的一条记录,相当于”目录索引”,记录了:
    – 这个zip里有多少个文件
    – 每个文件在zip里的偏移位置
    – 压缩前后的文件大小
    – 文件名列表


    二、解压过程

    第一步:定位EOCDR

    unzip从文件末尾往前搜索,找到End of Central Directory Record签名(PK\x05\x06)。EOCDR固定在最末尾,长度不固定但通常很小(几十到几百字节)。

    第二步:读取Central Directory

    EOCDR里记录了Central Directory(中央目录)在文件中的偏移位置和大小。unzip跳到那个位置,逐个读取每个文件的Central Directory Entry,得到:

    第三步:逐个解压文件

    对每个文件:

    • 根据偏移找到Local File Header
    • 读取压缩数据
    • 按压缩方式解压(最常用是deflate)
    • 用CRC32校验解压后的数据是否完整
    • 写入磁盘

  • Base64编码(Golang)

    Base64是一种用64个可打印字符来表示二进制数据的方法。叫64是因为用了64个字符。

    一、需求

    计算机底层是二进制(0和1),但很多传输通道只支持文本。比如:

    • 电子邮件:早期SMTP协议只支持ASCII字符,传不了二进制附件
    • URL:有些特殊字符在URL里有特殊含义(? & =),二进制数据传不了
    • HTML/CSS:在网页里直接嵌入图片,不需要额外请求
    • JSON/XML:这些文本格式不能直接塞二进制数据

    Base64就是把二进制数据”翻译”成纯文本,安全地通过这些通道。


    二、原理

    每3个字节(24位)分成4组,每组6位,查表得到4个字符。

    原始数据:Man
    二进制:01001101 01100001 01101110
    6位分组:010011 011000 010110 1110(补00)
    查表:T W F u
    结果:TWFu

    如果原始数据不是3的倍数:

    剩1个字节 → 输出2个字符 + 2个 “=” 填充

    剩2个字节 → 输出3个字符 + 1个 “=” 填充

    任何二进制数据都可以,不只是PNG
    图片:PNG、JPG、GIF、WEBP、SVG、BMP、ICO
    音频:MP3、WAV、AAC、OGG
    视频:MP4、AVI、WEBM
    文档:PDF、ZIP、EXE、DLL
    证书:PEM格式的SSL证书
    任意文件:任何你能想到的文件


    三、其他编码

    Base16(Hex):用16个字符(0-9 A-F),体积膨胀100%
    Base32:用32个字符,体积膨胀约60%
    Base64:用64个字符,体积膨胀约33%
    一张100KB的PNG图片:
    base64编码后约133KB
    解码后恢复为原始的100KB PNG

    Base85:用85个字符,体积膨胀约25%,但字符集更复杂

    Base64是体积和可读性的最佳平衡点

    四、使用示例

    package main
    
    import (
            "encoding/base64"
            "fmt"
            "os"
    )
    
    func main() {
            //1. 基础字符串编解码
            original := "Hello, 世界! "
            fmt.Printf("原始字符串: %s\n", original)
            encoded := base64.StdEncoding.EncodeToString([]byte(original))
            fmt.Printf("Base64编码: %s\n", encoded)
            decoded, err := base64.StdEncoding.DecodeString(encoded)
            if err != nil {
                    fmt.Printf("解码错误: %v\n", err)
            } else {
                    fmt.Printf("Base64解码: %s\n", string(decoded))
            }
    
            // 2. URL安全的Base64
            urlData := "user+name/test=data&key=val"
            fmt.Printf("原始数据: %s\n", urlData)
            stdEncoded := base64.StdEncoding.EncodeToString([]byte(urlData))
            urlEncoded := base64.URLEncoding.EncodeToString([]byte(urlData))
            fmt.Printf("标准Base64: %s\n", stdEncoded)
            fmt.Printf("URL Base64: %s\n", urlEncoded)
    
            // 3. 无填充的Base64
            data := "abc"
            fmt.Printf("原始数据: %s\n", data)
            fmt.Printf("带填充: %s\n", base64.StdEncoding.EncodeToString([]byte(data)))
            fmt.Printf("无填充: %s\n", base64.RawStdEncoding.EncodeToString([]byte(data)))
    
            // 4. 自定义Base64字母表
            custom := base64.NewEncoding("ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789~!")
            customEncoded := custom.EncodeToString([]byte("Hello World"))
            fmt.Printf("自定义编码: %s\n", customEncoded)
            customDecoded, _ := custom.DecodeString(customEncoded)
            fmt.Printf("自定义解码: %s\n", string(customDecoded))
    
            // 5. 模拟图片Base64编码
            // 创建一个假的PNG文件头(实际项目中替换为真实图片文件)
            fakePNG := []byte{
                    0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A, // PNG文件头
                    0x00, 0x00, 0x00, 0x0D, 0x49, 0x48, 0x44, 0x52, // IHDR
            }
            imgEncoded := base64.StdEncoding.EncodeToString(fakePNG)
            fmt.Printf("图片Base64: %s\n", imgEncoded)
    
            // 模拟HTML内嵌(取前min(30, len)个字符)
            prefixLen := 30
            if len(imgEncoded) < prefixLen {
                    prefixLen = len(imgEncoded)
            }
            fmt.Printf("HTML内嵌: <img src=\"data:image/png;base64,%s...\" />\n", imgEncoded[:prefixLen])
    
            // 6. 读真实文件并编码(如果存在)
            filename := "/tmp/test_base64.txt"
            os.WriteFile(filename, []byte("这是一个测试文件内容\n用于演示Base64编码"), 0644)
            content, err := os.ReadFile(filename)
            if err != nil {
                    fmt.Printf("读文件失败: %v\n", err)
            } else {
                    fileEncoded := base64.StdEncoding.EncodeToString(content)
                    fmt.Printf("文件名: %s\n", filename)
                    fmt.Printf("原始大小: %d 字节\n", len(content))
                    fmt.Printf("编码大小: %d 字节\n", len(fileEncoded))
                    fmt.Printf("膨胀率: %.1f%%\n", float64(len(fileEncoded)-len(content))/float64(len(content))*100)
                    fmt.Printf("Base64: %s\n", fileEncoded)
            }
            os.Remove(filename)
    
            // 7. 错误处理 - 非法Base64
            invalidBase64 := "ThisIsNot@Valid#Base64!"
            _, err = base64.StdEncoding.DecodeString(invalidBase64)
            if err != nil {
                    fmt.Printf("非法Base64 '%s' 解码错误: %v\n", invalidBase64, err)
            }
    
            // 8. 严格模式
            // 标准模式允许尾部bits非零
            loose := base64.StdEncoding
            strict := base64.StdEncoding.Strict()
            testData := "SGVsbG8=" // "Hello"
            _, err1 := loose.DecodeString(testData)
            _, err2 := strict.DecodeString(testData)
            fmt.Printf("标准模式解码 '%s': %v\n", testData, err1)
            fmt.Printf("严格模式解码 '%s': %v\n", testData, err2)
    
            // 9. 编解码长度计算
            for _, n := range []int{1, 2, 3, 4, 5, 6, 10, 100} {
                    encLen := base64.StdEncoding.EncodedLen(n)
                    decLen := base64.StdEncoding.DecodedLen(encLen)
                    fmt.Printf("输入%3d字节 → 编码%4d字节 → 解码最多%3d字节\n", n, encLen, decLen)
            }
    }
    
  • Cron定时(Golang)

    Golang的日常定时任务是一种常见需求,例如定期清理缓存、发送通知或同步数据。robfig/cron是Go社区广泛使用的定时任务库,支持秒级精度的cron表达式,并提供了安全启动、停止以及任务查询的接口。本文将从cron表达式的基本原理出发,结合一段实际的生产级代码,详细说明如何封装一个可管理、可监控的定时任务模块,并分析其中关键的并发控制与生命周期管理细节。


    cron 表达式原理

    cron表达式起源于Unix系统的cron守护进程,用于定义任务的执行时间。标准格式通常为五个字段:分、时、日、月、周几,每个字段可指定具体数字、范围(如0-5)、步长(如/5)或通配符。robfig/cron扩展了标准格式,支持可选的秒级字段,即六字段表达式:秒、分、时、日、月、周几。例如”0 30 9 * * 1-5″表示每个工作日上午9点30分执行,其中秒为0。库内部通过解析表达式生成下一次触发时间,并利用定时器(time.Ticker)在精确时刻调用注册的回调函数。其核心是一个调度循环,不断计算最近的下一个执行时间,休眠到该时刻并运行任务,然后重复此过程。这种设计保证了定时任务在单进程内的可靠触发,且不依赖外部守护进程。


    代码整体结构

    # Golang
    import (
    	"net/http"
    	"sync"
    	"time"
    
    	"github.com/gin-gonic/gin"
    	"github.com/robfig/cron/v3"
    )
    
    type CronEntry struct {
    	Cron      *cron.Cron
    	EntryID   cron.EntryID
    	MU        sync.RWMutex
    	StopOnce  sync.Once
    	Remark    string
    	RunStatus string
    }
    
    var (
    	CronJobs   []*CronEntry
    	CronJobsMu sync.RWMutex
    	MyCron     = cron.New(cron.WithSeconds())
    )
    
    func NewCronEntry(spec, remark string, job func() error) (*CronEntry, error) {
    	ce := &CronEntry{
    		Cron:      MyCron,
    		Remark:    remark,
    		RunStatus: "等待中",
    	}
    	id, err := MyCron.AddFunc(spec, func() {
    		err := job()
    		ce.MU.Lock()
    		if err != nil {
    			ce.RunStatus = "失败"
    		} else {
    			ce.RunStatus = "成功"
    		}
    		ce.MU.Unlock()
    	})
    	if err != nil {
    		return nil, err
    	}
    	ce.EntryID = id
    	MyCron.Start()
    	CronJobsMu.Lock()
    	CronJobs = append(CronJobs, ce)
    	CronJobsMu.Unlock()
    	return ce, nil
    }
    func (ce *CronEntry) GetScheduleTime() (prev, next time.Time) {
    	entry := ce.Cron.Entry(ce.EntryID) // cron.Entry() 本身是并发安全的
    	return entry.Prev, entry.Next
    }
    func (ce *CronEntry) Stop() {
    	ce.StopOnce.Do(func() {
    		ce.MU.Lock()
    		defer ce.MU.Unlock()
    		if ce.Cron == nil {
    			return
    		}
    		ce.Cron.Remove(ce.EntryID) // 先移除 job
    		ctx := ce.Cron.Stop()      // 停止调度器
    		<-ctx.Done()
    		ce.Cron = nil // 标记已停
    	})
    }
    func StopAndRemoveCronEntry(ce *CronEntry) {
    	if ce == nil {
    		return
    	}
    	ce.Stop()
    	CronJobsMu.Lock()
    	defer CronJobsMu.Unlock()
    	for i, job := range CronJobs {
    		if job == ce {
    			CronJobs = append(CronJobs[:i], CronJobs[i+1:]...)
    			break
    		}
    	}
    }
    func (ce *CronEntry) GetStatus() string {
    	ce.MU.RLock()
    	defer ce.MU.RUnlock()
    	return ce.RunStatus
    }
    func GetCronJobs(c *gin.Context) {
    	type CronJobInfo struct {
    		Job       int    `json:"job_id"`
    		LastDate  string `json:"上一次执行日期"`
    		NextDate  string `json:"下一次执行日期"`
    		Remark    string `json:"备注"`
    		RunStatus string `json:"上一次执行状态"`
    	}
    	CronJobsMu.RLock()
    	var list []CronJobInfo
    	defer CronJobsMu.RUnlock()
    	for _, entry := range CronJobs {
    		prev, next := entry.GetScheduleTime()
    		prevStr := ""
    		if !prev.IsZero() { // 如果任务还没执行过,Prev 是零值
    			prevStr = prev.Format("2006-01-02 15:04:05")
    		}
    
    		nextStr := ""
    		if !next.IsZero() {
    			nextStr = next.Format("2006-01-02 15:04:05")
    		}
    
    		list = append(list, CronJobInfo{
    			Job:       int(entry.EntryID),
    			LastDate:  prevStr,
    			NextDate:  nextStr,
    			Remark:    entry.Remark,
    			RunStatus: entry.GetStatus(),
    		})
    	}
    	c.AbortWithStatusJSON(http.StatusOK, list)
    }

    代码中定义了一个CronEntry结构体,封装了单个定时任务的完整信息:指向全局调度器的Cron指针、任务在调度器中的唯一EntryID、用于同步的读写锁MU、保证只停止一次的StopOnce、备注 Remark 以及最近一次执行状态RunStatus。全局变量MyCron是使用cron.New(cron.WithSeconds())创建的调度器实例,启用了秒级支持,同时全局切片CronJobs存储所有被管理的任务指针,并由CronJobsMu读写锁保护。这种设计将任务管理与调度器解耦:调度器负责底层触发,而用户代码通过CronEntry获取状态、控制启停。

    创建任务:NewCronEntry

    NewCronEntry函数接收cron表达式(spec)、备注和实际执行函数(返回error)作为参数。函数内部首先创建一个CronEntry实例,初始化运行状态为“等待中”。然后通过MyCron.AddFunc注册任务——这是robfig/cron的核心方法,它解析表达式并在调度器中插入一个entry,返回唯一的EntryID。注册的回调函数先执行用户提供的job,然后根据返回的err通过ce.MU.Lock()安全更新RunStatus为“成功”或“失败”。注意加锁是为了防止后续GetStatus与这里形成数据竞争,因为GetStatus 使用读锁。任务注册成功后,立即启动调度器(MyCron.Start()),这意味着一旦调用NewCronEntry,调度循环就开始运行,后续添加的任务也会被已启动的调度器处理。Start()函数是幂等的,重复调用不会导致多次启动,因此放在每个新任务创建时是安全的。最后,将新entry追加到全局CronJobs切片中,并返回该 entry指针。这里存在一个潜在问题:Start()在每次添加任务时都会调用,虽然不会重复启动,但代码风格上更适合在初始化时仅调用一次。不过考虑到后续可能动态添加任务,如此实现也能工作。

    停止单个任务:Stop方法

    Stop方法使用sync.Once确保一个任务只被停止一次,防止重复调用导致panic或资源泄漏。内部先获取写锁,检查ce.Cron是否为nil(已停止状态),然后依次执行ce.Cron.Remove(ce.EntryID) 从调度器中移除该任务entry,再调用ce.Cron.Stop()停止整个调度器。这里有一个值得注意的细节:Stop()返回一个 channel ctx,调用者需要等待<-ctx.Done()以确保调度器完全停止并清空内部计时器。但问题在于,Stop()是全局行为,它会停止所有任务,而不仅仅是当前 entry。因此,如果一个模块只想停用自己管理的单个任务,使用MyCron.Stop() 会中断其他仍在运行的任务。这应当是设计上的简化,实际项目中往往需要更细粒度的控制,例如使用cron.Cron.Remove(entryID)移除条目后,其他任务仍能继续调度。这里先移除条目再停止整个调度器,可能意味着使用者期望在stop后不再有任何任务执行。如果后续需要单独停止单个任务而不影响其他,更好的做法是在Stop中只调用Remove,而保留调度器运行。但当前代码体现了作者对整体生命周期控制的考量——当某个关键任务停止时,整个调度器也同步停止,避免因剩余任务不再被管理而产生混乱。

    状态查询与调度时间获取

    GetScheduleTime方法通过ce.Cron.Entry(ce.EntryID)获取cron库内部维护的entry 信息,其中包括Prev和Next两个time.Time值,分别表示上一次和下一次执行时间。Cron.Entry()本身是并发安全的,因此无需额外加锁。如果任务尚未执行过,Prev为零值,代码在GetCronJobs中通过IsZero()判断并格式化为空字符串,这样 HTTP响应中就不会显示不存在的日期。GetStatus使用读锁返回当前RunStatus,与回调中写锁对应,保证了数据一致性。

    全局停止与移除:StopAndRemoveCronEntry

    该函数接收一个*CronEntry,先调用其Stop方法,然后从全局CronJobs切片中移除该指针。切片删除采用“先查找再重新拼接”的方式,这种线性搜索在任务数不多时没有问题。注意这里对CronJobsMu加写锁,与NewCronEntry中添加时的锁一致,避免了并发读写切片的风险。

    HTTP 接口:GetCronJobs

    这是一个Gin处理函数,用于返回所有定时任务的信息。它首先通过读锁读取CronJobs切片,遍历每个entry,获取其调度时间和状态,组装成JSON结构。响应中包含了job_id(即EntryID)、上一次执行日期、下一次执行日期、备注和上一次执行状态。日期格式使用config.TimeNowFormat,这应是项目中预定义的时间格式字符串,例如”2006-01-02 15:04:05″。函数最后使用c.AbortWithStatusJSON返回状态码200和序列化后的列表。注意这里使用了AbortWithStatusJSON而不是c.JSON,意味着该函数可能会作为Gin的中间件调用,或者作者希望确保后续中间件不再处理。按照Gin的约定,如果是路由处理函数,更常见的写法是c.JSON,但AbortWithStatusJSON也能正常工作,只是它会设置Abort标志,阻止之后的所有中间件执行。如果该函数是路由的最后一个处理器,两者效果一致;如果有后续中间件,则会导致它们被跳过。这点需要结合具体路由设置来判断。


    总结

    在Go中基于robfig/cron封装一个可管理的定时任务模块。通过CronEntry结构体整合了任务的生命周期(创建、启动、停止、移除)和运行状态,利用读写锁和sync.Once保证了并发安全。同时,提供了HTTP接口用于监控所有任务的状态。在实际使用中,需要注意全局调度器的停止操作会对所有任务产生影响,如果希望实现更细粒度的单任务停止,可以仅调用Remove而不调用Stop()。此外,NewCronEntry中每次添加任务都调用Start()虽然可行,但建议将Start()放在初始化阶段一次调用,使逻辑更清晰。整体设计为中小规模定时任务管理提供了一个良好的模板,具备扩展性,例如可在此基础上增加持久化、错误重试或任务依赖等高级特性。

  • webRTC(Golang)

    在视频监控、工业巡检以及边缘计算等场景中,常见的一个现实问题是协议割裂:前端设备通常通过RTSP推流,而浏览器原生并不支持直接播放RTSP。这就带来了一个工程上的关键挑战——如何在不引入高延迟和复杂转码的前提下,将设备侧的视频流高效地分发到浏览器端。

    一种更直接且高效的思路,是利用WebRTC作为浏览器侧的实时传输协议,同时在服务端完成协议层的桥接,将RTSP流转换为WebRTC可消费的RTP数据流。这种方式避免了传统转码链路(例如FFMPEG+纯接口转发)带来的性能损耗,同时保留了实时性的优势。

    通过golang程序实现一个典型的视频桥接架构:上游通过RTSP拉流(通常来自摄像头或推流工具),服务端将RTP包转发到WebRTC PeerConnection,下游浏览器通过 WebRTC 实时播放视频。整体链路为:RTSP → RTP → Go服务 → WebRTC → 浏览器。


    零、在本机启一个mediamtx作为RTSP服务端,再通过ffmpeg把本机摄像头推流到服务器来模拟摄像头的RTSP流

    ./mediamtx &
    ./ffmpeg -f v4l2 -i /dev/video0 \
    -vcodec libx264 -preset veryfast -tune zerolatency \
    -f rtsp -rtsp_transport tcp \
    rtsp://test:123456@127.0.0.1:8554/test

    一、golang程序入口main中首先确定RTSP地址,并开一个Gin HTTP服务,同时维护一个clients映射,用于保存每个WebRTC客户端对应的Track:

    每个客户端并不是单独拉流,而是共享同一 RTSP输入流,服务端通过fan-out(扇出)机制将RTP包写入多个WebRTC Track,从而实现“一路输入,多路输出”

    import (
    	"github.com/bluenviron/gortsplib/v5"
    	"github.com/bluenviron/gortsplib/v5/pkg/base"
    	"github.com/bluenviron/gortsplib/v5/pkg/description"
    	"github.com/bluenviron/gortsplib/v5/pkg/format"
    	"github.com/gin-gonic/gin"
    	"github.com/pion/rtp"
    	"github.com/pion/webrtc/v3"
    )
    
    rtspURL := os.Getenv("RTSP_URL")
    if rtspURL == "" {
    	rtspURL = "rtsp://test:123456@127.0.0.1:8554/test"
    }
    
    var clientsMu sync.Mutex
    clients := map[string]*webrtc.TrackLocalStaticRTP{}

    二、HTTP部分分为三个路由:

    1)“/”路由返回播放器页面
    2)“/offer”路由处理WebRTC SDP信令交换
    3)“/stats”路由返回实时码率与包速率

    前端页面核心逻辑如下:

    浏览器创建RTCPeerConnection → 生成Offer(SDP)→ 发送给服务端 → 服务端生成Answer → 浏览器设置远端描述。需要注意的是,这里没有使用STUN/TURN(iceServers 为空),意味着该方案默认运行在内网或可直连环境,否则无法穿透NAT。

    pc=new RTCPeerConnection({iceServers:[]});
    pc.addTransceiver('video',{direction:'recvonly'});
    
    const offer=await pc.createOffer();
    await pc.setLocalDescription(offer);
    
    const resp=await fetch('/offer',{
      method:'POST',
      headers:{'Content-Type':'application/json'},
      body:JSON.stringify({sdp:offer.sdp,type:offer.type})
    });
    
    const answer=await resp.json();
    await pc.setRemoteDescription({type:answer.type,sdp:answer.sdp});

    服务端“/offer”路由处理逻辑是WebRTC的核心:

    这里做了两件关键事情:
    1)注册编解码器(H264/H265)
    2)创建 PeerConnection

    WebRTC本质上是RTP的增强版,但浏览器对编码格式要求严格,因此必须显式注册codec,否则SDP协商无法匹配。

    m := webrtc.MediaEngine{}
    _ = m.RegisterCodec(webrtc.RTPCodecParameters{
    	RTPCodecCapability: webrtc.RTPCodecCapability{
    		MimeType: webrtc.MimeTypeH264,
    		ClockRate: 90000,
    		SDPFmtpLine: "packetization-mode=1;profile-level-id=42e01f",
    	},
    	PayloadType: 96,
    }, webrtc.RTPCodecTypeVideo)
    
    api := webrtc.NewAPI(webrtc.WithMediaEngine(&m))
    pc, _ := api.NewPeerConnection(webrtc.Configuration{})

    创建Track,并绑定到PeerConnection:

    Track是WebRTC中媒体发送的抽象,本质上是RTP流的出口。这里使用TrackLocalStaticRTP,意味着可以手动写入RTP包

    track, _ := webrtc.NewTrackLocalStaticRTP(
    	webrtc.RTPCodecCapability{
    		MimeType: webrtc.MimeTypeH264,
    		ClockRate: 90000,
    	},
    	"video", "pion",
    )
    pc.AddTrack(track)

    信令交换部分:

    接收浏览器Offer → 设置远端 SDP → 生成 Answer → 返回给浏览器。GatheringCompletePromise用于等待ICE candidate收集完成,否则SDP不完整。

    offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: req.SDP}
    pc.SetRemoteDescription(offer)
    
    ans, _ := pc.CreateAnswer(nil)
    pc.SetLocalDescription(ans)
    
    <-webrtc.GatheringCompletePromise(pc)

    将Track存入clients,用于后续RTP分发:

    这里通过DESCRIBE + SETUP建立RTSP会话,并解析媒体格式(H264/H265)。

    clientsMu.Lock()
    clients[c.ClientIP()+"_"+time.Now().Format("150405")] = track
    clientsMu.Unlock()

    核心转发逻辑在OnPacketRTP:

    RTSP → 解复用 → 得到RTP包 → 广播写入所有WebRTC Track
    服务端本质上只是一个RTP转发器,并不进行转码,这带来两个重要特性:
    优点:延迟极低(基本无编码延迟)
    CPU占用极小
    架构简单稳定
    限制:浏览器必须支持该编码(通常是H264)
    RTSP输入必须与WebRTC codec兼容

    cli.OnPacketRTP(media, formaH264, func(pkt *rtp.Packet) {
    	clientsMu.Lock()
    	totalBytes += uint64(len(pkt.Payload))
    	totalPackets++
    
    	activeClients := make([]*webrtc.TrackLocalStaticRTP, 0, len(clients))
    	for _, t := range clients {
    		activeClients = append(activeClients, t)
    	}
    	clientsMu.Unlock()
    
    	for _, t := range activeClients {
    		t.WriteRTP(pkt)
    	}
    })

    统计接口“/stats”路由通过简单计数实现码率计算:

    前端每秒拉取一次,实现实时监控。

    kbps := uint64(float64(totalBytes*8) / 1024 / duration)
    pktps := uint64(float64(totalPackets) / duration)

    第一,WebRTC并不负责“获取视频”,它只负责“传输媒体流”。视频源可以来自 RTSP、文件、摄像头等。

    第二,WebRTC的关键不是API,而是:

    • SDP协商
    • ICE建连
    • RTP收发

    第三,这种架构属于典型的“边缘网关模式”:

    RTSP(设备侧协议) → WebRTC(浏览器协议)

    在工业监控、视频巡检、边缘计算中非常常见

  • 存取qDrant向量(Golang)

    检索增强生成(Retrieval-Augmented Generation, RAG)成为解决模型知识局限性和幻觉问题的有效手段。在RAG架构中,向量数据库负责存储和检索文本的向量表示,而qDrant作为高性能的向量数据库,常被选为底层存储。本文将结合Go语言代码示例,详细介绍如何通过langchain-go和qdrant-go库存取qDrant向量,并实现一个完整的RAG流程。

    环境:

    • qDrant 服务(本地或远程运行)
    • Ollama 服务(用于Embedding和文本生成)
      • 准备nomic-embed-text模型
      • 准备任意支持text的大模型
    • Go语言环境及以下库:
      • github.com/tmc/langchaingo(包含向量存储、文档加载器、文本分割器等)
      • github.com/qdrant/go-client(qDrant官方Go客户端)

    新建文件夹config/
    config文件含config.go,管理配置信息。包含qDrant地址、集合名称、Ollama URL、Embedding模型名和生成模型名等。

    1. 集合管理:EnsureCollection 和 resetCollection

    确保指定的集合存在,若不存在则创建。向量维度(768)与Embedding模型(如nomic-embed-text)匹配,距离度量采用余弦相似度。创建集合时指定的向量维度(768)必须与Embedding模型输出的维度完全一致。本例使用nomic-embed-text(维度768),若换用其他模型(如all-MiniLM-L6-v2维度384),需相应调整。
    resetCollection:先删除指定集合,再重新创建,用于重置整个知识库。

    func EnsureCollection(urlStr string, collectionName string) {
    	myclient, err := qdrant.NewClient(&qdrant.Config{
    		Host:   config.QdrantIP,
    		Port:   6334,
    		UseTLS: false,
    	})
    	if err != nil {
    		log.Fatal("连接Qdrant失败:", err)
    	}
    	defer myclient.Close()
    	ctx := context.Background()
    	exists, err := myclient.CollectionExists(ctx, collectionName)
    	if err != nil {
    		log.Fatal("查询集合状态失败:", err)
    	}
    	if !exists {
    		fmt.Printf("正在创建集合: %s...\n", collectionName)
    		err = myclient.CreateCollection(ctx, &qdrant.CreateCollection{
    			CollectionName: collectionName,
    			VectorsConfig: qdrant.NewVectorsConfig(&qdrant.VectorParams{
    				Size:     768, // nomic-embed-text 维度是 768:必须与 Embedding 模型一致
    				Distance: qdrant.Distance_Cosine,
    			}),
    		})
    		if err != nil {
    			log.Fatal("创建集合失败:", err)
    		}
    		fmt.Println("集合创建成功")
    	}
    }
    func resetCollection(name string) {
    	myclient, _ := qdrant.NewClient(&qdrant.Config{
    		Host: config.QdrantIP, Port: 6334, UseTLS: false,
    	})
    	defer myclient.Close()
    	ctx := context.Background()
    	_ = myclient.DeleteCollection(ctx, name)
    	EnsureCollection(config.QdrantIP, name)
    	fmt.Println("集合已重置,准备重新导入...")
    }

    2. 数据导入:IngestKnowledge

    分块策略直接影响检索效果,需根据文档类型和任务调整块大小和重叠。

    func IngestKnowledge(filePath string, shouldReset bool) {
        // 1. 重置或确保集合存在
        if shouldReset {
            resetCollection(config.Collection)
        } else {
            EnsureCollection(config.QdrantIP, config.Collection)
        }
    
        // 2. 根据文件类型选择加载器(PDF或文本)
        f, _ := os.Open(filePath)
        defer f.Close()
        var loader documentloaders.Loader
        if strings.HasSuffix(filePath, ".pdf") {
            loader = documentloaders.NewPDF(f, fileSize)
        } else {
            loader = documentloaders.NewText(f)
        }
    
        // 3. 加载并分割文档
        docs, _ := loader.LoadAndSplit(ctx, textsplitter.NewRecursiveCharacter(
            textsplitter.WithChunkSize(300),
            textsplitter.WithChunkOverlap(100),
        ))
    
        // 4. 初始化Embedder和向量存储
        embedLLM, _ := ollama.New(ollama.WithModel(config.EmbedModel), ollama.WithServerURL(config.OllamaURL))
        embedder, _ := embeddings.NewEmbedder(embedLLM)
        store, _ := qdrantl.New(
            qdrantl.WithURL(url.URL{Scheme: "http", Host: config.QdrantURL}),
            qdrantl.WithCollectionName(config.Collection),
            qdrantl.WithEmbedder(embedder),
        )
    
        _, err = store.AddDocuments(ctx, docs) // 5. 添加文档到qDrant
    }

    3. 查询与生成:UpdateRAG

    确保集合存在,初始化Embedder和向量存储(与导入时相同)。向集合中添加几个示例文档块(演示用)。执行相似性搜索:将查询文本向量化,在qDrant中检索最相似的3个文档块。将检索结果拼接为上下文,构造提示词,调用生成模型(Ollama)得到最终回答。

    注意:实际应用中,文档导入和查询生成通常是分开的步骤,此处合并仅为展示完整流程。

    func UpdateRAG() {
        // 1. 确保集合存在,初始化Embedder和向量存储
        EnsureCollection(config.QdrantIP, config.Collection)
        embedLLM, _ := ollama.New(ollama.WithModel(config.EmbedModel), ollama.WithServerURL(config.OllamaURL))
        embedder, _ := embeddings.NewEmbedder(embedLLM)
        store, _ := qdrantl.New(
            qdrantl.WithURL(url.URL{Scheme: "http", Host: config.QdrantURL}),
            qdrantl.WithCollectionName(config.Collection),
            qdrantl.WithEmbedder(embedder),
        )
    
        // 2. 准备示例文档并分割(此处仅为演示,实际可从文件加载)
        docs := []schema.Document{ ... }
        splitter := textsplitter.NewRecursiveCharacter(textsplitter.WithChunkSize(200), textsplitter.WithChunkOverlap(20))
        var chunks []schema.Document
        for _, doc := range docs {
            texts, _ := splitter.SplitText(doc.PageContent)
            for _, t := range texts {
                chunks = append(chunks, schema.Document{PageContent: t, Metadata: doc.Metadata})
            }
        }
        store.AddDocuments(ctx, chunks)
    
        // 3. 执行相似性搜索
        query := "Qdrant 适合用在什么场景?"
        results, _ := store.SimilaritySearch(ctx, query, 3)
    
        // 4. 构建提示词并调用LLM生成回答
        contextText := ""
        for _, doc := range results {
            contextText += doc.PageContent + "\n"
        }
        llm, _ := ollama.New(ollama.WithModel(config.GenerateModel), ollama.WithServerURL(config.OllamaURL))
        prompt := fmt.Sprintf(`
    你是一个基于知识库回答问题的助手。
    只能根据【知识库】内容作答,如果无法得到答案,请回答“不知道”。
    【知识库】
    %s
    【问题】
    %s
    `, contextText, query)
        answer, _ := llms.GenerateFromSinglePrompt(ctx, llm, prompt)
        fmt.Println(answer)
    }

    4. 运行示例

    func main() {
    	IngestKnowledge("./xxix.pdf", true) //指向需要索引的知识文件
            UpdateRAG()
    }
    输出:
    文档已成功写入 Qdrant
    
    Model回答:
    Qdrant 是一个高性能的向量数据库,常用于 RAG(检索增强生成)系统,因此适合用于构建基于语义搜索、推荐系统、异常检测等需要向量相似性检索的场景。
  • Load AVG(Linux)

    Load Average】:代表机器在某一时间段内,处于“可运行状态”或“不可中断等待状态”的进程平均数量。

    三个数值统计跨度分别为:1分钟平均 5分钟平均 15分钟平均


    一台4核心的处理器的Load为 1.21 1.34 1.12

    1、CPU压力为 1.21 / 4 ≈ 0.30

    2、CPU压力为 1.34 / 4 ≈ 0.34

    3、CPU压力为 1.12 / 4 ≈ 0.28

    平均只有约1个任务在运行或等待CPU,而系统有4核心,完美胜任服务


    在Linux中,平均负载并非在每个时钟滴答时计算,而是由一个基于HZ频率设置的变量值驱动,并在每个时钟滴答时进行检测。该设置定义了内核时钟滴答速率(单位:赫兹,即每秒次数),默认值为100,对应10毫秒的滴答间隔。内核活动使用这些滴答数进行计时。具体来说,calc_load() 函数(位于loadavg.h文件,原为 sched.h)负责计算平均负载,它大约每LOAD_FREQ(5*HZ+1)个滴答运行一次,即略多于5秒。

  • TOPS和TFLOPS

    选择


    TOPS和TFLOPS代表了计算系统中不同的硬件性能。
    TOPS代表 Tera Operations Per Second
    TFLOPS 代表 Tera Floating Point Operations Per Second

    TOPS衡量芯片每秒能完成多少万亿次整数运算(整数的加法/乘法)。这对于吞吐量比最终精度更重要的任务尤为关键——例如,自动驾驶车辆中的神经处理单元(NPU)或RTX 5070显卡提供数千个TOPS以快速处理传感器数据。 相比之下,TFLOPS计算的是每秒可执行的万亿次浮点(十进制)计算。 浮点数学对于高精度工作至关重要,比如训练神经网络或科学模拟。


    如果工作目标是AI训练或任何需要高精度的任务,选择TFLOPS评分很高的GPU。
    如果是边缘或移动端的实时AI推理,选择高TOPS的NPU或GPU。

    关键区别


    精度与速度:浮点运算涉及小数且精度更高,因此优化TFLOPS(每秒万亿次浮点运算)的硬件用于对精度敏感的任务(如图形处理或气候模型)。整数运算(以TOPS,即每秒万亿次运算衡量)使用整数,更简单快速。正如资料所示,浮点运算“涉及小数点”,适用于高精度场景;而整数运算用于更简单的任务。实践中,GPU(英伟达、AMD独显等)为复杂计算强调TFLOPS,而NPU(神经网络处理器)和数字信号处理单元则强调TOPS以快速处理大量推理任务。

    硬件效率:整数运算比浮点运算简单,硬件通常能实现更高吞吐量。这就是NPU和其他推理加速器宣传极高TOPS值的原因。例如,现代PC CPU如英特尔酷睿Ultra系列或高通骁龙芯片集成了擅长整数运算的NPU,能以低功耗实现每秒数万亿次运算。相比之下,在相同制程下,GPU的浮点单元每秒运算次数少于NPU的整数单元,因为浮点运算更复杂。

    应用适用性:根据工作负载选择合适的指标。若进行AI训练或重型计算,需要高TFLOPS。数据中心GPU和加速器(英伟达A100/H100、AMD Instinct、谷歌TPU)使用FP32、FP16或BF16精度提供巨大的TFLOPS(通常达数十或数百)。TOPS与TFLOPS数值不能直接等同比较。例如,NPU的“130 TOPS”并不天然优于或劣于GPU的“65 TFLOPS”——它们反映不同类型的吞吐能力。TOPS反映通用(通常为低精度)运算,而TFLOPS反映十进制精度运算。高TOPS芯片可能擅长实时运行图像分类器,但在训练需要高浮点精度的模型时可能吃力。反之,高TFLOPS芯片能训练庞大模型,但在相同推理任务上可能功耗更高、成本更大。

  • 对象存储(Object Storage Service)

    OSS(对象存储服务)以存储空间(Bucket)作为数据的逻辑容器,用来统一管理对象。每个Bucket都具有唯一性,并绑定了固定的存储类别、访问权限策略以及所属地域,用户可以通过Bucket对应的访问域名在互联网上定位并访问其中的数据。

    在OSS中,对象(Object)是最基本的数据存储单元,可以理解为“文件 + 属性信息”的整体。一个对象由Key、Metadata和Data三部分组成。Key即对象的唯一名称,是经过UTF-8编码的字符串,在同一个Bucket内不能重复;Metadata是对象的描述信息,以键值对形式存在,用于说明对象的各种属性;Data则是对象实际存储的二进制数据内容。Metadata又分为系统元数据和用户元数据,其中系统元数据由OSS自动维护,用于对象管理和传输,例如内容长度、最后修改时间、ETag等;用户元数据则由用户在上传时自定义,用来补充描述对象的业务属性。

    OSS根据数据访问频率和成本需求提供了多种存储类别。标准存储具备高可靠性、高可用性和高性能,访问时延低、吞吐能力强,适合需要频繁访问的热点数据场景,如网站图片、音视频内容和移动应用资源。低频存储面向访问次数较少但仍需实时读取的数据,存储成本低于标准存储,适合长期备份类数据,但对象存储时间不足30天提前删除会产生费用。归档存储则以最低的存储成本支持长期保存的数据,适合日志、档案和影视素材等几乎不访问的数据,取回时需要等待一定时间,并且对最短存储周期有更严格的限制。

    在底层能力上,OSS通过多副本和纠删码等冗余机制,将数据分布在跨可用区甚至跨设备的存储节点上,即使多台设备同时发生故障,也能保证数据不丢失、业务不中断,并自动完成冗余修复。同时,系统会周期性校验数据完整性,发现异常后利用冗余数据进行重建,进一步保障数据可靠性。

    OSS还提供对象级别的强一致性保障。所有对象操作都是原子的,不存在中间状态:操作要么成功,要么失败。只要用户收到成功响应,数据就已经处于可用状态。例如对象上传完成后即可立即读取,更新对象时,其他并发读请求只会读到旧数据,而不会读到部分或损坏的数据。这种强一致性特性使得OSS在使用方式上更接近传统存储系统,简化了应用架构设计。

    在数据组织方式上,OSS采用Key-Value模型,而不是传统文件系统的目录树结构。对象的Key是访问数据的唯一标识,即使Key看起来像“FolderA/FolderB/file.jpg”,在OSS内部也只是一个普通字符串,并不代表真实存在的层级目录。因此,不同Key的访问效率基本一致,不会因为“目录层级”变深而降低性能。所谓的目录操作,本质上是对前缀匹配的一批对象进行处理,成本较高,也不被推荐频繁使用。

    由于OSS不支持对象的在线修改,即使只改动一个字节,也需要重新上传整个对象,因此它天然适合“一次写入,多次读取”的业务模型。依托分布式架构和Key-Value存储方式,OSS能够支持海量数据和高并发访问,非常适合静态资源分发、备份归档等场景。

    在访问和管理方式上,用户可以通过网页形式的管理控制台直观地创建Bucket、上传下载文件并配置权限;也可以使用支持S3接口的客户端工具(如S3Browser)进行本地化管理;对于开发场景,OSS提供了封装良好的SDK,方便应用程序直接调用对象存储能力;同时还提供基于HTTPS、数据格式为JSON的RESTful API,满足更底层或统计类的接口调用需求。

  • 工具iproute2命令

    Iproute2是一组用于控制Linux中TCP/IP网络和流量控制的工具

    通过ip命令使用,搭配以下选项

    link — 逻辑网络设备

    address — 协议,设备上的(IPv4或IPv6)地址

    neighbour — ARP或NDISC缓存条目

    route — 路由表入口

    rule — 规则数据库

    maddress — 组播地址

    mroute — 路由缓存条目

    tunnel — IP协议通道


    使用

    ip neigh 
    //邻居表对象建立协议地址与链路层之间的绑定;为了共享同一物理链路的主机建立地址。邻居对象条目组织成表。IPv4邻居对象表也被称为ARP表。查看邻居表的绑定及其属性,添加新的邻居表条目,并删除旧的。
    ip link
    // 链路指的是网络设备。ip link对象及其对应的命令集允许查看并作网络设备的状态。
    ip addr
    // 地址指的是连接到网络设备上的协议(IP或IPv6)地址。每个设备必须至少有一个地址,才能使用对应的协议。可以有多个不同的地址连接到一个设备。这些地址在协议结构中并不被区分,因此“别名”一词并不完全适用于此类多地址。
    ip route
    // 管理内核路由表中的路由条目。内核路由表保存协议路径与其他网络节点的信息。每个路由条目都有一个密钥,由协议前缀组成,即网络地址和网络掩码长度的配对,以及可选的类型服务(TOS)值。如果IP数据包的目的地址的最高位数等于路由前缀至少是前缀长度,且路由的TOS为零或等于数据包的TOS。
    ss -tlpn
    // -t tcp连接 -l监听 -p进程 -n不解析服务名
    tc qdisc add dev eth0 root tbf rate 10mbit burst 32kbit latency 400ms
    // 限速eth0网卡下行10mbps
    
    tc qdisc del dev eth0 root
    // 解除eth0网卡限速
    ip rule
    // 路由策略中的规则数据库控制路由选择算法。经典路由互联网中使用的算法仅基于数据包的目的地址,理论上,但实际上不基于TOS字段。在某些情况下,我们希望路由数据包的方式不同,不仅取决于目的地址,还包括其他数据包字段,如源地址、IP协议,传输协议端口甚至数据包有效载荷。这项任务称为“策略路由”。

  • WebSocket和HTTP Upgrade机制

    HTTP协议用于浏览器和服务器之间传输数据。每次请求是独立的,不记历史,HTTP来轮询服务器更新增加性能支出。

    WebSocket协议旨在取代现有利用HTTP作为传输层的双向通信技术,从而利用现有基础设施(代理、过滤、认证)。但WS是基于HTTP层,而不是为了取代HTTP。

    在HTTP协议中,“Upgrade”机制允许客户端告诉服务器把当前的HTTP连接升级到另一种协议。(RFC 2616)。

    WebSocket协议由开启握手和基础消息框架组成,分层叠加在TCP之上, 该技术的目标是为需要与服务器双向通信的浏览器应用提供一种机制,无需开启多个HTTP连接。WS通过HTTP升级协议握手。一条向服务器的WS握手:

    GET /chat HTTP/1.1
    Host: server.example.com
    Upgrade: websocket
    Connection: Upgrade
    Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
    Origin: http://example.com
    Sec-WebSocket-Protocol: chat, superchat
    Sec-WebSocket-Version: 13

    一条向客户端的WS握手:

    HTTP/1.1 101 Switching Protocols
    Upgrade: websocket
    Connection: Upgrade
    Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
    Sec-WebSocket-Protocol: chat

    一旦客户端和服务器都发送了握手,如果握手成功,数据传输阶段才开始。这是一个双向通信通道,双方可以独立地随意发送数据。成功握手后,客户端和服务器以本规范中称为“消息”的概念单元进行数据传递。 在线路上,一条消息由一个或多个组成框架。 WebSocket消息不一定对应于特定的网络层框架,因为分段消息可能被中介合并或拆分。

    Sec-WebSocket-Protocol: chat
    用于防止脚本在网页浏览器中使用 WebSocket API 时未经授权交叉使用该服务器。 服务器会被告知生成WebSocket连接请求的脚本来源。 如果服务器不愿意接受来自该源的连接,可以通过发送相应的HTTP错误代码来拒绝连接。 该头字段由浏览器客户端发送;对于非浏览器客户端,如果在客户端上下文中合理,可能会发送该头字段。

    Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
    对于该头部字段,服务器必须将头部字段中的值与全局唯一标识符字符串形式“258EAFA5-E914-47DA- 95CA-C5AB0DC85B11”串联起来,这通常不会被不理解WebSocket协议的网络端点使用。 该连接的SHA-1哈希值(160位),以base64编码,随后在服务器握手中返回。

    HTTP/1.1 101 Switching Protocols
    除了101以外的任何状态码,都表示WebSocket握手尚未完成,HTTP的语义仍然适用。头部紧跟状态码。

    HTTP/1.1 101 Switching Protocols
    Upgrade: websocket
    Connection: Upgrade
    Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=

    WebSocket客户端会检查这些字段以查找脚本页面。如果 |Sec-WebSocket-Accept|如果缺少头字段,或者 HTTP 状态码不是 101,则连接无法建立,WebSocket帧也不会发送。


    通过升级器,http服务被升级为websocket服务,除非得到websocket握手,否则连接失败:

    #Go
    
    #An upgrader
    var upgrader = websocket.Upgrader{
    	CheckOrigin: func(r *http.Request) bool {
    		return true
    	},
    }
    
    var wsClients   = make(map[*websocket.Conn]bool)
    
    func wsHandler(c *gin.Context) {
    	conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
    	if err != nil {
    		return
    	}
    	defer conn.Close()
    	wsClients[conn] = true
    	for {
    		_, _, err := conn.ReadMessage()
    		if err != nil {
    			delete(wsClients, conn)
    			break
    		}
    	}
    }
    
    func main() {
            r := gin.Default()
            r.GET("/ws", wsHandler)
    }