<div dir="ltr"><div>marmoute, durin42, et al: I'd like you to think about possibly integrating the "stream clone" data format into changegroups/bundles. I'm not sure if this is something we could sneak into cg1 or cg2 as an alternative "compression" format or whether this will require a bundle2 part. Whatever the solution, this is something I'd like to see landed in 3.5 so servers can emit the stream clone data to vanilla clients.<br><br></div>As it stands, I imagine Mozilla will copy the stream handling code into our bundleclone extension until we can consume stream clones with vanilla Mercurial, presumably with bundle2.<br></div><div class="gmail_extra"><br><div class="gmail_quote">On Thu, May 21, 2015 at 10:41 AM, Gregory Szorc <span dir="ltr"><<a href="mailto:gregory.szorc@gmail.com" target="_blank">gregory.szorc@gmail.com</a>></span> wrote:<br><blockquote class="gmail_quote" style="margin:0 0 0 .8ex;border-left:1px #ccc solid;padding-left:1ex"># HG changeset patch<br>
# User Gregory Szorc <<a href="mailto:gregory.szorc@gmail.com">gregory.szorc@gmail.com</a>><br>
# Date 1432229242 25200<br>
#      Thu May 21 10:27:22 2015 -0700<br>
# Node ID 0aa9c408c2be4492495dce4ae04aea4472eefbeb<br>
# Parent  451df92cec4912aefac57a4cf82e9268192c867b<br>
exchange: move code for generating a streaming clone into exchange<br>
<br>
Streaming clones are fast because they are essentially tar files.<br>
On mozilla-central, a streaming clone only consumes ~55s CPU time<br>
on clients as opposed to ~340s CPU time for a regular clone or gzip<br>
bundle unbundle.<br>
<br>
Mozilla is deploying static file "lookaside" support to our Mercurial<br>
server. Static bundles are pre-generated and uploaded to S3. When a<br>
clone is performed, the static file is fetched, applied, and then an<br>
incremental pull is performed. Unfortunately, on an ideal network<br>
connection this still takes as much wall and CPU time as a regular<br>
clone (although it does save significant server resources).<br>
<br>
We like the client-side wall time wins of streaming clones. But we want<br>
to leverage S3-based pre-generated files for serving the bulk of clone<br>
data.<br>
<br>
This patch moves the code for producing a "stream bundle" into its<br>
own standalone function, away from the wire protocol. This will enable<br>
stream bundle files to be produced outside the context of the wire<br>
protocol.<br>
<br>
A bikeshed on whether exchange is the best module for this function<br>
might be warranted. I selected exchange instead of changegroup because<br>
"stream bundles" aren't changegroups (yet).<br>
<br>
diff --git a/mercurial/exchange.py b/mercurial/exchange.py<br>
--- a/mercurial/exchange.py<br>
+++ b/mercurial/exchange.py<br>
@@ -7,9 +7,9 @@<br>
<br>
 from i18n import _<br>
 from node import hex, nullid<br>
 import errno, urllib<br>
-import util, scmutil, changegroup, base85, error<br>
+import util, scmutil, changegroup, base85, error, store<br>
 import discovery, phases, obsolete, bookmarks as bookmod, bundle2, pushkey<br>
 import lock as lockmod<br>
<br>
 def readbundle(ui, fh, fname, vfs=None):<br>
