如何进行大文件传输?

本文首发微信公众号:码上观世界

网络文件传输的应用场景很多,如网络聊天的点对点传输、文件同步网盘的上传与下载、文件上传到分布式文件存储器等,其传输速度主要受限于网络带宽、存储器大小、CPU处理速度以及磁盘读写速度,尤其是网络带宽。本文主要讨论通常情况下数十GB规模大小的文件传输的优化方式,对于更大规模的文件容量建议考虑人工硬盘运输,毕竟基于公路运输的方式不仅带宽大而且成本低。

文件传输涉及到客户端、中间网络和服务器,常用的传输协议有HTTP(s)、(S)FTP和TCP(UDP)协议等,对于客户端用户来讲,能够起作用的地方不大,所以本文就两种基本的场景来讨论文件传输在客户端的优化方式:基于HTTP协议的非结构化文件传输和基于TCP协议的结构化文件传输。

基于HTTP协议的非结构化文件传输

最常用的文件上传是基于HTTP POST。观察浏览器的请求头数据可知,文件的二进制数据被置于请求body里面,也就是说在上传文件过程中,客户端是一次性将文件内容加载到内存,如果文件过大,浏览器很可能会崩溃,加上HTTP请求连接本身有超时时间限制,所以这种方式不适合传输大文件。

所以一种自然的方式就是手写符合规范的HTTP协议跟服务端通信:

上面的示例代码相比通过浏览器上传文件方式显得自由度更大,但是问题也更多,比如OutputStream将数据写入到PosterOutputStream内部缓冲区,而该缓冲区只有当调用HttpURLConnection的getInputStream方法之后才会发送到Socket流中。所以当文件过大(也许几十MB)就会导致内存溢出,即使通过调用flush方法也无济于事,因为PosterOutputStream的flush方法是空操作,什么都不干!幸运的是HttpURLConnection提供的setFixedLengthStreamingMode方法能够获取到自动刷新流缓存的StreamingOutputStream。虽然这种方式能够解决问题,但是还可能会遇到其他大大小小的坑,而且上述方式还是过于原始,使用Apache HttpClient能够轻易实现上述功能:

HttpClientBuilder httpClientBuilder = HttpClientBuilder.create();
httpClientBuilder.setDefaultCredentialsProvider(credsProvider);
RequestConfig requestConfig = RequestConfig.custom().setCookieSpec(CookieSpecs.DEFAULT).build();
CloseableHttpClient httpClient = httpClientBuilder.build();
File file = new File(filePath);
HttpPut httpPut = new HttpPut(url);
FileEntity fileEntity = new FileEntity(file);
httpPut.setEntity(fileEntity);
FileInputStream fileInputStream = new FileInputStream(file);
InputStreamEntity reqEntity = new InputStreamEntity(fileInputStream, file.length());
//post.setEntity(reqEntity);
HttpResponse response = httpClient.execute(httpPut);
String content = EntityUtils.toString(response.getEntity());

示例代码中,HttpClient帮我们封装了协议相关的所有内容。对于文件传输FileEntity 和InputStreamEntity 都可以使用,不同的是,InputStreamEntity 用了流传输的方式,我们需要做的是就是验证这两种方式是否存在文件过大导致的内存溢出问题。先看FileEntity ,直接翻到代码DefaultBHttpClientConnection :

class DefaultBHttpClientConnection extends BHttpConnectionBase{
......
public void sendRequestEntity(final HttpEntityEnclosingRequest request)
throws HttpException, IOException {Args.notNull(request, "HTTP request");ensureOpen();final HttpEntity entity = request.getEntity();if (entity == null) {return;}final OutputStream outstream = prepareOutput(request);entity.writeTo(outstream);outstream.close();}
......
}
class FileEntity{
......
public void writeTo(final OutputStream outstream) throws IOException {
Args.notNull(outstream, "Output stream");final InputStream instream = new FileInputStream(this.file);try {final byte[] tmp = new byte[OUTPUT_BUFFER_SIZE];int l;while ((l = instream.read(tmp)) != -1) {outstream.write(tmp, 0, l);}outstream.flush();} finally {instream.close();}
}
......
}

