# HG changeset patch # User Vladimir Kvashin # Date 1369252348 -14400 # Node ID 80ca8bbc1e01b20264b7693173a3d4be28ee3d2d # Parent 29da0cda97df884d631bba6ac5bd5a6907711905 fixed #227936 - Smarter error processing when saving full remote file diff -r 29da0cda97df -r 80ca8bbc1e01 dlight.nativeexecution/src/org/netbeans/modules/nativeexecution/api/util/RemoteStatistics.java --- a/dlight.nativeexecution/src/org/netbeans/modules/nativeexecution/api/util/RemoteStatistics.java Wed May 22 12:04:18 2013 +0400 +++ b/dlight.nativeexecution/src/org/netbeans/modules/nativeexecution/api/util/RemoteStatistics.java Wed May 22 23:52:28 2013 +0400 @@ -45,12 +45,14 @@ import java.io.File; import java.io.FileNotFoundException; import java.io.PrintStream; +import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import org.netbeans.modules.nativeexecution.api.ExecutionEnvironment; import org.netbeans.modules.nativeexecution.jsch.MeasurableSocketFactory; import org.netbeans.modules.nativeexecution.jsch.MeasurableSocketFactory.IOListener; import org.openide.modules.OnStop; @@ -61,6 +63,7 @@ @OnStop public final class RemoteStatistics implements Callable { + private static final String BREAK_UPLOADS_FLAG_FILE = System.getProperty("break.uploads"); // NOI18N public static final boolean COLLECT_STATISTICS = Boolean.parseBoolean(System.getProperty("jsch.statistics", "false")); // NOI18N public static final boolean COLLECT_STACKS = COLLECT_STATISTICS && Boolean.parseBoolean(System.getProperty("jsch.statistics.stacks", "false")); // NOI18N private static final TrafficCounters trafficCounters = new TrafficCounters(); @@ -274,10 +277,31 @@ @Override public void bytesUploaded(int bytes) { RemoteMeasurementsRef stat = reschedule(); + checkBreakUploads(); 
stat.stat.bytesUploaded(bytes); } + private void checkBreakUploads() { + if (BREAK_UPLOADS_FLAG_FILE != null && new File(BREAK_UPLOADS_FLAG_FILE).exists()) { + boolean isOpenW = false; + for (StackTraceElement el : Thread.currentThread().getStackTrace()) { + if (el.getClassName().endsWith(".ChannelSftp")) { // + if (el.getMethodName().equals("sendOPENW")) { + isOpenW = true; + break; + } + } + } + if (isOpenW) { + List recentConnections = ConnectionManager.getInstance().getRecentConnections(); + for (ExecutionEnvironment env : recentConnections) { + ConnectionManager.getInstance().disconnect(env); + } + } + } + } + @Override public void bytesDownloaded(int bytes) { RemoteMeasurementsRef stat = reschedule(); diff -r 29da0cda97df -r 80ca8bbc1e01 dlight.remote.impl/src/org/netbeans/modules/remote/impl/fs/PendingUploadsManager.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/dlight.remote.impl/src/org/netbeans/modules/remote/impl/fs/PendingUploadsManager.java Wed May 22 23:52:28 2013 +0400 @@ -0,0 +1,197 @@ +/* + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER. + * + * Copyright 2013 Oracle and/or its affiliates. All rights reserved. + * + * Oracle and Java are registered trademarks of Oracle and/or its affiliates. + * Other names may be trademarks of their respective owners. + * + * The contents of this file are subject to the terms of either the GNU + * General Public License Version 2 only ("GPL") or the Common + * Development and Distribution License("CDDL") (collectively, the + * "License"). You may not use this file except in compliance with the + * License. You can obtain a copy of the License at + * http://www.netbeans.org/cddl-gplv2.html + * or nbbuild/licenses/CDDL-GPL-2-CP. See the License for the + * specific language governing permissions and limitations under the + * License. When distributing the software, include this License Header + * Notice in each file and include the License file at + * nbbuild/licenses/CDDL-GPL-2-CP. 
Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the GPL Version 2 section of the License file that + * accompanied this code. If applicable, add the following below the + * License Header, with the fields enclosed by brackets [] replaced by + * your own identifying information: + * "Portions Copyrighted [year] [name of copyright owner]" + * + * If you wish your version of this file to be governed by only the CDDL + * or only the GPL Version 2, indicate your decision by adding + * "[Contributor] elects to include this software in this distribution + * under the [CDDL or GPL Version 2] license." If you do not indicate a + * single choice of license, a recipient has the option to distribute + * your version of this file under either the CDDL, the GPL Version 2 or + * to extend the choice of license to its licensees as provided above. + * However, if you add GPL Version 2 code and therefore, elected the GPL + * Version 2 license, then the option applies only if the new code is + * made subject to such option by the copyright holder. + * + * Contributor(s): + * + * Portions Copyrighted 2013 Sun Microsystems, Inc. 
+ */
+package org.netbeans.modules.remote.impl.fs;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.netbeans.modules.remote.impl.RemoteLogger;
+
+/**
+ * Tracks paths of remote files whose upload is in progress or has failed and keeps them in a storage file.
+ * @author vkvashin
+ */
+public class PendingUploadsManager {
+
+    private final File storageFile;
+    private final Object lock = new Object();
+    private final RemoteFileObjectFactory fileObjectFactory;
+    private final RemoteDirectory rootDirectory;
+
+    /** paths of files that are now being uploaded; every access is made under lock */
+    private final Set<String> pendingFiles = new HashSet<String>();
+
+    /** paths of files that failed to upload; every access is made under lock */
+    private final Set<String> failedFiles = new HashSet<String>();
+
+    public PendingUploadsManager(RemoteFileObjectFactory fileObjectFactory, RemoteDirectory rootDirectory, File storageFile) {
+        this.storageFile = storageFile;
+        this.fileObjectFactory = fileObjectFactory;
+        this.rootDirectory = rootDirectory;
+    }
+
+    public void addPendingUpload(RemotePlainFile file) { // records the file's path as being uploaded
+        synchronized (lock) {
+            boolean added = pendingFiles.add(file.getPath());
+            if (added) { save(); } // rewrite the storage file only when the set really changed
+        }
+    }
+
+    public void addFailedUpload(RemotePlainFile file) { // records the file's path as failed to upload
+        synchronized (lock) {
+            boolean added = failedFiles.add(file.getPath());
+            if (added) { save(); } // rewrite the storage file only when the set really changed
+        }
+    }
+
+    public void removePendingOrFailedUpload(RemotePlainFile file) { // forgets the file's path in both sets
+        synchronized (lock) {
+            boolean changed = pendingFiles.remove(file.getPath());
+            changed |= failedFiles.remove(file.getPath()); // not short-circuited: the path must leave both sets
+            if (changed) { save(); } // called for every file that is in the OK state, so skip no-op rewrites
+        }
+    }
+
+    public boolean isPendingOrFailedUpload(String path) { // true if the path is in either set
+        synchronized (lock) {
+            return pendingFiles.contains(path) || failedFiles.contains(path);
+        }
+    }
+
+    public void load() { // reads stored paths; every stored path (pending or failed) is loaded as failed
+        synchronized (lock) {
+            if (!storageFile.exists()) {
+                return;
+            }
+            BufferedReader isr = null;
+            try {
+                isr = new BufferedReader(new FileReader(storageFile));
+                String line;
+                while ((line = isr.readLine()) != null) {
+                    line = line.trim();
+                    if (!line.isEmpty()) {
+                        failedFiles.add(line);
+                    }
+                }
+            } catch (FileNotFoundException e) {
+                e.printStackTrace(System.err);
+            } catch (IOException e) {
+                e.printStackTrace(System.err);
+            } finally {
+                if (isr != null) {
+                    try {
+                        isr.close();
+                    } catch (IOException e) {
+                        e.printStackTrace(System.err);
+                    }
+                }
+            }
+        }
+    }
+
+    public void save() { // rewrites the storage file with all pending and failed paths, one per line
+        synchronized (lock) {
+            BufferedWriter wr = null;
+            try {
+                wr = new BufferedWriter(new FileWriter(storageFile));
+                for (String path : pendingFiles) {
+                    wr.write(path.trim());
+                    wr.newLine();
+                }
+                for (String path : failedFiles) {
+                    wr.write(path.trim());
+                    wr.newLine();
+                }
+            } catch (IOException e) {
+                e.printStackTrace(System.err);
+            } finally {
+                if (wr != null) {
+                    try {
+                        wr.close();
+                    } catch (IOException e) {
+                        e.printStackTrace(System.err);
+                    }
+                }
+            }
+        }
+    }
+
+    public Collection<RemotePlainFile> getFailedFiles() { // resolves failed paths to plain files and drops stale paths
+        List<String> paths;
+        synchronized (lock) {
+            paths = new ArrayList<String>(failedFiles); // snapshot, so file objects are resolved outside the lock
+        }
+        List<RemotePlainFile> files = new ArrayList<RemotePlainFile>(paths.size());
+        List<String> pathsToRemove = new ArrayList<String>();
+        for (String path : paths) {
+            RemoteFileObjectBase fo = fileObjectFactory.getCachedFileObject(path);
+            if (fo == null) {
+                RemoteFileObject rfo = rootDirectory.getFileObject(path, new HashSet());
+                if (rfo != null) {
+                    fo = rfo.getImplementor();
+                }
+            }
+            if (fo == null) {
+                pathsToRemove.add(path); // was remove(): the list stayed empty and stale paths were never dropped
+            } else {
+                if (fo instanceof RemotePlainFile) {
+                    files.add((RemotePlainFile) fo);
+                } else {
+                    pathsToRemove.add(path); // was remove(): the path no longer denotes a plain file
+                }
+            }
+        }
+        synchronized (lock) {
+            failedFiles.removeAll(pathsToRemove);
+        }
+        return files;
+    }
+}
diff -r 29da0cda97df -r 80ca8bbc1e01 dlight.remote.impl/src/org/netbeans/modules/remote/impl/fs/RefreshManager.java --- a/dlight.remote.impl/src/org/netbeans/modules/remote/impl/fs/RefreshManager.java Wed May 22 12:04:18 2013 +0400 +++ b/dlight.remote.impl/src/org/netbeans/modules/remote/impl/fs/RefreshManager.java Wed May 22 23:52:28 2013 +0400 @@ -68,6 +68,7 @@ private final 
ExecutionEnvironment env; private final RemoteFileObjectFactory factory; + private final PendingUploadsManager pendingUploadsManager; private final RequestProcessor.Task updateTask; private final LinkedList queue = new LinkedList(); @@ -144,11 +145,12 @@ } } - public RefreshManager(ExecutionEnvironment env, RemoteFileObjectFactory factory) { + public RefreshManager(ExecutionEnvironment env, RemoteFileObjectFactory factory, PendingUploadsManager pendingUploadsManager) { this.env = env; this.factory = factory; + this.pendingUploadsManager = pendingUploadsManager; updateTask = new RequestProcessor("Remote File System RefreshManager " + env.getDisplayName(), 1).create(new RefreshWorker(false)); //NOI18N - } + } public void scheduleRefreshOnFocusGained(Collection fileObjects) { if (REFRESH_ON_FOCUS) { @@ -158,12 +160,13 @@ } public void scheduleRefreshOnConnect(Collection fileObjects) { + scheduleRefreshImpl(pendingUploadsManager.getFailedFiles(), false); if (REFRESH_ON_CONNECT) { RemoteLogger.getInstance().log(Level.FINE, "Refresh on connect schedulled for {0} directories on {1}", new Object[]{fileObjects.size(), env}); scheduleRefreshImpl(filterDirectories(fileObjects), false); } } - + private Collection filterDirectories(Collection fileObjects) { Collection result = new TreeSet(new PathComparator(true)); for (RemoteFileObjectBase fo : fileObjects) { @@ -223,7 +226,7 @@ } } - private void scheduleRefreshImpl(Collection fileObjects, boolean toTheHead) { + private void scheduleRefreshImpl(Collection fileObjects, boolean toTheHead) { if ( ! 
ConnectionManager.getInstance().isConnectedTo(env)) { RemoteLogger.getInstance().warning("scheduleRefresh(Collection) is called while host is not connected"); } diff -r 29da0cda97df -r 80ca8bbc1e01 dlight.remote.impl/src/org/netbeans/modules/remote/impl/fs/RemoteDirectory.java --- a/dlight.remote.impl/src/org/netbeans/modules/remote/impl/fs/RemoteDirectory.java Wed May 22 12:04:18 2013 +0400 +++ b/dlight.remote.impl/src/org/netbeans/modules/remote/impl/fs/RemoteDirectory.java Wed May 22 23:52:28 2013 +0400 @@ -405,7 +405,7 @@ List entries = storage.listValid(); List result = new ArrayList(entries.size()); for (DirEntry entry : entries) { - String path = getPath() + '/' + entry.getName(); + String path = getAbsolutePath(entry); RemoteFileObjectBase fo = getFileSystem().getFactory().getCachedFileObject(path); if (fo != null) { result.add(fo); @@ -698,14 +698,16 @@ changed = fire = true; File entryCache = new File(getCache(), oldEntry.getCache()); if (entryCache.exists()) { - if (trace) {trace("removing cache for updated file {0}", entryCache.getAbsolutePath());} // NOI18N - entryCache.delete(); // TODO: We must just mark it as invalid instead of physically deleting cache file... + if (!getFileSystem().getPendingUploadsManager().isPendingOrFailedUpload(getAbsolutePath(oldEntry))) { + if (trace) {trace("removing cache for updated file {0}", entryCache.getAbsolutePath());} // NOI18N + entryCache.delete(); // TODO: We must just mark it as invalid instead of physically deleting cache file... 
+ } } } } if (!equals(newEntry.getLinkTarget(), oldEntry.getLinkTarget())) { changed = fire = true; // TODO: we forgot old link path, probably should be passed to change event - getFileSystem().getFactory().setLink(this, getPath() + '/' + newEntry.getName(), newEntry.getLinkTarget()); + getFileSystem().getFactory().setLink(this, getAbsolutePath(newEntry), newEntry.getLinkTarget()); } if (!newEntry.getAccessAsString().equals(oldEntry.getAccessAsString())) { changed = fire = true; @@ -808,7 +810,7 @@ fireRemoteFileObjectCreated(fo.getOwnerFileObject()); } for (DirEntry entry : entriesToFireChanged) { - RemoteFileObjectBase fo = getFileSystem().getFactory().getCachedFileObject(getPath() + '/' + entry.getName()); + RemoteFileObjectBase fo = getFileSystem().getFactory().getCachedFileObject(getAbsolutePath(entry)); if (fo != null) { RemoteFileObject ownerFileObject = fo.getOwnerFileObject(); fireFileChangedEvent(getListeners(), new FileEvent(ownerFileObject, ownerFileObject, false, ownerFileObject.lastModified().getTime())); @@ -851,8 +853,7 @@ } synchronized (refLock) { storageRef = new SoftReference(newStorage); - } - fo.setPendingRemoteDelivery(false); + } } } finally { writeLock.unlock(); @@ -1021,15 +1022,17 @@ changed = fire = true; File entryCache = new File(getCache(), oldEntry.getCache()); if (entryCache.exists()) { - if (trace) { trace("removing cache for updated file {0}", entryCache.getAbsolutePath()); } // NOI18N - entryCache.delete(); // TODO: We must just mark it as invalid instead of physically deleting cache file... + if (!getFileSystem().getPendingUploadsManager().isPendingOrFailedUpload(getAbsolutePath(oldEntry))) { + if (trace) { trace("removing cache for updated file {0}", entryCache.getAbsolutePath()); } // NOI18N + entryCache.delete(); // TODO: We must just mark it as invalid instead of physically deleting cache file... 
+ } } } } if (!equals(newEntry.getLinkTarget(), oldEntry.getLinkTarget())) { changed = fire = true; // TODO: we forgot old link path, probably should be passed to change event - getFileSystem().getFactory().setLink(this, getPath() + '/' + newEntry.getName(), newEntry.getLinkTarget()); + getFileSystem().getFactory().setLink(this, getAbsolutePath(newEntry), newEntry.getLinkTarget()); } if (!newEntry.getAccessAsString().equals(oldEntry.getAccessAsString())) { changed = fire = true; @@ -1142,7 +1145,7 @@ fireRemoteFileObjectCreated(fo); } for (DirEntry entry : entriesToFireChanged) { - RemoteFileObjectBase fo = getFileSystem().getFactory().getCachedFileObject(getPath() + '/' + entry.getName()); + RemoteFileObjectBase fo = getFileSystem().getFactory().getCachedFileObject(getAbsolutePath(entry)); if (fo != null) { if (fo.isPendingRemoteDelivery()) { RemoteLogger.getInstance().log(Level.FINE, "Skipping change event for pending file {0}", fo); @@ -1153,7 +1156,7 @@ } } for (DirEntry entry : entriesToFireChangedRO) { - RemoteFileObjectBase fo = getFileSystem().getFactory().getCachedFileObject(getPath() + '/' + entry.getName()); + RemoteFileObjectBase fo = getFileSystem().getFactory().getCachedFileObject(getAbsolutePath(entry)); if (fo != null) { if (fo.isPendingRemoteDelivery()) { RemoteLogger.getInstance().log(Level.FINE, "Skipping change event for pending file {0}", fo); @@ -1169,7 +1172,11 @@ } return storage; } - + + private String getAbsolutePath(DirEntry oldEntry) { + return getPath() + '/' + oldEntry.getName(); //NOI18N + } + private void fireDeletedEvent(RemoteFileObject parent, RemoteFileObject fo, FilesystemInterceptorProvider.FilesystemInterceptor interceptor, boolean expected) { if (interceptor != null) { interceptor.deletedExternally(FilesystemInterceptorProvider.toFileProxy(fo)); @@ -1292,7 +1299,7 @@ } private RemoteFileObject invalidate(DirEntry oldEntry) { - RemoteFileObject fo = getFileSystem().getFactory().invalidate(getPath() + '/' + oldEntry.getName()); 
+ RemoteFileObject fo = getFileSystem().getFactory().invalidate(getAbsolutePath(oldEntry)); File oldEntryCache = new File(getCache(), oldEntry.getCache()); removeFile(oldEntryCache); return fo; diff -r 29da0cda97df -r 80ca8bbc1e01 dlight.remote.impl/src/org/netbeans/modules/remote/impl/fs/RemoteFileObjectBase.java --- a/dlight.remote.impl/src/org/netbeans/modules/remote/impl/fs/RemoteFileObjectBase.java Wed May 22 12:04:18 2013 +0400 +++ b/dlight.remote.impl/src/org/netbeans/modules/remote/impl/fs/RemoteFileObjectBase.java Wed May 22 23:52:28 2013 +0400 @@ -100,8 +100,7 @@ private static final byte MASK_VALID = 1; private static final byte CHECK_CAN_WRITE = 2; - private static final byte BEING_UPLOADED = 4; - protected static final byte CONNECTION_ISSUES = 8; + protected static final byte CONNECTION_ISSUES = 4; protected RemoteFileObjectBase(RemoteFileObject wrapper, RemoteFileSystem fileSystem, ExecutionEnvironment execEnv, RemoteFileObjectBase parent, String remotePath, File cache) { @@ -169,15 +168,11 @@ flags &= ~mask; } } - + /*package*/ boolean isPendingRemoteDelivery() { - return getFlag(BEING_UPLOADED); + return false; } - - /*package*/ void setPendingRemoteDelivery(boolean value) { - setFlag(BEING_UPLOADED, value); - } - + public ExecutionEnvironment getExecutionEnvironment() { return fileSystem.getExecutionEnvironment(); } diff -r 29da0cda97df -r 80ca8bbc1e01 dlight.remote.impl/src/org/netbeans/modules/remote/impl/fs/RemoteFileSystem.java --- a/dlight.remote.impl/src/org/netbeans/modules/remote/impl/fs/RemoteFileSystem.java Wed May 22 12:04:18 2013 +0400 +++ b/dlight.remote.impl/src/org/netbeans/modules/remote/impl/fs/RemoteFileSystem.java Wed May 22 23:52:28 2013 +0400 @@ -122,13 +122,13 @@ new ArrayList(); transient private final StatusImpl status = new StatusImpl(); private final LinkedHashSet deleteOnExitFiles = new LinkedHashSet(); + private final PendingUploadsManager pendingUploadsManager; /*package*/ RemoteFileSystem(ExecutionEnvironment execEnv) 
throws IOException { RemoteLogger.assertTrue(execEnv.isRemote()); this.execEnv = execEnv; this.remoteFileSupport = new RemoteFileSupport(); factory = new RemoteFileObjectFactory(this); - refreshManager = new RefreshManager(execEnv, factory); // FIXUP: it's better than asking a compiler instance... but still a fixup. // Should be moved to a proper place this.filePrefix = FileSystemCacheProvider.getCacheRoot(execEnv); @@ -136,10 +136,13 @@ throw new IllegalStateException("Can not find cache root for remote file system at " + execEnv); //NOI18N } cache = new File(filePrefix); + rootDelegate = new RootFileObject(this.root = new RemoteFileObject(this), this, execEnv, cache); // NOI18N + pendingUploadsManager = new PendingUploadsManager(factory, rootDelegate, new File(getCache(), ".rfs_pending")); + pendingUploadsManager.load(); + refreshManager = new RefreshManager(execEnv, factory, pendingUploadsManager); if (!cache.exists() && !cache.mkdirs()) { throw new IOException(NbBundle.getMessage(getClass(), "ERR_CreateDir", cache.getAbsolutePath())); } - this.rootDelegate = new RootFileObject(this.root = new RemoteFileObject(this), this, execEnv, cache); // NOI18N final WindowFocusListener windowFocusListener = new WindowFocusListener() { @@ -193,7 +196,11 @@ } if (ATTR_STATS) { dumpAttrStat(); } } - + + public PendingUploadsManager getPendingUploadsManager() { + return pendingUploadsManager; + } + /*package*/ ExecutionEnvironment getExecutionEnvironment() { return execEnv; } diff -r 29da0cda97df -r 80ca8bbc1e01 dlight.remote.impl/src/org/netbeans/modules/remote/impl/fs/RemotePlainFile.java --- a/dlight.remote.impl/src/org/netbeans/modules/remote/impl/fs/RemotePlainFile.java Wed May 22 12:04:18 2013 +0400 +++ b/dlight.remote.impl/src/org/netbeans/modules/remote/impl/fs/RemotePlainFile.java Wed May 22 23:52:28 2013 +0400 @@ -49,7 +49,6 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.lang.ref.SoftReference; import 
java.net.ConnectException; import java.util.Set; import java.util.StringTokenizer; @@ -58,6 +57,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Level; import org.netbeans.modules.dlight.libs.common.DLightLibsCommonLogger; @@ -72,6 +72,7 @@ import org.openide.filesystems.FileEvent; import org.openide.filesystems.FileLock; import org.openide.filesystems.FileObject; +import org.openide.util.Exceptions; /** * @@ -79,16 +80,33 @@ */ public final class RemotePlainFile extends RemoteFileObjectBase { + public static enum UploadState { + OK, + UPLOADING, + FAILED + } + private static final int LOCK_TIMEOUT = Integer.getInteger("remote.rwlock.timeout", 4); // NOI18N - + private final char fileTypeChar; // private SoftReference fileContentCache = new SoftReference(null); private SimpleRWLock rwl = new SimpleRWLock(); + + private final Object uploadStateLock = new Object(); + + /** guarded by uploadStateLock */ + private UploadState uploadState; + + /** guarded by RemoteFileSystem.getLock(getCache()) */ + private long lastUploadTimestamp = -1; /*package*/ RemotePlainFile(RemoteFileObject wrapper, RemoteFileSystem fileSystem, ExecutionEnvironment execEnv, RemoteDirectory parent, String remotePath, File cache, FileType fileType) { super(wrapper, fileSystem, execEnv, parent, remotePath, cache); fileTypeChar = fileType.toChar(); // TODO: pass when created + changeUploadState( + fileSystem.getPendingUploadsManager().isPendingOrFailedUpload(remotePath) ? 
+ UploadState.FAILED : UploadState.OK); } @Override @@ -444,6 +462,9 @@ // Fixing #206726 - If a remote file is saved frequently, "File modified externally" message appears, user changes are lost @Override protected void refreshImpl(boolean recursive, Set antiLoop, boolean expected) throws ConnectException, IOException, InterruptedException, CancellationException, ExecutionException { + if (fixUpload()) { + return; + } if (Boolean.getBoolean("cnd.remote.refresh.plain.file")) { //NOI18N long time = System.currentTimeMillis(); getParent().refreshImpl(false, antiLoop, expected); @@ -451,11 +472,70 @@ } } + /*package*/ boolean fixUpload() { + boolean fixUpload = false; + if (ConnectionManager.getInstance().isConnectedTo(getExecutionEnvironment())) { + synchronized (uploadStateLock) { + fixUpload = (uploadState == UploadState.FAILED); + } + if (fixUpload) { + try { + upload(); + } catch (IOException ex) { + ex.printStackTrace(); + } + } + } + return fixUpload; + } + @Override public FileType getType() { return FileType.fromChar(fileTypeChar); } + private UploadState changeUploadState(UploadState newState) { + return changeUploadState(null, newState); + } + + public UploadState getUploadState() { + synchronized (uploadStateLock) { + return uploadState; + } + } + + private UploadState changeUploadState(UploadState from, UploadState to) { + synchronized (uploadStateLock) { + if (from != null && from != uploadState) { + return null; + } + UploadState prevState = uploadState; + uploadState = to; + switch (to) { + case OK: + getFileSystem().getPendingUploadsManager().removePendingOrFailedUpload(this); + break; + case UPLOADING: + getFileSystem().getPendingUploadsManager().addPendingUpload(this); + break; + case FAILED: + getFileSystem().getPendingUploadsManager().addFailedUpload(this); + break; + default: + Exceptions.printStackTrace(new IllegalStateException( + "Unexpected " + UploadState.class.getSimpleName() + ": " + to)); //NOI18N + } + return prevState; + } + } + + 
@Override + /*package*/ boolean isPendingRemoteDelivery() { + synchronized (uploadStateLock) { + return uploadState == UploadState.UPLOADING || uploadState == UploadState.FAILED; + } + } + private static class DelegateOutputStream extends OutputStream { private final FileOutputStream delegate; @@ -487,53 +567,73 @@ } try { delegate.close(); - file.setPendingRemoteDelivery(true); - CommonTasksSupport.UploadParameters params = new CommonTasksSupport.UploadParameters( - file.getCache(), file.getExecutionEnvironment(), file.getPath(), -1, false, null); - Future task = CommonTasksSupport.uploadFile(params); - try { - UploadStatus uploadStatus = task.get(); - if (uploadStatus.isOK()) { - RemoteLogger.getInstance().log(Level.FINEST, "WritingQueue: uploading {0} succeeded", this); - file.getParent().updateStat(file, uploadStatus.getStatInfo()); - FileEvent ev = new FileEvent(file.getOwnerFileObject(), file.getOwnerFileObject(), true, uploadStatus.getStatInfo().getLastModified().getTime()); - file.getOwnerFileObject().fireFileChangedEvent(file.getListenersWithParent(), ev); - } else { - RemoteLogger.getInstance().log(Level.FINEST, "WritingQueue: uploading {0} failed", this); - file.setPendingRemoteDelivery(false); - throw new IOException(uploadStatus.getError() + " " + uploadStatus.getExitCode()); //NOI18N - } - } catch (InterruptedException ex) { - throw newIOException(ex); - } catch (ExecutionException ex) { - //Exceptions.printStackTrace(ex); // should never be the case - the task is done - if (!ConnectionManager.getInstance().isConnectedTo(file.getExecutionEnvironment())) { - file.getFileSystem().addPendingFile(file); - throw new ConnectException(ex.getMessage()); - } else { - if (RemoteFileSystemUtils.isFileNotFoundException(ex)) { - throw new FileNotFoundException(file.getPath()); - } else if (ex.getCause() instanceof IOException) { - throw (IOException) ex.getCause(); - } else { - throw newIOException(ex); - } - } - } + file.upload(); closed = true; } finally { 
+ file.rwl.writeUnlock(); + } + } - private IOException newIOException(Exception cause) { - return new IOException("Error uploading " + file.getPath() + " to " + file.getExecutionEnvironment() + ':' + //NOI18N - cause.getMessage(), cause); - } - @Override public void flush() throws IOException { delegate.flush(); } }
+
+    private void upload() throws IOException { // uploads the cache file to its remote path and tracks the upload state
+        Lock writeLock = RemoteFileSystem.getLock(getCache()).writeLock();
+        writeLock.lock();
+        try {
+            long cacheTimestamp = getCache().lastModified();
+            UploadState us = getUploadState();
+            if (us == UploadState.OK && cacheTimestamp == lastUploadTimestamp) {
+                return; // the cache file has not changed since the last successful upload
+            }
+            lastUploadTimestamp = -1;
+            changeUploadState(UploadState.UPLOADING);
+            CommonTasksSupport.UploadParameters params = new CommonTasksSupport.UploadParameters(
+                    getCache(), getExecutionEnvironment(), getPath(), -1, false, null);
+            Future<UploadStatus> task = CommonTasksSupport.uploadFile(params);
+            try {
+                UploadStatus uploadStatus = task.get();
+                if (uploadStatus.isOK()) {
+                    RemoteLogger.getInstance().log(Level.FINEST, "WritingQueue: uploading {0} succeeded", this);
+                    getParent().updateStat(this, uploadStatus.getStatInfo());
+                    changeUploadState(UploadState.OK);
+                    lastUploadTimestamp = cacheTimestamp;
+                    FileEvent ev = new FileEvent(getOwnerFileObject(), getOwnerFileObject(), true, uploadStatus.getStatInfo().getLastModified().getTime());
+                    getOwnerFileObject().fireFileChangedEvent(getListenersWithParent(), ev);
+                } else {
+                    RemoteLogger.getInstance().log(Level.FINEST, "WritingQueue: uploading {0} failed", this);
+                    throw new IOException(uploadStatus.getError() + " " + uploadStatus.getExitCode()); //NOI18N
+                }
+            } catch (InterruptedException ex) {
+                Thread.currentThread().interrupt(); // restore the interrupt status before converting to IOException
+                throw newIOException(this, ex);
+            } catch (ExecutionException ex) {
+                //Exceptions.printStackTrace(ex); // should never be the case - the task is done
+                if (!ConnectionManager.getInstance().isConnectedTo(getExecutionEnvironment())) {
+                    getFileSystem().addPendingFile(this);
+                    throw new ConnectException(ex.getMessage());
+                } else {
+                    if (RemoteFileSystemUtils.isFileNotFoundException(ex)) {
+                        throw new FileNotFoundException(getPath());
+                    } else if (ex.getCause() instanceof IOException) {
+                        throw (IOException) ex.getCause();
+                    } else {
+                        throw newIOException(this, ex);
+                    }
+                }
+            } finally {
+                changeUploadState(UploadState.UPLOADING, UploadState.FAILED); // no effect after success: the state is already OK
+            }
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    private IOException newIOException(RemotePlainFile file, Exception cause) { // wraps cause, naming the file and the host
+        return new IOException("Error uploading " + file.getPath() + " to " + file.getExecutionEnvironment() + ':' + //NOI18N
+                cause.getMessage(), cause);
+    }
 }