@@ -1331,4 +1331,69 @@ def unbundle(repo, cg, heads, source, ur<br>
         lockmod.release(tr, lock, wlock)<br>
         if recordout is not None:<br>
             recordout(repo.ui.popbuffer())<br>
     return r<br>
+<br>
+# This is it's own function so extensions can override it.<br>
+def _walkstreamfiles(repo):<br>
+    return repo.store.walk()<br>
+<br>
+def generatestreamclone(repo):<br>
+    """Emit content for a streaming clone.<br>
+<br>
+    This is a generator of raw chunks that constitute a streaming clone.<br>
+<br>
+    The stream begins with a line of 2 space-delimited integers containing the<br>
+    number of entries and total bytes size.<br>
+<br>
+    Next, are N entries for each file being transferred. Each file entry starts<br>
+    as a line with the file name and integer size delimited by a null byte.<br>
+    The raw file data follows. Following the raw file data is the next file<br>
+    entry, or EOF.<br>
+<br>
+    When used on the wire protocol, an additional line indicating protocol<br>
+    success will be prepended to the stream. This function is not responsible<br>
+    for adding it.<br>
+<br>
+    This function will obtain a repository lock to ensure a consistent view of<br>
+    the store is captured. It therefore may raise LockError.<br>
+    """<br>
+    entries = []<br>
+    total_bytes = 0<br>
+    # Get consistent snapshot of repo, lock during scan.<br>
+    lock = repo.lock()<br>
+    try:<br>
+        repo.ui.debug('scanning\n')<br>
+        for name, ename, size in _walkstreamfiles(repo):<br>
+            if size:<br>
+                entries.append((name, size))<br>
+                total_bytes += size<br>
+    finally:<br>
+            lock.release()<br>
+<br>
+    repo.ui.debug('%d files, %d bytes to transfer\n' %<br>
+                  (len(entries), total_bytes))<br>
+    yield '%d %d\n' % (len(entries), total_bytes)<br>
+<br>
+    sopener = repo.svfs<br>
+    oldaudit = sopener.mustaudit<br>
+    debugflag = repo.ui.debugflag<br>
+    sopener.mustaudit = False<br>
+<br>
+    try:<br>
+        for name, size in entries:<br>
+            if debugflag:<br>
+                repo.ui.debug('sending %s (%d bytes)\n' % (name, size))<br>
+            # partially encode name over the wire for backwards compat<br>
+            yield '%s\0%d\n' % (store.encodedir(name), size)<br>
+            if size <= 65536:<br>
+                fp = sopener(name)<br>
+                try:<br>
+                    data = fp.read(size)<br>
+                finally:<br>
+                    fp.close()<br>
+                yield data<br>
+            else:<br>
+                for chunk in util.filechunkiter(sopener(name), limit=size):<br>
+                    yield chunk<br>
+    finally:<br>
+        sopener.mustaudit = oldaudit<br>
diff --git a/mercurial/wireproto.py b/mercurial/wireproto.py<br>
--- a/mercurial/wireproto.py<br>
+++ b/mercurial/wireproto.py<br>
@@ -743,75 +743,29 @@ def pushkey(repo, proto, namespace, key,<br>
<br>
 def _allowstream(ui):<br>
     return ui.configbool('server', 'uncompressed', True, untrusted=True)<br>
<br>
-def _walkstreamfiles(repo):<br>
-    # this is it's own function so extensions can override it<br>
-    return repo.store.walk()<br>
-<br>
 @wireprotocommand('stream_out')<br>
 def stream(repo, proto):<br>
     '''If the server supports streaming clone, it advertises the "stream"<br>
     capability with a value representing the version and flags of the repo<br>
     it is serving. Client checks to see if it understands the format.<br>
-<br>
-    The format is simple: the server writes out a line with the amount<br>
-    of files, then the total amount of bytes to be transferred (separated<br>
-    by a space). Then, for each file, the server first writes the filename<br>
-    and file size (separated by the null character), then the file contents.<br>
     '''<br>
-<br>
     if not _allowstream(repo.ui):<br>
         return '1\n'<br>
<br>
-    entries = []<br>
-    total_bytes = 0<br>
+    def getstream(it):<br>
+        yield '0\n'<br>
+        for chunk in it:<br>
+            yield chunk<br>
+<br>
     try:<br>
-        # get consistent snapshot of repo, lock during scan<br>
-        lock = repo.lock()<br>
-        try:<br>
-            repo.ui.debug('scanning\n')<br>
-            for name, ename, size in _walkstreamfiles(repo):<br>
-                if size:<br>
-                    entries.append((name, size))<br>
-                    total_bytes += size<br>
-        finally:<br>
-            lock.release()<br>
+        # LockError may be raised before the first result is yielded. Don't<br>
+        # emit output until we're sure we got the lock successfully.<br>
+        it = exchange.generatestreamclone(repo)<br>
+        return streamres(getstream(it))<br>
     except error.LockError:<br>
-        return '2\n' # error: 2<br>
-<br>
-    def streamer(repo, entries, total):<br>
-        '''stream out all metadata files in repository.'''<br>
-        yield '0\n' # success<br>
-        repo.ui.debug('%d files, %d bytes to transfer\n' %<br>
-                      (len(entries), total_bytes))<br>
-        yield '%d %d\n' % (len(entries), total_bytes)<br>
-<br>
-        sopener = repo.svfs<br>
-        oldaudit = sopener.mustaudit<br>
-        debugflag = repo.ui.debugflag<br>
-        sopener.mustaudit = False<br>
-<br>
-        try:<br>
-            for name, size in entries:<br>
-                if debugflag:<br>
-                    repo.ui.debug('sending %s (%d bytes)\n' % (name, size))<br>
-                # partially encode name over the wire for backwards compat<br>
-                yield '%s\0%d\n' % (store.encodedir(name), size)<br>
-                if size <= 65536:<br>
-                    fp = sopener(name)<br>
-                    try:<br>
-                        data = fp.read(size)<br>
-                    finally:<br>
-                        fp.close()<br>
-                    yield data<br>
-                else:<br>
-                    for chunk in util.filechunkiter(sopener(name), limit=size):<br>
-                        yield chunk<br>
-        finally:<br>
-            sopener.mustaudit = oldaudit<br>
-<br>
-    return streamres(streamer(repo, entries, total_bytes))<br>
+        return '2\n'<br>
<br>
 @wireprotocommand('unbundle', 'heads')<br>
 def unbundle(repo, proto, heads):<br>
     their_heads = decodelist(heads)<br>
</blockquote></div><br></div>