再看看InputStreamEntity:

class InputStreamEntity{
......
public void writeTo(final OutputStream outstream) throws IOException {
Args.notNull(outstream, "Output stream");final InputStream instream = this.content;try {final byte[] buffer = new byte[OUTPUT_BUFFER_SIZE];int l;if (this.length < 0) {// consume until EOFwhile ((l = instream.read(buffer)) != -1) {outstream.write(buffer, 0, l);}} else {// consume no more than lengthlong remaining = this.length;while (remaining > 0) {l = instream.read(buffer, 0, (int)Math.min(OUTPUT_BUFFER_SIZE, remaining));if (l == -1) {break;}outstream.write(buffer, 0, l);remaining -= l;}}} finally {instream.close();}
}
......
}

可见FileEntity 和InputStreamEntity使用了相同的outstream,其生成方式为:

class BHttpConnectionBase{
......protected OutputStream prepareOutput(final HttpMessage message) throws HttpException {final long len = this.outgoingContentStrategy.determineLength(message);return createOutputStream(len, this.outbuffer);}protected OutputStream createOutputStream(final long len,final SessionOutputBuffer outbuffer) {if (len == ContentLengthStrategy.CHUNKED) {return new ChunkedOutputStream(2048, outbuffer);} else if (len == ContentLengthStrategy.IDENTITY) {return new IdentityOutputStream(outbuffer);} else {return new ContentLengthOutputStream(outbuffer, len);}}
......
}

这里以ContentLengthOutputStream为例来看数据是如何发送到Socket流中的:

class ContentLengthOutputStream{private final SessionOutputBuffer out;
......public void write(final byte[] b, final int off, final int len) throws IOException {if (this.closed) {throw new IOException("Attempted write to closed stream.");}if (this.total < this.contentLength) {final long max = this.contentLength - this.total;int chunk = len;if (chunk > max) {chunk = (int) max;}this.out.write(b, off, chunk);this.total += chunk;}
}
......
}
class SessionOutputBufferImpl{private OutputStream outstream;
......public void write(final byte[] b, final int off, final int len) throws IOException {if (b == null) {return;}// Do not want to buffer large-ish chunks// if the byte array is larger then MIN_CHUNK_LIMIT// write it directly to the output streamif (len > this.fragementSizeHint || len > this.buffer.capacity()) {// flush the bufferflushBuffer();// write directly to the out streamstreamWrite(b, off, len);this.metrics.incrementBytesTransferred(len);} else {// Do not let the buffer grow unnecessarilyfinal int freecapacity = this.buffer.capacity() - this.buffer.length();if (len > freecapacity) {// flush the bufferflushBuffer();}// bufferthis.buffer.append(b, off, len);}
}
private void flushBuffer() throws IOException {final int len = this.buffer.length();if (len > 0) {streamWrite(this.buffer.buffer(), 0, len);this.buffer.clear();this.metrics.incrementBytesTransferred(len);}
}
private void streamWrite(final byte[] b, final int off, final int len) throws IOException {Asserts.notNull(outstream, "Output stream");this.outstream.write(b, off, len);
}
......
}
class SocketOutputStream {
......public void write(byte b[], int off, int len) throws IOException {socketWrite(b, off, len);}
......
}

通过上面关键代码可见,不管用哪一种Entity,当缓冲区满了就自动flush到Socket,理论上都可以进行大文件传输,只要超时时间允许,两者并没有什么特别的不同。

基于TCP协议的结构化文件传输

基于HTTP协议的文件传输,虽然通过流的方式能解决大文件传输问题,但是基于应用层协议毕竟效率不到,时间消耗仍是个大问题,尽管可以通过文件拆分,并行处理,但需要服务器端的配合才能完成(比如将小文件还原,断点续传等)。这里讨论的多文件传输到分布式系统不需要对服务端再做改造就能直接使用,天然具备并行处理能力。对于结构化文件传输的使用场景多用于数据迁移,比如从数据库系统或者文件系统传输到大数据存储计算平台。这里以将本地的CSV文件上传到HDFS为例,需要解决的是如何对文件拆分。虽然对非结构化,半结构化文件因为涉及到分隔符问题,对于文件拆分有点儿难度,但对规范化格式的文件,问题倒不大,但考虑让问题描述更简洁,这里不考虑文件拆分,只考虑一个文件(比如文件夹下已经拆分后的某个文件)的传输问题。该问题模型可以描述为:

引入Channel是为了解决File和HDFS存取速率不匹配的问题,通过Channel连接File读过程和HDFS写过程:当Channel缓存满的时候,File等待HDFS读取之后再开始写入Channel,HDFS读取之后File再写入Channel,两者通过信号量机制协调,HDFS每次写入都是一个独立的文件。关键代码实现如下:

File端读取数据到Channel:

public void readCSV(String filePath, String fieldDelimiter) {
......BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, "UTF-8"), 8192);CsvReader csvReader = new CsvReader(reader);csvReader.setDelimiter(fieldDelimiter.charAt(0));String[] parseRows;while ((parseRows = splitBufferedReader(csvReader)) != null) {//Record 为文件中一行数据记录,由Column组成Record record = createRecord(parseRows);this.buffer.add(record);if (this.buffer.size() >= MemoryChannel.bufferSize) {this.channel.pushAll(this.buffer);this.buffer.clear();}}this.channel.pushAll(this.buffer);this.buffer.clear();
......}
//基于内存的Channel实现
class  MemoryChannel{private ArrayBlockingQueue<Record> queue;private ReentrantLock lock;private Condition notInsufficient, notEmpty;
......//将File读取端将记录push到Channelpublic void pushAll(final Collection<Record> rs) {Validate.notNull(rs);Validate.noNullElements(rs);try {lock.lockInterruptibly();while (!this.queue.isEmpty()) {notInsufficient.await(200L, TimeUnit.MILLISECONDS);}this.queue.addAll(rs);notEmpty.signalAll();} catch (InterruptedException e) {throw new RuntimeException("pushAll", e);} finally {lock.unlock();}}......
}
class HdfsWriteService{
......
public void writeFile(String fieldDelimiter) {FileOutputFormat outFormat = new TextOutputFormat();outFormat.setOutputPath(jobConf, outputPath);outFormat.setWorkOutputPath(jobConf, outputPath);List<Record> recordList= new ArrayList(MemoryChannel.bufferSize);this.channel.pullAll(recordList);RecordWriter writer = outFormat.getRecordWriter(fileSystem, jobConf, outputPath.toString(), Reporter.NULL);for (Record record : recordList) {//将Record记录组装成HDFS的TEXT行记录,列分隔符可自定义Text recordResult = new Text(StringUtils.join(mergeColumn(record), fieldDelimiter));writer.write(NullWritable.get(), recordResult);}writer.close(Reporter.NULL);
}
......
//基于内存的Channel实现
class  MemoryChannel{
......//HDFS写入端从Channel中Pull记录public void pullAll(Collection<Record> rs) {assert rs != null;rs.clear();try {lock.lockInterruptibly();while (this.queue.drainTo(rs, bufferSize) <= 0) {notEmpty.await(200L, TimeUnit.MILLISECONDS);}notInsufficient.signalAll();} catch (InterruptedException e) {throw new RuntimeException("pullAll", e);} finally {lock.unlock();}
}
......
}

上述方式是实现多文件并行传输的基础,每个独立Channel的传输过程互不影响,即使当前Chanel过程失败,也可以独立重跑恢复。

END

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/252812.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

HTTP传输大文件

一 概述 早期网络传输的文件非常小&#xff0c;只是一些几K大小的文本和图片&#xff0c;随着网络技术的发展&#xff0c;传输的不仅有几M的图片&#xff0c;还有可以达到几G和几十G的视频。 在这些大文件传输的情况下&#xff0c;100M的光纤或者4G移动网络都会因为网络压力导致…

大文件传输有哪些方式可用?大文件传输有哪些方式?

大文件传输有哪些方式可用&#xff1f;大文件传输有哪些方式&#xff1f;互联网时代&#xff0c;速度决定效率。在企业生产过程中需要进行信息数据交换、搬运。这时就需要进行大文件传输。方方面面的行业都要涉及到大文件传输。例如影视行业需要每天进行视频素材的传输&#xf…

大文件如何快速上传

之前仿造uploadify写了一个HTML5版的文件上传插件&#xff0c;没看过的朋友可以点此先看一下~得到了不少朋友的好评&#xff0c;我自己也用在了项目中&#xff0c;不论是用户头像上传&#xff0c;还是各种媒体文件的上传&#xff0c;以及各种个性的业务需求&#xff0c;都能得到…

如何快速传输大文件,介绍大文件快速方法

现在&#xff0c;企业比以往任何时候都面临着一个重大挑战&#xff1a;需要一个快速共享文件的解决方案。但是&#xff0c;并非所有快速文件传输解决方案都以相同的速度传输文件。文件大小、端点位置、路径、设备、防火墙、网络系统和加密需求都会限制文件的传输速度。 什么是快…

如何快速地向服务器传大文件,大文件如何快速传输

在这个互联网时代&#xff0c;信息更新速度逐渐加快。用户在进行文件传输时&#xff0c;一定是希望既稳定又快速的&#xff0c;并且还能够保证安全。但是通常来讲&#xff0c;FTP文件传输并不能同时实现这三点的&#xff0c;特别是上传大文件时&#xff0c;FTP上传文件速度明显…

快速传输大文件,怎么通过网络传大文件给对方(1G以上)

在生活和工作中&#xff0c;我们总是要发送一些比较大的文件给别人&#xff0c;或者在自己的设备之间。在互联网日益发达的今天&#xff0c;我们可以用什么方法通过互联网快速传输大文件&#xff0c;发送1G以上的文件&#xff1f; 一、使用QQ传 在电脑上打开QQ&#xff0c;选…

新装win10系统的桌面没有“我的电脑”图标解决办法

新装win10系统的桌面是没有“我的电脑”图标&#xff0c;需要我们自己配置&#xff0c;以下是具体步骤&#xff1a; 1、可以看到桌面上面没有我的电脑&#xff1b; 2、在桌面空白处点击右键&#xff0c;选择“个性化”&#xff1b; 3、选择“主题”&#xff0c;再选择“更改桌…

Win系统 - Win10 电脑桌面的 WiFi 图标不见了怎么办?

我们在使用电脑是&#xff0c;有时回发现电脑右下角的WiFi图标不见了&#xff0c;这让我们身份不方便&#xff0c;因为我们不知道是否连上WiFi了没有&#xff0c;连上谁的WiFi&#xff0c;今天教你怎么把它显示出来。

【图标消失】Win10我的电脑/回收站的图标消失

步骤&#xff1a; (右键)-个性化-主题-(下滑找到)相关的设置-桌面图标设置-勾选计算机/回收站-确定

win10我的电脑图标不见了的恢复方法

1.在桌面空白处单击鼠标右键, 2.选择最下方个性化选项 3,选择主题选项,并选择桌面图标设置 4.把计算机图标勾上,点击确定即可.

Win10电脑桌面上没有‘我的电脑’图标

NO.1 在电脑屏幕随机处右击&#xff0c;找到个性化设置 NO.2 进入个性化后&#xff0c;点击主题&#xff0c;往下翻找到桌面图标设置 NO.3 进入桌面图标设置后&#xff0c;把计算机&#xff08;M&#xff09;打钩即可&#xff0c;我的电脑图标就出现了

Win10系统如何在桌面添加我的电脑图标

步骤1&#xff1a;电脑桌面空白处鼠标右键选择个性化 步骤2&#xff1a;在个性化里找到主题&#xff0c;点击进入主题&#xff01; 步骤3:依次点击相关设置-------桌面图标设置&#xff01; 步骤4&#xff1a;桌面图标里有一个计算机&#xff0c;现在是未勾选状态&#xff0c;所…

win10的计算机 桌面图标不见了怎么办,Win10我的电脑怎么放在桌面?Win10此电脑图标不见了的解决方法...

电脑安装官方原版Windows10系统之后&#xff0c;发现桌面上只有一个回收站&#xff0c;而我的电脑(此电脑)图标不见了&#xff0c;我们要如何找回呢&#xff1f;那么Win10我的电脑怎么放在桌面&#xff1f;装机之家分享一下Win10此电脑图标不见了的解决方法。 Win10此电脑图标不…

Java编程基础篇

目录 计算机、程序和Java概述计算机组成 基本程序设计标识符数值数据类型格式化控制台输出输入输出重定向输入重定向输出重定向输入输出重定向方法重载变量的范围 数组声明数组变量创建数组初始化数组可边长参数二维数组静态变量、常量和方法静态变量 限定字符串StringBuilder和…

java编程语言基础

1&#xff0e; Java的注释方式有哪几种&#xff1f;如何书写&#xff1f; 单行注释&#xff1a;用两个正斜线“//”注释掉斜线后的同行文本。 多行注释&#xff1a;“/”和“/”符号用于多行代码的注释。用“/”打开注释&#xff0c;“/”关闭注释。所有在二者之间的文本都会…

JAVA的编程基础(上)

1. JAVA的基本语法 1.1 注释 单行注释&#xff1a;// 多行注释&#xff1a;/* */ 文档注释&#xff1a;/** */ 注意&#xff1a;单行注释可以嵌套&#xff0c;多行注释和文档注释不可以嵌套 1.2 关键字 含义&#xff1a;在编程语言中&#xff0c;已经被赋予一些特殊含义…

JAVA编程基础复习

Java入门 Java概述 计算机语言总的来说可以分成三类&#xff1a; 机器语言&#xff1a;计算机可以识别的语言&#xff0c;二进制编码。 汇编语言&#xff1a;英文缩写的标识符。 高级语言&#xff1a;接近人类自然语言&#xff0c;Java就是一种高级语言。 Java划分为三个技术…

Java 基础编程入门

一、什么是Java Java是一种编程语言二、应用场景 非windows平台下互联网环境的开发首选三、Java语言特点 1、应用面广 2、简单易学(相对于C/C) 3、面向对象(更符合我们看待事物的特点) 4、跨平台(一处编译&#xff0c;处处运行) 5、多线程(提升系统性能) 单线…

Java编程基础(1)

1.Java基本语法 &#xff08;1&#xff09;Java程序的基本格式&#xff1a; 修饰符 class 类名 { 程序代码 } 如果Java程序要运行&#xff0c;则必须要有main方法&#xff0c;main方法是程序运行的入口 格式&#xff1a; 修饰符 class 类名{ public static void main&…

Java编程基础之网络编程

网络编程 文章目录 网络编程概述C/S和B/S网络通信协议网络编程要素 IP类UDP协议数据传输发送端接收端udp数据传输丢失问题 TCP概述Socket客户端ServerSocket服务端Socket**服务器代码**客户端代码 文件上传实现客户端服务端多线程版 概述 C/S和B/S 网络通信协议 协议:protoco…