29日 七月 2019

可恢复的(resumable)文件上传

使用 fetch 方法来上传文件相当容易。

当文件上传过程中连接丢失,此时如何恢复上传呢?目前还没有针对此问题的内建选项,但是我们有实现它的一些方法。

当我们上传大型文件的时候(如果我们可能需要恢复),我们期待可恢复上传应带有上传进度指示。由于 fetch 不允许追踪上传进度,我们将会使用 XMLHttpRequest

不太实用的进度事件

要恢复上传,我们需要知道在连接丢失前已经上传了多少。

我们有 xhr.upload.onprogress 来追踪上传进程。

不幸的是,它在这里没什么作用,它在数据发送 sent 完成时触发,但是它真的被服务器接收了吗?浏览器并不知道。

或许它只是被本地代理缓冲(buffered),或是有可能远程服务器处理进程宕机无法处理它们,亦或是当连接断开时它刚刚从中间丢失,且没有到达服务器。

因此,这个事件只是对于显示一个漂亮的进度条来说很有用。

要恢复上传,我们需要知道服务器具体接收了多少字节。只有服务器能告诉我们它接收了多少。

算法

  1. 首先,我们创建一个独一无二的标识符作为我们上传的文件 id,例如:

    let fileId = file.name + '-' + file.size + '-' + +file.lastModifiedDate;

    这对恢复上传很有用,它能告诉服务器我们我们要恢复的文件是什么。

  2. 发送请求到服务器,获取该文件已经上传了多少字节,就像这样:

    let response = await fetch('status', {
      headers: {
        'X-File-Id': fileId
      }
    });
    
    // 服务器已有的字节
    let startByte = +await response.text();

    这假设服务器通过 X-File-Id 头跟踪文件上传。应该在服务端实现。

  3. 然后我们可以使用 Blob 方法 slice 来自 startByte 的要发送的文件:

    xhr.open("POST", "upload", true);
    
    // 发送文件 id,这样服务器就能知道要恢复哪个文件
    xhr.setRequestHeader('X-File-Id', fileId);
    // 发送我们正在恢复的字节,因此服务器知道我们正在恢复文件
    xhr.setRequestHeader('X-Start-Byte', startByte);
    
    xhr.upload.onprogress = (e) => {
      console.log(`Uploaded ${startByte + e.loaded} of ${startByte + e.total}`);
    };
    
    // 文件可以来自于 input.files[0] 或者其他资源
    xhr.send(file.slice(startByte));

    这里我们将服务器的文件 id 作为 X-File-Id 发送,此时服务器就知道我们正在上传哪个文件,并且起始字节为 X-Start-Byte,因此服务器知道我们并不是从头开始上传,而是恢复文件。

    服务器应该检查它的记录,如果这个文件之前上传过且当前上传大小是 X-Start-Byte,此时将数据附加到原来文件上。

这是用 Node.js 写的客户端和服务端的 demo。

在这个网页上,它只有部分能工作,因为 Node.js 位于另一个服务 Nginx 后面,该服务器缓冲上传过程,当完全上传后才传递给 Node.js。

但是你可以下载这些代码,在本地运行以进行完整演示:

结果
server.js
uploader.js
index.html
let http = require('http');
let static = require('node-static');
let fileServer = new static.Server('.');
let path = require('path');
let fs = require('fs');
let debug = require('debug')('example:resume-upload');

let uploads = Object.create(null);

function onUpload(req, res) {

  let fileId = req.headers['x-file-id'];
  let startByte = req.headers['x-start-byte'];

  if (!fileId) {
    res.writeHead(400, "No file id");
    res.end();
  }

  // 文件位置 “nowhere”
  let filePath = '/dev/null';
  // 可以使用真实路径替代,例如:
  // let filePath = path.join('/tmp', fileId);

  debug("onUpload fileId: ", fileId);

  // 初始化新 upload
  if (!uploads[fileId]) uploads[fileId] = {};
  let upload = uploads[fileId];

  debug("bytesReceived:" + upload.bytesReceived + " startByte:" + startByte)

  let fileStream;

  // 如果 startByte 是 0 或者没有设置,就创建一个新文件,否则检查文件大小并追加到已存在的文件上
  if (!startByte) {
    upload.bytesReceived = 0;
    fileStream = fs.createWriteStream(filePath, {
      flags: 'w'
    });
    debug("New file created: " + filePath);
  } else {
    // 我们也可以检查磁盘(on-disk)文件大小
    if (upload.bytesReceived != startByte) {
      res.writeHead(400, "Wrong start byte");
      res.end(upload.bytesReceived);
      return;
    }
    // 追加到已存在的文件上
    fileStream = fs.createWriteStream(filePath, {
      flags: 'a'
    });
    debug("File reopened: " + filePath);
  }


  req.on('data', function(data) {
    debug("bytes received", upload.bytesReceived);
    upload.bytesReceived += data.length;
  });

  // 将请求体发送到文件
  req.pipe(fileStream);

  // 当请求完成时,所有数据都被写入磁盘
  fileStream.on('close', function() {
    if (upload.bytesReceived == req.headers['x-file-size']) {
      debug("Upload finished");
      delete uploads[fileId];

      // 可以对上传的文件进行一些处理

      res.end("Success " + upload.bytesReceived);
    } else {
      // 连接丢失,我们留下未完成的文件
      debug("File unfinished, stopped at " + upload.bytesReceived);
      res.end();
    }
  });

  // 在 I/O 错误的情况下 —— 完成请求
  fileStream.on('error', function(err) {
    debug("fileStream error");
    res.writeHead(500, "File error");
    res.end();
  });

}

