Thursday, 10 November 2011

Parallel.ForEach operations and the CancellationToken

If you use a CancellationToken to enable cancellation during the parallel iteration of an enumerable, note that each operation already in progress is given a chance to finish.  No new iterations are started once cancellation is requested, and the OperationCanceledException is only thrown after all the threads have completed their current unit of work.

You can also monitor the cancellation token inside each unit of work if a single unit runs long enough to be worth interrupting.
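As a minimal sketch of that behaviour (the CancellationDemo class, the Run method and the counters are names made up for this illustration), the loop below cancels itself from the first unit of work and then counts how many units started and how many finished:

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class CancellationDemo
{
    // Hypothetical demo helper: runs a cancellable loop and reports what happened.
    public static void Run(out int started, out int finished, out bool sawCancellation)
    {
        var cts = new CancellationTokenSource();
        var options = new ParallelOptions
        {
            CancellationToken = cts.Token,
            MaxDegreeOfParallelism = 4
        };

        int startedCount = 0;
        int finishedCount = 0;
        sawCancellation = false;

        try
        {
            Parallel.ForEach(Enumerable.Range(0, 1000), options, item =>
            {
                // The very first unit of work requests cancellation.
                if (Interlocked.Increment(ref startedCount) == 1)
                {
                    cts.Cancel();
                }

                Thread.Sleep(50); // simulate work; the token is not checked here
                Interlocked.Increment(ref finishedCount);
            });
        }
        catch (OperationCanceledException)
        {
            // Thrown by Parallel.ForEach once the in-flight units have completed.
            sawCancellation = true;
        }

        started = startedCount;
        finished = finishedCount;
    }
}
```

Calling CancellationDemo.Run(out started, out finished, out sawCancellation) should leave sawCancellation true, with finished equal to started and far below 1000. If a unit of work is long, calling options.CancellationToken.ThrowIfCancellationRequested() between its steps lets it stop early instead of running to completion.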

Windows Azure DevFabric: forcing port 80 rather than port 81, 82, etc.

Hi there, it's sometimes useful to run a Windows Azure app in the dev fabric on port 80.  However, you need to be absolutely sure that port 80 is not in use beforehand.
Use TCPView from Sysinternals: if an item called "http" appears under "Local Port", then the port is in use.  You cannot easily find out which process that is when it's a service.  In my case, shutting down the "Web Deployment Agent Service" (Run -> CMD -> net stop MsDepSvc) solved the issue, and Azure was able to run the web role on port 80.

Also, ensure that Skype doesn't occupy port 80 too.
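If you would rather check from code than from TCPView, here is a small sketch using the standard System.Net.NetworkInformation API. The PortCheck class and IsTcpPortInUse method are names invented for this example, and the check only tells you that something is listening, not which process it is:

```csharp
using System;
using System.Linq;
using System.Net;
using System.Net.NetworkInformation;

public static class PortCheck
{
    // True if something on this machine is already listening on the given TCP port.
    public static bool IsTcpPortInUse(int port)
    {
        IPEndPoint[] listeners =
            IPGlobalProperties.GetIPGlobalProperties().GetActiveTcpListeners();

        return listeners.Any(endpoint => endpoint.Port == port);
    }
}
```

For example, PortCheck.IsTcpPortInUse(80) returning true before you start the dev fabric suggests that the web role will be pushed onto port 81 or higher.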

Tuesday, 25 January 2011

Distributed Lock

I’m working on a distributed system on Azure which has Web and Worker Roles, with the possibility of any number of instances per role. The Worker role is a batch processor which consists of Jobs and the Job Scheduler.  My requirement is that there should only ever be one instance of the Job Scheduler running throughout the whole system.
If this were a simple multi-threaded application with one app domain, this would be relatively simple using a mutex and the lock statement. However, given that the application could be spread across multiple virtual machines within the Azure Fabric, I had to come up with another way.
I solved this problem by using a SQL Azure table, serializable transactions and a Distributed Lock Service in C#.
First create the table
/****** Object:  Table [dbo].[Lock]    Script Date: 01/25/2011 14:18:13 ******/
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
CREATE TABLE [dbo].[Lock](
    [id] [int] IDENTITY(1,1) NOT NULL,
    [LockName] [nvarchar](4000) NOT NULL,
    [AcquirerGuid] [uniqueidentifier] NULL,
    [AcquiredDate] [datetime] NULL,
 CONSTRAINT [PK_Lock] PRIMARY KEY CLUSTERED (
    [id] ASC) WITH (STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF)
)
GO



Then create the Stored Procedures, “Lock_Acquire” and “Lock_Release”


IF  EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'[dbo].[Lock_Acquire]') AND type in (N'P', N'PC'))
    DROP PROCEDURE [dbo].[Lock_Acquire]
GO

SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO

CREATE PROCEDURE [dbo].[Lock_Acquire](
    @Name NVARCHAR(4000),
    @AcquirerGuid UNIQUEIDENTIFIER)
AS
BEGIN

    SET TRANSACTION ISOLATION LEVEL SERIALIZABLE
    
    BEGIN TRANSACTION

        IF(NOT EXISTS(SELECT 1 FROM Lock WHERE LockName = @Name))
        BEGIN    
            INSERT INTO [Lock] (LockName, AcquiredDate, AcquirerGuid)
            VALUES (@Name, GETUTCDATE(), @AcquirerGuid)
        END
        ELSE
        BEGIN
            
            UPDATE L
               SET L.AcquiredDate = GETUTCDATE(),
                   L.AcquirerGuid = @AcquirerGuid
              FROM Lock L
             WHERE L.LockName = @Name 
               AND (L.AcquirerGuid = @AcquirerGuid -- owned by the current requestor
                    OR DATEDIFF(SS, L.AcquiredDate, GETUTCDATE()) > 30 -- held for more than 30 seconds, so treated as abandoned
                    OR (L.AcquirerGuid IS NULL AND L.AcquiredDate IS NULL)) -- previously released
        END
        
        SELECT @@ROWCOUNT; 
         
    COMMIT TRANSACTION
END

GO


IF  EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'[dbo].[Lock_Release]') AND type in (N'P', N'PC'))
    DROP PROCEDURE [dbo].[Lock_Release]
GO

SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO

CREATE PROCEDURE [dbo].[Lock_Release](
    @Name NVARCHAR(4000),
    @AcquirerGuid UNIQUEIDENTIFIER)
AS
BEGIN
    
    SET TRANSACTION ISOLATION LEVEL SERIALIZABLE
    
    UPDATE Lock
    SET AcquiredDate = NULL, 
    AcquirerGuid = NULL
    WHERE AcquirerGuid = @AcquirerGuid
      AND LockName = @Name -- only release the named lock
    
    SELECT @@ROWCOUNT;
END

GO



Import the Stored Procedures into Entity Framework (if you use EF)


Create the Distributed Lock Service


public class DistributedLockService : IDisposable
    {
        public Guid AcquirerId { get; set; }
        public string LockName { get; set; }
        public bool LockAcquired { get; set; }

        /// <summary>
        /// ctor
        /// </summary>
        /// <param name="lockName"></param>
        public DistributedLockService(string lockName)
        {
            LockName = lockName;
            AcquirerId = Guid.NewGuid();
        }

        /// <summary>
        /// Attempts to acquire the named lock
        /// </summary>
        /// <returns></returns>
        public bool Acquire()
        {
            LockAcquired = Facade.DataContext.Lock_Acquire(LockName, AcquirerId).FirstOrDefault() == 1;
            return LockAcquired;
        }

        /// <summary>
        /// Attempts to release the named lock
        /// </summary>
        /// <returns></returns>
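        public bool Release()
        {
            // Nothing to release if this instance never acquired the lock.
            if (!LockAcquired) return true;

            bool released = Facade.DataContext.Lock_Release(LockName, AcquirerId) == 1;
            if (released) LockAcquired = false; // avoid a second release on Dispose
            return released;
        }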


        #region IDisposable Members

        public void Dispose()
        {
            Release();
        }

        #endregion
    }



You may have to change Facade.DataContext to be your equivalent ORM or Sql Helper class.


Create the Unit Test


[TestClass]
    public class DistributedLockServiceTest
    {
        [TestMethod]
        public void CheckThatLockingWorks()
        {
            string fakeLockName = Guid.NewGuid().ToString();
            
            List<bool> results = new List<bool>();
            ManualResetEvent handle = new ManualResetEvent(false);
            object mutex = new object();

            Parallel.Invoke(() =>
            {
                Parallel.For(0, 100, x =>
                {
                    handle.WaitOne();
                    DistributedLockService svc = new DistributedLockService(fakeLockName);

                    lock(mutex) results.Add(svc.Acquire());
                });
            }, 
            () =>
            {

                Thread.Sleep(2000);
                handle.Set();
            });
            Assert.AreEqual(1, results.Count(x => x), "The number of trues should be 1 for fake lock: " + fakeLockName);

        }
    }


Use it for real…


private DistributedLockService _lockService = new DistributedLockService("JobScheduler");



if (_lockService.Acquire())
{
    // do something, like start a Job
}


So the idea is that you instantiate the Distributed Lock Service once and keep it for as long as necessary; then, whenever you wish to perform some action without the possibility of concurrency, do it after acquiring the lock.  Please also note that the lock only lasts for 30 seconds at a time, so you must call Acquire again within each 30-second period if you wish to keep it. Otherwise the lock will be relinquished to the next requestor.  If this is too short a timeframe, you can change it inside the Lock_Acquire stored procedure.


The reason why there’s a time limit is in case the acquiring/lock-owning thread throws an exception and does not release the lock.
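To make the renew-or-lose rule easier to see, here is an in-memory sketch of the same acquire logic. It is only an illustration: the InMemoryLeaseTable class and its members are invented for this example, it is not distributed, it takes the current time as a parameter so that expiry can be shown without waiting, and it compares elapsed time directly, which only approximates DATEDIFF(SS, ...).

```csharp
using System;
using System.Collections.Generic;

// In-memory stand-in for the Lock table; illustrative only, not a distributed lock.
public class InMemoryLeaseTable
{
    private class Row
    {
        public Guid? Owner;
        public DateTime? AcquiredUtc;
    }

    private readonly Dictionary<string, Row> _rows = new Dictionary<string, Row>();
    private readonly object _sync = new object();
    private readonly TimeSpan _lease;

    public InMemoryLeaseTable(TimeSpan lease)
    {
        _lease = lease;
    }

    // Mirrors Lock_Acquire: succeeds if the lock is new, released, already ours, or expired.
    public bool Acquire(string name, Guid requester, DateTime nowUtc)
    {
        lock (_sync)
        {
            Row row;
            if (!_rows.TryGetValue(name, out row))
            {
                _rows[name] = new Row { Owner = requester, AcquiredUtc = nowUtc };
                return true;
            }

            bool released = row.Owner == null && row.AcquiredUtc == null;
            bool ours = row.Owner == requester;
            bool expired = row.AcquiredUtc.HasValue
                           && (nowUtc - row.AcquiredUtc.Value) > _lease;

            if (!(released || ours || expired)) return false;

            // Taking or renewing the lock restarts the lease period.
            row.Owner = requester;
            row.AcquiredUtc = nowUtc;
            return true;
        }
    }

    // Clears the row, but only if the requester currently owns it.
    public bool Release(string name, Guid requester)
    {
        lock (_sync)
        {
            Row row;
            if (!_rows.TryGetValue(name, out row) || row.Owner != requester) return false;

            row.Owner = null;
            row.AcquiredUtc = null;
            return true;
        }
    }
}
```

With a 30-second lease, if A acquires at t=0 and renews at t=20, then B is refused at t=45 (only 25 seconds since the renewal) but succeeds at t=51, once A has gone more than 30 seconds without calling Acquire. That is the case the time limit is there for: an owner that died without releasing.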


Please let me know should you find any problems. Thanks.