3/17/2020
Java Concurrency
Threads
In concurrent programming, there are two basic units of execution: processes and
threads.
A computer runs many active threads and processes at the same time, even if it has only a
single-core processor. This is achieved through the OS feature of time-slicing.
In Java, concurrent programming is concerned with threads.
Threads vs Processes
Process
Synonymous with program or application
Self-contained execution environment (own memory space)
Inter Process Communication (IPC) resources, such as pipes and sockets
Most implementations of the Java VM run as a single process.
Thread
Lightweight process
Exists within a process (each process has at least one thread)
Shares resources (memory and files) => communication problems
Multithreading is a feature supported by the Java platform.
Threads in Java
"Java threads are objects like any other Java objects. Threads are instances of class java.lang.Thread,
or instances of subclasses of this class. In addition to being objects, java threads can also execute
code."
In order to define a thread:
Extend Thread
Implement Runnable
Each thread overrides the run() method => this will be executed during the thread lifecycle
Threads in Java
Thread lifecycle (diagram from [Link])
Threads in Java
Lifecycle:
Starting a thread
Create a thread
Call the start() method on it => starts executing the run() method
Terminating a thread
When it exits the run() method => the thread is dead => once it is dead it cannot be restarted
Pausing, suspending and resuming a thread
Suspend for a certain amount of time using the sleep(millis) method
Use the wait() and notify() mechanism
Thread cleanup
As long as some other active object holds a reference to the terminated thread
object, other threads can execute methods on the terminated thread and
retrieve the information it holds.
class MyClass extends Thread {
    // stands in for the "condition" of the loop
    private volatile boolean running = true;

    @Override
    public void run() {
        while (running) {
            // code for processing
            try {
                sleep(1000);
            } catch (InterruptedException e) {
                return; // interrupted while sleeping: leave run()
            }
        }
    }
}
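The lifecycle rules above can be checked with a small experiment. This is a minimal sketch; the class and method names (ThreadLifecycleDemo, restartAttempt) are made up for illustration. It starts a thread, waits for it to die, queries the terminated Thread object, and then tries to start it again.

```java
class ThreadLifecycleDemo {
    /** Starts a thread, waits for it to die, then tries to start it again. */
    static String restartAttempt() {
        Thread t = new Thread(() -> { /* short job */ });
        t.start();
        try {
            t.join();                       // wait until run() has returned
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
        // The Thread object is still reachable, so it can still be queried.
        Thread.State state = t.getState();  // TERMINATED
        try {
            t.start();                      // a dead thread cannot be restarted
            return "restarted";
        } catch (IllegalThreadStateException e) {
            return state + ": cannot restart";
        }
    }

    public static void main(String[] args) {
        System.out.println(restartAttempt()); // prints "TERMINATED: cannot restart"
    }
}
```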
Threads in Java
1. Create a thread by providing a Runnable object
The Runnable interface defines a single method named run. The class implementing the interface
implements this method, which contains the code that will be executed in the thread.
The Runnable object is passed to the Thread constructor.
class MyClass implements Runnable {
    public void run() {
        // Display info about this particular thread
        System.out.println(Thread.currentThread());
    }
}
…
Thread t = new Thread(new MyClass());
t.start();
2. Create a thread by extending the Thread class
The Thread class itself implements Runnable; its run method does not do anything. A class can
extend the Thread class and provide its own implementation of the run method.
class MyClass extends Thread {
    public void run() {
        // Display info about this particular thread
        System.out.println(Thread.currentThread());
    }
}
…
MyClass t = new MyClass();
t.start();
Thread vs Runnable
Inheritance Option
Extends Thread => the class cannot extend any other class
Reusability
Implements Runnable => contains only the functionality we want in the run method
"extends Thread" contains both thread and job specific behavior code
Object Oriented Design
Implements Runnable => composition => a thread has-a Runnable behavior.
"extends Thread" is not a good Object Oriented practice.
Loosely Coupled
Implements Runnable => loosely coupled => splits code into 2 parts: behavior and thread
Extends Thread => tightly coupled
Functions overhead
"extends Thread" means inheriting all the functions of the Thread class, which we may not need
Threads – pausing execution
Thread.sleep()
Used to suspend the execution of the current thread for a specified duration
The current thread will be put in the timed waiting state until the sleep time ends
The time for suspending the execution of a running thread must be given in milliseconds and
must not be negative. In the example, the thread's execution is suspended for 4 seconds.
An InterruptedException is thrown in case the current thread is interrupted by another thread
while sleep is active.
public class App {
    public static void printMessages() throws InterruptedException {
        for (int i = 0; i < 10; i++) {
            System.out.println("Sending message number " + i);
            Thread.sleep(4000);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        App.printMessages();
    }
}
Threads – Interrupts
A thread t in the waiting or sleeping state can be interrupted by calling the interrupt method declared in the
Thread class => the thread t will exit the waiting/sleeping state and will throw an InterruptedException
The interrupted thread t must handle its interruption using one of the methods below [1]
The thread is invoking methods that throw InterruptedException => it returns from the run method
after catching the exception
public class MyThread implements Runnable {
    public void run() {
        for (int i = 0; i < 10; i++) {
            System.out.println("Sending message number " + i);
            try {
                Thread.sleep(4000);
            } catch (InterruptedException e) {
                System.out.println("Someone interrupted me!");
                return;
            }
        }
    }
}
…
Thread thread = new Thread(new MyThread());
thread.start();
thread.interrupt();
The thread is not invoking methods that throw InterruptedException => it will periodically invoke
Thread.interrupted to check if an interrupt has been received
public class MyThread implements Runnable {
    public void run() {
        for (int i = 0; i < 10; i++) {
            System.out.println("Sending message number " + i);
            if (Thread.interrupted()) {
                System.out.println("Someone interrupted me!");
                return;
            }
        }
    }
}
…
Thread thread = new Thread(new MyThread());
thread.start();
thread.interrupt();
Scheduled Execution [2]
Timers - used to schedule the specified task for repeated fixed-delay execution, beginning after the
specified delay.
Steps for scheduling a task using Timer
Step 1: Create a subclass of the TimerTask class and override the run method by specifying the
instructions to be executed.
Step 2: Create a Timer object. Each Timer object has a corresponding background thread that will
execute the timer's tasks sequentially.
Step 3: Create an object of the subclass created at Step 1.
Step 4: Plan the execution of the object created at Step 3 using the schedule methods from the
Timer class.
import java.util.Timer;
import java.util.TimerTask;

public class SendingMessageTask extends TimerTask { // Step 1
    private String message;

    public SendingMessageTask(String aMessage) {
        this.message = aMessage;
    }

    @Override
    public void run() {
        System.out.println("Sending the message " + this.message);
    }
}
…
Timer aTimer = new Timer(); // Step 2
SendingMessageTask sendingMessageTask = new SendingMessageTask("Hello"); // Step 3
// Step 4 – schedule the task for repeated fixed-delay executions – e.g. delay = 1000
// milliseconds, time between successive task executions = 2000 milliseconds
aTimer.schedule(sendingMessageTask, 1000, 2000);
Thread Safety
1) Stateless objects – no state for that class
I.e. a class that has no instance vars => its state cannot change by running
methods in different threads
Local vars (in method) are independent per thread (each has its own stack)
=> Thread Safety
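A minimal sketch of a stateless class, with hypothetical names (StatelessCalculator, StatelessDemo): one instance is shared by four threads, and each call works only on local variables, so the threads cannot disturb each other.

```java
class StatelessCalculator {
    // No instance fields: nothing is shared between the calling threads.
    int sumUpTo(int n) {
        int total = 0;                       // local variable, one copy per call
        for (int i = 1; i <= n; i++) {
            total += i;
        }
        return total;
    }
}

class StatelessDemo {
    /** Calls the same calculator from four threads and checks every result. */
    static boolean allCorrect() {
        StatelessCalculator calc = new StatelessCalculator();
        int[] results = new int[4];
        Thread[] threads = new Thread[4];
        for (int k = 0; k < threads.length; k++) {
            final int slot = k;              // each thread writes its own slot
            threads[k] = new Thread(() -> results[slot] = calc.sumUpTo(1000));
            threads[k].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        for (int r : results) {
            if (r != 500500) return false;   // 1000 * 1001 / 2
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(allCorrect());
    }
}
```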
Thread Safety
2) Volatile variables
"The Java volatile keyword is used to mark a Java variable as "being stored in main
memory". More precisely that means, that every read of a volatile variable will be
read from the computer's main memory, and not from the CPU cache, and that
every write to a volatile variable will be written to main memory, and not just to the
CPU cache."
[Link]
=> Thread Safety
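A typical use of volatile is a stop flag written by one thread and read by another. The sketch below uses hypothetical names (VolatileFlagDemo, startAndStop). Note that volatile guarantees visibility of the latest write; it does not make compound actions such as i++ atomic.

```java
class VolatileFlagDemo {
    // volatile: the write in startAndStop() becomes visible to the worker thread
    private static volatile boolean running = true;

    /** Starts a busy worker, asks it to stop, and reports whether it stopped. */
    static boolean startAndStop() {
        running = true;
        Thread worker = new Thread(() -> {
            long iterations = 0;
            while (running) {       // re-reads the flag on every pass
                iterations++;
            }
        });
        worker.start();
        try {
            Thread.sleep(50);       // let the worker spin for a moment
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        running = false;            // request the stop
        try {
            worker.join(2000);      // wait up to 2 seconds for the worker to end
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("worker stopped: " + startAndStop());
    }
}
```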
Thread Safety
3) Locks
- guard a shared resource against concurrent access or modification
- guard resources in a block using synchronized
- guard resources across blocks using ReentrantLock
- coordinate several threads working on the same resource: CountDownLatch
  (waiting threads are released once the count reaches zero)
=> Thread Safety
synchronized
synchronized (SomeClass.class) { // class-level lock; SomeClass is a placeholder name
    // some code
}
Or
synchronized (this) {
    // some code
}
ReentrantLock
private ReentrantLock lock;
public void foo() { ...
    lock.lock();
...}
public void bar() { ...
    lock.unlock();
...}
CountDownLatch
CountDownLatch latch = new CountDownLatch(3);
public void foo() { ...
    latch.countDown();
...}
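A minimal sketch combining the two lock utilities above, with hypothetical names (LockDemo, runWorkers): a ReentrantLock guards a shared counter, released in a finally block so that an exception cannot leave it locked, and a CountDownLatch lets the caller wait until all workers have finished.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;

class LockDemo {
    private final ReentrantLock lock = new ReentrantLock();
    private int counter = 0;

    void increment() {
        lock.lock();
        try {
            counter++;              // guarded section
        } finally {
            lock.unlock();          // always release, even on exceptions
        }
    }

    /** Runs the given number of workers and returns the final counter value. */
    static int runWorkers(int workers, int incrementsEach) {
        LockDemo demo = new LockDemo();
        CountDownLatch done = new CountDownLatch(workers);
        for (int w = 0; w < workers; w++) {
            new Thread(() -> {
                for (int i = 0; i < incrementsEach; i++) {
                    demo.increment();
                }
                done.countDown();   // signal that this worker has finished
            }).start();
        }
        try {
            done.await();           // blocks until the count reaches zero
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return demo.counter;
    }

    public static void main(String[] args) {
        System.out.println(runWorkers(3, 1000)); // prints 3000
    }
}
```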
Thread Safety
3) Locks
=> can lead to deadlocks
Solution: sort input accounts and synchronize in-order
Synchronize - deadlock
public void transfer(Account a, Account b, double sum)
{
    synchronized(a){ // Th1 locks account a
        // Th2 locks account b
        synchronized(b){
            // transfer sum
        }
    }
}
Call function:
Th1: transfer(a,b,sum1);
Th2: transfer(b,a,sum1);
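The "sort and synchronize in-order" solution can be sketched as follows. The Account class with a unique numeric id is an assumption made for this example, as are the names Bank and runOpposingTransfers. Both threads always lock the account with the smaller id first, so the circular wait from the slide cannot occur.

```java
class Account {
    final int id;        // assumed unique; used only to order the locks
    double balance;

    Account(int id, double balance) {
        this.id = id;
        this.balance = balance;
    }
}

class Bank {
    static void transfer(Account from, Account to, double sum) {
        // Always lock the account with the smaller id first.
        Account first = from.id < to.id ? from : to;
        Account second = (first == from) ? to : from;
        synchronized (first) {
            synchronized (second) {
                from.balance -= sum;
                to.balance += sum;
            }
        }
    }

    /** Th1 moves money a -> b while Th2 moves the same amounts b -> a. */
    static double[] runOpposingTransfers() {
        Account a = new Account(1, 1000);
        Account b = new Account(2, 1000);
        Thread th1 = new Thread(() -> {
            for (int i = 0; i < 10000; i++) transfer(a, b, 1);
        });
        Thread th2 = new Thread(() -> {
            for (int i = 0; i < 10000; i++) transfer(b, a, 1);
        });
        th1.start();
        th2.start();
        try {
            th1.join();
            th2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new double[] { a.balance, b.balance };
    }

    public static void main(String[] args) {
        double[] balances = runOpposingTransfers();
        System.out.println(balances[0] + " " + balances[1]);
    }
}
```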
Thread Safety
4) Atomicity – needed to avoid problems in case of:
a) Problem: race conditions
(getting the right answer depends on lucky timing)
Race Condition
public static Operation getInstance() {
    if (instance == null) { // Th1 and Th2 can both see null here
        instance = new Operation();
    }
    return instance;
}
• Th1 running
• Th2 running
=> two instances are created
Solution: synchronization. How?
1. synchronize the entire method
(inefficient once the instance is created)
2. synchronize the instantiation piece of code
(i.e. only if "instance == null"; the instance field must be declared volatile for this
double-checked pattern to be safe)
Synchronized
public static Operation getInstance() {
    if (instance == null) {
        synchronized (Operation.class) {
            if (instance == null) {
                instance = new Operation();
            }
        }
    }
    return instance;
}
=> Thread Safety
Thread Safety
4) Atomicity – needed to avoid problems in case of:
b) Problem: compound actions
i++;
Get the value of i & add one to it (then store the result) => not a single operation
Solution: Atomic data types:
Ex. AtomicInteger
=> Thread Safety
Compound Operations
int i = 0;
i++;
/* Accessed simultaneously by both Th1 and Th2
   Can lead to inconsistencies:
   - result can be 1 (both threads got 0 and incremented to 1)
   - result can be 2 (second thread got the value 1 incremented by the first thread) */
Atomic Operations
AtomicInteger i = new AtomicInteger();
i.incrementAndGet();
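A minimal sketch of the AtomicInteger solution, with hypothetical names (AtomicCounterDemo, countWithTwoThreads): two threads increment the same counter, and because incrementAndGet() is a single atomic operation no update is lost.

```java
import java.util.concurrent.atomic.AtomicInteger;

class AtomicCounterDemo {
    /** Two threads each increment the shared counter perThread times. */
    static int countWithTwoThreads(int perThread) {
        AtomicInteger counter = new AtomicInteger();
        Runnable job = () -> {
            for (int i = 0; i < perThread; i++) {
                counter.incrementAndGet();   // atomic read-modify-write
            }
        };
        Thread th1 = new Thread(job);
        Thread th2 = new Thread(job);
        th1.start();
        th2.start();
        try {
            th1.join();
            th2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(countWithTwoThreads(10000)); // prints 20000
    }
}
```

With a plain int and i++ in place of the AtomicInteger, the result could be lower than 20000 because of lost updates.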
Thread Safety
5) Use thread safe collections
A) synchronized collections - synchronize all methods
List<String> list = Collections.synchronizedList(new ArrayList<String>());
(!!! Iterating over the collection requires external synchronization on the list)
B) use concurrent collections
BlockingQueue
ConcurrentMap (e.g. ConcurrentHashMap) - uses fine-grained locking instead of one lock for the
whole map (before Java 8: a multitude of locks, each lock controlling one segment of the hash).
CopyOnWriteArrayList - achieves thread-safety by creating a separate
copy of the underlying array for each write operation.
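The two points that are easiest to get wrong can be sketched as below (class and method names are hypothetical): iteration over a synchronized list holds the list's own lock, and a ConcurrentHashMap is updated with the atomic merge method instead of a separate get followed by put.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class SafeCollectionsDemo {
    /** Iterates over a synchronized list while holding the list's lock. */
    static int totalLength() {
        List<String> list = Collections.synchronizedList(new ArrayList<String>());
        list.add("alpha");
        list.add("beta");
        int total = 0;
        synchronized (list) {               // external sync for the whole iteration
            for (String s : list) {
                total += s.length();
            }
        }
        return total;                       // 5 + 4 = 9
    }

    /** Two threads count hits for the same key using the atomic merge(). */
    static int concurrentCount(int perThread) {
        ConcurrentMap<String, Integer> hits = new ConcurrentHashMap<>();
        Runnable job = () -> {
            for (int i = 0; i < perThread; i++) {
                hits.merge("page", 1, Integer::sum);
            }
        };
        Thread th1 = new Thread(job);
        Thread th2 = new Thread(job);
        th1.start();
        th2.start();
        try {
            th1.join();
            th2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return hits.get("page");
    }

    public static void main(String[] args) {
        System.out.println(totalLength() + " " + concurrentCount(5000));
    }
}
```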
Assignment 2
[Diagram: the Simulation Manager and the Scheduler act as the producer and put tasks (Task 0 to
Task 6) into the Blocking Queue of a server; each server (server1, server2) acts as a consumer
and takes tasks from its own queue.]
public void run() {
    // process task
    Thread.sleep(currentTaskProcessingTime);
}
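A minimal producer/consumer sketch in the spirit of the diagram. Everything here is an assumption made for illustration: tasks are reduced to their processing time in milliseconds, and a STOP marker value ends the server loop. It is not the assignment's required design.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class QueueServerDemo {
    static final int STOP = -1;     // hypothetical marker that ends the server loop

    /** Puts every processing time into the queue and returns how many tasks ran. */
    static int processAll(int[] processingTimesMs) {
        BlockingQueue<Integer> tasks = new LinkedBlockingQueue<>();
        int[] processed = { 0 };
        Thread server = new Thread(() -> {          // consumer
            try {
                while (true) {
                    int time = tasks.take();        // blocks while the queue is empty
                    if (time == STOP) {
                        return;
                    }
                    Thread.sleep(time);             // "process" the task
                    processed[0]++;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        server.start();
        try {
            for (int time : processingTimesMs) {    // producer
                tasks.put(time);
            }
            tasks.put(STOP);
            server.join();                          // wait for the server to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed[0];
    }

    public static void main(String[] args) {
        System.out.println(processAll(new int[] { 10, 20, 5 })); // prints 3
    }
}
```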
Assignment 2
Task
Modeled using:
arrivalTime
finishTime
processingPeriod
finishTime = arrivalTime + processingPeriod + waitingPeriodOnChosenServer
Assignment 2
Server -- Runnable
Modeled using:
Tasks (BlockingQueue<Task>…)
WaitingPeriod (AtomicInteger –
decremented by current thread once a task is completed
incremented by scheduler thread adding new tasks)
Assignment 2
Scheduler
Sends tasks to Servers according to the
established strategy
Modeled Using :
Servers
Constraints:
maxNoServers, maxLoadPerServer
Assignment 2
Scheduler – Strategy Pattern
Choose the policy to distribute clients
[Link]
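One possible shape for the Strategy pattern in the scheduler is sketched below. All names (Strategy, ShortestTimeStrategy, RoundRobinStrategy, SchedulerSketch) are hypothetical, and servers are reduced to an array of waiting periods to keep the example short.

```java
interface Strategy {
    /** Returns the index of the server that should receive the next task. */
    int chooseServer(int[] waitingPeriods);
}

class ShortestTimeStrategy implements Strategy {
    public int chooseServer(int[] waitingPeriods) {
        int best = 0;
        for (int i = 1; i < waitingPeriods.length; i++) {
            if (waitingPeriods[i] < waitingPeriods[best]) {
                best = i;           // first server with the smallest waiting period
            }
        }
        return best;
    }
}

class RoundRobinStrategy implements Strategy {
    private int next = 0;

    public int chooseServer(int[] waitingPeriods) {
        int chosen = next;
        next = (next + 1) % waitingPeriods.length;
        return chosen;
    }
}

class SchedulerSketch {
    private Strategy strategy;      // the policy can be swapped at runtime

    SchedulerSketch(Strategy strategy) {
        this.strategy = strategy;
    }

    void changeStrategy(Strategy strategy) {
        this.strategy = strategy;
    }

    /** Adds the task's processing period to the chosen server and returns its index. */
    int dispatch(int[] waitingPeriods, int processingPeriod) {
        int index = strategy.chooseServer(waitingPeriods);
        waitingPeriods[index] += processingPeriod;
        return index;
    }
}
```

The scheduler depends only on the Strategy interface, so a new policy can be added without changing the dispatch code.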
Assignment 2
Simulation Manager- Runnable
Randomly generates the tasks with:
Arrival time
processingPeriod
Contains simulation loop:
CurrentTime
Call scheduler to dispatch tasks
Update UI
Java Concurrency
Advanced Concepts
Contents
Single thread design vs Multi-Thread Design
Java Executor Framework
Result bearing Jobs: Callable vs Runnable
Swing Concurrency Support
Job Execution
Web Application executing user jobs
1. Single thread design
Create one thread per application
Poor performance: only one job is handled at a time
The other jobs are waiting for the previous one to complete
=> poor responsiveness
[Diagram: Job 1 to Job 5 queued for a single Thread]
Job Execution
Web Application executing user jobs
2. Multiple threads design
Create one thread for each job
Improved performance – jobs are completed in parallel
!!! Job handling must be thread safe
[Diagram: Job 1 to Job 5, each handled by its own thread (Thread 1 to Thread 5)]
Job Execution
Web Application executing user jobs
2. Multiple threads design
Unbounded thread creation
1. Thread lifecycle overhead
Thread creation and teardown are not free
2. Resource consumption
Creating more threads than the available processors does not help
(it may even hurt: puts pressure on the garbage collector and the threads compete for CPU)
3. Stability
Each system has a limit on the number of threads that can be created, influenced by:
- JVM parameters, stack size, underlying OS, etc.
- Exceed limit => OutOfMemoryError
[Diagram: Job 1 to Job 5, each handled by its own thread (Thread 1 to Thread 5)]
Job Execution
Web Application executing user jobs
3. Executor Framework
Executor – simple interface that provides the basis for a flexible and powerful framework
Lifecycle support, hooks for adding statistics gathering, application management, and monitoring
Select the optimal policy at runtime, depending on the available hardware
Executor
Executor exec = Executors.newFixedThreadPool(100);
Runnable task = new Runnable() {
    public void run() {
        // execute job
    }
};
exec.execute(task);
Job Execution
Web Application executing user jobs
3. Executor Framework- Thread pools
newFixedThreadPool – fixed number of threads
newCachedThreadPool – creates new threads as needed and reuses idle ones; no bound on the pool size
newSingleThreadExecutor – one thread (automatically replaced if it dies). Tasks are processed sequentially, in the order imposed by the task queue (FIFO, LIFO, priority, etc.)
newScheduledThreadPool – fixed size; supports delayed and periodic execution
Job Execution
Web Application executing user jobs
3. Executor Framework -Lifecycle
Non-daemon threads (failing to shut down an Executor could prevent the JVM from exiting; i.e. its threads continue to run even
after the main thread terminates)
ExecutorService - interface extending Executor and providing methods for handling lifecycle management operations:
shutdown() //graceful shutdown – no new tasks are accepted; the previously submitted tasks are allowed to complete
shutdownNow() //abrupt shutdown - attempts to cancel all running tasks and does not start the queued ones
isShutdown()
isTerminated() // after all tasks have terminated, the Executor transitions to the terminated state
awaitTermination(long timeout, TimeUnit unit)
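A common way to combine these lifecycle methods is sketched below, with hypothetical class and method names: request a graceful shutdown, wait a bounded time for termination, and fall back to shutdownNow() if the tasks do not finish in time.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ShutdownDemo {
    /** Runs a few short jobs, shuts the pool down and reports its final state. */
    static boolean runAndShutDown() {
        ExecutorService exec = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 4; i++) {
            exec.execute(() -> { /* short job */ });
        }
        exec.shutdown();                    // graceful: no new tasks are accepted
        try {
            if (!exec.awaitTermination(5, TimeUnit.SECONDS)) {
                exec.shutdownNow();         // abrupt: interrupt what is still running
            }
        } catch (InterruptedException e) {
            exec.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return exec.isShutdown() && exec.isTerminated();
    }

    public static void main(String[] args) {
        System.out.println(runAndShutDown()); // prints true
    }
}
```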
Job Execution
Web Application executing user jobs
4. Result bearing Jobs
Callable vs Runnable
Callable = runnable on steroids
Future – the result of submitting a Callable or a
Runnable task to the Executor Service
Callable vs Runnable
public interface Callable<V> {
    V call() throws Exception;
}
public interface Runnable {
    void run();
}
public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException,
               TimeoutException;
}
Job Execution
Web Application executing user jobs
4. Result bearing Jobs
Executor service usage:
execute(Runnable) – executes the task – no way of obtaining the result
submit(Runnable) - returns a Future object – one can check if the Runnable has finished execution
future.get(); //returns null if the task has finished correctly.
submit(Callable) – returns a Future object
future.get(); //returns the value returned by call – it's blocking
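The difference between the two submit variants can be seen in a short sketch (class and method names are hypothetical): the Future of a Runnable yields null, while the Future of a Callable yields the value returned by call().

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FutureDemo {
    /** Submits a Runnable and a Callable and joins both results into one string. */
    static String submitBoth() {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        try {
            Runnable job = () -> { /* side effects only, no result */ };
            Callable<Integer> computation = () -> 6 * 7;

            Future<?> jobResult = exec.submit(job);
            Future<Integer> value = exec.submit(computation);

            // get() blocks until the corresponding task has completed
            return jobResult.get() + "/" + value.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } catch (ExecutionException e) {
            return "failed: " + e.getCause();
        } finally {
            exec.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(submitBoth()); // prints "null/42"
    }
}
```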
Swing Concurrency Support
SwingWorker
Support for UI (using FutureTask and Executor):
Cancellation
Completion notification
Progress indication
doInBackground – executes the long job
One can publish intermediate results (publish method)
done – called once doInBackground finishes
One can access the result by calling get() (see Futures)
process – called asynchronously to process the published information
* UI components should only be accessed in done or process, which are executed on the Event Dispatch Thread
import java.util.List;
import javax.swing.JOptionPane;
import javax.swing.SwingWorker;

public class SwingWorkerExample extends SwingWorker<Integer, Integer> {
    @Override
    protected Integer doInBackground() throws Exception {
        Thread.sleep(1000);
        publish(1);
        Thread.sleep(1000);
        publish(2);
        Thread.sleep(1000);
        publish(3);
        return 13;
    }

    @Override
    protected void done() {
        try {
            JOptionPane.showMessageDialog(null, get());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    protected void process(List<Integer> v) {
        for (int i = 0; i < v.size(); i++) {
            System.out.println("received values: " + v.get(i));
        }
    }
}
References
[1] [Link]
[2] [Link]
[3] [Link]
[4] [Link]
[Link]
[5] B. Goetz et al., Java Concurrency in Practice, 1st edition, Addison-Wesley Professional, May 19, 2006