function onStatus(req, res) {
  let fileId = req.headers['x-file-id'];
  let upload = uploads[fileId];
  debug("onStatus fileId:", fileId, " upload:", upload);
  if (!upload) {
    res.end("0")
  } else {
    res.end(String(upload.bytesReceived));
  }
}


function accept(req, res) {
  if (req.url == '/status') {
    onStatus(req, res);
  } else if (req.url == '/upload' && req.method == 'POST') {
    onUpload(req, res);
  } else {
    fileServer.serve(req, res);
  }

}




// -----------------------------------

if (!module.parent) {
  http.createServer(accept).listen(8080);
  console.log('Server listening at port 8080');
} else {
  exports.accept = accept;
}
class Uploader {

  constructor({file, onProgress}) {
    this.file = file;
    this.onProgress = onProgress;

    // 创建文件的唯一标识 fileId
    // 我们还可以添加用户会话标识符(如果有的话)来使它更唯一
    this.fileId = file.name + '-' + file.size + '-' + +file.lastModifiedDate;
  }

  async getUploadedBytes() {
    let response = await fetch('status', {
      headers: {
        'X-File-Id': this.fileId
      }
    });

    if (response.status != 200) {
      throw new Error("Can't get uploaded bytes: " + response.statusText);
    }

    let text = await response.text();

    return +text;
  }

  async upload() {
    this.startByte = await this.getUploadedBytes();

    let xhr = this.xhr = new XMLHttpRequest();
    xhr.open("POST", "upload", true);

    // 发送文件 id,这样服务器就知道要恢复哪个文件
    xhr.setRequestHeader('X-File-Id', this.fileId);
    // 发送我们开始进行恢复的起点字节(即之前已经上传的字节数),这样服务器就知道我们正在恢复中
    xhr.setRequestHeader('X-Start-Byte', this.startByte);

    xhr.upload.onprogress = (e) => {
      this.onProgress(this.startByte + e.loaded, this.startByte + e.total);
    };

    console.log("send the file, starting from", this.startByte);
    xhr.send(this.file.slice(this.startByte));

    // 返回值
    //   如果 upload 成功,返回 true,
    //   如果终止,返回 false
    // 如果出现错误,抛出
    return await new Promise((resolve, reject) => {

      xhr.onload = xhr.onerror = () => {
        console.log("upload end status:" + xhr.status + " text:" + xhr.statusText);

        if (xhr.status == 200) {
          resolve(true);
        } else {
          reject(new Error("Upload failed: " + xhr.statusText));
        }
      };

      // 仅在 xhr.abort() 被调用后才会触发 onabort
      xhr.onabort = () => resolve(false);

    });

  }

  stop() {
    if (this.xhr) {
      this.xhr.abort();
    }
  }

}
<!DOCTYPE HTML>

<script src="uploader.js"></script>

<form name="upload" method="POST" enctype="multipart/form-data" action="/upload">
  <input type="file" name="myfile">
  <input type="submit" name="submit" value="Upload (Resumes automatically)">
</form>

<button onclick="uploader.stop()">Stop upload</button>


<div id="log">Progress indication</div>

<script>
  function log(html) {
    document.getElementById('log').innerHTML = html;
    console.log(html);
  }

  function onProgress(loaded, total) {
    log("progress " + loaded + ' / ' + total);
  }

  let uploader;

  document.forms.upload.onsubmit = async function(e) {
    e.preventDefault();

    let file = this.elements.myfile.files[0];
    if (!file) return;

    uploader = new Uploader({file, onProgress});

    try {
      let uploaded = await uploader.upload();

      if (uploaded) {
        log('success');
      } else {
        log('stopped');
      }

    } catch(err) {
      console.error(err);
      log('error');
    }
  };

</script>

正如你所见,现代网络方法在功能上和文件管理器非常接近 —— 控制 headers,进度指示,发送文件片段等等。

教程路线图

评论

在评论之前先阅读本内容…
  • 欢迎你在文章下添加补充内容、提出你的问题或回答提出的问题。
  • 使用 <code> 标签插入几行代码,对于多行代码 — 可以使用 <pre>,对于超过十行的代码 — 建议使用沙箱(plnkrJSBincodepen 等)。
  • 如果你无法理解文章中的内容 — 请详细说明。