<div dir="ltr"><div>marmoute, durin42, et al: I'd like you to think about possibly integrating the "stream clone" data format into changegroups/bundles. I'm not sure if this is something we could sneak into cg1 or cg2 as an alternative "compression" format or whether this will require a bundle2 part. Whatever the solution, this is something I'd like to see landed in 3.5 so servers can emit the stream clone data to vanilla clients.<br><br></div>As it stands, I imagine Mozilla will copy the stream handling code into our bundleclone extension until we can consume stream clones with vanilla Mercurial, presumably with bundle2.<br></div><div class="gmail_extra"><br><div class="gmail_quote">On Thu, May 21, 2015 at 10:41 AM, Gregory Szorc <span dir="ltr"><<a href="mailto:gregory.szorc@gmail.com" target="_blank">gregory.szorc@gmail.com</a>></span> wrote:<br><blockquote class="gmail_quote" style="margin:0 0 0 .8ex;border-left:1px #ccc solid;padding-left:1ex"># HG changeset patch<br>
# User Gregory Szorc <<a href="mailto:gregory.szorc@gmail.com">gregory.szorc@gmail.com</a>><br>
# Date 1432229242 25200<br>
# Thu May 21 10:27:22 2015 -0700<br>
# Node ID 0aa9c408c2be4492495dce4ae04aea4472eefbeb<br>
# Parent 451df92cec4912aefac57a4cf82e9268192c867b<br>
exchange: move code for generating a streaming clone into exchange<br>
<br>
Streaming clones are fast because they are essentially tar files.<br>
On mozilla-central, a streaming clone consumes only ~55s of client<br>
CPU time, as opposed to ~340s of CPU time for a regular clone or for<br>
unbundling a gzip bundle.<br>
<br>
Mozilla is deploying static file "lookaside" support to our Mercurial<br>
server. Static bundles are pre-generated and uploaded to S3. When a<br>
clone is performed, the static file is fetched, applied, and then an<br>
incremental pull is performed. Unfortunately, on an ideal network<br>
connection this still takes as much wall and CPU time as a regular<br>
clone (although it does save significant server resources).<br>
<br>
We like the client-side wall time wins of streaming clones. But we want<br>
to leverage S3-based pre-generated files for serving the bulk of clone<br>
data.<br>
<br>
This patch moves the code for producing a "stream bundle" into its<br>
own standalone function, away from the wire protocol. This will enable<br>
stream bundle files to be produced outside the context of the wire<br>
protocol.<br>
<br>
A bikeshed on whether exchange is the best module for this function<br>
might be warranted. I selected exchange instead of changegroup because<br>
"stream bundles" aren't changegroups (yet).<br>
<br>
diff --git a/mercurial/exchange.py b/mercurial/exchange.py<br>
--- a/mercurial/exchange.py<br>
+++ b/mercurial/exchange.py<br>
@@ -7,9 +7,9 @@<br>
<br>
from i18n import _<br>
from node import hex, nullid<br>
import errno, urllib<br>
-import util, scmutil, changegroup, base85, error<br>
+import util, scmutil, changegroup, base85, error, store<br>
import discovery, phases, obsolete, bookmarks as bookmod, bundle2, pushkey<br>
import lock as lockmod<br>
<br>
def readbundle(ui, fh, fname, vfs=None):<br>
@@ -1331,4 +1331,69 @@ def unbundle(repo, cg, heads, source, ur<br>
lockmod.release(tr, lock, wlock)<br>
if recordout is not None:<br>
recordout(repo.ui.popbuffer())<br>
return r<br>
+<br>
+# This is its own function so extensions can override it.<br>
+def _walkstreamfiles(repo):<br>
+ return repo.store.walk()<br>
+<br>
+def generatestreamclone(repo):<br>
+ """Emit content for a streaming clone.<br>
+<br>
+ This is a generator of raw chunks that constitute a streaming clone.<br>
+<br>
+ The stream begins with a line of 2 space-delimited integers containing the<br>
+ number of file entries and the total byte size.<br>
+<br>
+ Next are N entries, one for each file being transferred. Each file entry starts<br>
+ as a line with the file name and integer size delimited by a null byte.<br>
+ The raw file data follows. Following the raw file data is the next file<br>
+ entry, or EOF.<br>
+<br>
+ When used over the wire protocol, an additional line indicating protocol<br>
+ success will be prepended to the stream. This function is not responsible<br>
+ for adding it.<br>
+<br>
+ This function will obtain a repository lock to ensure a consistent view of<br>
+ the store is captured. It therefore may raise LockError.<br>
+ """<br>
+ entries = []<br>
+ total_bytes = 0<br>
+ # Get consistent snapshot of repo, lock during scan.<br>
+ lock = repo.lock()<br>
+ try:<br>
+ repo.ui.debug('scanning\n')<br>
+ for name, ename, size in _walkstreamfiles(repo):<br>
+ if size:<br>
+ entries.append((name, size))<br>
+ total_bytes += size<br>
+ finally:<br>
+ lock.release()<br>
+<br>
+ repo.ui.debug('%d files, %d bytes to transfer\n' %<br>
+ (len(entries), total_bytes))<br>
+ yield '%d %d\n' % (len(entries), total_bytes)<br>
+<br>
+ sopener = repo.svfs<br>
+ oldaudit = sopener.mustaudit<br>
+ debugflag = repo.ui.debugflag<br>
+ sopener.mustaudit = False<br>
+<br>
+ try:<br>
+ for name, size in entries:<br>
+ if debugflag:<br>
+ repo.ui.debug('sending %s (%d bytes)\n' % (name, size))<br>
+ # partially encode name over the wire for backwards compat<br>
+ yield '%s\0%d\n' % (store.encodedir(name), size)<br>
+ if size <= 65536:<br>
+ fp = sopener(name)<br>
+ try:<br>
+ data = fp.read(size)<br>
+ finally:<br>
+ fp.close()<br>
+ yield data<br>
+ else:<br>
+ for chunk in util.filechunkiter(sopener(name), limit=size):<br>
+ yield chunk<br>
+ finally:<br>
+ sopener.mustaudit = oldaudit<br>
diff --git a/mercurial/wireproto.py b/mercurial/wireproto.py<br>
--- a/mercurial/wireproto.py<br>
+++ b/mercurial/wireproto.py<br>
@@ -743,75 +743,29 @@ def pushkey(repo, proto, namespace, key,<br>
<br>
def _allowstream(ui):<br>
return ui.configbool('server', 'uncompressed', True, untrusted=True)<br>
<br>
-def _walkstreamfiles(repo):<br>
- # this is it's own function so extensions can override it<br>
- return repo.store.walk()<br>
-<br>
@wireprotocommand('stream_out')<br>
def stream(repo, proto):<br>
'''If the server supports streaming clone, it advertises the "stream"<br>
capability with a value representing the version and flags of the repo<br>
it is serving. Client checks to see if it understands the format.<br>
-<br>
- The format is simple: the server writes out a line with the amount<br>
- of files, then the total amount of bytes to be transferred (separated<br>
- by a space). Then, for each file, the server first writes the filename<br>
- and file size (separated by the null character), then the file contents.<br>
'''<br>
-<br>
if not _allowstream(repo.ui):<br>
return '1\n'<br>
<br>
- entries = []<br>
- total_bytes = 0<br>
+ def getstream(first, it):<br>
+ yield '0\n'<br>
+ yield first<br>
+ for chunk in it:<br>
+ yield chunk<br>
+<br>
 try:<br>
- # get consistent snapshot of repo, lock during scan<br>
- lock = repo.lock()<br>
- try:<br>
- repo.ui.debug('scanning\n')<br>
- for name, ename, size in _walkstreamfiles(repo):<br>
- if size:<br>
- entries.append((name, size))<br>
- total_bytes += size<br>
- finally:<br>
- lock.release()<br>
+ # generatestreamclone() is a generator, so the lock is not acquired<br>
+ # (and LockError is not raised) until it is first advanced. Prime it<br>
+ # here so we don't emit the success line before we know the lock was<br>
+ # obtained.<br>
+ it = exchange.generatestreamclone(repo)<br>
+ first = it.next()<br>
+ return streamres(getstream(first, it))<br>
except error.LockError:<br>
- return '2\n' # error: 2<br>
-<br>
- def streamer(repo, entries, total):<br>
- '''stream out all metadata files in repository.'''<br>
- yield '0\n' # success<br>
- repo.ui.debug('%d files, %d bytes to transfer\n' %<br>
- (len(entries), total_bytes))<br>
- yield '%d %d\n' % (len(entries), total_bytes)<br>
-<br>
- sopener = repo.svfs<br>
- oldaudit = sopener.mustaudit<br>
- debugflag = repo.ui.debugflag<br>
- sopener.mustaudit = False<br>
-<br>
- try:<br>
- for name, size in entries:<br>
- if debugflag:<br>
- repo.ui.debug('sending %s (%d bytes)\n' % (name, size))<br>
- # partially encode name over the wire for backwards compat<br>
- yield '%s\0%d\n' % (store.encodedir(name), size)<br>
- if size <= 65536:<br>
- fp = sopener(name)<br>
- try:<br>
- data = fp.read(size)<br>
- finally:<br>
- fp.close()<br>
- yield data<br>
- else:<br>
- for chunk in util.filechunkiter(sopener(name), limit=size):<br>
- yield chunk<br>
- finally:<br>
- sopener.mustaudit = oldaudit<br>
-<br>
- return streamres(streamer(repo, entries, total_bytes))<br>
+ return '2\n'<br>
<br>
@wireprotocommand('unbundle', 'heads')<br>
def unbundle(repo, proto, heads):<br>
their_heads = decodelist(heads)<br>
</blockquote></div><br></div>
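For anyone who wants to experiment with the format outside Mercurial: the framing generatestreamclone emits, a header line of two space-delimited integers followed by a per-file line of name and size delimited by a null byte plus the raw file bytes, round-trips with a few lines of standalone code. This is an illustrative sketch with made-up file names, not Mercurial code; the real stream also store-encodes file names and skips zero-length files.<br>

```python
# Illustrative round-trip of the stream clone framing described in the
# generatestreamclone docstring above. Standalone sketch, not Mercurial
# code: file names here are hypothetical.

def generate_stream(files):
    """Yield raw chunks for a stream clone of a {name: data} mapping."""
    total = sum(len(data) for data in files.values())
    # Header: two space-delimited integers, entry count and total bytes.
    yield b'%d %d\n' % (len(files), total)
    for name, data in files.items():
        # Per-file entry: name and size delimited by a null byte, then
        # the raw file data.
        yield b'%s\x00%d\n' % (name, len(data))
        yield data

def parse_stream(chunks):
    """Reassemble a {name: data} mapping from stream chunks."""
    buf = b''.join(chunks)
    header, buf = buf.split(b'\n', 1)
    nentries, total = (int(x) for x in header.split(b' '))
    files = {}
    for _ in range(nentries):
        entry, buf = buf.split(b'\n', 1)
        name, size = entry.split(b'\x00')
        files[name] = buf[:int(size)]
        buf = buf[int(size):]
    # Sanity check against the advertised total byte size.
    assert sum(len(d) for d in files.values()) == total
    return files
```

The parser is safe even though file data may contain newlines or null bytes: each entry line is fully consumed before its payload, and the payload is sliced by the declared size rather than scanned for a delimiter.<br>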